Repository: incubator-griffin
Updated Branches:
  refs/heads/master 8633476ea -> c1f089815


[GRIFFIN-135] support completeness measurement for batch and streaming mode

Author: Lionel Liu <bhlx3l...@163.com>

Closes #254 from bhlx3lyx7/tmst.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/c1f08981
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/c1f08981
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/c1f08981

Branch: refs/heads/master
Commit: c1f0898156b178407ad252969054ff994791ab6e
Parents: 8633476
Author: Lionel Liu <bhlx3l...@163.com>
Authored: Tue Apr 10 13:01:03 2018 +0800
Committer: Lionel Liu <bhlx3l...@163.com>
Committed: Tue Apr 10 13:01:03 2018 +0800

----------------------------------------------------------------------
 .../measure/process/BatchDqProcess.scala        |   2 +-
 .../measure/process/temp/TimeRange.scala        |   2 +-
 .../griffin/measure/rule/dsl/DqType.scala       |   7 +-
 .../dsl/analyzer/CompletenessAnalyzer.scala     |  46 ++++++
 .../rule/dsl/expr/ClauseExpression.scala        |   8 +
 .../rule/dsl/parser/GriffinDslParser.scala      |   9 ++
 .../rule/trans/AccuracyRulePlanTrans.scala      |   4 +-
 .../rule/trans/CompletenessRulePlanTrans.scala  | 145 +++++++++++++++++++
 .../measure/rule/trans/RulePlanTrans.scala      |   1 +
 .../rule/trans/TimelinessRulePlanTrans.scala    |  16 +-
 .../_completeness-batch-griffindsl.json         |  36 +++++
 .../_completeness-streaming-griffindsl.json     |  64 ++++++++
 12 files changed, 327 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/c1f08981/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
index 8c95a39..2770de8 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
@@ -166,7 +166,7 @@ case class BatchDqProcess(allParam: AllParam) extends 
DqProcess {
   private def printTimeRanges(timeRanges: Map[String, TimeRange]): Unit = {
     val timeRangesStr = timeRanges.map { pair =>
       val (name, timeRange) = pair
-      s"${name} -> [${timeRange.begin}, ${timeRange.end})"
+      s"${name} -> (${timeRange.begin}, ${timeRange.end}]"
     }.mkString(", ")
     println(s"data source timeRanges: ${timeRangesStr}")
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/c1f08981/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala
index 9e79396..4073753 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala
@@ -24,7 +24,7 @@ case class TimeRange(begin: Long, end: Long, tmsts: 
Set[Long]) extends Serializa
   def merge(tr: TimeRange): TimeRange = {
     TimeRange(min(begin, tr.begin), max(end, tr.end), tmsts ++ tr.tmsts)
   }
-  def beginTmstOpt: Option[Long] = {
+  def minTmstOpt: Option[Long] = {
     try {
       if (tmsts.nonEmpty) Some(tmsts.min) else None
     } catch {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/c1f08981/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala
index 18a5919..f6a7f85 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala
@@ -28,7 +28,7 @@ sealed trait DqType {
 
 object DqType {
   private val dqTypes: List[DqType] = List(
-    AccuracyType, ProfilingType, UniquenessType, DistinctnessType, 
TimelinessType, UnknownType
+    AccuracyType, ProfilingType, UniquenessType, DistinctnessType, 
TimelinessType, CompletenessType, UnknownType
   )
   def apply(ptn: String): DqType = {
     dqTypes.filter(tp => ptn match {
@@ -64,6 +64,11 @@ final case object TimelinessType extends DqType {
   val desc = "timeliness"
 }
 
+final case object CompletenessType extends DqType {
+  val regex = "^(?i)completeness$".r
+  val desc = "completeness"
+}
+
 final case object UnknownType extends DqType {
   val regex = "".r
   val desc = "unknown"

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/c1f08981/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/CompletenessAnalyzer.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/CompletenessAnalyzer.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/CompletenessAnalyzer.scala
new file mode 100644
index 0000000..ad56e1a
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/CompletenessAnalyzer.scala
@@ -0,0 +1,46 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.dsl.analyzer
+
+import org.apache.griffin.measure.rule.dsl.expr._
+
+
+case class CompletenessAnalyzer(expr: CompletenessClause, sourceName: String) 
extends BasicAnalyzer {
+
+  val seqAlias = (expr: Expr, v: Seq[String]) => {
+    expr match {
+      case apr: AliasableExpr => v ++ apr.alias
+      case _ => v
+    }
+  }
+  val combAlias = (a: Seq[String], b: Seq[String]) => a ++ b
+
+  private val exprs = expr.exprs
+  private def genAlias(idx: Int): String = s"alias_${idx}"
+  val selectionPairs = exprs.zipWithIndex.map { pair =>
+    val (pr, idx) = pair
+    val res = pr.preOrderTraverseDepthFirst(Seq[String]())(seqAlias, combAlias)
+    (pr, res.headOption.getOrElse(genAlias(idx)))
+  }
+
+  if (selectionPairs.isEmpty) {
+    throw new Exception(s"completeness analyzer error: empty selection")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/c1f08981/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
index 6790268..ecc5d67 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
@@ -259,4 +259,12 @@ case class TimelinessClause(exprs: Seq[Expr]) extends 
ClauseExpression {
   def desc: String = exprs.map(_.desc).mkString(", ")
   def coalesceDesc: String = exprs.map(_.coalesceDesc).mkString(", ")
   override def map(func: (Expr) => Expr): TimelinessClause = 
TimelinessClause(exprs.map(func(_)))
+}
+
+case class CompletenessClause(exprs: Seq[Expr]) extends ClauseExpression {
+  addChildren(exprs)
+
+  def desc: String = exprs.map(_.desc).mkString(", ")
+  def coalesceDesc: String = exprs.map(_.coalesceDesc).mkString(", ")
+  override def map(func: (Expr) => Expr): CompletenessClause = 
CompletenessClause(exprs.map(func(_)))
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/c1f08981/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
index d4a037b..b4496e7 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
@@ -70,6 +70,14 @@ case class GriffinDslParser(dataSourceNames: Seq[String], 
functionNames: Seq[Str
     case exprs => TimelinessClause(exprs)
   }
 
+  /**
+    * -- completeness clauses --
+    * <completeness-clauses> = <expr> [, <expr>]*
+    */
+  def completenessClause: Parser[CompletenessClause] = rep1sep(expression, 
Operator.COMMA) ^^ {
+    case exprs => CompletenessClause(exprs)
+  }
+
   def parseRule(rule: String, dqType: DqType): ParseResult[Expr] = {
     val rootExpr = dqType match {
       case AccuracyType => logicalExpression
@@ -77,6 +85,7 @@ case class GriffinDslParser(dataSourceNames: Seq[String], 
functionNames: Seq[Str
       case UniquenessType => uniquenessClause
       case DistinctnessType => distinctnessClause
       case TimelinessType => timelinessClause
+      case CompletenessType => completenessClause
       case _ => expression
     }
     parseAll(rootExpr, rule)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/c1f08981/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala
index 904b087..ec746d2 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala
@@ -119,7 +119,7 @@ case class AccuracyRulePlanTrans(dataSourceNames: 
Seq[String],
           s"""
              |SELECT `${totalCountTableName}`.`${totalColName}` AS 
`${totalColName}`,
              |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS 
`${missColName}`,
-             |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`
+             |(`${totalCountTableName}`.`${totalColName}` - 
coalesce(`${missCountTableName}`.`${missColName}`, 0)) AS `${matchedColName}`
              |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}`
          """.stripMargin
         }
@@ -128,7 +128,7 @@ case class AccuracyRulePlanTrans(dataSourceNames: 
Seq[String],
              |SELECT `${totalCountTableName}`.`${InternalColumns.tmst}` AS 
`${InternalColumns.tmst}`,
              |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
              |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS 
`${missColName}`,
-             |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`
+             |(`${totalCountTableName}`.`${totalColName}` - 
coalesce(`${missCountTableName}`.`${missColName}`, 0)) AS `${matchedColName}`
              |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}`
              |ON `${totalCountTableName}`.`${InternalColumns.tmst}` = 
`${missCountTableName}`.`${InternalColumns.tmst}`
          """.stripMargin

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/c1f08981/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala
new file mode 100644
index 0000000..1b89587
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala
@@ -0,0 +1,145 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.trans
+
+import org.apache.griffin.measure.process.temp.TableRegisters
+import org.apache.griffin.measure.process.{BatchProcessType, ExportMode, 
ProcessType, StreamingProcessType}
+import org.apache.griffin.measure.rule.adaptor.RuleParamKeys._
+import org.apache.griffin.measure.rule.adaptor._
+import org.apache.griffin.measure.rule.dsl.analyzer.CompletenessAnalyzer
+import org.apache.griffin.measure.rule.dsl.expr._
+import org.apache.griffin.measure.rule.plan._
+import org.apache.griffin.measure.rule.trans.RuleExportFactory._
+import org.apache.griffin.measure.utils.ParamUtil._
+
+import scala.util.Try
+
+case class CompletenessRulePlanTrans(dataSourceNames: Seq[String],
+                                     timeInfo: TimeInfo, name: String, expr: 
Expr,
+                                     param: Map[String, Any], procType: 
ProcessType
+                                    ) extends RulePlanTrans {
+
+  private object CompletenessKeys {
+    val _source = "source"
+    val _total = "total"
+    val _complete = "complete"
+    val _incomplete = "incomplete"
+  }
+  import CompletenessKeys._
+
+  def trans(): Try[RulePlan] =  Try {
+    val details = getDetails(param)
+    val completenessClause = expr.asInstanceOf[CompletenessClause]
+    val sourceName = details.getString(_source, dataSourceNames.head)
+
+    val mode = ExportMode.defaultMode(procType)
+
+    val ct = timeInfo.calcTime
+
+    if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
+      emptyRulePlan
+    } else {
+      val analyzer = CompletenessAnalyzer(completenessClause, sourceName)
+
+      val selItemsClause = analyzer.selectionPairs.map { pair =>
+        val (expr, alias) = pair
+        s"${expr.desc} AS `${alias}`"
+      }.mkString(", ")
+      val aliases = analyzer.selectionPairs.map(_._2)
+
+      val selClause = procType match {
+        case BatchProcessType => selItemsClause
+        case StreamingProcessType => s"`${InternalColumns.tmst}`, 
${selItemsClause}"
+      }
+      val selAliases = procType match {
+        case BatchProcessType => aliases
+        case StreamingProcessType => InternalColumns.tmst +: aliases
+      }
+
+      // 1. source alias
+      val sourceAliasTableName = "__sourceAlias"
+      val sourceAliasSql = {
+        s"SELECT ${selClause} FROM `${sourceName}`"
+      }
+      val sourceAliasStep = SparkSqlStep(sourceAliasTableName, sourceAliasSql, 
emptyMap, true)
+
+      // 2. incomplete record
+      val incompleteRecordsTableName = "__incompleteRecords"
+      val completeWhereClause = aliases.map(a => s"`${a}` IS NOT 
NULL").mkString(" AND ")
+      val incompleteWhereClause = s"NOT (${completeWhereClause})"
+      val incompleteRecordsSql = s"SELECT * FROM `${sourceAliasTableName}` 
WHERE ${incompleteWhereClause}"
+      val incompleteRecordStep = SparkSqlStep(incompleteRecordsTableName, 
incompleteRecordsSql, emptyMap, true)
+      val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
+      val incompleteRecordExport = genRecordExport(recordParam, 
incompleteRecordsTableName, incompleteRecordsTableName, ct, mode)
+
+      // 3. incomplete count
+      val incompleteCountTableName = "__incompleteCount"
+      val incompleteColName = details.getStringOrKey(_incomplete)
+      val incompleteCountSql = procType match {
+        case BatchProcessType => s"SELECT COUNT(*) AS `${incompleteColName}` 
FROM `${incompleteRecordsTableName}`"
+        case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, 
COUNT(*) AS `${incompleteColName}` FROM `${incompleteRecordsTableName}` GROUP 
BY `${InternalColumns.tmst}`"
+      }
+      val incompleteCountStep = SparkSqlStep(incompleteCountTableName, 
incompleteCountSql, emptyMap)
+
+      // 4. total count
+      val totalCountTableName = "__totalCount"
+      val totalColName = details.getStringOrKey(_total)
+      val totalCountSql = procType match {
+        case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM 
`${sourceAliasTableName}`"
+        case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, 
COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}` GROUP BY 
`${InternalColumns.tmst}`"
+      }
+      val totalCountStep = SparkSqlStep(totalCountTableName, totalCountSql, 
emptyMap)
+
+      // 5. complete metric
+      val completeTableName = name
+      val completeColName = details.getStringOrKey(_complete)
+      val completeMetricSql = procType match {
+        case BatchProcessType => {
+          s"""
+             |SELECT `${totalCountTableName}`.`${totalColName}` AS 
`${totalColName}`,
+             |coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 
0) AS `${incompleteColName}`,
+             |(`${totalCountTableName}`.`${totalColName}` - 
coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0)) AS 
`${completeColName}`
+             |FROM `${totalCountTableName}` LEFT JOIN 
`${incompleteCountTableName}`
+         """.stripMargin
+        }
+        case StreamingProcessType => {
+          s"""
+             |SELECT `${totalCountTableName}`.`${InternalColumns.tmst}` AS 
`${InternalColumns.tmst}`,
+             |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
+             |coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 
0) AS `${incompleteColName}`,
+             |(`${totalCountTableName}`.`${totalColName}` - 
coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0)) AS 
`${completeColName}`
+             |FROM `${totalCountTableName}` LEFT JOIN 
`${incompleteCountTableName}`
+             |ON `${totalCountTableName}`.`${InternalColumns.tmst}` = 
`${incompleteCountTableName}`.`${InternalColumns.tmst}`
+         """.stripMargin
+        }
+      }
+      val completeStep = SparkSqlStep(completeTableName, completeMetricSql, 
emptyMap)
+      val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
+      val completeExport = genMetricExport(metricParam, completeTableName, 
completeTableName, ct, mode)
+
+      // complete plan
+      val completeSteps = sourceAliasStep :: incompleteRecordStep :: 
incompleteCountStep :: totalCountStep :: completeStep :: Nil
+      val completeExports = incompleteRecordExport :: completeExport :: Nil
+      val completePlan = RulePlan(completeSteps, completeExports)
+
+      completePlan
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/c1f08981/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala
index 9289053..ba9565f 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala
@@ -53,6 +53,7 @@ object RulePlanTrans {
       case UniquenessType => UniquenessRulePlanTrans(dsNames, ti, name, expr, 
param, procType)
       case DistinctnessType => DistinctnessRulePlanTrans(dsNames, ti, name, 
expr, param, procType, dsTimeRanges)
       case TimelinessType => TimelinessRulePlanTrans(dsNames, ti, name, expr, 
param, procType, dsTimeRanges)
+      case CompletenessType => CompletenessRulePlanTrans(dsNames, ti, name, 
expr, param, procType)
       case _ => emptyRulePlanTrans
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/c1f08981/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala
index 7e9b8fb..d6dc499 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala
@@ -61,10 +61,10 @@ case class TimelinessRulePlanTrans(dataSourceNames: 
Seq[String],
 
 //    val ct = timeInfo.calcTime
 
-    val beginTmstOpt = dsTimeRanges.get(sourceName).flatMap(_.beginTmstOpt)
-    val beginTmst = beginTmstOpt match {
+    val minTmstOpt = dsTimeRanges.get(sourceName).flatMap(_.minTmstOpt)
+    val minTmst = minTmstOpt match {
       case Some(t) => t
-      case _ => throw new Exception(s"empty begin tmst from ${sourceName}")
+      case _ => throw new Exception(s"empty min tmst from ${sourceName}")
     }
 
     if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
@@ -129,7 +129,7 @@ case class TimelinessRulePlanTrans(dataSourceNames: 
Seq[String],
       }
       val metricStep = SparkSqlStep(metricTableName, metricSql, emptyMap)
       val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
-      val metricExports = genMetricExport(metricParam, name, metricTableName, 
beginTmst, mode) :: Nil
+      val metricExports = genMetricExport(metricParam, name, metricTableName, 
minTmst, mode) :: Nil
 
       // current timeliness plan
       val timeSteps = inTimeStep :: latencyStep :: metricStep :: Nil
@@ -145,7 +145,7 @@ case class TimelinessRulePlanTrans(dataSourceNames: 
Seq[String],
           }
           val recordStep = SparkSqlStep(recordTableName, recordSql, emptyMap)
           val recordParam = 
RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
-          val recordExports = genRecordExport(recordParam, recordTableName, 
recordTableName, beginTmst, mode) :: Nil
+          val recordExports = genRecordExport(recordParam, recordTableName, 
recordTableName, minTmst, mode) :: Nil
           RulePlan(recordStep :: Nil, recordExports)
         }
         case _ => emptyRulePlan
@@ -184,7 +184,7 @@ case class TimelinessRulePlanTrans(dataSourceNames: 
Seq[String],
           }
           val rangeMetricStep = SparkSqlStep(rangeMetricTableName, 
rangeMetricSql, emptyMap)
           val rangeMetricParam = 
emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
-          val rangeMetricExports = genMetricExport(rangeMetricParam, 
stepColName, rangeMetricTableName, beginTmst, mode) :: Nil
+          val rangeMetricExports = genMetricExport(rangeMetricParam, 
stepColName, rangeMetricTableName, minTmst, mode) :: Nil
 
           RulePlan(rangeStep :: rangeMetricStep :: Nil, rangeMetricExports)
         }
@@ -208,9 +208,9 @@ case class TimelinessRulePlanTrans(dataSourceNames: 
Seq[String],
         }
         val percentileStep = SparkSqlStep(percentileTableName, percentileSql, 
emptyMap)
         val percentileParam = emptyMap
-        val percentielExports = genMetricExport(percentileParam, 
percentileTableName, percentileTableName, beginTmst, mode) :: Nil
+        val percentileExports = genMetricExport(percentileParam, 
percentileTableName, percentileTableName, minTmst, mode) :: Nil
 
-        RulePlan(percentileStep :: Nil, percentielExports)
+        RulePlan(percentileStep :: Nil, percentileExports)
       } else emptyRulePlan
 
       timePlan.merge(recordPlan).merge(rangePlan).merge(percentilePlan)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/c1f08981/measure/src/test/resources/_completeness-batch-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_completeness-batch-griffindsl.json 
b/measure/src/test/resources/_completeness-batch-griffindsl.json
new file mode 100644
index 0000000..9c00444
--- /dev/null
+++ b/measure/src/test/resources/_completeness-batch-griffindsl.json
@@ -0,0 +1,36 @@
+{
+  "name": "comp_batch",
+
+  "process.type": "batch",
+
+  "timestamp": 123456,
+
+  "data.sources": [
+    {
+      "name": "source",
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/users_info_src.avro"
+          }
+        }
+      ]
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "completeness",
+        "name": "comp",
+        "rule": "email, post_code, first_name",
+        "metric": {
+          "name": "comp"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/c1f08981/measure/src/test/resources/_completeness-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_completeness-streaming-griffindsl.json 
b/measure/src/test/resources/_completeness-streaming-griffindsl.json
new file mode 100644
index 0000000..df1b889
--- /dev/null
+++ b/measure/src/test/resources/_completeness-streaming-griffindsl.json
@@ -0,0 +1,64 @@
+{
+  "name": "comp_streaming",
+
+  "process.type": "streaming",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.147.177.107:9092",
+              "group.id": "source",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "test",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${s1}",
+              "rule": "from_json",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${s1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/old",
+        "info.path": "old",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["0", "0"]
+      }
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "completeness",
+        "name": "comp",
+        "rule": "name, age",
+        "metric": {
+          "name": "comp"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

Reply via email to