http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
new file mode 100644
index 0000000..3482955
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
@@ -0,0 +1,358 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder.dsl.transform
+
+import org.apache.griffin.measure.configuration.enums.{ArrayNormalizeType, 
EntriesNormalizeType, ProcessType, StreamingProcessType}
+import org.apache.griffin.measure.configuration.params.RuleParam
+import org.apache.griffin.measure.context.DQContext
+import org.apache.griffin.measure.step.DQStep
+import org.apache.griffin.measure.step.builder.ConstantColumns
+import org.apache.griffin.measure.step.builder.dsl.expr.{DistinctnessClause, _}
+import 
org.apache.griffin.measure.step.builder.dsl.transform.analyzer.DistinctnessAnalyzer
+import org.apache.griffin.measure.step.transform.SparkSqlTransformStep
+import org.apache.griffin.measure.step.write.{DsCacheUpdateWriteStep, 
MetricWriteStep, RecordWriteStep}
+import org.apache.griffin.measure.utils.ParamUtil._
+
+/**
+  * generate distinctness dq steps
+  */
+case class DistinctnessExpr2DQSteps(context: DQContext,
+                                    expr: Expr,
+                                    ruleParam: RuleParam
+                                   ) extends Expr2DQSteps {
+
+  private object DistinctnessKeys {
+    val _source = "source"
+    val _target = "target"
+    val _distinct = "distinct"
+    val _total = "total"
+    val _dup = "dup"
+    val _accu_dup = "accu_dup"
+    val _num = "num"
+
+    val _duplicationArray = "duplication.array"
+    val _withAccumulate = "with.accumulate"
+
+    val _recordEnable = "record.enable"
+  }
+  import DistinctnessKeys._
+
+  def getDQSteps(): Seq[DQStep] = {
+    val details = ruleParam.getDetails
+    val distinctnessExpr = expr.asInstanceOf[DistinctnessClause]
+
+    val sourceName = details.getString(_source, context.getDataSourceName(0))
+    val targetName = details.getString(_target, context.getDataSourceName(1))
+    val analyzer = DistinctnessAnalyzer(distinctnessExpr, sourceName)
+
+    val procType = context.procType
+    val timestamp = context.contextId.timestamp
+    val dsTimeRanges = context.dataSourceTimeRanges
+
+    val beginTmst = dsTimeRanges.get(sourceName).map(_.begin) match {
+      case Some(t) => t
+      case _ => throw new Exception(s"empty begin tmst from ${sourceName}")
+    }
+    val endTmst = dsTimeRanges.get(sourceName).map(_.end) match {
+      case Some(t) => t
+      case _ => throw new Exception(s"empty end tmst from ${sourceName}")
+    }
+
+    val writeTimestampOpt = Some(endTmst)
+
+    if (!context.runTimeTableRegister.existsTable(sourceName)) {
+      warn(s"[${timestamp}] data source ${sourceName} not exists")
+      Nil
+    } else {
+      val withOlderTable = {
+        details.getBoolean(_withAccumulate, true) &&
+          context.runTimeTableRegister.existsTable(targetName)
+      }
+
+      val selClause = analyzer.selectionPairs.map { pair =>
+        val (expr, alias, _) = pair
+        s"${expr.desc} AS `${alias}`"
+      }.mkString(", ")
+      val distAliases = analyzer.selectionPairs.filter(_._3).map(_._2)
+      val distAliasesClause = distAliases.map( a => s"`${a}`" ).mkString(", ")
+      val allAliases = analyzer.selectionPairs.map(_._2)
+      val allAliasesClause = allAliases.map( a => s"`${a}`" ).mkString(", ")
+      val groupAliases = analyzer.selectionPairs.filter(!_._3).map(_._2)
+      val groupAliasesClause = groupAliases.map( a => s"`${a}`" ).mkString(", 
")
+
+      // 1. source alias
+      val sourceAliasTableName = "__sourceAlias"
+      val sourceAliasSql = {
+        s"SELECT ${selClause} FROM `${sourceName}`"
+      }
+      val sourceAliasTransStep = SparkSqlTransformStep(sourceAliasTableName, 
sourceAliasSql, emptyMap, true)
+
+      // 2. total metric
+      val totalTableName = "__totalMetric"
+      val totalColName = details.getStringOrKey(_total)
+      val totalSql = {
+        s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}`"
+      }
+      val totalTransStep = SparkSqlTransformStep(totalTableName, totalSql, 
emptyMap)
+      val totalMetricWriteStep = {
+        MetricWriteStep(totalColName, totalTableName, EntriesNormalizeType, 
writeTimestampOpt)
+      }
+
+      // 3. group by self
+      val selfGroupTableName = "__selfGroup"
+      val dupColName = details.getStringOrKey(_dup)
+      val accuDupColName = details.getStringOrKey(_accu_dup)
+      val selfGroupSql = {
+        s"""
+           |SELECT ${distAliasesClause}, (COUNT(*) - 1) AS `${dupColName}`,
+           |TRUE AS `${ConstantColumns.distinct}`
+           |FROM `${sourceAliasTableName}` GROUP BY ${distAliasesClause}
+          """.stripMargin
+      }
+      val selfGroupTransStep = SparkSqlTransformStep(selfGroupTableName, 
selfGroupSql, emptyMap, true)
+
+      val transSteps1 = sourceAliasTransStep :: totalTransStep :: 
selfGroupTransStep :: Nil
+      val writeSteps1 = totalMetricWriteStep :: Nil
+
+      val ((transSteps2, writeSteps2), dupCountTableName) = procType match {
+        case StreamingProcessType if (withOlderTable) => {
+          // 4.0 update old data
+          val targetDsUpdateWriteStep = DsCacheUpdateWriteStep(targetName, 
targetName)
+
+          // 4. older alias
+          val olderAliasTableName = "__older"
+          val olderAliasSql = {
+            s"SELECT ${selClause} FROM `${targetName}` WHERE 
`${ConstantColumns.tmst}` <= ${beginTmst}"
+          }
+          val olderAliasTransStep = SparkSqlTransformStep(olderAliasTableName, 
olderAliasSql, emptyMap)
+
+          // 5. join with older data
+          val joinedTableName = "__joined"
+          val selfSelClause = (distAliases :+ dupColName).map { alias =>
+            s"`${selfGroupTableName}`.`${alias}`"
+          }.mkString(", ")
+          val onClause = distAliases.map { alias =>
+            s"coalesce(`${selfGroupTableName}`.`${alias}`, '') = 
coalesce(`${olderAliasTableName}`.`${alias}`, '')"
+          }.mkString(" AND ")
+          val olderIsNull = distAliases.map { alias =>
+            s"`${olderAliasTableName}`.`${alias}` IS NULL"
+          }.mkString(" AND ")
+          val joinedSql = {
+            s"""
+               |SELECT ${selfSelClause}, (${olderIsNull}) AS 
`${ConstantColumns.distinct}`
+               |FROM `${olderAliasTableName}` RIGHT JOIN 
`${selfGroupTableName}`
+               |ON ${onClause}
+            """.stripMargin
+          }
+          val joinedTransStep = SparkSqlTransformStep(joinedTableName, 
joinedSql, emptyMap)
+
+          // 6. group by joined data
+          val groupTableName = "__group"
+          val moreDupColName = "_more_dup"
+          val groupSql = {
+            s"""
+               |SELECT ${distAliasesClause}, `${dupColName}`, 
`${ConstantColumns.distinct}`,
+               |COUNT(*) AS `${moreDupColName}`
+               |FROM `${joinedTableName}`
+               |GROUP BY ${distAliasesClause}, `${dupColName}`, 
`${ConstantColumns.distinct}`
+             """.stripMargin
+          }
+          val groupTransStep = SparkSqlTransformStep(groupTableName, groupSql, 
emptyMap)
+
+          // 7. final duplicate count
+          val finalDupCountTableName = "__finalDupCount"
+          /**
+            * dupColName:      the duplicate count of duplicated items only 
occurs in new data,
+            *                  which means the distinct one in new data is 
also duplicate
+            * accuDupColName:  the count of duplicated items accumulated in 
new data and old data,
+            *                  which means the accumulated distinct count in 
all data
+            * e.g.:  new data [A, A, B, B, C, D], old data [A, A, B, C]
+            *        selfGroupTable will be (A, 1, F), (B, 1, F), (C, 0, T), 
(D, 0, T)
+            *        joinedTable will be (A, 1, F), (A, 1, F), (B, 1, F), (C, 
0, F), (D, 0, T)
+            *        groupTable will be (A, 1, F, 2), (B, 1, F, 1), (C, 0, F, 
1), (D, 0, T, 1)
+            *        finalDupCountTable will be (A, F, 2, 3), (B, F, 2, 2), 
(C, F, 1, 1), (D, T, 0, 0)
+            *        The distinct result of new data only should be: (A, 2), 
(B, 2), (C, 1), (D, 0),
+            *        which means in new data [A, A, B, B, C, D], [A, A, B, B, 
C] are all duplicated, only [D] is distinct
+            */
+          val finalDupCountSql = {
+            s"""
+               |SELECT ${distAliasesClause}, `${ConstantColumns.distinct}`,
+               |CASE WHEN `${ConstantColumns.distinct}` THEN `${dupColName}`
+               |ELSE (`${dupColName}` + 1) END AS `${dupColName}`,
+               |CASE WHEN `${ConstantColumns.distinct}` THEN `${dupColName}`
+               |ELSE (`${dupColName}` + `${moreDupColName}`) END AS 
`${accuDupColName}`
+               |FROM `${groupTableName}`
+             """.stripMargin
+          }
+          val finalDupCountTransStep = 
SparkSqlTransformStep(finalDupCountTableName, finalDupCountSql, emptyMap, true)
+
+          ((olderAliasTransStep :: joinedTransStep :: groupTransStep :: 
finalDupCountTransStep :: Nil,
+            targetDsUpdateWriteStep :: Nil), finalDupCountTableName)
+        }
+        case _ => {
+          ((Nil, Nil), selfGroupTableName)
+        }
+      }
+
+      // 8. distinct metric
+      val distTableName = "__distMetric"
+      val distColName = details.getStringOrKey(_distinct)
+      val distSql = {
+        s"""
+           |SELECT COUNT(*) AS `${distColName}`
+           |FROM `${dupCountTableName}` WHERE `${ConstantColumns.distinct}`
+         """.stripMargin
+      }
+      val distTransStep = SparkSqlTransformStep(distTableName, distSql, 
emptyMap)
+      val distMetricWriteStep = {
+        MetricWriteStep(distColName, distTableName, EntriesNormalizeType, 
writeTimestampOpt)
+      }
+
+      val transSteps3 = distTransStep :: Nil
+      val writeSteps3 = distMetricWriteStep :: Nil
+
+      val duplicationArrayName = details.getString(_duplicationArray, "")
+      val (transSteps4, writeSteps4) = if (duplicationArrayName.nonEmpty) {
+        val recordEnable = details.getBoolean(_recordEnable, false)
+        if (groupAliases.size > 0) {
+          // with some group by requirement
+          // 9. origin data join with distinct information
+          val informedTableName = "__informed"
+          val onClause = distAliases.map { alias =>
+            s"coalesce(`${sourceAliasTableName}`.`${alias}`, '') = 
coalesce(`${dupCountTableName}`.`${alias}`, '')"
+          }.mkString(" AND ")
+          val informedSql = {
+            s"""
+               |SELECT `${sourceAliasTableName}`.*,
+               |`${dupCountTableName}`.`${dupColName}` AS `${dupColName}`,
+               |`${dupCountTableName}`.`${ConstantColumns.distinct}` AS 
`${ConstantColumns.distinct}`
+               |FROM `${sourceAliasTableName}` LEFT JOIN `${dupCountTableName}`
+               |ON ${onClause}
+               """.stripMargin
+          }
+          val informedTransStep = SparkSqlTransformStep(informedTableName, 
informedSql, emptyMap)
+
+          // 10. add row number
+          val rnTableName = "__rowNumber"
+          val rnDistClause = distAliasesClause
+          val rnSortClause = s"SORT BY `${ConstantColumns.distinct}`"
+          val rnSql = {
+            s"""
+               |SELECT *,
+               |ROW_NUMBER() OVER (DISTRIBUTE BY ${rnDistClause} 
${rnSortClause}) `${ConstantColumns.rowNumber}`
+               |FROM `${informedTableName}`
+               """.stripMargin
+          }
+          val rnTransStep = SparkSqlTransformStep(rnTableName, rnSql, emptyMap)
+
+          // 11. recognize duplicate items
+          val dupItemsTableName = "__dupItems"
+          val dupItemsSql = {
+            s"""
+               |SELECT ${allAliasesClause}, `${dupColName}` FROM 
`${rnTableName}`
+               |WHERE NOT `${ConstantColumns.distinct}` OR 
`${ConstantColumns.rowNumber}` > 1
+               """.stripMargin
+          }
+          val dupItemsTransStep = SparkSqlTransformStep(dupItemsTableName, 
dupItemsSql, emptyMap)
+          val dupItemsWriteStep = {
+            val rwName = 
ruleParam.recordOpt.map(_.name).getOrElse(dupItemsTableName)
+            RecordWriteStep(rwName, dupItemsTableName, None, writeTimestampOpt)
+          }
+
+          // 12. group by dup Record metric
+          val groupDupMetricTableName = "__groupDupMetric"
+          val numColName = details.getStringOrKey(_num)
+          val groupSelClause = groupAliasesClause
+          val groupDupMetricSql = {
+            s"""
+               |SELECT ${groupSelClause}, `${dupColName}`, COUNT(*) AS 
`${numColName}`
+               |FROM `${dupItemsTableName}` GROUP BY ${groupSelClause}, 
`${dupColName}`
+             """.stripMargin
+          }
+          val groupDupMetricTransStep = 
SparkSqlTransformStep(groupDupMetricTableName, groupDupMetricSql, emptyMap)
+          val groupDupMetricWriteStep = {
+            MetricWriteStep(duplicationArrayName, groupDupMetricTableName, 
ArrayNormalizeType, writeTimestampOpt)
+          }
+
+          val msteps = {
+            informedTransStep :: rnTransStep :: dupItemsTransStep :: 
groupDupMetricTransStep :: Nil
+          }
+          val wsteps = if (recordEnable) {
+            dupItemsWriteStep :: groupDupMetricWriteStep :: Nil
+          } else {
+            groupDupMetricWriteStep :: Nil
+          }
+
+          (msteps, wsteps)
+
+        } else {
+          // no group by requirement
+          // 9. duplicate record
+          val dupRecordTableName = "__dupRecords"
+          val dupRecordSelClause = procType match {
+            case StreamingProcessType if (withOlderTable) => 
s"${distAliasesClause}, `${dupColName}`, `${accuDupColName}`"
+            case _ => s"${distAliasesClause}, `${dupColName}`"
+          }
+          val dupRecordSql = {
+            s"""
+               |SELECT ${dupRecordSelClause}
+               |FROM `${dupCountTableName}` WHERE `${dupColName}` > 0
+              """.stripMargin
+          }
+          val dupRecordTransStep = SparkSqlTransformStep(dupRecordTableName, 
dupRecordSql, emptyMap, true)
+          val dupRecordWriteStep = {
+            val rwName = 
ruleParam.recordOpt.map(_.name).getOrElse(dupRecordTableName)
+            RecordWriteStep(rwName, dupRecordTableName, None, 
writeTimestampOpt)
+          }
+
+          // 10. duplicate metric
+          val dupMetricTableName = "__dupMetric"
+          val numColName = details.getStringOrKey(_num)
+          val dupMetricSql = {
+            s"""
+               |SELECT `${dupColName}`, COUNT(*) AS `${numColName}`
+               |FROM `${dupRecordTableName}` GROUP BY `${dupColName}`
+              """.stripMargin
+          }
+          val dupMetricTransStep = SparkSqlTransformStep(dupMetricTableName, 
dupMetricSql, emptyMap)
+          val dupMetricWriteStep = {
+            MetricWriteStep(duplicationArrayName, dupMetricTableName, 
ArrayNormalizeType, writeTimestampOpt)
+          }
+
+          val msteps = {
+            dupRecordTransStep :: dupMetricTransStep :: Nil
+          }
+          val wsteps = if (recordEnable) {
+            dupRecordWriteStep :: dupMetricWriteStep :: Nil
+          } else {
+            dupMetricWriteStep :: Nil
+          }
+
+          (msteps, wsteps)
+        }
+      } else (Nil, Nil)
+
+      // full steps
+      transSteps1 ++ transSteps2 ++ transSteps3 ++ transSteps4 ++
+        writeSteps1 ++ writeSteps2 ++ writeSteps3 ++ writeSteps4
+
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala
new file mode 100644
index 0000000..509e678
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala
@@ -0,0 +1,59 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder.dsl.transform
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.params.RuleParam
+import org.apache.griffin.measure.context.{ContextId, DQContext, TimeRange}
+import org.apache.griffin.measure.step.DQStep
+import org.apache.griffin.measure.step.builder.dsl.expr.Expr
+
/**
  * Base contract for griffin DSL rule → dq-step generators.
  * Subclasses implement getDQSteps to emit the transform/write steps
  * for one dq type.
  */
trait Expr2DQSteps extends Loggable with Serializable {

  // correctly-spelled empty step sequence
  protected val emptyDQSteps: Seq[DQStep] = Seq[DQStep]()
  // NOTE(review): original name was misspelled; kept as an alias so any
  // existing subclass referring to it still compiles
  protected val emtptDQSteps: Seq[DQStep] = emptyDQSteps
  // empty rule-detail map shared by step builders
  protected val emptyMap = Map[String, Any]()

  def getDQSteps(): Seq[DQStep]

}
+
/**
  * get dq steps generator for griffin dsl rule
  */
object Expr2DQSteps {
  // fallback generator that produces no steps for unsupported dq types
  // (still references the trait's existing `emtptDQSteps` member)
  private val emptyExpr2DQSteps = new Expr2DQSteps {
    def getDQSteps(): Seq[DQStep] = emtptDQSteps
  }

  /**
    * Dispatch on the rule's dq type to the matching step generator.
    * Unknown dq types yield a generator that emits no steps.
    */
  def apply(context: DQContext,
            expr: Expr,
            ruleParam: RuleParam
           ): Expr2DQSteps = {
    ruleParam.getDqType match {
      case AccuracyType => AccuracyExpr2DQSteps(context, expr, ruleParam)
      case ProfilingType => ProfilingExpr2DQSteps(context, expr, ruleParam)
      case UniquenessType => UniquenessExpr2DQSteps(context, expr, ruleParam)
      case DistinctnessType => DistinctnessExpr2DQSteps(context, expr, ruleParam)
      case TimelinessType => TimelinessExpr2DQSteps(context, expr, ruleParam)
      case CompletenessType => CompletenessExpr2DQSteps(context, expr, ruleParam)
      case _ => emptyExpr2DQSteps
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
new file mode 100644
index 0000000..b4da7eb
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
@@ -0,0 +1,105 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder.dsl.transform
+
+import org.apache.griffin.measure.configuration.enums.{BatchProcessType, 
NormalizeType, StreamingProcessType}
+import org.apache.griffin.measure.configuration.params.RuleParam
+import org.apache.griffin.measure.context.DQContext
+import org.apache.griffin.measure.step.DQStep
+import org.apache.griffin.measure.step.builder.ConstantColumns
+import org.apache.griffin.measure.step.builder.dsl.expr._
+import 
org.apache.griffin.measure.step.builder.dsl.transform.analyzer.ProfilingAnalyzer
+import org.apache.griffin.measure.step.transform.SparkSqlTransformStep
+import org.apache.griffin.measure.step.write.MetricWriteStep
+import org.apache.griffin.measure.utils.ParamUtil._
+
/**
  * Generate profiling dq steps: one SparkSql transform running the
  * profiling select statement, plus one metric write step.
  */
case class ProfilingExpr2DQSteps(context: DQContext,
                                 expr: Expr,
                                 ruleParam: RuleParam
                                ) extends Expr2DQSteps {

  // detail-parameter keys recognized by the profiling rule
  private object ProfilingKeys {
    val _source = "source"
  }
  import ProfilingKeys._

  def getDQSteps(): Seq[DQStep] = {
    val details = ruleParam.getDetails
    val profilingExpr = expr.asInstanceOf[ProfilingClause]

    // source comes from the rule's FROM clause when present,
    // otherwise from details (defaulting to the first data source)
    val sourceName = profilingExpr.fromClauseOpt match {
      case Some(fc) => fc.dataSource
      case _ => details.getString(_source, context.getDataSourceName(0))
    }
    val fromClause = profilingExpr.fromClauseOpt.getOrElse(FromClause(sourceName)).desc

    val procType = context.procType
    val timestamp = context.contextId.timestamp

    if (!context.runTimeTableRegister.existsTable(sourceName)) {
      warn(s"[${timestamp}] data source ${sourceName} not exists")
      Nil
    } else {
      val analyzer = ProfilingAnalyzer(profilingExpr, sourceName)
      // render each selection expression, appending an alias when it has one
      val selExprDescs = analyzer.selectionExprs.map { sel =>
        val alias = sel match {
          case s: AliasableExpr if (s.alias.nonEmpty) => s" AS `${s.alias.get}`"
          case _ => ""
        }
        s"${sel.desc}${alias}"
      }
      val selCondition = profilingExpr.selectClause.extraConditionOpt.map(_.desc).mkString
      // streaming mode prepends the tmst column to the selection
      val selClause = procType match {
        case BatchProcessType => selExprDescs.mkString(", ")
        case StreamingProcessType => (s"`${ConstantColumns.tmst}`" +: selExprDescs).mkString(", ")
      }
      val groupbyClauseOpt = analyzer.groupbyExprOpt
      // streaming mode always groups by tmst, merged with any user group-by
      val groupbyClause = procType match {
        case BatchProcessType => groupbyClauseOpt.map(_.desc).getOrElse("")
        case StreamingProcessType => {
          val tmstGroupbyClause =
            GroupbyClause(LiteralStringExpr(s"`${ConstantColumns.tmst}`") :: Nil, None)
          val mergedGroupbyClause =
            tmstGroupbyClause.merge(groupbyClauseOpt.getOrElse(GroupbyClause(Nil, None)))
          mergedGroupbyClause.desc
        }
      }
      val preGroupbyClause = analyzer.preGroupbyExprs.map(_.desc).mkString(" ")
      val postGroupbyClause = analyzer.postGroupbyExprs.map(_.desc).mkString(" ")

      // 1. select statement
      val profilingSql = {
        s"SELECT ${selCondition} ${selClause} ${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}"
      }
      val profilingName = ruleParam.name
      val profilingTransStep = SparkSqlTransformStep(profilingName, profilingSql, details)
      val profilingMetricWriteStep = {
        // metric name and collect type fall back to rule defaults when no
        // metric option is configured
        val mwName = ruleParam.metricOpt.map(_.name).getOrElse(ruleParam.name)
        val collectType = NormalizeType(ruleParam.metricOpt.map(_.collectType).getOrElse(""))
        MetricWriteStep(mwName, profilingName, collectType)
      }
      profilingTransStep :: profilingMetricWriteStep :: Nil
    }
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
new file mode 100644
index 0000000..a56937c
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
@@ -0,0 +1,234 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder.dsl.transform
+
+import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.params.RuleParam
+import org.apache.griffin.measure.context.DQContext
+import org.apache.griffin.measure.step.DQStep
+import org.apache.griffin.measure.step.builder.ConstantColumns
+import org.apache.griffin.measure.step.builder.dsl.expr._
+import 
org.apache.griffin.measure.step.builder.dsl.transform.analyzer.TimelinessAnalyzer
+import org.apache.griffin.measure.step.transform.SparkSqlTransformStep
+import org.apache.griffin.measure.step.write.{MetricWriteStep, RecordWriteStep}
+import org.apache.griffin.measure.utils.ParamUtil._
+import org.apache.griffin.measure.utils.TimeUtil
+
/**
  * Generate timeliness dq steps: latency per record, total/avg metrics,
  * optional late-record output, latency-range histogram, and percentiles.
  */
case class TimelinessExpr2DQSteps(context: DQContext,
                                  expr: Expr,
                                  ruleParam: RuleParam
                                 ) extends Expr2DQSteps {

  // detail-parameter keys recognized by the timeliness rule
  private object TimelinessKeys {
    val _source = "source"
    val _latency = "latency"
    val _total = "total"
    val _avg = "avg"
    val _threshold = "threshold"
    val _step = "step"
    val _count = "count"
    val _stepSize = "step.size"
    val _percentileColPrefix = "percentile"
    val _percentileValues = "percentile.values"
  }
  import TimelinessKeys._

  def getDQSteps(): Seq[DQStep] = {
    val details = ruleParam.getDetails
    val timelinessExpr = expr.asInstanceOf[TimelinessClause]

    val sourceName = details.getString(_source, context.getDataSourceName(0))

    val procType = context.procType
    val timestamp = context.contextId.timestamp
    val dsTimeRanges = context.dataSourceTimeRanges

    // fail fast when the source has no minimum timestamp available
    // (the value itself is not used below, only the validation matters)
    dsTimeRanges.get(sourceName).flatMap(_.minTmstOpt).getOrElse {
      throw new Exception(s"empty min tmst from ${sourceName}")
    }

    if (!context.runTimeTableRegister.existsTable(sourceName)) {
      warn(s"[${timestamp}] data source ${sourceName} not exists")
      Nil
    } else {
      val analyzer = TimelinessAnalyzer(timelinessExpr, sourceName)
      val btsSel = analyzer.btsExpr
      val etsSelOpt = analyzer.etsExprOpt

      // 1. in time: keep rows with usable begin (and end, if given) timestamps
      val inTimeTableName = "__inTime"
      val inTimeSql = etsSelOpt match {
        case Some(etsSel) => {
          s"""
             |SELECT *, (${btsSel}) AS `${ConstantColumns.beginTs}`,
             |(${etsSel}) AS `${ConstantColumns.endTs}`
             |FROM `${sourceName}` WHERE (${btsSel}) IS NOT NULL AND (${etsSel}) IS NOT NULL
           """.stripMargin
        }
        case _ => {
          s"""
             |SELECT *, (${btsSel}) AS `${ConstantColumns.beginTs}`
             |FROM `${sourceName}` WHERE (${btsSel}) IS NOT NULL
           """.stripMargin
        }
      }
      val inTimeTransStep = SparkSqlTransformStep(inTimeTableName, inTimeSql, emptyMap)

      // 2. latency: end ts (explicit, or the batch tmst) minus begin ts
      val latencyTableName = "__lat"
      val latencyColName = details.getStringOrKey(_latency)
      val etsColName = etsSelOpt match {
        case Some(_) => ConstantColumns.endTs
        case _ => ConstantColumns.tmst
      }
      val latencySql = {
        s"SELECT *, (`${etsColName}` - `${ConstantColumns.beginTs}`) AS `${latencyColName}` FROM `${inTimeTableName}`"
      }
      val latencyTransStep = SparkSqlTransformStep(latencyTableName, latencySql, emptyMap, true)

      // 3. timeliness metric: total count and average latency
      val metricTableName = ruleParam.name
      val totalColName = details.getStringOrKey(_total)
      val avgColName = details.getStringOrKey(_avg)
      val metricSql = procType match {
        case BatchProcessType => {
          s"""
             |SELECT COUNT(*) AS `${totalColName}`,
             |CAST(AVG(`${latencyColName}`) AS BIGINT) AS `${avgColName}`
             |FROM `${latencyTableName}`
           """.stripMargin
        }
        case StreamingProcessType => {
          s"""
             |SELECT `${ConstantColumns.tmst}`,
             |COUNT(*) AS `${totalColName}`,
             |CAST(AVG(`${latencyColName}`) AS BIGINT) AS `${avgColName}`
             |FROM `${latencyTableName}`
             |GROUP BY `${ConstantColumns.tmst}`
           """.stripMargin
        }
      }
      val metricTransStep = SparkSqlTransformStep(metricTableName, metricSql, emptyMap)
      val metricWriteStep = {
        val mwName = ruleParam.metricOpt.map(_.name).getOrElse(ruleParam.name)
        val collectType = NormalizeType(ruleParam.metricOpt.map(_.collectType).getOrElse(""))
        MetricWriteStep(mwName, metricTableName, collectType)
      }

      // current steps
      val transSteps1 = inTimeTransStep :: latencyTransStep :: metricTransStep :: Nil
      val writeSteps1 = metricWriteStep :: Nil

      // 4. timeliness record: rows whose latency exceeds the threshold
      val (transSteps2, writeSteps2) =
        TimeUtil.milliseconds(details.getString(_threshold, "")) match {
          case Some(tsh) => {
            val recordTableName = "__lateRecords"
            val recordSql = {
              s"SELECT * FROM `${latencyTableName}` WHERE `${latencyColName}` > ${tsh}"
            }
            val recordTransStep = SparkSqlTransformStep(recordTableName, recordSql, emptyMap)
            val recordWriteStep = {
              val rwName = ruleParam.recordOpt.map(_.name).getOrElse(recordTableName)
              RecordWriteStep(rwName, recordTableName, None)
            }
            (recordTransStep :: Nil, recordWriteStep :: Nil)
          }
          case _ => (Nil, Nil)
        }

      // 5. ranges: bucket latencies into fixed-size steps and count per bucket
      val (transSteps3, writeSteps3) =
        TimeUtil.milliseconds(details.getString(_stepSize, "")) match {
          case Some(stepSize) => {
            // 5.1 range
            val rangeTableName = "__range"
            val stepColName = details.getStringOrKey(_step)
            val rangeSql = {
              s"""
                 |SELECT *, CAST((`${latencyColName}` / ${stepSize}) AS BIGINT) AS `${stepColName}`
                 |FROM `${latencyTableName}`
               """.stripMargin
            }
            val rangeTransStep = SparkSqlTransformStep(rangeTableName, rangeSql, emptyMap)

            // 5.2 range metric
            val rangeMetricTableName = "__rangeMetric"
            val countColName = details.getStringOrKey(_count)
            val rangeMetricSql = procType match {
              case BatchProcessType => {
                s"""
                   |SELECT `${stepColName}`, COUNT(*) AS `${countColName}`
                   |FROM `${rangeTableName}` GROUP BY `${stepColName}`
                  """.stripMargin
              }
              case StreamingProcessType => {
                s"""
                   |SELECT `${ConstantColumns.tmst}`, `${stepColName}`, COUNT(*) AS `${countColName}`
                   |FROM `${rangeTableName}` GROUP BY `${ConstantColumns.tmst}`, `${stepColName}`
                  """.stripMargin
              }
            }
            val rangeMetricTransStep =
              SparkSqlTransformStep(rangeMetricTableName, rangeMetricSql, emptyMap)
            val rangeMetricWriteStep = {
              MetricWriteStep(stepColName, rangeMetricTableName, ArrayNormalizeType)
            }

            (rangeTransStep :: rangeMetricTransStep :: Nil, rangeMetricWriteStep :: Nil)
          }
          case _ => (Nil, Nil)
        }

      // 6. percentiles: one approximate percentile column per requested value
      val percentiles = getPercentiles(details)
      val (transSteps4, writeSteps4) = if (percentiles.nonEmpty) {
        val percentileTableName = "__percentile"
        val percentileColName = details.getStringOrKey(_percentileColPrefix)
        val percentileCols = percentiles.map { pct =>
          val pctName = (pct * 100).toInt.toString
          s"floor(percentile_approx(`${latencyColName}`, ${pct})) AS `${percentileColName}_${pctName}`"
        }.mkString(", ")
        val percentileSql = {
          s"""
             |SELECT ${percentileCols}
             |FROM `${latencyTableName}`
            """.stripMargin
        }
        val percentileTransStep =
          SparkSqlTransformStep(percentileTableName, percentileSql, emptyMap)
        val percentileWriteStep = {
          MetricWriteStep(percentileTableName, percentileTableName, DefaultNormalizeType)
        }

        (percentileTransStep :: Nil, percentileWriteStep :: Nil)
      } else (Nil, Nil)

      // full steps
      transSteps1 ++ transSteps2 ++ transSteps3 ++ transSteps4 ++
        writeSteps1 ++ writeSteps2 ++ writeSteps3 ++ writeSteps4
    }
  }

  // keep only percentile values inside the valid [0, 1] range
  private def getPercentiles(details: Map[String, Any]): Seq[Double] = {
    details.getArr[Double](_percentileValues).filter(d => (d >= 0 && d <= 1))
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
new file mode 100644
index 0000000..9fecb6d
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
@@ -0,0 +1,204 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder.dsl.transform
+
+import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.params.RuleParam
+import org.apache.griffin.measure.context.DQContext
+import org.apache.griffin.measure.step.DQStep
+import org.apache.griffin.measure.step.builder.ConstantColumns
+import org.apache.griffin.measure.step.builder.dsl.expr._
+import 
org.apache.griffin.measure.step.builder.dsl.transform.analyzer.UniquenessAnalyzer
+import org.apache.griffin.measure.step.transform.SparkSqlTransformStep
+import org.apache.griffin.measure.step.write.{MetricWriteStep, RecordWriteStep}
+import org.apache.griffin.measure.utils.ParamUtil._
+
+/**
+  * generate uniqueness dq steps
+  */
case class UniquenessExpr2DQSteps(context: DQContext,
                                  expr: Expr,
                                  ruleParam: RuleParam
                                 ) extends Expr2DQSteps {

  // keys recognized in the "details" map of a uniqueness rule param
  private object UniquenessKeys {
    val _source = "source"
    val _target = "target"
    val _unique = "unique"
    val _total = "total"
    val _dup = "dup"
    val _num = "num"

    val _duplicationArray = "duplication.array"
  }
  import UniquenessKeys._

  /**
    * Translate the uniqueness DSL clause into spark-sql transform steps
    * (total count, unique count, and optionally duplication records/metrics)
    * followed by the corresponding metric/record write steps.
    *
    * @return the ordered dq steps, or Nil when the source or target table
    *         is not registered in the run-time table register
    */
  def getDQSteps(): Seq[DQStep] = {
    val details = ruleParam.getDetails
    val uniquenessExpr = expr.asInstanceOf[UniquenessClause]

    // defaults: first configured data source is the source, second is the target
    val sourceName = details.getString(_source, context.getDataSourceName(0))
    val targetName = details.getString(_target, context.getDataSourceName(1))
    val analyzer = UniquenessAnalyzer(uniquenessExpr, sourceName, targetName)

    val procType = context.procType
    val timestamp = context.contextId.timestamp

    if (!context.runTimeTableRegister.existsTable(sourceName)) {
      warn(s"[${timestamp}] data source ${sourceName} not exists")
      Nil
    } else if (!context.runTimeTableRegister.existsTable(targetName)) {
      // fix: was println — use warn for consistency with the source branch above
      warn(s"[${timestamp}] data source ${targetName} not exists")
      Nil
    } else {
      // selected expressions with aliases, e.g. "source.id AS `id`"
      val selItemsClause = analyzer.selectionPairs.map { pair =>
        val (expr, alias) = pair
        s"${expr.desc} AS `${alias}`"
      }.mkString(", ")
      val aliases = analyzer.selectionPairs.map(_._2)

      // in streaming mode the internal tmst column is carried through every step
      val selClause = procType match {
        case BatchProcessType => selItemsClause
        case StreamingProcessType => s"`${ConstantColumns.tmst}`, ${selItemsClause}"
      }
      val selAliases = procType match {
        case BatchProcessType => aliases
        case StreamingProcessType => ConstantColumns.tmst +: aliases
      }

      // 1. source distinct mapping
      val sourceTableName = "__source"
      val sourceSql = s"SELECT DISTINCT ${selClause} FROM ${sourceName}"
      val sourceTransStep = SparkSqlTransformStep(sourceTableName, sourceSql, emptyMap)

      // 2. target mapping
      val targetTableName = "__target"
      val targetSql = s"SELECT ${selClause} FROM ${targetName}"
      val targetTransStep = SparkSqlTransformStep(targetTableName, targetSql, emptyMap)

      // 3. joined: right join keeps every distinct source row, matched or not;
      // coalesce on both sides so NULL keys still compare equal
      val joinedTableName = "__joined"
      val joinedSelClause = selAliases.map { alias =>
        s"`${sourceTableName}`.`${alias}` AS `${alias}`"
      }.mkString(", ")
      val onClause = aliases.map { alias =>
        s"coalesce(`${sourceTableName}`.`${alias}`, '') = coalesce(`${targetTableName}`.`${alias}`, '')"
      }.mkString(" AND ")
      val joinedSql = {
        s"SELECT ${joinedSelClause} FROM `${targetTableName}` RIGHT JOIN `${sourceTableName}` ON ${onClause}"
      }
      val joinedTransStep = SparkSqlTransformStep(joinedTableName, joinedSql, emptyMap)

      // 4. group: per key, (COUNT(*) - 1) counts the duplicated matches
      val groupTableName = "__group"
      val groupSelClause = selAliases.map { alias =>
        s"`${alias}`"
      }.mkString(", ")
      val dupColName = details.getStringOrKey(_dup)
      val groupSql = {
        s"SELECT ${groupSelClause}, (COUNT(*) - 1) AS `${dupColName}` FROM `${joinedTableName}` GROUP BY ${groupSelClause}"
      }
      // cached: the group table is read by steps 6 and 8
      val groupTransStep = SparkSqlTransformStep(groupTableName, groupSql, emptyMap, true)

      // 5. total metric: row count of the source
      val totalTableName = "__totalMetric"
      val totalColName = details.getStringOrKey(_total)
      val totalSql = procType match {
        case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`"
        case StreamingProcessType => {
          s"""
             |SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${totalColName}`
             |FROM `${sourceName}` GROUP BY `${ConstantColumns.tmst}`
           """.stripMargin
        }
      }
      val totalTransStep = SparkSqlTransformStep(totalTableName, totalSql, emptyMap)
      val totalMetricWriteStep = MetricWriteStep(totalColName, totalTableName, EntriesNormalizeType)

      // 6. unique record: keys matched exactly once
      val uniqueRecordTableName = "__uniqueRecord"
      val uniqueRecordSql = {
        s"SELECT * FROM `${groupTableName}` WHERE `${dupColName}` = 0"
      }
      val uniqueRecordTransStep = SparkSqlTransformStep(uniqueRecordTableName, uniqueRecordSql, emptyMap)

      // 7. unique metric: count of unique records
      val uniqueTableName = "__uniqueMetric"
      val uniqueColName = details.getStringOrKey(_unique)
      val uniqueSql = procType match {
        case BatchProcessType => s"SELECT COUNT(*) AS `${uniqueColName}` FROM `${uniqueRecordTableName}`"
        case StreamingProcessType => {
          s"""
             |SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${uniqueColName}`
             |FROM `${uniqueRecordTableName}` GROUP BY `${ConstantColumns.tmst}`
           """.stripMargin
        }
      }
      val uniqueTransStep = SparkSqlTransformStep(uniqueTableName, uniqueSql, emptyMap)
      val uniqueMetricWriteStep = MetricWriteStep(uniqueColName, uniqueTableName, EntriesNormalizeType)

      val transSteps1 = sourceTransStep :: targetTransStep :: joinedTransStep :: groupTransStep ::
        totalTransStep :: uniqueRecordTransStep :: uniqueTransStep :: Nil
      val writeSteps1 = totalMetricWriteStep :: uniqueMetricWriteStep :: Nil

      // optional duplication analysis, enabled by the "duplication.array" detail
      val duplicationArrayName = details.getString(_duplicationArray, "")
      val (transSteps2, writeSteps2) = if (duplicationArrayName.nonEmpty) {
        // 8. duplicate record: keys matched more than once
        val dupRecordTableName = "__dupRecords"
        val dupRecordSql = {
          s"SELECT * FROM `${groupTableName}` WHERE `${dupColName}` > 0"
        }
        val dupRecordTransStep = SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true)
        val dupRecordWriteStep = {
          val rwName = ruleParam.recordOpt.map(_.name).getOrElse(dupRecordTableName)
          RecordWriteStep(rwName, dupRecordTableName)
        }

        // 9. duplicate metric: distribution of duplication counts
        val dupMetricTableName = "__dupMetric"
        val numColName = details.getStringOrKey(_num)
        val dupMetricSelClause = procType match {
          case BatchProcessType => s"`${dupColName}`, COUNT(*) AS `${numColName}`"
          case StreamingProcessType => s"`${ConstantColumns.tmst}`, `${dupColName}`, COUNT(*) AS `${numColName}`"
        }
        val dupMetricGroupbyClause = procType match {
          case BatchProcessType => s"`${dupColName}`"
          case StreamingProcessType => s"`${ConstantColumns.tmst}`, `${dupColName}`"
        }
        val dupMetricSql = {
          s"""
             |SELECT ${dupMetricSelClause} FROM `${dupRecordTableName}`
             |GROUP BY ${dupMetricGroupbyClause}
          """.stripMargin
        }
        val dupMetricTransStep = SparkSqlTransformStep(dupMetricTableName, dupMetricSql, emptyMap)
        val dupMetricWriteStep = {
          MetricWriteStep(duplicationArrayName, dupMetricTableName, ArrayNormalizeType)
        }

        (dupRecordTransStep :: dupMetricTransStep :: Nil,
          dupRecordWriteStep :: dupMetricWriteStep :: Nil)
      } else (Nil, Nil)

      // full steps: all transforms first, then all writes
      transSteps1 ++ transSteps2 ++ writeSteps1 ++ writeSteps2
    }
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/AccuracyAnalyzer.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/AccuracyAnalyzer.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/AccuracyAnalyzer.scala
new file mode 100644
index 0000000..b2a95ce
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/AccuracyAnalyzer.scala
@@ -0,0 +1,41 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder.dsl.transform.analyzer
+
+import org.apache.griffin.measure.step.builder.dsl.expr._
+
+
/**
  * Analyzer for an accuracy rule: extracts the data source names and the
  * selection expressions of the source and target sides from the clause.
  */
case class AccuracyAnalyzer(expr: LogicalExpr, sourceName: String, targetName: String) extends BasicAnalyzer {

  // every data source name referenced anywhere in the expression tree
  val dataSourceNames =
    expr.preOrderTraverseDepthFirst(Set[String]())(seqDataSourceNames, combDataSourceNames)

  // collect the selection expressions rooted at the given data source
  private def selectionsOf(dsName: String): Seq[SelectionExpr] = {
    expr.preOrderTraverseDepthFirst(Seq[SelectionExpr]())(seqSelectionExprs(dsName), combSelectionExprs)
  }

  val sourceSelectionExprs = selectionsOf(sourceName)
  val targetSelectionExprs = selectionsOf(targetName)

  // source selections plus any explicitly aliased sub-expressions
  val selectionExprs = sourceSelectionExprs ++ {
    expr.preOrderTraverseDepthFirst(Seq[AliasableExpr]())(seqWithAliasExprs, combWithAliasExprs)
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/BasicAnalyzer.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/BasicAnalyzer.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/BasicAnalyzer.scala
new file mode 100644
index 0000000..2925272
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/BasicAnalyzer.scala
@@ -0,0 +1,55 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder.dsl.transform.analyzer
+
+import org.apache.griffin.measure.step.builder.dsl.expr._
+
+/**
+  * analyzer of expr, to help generate dq steps by expr
+  */
/**
  * analyzer of expr, to help generate dq steps by expr
  *
  * Provides the seq/comb function pairs used with
  * Expr.preOrderTraverseDepthFirst: each "seq" function folds one node into
  * the accumulator, each "comb" function merges two accumulators.
  */
trait BasicAnalyzer extends Serializable {

  // the expression clause under analysis
  val expr: Expr

  // collect the name of each data source head node encountered
  val seqDataSourceNames = (expr: Expr, v: Set[String]) => {
    expr match {
      case DataSourceHeadExpr(name) => v + name
      case _ => v
    }
  }
  val combDataSourceNames = (a: Set[String], b: Set[String]) => a ++ b

  // curried: collect selection expressions whose head is the given data source
  val seqSelectionExprs = (dsName: String) => (expr: Expr, v: Seq[SelectionExpr]) => {
    expr match {
      case se @ SelectionExpr(head: DataSourceHeadExpr, _, _) if (head.name == dsName) => v :+ se
      case _ => v
    }
  }
  val combSelectionExprs = (a: Seq[SelectionExpr], b: Seq[SelectionExpr]) => a ++ b

  // collect aliased expressions; plain SelectExpr nodes are deliberately
  // skipped (first case) so only explicitly aliasable nodes are gathered
  val seqWithAliasExprs = (expr: Expr, v: Seq[AliasableExpr]) => {
    expr match {
      case se: SelectExpr => v
      case a: AliasableExpr if (a.alias.nonEmpty) => v :+ a
      case _ => v
    }
  }
  val combWithAliasExprs = (a: Seq[AliasableExpr], b: Seq[AliasableExpr]) => a ++ b

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/CompletenessAnalyzer.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/CompletenessAnalyzer.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/CompletenessAnalyzer.scala
new file mode 100644
index 0000000..eab568c
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/CompletenessAnalyzer.scala
@@ -0,0 +1,46 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder.dsl.transform.analyzer
+
+import org.apache.griffin.measure.step.builder.dsl.expr._
+
+
/**
  * Analyzer for a completeness rule: pairs each selection expression with
  * its declared alias, or a generated one when no alias is present.
  */
case class CompletenessAnalyzer(expr: CompletenessClause, sourceName: String) extends BasicAnalyzer {

  // gather alias names declared on a node during traversal
  val seqAlias = (expr: Expr, v: Seq[String]) => {
    expr match {
      case apr: AliasableExpr => v ++ apr.alias
      case _ => v
    }
  }
  val combAlias = (a: Seq[String], b: Seq[String]) => a ++ b

  private val exprs = expr.exprs
  private def genAlias(idx: Int): String = s"alias_${idx}"

  // (expression, alias) pairs; the first declared alias wins, otherwise
  // a positional alias is generated
  val selectionPairs = exprs.zipWithIndex.map { case (selExpr, idx) =>
    val declaredAliases = selExpr.preOrderTraverseDepthFirst(Seq[String]())(seqAlias, combAlias)
    (selExpr, declaredAliases.headOption.getOrElse(genAlias(idx)))
  }

  if (selectionPairs.isEmpty) {
    throw new Exception(s"completeness analyzer error: empty selection")
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/DistinctnessAnalyzer.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/DistinctnessAnalyzer.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/DistinctnessAnalyzer.scala
new file mode 100644
index 0000000..efa1754
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/DistinctnessAnalyzer.scala
@@ -0,0 +1,47 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder.dsl.transform.analyzer
+
+import org.apache.griffin.measure.step.builder.dsl.expr._
+
+
+//case class DistinctnessAnalyzer(expr: DistinctnessClause, sourceName: 
String, targetName: String) extends BasicAnalyzer {
/**
  * Analyzer for a distinctness rule: pairs each selection expression with
  * its declared (or generated) alias and a flag marking whether it takes
  * part in the distinctness key (un-tagged expressions do).
  */
case class DistinctnessAnalyzer(expr: DistinctnessClause, sourceName: String) extends BasicAnalyzer {

  // gather alias names declared on a node during traversal
  val seqAlias = (expr: Expr, v: Seq[String]) => {
    expr match {
      case apr: AliasableExpr => v ++ apr.alias
      case _ => v
    }
  }
  val combAlias = (a: Seq[String], b: Seq[String]) => a ++ b

  private val exprs = expr.exprs
  private def genAlias(idx: Int): String = s"alias_${idx}"

  // (expression, alias, is-distinctness-key): a tagged expression is carried
  // through the selection but excluded from the distinctness key
  val selectionPairs = exprs.zipWithIndex.map { pair =>
    val (pr, idx) = pair
    val res = pr.preOrderTraverseDepthFirst(Seq[String]())(seqAlias, combAlias)
    (pr, res.headOption.getOrElse(genAlias(idx)), pr.tag.isEmpty)
  }

  if (selectionPairs.isEmpty) {
    // fix: message previously said "uniqueness analyzer" (copy-paste slip)
    throw new Exception("distinctness analyzer error: empty selection")
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/ProfilingAnalyzer.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/ProfilingAnalyzer.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/ProfilingAnalyzer.scala
new file mode 100644
index 0000000..049e6fd
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/ProfilingAnalyzer.scala
@@ -0,0 +1,42 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder.dsl.transform.analyzer
+
+import org.apache.griffin.measure.step.builder.dsl.expr._
+
+
/**
  * Analyzer for a profiling rule: extracts data source names and the
  * select/group-by expressions from the profiling clause.
  */
case class ProfilingAnalyzer(expr: ProfilingClause, sourceName: String) extends BasicAnalyzer {

  // every data source name referenced anywhere in the expression tree
  val dataSourceNames =
    expr.preOrderTraverseDepthFirst(Set[String]())(seqDataSourceNames, combDataSourceNames)

  // selection and function expressions of the select clause; other node
  // types are dropped (collect replaces the original flatMap + match whose
  // lambda parameter shadowed the class-level `expr`)
  val selectionExprs: Seq[Expr] = {
    expr.selectClause.exprs.map(_.extractSelf).collect {
      case e: SelectionExpr => e
      case e: FunctionExpr => e
    }
  }

  val groupbyExprOpt = expr.groupbyClauseOpt
  val preGroupbyExprs = expr.preGroupbyClauses.map(_.extractSelf)
  val postGroupbyExprs = expr.postGroupbyClauses.map(_.extractSelf)

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/TimelinessAnalyzer.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/TimelinessAnalyzer.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/TimelinessAnalyzer.scala
new file mode 100644
index 0000000..af00080
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/TimelinessAnalyzer.scala
@@ -0,0 +1,65 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder.dsl.transform.analyzer
+
+import org.apache.griffin.measure.step.builder.dsl.expr._
+
+
/**
  * Analyzer for a timeliness rule: extracts the begin-timestamp expression
  * and the optional end-timestamp expression from the clause.
  */
case class TimelinessAnalyzer(expr: TimelinessClause, sourceName: String) extends BasicAnalyzer {

  // sql descriptions of the configured ts expressions, in declaration order
  private val tsDescs = expr.exprs.map(_.desc).toList

  // first expression is the begin ts; an optional second one is the end ts,
  // any further expressions are ignored
  val (btsExpr, etsExprOpt) = tsDescs match {
    case Nil => throw new Exception("timeliness analyzer error: ts column not set")
    case bts :: Nil => (bts, None)
    case bts :: ets :: _ => (bts, Some(ets))
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/UniquenessAnalyzer.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/UniquenessAnalyzer.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/UniquenessAnalyzer.scala
new file mode 100644
index 0000000..21a1628
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/UniquenessAnalyzer.scala
@@ -0,0 +1,46 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder.dsl.transform.analyzer
+
+import org.apache.griffin.measure.step.builder.dsl.expr.{AliasableExpr, _}
+
+
/**
  * Analyzer for a uniqueness rule: pairs each selection expression with
  * its declared alias, or a generated one when no alias is present.
  */
case class UniquenessAnalyzer(expr: UniquenessClause, sourceName: String, targetName: String) extends BasicAnalyzer {

  // gather alias names declared on a node during traversal
  val seqAlias = (expr: Expr, v: Seq[String]) => {
    expr match {
      case apr: AliasableExpr => v ++ apr.alias
      case _ => v
    }
  }
  val combAlias = (a: Seq[String], b: Seq[String]) => a ++ b

  private val exprs = expr.exprs
  private def genAlias(idx: Int): String = s"alias_${idx}"

  // (expression, alias) pairs; the first declared alias wins, otherwise
  // a positional alias is generated
  val selectionPairs = exprs.zipWithIndex.map { case (selExpr, idx) =>
    val declaredAliases = selExpr.preOrderTraverseDepthFirst(Seq[String]())(seqAlias, combAlias)
    (selExpr, declaredAliases.headOption.getOrElse(genAlias(idx)))
  }

  if (selectionPairs.isEmpty) {
    throw new Exception(s"uniqueness analyzer error: empty selection")
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala
new file mode 100644
index 0000000..f1543be
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala
@@ -0,0 +1,72 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder.preproc
+
+import org.apache.griffin.measure.configuration.params.RuleParam
+
+/**
+  * generate rule params by template defined in pre-proc param
+  */
/**
  * generate rule params by template defined in pre-proc param
  *
  * Every "${...}" template occurring in a rule's name, rule text or details
  * (recursively through nested maps and sequences) is rewritten to
  * "<content>_<suffix>", so that the pre-proc rules of different data
  * sources do not collide on table names.
  */
object PreProcRuleParamGenerator {

  val _name = "name"

  /**
    * @param rules  the pre-proc rule templates (null tolerated for Java interop)
    * @param suffix suffix appended to every "${...}" template occurrence
    * @return the rewritten rule params, Nil when rules is null
    */
  def getNewPreProcRules(rules: Seq[RuleParam], suffix: String): Seq[RuleParam] = {
    if (rules == null) Nil else {
      rules.map { rule =>
        getNewPreProcRule(rule, suffix)
      }
    }
  }

  // rewrite name, rule text and details of one rule param
  private def getNewPreProcRule(param: RuleParam, suffix: String): RuleParam = {
    val newName = genNewString(param.getName, suffix)
    val newRule = genNewString(param.getRule, suffix)
    val newDetails = getNewMap(param.getDetails, suffix)
    param.replaceName(newName).replaceRule(newRule).replaceDetails(newDetails)
  }

  // rewrite every string value in the map, recursing into nested maps/seqs
  private def getNewMap(details: Map[String, Any], suffix: String): Map[String, Any] = {
    details.map { case (key, value) =>
      val newValue = value match {
        case s: String => genNewString(s, suffix)
        case subMap: Map[_, _] => getNewMap(subMap.asInstanceOf[Map[String, Any]], suffix)
        case arr: Seq[_] => getNewArr(arr, suffix)
        case v => v
      }
      (key, newValue)
    }
  }

  // rewrite every string element, recursing into nested maps/seqs
  // (map replaces the original foldLeft with O(n) appends)
  private def getNewArr(paramArr: Seq[Any], suffix: String): Seq[Any] = {
    paramArr.map {
      case s: String => genNewString(s, suffix)
      case map: Map[_, _] => getNewMap(map.asInstanceOf[Map[String, Any]], suffix)
      case arr: Seq[_] => getNewArr(arr, suffix)
      case v => v
    }
  }

  // fix: reluctant "(.*?)" so each "${...}" template is replaced on its own;
  // the original greedy "(.*)" swallowed everything between the first "${"
  // and the last "}" when a string contained multiple templates
  private def genNewString(str: String, suffix: String): String = {
    str.replaceAll("""\$\{(.*?)\}""", s"$$1_${suffix}")
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/udf/GriffinUDFs.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/udf/GriffinUDFs.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/udf/GriffinUDFs.scala
new file mode 100644
index 0000000..61b93ab
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/udf/GriffinUDFs.scala
@@ -0,0 +1,63 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder.udf
+
+import org.apache.spark.sql.SQLContext
+
/**
  * Single entry point registering all griffin user-defined functions and
  * aggregation functions on the given sql context.
  */
object GriffinUDFAgent {
  def register(sqlContext: SQLContext): Unit = {
    GriffinUDFs.register(sqlContext)
    GriffinUDAggFs.register(sqlContext)
  }
}
+
+/**
+  * user defined functions extension
+  */
/**
  * user defined functions extension
  *
  * Registers the griffin udfs (index_of, matches, reg_replace) usable
  * from spark-sql rule statements.
  */
object GriffinUDFs {

  def register(sqlContext: SQLContext): Unit = {
    sqlContext.udf.register("index_of", indexOf _)
    sqlContext.udf.register("matches", matches _)
    sqlContext.udf.register("reg_replace", regReplace _)
  }

  // index of value in the sequence, -1 when absent
  private def indexOf(values: Seq[String], value: String) = values.indexOf(value)

  // whether the whole string matches the regex
  private def matches(value: String, regex: String) = value.matches(regex)

  // replace every regex match with the replacement
  private def regReplace(value: String, regex: String, replacement: String) =
    value.replaceAll(regex, replacement)

}
+
+/**
+  * aggregation functions extension
+  */
/**
  * aggregation functions extension
  */
object GriffinUDAggFs {

  // placeholder: no user-defined aggregation functions registered yet
  def register(sqlContext: SQLContext): Unit = {
  }

}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala 
b/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala
new file mode 100644
index 0000000..8b1df82
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala
@@ -0,0 +1,48 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.read
+
+import org.apache.griffin.measure.context.DQContext
+import org.apache.griffin.measure.step.DQStep
+import org.apache.spark.sql._
+
+/**
+  * A DQ step that reads a data source and registers the resulting
+  * DataFrame as a run-time table under this step's name.
+  */
+trait ReadStep extends DQStep {
+
+  // step-specific configuration entries, interpreted by each implementation
+  val config: Map[String, Any]
+
+  // whether the read DataFrame should be cached (see commented-out cache call below)
+  val cache: Boolean
+
+  /**
+    * Performs the read and, on success, registers the DataFrame in the
+    * context's run-time table register under `name`.
+    * @return true when read succeeded and the table was registered,
+    *         false when read returned no DataFrame.
+    */
+  def execute(context: DQContext): Boolean = {
+    info(s"read data source [${name}]")
+    read(context) match {
+      case Some(df) => {
+//        if (needCache) context.dataFrameCache.cacheDataFrame(name, df)
+        context.runTimeTableRegister.registerTable(name, df)
+        true
+      }
+      case _ => {
+        warn(s"read data source [${name}] fails")
+        false
+      }
+    }
+  }
+
+  // Implementations return Some(df) on success, None on failure.
+  def read(context: DQContext): Option[DataFrame]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/read/UnionReadStep.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/read/UnionReadStep.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/read/UnionReadStep.scala
new file mode 100644
index 0000000..6dae1cb
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/read/UnionReadStep.scala
@@ -0,0 +1,41 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.read
+
+import org.apache.griffin.measure.context.DQContext
+import org.apache.spark.sql._
+import org.apache.griffin.measure.utils.DataFrameUtil._
+
+/**
+  * A ReadStep that reads several underlying steps and unions their
+  * DataFrames into one (via DataFrameUtil.unionDfOpts).
+  *
+  * @param name      table name to register the unioned result under
+  * @param readSteps the sub-steps whose results are unioned
+  */
+case class UnionReadStep(name: String,
+                         readSteps: Seq[ReadStep]
+                        ) extends ReadStep {
+
+  // no extra configuration; caching is left to the sub-steps
+  val config: Map[String, Any] = Map()
+  val cache: Boolean = false
+
+  /**
+    * Reads every sub-step and folds the Option[DataFrame] results together.
+    * @return None when there are no sub-steps; otherwise the union of all
+    *         successful sub-reads (unionDfOpts handles None operands).
+    */
+  def read(context: DQContext): Option[DataFrame] = {
+    val dfOpts = readSteps.map { readStep =>
+      readStep.read(context)
+    }
+    if (dfOpts.size > 0) {
+      dfOpts.reduce((a, b) => unionDfOpts(a, b))
+    } else None
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
new file mode 100644
index 0000000..dc9a3f8
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
@@ -0,0 +1,134 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.transform
+
+import java.util.Date
+
+import org.apache.griffin.measure.context.ContextId
+import 
org.apache.griffin.measure.context.streaming.metric.CacheResults.CacheResult
+import org.apache.griffin.measure.context.streaming.metric._
+import org.apache.griffin.measure.step.builder.ConstantColumns
+import org.apache.griffin.measure.utils.ParamUtil._
+import org.apache.spark.sql.types.{BooleanType, LongType, StructField, 
StructType}
+import org.apache.spark.sql.{Encoders, Row, SQLContext, _}
+
+/**
+  * Pre-defined data frame operations, dispatched by rule name from
+  * DataFrameOpsTransformStep: "from_json", "accuracy" and "clear".
+  */
+object DataFrameOps {
+
+  // rule names recognised by DataFrameOpsTransformStep
+  final val _fromJson = "from_json"
+  final val _accuracy = "accuracy"
+  final val _clear = "clear"
+
+  // detail-map keys used by the accuracy operation
+  object AccuracyOprKeys {
+    val _dfName = "df.name"
+    val _miss = "miss"
+    val _total = "total"
+    val _matched = "matched"
+  }
+
+  /**
+    * Parses a string column of JSON documents into a structured DataFrame.
+    * Reads table `details("df.name")`; uses column `details("col.name")`
+    * when given, otherwise the first column.
+    */
+  def fromJson(sqlContext: SQLContext, details: Map[String, Any]): DataFrame = 
{
+    val _dfName = "df.name"
+    val _colName = "col.name"
+    val dfName = details.getOrElse(_dfName, "").toString
+    val colNameOpt = details.get(_colName).map(_.toString)
+
+    implicit val encoder = Encoders.STRING
+
+    val df: DataFrame = sqlContext.table(s"`${dfName}`")
+    val rdd = colNameOpt match {
+      case Some(colName: String) => df.map(r => r.getAs[String](colName))
+      case _ => df.map(_.getAs[String](0))
+    }
+    sqlContext.read.json(rdd) // slow process
+  }
+
+  /**
+    * Computes accuracy metrics from a per-row miss/total table.
+    * For each row of table `details("df.name")` it builds an AccuracyMetric
+    * from the miss/total columns (column names taken from details, falling
+    * back to the key names), merges the results into CacheResults keyed by
+    * timestamp, and returns a metric DataFrame with columns
+    * (tmst, miss, total, matched, record, empty).
+    */
+  def accuracy(sqlContext: SQLContext, contextId: ContextId, details: 
Map[String, Any]): DataFrame = {
+    import AccuracyOprKeys._
+
+    val dfName = details.getStringOrKey(_dfName)
+    val miss = details.getStringOrKey(_miss)
+    val total = details.getStringOrKey(_total)
+    val matched = details.getStringOrKey(_matched)
+
+//    val _enableIgnoreCache = "enable.ignore.cache"
+//    val enableIgnoreCache = details.getBoolean(_enableIgnoreCache, false)
+
+//    val tmst = InternalColumns.tmst
+
+    // single update time stamped on every cached result of this pass
+    val updateTime = new Date().getTime
+
+    // Safe Long extraction: None when the column is absent or not a Long.
+    // NOTE(review): catching Throwable is broader than needed here —
+    // scala.util.control.NonFatal would avoid masking fatal errors.
+    def getLong(r: Row, k: String): Option[Long] = {
+      try {
+        Some(r.getAs[Long](k))
+      } catch {
+        case e: Throwable => None
+      }
+    }
+
+    val df = sqlContext.table(s"`${dfName}`")
+
+    // (timestamp, AccuracyMetric) pairs; rows that fail extraction or are
+    // not legal metrics are dropped. collect brings them to the driver —
+    // assumes the metric row count is small (one per timestamp).
+    val results = df.rdd.flatMap { row =>
+      try {
+        val tmst = getLong(row, 
ConstantColumns.tmst).getOrElse(contextId.timestamp)
+        val missCount = getLong(row, miss).getOrElse(0L)
+        val totalCount = getLong(row, total).getOrElse(0L)
+        val ar = AccuracyMetric(missCount, totalCount)
+        if (ar.isLegal) Some((tmst, ar)) else None
+      } catch {
+        case e: Throwable => None
+      }
+    }.collect
+
+    // cache and update results
+    val updatedResults = CacheResults.update(results.map{ pair =>
+      val (t, r) = pair
+      CacheResult(t, updateTime, r)
+    })
+
+    // generate metrics
+    val schema = StructType(Array(
+      StructField(ConstantColumns.tmst, LongType),
+      StructField(miss, LongType),
+      StructField(total, LongType),
+      StructField(matched, LongType),
+      StructField(ConstantColumns.record, BooleanType),
+      StructField(ConstantColumns.empty, BooleanType)
+    ))
+    val rows = updatedResults.map { r =>
+      val ar = r.result.asInstanceOf[AccuracyMetric]
+      Row(r.timeStamp, ar.miss, ar.total, ar.getMatch, !ar.initial, 
ar.eventual)
+    }.toArray
+    val rowRdd = sqlContext.sparkContext.parallelize(rows)
+    val retDf = sqlContext.createDataFrame(rowRdd, schema)
+
+    retDf
+  }
+
+  /**
+    * Returns an empty DataFrame with the same schema as table
+    * `details("df.name")` — used to reset a named table.
+    */
+  def clear(sqlContext: SQLContext, details: Map[String, Any]): DataFrame = {
+    val _dfName = "df.name"
+    val dfName = details.getOrElse(_dfName, "").toString
+
+    val df = sqlContext.table(s"`${dfName}`")
+    val emptyRdd = sqlContext.sparkContext.emptyRDD[Row]
+    sqlContext.createDataFrame(emptyRdd, df.schema)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
new file mode 100644
index 0000000..e2f90f9
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
@@ -0,0 +1,52 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.transform
+
+import org.apache.griffin.measure.context.DQContext
+
+/**
+  * Data frame ops transform step.
+  *
+  * Dispatches on `rule` to one of the pre-defined DataFrameOps operations
+  * (from_json / accuracy / clear), then registers the resulting DataFrame
+  * as run-time table `name` (optionally caching it first).
+  */
+case class DataFrameOpsTransformStep(name: String,
+                                     rule: String,
+                                     details: Map[String, Any],
+                                     cache: Boolean = false
+                                    ) extends TransformStep {
+
+  /**
+    * @return true on success; false when the rule is unknown or the
+    *         operation throws (the error is logged, not rethrown).
+    */
+  def execute(context: DQContext): Boolean = {
+    val sqlContext = context.sqlContext
+    try {
+      val df = rule match {
+        case DataFrameOps._fromJson => DataFrameOps.fromJson(sqlContext, 
details)
+        case DataFrameOps._accuracy => DataFrameOps.accuracy(sqlContext, 
context.contextId, details)
+        case DataFrameOps._clear => DataFrameOps.clear(sqlContext, details)
+        case _ => throw new Exception(s"df opr [ ${rule} ] not supported")
+      }
+      if (cache) context.dataFrameCache.cacheDataFrame(name, df)
+      context.runTimeTableRegister.registerTable(name, df)
+      true
+    } catch {
+      // NOTE(review): catching Throwable also swallows fatal errors
+      // (OutOfMemoryError etc.); scala.util.control.NonFatal is safer.
+      case e: Throwable => {
+        error(s"run data frame ops [ ${rule} ] error: ${e.getMessage}")
+        false
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
new file mode 100644
index 0000000..ead7344
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
@@ -0,0 +1,47 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.transform
+
+import org.apache.griffin.measure.context.DQContext
+
+/**
+  * Spark SQL transform step.
+  *
+  * Executes `rule` as a Spark SQL statement and registers the result as
+  * run-time table `name` (optionally caching it first).
+  */
+case class SparkSqlTransformStep(name: String,
+                                 rule: String,
+                                 details: Map[String, Any],
+                                 cache: Boolean = false
+                                ) extends TransformStep {
+
+  /**
+    * @return true on success; false when the SQL fails
+    *         (the error is logged, not rethrown).
+    */
+  def execute(context: DQContext): Boolean = {
+    val sqlContext = context.sqlContext
+    try {
+      val df = sqlContext.sql(rule)
+      if (cache) context.dataFrameCache.cacheDataFrame(name, df)
+      context.runTimeTableRegister.registerTable(name, df)
+      true
+    } catch {
+      // NOTE(review): catching Throwable also swallows fatal errors;
+      // scala.util.control.NonFatal is safer.
+      case e: Throwable => {
+        error(s"run spark sql [ ${rule} ] error: ${e.getMessage}")
+        false
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala
new file mode 100644
index 0000000..995ce49
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala
@@ -0,0 +1,31 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.transform
+
+import org.apache.griffin.measure.step.DQStep
+
+/**
+  * A DQ step that transforms existing run-time tables into a new one.
+  * Concrete implementations: SparkSqlTransformStep, DataFrameOpsTransformStep.
+  */
+trait TransformStep extends DQStep {
+
+  // the transformation rule text (SQL statement or df-ops name)
+  val rule: String
+
+  // extra parameters for the rule, interpreted by each implementation
+  val details: Map[String, Any]
+
+  // whether the transform result should be cached in the data frame cache
+  val cache: Boolean
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/write/DsCacheUpdateWriteStep.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/write/DsCacheUpdateWriteStep.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/write/DsCacheUpdateWriteStep.scala
new file mode 100644
index 0000000..27dbb3c
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/write/DsCacheUpdateWriteStep.scala
@@ -0,0 +1,61 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.write
+
+import org.apache.commons.lang.StringUtils
+import org.apache.griffin.measure.context.DQContext
+import org.apache.spark.sql.DataFrame
+
+/**
+  * Update streaming data source cache.
+  *
+  * Looks up run-time table `inputName` and pushes its DataFrame into the
+  * matching data source (`dsName`) via updateData.
+  */
+case class DsCacheUpdateWriteStep(dsName: String,
+                                  inputName: String
+                                 ) extends WriteStep {
+
+  // this step is addressed by dsName, not by a table name of its own
+  val name: String = ""
+  val writeTimestampOpt: Option[Long] = None
+
+  /**
+    * Best-effort update: a missing input table is only logged.
+    * @return always true so that a failed cache update does not abort the job.
+    */
+  def execute(context: DQContext): Boolean = {
+    collectDsCacheUpdateDf(context) match {
+      case Some(df) => {
+        context.dataSources.find(ds => StringUtils.equals(ds.name, 
dsName)).foreach(_.updateData(df))
+      }
+      case _ => {
+        warn(s"update ${dsName} from ${inputName} fails")
+      }
+    }
+    true
+  }
+
+  // Fetches a registered table as a DataFrame; None (with an error log)
+  // when the table does not exist.
+  // NOTE(review): catching Throwable is broader than needed —
+  // scala.util.control.NonFatal would avoid masking fatal errors.
+  private def getDataFrame(context: DQContext, name: String): 
Option[DataFrame] = {
+    try {
+      val df = context.sqlContext.table(s"`${name}`")
+      Some(df)
+    } catch {
+      case e: Throwable => {
+        error(s"get data frame ${name} fails")
+        None
+      }
+    }
+  }
+
+  // The DataFrame to push into the data source cache: run-time table inputName.
+  private def collectDsCacheUpdateDf(context: DQContext): Option[DataFrame] = 
getDataFrame(context, inputName)
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
new file mode 100644
index 0000000..40c9b05
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
@@ -0,0 +1,45 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.write
+
+import org.apache.griffin.measure.context.DQContext
+
+/**
+  * Flush final metric map in context and write.
+  *
+  * Drains the context's metric wrapper and persists each timestamp's metric
+  * map through the persist configured for that timestamp.
+  */
+case class MetricFlushStep() extends WriteStep {
+
+  // not tied to any table or input; operates on the whole metric wrapper
+  val name: String = ""
+  val inputName: String = ""
+  val writeTimestampOpt: Option[Long] = None
+
+  /**
+    * @return true only if every timestamp's metrics were persisted without
+    *         throwing; a single failure makes the whole step report false,
+    *         but remaining timestamps are still attempted (fold continues).
+    */
+  def execute(context: DQContext): Boolean = {
+    context.metricWrapper.flush.foldLeft(true) { (ret, pair) =>
+      val (t, metric) = pair
+      val pr = try {
+        context.getPersist(t).persistMetrics(metric)
+        true
+      } catch {
+        // NOTE(review): failure is swallowed silently — no log, and
+        // catching Throwable masks fatal errors; consider logging with
+        // scala.util.control.NonFatal.
+        case e: Throwable => false
+      }
+      ret && pr
+    }
+  }
+
+}

Reply via email to