http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ClauseExpression.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ClauseExpression.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ClauseExpression.scala
new file mode 100644
index 0000000..17e678e
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ClauseExpression.scala
@@ -0,0 +1,270 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder.dsl.expr
+
/** Marker trait for clause-level expressions (SELECT, FROM, WHERE, GROUP BY, ...). */
trait ClauseExpression extends Expr
+
/**
  * SELECT clause: the list of selected expressions, optionally prefixed by an
  * extra condition keyword (see [[ExtraConditionExpr]]).
  */
case class SelectClause(exprs: Seq[Expr],
                        extraConditionOpt: Option[ExtraConditionExpr]
                       ) extends ClauseExpression {

  addChildren(exprs)

  def desc: String = {
    val selected = exprs.map(_.desc).mkString(", ")
    // the extra condition, when present, is rendered before the select list
    extraConditionOpt.fold(selected)(cdtn => s"${cdtn.desc} ${selected}")
  }

  def coalesceDesc: String = desc

  override def map(func: (Expr) => Expr): SelectClause =
    SelectClause(
      exprs.map(func),
      extraConditionOpt.map(e => func(e).asInstanceOf[ExtraConditionExpr])
    )
}
+
/** FROM clause naming the data source to read from (rendered back-quoted). */
case class FromClause(dataSource: String) extends ClauseExpression {

  def desc: String = s"FROM `${dataSource}`"
  def coalesceDesc: String = desc

}

/** WHERE clause wrapping a single filter expression. */
case class WhereClause(expr: Expr) extends ClauseExpression {

  addChild(expr)

  def desc: String = s"WHERE ${expr.desc}"
  def coalesceDesc: String = s"WHERE ${expr.coalesceDesc}"

  override def map(func: (Expr) => Expr): WhereClause = WhereClause(func(expr))

}
+
/**
  * GROUP BY clause with an optional HAVING condition.
  */
case class GroupbyClause(exprs: Seq[Expr], havingClauseOpt: Option[Expr]) extends ClauseExpression {

  addChildren(exprs ++ havingClauseOpt.toSeq)

  // the comma-joined group-by key list, shared by both renderings
  private def keysDesc: String = exprs.map(_.desc).mkString(", ")

  def desc: String =
    havingClauseOpt.fold(s"GROUP BY ${keysDesc}") { having =>
      s"GROUP BY ${keysDesc} HAVING ${having.desc}"
    }

  // NOTE(review): only the HAVING part is coalesced; the group-by keys still use
  // plain desc — confirm this asymmetry is intended.
  def coalesceDesc: String =
    havingClauseOpt.fold(s"GROUP BY ${keysDesc}") { having =>
      s"GROUP BY ${keysDesc} HAVING ${having.coalesceDesc}"
    }

  /**
    * Combine two GROUP BY clauses: key lists are concatenated, and when both
    * sides carry a HAVING condition they are AND-ed together.
    */
  def merge(other: GroupbyClause): GroupbyClause = {
    val mergedHaving = (havingClauseOpt, other.havingClauseOpt) match {
      case (Some(hc), Some(ohc)) =>
        val left = LogicalFactorExpr(hc, false, None)
        val right = LogicalFactorExpr(ohc, false, None)
        Some(BinaryLogicalExpr(left, ("AND", right) :: Nil))
      case (some @ Some(_), None) => some
      case (None, other) => other
    }
    GroupbyClause(exprs ++ other.exprs, mergedHaving)
  }

  override def map(func: (Expr) => Expr): GroupbyClause =
    GroupbyClause(exprs.map(func), havingClauseOpt.map(func))

}
+
/** One ORDER/SORT item: an expression plus an optional direction string. */
case class OrderItem(expr: Expr, orderOpt: Option[String]) extends Expr {

  addChild(expr)

  // direction (when given) is upper-cased and appended after the expression
  def desc: String = orderOpt.fold(expr.desc)(os => s"${expr.desc} ${os.toUpperCase}")

  def coalesceDesc: String = desc

  override def map(func: (Expr) => Expr): OrderItem = OrderItem(func(expr), orderOpt)

}
+
/** ORDER BY clause over a list of order items. */
case class OrderbyClause(items: Seq[OrderItem]) extends ClauseExpression {

  // note: children are the items' wrapped expressions, not the items themselves
  addChildren(items.map(_.expr))

  def desc: String = s"ORDER BY ${items.map(_.desc).mkString(", ")}"
  def coalesceDesc: String = s"ORDER BY ${items.map(_.desc).mkString(", ")}"

  override def map(func: (Expr) => Expr): OrderbyClause =
    OrderbyClause(items.map(item => func(item).asInstanceOf[OrderItem]))

}

/** SORT BY clause over a list of order items. */
case class SortbyClause(items: Seq[OrderItem]) extends ClauseExpression {

  addChildren(items.map(_.expr))

  def desc: String = s"SORT BY ${items.map(_.desc).mkString(", ")}"
  def coalesceDesc: String = s"SORT BY ${items.map(_.desc).mkString(", ")}"

  override def map(func: (Expr) => Expr): SortbyClause =
    SortbyClause(items.map(item => func(item).asInstanceOf[OrderItem]))

}
+
/** LIMIT clause wrapping the row-count expression. */
case class LimitClause(expr: Expr) extends ClauseExpression {

  addChild(expr)

  def desc: String = s"LIMIT ${expr.desc}"
  def coalesceDesc: String = s"LIMIT ${expr.coalesceDesc}"

  override def map(func: (Expr) => Expr): LimitClause = LimitClause(func(expr))

}
+
/**
  * A full query: a SELECT head, an optional FROM, then any tail clauses
  * (e.g. WHERE / GROUP BY / ORDER BY / LIMIT), rendered in order.
  */
case class CombinedClause(selectClause: SelectClause,
                          fromClauseOpt: Option[FromClause],
                          tails: Seq[ClauseExpression]
                         ) extends ClauseExpression {

  addChildren((selectClause +: fromClauseOpt.toSeq) ++ tails)

  def desc: String = {
    val head = s"SELECT ${selectClause.desc} ${fromClauseOpt.map(_.desc).mkString(" ")}"
    (head +: tails.map(_.desc)).mkString(" ")
  }

  def coalesceDesc: String = {
    val head = s"SELECT ${selectClause.coalesceDesc} ${fromClauseOpt.map(_.coalesceDesc).mkString(" ")}"
    (head +: tails.map(_.coalesceDesc)).mkString(" ")
  }

  override def map(func: (Expr) => Expr): CombinedClause =
    CombinedClause(
      func(selectClause).asInstanceOf[SelectClause],
      fromClauseOpt.map(fc => func(fc).asInstanceOf[FromClause]),
      tails.map(t => func(t).asInstanceOf[ClauseExpression])
    )

}
+
/**
  * Profiling query layout: select + from, with other clauses split around an
  * optional GROUP BY (pre-group-by clauses render before it, post ones after).
  * Note the rendered text does not prepend "SELECT" here, unlike CombinedClause.
  */
case class ProfilingClause(selectClause: SelectClause,
                           fromClauseOpt: Option[FromClause],
                           groupbyClauseOpt: Option[GroupbyClause],
                           preGroupbyClauses: Seq[ClauseExpression],
                           postGroupbyClauses: Seq[ClauseExpression]
                          ) extends ClauseExpression {

  addChildren {
    val heads: Seq[ClauseExpression] = selectClause +: fromClauseOpt.toSeq
    (heads ++ preGroupbyClauses) ++ (groupbyClauseOpt.toSeq ++ postGroupbyClauses)
  }

  // render order is: select, from, pre-group-by, group-by, post-group-by,
  // joined with single spaces (empty parts still contribute a separator,
  // matching the original interpolation)
  def desc: String =
    Seq(
      selectClause.desc,
      fromClauseOpt.map(_.desc).mkString(" "),
      preGroupbyClauses.map(_.desc).mkString(" "),
      groupbyClauseOpt.map(_.desc).mkString(" "),
      postGroupbyClauses.map(_.desc).mkString(" ")
    ).mkString(" ")

  def coalesceDesc: String =
    Seq(
      selectClause.coalesceDesc,
      fromClauseOpt.map(_.coalesceDesc).mkString(" "),
      preGroupbyClauses.map(_.coalesceDesc).mkString(" "),
      groupbyClauseOpt.map(_.coalesceDesc).mkString(" "),
      postGroupbyClauses.map(_.coalesceDesc).mkString(" ")
    ).mkString(" ")

  override def map(func: (Expr) => Expr): ProfilingClause =
    ProfilingClause(
      func(selectClause).asInstanceOf[SelectClause],
      fromClauseOpt.map(fc => func(fc).asInstanceOf[FromClause]),
      groupbyClauseOpt.map(gc => func(gc).asInstanceOf[GroupbyClause]),
      preGroupbyClauses.map(c => func(c).asInstanceOf[ClauseExpression]),
      postGroupbyClauses.map(c => func(c).asInstanceOf[ClauseExpression])
    )

}
+
/** Uniqueness rule clause: comma-joined list of the checked expressions. */
case class UniquenessClause(exprs: Seq[Expr]) extends ClauseExpression {

  addChildren(exprs)

  def desc: String = exprs.map(_.desc).mkString(", ")
  def coalesceDesc: String = exprs.map(_.coalesceDesc).mkString(", ")

  override def map(func: (Expr) => Expr): UniquenessClause =
    UniquenessClause(exprs.map(func))
}

/** Distinctness rule clause: comma-joined list of the checked expressions. */
case class DistinctnessClause(exprs: Seq[Expr]) extends ClauseExpression {

  addChildren(exprs)

  def desc: String = exprs.map(_.desc).mkString(", ")
  def coalesceDesc: String = exprs.map(_.coalesceDesc).mkString(", ")

  override def map(func: (Expr) => Expr): DistinctnessClause =
    DistinctnessClause(exprs.map(func))
}

/** Timeliness rule clause: comma-joined list of the checked expressions. */
case class TimelinessClause(exprs: Seq[Expr]) extends ClauseExpression {

  addChildren(exprs)

  def desc: String = exprs.map(_.desc).mkString(", ")
  def coalesceDesc: String = exprs.map(_.coalesceDesc).mkString(", ")

  override def map(func: (Expr) => Expr): TimelinessClause =
    TimelinessClause(exprs.map(func))
}

/** Completeness rule clause: comma-joined list of the checked expressions. */
case class CompletenessClause(exprs: Seq[Expr]) extends ClauseExpression {

  addChildren(exprs)

  def desc: String = exprs.map(_.desc).mkString(", ")
  def coalesceDesc: String = exprs.map(_.coalesceDesc).mkString(", ")

  override def map(func: (Expr) => Expr): CompletenessClause =
    CompletenessClause(exprs.map(func))
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/Expr.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/Expr.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/Expr.scala
new file mode 100644
index 0000000..3cf9f39
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/Expr.scala
@@ -0,0 +1,35 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder.dsl.expr
+
+/**
+  * expr parsed by griffin dsl
+  */
/**
  * An expression parsed from the griffin DSL.
  *
  * Every expression renders itself as SQL-like text (`desc`) and as a
  * null-tolerant variant (`coalesceDesc`), and supports structural rewriting
  * via `map`.
  */
trait Expr extends TreeNode with ExprTag with Serializable {

  /** Textual (SQL-like) representation of this expression. */
  def desc: String

  /** Null-coalescing textual representation; many nodes fall back to `desc`. */
  def coalesceDesc: String

  /** Unwrap trivial wrappers; by default an expression is its own self. */
  def extractSelf: Expr = this

  /** Rebuild this expression by applying `func`; the default simply applies it to `this`. */
  def map(func: (Expr) => Expr): Expr = func(this)

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ExprTag.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ExprTag.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ExprTag.scala
new file mode 100644
index 0000000..583067f
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ExprTag.scala
@@ -0,0 +1,23 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder.dsl.expr
+
/**
  * A mutable string tag carried by every [[Expr]] (empty by default).
  * NOTE(review): deliberately a `var` — the tag is assigned after construction;
  * the tag's semantics are defined by callers elsewhere.
  */
trait ExprTag { this: Expr =>
  var tag: String = ""
}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ExtraConditionExpr.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ExtraConditionExpr.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ExtraConditionExpr.scala
new file mode 100644
index 0000000..856e263
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ExtraConditionExpr.scala
@@ -0,0 +1,27 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder.dsl.expr
+
/** An extra condition keyword attached to a select or function call, rendered upper-case. */
case class ExtraConditionExpr(cdtn: String) extends Expr {

  def desc: String = cdtn.toUpperCase

  def coalesceDesc: String = desc

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/FunctionExpr.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/FunctionExpr.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/FunctionExpr.scala
new file mode 100644
index 0000000..d87b7f7
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/FunctionExpr.scala
@@ -0,0 +1,45 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder.dsl.expr
+
/**
  * A function call expression: `name(args...)`, with an optional extra
  * condition rendered inside the parentheses and an optional alias.
  */
case class FunctionExpr(functionName: String, args: Seq[Expr],
                        extraConditionOpt: Option[ExtraConditionExpr],
                        aliasOpt: Option[String]
                       ) extends Expr with AliasableExpr {

  addChildren(args)

  def desc: String = {
    val argsDesc = args.map(_.desc).mkString(", ")
    extraConditionOpt.fold(s"${functionName}(${argsDesc})") { cdtn =>
      s"${functionName}(${cdtn.desc} ${argsDesc})"
    }
  }

  def coalesceDesc: String = desc

  /** Falls back to the function name when no explicit alias was given. */
  def alias: Option[String] = aliasOpt.orElse(Some(functionName))

  override def map(func: (Expr) => Expr): FunctionExpr =
    FunctionExpr(functionName, args.map(func),
      extraConditionOpt.map(e => func(e).asInstanceOf[ExtraConditionExpr]), aliasOpt)
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LiteralExpr.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LiteralExpr.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LiteralExpr.scala
new file mode 100644
index 0000000..afc2dd9
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LiteralExpr.scala
@@ -0,0 +1,72 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder.dsl.expr
+
+import org.apache.griffin.measure.utils.TimeUtil
+
/** A literal value; literals coalesce to their own description. */
trait LiteralExpr extends Expr {
  def coalesceDesc: String = desc
}

/** NULL literal; the original token is kept but always rendered as "NULL". */
case class LiteralNullExpr(str: String) extends LiteralExpr {
  def desc: String = "NULL"
}

/** NaN literal; always rendered as "NaN". */
case class LiteralNanExpr(str: String) extends LiteralExpr {
  def desc: String = "NaN"
}

/** String literal, rendered exactly as parsed. */
case class LiteralStringExpr(str: String) extends LiteralExpr {
  def desc: String = str
}
+
/**
  * Numeric literal: tokens containing "." are rendered via Double parsing,
  * everything else via Long parsing.
  *
  * Throws `Exception("<str> is invalid number")` when the token cannot be parsed.
  */
case class LiteralNumberExpr(str: String) extends LiteralExpr {
  def desc: String = {
    try {
      if (str.contains(".")) {
        str.toDouble.toString
      } else {
        str.toLong.toString
      }
    } catch {
      // Only parse failures are expected here. The original caught Throwable,
      // which would also swallow fatal errors (OutOfMemoryError, etc.); the
      // parse failure is chained as the cause for easier debugging.
      case e: NumberFormatException =>
        throw new Exception(s"${str} is invalid number", e)
    }
  }
}
+
/** Time literal (e.g. a duration token); rendered as its millisecond value. */
case class LiteralTimeExpr(str: String) extends LiteralExpr {
  def desc: String =
    TimeUtil.milliseconds(str) match {
      case Some(ms) => ms.toString
      case None => throw new Exception(s"${str} is invalid time")
    }
}
+
/** Boolean literal: accepts "true"/"false" case-insensitively, renders lower-case. */
case class LiteralBooleanExpr(str: String) extends LiteralExpr {
  final val TrueRegex = """(?i)true""".r
  final val FalseRegex = """(?i)false""".r
  def desc: String =
    // a regex extractor pattern requires a full-string match, so matches() is equivalent
    if (TrueRegex.pattern.matcher(str).matches) true.toString
    else if (FalseRegex.pattern.matcher(str).matches) false.toString
    else throw new Exception(s"${str} is invalid boolean")
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LogicalExpr.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LogicalExpr.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LogicalExpr.scala
new file mode 100644
index 0000000..af1223f
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LogicalExpr.scala
@@ -0,0 +1,204 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder.dsl.expr
+
/** Marker trait for logical (boolean-valued) expressions. */
trait LogicalExpr extends Expr

/** Membership test: `head [NOT] IN (range...)`; `is == false` renders the NOT form. */
case class InExpr(head: Expr, is: Boolean, range: Seq[Expr]) extends LogicalExpr {

  addChildren(head +: range)

  // shared renderer parameterised over which description to use
  private def render(f: Expr => String): String = {
    val notStr = if (is) "" else " NOT"
    s"${f(head)}${notStr} IN (${range.map(f).mkString(", ")})"
  }

  def desc: String = render(_.desc)
  def coalesceDesc: String = render(_.coalesceDesc)

  override def map(func: (Expr) => Expr): InExpr =
    InExpr(func(head), is, range.map(func))
}
+
/**
  * Range test: `head [NOT] BETWEEN lower AND upper`.
  *
  * The first two elements of `range` are the bounds; any extra elements are
  * ignored. Construction fails fast when fewer than two are supplied.
  */
case class BetweenExpr(head: Expr, is: Boolean, range: Seq[Expr]) extends LogicalExpr {

  // The original used a `::` cons pattern, which only matches List: a perfectly
  // valid Vector (or any other Seq) of size >= 2 would throw MatchError. The
  // Seq(first, second, _*) extractor matches every Seq implementation.
  range match {
    case Seq(first, second, _*) => addChildren(head :: first :: second :: Nil)
    case _ => throw new Exception("between expression exception: range less than 2")
  }

  // shared renderer parameterised over which description to use
  private def render(f: Expr => String): String = {
    val rangeStr = range match {
      case Seq(first, second, _*) => s"${f(first)} AND ${f(second)}"
      case _ => throw new Exception("between expression exception: range less than 2")
    }
    val notStr = if (is) "" else " NOT"
    s"${f(head)}${notStr} BETWEEN ${rangeStr}"
  }

  def desc: String = render(_.desc)
  def coalesceDesc: String = render(_.coalesceDesc)

  override def map(func: (Expr) => Expr): BetweenExpr =
    BetweenExpr(func(head), is, range.map(func))
}
+
/** Pattern test: `head [NOT] LIKE value`. */
case class LikeExpr(head: Expr, is: Boolean, value: Expr) extends LogicalExpr {

  addChildren(head :: value :: Nil)

  private def render(f: Expr => String): String = {
    val notStr = if (is) "" else " NOT"
    s"${f(head)}${notStr} LIKE ${f(value)}"
  }

  def desc: String = render(_.desc)
  def coalesceDesc: String = render(_.coalesceDesc)

  override def map(func: (Expr) => Expr): LikeExpr =
    LikeExpr(func(head), is, func(value))
}
+
/** NULL test: `head IS [NOT] NULL`. */
case class IsNullExpr(head: Expr, is: Boolean) extends LogicalExpr {

  addChild(head)

  def desc: String =
    if (is) s"${head.desc} IS NULL" else s"${head.desc} IS NOT NULL"

  def coalesceDesc: String = desc

  override def map(func: (Expr) => Expr): IsNullExpr = IsNullExpr(func(head), is)
}

/** NaN test: `[NOT ]isnan(head)`. */
case class IsNanExpr(head: Expr, is: Boolean) extends LogicalExpr {

  addChild(head)

  def desc: String =
    if (is) s"isnan(${head.desc})" else s"NOT isnan(${head.desc})"

  def coalesceDesc: String = desc

  override def map(func: (Expr) => Expr): IsNanExpr = IsNanExpr(func(head), is)
}
+
+// -----------
+
/** A logical factor: a wrapped expression, optionally parenthesised and aliased. */
case class LogicalFactorExpr(factor: Expr, withBracket: Boolean, aliasOpt: Option[String]
                            ) extends LogicalExpr with AliasableExpr {

  addChild(factor)

  def desc: String = if (withBracket) s"(${factor.desc})" else factor.desc

  // NOTE(review): brackets are not reproduced in the coalesced form — confirm intended
  def coalesceDesc: String = factor.coalesceDesc

  def alias: Option[String] = aliasOpt

  /** An unaliased factor is transparent: extraction passes through to the wrapped expr. */
  override def extractSelf: Expr =
    aliasOpt.fold(factor.extractSelf)(_ => this: Expr)

  override def map(func: (Expr) => Expr): LogicalFactorExpr =
    LogicalFactorExpr(func(factor), withBracket, aliasOpt)
}
+
/** A chain of prefix logical operators applied to a factor, e.g. `(NOT (NOT x))`. */
case class UnaryLogicalExpr(oprs: Seq[String], factor: LogicalExpr) extends LogicalExpr {

  addChild(factor)

  // "!" maps to the SQL keyword NOT; any other operator token is upper-cased as-is
  private def trans(opr: String): String =
    if (opr == "!") "NOT" else opr.toUpperCase

  // wrap the base description once per operator, innermost-first
  private def render(base: String): String =
    oprs.foldRight(base) { (opr, acc) => s"(${trans(opr)} ${acc})" }

  def desc: String = render(factor.desc)
  def coalesceDesc: String = render(factor.coalesceDesc)

  /** With no operators the wrapper is transparent. */
  override def extractSelf: Expr =
    if (oprs.isEmpty) factor.extractSelf else this

  override def map(func: (Expr) => Expr): UnaryLogicalExpr =
    UnaryLogicalExpr(oprs, func(factor).asInstanceOf[LogicalExpr])
}
+
/**
  * A left-associative chain of binary logical operations:
  * `factor op1 expr1 op2 expr2 ...`.
  */
case class BinaryLogicalExpr(factor: LogicalExpr, tails: Seq[(String, LogicalExpr)]) extends LogicalExpr {

  addChildren(factor +: tails.map(_._2))

  // The original followed each fold with `if (tails.size <= 0) res else s"${res}"`,
  // which returns the same string in both branches; that dead conditional is removed.
  def desc: String =
    tails.foldLeft(factor.desc) { case (acc, (opr, expr)) =>
      s"${acc} ${trans(opr)} ${expr.desc}"
    }

  def coalesceDesc: String =
    tails.foldLeft(factor.coalesceDesc) { case (acc, (opr, expr)) =>
      s"${acc} ${trans(opr)} ${expr.coalesceDesc}"
    }

  // symbolic operators map to SQL keywords; anything else passes through trimmed and upper-cased
  private def trans(s: String): String = {
    s match {
      case "&&" => "AND"
      case "||" => "OR"
      case _ => s.trim.toUpperCase
    }
  }

  /** With no tail operations the chain is transparent. */
  override def extractSelf: Expr = {
    if (tails.nonEmpty) this
    else factor.extractSelf
  }

  override def map(func: (Expr) => Expr): BinaryLogicalExpr =
    BinaryLogicalExpr(func(factor).asInstanceOf[LogicalExpr],
      tails.map { case (opr, expr) => (opr, func(expr).asInstanceOf[LogicalExpr]) })
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/MathExpr.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/MathExpr.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/MathExpr.scala
new file mode 100644
index 0000000..38732a7
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/MathExpr.scala
@@ -0,0 +1,94 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder.dsl.expr
+
/** Marker trait for arithmetic expressions. */
trait MathExpr extends Expr

/** An arithmetic factor: a wrapped expression, optionally parenthesised and aliased. */
case class MathFactorExpr(factor: Expr, withBracket: Boolean, aliasOpt: Option[String]
                         ) extends MathExpr with AliasableExpr {

  addChild(factor)

  def desc: String = if (withBracket) s"(${factor.desc})" else factor.desc

  // NOTE(review): brackets are not reproduced in the coalesced form — confirm intended
  def coalesceDesc: String = factor.coalesceDesc

  def alias: Option[String] = aliasOpt

  /** An unaliased factor is transparent: extraction passes through to the wrapped expr. */
  override def extractSelf: Expr =
    aliasOpt.fold(factor.extractSelf)(_ => this: Expr)

  override def map(func: (Expr) => Expr): MathFactorExpr =
    MathFactorExpr(func(factor), withBracket, aliasOpt)
}
+
/** A chain of prefix arithmetic operators applied to a factor, e.g. `(-(-x))`. */
case class UnaryMathExpr(oprs: Seq[String], factor: MathExpr) extends MathExpr {

  addChild(factor)

  // wrap the base description once per operator, innermost-first, no space after the operator
  private def render(base: String): String =
    oprs.foldRight(base) { (opr, acc) => s"(${opr}${acc})" }

  def desc: String = render(factor.desc)
  def coalesceDesc: String = render(factor.coalesceDesc)

  /** With no operators the wrapper is transparent. */
  override def extractSelf: Expr =
    if (oprs.isEmpty) factor.extractSelf else this

  override def map(func: (Expr) => Expr): UnaryMathExpr =
    UnaryMathExpr(oprs, func(factor).asInstanceOf[MathExpr])
}
+
/**
  * A left-associative chain of binary arithmetic operations:
  * `factor op1 expr1 op2 expr2 ...`; operators are rendered verbatim.
  */
case class BinaryMathExpr(factor: MathExpr, tails: Seq[(String, MathExpr)]) extends MathExpr {

  addChildren(factor +: tails.map(_._2))

  // The original followed each fold with `if (tails.size <= 0) res else s"${res}"`,
  // which returns the same string in both branches; that dead conditional is removed.
  def desc: String =
    tails.foldLeft(factor.desc) { case (acc, (opr, expr)) =>
      s"${acc} ${opr} ${expr.desc}"
    }

  def coalesceDesc: String =
    tails.foldLeft(factor.coalesceDesc) { case (acc, (opr, expr)) =>
      s"${acc} ${opr} ${expr.coalesceDesc}"
    }

  /** With no tail operations the chain is transparent. */
  override def extractSelf: Expr = {
    if (tails.nonEmpty) this
    else factor.extractSelf
  }

  override def map(func: (Expr) => Expr): BinaryMathExpr =
    BinaryMathExpr(func(factor).asInstanceOf[MathExpr],
      tails.map { case (opr, expr) => (opr, func(expr).asInstanceOf[MathExpr]) })
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/SelectExpr.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/SelectExpr.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/SelectExpr.scala
new file mode 100644
index 0000000..b4ed2ac
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/SelectExpr.scala
@@ -0,0 +1,132 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder.dsl.expr
+
/**
  * Head of a selection chain (the part before any `.field` / `[index]` / function selector).
  * A head carries no alias by default; subclasses override where one makes sense.
  */
trait HeadExpr extends Expr with AliasableExpr {
  def alias: Option[String] = None
}
+
/** Selection head that refers to a registered data source by name. */
case class DataSourceHeadExpr(name: String) extends HeadExpr {
  // render with back-quotes, e.g. `source`
  def desc: String = "`" + name + "`"
  def coalesceDesc: String = desc
}
+
/** Selection head that refers to a bare field name; the field itself is the alias. */
case class FieldNameHeadExpr(field: String) extends HeadExpr {
  // render with back-quotes, e.g. `field`
  def desc: String = "`" + field + "`"
  def coalesceDesc: String = desc
  override def alias: Option[String] = Some(field)
}
+
/** Selection head for "*" (select everything). */
case class AllSelectHeadExpr() extends HeadExpr {
  def desc: String = "*"
  def coalesceDesc: String = "*"
}
+
/**
  * Selection head that wraps an arbitrary expression (e.g. a function call);
  * delegates its rendering to the wrapped expression and uses that rendering as its alias.
  */
case class OtherHeadExpr(expr: Expr) extends HeadExpr {

  addChild(expr)

  def desc: String = expr.desc
  def coalesceDesc: String = expr.coalesceDesc
  override def alias: Option[String] = Some(expr.desc)

  /** Rebuilds the head with func applied to the wrapped expression. */
  override def map(func: (Expr) => Expr): OtherHeadExpr = OtherHeadExpr(func(expr))
}
+
+// -------------
+
/** One step in a selection chain: field, index, function, or all-fields selector. */
trait SelectExpr extends Expr with AliasableExpr
+
/** Selector for ".*" (all fields of the preceding head); never contributes an alias. */
case class AllFieldsSelectExpr() extends SelectExpr {
  // plain literal: the original used s".*" with nothing to interpolate
  def desc: String = ".*"
  def coalesceDesc: String = desc
  def alias: Option[String] = None
}
+
/** Selector for ".`field`"; the field name doubles as the alias contribution. */
case class FieldSelectExpr(field: String) extends SelectExpr {
  def desc: String = ".`" + field + "`"
  def coalesceDesc: String = desc
  override def alias: Option[String] = Some(field)
}
+
/** Selector for "[index]"; the rendered index expression serves as the alias. */
case class IndexSelectExpr(index: Expr) extends SelectExpr {

  addChild(index)

  def desc: String = "[" + index.desc + "]"
  def coalesceDesc: String = desc
  def alias: Option[String] = Some(index.desc)

  /** Rebuilds the selector with func applied to the index expression. */
  override def map(func: (Expr) => Expr): IndexSelectExpr = IndexSelectExpr(func(index))
}
+
/**
  * Selector for ".func(args...)" applied to the preceding head.
  * desc is deliberately empty: SelectionExpr.desc renders a function selector itself,
  * threading the accumulated head description in as the first argument.
  */
case class FunctionSelectExpr(functionName: String, args: Seq[Expr]) extends SelectExpr {

  addChildren(args)

  def desc: String = ""
  def coalesceDesc: String = desc
  def alias: Option[String] = Some(functionName)

  /** Rebuilds the selector with func applied to every argument. */
  override def map(func: (Expr) => Expr): FunctionSelectExpr =
    FunctionSelectExpr(functionName, args.map(func(_)))
}
+
+// -------------
+
/**
  * A full selection chain: a head followed by zero or more selectors, with an
  * optional user-supplied alias.
  */
case class SelectionExpr(head: HeadExpr, selectors: Seq[SelectExpr], aliasOpt: Option[String]) extends SelectExpr {

  addChildren(head +: selectors)

  /**
    * Renders the chain by folding selectors onto the head description.
    * A function selector rewrites the accumulated description into its first argument
    * (e.g. `src.f(a)` renders as `f(src, a)`); every other selector simply appends.
    */
  def desc: String = {
    selectors.foldLeft(head.desc) { (hd, sel) =>
      sel match {
        case FunctionSelectExpr(funcName, args) =>
          val nargs = hd +: args.map(_.desc)
          s"${funcName}(${nargs.mkString(", ")})"
        case _ => s"${hd}${sel.desc}"
      }
    }
  }

  /**
    * Like desc, but wraps plain field/index chains in coalesce(..., '') so null
    * values compare as empty strings; function results are left as-is.
    */
  def coalesceDesc: String = {
    selectors.lastOption match {
      case None => desc
      case Some(_: FunctionSelectExpr) => desc // fixed: binding `sel` was unused
      case _ => s"coalesce(${desc}, '')"
    }
  }

  /**
    * Explicit alias wins; otherwise joins the alias contributions of the head and
    * selectors with "_", or None when nothing contributes one.
    */
  def alias: Option[String] = {
    aliasOpt.orElse {
      val aliasSeq = (head +: selectors).flatMap(_.alias)
      if (aliasSeq.nonEmpty) Some(aliasSeq.mkString("_")) else None
    }
  }

  /** Rebuilds the chain with func applied to the head and each selector. */
  override def map(func: (Expr) => Expr): SelectionExpr = {
    SelectionExpr(func(head).asInstanceOf[HeadExpr],
      selectors.map(func(_).asInstanceOf[SelectExpr]), aliasOpt)
  }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala
new file mode 100644
index 0000000..32330cd
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala
@@ -0,0 +1,45 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder.dsl.expr
+
/**
  * Mutable n-ary tree node: children are appended during construction of the
  * expression tree (Expr case classes call addChild/addChildren in their bodies).
  */
trait TreeNode extends Serializable {

  // mutable on purpose: populated incrementally while the owning Expr is being built
  var children = Seq[TreeNode]()

  def addChild(expr: TreeNode) = { children :+= expr }
  def addChildren(exprs: Seq[TreeNode]) = { children ++= exprs }

  /**
    * Pre-order depth-first fold: visit this node (seqOp), then fold each child's
    * result into the accumulator with combOp.
    *
    * NOTE(review): `isInstanceOf[A]` is unchecked — A is erased at runtime, so this
    * test likely only checks against TreeNode and always passes; a ClassTag would be
    * needed to filter by A for real. Confirm intended behavior before relying on it.
    */
  def preOrderTraverseDepthFirst[T, A <: TreeNode](z: T)(seqOp: (A, T) => T, combOp: (T, T) => T): T = {
    if (this.isInstanceOf[A]) {
      val tv = seqOp(this.asInstanceOf[A], z)
      children.foldLeft(combOp(z, tv)) { (ov, tn) =>
        combOp(ov, tn.preOrderTraverseDepthFirst(z)(seqOp, combOp))
      }
    } else z
  }

  /**
    * Post-order depth-first fold: fold all children first, then apply seqOp to this
    * node with the children's combined result. Same erasure caveat as above.
    */
  def postOrderTraverseDepthFirst[T, A <: TreeNode](z: T)(seqOp: (A, T) => T, combOp: (T, T) => T): T = {
    if (this.isInstanceOf[A]) {
      val cv = children.foldLeft(z) { (ov, tn) =>
        combOp(ov, tn.postOrderTraverseDepthFirst(z)(seqOp, combOp))
      }
      combOp(z, seqOp(this.asInstanceOf[A], cv))
    } else z
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/BasicParser.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/BasicParser.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/BasicParser.scala
new file mode 100644
index 0000000..ac540b0
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/BasicParser.scala
@@ -0,0 +1,391 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder.dsl.parser
+
+import org.apache.griffin.measure.step.builder.dsl.expr._
+
+import scala.util.parsing.combinator.JavaTokenParsers
+
+/**
+  * basic parser for sql like syntax
+  */
trait BasicParser extends JavaTokenParsers with Serializable {

  // registered names the grammar recognizes; supplied by the concrete parser
  val dataSourceNames: Seq[String]
  val functionNames: Seq[String]

  /** Strips one pair of surrounding back-quotes, e.g. "`name`" -> "name"; other strings pass through. */
  private def trim(str: String): String = {
    val regex = """`(.*)`""".r
    str match {
      case regex(s) => s
      case _ => str
    }
  }

  /**
    * BNF for basic parser
    *
    * -- literal --
    * <literal> ::= <literal-string> | <literal-number> | <literal-time> | <literal-boolean> | <literal-null> | <literal-nan>
    * <literal-string> ::= <any-string>
    * <literal-number> ::= <integer> | <double>
    * <literal-time> ::= <integer> ("d"|"h"|"m"|"s"|"ms")
    * <literal-boolean> ::= true | false
    * <literal-null> ::= null
    * <literal-nan> ::= nan
    *
    * -- selection --
    * <selection> ::= <selection-head> [ <field-sel> | <index-sel> | <function-sel> ]* [<as-alias>]?
    * <selection-head> ::= ("data source name registered") | <function> | <field-name> | <all-selection>
    * <field-sel> ::= "." <field-name> | "[" <quote-field-name> "]"
    * <index-sel> ::= "[" <arg> "]"
    * <function-sel> ::= "." <function-name> "(" [<arg>]? [, <arg>]* ")"
    * <arg> ::= <math-expr>
    *
    * -- as alias --
    * <as-alias> ::= <as> <field-name>
    *
    * -- math expr --
    * <math-factor> ::= <literal> | <function> | <selection> | "(" <math-expr> ")" [<as-alias>]?
    * <unary-math-expr> ::= [<unary-opr>]* <math-factor>
    * <binary-math-expr> ::= <unary-math-expr> [<binary-opr> <unary-math-expr>]+
    * <math-expr> ::= <binary-math-expr>
    *
    * -- logical expr --
    * <in-expr> ::= <math-expr> [<not>]? <in> <range-expr>
    * <between-expr> ::= <math-expr> [<not>]? <between> (<math-expr> <and> <math-expr> | <range-expr>)
    * <range-expr> ::= "(" [<math-expr>]? [, <math-expr>]+ ")"
    * <like-expr> ::= <math-expr> [<not>]? <like> <math-expr>
    * <is-null-expr> ::= <math-expr> <is> [<not>]? <null>
    * <is-nan-expr> ::= <math-expr> <is> [<not>]? <nan>
    *
    * <logical-factor> ::= <math-expr> | <in-expr> | <between-expr> | <like-expr> | <is-null-expr> | <is-nan-expr> | "(" <logical-expr> ")" [<as-alias>]?
    * <unary-logical-expr> ::= [<unary-logical-opr>]* <logical-factor>
    * <binary-logical-expr> ::= <unary-logical-expr> [<binary-logical-opr> <unary-logical-expr>]+
    * <logical-expr> ::= <binary-logical-expr>
    *
    * -- expression --
    * <expr> = <math-expr> | <logical-expr>
    *
    * -- function expr --
    * <function> ::= <function-name> "(" [<arg>] [, <arg>]+ ")" [<as-alias>]?
    * <function-name> ::= ("function name registered")
    * <arg> ::= <expr>
    *
    * -- clauses --
    * <select-clause> = <expr> [, <expr>]*
    * <where-clause> = <where> <expr>
    * <from-clause> = <from> ("data source name registered")
    * <having-clause> = <having> <expr>
    * <groupby-clause> = <group> <by> <expr> [ <having-clause> ]?
    * <orderby-item> = <expr> [ <DESC> ]?
    * <orderby-clause> = <order> <by> <orderby-item> [ , <orderby-item> ]*
    * <limit-clause> = <limit> <expr>
    *
    * -- combined clauses --
    * <combined-clauses> = <select-clause> [ <from-clause> ]+ [ <where-clause> ]+ [ <groupby-clause> ]+ [ <orderby-clause> ]+ [ <limit-clause> ]+
    */

  /**
    * Builds one alternation parser matching any registered data source name,
    * quoted or bare, case-insensitively.
    * NOTE(review): `reduce` throws on an empty names seq — presumably at least one
    * data source is always registered; confirm at the call site.
    */
  protected def genDataSourceNamesParser(names: Seq[String]): Parser[String] = {
    names.reverse.map {
      fn => s"""(?i)`${fn}`|${fn}""".r: Parser[String]
    }.reduce(_ | _)
  }
  /** Same as above for registered function names (bare, case-insensitive). */
  protected def genFunctionNamesParser(names: Seq[String]): Parser[String] = {
    names.reverse.map {
      fn => s"""(?i)${fn}""".r: Parser[String]
    }.reduce(_ | _)
  }

  object Literal {
    val NULL: Parser[String] = """(?i)null""".r
    val NAN: Parser[String] = """(?i)nan""".r
  }
  import Literal._

  object Operator {
    val MATH_UNARY: Parser[String] = "+" | "-"
    // binary operators grouped by precedence, highest first: (*, /, %) then (+, -)
    val MATH_BINARIES: Seq[Parser[String]] = Seq(("*" | "/" | "%"), ("+" | "-"))

    // word keywords require a trailing whitespace char (\s) so e.g. "not" doesn't match "note"
    val NOT: Parser[String] = """(?i)not\s""".r | "!"
    val AND: Parser[String] = """(?i)and\s""".r | "&&"
    val OR: Parser[String] = """(?i)or\s""".r | "||"
    val IN: Parser[String] = """(?i)in\s""".r
    val BETWEEN: Parser[String] = """(?i)between\s""".r
    val AND_ONLY: Parser[String] = """(?i)and\s""".r
    val IS: Parser[String] = """(?i)is\s""".r
    val LIKE: Parser[String] = """(?i)like\s""".r
    // two-char operators listed before their one-char prefixes so "<=" wins over "<"
    val COMPARE: Parser[String] = "=" | "!=" | "<>" | "<=" | ">=" | "<" | ">"
    val LOGICAL_UNARY: Parser[String] = NOT
    // logical binary operators by precedence, highest first: comparison, then and, then or
    val LOGICAL_BINARIES: Seq[Parser[String]] = Seq((COMPARE), (AND), (OR))

    val LSQBR: Parser[String] = "["
    val RSQBR: Parser[String] = "]"
    val LBR: Parser[String] = "("
    val RBR: Parser[String] = ")"

    val DOT: Parser[String] = "."
    val ALLSL: Parser[String] = "*"
    val SQUOTE: Parser[String] = "'"
    val DQUOTE: Parser[String] = "\""
    val UQUOTE: Parser[String] = "`"
    val COMMA: Parser[String] = ","

    val SELECT: Parser[String] = """(?i)select\s""".r
    val DISTINCT: Parser[String] = """(?i)distinct""".r
//    val ALL: Parser[String] = """(?i)all""".r
    val FROM: Parser[String] = """(?i)from\s""".r
    val AS: Parser[String] = """(?i)as\s""".r
    val WHERE: Parser[String] = """(?i)where\s""".r
    val GROUP: Parser[String] = """(?i)group\s""".r
    val ORDER: Parser[String] = """(?i)order\s""".r
    val SORT: Parser[String] = """(?i)sort\s""".r
    val BY: Parser[String] = """(?i)by\s""".r
    val DESC: Parser[String] = """(?i)desc""".r
    val ASC: Parser[String] = """(?i)asc""".r
    val HAVING: Parser[String] = """(?i)having\s""".r
    val LIMIT: Parser[String] = """(?i)limit\s""".r
  }
  import Operator._

  object Strings {
    def AnyString: Parser[String] = """"(?:\"|[^\"])*"""".r | """'(?:\'|[^'])*'""".r
    def SimpleTableFieldName: Parser[String] = """[a-zA-Z_]\w*""".r
    def UnQuoteTableFieldName: Parser[String] = """`(?:[\\][`]|[^`])*`""".r
//    def FieldName: Parser[String] = UnQuoteTableFieldName | SimpleTableFieldName
    def DataSourceName: Parser[String] = genDataSourceNamesParser(dataSourceNames)
    def FunctionName: Parser[String] = genFunctionNamesParser(functionNames)

    def IntegerNumber: Parser[String] = """[+\-]?\d+""".r
    def DoubleNumber: Parser[String] = """[+\-]?(\.\d+|\d+\.\d*)""".r
    def IndexNumber: Parser[String] = IntegerNumber

    def TimeString: Parser[String] = """([+\-]?\d+)(d|h|m|s|ms)""".r
    def BooleanString: Parser[String] = """(?i)true|false""".r
  }
  import Strings._

  /**
    * -- literal --
    * <literal> ::= <literal-string> | <literal-number> | <literal-time> | <literal-boolean> | <literal-null> | <literal-nan>
    * <literal-string> ::= <any-string>
    * <literal-number> ::= <integer> | <double>
    * <literal-time> ::= <integer> ("d"|"h"|"m"|"s"|"ms")
    * <literal-boolean> ::= true | false
    * <literal-null> ::= null
    * <literal-nan> ::= nan
    */
  // alternation order matters: first-success semantics of `|`
  def literal: Parser[LiteralExpr] = literalNull | literalNan | literalBoolean | literalString | literalTime | literalNumber
  def literalNull: Parser[LiteralNullExpr] = NULL ^^ { LiteralNullExpr(_) }
  def literalNan: Parser[LiteralNanExpr] = NAN ^^ { LiteralNanExpr(_) }
  def literalString: Parser[LiteralStringExpr] = AnyString ^^ { LiteralStringExpr(_) }
  def literalNumber: Parser[LiteralNumberExpr] = (DoubleNumber | IntegerNumber) ^^ { LiteralNumberExpr(_) }
  def literalTime: Parser[LiteralTimeExpr] = TimeString ^^ { LiteralTimeExpr(_) }
  def literalBoolean: Parser[LiteralBooleanExpr] = BooleanString ^^ { LiteralBooleanExpr(_) }

  /**
    * -- selection --
    * <selection> ::= <selection-head> [ <field-sel> | <index-sel> | <function-sel> ]* [<as-alias>]?
    * <selection-head> ::= ("data source name registered") | <function> | <field-name> | <all-selection>
    * <field-sel> ::= "." <field-name> | "[" <quote-field-name> "]"
    * <index-sel> ::= "[" <arg> "]"
    * <function-sel> ::= "." <function-name> "(" [<arg>]? [, <arg>]* ")"
    * <arg> ::= <math-expr>
    */

  def selection: Parser[SelectionExpr] = selectionHead ~ rep(selector) ~ opt(asAlias) ^^ {
    case head ~ sels ~ aliasOpt => SelectionExpr(head, sels, aliasOpt)
  }
  // registered data source names take priority over plain field names
  def selectionHead: Parser[HeadExpr] = DataSourceName ^^ {
    ds => DataSourceHeadExpr(trim(ds))
  } | function ^^ {
    OtherHeadExpr(_)
  } | SimpleTableFieldName ^^ {
    FieldNameHeadExpr(_)
  } | UnQuoteTableFieldName ^^ { s =>
    FieldNameHeadExpr(trim(s))
  } | ALLSL ^^ { _ =>
    AllSelectHeadExpr()
  }
  // functionSelect must be tried before fieldSelect so ".f(...)" isn't read as field ".f"
  def selector: Parser[SelectExpr] = functionSelect | allFieldsSelect | fieldSelect | indexSelect
  def allFieldsSelect: Parser[AllFieldsSelectExpr] = DOT ~> ALLSL ^^ { _ => AllFieldsSelectExpr() }
  def fieldSelect: Parser[FieldSelectExpr] = DOT ~> (
    SimpleTableFieldName ^^ {
      FieldSelectExpr(_)
    } | UnQuoteTableFieldName ^^ { s =>
      FieldSelectExpr(trim(s))
    })
  def indexSelect: Parser[IndexSelectExpr] = LSQBR ~> argument <~ RSQBR ^^ { IndexSelectExpr(_) }
  def functionSelect: Parser[FunctionSelectExpr] = DOT ~ FunctionName ~ LBR ~ repsep(argument, COMMA) ~ RBR ^^ {
    case _ ~ name ~ _ ~ args ~ _ => FunctionSelectExpr(name, args)
  }

  /**
    * -- as alias --
    * <as-alias> ::= <as> <field-name>
    */
  def asAlias: Parser[String] = AS ~> (SimpleTableFieldName | UnQuoteTableFieldName ^^ { trim(_) })

  /**
    * -- math expr --
    * <math-factor> ::= <literal> | <function> | <selection> | "(" <math-expr> ")" [<as-alias>]?
    * <unary-math-expr> ::= [<unary-opr>]* <math-factor>
    * <binary-math-expr> ::= <unary-math-expr> [<binary-opr> <unary-math-expr>]+
    * <math-expr> ::= <binary-math-expr>
    */

  def mathFactor: Parser[MathExpr] = (literal | function | selection) ^^ {
    MathFactorExpr(_, false, None)
  } | LBR ~ mathExpression ~ RBR ~ opt(asAlias) ^^ {
    case _ ~ expr ~ _ ~ aliasOpt => MathFactorExpr(expr, true, aliasOpt)
  }
  def unaryMathExpression: Parser[MathExpr] = rep(MATH_UNARY) ~ mathFactor ^^ {
    case Nil ~ a => a
    case list ~ a => UnaryMathExpr(list, a)
  }
  // layers one parser per precedence level: each fold step wraps the previous
  // (tighter-binding) parser, so the head of the result is the loosest level
  def binaryMathExpressions: Seq[Parser[MathExpr]] =
    MATH_BINARIES.foldLeft(List[Parser[MathExpr]](unaryMathExpression)) { (parsers, binaryParser) =>
      val pre = parsers.head
      val cur = pre ~ rep(binaryParser ~ pre) ^^ {
        case a ~ Nil => a
        case a ~ list => BinaryMathExpr(a, list.map(c => (c._1, c._2)))
      }
      cur :: parsers
    }
  def mathExpression: Parser[MathExpr] = binaryMathExpressions.head

  /**
    * -- logical expr --
    * <in-expr> ::= <math-expr> [<not>]? <in> <range-expr>
    * <between-expr> ::= <math-expr> [<not>]? <between> (<math-expr> <and> <math-expr> | <range-expr>)
    * <range-expr> ::= "(" [<math-expr>]? [, <math-expr>]+ ")"
    * <like-expr> ::= <math-expr> [<not>]? <like> <math-expr>
    * <is-null-expr> ::= <math-expr> <is> [<not>]? <null>
    * <is-nan-expr> ::= <math-expr> <is> [<not>]? <nan>
    *
    * <logical-factor> ::= <math-expr> | <in-expr> | <between-expr> | <like-expr> | <is-null-expr> | <is-nan-expr> | "(" <logical-expr> ")" [<as-alias>]?
    * <unary-logical-expr> ::= [<unary-logical-opr>]* <logical-factor>
    * <binary-logical-expr> ::= <unary-logical-expr> [<binary-logical-opr> <unary-logical-expr>]+
    * <logical-expr> ::= <binary-logical-expr>
    */

  // note: the boolean passed to these Expr constructors is notOpt.isEmpty,
  // i.e. true for the positive form (no NOT present)
  def inExpr: Parser[LogicalExpr] = mathExpression ~ opt(NOT) ~ IN ~ LBR ~ repsep(mathExpression, COMMA) ~ RBR ^^ {
    case head ~ notOpt ~ _ ~ _ ~ list ~ _ => InExpr(head, notOpt.isEmpty, list)
  }
  def betweenExpr: Parser[LogicalExpr] = mathExpression ~ opt(NOT) ~ BETWEEN ~ LBR ~ repsep(mathExpression, COMMA) ~ RBR ^^ {
    case head ~ notOpt ~ _ ~ _ ~ list ~ _ => BetweenExpr(head, notOpt.isEmpty, list)
  } | mathExpression ~ opt(NOT) ~ BETWEEN ~ mathExpression ~ AND_ONLY ~ mathExpression ^^ {
    case head ~ notOpt ~ _ ~ first ~ _ ~ second => BetweenExpr(head, notOpt.isEmpty, Seq(first, second))
  }
  def likeExpr: Parser[LogicalExpr] = mathExpression ~ opt(NOT) ~ LIKE ~ mathExpression ^^ {
    case head ~ notOpt ~ _ ~ value => LikeExpr(head, notOpt.isEmpty, value)
  }
  def isNullExpr: Parser[LogicalExpr] = mathExpression ~ IS ~ opt(NOT) ~ NULL ^^ {
    case head ~ _ ~ notOpt ~ _ => IsNullExpr(head, notOpt.isEmpty)
  }
  def isNanExpr: Parser[LogicalExpr] = mathExpression ~ IS ~ opt(NOT) ~ NAN ^^ {
    case head ~ _ ~ notOpt ~ _ => IsNanExpr(head, notOpt.isEmpty)
  }

  def logicalFactor: Parser[LogicalExpr] = (inExpr | betweenExpr | likeExpr | isNullExpr | isNanExpr | mathExpression) ^^ {
    LogicalFactorExpr(_, false, None)
  } | LBR ~ logicalExpression ~ RBR ~ opt(asAlias) ^^ {
    case _ ~ expr ~ _ ~ aliasOpt => LogicalFactorExpr(expr, true, aliasOpt)
  }
  def unaryLogicalExpression: Parser[LogicalExpr] = rep(LOGICAL_UNARY) ~ logicalFactor ^^ {
    case Nil ~ a => a
    case list ~ a => UnaryLogicalExpr(list, a)
  }
  // same precedence-layering scheme as binaryMathExpressions
  def binaryLogicalExpressions: Seq[Parser[LogicalExpr]] =
    LOGICAL_BINARIES.foldLeft(List[Parser[LogicalExpr]](unaryLogicalExpression)) { (parsers, binaryParser) =>
      val pre = parsers.head
      val cur = pre ~ rep(binaryParser ~ pre) ^^ {
        case a ~ Nil => a
        case a ~ list => BinaryLogicalExpr(a, list.map(c => (c._1, c._2)))
      }
      cur :: parsers
    }
  def logicalExpression: Parser[LogicalExpr] = binaryLogicalExpressions.head

  /**
    * -- expression --
    * <expr> = <math-expr> | <logical-expr>
    */

  def expression: Parser[Expr] = logicalExpression | mathExpression

  /**
    * -- function expr --
    * <function> ::= <function-name> "(" [<arg>] [, <arg>]+ ")" [<as-alias>]?
    * <function-name> ::= ("function name registered")
    * <arg> ::= <expr>
    */

  def function: Parser[FunctionExpr] = FunctionName ~ LBR ~ opt(DISTINCT) ~ repsep(argument, COMMA) ~ RBR ~ opt(asAlias) ^^ {
    case name ~ _ ~ extraCdtnOpt ~ args ~ _ ~ aliasOpt =>
      FunctionExpr(name, args, extraCdtnOpt.map(ExtraConditionExpr(_)), aliasOpt)
  }
  def argument: Parser[Expr] = expression

  /**
    * -- clauses --
    * <select-clause> = <expr> [, <expr>]*
    * <where-clause> = <where> <expr>
    * <from-clause> = <from> ("data source name registered")
    * <having-clause> = <having> <expr>
    * <groupby-clause> = <group> <by> <expr> [ <having-clause> ]?
    * <orderby-item> = <expr> [ <DESC> ]?
    * <orderby-clause> = <order> <by> <orderby-item> [ , <orderby-item> ]*
    * <limit-clause> = <limit> <expr>
    */

  def selectClause: Parser[SelectClause] = opt(SELECT) ~> opt(DISTINCT) ~ rep1sep(expression, COMMA) ^^ {
    case extraCdtnOpt ~ exprs => SelectClause(exprs, extraCdtnOpt.map(ExtraConditionExpr(_)))
  }
  def fromClause: Parser[FromClause] = FROM ~> DataSourceName ^^ { ds => FromClause(trim(ds)) }
  def whereClause: Parser[WhereClause] = WHERE ~> expression ^^ { WhereClause(_) }
  def havingClause: Parser[Expr] = HAVING ~> expression
  def groupbyClause: Parser[GroupbyClause] = GROUP ~ BY ~ rep1sep(expression, COMMA) ~ opt(havingClause) ^^ {
    case _ ~ _ ~ cols ~ havingOpt => GroupbyClause(cols, havingOpt)
  }
  def orderItem: Parser[OrderItem] = expression ~ opt(DESC | ASC) ^^ {
    case expr ~ orderOpt => OrderItem(expr, orderOpt)
  }
  def orderbyClause: Parser[OrderbyClause] = ORDER ~ BY ~ rep1sep(orderItem, COMMA) ^^ {
    case _ ~ _ ~ cols => OrderbyClause(cols)
  }
  def sortbyClause: Parser[SortbyClause] = SORT ~ BY ~ rep1sep(orderItem, COMMA) ^^ {
    case _ ~ _ ~ cols => SortbyClause(cols)
  }
  def limitClause: Parser[LimitClause] = LIMIT ~> expression ^^ { LimitClause(_) }

  /**
    * -- combined clauses --
    * <combined-clauses> = <select-clause> [ <from-clause> ]+ [ <where-clause> ]+ [ <groupby-clause> ]+ [ <orderby-clause> ]+ [ <limit-clause> ]+
    */

  def combinedClause: Parser[CombinedClause] = selectClause ~ opt(fromClause) ~ opt(whereClause) ~
    opt(groupbyClause) ~ opt(orderbyClause) ~ opt(limitClause) ^^ {
    case sel ~ fromOpt ~ whereOpt ~ groupbyOpt ~ orderbyOpt ~ limitOpt => {
      val tails = Seq(whereOpt, groupbyOpt, orderbyOpt, limitOpt).flatMap(opt => opt)
      CombinedClause(sel, fromOpt, tails)
    }
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala
new file mode 100644
index 0000000..2cd638c
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala
@@ -0,0 +1,98 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder.dsl.parser
+
+import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.step.builder.dsl._
+import org.apache.griffin.measure.step.builder.dsl.expr._
+
+/**
+  * parser for griffin dsl rule
+  */
/**
  * Parser for griffin dsl rules: picks a grammar root per DQ type and
  * delegates the shared grammar to BasicParser.
  */
case class GriffinDslParser(dataSourceNames: Seq[String], functionNames: Seq[String]
                           ) extends BasicParser {

  import Operator._

  /**
    * -- profiling clauses --
    * <profiling-clauses> = <select-clause> [ <from-clause> ]+ [ <where-clause> ]+ [ <groupby-clause> ]+ [ <orderby-clause> ]+ [ <limit-clause> ]+
    */
  def profilingClause: Parser[ProfilingClause] = selectClause ~ opt(fromClause) ~ opt(whereClause) ~
    opt(groupbyClause) ~ opt(orderbyClause) ~ opt(limitClause) ^^ {
    case sel ~ fromOpt ~ whereOpt ~ groupbyOpt ~ orderbyOpt ~ limitOpt =>
      // where goes before group-by; order-by and limit come after it
      val preClauses = whereOpt.toSeq
      val postClauses = orderbyOpt.toSeq ++ limitOpt.toSeq
      ProfilingClause(sel, fromOpt, groupbyOpt, preClauses, postClauses)
  }

  /**
    * -- uniqueness clauses --
    * <uniqueness-clauses> = <expr> [, <expr>]+
    */
  def uniquenessClause: Parser[UniquenessClause] =
    rep1sep(expression, Operator.COMMA) ^^ { UniquenessClause(_) }

  /**
    * -- distinctness clauses --
    * <sqbr-expr> = "[" <expr> "]"
    * <dist-expr> = <sqbr-expr> | <expr>
    * <distinctness-clauses> = <distExpr> [, <distExpr>]+
    */
  def sqbrExpr: Parser[Expr] = LSQBR ~> expression <~ RSQBR ^^ { expr =>
    // tag marks that the expression was square-bracketed
    expr.tag = "[]"
    expr
  }
  def distExpr: Parser[Expr] = expression | sqbrExpr
  def distinctnessClause: Parser[DistinctnessClause] =
    rep1sep(distExpr, Operator.COMMA) ^^ { DistinctnessClause(_) }

  /**
    * -- timeliness clauses --
    * <timeliness-clauses> = <expr> [, <expr>]+
    */
  def timelinessClause: Parser[TimelinessClause] =
    rep1sep(expression, Operator.COMMA) ^^ { TimelinessClause(_) }

  /**
    * -- completeness clauses --
    * <completeness-clauses> = <expr> [, <expr>]+
    */
  def completenessClause: Parser[CompletenessClause] =
    rep1sep(expression, Operator.COMMA) ^^ { CompletenessClause(_) }

  /** Parses the whole rule string with the root grammar matching the given DQ type. */
  def parseRule(rule: String, dqType: DqType): ParseResult[Expr] = {
    val rootExpr: Parser[Expr] = dqType match {
      case AccuracyType => logicalExpression
      case ProfilingType => profilingClause
      case UniquenessType => uniquenessClause
      case DistinctnessType => distinctnessClause
      case TimelinessType => timelinessClause
      case CompletenessType => completenessClause
      case _ => expression
    }
    parseAll(rootExpr, rule)
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
new file mode 100644
index 0000000..a0e5ca3
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
@@ -0,0 +1,200 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder.dsl.transform
+
+import org.apache.griffin.measure.configuration.enums.{BatchProcessType, 
NormalizeType, StreamingProcessType}
+import org.apache.griffin.measure.configuration.params.RuleParam
+import org.apache.griffin.measure.context.DQContext
+import org.apache.griffin.measure.step.DQStep
+import org.apache.griffin.measure.step.builder.ConstantColumns
+import org.apache.griffin.measure.step.builder.dsl.expr._
+import 
org.apache.griffin.measure.step.builder.dsl.transform.analyzer.AccuracyAnalyzer
+import org.apache.griffin.measure.step.transform.DataFrameOps.AccuracyOprKeys
+import org.apache.griffin.measure.step.transform.{DataFrameOps, 
DataFrameOpsTransformStep, SparkSqlTransformStep}
+import org.apache.griffin.measure.step.write.{DsCacheUpdateWriteStep, 
MetricWriteStep, RecordWriteStep}
+import org.apache.griffin.measure.utils.ParamUtil._
+
+/**
+  * generate accuracy dq steps
+  */
/**
  * generate accuracy dq steps
  */
case class AccuracyExpr2DQSteps(context: DQContext,
                                expr: Expr,
                                ruleParam: RuleParam
                               ) extends Expr2DQSteps {

  // detail keys looked up in the rule param; each key also serves as the
  // default output column name when no override is configured
  private object AccuracyKeys {
    val _source = "source"
    val _target = "target"
    val _miss = "miss"
    val _total = "total"
    val _matched = "matched"
  }
  import AccuracyKeys._

  /**
    * Build the DQ steps computing accuracy of source against target:
    * find source records with no matching target record, count misses and
    * totals, and join the counts into the accuracy metric. Streaming mode
    * additionally merges metrics and collects records per timestamp.
    * Returns Nil (with a warning) when the source table is not registered.
    */
  def getDQSteps(): Seq[DQStep] = {
    val details = ruleParam.getDetails
    // expr is expected to be a LogicalExpr here (parsed with AccuracyType);
    // the cast will throw otherwise
    val accuracyExpr = expr.asInstanceOf[LogicalExpr]

    val sourceName = details.getString(_source, context.getDataSourceName(0))
    val targetName = details.getString(_target, context.getDataSourceName(1))
    val analyzer = AccuracyAnalyzer(accuracyExpr, sourceName, targetName)

    val procType = context.procType
    val timestamp = context.contextId.timestamp

    if (!context.runTimeTableRegister.existsTable(sourceName)) {
      warn(s"[${timestamp}] data source ${sourceName} not exists")
      Nil
    } else {
      // 1. miss record: source rows with no match in target
      val missRecordsTableName = "__missRecords"
      val selClause = s"`${sourceName}`.*"
      // when target is missing, every source row counts as a miss
      val missRecordsSql = if (!context.runTimeTableRegister.existsTable(targetName)) {
        warn(s"[${timestamp}] data source ${targetName} not exists")
        s"SELECT ${selClause} FROM `${sourceName}`"
      } else {
        val onClause = expr.coalesceDesc
        val sourceIsNull = analyzer.sourceSelectionExprs.map { sel =>
          s"${sel.desc} IS NULL"
        }.mkString(" AND ")
        val targetIsNull = analyzer.targetSelectionExprs.map { sel =>
          s"${sel.desc} IS NULL"
        }.mkString(" AND ")
        // source columns present but all target join columns NULL
        // => left join found no match
        val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})"
        s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}"
      }
      val missRecordsTransStep = SparkSqlTransformStep(missRecordsTableName, missRecordsSql, emptyMap, true)
      // batch writes miss records directly; streaming defers to step 6 below
      val missRecordsWriteSteps = procType match {
        case BatchProcessType => {
          val rwName = ruleParam.recordOpt.map(_.name).getOrElse(missRecordsTableName)
          RecordWriteStep(rwName, missRecordsTableName) :: Nil
        }
        case StreamingProcessType => Nil
      }
      // streaming only: push miss records back into the data source cache
      val missRecordsUpdateWriteSteps = procType match {
        case BatchProcessType => Nil
        case StreamingProcessType => {
          val dsName = ruleParam.dsCacheUpdateOpt.map(_.dsName).getOrElse(sourceName)
          DsCacheUpdateWriteStep(dsName, missRecordsTableName) :: Nil
        }
      }

      // 2. miss count (streaming groups per tmst)
      val missCountTableName = "__missCount"
      val missColName = details.getStringOrKey(_miss)
      val missCountSql = procType match {
        case BatchProcessType => s"SELECT COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}`"
        case StreamingProcessType => s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}` GROUP BY `${ConstantColumns.tmst}`"
      }
      val missCountTransStep = SparkSqlTransformStep(missCountTableName, missCountSql, emptyMap)

      // 3. total count (streaming groups per tmst)
      val totalCountTableName = "__totalCount"
      val totalColName = details.getStringOrKey(_total)
      val totalCountSql = procType match {
        case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`"
        case StreamingProcessType => s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceName}` GROUP BY `${ConstantColumns.tmst}`"
      }
      val totalCountTransStep = SparkSqlTransformStep(totalCountTableName, totalCountSql, emptyMap)

      // 4. accuracy metric: matched = total - miss (coalesce covers the case
      // of zero miss rows, where the left join yields NULL)
      // NOTE: the batch LEFT JOIN has no ON clause; both sides are single-row
      // aggregates, so the resulting cross join is intentional
      val accuracyTableName = ruleParam.name
      val matchedColName = details.getStringOrKey(_matched)
      val accuracyMetricSql = procType match {
        case BatchProcessType => {
          s"""
             |SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
             |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`,
             |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${missCountTableName}`.`${missColName}`, 0)) AS `${matchedColName}`
             |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}`
         """.stripMargin
        }
        case StreamingProcessType => {
          s"""
             |SELECT `${totalCountTableName}`.`${ConstantColumns.tmst}` AS `${ConstantColumns.tmst}`,
             |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
             |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`,
             |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${missCountTableName}`.`${missColName}`, 0)) AS `${matchedColName}`
             |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}`
             |ON `${totalCountTableName}`.`${ConstantColumns.tmst}` = `${missCountTableName}`.`${ConstantColumns.tmst}`
         """.stripMargin
        }
      }
      val accuracyTransStep = SparkSqlTransformStep(accuracyTableName, accuracyMetricSql, emptyMap)
      // batch writes the metric immediately; streaming writes the merged
      // metric in step 5 instead
      val accuracyMetricWriteSteps = procType match {
        case BatchProcessType => {
          val mwName = ruleParam.metricOpt.map(_.name).getOrElse(ruleParam.name)
          val collectType = NormalizeType(ruleParam.metricOpt.map(_.collectType).getOrElse(""))
          MetricWriteStep(mwName, accuracyTableName, collectType) :: Nil
        }
        case StreamingProcessType => Nil
      }

      // accuracy current steps
      val transSteps1 = missRecordsTransStep :: missCountTransStep :: totalCountTransStep :: accuracyTransStep :: Nil
      val writeSteps1 = accuracyMetricWriteSteps ++ missRecordsWriteSteps ++ missRecordsUpdateWriteSteps

      // streaming extra steps
      val (transSteps2, writeSteps2) = procType match {
        case BatchProcessType => (Nil, Nil)
        case StreamingProcessType => {
          // 5. accuracy metric merge via the DataFrameOps accuracy operation
          val accuracyMetricTableName = "__accuracy"
          val accuracyMetricRule = DataFrameOps._accuracy
          val accuracyMetricDetails = Map[String, Any](
            (AccuracyOprKeys._dfName -> accuracyTableName),
            (AccuracyOprKeys._miss -> missColName),
            (AccuracyOprKeys._total -> totalColName),
            (AccuracyOprKeys._matched -> matchedColName)
          )
          val accuracyMetricTransStep = DataFrameOpsTransformStep(accuracyMetricTableName,
            accuracyMetricRule, accuracyMetricDetails)
          val accuracyMetricWriteStep = {
            val mwName = ruleParam.metricOpt.map(_.name).getOrElse(ruleParam.name)
            val collectType = NormalizeType(ruleParam.metricOpt.map(_.collectType).getOrElse(""))
            MetricWriteStep(mwName, accuracyMetricTableName, collectType)
          }

          // 6. collect accuracy records: the merged-metric table rows flagged
          // by the record column select which tmst values to write
          val accuracyRecordTableName = "__accuracyRecords"
          val accuracyRecordSql = {
            s"""
               |SELECT `${ConstantColumns.tmst}`, `${ConstantColumns.empty}`
               |FROM `${accuracyMetricTableName}` WHERE `${ConstantColumns.record}`
             """.stripMargin
          }
          val accuracyRecordTransStep = SparkSqlTransformStep(
            accuracyRecordTableName, accuracyRecordSql, emptyMap)
          // writes miss records filtered by the accuracy-record table
          val accuracyRecordWriteStep = {
            val rwName = ruleParam.recordOpt.map(_.name).getOrElse(missRecordsTableName)
            RecordWriteStep(rwName, missRecordsTableName, Some(accuracyRecordTableName))
          }

          // extra steps
          (accuracyMetricTransStep :: accuracyRecordTransStep :: Nil,
            accuracyMetricWriteStep :: accuracyRecordWriteStep :: Nil)
        }
      }

      // full steps
      transSteps1 ++ transSteps2 ++ writeSteps1 ++ writeSteps2
    }
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
new file mode 100644
index 0000000..fcb576c
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
@@ -0,0 +1,157 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder.dsl.transform
+
+import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.params.RuleParam
+import org.apache.griffin.measure.context.DQContext
+import org.apache.griffin.measure.step.DQStep
+import org.apache.griffin.measure.step.builder.ConstantColumns
+import org.apache.griffin.measure.step.builder.dsl.expr._
+import 
org.apache.griffin.measure.step.builder.dsl.transform.analyzer.CompletenessAnalyzer
+import org.apache.griffin.measure.step.transform.SparkSqlTransformStep
+import org.apache.griffin.measure.step.write.{MetricWriteStep, RecordWriteStep}
+import org.apache.griffin.measure.utils.ParamUtil._
+
+/**
+  * generate completeness dq steps
+  */
/**
  * generate completeness dq steps
  */
case class CompletenessExpr2DQSteps(context: DQContext,
                                    expr: Expr,
                                    ruleParam: RuleParam
                                   ) extends Expr2DQSteps {

  // detail keys looked up in the rule param; each key also serves as the
  // default output column name when no override is configured
  private object CompletenessKeys {
    val _source = "source"
    val _total = "total"
    val _complete = "complete"
    val _incomplete = "incomplete"
  }
  import CompletenessKeys._

  /**
    * Build the DQ steps computing completeness of the configured source:
    * alias the selected columns, split out records where any aliased column
    * is NULL, count incomplete and total rows, and join the counts into the
    * completeness metric (complete = total - incomplete).
    * Returns Nil (with a warning) when the source table is not registered.
    */
  def getDQSteps(): Seq[DQStep] = {
    val details = ruleParam.getDetails
    // expr is expected to be a CompletenessClause here (parsed with
    // CompletenessType); the cast will throw otherwise
    val completenessExpr = expr.asInstanceOf[CompletenessClause]

    val sourceName = details.getString(_source, context.getDataSourceName(0))

    val procType = context.procType
    val timestamp = context.contextId.timestamp

    if (!context.runTimeTableRegister.existsTable(sourceName)) {
      warn(s"[${timestamp}] data source ${sourceName} not exists")
      Nil
    } else {
      val analyzer = CompletenessAnalyzer(completenessExpr, sourceName)

      // selected expressions with their aliases, e.g. "`src`.`col` AS `col`"
      val selItemsClause = analyzer.selectionPairs.map { pair =>
        val (expr, alias) = pair
        s"${expr.desc} AS `${alias}`"
      }.mkString(", ")
      val aliases = analyzer.selectionPairs.map(_._2)

      // streaming mode carries the tmst column through for per-batch grouping
      val selClause = procType match {
        case BatchProcessType => selItemsClause
        case StreamingProcessType => s"`${ConstantColumns.tmst}`, ${selItemsClause}"
      }
      // (removed unused local `selAliases`, which was computed here but never
      // referenced anywhere in this method)

      // 1. source alias: project the selected columns under their aliases
      val sourceAliasTableName = "__sourceAlias"
      val sourceAliasSql = {
        s"SELECT ${selClause} FROM `${sourceName}`"
      }
      val sourceAliasTransStep = SparkSqlTransformStep(sourceAliasTableName, sourceAliasSql, emptyMap, true)

      // 2. incomplete record: a row is incomplete when any aliased column is NULL
      val incompleteRecordsTableName = "__incompleteRecords"
      val completeWhereClause = aliases.map(a => s"`${a}` IS NOT NULL").mkString(" AND ")
      val incompleteWhereClause = s"NOT (${completeWhereClause})"
      val incompleteRecordsSql = s"SELECT * FROM `${sourceAliasTableName}` WHERE ${incompleteWhereClause}"
      val incompleteRecordTransStep = SparkSqlTransformStep(incompleteRecordsTableName, incompleteRecordsSql, emptyMap, true)
      val incompleteRecordWriteStep = {
        val rwName = ruleParam.recordOpt.map(_.name).getOrElse(incompleteRecordsTableName)
        RecordWriteStep(rwName, incompleteRecordsTableName)
      }

      // 3. incomplete count (streaming groups per tmst)
      val incompleteCountTableName = "__incompleteCount"
      val incompleteColName = details.getStringOrKey(_incomplete)
      val incompleteCountSql = procType match {
        case BatchProcessType => s"SELECT COUNT(*) AS `${incompleteColName}` FROM `${incompleteRecordsTableName}`"
        case StreamingProcessType => s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${incompleteColName}` FROM `${incompleteRecordsTableName}` GROUP BY `${ConstantColumns.tmst}`"
      }
      val incompleteCountTransStep = SparkSqlTransformStep(incompleteCountTableName, incompleteCountSql, emptyMap)

      // 4. total count over the aliased source (streaming groups per tmst)
      val totalCountTableName = "__totalCount"
      val totalColName = details.getStringOrKey(_total)
      val totalCountSql = procType match {
        case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}`"
        case StreamingProcessType => s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}` GROUP BY `${ConstantColumns.tmst}`"
      }
      val totalCountTransStep = SparkSqlTransformStep(totalCountTableName, totalCountSql, emptyMap)

      // 5. complete metric: complete = total - incomplete (coalesce covers the
      // case of zero incomplete rows, where the left join yields NULL)
      // NOTE: the batch LEFT JOIN has no ON clause; both sides are single-row
      // aggregates, so the resulting cross join is intentional
      val completeTableName = ruleParam.name
      val completeColName = details.getStringOrKey(_complete)
      val completeMetricSql = procType match {
        case BatchProcessType => {
          s"""
             |SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
             |coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0) AS `${incompleteColName}`,
             |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0)) AS `${completeColName}`
             |FROM `${totalCountTableName}` LEFT JOIN `${incompleteCountTableName}`
         """.stripMargin
        }
        case StreamingProcessType => {
          s"""
             |SELECT `${totalCountTableName}`.`${ConstantColumns.tmst}` AS `${ConstantColumns.tmst}`,
             |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
             |coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0) AS `${incompleteColName}`,
             |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0)) AS `${completeColName}`
             |FROM `${totalCountTableName}` LEFT JOIN `${incompleteCountTableName}`
             |ON `${totalCountTableName}`.`${ConstantColumns.tmst}` = `${incompleteCountTableName}`.`${ConstantColumns.tmst}`
         """.stripMargin
        }
      }
      val completeTransStep = SparkSqlTransformStep(completeTableName, completeMetricSql, emptyMap)
      val completeWriteStep = {
        val mwName = ruleParam.metricOpt.map(_.name).getOrElse(completeTableName)
        // NOTE(review): uses DefaultNormalizeType while AccuracyExpr2DQSteps
        // reads the collect type from metricOpt — confirm the asymmetry is intended
        MetricWriteStep(mwName, completeTableName, DefaultNormalizeType)
      }

      val transSteps = {
        sourceAliasTransStep :: incompleteRecordTransStep ::
          incompleteCountTransStep :: totalCountTransStep ::
          completeTransStep :: Nil
      }
      val writeSteps = {
        incompleteRecordWriteStep :: completeWriteStep :: Nil
      }

      // full steps
      transSteps ++ writeSteps
    }
  }

}

Reply via email to