http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
index 736ce56..382e302 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
@@ -18,8 +18,6 @@ under the License.
 */
 package org.apache.griffin.measure.process.engine
 
-import org.apache.griffin.measure.cache.tmst.{TempName, TmstCache}
-import org.apache.griffin.measure.log.Loggable
 import org.apache.griffin.measure.process._
 import org.apache.griffin.measure.rule.adaptor.InternalColumns
 import org.apache.griffin.measure.rule.dsl._
@@ -101,7 +99,6 @@ trait SparkDqEngine extends DqEngine {
     } else emptyMetricMap
   }
 
-
   private def getTmst(row: Row, defTmst: Long): Long = {
     try {
       row.getAs[Long](InternalColumns.tmst)
@@ -110,46 +107,9 @@ trait SparkDqEngine extends DqEngine {
     }
   }
 
-//  def collectRecords(timeInfo: TimeInfo, recordExport: RecordExport): 
Map[Long, DataFrame] = {
-//    if (collectable) {
-//      val RecordExport(_, stepName, _, originDFOpt, defTmst, procType) = 
recordExport
-//      val stepDf = sqlContext.table(s"`${stepName}`")
-//      val recordsDf = originDFOpt match {
-//        case Some(originName) => sqlContext.table(s"`${originName}`")
-//        case _ => stepDf
-//      }
-//
-//      procType match {
-//        case BatchProcessType => {
-//          val recordsDf = sqlContext.table(s"`${stepName}`")
-//          emptyRecordMap + (defTmst -> recordsDf)
-//        }
-//        case StreamingProcessType => {
-//          originDFOpt match {
-//            case Some(originName) => {
-//              val recordsDf = sqlContext.table(s"`${originName}`")
-//              stepDf.map { row =>
-//                val tmst = getTmst(row, defTmst)
-//                val trdf = if 
(recordsDf.columns.contains(InternalColumns.tmst)) {
-//                  recordsDf.filter(s"`${InternalColumns.tmst}` = ${tmst}")
-//                } else recordsDf
-//                (tmst, trdf)
-//              }.collect.toMap
-//            }
-//            case _ => {
-//              val recordsDf = stepDf
-//              emptyRecordMap + (defTmst -> recordsDf)
-//            }
-//          }
-//        }
-//      }
-//    } else emptyRecordMap
-//  }
-
   private def getRecordDataFrame(recordExport: RecordExport): Option[DataFrame] = {
     if (collectable) {
-      val RecordExport(_, stepName, _, _, defTmst, procType) = recordExport
-      val stepDf = sqlContext.table(s"`${stepName}`")
+      val stepDf = sqlContext.table(s"`${recordExport.stepName}`")
       Some(stepDf)
     } else None
   }
@@ -209,187 +169,14 @@ trait SparkDqEngine extends DqEngine {
       }
       case _ => (None, Set[Long]())
     }
-//    val recordsOpt = getRecordDataFrame(recordExport).flatMap { stepDf =>
-//      originDFOpt match {
-//        case Some(originName) => {
-//          val tmsts = (stepDf.collect.flatMap { row =>
-//            try {
-//              val tmst = row.getAs[Long](InternalColumns.tmst)
-//              val empty = row.getAs[Boolean](InternalColumns.empty)
-//              Some((tmst, empty))
-//            } catch {
-//              case _: Throwable => None
-//            }
-//          })
-//          val emptyTmsts = tmsts.filter(_._2).map(_._1).toSet
-//          val recordTmsts = tmsts.filter(!_._2).map(_._1).toSet
-//          if (recordTmsts.size > 0) {
-//            val recordsDf = sqlContext.table(s"`${originName}`")
-//            val records = recordsDf.flatMap { row =>
-//              val tmst = row.getAs[Long](InternalColumns.tmst)
-//              if (recordTmsts.contains(tmst)) {
-//                try {
-//                  val map = SparkRowFormatter.formatRow(row)
-//                  val str = JsonUtil.toJson(map)
-//                  Some((tmst, str))
-//                } catch {
-//                  case e: Throwable => None
-//                }
-//              } else None
-//            }
-//            Some((Some(records.groupByKey), emptyTmsts))
-//          } else Some((None, emptyTmsts))
-//        }
-//        case _ => {
-//          val records = stepDf.flatMap { row =>
-//            val tmst = row.getAs[Long](InternalColumns.tmst)
-//            try {
-//              val map = SparkRowFormatter.formatRow(row)
-//              val str = JsonUtil.toJson(map)
-//              Some((tmst, str))
-//            } catch {
-//              case e: Throwable => None
-//            }
-//          }
-//          Some(records.groupByKey)
-//        }
-//      }
-//    }
   }
 
-//
-//  def collectUpdateRDD(ruleStep: ConcreteRuleStep): Option[DataFrame] = {
-//    if (collectable) {
-//      ruleStep match {
-//        case step: ConcreteRuleStep if ((step.ruleInfo.persistType == 
RecordPersistType)
-//          || (step.ruleInfo.cacheDataSourceOpt.nonEmpty)) => {
-//          val tmst = step.timeInfo.tmst
-////          val metricName = step.ruleInfo.name
-//
-//          step.ruleInfo.tmstNameOpt match {
-//            case Some(metricTmstName) => {
-//              try {
-//                val pdf = sqlContext.table(s"`${metricTmstName}`")
-//                Some(pdf)
-//              } catch {
-//                case e: Throwable => {
-//                  error(s"collect records ${metricTmstName} error: 
${e.getMessage}")
-//                  None
-//                }
-//              }
-//            }
-//            case _ => None
-//          }
-//        }
-//        case _ => None
-//      }
-//    } else None
-//  }
-
-
-
-
-
-//  def collectUpdateRDD(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]
-//                      ): Option[RDD[(Long, Iterable[String])]] = {
-//    if (collectable) {
-//      ruleStep match {
-//        case step: ConcreteRuleStep if ((step.ruleInfo.persistType == 
RecordPersistType)
-//          || (step.ruleInfo.cacheDataSourceOpt.nonEmpty)) => {
-//          val tmst = step.timeInfo.tmst
-//          val metricName = step.ruleInfo.name
-//
-//          step.ruleInfo.tmstNameOpt match {
-//            case Some(metricTmstName) => {
-//              try {
-//                val pdf = sqlContext.table(s"`${metricTmstName}`")
-//                val cols = pdf.columns
-//                val rdd = pdf.flatMap { row =>
-//                  val values = cols.flatMap { col =>
-//                    Some((col, row.getAs[Any](col)))
-//                  }.toMap
-//                  values.get(GroupByColumn.tmst) match {
-//                    case Some(t: Long) if (timeGroups.exists(_ == t)) => 
Some((t, JsonUtil.toJson(values)))
-//                    case _ => None
-//                  }
-//                }.groupByKey()
-//
-//                // find other keys in time groups, create empty records for 
those timestamps
-//                val existKeys = rdd.keys.collect
-//                val otherKeys = timeGroups.filter(t => !existKeys.exists(_ 
== t))
-//                val otherPairs = otherKeys.map((_, Iterable[String]())).toSeq
-//                val otherPairRdd = 
sqlContext.sparkContext.parallelize(otherPairs)
-//
-//                Some(rdd union otherPairRdd)
-//              } catch {
-//                case e: Throwable => {
-//                  error(s"collect records ${metricTmstName} error: 
${e.getMessage}")
-//                  None
-//                }
-//              }
-//            }
-//            case _ => None
-//          }
-//        }
-//        case _ => None
-//      }
-//    } else None
-//  }
-
-//  def collectRecords(ruleStep: ConcreteRuleStep, timeGroups: 
Iterable[Long]): Option[RDD[(Long, Iterable[String])]] = {
-//    ruleStep match {
-//      case step: ConcreteRuleStep if (step.persistType == RecordPersistType) 
=> {
-//        val name = step.name
-//        try {
-//          val pdf = sqlContext.table(s"`${name}`")
-//          val cols = pdf.columns
-//          val rdd = pdf.flatMap { row =>
-//            val values = cols.flatMap { col =>
-//              Some((col, row.getAs[Any](col)))
-//            }.toMap
-//            values.get(GroupByColumn.tmst) match {
-//              case Some(t: Long) if (timeGroups.exists(_ == t)) => Some((t, 
JsonUtil.toJson(values)))
-//              case _ => None
-//            }
-//          }.groupByKey()
-//          Some(rdd)
-//        } catch {
-//          case e: Throwable => {
-//            error(s"collect records ${name} error: ${e.getMessage}")
-//            None
-//          }
-//        }
-//      }
-//      case _ => None
-//    }
-//  }
-//
-//  def collectUpdateCacheDatas(ruleStep: ConcreteRuleStep, timeGroups: 
Iterable[Long]): Option[RDD[(Long, Iterable[String])]] = {
-//    ruleStep match {
-//      case step: ConcreteRuleStep if (step.updateDataSource.nonEmpty) => {
-//        val name = step.name
-//        try {
-//          val pdf = sqlContext.table(s"`${name}`")
-//          val cols = pdf.columns
-//          val rdd = pdf.flatMap { row =>
-//            val values = cols.flatMap { col =>
-//              Some((col, row.getAs[Any](col)))
-//            }.toMap
-//            values.get(GroupByColumn.tmst) match {
-//              case Some(t: Long) if (timeGroups.exists(_ == t)) => Some((t, 
JsonUtil.toJson(values)))
-//              case _ => None
-//            }
-//          }.groupByKey()
-//          Some(rdd)
-//        } catch {
-//          case e: Throwable => {
-//            error(s"collect update cache datas ${name} error: 
${e.getMessage}")
-//            None
-//          }
-//        }
-//      }
-//      case _ => None
-//    }
-//  }
+  def collectUpdateDf(dsUpdate: DsUpdate): Option[DataFrame] = {
+    if (collectable) {
+      val DsUpdate(_, stepName) = dsUpdate
+      val stepDf = sqlContext.table(s"`${stepName}`")
+      Some(stepDf)
+    } else None
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala b/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala
index 31fe5ea..db92533 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala
@@ -20,7 +20,7 @@ package org.apache.griffin.measure.process.temp
 
 import scala.math.{min, max}
 
-case class TimeRange(begin: Long, end: Long, tmsts: Set[Long]) extends Serializable {
+  case class TimeRange(begin: Long, end: Long, tmsts: Set[Long]) extends Serializable {
   def merge(tr: TimeRange): TimeRange = {
     TimeRange(min(begin, tr.begin), max(end, tr.end), tmsts ++ tr.tmsts)
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala
index 97589ad..0b0b461 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala
@@ -52,7 +52,11 @@ case class DataFrameOprAdaptor() extends RuleAdaptor {
     val name = getRuleName(param)
     val step = DfOprStep(name, getRule(param), getDetails(param), getCache(param), getGlobal(param))
     val mode = ExportMode.defaultMode(procType)
-    RulePlan(step :: Nil, genRuleExports(param, name, name, timeInfo.calcTime, mode))
+    RulePlan(
+      step :: Nil,
+      genRuleExports(param, name, name, timeInfo.calcTime, mode),
+      genDsUpdates(param, "", name)
+    )
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala
index bd27b19..f6f35da 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala
@@ -18,54 +18,6 @@ under the License.
 */
 package org.apache.griffin.measure.rule.adaptor
 
-object AccuracyKeys {
-  val _source = "source"
-  val _target = "target"
-  val _miss = "miss"
-  val _total = "total"
-  val _matched = "matched"
-  //  val _missRecords = "missRecords"
-}
-
-object ProfilingKeys {
-  val _source = "source"
-}
-
-object UniquenessKeys {
-  val _source = "source"
-  val _target = "target"
-  val _unique = "unique"
-  val _total = "total"
-  val _dup = "dup"
-  val _num = "num"
-
-  val _duplicationArray = "duplication.array"
-}
-
-object DistinctnessKeys {
-  val _source = "source"
-  val _target = "target"
-  val _distinct = "distinct"
-  val _total = "total"
-  val _dup = "dup"
-  val _accu_dup = "accu_dup"
-  val _num = "num"
-
-  val _duplicationArray = "duplication.array"
-  val _withAccumulate = "with.accumulate"
-}
-
-object TimelinessKeys {
-  val _source = "source"
-  val _latency = "latency"
-  val _total = "total"
-  val _avg = "avg"
-  val _threshold = "threshold"
-  val _step = "step"
-  val _count = "count"
-  val _stepSize = "step.size"
-}
-
 object GlobalKeys {
   val _initRule = "init.rule"
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
index 5655a13..3b4ec31 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
@@ -18,17 +18,11 @@ under the License.
 */
 package org.apache.griffin.measure.rule.adaptor
 
-import org.apache.griffin.measure.cache.tmst.{TempName, TmstCache}
-import org.apache.griffin.measure.process.engine.DataFrameOprs.AccuracyOprKeys
-import org.apache.griffin.measure.process.temp.{TableRegisters, TimeRange}
+import org.apache.griffin.measure.process.temp._
 import org.apache.griffin.measure.process._
-import org.apache.griffin.measure.rule.dsl._
-import org.apache.griffin.measure.rule.dsl.analyzer._
-import org.apache.griffin.measure.rule.dsl.expr._
 import org.apache.griffin.measure.rule.dsl.parser.GriffinDslParser
 import org.apache.griffin.measure.rule.plan.{TimeInfo, _}
-import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.griffin.measure.utils.TimeUtil
+import org.apache.griffin.measure.rule.trans._
 
 case class GriffinDslAdaptor(dataSourceNames: Seq[String],
                              functionNames: Seq[String]
@@ -42,7 +36,6 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
   val parser = GriffinDslParser(dataSourceNames, filteredFunctionNames)
 
   private val emptyRulePlan = RulePlan(Nil, Nil)
-  private val emptyMap = Map[String, Any]()
 
   override def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any],
                            processType: ProcessType, dsTimeRanges: Map[String, TimeRange]
@@ -54,14 +47,9 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
       val result = parser.parseRule(rule, dqType)
       if (result.successful) {
         val expr = result.get
-        dqType match {
-          case AccuracyType => accuracyRulePlan(timeInfo, name, expr, param, processType)
-          case ProfilingType => profilingRulePlan(timeInfo, name, expr, param, processType)
-          case UniquenessType => uniquenessRulePlan(timeInfo, name, expr, param, processType)
-          case DistinctnessType => distinctRulePlan(timeInfo, name, expr, param, processType, dsTimeRanges)
-          case TimelinessType => timelinessRulePlan(timeInfo, name, expr, param, processType)
-          case _ => emptyRulePlan
-        }
+        val rulePlanTrans = RulePlanTrans(dqType, dataSourceNames, timeInfo,
+          name, expr, param, processType, dsTimeRanges)
+        rulePlanTrans.trans
       } else {
         warn(s"parse rule [ ${rule} ] fails: \n${result}")
         emptyRulePlan
@@ -74,741 +62,4 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
     }
   }
 
-  // with accuracy opr
-  private def accuracyRulePlan(timeInfo: TimeInfo, name: String, expr: Expr,
-                               param: Map[String, Any], procType: ProcessType
-                              ): RulePlan = {
-    val details = getDetails(param)
-    val sourceName = details.getString(AccuracyKeys._source, 
dataSourceNames.head)
-    val targetName = details.getString(AccuracyKeys._target, 
dataSourceNames.tail.head)
-    val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], 
sourceName, targetName)
-
-    val mode = ExportMode.defaultMode(procType)
-
-    val ct = timeInfo.calcTime
-
-    if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
-      println(s"[${ct}] data source ${sourceName} not exists")
-      emptyRulePlan
-    } else {
-      // 1. miss record
-      val missRecordsTableName = "__missRecords"
-      val selClause = s"`${sourceName}`.*"
-      val missRecordsSql = if (!TableRegisters.existRunTempTable(timeInfo.key, 
targetName)) {
-        println(s"[${ct}] data source ${targetName} not exists")
-        s"SELECT ${selClause} FROM `${sourceName}`"
-      } else {
-        val onClause = expr.coalesceDesc
-        val sourceIsNull = analyzer.sourceSelectionExprs.map { sel =>
-          s"${sel.desc} IS NULL"
-        }.mkString(" AND ")
-        val targetIsNull = analyzer.targetSelectionExprs.map { sel =>
-          s"${sel.desc} IS NULL"
-        }.mkString(" AND ")
-        val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})"
-        s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` 
ON ${onClause} WHERE ${whereClause}"
-      }
-      val missRecordsStep = SparkSqlStep(missRecordsTableName, missRecordsSql, 
emptyMap, true)
-      val missRecordsExports = procType match {
-        case BatchProcessType => {
-          val recordParam = 
RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
-          genRecordExport(recordParam, missRecordsTableName, 
missRecordsTableName, ct, mode) :: Nil
-        }
-        case StreamingProcessType => Nil
-      }
-
-      // 2. miss count
-      val missCountTableName = "__missCount"
-      val missColName = details.getStringOrKey(AccuracyKeys._miss)
-      val missCountSql = procType match {
-        case BatchProcessType => s"SELECT COUNT(*) AS `${missColName}` FROM 
`${missRecordsTableName}`"
-        case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, 
COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}` GROUP BY 
`${InternalColumns.tmst}`"
-      }
-      val missCountStep = SparkSqlStep(missCountTableName, missCountSql, 
emptyMap)
-
-      // 3. total count
-      val totalCountTableName = "__totalCount"
-      val totalColName = details.getStringOrKey(AccuracyKeys._total)
-      val totalCountSql = procType match {
-        case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM 
`${sourceName}`"
-        case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, 
COUNT(*) AS `${totalColName}` FROM `${sourceName}` GROUP BY 
`${InternalColumns.tmst}`"
-      }
-      val totalCountStep = SparkSqlStep(totalCountTableName, totalCountSql, 
emptyMap)
-
-      // 4. accuracy metric
-      val accuracyTableName = name
-      val matchedColName = details.getStringOrKey(AccuracyKeys._matched)
-      val accuracyMetricSql = procType match {
-        case BatchProcessType => {
-          s"""
-             |SELECT `${totalCountTableName}`.`${totalColName}` AS 
`${totalColName}`,
-             |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS 
`${missColName}`,
-             |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`
-             |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}`
-         """.stripMargin
-        }
-        case StreamingProcessType => {
-          s"""
-             |SELECT `${totalCountTableName}`.`${InternalColumns.tmst}` AS 
`${InternalColumns.tmst}`,
-             |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
-             |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS 
`${missColName}`,
-             |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`
-             |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}`
-             |ON `${totalCountTableName}`.`${InternalColumns.tmst}` = 
`${missCountTableName}`.`${InternalColumns.tmst}`
-         """.stripMargin
-        }
-      }
-      val accuracyStep = SparkSqlStep(accuracyTableName, accuracyMetricSql, 
emptyMap)
-      val accuracyExports = procType match {
-        case BatchProcessType => {
-          val metricParam = 
RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
-          genMetricExport(metricParam, accuracyTableName, accuracyTableName, 
ct, mode) :: Nil
-        }
-        case StreamingProcessType => Nil
-      }
-
-      // current accu plan
-      val accuSteps = missRecordsStep :: missCountStep :: totalCountStep :: 
accuracyStep :: Nil
-      val accuExports = missRecordsExports ++ accuracyExports
-      val accuPlan = RulePlan(accuSteps, accuExports)
-
-      // streaming extra accu plan
-      val streamingAccuPlan = procType match {
-        case BatchProcessType => emptyRulePlan
-        case StreamingProcessType => {
-          // 5. accuracy metric merge
-          val accuracyMetricTableName = "__accuracy"
-          val accuracyMetricRule = "accuracy"
-          val accuracyMetricDetails = Map[String, Any](
-            (AccuracyOprKeys._dfName -> accuracyTableName),
-            (AccuracyOprKeys._miss -> missColName),
-            (AccuracyOprKeys._total -> totalColName),
-            (AccuracyOprKeys._matched -> matchedColName)
-          )
-          val accuracyMetricStep = DfOprStep(accuracyMetricTableName,
-            accuracyMetricRule, accuracyMetricDetails)
-          val metricParam = 
RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
-          val accuracyMetricExports = genMetricExport(metricParam, name, 
accuracyMetricTableName, ct, mode) :: Nil
-
-          // 6. collect accuracy records
-          val accuracyRecordTableName = "__accuracyRecords"
-          val accuracyRecordSql = {
-            s"""
-               |SELECT `${InternalColumns.tmst}`, `${InternalColumns.empty}`
-               |FROM `${accuracyMetricTableName}` WHERE 
`${InternalColumns.record}`
-             """.stripMargin
-          }
-          val accuracyRecordStep = SparkSqlStep(accuracyRecordTableName, 
accuracyRecordSql, emptyMap)
-          val recordParam = 
RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
-          val accuracyRecordParam = 
recordParam.addIfNotExist(ExportParamKeys._dataSourceCache, sourceName)
-            .addIfNotExist(ExportParamKeys._originDF, missRecordsTableName)
-          val accuracyRecordExports = genRecordExport(
-            accuracyRecordParam, missRecordsTableName, 
accuracyRecordTableName, ct, mode) :: Nil
-
-          // gen accu plan
-          val extraSteps = accuracyMetricStep :: accuracyRecordStep :: Nil
-          val extraExports = accuracyMetricExports ++ accuracyRecordExports
-          val extraPlan = RulePlan(extraSteps, extraExports)
-
-          extraPlan
-        }
-      }
-
-      // return accu plan
-      accuPlan.merge(streamingAccuPlan)
-
-    }
-  }
-
-  private def profilingRulePlan(timeInfo: TimeInfo, name: String, expr: Expr,
-                                param: Map[String, Any], procType: ProcessType
-                               ): RulePlan = {
-    val details = getDetails(param)
-    val profilingClause = expr.asInstanceOf[ProfilingClause]
-    val sourceName = profilingClause.fromClauseOpt match {
-      case Some(fc) => fc.dataSource
-      case _ => details.getString(ProfilingKeys._source, dataSourceNames.head)
-    }
-    val fromClause = 
profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc
-
-    val mode = ExportMode.defaultMode(procType)
-
-    val ct = timeInfo.calcTime
-
-    if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
-      emptyRulePlan
-    } else {
-      val analyzer = ProfilingAnalyzer(profilingClause, sourceName)
-      val selExprDescs = analyzer.selectionExprs.map { sel =>
-        val alias = sel match {
-          case s: AliasableExpr if (s.alias.nonEmpty) => s" AS 
`${s.alias.get}`"
-          case _ => ""
-        }
-        s"${sel.desc}${alias}"
-      }
-      val selCondition = 
profilingClause.selectClause.extraConditionOpt.map(_.desc).mkString
-      val selClause = procType match {
-        case BatchProcessType => selExprDescs.mkString(", ")
-        case StreamingProcessType => (s"`${InternalColumns.tmst}`" +: 
selExprDescs).mkString(", ")
-      }
-      val groupByClauseOpt = analyzer.groupbyExprOpt
-      val groupbyClause = procType match {
-        case BatchProcessType => groupByClauseOpt.map(_.desc).getOrElse("")
-        case StreamingProcessType => {
-          val tmstGroupbyClause = 
GroupbyClause(LiteralStringExpr(s"`${InternalColumns.tmst}`") :: Nil, None)
-          val mergedGroubbyClause = tmstGroupbyClause.merge(groupByClauseOpt 
match {
-            case Some(gbc) => gbc
-            case _ => GroupbyClause(Nil, None)
-          })
-          mergedGroubbyClause.desc
-        }
-      }
-      val preGroupbyClause = analyzer.preGroupbyExprs.map(_.desc).mkString(" ")
-      val postGroupbyClause = analyzer.postGroupbyExprs.map(_.desc).mkString(" 
")
-
-      // 1. select statement
-      val profilingSql = {
-        s"SELECT ${selCondition} ${selClause} ${fromClause} 
${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}"
-      }
-      val profilingName = name
-      val profilingStep = SparkSqlStep(profilingName, profilingSql, details)
-      val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
-      val profilingExports = genMetricExport(metricParam, name, profilingName, 
ct, mode) :: Nil
-
-      RulePlan(profilingStep :: Nil, profilingExports)
-    }
-  }
-
-  private def uniquenessRulePlan(timeInfo: TimeInfo, name: String, expr: Expr,
-                                 param: Map[String, Any], procType: ProcessType
-                                ): RulePlan = {
-    val details = getDetails(param)
-    val sourceName = details.getString(UniquenessKeys._source, 
dataSourceNames.head)
-    val targetName = details.getString(UniquenessKeys._target, 
dataSourceNames.tail.head)
-    val analyzer = UniquenessAnalyzer(expr.asInstanceOf[UniquenessClause], 
sourceName, targetName)
-
-    val mode = ExportMode.defaultMode(procType)
-
-    val ct = timeInfo.calcTime
-
-    if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
-      println(s"[${ct}] data source ${sourceName} not exists")
-      emptyRulePlan
-    } else if (!TableRegisters.existRunTempTable(timeInfo.key, targetName)) {
-      println(s"[${ct}] data source ${targetName} not exists")
-      emptyRulePlan
-    } else {
-      val selItemsClause = analyzer.selectionPairs.map { pair =>
-        val (expr, alias) = pair
-        s"${expr.desc} AS `${alias}`"
-      }.mkString(", ")
-      val aliases = analyzer.selectionPairs.map(_._2)
-
-      val selClause = procType match {
-        case BatchProcessType => selItemsClause
-        case StreamingProcessType => s"`${InternalColumns.tmst}`, 
${selItemsClause}"
-      }
-      val selAliases = procType match {
-        case BatchProcessType => aliases
-        case StreamingProcessType => InternalColumns.tmst +: aliases
-      }
-
-      // 1. source distinct mapping
-      val sourceTableName = "__source"
-      val sourceSql = s"SELECT DISTINCT ${selClause} FROM ${sourceName}"
-      val sourceStep = SparkSqlStep(sourceTableName, sourceSql, emptyMap)
-
-      // 2. target mapping
-      val targetTableName = "__target"
-      val targetSql = s"SELECT ${selClause} FROM ${targetName}"
-      val targetStep = SparkSqlStep(targetTableName, targetSql, emptyMap)
-
-      // 3. joined
-      val joinedTableName = "__joined"
-      val joinedSelClause = selAliases.map { alias =>
-        s"`${sourceTableName}`.`${alias}` AS `${alias}`"
-      }.mkString(", ")
-      val onClause = aliases.map { alias =>
-        s"coalesce(`${sourceTableName}`.`${alias}`, '') = 
coalesce(`${targetTableName}`.`${alias}`, '')"
-      }.mkString(" AND ")
-      val joinedSql = {
-        s"SELECT ${joinedSelClause} FROM `${targetTableName}` RIGHT JOIN 
`${sourceTableName}` ON ${onClause}"
-      }
-      val joinedStep = SparkSqlStep(joinedTableName, joinedSql, emptyMap)
-
-      // 4. group
-      val groupTableName = "__group"
-      val groupSelClause = selAliases.map { alias =>
-        s"`${alias}`"
-      }.mkString(", ")
-      val dupColName = details.getStringOrKey(UniquenessKeys._dup)
-      val groupSql = {
-        s"SELECT ${groupSelClause}, (COUNT(*) - 1) AS `${dupColName}` FROM 
`${joinedTableName}` GROUP BY ${groupSelClause}"
-      }
-      val groupStep = SparkSqlStep(groupTableName, groupSql, emptyMap, true)
-
-      // 5. total metric
-      val totalTableName = "__totalMetric"
-      val totalColName = details.getStringOrKey(UniquenessKeys._total)
-      val totalSql = procType match {
-        case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM 
`${sourceName}`"
-        case StreamingProcessType => {
-          s"""
-             |SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${totalColName}`
-             |FROM `${sourceName}` GROUP BY `${InternalColumns.tmst}`
-           """.stripMargin
-        }
-      }
-      val totalStep = SparkSqlStep(totalTableName, totalSql, emptyMap)
-      val totalMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
-      val totalMetricExport = genMetricExport(totalMetricParam, totalColName, 
totalTableName, ct, mode)
-
-      // 6. unique record
-      val uniqueRecordTableName = "__uniqueRecord"
-      val uniqueRecordSql = {
-        s"SELECT * FROM `${groupTableName}` WHERE `${dupColName}` = 0"
-      }
-      val uniqueRecordStep = SparkSqlStep(uniqueRecordTableName, 
uniqueRecordSql, emptyMap)
-
-      // 7. unique metric
-      val uniqueTableName = "__uniqueMetric"
-      val uniqueColName = details.getStringOrKey(UniquenessKeys._unique)
-      val uniqueSql = procType match {
-        case BatchProcessType => s"SELECT COUNT(*) AS `${uniqueColName}` FROM 
`${uniqueRecordTableName}`"
-        case StreamingProcessType => {
-          s"""
-             |SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${uniqueColName}`
-             |FROM `${uniqueRecordTableName}` GROUP BY 
`${InternalColumns.tmst}`
-           """.stripMargin
-        }
-      }
-      val uniqueStep = SparkSqlStep(uniqueTableName, uniqueSql, emptyMap)
-      val uniqueMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
-      val uniqueMetricExport = genMetricExport(uniqueMetricParam, 
uniqueColName, uniqueTableName, ct, mode)
-
-      val uniqueSteps = sourceStep :: targetStep :: joinedStep :: groupStep ::
-        totalStep :: uniqueRecordStep :: uniqueStep :: Nil
-      val uniqueExports = totalMetricExport :: uniqueMetricExport :: Nil
-      val uniqueRulePlan = RulePlan(uniqueSteps, uniqueExports)
-
-      val duplicationArrayName = 
details.getString(UniquenessKeys._duplicationArray, "")
-      val dupRulePlan = if (duplicationArrayName.nonEmpty) {
-        // 8. duplicate record
-        val dupRecordTableName = "__dupRecords"
-        val dupRecordSql = {
-          s"SELECT * FROM `${groupTableName}` WHERE `${dupColName}` > 0"
-        }
-        val dupRecordStep = SparkSqlStep(dupRecordTableName, dupRecordSql, 
emptyMap, true)
-        val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
-        val dupRecordExport = genRecordExport(recordParam, dupRecordTableName, 
dupRecordTableName, ct, mode)
-
-        // 9. duplicate metric
-        val dupMetricTableName = "__dupMetric"
-        val numColName = details.getStringOrKey(UniquenessKeys._num)
-        val dupMetricSelClause = procType match {
-          case BatchProcessType => s"`${dupColName}`, COUNT(*) AS 
`${numColName}`"
-          case StreamingProcessType => s"`${InternalColumns.tmst}`, 
`${dupColName}`, COUNT(*) AS `${numColName}`"
-        }
-        val dupMetricGroupbyClause = procType match {
-          case BatchProcessType => s"`${dupColName}`"
-          case StreamingProcessType => s"`${InternalColumns.tmst}`, 
`${dupColName}`"
-        }
-        val dupMetricSql = {
-          s"""
-             |SELECT ${dupMetricSelClause} FROM `${dupRecordTableName}`
-             |GROUP BY ${dupMetricGroupbyClause}
-          """.stripMargin
-        }
-        val dupMetricStep = SparkSqlStep(dupMetricTableName, dupMetricSql, 
emptyMap)
-        val dupMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
-        val dupMetricExport = genMetricExport(dupMetricParam, 
duplicationArrayName, dupMetricTableName, ct, mode)
-
-        RulePlan(dupRecordStep :: dupMetricStep :: Nil, dupRecordExport :: 
dupMetricExport :: Nil)
-      } else emptyRulePlan
-
-      uniqueRulePlan.merge(dupRulePlan)
-    }
-  }
-
-  private def distinctRulePlan(timeInfo: TimeInfo, name: String, expr: Expr,
-                               param: Map[String, Any], procType: ProcessType,
-                               dsTimeRanges: Map[String, TimeRange]
-                              ): RulePlan = {
-    val details = getDetails(param)
-    val sourceName = details.getString(DistinctnessKeys._source, 
dataSourceNames.head)
-    val targetName = details.getString(UniquenessKeys._target, 
dataSourceNames.tail.head)
-    val analyzer = DistinctnessAnalyzer(expr.asInstanceOf[DistinctnessClause], 
sourceName)
-
-    val mode = SimpleMode
-
-    val ct = timeInfo.calcTime
-
-    val sourceTimeRange = dsTimeRanges.get(sourceName).getOrElse(TimeRange(ct))
-    val beginTime = sourceTimeRange.begin
-
-    if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
-      println(s"[${ct}] data source ${sourceName} not exists")
-      emptyRulePlan
-    } else {
-      val withOlderTable = {
-        details.getBoolean(DistinctnessKeys._withAccumulate, true) &&
-          TableRegisters.existRunTempTable(timeInfo.key, targetName)
-      }
-
-      val selClause = analyzer.selectionPairs.map { pair =>
-        val (expr, alias) = pair
-        s"${expr.desc} AS `${alias}`"
-      }.mkString(", ")
-      val aliases = analyzer.selectionPairs.map(_._2)
-      val aliasesClause = aliases.map( a => s"`${a}`" ).mkString(", ")
-
-      // 1. source alias
-      val sourceAliasTableName = "__sourceAlias"
-      val sourceAliasSql = {
-        s"SELECT ${selClause} FROM `${sourceName}`"
-      }
-      val sourceAliasStep = SparkSqlStep(sourceAliasTableName, sourceAliasSql, 
emptyMap, true)
-
-      // 2. total metric
-      val totalTableName = "__totalMetric"
-      val totalColName = details.getStringOrKey(DistinctnessKeys._total)
-      val totalSql = {
-        s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}`"
-      }
-      val totalStep = SparkSqlStep(totalTableName, totalSql, emptyMap)
-      val totalMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
-      val totalMetricExport = genMetricExport(totalMetricParam, totalColName, 
totalTableName, beginTime, mode)
-
-      // 3. group by self
-      val selfGroupTableName = "__selfGroup"
-      val dupColName = details.getStringOrKey(DistinctnessKeys._dup)
-      val accuDupColName = details.getStringOrKey(DistinctnessKeys._accu_dup)
-      val selfGroupSql = {
-        s"""
-           |SELECT ${aliasesClause}, (COUNT(*) - 1) AS `${dupColName}`,
-           |TRUE AS `${InternalColumns.distinct}`
-           |FROM `${sourceAliasTableName}` GROUP BY ${aliasesClause}
-          """.stripMargin
-      }
-      val selfGroupStep = SparkSqlStep(selfGroupTableName, selfGroupSql, 
emptyMap, true)
-
-      val selfDistRulePlan = RulePlan(
-        sourceAliasStep :: totalStep :: selfGroupStep :: Nil,
-        totalMetricExport :: Nil
-      )
-
-      val (distRulePlan, dupCountTableName) = procType match {
-        case StreamingProcessType if (withOlderTable) => {
-          // 4. older alias
-          val olderAliasTableName = "__older"
-          val olderAliasSql = {
-            s"SELECT ${selClause} FROM `${targetName}` WHERE 
`${InternalColumns.tmst}` < ${beginTime}"
-          }
-          val olderAliasStep = SparkSqlStep(olderAliasTableName, 
olderAliasSql, emptyMap)
-
-          // 5. join with older data
-          val joinedTableName = "__joined"
-          val selfSelClause = (aliases :+ dupColName).map { alias =>
-            s"`${selfGroupTableName}`.`${alias}`"
-          }.mkString(", ")
-          val onClause = aliases.map { alias =>
-            s"coalesce(`${selfGroupTableName}`.`${alias}`, '') = 
coalesce(`${olderAliasTableName}`.`${alias}`, '')"
-          }.mkString(" AND ")
-          val olderIsNull = aliases.map { alias =>
-            s"`${olderAliasTableName}`.`${alias}` IS NULL"
-          }.mkString(" AND ")
-          val joinedSql = {
-            s"""
-               |SELECT ${selfSelClause}, (${olderIsNull}) AS 
`${InternalColumns.distinct}`
-               |FROM `${olderAliasTableName}` RIGHT JOIN 
`${selfGroupTableName}`
-               |ON ${onClause}
-            """.stripMargin
-          }
-          val joinedStep = SparkSqlStep(joinedTableName, joinedSql, emptyMap)
-
-          // 6. group by joined data
-          val groupTableName = "__group"
-          val moreDupColName = "_more_dup"
-          val groupSql = {
-            s"""
-               |SELECT ${aliasesClause}, `${dupColName}`, 
`${InternalColumns.distinct}`,
-               |COUNT(*) AS `${moreDupColName}`
-               |FROM `${joinedTableName}`
-               |GROUP BY ${aliasesClause}, `${dupColName}`, 
`${InternalColumns.distinct}`
-             """.stripMargin
-          }
-          val groupStep = SparkSqlStep(groupTableName, groupSql, emptyMap)
-
-          // 7. final duplicate count
-          val finalDupCountTableName = "__finalDupCount"
-          val finalDupCountSql = {
-            s"""
-               |SELECT ${aliasesClause}, `${InternalColumns.distinct}`,
-               |CASE WHEN `${InternalColumns.distinct}` THEN `${dupColName}`
-               |ELSE (`${dupColName}` + 1) END AS `${dupColName}`,
-               |CASE WHEN `${InternalColumns.distinct}` THEN `${dupColName}`
-               |ELSE (`${dupColName}` + `${moreDupColName}`) END AS 
`${accuDupColName}`
-               |FROM `${groupTableName}`
-             """.stripMargin
-          }
-          val finalDupCountStep = SparkSqlStep(finalDupCountTableName, 
finalDupCountSql, emptyMap, true)
-
-          val rulePlan = RulePlan(olderAliasStep :: joinedStep :: groupStep :: 
finalDupCountStep :: Nil, Nil)
-          (rulePlan, finalDupCountTableName)
-        }
-        case _ => {
-          (emptyRulePlan, selfGroupTableName)
-        }
-      }
-
-      // 8. distinct metric
-      val distTableName = "__distMetric"
-      val distColName = details.getStringOrKey(DistinctnessKeys._distinct)
-      val distSql = {
-        s"""
-           |SELECT COUNT(*) AS `${distColName}`
-           |FROM `${dupCountTableName}` WHERE `${InternalColumns.distinct}`
-         """.stripMargin
-      }
-      val distStep = SparkSqlStep(distTableName, distSql, emptyMap)
-      val distMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
-      val distMetricExport = genMetricExport(distMetricParam, distColName, 
distTableName, beginTime, mode)
-
-      val distMetricRulePlan = RulePlan(distStep :: Nil, distMetricExport :: 
Nil)
-
-      val duplicationArrayName = 
details.getString(UniquenessKeys._duplicationArray, "")
-      val dupRulePlan = if (duplicationArrayName.nonEmpty) {
-        // 9. duplicate record
-        val dupRecordTableName = "__dupRecords"
-        val dupRecordSelClause = procType match {
-          case StreamingProcessType if (withOlderTable) => s"${aliasesClause}, 
`${dupColName}`, `${accuDupColName}`"
-          case _ => s"${aliasesClause}, `${dupColName}`"
-        }
-        val dupRecordSql = {
-          s"""
-             |SELECT ${dupRecordSelClause}
-             |FROM `${dupCountTableName}` WHERE `${dupColName}` > 0
-           """.stripMargin
-        }
-        val dupRecordStep = SparkSqlStep(dupRecordTableName, dupRecordSql, 
emptyMap, true)
-        val dupRecordParam = 
RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
-        val dupRecordExport = genRecordExport(dupRecordParam, 
dupRecordTableName, dupRecordTableName, beginTime, mode)
-
-        // 10. duplicate metric
-        val dupMetricTableName = "__dupMetric"
-        val numColName = details.getStringOrKey(DistinctnessKeys._num)
-        val dupMetricSql = {
-          s"""
-             |SELECT `${dupColName}`, COUNT(*) AS `${numColName}`
-             |FROM `${dupRecordTableName}` GROUP BY `${dupColName}`
-         """.stripMargin
-        }
-        val dupMetricStep = SparkSqlStep(dupMetricTableName, dupMetricSql, 
emptyMap)
-        val dupMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
-        val dupMetricExport = genMetricExport(dupMetricParam, 
duplicationArrayName, dupMetricTableName, beginTime, mode)
-
-        RulePlan(dupRecordStep :: dupMetricStep :: Nil, dupRecordExport :: 
dupMetricExport :: Nil)
-      } else emptyRulePlan
-
-      
selfDistRulePlan.merge(distRulePlan).merge(distMetricRulePlan).merge(dupRulePlan)
-
-    }
-  }
-
-  private def timelinessRulePlan(timeInfo: TimeInfo, name: String, expr: Expr,
-                                 param: Map[String, Any], procType: ProcessType
-                                ): RulePlan = {
-    val details = getDetails(param)
-    val timelinessClause = expr.asInstanceOf[TimelinessClause]
-    val sourceName = details.getString(TimelinessKeys._source, 
dataSourceNames.head)
-
-    val mode = ExportMode.defaultMode(procType)
-
-    val ct = timeInfo.calcTime
-
-    if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
-      emptyRulePlan
-    } else {
-      val analyzer = TimelinessAnalyzer(timelinessClause, sourceName)
-      val btsSel = analyzer.btsExpr
-      val etsSelOpt = analyzer.etsExprOpt
-
-      // 1. in time
-      val inTimeTableName = "__inTime"
-      val inTimeSql = etsSelOpt match {
-        case Some(etsSel) => {
-          s"""
-             |SELECT *, (${btsSel}) AS `${InternalColumns.beginTs}`,
-             |(${etsSel}) AS `${InternalColumns.endTs}`
-             |FROM ${sourceName} WHERE (${btsSel}) IS NOT NULL AND (${etsSel}) 
IS NOT NULL
-           """.stripMargin
-        }
-        case _ => {
-          s"""
-             |SELECT *, (${btsSel}) AS `${InternalColumns.beginTs}`
-             |FROM ${sourceName} WHERE (${btsSel}) IS NOT NULL
-           """.stripMargin
-        }
-      }
-      val inTimeStep = SparkSqlStep(inTimeTableName, inTimeSql, emptyMap)
-
-      // 2. latency
-      val latencyTableName = "__lat"
-      val latencyColName = details.getStringOrKey(TimelinessKeys._latency)
-      val etsColName = etsSelOpt match {
-        case Some(_) => InternalColumns.endTs
-        case _ => InternalColumns.tmst
-      }
-      val latencySql = {
-        s"SELECT *, (`${etsColName}` - `${InternalColumns.beginTs}`) AS 
`${latencyColName}` FROM `${inTimeTableName}`"
-      }
-      val latencyStep = SparkSqlStep(latencyTableName, latencySql, emptyMap, 
true)
-
-      // 3. timeliness metric
-      val metricTableName = name
-      val totalColName = details.getStringOrKey(TimelinessKeys._total)
-      val avgColName = details.getStringOrKey(TimelinessKeys._avg)
-      val metricSql = procType match {
-        case BatchProcessType => {
-          s"""
-             |SELECT COUNT(*) AS `${totalColName}`,
-             |CAST(AVG(`${latencyColName}`) AS BIGINT) AS `${avgColName}`
-             |FROM `${latencyTableName}`
-           """.stripMargin
-        }
-        case StreamingProcessType => {
-          s"""
-             |SELECT `${InternalColumns.tmst}`,
-             |COUNT(*) AS `${totalColName}`,
-             |CAST(AVG(`${latencyColName}`) AS BIGINT) AS `${avgColName}`
-             |FROM `${latencyTableName}`
-             |GROUP BY `${InternalColumns.tmst}`
-           """.stripMargin
-        }
-      }
-      val metricStep = SparkSqlStep(metricTableName, metricSql, emptyMap)
-      val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
-      val metricExports = genMetricExport(metricParam, name, metricTableName, 
ct, mode) :: Nil
-
-      // current timeliness plan
-      val timeSteps = inTimeStep :: latencyStep :: metricStep :: Nil
-      val timeExports = metricExports
-      val timePlan = RulePlan(timeSteps, timeExports)
-
-      // 4. timeliness record
-      val recordPlan = 
TimeUtil.milliseconds(details.getString(TimelinessKeys._threshold, "")) match {
-        case Some(tsh) => {
-          val recordTableName = "__lateRecords"
-          val recordSql = {
-            s"SELECT * FROM `${latencyTableName}` WHERE `${latencyColName}` > 
${tsh}"
-          }
-          val recordStep = SparkSqlStep(recordTableName, recordSql, emptyMap)
-          val recordParam = 
RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
-          val recordExports = genRecordExport(recordParam, recordTableName, 
recordTableName, ct, mode) :: Nil
-          RulePlan(recordStep :: Nil, recordExports)
-        }
-        case _ => emptyRulePlan
-      }
-
-      // 5. ranges
-//      val rangePlan = details.get(TimelinessKeys._rangeSplit) match {
-//        case Some(arr: Seq[String]) => {
-//          val ranges = splitTimeRanges(arr)
-//          if (ranges.size > 0) {
-//            try {
-//              // 5.1. range
-//              val rangeTableName = "__range"
-//              val rangeColName = 
details.getStringOrKey(TimelinessKeys._range)
-//              val caseClause = {
-//                val whenClause = ranges.map { range =>
-//                  s"WHEN `${latencyColName}` < ${range._1} THEN 
'<${range._2}'"
-//                }.mkString("\n")
-//                s"CASE ${whenClause} ELSE '>=${ranges.last._2}' END AS 
`${rangeColName}`"
-//              }
-//              val rangeSql = {
-//                s"SELECT *, ${caseClause} FROM `${latencyTableName}`"
-//              }
-//              val rangeStep = SparkSqlStep(rangeTableName, rangeSql, 
emptyMap)
-//
-//              // 5.2. range metric
-//              val rangeMetricTableName = "__rangeMetric"
-//              val countColName = 
details.getStringOrKey(TimelinessKeys._count)
-//              val rangeMetricSql = procType match {
-//                case BatchProcessType => {
-//                  s"""
-//                     |SELECT `${rangeColName}`, COUNT(*) AS `${countColName}`
-//                     |FROM `${rangeTableName}` GROUP BY `${rangeColName}`
-//                  """.stripMargin
-//                }
-//                case StreamingProcessType => {
-//                  s"""
-//                     |SELECT `${InternalColumns.tmst}`, `${rangeColName}`, 
COUNT(*) AS `${countColName}`
-//                     |FROM `${rangeTableName}` GROUP BY 
`${InternalColumns.tmst}`, `${rangeColName}`
-//                  """.stripMargin
-//                }
-//              }
-//              val rangeMetricStep = SparkSqlStep(rangeMetricTableName, 
rangeMetricSql, emptyMap)
-//              val rangeMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
-//              val rangeMetricExports = genMetricExport(rangeMetricParam, 
rangeColName, rangeMetricTableName, ct, mode) :: Nil
-//
-//              RulePlan(rangeStep :: rangeMetricStep :: Nil, 
rangeMetricExports)
-//            } catch {
-//              case _: Throwable => emptyRulePlan
-//            }
-//          } else emptyRulePlan
-//        }
-//        case _ => emptyRulePlan
-//      }
-
-      // return timeliness plan
-
-      // 5. ranges
-      val rangePlan = 
TimeUtil.milliseconds(details.getString(TimelinessKeys._stepSize, "")) match {
-        case Some(stepSize) => {
-          // 5.1 range
-          val rangeTableName = "__range"
-          val stepColName = details.getStringOrKey(TimelinessKeys._step)
-          val rangeSql = {
-            s"""
-               |SELECT *, CAST((`${latencyColName}` / ${stepSize}) AS BIGINT) 
AS `${stepColName}`
-               |FROM `${latencyTableName}`
-             """.stripMargin
-          }
-          val rangeStep = SparkSqlStep(rangeTableName, rangeSql, emptyMap)
-
-          // 5.2 range metric
-          val rangeMetricTableName = "__rangeMetric"
-          val countColName = details.getStringOrKey(TimelinessKeys._count)
-          val rangeMetricSql = procType match {
-            case BatchProcessType => {
-              s"""
-                 |SELECT `${stepColName}`, COUNT(*) AS `${countColName}`
-                 |FROM `${rangeTableName}` GROUP BY `${stepColName}`
-                """.stripMargin
-            }
-            case StreamingProcessType => {
-              s"""
-                 |SELECT `${InternalColumns.tmst}`, `${stepColName}`, COUNT(*) 
AS `${countColName}`
-                 |FROM `${rangeTableName}` GROUP BY `${InternalColumns.tmst}`, 
`${stepColName}`
-                """.stripMargin
-            }
-          }
-          val rangeMetricStep = SparkSqlStep(rangeMetricTableName, 
rangeMetricSql, emptyMap)
-          val rangeMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
-          val rangeMetricExports = genMetricExport(rangeMetricParam, 
stepColName, rangeMetricTableName, ct, mode) :: Nil
-
-          RulePlan(rangeStep :: rangeMetricStep :: Nil, rangeMetricExports)
-        }
-        case _ => emptyRulePlan
-      }
-
-      timePlan.merge(recordPlan).merge(rangePlan)
-    }
-  }
-
-  private def splitTimeRanges(tstrs: Seq[String]): List[(Long, String)] = {
-    val ts = tstrs.flatMap(TimeUtil.milliseconds(_)).sorted.toList
-    ts.map { t => (t, TimeUtil.time2String(t)) }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala
index 25025ac..e85575f 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala
@@ -20,29 +20,12 @@ package org.apache.griffin.measure.rule.adaptor
 
 import java.util.concurrent.atomic.AtomicLong
 
-import org.apache.griffin.measure.cache.tmst.TempName
-
-import scala.collection.mutable.{Set => MutableSet}
-import org.apache.griffin.measure.config.params.user._
 import org.apache.griffin.measure.log.Loggable
 import org.apache.griffin.measure.process.{ExportMode, ProcessType}
 import org.apache.griffin.measure.process.temp.TimeRange
 import org.apache.griffin.measure.rule.dsl._
 import org.apache.griffin.measure.rule.plan.{TimeInfo, _}
-
-//object RuleInfoKeys {
-//  val _name = "name"
-//  val _rule = "rule"
-//  val _details = "details"
-//  val _dslType = "dsl.type"
-//  val _dqType = "dq.type"
-//  val _global = "global"
-////  val _gatherStep = "gather.step"
-//
-//  val _metric = "metric"
-//  val _record = "record"
-//}
-//import RuleInfoKeys._
+import org.apache.griffin.measure.rule.trans.{DsUpdateFactory, RuleExportFactory}
 import org.apache.griffin.measure.utils.ParamUtil._
 
 object RuleParamKeys {
@@ -56,6 +39,7 @@ object RuleParamKeys {
 
   val _metric = "metric"
   val _record = "record"
+  val _dsUpdate = "ds.update"
 
   def getName(param: Map[String, Any], defName: String): String = param.getString(_name, defName)
   def getRule(param: Map[String, Any]): String = param.getString(_rule, "")
@@ -66,52 +50,11 @@ object RuleParamKeys {
 
   def getMetricOpt(param: Map[String, Any]): Option[Map[String, Any]] = param.getParamMapOpt(_metric)
   def getRecordOpt(param: Map[String, Any]): Option[Map[String, Any]] = param.getParamMapOpt(_record)
-}
-
-object ExportParamKeys {
-  val _name = "name"
-  val _collectType = "collect.type"
-  val _dataSourceCache = "data.source.cache"
-  val _originDF = "origin.DF"
-
-  def getName(param: Map[String, Any], defName: String): String = param.getString(_name, defName)
-  def getCollectType(param: Map[String, Any]): CollectType = CollectType(param.getString(_collectType, ""))
-  def getDataSourceCacheOpt(param: Map[String, Any]): Option[String] = param.get(_dataSourceCache).map(_.toString)
-  def getOriginDFOpt(param: Map[String, Any]): Option[String] = param.get(_originDF).map(_.toString)
+  def getDsUpdateOpt(param: Map[String, Any]): Option[Map[String, Any]] = param.getParamMapOpt(_dsUpdate)
 }
 
 trait RuleAdaptor extends Loggable with Serializable {
 
-//  val adaptPhase: AdaptPhase
-
-//  protected def genRuleInfo(param: Map[String, Any]): RuleInfo = RuleInfoGen(param)
-
-//  protected def getName(param: Map[String, Any]) = param.getOrElse(_name, RuleStepNameGenerator.genName).toString
-//  protected def getRule(param: Map[String, Any]) = param.getOrElse(_rule, "").toString
-//  protected def getDetails(param: Map[String, Any]) = param.get(_details) match {
-//    case Some(dt: Map[String, Any]) => dt
-//    case _ => Map[String, Any]()
-//  }
-
-
-
-//  def getPersistNames(steps: Seq[RuleStep]): Seq[String] = steps.map(_.ruleInfo.persistName)
-//
-//  protected def genRuleStep(timeInfo: TimeInfo, param: Map[String, Any]): Seq[RuleStep]
-//  protected def adaptConcreteRuleStep(ruleStep: RuleStep): Seq[ConcreteRuleStep]
-//  def genConcreteRuleStep(timeInfo: TimeInfo, param: Map[String, Any]
-//                         ): Seq[ConcreteRuleStep] = {
-//    genRuleStep(timeInfo, param).flatMap { rs =>
-//      adaptConcreteRuleStep(rs)
-//    }
-//  }
-
-
-
-//  def genRuleInfos(param: Map[String, Any], timeInfo: TimeInfo): Seq[RuleInfo] = {
-//    RuleInfoGen(param) :: Nil
-//  }
-
   protected def getRuleName(param: Map[String, Any]): String = {
     RuleParamKeys.getName(param, RuleStepNameGenerator.genName)
   }
@@ -124,66 +67,25 @@ trait RuleAdaptor extends Loggable with Serializable {
                                mode: ExportMode
                               ): Seq[RuleExport] = {
     val metricOpt = RuleParamKeys.getMetricOpt(param)
-    val metricExportSeq = metricOpt.map(genMetricExport(_, defName, stepName, defTimestamp, mode)).toSeq
+    val metricExportSeq = metricOpt.map(
+      RuleExportFactory.genMetricExport(_, defName, stepName, defTimestamp, mode)
+    ).toSeq
     val recordOpt = RuleParamKeys.getRecordOpt(param)
-    val recordExportSeq = recordOpt.map(genRecordExport(_, defName, stepName, defTimestamp, mode)).toSeq
+    val recordExportSeq = recordOpt.map(
+      RuleExportFactory.genRecordExport(_, defName, stepName, defTimestamp, mode)
+    ).toSeq
     metricExportSeq ++ recordExportSeq
   }
-  protected def genMetricExport(param: Map[String, Any], name: String, stepName: String,
-                                defTimestamp: Long, mode: ExportMode
-                               ): MetricExport = {
-    MetricExport(
-      ExportParamKeys.getName(param, name),
-      stepName,
-      ExportParamKeys.getCollectType(param),
-      defTimestamp,
-      mode
-    )
-  }
-  protected def genRecordExport(param: Map[String, Any], name: String, stepName: String,
-                                defTimestamp: Long, mode: ExportMode
-                               ): RecordExport = {
-    RecordExport(
-      ExportParamKeys.getName(param, name),
-      stepName,
-      ExportParamKeys.getDataSourceCacheOpt(param),
-      ExportParamKeys.getOriginDFOpt(param),
-      defTimestamp,
-      mode
-    )
-  }
-
 
+  protected def genDsUpdates(param: Map[String, Any], defDsName: String,
+                             stepName: String
+                            ): Seq[DsUpdate] = {
+    val dsUpdateOpt = RuleParamKeys.getDsUpdateOpt(param)
+    dsUpdateOpt.map(DsUpdateFactory.genDsUpdate(_, defDsName, stepName)).toSeq
+  }
 
 }
 
-
-
-//object RuleInfoGen {
-//  def apply(param: Map[String, Any]): RuleInfo = {
-//    val name = param.get(_name) match {
-//      case Some(n: String) => n
-//      case _ => RuleStepNameGenerator.genName
-//    }
-//    RuleInfo(
-//      name,
-//      None,
-//      DslType(param.getString(_dslType, "")),
-//      param.getString(_rule, ""),
-//      param.getParamMap(_details),
-//      param.getBoolean(_gatherStep, false)
-//    )
-//  }
-//  def apply(ri: RuleInfo, timeInfo: TimeInfo): RuleInfo = {
-//    if (ri.persistType.needPersist) {
-//      val tmstName = TempName.tmstName(ri.name, timeInfo)
-//      ri.setTmstNameOpt(Some(tmstName))
-//    } else ri
-//  }
-//
-//  def dqType(param: Map[String, Any]): DqType = DqType(param.getString(_dqType, ""))
-//}
-
 object RuleStepNameGenerator {
   private val counter: AtomicLong = new AtomicLong(0L)
   private val head: String = "rs"

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala
index 1fce03b..b7c68b5 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala
@@ -45,7 +45,11 @@ case class SparkSqlAdaptor() extends RuleAdaptor {
     val name = getRuleName(param)
     val step = SparkSqlStep(name, getRule(param), getDetails(param), getCache(param), getGlobal(param))
     val mode = ExportMode.defaultMode(procType)
-    RulePlan(step :: Nil, genRuleExports(param, name, name, timeInfo.calcTime, mode))
+    RulePlan(
+      step :: Nil,
+      genRuleExports(param, name, name, timeInfo.calcTime, mode),
+      genDsUpdates(param, "", name)
+    )
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/plan/DsUpdate.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/DsUpdate.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/DsUpdate.scala
new file mode 100644
index 0000000..4956b29
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/DsUpdate.scala
@@ -0,0 +1,24 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.plan
+
+case class DsUpdate(dsName: String,
+                    stepName: String
+                   ) extends Serializable {
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala
index ac14153..84313e4 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala
@@ -28,7 +28,4 @@ case class MetricExport(name: String,
                         mode: ExportMode
                        ) extends RuleExport {
 
-  def setDefTimestamp(t: Long): RuleExport =
-    MetricExport(name, stepName, collectType, t, mode)
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala
index 6afc836..c69dc55 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala
@@ -28,7 +28,4 @@ case class RecordExport(name: String,
                         mode: ExportMode
                        ) extends RuleExport {
 
-  def setDefTimestamp(t: Long): RuleExport =
-    RecordExport(name, stepName, dataSourceCacheOpt, originDFOpt, t, mode)
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala
index 84467c2..da5eb9d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala
@@ -30,6 +30,4 @@ trait RuleExport extends Serializable {
 
   val mode: ExportMode   // export mode
 
-  def setDefTimestamp(t: Long): RuleExport
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RulePlan.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RulePlan.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RulePlan.scala
index 54a6062..678ab3e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RulePlan.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RulePlan.scala
@@ -21,7 +21,8 @@ package org.apache.griffin.measure.rule.plan
 import scala.reflect.ClassTag
 
 case class RulePlan(ruleSteps: Seq[RuleStep],
-                    ruleExports: Seq[RuleExport]
+                    ruleExports: Seq[RuleExport],
+                    dsUpdates: Seq[DsUpdate] = Nil
                    ) extends Serializable {
 
   val globalRuleSteps = filterRuleSteps(_.global)
@@ -48,7 +49,11 @@ case class RulePlan(ruleSteps: Seq[RuleStep],
 //  }
 
   def merge(rp: RulePlan): RulePlan = {
-    RulePlan(this.ruleSteps ++ rp.ruleSteps, this.ruleExports ++ 
rp.ruleExports)
+    RulePlan(
+      this.ruleSteps ++ rp.ruleSteps,
+      this.ruleExports ++ rp.ruleExports,
+      this.dsUpdates ++ rp.dsUpdates
+    )
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala
new file mode 100644
index 0000000..2ff8feb
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala
@@ -0,0 +1,198 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.trans
+
+import org.apache.griffin.measure.process.engine.DataFrameOprs.AccuracyOprKeys
+import org.apache.griffin.measure.process.temp.TableRegisters
+import org.apache.griffin.measure.process.{BatchProcessType, ExportMode, 
ProcessType, StreamingProcessType}
+import org.apache.griffin.measure.rule.adaptor._
+import org.apache.griffin.measure.rule.adaptor.RuleParamKeys._
+import org.apache.griffin.measure.rule.dsl.analyzer.AccuracyAnalyzer
+import org.apache.griffin.measure.rule.dsl.expr.{Expr, LogicalExpr}
+import org.apache.griffin.measure.rule.plan._
+import org.apache.griffin.measure.utils.ParamUtil._
+import org.apache.griffin.measure.rule.trans.RuleExportFactory._
+import org.apache.griffin.measure.rule.trans.DsUpdateFactory._
+
+case class AccuracyRulePlanTrans(dataSourceNames: Seq[String],
+                                 timeInfo: TimeInfo, name: String, expr: Expr,
+                                 param: Map[String, Any], procType: ProcessType
+                                ) extends RulePlanTrans {
+  // AccuracyRulePlanTrans: translates an accuracy rule (expr plus param details) into a RulePlan of steps, exports and data source updates
+  private object AccuracyKeys { // keys looked up in the rule's details map
+    val _source = "source"
+    val _target = "target"
+    val _miss = "miss"
+    val _total = "total"
+    val _matched = "matched"
+  }
+  import AccuracyKeys._
+  // Builds the plan; yields emptyRulePlan when TableRegisters.existRunTempTable finds no source table for timeInfo.key
+  def trans(): RulePlan = {
+    val details = getDetails(param)
+    val sourceName = details.getString(_source, dataSourceNames.head)
+    val targetName = details.getString(_target, dataSourceNames.tail.head)
+    val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], 
sourceName, targetName)
+  // NOTE(review): the .head / .tail.head defaults above throw for fewer than two names unless getString takes its default by name; the cast throws if expr is not a LogicalExpr - confirm callers guarantee both
+    val mode = ExportMode.defaultMode(procType)
+
+    val ct = timeInfo.calcTime
+    // no source table: log it and return an empty plan
+    if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
+      println(s"[${ct}] data source ${sourceName} not exists")
+      emptyRulePlan
+    } else {
+      // 1. miss record: source rows (source selection exprs not all NULL) whose target selection exprs are all NULL after the left join; every source row when the target table does not exist
+      val missRecordsTableName = "__missRecords"
+      val selClause = s"`${sourceName}`.*"
+      val missRecordsSql = if (!TableRegisters.existRunTempTable(timeInfo.key, 
targetName)) {
+        println(s"[${ct}] data source ${targetName} not exists")
+        s"SELECT ${selClause} FROM `${sourceName}`"
+      } else {
+        val onClause = expr.coalesceDesc
+        val sourceIsNull = analyzer.sourceSelectionExprs.map { sel =>
+          s"${sel.desc} IS NULL"
+        }.mkString(" AND ")
+        val targetIsNull = analyzer.targetSelectionExprs.map { sel =>
+          s"${sel.desc} IS NULL"
+        }.mkString(" AND ")
+        val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})"
+        s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` 
ON ${onClause} WHERE ${whereClause}"
+      }
+      val missRecordsStep = SparkSqlStep(missRecordsTableName, missRecordsSql, 
emptyMap, true)
+      val missRecordsExports = procType match {
+        case BatchProcessType => {
+          val recordParam = 
RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
+          genRecordExport(recordParam, missRecordsTableName, 
missRecordsTableName, ct, mode) :: Nil
+        }
+        case StreamingProcessType => Nil
+      }
+      val missRecordsUpdates = procType match {
+        case BatchProcessType => Nil
+        case StreamingProcessType => {
+          val updateParam = emptyMap
+          genDsUpdate(updateParam, sourceName, missRecordsTableName) :: Nil
+        }
+      }
+      // (above: batch exports the miss records; streaming instead produces a DsUpdate naming the source and the miss-records step)
+      // 2. miss count: number of miss records (streaming: counted per tmst value)
+      val missCountTableName = "__missCount"
+      val missColName = details.getStringOrKey(_miss)
+      val missCountSql = procType match {
+        case BatchProcessType => s"SELECT COUNT(*) AS `${missColName}` FROM 
`${missRecordsTableName}`"
+        case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, 
COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}` GROUP BY 
`${InternalColumns.tmst}`"
+      }
+      val missCountStep = SparkSqlStep(missCountTableName, missCountSql, 
emptyMap)
+
+      // 3. total count: number of source rows (streaming: counted per tmst value)
+      val totalCountTableName = "__totalCount"
+      val totalColName = details.getStringOrKey(_total)
+      val totalCountSql = procType match {
+        case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM 
`${sourceName}`"
+        case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, 
COUNT(*) AS `${totalColName}` FROM `${sourceName}` GROUP BY 
`${InternalColumns.tmst}`"
+      }
+      val totalCountStep = SparkSqlStep(totalCountTableName, totalCountSql, 
emptyMap)
+
+      // 4. accuracy metric: total, miss (0 when the joined miss count is NULL) and matched = total - miss; batch joins the two count tables without an ON clause, streaming joins them on tmst
+      val accuracyTableName = name
+      val matchedColName = details.getStringOrKey(_matched)
+      val accuracyMetricSql = procType match {
+        case BatchProcessType => {
+          s"""
+             |SELECT `${totalCountTableName}`.`${totalColName}` AS 
`${totalColName}`,
+             |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS 
`${missColName}`,
+             |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`
+             |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}`
+         """.stripMargin
+        }
+        case StreamingProcessType => {
+          s"""
+             |SELECT `${totalCountTableName}`.`${InternalColumns.tmst}` AS 
`${InternalColumns.tmst}`,
+             |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
+             |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS 
`${missColName}`,
+             |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`
+             |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}`
+             |ON `${totalCountTableName}`.`${InternalColumns.tmst}` = 
`${missCountTableName}`.`${InternalColumns.tmst}`
+         """.stripMargin
+        }
+      }
+      val accuracyStep = SparkSqlStep(accuracyTableName, accuracyMetricSql, 
emptyMap)
+      val accuracyExports = procType match {
+        case BatchProcessType => {
+          val metricParam = 
RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
+          genMetricExport(metricParam, accuracyTableName, accuracyTableName, 
ct, mode) :: Nil
+        }
+        case StreamingProcessType => Nil
+      }
+
+      // current accu plan: the four steps above with their exports and data source updates
+      val accuSteps = missRecordsStep :: missCountStep :: totalCountStep :: 
accuracyStep :: Nil
+      val accuExports = missRecordsExports ++ accuracyExports
+      val accuUpdates = missRecordsUpdates
+      val accuPlan = RulePlan(accuSteps, accuExports, accuUpdates)
+
+      // streaming extra accu plan (batch contributes an empty plan here)
+      val streamingAccuPlan = procType match {
+        case BatchProcessType => emptyRulePlan
+        case StreamingProcessType => {
+          // 5. accuracy metric merge: a DfOprStep with rule "accuracy" over the step-4 table, configured with the miss/total/matched column names (the operation itself is defined outside this class)
+          val accuracyMetricTableName = "__accuracy"
+          val accuracyMetricRule = "accuracy"
+          val accuracyMetricDetails = Map[String, Any](
+            (AccuracyOprKeys._dfName -> accuracyTableName),
+            (AccuracyOprKeys._miss -> missColName),
+            (AccuracyOprKeys._total -> totalColName),
+            (AccuracyOprKeys._matched -> matchedColName)
+          )
+          val accuracyMetricStep = DfOprStep(accuracyMetricTableName,
+            accuracyMetricRule, accuracyMetricDetails)
+          val metricParam = 
RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
+          val accuracyMetricExports = genMetricExport(metricParam, name, 
accuracyMetricTableName, ct, mode) :: Nil
+
+          // 6. collect accuracy records: the tmst and InternalColumns.empty columns of the merged-metric rows whose InternalColumns.record column is true
+          val accuracyRecordTableName = "__accuracyRecords"
+          val accuracyRecordSql = {
+            s"""
+               |SELECT `${InternalColumns.tmst}`, `${InternalColumns.empty}`
+               |FROM `${accuracyMetricTableName}` WHERE 
`${InternalColumns.record}`
+             """.stripMargin
+          }
+          val accuracyRecordStep = SparkSqlStep(accuracyRecordTableName, 
accuracyRecordSql, emptyMap)
+          val recordParam = 
RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
+          val accuracyRecordParam = 
recordParam.addIfNotExist(ExportParamKeys._dataSourceCache, sourceName)
+            .addIfNotExist(ExportParamKeys._originDF, missRecordsTableName)
+          val accuracyRecordExports = genRecordExport(
+            accuracyRecordParam, missRecordsTableName, 
accuracyRecordTableName, ct, mode) :: Nil
+
+          // gen extra plan: steps 5 and 6 with their metric and record exports (no data source updates)
+          val extraSteps = accuracyMetricStep :: accuracyRecordStep :: Nil
+          val extraExports = accuracyMetricExports ++ accuracyRecordExports
+          val extraPlan = RulePlan(extraSteps, extraExports)
+
+          extraPlan
+        }
+      }
+
+      // return the base accu plan merged with the streaming extra plan
+      accuPlan.merge(streamingAccuPlan)
+
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala
new file mode 100644
index 0000000..0f4e7c4
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala
@@ -0,0 +1,234 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.trans
+
+import org.apache.griffin.measure.process.temp.{TableRegisters, TimeRange}
+import org.apache.griffin.measure.process._
+import org.apache.griffin.measure.rule.adaptor.RuleParamKeys._
+import org.apache.griffin.measure.rule.adaptor._
+import org.apache.griffin.measure.rule.dsl.{ArrayCollectType, 
EntriesCollectType}
+import org.apache.griffin.measure.rule.dsl.analyzer.DistinctnessAnalyzer
+import org.apache.griffin.measure.rule.dsl.expr._
+import org.apache.griffin.measure.rule.plan._
+import org.apache.griffin.measure.rule.trans.RuleExportFactory._
+import org.apache.griffin.measure.utils.ParamUtil._
+
+case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String],
+                                     timeInfo: TimeInfo, name: String, expr: 
Expr,
+                                     param: Map[String, Any], procType: 
ProcessType,
+                                     dsTimeRanges: Map[String, TimeRange]
+                                    ) extends RulePlanTrans {
+  // DistinctnessRulePlanTrans: translates a distinctness rule (expr plus param details) into a RulePlan of Spark SQL steps with metric and record exports
+  private object DistinctnessKeys { // keys looked up in the rule's details map
+    val _source = "source"
+    val _target = "target"
+    val _distinct = "distinct"
+    val _total = "total"
+    val _dup = "dup"
+    val _accu_dup = "accu_dup"
+    val _num = "num"
+
+    val _duplicationArray = "duplication.array"
+    val _withAccumulate = "with.accumulate"
+  }
+  import DistinctnessKeys._
+  // Builds the plan; yields emptyRulePlan when TableRegisters.existRunTempTable finds no source table for timeInfo.key
+  def trans(): RulePlan = {
+    val details = getDetails(param)
+    val sourceName = details.getString(_source, dataSourceNames.head)
+    val targetName = details.getString(_target, dataSourceNames.tail.head)
+    val analyzer = DistinctnessAnalyzer(expr.asInstanceOf[DistinctnessClause], 
sourceName)
+  // NOTE(review): the .tail.head default above throws when only one data source name is given unless getString takes its default by name; the cast throws if expr is not a DistinctnessClause - confirm
+    val mode = SimpleMode
+
+    val ct = timeInfo.calcTime
+    // exports below are stamped with the begin of the source's time range; a TimeRange built from calcTime is used when dsTimeRanges has no entry for the source
+    val sourceTimeRange = dsTimeRanges.get(sourceName).getOrElse(TimeRange(ct))
+    val beginTime = sourceTimeRange.begin
+
+    if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
+      println(s"[${ct}] data source ${sourceName} not exists")
+      emptyRulePlan
+    } else {
+      val withOlderTable = { // compare against older data only if "with.accumulate" is on (default true) and the target table exists
+        details.getBoolean(_withAccumulate, true) &&
+          TableRegisters.existRunTempTable(timeInfo.key, targetName)
+      }
+      // selection exprs with their aliases; the aliases are the grouping and join columns of the steps below
+      val selClause = analyzer.selectionPairs.map { pair =>
+        val (expr, alias) = pair
+        s"${expr.desc} AS `${alias}`"
+      }.mkString(", ")
+      val aliases = analyzer.selectionPairs.map(_._2)
+      val aliasesClause = aliases.map( a => s"`${a}`" ).mkString(", ")
+
+      // 1. source alias: project the selection exprs of the source under their aliases
+      val sourceAliasTableName = "__sourceAlias"
+      val sourceAliasSql = {
+        s"SELECT ${selClause} FROM `${sourceName}`"
+      }
+      val sourceAliasStep = SparkSqlStep(sourceAliasTableName, sourceAliasSql, 
emptyMap, true)
+
+      // 2. total metric: row count of the aliased source, exported with the entries collect type
+      val totalTableName = "__totalMetric"
+      val totalColName = details.getStringOrKey(_total)
+      val totalSql = {
+        s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}`"
+      }
+      val totalStep = SparkSqlStep(totalTableName, totalSql, emptyMap)
+      val totalMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
+      val totalMetricExport = genMetricExport(totalMetricParam, totalColName, 
totalTableName, beginTime, mode)
+
+      // 3. group by self: one row per alias combination with dup = occurrences - 1, every row flagged distinct
+      val selfGroupTableName = "__selfGroup"
+      val dupColName = details.getStringOrKey(_dup)
+      val accuDupColName = details.getStringOrKey(_accu_dup)
+      val selfGroupSql = {
+        s"""
+           |SELECT ${aliasesClause}, (COUNT(*) - 1) AS `${dupColName}`,
+           |TRUE AS `${InternalColumns.distinct}`
+           |FROM `${sourceAliasTableName}` GROUP BY ${aliasesClause}
+          """.stripMargin
+      }
+      val selfGroupStep = SparkSqlStep(selfGroupTableName, selfGroupSql, 
emptyMap, true)
+
+      val selfDistRulePlan = RulePlan(
+        sourceAliasStep :: totalStep :: selfGroupStep :: Nil,
+        totalMetricExport :: Nil
+      )
+      // streaming with an older table refines the duplicate counts against older data (steps 4-7); otherwise the self-group table is used as the duplicate-count table
+      val (distRulePlan, dupCountTableName) = procType match {
+        case StreamingProcessType if (withOlderTable) => {
+          // 4. older alias: the same projection over the target rows whose tmst is before beginTime
+          val olderAliasTableName = "__older"
+          val olderAliasSql = {
+            s"SELECT ${selClause} FROM `${targetName}` WHERE 
`${InternalColumns.tmst}` < ${beginTime}"
+          }
+          val olderAliasStep = SparkSqlStep(olderAliasTableName, 
olderAliasSql, emptyMap)
+          // NOTE(review): the ON clause coalesces NULL to '' so NULL keys match, yet a matched older row whose aliases are all NULL still yields distinct = true - confirm this is intended
+          // 5. join with older data: keep every self-group row (right join); distinct is true only when all older alias columns are NULL
+          val joinedTableName = "__joined"
+          val selfSelClause = (aliases :+ dupColName).map { alias =>
+            s"`${selfGroupTableName}`.`${alias}`"
+          }.mkString(", ")
+          val onClause = aliases.map { alias =>
+            s"coalesce(`${selfGroupTableName}`.`${alias}`, '') = 
coalesce(`${olderAliasTableName}`.`${alias}`, '')"
+          }.mkString(" AND ")
+          val olderIsNull = aliases.map { alias =>
+            s"`${olderAliasTableName}`.`${alias}` IS NULL"
+          }.mkString(" AND ")
+          val joinedSql = {
+            s"""
+               |SELECT ${selfSelClause}, (${olderIsNull}) AS 
`${InternalColumns.distinct}`
+               |FROM `${olderAliasTableName}` RIGHT JOIN 
`${selfGroupTableName}`
+               |ON ${onClause}
+            """.stripMargin
+          }
+          val joinedStep = SparkSqlStep(joinedTableName, joinedSql, emptyMap)
+
+          // 6. group by joined data: count the joined rows per (aliases, dup, distinct) as _more_dup
+          val groupTableName = "__group"
+          val moreDupColName = "_more_dup"
+          val groupSql = {
+            s"""
+               |SELECT ${aliasesClause}, `${dupColName}`, 
`${InternalColumns.distinct}`,
+               |COUNT(*) AS `${moreDupColName}`
+               |FROM `${joinedTableName}`
+               |GROUP BY ${aliasesClause}, `${dupColName}`, 
`${InternalColumns.distinct}`
+             """.stripMargin
+          }
+          val groupStep = SparkSqlStep(groupTableName, groupSql, emptyMap)
+
+          // 7. final duplicate count: distinct rows keep dup for both columns; otherwise dup becomes dup + 1 and accu_dup becomes dup + _more_dup
+          val finalDupCountTableName = "__finalDupCount"
+          val finalDupCountSql = {
+            s"""
+               |SELECT ${aliasesClause}, `${InternalColumns.distinct}`,
+               |CASE WHEN `${InternalColumns.distinct}` THEN `${dupColName}`
+               |ELSE (`${dupColName}` + 1) END AS `${dupColName}`,
+               |CASE WHEN `${InternalColumns.distinct}` THEN `${dupColName}`
+               |ELSE (`${dupColName}` + `${moreDupColName}`) END AS 
`${accuDupColName}`
+               |FROM `${groupTableName}`
+             """.stripMargin
+          }
+          val finalDupCountStep = SparkSqlStep(finalDupCountTableName, 
finalDupCountSql, emptyMap, true)
+
+          val rulePlan = RulePlan(olderAliasStep :: joinedStep :: groupStep :: 
finalDupCountStep :: Nil, Nil)
+          (rulePlan, finalDupCountTableName)
+        }
+        case _ => {
+          (emptyRulePlan, selfGroupTableName)
+        }
+      }
+
+      // 8. distinct metric: number of rows flagged distinct in the duplicate-count table, exported with the entries collect type
+      val distTableName = "__distMetric"
+      val distColName = details.getStringOrKey(_distinct)
+      val distSql = {
+        s"""
+           |SELECT COUNT(*) AS `${distColName}`
+           |FROM `${dupCountTableName}` WHERE `${InternalColumns.distinct}`
+         """.stripMargin
+      }
+      val distStep = SparkSqlStep(distTableName, distSql, emptyMap)
+      val distMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
+      val distMetricExport = genMetricExport(distMetricParam, distColName, 
distTableName, beginTime, mode)
+
+      val distMetricRulePlan = RulePlan(distStep :: Nil, distMetricExport :: 
Nil)
+      // steps 9-10 are produced only when "duplication.array" is set to a non-empty name
+      val duplicationArrayName = details.getString(_duplicationArray, "")
+      val dupRulePlan = if (duplicationArrayName.nonEmpty) {
+        // 9. duplicate record: rows of the duplicate-count table with dup > 0 (accu_dup is selected too when streaming with an older table)
+        val dupRecordTableName = "__dupRecords"
+        val dupRecordSelClause = procType match {
+          case StreamingProcessType if (withOlderTable) => s"${aliasesClause}, 
`${dupColName}`, `${accuDupColName}`"
+          case _ => s"${aliasesClause}, `${dupColName}`"
+        }
+        val dupRecordSql = {
+          s"""
+             |SELECT ${dupRecordSelClause}
+             |FROM `${dupCountTableName}` WHERE `${dupColName}` > 0
+           """.stripMargin
+        }
+        val dupRecordStep = SparkSqlStep(dupRecordTableName, dupRecordSql, 
emptyMap, true)
+        val dupRecordParam = 
RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
+        val dupRecordExport = genRecordExport(dupRecordParam, 
dupRecordTableName, dupRecordTableName, beginTime, mode)
+
+        // 10. duplicate metric: number of duplicate records per dup value, exported under the duplication array name with the array collect type
+        val dupMetricTableName = "__dupMetric"
+        val numColName = details.getStringOrKey(_num)
+        val dupMetricSql = {
+          s"""
+             |SELECT `${dupColName}`, COUNT(*) AS `${numColName}`
+             |FROM `${dupRecordTableName}` GROUP BY `${dupColName}`
+         """.stripMargin
+        }
+        val dupMetricStep = SparkSqlStep(dupMetricTableName, dupMetricSql, 
emptyMap)
+        val dupMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
+        val dupMetricExport = genMetricExport(dupMetricParam, 
duplicationArrayName, dupMetricTableName, beginTime, mode)
+
+        RulePlan(dupRecordStep :: dupMetricStep :: Nil, dupRecordExport :: 
dupMetricExport :: Nil)
+      } else emptyRulePlan
+      // final plan: self-distinct steps, then the optional older-data steps, the distinct metric and the optional duplicate outputs
+      
selfDistRulePlan.merge(distRulePlan).merge(distMetricRulePlan).merge(dupRulePlan)
+
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DsUpdateFactory.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DsUpdateFactory.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DsUpdateFactory.scala
new file mode 100644
index 0000000..772163e
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DsUpdateFactory.scala
@@ -0,0 +1,37 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.trans
+
+import org.apache.griffin.measure.rule.plan._
+import org.apache.griffin.measure.utils.ParamUtil._
+
+object DsUpdateFactory {
+  // Builds a DsUpdate for stepName; its data source name comes from UpdateParamKeys.getName(param, defDsName)
+  def genDsUpdate(param: Map[String, Any], defDsName: String,
+                  stepName: String): DsUpdate = {
+    DsUpdate(UpdateParamKeys.getName(param, defDsName), stepName)
+  }
+
+}
+
+object UpdateParamKeys {
+  val _name = "name" // key under which getName looks for the name in an update param map
+  // Looks up the "name" key in param via getString, passing defName as the default
+  def getName(param: Map[String, Any], defName: String): String = 
param.getString(_name, defName)
+}
\ No newline at end of file

Reply via email to