Repository: incubator-griffin
Updated Branches:
  refs/heads/master 2d268aaf1 -> fad7daf7e


Enhance uniqueness measure

Add total and unique counts to the uniqueness metric

Author: Lionel Liu <[email protected]>

Closes #187 from bhlx3lyx7/tmst.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/fad7daf7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/fad7daf7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/fad7daf7

Branch: refs/heads/master
Commit: fad7daf7e3199584d6a8f985bc115e72d52e6cc5
Parents: 2d268aa
Author: Lionel Liu <[email protected]>
Authored: Wed Jan 10 15:14:33 2018 +0800
Committer: Lionel Liu <[email protected]>
Committed: Wed Jan 10 15:14:33 2018 +0800

----------------------------------------------------------------------
 griffin-doc/measure/dsl-guide.md                |  13 +-
 .../rule/adaptor/GriffinDslAdaptor.scala        | 872 +++----------------
 .../griffin/measure/rule/dsl/DqType.scala       |   8 +-
 .../rule/dsl/analyzer/DuplicateAnalyzer.scala   |  46 -
 .../rule/dsl/analyzer/UniquenessAnalyzer.scala  |  46 +
 .../rule/dsl/expr/ClauseExpression.scala        |   4 +-
 .../rule/dsl/parser/GriffinDslParser.scala      |  10 +-
 .../resources/_duplicate-batch-griffindsl.json  |  56 --
 .../_duplicate-streaming-griffindsl.json        | 116 ---
 .../_duplicate-streaming-sparksql.json          | 130 ---
 .../resources/_uniqueness-batch-griffindsl.json |  58 ++
 .../_uniqueness-streaming-griffindsl.json       | 119 +++
 .../_uniqueness-streaming-sparksql.json         | 130 +++
 .../rule/adaptor/GriffinDslAdaptorTest.scala    |   4 +-
 14 files changed, 489 insertions(+), 1123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fad7daf7/griffin-doc/measure/dsl-guide.md
----------------------------------------------------------------------
diff --git a/griffin-doc/measure/dsl-guide.md b/griffin-doc/measure/dsl-guide.md
index fb2eeb9..0fc8059 100644
--- a/griffin-doc/measure/dsl-guide.md
+++ b/griffin-doc/measure/dsl-guide.md
@@ -121,8 +121,8 @@ Accuracy rule expression in Griffin DSL is a logical expression, telling the map
 Profiling rule expression in Griffin DSL is a sql-like expression, with select clause ahead, following optional from clause, where clause, group-by clause, order-by clause, limit clause in order.  
        e.g. `source.gender, source.id.count() where source.age > 20 group by source.gender`, `select country, max(age), min(age), count(*) as cnt from source group by country order by cnt desc limit 5`
 
-### Duplicate Rule
-Duplicate rule expression in Griffin DSL is a list of selection expressions separated by comma, indicates the duplicate columns to measure.  
+### Uniqueness Rule
+Uniqueness rule expression in Griffin DSL is a list of selection expressions separated by commas, indicating the columns to check for uniqueness.  
        e.g. `name, age`, `name, (age + 1) as next_age`
 
 ### Timeliness Rule
@@ -151,13 +151,16 @@ For example, the dsl rule is `source.cntry, source.id.count(), source.age.max()
 
 After the translation, the metrics will be persisted in table `profiling`.  
 
-### Duplicate
-For duplicate, or called uniqueness, is to find out the duplicate items of data, and rollup the items count group by duplicate times.  
+### Uniqueness
+Uniqueness, also called duplicate, finds the duplicate items in the data and rolls up the item counts grouped by duplication count.  
 For example, the dsl rule is `name, age`, which represents the duplicate requests; in this case, source and target are the same data set. After the translation, the sql rules are as below:  
 - **get distinct items from source**: `SELECT name, age FROM source`, save as table `src`.  
 - **get all items from target**: `SELECT name, age FROM target`, save as table `tgt`.
 - **join two tables**: `SELECT src.name, src.age FROM tgt RIGHT JOIN src ON coalesce(src.name, '') = coalesce(tgt.name, '') AND coalesce(src.age, '') = coalesce(tgt.age, '')`, save as table `joined`.
-- **get duplicate items**: `SELECT name, age, (count(*) - 1) AS dup FROM joined GROUP BY name, age`, save as table `grouped`.
+- **get items duplication**: `SELECT name, age, (count(*) - 1) AS dup FROM joined GROUP BY name, age`, save as table `grouped`.
+- **get total metric**: `SELECT count(*) FROM source`, save as table `total_metric`.
+- **get unique record**: `SELECT * FROM grouped WHERE dup = 0`, save as table `unique_record`.
+- **get unique metric**: `SELECT count(*) FROM unique_record`, save as table `unique_metric`.
 - **get duplicate record**: `SELECT * FROM grouped WHERE dup > 0`, save as table `dup_record`.
 - **get duplicate metric**: `SELECT dup, count(*) AS num FROM dup_record GROUP BY dup`, save as table `dup_metric`.
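
A quick way to sanity-check the translated steps above is to replay them in a Spark shell. The following Scala sketch is illustrative only: the toy data set, the local SparkSession, and the DISTINCT in the first step (the doc describes that step as taking distinct items) are assumptions; the remaining table names and sql follow the list above.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("uniqueness-sketch").getOrCreate()
import spark.implicits._

// source and target are the same data set in the uniqueness case
val df = Seq(("tom", 20), ("tom", 20), ("amy", 21)).toDF("name", "age")
df.createOrReplaceTempView("source")
df.createOrReplaceTempView("target")

// distinct items from source, all items from target
spark.sql("SELECT DISTINCT name, age FROM source").createOrReplaceTempView("src")
spark.sql("SELECT name, age FROM target").createOrReplaceTempView("tgt")

// join the two tables and count duplication per item
spark.sql("""SELECT src.name, src.age FROM tgt RIGHT JOIN src
             |ON coalesce(src.name, '') = coalesce(tgt.name, '')
             |AND coalesce(src.age, '') = coalesce(tgt.age, '')""".stripMargin)
  .createOrReplaceTempView("joined")
spark.sql("SELECT name, age, (count(*) - 1) AS dup FROM joined GROUP BY name, age")
  .createOrReplaceTempView("grouped")

spark.sql("SELECT count(*) AS total FROM source").show()               // total = 3
spark.sql("SELECT count(*) AS cnt FROM grouped WHERE dup = 0").show()  // unique = 1
spark.sql("SELECT dup, count(*) AS num FROM grouped WHERE dup > 0 GROUP BY dup").show()  // dup = 1, num = 1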
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fad7daf7/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
index a02335a..98545d8 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
@@ -43,11 +43,14 @@ object ProfilingKeys {
   val _source = "source"
 }
 
-object DuplicateKeys {
+object UniquenessKeys {
   val _source = "source"
   val _target = "target"
+  val _unique = "unique"
+  val _total = "total"
   val _dup = "dup"
   val _num = "num"
+  val _duplicationArray = "duplication.array"
 }
 
 object TimelinessKeys {
@@ -58,7 +61,6 @@ object TimelinessKeys {
 
 object GlobalKeys {
   val _initRule = "init.rule"
-//  val _globalMetricKeep = "global.metric.keep"
 }
 
 case class GriffinDslAdaptor(dataSourceNames: Seq[String],
@@ -87,7 +89,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
         dqType match {
           case AccuracyType => accuracyRulePlan(timeInfo, name, expr, param, 
processType)
           case ProfilingType => profilingRulePlan(timeInfo, name, expr, param, 
processType)
-          case DuplicateType => duplicateRulePlan(timeInfo, name, expr, param, 
processType)
+          case UniquenessType => uniquenessRulePlan(timeInfo, name, expr, 
param, processType)
           case TimelinessType => timelinessRulePlan(timeInfo, name, expr, 
param, processType)
           case _ => emptyRulePlan
         }
@@ -169,7 +171,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
              |SELECT `${totalCountTableName}`.`${totalColName}` AS 
`${totalColName}`,
              |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS 
`${missColName}`,
              |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`
-             |FROM `${totalCountTableName}` FULL JOIN `${missCountTableName}`
+             |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}`
          """.stripMargin
         }
         case StreamingProcessType => {
@@ -178,7 +180,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
              |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
              |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS 
`${missColName}`,
              |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`
-             |FROM `${totalCountTableName}` FULL JOIN `${missCountTableName}`
+             |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}`
              |ON `${totalCountTableName}`.`${InternalColumns.tmst}` = 
`${missCountTableName}`.`${InternalColumns.tmst}`
          """.stripMargin
         }
@@ -245,187 +247,6 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
     }
   }
 
-//  private def accuracyRulePlan(timeInfo: TimeInfo, name: String, expr: Expr,
-//                               param: Map[String, Any], processType: 
ProcessType
-//                              ): RulePlan = {
-//    val details = getDetails(param)
-//    val sourceName = details.getString(AccuracyKeys._source, 
dataSourceNames.head)
-//    val targetName = details.getString(AccuracyKeys._target, 
dataSourceNames.tail.head)
-//    val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], 
sourceName, targetName)
-//
-//    if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
-//      emptyRulePlan
-//    } else {
-//      // 1. miss record
-//      val missRecordsTableName = "__missRecords"
-//      val selClause = s"`${sourceName}`.*"
-//      val missRecordsSql = if 
(!TableRegisters.existRunTempTable(timeInfo.key, targetName)) {
-//        s"SELECT ${selClause} FROM `${sourceName}`"
-//      } else {
-//        val onClause = expr.coalesceDesc
-//        val sourceIsNull = analyzer.sourceSelectionExprs.map { sel =>
-//          s"${sel.desc} IS NULL"
-//        }.mkString(" AND ")
-//        val targetIsNull = analyzer.targetSelectionExprs.map { sel =>
-//          s"${sel.desc} IS NULL"
-//        }.mkString(" AND ")
-//        val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})"
-//        s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` 
ON ${onClause} WHERE ${whereClause}"
-//      }
-//      val missRecordsStep = SparkSqlStep(missRecordsTableName, 
missRecordsSql, emptyMap, true)
-//      val missRecordsExports = processType match {
-//        case BatchProcessType => {
-//          val recordParam = 
RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
-//          genRecordExport(recordParam, missRecordsTableName, 
missRecordsTableName) :: Nil
-//        }
-//        case StreamingProcessType => Nil
-//      }
-//
-//      // 2. miss count
-//      val missCountTableName = "__missCount"
-//      val missColName = details.getStringOrKey(AccuracyKeys._miss)
-//      val missCountSql = processType match {
-//        case BatchProcessType => s"SELECT COUNT(*) AS `${missColName}` FROM 
`${missRecordsTableName}`"
-//        case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, 
COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}` GROUP BY 
`${InternalColumns.tmst}`"
-//      }
-//      val missCountStep = SparkSqlStep(missCountTableName, missCountSql, 
emptyMap)
-//
-//      // 3. total count
-//      val totalCountTableName = "__totalCount"
-//      val totalColName = details.getStringOrKey(AccuracyKeys._total)
-//      val totalCountSql = processType match {
-//        case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM 
`${sourceName}`"
-//        case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, 
COUNT(*) AS `${totalColName}` FROM `${sourceName}` GROUP BY 
`${InternalColumns.tmst}`"
-//      }
-//      val totalCountStep = SparkSqlStep(totalCountTableName, totalCountSql, 
emptyMap)
-//
-//      // 4. accuracy metric
-//      val accuracyTableName = name
-//      val matchedColName = details.getStringOrKey(AccuracyKeys._matched)
-//      val accuracyMetricSql = processType match {
-//        case BatchProcessType => {
-//          s"""
-//             |SELECT `${totalCountTableName}`.`${totalColName}` AS 
`${totalColName}`,
-//             |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS 
`${missColName}`,
-//             |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`
-//             |FROM `${totalCountTableName}` FULL JOIN `${missCountTableName}`
-//         """.stripMargin
-//        }
-//        case StreamingProcessType => {
-//          s"""
-//             |SELECT `${totalCountTableName}`.`${InternalColumns.tmst}` AS 
`${InternalColumns.tmst}`,
-//             |`${totalCountTableName}`.`${totalColName}` AS 
`${totalColName}`,
-//             |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS 
`${missColName}`,
-//             |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`
-//             |FROM `${totalCountTableName}` FULL JOIN `${missCountTableName}`
-//             |ON `${totalCountTableName}`.`${InternalColumns.tmst}` = 
`${missCountTableName}`.`${InternalColumns.tmst}`
-//         """.stripMargin
-//        }
-//      }
-//      val accuracyStep = SparkSqlStep(accuracyTableName, accuracyMetricSql, 
emptyMap, true)
-//      val accuracyExports = processType match {
-//        case BatchProcessType => {
-//          val metricParam = 
RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
-//          genMetricExport(metricParam, accuracyTableName, accuracyTableName) 
:: Nil
-//        }
-//        case StreamingProcessType => Nil
-//      }
-//
-//      // current accu plan
-//      val accuSteps = missRecordsStep :: missCountStep :: totalCountStep :: 
accuracyStep :: Nil
-//      val accuExports = missRecordsExports ++ accuracyExports
-//      val accuPlan = RulePlan(accuSteps, accuExports)
-//
-//      // streaming extra accu plan
-//      val streamingAccuPlan = processType match {
-//        case BatchProcessType => emptyRulePlan
-//        case StreamingProcessType => {
-//          // 5. global accuracy metric merge
-//          val globalAccuracyTableName = "__globalAccuracy"
-//          val globalAccuracySql = {
-//            s"""
-//               |SELECT 
coalesce(`${globalAccuracyTableName}`.`${InternalColumns.tmst}`, 
`${accuracyTableName}`.`${InternalColumns.tmst}`) AS `${InternalColumns.tmst}`,
-//               |coalesce(`${accuracyTableName}`.`${missColName}`, 
`${globalAccuracyTableName}`.`${missColName}`) AS `${missColName}`,
-//               |coalesce(`${globalAccuracyTableName}`.`${totalColName}`, 
`${accuracyTableName}`.`${totalColName}`) AS `${totalColName}`,
-//               |((`${accuracyTableName}`.`${missColName}` IS NOT NULL) AND 
((`${globalAccuracyTableName}`.`${missColName}` IS NULL) OR 
(`${accuracyTableName}`.`${missColName}` < 
`${globalAccuracyTableName}`.`${missColName}`))) AS `${InternalColumns.metric}`
-//               |FROM `${globalAccuracyTableName}` FULL JOIN 
`${accuracyTableName}`
-//               |ON `${globalAccuracyTableName}`.`${InternalColumns.tmst}` = 
`${accuracyTableName}`.`${InternalColumns.tmst}`
-//            """.stripMargin
-//          }
-//          val globalAccuracyInitSql = {
-//            s"""
-//               |SELECT `${InternalColumns.tmst}`, `${totalColName}`, 
`${missColName}`,
-//               |(true) AS `${InternalColumns.metric}`
-//               |FROM `${accuracyTableName}`
-//             """.stripMargin
-//          }
-//          val globalAccuracyDetails = Map[String, Any](GlobalKeys._initRule 
-> globalAccuracyInitSql)
-//          val globalAccuracyStep = SparkSqlStep(globalAccuracyTableName,
-//            globalAccuracySql, globalAccuracyDetails, true, true)
-//
-//          // 6. collect accuracy metrics
-//          val accuracyMetricTableName = name
-//          val accuracyMetricSql = {
-//            s"""
-//               |SELECT `${InternalColumns.tmst}`, `${totalColName}`, 
`${missColName}`,
-//               |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`
-//               |FROM `${globalAccuracyTableName}` WHERE 
`${InternalColumns.metric}`
-//             """.stripMargin
-//          }
-//          val accuracyMetricStep = SparkSqlStep(accuracyMetricTableName, 
accuracyMetricSql, emptyMap)
-//          val metricParam = 
RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
-//          val accuracyMetricExports = genMetricExport(metricParam, 
accuracyMetricTableName, accuracyMetricTableName) :: Nil
-//
-//          // 7. collect accuracy records
-//          val accuracyRecordTableName = "__accuracyRecords"
-//          val accuracyRecordSql = {
-//            s"""
-//               |SELECT `${InternalColumns.tmst}`
-//               |FROM `${accuracyMetricTableName}` WHERE `${matchedColName}` 
> 0
-//             """.stripMargin
-//          }
-//          val accuracyRecordStep = SparkSqlStep(accuracyRecordTableName, 
accuracyRecordSql, emptyMap)
-//          val recordParam = 
RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
-//          val accuracyRecordParam = 
recordParam.addIfNotExist(ExportParamKeys._dataSourceCache, sourceName)
-//            .addIfNotExist(ExportParamKeys._originDF, missRecordsTableName)
-//          val accuracyRecordExports = genRecordExport(
-//            accuracyRecordParam, missRecordsTableName, 
accuracyRecordTableName) :: Nil
-//
-//          // 8. update global accuracy metric
-//          val updateGlobalAccuracyTableName = globalAccuracyTableName
-//          val globalMetricKeepTime = 
details.getString(GlobalKeys._globalMetricKeep, "")
-//          val updateGlobalAccuracySql = 
TimeUtil.milliseconds(globalMetricKeepTime) match {
-//            case Some(kt) => {
-//              s"""
-//                 |SELECT * FROM `${globalAccuracyTableName}`
-//                 |WHERE (`${missColName}` > 0) AND 
(`${InternalColumns.tmst}` > ${timeInfo.calcTime - kt})
-//               """.stripMargin
-//            }
-//            case _ => {
-//              s"""
-//                 |SELECT * FROM `${globalAccuracyTableName}`
-//                 |WHERE (`${missColName}` > 0)
-//               """.stripMargin
-//            }
-//          }
-//          val updateGlobalAccuracyStep = 
SparkSqlStep(updateGlobalAccuracyTableName,
-//            updateGlobalAccuracySql, emptyMap, true, true)
-//
-//          // gen accu plan
-//          val extraSteps = globalAccuracyStep :: accuracyMetricStep :: 
accuracyRecordStep :: updateGlobalAccuracyStep :: Nil
-//          val extraExports = accuracyMetricExports ++ accuracyRecordExports
-//          val extraPlan = RulePlan(extraSteps, extraExports)
-//
-//          extraPlan
-//        }
-//      }
-//
-//      // return accu plan
-//      accuPlan.merge(streamingAccuPlan)
-//
-//    }
-//  }
-
   private def profilingRulePlan(timeInfo: TimeInfo, name: String, expr: Expr,
                                 param: Map[String, Any], processType: 
ProcessType
                                ): RulePlan = {
@@ -481,13 +302,13 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
     }
   }
 
-  private def duplicateRulePlan(timeInfo: TimeInfo, name: String, expr: Expr,
-                                param: Map[String, Any], processType: 
ProcessType
-                               ): RulePlan = {
+  private def uniquenessRulePlan(timeInfo: TimeInfo, name: String, expr: Expr,
+                                 param: Map[String, Any], processType: 
ProcessType
+                                ): RulePlan = {
     val details = getDetails(param)
-    val sourceName = details.getString(DuplicateKeys._source, 
dataSourceNames.head)
-    val targetName = details.getString(DuplicateKeys._target, 
dataSourceNames.tail.head)
-    val analyzer = DuplicateAnalyzer(expr.asInstanceOf[DuplicateClause], 
sourceName, targetName)
+    val sourceName = details.getString(UniquenessKeys._source, 
dataSourceNames.head)
+    val targetName = details.getString(UniquenessKeys._target, 
dataSourceNames.tail.head)
+    val analyzer = UniquenessAnalyzer(expr.asInstanceOf[UniquenessClause], 
sourceName, targetName)
 
     if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
       println(s"[${timeInfo.calcTime}] data source ${sourceName} not exists")
@@ -539,49 +360,117 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
       val groupSelClause = selAliases.map { alias =>
         s"`${alias}`"
       }.mkString(", ")
-      val dupColName = details.getStringOrKey(DuplicateKeys._dup)
+      val dupColName = details.getStringOrKey(UniquenessKeys._dup)
       val groupSql = {
         s"SELECT ${groupSelClause}, (COUNT(*) - 1) AS `${dupColName}` FROM 
`${joinedTableName}` GROUP BY ${groupSelClause}"
       }
-      val groupStep = SparkSqlStep(groupTableName, groupSql, emptyMap)
-
-      // 5. duplicate record
-      val dupRecordTableName = "__dupRecords"
-      val dupRecordSql = {
-        s"""
-           |SELECT * FROM `${groupTableName}` WHERE `${dupColName}` > 0
-         """.stripMargin
-      }
-      val dupRecordStep = SparkSqlStep(dupRecordTableName, dupRecordSql, 
emptyMap, true)
-      val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
-      val dupRecordxports = genRecordExport(recordParam, dupRecordTableName, 
dupRecordTableName) :: Nil
+      val groupStep = SparkSqlStep(groupTableName, groupSql, emptyMap, true)
 
-      // 6. duplicate metric
-      val dupMetricTableName = name
-      val numColName = details.getStringOrKey(DuplicateKeys._num)
-      val dupMetricSelClause = processType match {
-        case BatchProcessType => s"`${dupColName}`, COUNT(*) AS 
`${numColName}`"
-        case StreamingProcessType => s"`${InternalColumns.tmst}`, 
`${dupColName}`, COUNT(*) AS `${numColName}`"
+      // 5. total metric
+      val totalTableName = "__totalMetric"
+      val totalColName = details.getStringOrKey(UniquenessKeys._total)
+      val totalSql = processType match {
+        case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM 
`${sourceName}`"
+        case StreamingProcessType => {
+          s"""
+             |SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${totalColName}`
+             |FROM `${sourceName}` GROUP BY `${InternalColumns.tmst}`
+           """.stripMargin
+        }
       }
-      val dupMetricGroupbyClause = processType match {
-        case BatchProcessType => s"`${dupColName}`"
-        case StreamingProcessType => s"`${InternalColumns.tmst}`, 
`${dupColName}`"
+      val totalStep = SparkSqlStep(totalTableName, totalSql, emptyMap)
+      val totalMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
+      val totalMetricExport = genMetricExport(totalMetricParam, totalColName, 
totalTableName)
+
+      // 6. unique record
+      val uniqueRecordTableName = "__uniqueRecord"
+      val uniqueRecordSql = {
+        s"SELECT * FROM `${groupTableName}` WHERE `${dupColName}` = 0"
       }
-      val dupMetricSql = {
-        s"""
-           |SELECT ${dupMetricSelClause} FROM `${dupRecordTableName}`
-           |GROUP BY ${dupMetricGroupbyClause}
-         """.stripMargin
+      val uniqueRecordStep = SparkSqlStep(uniqueRecordTableName, 
uniqueRecordSql, emptyMap)
+
+      // 7. unique metric
+      val uniqueTableName = "__uniqueMetric"
+      val uniqueColName = details.getStringOrKey(UniquenessKeys._unique)
+      val uniqueSql = processType match {
+        case BatchProcessType => s"SELECT COUNT(*) AS `${uniqueColName}` FROM 
`${uniqueRecordTableName}`"
+        case StreamingProcessType => {
+          s"""
+             |SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${uniqueColName}`
+             |FROM `${uniqueRecordTableName}` GROUP BY 
`${InternalColumns.tmst}`
+           """.stripMargin
+        }
       }
-      val dupMetricStep = SparkSqlStep(dupMetricTableName, dupMetricSql, 
emptyMap)
-      val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
-        .addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
-      val dupMetricExports = genMetricExport(metricParam, name, 
dupMetricTableName) :: Nil
+      val uniqueStep = SparkSqlStep(uniqueTableName, uniqueSql, emptyMap)
+      val uniqueMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
+      val uniqueMetricExport = genMetricExport(uniqueMetricParam, 
uniqueColName, uniqueTableName)
+
+      // 8. count metric
+//      val countMetricTableName = "__countMetric"
+//      val countMetricSql = processType match {
+//        case BatchProcessType => {
+//          s"""
+//             |SELECT `${totalTableName}`.`${totalColName}` AS 
`${totalColName}`,
+//             |coalesce(`${uniqueTableName}`.`${uniqueColName}`, 0) AS 
`${uniqueColName}`
+//             |FROM `${totalTableName}` LEFT JOIN `${uniqueTableName}`
+//          """.stripMargin
+//        }
+//        case StreamingProcessType => {
+//          s"""
+//             |SELECT `${totalTableName}`.`${InternalColumns.tmst}` AS 
`${InternalColumns.tmst}`,
+//             |`${totalTableName}`.`${totalColName}` AS `${totalColName}`,
+//             |coalesce(`${uniqueTableName}`.`${uniqueColName}`, 0) AS 
`${uniqueColName}`
+//             |FROM `${totalTableName}` LEFT JOIN `${uniqueTableName}`
+//             |ON `${totalTableName}`.`${InternalColumns.tmst}` = 
`${uniqueTableName}`.`${InternalColumns.tmst}`
+//          """.stripMargin
+//        }
+//      }
+//      val countMetricStep = SparkSqlStep(countMetricTableName, 
countMetricSql, emptyMap)
+//      val countMetricParam = 
RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
+//        .addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
+//      val countMetricExport = genMetricExport(countMetricParam, "", 
countMetricTableName)
+
+      val uniqueSteps = sourceStep :: targetStep :: joinedStep :: groupStep ::
+        totalStep :: uniqueRecordStep :: uniqueStep :: Nil
+      val uniqueExports = totalMetricExport :: uniqueMetricExport :: Nil
+      val uniqueRulePlan = RulePlan(uniqueSteps, uniqueExports)
+
+      val duplicationArrayName = 
details.getString(UniquenessKeys._duplicationArray, "")
+      val dupRulePlan = if (duplicationArrayName.nonEmpty) {
+        // 8. duplicate record
+        val dupRecordTableName = "__dupRecords"
+        val dupRecordSql = {
+          s"SELECT * FROM `${groupTableName}` WHERE `${dupColName}` > 0"
+        }
+        val dupRecordStep = SparkSqlStep(dupRecordTableName, dupRecordSql, 
emptyMap, true)
+        val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
+        val dupRecordExport = genRecordExport(recordParam, dupRecordTableName, 
dupRecordTableName)
+
+        // 9. duplicate metric
+        val dupMetricTableName = "__dupMetric"
+        val numColName = details.getStringOrKey(UniquenessKeys._num)
+        val dupMetricSelClause = processType match {
+          case BatchProcessType => s"`${dupColName}`, COUNT(*) AS 
`${numColName}`"
+          case StreamingProcessType => s"`${InternalColumns.tmst}`, 
`${dupColName}`, COUNT(*) AS `${numColName}`"
+        }
+        val dupMetricGroupbyClause = processType match {
+          case BatchProcessType => s"`${dupColName}`"
+          case StreamingProcessType => s"`${InternalColumns.tmst}`, 
`${dupColName}`"
+        }
+        val dupMetricSql = {
+          s"""
+             |SELECT ${dupMetricSelClause} FROM `${dupRecordTableName}`
+             |GROUP BY ${dupMetricGroupbyClause}
+          """.stripMargin
+        }
+        val dupMetricStep = SparkSqlStep(dupMetricTableName, dupMetricSql, 
emptyMap)
+        val dupMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
+        val dupMetricExport = genMetricExport(dupMetricParam, 
duplicationArrayName, dupMetricTableName)
 
-      val dupSteps = sourceStep :: targetStep :: joinedStep :: groupStep :: 
dupRecordStep :: dupMetricStep :: Nil
-      val dupExports = dupRecordxports ++ dupMetricExports
+        RulePlan(dupRecordStep :: dupMetricStep :: Nil, dupRecordExport :: 
dupMetricExport :: Nil)
+      } else emptyRulePlan
 
-      RulePlan(dupSteps, dupExports)
+      uniqueRulePlan.merge(dupRulePlan)
     }
   }
 
@@ -681,535 +570,4 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
     }
   }
 
-  //  override def genRuleInfos(param: Map[String, Any], timeInfo: TimeInfo): 
Seq[RuleInfo] = {
-//    val ruleInfo = RuleInfoGen(param)
-//    val dqType = RuleInfoGen.dqType(param)
-//    try {
-//      val result = parser.parseRule(ruleInfo.rule, dqType)
-//      if (result.successful) {
-//        val expr = result.get
-//        dqType match {
-//          case AccuracyType => accuracyRuleInfos(ruleInfo, expr, timeInfo)
-//          case ProfilingType => profilingRuleInfos(ruleInfo, expr, timeInfo)
-//          case TimelinessType => Nil
-//          case _ => Nil
-//        }
-//      } else {
-//        warn(s"parse rule [ ${ruleInfo.rule} ] fails: \n${result}")
-//        Nil
-//      }
-//    } catch {
-//      case e: Throwable => {
-//        error(s"generate rule info ${ruleInfo} fails: ${e.getMessage}")
-//        Nil
-//      }
-//    }
-//  }
-
-  // group by version
-//  private def accuracyRuleInfos(ruleInfo: RuleInfo, expr: Expr, timeInfo: 
TimeInfo): Seq[RuleInfo] = {
-//    val calcTime = timeInfo.calcTime
-//    val details = ruleInfo.details
-//    val sourceName = details.getString(AccuracyKeys._source, 
dataSourceNames.head)
-//    val targetName = details.getString(AccuracyKeys._target, 
dataSourceNames.tail.head)
-//    val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], 
sourceName, targetName)
-//
-//    if (!TempTables.existTable(timeInfo.key, sourceName)) {
-//      Nil
-//    } else {
-//      // 1. miss record
-//      val missRecordsSql = if (!TempTables.existTable(timeInfo.key, 
targetName)) {
-//        val selClause = s"`${sourceName}`.*"
-//        s"SELECT ${selClause} FROM `${sourceName}`"
-//      } else {
-//        val selClause = s"`${sourceName}`.*"
-//        val onClause = expr.coalesceDesc
-//        val sourceIsNull = analyzer.sourceSelectionExprs.map { sel =>
-//          s"${sel.desc} IS NULL"
-//        }.mkString(" AND ")
-//        val targetIsNull = analyzer.targetSelectionExprs.map { sel =>
-//          s"${sel.desc} IS NULL"
-//        }.mkString(" AND ")
-//        val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})"
-//        s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` 
ON ${onClause} WHERE ${whereClause}"
-//      }
-//      val missRecordsName = AccuracyKeys._missRecords
-//      //      val tmstMissRecordsName = TempName.tmstName(missRecordsName, 
timeInfo)
-//      val missRecordsParams = details.getParamMap(AccuracyKeys._missRecords)
-//        .addIfNotExist(RuleDetailKeys._persistType, RecordPersistType.desc)
-//        .addIfNotExist(RuleDetailKeys._persistName, missRecordsName)
-//      val missRecordsRuleInfo = RuleInfo(missRecordsName, None, SparkSqlType,
-//        missRecordsSql, missRecordsParams, true)
-//      //      val missRecordsStep = SparkSqlStep(
-//      //        timeInfo,
-//      //        RuleInfo(missRecordsName, Some(tmstMissRecordsName), 
missRecordsSql, missRecordsParams)
-//      //      )
-//
-//      // 2. miss count
-//      val missTableName = "_miss_"
-//      //      val tmstMissTableName = TempName.tmstName(missTableName, 
timeInfo)
-//      val missColName = details.getStringOrKey(AccuracyKeys._miss)
-//      val missSql = {
-//        s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${missColName}` 
FROM `${missRecordsName}` GROUP BY `${InternalColumns.tmst}`"
-//      }
-//      val missRuleInfo = RuleInfo(missTableName, None, SparkSqlType,
-//        missSql, Map[String, Any](), true)
-//      //      val missStep = SparkSqlStep(
-//      //        timeInfo,
-//      //        RuleInfo(missTableName, None, missSql, Map[String, Any]())
-//      //      )
-//
-//      // 3. total count
-//      val totalTableName = "_total_"
-//      //      val tmstTotalTableName = TempName.tmstName(totalTableName, 
timeInfo)
-//      val totalColName = details.getStringOrKey(AccuracyKeys._total)
-//      val totalSql = {
-//        s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${totalColName}` 
FROM `${sourceName}` GROUP BY `${InternalColumns.tmst}`"
-//      }
-//      val totalRuleInfo = RuleInfo(totalTableName, None, SparkSqlType,
-//        totalSql, Map[String, Any](), true)
-//      //      val totalStep = SparkSqlStep(
-//      //        timeInfo,
-//      //        RuleInfo(totalTableName, None, totalSql, Map[String, Any]())
-//      //      )
-//
-//      // 4. accuracy metric
-//      val accuracyMetricName = 
details.getString(RuleDetailKeys._persistName, ruleInfo.name)
-//      //      val tmstAccuracyMetricName = 
TempName.tmstName(accuracyMetricName, timeInfo)
-//      val matchedColName = details.getStringOrKey(AccuracyKeys._matched)
-//      val accuracyMetricSql = {
-//        s"""
-//           |SELECT `${totalTableName}`.`${InternalColumns.tmst}` AS 
`${InternalColumns.tmst}`,
-//           |`${missTableName}`.`${missColName}` AS `${missColName}`,
-//           |`${totalTableName}`.`${totalColName}` AS `${totalColName}`,
-//           |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`
-//           |FROM `${totalTableName}` FULL JOIN `${missTableName}`
-//           |ON `${totalTableName}`.`${InternalColumns.tmst}` = 
`${missTableName}`.`${InternalColumns.tmst}`
-//         """.stripMargin
-//      }
-//      //      val accuracyParams = 
details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc)
-////      val accuracyMetricRuleInfo = RuleInfo(accuracyMetricName, None, 
SparkSqlType,
-////        accuracyMetricSql, Map[String, Any](), true)
-//      val accuracyParams = 
details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc)
-//        .addIfNotExist(RuleDetailKeys._persistName, accuracyMetricName)
-//      val accuracyMetricRuleInfo = RuleInfo(accuracyMetricName, None, 
SparkSqlType,
-//        accuracyMetricSql, Map[String, Any](), true)
-//
-//      // 5. accuracy metric merge
-//      val globalMetricName = "accu_global"
-//      val globalAccuSql = if (TempTables.existGlobalTable(globalMetricName)) 
{
-//        s"""
-//           |SELECT coalesce(`${globalMetricName}`.`${InternalColumns.tmst}`, 
`${accuracyMetricName}`.`${InternalColumns.tmst}`) AS `${InternalColumns.tmst}`,
-//           |coalesce(`${accuracyMetricName}`.`${missColName}`, 
`${globalMetricName}`.`${missColName}`) AS `${missColName}`,
-//           |coalesce(`${globalMetricName}`.`${totalColName}`, 
`${accuracyMetricName}`.`${totalColName}`) AS `${totalColName}`,
-//           |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`,
-//           |(`${totalColName}` = 0) AS `empty`,
-//           |(`${missColName}` = 0) AS `no_miss`,
-//           |(`${accuracyMetricName}`.`${missColName}` < 
`${globalMetricName}`.`${missColName}`) AS `update`
-//           |FROM `${globalMetricName}` FULL JOIN `${accuracyMetricName}`
-//           |ON `${globalMetricName}`.`${InternalColumns.tmst}` = 
`${accuracyMetricName}`.`${InternalColumns.tmst}`
-//         """.stripMargin
-//      } else {
-//        s"""
-//           |SELECT `${accuracyMetricName}`.`${InternalColumns.tmst}` AS 
`${InternalColumns.tmst}`,
-//           |`${accuracyMetricName}`.`${missColName}` AS `${missColName}`,
-//           |`${accuracyMetricName}`.`${totalColName}` AS `${totalColName}`,
-//           |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`,
-//           |(`${totalColName}` = 0) AS `empty`,
-//           |(`${missColName}` = 0) AS `no_miss`,
-//           |true AS `update`
-//           |FROM `${accuracyMetricName}`
-//         """.stripMargin
-//      }
-//      val globalAccuParams = Map[String, Any](
-//        ("global" -> true)
-//      )
-//      val mergeRuleInfo = RuleInfo(globalMetricName, None, SparkSqlType,
-//        globalAccuSql, globalAccuParams, true)
-//
-//      // 6. persist metrics
-//      val persistMetricName = "persist"
-//      val persistSql = {
-//        s"""
-//           |SELECT `${InternalColumns.tmst}`, `${missColName}`, 
`${totalColName}`, `${matchedColName}`
-//           |FROM `${globalMetricName}`
-//           |WHERE `update`
-//         """.stripMargin
-//      }
-//      val persistParams = details.addIfNotExist(RuleDetailKeys._persistType, 
MetricPersistType.desc)
-//        .addIfNotExist(RuleDetailKeys._persistName, accuracyMetricName)
-//      val persistRuleInfo = RuleInfo(persistMetricName, None, SparkSqlType,
-//        persistSql, persistParams, true)
-//
-//      // 5. accuracy metric filter
-////      val accuracyParams = details.addIfNotExist("df.name", 
accuracyMetricName)
-////        .addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc)
-////        .addIfNotExist(RuleDetailKeys._persistName, accuracyMetricName)
-////      val accuracyRuleInfo = RuleInfo(accuracyMetricName, None, DfOprType,
-////        "accuracy", accuracyParams, true)
-//
-////      missRecordsRuleInfo :: missRuleInfo :: totalRuleInfo ::
-////        accuracyMetricRuleInfo :: accuracyRuleInfo :: Nil
-//      missRecordsRuleInfo :: missRuleInfo :: totalRuleInfo ::
-//        accuracyMetricRuleInfo :: mergeRuleInfo :: persistRuleInfo :: Nil
-//    }
-//  }
-
-//  private def accuracyRuleInfos(ruleInfo: RuleInfo, expr: Expr, timeInfo: 
TimeInfo): Seq[RuleInfo] = {
-//    val calcTime = timeInfo.calcTime
-//    val details = ruleInfo.details
-//    val sourceName = details.getString(AccuracyKeys._source, 
dataSourceNames.head)
-//    val targetName = details.getString(AccuracyKeys._target, 
dataSourceNames.tail.head)
-//    val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], 
sourceName, targetName)
-//
-//    if (!TempTables.existTable(timeInfo.key, sourceName)) {
-//      Nil
-//    } else {
-//      // 1. miss record
-//      val missRecordsSql = if (!TempTables.existTable(timeInfo.key, 
targetName)) {
-//        val selClause = s"`${sourceName}`.*"
-//        s"SELECT ${selClause} FROM `${sourceName}`"
-//      } else {
-//        val selClause = s"`${sourceName}`.*"
-//        val onClause = expr.coalesceDesc
-//        val sourceIsNull = analyzer.sourceSelectionExprs.map { sel =>
-//          s"${sel.desc} IS NULL"
-//        }.mkString(" AND ")
-//        val targetIsNull = analyzer.targetSelectionExprs.map { sel =>
-//          s"${sel.desc} IS NULL"
-//        }.mkString(" AND ")
-//        val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})"
-//        s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` 
ON ${onClause} WHERE ${whereClause}"
-//      }
-//      val missRecordsName = AccuracyKeys._missRecords
-////      val tmstMissRecordsName = TempName.tmstName(missRecordsName, 
timeInfo)
-//      val missRecordsParams = details.getParamMap(AccuracyKeys._missRecords)
-//        .addIfNotExist(RuleDetailKeys._persistType, RecordPersistType.desc)
-//        .addIfNotExist(RuleDetailKeys._persistName, missRecordsName)
-//      val missRecordsRuleInfo = RuleInfo(missRecordsName, None, SparkSqlType,
-//        missRecordsSql, missRecordsParams, true)
-////      val missRecordsStep = SparkSqlStep(
-////        timeInfo,
-////        RuleInfo(missRecordsName, Some(tmstMissRecordsName), 
missRecordsSql, missRecordsParams)
-////      )
-//
-//      // 2. miss count
-//      val missTableName = "_miss_"
-//      //      val tmstMissTableName = TempName.tmstName(missTableName, 
timeInfo)
-//      val missColName = details.getStringOrKey(AccuracyKeys._miss)
-//      val missSql = {
-//        s"SELECT COUNT(*) AS `${missColName}` FROM `${missRecordsName}`"
-//      }
-//      val missRuleInfo = RuleInfo(missTableName, None, SparkSqlType,
-//        missSql, Map[String, Any](), false)
-////      val missStep = SparkSqlStep(
-////        timeInfo,
-////        RuleInfo(missTableName, None, missSql, Map[String, Any]())
-////      )
-//
-//      // 3. total count
-//      val totalTableName = "_total_"
-//      //      val tmstTotalTableName = TempName.tmstName(totalTableName, 
timeInfo)
-//      val totalColName = details.getStringOrKey(AccuracyKeys._total)
-//      val totalSql = {
-//        s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`"
-//      }
-//      val totalRuleInfo = RuleInfo(totalTableName, None, SparkSqlType,
-//        totalSql, Map[String, Any](), false)
-////      val totalStep = SparkSqlStep(
-////        timeInfo,
-////        RuleInfo(totalTableName, None, totalSql, Map[String, Any]())
-////      )
-//
-//      // 4. accuracy metric
-//      val accuracyMetricName = 
details.getString(RuleDetailKeys._persistName, ruleInfo.name)
-////      val tmstAccuracyMetricName = TempName.tmstName(accuracyMetricName, 
timeInfo)
-//      val matchedColName = details.getStringOrKey(AccuracyKeys._matched)
-//      val accuracyMetricSql = {
-//        s"""
-//           |SELECT `${missTableName}`.`${missColName}` AS `${missColName}`,
-//           |`${totalTableName}`.`${totalColName}` AS `${totalColName}`
-//           |FROM `${totalTableName}` FULL JOIN `${missTableName}`
-//         """.stripMargin
-//      }
-//      //      val accuracyParams = 
details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc)
-//      val accuracyMetricRuleInfo = RuleInfo(accuracyMetricName, None, 
SparkSqlType,
-//        accuracyMetricSql, Map[String, Any](), false)
-////      val accuracyMetricStep = SparkSqlStep(
-////        timeInfo,
-////        RuleInfo(accuracyMetricName, Some(tmstAccuracyMetricName), 
accuracyMetricSql, Map[String, Any]())
-////      )
-//
-//      // 5. accuracy metric filter
-//      val accuracyParams = details.addIfNotExist("df.name", 
accuracyMetricName)
-//        .addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc)
-//        .addIfNotExist(RuleDetailKeys._persistName, accuracyMetricName)
-//      val accuracyRuleInfo = RuleInfo(accuracyMetricName, None, DfOprType,
-//        "accuracy", accuracyParams, false)
-////      val accuracyStep = DfOprStep(
-////        timeInfo,
-////        RuleInfo(accuracyMetricName, Some(tmstAccuracyMetricName), 
"accuracy", accuracyParams)
-////      )
-//
-//      missRecordsRuleInfo :: missRuleInfo :: totalRuleInfo ::
-//        accuracyMetricRuleInfo :: accuracyRuleInfo :: Nil
-//    }
-//  }
-
-//  private def profilingRuleInfos(ruleInfo: RuleInfo, expr: Expr, timeInfo: 
TimeInfo): Seq[RuleInfo] = {
-//    val details = ruleInfo.details
-//    val profilingClause = expr.asInstanceOf[ProfilingClause]
-//    val sourceName = profilingClause.fromClauseOpt match {
-//      case Some(fc) => fc.dataSource
-//      case _ => details.getString(ProfilingKeys._source, 
dataSourceNames.head)
-//    }
-//    val fromClause = 
profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc
-//
-//    if (!TempTables.existTable(timeInfo.key, sourceName)) {
-//      Nil
-//    } else {
-//      val tmstAnalyzer = ProfilingAnalyzer(profilingClause, sourceName)
-//
-//      val selExprDescs = tmstAnalyzer.selectionExprs.map { sel =>
-//        val alias = sel match {
-//          case s: AliasableExpr if (s.alias.nonEmpty) => s" AS 
`${s.alias.get}`"
-//          case _ => ""
-//        }
-//        s"${sel.desc}${alias}"
-//      }
-//      val selCondition = 
profilingClause.selectClause.extraConditionOpt.map(_.desc).mkString
-//      val selClause = selExprDescs.mkString(", ")
-////      val tmstFromClause = 
profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc
-//      val groupByClauseOpt = tmstAnalyzer.groupbyExprOpt
-//      val groupbyClause = groupByClauseOpt.map(_.desc).getOrElse("")
-//      val preGroupbyClause = 
tmstAnalyzer.preGroupbyExprs.map(_.desc).mkString(" ")
-//      val postGroupbyClause = 
tmstAnalyzer.postGroupbyExprs.map(_.desc).mkString(" ")
-//
-//      // 1. select statement
-//      val profilingSql = {
-//        s"SELECT ${selCondition} ${selClause} ${fromClause} 
${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}"
-//      }
-//      //      println(profilingSql)
-//      val metricName = details.getString(RuleDetailKeys._persistName, 
ruleInfo.name)
-//      //      val tmstMetricName = TempName.tmstName(metricName, timeInfo)
-//      val profilingParams = 
details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc)
-//        .addIfNotExist(RuleDetailKeys._persistName, metricName)
-//      val profilingRuleInfo = ruleInfo.setDslType(SparkSqlType)
-//        .setRule(profilingSql).setDetails(profilingParams)
-////      val profilingStep = SparkSqlStep(
-////        timeInfo,
-////        ruleInfo.setRule(profilingSql).setDetails(profilingParams)
-////      )
-//
-//      //      filterStep :: profilingStep :: Nil
-//      profilingRuleInfo :: Nil
-//    }
-//  }
-
-//  def genRuleStep(timeInfo: TimeInfo, param: Map[String, Any]): 
Seq[RuleStep] = {
-//    val ruleInfo = RuleInfoGen(param, timeInfo)
-//    val dqType = RuleInfoGen.dqType(param)
-//    GriffinDslStep(timeInfo, ruleInfo, dqType) :: Nil
-//  }
-//
-//  def adaptConcreteRuleStep(ruleStep: RuleStep
-//                           ): Seq[ConcreteRuleStep] = {
-//    ruleStep match {
-//      case rs @ GriffinDslStep(_, ri, dqType) => {
-//        try {
-//          val result = parser.parseRule(ri.rule, dqType)
-//          if (result.successful) {
-//            val expr = result.get
-//            transConcreteRuleStep(rs, expr)
-//          } else {
-//            println(result)
-//            warn(s"adapt concrete rule step warn: parse rule [ ${ri.rule} ] 
fails")
-//            Nil
-//          }
-//        } catch {
-//          case e: Throwable => {
-//            error(s"adapt concrete rule step error: ${e.getMessage}")
-//            Nil
-//          }
-//        }
-//      }
-//      case _ => Nil
-//    }
-//  }
-//
-//  private def transConcreteRuleStep(ruleStep: GriffinDslStep, expr: Expr
-//                                   ): Seq[ConcreteRuleStep] = {
-//    ruleStep.dqType match {
-//      case AccuracyType => transAccuracyRuleStep(ruleStep, expr)
-//      case ProfilingType => transProfilingRuleStep(ruleStep, expr)
-//      case TimelinessType => Nil
-//      case _ => Nil
-//    }
-//  }
-
-//  private def transAccuracyRuleStep(ruleStep: GriffinDslStep, expr: Expr
-//                                   ): Seq[ConcreteRuleStep] = {
-//    val timeInfo = ruleStep.timeInfo
-//    val ruleInfo = ruleStep.ruleInfo
-//    val calcTime = timeInfo.calcTime
-//    val tmst = timeInfo.tmst
-//
-//    val details = ruleInfo.details
-//    val sourceName = details.getString(AccuracyKeys._source, 
dataSourceNames.head)
-//    val targetName = details.getString(AccuracyKeys._target, 
dataSourceNames.tail.head)
-//    val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], 
sourceName, targetName)
-//
-//    if (!TempTables.existTable(key(calcTime), sourceName)) {
-//      Nil
-//    } else {
-//      // 1. miss record
-//      val missRecordsSql = if (!TempTables.existTable(key(calcTime), 
targetName)) {
-//        val selClause = s"`${sourceName}`.*"
-//        s"SELECT ${selClause} FROM `${sourceName}`"
-//      } else {
-//        val selClause = s"`${sourceName}`.*"
-//        val onClause = expr.coalesceDesc
-//        val sourceIsNull = analyzer.sourceSelectionExprs.map { sel =>
-//          s"${sel.desc} IS NULL"
-//        }.mkString(" AND ")
-//        val targetIsNull = analyzer.targetSelectionExprs.map { sel =>
-//          s"${sel.desc} IS NULL"
-//        }.mkString(" AND ")
-//        val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})"
-//        s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` 
ON ${onClause} WHERE ${whereClause}"
-//      }
-//      val missRecordsName = AccuracyKeys._missRecords
-//      val tmstMissRecordsName = TempName.tmstName(missRecordsName, timeInfo)
-//      val missRecordsParams = details.getParamMap(AccuracyKeys._missRecords)
-//        .addIfNotExist(RuleDetailKeys._persistType, RecordPersistType.desc)
-//        .addIfNotExist(RuleDetailKeys._persistName, missRecordsName)
-//      val missRecordsStep = SparkSqlStep(
-//        timeInfo,
-//        RuleInfo(missRecordsName, Some(tmstMissRecordsName), missRecordsSql, 
missRecordsParams)
-//      )
-//
-//      // 2. miss count
-//      val missTableName = "_miss_"
-////      val tmstMissTableName = TempName.tmstName(missTableName, timeInfo)
-//      val missColName = details.getStringOrKey(AccuracyKeys._miss)
-//      val missSql = {
-//        s"SELECT COUNT(*) AS `${missColName}` FROM `${missRecordsName}`"
-//      }
-//      val missStep = SparkSqlStep(
-//        timeInfo,
-//        RuleInfo(missTableName, None, missSql, Map[String, Any]())
-//      )
-//
-//      // 3. total count
-//      val totalTableName = "_total_"
-////      val tmstTotalTableName = TempName.tmstName(totalTableName, timeInfo)
-//      val totalColName = details.getStringOrKey(AccuracyKeys._total)
-//      val totalSql = {
-//        s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`"
-//      }
-//      val totalStep = SparkSqlStep(
-//        timeInfo,
-//        RuleInfo(totalTableName, None, totalSql, Map[String, Any]())
-//      )
-//
-//      // 4. accuracy metric
-//      val accuracyMetricName = 
details.getString(RuleDetailKeys._persistName, ruleStep.name)
-//      val tmstAccuracyMetricName = TempName.tmstName(accuracyMetricName, 
timeInfo)
-//      val matchedColName = details.getStringOrKey(AccuracyKeys._matched)
-//      val accuracyMetricSql = {
-//        s"""
-//           |SELECT `${missTableName}`.`${missColName}` AS `${missColName}`,
-//           |`${totalTableName}`.`${totalColName}` AS `${totalColName}`
-//           |FROM `${totalTableName}` FULL JOIN `${missTableName}`
-//         """.stripMargin
-//      }
-////      val accuracyParams = 
details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc)
-//      val accuracyMetricStep = SparkSqlStep(
-//        timeInfo,
-//        RuleInfo(accuracyMetricName, Some(tmstAccuracyMetricName), 
accuracyMetricSql, Map[String, Any]())
-//      )
-//
-//      // 5. accuracy metric filter
-//      val accuracyParams = details.addIfNotExist("df.name", 
accuracyMetricName)
-//        .addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc)
-//        .addIfNotExist(RuleDetailKeys._persistName, accuracyMetricName)
-//      val accuracyStep = DfOprStep(
-//        timeInfo,
-//        RuleInfo(accuracyMetricName, Some(tmstAccuracyMetricName), 
"accuracy", accuracyParams)
-//      )
-//
-//      missRecordsStep :: missStep :: totalStep :: accuracyMetricStep :: 
accuracyStep :: Nil
-//    }
-//  }
-
-//  private def transProfilingRuleStep(ruleStep: GriffinDslStep, expr: Expr
-//                                    ): Seq[ConcreteRuleStep] = {
-//    val calcTime = ruleStep.timeInfo.calcTime
-//    val details = ruleStep.ruleInfo.details
-//    val profilingClause = expr.asInstanceOf[ProfilingClause]
-//    val sourceName = profilingClause.fromClauseOpt match {
-//      case Some(fc) => fc.dataSource
-//      case _ => details.getString(ProfilingKeys._source, 
dataSourceNames.head)
-//    }
-//    val fromClause = 
profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc
-//
-//    if (!TempTables.existTable(key(calcTime), sourceName)) {
-//      Nil
-//    } else {
-//      val timeInfo = ruleStep.timeInfo
-//      val ruleInfo = ruleStep.ruleInfo
-//      val tmst = timeInfo.tmst
-//
-////      val tmstSourceName = TempName.tmstName(sourceName, timeInfo)
-//
-////      val tmstProfilingClause = 
profilingClause.map(dsHeadReplace(sourceName, tmstSourceName))
-//      val tmstAnalyzer = ProfilingAnalyzer(profilingClause, sourceName)
-//
-//      val selExprDescs = tmstAnalyzer.selectionExprs.map { sel =>
-//        val alias = sel match {
-//          case s: AliasableExpr if (s.alias.nonEmpty) => s" AS 
`${s.alias.get}`"
-//          case _ => ""
-//        }
-//        s"${sel.desc}${alias}"
-//      }
-//      val selCondition = 
profilingClause.selectClause.extraConditionOpt.map(_.desc).mkString
-//      val selClause = selExprDescs.mkString(", ")
-////      val tmstFromClause = 
profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc
-//      val groupByClauseOpt = tmstAnalyzer.groupbyExprOpt
-//      val groupbyClause = groupByClauseOpt.map(_.desc).getOrElse("")
-//      val preGroupbyClause = 
tmstAnalyzer.preGroupbyExprs.map(_.desc).mkString(" ")
-//      val postGroupbyClause = 
tmstAnalyzer.postGroupbyExprs.map(_.desc).mkString(" ")
-//
-//      // 1. select statement
-//      val profilingSql = {
-//        s"SELECT ${selCondition} ${selClause} ${fromClause} 
${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}"
-//      }
-////      println(profilingSql)
-//      val metricName = details.getString(RuleDetailKeys._persistName, 
ruleStep.name)
-////      val tmstMetricName = TempName.tmstName(metricName, timeInfo)
-//      val profilingParams = 
details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc)
-//        .addIfNotExist(RuleDetailKeys._persistName, metricName)
-//      val profilingStep = SparkSqlStep(
-//        timeInfo,
-//        ruleInfo.setRule(profilingSql).setDetails(profilingParams)
-//      )
-//
-////      filterStep :: profilingStep :: Nil
-//      profilingStep :: Nil
-//    }
-//
-//  }
-
-//  private def dsHeadReplace(originName: String, replaceName: String): (Expr) 
=> Expr = { expr: Expr =>
-//    expr match {
-//      case DataSourceHeadExpr(sn) if (sn == originName) => {
-//        DataSourceHeadExpr(replaceName)
-//      }
-//      case FromClause(sn) if (sn == originName) => {
-//        FromClause(replaceName)
-//      }
-//      case _ => expr.map(dsHeadReplace(originName, replaceName))
-//    }
-//  }
-
 }
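
At the adaptor level, the change means uniquenessRulePlan always emits the total and unique steps, and appends the duplicate record/metric steps only when the "duplication.array" detail is configured. Below is a minimal stand-in sketch of that composition; Plan and Step are toy types, not Griffin's RulePlan/SparkSqlStep API.

// Toy stand-ins; Griffin's real RulePlan/SparkSqlStep carry more state.
case class Step(table: String, sql: String)
case class Plan(steps: Seq[Step], exports: Seq[String]) {
  def merge(other: Plan): Plan = Plan(steps ++ other.steps, exports ++ other.exports)
}

def uniquenessPlan(details: Map[String, String]): Plan = {
  // always present: total metric, unique record, unique metric
  val uniquePlan = Plan(
    Seq(
      Step("__totalMetric", "SELECT COUNT(*) AS `total` FROM `source`"),
      Step("__uniqueRecord", "SELECT * FROM `__group` WHERE `dup` = 0"),
      Step("__uniqueMetric", "SELECT COUNT(*) AS `unique` FROM `__uniqueRecord`")),
    Seq("total", "unique"))
  // optional: duplication details, only when "duplication.array" is set
  val dupPlan = details.get("duplication.array").filter(_.nonEmpty) match {
    case Some(arrayName) => Plan(
      Seq(
        Step("__dupRecords", "SELECT * FROM `__group` WHERE `dup` > 0"),
        Step("__dupMetric", "SELECT `dup`, COUNT(*) AS `num` FROM `__dupRecords` GROUP BY `dup`")),
      Seq(arrayName))
    case None => Plan(Nil, Nil)
  }
  uniquePlan.merge(dupPlan)
}

// uniquenessPlan(Map("duplication.array" -> "dup")).steps.size  // 5
// uniquenessPlan(Map.empty).steps.size                          // 3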

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fad7daf7/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala
index da59348..11b67f2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala
@@ -28,7 +28,7 @@ sealed trait DqType {
 
 object DqType {
   private val dqTypes: List[DqType] = List(
-    AccuracyType, ProfilingType, DuplicateType, TimelinessType, UnknownType
+    AccuracyType, ProfilingType, UniquenessType, TimelinessType, UnknownType
   )
   def apply(ptn: String): DqType = {
     dqTypes.filter(tp => ptn match {
@@ -49,9 +49,9 @@ final case object ProfilingType extends DqType {
   val desc = "profiling"
 }
 
-final case object DuplicateType extends DqType {
-  val regex = "^(?i)duplicate$".r
-  val desc = "duplicate"
+final case object UniquenessType extends DqType {
+  val regex = "^(?i)uniqueness|duplicate$".r
+  val desc = "uniqueness"
 }
 
 final case object TimelinessType extends DqType {
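
One detail worth noting in DqType.scala: the UniquenessType regex still accepts "duplicate", so existing measure configs keep working after the rename. A self-contained sketch of this regex-dispatch pattern (only one concrete type shown; the real list has five):

import scala.util.matching.Regex

sealed trait MeasureType { val regex: Regex; val desc: String }
case object Uniqueness extends MeasureType {
  // "duplicate" kept as a backward-compatible alias for "uniqueness"
  val regex: Regex = "^(?i)uniqueness|duplicate$".r
  val desc = "uniqueness"
}
case object Unknown extends MeasureType {
  val regex: Regex = "".r
  val desc = "unknown"
}

def measureType(ptn: String): MeasureType =
  List(Uniqueness).find(_.regex.pattern.matcher(ptn).matches).getOrElse(Unknown)

// measureType("duplicate").desc   // "uniqueness"
// measureType("Uniqueness").desc  // "uniqueness"
// measureType("accuracy").desc    // "unknown"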

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fad7daf7/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DuplicateAnalyzer.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DuplicateAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DuplicateAnalyzer.scala
deleted file mode 100644
index 1ca2b76..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DuplicateAnalyzer.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule.dsl.analyzer
-
-import org.apache.griffin.measure.rule.dsl.expr.{AliasableExpr, _}
-
-
-case class DuplicateAnalyzer(expr: DuplicateClause, sourceName: String, 
targetName: String) extends BasicAnalyzer {
-
-  val seqAlias = (expr: Expr, v: Seq[String]) => {
-    expr match {
-      case apr: AliasableExpr => v ++ apr.alias
-      case _ => v
-    }
-  }
-  val combAlias = (a: Seq[String], b: Seq[String]) => a ++ b
-
-  private val exprs = expr.exprs
-  private def genAlias(idx: Int): String = s"alias_${idx}"
-  val selectionPairs = exprs.zipWithIndex.map { pair =>
-    val (pr, idx) = pair
-    val res = pr.preOrderTraverseDepthFirst(Seq[String]())(seqAlias, combAlias)
-    (pr, res.headOption.getOrElse(genAlias(idx)))
-  }
-
-  if (selectionPairs.isEmpty) {
-    throw new Exception(s"duplicate analyzer error: empty selection")
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fad7daf7/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/UniquenessAnalyzer.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/UniquenessAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/UniquenessAnalyzer.scala
new file mode 100644
index 0000000..9fe65c2
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/UniquenessAnalyzer.scala
@@ -0,0 +1,46 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.dsl.analyzer
+
+import org.apache.griffin.measure.rule.dsl.expr.{AliasableExpr, _}
+
+
+case class UniquenessAnalyzer(expr: UniquenessClause, sourceName: String, targetName: String) extends BasicAnalyzer {
+
+  val seqAlias = (expr: Expr, v: Seq[String]) => {
+    expr match {
+      case apr: AliasableExpr => v ++ apr.alias
+      case _ => v
+    }
+  }
+  val combAlias = (a: Seq[String], b: Seq[String]) => a ++ b
+
+  private val exprs = expr.exprs
+  private def genAlias(idx: Int): String = s"alias_${idx}"
+  val selectionPairs = exprs.zipWithIndex.map { pair =>
+    val (pr, idx) = pair
+    val res = pr.preOrderTraverseDepthFirst(Seq[String]())(seqAlias, combAlias)
+    (pr, res.headOption.getOrElse(genAlias(idx)))
+  }
+
+  if (selectionPairs.isEmpty) {
+    throw new Exception(s"uniqueness analyzer error: empty selection")
+  }
+
+}
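
The analyzer pairs every selected expression with an output alias, preferring a declared alias and falling back to a generated alias_<idx>. A simplified standalone sketch of that pairing (MiniExpr and the Option-based alias stand in for Griffin's Expr tree and its pre-order traversal):

    object AliasPairingDemo {
      case class MiniExpr(desc: String, alias: Option[String])

      // Mirrors selectionPairs above: keep a declared alias,
      // otherwise generate "alias_<idx>" from the position.
      def selectionPairs(exprs: Seq[MiniExpr]): Seq[(MiniExpr, String)] =
        exprs.zipWithIndex.map { case (expr, idx) =>
          (expr, expr.alias.getOrElse(s"alias_${idx}"))
        }

      def main(args: Array[String]): Unit = {
        val exprs = Seq(MiniExpr("name", None), MiniExpr("(age + 1)", Some("next_age")))
        selectionPairs(exprs).foreach(println)
        // (MiniExpr(name,None),alias_0)
        // (MiniExpr((age + 1),Some(next_age)),next_age)
      }
    }

As in the analyzer itself, an empty selection list is rejected as a configuration error rather than treated as a valid rule.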

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fad7daf7/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
index bc7af42..504e176 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
@@ -219,12 +219,12 @@ case class ProfilingClause(selectClause: SelectClause,
   }
 }
 
-case class DuplicateClause(exprs: Seq[Expr]) extends ClauseExpression {
+case class UniquenessClause(exprs: Seq[Expr]) extends ClauseExpression {
   addChildren(exprs)
 
   def desc: String = exprs.map(_.desc).mkString(", ")
   def coalesceDesc: String = exprs.map(_.coalesceDesc).mkString(", ")
-  override def map(func: (Expr) => Expr): DuplicateClause = DuplicateClause(exprs.map(func(_)))
+  override def map(func: (Expr) => Expr): UniquenessClause = UniquenessClause(exprs.map(func(_)))
 }
 
 case class TimelinessClause(exprs: Seq[Expr]) extends ClauseExpression {
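
Apart from the rename, the clause keeps the same shape: map rebuilds it by applying a transform to each child expression. A standalone sketch with simplified types (MiniExpr is illustrative, not Griffin's trait hierarchy):

    object ClauseMapDemo {
      case class MiniExpr(desc: String)

      case class MiniUniquenessClause(exprs: Seq[MiniExpr]) {
        def desc: String = exprs.map(_.desc).mkString(", ")
        // Rebuild the clause with every child expression transformed.
        def map(func: MiniExpr => MiniExpr): MiniUniquenessClause =
          MiniUniquenessClause(exprs.map(func))
      }

      def main(args: Array[String]): Unit = {
        val clause = MiniUniquenessClause(Seq(MiniExpr("name"), MiniExpr("age")))
        println(clause.map(e => MiniExpr(s"source.${e.desc}")).desc)
        // source.name, source.age
      }
    }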

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fad7daf7/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
index 8d04e76..83f3153 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
@@ -39,11 +39,11 @@ case class GriffinDslParser(dataSourceNames: Seq[String], functionNames: Seq[Str
   }
 
   /**
-    * -- duplicate clauses --
-    * <duplicate-clauses> = <expr> [, <expr>]+
+    * -- uniqueness clauses --
+    * <uniqueness-clauses> = <expr> [, <expr>]+
     */
-  def duplicateClause: Parser[DuplicateClause] = rep1sep(expression, Operator.COMMA) ^^ {
-    case exprs => DuplicateClause(exprs)
+  def uniquenessClause: Parser[UniquenessClause] = rep1sep(expression, Operator.COMMA) ^^ {
+    case exprs => UniquenessClause(exprs)
   }
 
   /**
@@ -58,7 +58,7 @@ case class GriffinDslParser(dataSourceNames: Seq[String], functionNames: Seq[Str
     val rootExpr = dqType match {
       case AccuracyType => logicalExpression
       case ProfilingType => profilingClause
-      case DuplicateType => duplicateClause
+      case UniquenessType => uniquenessClause
       case TimelinessType => timelinessClause
       case _ => expression
     }
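
The grammar itself is unchanged: a uniqueness rule is still one or more expressions separated by commas, parsed with rep1sep. A minimal standalone illustration using scala-parser-combinators (the identifier-only expression rule here is a stand-in for Griffin's full expression grammar):

    import scala.util.parsing.combinator.RegexParsers

    object MiniUniquenessParser extends RegexParsers {
      case class Clause(exprs: Seq[String])

      def expr: Parser[String] = """[A-Za-z_][A-Za-z0-9_]*""".r
      // rep1sep: one or more expressions, comma-separated.
      def clause: Parser[Clause] = rep1sep(expr, ",") ^^ { exprs => Clause(exprs) }

      def main(args: Array[String]): Unit = {
        println(parseAll(clause, "name, age"))
        // [1.10] parsed: Clause(List(name, age))
      }
    }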

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fad7daf7/measure/src/test/resources/_duplicate-batch-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_duplicate-batch-griffindsl.json b/measure/src/test/resources/_duplicate-batch-griffindsl.json
deleted file mode 100644
index cd71020..0000000
--- a/measure/src/test/resources/_duplicate-batch-griffindsl.json
+++ /dev/null
@@ -1,56 +0,0 @@
-{
-  "name": "dup_batch",
-
-  "process.type": "batch",
-
-  "data.sources": [
-    {
-      "name": "source",
-      "baseline": true,
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/users_info_src.avro"
-          }
-        }
-      ]
-    },
-    {
-      "name": "target",
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/users_info_src.avro"
-          }
-        }
-      ]
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "griffin-dsl",
-        "dq.type": "duplicate",
-        "name": "dup",
-        "rule": "user_id",
-        "details": {
-          "source": "source",
-          "target": "target",
-          "dup": "dup",
-          "num": "num"
-        },
-        "metric": {
-          "name": "dup"
-        },
-        "record": {
-          "name": "dupRecords"
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fad7daf7/measure/src/test/resources/_duplicate-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_duplicate-streaming-griffindsl.json b/measure/src/test/resources/_duplicate-streaming-griffindsl.json
deleted file mode 100644
index 18ac81a..0000000
--- a/measure/src/test/resources/_duplicate-streaming-griffindsl.json
+++ /dev/null
@@ -1,116 +0,0 @@
-{
-  "name": "dup_streaming",
-
-  "process.type": "streaming",
-
-  "data.sources": [
-    {
-      "name": "new",
-      "baseline": true,
-      "connectors": [
-        {
-          "type": "kafka",
-          "version": "0.8",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.149.247.156:9092",
-              "group.id": "new",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "ttt",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "df-opr",
-              "name": "${s1}",
-              "rule": "from_json",
-              "details": {
-                "df.name": "${this}"
-              }
-            },
-            {
-              "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select name, age from ${s1}"
-            }
-          ]
-        }
-      ],
-      "cache": {
-        "file.path": "hdfs://localhost/griffin/streaming/dump/new",
-        "info.path": "new",
-        "ready.time.interval": "10s",
-        "ready.time.delay": "0",
-        "time.range": ["0", "0"]
-      }
-    },
-    {
-      "name": "old",
-      "connectors": [
-        {
-          "type": "kafka",
-          "version": "0.8",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.149.247.156:9092",
-              "group.id": "old",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "ttt",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "df-opr",
-              "name": "${s1}",
-              "rule": "from_json",
-              "details": {
-                "df.name": "${this}"
-              }
-            },
-            {
-              "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select name, age from ${s1}"
-            }
-          ]
-        }
-      ],
-      "cache": {
-        "file.path": "hdfs://localhost/griffin/streaming/dump/old",
-        "info.path": "old",
-        "ready.time.interval": "10s",
-        "ready.time.delay": "0",
-        "time.range": ["-24h", "0"]
-      }
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "griffin-dsl",
-        "dq.type": "duplicate",
-        "name": "dup",
-        "rule": "name, age",
-        "details": {
-          "source": "new",
-          "target": "old",
-          "dup": "dup",
-          "num": "num"
-        },
-        "metric": {
-          "name": "dup"
-        },
-        "record": {
-          "name": "dupRecords"
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fad7daf7/measure/src/test/resources/_duplicate-streaming-sparksql.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_duplicate-streaming-sparksql.json b/measure/src/test/resources/_duplicate-streaming-sparksql.json
deleted file mode 100644
index 3d37dad..0000000
--- a/measure/src/test/resources/_duplicate-streaming-sparksql.json
+++ /dev/null
@@ -1,130 +0,0 @@
-{
-  "name": "dup_streaming",
-
-  "process.type": "streaming",
-
-  "data.sources": [
-    {
-      "name": "new",
-      "baseline": true,
-      "connectors": [
-        {
-          "type": "kafka",
-          "version": "0.8",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.149.247.156:9092",
-              "group.id": "new",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "sss",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "df-opr",
-              "name": "${s1}",
-              "rule": "from_json",
-              "details": {
-                "df.name": "${this}"
-              }
-            },
-            {
-              "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select name, age from ${s1}"
-            }
-          ]
-        }
-      ],
-      "cache": {
-        "file.path": "hdfs://localhost/griffin/streaming/dump/new",
-        "info.path": "new",
-        "ready.time.interval": "10s",
-        "ready.time.delay": "0",
-        "time.range": ["0", "0"]
-      }
-    },
-    {
-      "name": "old",
-      "connectors": [
-        {
-          "type": "kafka",
-          "version": "0.8",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.149.247.156:9092",
-              "group.id": "old",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "sss",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "df-opr",
-              "name": "${s1}",
-              "rule": "from_json",
-              "details": {
-                "df.name": "${this}"
-              }
-            },
-            {
-              "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select name, age from ${s1}"
-            }
-          ]
-        }
-      ],
-      "cache": {
-        "file.path": "hdfs://localhost/griffin/streaming/dump/old",
-        "info.path": "old",
-        "ready.time.interval": "10s",
-        "ready.time.delay": "0",
-        "time.range": ["-24h", "0"]
-      }
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "spark-sql",
-        "name": "dist",
-        "rule": "SELECT DISTINCT * FROM new"
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "joined",
-        "rule": "SELECT dist.* FROM old RIGHT JOIN dist ON coalesce(old.name, 
'') = coalesce(dist.name, '') AND coalesce(old.age, '') = coalesce(dist.age, 
'')"
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "grouped",
-        "rule": "SELECT `__tmst`, `name`, `age`, count(*) as `dup_cnt` FROM 
joined GROUP BY `__tmst`, `name`, `age`"
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "dupRecs",
-        "cache": true,
-        "rule": "SELECT * FROM grouped WHERE `dup_cnt` > 1",
-        "record": {
-          "name": "dupRecords"
-        }
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "dupMetric",
-        "rule": "SELECT `__tmst`, `dup_cnt`, count(*) as `item_cnt` FROM 
dupRecs GROUP BY `__tmst`, `dup_cnt`",
-        "metric": {
-          "name": "dup"
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fad7daf7/measure/src/test/resources/_uniqueness-batch-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_uniqueness-batch-griffindsl.json b/measure/src/test/resources/_uniqueness-batch-griffindsl.json
new file mode 100644
index 0000000..28009e8
--- /dev/null
+++ b/measure/src/test/resources/_uniqueness-batch-griffindsl.json
@@ -0,0 +1,58 @@
+{
+  "name": "unique_batch",
+
+  "process.type": "batch",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/users_info_src.avro"
+          }
+        }
+      ]
+    },
+    {
+      "name": "target",
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/users_info_src.avro"
+          }
+        }
+      ]
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "uniqueness",
+        "name": "dup",
+        "rule": "user_id",
+        "details": {
+          "source": "source",
+          "target": "target",
+          "total": "total",
+          "unique": "unique",
+          "dup": "dup",
+          "num": "num"
+        },
+        "metric": {
+          "name": "unique"
+        },
+        "record": {
+          "name": "dupRecords"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file
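
The two new detail keys, "total" and "unique", name the counts added to the uniqueness metric. A hedged Spark sketch of the arithmetic for a single-column rule like "user_id" (local session and toy data are assumptions; this is only a sketch, not the adaptor's generated plan):

    import org.apache.spark.sql.SparkSession

    object UniquenessCountsDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder
          .appName("uniqueness-counts").master("local[*]").getOrCreate()
        import spark.implicits._

        val source = Seq("a", "b", "b", "c").toDF("user_id")
        val total = source.count()                 // all rows: 4
        val unique = source.groupBy("user_id").count()
          .filter($"count" === 1).count()          // values occurring once: 2 (a, c)
        println(s"total = $total, unique = $unique")
        spark.stop()
      }
    }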

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fad7daf7/measure/src/test/resources/_uniqueness-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_uniqueness-streaming-griffindsl.json b/measure/src/test/resources/_uniqueness-streaming-griffindsl.json
new file mode 100644
index 0000000..bc5cbd2
--- /dev/null
+++ b/measure/src/test/resources/_uniqueness-streaming-griffindsl.json
@@ -0,0 +1,119 @@
+{
+  "name": "unique_streaming",
+
+  "process.type": "streaming",
+
+  "data.sources": [
+    {
+      "name": "new",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.149.247.156:9092",
+              "group.id": "new",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "ttt",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${s1}",
+              "rule": "from_json",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${s1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/new",
+        "info.path": "new",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["0", "0"]
+      }
+    },
+    {
+      "name": "old",
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.149.247.156:9092",
+              "group.id": "old",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "ttt",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${s1}",
+              "rule": "from_json",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${s1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/old",
+        "info.path": "old",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["-24h", "0"]
+      }
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "uniqueness",
+        "name": "dup",
+        "rule": "name, age",
+        "details": {
+          "source": "new",
+          "target": "old",
+          "total": "total",
+          "unique": "unique",
+          "dup": "dup",
+          "num": "num",
+          "duplication.array": "dup"
+        },
+        "metric": {
+          "name": "unique"
+        },
+        "record": {
+          "name": "dupRecords"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file
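
The streaming variant adds one more detail, "duplication.array", which rolls duplicated items up by how many times they repeat. A standalone collections sketch of that rollup, under the assumption that this is the metric's shape (toy data; the real computation runs over the cached streaming batches):

    object DuplicationArrayDemo {
      def main(args: Array[String]): Unit = {
        val names = Seq("tom", "tom", "sam", "sam", "sam", "ann")

        // Extra occurrences of each item beyond the first.
        val dupCounts = names.groupBy(identity).map { case (k, vs) => (k, vs.size - 1) }

        // Roll up: duplicate count -> number of items with that count.
        val duplicationArray = dupCounts.values.filter(_ > 0)
          .groupBy(identity).map { case (dup, items) => (dup, items.size) }

        println(duplicationArray)  // Map(1 -> 1, 2 -> 1): tom duplicated once, sam twice
      }
    }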

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fad7daf7/measure/src/test/resources/_uniqueness-streaming-sparksql.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_uniqueness-streaming-sparksql.json b/measure/src/test/resources/_uniqueness-streaming-sparksql.json
new file mode 100644
index 0000000..7d13215
--- /dev/null
+++ b/measure/src/test/resources/_uniqueness-streaming-sparksql.json
@@ -0,0 +1,130 @@
+{
+  "name": "unique_streaming",
+
+  "process.type": "streaming",
+
+  "data.sources": [
+    {
+      "name": "new",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.149.247.156:9092",
+              "group.id": "new",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "sss",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${s1}",
+              "rule": "from_json",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${s1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/new",
+        "info.path": "new",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["0", "0"]
+      }
+    },
+    {
+      "name": "old",
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.149.247.156:9092",
+              "group.id": "old",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "sss",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${s1}",
+              "rule": "from_json",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${s1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/old",
+        "info.path": "old",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["-24h", "0"]
+      }
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "spark-sql",
+        "name": "dist",
+        "rule": "SELECT DISTINCT * FROM new"
+      },
+      {
+        "dsl.type": "spark-sql",
+        "name": "joined",
+        "rule": "SELECT dist.* FROM old RIGHT JOIN dist ON coalesce(old.name, 
'') = coalesce(dist.name, '') AND coalesce(old.age, '') = coalesce(dist.age, 
'')"
+      },
+      {
+        "dsl.type": "spark-sql",
+        "name": "grouped",
+        "rule": "SELECT `__tmst`, `name`, `age`, count(*) as `dup_cnt` FROM 
joined GROUP BY `__tmst`, `name`, `age`"
+      },
+      {
+        "dsl.type": "spark-sql",
+        "name": "dupRecs",
+        "cache": true,
+        "rule": "SELECT * FROM grouped WHERE `dup_cnt` > 1",
+        "record": {
+          "name": "dupRecords"
+        }
+      },
+      {
+        "dsl.type": "spark-sql",
+        "name": "dupMetric",
+        "rule": "SELECT `__tmst`, `dup_cnt`, count(*) as `item_cnt` FROM 
dupRecs GROUP BY `__tmst`, `dup_cnt`",
+        "metric": {
+          "name": "dup"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file
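
For comparison, the five-rule chain above can be replayed as plain Spark SQL. A self-contained sketch (local session; the in-memory views and invented rows stand in for the two Kafka-fed sources, with "tom" seeded as a duplicate):

    import org.apache.spark.sql.SparkSession

    object UniquenessSqlChainDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder
          .appName("sql-chain-demo").master("local[*]").getOrCreate()
        import spark.implicits._

        Seq((1L, "tom", "21"), (1L, "sam", "22"))
          .toDF("__tmst", "name", "age").createOrReplaceTempView("new")
        Seq((1L, "tom", "21"), (1L, "tom", "21"))
          .toDF("__tmst", "name", "age").createOrReplaceTempView("old")

        spark.sql("SELECT DISTINCT * FROM new").createOrReplaceTempView("dist")
        spark.sql("SELECT dist.* FROM old RIGHT JOIN dist " +
          "ON coalesce(old.name, '') = coalesce(dist.name, '') " +
          "AND coalesce(old.age, '') = coalesce(dist.age, '')")
          .createOrReplaceTempView("joined")
        spark.sql("SELECT `__tmst`, `name`, `age`, count(*) as `dup_cnt` " +
          "FROM joined GROUP BY `__tmst`, `name`, `age`")
          .createOrReplaceTempView("grouped")
        spark.sql("SELECT * FROM grouped WHERE `dup_cnt` > 1")
          .createOrReplaceTempView("dupRecs")

        // "tom" appears twice in old, so it surfaces with dup_cnt = 2.
        spark.sql("SELECT `__tmst`, `dup_cnt`, count(*) as `item_cnt` " +
          "FROM dupRecs GROUP BY `__tmst`, `dup_cnt`").show()
        spark.stop()
      }
    }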

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fad7daf7/measure/src/test/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptorTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptorTest.scala
index 22fc331..d551a4f 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptorTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptorTest.scala
@@ -109,13 +109,13 @@ class GriffinDslAdaptorTest extends FunSuite with Matchers with BeforeAndAfter w
 //    }
   }
 
-  test ("duplicate") {
+  test ("uniqueness") {
 //    val adaptor = GriffinDslAdaptor("new" :: "old" :: Nil, "count" :: Nil)
 //    val ruleJson =
 //      """
 //        |{
 //        |  "dsl.type": "griffin-dsl",
-//        |  "dq.type": "duplicate",
+//        |  "dq.type": "uniqueness",
 //        |  "name": "dup",
 //        |  "rule": "name, count(age + 1) as ct",
 //        |  "details": {
