http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala
index 237902a..1e077b1 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala
@@ -18,11 +18,12 @@ under the License.
 */
 package org.apache.griffin.measure.rule.adaptor
 
+import org.apache.griffin.measure.cache.tmst.TempName
 import org.apache.griffin.measure.config.params.user._
 import org.apache.griffin.measure.process.ProcessType
-import org.apache.griffin.measure.process.check.DataChecker
+import org.apache.griffin.measure.process.temp.TableRegisters
 import org.apache.griffin.measure.rule.dsl._
-import org.apache.griffin.measure.rule.step._
+import org.apache.griffin.measure.rule.plan._
 import org.apache.spark.sql.SQLContext
 
 import scala.collection.mutable.{Map => MutableMap}
@@ -30,33 +31,39 @@ import scala.collection.mutable.{Map => MutableMap}
 object RuleAdaptorGroup {
 
   val _dslType = "dsl.type"
+//  import RuleInfoKeys._
 
-  var dataSourceNames: Seq[String] = _
-  var functionNames: Seq[String] = _
+  var dataSourceNames: Seq[String] = Nil
+  var functionNames: Seq[String] = Nil
 
-  var dataChecker: DataChecker = _
+  var baselineDsName: String = ""
 
-  def init(sqlContext: SQLContext, dsNames: Seq[String]): Unit = {
+  private val emptyRulePlan = RulePlan(Nil, Nil)
+
+  def init(dsNames: Seq[String], blDsName: String, funcNames: Seq[String]): 
Unit = {
+    dataSourceNames = dsNames
+    baselineDsName = blDsName
+    functionNames = funcNames
+  }
+
+  def init(sqlContext: SQLContext, dsNames: Seq[String], blDsName: String): 
Unit = {
     val functions = sqlContext.sql("show functions")
-    functionNames = functions.map(_.getString(0)).collect
+    functionNames = functions.map(_.getString(0)).collect.toSeq
     dataSourceNames = dsNames
 
-    dataChecker = DataChecker(sqlContext)
+    baselineDsName = blDsName
   }
 
   private def getDslType(param: Map[String, Any], defDslType: DslType) = {
-    val dt = DslType(param.getOrElse(_dslType, "").toString)
-    dt match {
-      case UnknownDslType => defDslType
-      case _ => dt
-    }
+    DslType(param.getOrElse(_dslType, defDslType.desc).toString)
   }
 
-  private def genRuleAdaptor(dslType: DslType, dsNames: Seq[String], 
adaptPhase: AdaptPhase): Option[RuleAdaptor] = {
+  private def genRuleAdaptor(dslType: DslType, dsNames: Seq[String]
+                            ): Option[RuleAdaptor] = {
     dslType match {
-      case SparkSqlType => Some(SparkSqlAdaptor(adaptPhase))
-      case DfOprType => Some(DataFrameOprAdaptor(adaptPhase))
-      case GriffinDslType => Some(GriffinDslAdaptor(dsNames, functionNames, 
adaptPhase))
+      case SparkSqlType => Some(SparkSqlAdaptor())
+      case DfOprType => Some(DataFrameOprAdaptor())
+      case GriffinDslType => Some(GriffinDslAdaptor(dsNames, functionNames))
       case _ => None
     }
   }
@@ -77,29 +84,217 @@ object RuleAdaptorGroup {
 //    steps
 //  }
 
-  def genConcreteRuleSteps(evaluateRuleParam: EvaluateRuleParam,
-                           adaptPhase: AdaptPhase
-                          ): Seq[ConcreteRuleStep] = {
+//  def genConcreteRuleSteps(timeInfo: TimeInfo, evaluateRuleParam: 
EvaluateRuleParam,
+//                           dsTmsts: Map[String, Set[Long]], procType: 
ProcessType,
+//                           adaptPhase: AdaptPhase
+//                          ): Seq[ConcreteRuleStep] = {
+//    val dslTypeStr = if (evaluateRuleParam.dslType == null) "" else 
evaluateRuleParam.dslType
+//    val defaultDslType = DslType(dslTypeStr)
+//    val ruleParams = evaluateRuleParam.rules
+//    genConcreteRuleSteps(timeInfo, ruleParams, dsTmsts, defaultDslType, 
procType, adaptPhase)
+//  }
+//
+//  def genConcreteRuleSteps(timeInfo: TimeInfo, ruleParams: Seq[Map[String, 
Any]],
+//                           dsTmsts: Map[String, Set[Long]], defDslType: 
DslType,
+//                           procType: ProcessType, adaptPhase: AdaptPhase
+//                          ): Seq[ConcreteRuleStep] = {
+//    val (steps, dsNames) = ruleParams.foldLeft((Seq[ConcreteRuleStep](), 
dataSourceNames)) { (res, param) =>
+//      val (preSteps, preNames) = res
+//      val dslType = getDslType(param, defDslType)
+//      val (curSteps, curNames) = genRuleAdaptor(dslType, preNames, procType, 
adaptPhase) match {
+//        case Some(ruleAdaptor) => {
+//          val concreteSteps = ruleAdaptor.genConcreteRuleStep(timeInfo, 
param, dsTmsts)
+//          (concreteSteps, preNames ++ 
ruleAdaptor.getPersistNames(concreteSteps))
+//        }
+//        case _ => (Nil, preNames)
+//      }
+//      (preSteps ++ curSteps, curNames)
+//    }
+//    steps
+//  }
+
+  // -- gen rule plan --
+  def genRulePlan(timeInfo: TimeInfo, evaluateRuleParam: EvaluateRuleParam, 
procType: ProcessType
+                 ): RulePlan = {
     val dslTypeStr = if (evaluateRuleParam.dslType == null) "" else 
evaluateRuleParam.dslType
     val defaultDslType = DslType(dslTypeStr)
     val ruleParams = evaluateRuleParam.rules
-    genConcreteRuleSteps(ruleParams, defaultDslType, adaptPhase)
+    genRulePlan(timeInfo, ruleParams, defaultDslType, procType)
   }
 
-  def genConcreteRuleSteps(ruleParams: Seq[Map[String, Any]],
-                           defDslType: DslType, adaptPhase: AdaptPhase
-                          ): Seq[ConcreteRuleStep] = {
-    val (steps, dsNames) = ruleParams.foldLeft((Seq[ConcreteRuleStep](), 
dataSourceNames)) { (res, param) =>
-      val (preSteps, preNames) = res
-      val dslType = getDslType(param, defDslType)
-      val (curSteps, curNames) = genRuleAdaptor(dslType, preNames, adaptPhase) 
match {
-        case Some(ruleAdaptor) => (ruleAdaptor.genConcreteRuleStep(param), 
preNames ++ ruleAdaptor.getTempSourceNames(param))
-        case _ => (Nil, preNames)
+  def genRulePlan(timeInfo: TimeInfo, ruleParams: Seq[Map[String, Any]],
+                  defaultDslType: DslType, procType: ProcessType
+                 ): RulePlan = {
+    val (rulePlan, dsNames) = ruleParams.foldLeft((emptyRulePlan, 
dataSourceNames)) { (res, param) =>
+      val (plan, names) = res
+      val dslType = getDslType(param, defaultDslType)
+      val curPlan: RulePlan = genRuleAdaptor(dslType, names) match {
+        case Some(adaptor) => adaptor.genRulePlan(timeInfo, param, procType)
+        case _ => emptyRulePlan
       }
-      (preSteps ++ curSteps, curNames)
+      val globalNames = curPlan.globalRuleSteps.map(_.name)
+      globalNames.foreach(TableRegisters.registerCompileGlobalTable(_))
+      val curNames = curPlan.normalRuleSteps.map(_.name)
+      curNames.foreach(TableRegisters.registerCompileTempTable(timeInfo.key, 
_))
+
+      val retPlan = plan.merge(curPlan)
+      (retPlan, names ++ globalNames ++ curNames)
     }
-    steps
+
+    rulePlan
   }
 
 
+  // -- gen steps --
+//  def genRuleSteps(timeInfo: TimeInfo, evaluateRuleParam: EvaluateRuleParam, 
dsTmsts: Map[String, Set[Long]]
+//                  ): Seq[ConcreteRuleStep] = {
+//    val dslTypeStr = if (evaluateRuleParam.dslType == null) "" else 
evaluateRuleParam.dslType
+//    val defaultDslType = DslType(dslTypeStr)
+//    val ruleParams = evaluateRuleParam.rules
+//    val tmsts = dsTmsts.getOrElse(baselineDsName, Set[Long]()).toSeq
+//    genRuleSteps(timeInfo, ruleParams, tmsts, defaultDslType)
+//  }
+//
+//  def genRuleSteps(timeInfo: TimeInfo, ruleParams: Seq[Map[String, Any]],
+//                   tmsts: Seq[Long], defaultDslType: DslType,
+//                   adapthase: AdaptPhase = RunPhase
+//                  ): Seq[ConcreteRuleStep] = {
+//    val calcTime = timeInfo.calcTime
+//    val (ruleInfos, dsNames) = ruleParams.foldLeft((Seq[RuleInfo](), 
dataSourceNames)) { (res, param) =>
+//      val (preRuleInfos, preNames) = res
+//      val dslType = getDslType(param, defaultDslType)
+//      val (curRuleInfos, curNames) = genRuleAdaptor(dslType, preNames) match 
{
+//        case Some(adaptor) => {
+//          val ris = adaptor.genRuleInfos(param, timeInfo)
+//          val rins = ris.filter(!_.global).map(_.name)
+//          (ris, rins)
+//        }
+//        case _ => (Nil, Nil)
+//      }
+//      if (adapthase == RunPhase) {
+//        curNames.foreach(TempTables.registerTempTableNameOnly(timeInfo.key, 
_))
+//      }
+//      (preRuleInfos ++ curRuleInfos, preNames ++ curNames)
+//    }
+//
+//    adapthase match {
+//      case PreProcPhase => {
+//        ruleInfos.flatMap { ri =>
+//          genConcRuleSteps(timeInfo, ri)
+//        }
+//      }
+//      case RunPhase => {
+//        val riGroups = ruleInfos.foldRight(List[(List[RuleInfo], 
Boolean)]()) { (ri, groups) =>
+//          groups match {
+//            case head :: tail if (ri.gather == head._2) => (ri :: head._1, 
head._2) :: tail
+//            case _ => (ri :: Nil, ri.gather) :: groups
+//          }
+//        }.foldLeft(List[(List[RuleInfo], Boolean, List[String], 
List[RuleInfo])]()) { (groups, rigs) =>
+//          val preGatherNames = groups.lastOption match {
+//            case Some(t) => if (t._2) t._3 ::: t._1.map(_.name) else t._3
+//            case _ => baselineDsName :: Nil
+//          }
+//          val persistRuleInfos = groups.lastOption match {
+//            case Some(t) if (t._2) => t._1.filter(_.persistType.needPersist)
+//            case _ => Nil
+//          }
+//          groups :+ (rigs._1, rigs._2, preGatherNames, persistRuleInfos)
+//        }
+//
+//        riGroups.flatMap { group =>
+//          val (ris, gather, srcNames, persistRis) = group
+//          if (gather) {
+//            ris.flatMap { ri =>
+//              genConcRuleSteps(timeInfo, ri)
+//            }
+//          } else {
+//            tmsts.flatMap { tmst =>
+//              val concTimeInfo = TmstTimeInfo(calcTime, tmst)
+//              val tmstInitRuleInfos = genTmstInitRuleInfo(concTimeInfo, 
srcNames, persistRis)
+//              (tmstInitRuleInfos ++ ris).flatMap { ri =>
+//                genConcRuleSteps(concTimeInfo, ri)
+//              }
+//            }
+//          }
+//        }
+//      }
+//    }
+//
+//
+//  }
+//
+//  private def genConcRuleSteps(timeInfo: TimeInfo, ruleInfo: RuleInfo): 
Seq[ConcreteRuleStep] = {
+//    val nri = if (ruleInfo.persistType.needPersist && 
ruleInfo.tmstNameOpt.isEmpty) {
+//      val tmstName = if (ruleInfo.gather) {
+//        TempName.tmstName(ruleInfo.name, timeInfo.calcTime)
+//      } else {
+//        TempName.tmstName(ruleInfo.name, timeInfo)
+//      }
+//      ruleInfo.setTmstNameOpt(Some(tmstName))
+//    } else ruleInfo
+//    ruleInfo.dslType match {
+//      case SparkSqlType => SparkSqlStep(timeInfo, nri) :: Nil
+//      case DfOprType => DfOprStep(timeInfo, nri) :: Nil
+//      case _ => Nil
+//    }
+//  }
+//
+//  private def genTmstInitRuleInfo(timeInfo: TmstTimeInfo, srcNames: 
Seq[String],
+//                                  persistRis: Seq[RuleInfo]): Seq[RuleInfo] 
= {
+//    val TmstTimeInfo(calcTime, tmst, _) = timeInfo
+//    srcNames.map { srcName =>
+//      val srcTmstName = TempName.tmstName(srcName, calcTime)
+//      val filterSql = {
+//        s"SELECT * FROM `${srcTmstName}` WHERE `${InternalColumns.tmst}` = 
${tmst}"
+//      }
+//      val params = persistRis.filter(_.name == srcName).headOption match {
+//        case Some(ri) => ri.details
+//        case _ => Map[String, Any]()
+//      }
+//      RuleInfo(srcName, None, SparkSqlType, filterSql, params, false)
+//    }
+//  }
+
+//  def genRuleSteps(timeInfo: TimeInfo, ruleParams: Seq[Map[String, Any]],
+//                   tmsts: Seq[Long], defaultDslType: DslType,
+//                   adapthase: AdaptPhase = RunPhase
+//                  ): Seq[ConcreteRuleStep] = {
+//    tmsts.flatMap { tmst =>
+//      val newTimeInfo = TimeInfo(timeInfo.calcTime, tmst)
+//      val initSteps: Seq[ConcreteRuleStep] = adapthase match {
+//        case RunPhase => genTmstInitStep(newTimeInfo)
+//        case PreProcPhase => Nil
+//      }
+//      val (steps, dsNames) = ruleParams.foldLeft((initSteps, 
dataSourceNames)) { (res, param) =>
+//        val (preSteps, preNames) = res
+//        val dslType = getDslType(param, defaultDslType)
+//        val (curSteps, curNames) = genRuleAdaptor(dslType, preNames) match {
+//          case Some(ruleAdaptor) => {
+//            val concreteSteps = ruleAdaptor.genConcreteRuleStep(newTimeInfo, 
param)
+//            val persistNames = ruleAdaptor.getPersistNames(concreteSteps)
+//            (concreteSteps, persistNames)
+//          }
+//          case _ => (Nil, Nil)
+//        }
+//        (preSteps ++ curSteps, preNames ++ curNames)
+//      }
+//      steps
+//    }
+//  }
+
+
+
+//  private def genTmstInitStep(timeInfo: TimeInfo): Seq[ConcreteRuleStep] = {
+//    val TimeInfo(calcTime, tmst) = timeInfo
+//    val tmstDsName = TempName.tmstName(baselineDsName, calcTime)
+//    val filterSql = {
+//      s"SELECT * FROM `${tmstDsName}` WHERE `${InternalColumns.tmst}` = 
${tmst}"
+//    }
+//    SparkSqlStep(
+//      timeInfo,
+//      RuleInfo(baselineDsName, None, filterSql, Map[String, Any]())
+//    ) :: Nil
+//  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala
index 78121fa..6b3b7cb 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala
@@ -18,37 +18,31 @@ under the License.
 */
 package org.apache.griffin.measure.rule.adaptor
 
-import org.apache.griffin.measure.data.connector.GroupByColumn
-import org.apache.griffin.measure.rule.step._
+import org.apache.griffin.measure.cache.tmst.TempName
+import org.apache.griffin.measure.process.ProcessType
+import org.apache.griffin.measure.rule.dsl.MetricPersistType
+import org.apache.griffin.measure.rule.plan.{TimeInfo, _}
+import org.apache.griffin.measure.utils.ParamUtil._
 
-case class SparkSqlAdaptor(adaptPhase: AdaptPhase) extends RuleAdaptor {
+case class SparkSqlAdaptor() extends RuleAdaptor {
 
-  def genRuleStep(param: Map[String, Any]): Seq[RuleStep] = {
-    SparkSqlStep(getName(param), getRule(param), getDetails(param),
-      getPersistType(param), getUpdateDataSource(param)) :: Nil
-  }
-  def adaptConcreteRuleStep(ruleStep: RuleStep): Seq[ConcreteRuleStep] = {
-    ruleStep match {
-      case rs @ SparkSqlStep(name, rule, details, persistType, udsOpt) => {
-        adaptPhase match {
-          case PreProcPhase => rs :: Nil
-          case RunPhase => {
-            val repSel = rule.replaceFirst("(?i)select", s"SELECT 
`${GroupByColumn.tmst}` AS `${GroupByColumn.tmst}`,")
-            val groupbyRule = repSel.concat(s" GROUP BY 
`${GroupByColumn.tmst}`")
-            val nrs = SparkSqlStep(name, groupbyRule, details, persistType, 
udsOpt)
-            nrs :: Nil
-          }
-        }
-      }
-      case _ => Nil
-    }
-  }
+//  def genRuleStep(timeInfo: TimeInfo, param: Map[String, Any]): 
Seq[RuleStep] = {
+//    val ruleInfo = RuleInfoGen(param, timeInfo)
+//    SparkSqlStep(timeInfo, ruleInfo) :: Nil
+//  }
+//  def adaptConcreteRuleStep(ruleStep: RuleStep): Seq[ConcreteRuleStep] = {
+//    ruleStep match {
+//      case rs @ SparkSqlStep(ti, ri) => rs :: Nil
+//      case _ => Nil
+//    }
+//  }
+
+  import RuleParamKeys._
 
-  def getTempSourceNames(param: Map[String, Any]): Seq[String] = {
-    param.get(_name) match {
-      case Some(name) => name.toString :: Nil
-      case _ => Nil
-    }
+  def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any], procType: 
ProcessType): RulePlan = {
+    val name = getRuleName(param)
+    val step = SparkSqlStep(name, getRule(param), getDetails(param), 
getCache(param), getGlobal(param))
+    RulePlan(step :: Nil, genRuleExports(param, name, name))
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/CollectType.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/CollectType.scala 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/CollectType.scala
new file mode 100644
index 0000000..03a43d6
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/CollectType.scala
@@ -0,0 +1,57 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.dsl
+
+import scala.util.matching.Regex
+
+sealed trait CollectType {
+  val regex: Regex
+  val desc: String
+}
+
+object CollectType {
+  private val collectTypes: List[CollectType] = List(DefaultCollectType, 
EntriesCollectType, ArrayCollectType, MapCollectType)
+  def apply(ptn: String): CollectType = {
+    collectTypes.filter(tp => ptn match {
+      case tp.regex() => true
+      case _ => false
+    }).headOption.getOrElse(DefaultCollectType)
+  }
+  def unapply(pt: CollectType): Option[String] = Some(pt.desc)
+}
+
+final case object DefaultCollectType extends CollectType {
+  val regex: Regex = "".r
+  val desc: String = "default"
+}
+
+final case object EntriesCollectType extends CollectType {
+  val regex: Regex = "^(?i)entries$".r
+  val desc: String = "entries"
+}
+
+final case object ArrayCollectType extends CollectType {
+  val regex: Regex = "^(?i)array|list$".r
+  val desc: String = "array"
+}
+
+final case object MapCollectType extends CollectType {
+  val regex: Regex = "^(?i)map$".r
+  val desc: String = "map"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala
index ac27403..da59348 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala
@@ -27,7 +27,9 @@ sealed trait DqType {
 }
 
 object DqType {
-  private val dqTypes: List[DqType] = List(AccuracyType, ProfilingType, 
TimelinessType, UnknownType)
+  private val dqTypes: List[DqType] = List(
+    AccuracyType, ProfilingType, DuplicateType, TimelinessType, UnknownType
+  )
   def apply(ptn: String): DqType = {
     dqTypes.filter(tp => ptn match {
       case tp.regex() => true
@@ -44,7 +46,12 @@ final case object AccuracyType extends DqType {
 
 final case object ProfilingType extends DqType {
   val regex = "^(?i)profiling$".r
-  val desc = "profiling$"
+  val desc = "profiling"
+}
+
+final case object DuplicateType extends DqType {
+  val regex = "^(?i)duplicate$".r
+  val desc = "duplicate"
 }
 
 final case object TimelinessType extends DqType {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DslType.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DslType.scala 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DslType.scala
index cfda393..27ab2ac 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DslType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DslType.scala
@@ -27,12 +27,12 @@ sealed trait DslType {
 }
 
 object DslType {
-  private val dslTypes: List[DslType] = List(SparkSqlType, GriffinDslType, 
DfOprType, UnknownDslType)
+  private val dslTypes: List[DslType] = List(SparkSqlType, GriffinDslType, 
DfOprType)
   def apply(ptn: String): DslType = {
     dslTypes.filter(tp => ptn match {
       case tp.regex() => true
       case _ => false
-    }).headOption.getOrElse(UnknownDslType)
+    }).headOption.getOrElse(GriffinDslType)
   }
   def unapply(pt: DslType): Option[String] = Some(pt.desc)
 }
@@ -50,9 +50,4 @@ final case object DfOprType extends DslType {
 final case object GriffinDslType extends DslType {
   val regex = "^(?i)griffin-?dsl$".r
   val desc = "griffin-dsl"
-}
-
-final case object UnknownDslType extends DslType {
-  val regex = "".r
-  val desc = "unknown"
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/PersistType.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/PersistType.scala 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/PersistType.scala
index 10b83c8..f2857e3 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/PersistType.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/PersistType.scala
@@ -26,6 +26,7 @@ sealed trait PersistType {
 //  def temp: Boolean = false
 //  def persist: Boolean = false
 //  def collect: Boolean = false
+  def needPersist: Boolean = true
 }
 
 object PersistType {
@@ -42,6 +43,7 @@ object PersistType {
 final case object NonePersistType extends PersistType {
   val regex: Regex = "".r
   val desc: String = "none"
+  override def needPersist: Boolean = false
 }
 
 final case object RecordPersistType extends PersistType {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/BasicAnalyzer.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/BasicAnalyzer.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/BasicAnalyzer.scala
index 063eb7b..e14e0da 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/BasicAnalyzer.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/BasicAnalyzer.scala
@@ -18,7 +18,7 @@ under the License.
 */
 package org.apache.griffin.measure.rule.dsl.analyzer
 
-import org.apache.griffin.measure.rule.dsl.expr._
+import org.apache.griffin.measure.rule.dsl.expr.{MathExpr, _}
 
 
 trait BasicAnalyzer extends Serializable {
@@ -35,7 +35,7 @@ trait BasicAnalyzer extends Serializable {
 
   val seqSelectionExprs = (dsName: String) => (expr: Expr, v: 
Seq[SelectionExpr]) => {
     expr match {
-      case se @ SelectionExpr(head: DataSourceHeadExpr, _, _) if (head.desc == 
dsName) => v :+ se
+      case se @ SelectionExpr(head: DataSourceHeadExpr, _, _) if (head.name == 
dsName) => v :+ se
       case _ => v
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DuplicateAnalyzer.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DuplicateAnalyzer.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DuplicateAnalyzer.scala
new file mode 100644
index 0000000..1ca2b76
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DuplicateAnalyzer.scala
@@ -0,0 +1,46 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.dsl.analyzer
+
+import org.apache.griffin.measure.rule.dsl.expr.{AliasableExpr, _}
+
+
+case class DuplicateAnalyzer(expr: DuplicateClause, sourceName: String, 
targetName: String) extends BasicAnalyzer {
+
+  val seqAlias = (expr: Expr, v: Seq[String]) => {
+    expr match {
+      case apr: AliasableExpr => v ++ apr.alias
+      case _ => v
+    }
+  }
+  val combAlias = (a: Seq[String], b: Seq[String]) => a ++ b
+
+  private val exprs = expr.exprs
+  private def genAlias(idx: Int): String = s"alias_${idx}"
+  val selectionPairs = exprs.zipWithIndex.map { pair =>
+    val (pr, idx) = pair
+    val res = pr.preOrderTraverseDepthFirst(Seq[String]())(seqAlias, combAlias)
+    (pr, res.headOption.getOrElse(genAlias(idx)))
+  }
+
+  if (selectionPairs.isEmpty) {
+    throw new Exception(s"duplicate analyzer error: empty selection")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/ProfilingAnalyzer.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/ProfilingAnalyzer.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/ProfilingAnalyzer.scala
index 34bdbd3..6872977 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/ProfilingAnalyzer.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/ProfilingAnalyzer.scala
@@ -25,24 +25,14 @@ case class ProfilingAnalyzer(expr: ProfilingClause, 
sourceName: String) extends
 
   val dataSourceNames = 
expr.preOrderTraverseDepthFirst(Set[String]())(seqDataSourceNames, 
combDataSourceNames)
 
-  val sourceSelectionExprs = {
-    val seq = seqSelectionExprs(sourceName)
-    expr.selectClause.preOrderTraverseDepthFirst(Seq[SelectionExpr]())(seq, 
combSelectionExprs)
-  }
-
-  val selectionExprs = expr.selectClause.exprs.map(_.extractSelf)
-  def containsAllSelectionExpr = {
-    selectionExprs.filter { expr =>
+  val selectionExprs: Seq[Expr] = {
+    expr.selectClause.exprs.map(_.extractSelf).flatMap { expr =>
       expr match {
-        case SelectionExpr(head: ALLSelectHeadExpr, selectors: 
Seq[SelectExpr], _) => {
-          selectors.isEmpty
-        }
-        case SelectionExpr(head: DataSourceHeadExpr, selectors: 
Seq[SelectExpr], _) => {
-          (head == sourceName) && (selectors.size == 1) && 
(selectors.head.isInstanceOf[AllFieldsSelectExpr])
-        }
-        case _ => false
+        case e: SelectionExpr => Some(e)
+        case e: FunctionExpr => Some(e)
+        case _ => None
       }
-    }.size > 0
+    }
   }
 
   val groupbyExprOpt = expr.groupbyClauseOpt

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/TimelinessAnalyzer.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/TimelinessAnalyzer.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/TimelinessAnalyzer.scala
new file mode 100644
index 0000000..37d4651
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/TimelinessAnalyzer.scala
@@ -0,0 +1,65 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.dsl.analyzer
+
+import org.apache.griffin.measure.rule.dsl.expr._
+
+
+case class TimelinessAnalyzer(expr: TimelinessClause, sourceName: String) 
extends BasicAnalyzer {
+
+//  val tsExpr = expr.desc
+
+//  val seqAlias = (expr: Expr, v: Seq[String]) => {
+//    expr match {
+//      case apr: AliasableExpr => v ++ apr.alias
+//      case _ => v
+//    }
+//  }
+//  val combAlias = (a: Seq[String], b: Seq[String]) => a ++ b
+//
+//  private val exprs = expr.exprs.toList
+//  val selectionPairs = exprs.map { pr =>
+//    val res = pr.preOrderTraverseDepthFirst(Seq[String]())(seqAlias, 
combAlias)
+//    println(res)
+//    println(pr)
+//    (pr, res.headOption)
+//  }
+//
+//  val (tsExprPair, endTsPairOpt) = selectionPairs match {
+//    case Nil => throw new Exception(s"timeliness analyzer error: ts column 
not set")
+//    case tsPair :: Nil => (tsPair, None)
+//    case tsPair :: endTsPair :: _ => (tsPair, Some(endTsPair))
+//  }
+//
+//  def getSelAlias(pair: (Expr, Option[String]), defAlias: String): (String, 
String) = {
+//    val (pr, aliasOpt) = pair
+//    val alias = aliasOpt.getOrElse(defAlias)
+//    (pr.desc, alias)
+//  }
+
+
+  private val exprs = expr.exprs.map(_.desc).toList
+
+  val (btsExpr, etsExprOpt) = exprs match {
+    case Nil => throw new Exception(s"timeliness analyzer error: ts column not 
set")
+    case btsExpr :: Nil => (btsExpr, None)
+    case btsExpr :: etsExpr :: _ => (btsExpr, Some(etsExpr))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
index c0986e1..bc7af42 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
@@ -21,13 +21,23 @@ package org.apache.griffin.measure.rule.dsl.expr
 trait ClauseExpression extends Expr {
 }
 
/**
  * SELECT clause: the selected expressions plus an optional extra condition
  * (e.g. DISTINCT) that prefixes them in the rendered sql.
  */
case class SelectClause(exprs: Seq[Expr], extraConditionOpt: Option[ExtraConditionExpr]
                       ) extends ClauseExpression {

  addChildren(exprs)

  def desc: String = {
    val selected = exprs.map(_.desc).mkString(", ")
    extraConditionOpt.fold(selected)(cdtn => s"${cdtn.desc} ${selected}")
  }
  def coalesceDesc: String = desc

  // rebuild the clause with every child expression transformed
  override def map(func: (Expr) => Expr): SelectClause = {
    val mappedCdtn = extraConditionOpt.map(c => func(c).asInstanceOf[ExtraConditionExpr])
    SelectClause(exprs.map(func), mappedCdtn)
  }

}
 
 case class FromClause(dataSource: String) extends ClauseExpression {
@@ -44,6 +54,10 @@ case class WhereClause(expr: Expr) extends ClauseExpression {
   def desc: String = s"WHERE ${expr.desc}"
   def coalesceDesc: String = s"WHERE ${expr.coalesceDesc}"
 
+  override def map(func: (Expr) => Expr): WhereClause = {
+    WhereClause(func(expr))
+  }
+
 }
 
 case class GroupbyClause(exprs: Seq[Expr], havingClauseOpt: Option[Expr]) 
extends ClauseExpression {
@@ -79,6 +93,10 @@ case class GroupbyClause(exprs: Seq[Expr], havingClauseOpt: 
Option[Expr]) extend
     GroupbyClause(exprs ++ other.exprs, newHavingClauseOpt)
   }
 
+  override def map(func: (Expr) => Expr): GroupbyClause = {
+    GroupbyClause(exprs.map(func(_)), havingClauseOpt.map(func(_)))
+  }
+
 }
 
 case class OrderbyItem(expr: Expr, orderOpt: Option[String]) extends Expr {
@@ -90,6 +108,10 @@ case class OrderbyItem(expr: Expr, orderOpt: 
Option[String]) extends Expr {
     }
   }
   def coalesceDesc: String = desc
+
+  override def map(func: (Expr) => Expr): OrderbyItem = {
+    OrderbyItem(func(expr), orderOpt)
+  }
 }
 
 case class OrderbyClause(items: Seq[OrderbyItem]) extends ClauseExpression {
@@ -104,6 +126,10 @@ case class OrderbyClause(items: Seq[OrderbyItem]) extends 
ClauseExpression {
     val obs = items.map(_.desc).mkString(", ")
     s"ORDER BY ${obs}"
   }
+
+  override def map(func: (Expr) => Expr): OrderbyClause = {
+    OrderbyClause(items.map(func(_).asInstanceOf[OrderbyItem]))
+  }
 }
 
 case class LimitClause(expr: Expr) extends ClauseExpression {
@@ -112,6 +138,10 @@ case class LimitClause(expr: Expr) extends 
ClauseExpression {
 
   def desc: String = s"LIMIT ${expr.desc}"
   def coalesceDesc: String = s"LIMIT ${expr.coalesceDesc}"
+
+  override def map(func: (Expr) => Expr): LimitClause = {
+    LimitClause(func(expr))
+  }
 }
 
 case class CombinedClause(selectClause: SelectClause, fromClauseOpt: 
Option[FromClause],
@@ -139,6 +169,13 @@ case class CombinedClause(selectClause: SelectClause, 
fromClauseOpt: Option[From
       s"${head} ${tail.coalesceDesc}"
     }
   }
+
+  override def map(func: (Expr) => Expr): CombinedClause = {
+    CombinedClause(func(selectClause).asInstanceOf[SelectClause],
+      fromClauseOpt.map(func(_).asInstanceOf[FromClause]),
+      tails.map(func(_).asInstanceOf[ClauseExpression])
+    )
+  }
 }
 
 case class ProfilingClause(selectClause: SelectClause,
@@ -171,4 +208,29 @@ case class ProfilingClause(selectClause: SelectClause,
     val postDesc = postGroupbyClauses.map(_.coalesceDesc).mkString(" ")
     s"${selectDesc} ${fromDesc} ${preDesc} ${groupbyDesc} ${postDesc}"
   }
+
+  override def map(func: (Expr) => Expr): ProfilingClause = {
+    ProfilingClause(func(selectClause).asInstanceOf[SelectClause],
+      fromClauseOpt.map(func(_).asInstanceOf[FromClause]),
+      groupbyClauseOpt.map(func(_).asInstanceOf[GroupbyClause]),
+      preGroupbyClauses.map(func(_).asInstanceOf[ClauseExpression]),
+      postGroupbyClauses.map(func(_).asInstanceOf[ClauseExpression])
+    )
+  }
+}
+
/** Clause listing the expressions checked by a duplicate-type rule. */
case class DuplicateClause(exprs: Seq[Expr]) extends ClauseExpression {
  addChildren(exprs)

  def desc: String = exprs.map(e => e.desc).mkString(", ")
  def coalesceDesc: String = exprs.map(e => e.coalesceDesc).mkString(", ")

  // rebuild with each child expression transformed
  override def map(func: (Expr) => Expr): DuplicateClause = {
    DuplicateClause(exprs.map(e => func(e)))
  }
}
+
/** Clause listing the timestamp expressions of a timeliness-type rule. */
case class TimelinessClause(exprs: Seq[Expr]) extends ClauseExpression {
  addChildren(exprs)

  // comma-join a list of rendered descriptions
  private def joined(descs: Seq[String]): String = descs.mkString(", ")

  def desc: String = joined(exprs.map(_.desc))
  def coalesceDesc: String = joined(exprs.map(_.coalesceDesc))

  override def map(func: (Expr) => Expr): TimelinessClause = TimelinessClause(exprs.map(func))
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/Expr.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/Expr.scala 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/Expr.scala
index 850579c..c089e81 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/Expr.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/Expr.scala
@@ -26,4 +26,7 @@ trait Expr extends TreeNode with Serializable {
 
   def extractSelf: Expr = this
 
+  // execution
+  def map(func: (Expr) => Expr): Expr = func(this)
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ExtraConditionExpr.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ExtraConditionExpr.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ExtraConditionExpr.scala
new file mode 100644
index 0000000..eb7ba48
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ExtraConditionExpr.scala
@@ -0,0 +1,27 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.dsl.expr
+
/**
  * An extra sql condition keyword attached to a select/function
  * (currently produced by the DISTINCT parser rule).
  */
case class ExtraConditionExpr(cdtn: String) extends Expr {

  // Render the keyword in upper case for the generated sql.
  // Locale.ROOT avoids locale-sensitive case mapping (e.g. Turkish dotless i),
  // which would corrupt sql keywords under some default JVM locales.
  def desc: String = cdtn.toUpperCase(java.util.Locale.ROOT)

  def coalesceDesc: String = desc

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/FunctionExpr.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/FunctionExpr.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/FunctionExpr.scala
index e33b03d..1bbed83 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/FunctionExpr.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/FunctionExpr.scala
@@ -18,16 +18,28 @@ under the License.
 */
 package org.apache.griffin.measure.rule.dsl.expr
 
/**
  * A function call expression: name, arguments, an optional extra condition
  * (e.g. DISTINCT inside the parentheses) and an optional alias.
  */
case class FunctionExpr(functionName: String, args: Seq[Expr],
                        extraConditionOpt: Option[ExtraConditionExpr],
                        aliasOpt: Option[String]
                       ) extends Expr with AliasableExpr {

  addChildren(args)

  def desc: String = {
    val argDesc = args.map(_.desc).mkString(", ")
    val innerDesc = extraConditionOpt match {
      case Some(cdtn) => s"${cdtn.desc} ${argDesc}"
      case _ => argDesc
    }
    s"${functionName}(${innerDesc})"
  }
  def coalesceDesc: String = desc

  // default alias is the function name itself
  def alias: Option[String] = aliasOpt.orElse(Some(functionName))

  // rebuild with each argument and the extra condition transformed
  override def map(func: (Expr) => Expr): FunctionExpr = {
    FunctionExpr(functionName, args.map(func),
      extraConditionOpt.map(c => func(c).asInstanceOf[ExtraConditionExpr]), aliasOpt)
  }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/LogicalExpr.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/LogicalExpr.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/LogicalExpr.scala
index 4b16219..b4c35f5 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/LogicalExpr.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/LogicalExpr.scala
@@ -33,6 +33,10 @@ case class InExpr(head: Expr, is: Boolean, range: Seq[Expr]) 
extends LogicalExpr
     val notStr = if (is) "" else " NOT"
     s"${head.coalesceDesc}${notStr} IN 
(${range.map(_.coalesceDesc).mkString(", ")})"
   }
+
+  override def map(func: (Expr) => Expr): InExpr = {
+    InExpr(func(head), is, range.map(func(_)))
+  }
 }
 
 case class BetweenExpr(head: Expr, is: Boolean, range: Seq[Expr]) extends 
LogicalExpr {
@@ -58,6 +62,10 @@ case class BetweenExpr(head: Expr, is: Boolean, range: 
Seq[Expr]) extends Logica
     }
     s"${head.coalesceDesc}${notStr} BETWEEN ${rangeStr}"
   }
+
+  override def map(func: (Expr) => Expr): BetweenExpr = {
+    BetweenExpr(func(head), is, range.map(func(_)))
+  }
 }
 
 case class LikeExpr(head: Expr, is: Boolean, value: Expr) extends LogicalExpr {
@@ -72,6 +80,10 @@ case class LikeExpr(head: Expr, is: Boolean, value: Expr) 
extends LogicalExpr {
     val notStr = if (is) "" else " NOT"
     s"${head.coalesceDesc}${notStr} LIKE ${value.coalesceDesc}"
   }
+
+  override def map(func: (Expr) => Expr): LikeExpr = {
+    LikeExpr(func(head), is, func(value))
+  }
 }
 
 case class IsNullExpr(head: Expr, is: Boolean) extends LogicalExpr {
@@ -83,6 +95,10 @@ case class IsNullExpr(head: Expr, is: Boolean) extends 
LogicalExpr {
     s"${head.desc} IS${notStr} NULL"
   }
   def coalesceDesc: String = desc
+
+  override def map(func: (Expr) => Expr): IsNullExpr = {
+    IsNullExpr(func(head), is)
+  }
 }
 
 case class IsNanExpr(head: Expr, is: Boolean) extends LogicalExpr {
@@ -94,6 +110,10 @@ case class IsNanExpr(head: Expr, is: Boolean) extends 
LogicalExpr {
     s"${notStr}isnan(${head.desc})"
   }
   def coalesceDesc: String = desc
+
+  override def map(func: (Expr) => Expr): IsNanExpr = {
+    IsNanExpr(func(head), is)
+  }
 }
 
 // -----------
@@ -110,6 +130,10 @@ case class LogicalFactorExpr(factor: Expr, withBracket: 
Boolean, aliasOpt: Optio
     if (aliasOpt.nonEmpty) this
     else factor.extractSelf
   }
+
+  override def map(func: (Expr) => Expr): LogicalFactorExpr = {
+    LogicalFactorExpr(func(factor), withBracket, aliasOpt)
+  }
 }
 
 case class UnaryLogicalExpr(oprs: Seq[String], factor: LogicalExpr) extends 
LogicalExpr {
@@ -136,6 +160,10 @@ case class UnaryLogicalExpr(oprs: Seq[String], factor: 
LogicalExpr) extends Logi
     if (oprs.nonEmpty) this
     else factor.extractSelf
   }
+
+  override def map(func: (Expr) => Expr): UnaryLogicalExpr = {
+    UnaryLogicalExpr(oprs, func(factor).asInstanceOf[LogicalExpr])
+  }
 }
 
 case class BinaryLogicalExpr(factor: LogicalExpr, tails: Seq[(String, 
LogicalExpr)]) extends LogicalExpr {
@@ -167,4 +195,10 @@ case class BinaryLogicalExpr(factor: LogicalExpr, tails: 
Seq[(String, LogicalExp
     if (tails.nonEmpty) this
     else factor.extractSelf
   }
+
+  override def map(func: (Expr) => Expr): BinaryLogicalExpr = {
+    BinaryLogicalExpr(func(factor).asInstanceOf[LogicalExpr], tails.map{ pair 
=>
+      (pair._1, func(pair._2).asInstanceOf[LogicalExpr])
+    })
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/MathExpr.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/MathExpr.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/MathExpr.scala
index b3d3db4..4217e44 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/MathExpr.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/MathExpr.scala
@@ -33,6 +33,10 @@ case class MathFactorExpr(factor: Expr, withBracket: 
Boolean, aliasOpt: Option[S
     if (aliasOpt.nonEmpty) this
     else factor.extractSelf
   }
+
+  override def map(func: (Expr) => Expr): MathFactorExpr = {
+    MathFactorExpr(func(factor), withBracket, aliasOpt)
+  }
 }
 
 case class UnaryMathExpr(oprs: Seq[String], factor: MathExpr) extends MathExpr 
{
@@ -53,6 +57,10 @@ case class UnaryMathExpr(oprs: Seq[String], factor: 
MathExpr) extends MathExpr {
     if (oprs.nonEmpty) this
     else factor.extractSelf
   }
+
+  override def map(func: (Expr) => Expr): UnaryMathExpr = {
+    UnaryMathExpr(oprs, func(factor).asInstanceOf[MathExpr])
+  }
 }
 
 case class BinaryMathExpr(factor: MathExpr, tails: Seq[(String, MathExpr)]) 
extends MathExpr {
@@ -77,4 +85,10 @@ case class BinaryMathExpr(factor: MathExpr, tails: 
Seq[(String, MathExpr)]) exte
     if (tails.nonEmpty) this
     else factor.extractSelf
   }
+
+  override def map(func: (Expr) => Expr): BinaryMathExpr = {
+    BinaryMathExpr(func(factor).asInstanceOf[MathExpr], tails.map{ pair =>
+      (pair._1, func(pair._2).asInstanceOf[MathExpr])
+    })
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/SelectExpr.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/SelectExpr.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/SelectExpr.scala
index 6525c88..d6e350b 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/SelectExpr.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/SelectExpr.scala
@@ -23,22 +23,17 @@ trait HeadExpr extends Expr with AliasableExpr {
 }
 
// Selection head referring to a data source by name.
// The name is back-quoted in the rendered sql so reserved words and
// special characters survive sql generation.
case class DataSourceHeadExpr(name: String) extends HeadExpr {
  def desc: String = s"`${name}`"
  def coalesceDesc: String = desc
}
 
// Selection head referring to a bare field name.
// `field` is stored unquoted; back-quotes are added only when rendering sql.
case class FieldNameHeadExpr(field: String) extends HeadExpr {
  def desc: String = s"`${field}`"
  def coalesceDesc: String = desc
  // alias is the raw (unquoted) field name
  override def alias: Option[String] = Some(field)
}
 
// Selection head for a "select all" (*) selection.
case class AllSelectHeadExpr() extends HeadExpr {
  def desc: String = "*"
  def coalesceDesc: String = desc
}
@@ -50,6 +45,10 @@ case class OtherHeadExpr(expr: Expr) extends HeadExpr {
   def desc: String = expr.desc
   def coalesceDesc: String = expr.coalesceDesc
   override def alias: Option[String] = Some(expr.desc)
+
+  override def map(func: (Expr) => Expr): OtherHeadExpr = {
+    OtherHeadExpr(func(expr))
+  }
 }
 
 // -------------
@@ -64,14 +63,9 @@ case class AllFieldsSelectExpr() extends SelectExpr {
 }
 
// Selector accessing a named field of the preceding head expression.
// `field` is stored unquoted; rendered as `.`field`` in sql.
case class FieldSelectExpr(field: String) extends SelectExpr {
  def desc: String = s".`${field}`"
  def coalesceDesc: String = desc
  // alias is the raw (unquoted) field name
  override def alias: Option[String] = Some(field)
}
 
 case class IndexSelectExpr(index: Expr) extends SelectExpr {
@@ -81,6 +75,10 @@ case class IndexSelectExpr(index: Expr) extends SelectExpr {
   def desc: String = s"[${index.desc}]"
   def coalesceDesc: String = desc
   def alias: Option[String] = Some(index.desc)
+
+  override def map(func: (Expr) => Expr): IndexSelectExpr = {
+    IndexSelectExpr(func(index))
+  }
 }
 
 case class FunctionSelectExpr(functionName: String, args: Seq[Expr]) extends 
SelectExpr {
@@ -90,6 +88,10 @@ case class FunctionSelectExpr(functionName: String, args: 
Seq[Expr]) extends Sel
   def desc: String = ""
   def coalesceDesc: String = desc
   def alias: Option[String] = Some(functionName)
+
+  override def map(func: (Expr) => Expr): FunctionSelectExpr = {
+    FunctionSelectExpr(functionName, args.map(func(_)))
+  }
 }
 
 // -------------
@@ -122,4 +124,9 @@ case class SelectionExpr(head: HeadExpr, selectors: 
Seq[SelectExpr], aliasOpt: O
       if (aliasSeq.size > 0) Some(aliasSeq.mkString("_")) else None
     } else aliasOpt
   }
+
+  override def map(func: (Expr) => Expr): SelectionExpr = {
+    SelectionExpr(func(head).asInstanceOf[HeadExpr],
+      selectors.map(func(_).asInstanceOf[SelectExpr]), aliasOpt)
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala
index 6415a02..846770b 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala
@@ -145,6 +145,8 @@ trait BasicParser extends JavaTokenParsers with 
Serializable {
     val COMMA: Parser[String] = ","
 
     val SELECT: Parser[String] = """(?i)select\s""".r
+    val DISTINCT: Parser[String] = """(?i)distinct""".r
+//    val ALL: Parser[String] = """(?i)all""".r
     val FROM: Parser[String] = """(?i)from\s""".r
     val AS: Parser[String] = """(?i)as\s""".r
     val WHERE: Parser[String] = """(?i)where\s""".r
@@ -159,8 +161,6 @@ trait BasicParser extends JavaTokenParsers with 
Serializable {
   import Operator._
 
   object Strings {
-    def innerString(s: String): String = s.substring(1, s.size - 1)
-
     def AnyString: Parser[String] = """"(?:\"|[^\"])*"""".r | 
"""'(?:\'|[^'])*'""".r
     def SimpleTableFieldName: Parser[String] = """[a-zA-Z_]\w*""".r
     def UnQuoteTableFieldName: Parser[String] = """`(?:[\\][`]|[^`])*`""".r
@@ -209,23 +209,23 @@ trait BasicParser extends JavaTokenParsers with 
Serializable {
     case head ~ sels ~ aliasOpt => SelectionExpr(head, sels, aliasOpt)
   }
   def selectionHead: Parser[HeadExpr] = DataSourceName ^^ {
-    DataSourceHeadExpr(_)
+    ds => DataSourceHeadExpr(trim(ds))
   } | function ^^ {
     OtherHeadExpr(_)
   } | SimpleTableFieldName ^^ {
     FieldNameHeadExpr(_)
   } | UnQuoteTableFieldName ^^ { s =>
-    FieldNameHeadExpr(innerString(s))
+    FieldNameHeadExpr(trim(s))
   } | ALLSL ^^ { _ =>
-    ALLSelectHeadExpr()
+    AllSelectHeadExpr()
   }
   def selector: Parser[SelectExpr] = functionSelect | allFieldsSelect | 
fieldSelect | indexSelect
   def allFieldsSelect: Parser[AllFieldsSelectExpr] = DOT ~> ALLSL ^^ { _ => 
AllFieldsSelectExpr() }
   def fieldSelect: Parser[FieldSelectExpr] = DOT ~> (
     SimpleTableFieldName ^^ {
       FieldSelectExpr(_)
-    } | UnQuoteTableFieldName ^^ {s =>
-      FieldSelectExpr(innerString(s))
+    } | UnQuoteTableFieldName ^^ { s =>
+      FieldSelectExpr(trim(s))
     })
   def indexSelect: Parser[IndexSelectExpr] = LSQBR ~> argument <~ RSQBR ^^ { 
IndexSelectExpr(_) }
   def functionSelect: Parser[FunctionSelectExpr] = DOT ~ FunctionName ~ LBR ~ 
repsep(argument, COMMA) ~ RBR ^^ {
@@ -236,7 +236,7 @@ trait BasicParser extends JavaTokenParsers with 
Serializable {
     * -- as alias --
     * <as-alias> ::= <as> <field-name>
     */
-  def asAlias: Parser[String] = AS ~> (SimpleTableFieldName | 
UnQuoteTableFieldName ^^ { innerString(_) })
+  def asAlias: Parser[String] = AS ~> (SimpleTableFieldName | 
UnQuoteTableFieldName ^^ { trim(_) })
 
   /**
     * -- math expr --
@@ -333,8 +333,9 @@ trait BasicParser extends JavaTokenParsers with 
Serializable {
     * <arg> ::= <expr>
     */
 
-  def function: Parser[FunctionExpr] = FunctionName ~ LBR ~ repsep(argument, 
COMMA) ~ RBR ~ opt(asAlias) ^^ {
-    case name ~ _ ~ args ~ _ ~ aliasOpt => FunctionExpr(name, args, aliasOpt)
+  def function: Parser[FunctionExpr] = FunctionName ~ LBR ~ opt(DISTINCT) ~ 
repsep(argument, COMMA) ~ RBR ~ opt(asAlias) ^^ {
+    case name ~ _ ~ extraCdtnOpt ~ args ~ _ ~ aliasOpt =>
+      FunctionExpr(name, args, extraCdtnOpt.map(ExtraConditionExpr(_)), 
aliasOpt)
   }
   def argument: Parser[Expr] = expression
 
@@ -350,7 +351,9 @@ trait BasicParser extends JavaTokenParsers with 
Serializable {
     * <limit-clause> = <limit> <expr>
     */
 
-  def selectClause: Parser[SelectClause] = opt(SELECT) ~> rep1sep(expression, 
COMMA) ^^ { SelectClause(_) }
+  def selectClause: Parser[SelectClause] = opt(SELECT) ~> opt(DISTINCT) ~ 
rep1sep(expression, COMMA) ^^ {
+    case extraCdtnOpt ~ exprs => SelectClause(exprs, 
extraCdtnOpt.map(ExtraConditionExpr(_)))
+  }
   def fromClause: Parser[FromClause] = FROM ~> DataSourceName ^^ { ds => 
FromClause(trim(ds)) }
   def whereClause: Parser[WhereClause] = WHERE ~> expression ^^ { 
WhereClause(_) }
   def havingClause: Parser[Expr] = HAVING ~> expression

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
index 0800f45..8d04e76 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
@@ -38,10 +38,28 @@ case class GriffinDslParser(dataSourceNames: Seq[String], 
functionNames: Seq[Str
     }
   }
 
  /**
    * -- duplicate clause --
    * <duplicate-clause> ::= <expr> [, <expr>]*
    * (rep1sep: one expression is enough; further ones are comma-separated)
    */
  def duplicateClause: Parser[DuplicateClause] = rep1sep(expression, Operator.COMMA) ^^ {
    case exprs => DuplicateClause(exprs)
  }
+
  /**
    * -- timeliness clause --
    * <timeliness-clause> ::= <expr> [, <expr>]*
    * (rep1sep: one expression is enough; further ones are comma-separated)
    */
  def timelinessClause: Parser[TimelinessClause] = rep1sep(expression, Operator.COMMA) ^^ {
    case exprs => TimelinessClause(exprs)
  }
+
   def parseRule(rule: String, dqType: DqType): ParseResult[Expr] = {
     val rootExpr = dqType match {
       case AccuracyType => logicalExpression
       case ProfilingType => profilingClause
+      case DuplicateType => duplicateClause
+      case TimelinessType => timelinessClause
       case _ => expression
     }
     parseAll(rootExpr, rule)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/plan/DfOprStep.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/DfOprStep.scala 
b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/DfOprStep.scala
new file mode 100644
index 0000000..f0afc6c
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/DfOprStep.scala
@@ -0,0 +1,32 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.plan
+
+import org.apache.griffin.measure.rule.dsl._
+
// A rule step executed as a dataframe operation rather than a sql statement.
// `cache`: whether the step result should be cached — presumably by the
// executing engine; confirm against the step runner.
// `global`: marks the step as global (RulePlan splits steps on this flag).
case class DfOprStep(name: String,
                     rule: String,
                     details: Map[String, Any],
                     cache: Boolean = false,
                     global: Boolean = false
                    ) extends RuleStep {

  val dslType: DslType = DfOprType

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala
new file mode 100644
index 0000000..10f1f9b
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala
@@ -0,0 +1,28 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.plan
+
+import org.apache.griffin.measure.rule.dsl._
+
// Export of a rule step's output as a metric.
// `stepName` names the step whose result is exported; `collectType`
// determines how result rows are collected into the metric value
// (see CollectType for the variants).
case class MetricExport(name: String,
                        stepName: String,
                        collectType: CollectType
                       ) extends RuleExport {

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala
new file mode 100644
index 0000000..a467543
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala
@@ -0,0 +1,27 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.plan
+
// Export of a rule step's output as records.
// `dataSourceCacheOpt`: optional name of a data source cache to update —
// TODO confirm semantics against the export writer.
// `originDFOpt`: optional name of the originating dataframe/step.
case class RecordExport(name: String,
                        stepName: String,
                        dataSourceCacheOpt: Option[String],
                        originDFOpt: Option[String]
                       ) extends RuleExport {

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala 
b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala
new file mode 100644
index 0000000..26a962a
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala
@@ -0,0 +1,27 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.plan
+
// Common shape of an export produced by a rule plan (metric or record).
trait RuleExport extends Serializable {

  val name: String        // export name

  val stepName: String    // name of the rule step this export depends on

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RulePlan.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RulePlan.scala 
b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RulePlan.scala
new file mode 100644
index 0000000..54a6062
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RulePlan.scala
@@ -0,0 +1,54 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.plan
+
+import scala.reflect.ClassTag
+
+case class RulePlan(ruleSteps: Seq[RuleStep],
+                    ruleExports: Seq[RuleExport]
+                   ) extends Serializable {
+
+  val globalRuleSteps = filterRuleSteps(_.global)
+  val normalRuleSteps = filterRuleSteps(!_.global)
+
+  val metricExports = filterRuleExports[MetricExport](ruleExports)
+  val recordExports = filterRuleExports[RecordExport](ruleExports)
+
+  private def filterRuleSteps(func: (RuleStep) => Boolean): Seq[RuleStep] = {
+    ruleSteps.filter(func)
+  }
+
+  private def filterRuleExports[T <: RuleExport: ClassTag](exports: Seq[RuleExport]): Seq[T] = {
+    exports.flatMap { exp =>
+      exp match {
+        case e: T => Some(e)
+        case _ => None
+      }
+    }
+  }
+
+//  def ruleStepNames(func: (RuleStep) => Boolean): Seq[String] = {
+//    ruleSteps.filter(func).map(_.name)
+//  }
+
+  def merge(rp: RulePlan): RulePlan = {
+    RulePlan(this.ruleSteps ++ rp.ruleSteps, this.ruleExports ++ rp.ruleExports)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleStep.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleStep.scala 
b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleStep.scala
new file mode 100644
index 0000000..dbdb2d5
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleStep.scala
@@ -0,0 +1,40 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.plan
+
+import org.apache.griffin.measure.rule.dsl.DslType
+
+trait RuleStep extends Serializable {
+
+  val dslType: DslType
+
+  val name: String
+
+  val rule: String
+
+  val cache: Boolean
+
+  val global: Boolean
+
+  val details: Map[String, Any]
+
+  def needCache: Boolean = cache || global
+
+  def isGlobal: Boolean = global
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/plan/SparkSqlStep.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/SparkSqlStep.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/SparkSqlStep.scala
new file mode 100644
index 0000000..16da9a5
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/SparkSqlStep.scala
@@ -0,0 +1,32 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.plan
+
+import org.apache.griffin.measure.rule.dsl._
+
+case class SparkSqlStep(name: String,
+                        rule: String,
+                        details: Map[String, Any],
+                        cache: Boolean = false,
+                        global: Boolean = false
+                       ) extends RuleStep {
+
+  val dslType: DslType = SparkSqlType
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/plan/TimeInfo.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/TimeInfo.scala 
b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/TimeInfo.scala
new file mode 100644
index 0000000..129d068
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/TimeInfo.scala
@@ -0,0 +1,37 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.plan
+
+trait TimeInfo extends Serializable {
+  val calcTime: Long
+//  val tmst: Long
+  val head: String
+
+  def key: String = if (head.nonEmpty) s"${head}_${calcTime}" else s"${calcTime}"
+  def setHead(h: String): TimeInfo
+}
+
+case class CalcTimeInfo(calcTime: Long, head: String = "") extends TimeInfo {
+//  val tmst: Long = calcTime
+  def setHead(h: String): TimeInfo = CalcTimeInfo(calcTime, h)
+}
+
+//case class TmstTimeInfo(calcTime: Long, tmst: Long, head: String = "") extends TimeInfo {
+//  def setHead(h: String): TimeInfo = TmstTimeInfo(calcTime, tmst, h)
+//}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/step/ConcreteRuleStep.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/step/ConcreteRuleStep.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/step/ConcreteRuleStep.scala
deleted file mode 100644
index 4b3a4d4..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/step/ConcreteRuleStep.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.step
-
-import org.apache.griffin.measure.rule.dsl._
-
-trait ConcreteRuleStep extends RuleStep {
-
-  val persistType: PersistType
-
-  val updateDataSource: Option[String]
-
-//  def isGroupMetric: Boolean = {
-//    val _GroupMetric = "group.metric"
-//    details.get(_GroupMetric) match {
-//      case Some(b: Boolean) => b
-//      case _ => false
-//    }
-//  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/step/DfOprStep.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/step/DfOprStep.scala 
b/measure/src/main/scala/org/apache/griffin/measure/rule/step/DfOprStep.scala
deleted file mode 100644
index 86f0bf3..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/step/DfOprStep.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.step
-
-import org.apache.griffin.measure.rule.dsl._
-
-case class DfOprStep(name: String, rule: String, details: Map[String, Any],
-                     persistType: PersistType, updateDataSource: Option[String]
-                    ) extends ConcreteRuleStep {
-
-  val dslType: DslType = DfOprType
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/step/GriffinDslStep.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/step/GriffinDslStep.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/step/GriffinDslStep.scala
deleted file mode 100644
index 21db8cf..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/step/GriffinDslStep.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.step
-
-import org.apache.griffin.measure.rule.dsl._
-
-case class GriffinDslStep(name: String, rule: String, dqType: DqType, details: Map[String, Any]
-                         ) extends RuleStep {
-
-  val dslType: DslType = GriffinDslType
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/step/RuleStep.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/step/RuleStep.scala 
b/measure/src/main/scala/org/apache/griffin/measure/rule/step/RuleStep.scala
deleted file mode 100644
index 4675ffe..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/step/RuleStep.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.step
-
-import org.apache.griffin.measure.rule.dsl.{DslType, PersistType}
-
-trait RuleStep extends Serializable {
-
-  val dslType: DslType
-
-  val name: String
-  val rule: String
-  val details: Map[String, Any]
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/step/SparkSqlStep.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/step/SparkSqlStep.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/step/SparkSqlStep.scala
deleted file mode 100644
index 62c3c35..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/step/SparkSqlStep.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.step
-
-import org.apache.griffin.measure.persist._
-import org.apache.griffin.measure.rule.dsl._
-
-case class SparkSqlStep(name: String, rule: String, details: Map[String, Any],
-                        persistType: PersistType, updateDataSource: Option[String]
-                       ) extends ConcreteRuleStep {
-
-  val dslType: DslType = SparkSqlType
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdfs.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdfs.scala 
b/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdfs.scala
index 11e8c8f..37d2a5a 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdfs.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdfs.scala
@@ -24,10 +24,15 @@ object GriffinUdfs {
 
   def register(sqlContext: SQLContext): Unit = {
     sqlContext.udf.register("index_of", indexOf)
+    sqlContext.udf.register("matches", matches)
   }
 
   private val indexOf = (arr: Seq[String], v: String) => {
     arr.indexOf(v)
   }
 
+  private val matches = (s: String, regex: String) => {
+    s.matches(regex)
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala
index 416f567..8e0d9a3 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala
@@ -18,11 +18,12 @@ under the License.
 */
 package org.apache.griffin.measure.utils
 
+import org.apache.spark.TaskContext
 import org.apache.spark.rdd.RDD
 
 object HdfsFileDumpUtil {
 
-  val sepCount = 5000
+  val sepCount = 50000
 
   private def suffix(i: Long): String = {
     if (i == 0) "" else s".${i}"
@@ -32,8 +33,15 @@ object HdfsFileDumpUtil {
   }
 
   def splitRdd[T](rdd: RDD[T])(implicit m: Manifest[T]): RDD[(Long, Iterable[T])] = {
-    val indexRdd = rdd.zipWithIndex
-    indexRdd.map(p => ((p._2 / sepCount), p._1)).groupByKey()
+//    val indexRdd = rdd.zipWithIndex // slow process
+//    indexRdd.map(p => ((p._2 / sepCount), p._1)).groupByKey() // slow process
+    val count = rdd.count
+    val splitCount = count / sepCount + 1
+    val splitRdd = rdd.mapPartitionsWithIndex { (n, itr) =>
+      val idx = n % splitCount
+      itr.map((idx, _))
+    }
+    splitRdd.groupByKey()
   }
   def splitIterable[T](datas: Iterable[T])(implicit m: Manifest[T]): Iterator[(Int, Iterable[T])] = {
     val groupedData = datas.grouped(sepCount).zipWithIndex
@@ -47,23 +55,33 @@ object HdfsFileDumpUtil {
     HdfsUtil.writeContent(path, strRecords)
   }
 
-  def dump(path: String, recordsRdd: RDD[String], lineSep: String): Boolean = {
+  def dump(path: String, recordsRdd: RDD[String], lineSep: String): Unit = {
     val groupedRdd = splitRdd(recordsRdd)
-    groupedRdd.aggregate(true)({ (res, pair) =>
+    groupedRdd.foreach { pair =>
       val (idx, list) = pair
       val filePath = path + suffix(idx)
       directDump(filePath, list, lineSep)
-      true
-    }, _ && _)
+    }
+//    groupedRdd.aggregate(true)({ (res, pair) =>
+//      val (idx, list) = pair
+//      val filePath = path + suffix(idx)
+//      directDump(filePath, list, lineSep)
+//      true
+//    }, _ && _)
   }
-  def dump(path: String, records: Iterable[String], lineSep: String): Boolean = {
+  def dump(path: String, records: Iterable[String], lineSep: String): Unit = {
     val groupedRecords = splitIterable(records)
-    groupedRecords.aggregate(true)({ (res, pair) =>
+    groupedRecords.foreach { pair =>
       val (idx, list) = pair
       val filePath = path + suffix(idx)
       directDump(filePath, list, lineSep)
-      true
-    }, _ && _)
+    }
+//    groupedRecords.aggregate(true)({ (res, pair) =>
+//      val (idx, list) = pair
+//      val filePath = path + suffix(idx)
+//      directDump(filePath, list, lineSep)
+//      true
+//    }, _ && _)
   }
 
   def remove(path: String, filename: String, withSuffix: Boolean): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala 
b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
index 9fa6bcf..aa5643b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
@@ -27,7 +27,7 @@ object HdfsUtil extends Loggable {
   private val seprator = "/"
 
   private val conf = new Configuration()
-  conf.set("dfs.support.append", "true")
+  conf.setBoolean("dfs.support.append", true)
 //  conf.set("fs.defaultFS", "hdfs://localhost")    // debug @localhost
 
   private val dfs = FileSystem.get(conf)
@@ -54,7 +54,9 @@ object HdfsUtil extends Loggable {
 
   def appendOrCreateFile(filePath: String): FSDataOutputStream = {
     val path = new Path(filePath)
-    if (dfs.exists(path)) dfs.append(path) else createFile(filePath)
+    if (dfs.getConf.getBoolean("dfs.support.append", false) && dfs.exists(path)) {
+        dfs.append(path)
+    } else createFile(filePath)
   }
 
   def openFile(filePath: String): FSDataInputStream = {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala 
b/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala
index 7954b6d..1ca32b3 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala
@@ -47,6 +47,8 @@ object ParamUtil {
       }
     }
 
+    def getStringOrKey(key: String): String = getString(key, key)
+
     def getByte(key: String, defValue: Byte): Byte = {
       try {
         params.get(key) match {
@@ -153,12 +155,46 @@ object ParamUtil {
       try {
         params.get(key) match {
           case Some(v: String) => v.toBoolean
+          case Some(v: Boolean) => v
+          case _ => defValue
+        }
+      } catch {
+        case _: Throwable => defValue
+      }
+    }
+
+    def getParamMap(key: String, defValue: Map[String, Any] = Map[String, Any]()): Map[String, Any] = {
+      try {
+        params.get(key) match {
+          case Some(v: Map[String, Any]) => v
           case _ => defValue
         }
       } catch {
         case _: Throwable => defValue
       }
     }
+
+    def getParamMapOpt(key: String): Option[Map[String, Any]] = {
+      try {
+        params.get(key) match {
+          case Some(v: Map[String, Any]) => Some(v)
+          case _ => None
+        }
+      } catch {
+        case _: Throwable => None
+      }
+    }
+
+    def addIfNotExist(key: String, value: Any): Map[String, Any] = {
+      params.get(key) match {
+        case Some(v) => params
+        case _ => params + (key -> value)
+      }
+    }
+
+    def removeKeys(keys: Iterable[String]): Map[String, Any] = {
+      params -- keys
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala 
b/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
index a8c079b..42a140f 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
@@ -18,9 +18,11 @@ under the License.
 */
 package org.apache.griffin.measure.utils
 
+import org.apache.griffin.measure.log.Loggable
+
 import scala.util.{Failure, Success, Try}
 
-object TimeUtil {
+object TimeUtil extends Loggable {
 
   final val TimeRegex = """^([+\-]?\d+)(ms|s|m|h|d)$""".r
   final val PureTimeRegex = """^([+\-]?\d+)$""".r
@@ -48,7 +50,7 @@ object TimeUtil {
         }
       } match {
         case Success(v) => Some(v)
-        case Failure(ex) => throw ex
+        case Failure(ex) => None
       }
     }
     value

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/test/resources/_accuracy-batch-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_accuracy-batch-griffindsl.json 
b/measure/src/test/resources/_accuracy-batch-griffindsl.json
new file mode 100644
index 0000000..c702d46
--- /dev/null
+++ b/measure/src/test/resources/_accuracy-batch-griffindsl.json
@@ -0,0 +1,63 @@
+{
+  "name": "accu_batch",
+
+  "process.type": "batch",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/users_info_src.avro"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select * from ${this} where user_id > 10010"
+            }
+          ]
+        }
+      ]
+    }, {
+      "name": "target",
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/users_info_target.avro"
+          }
+        }
+      ]
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "accuracy",
+        "name": "accu",
+        "rule": "source.user_id = target.user_id AND upper(source.first_name) 
= upper(target.first_name) AND source.last_name = target.last_name AND 
source.address = target.address AND source.email = target.email AND 
source.phone = target.phone AND source.post_code = target.post_code",
+        "details": {
+          "source": "source",
+          "target": "target",
+          "miss": "miss_count",
+          "total": "total_count",
+          "matched": "matched_count"
+        },
+        "metric": {
+          "name": "accu"
+        },
+        "record": {
+          "name": "missRecords"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file


Reply via email to