completeness test pass
Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/2e771689 Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/2e771689 Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/2e771689 Branch: refs/heads/griffin-0.2.0-incubating-rc4 Commit: 2e771689aa1cdc9eda1dbb4e6d36bbd9e5e2b742 Parents: b9411cb Author: Lionel Liu <[email protected]> Authored: Mon Apr 9 17:01:26 2018 +0800 Committer: Lionel Liu <[email protected]> Committed: Mon Apr 9 17:01:26 2018 +0800 ---------------------------------------------------------------------- .../measure/process/BatchDqProcess.scala | 2 +- .../measure/process/temp/TimeRange.scala | 2 +- .../griffin/measure/rule/dsl/DqType.scala | 7 +- .../dsl/analyzer/CompletenessAnalyzer.scala | 46 ++++++ .../rule/dsl/expr/ClauseExpression.scala | 8 + .../rule/dsl/parser/GriffinDslParser.scala | 9 ++ .../rule/trans/CompletenessRulePlanTrans.scala | 145 +++++++++++++++++++ .../measure/rule/trans/RulePlanTrans.scala | 1 + .../rule/trans/TimelinessRulePlanTrans.scala | 16 +- .../_completeness-batch-griffindsl.json | 36 +++++ 10 files changed, 261 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2e771689/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala index 8c95a39..2770de8 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala @@ -166,7 +166,7 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess { private def 
printTimeRanges(timeRanges: Map[String, TimeRange]): Unit = { val timeRangesStr = timeRanges.map { pair => val (name, timeRange) = pair - s"${name} -> [${timeRange.begin}, ${timeRange.end})" + s"${name} -> (${timeRange.begin}, ${timeRange.end}]" }.mkString(", ") println(s"data source timeRanges: ${timeRangesStr}") } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2e771689/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala b/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala index 9e79396..4073753 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala @@ -24,7 +24,7 @@ case class TimeRange(begin: Long, end: Long, tmsts: Set[Long]) extends Serializa def merge(tr: TimeRange): TimeRange = { TimeRange(min(begin, tr.begin), max(end, tr.end), tmsts ++ tr.tmsts) } - def beginTmstOpt: Option[Long] = { + def minTmstOpt: Option[Long] = { try { if (tmsts.nonEmpty) Some(tmsts.min) else None } catch { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2e771689/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala index 18a5919..f6a7f85 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala @@ -28,7 +28,7 @@ sealed trait DqType { object DqType { private val dqTypes: List[DqType] = List( - AccuracyType, ProfilingType, UniquenessType, DistinctnessType, TimelinessType, 
UnknownType + AccuracyType, ProfilingType, UniquenessType, DistinctnessType, TimelinessType, CompletenessType, UnknownType ) def apply(ptn: String): DqType = { dqTypes.filter(tp => ptn match { @@ -64,6 +64,11 @@ final case object TimelinessType extends DqType { val desc = "timeliness" } +final case object CompletenessType extends DqType { + val regex = "^(?i)completeness$".r + val desc = "completeness" +} + final case object UnknownType extends DqType { val regex = "".r val desc = "unknown" http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2e771689/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/CompletenessAnalyzer.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/CompletenessAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/CompletenessAnalyzer.scala new file mode 100644 index 0000000..7df1a84 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/CompletenessAnalyzer.scala @@ -0,0 +1,46 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.rule.dsl.analyzer + +import org.apache.griffin.measure.rule.dsl.expr._ + + +case class CompletenessAnalyzer(expr: CompletenessClause, sourceName: String) extends BasicAnalyzer { + + val seqAlias = (expr: Expr, v: Seq[String]) => { + expr match { + case apr: AliasableExpr => v ++ apr.alias + case _ => v + } + } + val combAlias = (a: Seq[String], b: Seq[String]) => a ++ b + + private val exprs = expr.exprs + private def genAlias(idx: Int): String = s"alias_${idx}" + val selectionPairs = exprs.zipWithIndex.map { pair => + val (pr, idx) = pair + val res = pr.preOrderTraverseDepthFirst(Seq[String]())(seqAlias, combAlias) + (pr, res.headOption.getOrElse(genAlias(idx))) + } + + if (selectionPairs.isEmpty) { + throw new Exception(s"completeness analyzer error: empty selection") + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2e771689/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala index 6790268..ecc5d67 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala @@ -259,4 +259,12 @@ case class TimelinessClause(exprs: Seq[Expr]) extends ClauseExpression { def desc: String = exprs.map(_.desc).mkString(", ") def coalesceDesc: String = exprs.map(_.coalesceDesc).mkString(", ") override def map(func: (Expr) => Expr): TimelinessClause = TimelinessClause(exprs.map(func(_))) +} + +case class CompletenessClause(exprs: Seq[Expr]) extends ClauseExpression { + addChildren(exprs) + + def desc: String = exprs.map(_.desc).mkString(", ") + def coalesceDesc: String = exprs.map(_.coalesceDesc).mkString(", ") 
+ override def map(func: (Expr) => Expr): CompletenessClause = CompletenessClause(exprs.map(func(_))) } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2e771689/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala index d4a037b..b4496e7 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala @@ -70,6 +70,14 @@ case class GriffinDslParser(dataSourceNames: Seq[String], functionNames: Seq[Str case exprs => TimelinessClause(exprs) } + /** + * -- completeness clauses -- + * <completeness-clauses> = <expr> [, <expr>]+ + */ + def completenessClause: Parser[CompletenessClause] = rep1sep(expression, Operator.COMMA) ^^ { + case exprs => CompletenessClause(exprs) + } + def parseRule(rule: String, dqType: DqType): ParseResult[Expr] = { val rootExpr = dqType match { case AccuracyType => logicalExpression @@ -77,6 +85,7 @@ case class GriffinDslParser(dataSourceNames: Seq[String], functionNames: Seq[Str case UniquenessType => uniquenessClause case DistinctnessType => distinctnessClause case TimelinessType => timelinessClause + case CompletenessType => completenessClause case _ => expression } parseAll(rootExpr, rule) http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2e771689/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala 
b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala new file mode 100644 index 0000000..5b1a893 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala @@ -0,0 +1,145 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.rule.trans + +import org.apache.griffin.measure.process.temp.TableRegisters +import org.apache.griffin.measure.process.{BatchProcessType, ExportMode, ProcessType, StreamingProcessType} +import org.apache.griffin.measure.rule.adaptor.RuleParamKeys._ +import org.apache.griffin.measure.rule.adaptor._ +import org.apache.griffin.measure.rule.dsl.analyzer.CompletenessAnalyzer +import org.apache.griffin.measure.rule.dsl.expr._ +import org.apache.griffin.measure.rule.plan._ +import org.apache.griffin.measure.rule.trans.RuleExportFactory._ +import org.apache.griffin.measure.utils.ParamUtil._ + +import scala.util.Try + +case class CompletenessRulePlanTrans(dataSourceNames: Seq[String], + timeInfo: TimeInfo, name: String, expr: Expr, + param: Map[String, Any], procType: ProcessType + ) extends RulePlanTrans { + + private object CompletenessKeys { + val _source = "source" + val _total = "total" + val _complete = "complete" + val _incomplete = "incomplete" + } + import CompletenessKeys._ + + def trans(): Try[RulePlan] = Try { + val details = getDetails(param) + val completenessClause = expr.asInstanceOf[CompletenessClause] + val sourceName = details.getString(_source, dataSourceNames.head) + + val mode = ExportMode.defaultMode(procType) + + val ct = timeInfo.calcTime + + if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) { + emptyRulePlan + } else { + val analyzer = CompletenessAnalyzer(completenessClause, sourceName) + + val selItemsClause = analyzer.selectionPairs.map { pair => + val (expr, alias) = pair + s"${expr.desc} AS `${alias}`" + }.mkString(", ") + val aliases = analyzer.selectionPairs.map(_._2) + + val selClause = procType match { + case BatchProcessType => selItemsClause + case StreamingProcessType => s"`${InternalColumns.tmst}`, ${selItemsClause}" + } + val selAliases = procType match { + case BatchProcessType => aliases + case StreamingProcessType => InternalColumns.tmst +: aliases + } + + // 1. 
source alias + val sourceAliasTableName = "__sourceAlias" + val sourceAliasSql = { + s"SELECT ${selClause} FROM `${sourceName}`" + } + val sourceAliasStep = SparkSqlStep(sourceAliasTableName, sourceAliasSql, emptyMap, true) + + // 2. incomplete record + val incompleteRecordsTableName = "__incompleteRecords" + val completeWhereClause = aliases.map(a => s"`${a}` IS NOT NULL").mkString(" AND ") + val incompleteWhereClause = s"NOT (${completeWhereClause})" + val incompleteRecordsSql = s"SELECT * FROM `${sourceAliasTableName}` WHERE ${incompleteWhereClause}" + val incompleteRecordStep = SparkSqlStep(incompleteRecordsTableName, incompleteRecordsSql, emptyMap, true) + val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap) + val incompleteRecordExport = genRecordExport(recordParam, incompleteRecordsTableName, incompleteRecordsTableName, ct, mode) + + // 3. incomplete count + val incompleteCountTableName = "__missCount" + val incompleteColName = details.getStringOrKey(_incomplete) + val incompleteCountSql = procType match { + case BatchProcessType => s"SELECT COUNT(*) AS `${incompleteColName}` FROM `${incompleteRecordsTableName}`" + case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${incompleteColName}` FROM `${incompleteRecordsTableName}` GROUP BY `${InternalColumns.tmst}`" + } + val incompleteCountStep = SparkSqlStep(incompleteCountTableName, incompleteCountSql, emptyMap) + + // 4. total count + val totalCountTableName = "__totalCount" + val totalColName = details.getStringOrKey(_total) + val totalCountSql = procType match { + case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}`" + case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}` GROUP BY `${InternalColumns.tmst}`" + } + val totalCountStep = SparkSqlStep(totalCountTableName, totalCountSql, emptyMap) + + // 5. 
complete metric + val completeTableName = name + val completeColName = details.getStringOrKey(_complete) + val completeMetricSql = procType match { + case BatchProcessType => { + s""" + |SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`, + |coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0) AS `${incompleteColName}`, + |(`${totalColName}` - `${incompleteColName}`) AS `${completeColName}` + |FROM `${totalCountTableName}` LEFT JOIN `${incompleteCountTableName}` + """.stripMargin + } + case StreamingProcessType => { + s""" + |SELECT `${totalCountTableName}`.`${InternalColumns.tmst}` AS `${InternalColumns.tmst}`, + |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`, + |coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0) AS `${incompleteColName}`, + |(`${totalColName}` - `${incompleteColName}`) AS `${completeColName}` + |FROM `${totalCountTableName}` LEFT JOIN `${incompleteCountTableName}` + |ON `${totalCountTableName}`.`${InternalColumns.tmst}` = `${incompleteCountTableName}`.`${InternalColumns.tmst}` + """.stripMargin + } + } + val completeStep = SparkSqlStep(completeTableName, completeMetricSql, emptyMap) + val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap) + val completeExport = genMetricExport(metricParam, completeTableName, completeTableName, ct, mode) + + // complete plan + val completeSteps = sourceAliasStep :: incompleteRecordStep :: incompleteCountStep :: totalCountStep :: completeStep :: Nil + val completeExports = incompleteRecordExport :: completeExport :: Nil + val completePlan = RulePlan(completeSteps, completeExports) + + completePlan + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2e771689/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala 
b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala index 9289053..ba9565f 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala @@ -53,6 +53,7 @@ object RulePlanTrans { case UniquenessType => UniquenessRulePlanTrans(dsNames, ti, name, expr, param, procType) case DistinctnessType => DistinctnessRulePlanTrans(dsNames, ti, name, expr, param, procType, dsTimeRanges) case TimelinessType => TimelinessRulePlanTrans(dsNames, ti, name, expr, param, procType, dsTimeRanges) + case CompletenessType => CompletenessRulePlanTrans(dsNames, ti, name, expr, param, procType) case _ => emptyRulePlanTrans } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2e771689/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala index 7e9b8fb..d6dc499 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala @@ -61,10 +61,10 @@ case class TimelinessRulePlanTrans(dataSourceNames: Seq[String], // val ct = timeInfo.calcTime - val beginTmstOpt = dsTimeRanges.get(sourceName).flatMap(_.beginTmstOpt) - val beginTmst = beginTmstOpt match { + val minTmstOpt = dsTimeRanges.get(sourceName).flatMap(_.minTmstOpt) + val minTmst = minTmstOpt match { case Some(t) => t - case _ => throw new Exception(s"empty begin tmst from ${sourceName}") + case _ => throw new Exception(s"empty min tmst from ${sourceName}") } if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) { @@ -129,7 +129,7 @@ case 
class TimelinessRulePlanTrans(dataSourceNames: Seq[String], } val metricStep = SparkSqlStep(metricTableName, metricSql, emptyMap) val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap) - val metricExports = genMetricExport(metricParam, name, metricTableName, beginTmst, mode) :: Nil + val metricExports = genMetricExport(metricParam, name, metricTableName, minTmst, mode) :: Nil // current timeliness plan val timeSteps = inTimeStep :: latencyStep :: metricStep :: Nil @@ -145,7 +145,7 @@ case class TimelinessRulePlanTrans(dataSourceNames: Seq[String], } val recordStep = SparkSqlStep(recordTableName, recordSql, emptyMap) val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap) - val recordExports = genRecordExport(recordParam, recordTableName, recordTableName, beginTmst, mode) :: Nil + val recordExports = genRecordExport(recordParam, recordTableName, recordTableName, minTmst, mode) :: Nil RulePlan(recordStep :: Nil, recordExports) } case _ => emptyRulePlan @@ -184,7 +184,7 @@ case class TimelinessRulePlanTrans(dataSourceNames: Seq[String], } val rangeMetricStep = SparkSqlStep(rangeMetricTableName, rangeMetricSql, emptyMap) val rangeMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc) - val rangeMetricExports = genMetricExport(rangeMetricParam, stepColName, rangeMetricTableName, beginTmst, mode) :: Nil + val rangeMetricExports = genMetricExport(rangeMetricParam, stepColName, rangeMetricTableName, minTmst, mode) :: Nil RulePlan(rangeStep :: rangeMetricStep :: Nil, rangeMetricExports) } @@ -208,9 +208,9 @@ case class TimelinessRulePlanTrans(dataSourceNames: Seq[String], } val percentileStep = SparkSqlStep(percentileTableName, percentileSql, emptyMap) val percentileParam = emptyMap - val percentielExports = genMetricExport(percentileParam, percentileTableName, percentileTableName, beginTmst, mode) :: Nil + val percentileExports = genMetricExport(percentileParam, percentileTableName, percentileTableName, 
minTmst, mode) :: Nil - RulePlan(percentileStep :: Nil, percentielExports) + RulePlan(percentileStep :: Nil, percentileExports) } else emptyRulePlan timePlan.merge(recordPlan).merge(rangePlan).merge(percentilePlan) http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2e771689/measure/src/test/resources/_completeness-batch-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_completeness-batch-griffindsl.json b/measure/src/test/resources/_completeness-batch-griffindsl.json new file mode 100644 index 0000000..9c00444 --- /dev/null +++ b/measure/src/test/resources/_completeness-batch-griffindsl.json @@ -0,0 +1,36 @@ +{ + "name": "comp_batch", + + "process.type": "batch", + + "timestamp": 123456, + + "data.sources": [ + { + "name": "source", + "connectors": [ + { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/users_info_src.avro" + } + } + ] + } + ], + + "evaluate.rule": { + "rules": [ + { + "dsl.type": "griffin-dsl", + "dq.type": "completeness", + "name": "comp", + "rule": "email, post_code, first_name", + "metric": { + "name": "comp" + } + } + ] + } +} \ No newline at end of file
