Repository: incubator-griffin
Updated Branches:
  refs/heads/master e704da627 -> cbff5b45c


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala
new file mode 100644
index 0000000..31fe5ea
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala
@@ -0,0 +1,41 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.process.temp
+
+import scala.math.{min, max}
+
/**
 * A closed time range [begin, end] together with the set of concrete
 * timestamps it covers.
 *
 * @param begin inclusive lower bound of the range (epoch millis, presumably — TODO confirm unit)
 * @param end   inclusive upper bound of the range
 * @param tmsts the individual timestamps falling inside the range
 */
case class TimeRange(begin: Long, end: Long, tmsts: Set[Long]) extends Serializable {

  /** Returns the smallest range covering both `this` and `tr`, with the union of their timestamps. */
  def merge(tr: TimeRange): TimeRange = {
    TimeRange(min(begin, tr.begin), max(end, tr.end), tmsts ++ tr.tmsts)
  }
}

object TimeRange {

  /** Neutral range used when no timestamps are available. */
  val emptyTimeRange = TimeRange(0, 0, Set[Long]())

  /** Builds a range from an explicit (begin, end) pair. */
  def apply(range: (Long, Long), tmsts: Set[Long]): TimeRange = TimeRange(range._1, range._2, tmsts)

  /** Builds a degenerate single-instant range at `ts`, with the given timestamp set. */
  def apply(ts: Long, tmsts: Set[Long]): TimeRange = TimeRange(ts, ts, tmsts)

  /** Builds a degenerate single-instant range at `ts`, recording `ts` as its only timestamp. */
  def apply(ts: Long): TimeRange = TimeRange(ts, ts, Set[Long](ts))

  /**
   * Builds the tightest range spanning `tmsts`; returns `emptyTimeRange`
   * for an empty set. The explicit emptiness check replaces the previous
   * `catch { case _: Throwable => ... }`, which only existed to guard
   * `Set.min`/`Set.max` throwing on an empty collection but would also
   * have swallowed fatal JVM errors such as OutOfMemoryError.
   */
  def apply(tmsts: Set[Long]): TimeRange = {
    if (tmsts.isEmpty) emptyTimeRange
    else TimeRange(tmsts.min, tmsts.max, tmsts)
  }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala
index 5447ccc..97589ad 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala
@@ -18,7 +18,8 @@ under the License.
 */
 package org.apache.griffin.measure.rule.adaptor
 
-import org.apache.griffin.measure.process.ProcessType
+import org.apache.griffin.measure.process.{ExportMode, ProcessType}
+import org.apache.griffin.measure.process.temp.TimeRange
 import org.apache.griffin.measure.rule.plan.{TimeInfo, _}
 import org.apache.griffin.measure.utils.ParamUtil._
 
@@ -46,10 +47,12 @@ case class DataFrameOprAdaptor() extends RuleAdaptor {
 
   import RuleParamKeys._
 
-  def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any], procType: 
ProcessType): RulePlan = {
+  def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any],
+                  procType: ProcessType, dsTimeRanges: Map[String, 
TimeRange]): RulePlan = {
     val name = getRuleName(param)
     val step = DfOprStep(name, getRule(param), getDetails(param), 
getCache(param), getGlobal(param))
-    RulePlan(step :: Nil, genRuleExports(param, name, name))
+    val mode = ExportMode.defaultMode(procType)
+    RulePlan(step :: Nil, genRuleExports(param, name, name, timeInfo.calcTime, 
mode))
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala
new file mode 100644
index 0000000..f592709
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala
@@ -0,0 +1,70 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.adaptor
+
/** Detail-parameter keys read by the accuracy rule plan generator. */
object AccuracyKeys {
  val _source = "source"    // key selecting the source data set name
  val _target = "target"    // key selecting the target data set name
  val _miss = "miss"        // key for the miss-count metric column name
  val _total = "total"      // key for the total-count metric column name
  val _matched = "matched"  // key for the matched-count metric column name
  //  val _missRecords = "missRecords"
}
+
/** Detail-parameter keys read by the profiling rule plan generator. */
object ProfilingKeys {
  val _source = "source"  // key selecting the data set to profile
}
+
/** Detail-parameter keys read by the uniqueness rule plan generator. */
object UniquenessKeys {
  val _source = "source"  // key selecting the source data set name
  val _target = "target"  // key selecting the target data set name
  val _unique = "unique"  // key for the unique-count metric column name
  val _total = "total"    // key for the total-count metric column name
  val _dup = "dup"        // key for the duplicate-count column name
  val _num = "num"        // key for the occurrence-count column in the duplication metric

  val _duplicationArray = "duplication.array"  // key for the duplication-array metric name; empty disables it
}
+
/** Detail-parameter keys read by the distinctness rule plan generator. */
object DistinctnessKeys {
  val _source = "source"      // key selecting the source data set name
  val _target = "target"      // key selecting the target data set name
  val _distinct = "distinct"  // key for the distinct-count metric column name
  val _total = "total"        // key for the total-count metric column name
  val _dup = "dup"            // key for the duplicate-count column name
  val _accu_dup = "accu_dup"  // key for the accumulated duplicate-count column name
  val _num = "num"            // key for the occurrence-count column in the duplication metric

  val _duplicationArray = "duplication.array"  // key for the duplication-array metric name; empty disables it
  val _withAccumulate = "with.accumulate"      // flag: also compare against accumulated older target data
}
+
/** Detail-parameter keys read by the timeliness rule plan generator. */
object TimelinessKeys {
  val _source = "source"        // key selecting the source data set name
  val _latency = "latency"      // key for the latency column name — presumably; confirm against timelinessRulePlan
  val _threshold = "threshold"  // key for the lateness threshold — presumably; confirm against timelinessRulePlan
}
+
/** Rule-parameter keys shared across measure types. */
object GlobalKeys {
  val _initRule = "init.rule"  // NOTE(review): usage not visible here — presumably marks an initialization rule; confirm at call sites
}
+
/** Process-level detail keys. */
object ProcessDetailsKeys {
  val _baselineDataSource = "baseline.data.source"  // NOTE(review): usage not visible here — presumably names the baseline data source; confirm at call sites
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
index 98545d8..ad4a195 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
@@ -20,7 +20,7 @@ package org.apache.griffin.measure.rule.adaptor
 
 import org.apache.griffin.measure.cache.tmst.{TempName, TmstCache}
 import org.apache.griffin.measure.process.engine.DataFrameOprs.AccuracyOprKeys
-import org.apache.griffin.measure.process.temp.TableRegisters
+import org.apache.griffin.measure.process.temp.{TableRegisters, TimeRange}
 import org.apache.griffin.measure.process._
 import org.apache.griffin.measure.rule.dsl._
 import org.apache.griffin.measure.rule.dsl.analyzer._
@@ -30,39 +30,6 @@ import org.apache.griffin.measure.rule.plan.{TimeInfo, _}
 import org.apache.griffin.measure.utils.ParamUtil._
 import org.apache.griffin.measure.utils.TimeUtil
 
-object AccuracyKeys {
-  val _source = "source"
-  val _target = "target"
-  val _miss = "miss"
-  val _total = "total"
-  val _matched = "matched"
-//  val _missRecords = "missRecords"
-}
-
-object ProfilingKeys {
-  val _source = "source"
-}
-
-object UniquenessKeys {
-  val _source = "source"
-  val _target = "target"
-  val _unique = "unique"
-  val _total = "total"
-  val _dup = "dup"
-  val _num = "num"
-  val _duplicationArray = "duplication.array"
-}
-
-object TimelinessKeys {
-  val _source = "source"
-  val _latency = "latency"
-  val _threshold = "threshold"
-}
-
-object GlobalKeys {
-  val _initRule = "init.rule"
-}
-
 case class GriffinDslAdaptor(dataSourceNames: Seq[String],
                              functionNames: Seq[String]
                             ) extends RuleAdaptor {
@@ -77,7 +44,8 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
   private val emptyRulePlan = RulePlan(Nil, Nil)
   private val emptyMap = Map[String, Any]()
 
-  override def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any], 
processType: ProcessType
+  override def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any],
+                           processType: ProcessType, dsTimeRanges: Map[String, 
TimeRange]
                           ): RulePlan = {
     val name = getRuleName(param)
     val rule = getRule(param)
@@ -90,6 +58,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
           case AccuracyType => accuracyRulePlan(timeInfo, name, expr, param, 
processType)
           case ProfilingType => profilingRulePlan(timeInfo, name, expr, param, 
processType)
           case UniquenessType => uniquenessRulePlan(timeInfo, name, expr, 
param, processType)
+          case DistinctnessType => distinctRulePlan(timeInfo, name, expr, 
param, processType, dsTimeRanges)
           case TimelinessType => timelinessRulePlan(timeInfo, name, expr, 
param, processType)
           case _ => emptyRulePlan
         }
@@ -107,22 +76,26 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
 
   // with accuracy opr
   private def accuracyRulePlan(timeInfo: TimeInfo, name: String, expr: Expr,
-                               param: Map[String, Any], processType: 
ProcessType
+                               param: Map[String, Any], procType: ProcessType
                               ): RulePlan = {
     val details = getDetails(param)
     val sourceName = details.getString(AccuracyKeys._source, 
dataSourceNames.head)
     val targetName = details.getString(AccuracyKeys._target, 
dataSourceNames.tail.head)
     val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], 
sourceName, targetName)
 
+    val mode = ExportMode.defaultMode(procType)
+
+    val ct = timeInfo.calcTime
+
     if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
-      println(s"[${timeInfo.calcTime}] data source ${sourceName} not exists")
+      println(s"[${ct}] data source ${sourceName} not exists")
       emptyRulePlan
     } else {
       // 1. miss record
       val missRecordsTableName = "__missRecords"
       val selClause = s"`${sourceName}`.*"
       val missRecordsSql = if (!TableRegisters.existRunTempTable(timeInfo.key, 
targetName)) {
-        println(s"[${timeInfo.calcTime}] data source ${targetName} not exists")
+        println(s"[${ct}] data source ${targetName} not exists")
         s"SELECT ${selClause} FROM `${sourceName}`"
       } else {
         val onClause = expr.coalesceDesc
@@ -136,10 +109,10 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
         s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` 
ON ${onClause} WHERE ${whereClause}"
       }
       val missRecordsStep = SparkSqlStep(missRecordsTableName, missRecordsSql, 
emptyMap, true)
-      val missRecordsExports = processType match {
+      val missRecordsExports = procType match {
         case BatchProcessType => {
           val recordParam = 
RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
-          genRecordExport(recordParam, missRecordsTableName, 
missRecordsTableName) :: Nil
+          genRecordExport(recordParam, missRecordsTableName, 
missRecordsTableName, ct, mode) :: Nil
         }
         case StreamingProcessType => Nil
       }
@@ -147,7 +120,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
       // 2. miss count
       val missCountTableName = "__missCount"
       val missColName = details.getStringOrKey(AccuracyKeys._miss)
-      val missCountSql = processType match {
+      val missCountSql = procType match {
         case BatchProcessType => s"SELECT COUNT(*) AS `${missColName}` FROM 
`${missRecordsTableName}`"
         case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, 
COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}` GROUP BY 
`${InternalColumns.tmst}`"
       }
@@ -156,7 +129,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
       // 3. total count
       val totalCountTableName = "__totalCount"
       val totalColName = details.getStringOrKey(AccuracyKeys._total)
-      val totalCountSql = processType match {
+      val totalCountSql = procType match {
         case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM 
`${sourceName}`"
         case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, 
COUNT(*) AS `${totalColName}` FROM `${sourceName}` GROUP BY 
`${InternalColumns.tmst}`"
       }
@@ -165,7 +138,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
       // 4. accuracy metric
       val accuracyTableName = name
       val matchedColName = details.getStringOrKey(AccuracyKeys._matched)
-      val accuracyMetricSql = processType match {
+      val accuracyMetricSql = procType match {
         case BatchProcessType => {
           s"""
              |SELECT `${totalCountTableName}`.`${totalColName}` AS 
`${totalColName}`,
@@ -186,10 +159,10 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
         }
       }
       val accuracyStep = SparkSqlStep(accuracyTableName, accuracyMetricSql, 
emptyMap)
-      val accuracyExports = processType match {
+      val accuracyExports = procType match {
         case BatchProcessType => {
           val metricParam = 
RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
-          genMetricExport(metricParam, accuracyTableName, accuracyTableName) 
:: Nil
+          genMetricExport(metricParam, accuracyTableName, accuracyTableName, 
ct, mode) :: Nil
         }
         case StreamingProcessType => Nil
       }
@@ -200,7 +173,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
       val accuPlan = RulePlan(accuSteps, accuExports)
 
       // streaming extra accu plan
-      val streamingAccuPlan = processType match {
+      val streamingAccuPlan = procType match {
         case BatchProcessType => emptyRulePlan
         case StreamingProcessType => {
           // 5. accuracy metric merge
@@ -215,7 +188,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
           val accuracyMetricStep = DfOprStep(accuracyMetricTableName,
             accuracyMetricRule, accuracyMetricDetails)
           val metricParam = 
RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
-          val accuracyMetricExports = genMetricExport(metricParam, name, 
accuracyMetricTableName) :: Nil
+          val accuracyMetricExports = genMetricExport(metricParam, name, 
accuracyMetricTableName, ct, mode) :: Nil
 
           // 6. collect accuracy records
           val accuracyRecordTableName = "__accuracyRecords"
@@ -230,7 +203,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
           val accuracyRecordParam = 
recordParam.addIfNotExist(ExportParamKeys._dataSourceCache, sourceName)
             .addIfNotExist(ExportParamKeys._originDF, missRecordsTableName)
           val accuracyRecordExports = genRecordExport(
-            accuracyRecordParam, missRecordsTableName, 
accuracyRecordTableName) :: Nil
+            accuracyRecordParam, missRecordsTableName, 
accuracyRecordTableName, ct, mode) :: Nil
 
           // gen accu plan
           val extraSteps = accuracyMetricStep :: accuracyRecordStep :: Nil
@@ -248,7 +221,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
   }
 
   private def profilingRulePlan(timeInfo: TimeInfo, name: String, expr: Expr,
-                                param: Map[String, Any], processType: 
ProcessType
+                                param: Map[String, Any], procType: ProcessType
                                ): RulePlan = {
     val details = getDetails(param)
     val profilingClause = expr.asInstanceOf[ProfilingClause]
@@ -258,6 +231,10 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
     }
     val fromClause = 
profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc
 
+    val mode = ExportMode.defaultMode(procType)
+
+    val ct = timeInfo.calcTime
+
     if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
       emptyRulePlan
     } else {
@@ -270,12 +247,12 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
         s"${sel.desc}${alias}"
       }
       val selCondition = 
profilingClause.selectClause.extraConditionOpt.map(_.desc).mkString
-      val selClause = processType match {
+      val selClause = procType match {
         case BatchProcessType => selExprDescs.mkString(", ")
         case StreamingProcessType => (s"`${InternalColumns.tmst}`" +: 
selExprDescs).mkString(", ")
       }
       val groupByClauseOpt = analyzer.groupbyExprOpt
-      val groupbyClause = processType match {
+      val groupbyClause = procType match {
         case BatchProcessType => groupByClauseOpt.map(_.desc).getOrElse("")
         case StreamingProcessType => {
           val tmstGroupbyClause = 
GroupbyClause(LiteralStringExpr(s"`${InternalColumns.tmst}`") :: Nil, None)
@@ -296,25 +273,29 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
       val profilingName = name
       val profilingStep = SparkSqlStep(profilingName, profilingSql, details)
       val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
-      val profilingExports = genMetricExport(metricParam, name, profilingName) 
:: Nil
+      val profilingExports = genMetricExport(metricParam, name, profilingName, 
ct, mode) :: Nil
 
       RulePlan(profilingStep :: Nil, profilingExports)
     }
   }
 
   private def uniquenessRulePlan(timeInfo: TimeInfo, name: String, expr: Expr,
-                                 param: Map[String, Any], processType: 
ProcessType
+                                 param: Map[String, Any], procType: ProcessType
                                 ): RulePlan = {
     val details = getDetails(param)
     val sourceName = details.getString(UniquenessKeys._source, 
dataSourceNames.head)
     val targetName = details.getString(UniquenessKeys._target, 
dataSourceNames.tail.head)
     val analyzer = UniquenessAnalyzer(expr.asInstanceOf[UniquenessClause], 
sourceName, targetName)
 
+    val mode = ExportMode.defaultMode(procType)
+
+    val ct = timeInfo.calcTime
+
     if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
-      println(s"[${timeInfo.calcTime}] data source ${sourceName} not exists")
+      println(s"[${ct}] data source ${sourceName} not exists")
       emptyRulePlan
     } else if (!TableRegisters.existRunTempTable(timeInfo.key, targetName)) {
-      println(s"[${timeInfo.calcTime}] data source ${targetName} not exists")
+      println(s"[${ct}] data source ${targetName} not exists")
       emptyRulePlan
     } else {
       val selItemsClause = analyzer.selectionPairs.map { pair =>
@@ -323,16 +304,16 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
       }.mkString(", ")
       val aliases = analyzer.selectionPairs.map(_._2)
 
-      val selClause = processType match {
+      val selClause = procType match {
         case BatchProcessType => selItemsClause
         case StreamingProcessType => s"`${InternalColumns.tmst}`, 
${selItemsClause}"
       }
-      val selAliases = processType match {
+      val selAliases = procType match {
         case BatchProcessType => aliases
         case StreamingProcessType => InternalColumns.tmst +: aliases
       }
 
-      // 1. source mapping
+      // 1. source distinct mapping
       val sourceTableName = "__source"
       val sourceSql = s"SELECT DISTINCT ${selClause} FROM ${sourceName}"
       val sourceStep = SparkSqlStep(sourceTableName, sourceSql, emptyMap)
@@ -369,7 +350,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
       // 5. total metric
       val totalTableName = "__totalMetric"
       val totalColName = details.getStringOrKey(UniquenessKeys._total)
-      val totalSql = processType match {
+      val totalSql = procType match {
         case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM 
`${sourceName}`"
         case StreamingProcessType => {
           s"""
@@ -380,7 +361,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
       }
       val totalStep = SparkSqlStep(totalTableName, totalSql, emptyMap)
       val totalMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
-      val totalMetricExport = genMetricExport(totalMetricParam, totalColName, 
totalTableName)
+      val totalMetricExport = genMetricExport(totalMetricParam, totalColName, 
totalTableName, ct, mode)
 
       // 6. unique record
       val uniqueRecordTableName = "__uniqueRecord"
@@ -392,7 +373,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
       // 7. unique metric
       val uniqueTableName = "__uniqueMetric"
       val uniqueColName = details.getStringOrKey(UniquenessKeys._unique)
-      val uniqueSql = processType match {
+      val uniqueSql = procType match {
         case BatchProcessType => s"SELECT COUNT(*) AS `${uniqueColName}` FROM 
`${uniqueRecordTableName}`"
         case StreamingProcessType => {
           s"""
@@ -403,32 +384,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
       }
       val uniqueStep = SparkSqlStep(uniqueTableName, uniqueSql, emptyMap)
       val uniqueMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
-      val uniqueMetricExport = genMetricExport(uniqueMetricParam, 
uniqueColName, uniqueTableName)
-
-      // 8. count metric
-//      val countMetricTableName = "__countMetric"
-//      val countMetricSql = processType match {
-//        case BatchProcessType => {
-//          s"""
-//             |SELECT `${totalTableName}`.`${totalColName}` AS 
`${totalColName}`,
-//             |coalesce(`${uniqueTableName}`.`${uniqueColName}`, 0) AS 
`${uniqueColName}`
-//             |FROM `${totalTableName}` LEFT JOIN `${uniqueTableName}`
-//          """.stripMargin
-//        }
-//        case StreamingProcessType => {
-//          s"""
-//             |SELECT `${totalTableName}`.`${InternalColumns.tmst}` AS 
`${InternalColumns.tmst}`,
-//             |`${totalTableName}`.`${totalColName}` AS `${totalColName}`,
-//             |coalesce(`${uniqueTableName}`.`${uniqueColName}`, 0) AS 
`${uniqueColName}`
-//             |FROM `${totalTableName}` LEFT JOIN `${uniqueTableName}`
-//             |ON `${totalTableName}`.`${InternalColumns.tmst}` = 
`${uniqueTableName}`.`${InternalColumns.tmst}`
-//          """.stripMargin
-//        }
-//      }
-//      val countMetricStep = SparkSqlStep(countMetricTableName, 
countMetricSql, emptyMap)
-//      val countMetricParam = 
RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
-//        .addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
-//      val countMetricExport = genMetricExport(countMetricParam, "", 
countMetricTableName)
+      val uniqueMetricExport = genMetricExport(uniqueMetricParam, 
uniqueColName, uniqueTableName, ct, mode)
 
       val uniqueSteps = sourceStep :: targetStep :: joinedStep :: groupStep ::
         totalStep :: uniqueRecordStep :: uniqueStep :: Nil
@@ -444,16 +400,16 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
         }
         val dupRecordStep = SparkSqlStep(dupRecordTableName, dupRecordSql, 
emptyMap, true)
         val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
-        val dupRecordExport = genRecordExport(recordParam, dupRecordTableName, 
dupRecordTableName)
+        val dupRecordExport = genRecordExport(recordParam, dupRecordTableName, 
dupRecordTableName, ct, mode)
 
         // 9. duplicate metric
         val dupMetricTableName = "__dupMetric"
         val numColName = details.getStringOrKey(UniquenessKeys._num)
-        val dupMetricSelClause = processType match {
+        val dupMetricSelClause = procType match {
           case BatchProcessType => s"`${dupColName}`, COUNT(*) AS 
`${numColName}`"
           case StreamingProcessType => s"`${InternalColumns.tmst}`, 
`${dupColName}`, COUNT(*) AS `${numColName}`"
         }
-        val dupMetricGroupbyClause = processType match {
+        val dupMetricGroupbyClause = procType match {
           case BatchProcessType => s"`${dupColName}`"
           case StreamingProcessType => s"`${InternalColumns.tmst}`, 
`${dupColName}`"
         }
@@ -465,7 +421,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
         }
         val dupMetricStep = SparkSqlStep(dupMetricTableName, dupMetricSql, 
emptyMap)
         val dupMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
-        val dupMetricExport = genMetricExport(dupMetricParam, 
duplicationArrayName, dupMetricTableName)
+        val dupMetricExport = genMetricExport(dupMetricParam, 
duplicationArrayName, dupMetricTableName, ct, mode)
 
         RulePlan(dupRecordStep :: dupMetricStep :: Nil, dupRecordExport :: 
dupMetricExport :: Nil)
       } else emptyRulePlan
@@ -474,13 +430,202 @@ case class GriffinDslAdaptor(dataSourceNames: 
Seq[String],
     }
   }
 
+  private def distinctRulePlan(timeInfo: TimeInfo, name: String, expr: Expr,
+                               param: Map[String, Any], procType: ProcessType,
+                               dsTimeRanges: Map[String, TimeRange]
+                              ): RulePlan = {
+    val details = getDetails(param)
+    val sourceName = details.getString(DistinctnessKeys._source, 
dataSourceNames.head)
+    val targetName = details.getString(UniquenessKeys._target, 
dataSourceNames.tail.head)
+    val analyzer = DistinctnessAnalyzer(expr.asInstanceOf[DistinctnessClause], 
sourceName)
+
+    val mode = SimpleMode
+
+    val ct = timeInfo.calcTime
+
+    val sourceTimeRange = dsTimeRanges.get(sourceName).getOrElse(TimeRange(ct))
+    val beginTime = sourceTimeRange.begin
+
+    if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
+      println(s"[${ct}] data source ${sourceName} not exists")
+      emptyRulePlan
+    } else {
+      val withOlderTable = {
+        details.getBoolean(DistinctnessKeys._withAccumulate, true) &&
+          TableRegisters.existRunTempTable(timeInfo.key, targetName)
+      }
+
+      val selClause = analyzer.selectionPairs.map { pair =>
+        val (expr, alias) = pair
+        s"${expr.desc} AS `${alias}`"
+      }.mkString(", ")
+      val aliases = analyzer.selectionPairs.map(_._2)
+      val aliasesClause = aliases.map( a => s"`${a}`" ).mkString(", ")
+
+      // 1. source alias
+      val sourceAliasTableName = "__sourceAlias"
+      val sourceAliasSql = {
+        s"SELECT ${selClause} FROM `${sourceName}`"
+      }
+      val sourceAliasStep = SparkSqlStep(sourceAliasTableName, sourceAliasSql, 
emptyMap, true)
+
+      // 2. total metric
+      val totalTableName = "__totalMetric"
+      val totalColName = details.getStringOrKey(DistinctnessKeys._total)
+      val totalSql = {
+        s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}`"
+      }
+      val totalStep = SparkSqlStep(totalTableName, totalSql, emptyMap)
+      val totalMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
+      val totalMetricExport = genMetricExport(totalMetricParam, totalColName, 
totalTableName, beginTime, mode)
+
+      // 3. group by self
+      val selfGroupTableName = "__selfGroup"
+      val dupColName = details.getStringOrKey(DistinctnessKeys._dup)
+      val accuDupColName = details.getStringOrKey(DistinctnessKeys._accu_dup)
+      val selfGroupSql = {
+        s"""
+           |SELECT ${aliasesClause}, (COUNT(*) - 1) AS `${dupColName}`,
+           |TRUE AS `${InternalColumns.distinct}`
+           |FROM `${sourceAliasTableName}` GROUP BY ${aliasesClause}
+          """.stripMargin
+      }
+      val selfGroupStep = SparkSqlStep(selfGroupTableName, selfGroupSql, 
emptyMap, true)
+
+      val selfDistRulePlan = RulePlan(
+        sourceAliasStep :: totalStep :: selfGroupStep :: Nil,
+        totalMetricExport :: Nil
+      )
+
+      val (distRulePlan, dupCountTableName) = procType match {
+        case StreamingProcessType if (withOlderTable) => {
+          // 4. older alias
+          val olderAliasTableName = "__older"
+          val olderAliasSql = {
+            s"SELECT ${selClause} FROM `${targetName}` WHERE 
`${InternalColumns.tmst}` < ${beginTime}"
+          }
+          val olderAliasStep = SparkSqlStep(olderAliasTableName, 
olderAliasSql, emptyMap)
+
+          // 5. join with older data
+          val joinedTableName = "__joined"
+          val selfSelClause = (aliases :+ dupColName).map { alias =>
+            s"`${selfGroupTableName}`.`${alias}`"
+          }.mkString(", ")
+          val onClause = aliases.map { alias =>
+            s"coalesce(`${selfGroupTableName}`.`${alias}`, '') = 
coalesce(`${olderAliasTableName}`.`${alias}`, '')"
+          }.mkString(" AND ")
+          val olderIsNull = aliases.map { alias =>
+            s"`${olderAliasTableName}`.`${alias}` IS NULL"
+          }.mkString(" AND ")
+          val joinedSql = {
+            s"""
+               |SELECT ${selfSelClause}, (${olderIsNull}) AS 
`${InternalColumns.distinct}`
+               |FROM `${olderAliasTableName}` RIGHT JOIN 
`${selfGroupTableName}`
+               |ON ${onClause}
+            """.stripMargin
+          }
+          val joinedStep = SparkSqlStep(joinedTableName, joinedSql, emptyMap)
+
+          // 6. group by joined data
+          val groupTableName = "__group"
+          val moreDupColName = "_more_dup"
+          val groupSql = {
+            s"""
+               |SELECT ${aliasesClause}, `${dupColName}`, 
`${InternalColumns.distinct}`,
+               |COUNT(*) AS `${moreDupColName}`
+               |FROM `${joinedTableName}`
+               |GROUP BY ${aliasesClause}, `${dupColName}`, 
`${InternalColumns.distinct}`
+             """.stripMargin
+          }
+          val groupStep = SparkSqlStep(groupTableName, groupSql, emptyMap)
+
+          // 7. final duplicate count
+          val finalDupCountTableName = "__finalDupCount"
+          val finalDupCountSql = {
+            s"""
+               |SELECT ${aliasesClause}, `${InternalColumns.distinct}`,
+               |CASE WHEN `${InternalColumns.distinct}` THEN `${dupColName}`
+               |ELSE (`${dupColName}` + 1) END AS `${dupColName}`,
+               |CASE WHEN `${InternalColumns.distinct}` THEN `${dupColName}`
+               |ELSE (`${dupColName}` + `${moreDupColName}`) END AS 
`${accuDupColName}`
+               |FROM `${groupTableName}`
+             """.stripMargin
+          }
+          val finalDupCountStep = SparkSqlStep(finalDupCountTableName, 
finalDupCountSql, emptyMap, true)
+
+          val rulePlan = RulePlan(olderAliasStep :: joinedStep :: groupStep :: 
finalDupCountStep :: Nil, Nil)
+          (rulePlan, finalDupCountTableName)
+        }
+        case _ => {
+          (emptyRulePlan, selfGroupTableName)
+        }
+      }
+
+      // 8. distinct metric
+      val distTableName = "__distMetric"
+      val distColName = details.getStringOrKey(DistinctnessKeys._distinct)
+      val distSql = {
+        s"""
+           |SELECT COUNT(*) AS `${distColName}`
+           |FROM `${dupCountTableName}` WHERE `${InternalColumns.distinct}`
+         """.stripMargin
+      }
+      val distStep = SparkSqlStep(distTableName, distSql, emptyMap)
+      val distMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
+      val distMetricExport = genMetricExport(distMetricParam, distColName, 
distTableName, beginTime, mode)
+
+      val distMetricRulePlan = RulePlan(distStep :: Nil, distMetricExport :: 
Nil)
+
+      val duplicationArrayName = 
details.getString(UniquenessKeys._duplicationArray, "")
+      val dupRulePlan = if (duplicationArrayName.nonEmpty) {
+        // 9. duplicate record
+        val dupRecordTableName = "__dupRecords"
+        val dupRecordSelClause = procType match {
+          case StreamingProcessType if (withOlderTable) => s"${aliasesClause}, 
`${dupColName}`, `${accuDupColName}`"
+          case _ => s"${aliasesClause}, `${dupColName}`"
+        }
+        val dupRecordSql = {
+          s"""
+             |SELECT ${dupRecordSelClause}
+             |FROM `${dupCountTableName}` WHERE `${dupColName}` > 0
+           """.stripMargin
+        }
+        val dupRecordStep = SparkSqlStep(dupRecordTableName, dupRecordSql, 
emptyMap, true)
+        val dupRecordParam = 
RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
+        val dupRecordExport = genRecordExport(dupRecordParam, 
dupRecordTableName, dupRecordTableName, beginTime, mode)
+
+        // 10. duplicate metric
+        val dupMetricTableName = "__dupMetric"
+        val numColName = details.getStringOrKey(DistinctnessKeys._num)
+        val dupMetricSql = {
+          s"""
+             |SELECT `${dupColName}`, COUNT(*) AS `${numColName}`
+             |FROM `${dupRecordTableName}` GROUP BY `${dupColName}`
+         """.stripMargin
+        }
+        val dupMetricStep = SparkSqlStep(dupMetricTableName, dupMetricSql, 
emptyMap)
+        val dupMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
+        val dupMetricExport = genMetricExport(dupMetricParam, 
duplicationArrayName, dupMetricTableName, beginTime, mode)
+
+        RulePlan(dupRecordStep :: dupMetricStep :: Nil, dupRecordExport :: 
dupMetricExport :: Nil)
+      } else emptyRulePlan
+
+      
selfDistRulePlan.merge(distRulePlan).merge(distMetricRulePlan).merge(dupRulePlan)
+
+    }
+  }
+
   private def timelinessRulePlan(timeInfo: TimeInfo, name: String, expr: Expr,
-                                 param: Map[String, Any], processType: 
ProcessType
+                                 param: Map[String, Any], procType: ProcessType
                                 ): RulePlan = {
     val details = getDetails(param)
     val timelinessClause = expr.asInstanceOf[TimelinessClause]
     val sourceName = details.getString(TimelinessKeys._source, 
dataSourceNames.head)
 
+    val mode = ExportMode.defaultMode(procType)
+
+    val ct = timeInfo.calcTime
+
     if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
       emptyRulePlan
     } else {
@@ -521,7 +666,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
 
       // 3. timeliness metric
       val metricTableName = name
-      val metricSql = processType match {
+      val metricSql = procType match {
         case BatchProcessType => {
           s"""
              |SELECT CAST(AVG(`${latencyColName}`) AS BIGINT) AS `avg`,
@@ -543,7 +688,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
       }
       val metricStep = SparkSqlStep(metricTableName, metricSql, emptyMap)
       val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
-      val metricExports = genMetricExport(metricParam, name, metricTableName) 
:: Nil
+      val metricExports = genMetricExport(metricParam, name, metricTableName, 
ct, mode) :: Nil
 
       // current timeliness plan
       val timeSteps = inTimeStep :: latencyStep :: metricStep :: Nil
@@ -559,7 +704,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
           }
           val recordStep = SparkSqlStep(recordTableName, recordSql, emptyMap)
           val recordParam = 
RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
-          val recordExports = genRecordExport(recordParam, recordTableName, 
recordTableName) :: Nil
+          val recordExports = genRecordExport(recordParam, recordTableName, 
recordTableName, ct, mode) :: Nil
           RulePlan(recordStep :: Nil, recordExports)
         }
         case _ => emptyRulePlan

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala
index bd344b1..fc6a246 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala
@@ -27,5 +27,7 @@ object InternalColumns {
   val beginTs = "__begin_ts"
   val endTs = "__end_ts"
 
-  val columns = List[String](tmst, metric, record, empty, beginTs, endTs)
+  val distinct = "__distinct"
+
+  val columns = List[String](tmst, metric, record, empty, beginTs, endTs, 
distinct)
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala
index ebc8fdb..25025ac 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala
@@ -25,7 +25,8 @@ import org.apache.griffin.measure.cache.tmst.TempName
 import scala.collection.mutable.{Set => MutableSet}
 import org.apache.griffin.measure.config.params.user._
 import org.apache.griffin.measure.log.Loggable
-import org.apache.griffin.measure.process.ProcessType
+import org.apache.griffin.measure.process.{ExportMode, ProcessType}
+import org.apache.griffin.measure.process.temp.TimeRange
 import org.apache.griffin.measure.rule.dsl._
 import org.apache.griffin.measure.rule.plan.{TimeInfo, _}
 
@@ -115,30 +116,40 @@ trait RuleAdaptor extends Loggable with Serializable {
     RuleParamKeys.getName(param, RuleStepNameGenerator.genName)
   }
 
-  def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any], procType: 
ProcessType): RulePlan
+  def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any],
+                  procType: ProcessType, dsTimeRanges: Map[String, 
TimeRange]): RulePlan
 
  /**
    * Build the exports configured for a rule step.
    *
    * A rule param may declare a metric export and/or a record export; each one
    * present is generated, both sharing the same default timestamp and mode.
    *
    * @param param        the rule parameter map
    * @param defName      default export name (used when the export declares none)
    * @param stepName     name of the rule step the exports depend on
    * @param defTimestamp default timestamp used when the exported value carries no tmst
    * @param mode         export mode (derived from the process type by callers)
    * @return the metric export (if configured) followed by the record export (if configured)
    */
  protected def genRuleExports(param: Map[String, Any], defName: String,
                               stepName: String, defTimestamp: Long,
                               mode: ExportMode
                              ): Seq[RuleExport] = {
    val metricOpt = RuleParamKeys.getMetricOpt(param)
    val metricExportSeq = metricOpt.map(genMetricExport(_, defName, stepName, defTimestamp, mode)).toSeq
    val recordOpt = RuleParamKeys.getRecordOpt(param)
    val recordExportSeq = recordOpt.map(genRecordExport(_, defName, stepName, defTimestamp, mode)).toSeq
    metricExportSeq ++ recordExportSeq
  }
  /**
    * Build a metric export for the given step.
    *
    * @param param        export parameter map (may override name and collect type)
    * @param name         default export name when the param declares none
    * @param stepName     name of the rule step the export depends on
    * @param defTimestamp default timestamp used when the metric value carries no tmst
    * @param mode         export mode (derived from the process type by callers)
    */
  protected def genMetricExport(param: Map[String, Any], name: String, stepName: String,
                                defTimestamp: Long, mode: ExportMode
                               ): MetricExport = {
    MetricExport(
      ExportParamKeys.getName(param, name),
      stepName,
      ExportParamKeys.getCollectType(param),
      defTimestamp,
      mode
    )
  }
  /**
    * Build a record export for the given step.
    *
    * @param param        export parameter map (may override name, data source cache, origin DF)
    * @param name         default export name when the param declares none
    * @param stepName     name of the rule step the export depends on
    * @param defTimestamp default timestamp used when the record value carries no tmst
    * @param mode         export mode (derived from the process type by callers)
    */
  protected def genRecordExport(param: Map[String, Any], name: String, stepName: String,
                                defTimestamp: Long, mode: ExportMode
                               ): RecordExport = {
    RecordExport(
      ExportParamKeys.getName(param, name),
      stepName,
      ExportParamKeys.getDataSourceCacheOpt(param),
      ExportParamKeys.getOriginDFOpt(param),
      defTimestamp,
      mode
    )
  }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala
index 1e077b1..30a356c 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala
@@ -21,7 +21,7 @@ package org.apache.griffin.measure.rule.adaptor
 import org.apache.griffin.measure.cache.tmst.TempName
 import org.apache.griffin.measure.config.params.user._
 import org.apache.griffin.measure.process.ProcessType
-import org.apache.griffin.measure.process.temp.TableRegisters
+import org.apache.griffin.measure.process.temp.{TableRegisters, TimeRange}
 import org.apache.griffin.measure.rule.dsl._
 import org.apache.griffin.measure.rule.plan._
 import org.apache.spark.sql.SQLContext
@@ -114,22 +114,24 @@ object RuleAdaptorGroup {
 //  }
 
   // -- gen rule plan --
-  def genRulePlan(timeInfo: TimeInfo, evaluateRuleParam: EvaluateRuleParam, 
procType: ProcessType
+  def genRulePlan(timeInfo: TimeInfo, evaluateRuleParam: EvaluateRuleParam,
+                  procType: ProcessType, dsTimeRanges: Map[String, TimeRange]
                  ): RulePlan = {
     val dslTypeStr = if (evaluateRuleParam.dslType == null) "" else 
evaluateRuleParam.dslType
     val defaultDslType = DslType(dslTypeStr)
     val ruleParams = evaluateRuleParam.rules
-    genRulePlan(timeInfo, ruleParams, defaultDslType, procType)
+    genRulePlan(timeInfo, ruleParams, defaultDslType, procType, dsTimeRanges)
   }
 
   def genRulePlan(timeInfo: TimeInfo, ruleParams: Seq[Map[String, Any]],
-                  defaultDslType: DslType, procType: ProcessType
+                  defaultDslType: DslType, procType: ProcessType,
+                  dsTimeRanges: Map[String, TimeRange]
                  ): RulePlan = {
     val (rulePlan, dsNames) = ruleParams.foldLeft((emptyRulePlan, 
dataSourceNames)) { (res, param) =>
       val (plan, names) = res
       val dslType = getDslType(param, defaultDslType)
       val curPlan: RulePlan = genRuleAdaptor(dslType, names) match {
-        case Some(adaptor) => adaptor.genRulePlan(timeInfo, param, procType)
+        case Some(adaptor) => adaptor.genRulePlan(timeInfo, param, procType, 
dsTimeRanges)
         case _ => emptyRulePlan
       }
       val globalNames = curPlan.globalRuleSteps.map(_.name)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala
index 6b3b7cb..1fce03b 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala
@@ -19,7 +19,8 @@ under the License.
 package org.apache.griffin.measure.rule.adaptor
 
 import org.apache.griffin.measure.cache.tmst.TempName
-import org.apache.griffin.measure.process.ProcessType
+import org.apache.griffin.measure.process.{ExportMode, ProcessType}
+import org.apache.griffin.measure.process.temp.TimeRange
 import org.apache.griffin.measure.rule.dsl.MetricPersistType
 import org.apache.griffin.measure.rule.plan.{TimeInfo, _}
 import org.apache.griffin.measure.utils.ParamUtil._
@@ -39,10 +40,12 @@ case class SparkSqlAdaptor() extends RuleAdaptor {
 
   import RuleParamKeys._
 
  /**
    * Generate the rule plan for a spark-sql rule: one SparkSqlStep plus the
    * metric/record exports configured in `param`.
    *
    * `dsTimeRanges` is accepted for signature parity with other adaptors but is
    * not used by spark-sql rules.
    */
  def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any],
                  procType: ProcessType, dsTimeRanges: Map[String, TimeRange]): RulePlan = {
    val name = getRuleName(param)
    // step definition comes straight from the rule param: sql text, details, cache/global flags
    val step = SparkSqlStep(name, getRule(param), getDetails(param), getCache(param), getGlobal(param))
    // export mode defaults by process type; exports use the calculation time as default timestamp
    val mode = ExportMode.defaultMode(procType)
    RulePlan(step :: Nil, genRuleExports(param, name, name, timeInfo.calcTime, mode))
  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala
index 11b67f2..18a5919 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala
@@ -28,7 +28,7 @@ sealed trait DqType {
 
 object DqType {
   private val dqTypes: List[DqType] = List(
-    AccuracyType, ProfilingType, UniquenessType, TimelinessType, UnknownType
+    AccuracyType, ProfilingType, UniquenessType, DistinctnessType, 
TimelinessType, UnknownType
   )
   def apply(ptn: String): DqType = {
     dqTypes.filter(tp => ptn match {
@@ -54,6 +54,11 @@ final case object UniquenessType extends DqType {
   val desc = "uniqueness"
 }
 
// dq type selected by the (case-insensitive) rule keyword "distinct"
final case object DistinctnessType extends DqType {
  val regex = "^(?i)distinct$".r
  val desc = "distinct"
}
+
 final case object TimelinessType extends DqType {
   val regex = "^(?i)timeliness$".r
   val desc = "timeliness"

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DistinctnessAnalyzer.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DistinctnessAnalyzer.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DistinctnessAnalyzer.scala
new file mode 100644
index 0000000..55e4f39
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DistinctnessAnalyzer.scala
@@ -0,0 +1,47 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.dsl.analyzer
+
+import org.apache.griffin.measure.rule.dsl.expr._
+
+
/**
  * Analyzer for a distinctness rule: pairs each selected expression with an
  * alias (its declared alias when present, otherwise a generated one).
  *
  * Throws if the clause selects no expressions at all.
  */
case class DistinctnessAnalyzer(expr: DistinctnessClause, sourceName: String) extends BasicAnalyzer {

  // collect the alias of an expression, if it declares one
  val seqAlias = (expr: Expr, v: Seq[String]) => {
    expr match {
      case apr: AliasableExpr => v ++ apr.alias
      case _ => v
    }
  }
  val combAlias = (a: Seq[String], b: Seq[String]) => a ++ b

  private val exprs = expr.exprs
  // fallback alias for expressions that declare none
  private def genAlias(idx: Int): String = s"alias_${idx}"

  // each selected expression paired with its effective alias:
  // the first declared alias found in the expression tree, else "alias_<idx>"
  val selectionPairs = exprs.zipWithIndex.map { pair =>
    val (pr, idx) = pair
    val res = pr.preOrderTraverseDepthFirst(Seq[String]())(seqAlias, combAlias)
    (pr, res.headOption.getOrElse(genAlias(idx)))
  }

  if (selectionPairs.isEmpty) {
    // fixed copy-paste from UniquenessAnalyzer: this is the distinctness analyzer
    throw new Exception("distinctness analyzer error: empty selection")
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
index 504e176..340c1e2 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
@@ -227,6 +227,14 @@ case class UniquenessClause(exprs: Seq[Expr]) extends 
ClauseExpression {
   override def map(func: (Expr) => Expr): UniquenessClause = 
UniquenessClause(exprs.map(func(_)))
 }
 
/** Clause expression holding the expressions selected by a distinctness rule. */
case class DistinctnessClause(exprs: Seq[Expr]) extends ClauseExpression {
  addChildren(exprs)

  /** Comma-joined description of all selected expressions. */
  def desc: String = exprs.map(_.desc).mkString(", ")

  /** Comma-joined coalesced description of all selected expressions. */
  def coalesceDesc: String = exprs.map(_.coalesceDesc).mkString(", ")

  override def map(func: (Expr) => Expr): DistinctnessClause =
    DistinctnessClause(exprs.map(func))
}
+
 case class TimelinessClause(exprs: Seq[Expr]) extends ClauseExpression {
   addChildren(exprs)
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
index 83f3153..b129ead 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
@@ -47,6 +47,14 @@ case class GriffinDslParser(dataSourceNames: Seq[String], 
functionNames: Seq[Str
   }
 
   /**
+    * -- distinctness clauses --
+    * <distinctness-clauses> = <expr> [, <expr>]+
+    */
+  def distinctnessClause: Parser[DistinctnessClause] = rep1sep(expression, 
Operator.COMMA) ^^ {
+    case exprs => DistinctnessClause(exprs)
+  }
+
+  /**
     * -- timeliness clauses --
     * <timeliness-clauses> = <expr> [, <expr>]+
     */
@@ -59,6 +67,7 @@ case class GriffinDslParser(dataSourceNames: Seq[String], 
functionNames: Seq[Str
       case AccuracyType => logicalExpression
       case ProfilingType => profilingClause
       case UniquenessType => uniquenessClause
+      case DistinctnessType => distinctnessClause
       case TimelinessType => timelinessClause
       case _ => expression
     }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala
index 10f1f9b..ac14153 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala
@@ -18,11 +18,17 @@ under the License.
 */
 package org.apache.griffin.measure.rule.plan
 
+import org.apache.griffin.measure.process.ExportMode
 import org.apache.griffin.measure.rule.dsl._
 
 case class MetricExport(name: String,
                         stepName: String,
-                        collectType: CollectType
+                        collectType: CollectType,
+                        defTimestamp: Long,
+                        mode: ExportMode
                        ) extends RuleExport {
 
+  def setDefTimestamp(t: Long): RuleExport =
+    MetricExport(name, stepName, collectType, t, mode)
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala
index a467543..6afc836 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala
@@ -18,10 +18,17 @@ under the License.
 */
 package org.apache.griffin.measure.rule.plan
 
+import org.apache.griffin.measure.process.ExportMode
+
 case class RecordExport(name: String,
                         stepName: String,
                         dataSourceCacheOpt: Option[String],
-                        originDFOpt: Option[String]
+                        originDFOpt: Option[String],
+                        defTimestamp: Long,
+                        mode: ExportMode
                        ) extends RuleExport {
 
+  def setDefTimestamp(t: Long): RuleExport =
+    RecordExport(name, stepName, dataSourceCacheOpt, originDFOpt, t, mode)
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala 
b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala
index 26a962a..84467c2 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala
@@ -18,10 +18,18 @@ under the License.
 */
 package org.apache.griffin.measure.rule.plan
 
+import org.apache.griffin.measure.process.ExportMode
+
/**
  * Common contract for exporting a rule step's result (metric or record).
  */
trait RuleExport extends Serializable {

  val name: String    // export name

  val stepName: String    // name of the dependent rule step

  val defTimestamp: Long    // default timestamp used when tmst is not in the value

  val mode: ExportMode   // export mode (derived from the process type by callers)

  // returns a copy of this export with the default timestamp replaced
  def setDefTimestamp(t: Long): RuleExport

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/test/resources/_distinctness-batch-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_distinctness-batch-griffindsl.json 
b/measure/src/test/resources/_distinctness-batch-griffindsl.json
new file mode 100644
index 0000000..af0c91e
--- /dev/null
+++ b/measure/src/test/resources/_distinctness-batch-griffindsl.json
@@ -0,0 +1,57 @@
+{
+  "name": "dist_batch",
+
+  "process.type": "batch",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/users_info_src.avro"
+          }
+        }
+      ]
+    },
+    {
+      "name": "target",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/users_info_src.avro"
+          }
+        }
+      ]
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "distinct",
+        "name": "dist",
+        "rule": "user_id",
+        "details": {
+          "source": "source",
+          "target": "target",
+          "total": "total",
+          "distinct": "distinct",
+          "dup": "dup",
+          "num": "num",
+          "duplication.array": "dup"
+        },
+        "metric": {
+          "name": "distinct"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/test/resources/_distinctness-batch-griffindsl1.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_distinctness-batch-griffindsl1.json 
b/measure/src/test/resources/_distinctness-batch-griffindsl1.json
new file mode 100644
index 0000000..f8aa077
--- /dev/null
+++ b/measure/src/test/resources/_distinctness-batch-griffindsl1.json
@@ -0,0 +1,73 @@
+{
+  "name": "dist_batch",
+
+  "process.type": "batch",
+
+  "timestamp": 123456,
+
+  "data.sources": [
+    {
+      "name": "source",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/dupdata.avro"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select DISTINCT name, age from ${this}"
+            }
+          ]
+        }
+      ]
+    },
+    {
+      "name": "target",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/dupdata.avro"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select DISTINCT name, age from ${this}"
+            }
+          ]
+        }
+      ]
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "distinct",
+        "name": "dist",
+        "rule": "name",
+        "details": {
+          "source": "source",
+          "target": "target",
+          "total": "total",
+          "distinct": "distinct",
+          "dup": "dup",
+          "num": "num",
+          "duplication.array": "dup"
+        },
+        "metric": {
+          "name": "distinct"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/test/resources/_distinctness-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_distinctness-streaming-griffindsl.json 
b/measure/src/test/resources/_distinctness-streaming-griffindsl.json
new file mode 100644
index 0000000..c36e7ba
--- /dev/null
+++ b/measure/src/test/resources/_distinctness-streaming-griffindsl.json
@@ -0,0 +1,85 @@
+{
+  "name": "dist_streaming",
+
+  "process.type": "streaming",
+
+  "data.sources": [
+    {
+      "name": "new",
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/old",
+        "info.path": "new",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["0", "0"],
+        "read.only": true
+      }
+    },
+    {
+      "name": "old",
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.149.247.156:9092",
+              "group.id": "old",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "ttt",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${s1}",
+              "rule": "from_json",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${s1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/old",
+        "info.path": "old",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["-24h", "0"]
+      }
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "distinct",
+        "name": "dist",
+        "rule": "name, age",
+        "details": {
+          "source": "new",
+          "target": "old",
+          "total": "total",
+          "distinct": "distinct",
+          "dup": "dup",
+          "accu_dup": "accu_dup",
+          "num": "num",
+          "duplication.array": "dup"
+        },
+        "metric": {
+          "name": "distinct"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/test/resources/_profiling-batch-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_profiling-batch-griffindsl.json 
b/measure/src/test/resources/_profiling-batch-griffindsl.json
index cd99eb1..043ba85 100644
--- a/measure/src/test/resources/_profiling-batch-griffindsl.json
+++ b/measure/src/test/resources/_profiling-batch-griffindsl.json
@@ -26,7 +26,7 @@
         "dsl.type": "griffin-dsl",
         "dq.type": "profiling",
         "name": "prof",
-        "rule": "select count(*) as `cnt`, count(distinct `post_code`) as 
`dis-cnt`, max(user_id) as `max` from source",
+        "rule": "count(*) from source",
         "metric": {
           "name": "prof"
         }
@@ -35,7 +35,7 @@
         "dsl.type": "griffin-dsl",
         "dq.type": "profiling",
         "name": "grp",
-        "rule": "select post_code as `pc`, count(*) as `cnt` from source group 
by post_code",
+        "rule": "source.post_code, count(*) from source group by 
source.post_code",
         "metric": {
           "name": "post_group",
           "collect.type": "array"

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/test/resources/dupdata.avro
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/dupdata.avro 
b/measure/src/test/resources/dupdata.avro
new file mode 100644
index 0000000..f6bd312
Binary files /dev/null and b/measure/src/test/resources/dupdata.avro differ

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/test/resources/empty.avro
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/empty.avro 
b/measure/src/test/resources/empty.avro
new file mode 100644
index 0000000..1ac3a72
Binary files /dev/null and b/measure/src/test/resources/empty.avro differ

Reply via email to