http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala
deleted file mode 100644
index 3ed6839..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala
+++ /dev/null
@@ -1,348 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.trans
-
-import org.apache.griffin.measure.process.temp.{TableRegisters, TimeRange}
-import org.apache.griffin.measure.process._
-import org.apache.griffin.measure.rule.adaptor.RuleParamKeys._
-import org.apache.griffin.measure.rule.adaptor._
-import org.apache.griffin.measure.rule.dsl.{ArrayCollectType, 
EntriesCollectType}
-import org.apache.griffin.measure.rule.dsl.analyzer.DistinctnessAnalyzer
-import org.apache.griffin.measure.rule.dsl.expr._
-import org.apache.griffin.measure.rule.plan._
-import org.apache.griffin.measure.rule.trans.RuleExportFactory._
-import org.apache.griffin.measure.rule.trans.DsUpdateFactory._
-import org.apache.griffin.measure.utils.ParamUtil._
-
-import scala.util.Try
-
-case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String],
-                                     timeInfo: TimeInfo, name: String, expr: 
Expr,
-                                     param: Map[String, Any], procType: 
ProcessType,
-                                     dsTimeRanges: Map[String, TimeRange]
-                                    ) extends RulePlanTrans {
-
-  private object DistinctnessKeys {
-    val _source = "source"
-    val _target = "target"
-    val _distinct = "distinct"
-    val _total = "total"
-    val _dup = "dup"
-    val _accu_dup = "accu_dup"
-    val _num = "num"
-
-    val _duplicationArray = "duplication.array"
-    val _withAccumulate = "with.accumulate"
-
-    val _recordEnable = "record.enable"
-  }
-  import DistinctnessKeys._
-
-  def trans(): Try[RulePlan] = Try {
-    val details = getDetails(param)
-    val sourceName = details.getString(_source, dataSourceNames.head)
-    val targetName = details.getString(_target, dataSourceNames.tail.head)
-    val analyzer = DistinctnessAnalyzer(expr.asInstanceOf[DistinctnessClause], 
sourceName)
-
-    val mode = SimpleMode
-
-    val ct = timeInfo.calcTime
-
-    val beginTmst = dsTimeRanges.get(sourceName).map(_.begin) match {
-      case Some(t) => t
-      case _ => throw new Exception(s"empty begin tmst from ${sourceName}")
-    }
-    val endTmst = dsTimeRanges.get(sourceName).map(_.end) match {
-      case Some(t) => t
-      case _ => throw new Exception(s"empty end tmst from ${sourceName}")
-    }
-
-    if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
-      println(s"[${ct}] data source ${sourceName} not exists")
-      emptyRulePlan
-    } else {
-      val withOlderTable = {
-        details.getBoolean(_withAccumulate, true) &&
-          TableRegisters.existRunTempTable(timeInfo.key, targetName)
-      }
-
-      val selClause = analyzer.selectionPairs.map { pair =>
-        val (expr, alias, _) = pair
-        s"${expr.desc} AS `${alias}`"
-      }.mkString(", ")
-      val distAliases = analyzer.selectionPairs.filter(_._3).map(_._2)
-      val distAliasesClause = distAliases.map( a => s"`${a}`" ).mkString(", ")
-      val allAliases = analyzer.selectionPairs.map(_._2)
-      val allAliasesClause = allAliases.map( a => s"`${a}`" ).mkString(", ")
-      val groupAliases = analyzer.selectionPairs.filter(!_._3).map(_._2)
-      val groupAliasesClause = groupAliases.map( a => s"`${a}`" ).mkString(", 
")
-
-      // 1. source alias
-      val sourceAliasTableName = "__sourceAlias"
-      val sourceAliasSql = {
-        s"SELECT ${selClause} FROM `${sourceName}`"
-      }
-      val sourceAliasStep = SparkSqlStep(sourceAliasTableName, sourceAliasSql, 
emptyMap, true)
-
-      // 2. total metric
-      val totalTableName = "__totalMetric"
-      val totalColName = details.getStringOrKey(_total)
-      val totalSql = {
-        s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}`"
-      }
-      val totalStep = SparkSqlStep(totalTableName, totalSql, emptyMap)
-      val totalMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
-      val totalMetricExport = genMetricExport(totalMetricParam, totalColName, 
totalTableName, endTmst, mode)
-
-      // 3. group by self
-      val selfGroupTableName = "__selfGroup"
-      val dupColName = details.getStringOrKey(_dup)
-      val accuDupColName = details.getStringOrKey(_accu_dup)
-      val selfGroupSql = {
-        s"""
-           |SELECT ${distAliasesClause}, (COUNT(*) - 1) AS `${dupColName}`,
-           |TRUE AS `${InternalColumns.distinct}`
-           |FROM `${sourceAliasTableName}` GROUP BY ${distAliasesClause}
-          """.stripMargin
-      }
-      val selfGroupStep = SparkSqlStep(selfGroupTableName, selfGroupSql, 
emptyMap, true)
-
-      val selfDistRulePlan = RulePlan(
-        sourceAliasStep :: totalStep :: selfGroupStep :: Nil,
-        totalMetricExport :: Nil
-      )
-
-      val (distRulePlan, dupCountTableName) = procType match {
-        case StreamingProcessType if (withOlderTable) => {
-          // 4.0 update old data
-//          val updateOldTableName = "__updateOld"
-//          val updateOldSql = {
-//            s"SELECT * FROM `${targetName}`"
-//          }
-          val updateParam = emptyMap
-          val targetDsUpdate = genDsUpdate(updateParam, targetName, targetName)
-
-          // 4. older alias
-          val olderAliasTableName = "__older"
-          val olderAliasSql = {
-            s"SELECT ${selClause} FROM `${targetName}` WHERE 
`${InternalColumns.tmst}` <= ${beginTmst}"
-          }
-          val olderAliasStep = SparkSqlStep(olderAliasTableName, 
olderAliasSql, emptyMap)
-
-          // 5. join with older data
-          val joinedTableName = "__joined"
-          val selfSelClause = (distAliases :+ dupColName).map { alias =>
-            s"`${selfGroupTableName}`.`${alias}`"
-          }.mkString(", ")
-          val onClause = distAliases.map { alias =>
-            s"coalesce(`${selfGroupTableName}`.`${alias}`, '') = 
coalesce(`${olderAliasTableName}`.`${alias}`, '')"
-          }.mkString(" AND ")
-          val olderIsNull = distAliases.map { alias =>
-            s"`${olderAliasTableName}`.`${alias}` IS NULL"
-          }.mkString(" AND ")
-          val joinedSql = {
-            s"""
-               |SELECT ${selfSelClause}, (${olderIsNull}) AS 
`${InternalColumns.distinct}`
-               |FROM `${olderAliasTableName}` RIGHT JOIN 
`${selfGroupTableName}`
-               |ON ${onClause}
-            """.stripMargin
-          }
-          val joinedStep = SparkSqlStep(joinedTableName, joinedSql, emptyMap)
-
-          // 6. group by joined data
-          val groupTableName = "__group"
-          val moreDupColName = "_more_dup"
-          val groupSql = {
-            s"""
-               |SELECT ${distAliasesClause}, `${dupColName}`, 
`${InternalColumns.distinct}`,
-               |COUNT(*) AS `${moreDupColName}`
-               |FROM `${joinedTableName}`
-               |GROUP BY ${distAliasesClause}, `${dupColName}`, 
`${InternalColumns.distinct}`
-             """.stripMargin
-          }
-          val groupStep = SparkSqlStep(groupTableName, groupSql, emptyMap)
-
-          // 7. final duplicate count
-          val finalDupCountTableName = "__finalDupCount"
-          // dupColName:      the duplicate count of duplicated items only occurs in new data,
-          //                  which means the distinct one in new data is also duplicate
-          // accuDupColName:  the count of duplicated items accumulated in new data and old data,
-          //                  which means the accumulated distinct count in all data
-          // e.g.:  new data [A, A, B, B, C, D], old data [A, A, B, C]
-          //        selfGroupTable will be (A, 1, F), (B, 1, F), (C, 0, T), (D, 0, T)
-          //        joinedTable will be (A, 1, F), (A, 1, F), (B, 1, F), (C, 0, F), (D, 0, T)
-          //        groupTable will be (A, 1, F, 2), (B, 1, F, 1), (C, 0, F, 1), (D, 0, T, 1)
-          //        finalDupCountTable will be (A, F, 2, 3), (B, F, 2, 2), (C, F, 1, 1), (D, T, 0, 0)
-          //        The distinct result of new data only should be: (A, 2), (B, 2), (C, 1), (D, 0),
-          //        which means in new data [A, A, B, B, C, D], [A, A, B, B, C] are all duplicated, only [D] is distinct
-          val finalDupCountSql = {
-            s"""
-               |SELECT ${distAliasesClause}, `${InternalColumns.distinct}`,
-               |CASE WHEN `${InternalColumns.distinct}` THEN `${dupColName}`
-               |ELSE (`${dupColName}` + 1) END AS `${dupColName}`,
-               |CASE WHEN `${InternalColumns.distinct}` THEN `${dupColName}`
-               |ELSE (`${dupColName}` + `${moreDupColName}`) END AS 
`${accuDupColName}`
-               |FROM `${groupTableName}`
-             """.stripMargin
-          }
-          val finalDupCountStep = SparkSqlStep(finalDupCountTableName, 
finalDupCountSql, emptyMap, true)
-
-          val rulePlan = RulePlan(
-            olderAliasStep :: joinedStep :: groupStep :: finalDupCountStep :: 
Nil,
-            Nil,
-            targetDsUpdate :: Nil
-          )
-          (rulePlan, finalDupCountTableName)
-        }
-        case _ => {
-          (emptyRulePlan, selfGroupTableName)
-        }
-      }
-
-      // 8. distinct metric
-      val distTableName = "__distMetric"
-      val distColName = details.getStringOrKey(_distinct)
-      val distSql = {
-        s"""
-           |SELECT COUNT(*) AS `${distColName}`
-           |FROM `${dupCountTableName}` WHERE `${InternalColumns.distinct}`
-         """.stripMargin
-      }
-      val distStep = SparkSqlStep(distTableName, distSql, emptyMap)
-      val distMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
-      val distMetricExport = genMetricExport(distMetricParam, distColName, 
distTableName, endTmst, mode)
-
-      val distMetricRulePlan = RulePlan(distStep :: Nil, distMetricExport :: 
Nil)
-
-      val duplicationArrayName = details.getString(_duplicationArray, "")
-      val dupRulePlan = if (duplicationArrayName.nonEmpty) {
-        val recordEnable = details.getBoolean(_recordEnable, false)
-        if (groupAliases.nonEmpty) {
-          // with some group by requirement
-          // 9. origin data join with distinct information
-          val informedTableName = "__informed"
-          val onClause = distAliases.map { alias =>
-            s"coalesce(`${sourceAliasTableName}`.`${alias}`, '') = 
coalesce(`${dupCountTableName}`.`${alias}`, '')"
-          }.mkString(" AND ")
-          val informedSql = {
-            s"""
-               |SELECT `${sourceAliasTableName}`.*,
-               |`${dupCountTableName}`.`${dupColName}` AS `${dupColName}`,
-               |`${dupCountTableName}`.`${InternalColumns.distinct}` AS 
`${InternalColumns.distinct}`
-               |FROM `${sourceAliasTableName}` LEFT JOIN `${dupCountTableName}`
-               |ON ${onClause}
-               """.stripMargin
-          }
-          val informedStep = SparkSqlStep(informedTableName, informedSql, 
emptyMap)
-
-          // 10. add row number
-          val rnTableName = "__rowNumber"
-          val rnDistClause = distAliasesClause
-          val rnSortClause = s"SORT BY `${InternalColumns.distinct}`"
-          val rnSql = {
-            s"""
-               |SELECT *,
-               |ROW_NUMBER() OVER (DISTRIBUTE BY ${rnDistClause} 
${rnSortClause}) `${InternalColumns.rowNumber}`
-               |FROM `${informedTableName}`
-               """.stripMargin
-          }
-          val rnStep = SparkSqlStep(rnTableName, rnSql, emptyMap)
-
-          // 11. recognize duplicate items
-          val dupItemsTableName = "__dupItems"
-          val dupItemsSql = {
-            s"""
-               |SELECT ${allAliasesClause}, `${dupColName}` FROM 
`${rnTableName}`
-               |WHERE NOT `${InternalColumns.distinct}` OR 
`${InternalColumns.rowNumber}` > 1
-               """.stripMargin
-          }
-          val dupItemsStep = SparkSqlStep(dupItemsTableName, dupItemsSql, 
emptyMap)
-          val dupItemsParam = 
RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
-          val dupItemsExport = genRecordExport(dupItemsParam, 
dupItemsTableName, dupItemsTableName, endTmst, mode)
-
-          // 12. group by dup Record metric
-          val groupDupMetricTableName = "__groupDupMetric"
-          val numColName = details.getStringOrKey(_num)
-          val groupSelClause = groupAliasesClause
-          val groupDupMetricSql = {
-            s"""
-               |SELECT ${groupSelClause}, `${dupColName}`, COUNT(*) AS 
`${numColName}`
-               |FROM `${dupItemsTableName}` GROUP BY ${groupSelClause}, 
`${dupColName}`
-             """.stripMargin
-          }
-          val groupDupMetricStep = SparkSqlStep(groupDupMetricTableName, 
groupDupMetricSql, emptyMap)
-          val groupDupMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
-          val groupDupMetricExport = genMetricExport(groupDupMetricParam, 
duplicationArrayName, groupDupMetricTableName, endTmst, mode)
-
-          val exports = if (recordEnable) {
-            dupItemsExport :: groupDupMetricExport :: Nil
-          } else {
-            groupDupMetricExport :: Nil
-          }
-          RulePlan(
-            informedStep :: rnStep :: dupItemsStep :: groupDupMetricStep :: 
Nil,
-            exports
-          )
-
-        } else {
-          // no group by requirement
-          // 9. duplicate record
-          val dupRecordTableName = "__dupRecords"
-          val dupRecordSelClause = procType match {
-            case StreamingProcessType if (withOlderTable) => 
s"${distAliasesClause}, `${dupColName}`, `${accuDupColName}`"
-            case _ => s"${distAliasesClause}, `${dupColName}`"
-          }
-          val dupRecordSql = {
-            s"""
-               |SELECT ${dupRecordSelClause}
-               |FROM `${dupCountTableName}` WHERE `${dupColName}` > 0
-              """.stripMargin
-          }
-          val dupRecordStep = SparkSqlStep(dupRecordTableName, dupRecordSql, 
emptyMap, true)
-          val dupRecordParam = 
RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
-          val dupRecordExport = genRecordExport(dupRecordParam, 
dupRecordTableName, dupRecordTableName, endTmst, mode)
-
-          // 10. duplicate metric
-          val dupMetricTableName = "__dupMetric"
-          val numColName = details.getStringOrKey(_num)
-          val dupMetricSql = {
-            s"""
-               |SELECT `${dupColName}`, COUNT(*) AS `${numColName}`
-               |FROM `${dupRecordTableName}` GROUP BY `${dupColName}`
-              """.stripMargin
-          }
-          val dupMetricStep = SparkSqlStep(dupMetricTableName, dupMetricSql, 
emptyMap)
-          val dupMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
-          val dupMetricExport = genMetricExport(dupMetricParam, 
duplicationArrayName, dupMetricTableName, endTmst, mode)
-
-          val exports = if (recordEnable) {
-            dupRecordExport :: dupMetricExport :: Nil
-          } else {
-            dupMetricExport :: Nil
-          }
-          RulePlan(dupRecordStep :: dupMetricStep :: Nil, exports)
-        }
-      } else emptyRulePlan
-
-      
selfDistRulePlan.merge(distRulePlan).merge(distMetricRulePlan).merge(dupRulePlan)
-
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DsUpdateFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DsUpdateFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DsUpdateFactory.scala
deleted file mode 100644
index 772163e..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DsUpdateFactory.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.trans
-
-import org.apache.griffin.measure.rule.plan._
-import org.apache.griffin.measure.utils.ParamUtil._
-
-object DsUpdateFactory {
-
-  def genDsUpdate(param: Map[String, Any], defDsName: String,
-                  stepName: String): DsUpdate = {
-    DsUpdate(UpdateParamKeys.getName(param, defDsName), stepName)
-  }
-
-}
-
-object UpdateParamKeys {
-  val _name = "name"
-
-  def getName(param: Map[String, Any], defName: String): String = 
param.getString(_name, defName)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/trans/ProfilingRulePlanTrans.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/ProfilingRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/ProfilingRulePlanTrans.scala
deleted file mode 100644
index f80f3c1..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/ProfilingRulePlanTrans.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.trans
-
-import org.apache.griffin.measure.process.temp.TableRegisters
-import org.apache.griffin.measure.process.{BatchProcessType, ExportMode, 
ProcessType, StreamingProcessType}
-import org.apache.griffin.measure.rule.adaptor.RuleParamKeys._
-import org.apache.griffin.measure.rule.adaptor._
-import org.apache.griffin.measure.rule.dsl.analyzer.ProfilingAnalyzer
-import org.apache.griffin.measure.rule.dsl.expr._
-import org.apache.griffin.measure.rule.plan._
-import org.apache.griffin.measure.rule.trans.RuleExportFactory._
-import org.apache.griffin.measure.utils.ParamUtil._
-
-import scala.util.Try
-
-case class ProfilingRulePlanTrans(dataSourceNames: Seq[String],
-                                  timeInfo: TimeInfo, name: String, expr: Expr,
-                                  param: Map[String, Any], procType: 
ProcessType
-                                 ) extends RulePlanTrans {
-
-  private object ProfilingKeys {
-    val _source = "source"
-  }
-  import ProfilingKeys._
-
-  def trans(): Try[RulePlan] = Try {
-    val details = getDetails(param)
-    val profilingClause = expr.asInstanceOf[ProfilingClause]
-    val sourceName = profilingClause.fromClauseOpt match {
-      case Some(fc) => fc.dataSource
-      case _ => details.getString(_source, dataSourceNames.head)
-    }
-    val fromClause = 
profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc
-
-    val mode = ExportMode.defaultMode(procType)
-
-    val ct = timeInfo.calcTime
-
-    if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
-      emptyRulePlan
-    } else {
-      val analyzer = ProfilingAnalyzer(profilingClause, sourceName)
-      val selExprDescs = analyzer.selectionExprs.map { sel =>
-        val alias = sel match {
-          case s: AliasableExpr if (s.alias.nonEmpty) => s" AS 
`${s.alias.get}`"
-          case _ => ""
-        }
-        s"${sel.desc}${alias}"
-      }
-      val selCondition = 
profilingClause.selectClause.extraConditionOpt.map(_.desc).mkString
-      val selClause = procType match {
-        case BatchProcessType => selExprDescs.mkString(", ")
-        case StreamingProcessType => (s"`${InternalColumns.tmst}`" +: 
selExprDescs).mkString(", ")
-      }
-      val groupByClauseOpt = analyzer.groupbyExprOpt
-      val groupbyClause = procType match {
-        case BatchProcessType => groupByClauseOpt.map(_.desc).getOrElse("")
-        case StreamingProcessType => {
-          val tmstGroupbyClause = 
GroupbyClause(LiteralStringExpr(s"`${InternalColumns.tmst}`") :: Nil, None)
-          val mergedGroubbyClause = tmstGroupbyClause.merge(groupByClauseOpt 
match {
-            case Some(gbc) => gbc
-            case _ => GroupbyClause(Nil, None)
-          })
-          mergedGroubbyClause.desc
-        }
-      }
-      val preGroupbyClause = analyzer.preGroupbyExprs.map(_.desc).mkString(" ")
-      val postGroupbyClause = analyzer.postGroupbyExprs.map(_.desc).mkString(" 
")
-
-      // 1. select statement
-      val profilingSql = {
-        s"SELECT ${selCondition} ${selClause} ${fromClause} 
${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}"
-      }
-      val profilingName = name
-      val profilingStep = SparkSqlStep(profilingName, profilingSql, details)
-      val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
-      val profilingExports = genMetricExport(metricParam, name, profilingName, 
ct, mode) :: Nil
-
-      RulePlan(profilingStep :: Nil, profilingExports)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RuleExportFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RuleExportFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RuleExportFactory.scala
deleted file mode 100644
index 915e654..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RuleExportFactory.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.trans
-
-import org.apache.griffin.measure.process.ExportMode
-import org.apache.griffin.measure.rule.dsl.CollectType
-import org.apache.griffin.measure.rule.plan._
-
-import org.apache.griffin.measure.utils.ParamUtil._
-
-object RuleExportFactory {
-
-  def genMetricExport(param: Map[String, Any], name: String, stepName: String,
-                      defTimestamp: Long, mode: ExportMode
-                     ): MetricExport = {
-    MetricExport(
-      ExportParamKeys.getName(param, name),
-      stepName,
-      ExportParamKeys.getCollectType(param),
-      defTimestamp,
-      mode
-    )
-  }
-  def genRecordExport(param: Map[String, Any], name: String, stepName: String,
-                      defTimestamp: Long, mode: ExportMode
-                     ): RecordExport = {
-    RecordExport(
-      ExportParamKeys.getName(param, name),
-      stepName,
-      ExportParamKeys.getDataSourceCacheOpt(param),
-      ExportParamKeys.getOriginDFOpt(param),
-      defTimestamp,
-      mode
-    )
-  }
-
-}
-
-object ExportParamKeys {
-  val _name = "name"
-  val _collectType = "collect.type"
-  val _dataSourceCache = "data.source.cache"
-  val _originDF = "origin.DF"
-
-  def getName(param: Map[String, Any], defName: String): String = 
param.getString(_name, defName)
-  def getCollectType(param: Map[String, Any]): CollectType = 
CollectType(param.getString(_collectType, ""))
-  def getDataSourceCacheOpt(param: Map[String, Any]): Option[String] = 
param.get(_dataSourceCache).map(_.toString)
-  def getOriginDFOpt(param: Map[String, Any]): Option[String] = 
param.get(_originDF).map(_.toString)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala
deleted file mode 100644
index ba9565f..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.trans
-
-import org.apache.griffin.measure.log.Loggable
-import org.apache.griffin.measure.process.ProcessType
-import org.apache.griffin.measure.process.temp.TimeRange
-import org.apache.griffin.measure.rule.dsl._
-import org.apache.griffin.measure.rule.dsl.expr.Expr
-import org.apache.griffin.measure.rule.plan._
-
-import scala.util.Try
-
-trait RulePlanTrans extends Loggable with Serializable {
-
-  protected val emptyRulePlan = RulePlan(Nil, Nil)
-  protected val emptyMap = Map[String, Any]()
-
-  def trans(): Try[RulePlan]
-
-}
-
-object RulePlanTrans {
-  private val emptyRulePlanTrans = new RulePlanTrans {
-    def trans(): Try[RulePlan] = Try(emptyRulePlan)
-  }
-
-  def apply(dqType: DqType,
-            dsNames: Seq[String],
-            ti: TimeInfo, name: String, expr: Expr,
-            param: Map[String, Any], procType: ProcessType,
-            dsTimeRanges: Map[String, TimeRange]
-           ): RulePlanTrans = {
-    dqType match {
-      case AccuracyType => AccuracyRulePlanTrans(dsNames, ti, name, expr, 
param, procType)
-      case ProfilingType => ProfilingRulePlanTrans(dsNames, ti, name, expr, 
param, procType)
-      case UniquenessType => UniquenessRulePlanTrans(dsNames, ti, name, expr, 
param, procType)
-      case DistinctnessType => DistinctnessRulePlanTrans(dsNames, ti, name, 
expr, param, procType, dsTimeRanges)
-      case TimelinessType => TimelinessRulePlanTrans(dsNames, ti, name, expr, 
param, procType, dsTimeRanges)
-      case CompletenessType => CompletenessRulePlanTrans(dsNames, ti, name, 
expr, param, procType)
-      case _ => emptyRulePlanTrans
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala
deleted file mode 100644
index deee65b..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.trans
-
-import org.apache.griffin.measure.process.temp.{TableRegisters, TimeRange}
-import org.apache.griffin.measure.process.{BatchProcessType, ExportMode, 
ProcessType, StreamingProcessType}
-import org.apache.griffin.measure.rule.adaptor.RuleParamKeys._
-import org.apache.griffin.measure.rule.adaptor._
-import org.apache.griffin.measure.rule.dsl.ArrayCollectType
-import org.apache.griffin.measure.rule.dsl.analyzer.TimelinessAnalyzer
-import org.apache.griffin.measure.rule.dsl.expr._
-import org.apache.griffin.measure.rule.plan._
-import org.apache.griffin.measure.rule.trans.RuleExportFactory._
-import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.griffin.measure.utils.TimeUtil
-
-import scala.util.Try
-
-case class TimelinessRulePlanTrans(dataSourceNames: Seq[String],
-                                   timeInfo: TimeInfo, name: String, expr: 
Expr,
-                                   param: Map[String, Any], procType: 
ProcessType,
-                                   dsTimeRanges: Map[String, TimeRange]
-                                  ) extends RulePlanTrans {
-
-  private object TimelinessKeys {
-    val _source = "source"
-    val _latency = "latency"
-    val _total = "total"
-    val _avg = "avg"
-    val _threshold = "threshold"
-    val _step = "step"
-    val _count = "count"
-    val _stepSize = "step.size"
-    val _percentileColPrefix = "percentile"
-    val _percentileValues = "percentile.values"
-  }
-  import TimelinessKeys._
-
-  def trans(): Try[RulePlan] =  Try {
-    val details = getDetails(param)
-    val timelinessClause = expr.asInstanceOf[TimelinessClause]
-    val sourceName = details.getString(_source, dataSourceNames.head)
-
-    val mode = ExportMode.defaultMode(procType)
-
-//    val ct = timeInfo.calcTime
-
-    val minTmstOpt = dsTimeRanges.get(sourceName).flatMap(_.minTmstOpt)
-    val minTmst = minTmstOpt match {
-      case Some(t) => t
-      case _ => throw new Exception(s"empty min tmst from ${sourceName}")
-    }
-
-    if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
-      emptyRulePlan
-    } else {
-      val analyzer = TimelinessAnalyzer(timelinessClause, sourceName)
-      val btsSel = analyzer.btsExpr
-      val etsSelOpt = analyzer.etsExprOpt
-
-      // 1. in time
-      val inTimeTableName = "__inTime"
-      val inTimeSql = etsSelOpt match {
-        case Some(etsSel) => {
-          s"""
-             |SELECT *, (${btsSel}) AS `${InternalColumns.beginTs}`,
-             |(${etsSel}) AS `${InternalColumns.endTs}`
-             |FROM ${sourceName} WHERE (${btsSel}) IS NOT NULL AND (${etsSel}) IS NOT NULL
-           """.stripMargin
-        }
-        case _ => {
-          s"""
-             |SELECT *, (${btsSel}) AS `${InternalColumns.beginTs}`
-             |FROM ${sourceName} WHERE (${btsSel}) IS NOT NULL
-           """.stripMargin
-        }
-      }
-      val inTimeStep = SparkSqlStep(inTimeTableName, inTimeSql, emptyMap)
-
-      // 2. latency
-      val latencyTableName = "__lat"
-      val latencyColName = details.getStringOrKey(_latency)
-      val etsColName = etsSelOpt match {
-        case Some(_) => InternalColumns.endTs
-        case _ => InternalColumns.tmst
-      }
-      val latencySql = {
-        s"SELECT *, (`${etsColName}` - `${InternalColumns.beginTs}`) AS `${latencyColName}` FROM `${inTimeTableName}`"
-      }
-      val latencyStep = SparkSqlStep(latencyTableName, latencySql, emptyMap, 
true)
-
-      // 3. timeliness metric
-      val metricTableName = name
-      val totalColName = details.getStringOrKey(_total)
-      val avgColName = details.getStringOrKey(_avg)
-      val metricSql = procType match {
-        case BatchProcessType => {
-          s"""
-             |SELECT COUNT(*) AS `${totalColName}`,
-             |CAST(AVG(`${latencyColName}`) AS BIGINT) AS `${avgColName}`
-             |FROM `${latencyTableName}`
-           """.stripMargin
-        }
-        case StreamingProcessType => {
-          s"""
-             |SELECT `${InternalColumns.tmst}`,
-             |COUNT(*) AS `${totalColName}`,
-             |CAST(AVG(`${latencyColName}`) AS BIGINT) AS `${avgColName}`
-             |FROM `${latencyTableName}`
-             |GROUP BY `${InternalColumns.tmst}`
-           """.stripMargin
-        }
-      }
-      val metricStep = SparkSqlStep(metricTableName, metricSql, emptyMap)
-      val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
-      val metricExports = genMetricExport(metricParam, name, metricTableName, 
minTmst, mode) :: Nil
-
-      // current timeliness plan
-      val timeSteps = inTimeStep :: latencyStep :: metricStep :: Nil
-      val timeExports = metricExports
-      val timePlan = RulePlan(timeSteps, timeExports)
-
-      // 4. timeliness record
-      val recordPlan = TimeUtil.milliseconds(details.getString(_threshold, 
"")) match {
-        case Some(tsh) => {
-          val recordTableName = "__lateRecords"
-          val recordSql = {
-            s"SELECT * FROM `${latencyTableName}` WHERE `${latencyColName}` > ${tsh}"
-          }
-          val recordStep = SparkSqlStep(recordTableName, recordSql, emptyMap)
-          val recordParam = 
RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
-          val recordExports = genRecordExport(recordParam, recordTableName, 
recordTableName, minTmst, mode) :: Nil
-          RulePlan(recordStep :: Nil, recordExports)
-        }
-        case _ => emptyRulePlan
-      }
-
-      // 5. ranges
-      val rangePlan = TimeUtil.milliseconds(details.getString(_stepSize, "")) 
match {
-        case Some(stepSize) => {
-          // 5.1 range
-          val rangeTableName = "__range"
-          val stepColName = details.getStringOrKey(_step)
-          val rangeSql = {
-            s"""
-               |SELECT *, CAST((`${latencyColName}` / ${stepSize}) AS BIGINT) 
AS `${stepColName}`
-               |FROM `${latencyTableName}`
-             """.stripMargin
-          }
-          val rangeStep = SparkSqlStep(rangeTableName, rangeSql, emptyMap)
-
-          // 5.2 range metric
-          val rangeMetricTableName = "__rangeMetric"
-          val countColName = details.getStringOrKey(_count)
-          val rangeMetricSql = procType match {
-            case BatchProcessType => {
-              s"""
-                 |SELECT `${stepColName}`, COUNT(*) AS `${countColName}`
-                 |FROM `${rangeTableName}` GROUP BY `${stepColName}`
-                """.stripMargin
-            }
-            case StreamingProcessType => {
-              s"""
-                 |SELECT `${InternalColumns.tmst}`, `${stepColName}`, COUNT(*) 
AS `${countColName}`
-                 |FROM `${rangeTableName}` GROUP BY `${InternalColumns.tmst}`, 
`${stepColName}`
-                """.stripMargin
-            }
-          }
-          val rangeMetricStep = SparkSqlStep(rangeMetricTableName, 
rangeMetricSql, emptyMap)
-          val rangeMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
-          val rangeMetricExports = genMetricExport(rangeMetricParam, 
stepColName, rangeMetricTableName, minTmst, mode) :: Nil
-
-          RulePlan(rangeStep :: rangeMetricStep :: Nil, rangeMetricExports)
-        }
-        case _ => emptyRulePlan
-      }
-
-      // 6. percentiles
-      val percentiles = getPercentiles(details)
-      val percentilePlan = if (percentiles.nonEmpty) {
-        val percentileTableName = "__percentile"
-        val percentileColName = details.getStringOrKey(_percentileColPrefix)
-        val percentileCols = percentiles.map { pct =>
-          val pctName = (pct * 100).toInt.toString
-          s"floor(percentile_approx(${latencyColName}, ${pct})) AS 
`${percentileColName}_${pctName}`"
-        }.mkString(", ")
-        val percentileSql = {
-          s"""
-             |SELECT ${percentileCols}
-             |FROM `${latencyTableName}`
-            """.stripMargin
-        }
-        val percentileStep = SparkSqlStep(percentileTableName, percentileSql, 
emptyMap)
-        val percentileParam = emptyMap
-        val percentileExports = genMetricExport(percentileParam, 
percentileTableName, percentileTableName, minTmst, mode) :: Nil
-
-        RulePlan(percentileStep :: Nil, percentileExports)
-      } else emptyRulePlan
-
-      timePlan.merge(recordPlan).merge(rangePlan).merge(percentilePlan)
-    }
-  }
-
-  private def getPercentiles(details: Map[String, Any]): Seq[Double] = {
-    details.getArr[Double](_percentileValues).filter(d => (d >= 0 && d <= 1))
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/trans/UniquenessRulePlanTrans.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/UniquenessRulePlanTrans.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/UniquenessRulePlanTrans.scala
deleted file mode 100644
index baa5572..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/UniquenessRulePlanTrans.scala
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.trans
-
-import org.apache.griffin.measure.process._
-import org.apache.griffin.measure.process.temp._
-import org.apache.griffin.measure.rule.adaptor.RuleParamKeys._
-import org.apache.griffin.measure.rule.adaptor._
-import org.apache.griffin.measure.rule.dsl.analyzer.UniquenessAnalyzer
-import org.apache.griffin.measure.rule.dsl.expr._
-import org.apache.griffin.measure.rule.dsl._
-import org.apache.griffin.measure.rule.plan._
-import org.apache.griffin.measure.rule.trans.RuleExportFactory._
-import org.apache.griffin.measure.utils.ParamUtil._
-
-import scala.util.Try
-
-case class UniquenessRulePlanTrans(dataSourceNames: Seq[String],
-                                   timeInfo: TimeInfo, name: String, expr: 
Expr,
-                                   param: Map[String, Any], procType: 
ProcessType
-                                  ) extends RulePlanTrans {
-
-  private object UniquenessKeys {
-    val _source = "source"
-    val _target = "target"
-    val _unique = "unique"
-    val _total = "total"
-    val _dup = "dup"
-    val _num = "num"
-
-    val _duplicationArray = "duplication.array"
-  }
-  import UniquenessKeys._
-
-  def trans(): Try[RulePlan] = Try {
-    val details = getDetails(param)
-    val sourceName = details.getString(_source, dataSourceNames.head)
-    val targetName = details.getString(_target, dataSourceNames.tail.head)
-    val analyzer = UniquenessAnalyzer(expr.asInstanceOf[UniquenessClause], 
sourceName, targetName)
-
-    val mode = ExportMode.defaultMode(procType)
-
-    val ct = timeInfo.calcTime
-
-    if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
-      println(s"[${ct}] data source ${sourceName} not exists")
-      emptyRulePlan
-    } else if (!TableRegisters.existRunTempTable(timeInfo.key, targetName)) {
-      println(s"[${ct}] data source ${targetName} not exists")
-      emptyRulePlan
-    } else {
-      val selItemsClause = analyzer.selectionPairs.map { pair =>
-        val (expr, alias) = pair
-        s"${expr.desc} AS `${alias}`"
-      }.mkString(", ")
-      val aliases = analyzer.selectionPairs.map(_._2)
-
-      val selClause = procType match {
-        case BatchProcessType => selItemsClause
-        case StreamingProcessType => s"`${InternalColumns.tmst}`, 
${selItemsClause}"
-      }
-      val selAliases = procType match {
-        case BatchProcessType => aliases
-        case StreamingProcessType => InternalColumns.tmst +: aliases
-      }
-
-      // 1. source distinct mapping
-      val sourceTableName = "__source"
-      val sourceSql = s"SELECT DISTINCT ${selClause} FROM ${sourceName}"
-      val sourceStep = SparkSqlStep(sourceTableName, sourceSql, emptyMap)
-
-      // 2. target mapping
-      val targetTableName = "__target"
-      val targetSql = s"SELECT ${selClause} FROM ${targetName}"
-      val targetStep = SparkSqlStep(targetTableName, targetSql, emptyMap)
-
-      // 3. joined
-      val joinedTableName = "__joined"
-      val joinedSelClause = selAliases.map { alias =>
-        s"`${sourceTableName}`.`${alias}` AS `${alias}`"
-      }.mkString(", ")
-      val onClause = aliases.map { alias =>
-        s"coalesce(`${sourceTableName}`.`${alias}`, '') = coalesce(`${targetTableName}`.`${alias}`, '')"
-      }.mkString(" AND ")
-      val joinedSql = {
-        s"SELECT ${joinedSelClause} FROM `${targetTableName}` RIGHT JOIN `${sourceTableName}` ON ${onClause}"
-      }
-      val joinedStep = SparkSqlStep(joinedTableName, joinedSql, emptyMap)
-
-      // 4. group
-      val groupTableName = "__group"
-      val groupSelClause = selAliases.map { alias =>
-        s"`${alias}`"
-      }.mkString(", ")
-      val dupColName = details.getStringOrKey(_dup)
-      val groupSql = {
-        s"SELECT ${groupSelClause}, (COUNT(*) - 1) AS `${dupColName}` FROM `${joinedTableName}` GROUP BY ${groupSelClause}"
-      }
-      val groupStep = SparkSqlStep(groupTableName, groupSql, emptyMap, true)
-
-      // 5. total metric
-      val totalTableName = "__totalMetric"
-      val totalColName = details.getStringOrKey(_total)
-      val totalSql = procType match {
-        case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM 
`${sourceName}`"
-        case StreamingProcessType => {
-          s"""
-             |SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${totalColName}`
-             |FROM `${sourceName}` GROUP BY `${InternalColumns.tmst}`
-           """.stripMargin
-        }
-      }
-      val totalStep = SparkSqlStep(totalTableName, totalSql, emptyMap)
-      val totalMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
-      val totalMetricExport = genMetricExport(totalMetricParam, totalColName, 
totalTableName, ct, mode)
-
-      // 6. unique record
-      val uniqueRecordTableName = "__uniqueRecord"
-      val uniqueRecordSql = {
-        s"SELECT * FROM `${groupTableName}` WHERE `${dupColName}` = 0"
-      }
-      val uniqueRecordStep = SparkSqlStep(uniqueRecordTableName, 
uniqueRecordSql, emptyMap)
-
-      // 7. unique metric
-      val uniqueTableName = "__uniqueMetric"
-      val uniqueColName = details.getStringOrKey(_unique)
-      val uniqueSql = procType match {
-        case BatchProcessType => s"SELECT COUNT(*) AS `${uniqueColName}` FROM 
`${uniqueRecordTableName}`"
-        case StreamingProcessType => {
-          s"""
-             |SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${uniqueColName}`
-             |FROM `${uniqueRecordTableName}` GROUP BY 
`${InternalColumns.tmst}`
-           """.stripMargin
-        }
-      }
-      val uniqueStep = SparkSqlStep(uniqueTableName, uniqueSql, emptyMap)
-      val uniqueMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
-      val uniqueMetricExport = genMetricExport(uniqueMetricParam, 
uniqueColName, uniqueTableName, ct, mode)
-
-      val uniqueSteps = sourceStep :: targetStep :: joinedStep :: groupStep ::
-        totalStep :: uniqueRecordStep :: uniqueStep :: Nil
-      val uniqueExports = totalMetricExport :: uniqueMetricExport :: Nil
-      val uniqueRulePlan = RulePlan(uniqueSteps, uniqueExports)
-
-      val duplicationArrayName = details.getString(_duplicationArray, "")
-      val dupRulePlan = if (duplicationArrayName.nonEmpty) {
-        // 8. duplicate record
-        val dupRecordTableName = "__dupRecords"
-        val dupRecordSql = {
-          s"SELECT * FROM `${groupTableName}` WHERE `${dupColName}` > 0"
-        }
-        val dupRecordStep = SparkSqlStep(dupRecordTableName, dupRecordSql, 
emptyMap, true)
-        val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
-        val dupRecordExport = genRecordExport(recordParam, dupRecordTableName, 
dupRecordTableName, ct, mode)
-
-        // 9. duplicate metric
-        val dupMetricTableName = "__dupMetric"
-        val numColName = details.getStringOrKey(_num)
-        val dupMetricSelClause = procType match {
-          case BatchProcessType => s"`${dupColName}`, COUNT(*) AS 
`${numColName}`"
-          case StreamingProcessType => s"`${InternalColumns.tmst}`, 
`${dupColName}`, COUNT(*) AS `${numColName}`"
-        }
-        val dupMetricGroupbyClause = procType match {
-          case BatchProcessType => s"`${dupColName}`"
-          case StreamingProcessType => s"`${InternalColumns.tmst}`, 
`${dupColName}`"
-        }
-        val dupMetricSql = {
-          s"""
-             |SELECT ${dupMetricSelClause} FROM `${dupRecordTableName}`
-             |GROUP BY ${dupMetricGroupbyClause}
-          """.stripMargin
-        }
-        val dupMetricStep = SparkSqlStep(dupMetricTableName, dupMetricSql, 
emptyMap)
-        val dupMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
-        val dupMetricExport = genMetricExport(dupMetricParam, 
duplicationArrayName, dupMetricTableName, ct, mode)
-
-        RulePlan(dupRecordStep :: dupMetricStep :: Nil, dupRecordExport :: 
dupMetricExport :: Nil)
-      } else emptyRulePlan
-
-      uniqueRulePlan.merge(dupRulePlan)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdafs.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdafs.scala 
b/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdafs.scala
deleted file mode 100644
index cb00641..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdafs.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.udf
-
-import org.apache.spark.sql.SQLContext
-
-object GriffinUdafs {
-
-  def register(sqlContext: SQLContext): Unit = {
-//    sqlContext.udf.register("my_mean", new MeanUdaf)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdfs.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdfs.scala 
b/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdfs.scala
deleted file mode 100644
index 1d9eb8b..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdfs.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.udf
-
-import org.apache.spark.sql.SQLContext
-
-object GriffinUdfs {
-
-  def register(sqlContext: SQLContext): Unit = {
-    sqlContext.udf.register("index_of", indexOf _)
-    sqlContext.udf.register("matches", matches _)
-    sqlContext.udf.register("reg_replace", regReplace _)
-  }
-
-  private def indexOf(arr: Seq[String], v: String) = {
-    arr.indexOf(v)
-  }
-
-  private def matches(s: String, regex: String) = {
-    s.matches(regex)
-  }
-
-  private def regReplace(s: String, regex: String, replacement: String) = {
-    s.replaceAll(regex, replacement)
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/udf/MeanUdaf.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/udf/MeanUdaf.scala 
b/measure/src/main/scala/org/apache/griffin/measure/rule/udf/MeanUdaf.scala
deleted file mode 100644
index 80b3a02..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/udf/MeanUdaf.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.udf
-
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.expressions.{MutableAggregationBuffer, 
UserDefinedAggregateFunction}
-import org.apache.spark.sql.types._
-
-class MeanUdaf extends UserDefinedAggregateFunction {
-  def inputSchema: StructType = StructType(Array(StructField("item", 
LongType)))
-
-  def bufferSchema = StructType(Array(
-    StructField("sum", DoubleType),
-    StructField("cnt", LongType)
-  ))
-
-  def dataType: DataType = DoubleType
-
-  def deterministic: Boolean = true
-
-  def initialize(buffer: MutableAggregationBuffer): Unit = {
-    buffer.update(0, 0.toDouble)
-    buffer.update(1, 0L)
-  }
-
-  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
-    val sum = buffer.getDouble(0)
-    val cnt = buffer.getLong(1)
-    val value = input.getLong(0)
-    buffer.update(0, sum + value)
-    buffer.update(1, cnt + 1)
-  }
-
-  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
-    buffer1.update(0, buffer1.getDouble(0) + buffer2.getDouble(0))
-    buffer1.update(1, buffer1.getLong(1) + buffer2.getLong(1))
-  }
-
-  def evaluate(buffer: Row): Any = {
-    buffer.getDouble(0) / buffer.getLong(1).toDouble
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/DQStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/DQStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/DQStep.scala
new file mode 100644
index 0000000..8a34118
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/DQStep.scala
@@ -0,0 +1,32 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.context.DQContext
+
+trait DQStep extends Loggable {
+
+  val name: String
+
+  def execute(context: DQContext): Boolean
+
+  def getNames(): Seq[String] = name :: Nil
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/SeqDQStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/SeqDQStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/SeqDQStep.scala
new file mode 100644
index 0000000..b827604
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/SeqDQStep.scala
@@ -0,0 +1,44 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step
+
+import org.apache.griffin.measure.context.DQContext
+
+/**
+  * sequence of dq steps
+  */
+case class SeqDQStep(dqSteps: Seq[DQStep]) extends DQStep {
+
+  val name: String = ""
+  val rule: String = ""
+  val details: Map[String, Any] = Map()
+
+  def execute(context: DQContext): Boolean = {
+    dqSteps.foldLeft(true) { (ret, dqStep) =>
+      ret && dqStep.execute(context)
+    }
+  }
+
+  override def getNames(): Seq[String] = {
+    dqSteps.foldLeft(Nil: Seq[String]) { (ret, dqStep) =>
+      ret ++ dqStep.getNames
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/BatchDataSourceStepBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/BatchDataSourceStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/BatchDataSourceStepBuilder.scala
new file mode 100644
index 0000000..2d739c4
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/BatchDataSourceStepBuilder.scala
@@ -0,0 +1,31 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder
+
+import org.apache.griffin.measure.configuration.params._
+import org.apache.griffin.measure.context.DQContext
+import org.apache.griffin.measure.step.read.ReadStep
+
+case class BatchDataSourceStepBuilder() extends DataSourceParamStepBuilder {
+
+  def buildReadSteps(context: DQContext, dcParam: DataConnectorParam): Option[ReadStep] = {
+    None
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/ConstantColumns.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/ConstantColumns.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/ConstantColumns.scala
new file mode 100644
index 0000000..e8bec89
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/ConstantColumns.scala
@@ -0,0 +1,38 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder
+
+/**
+  * for griffin dsl rules, the constant columns might be used during calculation,
+  */
+object ConstantColumns {
+  val tmst = "__tmst"
+  val metric = "__metric"
+  val record = "__record"
+  val empty = "__empty"
+
+  val beginTs = "__begin_ts"
+  val endTs = "__end_ts"
+
+  val distinct = "__distinct"
+
+  val rowNumber = "__rn"
+
+  val columns = List[String](tmst, metric, record, empty, beginTs, endTs, distinct, rowNumber)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
new file mode 100644
index 0000000..f5b69ad
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
@@ -0,0 +1,81 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder
+
+import org.apache.commons.lang.StringUtils
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.params.{DataSourceParam, 
Param, RuleParam}
+import org.apache.griffin.measure.context.DQContext
+import org.apache.griffin.measure.step._
+
+/**
+  * Builds a dq step from a param.
+  */
+trait DQStepBuilder extends Loggable with Serializable {
+
+  /** the kind of param this builder consumes */
+  type ParamType <: Param
+
+  /** builds the dq step described by `param`, if any */
+  def buildDQStep(context: DQContext, param: ParamType): Option[DQStep]
+
+  /**
+    * Resolves a step name: a blank `name` is replaced by a generated one.
+    * Every call with a blank name asks the generator for a new name.
+    */
+  protected def getStepName(name: String): String = {
+    if (StringUtils.isBlank(name)) DQStepNameGenerator.genName
+    else name
+  }
+
+}
+
+object DQStepBuilder {
+
+  /**
+    * Builds the dq step of a data source param with the builder that matches the
+    * process type of the context; None if no builder matches or nothing is built.
+    */
+  def buildStepOptByDataSourceParam(context: DQContext, dsParam: DataSourceParam
+                                   ): Option[DQStep] = {
+    val builderOpt = getDataSourceParamStepBuilder(context.procType)
+    builderOpt.flatMap { builder => builder.buildDQStep(context, dsParam) }
+  }
+
+  /** picks the data source step builder for a process type, None for any other type */
+  private def getDataSourceParamStepBuilder(procType: ProcessType
+                                           ): Option[DataSourceParamStepBuilder] = {
+    PartialFunction.condOpt(procType) {
+      case BatchProcessType => BatchDataSourceStepBuilder()
+      case StreamingProcessType => StreamingDataSourceStepBuilder()
+    }
+  }
+
+  /**
+    * Builds the dq step of a rule param with the builder that matches its dsl type
+    * (falling back to `defaultDslType`), and registers every name reported by the
+    * built step in the compile table register of the context.
+    */
+  def buildStepOptByRuleParam(context: DQContext, ruleParam: RuleParam, defaultDslType: DslType
+                             ): Option[DQStep] = {
+    val builderOpt = getRuleParamStepBuilder(
+      ruleParam.getDslType(defaultDslType), context.dataSourceNames, context.functionNames)
+    val builtStepOpt = builderOpt.flatMap(_.buildDQStep(context, ruleParam))
+    for {
+      step <- builtStepOpt
+      tableName <- step.getNames
+    } context.compileTableRegister.registerTable(tableName)
+    builtStepOpt
+  }
+
+  /** picks the rule step builder for a dsl type, None for any other type */
+  private def getRuleParamStepBuilder(dslType: DslType, dsNames: Seq[String],
+                                      funcNames: Seq[String]
+                                     ): Option[RuleParamStepBuilder] = {
+    PartialFunction.condOpt(dslType) {
+      case SparkSqlType => SparkSqlDQStepBuilder()
+      case DataFrameOpsType => DataFrameOpsDQStepBuilder()
+      case GriffinDslType => GriffinDslDQStepBuilder(dsNames, funcNames)
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepNameGenerator.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepNameGenerator.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepNameGenerator.scala
new file mode 100644
index 0000000..9af568a
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepNameGenerator.scala
@@ -0,0 +1,34 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder
+
+import java.util.concurrent.atomic.AtomicLong
+
+/**
+  * Generates step names made of a fixed prefix and the next value of a shared counter.
+  */
+object DQStepNameGenerator {
+  private val prefix: String = "step"
+  private val sequence: AtomicLong = new AtomicLong(0L)
+
+  /** returns the prefix followed by the incremented counter value, e.g. "step1" on first use */
+  def genName: String = prefix + sequence.incrementAndGet()
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataFrameOpsDQStepBuilder.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataFrameOpsDQStepBuilder.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataFrameOpsDQStepBuilder.scala
new file mode 100644
index 0000000..c3854f9
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataFrameOpsDQStepBuilder.scala
@@ -0,0 +1,35 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder
+
+import org.apache.griffin.measure.configuration.params.RuleParam
+import org.apache.griffin.measure.context.DQContext
+import org.apache.griffin.measure.step.DQStep
+import org.apache.griffin.measure.step.transform.DataFrameOpsTransformStep
+
+/**
+  * Builds the steps of a data frame ops rule: one transform step followed by
+  * the write steps configured directly on the rule param.
+  */
+case class DataFrameOpsDQStepBuilder() extends RuleParamStepBuilder {
+
+  /**
+    * @param context   dq context (not used by this builder)
+    * @param ruleParam rule param providing name, rule, details, cache and write settings
+    * @return the transform step followed by the direct write steps
+    */
+  def buildSteps(context: DQContext, ruleParam: RuleParam): Seq[DQStep] = {
+    val name = getStepName(ruleParam.getName)
+    val transformStep = DataFrameOpsTransformStep(
+      name, ruleParam.getRule, ruleParam.getDetails, ruleParam.getCache)
+    // buildDirectWriteSteps resolves the step name again from the rule param; for a
+    // blank rule name that would generate a second, different name, so hand it the
+    // name already chosen for the transform step
+    transformStep +: buildDirectWriteSteps(ruleParam.replaceName(name))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataSourceParamStepBuilder.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataSourceParamStepBuilder.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataSourceParamStepBuilder.scala
new file mode 100644
index 0000000..b941211
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataSourceParamStepBuilder.scala
@@ -0,0 +1,44 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder
+
+import org.apache.griffin.measure.configuration.params.{DataConnectorParam, 
DataSourceParam}
+import org.apache.griffin.measure.context.DQContext
+import org.apache.griffin.measure.step.DQStep
+import org.apache.griffin.measure.step.read._
+
+/**
+  * Builds a dq step from a data source param.
+  */
+trait DataSourceParamStepBuilder extends DQStepBuilder {
+
+  type ParamType = DataSourceParam
+
+  /**
+    * Gathers the read steps of all connectors of the data source into one union read step;
+    * None when no connector yields a read step.
+    */
+  def buildDQStep(context: DQContext, param: ParamType): Option[DQStep] = {
+    val name = getStepName(param.name)
+    val readSteps = for {
+      connector <- param.getConnectors
+      readStep <- buildReadSteps(context, connector)
+    } yield readStep
+    if (readSteps.isEmpty) None
+    else Some(UnionReadStep(name, readSteps))
+  }
+
+  /** builds the read step of a single data connector, if any */
+  protected def buildReadSteps(context: DQContext, dcParam: DataConnectorParam): Option[ReadStep]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala
new file mode 100644
index 0000000..7ce78d1
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala
@@ -0,0 +1,60 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder
+
+import org.apache.griffin.measure.configuration.params.RuleParam
+import org.apache.griffin.measure.context.DQContext
+import org.apache.griffin.measure.step.DQStep
+import org.apache.griffin.measure.step.builder.dsl.parser.GriffinDslParser
+import org.apache.griffin.measure.step.builder.dsl.transform.Expr2DQSteps
+
+import scala.util.{Failure, Success}
+
+/**
+  * Builds the steps of a griffin dsl rule by parsing the rule text and
+  * translating the parsed expression into dq steps.
+  *
+  * @param dataSourceNames data source names handed to the parser
+  * @param functionNames   function names; only identifier-like ones reach the parser
+  */
+case class GriffinDslDQStepBuilder(dataSourceNames: Seq[String],
+                                   functionNames: Seq[String]
+                                  ) extends RuleParamStepBuilder {
+
+  // keep only names that start with a letter or underscore followed by word characters
+  val filteredFunctionNames = functionNames.filter { fn =>
+    fn.matches("""^[a-zA-Z_]\w*$""")
+  }
+  val parser = GriffinDslParser(dataSourceNames, filteredFunctionNames)
+
+  /**
+    * Parses the rule of `ruleParam` and converts the resulting expression into dq steps.
+    *
+    * @return the generated steps, or Nil when the rule cannot be parsed or step
+    *         generation fails with a recoverable error (the failure is logged)
+    */
+  def buildSteps(context: DQContext, ruleParam: RuleParam): Seq[DQStep] = {
+    import scala.util.control.NonFatal
+
+    val name = getStepName(ruleParam.getName)
+    val rule = ruleParam.getRule
+    val dqType = ruleParam.getDqType
+    try {
+      val result = parser.parseRule(rule, dqType)
+      if (result.successful) {
+        val expr = result.get
+        // pass the resolved name on, so later steps use the same name as this one
+        val expr2DQSteps = Expr2DQSteps(context, expr, ruleParam.replaceName(name))
+        expr2DQSteps.getDQSteps()
+      } else {
+        warn(s"parse rule [ ${rule} ] fails: \n${result}")
+        Nil
+      }
+    } catch {
+      // Fatal JVM errors (out of memory, interruption, linkage errors) must propagate.
+      // NOTE(review): StackOverflowError stays recoverable on the assumption that a
+      // pathological rule can exhaust the stack while parsing - confirm.
+      case e @ (NonFatal(_) | _: StackOverflowError) => {
+        error(s"generate rule plan ${name} fails: ${e.getMessage}")
+        Nil
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
new file mode 100644
index 0000000..9a2c7b5
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
@@ -0,0 +1,61 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder
+
+import org.apache.griffin.measure.configuration.enums.NormalizeType
+import org.apache.griffin.measure.configuration.params.RuleParam
+import org.apache.griffin.measure.context.DQContext
+import org.apache.griffin.measure.step.write.{DsCacheUpdateWriteStep, 
MetricWriteStep, RecordWriteStep}
+import org.apache.griffin.measure.step.{DQStep, SeqDQStep}
+
+/**
+  * Builds a dq step from a rule param.
+  */
+trait RuleParamStepBuilder extends DQStepBuilder {
+
+  type ParamType = RuleParam
+
+  /**
+    * Wraps the steps built for `param`: several steps become one sequential step,
+    * a single step is returned as is, and no steps give None.
+    */
+  def buildDQStep(context: DQContext, param: ParamType): Option[DQStep] = {
+    val steps = buildSteps(context, param)
+    steps.size match {
+      case 0 => None
+      case 1 => steps.headOption
+      case _ => Some(SeqDQStep(steps))
+    }
+  }
+
+  /** builds all steps needed for the rule param */
+  def buildSteps(context: DQContext, ruleParam: RuleParam): Seq[DQStep]
+
+  /**
+    * Builds the write steps configured on the rule param, in the order
+    * metric, record, data source cache update; absent settings contribute nothing.
+    * The step name is resolved from the name of the rule param.
+    */
+  protected def buildDirectWriteSteps(ruleParam: RuleParam): Seq[DQStep] = {
+    val name = getStepName(ruleParam.getName)
+
+    val metricWriters = ruleParam.metricOpt.toSeq.map { metric =>
+      MetricWriteStep(metric.name, name, NormalizeType(metric.collectType))
+    }
+    val recordWriters = ruleParam.recordOpt.toSeq.map { record =>
+      RecordWriteStep(record.name, name)
+    }
+    val cacheUpdateWriters = ruleParam.dsCacheUpdateOpt.toSeq.map { dsCacheUpdate =>
+      DsCacheUpdateWriteStep(dsCacheUpdate.dsName, name)
+    }
+
+    metricWriters ++ recordWriters ++ cacheUpdateWriters
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/SparkSqlDQStepBuilder.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/SparkSqlDQStepBuilder.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/SparkSqlDQStepBuilder.scala
new file mode 100644
index 0000000..36237d7
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/SparkSqlDQStepBuilder.scala
@@ -0,0 +1,35 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder
+
+import org.apache.griffin.measure.configuration.params.RuleParam
+import org.apache.griffin.measure.context.DQContext
+import org.apache.griffin.measure.step.DQStep
+import org.apache.griffin.measure.step.transform.SparkSqlTransformStep
+
+/**
+  * Builds the steps of a spark sql rule: one transform step followed by
+  * the write steps configured directly on the rule param.
+  */
+case class SparkSqlDQStepBuilder() extends RuleParamStepBuilder {
+
+  /**
+    * @param context   dq context (not used by this builder)
+    * @param ruleParam rule param providing name, rule, details, cache and write settings
+    * @return the transform step followed by the direct write steps
+    */
+  def buildSteps(context: DQContext, ruleParam: RuleParam): Seq[DQStep] = {
+    val name = getStepName(ruleParam.getName)
+    val transformStep = SparkSqlTransformStep(
+      name, ruleParam.getRule, ruleParam.getDetails, ruleParam.getCache)
+    // buildDirectWriteSteps resolves the step name again from the rule param; for a
+    // blank rule name that would generate a second, different name, so hand it the
+    // name already chosen for the transform step
+    transformStep +: buildDirectWriteSteps(ruleParam.replaceName(name))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/StreamingDataSourceStepBuilder.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/StreamingDataSourceStepBuilder.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/StreamingDataSourceStepBuilder.scala
new file mode 100644
index 0000000..0139d19
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/StreamingDataSourceStepBuilder.scala
@@ -0,0 +1,31 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder
+
+import org.apache.griffin.measure.configuration.params._
+import org.apache.griffin.measure.context.DQContext
+import org.apache.griffin.measure.step.read.ReadStep
+
+/**
+  * Data source step builder selected for the streaming process type;
+  * it builds no read step for any data connector.
+  */
+case class StreamingDataSourceStepBuilder() extends DataSourceParamStepBuilder {
+
+  /** always None: no read step is built for a data connector */
+  def buildReadSteps(context: DQContext, dcParam: DataConnectorParam): Option[ReadStep] = None
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/AliasableExpr.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/AliasableExpr.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/AliasableExpr.scala
new file mode 100644
index 0000000..412aa3c
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/AliasableExpr.scala
@@ -0,0 +1,25 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder.dsl.expr
+
+/**
+  * An expression that exposes an optional alias.
+  */
+trait AliasableExpr extends Expr {
+
+  /** the alias of this expression, if it has one */
+  def alias: Option[String]
+
+}

Reply via email to