Repository: incubator-griffin
Updated Branches:
  refs/heads/master 2d268aaf1 -> fad7daf7e
Uniqueness enhance

add total and unique in uniqueness metric

Author: Lionel Liu <[email protected]>

Closes #187 from bhlx3lyx7/tmst.

Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/fad7daf7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/fad7daf7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/fad7daf7

Branch: refs/heads/master
Commit: fad7daf7e3199584d6a8f985bc115e72d52e6cc5
Parents: 2d268aa
Author: Lionel Liu <[email protected]>
Authored: Wed Jan 10 15:14:33 2018 +0800
Committer: Lionel Liu <[email protected]>
Committed: Wed Jan 10 15:14:33 2018 +0800

----------------------------------------------------------------------
 griffin-doc/measure/dsl-guide.md                |  13 +-
 .../rule/adaptor/GriffinDslAdaptor.scala        | 872 +++----------------
 .../griffin/measure/rule/dsl/DqType.scala       |   8 +-
 .../rule/dsl/analyzer/DuplicateAnalyzer.scala   |  46 -
 .../rule/dsl/analyzer/UniquenessAnalyzer.scala  |  46 +
 .../rule/dsl/expr/ClauseExpression.scala        |   4 +-
 .../rule/dsl/parser/GriffinDslParser.scala      |  10 +-
 .../resources/_duplicate-batch-griffindsl.json  |  56 --
 .../_duplicate-streaming-griffindsl.json        | 116 ---
 .../_duplicate-streaming-sparksql.json          | 130 ---
 .../resources/_uniqueness-batch-griffindsl.json |  58 ++
 .../_uniqueness-streaming-griffindsl.json       | 119 +++
 .../_uniqueness-streaming-sparksql.json         | 130 +++
 .../rule/adaptor/GriffinDslAdaptorTest.scala    |   4 +-
 14 files changed, 489 insertions(+), 1123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fad7daf7/griffin-doc/measure/dsl-guide.md
----------------------------------------------------------------------
diff --git a/griffin-doc/measure/dsl-guide.md b/griffin-doc/measure/dsl-guide.md
index fb2eeb9..0fc8059 100644
--- a/griffin-doc/measure/dsl-guide.md
+++ b/griffin-doc/measure/dsl-guide.md
@@ -121,8 +121,8 @@ Accuracy rule expression in Griffin DSL is a logical expression, telling the map
 Profiling rule expression in Griffin DSL is a sql-like expression, with select clause ahead, following optional from clause, where clause, group-by clause, order-by clause, limit clause in order.
 e.g. `source.gender, source.id.count() where source.age > 20 group by source.gender`, `select country, max(age), min(age), count(*) as cnt from source group by country order by cnt desc limit 5`
 
-### Duplicate Rule
-Duplicate rule expression in Griffin DSL is a list of selection expressions separated by comma, indicates the duplicate columns to measure.
+### Uniqueness Rule
+Uniqueness rule expression in Griffin DSL is a comma-separated list of selection expressions, indicating the columns to check for uniqueness.
 e.g. `name, age`, `name, (age + 1) as next_age`
 
 ### Timeliness Rule
@@ -151,13 +151,16 @@ For example, the dsl rule is `source.cntry, source.id.count(), source.age.max()
 After the translation, the metrics will be persisted in table `profiling`.
 
-### Duplicate
-For duplicate, or called uniqueness, is to find out the duplicate items of data, and rollup the items count group by duplicate times.
+### Uniqueness
+Uniqueness, also called duplicate, finds the duplicate items in the data, and rolls up the item counts grouped by duplicate times.
 For example, the dsl rule is `name, age`, which represents the uniqueness check on these two columns; in this case, source and target are the same data set.
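To make the new `total` and `unique` metrics concrete before walking through the generated SQL, here is a minimal Spark SQL sketch. The toy table and the `unique_cnt` alias are illustrative assumptions, not part of this commit; because source and target are the same data set here, the distinct/join steps collapse into a direct GROUP BY:

```sql
-- Assumed toy data (not from the commit): three rows, with (alice, 20) appearing twice.
CREATE TEMPORARY VIEW source AS
SELECT * FROM VALUES ('alice', 20), ('alice', 20), ('bob', 25) AS t(name, age);

-- dup = occurrences - 1 for each distinct (name, age), as in the `grouped` step below.
CREATE TEMPORARY VIEW grouped AS
SELECT name, age, (COUNT(*) - 1) AS dup FROM source GROUP BY name, age;

SELECT COUNT(*) AS total FROM source;                                -- total  = 3
SELECT COUNT(*) AS unique_cnt FROM grouped WHERE dup = 0;            -- unique = 1 (only bob)
SELECT dup, COUNT(*) AS num FROM grouped WHERE dup > 0 GROUP BY dup; -- dup = 1 for 1 item
```

On this toy data the measure would report `total = 3`, `unique = 1`, and one duplication entry `{dup: 1, num: 1}`.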
After the translation, the sql rule is as below:
- **get distinct items from source**: `SELECT name, age FROM source`, save as table `src`.
- **get all items from target**: `SELECT name, age FROM target`, save as table `tgt`.
- **join two tables**: `SELECT src.name, src.age FROM tgt RIGHT JOIN src ON coalesce(src.name, '') = coalesce(tgt.name, '') AND coalesce(src.age, '') = coalesce(tgt.age, '')`, save as table `joined`.
-- **get duplicate items**: `SELECT name, age, (count(*) - 1) AS dup FROM joined GROUP BY name, age`, save as table `grouped`.
+- **get item duplication**: `SELECT name, age, (count(*) - 1) AS dup FROM joined GROUP BY name, age`, save as table `grouped`.
+- **get total metric**: `SELECT count(*) FROM source`, save as table `total_metric`.
+- **get unique record**: `SELECT * FROM grouped WHERE dup = 0`, save as table `unique_record`.
+- **get unique metric**: `SELECT count(*) FROM unique_record`, save as table `unique_metric`.
- **get duplicate record**: `SELECT * FROM grouped WHERE dup > 0`, save as table `dup_record`.
- **get duplicate metric**: `SELECT dup, count(*) AS num FROM dup_record GROUP BY dup`, save as table `dup_metric`.

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fad7daf7/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala index a02335a..98545d8 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala @@ -43,11 +43,14 @@ object ProfilingKeys { val _source = "source" } -object DuplicateKeys { +object UniquenessKeys { val _source = "source" val _target = "target" + val _unique = "unique" + val _total = "total" val _dup = "dup" val _num = "num" + val _duplicationArray = "duplication.array" } object TimelinessKeys { @@ -58,7 +61,6 @@ object TimelinessKeys { object GlobalKeys { val _initRule = "init.rule" -// val _globalMetricKeep = "global.metric.keep" } case class GriffinDslAdaptor(dataSourceNames: Seq[String], @@ -87,7 +89,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], dqType match { case AccuracyType => accuracyRulePlan(timeInfo, name, expr, param, processType) case ProfilingType => profilingRulePlan(timeInfo, name, expr, param, processType) - case DuplicateType => duplicateRulePlan(timeInfo, name, expr, param, processType) + case UniquenessType => uniquenessRulePlan(timeInfo, name, expr, param, processType) case TimelinessType => timelinessRulePlan(timeInfo, name, expr, param, processType) case _ => emptyRulePlan } @@ -169,7 +171,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], |SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`, |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`, |(`${totalColName}` - `${missColName}`) AS `${matchedColName}` - |FROM `${totalCountTableName}` FULL JOIN `${missCountTableName}` + |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}` """.stripMargin } case StreamingProcessType => { @@ -178,7 +180,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`, |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`,
|(`${totalColName}` - `${missColName}`) AS `${matchedColName}` - |FROM `${totalCountTableName}` FULL JOIN `${missCountTableName}` + |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}` |ON `${totalCountTableName}`.`${InternalColumns.tmst}` = `${missCountTableName}`.`${InternalColumns.tmst}` """.stripMargin } @@ -245,187 +247,6 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], } } -// private def accuracyRulePlan(timeInfo: TimeInfo, name: String, expr: Expr, -// param: Map[String, Any], processType: ProcessType -// ): RulePlan = { -// val details = getDetails(param) -// val sourceName = details.getString(AccuracyKeys._source, dataSourceNames.head) -// val targetName = details.getString(AccuracyKeys._target, dataSourceNames.tail.head) -// val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], sourceName, targetName) -// -// if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) { -// emptyRulePlan -// } else { -// // 1. miss record -// val missRecordsTableName = "__missRecords" -// val selClause = s"`${sourceName}`.*" -// val missRecordsSql = if (!TableRegisters.existRunTempTable(timeInfo.key, targetName)) { -// s"SELECT ${selClause} FROM `${sourceName}`" -// } else { -// val onClause = expr.coalesceDesc -// val sourceIsNull = analyzer.sourceSelectionExprs.map { sel => -// s"${sel.desc} IS NULL" -// }.mkString(" AND ") -// val targetIsNull = analyzer.targetSelectionExprs.map { sel => -// s"${sel.desc} IS NULL" -// }.mkString(" AND ") -// val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})" -// s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}" -// } -// val missRecordsStep = SparkSqlStep(missRecordsTableName, missRecordsSql, emptyMap, true) -// val missRecordsExports = processType match { -// case BatchProcessType => { -// val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap) -// genRecordExport(recordParam, missRecordsTableName, missRecordsTableName) :: Nil -// } -// case StreamingProcessType => Nil -// } -// -// // 2. miss count -// val missCountTableName = "__missCount" -// val missColName = details.getStringOrKey(AccuracyKeys._miss) -// val missCountSql = processType match { -// case BatchProcessType => s"SELECT COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}`" -// case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}` GROUP BY `${InternalColumns.tmst}`" -// } -// val missCountStep = SparkSqlStep(missCountTableName, missCountSql, emptyMap) -// -// // 3. total count -// val totalCountTableName = "__totalCount" -// val totalColName = details.getStringOrKey(AccuracyKeys._total) -// val totalCountSql = processType match { -// case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`" -// case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceName}` GROUP BY `${InternalColumns.tmst}`" -// } -// val totalCountStep = SparkSqlStep(totalCountTableName, totalCountSql, emptyMap) -// -// // 4. 
accuracy metric -// val accuracyTableName = name -// val matchedColName = details.getStringOrKey(AccuracyKeys._matched) -// val accuracyMetricSql = processType match { -// case BatchProcessType => { -// s""" -// |SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`, -// |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`, -// |(`${totalColName}` - `${missColName}`) AS `${matchedColName}` -// |FROM `${totalCountTableName}` FULL JOIN `${missCountTableName}` -// """.stripMargin -// } -// case StreamingProcessType => { -// s""" -// |SELECT `${totalCountTableName}`.`${InternalColumns.tmst}` AS `${InternalColumns.tmst}`, -// |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`, -// |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`, -// |(`${totalColName}` - `${missColName}`) AS `${matchedColName}` -// |FROM `${totalCountTableName}` FULL JOIN `${missCountTableName}` -// |ON `${totalCountTableName}`.`${InternalColumns.tmst}` = `${missCountTableName}`.`${InternalColumns.tmst}` -// """.stripMargin -// } -// } -// val accuracyStep = SparkSqlStep(accuracyTableName, accuracyMetricSql, emptyMap, true) -// val accuracyExports = processType match { -// case BatchProcessType => { -// val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap) -// genMetricExport(metricParam, accuracyTableName, accuracyTableName) :: Nil -// } -// case StreamingProcessType => Nil -// } -// -// // current accu plan -// val accuSteps = missRecordsStep :: missCountStep :: totalCountStep :: accuracyStep :: Nil -// val accuExports = missRecordsExports ++ accuracyExports -// val accuPlan = RulePlan(accuSteps, accuExports) -// -// // streaming extra accu plan -// val streamingAccuPlan = processType match { -// case BatchProcessType => emptyRulePlan -// case StreamingProcessType => { -// // 5. global accuracy metric merge -// val globalAccuracyTableName = "__globalAccuracy" -// val globalAccuracySql = { -// s""" -// |SELECT coalesce(`${globalAccuracyTableName}`.`${InternalColumns.tmst}`, `${accuracyTableName}`.`${InternalColumns.tmst}`) AS `${InternalColumns.tmst}`, -// |coalesce(`${accuracyTableName}`.`${missColName}`, `${globalAccuracyTableName}`.`${missColName}`) AS `${missColName}`, -// |coalesce(`${globalAccuracyTableName}`.`${totalColName}`, `${accuracyTableName}`.`${totalColName}`) AS `${totalColName}`, -// |((`${accuracyTableName}`.`${missColName}` IS NOT NULL) AND ((`${globalAccuracyTableName}`.`${missColName}` IS NULL) OR (`${accuracyTableName}`.`${missColName}` < `${globalAccuracyTableName}`.`${missColName}`))) AS `${InternalColumns.metric}` -// |FROM `${globalAccuracyTableName}` FULL JOIN `${accuracyTableName}` -// |ON `${globalAccuracyTableName}`.`${InternalColumns.tmst}` = `${accuracyTableName}`.`${InternalColumns.tmst}` -// """.stripMargin -// } -// val globalAccuracyInitSql = { -// s""" -// |SELECT `${InternalColumns.tmst}`, `${totalColName}`, `${missColName}`, -// |(true) AS `${InternalColumns.metric}` -// |FROM `${accuracyTableName}` -// """.stripMargin -// } -// val globalAccuracyDetails = Map[String, Any](GlobalKeys._initRule -> globalAccuracyInitSql) -// val globalAccuracyStep = SparkSqlStep(globalAccuracyTableName, -// globalAccuracySql, globalAccuracyDetails, true, true) -// -// // 6. 
collect accuracy metrics -// val accuracyMetricTableName = name -// val accuracyMetricSql = { -// s""" -// |SELECT `${InternalColumns.tmst}`, `${totalColName}`, `${missColName}`, -// |(`${totalColName}` - `${missColName}`) AS `${matchedColName}` -// |FROM `${globalAccuracyTableName}` WHERE `${InternalColumns.metric}` -// """.stripMargin -// } -// val accuracyMetricStep = SparkSqlStep(accuracyMetricTableName, accuracyMetricSql, emptyMap) -// val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap) -// val accuracyMetricExports = genMetricExport(metricParam, accuracyMetricTableName, accuracyMetricTableName) :: Nil -// -// // 7. collect accuracy records -// val accuracyRecordTableName = "__accuracyRecords" -// val accuracyRecordSql = { -// s""" -// |SELECT `${InternalColumns.tmst}` -// |FROM `${accuracyMetricTableName}` WHERE `${matchedColName}` > 0 -// """.stripMargin -// } -// val accuracyRecordStep = SparkSqlStep(accuracyRecordTableName, accuracyRecordSql, emptyMap) -// val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap) -// val accuracyRecordParam = recordParam.addIfNotExist(ExportParamKeys._dataSourceCache, sourceName) -// .addIfNotExist(ExportParamKeys._originDF, missRecordsTableName) -// val accuracyRecordExports = genRecordExport( -// accuracyRecordParam, missRecordsTableName, accuracyRecordTableName) :: Nil -// -// // 8. update global accuracy metric -// val updateGlobalAccuracyTableName = globalAccuracyTableName -// val globalMetricKeepTime = details.getString(GlobalKeys._globalMetricKeep, "") -// val updateGlobalAccuracySql = TimeUtil.milliseconds(globalMetricKeepTime) match { -// case Some(kt) => { -// s""" -// |SELECT * FROM `${globalAccuracyTableName}` -// |WHERE (`${missColName}` > 0) AND (`${InternalColumns.tmst}` > ${timeInfo.calcTime - kt}) -// """.stripMargin -// } -// case _ => { -// s""" -// |SELECT * FROM `${globalAccuracyTableName}` -// |WHERE (`${missColName}` > 0) -// """.stripMargin -// } -// } -// val updateGlobalAccuracyStep = SparkSqlStep(updateGlobalAccuracyTableName, -// updateGlobalAccuracySql, emptyMap, true, true) -// -// // gen accu plan -// val extraSteps = globalAccuracyStep :: accuracyMetricStep :: accuracyRecordStep :: updateGlobalAccuracyStep :: Nil -// val extraExports = accuracyMetricExports ++ accuracyRecordExports -// val extraPlan = RulePlan(extraSteps, extraExports) -// -// extraPlan -// } -// } -// -// // return accu plan -// accuPlan.merge(streamingAccuPlan) -// -// } -// } - private def profilingRulePlan(timeInfo: TimeInfo, name: String, expr: Expr, param: Map[String, Any], processType: ProcessType ): RulePlan = { @@ -481,13 +302,13 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], } } - private def duplicateRulePlan(timeInfo: TimeInfo, name: String, expr: Expr, - param: Map[String, Any], processType: ProcessType - ): RulePlan = { + private def uniquenessRulePlan(timeInfo: TimeInfo, name: String, expr: Expr, + param: Map[String, Any], processType: ProcessType + ): RulePlan = { val details = getDetails(param) - val sourceName = details.getString(DuplicateKeys._source, dataSourceNames.head) - val targetName = details.getString(DuplicateKeys._target, dataSourceNames.tail.head) - val analyzer = DuplicateAnalyzer(expr.asInstanceOf[DuplicateClause], sourceName, targetName) + val sourceName = details.getString(UniquenessKeys._source, dataSourceNames.head) + val targetName = details.getString(UniquenessKeys._target, dataSourceNames.tail.head) + val analyzer = 
UniquenessAnalyzer(expr.asInstanceOf[UniquenessClause], sourceName, targetName) if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) { println(s"[${timeInfo.calcTime}] data source ${sourceName} not exists") @@ -539,49 +360,117 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], val groupSelClause = selAliases.map { alias => s"`${alias}`" }.mkString(", ") - val dupColName = details.getStringOrKey(DuplicateKeys._dup) + val dupColName = details.getStringOrKey(UniquenessKeys._dup) val groupSql = { s"SELECT ${groupSelClause}, (COUNT(*) - 1) AS `${dupColName}` FROM `${joinedTableName}` GROUP BY ${groupSelClause}" } - val groupStep = SparkSqlStep(groupTableName, groupSql, emptyMap) - - // 5. duplicate record - val dupRecordTableName = "__dupRecords" - val dupRecordSql = { - s""" - |SELECT * FROM `${groupTableName}` WHERE `${dupColName}` > 0 - """.stripMargin - } - val dupRecordStep = SparkSqlStep(dupRecordTableName, dupRecordSql, emptyMap, true) - val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap) - val dupRecordxports = genRecordExport(recordParam, dupRecordTableName, dupRecordTableName) :: Nil + val groupStep = SparkSqlStep(groupTableName, groupSql, emptyMap, true) - // 6. duplicate metric - val dupMetricTableName = name - val numColName = details.getStringOrKey(DuplicateKeys._num) - val dupMetricSelClause = processType match { - case BatchProcessType => s"`${dupColName}`, COUNT(*) AS `${numColName}`" - case StreamingProcessType => s"`${InternalColumns.tmst}`, `${dupColName}`, COUNT(*) AS `${numColName}`" + // 5. total metric + val totalTableName = "__totalMetric" + val totalColName = details.getStringOrKey(UniquenessKeys._total) + val totalSql = processType match { + case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`" + case StreamingProcessType => { + s""" + |SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${totalColName}` + |FROM `${sourceName}` GROUP BY `${InternalColumns.tmst}` + """.stripMargin + } } - val dupMetricGroupbyClause = processType match { - case BatchProcessType => s"`${dupColName}`" - case StreamingProcessType => s"`${InternalColumns.tmst}`, `${dupColName}`" + val totalStep = SparkSqlStep(totalTableName, totalSql, emptyMap) + val totalMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc) + val totalMetricExport = genMetricExport(totalMetricParam, totalColName, totalTableName) + + // 6. unique record + val uniqueRecordTableName = "__uniqueRecord" + val uniqueRecordSql = { + s"SELECT * FROM `${groupTableName}` WHERE `${dupColName}` = 0" } - val dupMetricSql = { - s""" - |SELECT ${dupMetricSelClause} FROM `${dupRecordTableName}` - |GROUP BY ${dupMetricGroupbyClause} - """.stripMargin + val uniqueRecordStep = SparkSqlStep(uniqueRecordTableName, uniqueRecordSql, emptyMap) + + // 7. 
unique metric + val uniqueTableName = "__uniqueMetric" + val uniqueColName = details.getStringOrKey(UniquenessKeys._unique) + val uniqueSql = processType match { + case BatchProcessType => s"SELECT COUNT(*) AS `${uniqueColName}` FROM `${uniqueRecordTableName}`" + case StreamingProcessType => { + s""" + |SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${uniqueColName}` + |FROM `${uniqueRecordTableName}` GROUP BY `${InternalColumns.tmst}` + """.stripMargin + } } - val dupMetricStep = SparkSqlStep(dupMetricTableName, dupMetricSql, emptyMap) - val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap) - .addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc) - val dupMetricExports = genMetricExport(metricParam, name, dupMetricTableName) :: Nil + val uniqueStep = SparkSqlStep(uniqueTableName, uniqueSql, emptyMap) + val uniqueMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc) + val uniqueMetricExport = genMetricExport(uniqueMetricParam, uniqueColName, uniqueTableName) + + // 8. count metric +// val countMetricTableName = "__countMetric" +// val countMetricSql = processType match { +// case BatchProcessType => { +// s""" +// |SELECT `${totalTableName}`.`${totalColName}` AS `${totalColName}`, +// |coalesce(`${uniqueTableName}`.`${uniqueColName}`, 0) AS `${uniqueColName}` +// |FROM `${totalTableName}` LEFT JOIN `${uniqueTableName}` +// """.stripMargin +// } +// case StreamingProcessType => { +// s""" +// |SELECT `${totalTableName}`.`${InternalColumns.tmst}` AS `${InternalColumns.tmst}`, +// |`${totalTableName}`.`${totalColName}` AS `${totalColName}`, +// |coalesce(`${uniqueTableName}`.`${uniqueColName}`, 0) AS `${uniqueColName}` +// |FROM `${totalTableName}` LEFT JOIN `${uniqueTableName}` +// |ON `${totalTableName}`.`${InternalColumns.tmst}` = `${uniqueTableName}`.`${InternalColumns.tmst}` +// """.stripMargin +// } +// } +// val countMetricStep = SparkSqlStep(countMetricTableName, countMetricSql, emptyMap) +// val countMetricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap) +// .addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc) +// val countMetricExport = genMetricExport(countMetricParam, "", countMetricTableName) + + val uniqueSteps = sourceStep :: targetStep :: joinedStep :: groupStep :: + totalStep :: uniqueRecordStep :: uniqueStep :: Nil + val uniqueExports = totalMetricExport :: uniqueMetricExport :: Nil + val uniqueRulePlan = RulePlan(uniqueSteps, uniqueExports) + + val duplicationArrayName = details.getString(UniquenessKeys._duplicationArray, "") + val dupRulePlan = if (duplicationArrayName.nonEmpty) { + // 8. duplicate record + val dupRecordTableName = "__dupRecords" + val dupRecordSql = { + s"SELECT * FROM `${groupTableName}` WHERE `${dupColName}` > 0" + } + val dupRecordStep = SparkSqlStep(dupRecordTableName, dupRecordSql, emptyMap, true) + val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap) + val dupRecordExport = genRecordExport(recordParam, dupRecordTableName, dupRecordTableName) + + // 9. 
duplicate metric + val dupMetricTableName = "__dupMetric" + val numColName = details.getStringOrKey(UniquenessKeys._num) + val dupMetricSelClause = processType match { + case BatchProcessType => s"`${dupColName}`, COUNT(*) AS `${numColName}`" + case StreamingProcessType => s"`${InternalColumns.tmst}`, `${dupColName}`, COUNT(*) AS `${numColName}`" + } + val dupMetricGroupbyClause = processType match { + case BatchProcessType => s"`${dupColName}`" + case StreamingProcessType => s"`${InternalColumns.tmst}`, `${dupColName}`" + } + val dupMetricSql = { + s""" + |SELECT ${dupMetricSelClause} FROM `${dupRecordTableName}` + |GROUP BY ${dupMetricGroupbyClause} + """.stripMargin + } + val dupMetricStep = SparkSqlStep(dupMetricTableName, dupMetricSql, emptyMap) + val dupMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc) + val dupMetricExport = genMetricExport(dupMetricParam, duplicationArrayName, dupMetricTableName) - val dupSteps = sourceStep :: targetStep :: joinedStep :: groupStep :: dupRecordStep :: dupMetricStep :: Nil - val dupExports = dupRecordxports ++ dupMetricExports + RulePlan(dupRecordStep :: dupMetricStep :: Nil, dupRecordExport :: dupMetricExport :: Nil) + } else emptyRulePlan - RulePlan(dupSteps, dupExports) + uniqueRulePlan.merge(dupRulePlan) } } @@ -681,535 +570,4 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], } } - // override def genRuleInfos(param: Map[String, Any], timeInfo: TimeInfo): Seq[RuleInfo] = { -// val ruleInfo = RuleInfoGen(param) -// val dqType = RuleInfoGen.dqType(param) -// try { -// val result = parser.parseRule(ruleInfo.rule, dqType) -// if (result.successful) { -// val expr = result.get -// dqType match { -// case AccuracyType => accuracyRuleInfos(ruleInfo, expr, timeInfo) -// case ProfilingType => profilingRuleInfos(ruleInfo, expr, timeInfo) -// case TimelinessType => Nil -// case _ => Nil -// } -// } else { -// warn(s"parse rule [ ${ruleInfo.rule} ] fails: \n${result}") -// Nil -// } -// } catch { -// case e: Throwable => { -// error(s"generate rule info ${ruleInfo} fails: ${e.getMessage}") -// Nil -// } -// } -// } - - // group by version -// private def accuracyRuleInfos(ruleInfo: RuleInfo, expr: Expr, timeInfo: TimeInfo): Seq[RuleInfo] = { -// val calcTime = timeInfo.calcTime -// val details = ruleInfo.details -// val sourceName = details.getString(AccuracyKeys._source, dataSourceNames.head) -// val targetName = details.getString(AccuracyKeys._target, dataSourceNames.tail.head) -// val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], sourceName, targetName) -// -// if (!TempTables.existTable(timeInfo.key, sourceName)) { -// Nil -// } else { -// // 1. 
miss record -// val missRecordsSql = if (!TempTables.existTable(timeInfo.key, targetName)) { -// val selClause = s"`${sourceName}`.*" -// s"SELECT ${selClause} FROM `${sourceName}`" -// } else { -// val selClause = s"`${sourceName}`.*" -// val onClause = expr.coalesceDesc -// val sourceIsNull = analyzer.sourceSelectionExprs.map { sel => -// s"${sel.desc} IS NULL" -// }.mkString(" AND ") -// val targetIsNull = analyzer.targetSelectionExprs.map { sel => -// s"${sel.desc} IS NULL" -// }.mkString(" AND ") -// val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})" -// s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}" -// } -// val missRecordsName = AccuracyKeys._missRecords -// // val tmstMissRecordsName = TempName.tmstName(missRecordsName, timeInfo) -// val missRecordsParams = details.getParamMap(AccuracyKeys._missRecords) -// .addIfNotExist(RuleDetailKeys._persistType, RecordPersistType.desc) -// .addIfNotExist(RuleDetailKeys._persistName, missRecordsName) -// val missRecordsRuleInfo = RuleInfo(missRecordsName, None, SparkSqlType, -// missRecordsSql, missRecordsParams, true) -// // val missRecordsStep = SparkSqlStep( -// // timeInfo, -// // RuleInfo(missRecordsName, Some(tmstMissRecordsName), missRecordsSql, missRecordsParams) -// // ) -// -// // 2. miss count -// val missTableName = "_miss_" -// // val tmstMissTableName = TempName.tmstName(missTableName, timeInfo) -// val missColName = details.getStringOrKey(AccuracyKeys._miss) -// val missSql = { -// s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${missColName}` FROM `${missRecordsName}` GROUP BY `${InternalColumns.tmst}`" -// } -// val missRuleInfo = RuleInfo(missTableName, None, SparkSqlType, -// missSql, Map[String, Any](), true) -// // val missStep = SparkSqlStep( -// // timeInfo, -// // RuleInfo(missTableName, None, missSql, Map[String, Any]()) -// // ) -// -// // 3. total count -// val totalTableName = "_total_" -// // val tmstTotalTableName = TempName.tmstName(totalTableName, timeInfo) -// val totalColName = details.getStringOrKey(AccuracyKeys._total) -// val totalSql = { -// s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceName}` GROUP BY `${InternalColumns.tmst}`" -// } -// val totalRuleInfo = RuleInfo(totalTableName, None, SparkSqlType, -// totalSql, Map[String, Any](), true) -// // val totalStep = SparkSqlStep( -// // timeInfo, -// // RuleInfo(totalTableName, None, totalSql, Map[String, Any]()) -// // ) -// -// // 4. 
accuracy metric -// val accuracyMetricName = details.getString(RuleDetailKeys._persistName, ruleInfo.name) -// // val tmstAccuracyMetricName = TempName.tmstName(accuracyMetricName, timeInfo) -// val matchedColName = details.getStringOrKey(AccuracyKeys._matched) -// val accuracyMetricSql = { -// s""" -// |SELECT `${totalTableName}`.`${InternalColumns.tmst}` AS `${InternalColumns.tmst}`, -// |`${missTableName}`.`${missColName}` AS `${missColName}`, -// |`${totalTableName}`.`${totalColName}` AS `${totalColName}`, -// |(`${totalColName}` - `${missColName}`) AS `${matchedColName}` -// |FROM `${totalTableName}` FULL JOIN `${missTableName}` -// |ON `${totalTableName}`.`${InternalColumns.tmst}` = `${missTableName}`.`${InternalColumns.tmst}` -// """.stripMargin -// } -// // val accuracyParams = details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc) -//// val accuracyMetricRuleInfo = RuleInfo(accuracyMetricName, None, SparkSqlType, -//// accuracyMetricSql, Map[String, Any](), true) -// val accuracyParams = details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc) -// .addIfNotExist(RuleDetailKeys._persistName, accuracyMetricName) -// val accuracyMetricRuleInfo = RuleInfo(accuracyMetricName, None, SparkSqlType, -// accuracyMetricSql, Map[String, Any](), true) -// -// // 5. accuracy metric merge -// val globalMetricName = "accu_global" -// val globalAccuSql = if (TempTables.existGlobalTable(globalMetricName)) { -// s""" -// |SELECT coalesce(`${globalMetricName}`.`${InternalColumns.tmst}`, `${accuracyMetricName}`.`${InternalColumns.tmst}`) AS `${InternalColumns.tmst}`, -// |coalesce(`${accuracyMetricName}`.`${missColName}`, `${globalMetricName}`.`${missColName}`) AS `${missColName}`, -// |coalesce(`${globalMetricName}`.`${totalColName}`, `${accuracyMetricName}`.`${totalColName}`) AS `${totalColName}`, -// |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`, -// |(`${totalColName}` = 0) AS `empty`, -// |(`${missColName}` = 0) AS `no_miss`, -// |(`${accuracyMetricName}`.`${missColName}` < `${globalMetricName}`.`${missColName}`) AS `update` -// |FROM `${globalMetricName}` FULL JOIN `${accuracyMetricName}` -// |ON `${globalMetricName}`.`${InternalColumns.tmst}` = `${accuracyMetricName}`.`${InternalColumns.tmst}` -// """.stripMargin -// } else { -// s""" -// |SELECT `${accuracyMetricName}`.`${InternalColumns.tmst}` AS `${InternalColumns.tmst}`, -// |`${accuracyMetricName}`.`${missColName}` AS `${missColName}`, -// |`${accuracyMetricName}`.`${totalColName}` AS `${totalColName}`, -// |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`, -// |(`${totalColName}` = 0) AS `empty`, -// |(`${missColName}` = 0) AS `no_miss`, -// |true AS `update` -// |FROM `${accuracyMetricName}` -// """.stripMargin -// } -// val globalAccuParams = Map[String, Any]( -// ("global" -> true) -// ) -// val mergeRuleInfo = RuleInfo(globalMetricName, None, SparkSqlType, -// globalAccuSql, globalAccuParams, true) -// -// // 6. persist metrics -// val persistMetricName = "persist" -// val persistSql = { -// s""" -// |SELECT `${InternalColumns.tmst}`, `${missColName}`, `${totalColName}`, `${matchedColName}` -// |FROM `${globalMetricName}` -// |WHERE `update` -// """.stripMargin -// } -// val persistParams = details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc) -// .addIfNotExist(RuleDetailKeys._persistName, accuracyMetricName) -// val persistRuleInfo = RuleInfo(persistMetricName, None, SparkSqlType, -// persistSql, persistParams, true) -// -// // 5. 
accuracy metric filter -//// val accuracyParams = details.addIfNotExist("df.name", accuracyMetricName) -//// .addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc) -//// .addIfNotExist(RuleDetailKeys._persistName, accuracyMetricName) -//// val accuracyRuleInfo = RuleInfo(accuracyMetricName, None, DfOprType, -//// "accuracy", accuracyParams, true) -// -//// missRecordsRuleInfo :: missRuleInfo :: totalRuleInfo :: -//// accuracyMetricRuleInfo :: accuracyRuleInfo :: Nil -// missRecordsRuleInfo :: missRuleInfo :: totalRuleInfo :: -// accuracyMetricRuleInfo :: mergeRuleInfo :: persistRuleInfo :: Nil -// } -// } - -// private def accuracyRuleInfos(ruleInfo: RuleInfo, expr: Expr, timeInfo: TimeInfo): Seq[RuleInfo] = { -// val calcTime = timeInfo.calcTime -// val details = ruleInfo.details -// val sourceName = details.getString(AccuracyKeys._source, dataSourceNames.head) -// val targetName = details.getString(AccuracyKeys._target, dataSourceNames.tail.head) -// val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], sourceName, targetName) -// -// if (!TempTables.existTable(timeInfo.key, sourceName)) { -// Nil -// } else { -// // 1. miss record -// val missRecordsSql = if (!TempTables.existTable(timeInfo.key, targetName)) { -// val selClause = s"`${sourceName}`.*" -// s"SELECT ${selClause} FROM `${sourceName}`" -// } else { -// val selClause = s"`${sourceName}`.*" -// val onClause = expr.coalesceDesc -// val sourceIsNull = analyzer.sourceSelectionExprs.map { sel => -// s"${sel.desc} IS NULL" -// }.mkString(" AND ") -// val targetIsNull = analyzer.targetSelectionExprs.map { sel => -// s"${sel.desc} IS NULL" -// }.mkString(" AND ") -// val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})" -// s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}" -// } -// val missRecordsName = AccuracyKeys._missRecords -//// val tmstMissRecordsName = TempName.tmstName(missRecordsName, timeInfo) -// val missRecordsParams = details.getParamMap(AccuracyKeys._missRecords) -// .addIfNotExist(RuleDetailKeys._persistType, RecordPersistType.desc) -// .addIfNotExist(RuleDetailKeys._persistName, missRecordsName) -// val missRecordsRuleInfo = RuleInfo(missRecordsName, None, SparkSqlType, -// missRecordsSql, missRecordsParams, true) -//// val missRecordsStep = SparkSqlStep( -//// timeInfo, -//// RuleInfo(missRecordsName, Some(tmstMissRecordsName), missRecordsSql, missRecordsParams) -//// ) -// -// // 2. miss count -// val missTableName = "_miss_" -// // val tmstMissTableName = TempName.tmstName(missTableName, timeInfo) -// val missColName = details.getStringOrKey(AccuracyKeys._miss) -// val missSql = { -// s"SELECT COUNT(*) AS `${missColName}` FROM `${missRecordsName}`" -// } -// val missRuleInfo = RuleInfo(missTableName, None, SparkSqlType, -// missSql, Map[String, Any](), false) -//// val missStep = SparkSqlStep( -//// timeInfo, -//// RuleInfo(missTableName, None, missSql, Map[String, Any]()) -//// ) -// -// // 3. total count -// val totalTableName = "_total_" -// // val tmstTotalTableName = TempName.tmstName(totalTableName, timeInfo) -// val totalColName = details.getStringOrKey(AccuracyKeys._total) -// val totalSql = { -// s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`" -// } -// val totalRuleInfo = RuleInfo(totalTableName, None, SparkSqlType, -// totalSql, Map[String, Any](), false) -//// val totalStep = SparkSqlStep( -//// timeInfo, -//// RuleInfo(totalTableName, None, totalSql, Map[String, Any]()) -//// ) -// -// // 4. 
accuracy metric -// val accuracyMetricName = details.getString(RuleDetailKeys._persistName, ruleInfo.name) -//// val tmstAccuracyMetricName = TempName.tmstName(accuracyMetricName, timeInfo) -// val matchedColName = details.getStringOrKey(AccuracyKeys._matched) -// val accuracyMetricSql = { -// s""" -// |SELECT `${missTableName}`.`${missColName}` AS `${missColName}`, -// |`${totalTableName}`.`${totalColName}` AS `${totalColName}` -// |FROM `${totalTableName}` FULL JOIN `${missTableName}` -// """.stripMargin -// } -// // val accuracyParams = details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc) -// val accuracyMetricRuleInfo = RuleInfo(accuracyMetricName, None, SparkSqlType, -// accuracyMetricSql, Map[String, Any](), false) -//// val accuracyMetricStep = SparkSqlStep( -//// timeInfo, -//// RuleInfo(accuracyMetricName, Some(tmstAccuracyMetricName), accuracyMetricSql, Map[String, Any]()) -//// ) -// -// // 5. accuracy metric filter -// val accuracyParams = details.addIfNotExist("df.name", accuracyMetricName) -// .addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc) -// .addIfNotExist(RuleDetailKeys._persistName, accuracyMetricName) -// val accuracyRuleInfo = RuleInfo(accuracyMetricName, None, DfOprType, -// "accuracy", accuracyParams, false) -//// val accuracyStep = DfOprStep( -//// timeInfo, -//// RuleInfo(accuracyMetricName, Some(tmstAccuracyMetricName), "accuracy", accuracyParams) -//// ) -// -// missRecordsRuleInfo :: missRuleInfo :: totalRuleInfo :: -// accuracyMetricRuleInfo :: accuracyRuleInfo :: Nil -// } -// } - -// private def profilingRuleInfos(ruleInfo: RuleInfo, expr: Expr, timeInfo: TimeInfo): Seq[RuleInfo] = { -// val details = ruleInfo.details -// val profilingClause = expr.asInstanceOf[ProfilingClause] -// val sourceName = profilingClause.fromClauseOpt match { -// case Some(fc) => fc.dataSource -// case _ => details.getString(ProfilingKeys._source, dataSourceNames.head) -// } -// val fromClause = profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc -// -// if (!TempTables.existTable(timeInfo.key, sourceName)) { -// Nil -// } else { -// val tmstAnalyzer = ProfilingAnalyzer(profilingClause, sourceName) -// -// val selExprDescs = tmstAnalyzer.selectionExprs.map { sel => -// val alias = sel match { -// case s: AliasableExpr if (s.alias.nonEmpty) => s" AS `${s.alias.get}`" -// case _ => "" -// } -// s"${sel.desc}${alias}" -// } -// val selCondition = profilingClause.selectClause.extraConditionOpt.map(_.desc).mkString -// val selClause = selExprDescs.mkString(", ") -//// val tmstFromClause = profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc -// val groupByClauseOpt = tmstAnalyzer.groupbyExprOpt -// val groupbyClause = groupByClauseOpt.map(_.desc).getOrElse("") -// val preGroupbyClause = tmstAnalyzer.preGroupbyExprs.map(_.desc).mkString(" ") -// val postGroupbyClause = tmstAnalyzer.postGroupbyExprs.map(_.desc).mkString(" ") -// -// // 1. 
select statement -// val profilingSql = { -// s"SELECT ${selCondition} ${selClause} ${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}" -// } -// // println(profilingSql) -// val metricName = details.getString(RuleDetailKeys._persistName, ruleInfo.name) -// // val tmstMetricName = TempName.tmstName(metricName, timeInfo) -// val profilingParams = details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc) -// .addIfNotExist(RuleDetailKeys._persistName, metricName) -// val profilingRuleInfo = ruleInfo.setDslType(SparkSqlType) -// .setRule(profilingSql).setDetails(profilingParams) -//// val profilingStep = SparkSqlStep( -//// timeInfo, -//// ruleInfo.setRule(profilingSql).setDetails(profilingParams) -//// ) -// -// // filterStep :: profilingStep :: Nil -// profilingRuleInfo :: Nil -// } -// } - -// def genRuleStep(timeInfo: TimeInfo, param: Map[String, Any]): Seq[RuleStep] = { -// val ruleInfo = RuleInfoGen(param, timeInfo) -// val dqType = RuleInfoGen.dqType(param) -// GriffinDslStep(timeInfo, ruleInfo, dqType) :: Nil -// } -// -// def adaptConcreteRuleStep(ruleStep: RuleStep -// ): Seq[ConcreteRuleStep] = { -// ruleStep match { -// case rs @ GriffinDslStep(_, ri, dqType) => { -// try { -// val result = parser.parseRule(ri.rule, dqType) -// if (result.successful) { -// val expr = result.get -// transConcreteRuleStep(rs, expr) -// } else { -// println(result) -// warn(s"adapt concrete rule step warn: parse rule [ ${ri.rule} ] fails") -// Nil -// } -// } catch { -// case e: Throwable => { -// error(s"adapt concrete rule step error: ${e.getMessage}") -// Nil -// } -// } -// } -// case _ => Nil -// } -// } -// -// private def transConcreteRuleStep(ruleStep: GriffinDslStep, expr: Expr -// ): Seq[ConcreteRuleStep] = { -// ruleStep.dqType match { -// case AccuracyType => transAccuracyRuleStep(ruleStep, expr) -// case ProfilingType => transProfilingRuleStep(ruleStep, expr) -// case TimelinessType => Nil -// case _ => Nil -// } -// } - -// private def transAccuracyRuleStep(ruleStep: GriffinDslStep, expr: Expr -// ): Seq[ConcreteRuleStep] = { -// val timeInfo = ruleStep.timeInfo -// val ruleInfo = ruleStep.ruleInfo -// val calcTime = timeInfo.calcTime -// val tmst = timeInfo.tmst -// -// val details = ruleInfo.details -// val sourceName = details.getString(AccuracyKeys._source, dataSourceNames.head) -// val targetName = details.getString(AccuracyKeys._target, dataSourceNames.tail.head) -// val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], sourceName, targetName) -// -// if (!TempTables.existTable(key(calcTime), sourceName)) { -// Nil -// } else { -// // 1. 
miss record -// val missRecordsSql = if (!TempTables.existTable(key(calcTime), targetName)) { -// val selClause = s"`${sourceName}`.*" -// s"SELECT ${selClause} FROM `${sourceName}`" -// } else { -// val selClause = s"`${sourceName}`.*" -// val onClause = expr.coalesceDesc -// val sourceIsNull = analyzer.sourceSelectionExprs.map { sel => -// s"${sel.desc} IS NULL" -// }.mkString(" AND ") -// val targetIsNull = analyzer.targetSelectionExprs.map { sel => -// s"${sel.desc} IS NULL" -// }.mkString(" AND ") -// val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})" -// s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}" -// } -// val missRecordsName = AccuracyKeys._missRecords -// val tmstMissRecordsName = TempName.tmstName(missRecordsName, timeInfo) -// val missRecordsParams = details.getParamMap(AccuracyKeys._missRecords) -// .addIfNotExist(RuleDetailKeys._persistType, RecordPersistType.desc) -// .addIfNotExist(RuleDetailKeys._persistName, missRecordsName) -// val missRecordsStep = SparkSqlStep( -// timeInfo, -// RuleInfo(missRecordsName, Some(tmstMissRecordsName), missRecordsSql, missRecordsParams) -// ) -// -// // 2. miss count -// val missTableName = "_miss_" -//// val tmstMissTableName = TempName.tmstName(missTableName, timeInfo) -// val missColName = details.getStringOrKey(AccuracyKeys._miss) -// val missSql = { -// s"SELECT COUNT(*) AS `${missColName}` FROM `${missRecordsName}`" -// } -// val missStep = SparkSqlStep( -// timeInfo, -// RuleInfo(missTableName, None, missSql, Map[String, Any]()) -// ) -// -// // 3. total count -// val totalTableName = "_total_" -//// val tmstTotalTableName = TempName.tmstName(totalTableName, timeInfo) -// val totalColName = details.getStringOrKey(AccuracyKeys._total) -// val totalSql = { -// s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`" -// } -// val totalStep = SparkSqlStep( -// timeInfo, -// RuleInfo(totalTableName, None, totalSql, Map[String, Any]()) -// ) -// -// // 4. accuracy metric -// val accuracyMetricName = details.getString(RuleDetailKeys._persistName, ruleStep.name) -// val tmstAccuracyMetricName = TempName.tmstName(accuracyMetricName, timeInfo) -// val matchedColName = details.getStringOrKey(AccuracyKeys._matched) -// val accuracyMetricSql = { -// s""" -// |SELECT `${missTableName}`.`${missColName}` AS `${missColName}`, -// |`${totalTableName}`.`${totalColName}` AS `${totalColName}` -// |FROM `${totalTableName}` FULL JOIN `${missTableName}` -// """.stripMargin -// } -//// val accuracyParams = details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc) -// val accuracyMetricStep = SparkSqlStep( -// timeInfo, -// RuleInfo(accuracyMetricName, Some(tmstAccuracyMetricName), accuracyMetricSql, Map[String, Any]()) -// ) -// -// // 5. 
accuracy metric filter -// val accuracyParams = details.addIfNotExist("df.name", accuracyMetricName) -// .addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc) -// .addIfNotExist(RuleDetailKeys._persistName, accuracyMetricName) -// val accuracyStep = DfOprStep( -// timeInfo, -// RuleInfo(accuracyMetricName, Some(tmstAccuracyMetricName), "accuracy", accuracyParams) -// ) -// -// missRecordsStep :: missStep :: totalStep :: accuracyMetricStep :: accuracyStep :: Nil -// } -// } - -// private def transProfilingRuleStep(ruleStep: GriffinDslStep, expr: Expr -// ): Seq[ConcreteRuleStep] = { -// val calcTime = ruleStep.timeInfo.calcTime -// val details = ruleStep.ruleInfo.details -// val profilingClause = expr.asInstanceOf[ProfilingClause] -// val sourceName = profilingClause.fromClauseOpt match { -// case Some(fc) => fc.dataSource -// case _ => details.getString(ProfilingKeys._source, dataSourceNames.head) -// } -// val fromClause = profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc -// -// if (!TempTables.existTable(key(calcTime), sourceName)) { -// Nil -// } else { -// val timeInfo = ruleStep.timeInfo -// val ruleInfo = ruleStep.ruleInfo -// val tmst = timeInfo.tmst -// -//// val tmstSourceName = TempName.tmstName(sourceName, timeInfo) -// -//// val tmstProfilingClause = profilingClause.map(dsHeadReplace(sourceName, tmstSourceName)) -// val tmstAnalyzer = ProfilingAnalyzer(profilingClause, sourceName) -// -// val selExprDescs = tmstAnalyzer.selectionExprs.map { sel => -// val alias = sel match { -// case s: AliasableExpr if (s.alias.nonEmpty) => s" AS `${s.alias.get}`" -// case _ => "" -// } -// s"${sel.desc}${alias}" -// } -// val selCondition = profilingClause.selectClause.extraConditionOpt.map(_.desc).mkString -// val selClause = selExprDescs.mkString(", ") -//// val tmstFromClause = profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc -// val groupByClauseOpt = tmstAnalyzer.groupbyExprOpt -// val groupbyClause = groupByClauseOpt.map(_.desc).getOrElse("") -// val preGroupbyClause = tmstAnalyzer.preGroupbyExprs.map(_.desc).mkString(" ") -// val postGroupbyClause = tmstAnalyzer.postGroupbyExprs.map(_.desc).mkString(" ") -// -// // 1. 
select statement -// val profilingSql = { -// s"SELECT ${selCondition} ${selClause} ${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}" -// } -//// println(profilingSql) -// val metricName = details.getString(RuleDetailKeys._persistName, ruleStep.name) -//// val tmstMetricName = TempName.tmstName(metricName, timeInfo) -// val profilingParams = details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc) -// .addIfNotExist(RuleDetailKeys._persistName, metricName) -// val profilingStep = SparkSqlStep( -// timeInfo, -// ruleInfo.setRule(profilingSql).setDetails(profilingParams) -// ) -// -//// filterStep :: profilingStep :: Nil -// profilingStep :: Nil -// } -// -// } - -// private def dsHeadReplace(originName: String, replaceName: String): (Expr) => Expr = { expr: Expr => -// expr match { -// case DataSourceHeadExpr(sn) if (sn == originName) => { -// DataSourceHeadExpr(replaceName) -// } -// case FromClause(sn) if (sn == originName) => { -// FromClause(replaceName) -// } -// case _ => expr.map(dsHeadReplace(originName, replaceName)) -// } -// } - } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fad7daf7/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala index da59348..11b67f2 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala @@ -28,7 +28,7 @@ sealed trait DqType { object DqType { private val dqTypes: List[DqType] = List( - AccuracyType, ProfilingType, DuplicateType, TimelinessType, UnknownType + AccuracyType, ProfilingType, UniquenessType, TimelinessType, UnknownType ) def apply(ptn: String): DqType = { dqTypes.filter(tp => ptn match { @@ -49,9 +49,9 @@ final case object ProfilingType extends DqType { val desc = "profiling" } -final case object DuplicateType extends DqType { - val regex = "^(?i)duplicate$".r - val desc = "duplicate" +final case object UniquenessType extends DqType { + val regex = "^(?i)uniqueness|duplicate$".r + val desc = "uniqueness" } final case object TimelinessType extends DqType { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fad7daf7/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DuplicateAnalyzer.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DuplicateAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DuplicateAnalyzer.scala deleted file mode 100644 index 1ca2b76..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DuplicateAnalyzer.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.rule.dsl.analyzer - -import org.apache.griffin.measure.rule.dsl.expr.{AliasableExpr, _} - - -case class DuplicateAnalyzer(expr: DuplicateClause, sourceName: String, targetName: String) extends BasicAnalyzer { - - val seqAlias = (expr: Expr, v: Seq[String]) => { - expr match { - case apr: AliasableExpr => v ++ apr.alias - case _ => v - } - } - val combAlias = (a: Seq[String], b: Seq[String]) => a ++ b - - private val exprs = expr.exprs - private def genAlias(idx: Int): String = s"alias_${idx}" - val selectionPairs = exprs.zipWithIndex.map { pair => - val (pr, idx) = pair - val res = pr.preOrderTraverseDepthFirst(Seq[String]())(seqAlias, combAlias) - (pr, res.headOption.getOrElse(genAlias(idx))) - } - - if (selectionPairs.isEmpty) { - throw new Exception(s"duplicate analyzer error: empty selection") - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fad7daf7/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/UniquenessAnalyzer.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/UniquenessAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/UniquenessAnalyzer.scala new file mode 100644 index 0000000..9fe65c2 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/UniquenessAnalyzer.scala @@ -0,0 +1,46 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.rule.dsl.analyzer + +import org.apache.griffin.measure.rule.dsl.expr.{AliasableExpr, _} + + +case class UniquenessAnalyzer(expr: UniquenessClause, sourceName: String, targetName: String) extends BasicAnalyzer { + + val seqAlias = (expr: Expr, v: Seq[String]) => { + expr match { + case apr: AliasableExpr => v ++ apr.alias + case _ => v + } + } + val combAlias = (a: Seq[String], b: Seq[String]) => a ++ b + + private val exprs = expr.exprs + private def genAlias(idx: Int): String = s"alias_${idx}" + val selectionPairs = exprs.zipWithIndex.map { pair => + val (pr, idx) = pair + val res = pr.preOrderTraverseDepthFirst(Seq[String]())(seqAlias, combAlias) + (pr, res.headOption.getOrElse(genAlias(idx))) + } + + if (selectionPairs.isEmpty) { + throw new Exception(s"uniqueness analyzer error: empty selection") + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fad7daf7/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala index bc7af42..504e176 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala @@ -219,12 +219,12 @@ case class ProfilingClause(selectClause: SelectClause, } } -case class DuplicateClause(exprs: Seq[Expr]) extends ClauseExpression { +case class UniquenessClause(exprs: Seq[Expr]) extends ClauseExpression { addChildren(exprs) def desc: String = exprs.map(_.desc).mkString(", ") def coalesceDesc: String = exprs.map(_.coalesceDesc).mkString(", ") - override def map(func: (Expr) => Expr): DuplicateClause = DuplicateClause(exprs.map(func(_))) + override def map(func: (Expr) => Expr): UniquenessClause = UniquenessClause(exprs.map(func(_))) } case class TimelinessClause(exprs: Seq[Expr]) extends ClauseExpression { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fad7daf7/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala index 8d04e76..83f3153 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala @@ -39,11 +39,11 @@ case class GriffinDslParser(dataSourceNames: Seq[String], functionNames: Seq[Str } /** - * -- duplicate clauses -- - * <duplicate-clauses> = <expr> [, <expr>]+ + * -- uniqueness clauses -- + * <uniqueness-clauses> = <expr> [, <expr>]+ */ - def duplicateClause: Parser[DuplicateClause] = rep1sep(expression, Operator.COMMA) ^^ { - case exprs => DuplicateClause(exprs) + def uniquenessClause: Parser[UniquenessClause] = rep1sep(expression, Operator.COMMA) ^^ { + case exprs => UniquenessClause(exprs) } /** @@ -58,7 +58,7 @@ case class GriffinDslParser(dataSourceNames: Seq[String], functionNames: Seq[Str val rootExpr = dqType match { case AccuracyType => logicalExpression case ProfilingType => profilingClause - case DuplicateType => duplicateClause + case 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fad7daf7/measure/src/test/resources/_duplicate-batch-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_duplicate-batch-griffindsl.json b/measure/src/test/resources/_duplicate-batch-griffindsl.json
deleted file mode 100644
index cd71020..0000000
--- a/measure/src/test/resources/_duplicate-batch-griffindsl.json
+++ /dev/null
@@ -1,56 +0,0 @@
-{
-  "name": "dup_batch",
-
-  "process.type": "batch",
-
-  "data.sources": [
-    {
-      "name": "source",
-      "baseline": true,
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/users_info_src.avro"
-          }
-        }
-      ]
-    },
-    {
-      "name": "target",
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/users_info_src.avro"
-          }
-        }
-      ]
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "griffin-dsl",
-        "dq.type": "duplicate",
-        "name": "dup",
-        "rule": "user_id",
-        "details": {
-          "source": "source",
-          "target": "target",
-          "dup": "dup",
-          "num": "num"
-        },
-        "metric": {
-          "name": "dup"
-        },
-        "record": {
-          "name": "dupRecords"
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fad7daf7/measure/src/test/resources/_duplicate-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_duplicate-streaming-griffindsl.json b/measure/src/test/resources/_duplicate-streaming-griffindsl.json
deleted file mode 100644
index 18ac81a..0000000
--- a/measure/src/test/resources/_duplicate-streaming-griffindsl.json
+++ /dev/null
@@ -1,116 +0,0 @@
-{
-  "name": "dup_streaming",
-
-  "process.type": "streaming",
-
-  "data.sources": [
-    {
-      "name": "new",
-      "baseline": true,
-      "connectors": [
-        {
-          "type": "kafka",
-          "version": "0.8",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.149.247.156:9092",
-              "group.id": "new",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "ttt",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "df-opr",
-              "name": "${s1}",
-              "rule": "from_json",
-              "details": {
-                "df.name": "${this}"
-              }
-            },
-            {
-              "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select name, age from ${s1}"
-            }
-          ]
-        }
-      ],
-      "cache": {
-        "file.path": "hdfs://localhost/griffin/streaming/dump/new",
-        "info.path": "new",
-        "ready.time.interval": "10s",
-        "ready.time.delay": "0",
-        "time.range": ["0", "0"]
-      }
-    },
-    {
-      "name": "old",
-      "connectors": [
-        {
-          "type": "kafka",
-          "version": "0.8",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.149.247.156:9092",
-              "group.id": "old",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "ttt",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "df-opr",
-              "name": "${s1}",
-              "rule": "from_json",
-              "details": {
-                "df.name": "${this}"
-              }
-            },
-            {
-              "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select name, age from ${s1}"
-            }
-          ]
-        }
-      ],
-      "cache": {
-        "file.path": "hdfs://localhost/griffin/streaming/dump/old",
-        "info.path": "old",
-        "ready.time.interval": "10s",
-        "ready.time.delay": "0",
-        "time.range": ["-24h", "0"]
-      }
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "griffin-dsl",
-        "dq.type": "duplicate",
-        "name": "dup",
-        "rule": "name, age",
-        "details": {
-          "source": "new",
-          "target": "old",
-          "dup": "dup",
-          "num": "num"
-        },
-        "metric": {
-          "name": "dup"
-        },
-        "record": {
-          "name": "dupRecords"
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fad7daf7/measure/src/test/resources/_duplicate-streaming-sparksql.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_duplicate-streaming-sparksql.json b/measure/src/test/resources/_duplicate-streaming-sparksql.json
deleted file mode 100644
index 3d37dad..0000000
--- a/measure/src/test/resources/_duplicate-streaming-sparksql.json
+++ /dev/null
@@ -1,130 +0,0 @@
-{
-  "name": "dup_streaming",
-
-  "process.type": "streaming",
-
-  "data.sources": [
-    {
-      "name": "new",
-      "baseline": true,
-      "connectors": [
-        {
-          "type": "kafka",
-          "version": "0.8",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.149.247.156:9092",
-              "group.id": "new",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "sss",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "df-opr",
-              "name": "${s1}",
-              "rule": "from_json",
-              "details": {
-                "df.name": "${this}"
-              }
-            },
-            {
-              "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select name, age from ${s1}"
-            }
-          ]
-        }
-      ],
-      "cache": {
-        "file.path": "hdfs://localhost/griffin/streaming/dump/new",
-        "info.path": "new",
-        "ready.time.interval": "10s",
-        "ready.time.delay": "0",
-        "time.range": ["0", "0"]
-      }
-    },
-    {
-      "name": "old",
-      "connectors": [
-        {
-          "type": "kafka",
-          "version": "0.8",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.149.247.156:9092",
-              "group.id": "old",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "sss",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "df-opr",
-              "name": "${s1}",
-              "rule": "from_json",
-              "details": {
-                "df.name": "${this}"
-              }
-            },
-            {
-              "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select name, age from ${s1}"
-            }
-          ]
-        }
-      ],
-      "cache": {
-        "file.path": "hdfs://localhost/griffin/streaming/dump/old",
-        "info.path": "old",
-        "ready.time.interval": "10s",
-        "ready.time.delay": "0",
-        "time.range": ["-24h", "0"]
-      }
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "spark-sql",
-        "name": "dist",
-        "rule": "SELECT DISTINCT * FROM new"
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "joined",
-        "rule": "SELECT dist.* FROM old RIGHT JOIN dist ON coalesce(old.name, '') = coalesce(dist.name, '') AND coalesce(old.age, '') = coalesce(dist.age, '')"
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "grouped",
-        "rule": "SELECT `__tmst`, `name`, `age`, count(*) as `dup_cnt` FROM joined GROUP BY `__tmst`, `name`, `age`"
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "dupRecs",
-        "cache": true,
-        "rule": "SELECT * FROM grouped WHERE `dup_cnt` > 1",
-        "record": {
-          "name": "dupRecords"
-        }
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "dupMetric",
-        "rule": "SELECT `__tmst`, `dup_cnt`, count(*) as `item_cnt` FROM dupRecs GROUP BY `__tmst`, `dup_cnt`",
-        "metric": {
-          "name": "dup"
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fad7daf7/measure/src/test/resources/_uniqueness-batch-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_uniqueness-batch-griffindsl.json b/measure/src/test/resources/_uniqueness-batch-griffindsl.json
new file mode 100644
index 0000000..28009e8
--- /dev/null
+++ b/measure/src/test/resources/_uniqueness-batch-griffindsl.json
@@ -0,0 +1,58 @@
+{
+  "name": "unique_batch",
+
+  "process.type": "batch",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/users_info_src.avro"
+          }
+        }
+      ]
+    },
+    {
+      "name": "target",
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/users_info_src.avro"
+          }
+        }
+      ]
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "uniqueness",
+        "name": "dup",
+        "rule": "user_id",
+        "details": {
+          "source": "source",
+          "target": "target",
+          "total": "total",
+          "unique": "unique",
+          "dup": "dup",
+          "num": "num"
+        },
+        "metric": {
+          "name": "unique"
+        },
+        "record": {
+          "name": "dupRecords"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fad7daf7/measure/src/test/resources/_uniqueness-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_uniqueness-streaming-griffindsl.json b/measure/src/test/resources/_uniqueness-streaming-griffindsl.json
new file mode 100644
index 0000000..bc5cbd2
--- /dev/null
+++ b/measure/src/test/resources/_uniqueness-streaming-griffindsl.json
@@ -0,0 +1,119 @@
+{
+  "name": "unique_streaming",
+
+  "process.type": "streaming",
+
+  "data.sources": [
+    {
+      "name": "new",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.149.247.156:9092",
+              "group.id": "new",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "ttt",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${s1}",
+              "rule": "from_json",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${s1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/new",
+        "info.path": "new",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["0", "0"]
+      }
+    },
+    {
+      "name": "old",
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.149.247.156:9092",
+              "group.id": "old",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "ttt",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${s1}",
+              "rule": "from_json",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${s1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/old",
+        "info.path": "old",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["-24h", "0"]
+      }
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "uniqueness",
+        "name": "dup",
+        "rule": "name, age",
+        "details": {
+          "source": "new",
+          "target": "old",
+          "total": "total",
+          "unique": "unique",
+          "dup": "dup",
+          "num": "num",
+          "duplication.array": "dup"
+        },
+        "metric": {
+          "name": "unique"
+        },
+        "record": {
+          "name": "dupRecords"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fad7daf7/measure/src/test/resources/_uniqueness-streaming-sparksql.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_uniqueness-streaming-sparksql.json b/measure/src/test/resources/_uniqueness-streaming-sparksql.json
new file mode 100644
index 0000000..7d13215
--- /dev/null
+++ b/measure/src/test/resources/_uniqueness-streaming-sparksql.json
@@ -0,0 +1,130 @@
+{
+  "name": "unique_streaming",
+
+  "process.type": "streaming",
+
+  "data.sources": [
+    {
+      "name": "new",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.149.247.156:9092",
+              "group.id": "new",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "sss",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${s1}",
+              "rule": "from_json",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${s1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/new",
+        "info.path": "new",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["0", "0"]
+      }
+    },
+    {
+      "name": "old",
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.149.247.156:9092",
+              "group.id": "old",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "sss",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${s1}",
+              "rule": "from_json",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${s1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/old",
+        "info.path": "old",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["-24h", "0"]
+      }
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "spark-sql",
+        "name": "dist",
+        "rule": "SELECT DISTINCT * FROM new"
+      },
+      {
+        "dsl.type": "spark-sql",
+        "name": "joined",
+        "rule": "SELECT dist.* FROM old RIGHT JOIN dist ON coalesce(old.name, '') = coalesce(dist.name, '') AND coalesce(old.age, '') = coalesce(dist.age, '')"
+      },
+      {
+        "dsl.type": "spark-sql",
+        "name": "grouped",
+        "rule": "SELECT `__tmst`, `name`, `age`, count(*) as `dup_cnt` FROM joined GROUP BY `__tmst`, `name`, `age`"
+      },
+      {
+        "dsl.type": "spark-sql",
+        "name": "dupRecs",
+        "cache": true,
+        "rule": "SELECT * FROM grouped WHERE `dup_cnt` > 1",
+        "record": {
+          "name": "dupRecords"
+        }
+      },
+      {
+        "dsl.type": "spark-sql",
+        "name": "dupMetric",
+        "rule": "SELECT `__tmst`, `dup_cnt`, count(*) as `item_cnt` FROM dupRecs GROUP BY `__tmst`, `dup_cnt`",
+        "metric": {
+          "name": "dup"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/fad7daf7/measure/src/test/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptorTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptorTest.scala
index 22fc331..d551a4f 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptorTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptorTest.scala
@@ -109,13 +109,13 @@ class GriffinDslAdaptorTest extends FunSuite with Matchers with BeforeAndAfter w
 //  }
   }
 
-  test ("duplicate") {
+  test ("uniqueness") {
 //    val adaptor = GriffinDslAdaptor("new" :: "old" :: Nil, "count" :: Nil)
 //    val ruleJson =
 //      """
 //        |{
 //        |  "dsl.type": "griffin-dsl",
-//        |  "dq.type": "duplicate",
+//        |  "dq.type": "uniqueness",
 //        |  "name": "dup",
 //        |  "rule": "name, count(age + 1) as ct",
 //        |  "details": {
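The commented-out rule "name, count(age + 1) as ct" exercises the alias handling in UniquenessAnalyzer: an expression's own alias wins, and anonymous expressions fall back to a generated alias_<idx>. A self-contained sketch of that fallback, using simplified stand-ins for Griffin's expression AST (these case classes are illustrative, not the real Expr types):

object AliasSketch {
  sealed trait Expr { def alias: Option[String] }
  case class Col(name: String) extends Expr { val alias = Some(name) }
  case class AliasExpr(inner: Expr, as: String) extends Expr { val alias = Some(as) }
  case class Anon() extends Expr { val alias = None }

  // mirrors selectionPairs: use the expression's alias if present,
  // otherwise generate alias_<idx>; an empty selection is rejected
  def selectionPairs(exprs: Seq[Expr]): Seq[(Expr, String)] = {
    if (exprs.isEmpty) throw new Exception("uniqueness analyzer error: empty selection")
    exprs.zipWithIndex.map { case (e, idx) =>
      (e, e.alias.getOrElse(s"alias_${idx}"))
    }
  }

  def main(args: Array[String]): Unit = {
    // "name, count(age + 1) as ct" -> aliases: name, ct
    println(selectionPairs(Seq(Col("name"), AliasExpr(Anon(), "ct"))))
    // an unaliased expression falls back to alias_1
    println(selectionPairs(Seq(Col("name"), Anon())))
  }
}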
