Repository: incubator-griffin Updated Branches: refs/heads/master e704da627 -> cbff5b45c
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala b/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala new file mode 100644 index 0000000..31fe5ea --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala @@ -0,0 +1,41 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.process.temp + +import scala.math.{min, max} + +/** Time window [begin, end] together with an associated set of timestamps (tmsts). */ +case class TimeRange(begin: Long, end: Long, tmsts: Set[Long]) extends Serializable { + def merge(tr: TimeRange): TimeRange = { // widest [begin, end] of both ranges and the union of their tmsts; NOTE(review): merging with emptyTimeRange (0, 0) pulls begin down to 0 + TimeRange(min(begin, tr.begin), max(end, tr.end), tmsts ++ tr.tmsts) + } +} + +object TimeRange { + val emptyTimeRange = TimeRange(0, 0, Set[Long]()) + def apply(range: (Long, Long), tmsts: Set[Long]): TimeRange = TimeRange(range._1, range._2, tmsts) + def apply(ts: Long, tmsts: Set[Long]): TimeRange = TimeRange(ts, ts, tmsts) + def apply(ts: Long): TimeRange = TimeRange(ts, ts, Set[Long](ts)) + /** Range spanning the smallest to the largest element of tmsts; an empty set yields emptyTimeRange. */ + def apply(tmsts: Set[Long]): TimeRange = { + // check for emptiness up front instead of catching Throwable, which would also swallow fatal errors + if (tmsts.isEmpty) emptyTimeRange + else TimeRange(tmsts.min, tmsts.max, tmsts) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala index 5447ccc..97589ad 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala @@ -18,7 +18,8 @@ under the License.
*/ package org.apache.griffin.measure.rule.adaptor -import org.apache.griffin.measure.process.ProcessType +import org.apache.griffin.measure.process.{ExportMode, ProcessType} +import org.apache.griffin.measure.process.temp.TimeRange import org.apache.griffin.measure.rule.plan.{TimeInfo, _} import org.apache.griffin.measure.utils.ParamUtil._ @@ -46,10 +47,12 @@ case class DataFrameOprAdaptor() extends RuleAdaptor { import RuleParamKeys._ - def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any], procType: ProcessType): RulePlan = { + def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any], // dsTimeRanges (next line) is required by the RuleAdaptor signature but is not used by this adaptor + procType: ProcessType, dsTimeRanges: Map[String, TimeRange]): RulePlan = { val name = getRuleName(param) val step = DfOprStep(name, getRule(param), getDetails(param), getCache(param), getGlobal(param)) - RulePlan(step :: Nil, genRuleExports(param, name, name)) + val mode = ExportMode.defaultMode(procType) // default export mode for the process type; exports are stamped with timeInfo.calcTime + RulePlan(step :: Nil, genRuleExports(param, name, name, timeInfo.calcTime, mode)) } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala new file mode 100644 index 0000000..f592709 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala @@ -0,0 +1,70 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License.
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.rule.adaptor + +object AccuracyKeys { // keys looked up in an accuracy rule's details + val _source = "source" + val _target = "target" + val _miss = "miss" + val _total = "total" + val _matched = "matched" + // val _missRecords = "missRecords" +} + +object ProfilingKeys { + val _source = "source" +} + +object UniquenessKeys { // keys looked up in a uniqueness rule's details + val _source = "source" + val _target = "target" + val _unique = "unique" + val _total = "total" + val _dup = "dup" + val _num = "num" + + val _duplicationArray = "duplication.array" +} + +object DistinctnessKeys { // keys looked up in a distinctness rule's details + val _source = "source" + val _target = "target" + val _distinct = "distinct" + val _total = "total" + val _dup = "dup" + val _accu_dup = "accu_dup" + val _num = "num" + + val _duplicationArray = "duplication.array" + val _withAccumulate = "with.accumulate" // boolean flag; distinctRulePlan reads it with a default of true +} + +object TimelinessKeys { // keys looked up in a timeliness rule's details + val _source = "source" + val _latency = "latency" + val _threshold = "threshold" +} + +object GlobalKeys { + val _initRule = "init.rule" +} + +object ProcessDetailsKeys { + val _baselineDataSource = "baseline.data.source" +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala index 98545d8..ad4a195 100644 --- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala @@ -20,7 +20,7 @@ package org.apache.griffin.measure.rule.adaptor import org.apache.griffin.measure.cache.tmst.{TempName, TmstCache} import org.apache.griffin.measure.process.engine.DataFrameOprs.AccuracyOprKeys -import org.apache.griffin.measure.process.temp.TableRegisters +import org.apache.griffin.measure.process.temp.{TableRegisters, TimeRange} import org.apache.griffin.measure.process._ import org.apache.griffin.measure.rule.dsl._ import org.apache.griffin.measure.rule.dsl.analyzer._ @@ -30,39 +30,6 @@ import org.apache.griffin.measure.rule.plan.{TimeInfo, _} import org.apache.griffin.measure.utils.ParamUtil._ import org.apache.griffin.measure.utils.TimeUtil -object AccuracyKeys { - val _source = "source" - val _target = "target" - val _miss = "miss" - val _total = "total" - val _matched = "matched" -// val _missRecords = "missRecords" -} - -object ProfilingKeys { - val _source = "source" -} - -object UniquenessKeys { - val _source = "source" - val _target = "target" - val _unique = "unique" - val _total = "total" - val _dup = "dup" - val _num = "num" - val _duplicationArray = "duplication.array" -} - -object TimelinessKeys { - val _source = "source" - val _latency = "latency" - val _threshold = "threshold" -} - -object GlobalKeys { - val _initRule = "init.rule" -} - case class GriffinDslAdaptor(dataSourceNames: Seq[String], functionNames: Seq[String] ) extends RuleAdaptor { @@ -77,7 +44,8 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], private val emptyRulePlan = RulePlan(Nil, Nil) private val emptyMap = Map[String, Any]() - override def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any], processType: ProcessType + override def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any], + processType: ProcessType, dsTimeRanges: Map[String, TimeRange] ): RulePlan = 
{ val name = getRuleName(param) val rule = getRule(param) @@ -90,6 +58,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], case AccuracyType => accuracyRulePlan(timeInfo, name, expr, param, processType) case ProfilingType => profilingRulePlan(timeInfo, name, expr, param, processType) case UniquenessType => uniquenessRulePlan(timeInfo, name, expr, param, processType) + case DistinctnessType => distinctRulePlan(timeInfo, name, expr, param, processType, dsTimeRanges) case TimelinessType => timelinessRulePlan(timeInfo, name, expr, param, processType) case _ => emptyRulePlan } @@ -107,22 +76,26 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], // with accuracy opr private def accuracyRulePlan(timeInfo: TimeInfo, name: String, expr: Expr, - param: Map[String, Any], processType: ProcessType + param: Map[String, Any], procType: ProcessType ): RulePlan = { val details = getDetails(param) val sourceName = details.getString(AccuracyKeys._source, dataSourceNames.head) val targetName = details.getString(AccuracyKeys._target, dataSourceNames.tail.head) val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], sourceName, targetName) + val mode = ExportMode.defaultMode(procType) + + val ct = timeInfo.calcTime + if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) { - println(s"[${timeInfo.calcTime}] data source ${sourceName} not exists") + println(s"[${ct}] data source ${sourceName} not exists") emptyRulePlan } else { // 1. 
miss record val missRecordsTableName = "__missRecords" val selClause = s"`${sourceName}`.*" val missRecordsSql = if (!TableRegisters.existRunTempTable(timeInfo.key, targetName)) { - println(s"[${timeInfo.calcTime}] data source ${targetName} not exists") + println(s"[${ct}] data source ${targetName} not exists") s"SELECT ${selClause} FROM `${sourceName}`" } else { val onClause = expr.coalesceDesc @@ -136,10 +109,10 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}" } val missRecordsStep = SparkSqlStep(missRecordsTableName, missRecordsSql, emptyMap, true) - val missRecordsExports = processType match { + val missRecordsExports = procType match { case BatchProcessType => { val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap) - genRecordExport(recordParam, missRecordsTableName, missRecordsTableName) :: Nil + genRecordExport(recordParam, missRecordsTableName, missRecordsTableName, ct, mode) :: Nil } case StreamingProcessType => Nil } @@ -147,7 +120,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], // 2. miss count val missCountTableName = "__missCount" val missColName = details.getStringOrKey(AccuracyKeys._miss) - val missCountSql = processType match { + val missCountSql = procType match { case BatchProcessType => s"SELECT COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}`" case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}` GROUP BY `${InternalColumns.tmst}`" } @@ -156,7 +129,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], // 3. 
total count val totalCountTableName = "__totalCount" val totalColName = details.getStringOrKey(AccuracyKeys._total) - val totalCountSql = processType match { + val totalCountSql = procType match { case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`" case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceName}` GROUP BY `${InternalColumns.tmst}`" } @@ -165,7 +138,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], // 4. accuracy metric val accuracyTableName = name val matchedColName = details.getStringOrKey(AccuracyKeys._matched) - val accuracyMetricSql = processType match { + val accuracyMetricSql = procType match { case BatchProcessType => { s""" |SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`, @@ -186,10 +159,10 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], } } val accuracyStep = SparkSqlStep(accuracyTableName, accuracyMetricSql, emptyMap) - val accuracyExports = processType match { + val accuracyExports = procType match { case BatchProcessType => { val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap) - genMetricExport(metricParam, accuracyTableName, accuracyTableName) :: Nil + genMetricExport(metricParam, accuracyTableName, accuracyTableName, ct, mode) :: Nil } case StreamingProcessType => Nil } @@ -200,7 +173,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], val accuPlan = RulePlan(accuSteps, accuExports) // streaming extra accu plan - val streamingAccuPlan = processType match { + val streamingAccuPlan = procType match { case BatchProcessType => emptyRulePlan case StreamingProcessType => { // 5. 
accuracy metric merge @@ -215,7 +188,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], val accuracyMetricStep = DfOprStep(accuracyMetricTableName, accuracyMetricRule, accuracyMetricDetails) val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap) - val accuracyMetricExports = genMetricExport(metricParam, name, accuracyMetricTableName) :: Nil + val accuracyMetricExports = genMetricExport(metricParam, name, accuracyMetricTableName, ct, mode) :: Nil // 6. collect accuracy records val accuracyRecordTableName = "__accuracyRecords" @@ -230,7 +203,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], val accuracyRecordParam = recordParam.addIfNotExist(ExportParamKeys._dataSourceCache, sourceName) .addIfNotExist(ExportParamKeys._originDF, missRecordsTableName) val accuracyRecordExports = genRecordExport( - accuracyRecordParam, missRecordsTableName, accuracyRecordTableName) :: Nil + accuracyRecordParam, missRecordsTableName, accuracyRecordTableName, ct, mode) :: Nil // gen accu plan val extraSteps = accuracyMetricStep :: accuracyRecordStep :: Nil @@ -248,7 +221,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], } private def profilingRulePlan(timeInfo: TimeInfo, name: String, expr: Expr, - param: Map[String, Any], processType: ProcessType + param: Map[String, Any], procType: ProcessType ): RulePlan = { val details = getDetails(param) val profilingClause = expr.asInstanceOf[ProfilingClause] @@ -258,6 +231,10 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], } val fromClause = profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc + val mode = ExportMode.defaultMode(procType) + + val ct = timeInfo.calcTime + if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) { emptyRulePlan } else { @@ -270,12 +247,12 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], s"${sel.desc}${alias}" } val selCondition = profilingClause.selectClause.extraConditionOpt.map(_.desc).mkString - val 
selClause = processType match { + val selClause = procType match { case BatchProcessType => selExprDescs.mkString(", ") case StreamingProcessType => (s"`${InternalColumns.tmst}`" +: selExprDescs).mkString(", ") } val groupByClauseOpt = analyzer.groupbyExprOpt - val groupbyClause = processType match { + val groupbyClause = procType match { case BatchProcessType => groupByClauseOpt.map(_.desc).getOrElse("") case StreamingProcessType => { val tmstGroupbyClause = GroupbyClause(LiteralStringExpr(s"`${InternalColumns.tmst}`") :: Nil, None) @@ -296,25 +273,29 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], val profilingName = name val profilingStep = SparkSqlStep(profilingName, profilingSql, details) val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap) - val profilingExports = genMetricExport(metricParam, name, profilingName) :: Nil + val profilingExports = genMetricExport(metricParam, name, profilingName, ct, mode) :: Nil RulePlan(profilingStep :: Nil, profilingExports) } } private def uniquenessRulePlan(timeInfo: TimeInfo, name: String, expr: Expr, - param: Map[String, Any], processType: ProcessType + param: Map[String, Any], procType: ProcessType ): RulePlan = { val details = getDetails(param) val sourceName = details.getString(UniquenessKeys._source, dataSourceNames.head) val targetName = details.getString(UniquenessKeys._target, dataSourceNames.tail.head) val analyzer = UniquenessAnalyzer(expr.asInstanceOf[UniquenessClause], sourceName, targetName) + val mode = ExportMode.defaultMode(procType) + + val ct = timeInfo.calcTime + if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) { - println(s"[${timeInfo.calcTime}] data source ${sourceName} not exists") + println(s"[${ct}] data source ${sourceName} not exists") emptyRulePlan } else if (!TableRegisters.existRunTempTable(timeInfo.key, targetName)) { - println(s"[${timeInfo.calcTime}] data source ${targetName} not exists") + println(s"[${ct}] data source ${targetName} not 
exists") emptyRulePlan } else { val selItemsClause = analyzer.selectionPairs.map { pair => @@ -323,16 +304,16 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], }.mkString(", ") val aliases = analyzer.selectionPairs.map(_._2) - val selClause = processType match { + val selClause = procType match { case BatchProcessType => selItemsClause case StreamingProcessType => s"`${InternalColumns.tmst}`, ${selItemsClause}" } - val selAliases = processType match { + val selAliases = procType match { case BatchProcessType => aliases case StreamingProcessType => InternalColumns.tmst +: aliases } - // 1. source mapping + // 1. source distinct mapping val sourceTableName = "__source" val sourceSql = s"SELECT DISTINCT ${selClause} FROM ${sourceName}" val sourceStep = SparkSqlStep(sourceTableName, sourceSql, emptyMap) @@ -369,7 +350,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], // 5. total metric val totalTableName = "__totalMetric" val totalColName = details.getStringOrKey(UniquenessKeys._total) - val totalSql = processType match { + val totalSql = procType match { case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`" case StreamingProcessType => { s""" @@ -380,7 +361,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], } val totalStep = SparkSqlStep(totalTableName, totalSql, emptyMap) val totalMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc) - val totalMetricExport = genMetricExport(totalMetricParam, totalColName, totalTableName) + val totalMetricExport = genMetricExport(totalMetricParam, totalColName, totalTableName, ct, mode) // 6. unique record val uniqueRecordTableName = "__uniqueRecord" @@ -392,7 +373,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], // 7. 
unique metric val uniqueTableName = "__uniqueMetric" val uniqueColName = details.getStringOrKey(UniquenessKeys._unique) - val uniqueSql = processType match { + val uniqueSql = procType match { case BatchProcessType => s"SELECT COUNT(*) AS `${uniqueColName}` FROM `${uniqueRecordTableName}`" case StreamingProcessType => { s""" @@ -403,32 +384,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], } val uniqueStep = SparkSqlStep(uniqueTableName, uniqueSql, emptyMap) val uniqueMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc) - val uniqueMetricExport = genMetricExport(uniqueMetricParam, uniqueColName, uniqueTableName) - - // 8. count metric -// val countMetricTableName = "__countMetric" -// val countMetricSql = processType match { -// case BatchProcessType => { -// s""" -// |SELECT `${totalTableName}`.`${totalColName}` AS `${totalColName}`, -// |coalesce(`${uniqueTableName}`.`${uniqueColName}`, 0) AS `${uniqueColName}` -// |FROM `${totalTableName}` LEFT JOIN `${uniqueTableName}` -// """.stripMargin -// } -// case StreamingProcessType => { -// s""" -// |SELECT `${totalTableName}`.`${InternalColumns.tmst}` AS `${InternalColumns.tmst}`, -// |`${totalTableName}`.`${totalColName}` AS `${totalColName}`, -// |coalesce(`${uniqueTableName}`.`${uniqueColName}`, 0) AS `${uniqueColName}` -// |FROM `${totalTableName}` LEFT JOIN `${uniqueTableName}` -// |ON `${totalTableName}`.`${InternalColumns.tmst}` = `${uniqueTableName}`.`${InternalColumns.tmst}` -// """.stripMargin -// } -// } -// val countMetricStep = SparkSqlStep(countMetricTableName, countMetricSql, emptyMap) -// val countMetricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap) -// .addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc) -// val countMetricExport = genMetricExport(countMetricParam, "", countMetricTableName) + val uniqueMetricExport = genMetricExport(uniqueMetricParam, uniqueColName, uniqueTableName, ct, mode) val uniqueSteps = 
sourceStep :: targetStep :: joinedStep :: groupStep :: totalStep :: uniqueRecordStep :: uniqueStep :: Nil @@ -444,16 +400,16 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], } val dupRecordStep = SparkSqlStep(dupRecordTableName, dupRecordSql, emptyMap, true) val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap) - val dupRecordExport = genRecordExport(recordParam, dupRecordTableName, dupRecordTableName) + val dupRecordExport = genRecordExport(recordParam, dupRecordTableName, dupRecordTableName, ct, mode) // 9. duplicate metric val dupMetricTableName = "__dupMetric" val numColName = details.getStringOrKey(UniquenessKeys._num) - val dupMetricSelClause = processType match { + val dupMetricSelClause = procType match { case BatchProcessType => s"`${dupColName}`, COUNT(*) AS `${numColName}`" case StreamingProcessType => s"`${InternalColumns.tmst}`, `${dupColName}`, COUNT(*) AS `${numColName}`" } - val dupMetricGroupbyClause = processType match { + val dupMetricGroupbyClause = procType match { case BatchProcessType => s"`${dupColName}`" case StreamingProcessType => s"`${InternalColumns.tmst}`, `${dupColName}`" } @@ -465,7 +421,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], } val dupMetricStep = SparkSqlStep(dupMetricTableName, dupMetricSql, emptyMap) val dupMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc) - val dupMetricExport = genMetricExport(dupMetricParam, duplicationArrayName, dupMetricTableName) + val dupMetricExport = genMetricExport(dupMetricParam, duplicationArrayName, dupMetricTableName, ct, mode) RulePlan(dupRecordStep :: dupMetricStep :: Nil, dupRecordExport :: dupMetricExport :: Nil) } else emptyRulePlan @@ -474,13 +430,202 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], } } + private def distinctRulePlan(timeInfo: TimeInfo, name: String, expr: Expr, // builds total, distinct and (when a duplication array name is configured) duplicate steps for one source + param: Map[String, Any], procType: ProcessType, + dsTimeRanges: Map[String, TimeRange] + ): RulePlan = { + 
val details = getDetails(param) + val sourceName = details.getString(DistinctnessKeys._source, dataSourceNames.head) + val targetName = details.getString(DistinctnessKeys._target, dataSourceNames.lift(1).getOrElse("")) // target is optional here: a single-source config gets "" instead of throwing from tail.head + val analyzer = DistinctnessAnalyzer(expr.asInstanceOf[DistinctnessClause], sourceName) + + val mode = SimpleMode + + val ct = timeInfo.calcTime + + val sourceTimeRange = dsTimeRanges.getOrElse(sourceName, TimeRange(ct)) + val beginTime = sourceTimeRange.begin + + if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) { + println(s"[${ct}] data source ${sourceName} not exists") + emptyRulePlan + } else { + val withOlderTable = { // accumulate against older data only when enabled (default true) and the target table is registered + details.getBoolean(DistinctnessKeys._withAccumulate, true) && + TableRegisters.existRunTempTable(timeInfo.key, targetName) + } + + val selClause = analyzer.selectionPairs.map { pair => + val (selExpr, alias) = pair + s"${selExpr.desc} AS `${alias}`" + }.mkString(", ") + val aliases = analyzer.selectionPairs.map(_._2) + val aliasesClause = aliases.map( a => s"`${a}`" ).mkString(", ") + + // 1. source alias + val sourceAliasTableName = "__sourceAlias" + val sourceAliasSql = { + s"SELECT ${selClause} FROM `${sourceName}`" + } + val sourceAliasStep = SparkSqlStep(sourceAliasTableName, sourceAliasSql, emptyMap, true) + + // 2. total metric + val totalTableName = "__totalMetric" + val totalColName = details.getStringOrKey(DistinctnessKeys._total) + val totalSql = { + s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}`" + } + val totalStep = SparkSqlStep(totalTableName, totalSql, emptyMap) + val totalMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc) + val totalMetricExport = genMetricExport(totalMetricParam, totalColName, totalTableName, beginTime, mode) + + // 3. 
group by self + val selfGroupTableName = "__selfGroup" + val dupColName = details.getStringOrKey(DistinctnessKeys._dup) + val accuDupColName = details.getStringOrKey(DistinctnessKeys._accu_dup) + val selfGroupSql = { + s""" + |SELECT ${aliasesClause}, (COUNT(*) - 1) AS `${dupColName}`, + |TRUE AS `${InternalColumns.distinct}` + |FROM `${sourceAliasTableName}` GROUP BY ${aliasesClause} + """.stripMargin + } + val selfGroupStep = SparkSqlStep(selfGroupTableName, selfGroupSql, emptyMap, true) + + val selfDistRulePlan = RulePlan( + sourceAliasStep :: totalStep :: selfGroupStep :: Nil, + totalMetricExport :: Nil + ) + + val (distRulePlan, dupCountTableName) = procType match { + case StreamingProcessType if (withOlderTable) => { + // 4. older alias + val olderAliasTableName = "__older" + val olderAliasSql = { + s"SELECT ${selClause} FROM `${targetName}` WHERE `${InternalColumns.tmst}` < ${beginTime}" + } + val olderAliasStep = SparkSqlStep(olderAliasTableName, olderAliasSql, emptyMap) + + // 5. join with older data + val joinedTableName = "__joined" + val selfSelClause = (aliases :+ dupColName).map { alias => + s"`${selfGroupTableName}`.`${alias}`" + }.mkString(", ") + val onClause = aliases.map { alias => + s"coalesce(`${selfGroupTableName}`.`${alias}`, '') = coalesce(`${olderAliasTableName}`.`${alias}`, '')" + }.mkString(" AND ") + val olderIsNull = aliases.map { alias => // NOTE(review): an older row whose selected columns are all NULL still matches via coalesce(..., '') yet is flagged distinct here + s"`${olderAliasTableName}`.`${alias}` IS NULL" + }.mkString(" AND ") + val joinedSql = { + s""" + |SELECT ${selfSelClause}, (${olderIsNull}) AS `${InternalColumns.distinct}` + |FROM `${olderAliasTableName}` RIGHT JOIN `${selfGroupTableName}` + |ON ${onClause} + """.stripMargin + } + val joinedStep = SparkSqlStep(joinedTableName, joinedSql, emptyMap) + + // 6. 
group by joined data + val groupTableName = "__group" + val moreDupColName = "_more_dup" + val groupSql = { + s""" + |SELECT ${aliasesClause}, `${dupColName}`, `${InternalColumns.distinct}`, + |COUNT(*) AS `${moreDupColName}` + |FROM `${joinedTableName}` + |GROUP BY ${aliasesClause}, `${dupColName}`, `${InternalColumns.distinct}` + """.stripMargin + } + val groupStep = SparkSqlStep(groupTableName, groupSql, emptyMap) + + // 7. final duplicate count + val finalDupCountTableName = "__finalDupCount" + val finalDupCountSql = { + s""" + |SELECT ${aliasesClause}, `${InternalColumns.distinct}`, + |CASE WHEN `${InternalColumns.distinct}` THEN `${dupColName}` + |ELSE (`${dupColName}` + 1) END AS `${dupColName}`, + |CASE WHEN `${InternalColumns.distinct}` THEN `${dupColName}` + |ELSE (`${dupColName}` + `${moreDupColName}`) END AS `${accuDupColName}` + |FROM `${groupTableName}` + """.stripMargin + } + val finalDupCountStep = SparkSqlStep(finalDupCountTableName, finalDupCountSql, emptyMap, true) + + val rulePlan = RulePlan(olderAliasStep :: joinedStep :: groupStep :: finalDupCountStep :: Nil, Nil) + (rulePlan, finalDupCountTableName) + } + case _ => { + (emptyRulePlan, selfGroupTableName) + } + } + + // 8. distinct metric + val distTableName = "__distMetric" + val distColName = details.getStringOrKey(DistinctnessKeys._distinct) + val distSql = { + s""" + |SELECT COUNT(*) AS `${distColName}` + |FROM `${dupCountTableName}` WHERE `${InternalColumns.distinct}` + """.stripMargin + } + val distStep = SparkSqlStep(distTableName, distSql, emptyMap) + val distMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc) + val distMetricExport = genMetricExport(distMetricParam, distColName, distTableName, beginTime, mode) + + val distMetricRulePlan = RulePlan(distStep :: Nil, distMetricExport :: Nil) + + val duplicationArrayName = details.getString(DistinctnessKeys._duplicationArray, "") + val dupRulePlan = if (duplicationArrayName.nonEmpty) { + // 9. 
duplicate record + val dupRecordTableName = "__dupRecords" + val dupRecordSelClause = procType match { + case StreamingProcessType if (withOlderTable) => s"${aliasesClause}, `${dupColName}`, `${accuDupColName}`" + case _ => s"${aliasesClause}, `${dupColName}`" + } + val dupRecordSql = { + s""" + |SELECT ${dupRecordSelClause} + |FROM `${dupCountTableName}` WHERE `${dupColName}` > 0 + """.stripMargin + } + val dupRecordStep = SparkSqlStep(dupRecordTableName, dupRecordSql, emptyMap, true) + val dupRecordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap) + val dupRecordExport = genRecordExport(dupRecordParam, dupRecordTableName, dupRecordTableName, beginTime, mode) + + // 10. duplicate metric + val dupMetricTableName = "__dupMetric" + val numColName = details.getStringOrKey(DistinctnessKeys._num) + val dupMetricSql = { + s""" + |SELECT `${dupColName}`, COUNT(*) AS `${numColName}` + |FROM `${dupRecordTableName}` GROUP BY `${dupColName}` + """.stripMargin + } + val dupMetricStep = SparkSqlStep(dupMetricTableName, dupMetricSql, emptyMap) + val dupMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc) + val dupMetricExport = genMetricExport(dupMetricParam, duplicationArrayName, dupMetricTableName, beginTime, mode) + + RulePlan(dupRecordStep :: dupMetricStep :: Nil, dupRecordExport :: dupMetricExport :: Nil) + } else emptyRulePlan + + selfDistRulePlan.merge(distRulePlan).merge(distMetricRulePlan).merge(dupRulePlan) + + } + } + private def timelinessRulePlan(timeInfo: TimeInfo, name: String, expr: Expr, - param: Map[String, Any], processType: ProcessType + param: Map[String, Any], procType: ProcessType ): RulePlan = { val details = getDetails(param) val timelinessClause = expr.asInstanceOf[TimelinessClause] val sourceName = details.getString(TimelinessKeys._source, dataSourceNames.head) + val mode = ExportMode.defaultMode(procType) + + val ct = timeInfo.calcTime + + if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) { 
emptyRulePlan } else { @@ -521,7 +666,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], // 3. timeliness metric val metricTableName = name - val metricSql = processType match { + val metricSql = procType match { case BatchProcessType => { s""" |SELECT CAST(AVG(`${latencyColName}`) AS BIGINT) AS `avg`, @@ -543,7 +688,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], } val metricStep = SparkSqlStep(metricTableName, metricSql, emptyMap) val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap) - val metricExports = genMetricExport(metricParam, name, metricTableName) :: Nil + val metricExports = genMetricExport(metricParam, name, metricTableName, ct, mode) :: Nil // current timeliness plan val timeSteps = inTimeStep :: latencyStep :: metricStep :: Nil @@ -559,7 +704,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], } val recordStep = SparkSqlStep(recordTableName, recordSql, emptyMap) val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap) - val recordExports = genRecordExport(recordParam, recordTableName, recordTableName) :: Nil + val recordExports = genRecordExport(recordParam, recordTableName, recordTableName, ct, mode) :: Nil RulePlan(recordStep :: Nil, recordExports) } case _ => emptyRulePlan http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala index bd344b1..fc6a246 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala @@ -27,5 +27,7 @@ object InternalColumns { val beginTs = "__begin_ts" val endTs = 
"__end_ts" - val columns = List[String](tmst, metric, record, empty, beginTs, endTs) + val distinct = "__distinct" + + val columns = List[String](tmst, metric, record, empty, beginTs, endTs, distinct) } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala index ebc8fdb..25025ac 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala @@ -25,7 +25,8 @@ import org.apache.griffin.measure.cache.tmst.TempName import scala.collection.mutable.{Set => MutableSet} import org.apache.griffin.measure.config.params.user._ import org.apache.griffin.measure.log.Loggable -import org.apache.griffin.measure.process.ProcessType +import org.apache.griffin.measure.process.{ExportMode, ProcessType} +import org.apache.griffin.measure.process.temp.TimeRange import org.apache.griffin.measure.rule.dsl._ import org.apache.griffin.measure.rule.plan.{TimeInfo, _} @@ -115,30 +116,40 @@ trait RuleAdaptor extends Loggable with Serializable { RuleParamKeys.getName(param, RuleStepNameGenerator.genName) } - def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any], procType: ProcessType): RulePlan + def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any], + procType: ProcessType, dsTimeRanges: Map[String, TimeRange]): RulePlan - protected def genRuleExports(param: Map[String, Any], defName: String, stepName: String): Seq[RuleExport] = { + protected def genRuleExports(param: Map[String, Any], defName: String, + stepName: String, defTimestamp: Long, + mode: ExportMode + ): Seq[RuleExport] = { val metricOpt 
= RuleParamKeys.getMetricOpt(param) - val metricExportSeq = metricOpt.map(genMetricExport(_, defName, stepName)).toSeq + val metricExportSeq = metricOpt.map(genMetricExport(_, defName, stepName, defTimestamp, mode)).toSeq val recordOpt = RuleParamKeys.getRecordOpt(param) - val recordExportSeq = recordOpt.map(genRecordExport(_, defName, stepName)).toSeq + val recordExportSeq = recordOpt.map(genRecordExport(_, defName, stepName, defTimestamp, mode)).toSeq metricExportSeq ++ recordExportSeq } - protected def genMetricExport(param: Map[String, Any], name: String, stepName: String + protected def genMetricExport(param: Map[String, Any], name: String, stepName: String, + defTimestamp: Long, mode: ExportMode ): MetricExport = { MetricExport( ExportParamKeys.getName(param, name), stepName, - ExportParamKeys.getCollectType(param) + ExportParamKeys.getCollectType(param), + defTimestamp, + mode ) } - protected def genRecordExport(param: Map[String, Any], name: String, stepName: String + protected def genRecordExport(param: Map[String, Any], name: String, stepName: String, + defTimestamp: Long, mode: ExportMode ): RecordExport = { RecordExport( ExportParamKeys.getName(param, name), stepName, ExportParamKeys.getDataSourceCacheOpt(param), - ExportParamKeys.getOriginDFOpt(param) + ExportParamKeys.getOriginDFOpt(param), + defTimestamp, + mode ) } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala index 1e077b1..30a356c 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala @@ -21,7 +21,7 @@ 
package org.apache.griffin.measure.rule.adaptor import org.apache.griffin.measure.cache.tmst.TempName import org.apache.griffin.measure.config.params.user._ import org.apache.griffin.measure.process.ProcessType -import org.apache.griffin.measure.process.temp.TableRegisters +import org.apache.griffin.measure.process.temp.{TableRegisters, TimeRange} import org.apache.griffin.measure.rule.dsl._ import org.apache.griffin.measure.rule.plan._ import org.apache.spark.sql.SQLContext @@ -114,22 +114,24 @@ object RuleAdaptorGroup { // } // -- gen rule plan -- - def genRulePlan(timeInfo: TimeInfo, evaluateRuleParam: EvaluateRuleParam, procType: ProcessType + def genRulePlan(timeInfo: TimeInfo, evaluateRuleParam: EvaluateRuleParam, + procType: ProcessType, dsTimeRanges: Map[String, TimeRange] ): RulePlan = { val dslTypeStr = if (evaluateRuleParam.dslType == null) "" else evaluateRuleParam.dslType val defaultDslType = DslType(dslTypeStr) val ruleParams = evaluateRuleParam.rules - genRulePlan(timeInfo, ruleParams, defaultDslType, procType) + genRulePlan(timeInfo, ruleParams, defaultDslType, procType, dsTimeRanges) } def genRulePlan(timeInfo: TimeInfo, ruleParams: Seq[Map[String, Any]], - defaultDslType: DslType, procType: ProcessType + defaultDslType: DslType, procType: ProcessType, + dsTimeRanges: Map[String, TimeRange] ): RulePlan = { val (rulePlan, dsNames) = ruleParams.foldLeft((emptyRulePlan, dataSourceNames)) { (res, param) => val (plan, names) = res val dslType = getDslType(param, defaultDslType) val curPlan: RulePlan = genRuleAdaptor(dslType, names) match { - case Some(adaptor) => adaptor.genRulePlan(timeInfo, param, procType) + case Some(adaptor) => adaptor.genRulePlan(timeInfo, param, procType, dsTimeRanges) case _ => emptyRulePlan } val globalNames = curPlan.globalRuleSteps.map(_.name) http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala 
---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala index 6b3b7cb..1fce03b 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala @@ -19,7 +19,8 @@ under the License. package org.apache.griffin.measure.rule.adaptor import org.apache.griffin.measure.cache.tmst.TempName -import org.apache.griffin.measure.process.ProcessType +import org.apache.griffin.measure.process.{ExportMode, ProcessType} +import org.apache.griffin.measure.process.temp.TimeRange import org.apache.griffin.measure.rule.dsl.MetricPersistType import org.apache.griffin.measure.rule.plan.{TimeInfo, _} import org.apache.griffin.measure.utils.ParamUtil._ @@ -39,10 +40,12 @@ case class SparkSqlAdaptor() extends RuleAdaptor { import RuleParamKeys._ - def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any], procType: ProcessType): RulePlan = { + def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any], + procType: ProcessType, dsTimeRanges: Map[String, TimeRange]): RulePlan = { val name = getRuleName(param) val step = SparkSqlStep(name, getRule(param), getDetails(param), getCache(param), getGlobal(param)) - RulePlan(step :: Nil, genRuleExports(param, name, name)) + val mode = ExportMode.defaultMode(procType) + RulePlan(step :: Nil, genRuleExports(param, name, name, timeInfo.calcTime, mode)) } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala index 
11b67f2..18a5919 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala @@ -28,7 +28,7 @@ sealed trait DqType { object DqType { private val dqTypes: List[DqType] = List( - AccuracyType, ProfilingType, UniquenessType, TimelinessType, UnknownType + AccuracyType, ProfilingType, UniquenessType, DistinctnessType, TimelinessType, UnknownType ) def apply(ptn: String): DqType = { dqTypes.filter(tp => ptn match { @@ -54,6 +54,11 @@ final case object UniquenessType extends DqType { val desc = "uniqueness" } +final case object DistinctnessType extends DqType { + val regex = "^(?i)distinct$".r + val desc = "distinct" +} + final case object TimelinessType extends DqType { val regex = "^(?i)timeliness$".r val desc = "timeliness" http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DistinctnessAnalyzer.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DistinctnessAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DistinctnessAnalyzer.scala new file mode 100644 index 0000000..55e4f39 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DistinctnessAnalyzer.scala @@ -0,0 +1,47 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.rule.dsl.analyzer + +import org.apache.griffin.measure.rule.dsl.expr._ + + +//case class DistinctnessAnalyzer(expr: DistinctnessClause, sourceName: String, targetName: String) extends BasicAnalyzer { +case class DistinctnessAnalyzer(expr: DistinctnessClause, sourceName: String) extends BasicAnalyzer { + + val seqAlias = (expr: Expr, v: Seq[String]) => { + expr match { + case apr: AliasableExpr => v ++ apr.alias + case _ => v + } + } + val combAlias = (a: Seq[String], b: Seq[String]) => a ++ b + + private val exprs = expr.exprs + private def genAlias(idx: Int): String = s"alias_${idx}" + val selectionPairs = exprs.zipWithIndex.map { pair => + val (pr, idx) = pair + val res = pr.preOrderTraverseDepthFirst(Seq[String]())(seqAlias, combAlias) + (pr, res.headOption.getOrElse(genAlias(idx))) + } + + if (selectionPairs.isEmpty) { + throw new Exception(s"uniqueness analyzer error: empty selection") + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala index 504e176..340c1e2 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala @@ -227,6 
+227,14 @@ case class UniquenessClause(exprs: Seq[Expr]) extends ClauseExpression { override def map(func: (Expr) => Expr): UniquenessClause = UniquenessClause(exprs.map(func(_))) } +case class DistinctnessClause(exprs: Seq[Expr]) extends ClauseExpression { + addChildren(exprs) + + def desc: String = exprs.map(_.desc).mkString(", ") + def coalesceDesc: String = exprs.map(_.coalesceDesc).mkString(", ") + override def map(func: (Expr) => Expr): DistinctnessClause = DistinctnessClause(exprs.map(func(_))) +} + case class TimelinessClause(exprs: Seq[Expr]) extends ClauseExpression { addChildren(exprs) http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala index 83f3153..b129ead 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala @@ -47,6 +47,14 @@ case class GriffinDslParser(dataSourceNames: Seq[String], functionNames: Seq[Str } /** + * -- distinctness clauses -- + * <distinctness-clauses> = <expr> [, <expr>]+ + */ + def distinctnessClause: Parser[DistinctnessClause] = rep1sep(expression, Operator.COMMA) ^^ { + case exprs => DistinctnessClause(exprs) + } + + /** * -- timeliness clauses -- * <timeliness-clauses> = <expr> [, <expr>]+ */ @@ -59,6 +67,7 @@ case class GriffinDslParser(dataSourceNames: Seq[String], functionNames: Seq[Str case AccuracyType => logicalExpression case ProfilingType => profilingClause case UniquenessType => uniquenessClause + case DistinctnessType => distinctnessClause case TimelinessType => timelinessClause case _ => expression } 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala index 10f1f9b..ac14153 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala @@ -18,11 +18,17 @@ under the License. */ package org.apache.griffin.measure.rule.plan +import org.apache.griffin.measure.process.ExportMode import org.apache.griffin.measure.rule.dsl._ case class MetricExport(name: String, stepName: String, - collectType: CollectType + collectType: CollectType, + defTimestamp: Long, + mode: ExportMode ) extends RuleExport { + def setDefTimestamp(t: Long): RuleExport = + MetricExport(name, stepName, collectType, t, mode) + } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala index a467543..6afc836 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala @@ -18,10 +18,17 @@ under the License. 
*/ package org.apache.griffin.measure.rule.plan +import org.apache.griffin.measure.process.ExportMode + case class RecordExport(name: String, stepName: String, dataSourceCacheOpt: Option[String], - originDFOpt: Option[String] + originDFOpt: Option[String], + defTimestamp: Long, + mode: ExportMode ) extends RuleExport { + def setDefTimestamp(t: Long): RuleExport = + RecordExport(name, stepName, dataSourceCacheOpt, originDFOpt, t, mode) + } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala index 26a962a..84467c2 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala @@ -18,10 +18,18 @@ under the License. 
*/ package org.apache.griffin.measure.rule.plan +import org.apache.griffin.measure.process.ExportMode + trait RuleExport extends Serializable { val name: String // export name val stepName: String // the dependant step name + val defTimestamp: Long // the default timestamp if tmst not in value + + val mode: ExportMode // export mode + + def setDefTimestamp(t: Long): RuleExport + } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/test/resources/_distinctness-batch-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_distinctness-batch-griffindsl.json b/measure/src/test/resources/_distinctness-batch-griffindsl.json new file mode 100644 index 0000000..af0c91e --- /dev/null +++ b/measure/src/test/resources/_distinctness-batch-griffindsl.json @@ -0,0 +1,57 @@ +{ + "name": "dist_batch", + + "process.type": "batch", + + "data.sources": [ + { + "name": "source", + "baseline": true, + "connectors": [ + { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/users_info_src.avro" + } + } + ] + }, + { + "name": "target", + "baseline": true, + "connectors": [ + { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/users_info_src.avro" + } + } + ] + } + ], + + "evaluate.rule": { + "rules": [ + { + "dsl.type": "griffin-dsl", + "dq.type": "distinct", + "name": "dist", + "rule": "user_id", + "details": { + "source": "source", + "target": "target", + "total": "total", + "distinct": "distinct", + "dup": "dup", + "num": "num", + "duplication.array": "dup" + }, + "metric": { + "name": "distinct" + } + } + ] + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/test/resources/_distinctness-batch-griffindsl1.json ---------------------------------------------------------------------- diff --git 
a/measure/src/test/resources/_distinctness-batch-griffindsl1.json b/measure/src/test/resources/_distinctness-batch-griffindsl1.json new file mode 100644 index 0000000..f8aa077 --- /dev/null +++ b/measure/src/test/resources/_distinctness-batch-griffindsl1.json @@ -0,0 +1,73 @@ +{ + "name": "dist_batch", + + "process.type": "batch", + + "timestamp": 123456, + + "data.sources": [ + { + "name": "source", + "baseline": true, + "connectors": [ + { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/dupdata.avro" + }, + "pre.proc": [ + { + "dsl.type": "spark-sql", + "name": "${this}", + "rule": "select DISTINCT name, age from ${this}" + } + ] + } + ] + }, + { + "name": "target", + "baseline": true, + "connectors": [ + { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/dupdata.avro" + }, + "pre.proc": [ + { + "dsl.type": "spark-sql", + "name": "${this}", + "rule": "select DISTINCT name, age from ${this}" + } + ] + } + ] + } + ], + + "evaluate.rule": { + "rules": [ + { + "dsl.type": "griffin-dsl", + "dq.type": "distinct", + "name": "dist", + "rule": "name", + "details": { + "source": "source", + "target": "target", + "total": "total", + "distinct": "distinct", + "dup": "dup", + "num": "num", + "duplication.array": "dup" + }, + "metric": { + "name": "distinct" + } + } + ] + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/test/resources/_distinctness-streaming-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_distinctness-streaming-griffindsl.json b/measure/src/test/resources/_distinctness-streaming-griffindsl.json new file mode 100644 index 0000000..c36e7ba --- /dev/null +++ b/measure/src/test/resources/_distinctness-streaming-griffindsl.json @@ -0,0 +1,85 @@ +{ + "name": "dist_streaming", + + "process.type": "streaming", + + "data.sources": [ + { + "name": 
"new", + "cache": { + "file.path": "hdfs://localhost/griffin/streaming/dump/old", + "info.path": "new", + "ready.time.interval": "10s", + "ready.time.delay": "0", + "time.range": ["0", "0"], + "read.only": true + } + }, + { + "name": "old", + "connectors": [ + { + "type": "kafka", + "version": "0.8", + "config": { + "kafka.config": { + "bootstrap.servers": "10.149.247.156:9092", + "group.id": "old", + "auto.offset.reset": "smallest", + "auto.commit.enable": "false" + }, + "topics": "ttt", + "key.type": "java.lang.String", + "value.type": "java.lang.String" + }, + "pre.proc": [ + { + "dsl.type": "df-opr", + "name": "${s1}", + "rule": "from_json", + "details": { + "df.name": "${this}" + } + }, + { + "dsl.type": "spark-sql", + "name": "${this}", + "rule": "select name, age from ${s1}" + } + ] + } + ], + "cache": { + "file.path": "hdfs://localhost/griffin/streaming/dump/old", + "info.path": "old", + "ready.time.interval": "10s", + "ready.time.delay": "0", + "time.range": ["-24h", "0"] + } + } + ], + + "evaluate.rule": { + "rules": [ + { + "dsl.type": "griffin-dsl", + "dq.type": "distinct", + "name": "dist", + "rule": "name, age", + "details": { + "source": "new", + "target": "old", + "total": "total", + "distinct": "distinct", + "dup": "dup", + "accu_dup": "accu_dup", + "num": "num", + "duplication.array": "dup" + }, + "metric": { + "name": "distinct" + } + } + ] + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/test/resources/_profiling-batch-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_profiling-batch-griffindsl.json b/measure/src/test/resources/_profiling-batch-griffindsl.json index cd99eb1..043ba85 100644 --- a/measure/src/test/resources/_profiling-batch-griffindsl.json +++ b/measure/src/test/resources/_profiling-batch-griffindsl.json @@ -26,7 +26,7 @@ "dsl.type": "griffin-dsl", "dq.type": "profiling", "name": 
"prof", - "rule": "select count(*) as `cnt`, count(distinct `post_code`) as `dis-cnt`, max(user_id) as `max` from source", + "rule": "count(*) from source", "metric": { "name": "prof" } @@ -35,7 +35,7 @@ "dsl.type": "griffin-dsl", "dq.type": "profiling", "name": "grp", - "rule": "select post_code as `pc`, count(*) as `cnt` from source group by post_code", + "rule": "source.post_code, count(*) from source group by source.post_code", "metric": { "name": "post_group", "collect.type": "array" http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/test/resources/dupdata.avro ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/dupdata.avro b/measure/src/test/resources/dupdata.avro new file mode 100644 index 0000000..f6bd312 Binary files /dev/null and b/measure/src/test/resources/dupdata.avro differ http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/cbff5b45/measure/src/test/resources/empty.avro ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/empty.avro b/measure/src/test/resources/empty.avro new file mode 100644 index 0000000..1ac3a72 Binary files /dev/null and b/measure/src/test/resources/empty.avro differ
