http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/LogicalExpr.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/LogicalExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/LogicalExpr.scala
deleted file mode 100644
index b4c35f5..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/LogicalExpr.scala
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.dsl.expr
-
-trait LogicalExpr extends Expr {
-}
-
-case class InExpr(head: Expr, is: Boolean, range: Seq[Expr]) extends LogicalExpr {
-
-  addChildren(head +: range)
-
-  def desc: String = {
-    val notStr = if (is) "" else " NOT"
-    s"${head.desc}${notStr} IN (${range.map(_.desc).mkString(", ")})"
-  }
-  def coalesceDesc: String = {
-    val notStr = if (is) "" else " NOT"
-    s"${head.coalesceDesc}${notStr} IN (${range.map(_.coalesceDesc).mkString(", ")})"
-  }
-
-  override def map(func: (Expr) => Expr): InExpr = {
-    InExpr(func(head), is, range.map(func(_)))
-  }
-}
-
-case class BetweenExpr(head: Expr, is: Boolean, range: Seq[Expr]) extends LogicalExpr {
-
-  range match {
-    case first :: second :: _ => addChildren(head :: first :: second :: Nil)
-    case _ => throw new Exception("between expression exception: range less than 2")
-  }
-
-  def desc: String = {
-    val notStr = if (is) "" else " NOT"
-    val rangeStr = range match {
-      case first :: second :: _ => s"${first.desc} AND ${second.desc}"
-      case _ => throw new Exception("between expression exception: range less than 2")
-    }
-    s"${head.desc}${notStr} BETWEEN ${rangeStr}"
-  }
-  def coalesceDesc: String = {
-    val notStr = if (is) "" else " NOT"
-    val rangeStr = range match {
-      case first :: second :: _ => s"${first.coalesceDesc} AND ${second.coalesceDesc}"
-      case _ => throw new Exception("between expression exception: range less than 2")
-    }
-    s"${head.coalesceDesc}${notStr} BETWEEN ${rangeStr}"
-  }
-
-  override def map(func: (Expr) => Expr): BetweenExpr = {
-    BetweenExpr(func(head), is, range.map(func(_)))
-  }
-}
-
-case class LikeExpr(head: Expr, is: Boolean, value: Expr) extends LogicalExpr {
-
-  addChildren(head :: value :: Nil)
-
-  def desc: String = {
-    val notStr = if (is) "" else " NOT"
-    s"${head.desc}${notStr} LIKE ${value.desc}"
-  }
-  def coalesceDesc: String = {
-    val notStr = if (is) "" else " NOT"
-    s"${head.coalesceDesc}${notStr} LIKE ${value.coalesceDesc}"
-  }
-
-  override def map(func: (Expr) => Expr): LikeExpr = {
-    LikeExpr(func(head), is, func(value))
-  }
-}
-
-case class IsNullExpr(head: Expr, is: Boolean) extends LogicalExpr {
-
-  addChild(head)
-
-  def desc: String = {
-    val notStr = if (is) "" else " NOT"
-    s"${head.desc} IS${notStr} NULL"
-  }
-  def coalesceDesc: String = desc
-
-  override def map(func: (Expr) => Expr): IsNullExpr = {
-    IsNullExpr(func(head), is)
-  }
-}
-
-case class IsNanExpr(head: Expr, is: Boolean) extends LogicalExpr {
-
-  addChild(head)
-
-  def desc: String = {
-    val notStr = if (is) "" else "NOT "
-    s"${notStr}isnan(${head.desc})"
-  }
-  def coalesceDesc: String = desc
-
-  override def map(func: (Expr) => Expr): IsNanExpr = {
-    IsNanExpr(func(head), is)
-  }
-}
-
-// -----------
-
-case class LogicalFactorExpr(factor: Expr, withBracket: Boolean, aliasOpt: Option[String]
-                            ) extends LogicalExpr with AliasableExpr {
-
-  addChild(factor)
-
-  def desc: String = if (withBracket) s"(${factor.desc})" else factor.desc
-  def coalesceDesc: String = factor.coalesceDesc
-  def alias: Option[String] = aliasOpt
-  override def extractSelf: Expr = {
-    if (aliasOpt.nonEmpty) this
-    else factor.extractSelf
-  }
-
-  override def map(func: (Expr) => Expr): LogicalFactorExpr = {
-    LogicalFactorExpr(func(factor), withBracket, aliasOpt)
-  }
-}
-
-case class UnaryLogicalExpr(oprs: Seq[String], factor: LogicalExpr) extends LogicalExpr {
-
-  addChild(factor)
-
-  def desc: String = {
-    oprs.foldRight(factor.desc) { (opr, fac) =>
-      s"(${trans(opr)} ${fac})"
-    }
-  }
-  def coalesceDesc: String = {
-    oprs.foldRight(factor.coalesceDesc) { (opr, fac) =>
-      s"(${trans(opr)} ${fac})"
-    }
-  }
-  private def trans(s: String): String = {
-    s match {
-      case "!" => "NOT"
-      case _ => s.toUpperCase
-    }
-  }
-  override def extractSelf: Expr = {
-    if (oprs.nonEmpty) this
-    else factor.extractSelf
-  }
-
-  override def map(func: (Expr) => Expr): UnaryLogicalExpr = {
-    UnaryLogicalExpr(oprs, func(factor).asInstanceOf[LogicalExpr])
-  }
-}
-
-case class BinaryLogicalExpr(factor: LogicalExpr, tails: Seq[(String, LogicalExpr)]) extends LogicalExpr {
-
-  addChildren(factor +: tails.map(_._2))
-
-  def desc: String = {
-    val res = tails.foldLeft(factor.desc) { (fac, tail) =>
-      val (opr, expr) = tail
-      s"${fac} ${trans(opr)} ${expr.desc}"
-    }
-    if (tails.size <= 0) res else s"${res}"
-  }
-  def coalesceDesc: String = {
-    val res = tails.foldLeft(factor.coalesceDesc) { (fac, tail) =>
-      val (opr, expr) = tail
-      s"${fac} ${trans(opr)} ${expr.coalesceDesc}"
-    }
-    if (tails.size <= 0) res else s"${res}"
-  }
-  private def trans(s: String): String = {
-    s match {
-      case "&&" => "AND"
-      case "||" => "OR"
-      case _ => s.trim.toUpperCase
-    }
-  }
-  override def extractSelf: Expr = {
-    if (tails.nonEmpty) this
-    else factor.extractSelf
-  }
-
-  override def map(func: (Expr) => Expr): BinaryLogicalExpr = {
-    BinaryLogicalExpr(func(factor).asInstanceOf[LogicalExpr], tails.map{ pair =>
-      (pair._1, func(pair._2).asInstanceOf[LogicalExpr])
-    })
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/MathExpr.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/MathExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/MathExpr.scala
deleted file mode 100644
index 4217e44..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/MathExpr.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.dsl.expr
-
-trait MathExpr extends Expr {
-}
-
-case class MathFactorExpr(factor: Expr, withBracket: Boolean, aliasOpt: Option[String]
-                         ) extends MathExpr with AliasableExpr {
-
-  addChild(factor)
-
-  def desc: String = if (withBracket) s"(${factor.desc})" else factor.desc
-  def coalesceDesc: String = factor.coalesceDesc
-  def alias: Option[String] = aliasOpt
-  override def extractSelf: Expr = {
-    if (aliasOpt.nonEmpty) this
-    else factor.extractSelf
-  }
-
-  override def map(func: (Expr) => Expr): MathFactorExpr = {
-    MathFactorExpr(func(factor), withBracket, aliasOpt)
-  }
-}
-
-case class UnaryMathExpr(oprs: Seq[String], factor: MathExpr) extends MathExpr {
-
-  addChild(factor)
-
-  def desc: String = {
-    oprs.foldRight(factor.desc) { (opr, fac) =>
-      s"(${opr}${fac})"
-    }
-  }
-  def coalesceDesc: String = {
-    oprs.foldRight(factor.coalesceDesc) { (opr, fac) =>
-      s"(${opr}${fac})"
-    }
-  }
-  override def extractSelf: Expr = {
-    if (oprs.nonEmpty) this
-    else factor.extractSelf
-  }
-
-  override def map(func: (Expr) => Expr): UnaryMathExpr = {
-    UnaryMathExpr(oprs, func(factor).asInstanceOf[MathExpr])
-  }
-}
-
-case class BinaryMathExpr(factor: MathExpr, tails: Seq[(String, MathExpr)]) extends MathExpr {
-
-  addChildren(factor +: tails.map(_._2))
-
-  def desc: String = {
-    val res = tails.foldLeft(factor.desc) { (fac, tail) =>
-      val (opr, expr) = tail
-      s"${fac} ${opr} ${expr.desc}"
-    }
-    if (tails.size <= 0) res else s"${res}"
-  }
-  def coalesceDesc: String = {
-    val res = tails.foldLeft(factor.coalesceDesc) { (fac, tail) =>
-      val (opr, expr) = tail
-      s"${fac} ${opr} ${expr.coalesceDesc}"
-    }
-    if (tails.size <= 0) res else s"${res}"
-  }
-  override def extractSelf: Expr = {
-    if (tails.nonEmpty) this
-    else factor.extractSelf
-  }
-
-  override def map(func: (Expr) => Expr): BinaryMathExpr = {
-    BinaryMathExpr(func(factor).asInstanceOf[MathExpr], tails.map{ pair =>
-      (pair._1, func(pair._2).asInstanceOf[MathExpr])
-    })
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/SelectExpr.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/SelectExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/SelectExpr.scala
deleted file mode 100644
index 4f84b10..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/SelectExpr.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.dsl.expr
-
-trait HeadExpr extends Expr with AliasableExpr {
-  def alias: Option[String] = None
-}
-
-case class DataSourceHeadExpr(name: String) extends HeadExpr {
-  def desc: String = s"`${name}`"
-  def coalesceDesc: String = desc
-}
-
-case class FieldNameHeadExpr(field: String) extends HeadExpr {
-  def desc: String = s"`${field}`"
-  def coalesceDesc: String = desc
-  override def alias: Option[String] = Some(field)
-}
-
-case class AllSelectHeadExpr() extends HeadExpr {
-  def desc: String = "*"
-  def coalesceDesc: String = desc
-}
-
-case class OtherHeadExpr(expr: Expr) extends HeadExpr {
-
-  addChild(expr)
-
-  def desc: String = expr.desc
-  def coalesceDesc: String = expr.coalesceDesc
-  override def alias: Option[String] = Some(expr.desc)
-
-  override def map(func: (Expr) => Expr): OtherHeadExpr = {
-    OtherHeadExpr(func(expr))
-  }
-}
-
-// -------------
-
-trait SelectExpr extends Expr with AliasableExpr {
-}
-
-case class AllFieldsSelectExpr() extends SelectExpr {
-  def desc: String = s".*"
-  def coalesceDesc: String = desc
-  def alias: Option[String] = None
-}
-
-case class FieldSelectExpr(field: String) extends SelectExpr {
-  def desc: String = s".`${field}`"
-  def coalesceDesc: String = desc
-  override def alias: Option[String] = Some(field)
-}
-
-case class IndexSelectExpr(index: Expr) extends SelectExpr {
-
-  addChild(index)
-
-  def desc: String = s"[${index.desc}]"
-  def coalesceDesc: String = desc
-  def alias: Option[String] = Some(index.desc)
-
-  override def map(func: (Expr) => Expr): IndexSelectExpr = {
-    IndexSelectExpr(func(index))
-  }
-}
-
-case class FunctionSelectExpr(functionName: String, args: Seq[Expr]) extends SelectExpr {
-
-  addChildren(args)
-
-  def desc: String = ""
-  def coalesceDesc: String = desc
-  def alias: Option[String] = Some(functionName)
-
-  override def map(func: (Expr) => Expr): FunctionSelectExpr = {
-    FunctionSelectExpr(functionName, args.map(func(_)))
-  }
-}
-
-// -------------
-
-case class SelectionExpr(head: HeadExpr, selectors: Seq[SelectExpr], aliasOpt: Option[String]) extends SelectExpr {
-
-  addChildren(head +: selectors)
-
-  def desc: String = {
-    selectors.foldLeft(head.desc) { (hd, sel) =>
-      sel match {
-        case FunctionSelectExpr(funcName, args) => {
-          val nargs = hd +: args.map(_.desc)
-          s"${funcName}(${nargs.mkString(", ")})"
-        }
-        case _ => s"${hd}${sel.desc}"
-      }
-    }
-  }
-  def coalesceDesc: String = {
-    selectors.lastOption match {
-      case None => desc
-      case Some(sel: FunctionSelectExpr) => desc
-      case _ => s"coalesce(${desc}, '')"
-    }
-  }
-  def alias: Option[String] = {
-    if (aliasOpt.isEmpty) {
-      val aliasSeq = (head +: selectors).flatMap(_.alias)
-      if (aliasSeq.nonEmpty) Some(aliasSeq.mkString("_")) else None
-    } else aliasOpt
-  }
-
-  override def map(func: (Expr) => Expr): SelectionExpr = {
-    SelectionExpr(func(head).asInstanceOf[HeadExpr],
-      selectors.map(func(_).asInstanceOf[SelectExpr]), aliasOpt)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/TreeNode.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/TreeNode.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/TreeNode.scala
deleted file mode 100644
index aab16b4..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/TreeNode.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.dsl.expr
-
-trait TreeNode extends Serializable {
-
-  var children = Seq[TreeNode]()
-
-  def addChild(expr: TreeNode) = { children :+= expr }
-  def addChildren(exprs: Seq[TreeNode]) = { children ++= exprs }
-
-  def preOrderTraverseDepthFirst[T, A <: TreeNode](z: T)(seqOp: (A, T) => T, combOp: (T, T) => T): T = {
-    if (this.isInstanceOf[A]) {
-      val tv = seqOp(this.asInstanceOf[A], z)
-      children.foldLeft(combOp(z, tv)) { (ov, tn) =>
-        combOp(ov, tn.preOrderTraverseDepthFirst(z)(seqOp, combOp))
-      }
-    } else z
-  }
-  def postOrderTraverseDepthFirst[T, A <: TreeNode](z: T)(seqOp: (A, T) => T, combOp: (T, T) => T): T = {
-    if (this.isInstanceOf[A]) {
-      val cv = children.foldLeft(z) { (ov, tn) =>
-        combOp(ov, tn.postOrderTraverseDepthFirst(z)(seqOp, combOp))
-      }
-      combOp(z, seqOp(this.asInstanceOf[A], cv))
-    } else z
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala
deleted file mode 100644
index 3a0d737..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala
+++ /dev/null
@@ -1,388 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.dsl.parser
-
-import org.apache.griffin.measure.rule.dsl.expr._
-
-import scala.util.parsing.combinator.JavaTokenParsers
-
-trait BasicParser extends JavaTokenParsers with Serializable {
-
-  val dataSourceNames: Seq[String]
-  val functionNames: Seq[String]
-
-  private def trim(str: String): String = {
-    val regex = """`(.*)`""".r
-    str match {
-      case regex(s) => s
-      case _ => str
-    }
-  }
-
-  /**
-    * BNF for basic parser
-    *
-    * -- literal --
-    * <literal> ::= <literal-string> | <literal-number> | <literal-time> | <literal-boolean> | <literal-null> | <literal-nan>
-    * <literal-string> ::= <any-string>
-    * <literal-number> ::= <integer> | <double>
-    * <literal-time> ::= <integer> ("d"|"h"|"m"|"s"|"ms")
-    * <literal-boolean> ::= true | false
-    * <literal-null> ::= null
-    * <literal-nan> ::= nan
-    *
-    * -- selection --
-    * <selection> ::= <selection-head> [ <field-sel> | <index-sel> | <function-sel> ]* [<as-alias>]?
-    * <selection-head> ::= ("data source name registered") | <function> | <field-name> | <all-selection>
-    * <field-sel> ::= "." <field-name> | "[" <quote-field-name> "]"
-    * <index-sel> ::= "[" <arg> "]"
-    * <function-sel> ::= "." <function-name> "(" [<arg>]? [, <arg>]* ")"
-    * <arg> ::= <math-expr>
-    *
-    * -- as alias --
-    * <as-alias> ::= <as> <field-name>
-    *
-    * -- math expr --
-    * <math-factor> ::= <literal> | <function> | <selection> | "(" <math-expr> ")" [<as-alias>]?
-    * <unary-math-expr> ::= [<unary-opr>]* <math-factor>
-    * <binary-math-expr> ::= <unary-math-expr> [<binary-opr> <unary-math-expr>]+
-    * <math-expr> ::= <binary-math-expr>
-    *
-    * -- logical expr --
-    * <in-expr> ::= <math-expr> [<not>]? <in> <range-expr>
-    * <between-expr> ::= <math-expr> [<not>]? <between> (<math-expr> <and> <math-expr> | <range-expr>)
-    * <range-expr> ::= "(" [<math-expr>]? [, <math-expr>]+ ")"
-    * <like-expr> ::= <math-expr> [<not>]? <like> <math-expr>
-    * <is-null-expr> ::= <math-expr> <is> [<not>]? <null>
-    * <is-nan-expr> ::= <math-expr> <is> [<not>]? <nan>
-    *
-    * <logical-factor> ::= <math-expr> | <in-expr> | <between-expr> | <like-expr> | <is-null-expr> | <is-nan-expr> | "(" <logical-expr> ")" [<as-alias>]?
-    * <unary-logical-expr> ::= [<unary-logical-opr>]* <logical-factor>
-    * <binary-logical-expr> ::= <unary-logical-expr> [<binary-logical-opr> <unary-logical-expr>]+
-    * <logical-expr> ::= <binary-logical-expr>
-    *
-    * -- expression --
-    * <expr> = <math-expr> | <logical-expr>
-    *
-    * -- function expr --
-    * <function> ::= <function-name> "(" [<arg>] [, <arg>]+ ")" [<as-alias>]?
-    * <function-name> ::= ("function name registered")
-    * <arg> ::= <expr>
-    *
-    * -- clauses --
-    * <select-clause> = <expr> [, <expr>]*
-    * <where-clause> = <where> <expr>
-    * <from-clause> = <from> ("data source name registered")
-    * <having-clause> = <having> <expr>
-    * <groupby-clause> = <group> <by> <expr> [ <having-clause> ]?
-    * <orderby-item> = <expr> [ <DESC> ]?
-    * <orderby-clause> = <order> <by> <orderby-item> [ , <orderby-item> ]*
-    * <limit-clause> = <limit> <expr>
-    *
-    * -- combined clauses --
-    * <combined-clauses> = <select-clause> [ <from-clause> ]+ [ <where-clause> ]+ [ <groupby-clause> ]+ [ <orderby-clause> ]+ [ <limit-clause> ]+
-    */
-
-  protected def genDataSourceNamesParser(names: Seq[String]): Parser[String] = {
-    names.reverse.map {
-      fn => s"""(?i)`${fn}`|${fn}""".r: Parser[String]
-    }.reduce(_ | _)
-  }
-  protected def genFunctionNamesParser(names: Seq[String]): Parser[String] = {
-    names.reverse.map {
-      fn => s"""(?i)${fn}""".r: Parser[String]
-    }.reduce(_ | _)
-  }
-
-  object Literal {
-    val NULL: Parser[String] = """(?i)null""".r
-    val NAN: Parser[String] = """(?i)nan""".r
-  }
-  import Literal._
-
-  object Operator {
-    val MATH_UNARY: Parser[String] = "+" | "-"
-    val MATH_BINARIES: Seq[Parser[String]] = Seq(("*" | "/" | "%"), ("+" | "-"))
-
-    val NOT: Parser[String] = """(?i)not\s""".r | "!"
-    val AND: Parser[String] = """(?i)and\s""".r | "&&"
-    val OR: Parser[String] = """(?i)or\s""".r | "||"
-    val IN: Parser[String] = """(?i)in\s""".r
-    val BETWEEN: Parser[String] = """(?i)between\s""".r
-    val AND_ONLY: Parser[String] = """(?i)and\s""".r
-    val IS: Parser[String] = """(?i)is\s""".r
-    val LIKE: Parser[String] = """(?i)like\s""".r
-    val COMPARE: Parser[String] = "=" | "!=" | "<>" | "<=" | ">=" | "<" | ">"
-    val LOGICAL_UNARY: Parser[String] = NOT
-    val LOGICAL_BINARIES: Seq[Parser[String]] = Seq((COMPARE), (AND), (OR))
-
-    val LSQBR: Parser[String] = "["
-    val RSQBR: Parser[String] = "]"
-    val LBR: Parser[String] = "("
-    val RBR: Parser[String] = ")"
-
-    val DOT: Parser[String] = "."
-    val ALLSL: Parser[String] = "*"
-    val SQUOTE: Parser[String] = "'"
-    val DQUOTE: Parser[String] = "\""
-    val UQUOTE: Parser[String] = "`"
-    val COMMA: Parser[String] = ","
-
-    val SELECT: Parser[String] = """(?i)select\s""".r
-    val DISTINCT: Parser[String] = """(?i)distinct""".r
-//    val ALL: Parser[String] = """(?i)all""".r
-    val FROM: Parser[String] = """(?i)from\s""".r
-    val AS: Parser[String] = """(?i)as\s""".r
-    val WHERE: Parser[String] = """(?i)where\s""".r
-    val GROUP: Parser[String] = """(?i)group\s""".r
-    val ORDER: Parser[String] = """(?i)order\s""".r
-    val SORT: Parser[String] = """(?i)sort\s""".r
-    val BY: Parser[String] = """(?i)by\s""".r
-    val DESC: Parser[String] = """(?i)desc""".r
-    val ASC: Parser[String] = """(?i)asc""".r
-    val HAVING: Parser[String] = """(?i)having\s""".r
-    val LIMIT: Parser[String] = """(?i)limit\s""".r
-  }
-  import Operator._
-
-  object Strings {
-    def AnyString: Parser[String] = """"(?:\"|[^\"])*"""".r | """'(?:\'|[^'])*'""".r
-    def SimpleTableFieldName: Parser[String] = """[a-zA-Z_]\w*""".r
-    def UnQuoteTableFieldName: Parser[String] = """`(?:[\\][`]|[^`])*`""".r
-//    def FieldName: Parser[String] = UnQuoteTableFieldName | SimpleTableFieldName
-    def DataSourceName: Parser[String] = genDataSourceNamesParser(dataSourceNames)
-    def FunctionName: Parser[String] = genFunctionNamesParser(functionNames)
-
-    def IntegerNumber: Parser[String] = """[+\-]?\d+""".r
-    def DoubleNumber: Parser[String] = """[+\-]?(\.\d+|\d+\.\d*)""".r
-    def IndexNumber: Parser[String] = IntegerNumber
-
-    def TimeString: Parser[String] = """([+\-]?\d+)(d|h|m|s|ms)""".r
-    def BooleanString: Parser[String] = """(?i)true|false""".r
-  }
-  import Strings._
-
-  /**
-    * -- literal --
-    * <literal> ::= <literal-string> | <literal-number> | <literal-time> | <literal-boolean> | <literal-null> | <literal-nan>
-    * <literal-string> ::= <any-string>
-    * <literal-number> ::= <integer> | <double>
-    * <literal-time> ::= <integer> ("d"|"h"|"m"|"s"|"ms")
-    * <literal-boolean> ::= true | false
-    * <literal-null> ::= null
-    * <literal-nan> ::= nan
-    */
-  def literal: Parser[LiteralExpr] = literalNull | literalNan | literalBoolean | literalString | literalTime | literalNumber
-  def literalNull: Parser[LiteralNullExpr] = NULL ^^ { LiteralNullExpr(_) }
-  def literalNan: Parser[LiteralNanExpr] = NAN ^^ { LiteralNanExpr(_) }
-  def literalString: Parser[LiteralStringExpr] = AnyString ^^ { LiteralStringExpr(_) }
-  def literalNumber: Parser[LiteralNumberExpr] = (DoubleNumber | IntegerNumber) ^^ { LiteralNumberExpr(_) }
-  def literalTime: Parser[LiteralTimeExpr] = TimeString ^^ { LiteralTimeExpr(_) }
-  def literalBoolean: Parser[LiteralBooleanExpr] = BooleanString ^^ { LiteralBooleanExpr(_) }
-
-  /**
-    * -- selection --
-    * <selection> ::= <selection-head> [ <field-sel> | <index-sel> | <function-sel> ]* [<as-alias>]?
-    * <selection-head> ::= ("data source name registered") | <function> | <field-name> | <all-selection>
-    * <field-sel> ::= "." <field-name> | "[" <quote-field-name> "]"
-    * <index-sel> ::= "[" <arg> "]"
-    * <function-sel> ::= "." <function-name> "(" [<arg>]? [, <arg>]* ")"
-    * <arg> ::= <math-expr>
-    */
-
-  def selection: Parser[SelectionExpr] = selectionHead ~ rep(selector) ~ opt(asAlias) ^^ {
-    case head ~ sels ~ aliasOpt => SelectionExpr(head, sels, aliasOpt)
-  }
-  def selectionHead: Parser[HeadExpr] = DataSourceName ^^ {
-    ds => DataSourceHeadExpr(trim(ds))
-  } | function ^^ {
-    OtherHeadExpr(_)
-  } | SimpleTableFieldName ^^ {
-    FieldNameHeadExpr(_)
-  } | UnQuoteTableFieldName ^^ { s =>
-    FieldNameHeadExpr(trim(s))
-  } | ALLSL ^^ { _ =>
-    AllSelectHeadExpr()
-  }
-  def selector: Parser[SelectExpr] = functionSelect | allFieldsSelect | fieldSelect | indexSelect
-  def allFieldsSelect: Parser[AllFieldsSelectExpr] = DOT ~> ALLSL ^^ { _ => AllFieldsSelectExpr() }
-  def fieldSelect: Parser[FieldSelectExpr] = DOT ~> (
-    SimpleTableFieldName ^^ {
-      FieldSelectExpr(_)
-    } | UnQuoteTableFieldName ^^ { s =>
-      FieldSelectExpr(trim(s))
-    })
-  def indexSelect: Parser[IndexSelectExpr] = LSQBR ~> argument <~ RSQBR ^^ { IndexSelectExpr(_) }
-  def functionSelect: Parser[FunctionSelectExpr] = DOT ~ FunctionName ~ LBR ~ repsep(argument, COMMA) ~ RBR ^^ {
-    case _ ~ name ~ _ ~ args ~ _ => FunctionSelectExpr(name, args)
-  }
-
-  /**
-    * -- as alias --
-    * <as-alias> ::= <as> <field-name>
-    */
-  def asAlias: Parser[String] = AS ~> (SimpleTableFieldName | UnQuoteTableFieldName ^^ { trim(_) })
-
-  /**
-    * -- math expr --
-    * <math-factor> ::= <literal> | <function> | <selection> | "(" <math-expr> ")" [<as-alias>]?
-    * <unary-math-expr> ::= [<unary-opr>]* <math-factor>
-    * <binary-math-expr> ::= <unary-math-expr> [<binary-opr> <unary-math-expr>]+
-    * <math-expr> ::= <binary-math-expr>
-    */
-
-  def mathFactor: Parser[MathExpr] = (literal | function | selection) ^^ {
-    MathFactorExpr(_, false, None)
-  } | LBR ~ mathExpression ~ RBR ~ opt(asAlias) ^^ {
-    case _ ~ expr ~ _ ~ aliasOpt => MathFactorExpr(expr, true, aliasOpt)
-  }
-  def unaryMathExpression: Parser[MathExpr] = rep(MATH_UNARY) ~ mathFactor ^^ {
-    case Nil ~ a => a
-    case list ~ a => UnaryMathExpr(list, a)
-  }
-  def binaryMathExpressions: Seq[Parser[MathExpr]] =
-    MATH_BINARIES.foldLeft(List[Parser[MathExpr]](unaryMathExpression)) { (parsers, binaryParser) =>
-      val pre = parsers.head
-      val cur = pre ~ rep(binaryParser ~ pre) ^^ {
-        case a ~ Nil => a
-        case a ~ list => BinaryMathExpr(a, list.map(c => (c._1, c._2)))
-      }
-      cur :: parsers
-    }
-  def mathExpression: Parser[MathExpr] = binaryMathExpressions.head
-
-  /**
-    * -- logical expr --
-    * <in-expr> ::= <math-expr> [<not>]? <in> <range-expr>
-    * <between-expr> ::= <math-expr> [<not>]? <between> (<math-expr> <and> <math-expr> | <range-expr>)
-    * <range-expr> ::= "(" [<math-expr>]? [, <math-expr>]+ ")"
-    * <like-expr> ::= <math-expr> [<not>]? <like> <math-expr>
-    * <is-null-expr> ::= <math-expr> <is> [<not>]? <null>
-    * <is-nan-expr> ::= <math-expr> <is> [<not>]? <nan>
-    *
-    * <logical-factor> ::= <math-expr> | <in-expr> | <between-expr> | <like-expr> | <is-null-expr> | <is-nan-expr> | "(" <logical-expr> ")" [<as-alias>]?
-    * <unary-logical-expr> ::= [<unary-logical-opr>]* <logical-factor>
-    * <binary-logical-expr> ::= <unary-logical-expr> [<binary-logical-opr> <unary-logical-expr>]+
-    * <logical-expr> ::= <binary-logical-expr>
-    */
-
-  def inExpr: Parser[LogicalExpr] = mathExpression ~ opt(NOT) ~ IN ~ LBR ~ repsep(mathExpression, COMMA) ~ RBR ^^ {
-    case head ~ notOpt ~ _ ~ _ ~ list ~ _ => InExpr(head, notOpt.isEmpty, list)
-  }
-  def betweenExpr: Parser[LogicalExpr] = mathExpression ~ opt(NOT) ~ BETWEEN ~ LBR ~ repsep(mathExpression, COMMA) ~ RBR ^^ {
-    case head ~ notOpt ~ _ ~ _ ~ list ~ _ => BetweenExpr(head, notOpt.isEmpty, list)
-  } | mathExpression ~ opt(NOT) ~ BETWEEN ~ mathExpression ~ AND_ONLY ~ mathExpression ^^ {
-    case head ~ notOpt ~ _ ~ first ~ _ ~ second => BetweenExpr(head, notOpt.isEmpty, Seq(first, second))
-  }
-  def likeExpr: Parser[LogicalExpr] = mathExpression ~ opt(NOT) ~ LIKE ~ mathExpression ^^ {
-    case head ~ notOpt ~ _ ~ value => LikeExpr(head, notOpt.isEmpty, value)
-  }
-  def isNullExpr: Parser[LogicalExpr] = mathExpression ~ IS ~ opt(NOT) ~ NULL ^^ {
-    case head ~ _ ~ notOpt ~ _ => IsNullExpr(head, notOpt.isEmpty)
-  }
-  def isNanExpr: Parser[LogicalExpr] = mathExpression ~ IS ~ opt(NOT) ~ NAN ^^ {
-    case head ~ _ ~ notOpt ~ _ => IsNanExpr(head, notOpt.isEmpty)
-  }
-
-  def logicalFactor: Parser[LogicalExpr] = (inExpr | betweenExpr | likeExpr | isNullExpr | isNanExpr | mathExpression) ^^ {
-    LogicalFactorExpr(_, false, None)
-  } | LBR ~ logicalExpression ~ RBR ~ opt(asAlias) ^^ {
-    case _ ~ expr ~ _ ~ aliasOpt => LogicalFactorExpr(expr, true, aliasOpt)
-  }
-  def unaryLogicalExpression: Parser[LogicalExpr] = rep(LOGICAL_UNARY) ~ logicalFactor ^^ {
-    case Nil ~ a => a
-    case list ~ a => UnaryLogicalExpr(list, a)
-  }
-  def binaryLogicalExpressions: Seq[Parser[LogicalExpr]] =
-    LOGICAL_BINARIES.foldLeft(List[Parser[LogicalExpr]](unaryLogicalExpression)) { (parsers, binaryParser) =>
-      val pre = parsers.head
-      val cur = pre ~ rep(binaryParser ~ pre) ^^ {
-        case a ~ Nil => a
-        case a ~ list => BinaryLogicalExpr(a, list.map(c => (c._1, c._2)))
-      }
-      cur :: parsers
-    }
-  def logicalExpression: Parser[LogicalExpr] = binaryLogicalExpressions.head
-
-  /**
-    * -- expression --
-    * <expr> = <math-expr> | <logical-expr>
-    */
-
-  def expression: Parser[Expr] = logicalExpression | mathExpression
-
-  /**
-    * -- function expr --
-    * <function> ::= <function-name> "(" [<arg>] [, <arg>]+ ")" [<as-alias>]?
-    * <function-name> ::= ("function name registered")
-    * <arg> ::= <expr>
-    */
-
-  def function: Parser[FunctionExpr] = FunctionName ~ LBR ~ opt(DISTINCT) ~ repsep(argument, COMMA) ~ RBR ~ opt(asAlias) ^^ {
-    case name ~ _ ~ extraCdtnOpt ~ args ~ _ ~ aliasOpt =>
-      FunctionExpr(name, args, extraCdtnOpt.map(ExtraConditionExpr(_)), aliasOpt)
-  }
-  def argument: Parser[Expr] = expression
-
-  /**
-    * -- clauses --
-    * <select-clause> = <expr> [, <expr>]*
-    * <where-clause> = <where> <expr>
-    * <from-clause> = <from> ("data source name registered")
-    * <having-clause> = <having> <expr>
-    * <groupby-clause> = <group> <by> <expr> [ <having-clause> ]?
-    * <orderby-item> = <expr> [ <DESC> ]?
-    * <orderby-clause> = <order> <by> <orderby-item> [ , <orderby-item> ]*
-    * <limit-clause> = <limit> <expr>
-    */
-
-  def selectClause: Parser[SelectClause] = opt(SELECT) ~> opt(DISTINCT) ~ rep1sep(expression, COMMA) ^^ {
-    case extraCdtnOpt ~ exprs => SelectClause(exprs, extraCdtnOpt.map(ExtraConditionExpr(_)))
-  }
-  def fromClause: Parser[FromClause] = FROM ~> DataSourceName ^^ { ds => FromClause(trim(ds)) }
-  def whereClause: Parser[WhereClause] = WHERE ~> expression ^^ { WhereClause(_) }
-  def havingClause: Parser[Expr] = HAVING ~> expression
-  def groupbyClause: Parser[GroupbyClause] = GROUP ~ BY ~ rep1sep(expression, COMMA) ~ opt(havingClause) ^^ {
-    case _ ~ _ ~ cols ~ havingOpt => GroupbyClause(cols, havingOpt)
-  }
-  def orderItem: Parser[OrderItem] = expression ~ opt(DESC | ASC) ^^ {
-    case expr ~ orderOpt => OrderItem(expr, orderOpt)
-  }
-  def orderbyClause: Parser[OrderbyClause] = ORDER ~ BY ~ rep1sep(orderItem, COMMA) ^^ {
-    case _ ~ _ ~ cols => OrderbyClause(cols)
-  }
-  def sortbyClause: Parser[SortbyClause] = SORT ~ BY ~ rep1sep(orderItem, COMMA) ^^ {
-    case _ ~ _ ~ cols => SortbyClause(cols)
-  }
-  def limitClause: Parser[LimitClause] = LIMIT ~> expression ^^ { LimitClause(_) }
-
-  /**
-    * -- combined clauses --
-    * <combined-clauses> = <select-clause> [ <from-clause> ]+ [ <where-clause> ]+ [ <groupby-clause> ]+ [ <orderby-clause> ]+ [ <limit-clause> ]+
-    */
-
-  def combinedClause: Parser[CombinedClause] = selectClause ~ opt(fromClause) ~ opt(whereClause) ~
-    opt(groupbyClause) ~ opt(orderbyClause) ~ opt(limitClause) ^^ {
-    case sel ~ fromOpt ~ whereOpt ~ groupbyOpt ~ orderbyOpt ~ limitOpt => {
-      val tails = Seq(whereOpt, groupbyOpt, orderbyOpt, limitOpt).flatMap(opt => opt)
-      CombinedClause(sel, fromOpt, tails)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
deleted file mode 100644
index b4496e7..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.dsl.parser
-
-import org.apache.griffin.measure.rule.dsl._
-import org.apache.griffin.measure.rule.dsl.expr._
-
-case class GriffinDslParser(dataSourceNames: Seq[String], functionNames: 
Seq[String]
-                           ) extends BasicParser {
-
-  import Operator._
-
-  /**
-    * -- profiling clauses --
-    * <profiling-clauses> = <select-clause> [ <from-clause> ]+ [ 
<where-clause> ]+ [ <groupby-clause> ]+ [ <orderby-clause> ]+ [ <limit-clause> 
]+
-    */
-
-  def profilingClause: Parser[ProfilingClause] = selectClause ~ 
opt(fromClause) ~ opt(whereClause) ~
-    opt(groupbyClause) ~ opt(orderbyClause) ~ opt(limitClause) ^^ {
-    case sel ~ fromOpt ~ whereOpt ~ groupbyOpt ~ orderbyOpt ~ limitOpt => {
-      val preClauses = Seq(whereOpt).flatMap(opt => opt)
-      val postClauses = Seq(orderbyOpt, limitOpt).flatMap(opt => opt)
-      ProfilingClause(sel, fromOpt, groupbyOpt, preClauses, postClauses)
-    }
-  }
-
-  /**
-    * -- uniqueness clauses --
-    * <uniqueness-clauses> = <expr> [, <expr>]+
-    */
-  def uniquenessClause: Parser[UniquenessClause] = rep1sep(expression, 
Operator.COMMA) ^^ {
-    case exprs => UniquenessClause(exprs)
-  }
-
-  /**
-    * -- distinctness clauses --
-    * <sqbr-expr> = "[" <expr> "]"
-    * <dist-expr> = <sqbr-expr> | <expr>
-    * <distinctness-clauses> = <distExpr> [, <distExpr>]+
-    */
-  def sqbrExpr: Parser[Expr] = LSQBR ~> expression <~ RSQBR ^^ {
-    case expr => { expr.tag = "[]"; expr}
-  }
-  def distExpr: Parser[Expr] = expression | sqbrExpr
-  def distinctnessClause: Parser[DistinctnessClause] = rep1sep(distExpr, 
Operator.COMMA) ^^ {
-    case exprs => DistinctnessClause(exprs)
-  }
-
-  /**
-    * -- timeliness clauses --
-    * <timeliness-clauses> = <expr> [, <expr>]+
-    */
-  def timelinessClause: Parser[TimelinessClause] = rep1sep(expression, 
Operator.COMMA) ^^ {
-    case exprs => TimelinessClause(exprs)
-  }
-
-  /**
-    * -- completeness clauses --
-    * <completeness-clauses> = <expr> [, <expr>]+
-    */
-  def completenessClause: Parser[CompletenessClause] = rep1sep(expression, 
Operator.COMMA) ^^ {
-    case exprs => CompletenessClause(exprs)
-  }
-
-  def parseRule(rule: String, dqType: DqType): ParseResult[Expr] = {
-    val rootExpr = dqType match {
-      case AccuracyType => logicalExpression
-      case ProfilingType => profilingClause
-      case UniquenessType => uniquenessClause
-      case DistinctnessType => distinctnessClause
-      case TimelinessType => timelinessClause
-      case CompletenessType => completenessClause
-      case _ => expression
-    }
-    parseAll(rootExpr, rule)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/plan/DfOprStep.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/DfOprStep.scala 
b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/DfOprStep.scala
deleted file mode 100644
index f0afc6c..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/DfOprStep.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.plan
-
-import org.apache.griffin.measure.rule.dsl._
-
-case class DfOprStep(name: String,
-                     rule: String,
-                     details: Map[String, Any],
-                     cache: Boolean = false,
-                     global: Boolean = false
-                    ) extends RuleStep {
-
-  val dslType: DslType = DfOprType
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/plan/DsUpdate.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/DsUpdate.scala 
b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/DsUpdate.scala
deleted file mode 100644
index 4956b29..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/DsUpdate.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.plan
-
-case class DsUpdate(dsName: String,
-                    stepName: String
-                   ) extends Serializable {
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala
deleted file mode 100644
index 84313e4..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.plan
-
-import org.apache.griffin.measure.process.ExportMode
-import org.apache.griffin.measure.rule.dsl._
-
-case class MetricExport(name: String,
-                        stepName: String,
-                        collectType: CollectType,
-                        defTimestamp: Long,
-                        mode: ExportMode
-                       ) extends RuleExport {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala
deleted file mode 100644
index c69dc55..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.plan
-
-import org.apache.griffin.measure.process.ExportMode
-
-case class RecordExport(name: String,
-                        stepName: String,
-                        dataSourceCacheOpt: Option[String],
-                        originDFOpt: Option[String],
-                        defTimestamp: Long,
-                        mode: ExportMode
-                       ) extends RuleExport {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala 
b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala
deleted file mode 100644
index da5eb9d..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.plan
-
-import org.apache.griffin.measure.process.ExportMode
-
-trait RuleExport extends Serializable {
-
-  val name: String    // export name
-
-  val stepName: String    // the dependant step name
-
-  val defTimestamp: Long    // the default timestamp if tmst not in value
-
-  val mode: ExportMode   // export mode
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RulePlan.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RulePlan.scala 
b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RulePlan.scala
deleted file mode 100644
index 678ab3e..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RulePlan.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.plan
-
-import scala.reflect.ClassTag
-
-case class RulePlan(ruleSteps: Seq[RuleStep],
-                    ruleExports: Seq[RuleExport],
-                    dsUpdates: Seq[DsUpdate] = Nil
-                   ) extends Serializable {
-
-  val globalRuleSteps = filterRuleSteps(_.global)
-  val normalRuleSteps = filterRuleSteps(!_.global)
-
-  val metricExports = filterRuleExports[MetricExport](ruleExports)
-  val recordExports = filterRuleExports[RecordExport](ruleExports)
-
-  private def filterRuleSteps(func: (RuleStep) => Boolean): Seq[RuleStep] = {
-    ruleSteps.filter(func)
-  }
-
-  private def filterRuleExports[T <: RuleExport: ClassTag](exports: 
Seq[RuleExport]): Seq[T] = {
-    exports.flatMap { exp =>
-      exp match {
-        case e: T => Some(e)
-        case _ => None
-      }
-    }
-  }
-
-//  def ruleStepNames(func: (RuleStep) => Boolean): Seq[String] = {
-//    ruleSteps.filter(func).map(_.name)
-//  }
-
-  def merge(rp: RulePlan): RulePlan = {
-    RulePlan(
-      this.ruleSteps ++ rp.ruleSteps,
-      this.ruleExports ++ rp.ruleExports,
-      this.dsUpdates ++ rp.dsUpdates
-    )
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleStep.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleStep.scala 
b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleStep.scala
deleted file mode 100644
index dbdb2d5..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleStep.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.plan
-
-import org.apache.griffin.measure.rule.dsl.DslType
-
-trait RuleStep extends Serializable {
-
-  val dslType: DslType
-
-  val name: String
-
-  val rule: String
-
-  val cache: Boolean
-
-  val global: Boolean
-
-  val details: Map[String, Any]
-
-  def needCache: Boolean = cache || global
-
-  def isGlobal: Boolean = global
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/plan/SparkSqlStep.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/SparkSqlStep.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/SparkSqlStep.scala
deleted file mode 100644
index 16da9a5..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/SparkSqlStep.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.plan
-
-import org.apache.griffin.measure.rule.dsl._
-
-case class SparkSqlStep(name: String,
-                        rule: String,
-                        details: Map[String, Any],
-                        cache: Boolean = false,
-                        global: Boolean = false
-                       ) extends RuleStep {
-
-  val dslType: DslType = SparkSqlType
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/plan/TimeInfo.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/TimeInfo.scala 
b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/TimeInfo.scala
deleted file mode 100644
index 129d068..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/TimeInfo.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.plan
-
-trait TimeInfo extends Serializable {
-  val calcTime: Long
-//  val tmst: Long
-  val head: String
-
-  def key: String = if (head.nonEmpty) s"${head}_${calcTime}" else 
s"${calcTime}"
-  def setHead(h: String): TimeInfo
-}
-
-case class CalcTimeInfo(calcTime: Long, head: String = "") extends TimeInfo {
-//  val tmst: Long = calcTime
-  def setHead(h: String): TimeInfo = CalcTimeInfo(calcTime, h)
-}
-
-//case class TmstTimeInfo(calcTime: Long, tmst: Long, head: String = "") 
extends TimeInfo {
-//  def setHead(h: String): TimeInfo = TmstTimeInfo(calcTime, tmst, h)
-//}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/preproc/PreProcRuleGenerator.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/preproc/PreProcRuleGenerator.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/preproc/PreProcRuleGenerator.scala
deleted file mode 100644
index 22d64d8..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/preproc/PreProcRuleGenerator.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.preproc
-
-object PreProcRuleGenerator {
-
-  val _name = "name"
-
-  def genPreProcRules(rules: Seq[Map[String, Any]], suffix: String): 
Seq[Map[String, Any]] = {
-    if (rules == null) Nil else {
-      rules.map { rule =>
-        genPreProcRule(rule, suffix)
-      }
-    }
-  }
-
-  def getRuleNames(rules: Seq[Map[String, Any]]): Seq[String] = {
-    if (rules == null) Nil else {
-      rules.flatMap { rule =>
-        rule.get(_name) match {
-          case Some(s: String) => Some(s)
-          case _ => None
-        }
-      }
-    }
-  }
-
-  private def genPreProcRule(param: Map[String, Any], suffix: String
-                            ): Map[String, Any] = {
-    val keys = param.keys
-    keys.foldLeft(param) { (map, key) =>
-      map.get(key) match {
-        case Some(s: String) => map + (key -> genNewString(s, suffix))
-        case Some(subMap: Map[String, Any]) => map + (key -> 
genPreProcRule(subMap, suffix))
-        case Some(arr: Seq[_]) => map + (key -> genPreProcRule(arr, suffix))
-        case _ => map
-      }
-    }
-  }
-
-  private def genPreProcRule(paramArr: Seq[Any], suffix: String): Seq[Any] = {
-    paramArr.foldLeft(Nil: Seq[Any]) { (res, param) =>
-      param match {
-        case s: String => res :+ genNewString(s, suffix)
-        case map: Map[String, Any] => res :+ genPreProcRule(map, suffix)
-        case arr: Seq[_] => res :+ genPreProcRule(arr, suffix)
-        case _ => res :+ param
-      }
-    }
-  }
-
-  private def genNewString(str: String, suffix: String): String = {
-    str.replaceAll("""\$\{(.*)\}""", s"$$1_${suffix}")
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala
deleted file mode 100644
index ec746d2..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.trans
-
-import org.apache.griffin.measure.process.engine.DataFrameOprs.AccuracyOprKeys
-import org.apache.griffin.measure.process.temp.TableRegisters
-import org.apache.griffin.measure.process.{BatchProcessType, ExportMode, 
ProcessType, StreamingProcessType}
-import org.apache.griffin.measure.rule.adaptor._
-import org.apache.griffin.measure.rule.adaptor.RuleParamKeys._
-import org.apache.griffin.measure.rule.dsl.analyzer.AccuracyAnalyzer
-import org.apache.griffin.measure.rule.dsl.expr.{Expr, LogicalExpr}
-import org.apache.griffin.measure.rule.plan._
-import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.griffin.measure.rule.trans.RuleExportFactory._
-import org.apache.griffin.measure.rule.trans.DsUpdateFactory._
-
-import scala.util.Try
-
-case class AccuracyRulePlanTrans(dataSourceNames: Seq[String],
-                                 timeInfo: TimeInfo, name: String, expr: Expr,
-                                 param: Map[String, Any], procType: ProcessType
-                                ) extends RulePlanTrans {
-
-  private object AccuracyKeys {
-    val _source = "source"
-    val _target = "target"
-    val _miss = "miss"
-    val _total = "total"
-    val _matched = "matched"
-  }
-  import AccuracyKeys._
-
-  def trans(): Try[RulePlan] = Try {
-    val details = getDetails(param)
-    val sourceName = details.getString(_source, dataSourceNames.head)
-    val targetName = details.getString(_target, dataSourceNames.tail.head)
-    val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], 
sourceName, targetName)
-
-    val mode = ExportMode.defaultMode(procType)
-
-    val ct = timeInfo.calcTime
-
-    if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
-      println(s"[${ct}] data source ${sourceName} not exists")
-      emptyRulePlan
-    } else {
-      // 1. miss record
-      val missRecordsTableName = "__missRecords"
-      val selClause = s"`${sourceName}`.*"
-      val missRecordsSql = if (!TableRegisters.existRunTempTable(timeInfo.key, 
targetName)) {
-        println(s"[${ct}] data source ${targetName} not exists")
-        s"SELECT ${selClause} FROM `${sourceName}`"
-      } else {
-        val onClause = expr.coalesceDesc
-        val sourceIsNull = analyzer.sourceSelectionExprs.map { sel =>
-          s"${sel.desc} IS NULL"
-        }.mkString(" AND ")
-        val targetIsNull = analyzer.targetSelectionExprs.map { sel =>
-          s"${sel.desc} IS NULL"
-        }.mkString(" AND ")
-        val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})"
-        s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` 
ON ${onClause} WHERE ${whereClause}"
-      }
-      val missRecordsStep = SparkSqlStep(missRecordsTableName, missRecordsSql, 
emptyMap, true)
-      val missRecordsExports = procType match {
-        case BatchProcessType => {
-          val recordParam = 
RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
-          genRecordExport(recordParam, missRecordsTableName, 
missRecordsTableName, ct, mode) :: Nil
-        }
-        case StreamingProcessType => Nil
-      }
-      val missRecordsUpdates = procType match {
-        case BatchProcessType => Nil
-        case StreamingProcessType => {
-          val updateParam = emptyMap
-          genDsUpdate(updateParam, sourceName, missRecordsTableName) :: Nil
-        }
-      }
-
-      // 2. miss count
-      val missCountTableName = "__missCount"
-      val missColName = details.getStringOrKey(_miss)
-      val missCountSql = procType match {
-        case BatchProcessType => s"SELECT COUNT(*) AS `${missColName}` FROM 
`${missRecordsTableName}`"
-        case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, 
COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}` GROUP BY 
`${InternalColumns.tmst}`"
-      }
-      val missCountStep = SparkSqlStep(missCountTableName, missCountSql, 
emptyMap)
-
-      // 3. total count
-      val totalCountTableName = "__totalCount"
-      val totalColName = details.getStringOrKey(_total)
-      val totalCountSql = procType match {
-        case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM 
`${sourceName}`"
-        case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, 
COUNT(*) AS `${totalColName}` FROM `${sourceName}` GROUP BY 
`${InternalColumns.tmst}`"
-      }
-      val totalCountStep = SparkSqlStep(totalCountTableName, totalCountSql, 
emptyMap)
-
-      // 4. accuracy metric
-      val accuracyTableName = name
-      val matchedColName = details.getStringOrKey(_matched)
-      val accuracyMetricSql = procType match {
-        case BatchProcessType => {
-          s"""
-             |SELECT `${totalCountTableName}`.`${totalColName}` AS 
`${totalColName}`,
-             |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS 
`${missColName}`,
-             |(`${totalCountTableName}`.`${totalColName}` - 
coalesce(`${missCountTableName}`.`${missColName}`, 0)) AS `${matchedColName}`
-             |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}`
-         """.stripMargin
-        }
-        case StreamingProcessType => {
-          s"""
-             |SELECT `${totalCountTableName}`.`${InternalColumns.tmst}` AS 
`${InternalColumns.tmst}`,
-             |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
-             |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS 
`${missColName}`,
-             |(`${totalCountTableName}`.`${totalColName}` - 
coalesce(`${missCountTableName}`.`${missColName}`, 0)) AS `${matchedColName}`
-             |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}`
-             |ON `${totalCountTableName}`.`${InternalColumns.tmst}` = 
`${missCountTableName}`.`${InternalColumns.tmst}`
-         """.stripMargin
-        }
-      }
-      val accuracyStep = SparkSqlStep(accuracyTableName, accuracyMetricSql, 
emptyMap)
-      val accuracyExports = procType match {
-        case BatchProcessType => {
-          val metricParam = 
RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
-          genMetricExport(metricParam, accuracyTableName, accuracyTableName, 
ct, mode) :: Nil
-        }
-        case StreamingProcessType => Nil
-      }
-
-      // current accu plan
-      val accuSteps = missRecordsStep :: missCountStep :: totalCountStep :: 
accuracyStep :: Nil
-      val accuExports = missRecordsExports ++ accuracyExports
-      val accuUpdates = missRecordsUpdates
-      val accuPlan = RulePlan(accuSteps, accuExports, accuUpdates)
-
-      // streaming extra accu plan
-      val streamingAccuPlan = procType match {
-        case BatchProcessType => emptyRulePlan
-        case StreamingProcessType => {
-          // 5. accuracy metric merge
-          val accuracyMetricTableName = "__accuracy"
-          val accuracyMetricRule = "accuracy"
-          val accuracyMetricDetails = Map[String, Any](
-            (AccuracyOprKeys._dfName -> accuracyTableName),
-            (AccuracyOprKeys._miss -> missColName),
-            (AccuracyOprKeys._total -> totalColName),
-            (AccuracyOprKeys._matched -> matchedColName)
-          )
-          val accuracyMetricStep = DfOprStep(accuracyMetricTableName,
-            accuracyMetricRule, accuracyMetricDetails)
-          val metricParam = 
RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
-          val accuracyMetricExports = genMetricExport(metricParam, name, 
accuracyMetricTableName, ct, mode) :: Nil
-
-          // 6. collect accuracy records
-          val accuracyRecordTableName = "__accuracyRecords"
-          val accuracyRecordSql = {
-            s"""
-               |SELECT `${InternalColumns.tmst}`, `${InternalColumns.empty}`
-               |FROM `${accuracyMetricTableName}` WHERE 
`${InternalColumns.record}`
-             """.stripMargin
-          }
-          val accuracyRecordStep = SparkSqlStep(accuracyRecordTableName, 
accuracyRecordSql, emptyMap)
-          val recordParam = 
RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
-          val accuracyRecordParam = 
recordParam.addIfNotExist(ExportParamKeys._dataSourceCache, sourceName)
-            .addIfNotExist(ExportParamKeys._originDF, missRecordsTableName)
-          val accuracyRecordExports = genRecordExport(
-            accuracyRecordParam, missRecordsTableName, 
accuracyRecordTableName, ct, mode) :: Nil
-
-          // gen accu plan
-          val extraSteps = accuracyMetricStep :: accuracyRecordStep :: Nil
-          val extraExports = accuracyMetricExports ++ accuracyRecordExports
-          val extraPlan = RulePlan(extraSteps, extraExports)
-
-          extraPlan
-        }
-      }
-
-      // return accu plan
-      accuPlan.merge(streamingAccuPlan)
-
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala
deleted file mode 100644
index 1b89587..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.trans
-
-import org.apache.griffin.measure.process.temp.TableRegisters
-import org.apache.griffin.measure.process.{BatchProcessType, ExportMode, 
ProcessType, StreamingProcessType}
-import org.apache.griffin.measure.rule.adaptor.RuleParamKeys._
-import org.apache.griffin.measure.rule.adaptor._
-import org.apache.griffin.measure.rule.dsl.analyzer.CompletenessAnalyzer
-import org.apache.griffin.measure.rule.dsl.expr._
-import org.apache.griffin.measure.rule.plan._
-import org.apache.griffin.measure.rule.trans.RuleExportFactory._
-import org.apache.griffin.measure.utils.ParamUtil._
-
-import scala.util.Try
-
-case class CompletenessRulePlanTrans(dataSourceNames: Seq[String],
-                                     timeInfo: TimeInfo, name: String, expr: 
Expr,
-                                     param: Map[String, Any], procType: 
ProcessType
-                                    ) extends RulePlanTrans {
-
-  private object CompletenessKeys {
-    val _source = "source"
-    val _total = "total"
-    val _complete = "complete"
-    val _incomplete = "incomplete"
-  }
-  import CompletenessKeys._
-
-  def trans(): Try[RulePlan] =  Try {
-    val details = getDetails(param)
-    val completenessClause = expr.asInstanceOf[CompletenessClause]
-    val sourceName = details.getString(_source, dataSourceNames.head)
-
-    val mode = ExportMode.defaultMode(procType)
-
-    val ct = timeInfo.calcTime
-
-    if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
-      emptyRulePlan
-    } else {
-      val analyzer = CompletenessAnalyzer(completenessClause, sourceName)
-
-      val selItemsClause = analyzer.selectionPairs.map { pair =>
-        val (expr, alias) = pair
-        s"${expr.desc} AS `${alias}`"
-      }.mkString(", ")
-      val aliases = analyzer.selectionPairs.map(_._2)
-
-      val selClause = procType match {
-        case BatchProcessType => selItemsClause
-        case StreamingProcessType => s"`${InternalColumns.tmst}`, 
${selItemsClause}"
-      }
-      val selAliases = procType match {
-        case BatchProcessType => aliases
-        case StreamingProcessType => InternalColumns.tmst +: aliases
-      }
-
-      // 1. source alias
-      val sourceAliasTableName = "__sourceAlias"
-      val sourceAliasSql = {
-        s"SELECT ${selClause} FROM `${sourceName}`"
-      }
-      val sourceAliasStep = SparkSqlStep(sourceAliasTableName, sourceAliasSql, 
emptyMap, true)
-
-      // 2. incomplete record
-      val incompleteRecordsTableName = "__incompleteRecords"
-      val completeWhereClause = aliases.map(a => s"`${a}` IS NOT 
NULL").mkString(" AND ")
-      val incompleteWhereClause = s"NOT (${completeWhereClause})"
-      val incompleteRecordsSql = s"SELECT * FROM `${sourceAliasTableName}` 
WHERE ${incompleteWhereClause}"
-      val incompleteRecordStep = SparkSqlStep(incompleteRecordsTableName, 
incompleteRecordsSql, emptyMap, true)
-      val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
-      val incompleteRecordExport = genRecordExport(recordParam, 
incompleteRecordsTableName, incompleteRecordsTableName, ct, mode)
-
-      // 3. incomplete count
-      val incompleteCountTableName = "__incompleteCount"
-      val incompleteColName = details.getStringOrKey(_incomplete)
-      val incompleteCountSql = procType match {
-        case BatchProcessType => s"SELECT COUNT(*) AS `${incompleteColName}` 
FROM `${incompleteRecordsTableName}`"
-        case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, 
COUNT(*) AS `${incompleteColName}` FROM `${incompleteRecordsTableName}` GROUP 
BY `${InternalColumns.tmst}`"
-      }
-      val incompleteCountStep = SparkSqlStep(incompleteCountTableName, 
incompleteCountSql, emptyMap)
-
-      // 4. total count
-      val totalCountTableName = "__totalCount"
-      val totalColName = details.getStringOrKey(_total)
-      val totalCountSql = procType match {
-        case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM 
`${sourceAliasTableName}`"
-        case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, 
COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}` GROUP BY 
`${InternalColumns.tmst}`"
-      }
-      val totalCountStep = SparkSqlStep(totalCountTableName, totalCountSql, 
emptyMap)
-
-      // 5. complete metric
-      val completeTableName = name
-      val completeColName = details.getStringOrKey(_complete)
-      val completeMetricSql = procType match {
-        case BatchProcessType => {
-          s"""
-             |SELECT `${totalCountTableName}`.`${totalColName}` AS 
`${totalColName}`,
-             |coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 
0) AS `${incompleteColName}`,
-             |(`${totalCountTableName}`.`${totalColName}` - 
coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0)) AS 
`${completeColName}`
-             |FROM `${totalCountTableName}` LEFT JOIN 
`${incompleteCountTableName}`
-         """.stripMargin
-        }
-        case StreamingProcessType => {
-          s"""
-             |SELECT `${totalCountTableName}`.`${InternalColumns.tmst}` AS 
`${InternalColumns.tmst}`,
-             |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
-             |coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 
0) AS `${incompleteColName}`,
-             |(`${totalCountTableName}`.`${totalColName}` - 
coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0)) AS 
`${completeColName}`
-             |FROM `${totalCountTableName}` LEFT JOIN 
`${incompleteCountTableName}`
-             |ON `${totalCountTableName}`.`${InternalColumns.tmst}` = 
`${incompleteCountTableName}`.`${InternalColumns.tmst}`
-         """.stripMargin
-        }
-      }
-      val completeStep = SparkSqlStep(completeTableName, completeMetricSql, 
emptyMap)
-      val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
-      val completeExport = genMetricExport(metricParam, completeTableName, 
completeTableName, ct, mode)
-
-      // complete plan
-      val completeSteps = sourceAliasStep :: incompleteRecordStep :: 
incompleteCountStep :: totalCountStep :: completeStep :: Nil
-      val completeExports = incompleteRecordExport :: completeExport :: Nil
-      val completePlan = RulePlan(completeSteps, completeExports)
-
-      completePlan
-    }
-  }
-
-}

Reply via email to