http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala index 736ce56..382e302 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala @@ -18,8 +18,6 @@ under the License. */ package org.apache.griffin.measure.process.engine -import org.apache.griffin.measure.cache.tmst.{TempName, TmstCache} -import org.apache.griffin.measure.log.Loggable import org.apache.griffin.measure.process._ import org.apache.griffin.measure.rule.adaptor.InternalColumns import org.apache.griffin.measure.rule.dsl._ @@ -101,7 +99,6 @@ trait SparkDqEngine extends DqEngine { } else emptyMetricMap } - private def getTmst(row: Row, defTmst: Long): Long = { try { row.getAs[Long](InternalColumns.tmst) @@ -110,46 +107,9 @@ trait SparkDqEngine extends DqEngine { } } -// def collectRecords(timeInfo: TimeInfo, recordExport: RecordExport): Map[Long, DataFrame] = { -// if (collectable) { -// val RecordExport(_, stepName, _, originDFOpt, defTmst, procType) = recordExport -// val stepDf = sqlContext.table(s"`${stepName}`") -// val recordsDf = originDFOpt match { -// case Some(originName) => sqlContext.table(s"`${originName}`") -// case _ => stepDf -// } -// -// procType match { -// case BatchProcessType => { -// val recordsDf = sqlContext.table(s"`${stepName}`") -// emptyRecordMap + (defTmst -> recordsDf) -// } -// case StreamingProcessType => { -// originDFOpt match { -// case Some(originName) => { -// val recordsDf = sqlContext.table(s"`${originName}`") -// stepDf.map { row => -// val tmst = getTmst(row, defTmst) -// val trdf 
= if (recordsDf.columns.contains(InternalColumns.tmst)) { -// recordsDf.filter(s"`${InternalColumns.tmst}` = ${tmst}") -// } else recordsDf -// (tmst, trdf) -// }.collect.toMap -// } -// case _ => { -// val recordsDf = stepDf -// emptyRecordMap + (defTmst -> recordsDf) -// } -// } -// } -// } -// } else emptyRecordMap -// } - private def getRecordDataFrame(recordExport: RecordExport): Option[DataFrame] = { if (collectable) { - val RecordExport(_, stepName, _, _, defTmst, procType) = recordExport - val stepDf = sqlContext.table(s"`${stepName}`") + val stepDf = sqlContext.table(s"`${recordExport.stepName}`") Some(stepDf) } else None } @@ -209,187 +169,14 @@ trait SparkDqEngine extends DqEngine { } case _ => (None, Set[Long]()) } -// val recordsOpt = getRecordDataFrame(recordExport).flatMap { stepDf => -// originDFOpt match { -// case Some(originName) => { -// val tmsts = (stepDf.collect.flatMap { row => -// try { -// val tmst = row.getAs[Long](InternalColumns.tmst) -// val empty = row.getAs[Boolean](InternalColumns.empty) -// Some((tmst, empty)) -// } catch { -// case _: Throwable => None -// } -// }) -// val emptyTmsts = tmsts.filter(_._2).map(_._1).toSet -// val recordTmsts = tmsts.filter(!_._2).map(_._1).toSet -// if (recordTmsts.size > 0) { -// val recordsDf = sqlContext.table(s"`${originName}`") -// val records = recordsDf.flatMap { row => -// val tmst = row.getAs[Long](InternalColumns.tmst) -// if (recordTmsts.contains(tmst)) { -// try { -// val map = SparkRowFormatter.formatRow(row) -// val str = JsonUtil.toJson(map) -// Some((tmst, str)) -// } catch { -// case e: Throwable => None -// } -// } else None -// } -// Some((Some(records.groupByKey), emptyTmsts)) -// } else Some((None, emptyTmsts)) -// } -// case _ => { -// val records = stepDf.flatMap { row => -// val tmst = row.getAs[Long](InternalColumns.tmst) -// try { -// val map = SparkRowFormatter.formatRow(row) -// val str = JsonUtil.toJson(map) -// Some((tmst, str)) -// } catch { -// case e: Throwable => 
None -// } -// } -// Some(records.groupByKey) -// } -// } -// } } -// -// def collectUpdateRDD(ruleStep: ConcreteRuleStep): Option[DataFrame] = { -// if (collectable) { -// ruleStep match { -// case step: ConcreteRuleStep if ((step.ruleInfo.persistType == RecordPersistType) -// || (step.ruleInfo.cacheDataSourceOpt.nonEmpty)) => { -// val tmst = step.timeInfo.tmst -//// val metricName = step.ruleInfo.name -// -// step.ruleInfo.tmstNameOpt match { -// case Some(metricTmstName) => { -// try { -// val pdf = sqlContext.table(s"`${metricTmstName}`") -// Some(pdf) -// } catch { -// case e: Throwable => { -// error(s"collect records ${metricTmstName} error: ${e.getMessage}") -// None -// } -// } -// } -// case _ => None -// } -// } -// case _ => None -// } -// } else None -// } - - - - - -// def collectUpdateRDD(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long] -// ): Option[RDD[(Long, Iterable[String])]] = { -// if (collectable) { -// ruleStep match { -// case step: ConcreteRuleStep if ((step.ruleInfo.persistType == RecordPersistType) -// || (step.ruleInfo.cacheDataSourceOpt.nonEmpty)) => { -// val tmst = step.timeInfo.tmst -// val metricName = step.ruleInfo.name -// -// step.ruleInfo.tmstNameOpt match { -// case Some(metricTmstName) => { -// try { -// val pdf = sqlContext.table(s"`${metricTmstName}`") -// val cols = pdf.columns -// val rdd = pdf.flatMap { row => -// val values = cols.flatMap { col => -// Some((col, row.getAs[Any](col))) -// }.toMap -// values.get(GroupByColumn.tmst) match { -// case Some(t: Long) if (timeGroups.exists(_ == t)) => Some((t, JsonUtil.toJson(values))) -// case _ => None -// } -// }.groupByKey() -// -// // find other keys in time groups, create empty records for those timestamps -// val existKeys = rdd.keys.collect -// val otherKeys = timeGroups.filter(t => !existKeys.exists(_ == t)) -// val otherPairs = otherKeys.map((_, Iterable[String]())).toSeq -// val otherPairRdd = sqlContext.sparkContext.parallelize(otherPairs) -// -// Some(rdd 
union otherPairRdd) -// } catch { -// case e: Throwable => { -// error(s"collect records ${metricTmstName} error: ${e.getMessage}") -// None -// } -// } -// } -// case _ => None -// } -// } -// case _ => None -// } -// } else None -// } - -// def collectRecords(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]] = { -// ruleStep match { -// case step: ConcreteRuleStep if (step.persistType == RecordPersistType) => { -// val name = step.name -// try { -// val pdf = sqlContext.table(s"`${name}`") -// val cols = pdf.columns -// val rdd = pdf.flatMap { row => -// val values = cols.flatMap { col => -// Some((col, row.getAs[Any](col))) -// }.toMap -// values.get(GroupByColumn.tmst) match { -// case Some(t: Long) if (timeGroups.exists(_ == t)) => Some((t, JsonUtil.toJson(values))) -// case _ => None -// } -// }.groupByKey() -// Some(rdd) -// } catch { -// case e: Throwable => { -// error(s"collect records ${name} error: ${e.getMessage}") -// None -// } -// } -// } -// case _ => None -// } -// } -// -// def collectUpdateCacheDatas(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]] = { -// ruleStep match { -// case step: ConcreteRuleStep if (step.updateDataSource.nonEmpty) => { -// val name = step.name -// try { -// val pdf = sqlContext.table(s"`${name}`") -// val cols = pdf.columns -// val rdd = pdf.flatMap { row => -// val values = cols.flatMap { col => -// Some((col, row.getAs[Any](col))) -// }.toMap -// values.get(GroupByColumn.tmst) match { -// case Some(t: Long) if (timeGroups.exists(_ == t)) => Some((t, JsonUtil.toJson(values))) -// case _ => None -// } -// }.groupByKey() -// Some(rdd) -// } catch { -// case e: Throwable => { -// error(s"collect update cache datas ${name} error: ${e.getMessage}") -// None -// } -// } -// } -// case _ => None -// } -// } + def collectUpdateDf(dsUpdate: DsUpdate): Option[DataFrame] = { + if (collectable) { + val DsUpdate(_, stepName) = dsUpdate + val 
stepDf = sqlContext.table(s"`${stepName}`") + Some(stepDf) + } else None + } }
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala b/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala index 31fe5ea..db92533 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/process/temp/TimeRange.scala @@ -20,7 +20,7 @@ package org.apache.griffin.measure.process.temp import scala.math.{min, max} -case class TimeRange(begin: Long, end: Long, tmsts: Set[Long]) extends Serializable { + case class TimeRange(begin: Long, end: Long, tmsts: Set[Long]) extends Serializable { def merge(tr: TimeRange): TimeRange = { TimeRange(min(begin, tr.begin), max(end, tr.end), tmsts ++ tr.tmsts) } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala index 97589ad..0b0b461 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala @@ -52,7 +52,11 @@ case class DataFrameOprAdaptor() extends RuleAdaptor { val name = getRuleName(param) val step = DfOprStep(name, getRule(param), getDetails(param), getCache(param), getGlobal(param)) val mode = ExportMode.defaultMode(procType) - RulePlan(step :: Nil, genRuleExports(param, name, name, timeInfo.calcTime, mode)) + RulePlan( + step :: Nil, + genRuleExports(param, 
name, name, timeInfo.calcTime, mode), + genDsUpdates(param, "", name) + ) } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala index bd27b19..f6f35da 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala @@ -18,54 +18,6 @@ under the License. */ package org.apache.griffin.measure.rule.adaptor -object AccuracyKeys { - val _source = "source" - val _target = "target" - val _miss = "miss" - val _total = "total" - val _matched = "matched" - // val _missRecords = "missRecords" -} - -object ProfilingKeys { - val _source = "source" -} - -object UniquenessKeys { - val _source = "source" - val _target = "target" - val _unique = "unique" - val _total = "total" - val _dup = "dup" - val _num = "num" - - val _duplicationArray = "duplication.array" -} - -object DistinctnessKeys { - val _source = "source" - val _target = "target" - val _distinct = "distinct" - val _total = "total" - val _dup = "dup" - val _accu_dup = "accu_dup" - val _num = "num" - - val _duplicationArray = "duplication.array" - val _withAccumulate = "with.accumulate" -} - -object TimelinessKeys { - val _source = "source" - val _latency = "latency" - val _total = "total" - val _avg = "avg" - val _threshold = "threshold" - val _step = "step" - val _count = "count" - val _stepSize = "step.size" -} - object GlobalKeys { val _initRule = "init.rule" } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala 
---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala index 5655a13..3b4ec31 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala @@ -18,17 +18,11 @@ under the License. */ package org.apache.griffin.measure.rule.adaptor -import org.apache.griffin.measure.cache.tmst.{TempName, TmstCache} -import org.apache.griffin.measure.process.engine.DataFrameOprs.AccuracyOprKeys -import org.apache.griffin.measure.process.temp.{TableRegisters, TimeRange} +import org.apache.griffin.measure.process.temp._ import org.apache.griffin.measure.process._ -import org.apache.griffin.measure.rule.dsl._ -import org.apache.griffin.measure.rule.dsl.analyzer._ -import org.apache.griffin.measure.rule.dsl.expr._ import org.apache.griffin.measure.rule.dsl.parser.GriffinDslParser import org.apache.griffin.measure.rule.plan.{TimeInfo, _} -import org.apache.griffin.measure.utils.ParamUtil._ -import org.apache.griffin.measure.utils.TimeUtil +import org.apache.griffin.measure.rule.trans._ case class GriffinDslAdaptor(dataSourceNames: Seq[String], functionNames: Seq[String] @@ -42,7 +36,6 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], val parser = GriffinDslParser(dataSourceNames, filteredFunctionNames) private val emptyRulePlan = RulePlan(Nil, Nil) - private val emptyMap = Map[String, Any]() override def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any], processType: ProcessType, dsTimeRanges: Map[String, TimeRange] @@ -54,14 +47,9 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], val result = parser.parseRule(rule, dqType) if (result.successful) { val expr = result.get - dqType match { - case AccuracyType => 
accuracyRulePlan(timeInfo, name, expr, param, processType) - case ProfilingType => profilingRulePlan(timeInfo, name, expr, param, processType) - case UniquenessType => uniquenessRulePlan(timeInfo, name, expr, param, processType) - case DistinctnessType => distinctRulePlan(timeInfo, name, expr, param, processType, dsTimeRanges) - case TimelinessType => timelinessRulePlan(timeInfo, name, expr, param, processType) - case _ => emptyRulePlan - } + val rulePlanTrans = RulePlanTrans(dqType, dataSourceNames, timeInfo, + name, expr, param, processType, dsTimeRanges) + rulePlanTrans.trans } else { warn(s"parse rule [ ${rule} ] fails: \n${result}") emptyRulePlan @@ -74,741 +62,4 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], } } - // with accuracy opr - private def accuracyRulePlan(timeInfo: TimeInfo, name: String, expr: Expr, - param: Map[String, Any], procType: ProcessType - ): RulePlan = { - val details = getDetails(param) - val sourceName = details.getString(AccuracyKeys._source, dataSourceNames.head) - val targetName = details.getString(AccuracyKeys._target, dataSourceNames.tail.head) - val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], sourceName, targetName) - - val mode = ExportMode.defaultMode(procType) - - val ct = timeInfo.calcTime - - if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) { - println(s"[${ct}] data source ${sourceName} not exists") - emptyRulePlan - } else { - // 1. 
miss record - val missRecordsTableName = "__missRecords" - val selClause = s"`${sourceName}`.*" - val missRecordsSql = if (!TableRegisters.existRunTempTable(timeInfo.key, targetName)) { - println(s"[${ct}] data source ${targetName} not exists") - s"SELECT ${selClause} FROM `${sourceName}`" - } else { - val onClause = expr.coalesceDesc - val sourceIsNull = analyzer.sourceSelectionExprs.map { sel => - s"${sel.desc} IS NULL" - }.mkString(" AND ") - val targetIsNull = analyzer.targetSelectionExprs.map { sel => - s"${sel.desc} IS NULL" - }.mkString(" AND ") - val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})" - s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}" - } - val missRecordsStep = SparkSqlStep(missRecordsTableName, missRecordsSql, emptyMap, true) - val missRecordsExports = procType match { - case BatchProcessType => { - val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap) - genRecordExport(recordParam, missRecordsTableName, missRecordsTableName, ct, mode) :: Nil - } - case StreamingProcessType => Nil - } - - // 2. miss count - val missCountTableName = "__missCount" - val missColName = details.getStringOrKey(AccuracyKeys._miss) - val missCountSql = procType match { - case BatchProcessType => s"SELECT COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}`" - case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}` GROUP BY `${InternalColumns.tmst}`" - } - val missCountStep = SparkSqlStep(missCountTableName, missCountSql, emptyMap) - - // 3. 
total count - val totalCountTableName = "__totalCount" - val totalColName = details.getStringOrKey(AccuracyKeys._total) - val totalCountSql = procType match { - case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`" - case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceName}` GROUP BY `${InternalColumns.tmst}`" - } - val totalCountStep = SparkSqlStep(totalCountTableName, totalCountSql, emptyMap) - - // 4. accuracy metric - val accuracyTableName = name - val matchedColName = details.getStringOrKey(AccuracyKeys._matched) - val accuracyMetricSql = procType match { - case BatchProcessType => { - s""" - |SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`, - |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`, - |(`${totalColName}` - `${missColName}`) AS `${matchedColName}` - |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}` - """.stripMargin - } - case StreamingProcessType => { - s""" - |SELECT `${totalCountTableName}`.`${InternalColumns.tmst}` AS `${InternalColumns.tmst}`, - |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`, - |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`, - |(`${totalColName}` - `${missColName}`) AS `${matchedColName}` - |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}` - |ON `${totalCountTableName}`.`${InternalColumns.tmst}` = `${missCountTableName}`.`${InternalColumns.tmst}` - """.stripMargin - } - } - val accuracyStep = SparkSqlStep(accuracyTableName, accuracyMetricSql, emptyMap) - val accuracyExports = procType match { - case BatchProcessType => { - val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap) - genMetricExport(metricParam, accuracyTableName, accuracyTableName, ct, mode) :: Nil - } - case StreamingProcessType => Nil - } - - // current accu plan - val accuSteps = missRecordsStep :: missCountStep :: totalCountStep :: 
accuracyStep :: Nil - val accuExports = missRecordsExports ++ accuracyExports - val accuPlan = RulePlan(accuSteps, accuExports) - - // streaming extra accu plan - val streamingAccuPlan = procType match { - case BatchProcessType => emptyRulePlan - case StreamingProcessType => { - // 5. accuracy metric merge - val accuracyMetricTableName = "__accuracy" - val accuracyMetricRule = "accuracy" - val accuracyMetricDetails = Map[String, Any]( - (AccuracyOprKeys._dfName -> accuracyTableName), - (AccuracyOprKeys._miss -> missColName), - (AccuracyOprKeys._total -> totalColName), - (AccuracyOprKeys._matched -> matchedColName) - ) - val accuracyMetricStep = DfOprStep(accuracyMetricTableName, - accuracyMetricRule, accuracyMetricDetails) - val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap) - val accuracyMetricExports = genMetricExport(metricParam, name, accuracyMetricTableName, ct, mode) :: Nil - - // 6. collect accuracy records - val accuracyRecordTableName = "__accuracyRecords" - val accuracyRecordSql = { - s""" - |SELECT `${InternalColumns.tmst}`, `${InternalColumns.empty}` - |FROM `${accuracyMetricTableName}` WHERE `${InternalColumns.record}` - """.stripMargin - } - val accuracyRecordStep = SparkSqlStep(accuracyRecordTableName, accuracyRecordSql, emptyMap) - val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap) - val accuracyRecordParam = recordParam.addIfNotExist(ExportParamKeys._dataSourceCache, sourceName) - .addIfNotExist(ExportParamKeys._originDF, missRecordsTableName) - val accuracyRecordExports = genRecordExport( - accuracyRecordParam, missRecordsTableName, accuracyRecordTableName, ct, mode) :: Nil - - // gen accu plan - val extraSteps = accuracyMetricStep :: accuracyRecordStep :: Nil - val extraExports = accuracyMetricExports ++ accuracyRecordExports - val extraPlan = RulePlan(extraSteps, extraExports) - - extraPlan - } - } - - // return accu plan - accuPlan.merge(streamingAccuPlan) - - } - } - - private def 
profilingRulePlan(timeInfo: TimeInfo, name: String, expr: Expr, - param: Map[String, Any], procType: ProcessType - ): RulePlan = { - val details = getDetails(param) - val profilingClause = expr.asInstanceOf[ProfilingClause] - val sourceName = profilingClause.fromClauseOpt match { - case Some(fc) => fc.dataSource - case _ => details.getString(ProfilingKeys._source, dataSourceNames.head) - } - val fromClause = profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc - - val mode = ExportMode.defaultMode(procType) - - val ct = timeInfo.calcTime - - if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) { - emptyRulePlan - } else { - val analyzer = ProfilingAnalyzer(profilingClause, sourceName) - val selExprDescs = analyzer.selectionExprs.map { sel => - val alias = sel match { - case s: AliasableExpr if (s.alias.nonEmpty) => s" AS `${s.alias.get}`" - case _ => "" - } - s"${sel.desc}${alias}" - } - val selCondition = profilingClause.selectClause.extraConditionOpt.map(_.desc).mkString - val selClause = procType match { - case BatchProcessType => selExprDescs.mkString(", ") - case StreamingProcessType => (s"`${InternalColumns.tmst}`" +: selExprDescs).mkString(", ") - } - val groupByClauseOpt = analyzer.groupbyExprOpt - val groupbyClause = procType match { - case BatchProcessType => groupByClauseOpt.map(_.desc).getOrElse("") - case StreamingProcessType => { - val tmstGroupbyClause = GroupbyClause(LiteralStringExpr(s"`${InternalColumns.tmst}`") :: Nil, None) - val mergedGroubbyClause = tmstGroupbyClause.merge(groupByClauseOpt match { - case Some(gbc) => gbc - case _ => GroupbyClause(Nil, None) - }) - mergedGroubbyClause.desc - } - } - val preGroupbyClause = analyzer.preGroupbyExprs.map(_.desc).mkString(" ") - val postGroupbyClause = analyzer.postGroupbyExprs.map(_.desc).mkString(" ") - - // 1. 
select statement - val profilingSql = { - s"SELECT ${selCondition} ${selClause} ${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}" - } - val profilingName = name - val profilingStep = SparkSqlStep(profilingName, profilingSql, details) - val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap) - val profilingExports = genMetricExport(metricParam, name, profilingName, ct, mode) :: Nil - - RulePlan(profilingStep :: Nil, profilingExports) - } - } - - private def uniquenessRulePlan(timeInfo: TimeInfo, name: String, expr: Expr, - param: Map[String, Any], procType: ProcessType - ): RulePlan = { - val details = getDetails(param) - val sourceName = details.getString(UniquenessKeys._source, dataSourceNames.head) - val targetName = details.getString(UniquenessKeys._target, dataSourceNames.tail.head) - val analyzer = UniquenessAnalyzer(expr.asInstanceOf[UniquenessClause], sourceName, targetName) - - val mode = ExportMode.defaultMode(procType) - - val ct = timeInfo.calcTime - - if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) { - println(s"[${ct}] data source ${sourceName} not exists") - emptyRulePlan - } else if (!TableRegisters.existRunTempTable(timeInfo.key, targetName)) { - println(s"[${ct}] data source ${targetName} not exists") - emptyRulePlan - } else { - val selItemsClause = analyzer.selectionPairs.map { pair => - val (expr, alias) = pair - s"${expr.desc} AS `${alias}`" - }.mkString(", ") - val aliases = analyzer.selectionPairs.map(_._2) - - val selClause = procType match { - case BatchProcessType => selItemsClause - case StreamingProcessType => s"`${InternalColumns.tmst}`, ${selItemsClause}" - } - val selAliases = procType match { - case BatchProcessType => aliases - case StreamingProcessType => InternalColumns.tmst +: aliases - } - - // 1. 
source distinct mapping - val sourceTableName = "__source" - val sourceSql = s"SELECT DISTINCT ${selClause} FROM ${sourceName}" - val sourceStep = SparkSqlStep(sourceTableName, sourceSql, emptyMap) - - // 2. target mapping - val targetTableName = "__target" - val targetSql = s"SELECT ${selClause} FROM ${targetName}" - val targetStep = SparkSqlStep(targetTableName, targetSql, emptyMap) - - // 3. joined - val joinedTableName = "__joined" - val joinedSelClause = selAliases.map { alias => - s"`${sourceTableName}`.`${alias}` AS `${alias}`" - }.mkString(", ") - val onClause = aliases.map { alias => - s"coalesce(`${sourceTableName}`.`${alias}`, '') = coalesce(`${targetTableName}`.`${alias}`, '')" - }.mkString(" AND ") - val joinedSql = { - s"SELECT ${joinedSelClause} FROM `${targetTableName}` RIGHT JOIN `${sourceTableName}` ON ${onClause}" - } - val joinedStep = SparkSqlStep(joinedTableName, joinedSql, emptyMap) - - // 4. group - val groupTableName = "__group" - val groupSelClause = selAliases.map { alias => - s"`${alias}`" - }.mkString(", ") - val dupColName = details.getStringOrKey(UniquenessKeys._dup) - val groupSql = { - s"SELECT ${groupSelClause}, (COUNT(*) - 1) AS `${dupColName}` FROM `${joinedTableName}` GROUP BY ${groupSelClause}" - } - val groupStep = SparkSqlStep(groupTableName, groupSql, emptyMap, true) - - // 5. 
total metric - val totalTableName = "__totalMetric" - val totalColName = details.getStringOrKey(UniquenessKeys._total) - val totalSql = procType match { - case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`" - case StreamingProcessType => { - s""" - |SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${totalColName}` - |FROM `${sourceName}` GROUP BY `${InternalColumns.tmst}` - """.stripMargin - } - } - val totalStep = SparkSqlStep(totalTableName, totalSql, emptyMap) - val totalMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc) - val totalMetricExport = genMetricExport(totalMetricParam, totalColName, totalTableName, ct, mode) - - // 6. unique record - val uniqueRecordTableName = "__uniqueRecord" - val uniqueRecordSql = { - s"SELECT * FROM `${groupTableName}` WHERE `${dupColName}` = 0" - } - val uniqueRecordStep = SparkSqlStep(uniqueRecordTableName, uniqueRecordSql, emptyMap) - - // 7. unique metric - val uniqueTableName = "__uniqueMetric" - val uniqueColName = details.getStringOrKey(UniquenessKeys._unique) - val uniqueSql = procType match { - case BatchProcessType => s"SELECT COUNT(*) AS `${uniqueColName}` FROM `${uniqueRecordTableName}`" - case StreamingProcessType => { - s""" - |SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${uniqueColName}` - |FROM `${uniqueRecordTableName}` GROUP BY `${InternalColumns.tmst}` - """.stripMargin - } - } - val uniqueStep = SparkSqlStep(uniqueTableName, uniqueSql, emptyMap) - val uniqueMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc) - val uniqueMetricExport = genMetricExport(uniqueMetricParam, uniqueColName, uniqueTableName, ct, mode) - - val uniqueSteps = sourceStep :: targetStep :: joinedStep :: groupStep :: - totalStep :: uniqueRecordStep :: uniqueStep :: Nil - val uniqueExports = totalMetricExport :: uniqueMetricExport :: Nil - val uniqueRulePlan = RulePlan(uniqueSteps, uniqueExports) - - val duplicationArrayName = 
details.getString(UniquenessKeys._duplicationArray, "") - val dupRulePlan = if (duplicationArrayName.nonEmpty) { - // 8. duplicate record - val dupRecordTableName = "__dupRecords" - val dupRecordSql = { - s"SELECT * FROM `${groupTableName}` WHERE `${dupColName}` > 0" - } - val dupRecordStep = SparkSqlStep(dupRecordTableName, dupRecordSql, emptyMap, true) - val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap) - val dupRecordExport = genRecordExport(recordParam, dupRecordTableName, dupRecordTableName, ct, mode) - - // 9. duplicate metric - val dupMetricTableName = "__dupMetric" - val numColName = details.getStringOrKey(UniquenessKeys._num) - val dupMetricSelClause = procType match { - case BatchProcessType => s"`${dupColName}`, COUNT(*) AS `${numColName}`" - case StreamingProcessType => s"`${InternalColumns.tmst}`, `${dupColName}`, COUNT(*) AS `${numColName}`" - } - val dupMetricGroupbyClause = procType match { - case BatchProcessType => s"`${dupColName}`" - case StreamingProcessType => s"`${InternalColumns.tmst}`, `${dupColName}`" - } - val dupMetricSql = { - s""" - |SELECT ${dupMetricSelClause} FROM `${dupRecordTableName}` - |GROUP BY ${dupMetricGroupbyClause} - """.stripMargin - } - val dupMetricStep = SparkSqlStep(dupMetricTableName, dupMetricSql, emptyMap) - val dupMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc) - val dupMetricExport = genMetricExport(dupMetricParam, duplicationArrayName, dupMetricTableName, ct, mode) - - RulePlan(dupRecordStep :: dupMetricStep :: Nil, dupRecordExport :: dupMetricExport :: Nil) - } else emptyRulePlan - - uniqueRulePlan.merge(dupRulePlan) - } - } - - private def distinctRulePlan(timeInfo: TimeInfo, name: String, expr: Expr, - param: Map[String, Any], procType: ProcessType, - dsTimeRanges: Map[String, TimeRange] - ): RulePlan = { - val details = getDetails(param) - val sourceName = details.getString(DistinctnessKeys._source, dataSourceNames.head) - val targetName = 
details.getString(UniquenessKeys._target, dataSourceNames.tail.head) - val analyzer = DistinctnessAnalyzer(expr.asInstanceOf[DistinctnessClause], sourceName) - - val mode = SimpleMode - - val ct = timeInfo.calcTime - - val sourceTimeRange = dsTimeRanges.get(sourceName).getOrElse(TimeRange(ct)) - val beginTime = sourceTimeRange.begin - - if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) { - println(s"[${ct}] data source ${sourceName} not exists") - emptyRulePlan - } else { - val withOlderTable = { - details.getBoolean(DistinctnessKeys._withAccumulate, true) && - TableRegisters.existRunTempTable(timeInfo.key, targetName) - } - - val selClause = analyzer.selectionPairs.map { pair => - val (expr, alias) = pair - s"${expr.desc} AS `${alias}`" - }.mkString(", ") - val aliases = analyzer.selectionPairs.map(_._2) - val aliasesClause = aliases.map( a => s"`${a}`" ).mkString(", ") - - // 1. source alias - val sourceAliasTableName = "__sourceAlias" - val sourceAliasSql = { - s"SELECT ${selClause} FROM `${sourceName}`" - } - val sourceAliasStep = SparkSqlStep(sourceAliasTableName, sourceAliasSql, emptyMap, true) - - // 2. total metric - val totalTableName = "__totalMetric" - val totalColName = details.getStringOrKey(DistinctnessKeys._total) - val totalSql = { - s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}`" - } - val totalStep = SparkSqlStep(totalTableName, totalSql, emptyMap) - val totalMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc) - val totalMetricExport = genMetricExport(totalMetricParam, totalColName, totalTableName, beginTime, mode) - - // 3. 
group by self - val selfGroupTableName = "__selfGroup" - val dupColName = details.getStringOrKey(DistinctnessKeys._dup) - val accuDupColName = details.getStringOrKey(DistinctnessKeys._accu_dup) - val selfGroupSql = { - s""" - |SELECT ${aliasesClause}, (COUNT(*) - 1) AS `${dupColName}`, - |TRUE AS `${InternalColumns.distinct}` - |FROM `${sourceAliasTableName}` GROUP BY ${aliasesClause} - """.stripMargin - } - val selfGroupStep = SparkSqlStep(selfGroupTableName, selfGroupSql, emptyMap, true) - - val selfDistRulePlan = RulePlan( - sourceAliasStep :: totalStep :: selfGroupStep :: Nil, - totalMetricExport :: Nil - ) - - val (distRulePlan, dupCountTableName) = procType match { - case StreamingProcessType if (withOlderTable) => { - // 4. older alias - val olderAliasTableName = "__older" - val olderAliasSql = { - s"SELECT ${selClause} FROM `${targetName}` WHERE `${InternalColumns.tmst}` < ${beginTime}" - } - val olderAliasStep = SparkSqlStep(olderAliasTableName, olderAliasSql, emptyMap) - - // 5. join with older data - val joinedTableName = "__joined" - val selfSelClause = (aliases :+ dupColName).map { alias => - s"`${selfGroupTableName}`.`${alias}`" - }.mkString(", ") - val onClause = aliases.map { alias => - s"coalesce(`${selfGroupTableName}`.`${alias}`, '') = coalesce(`${olderAliasTableName}`.`${alias}`, '')" - }.mkString(" AND ") - val olderIsNull = aliases.map { alias => - s"`${olderAliasTableName}`.`${alias}` IS NULL" - }.mkString(" AND ") - val joinedSql = { - s""" - |SELECT ${selfSelClause}, (${olderIsNull}) AS `${InternalColumns.distinct}` - |FROM `${olderAliasTableName}` RIGHT JOIN `${selfGroupTableName}` - |ON ${onClause} - """.stripMargin - } - val joinedStep = SparkSqlStep(joinedTableName, joinedSql, emptyMap) - - // 6. 
group by joined data - val groupTableName = "__group" - val moreDupColName = "_more_dup" - val groupSql = { - s""" - |SELECT ${aliasesClause}, `${dupColName}`, `${InternalColumns.distinct}`, - |COUNT(*) AS `${moreDupColName}` - |FROM `${joinedTableName}` - |GROUP BY ${aliasesClause}, `${dupColName}`, `${InternalColumns.distinct}` - """.stripMargin - } - val groupStep = SparkSqlStep(groupTableName, groupSql, emptyMap) - - // 7. final duplicate count - val finalDupCountTableName = "__finalDupCount" - val finalDupCountSql = { - s""" - |SELECT ${aliasesClause}, `${InternalColumns.distinct}`, - |CASE WHEN `${InternalColumns.distinct}` THEN `${dupColName}` - |ELSE (`${dupColName}` + 1) END AS `${dupColName}`, - |CASE WHEN `${InternalColumns.distinct}` THEN `${dupColName}` - |ELSE (`${dupColName}` + `${moreDupColName}`) END AS `${accuDupColName}` - |FROM `${groupTableName}` - """.stripMargin - } - val finalDupCountStep = SparkSqlStep(finalDupCountTableName, finalDupCountSql, emptyMap, true) - - val rulePlan = RulePlan(olderAliasStep :: joinedStep :: groupStep :: finalDupCountStep :: Nil, Nil) - (rulePlan, finalDupCountTableName) - } - case _ => { - (emptyRulePlan, selfGroupTableName) - } - } - - // 8. distinct metric - val distTableName = "__distMetric" - val distColName = details.getStringOrKey(DistinctnessKeys._distinct) - val distSql = { - s""" - |SELECT COUNT(*) AS `${distColName}` - |FROM `${dupCountTableName}` WHERE `${InternalColumns.distinct}` - """.stripMargin - } - val distStep = SparkSqlStep(distTableName, distSql, emptyMap) - val distMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc) - val distMetricExport = genMetricExport(distMetricParam, distColName, distTableName, beginTime, mode) - - val distMetricRulePlan = RulePlan(distStep :: Nil, distMetricExport :: Nil) - - val duplicationArrayName = details.getString(UniquenessKeys._duplicationArray, "") - val dupRulePlan = if (duplicationArrayName.nonEmpty) { - // 9. 
duplicate record - val dupRecordTableName = "__dupRecords" - val dupRecordSelClause = procType match { - case StreamingProcessType if (withOlderTable) => s"${aliasesClause}, `${dupColName}`, `${accuDupColName}`" - case _ => s"${aliasesClause}, `${dupColName}`" - } - val dupRecordSql = { - s""" - |SELECT ${dupRecordSelClause} - |FROM `${dupCountTableName}` WHERE `${dupColName}` > 0 - """.stripMargin - } - val dupRecordStep = SparkSqlStep(dupRecordTableName, dupRecordSql, emptyMap, true) - val dupRecordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap) - val dupRecordExport = genRecordExport(dupRecordParam, dupRecordTableName, dupRecordTableName, beginTime, mode) - - // 10. duplicate metric - val dupMetricTableName = "__dupMetric" - val numColName = details.getStringOrKey(DistinctnessKeys._num) - val dupMetricSql = { - s""" - |SELECT `${dupColName}`, COUNT(*) AS `${numColName}` - |FROM `${dupRecordTableName}` GROUP BY `${dupColName}` - """.stripMargin - } - val dupMetricStep = SparkSqlStep(dupMetricTableName, dupMetricSql, emptyMap) - val dupMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc) - val dupMetricExport = genMetricExport(dupMetricParam, duplicationArrayName, dupMetricTableName, beginTime, mode) - - RulePlan(dupRecordStep :: dupMetricStep :: Nil, dupRecordExport :: dupMetricExport :: Nil) - } else emptyRulePlan - - selfDistRulePlan.merge(distRulePlan).merge(distMetricRulePlan).merge(dupRulePlan) - - } - } - - private def timelinessRulePlan(timeInfo: TimeInfo, name: String, expr: Expr, - param: Map[String, Any], procType: ProcessType - ): RulePlan = { - val details = getDetails(param) - val timelinessClause = expr.asInstanceOf[TimelinessClause] - val sourceName = details.getString(TimelinessKeys._source, dataSourceNames.head) - - val mode = ExportMode.defaultMode(procType) - - val ct = timeInfo.calcTime - - if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) { - emptyRulePlan - } else { - val 
analyzer = TimelinessAnalyzer(timelinessClause, sourceName) - val btsSel = analyzer.btsExpr - val etsSelOpt = analyzer.etsExprOpt - - // 1. in time - val inTimeTableName = "__inTime" - val inTimeSql = etsSelOpt match { - case Some(etsSel) => { - s""" - |SELECT *, (${btsSel}) AS `${InternalColumns.beginTs}`, - |(${etsSel}) AS `${InternalColumns.endTs}` - |FROM ${sourceName} WHERE (${btsSel}) IS NOT NULL AND (${etsSel}) IS NOT NULL - """.stripMargin - } - case _ => { - s""" - |SELECT *, (${btsSel}) AS `${InternalColumns.beginTs}` - |FROM ${sourceName} WHERE (${btsSel}) IS NOT NULL - """.stripMargin - } - } - val inTimeStep = SparkSqlStep(inTimeTableName, inTimeSql, emptyMap) - - // 2. latency - val latencyTableName = "__lat" - val latencyColName = details.getStringOrKey(TimelinessKeys._latency) - val etsColName = etsSelOpt match { - case Some(_) => InternalColumns.endTs - case _ => InternalColumns.tmst - } - val latencySql = { - s"SELECT *, (`${etsColName}` - `${InternalColumns.beginTs}`) AS `${latencyColName}` FROM `${inTimeTableName}`" - } - val latencyStep = SparkSqlStep(latencyTableName, latencySql, emptyMap, true) - - // 3. 
timeliness metric - val metricTableName = name - val totalColName = details.getStringOrKey(TimelinessKeys._total) - val avgColName = details.getStringOrKey(TimelinessKeys._avg) - val metricSql = procType match { - case BatchProcessType => { - s""" - |SELECT COUNT(*) AS `${totalColName}`, - |CAST(AVG(`${latencyColName}`) AS BIGINT) AS `${avgColName}` - |FROM `${latencyTableName}` - """.stripMargin - } - case StreamingProcessType => { - s""" - |SELECT `${InternalColumns.tmst}`, - |COUNT(*) AS `${totalColName}`, - |CAST(AVG(`${latencyColName}`) AS BIGINT) AS `${avgColName}` - |FROM `${latencyTableName}` - |GROUP BY `${InternalColumns.tmst}` - """.stripMargin - } - } - val metricStep = SparkSqlStep(metricTableName, metricSql, emptyMap) - val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap) - val metricExports = genMetricExport(metricParam, name, metricTableName, ct, mode) :: Nil - - // current timeliness plan - val timeSteps = inTimeStep :: latencyStep :: metricStep :: Nil - val timeExports = metricExports - val timePlan = RulePlan(timeSteps, timeExports) - - // 4. timeliness record - val recordPlan = TimeUtil.milliseconds(details.getString(TimelinessKeys._threshold, "")) match { - case Some(tsh) => { - val recordTableName = "__lateRecords" - val recordSql = { - s"SELECT * FROM `${latencyTableName}` WHERE `${latencyColName}` > ${tsh}" - } - val recordStep = SparkSqlStep(recordTableName, recordSql, emptyMap) - val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap) - val recordExports = genRecordExport(recordParam, recordTableName, recordTableName, ct, mode) :: Nil - RulePlan(recordStep :: Nil, recordExports) - } - case _ => emptyRulePlan - } - - // 5. ranges -// val rangePlan = details.get(TimelinessKeys._rangeSplit) match { -// case Some(arr: Seq[String]) => { -// val ranges = splitTimeRanges(arr) -// if (ranges.size > 0) { -// try { -// // 5.1. 
range -// val rangeTableName = "__range" -// val rangeColName = details.getStringOrKey(TimelinessKeys._range) -// val caseClause = { -// val whenClause = ranges.map { range => -// s"WHEN `${latencyColName}` < ${range._1} THEN '<${range._2}'" -// }.mkString("\n") -// s"CASE ${whenClause} ELSE '>=${ranges.last._2}' END AS `${rangeColName}`" -// } -// val rangeSql = { -// s"SELECT *, ${caseClause} FROM `${latencyTableName}`" -// } -// val rangeStep = SparkSqlStep(rangeTableName, rangeSql, emptyMap) -// -// // 5.2. range metric -// val rangeMetricTableName = "__rangeMetric" -// val countColName = details.getStringOrKey(TimelinessKeys._count) -// val rangeMetricSql = procType match { -// case BatchProcessType => { -// s""" -// |SELECT `${rangeColName}`, COUNT(*) AS `${countColName}` -// |FROM `${rangeTableName}` GROUP BY `${rangeColName}` -// """.stripMargin -// } -// case StreamingProcessType => { -// s""" -// |SELECT `${InternalColumns.tmst}`, `${rangeColName}`, COUNT(*) AS `${countColName}` -// |FROM `${rangeTableName}` GROUP BY `${InternalColumns.tmst}`, `${rangeColName}` -// """.stripMargin -// } -// } -// val rangeMetricStep = SparkSqlStep(rangeMetricTableName, rangeMetricSql, emptyMap) -// val rangeMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc) -// val rangeMetricExports = genMetricExport(rangeMetricParam, rangeColName, rangeMetricTableName, ct, mode) :: Nil -// -// RulePlan(rangeStep :: rangeMetricStep :: Nil, rangeMetricExports) -// } catch { -// case _: Throwable => emptyRulePlan -// } -// } else emptyRulePlan -// } -// case _ => emptyRulePlan -// } - - // return timeliness plan - - // 5. 
ranges - val rangePlan = TimeUtil.milliseconds(details.getString(TimelinessKeys._stepSize, "")) match { - case Some(stepSize) => { - // 5.1 range - val rangeTableName = "__range" - val stepColName = details.getStringOrKey(TimelinessKeys._step) - val rangeSql = { - s""" - |SELECT *, CAST((`${latencyColName}` / ${stepSize}) AS BIGINT) AS `${stepColName}` - |FROM `${latencyTableName}` - """.stripMargin - } - val rangeStep = SparkSqlStep(rangeTableName, rangeSql, emptyMap) - - // 5.2 range metric - val rangeMetricTableName = "__rangeMetric" - val countColName = details.getStringOrKey(TimelinessKeys._count) - val rangeMetricSql = procType match { - case BatchProcessType => { - s""" - |SELECT `${stepColName}`, COUNT(*) AS `${countColName}` - |FROM `${rangeTableName}` GROUP BY `${stepColName}` - """.stripMargin - } - case StreamingProcessType => { - s""" - |SELECT `${InternalColumns.tmst}`, `${stepColName}`, COUNT(*) AS `${countColName}` - |FROM `${rangeTableName}` GROUP BY `${InternalColumns.tmst}`, `${stepColName}` - """.stripMargin - } - } - val rangeMetricStep = SparkSqlStep(rangeMetricTableName, rangeMetricSql, emptyMap) - val rangeMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc) - val rangeMetricExports = genMetricExport(rangeMetricParam, stepColName, rangeMetricTableName, ct, mode) :: Nil - - RulePlan(rangeStep :: rangeMetricStep :: Nil, rangeMetricExports) - } - case _ => emptyRulePlan - } - - timePlan.merge(recordPlan).merge(rangePlan) - } - } - - private def splitTimeRanges(tstrs: Seq[String]): List[(Long, String)] = { - val ts = tstrs.flatMap(TimeUtil.milliseconds(_)).sorted.toList - ts.map { t => (t, TimeUtil.time2String(t)) } - } - } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala ---------------------------------------------------------------------- diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala index 25025ac..e85575f 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala @@ -20,29 +20,12 @@ package org.apache.griffin.measure.rule.adaptor import java.util.concurrent.atomic.AtomicLong -import org.apache.griffin.measure.cache.tmst.TempName - -import scala.collection.mutable.{Set => MutableSet} -import org.apache.griffin.measure.config.params.user._ import org.apache.griffin.measure.log.Loggable import org.apache.griffin.measure.process.{ExportMode, ProcessType} import org.apache.griffin.measure.process.temp.TimeRange import org.apache.griffin.measure.rule.dsl._ import org.apache.griffin.measure.rule.plan.{TimeInfo, _} - -//object RuleInfoKeys { -// val _name = "name" -// val _rule = "rule" -// val _details = "details" -// val _dslType = "dsl.type" -// val _dqType = "dq.type" -// val _global = "global" -//// val _gatherStep = "gather.step" -// -// val _metric = "metric" -// val _record = "record" -//} -//import RuleInfoKeys._ +import org.apache.griffin.measure.rule.trans.{DsUpdateFactory, RuleExportFactory} import org.apache.griffin.measure.utils.ParamUtil._ object RuleParamKeys { @@ -56,6 +39,7 @@ object RuleParamKeys { val _metric = "metric" val _record = "record" + val _dsUpdate = "ds.update" def getName(param: Map[String, Any], defName: String): String = param.getString(_name, defName) def getRule(param: Map[String, Any]): String = param.getString(_rule, "") @@ -66,52 +50,11 @@ object RuleParamKeys { def getMetricOpt(param: Map[String, Any]): Option[Map[String, Any]] = param.getParamMapOpt(_metric) def getRecordOpt(param: Map[String, Any]): Option[Map[String, Any]] = param.getParamMapOpt(_record) -} - -object ExportParamKeys { - val _name = "name" - val _collectType = 
"collect.type" - val _dataSourceCache = "data.source.cache" - val _originDF = "origin.DF" - - def getName(param: Map[String, Any], defName: String): String = param.getString(_name, defName) - def getCollectType(param: Map[String, Any]): CollectType = CollectType(param.getString(_collectType, "")) - def getDataSourceCacheOpt(param: Map[String, Any]): Option[String] = param.get(_dataSourceCache).map(_.toString) - def getOriginDFOpt(param: Map[String, Any]): Option[String] = param.get(_originDF).map(_.toString) + def getDsUpdateOpt(param: Map[String, Any]): Option[Map[String, Any]] = param.getParamMapOpt(_dsUpdate) } trait RuleAdaptor extends Loggable with Serializable { -// val adaptPhase: AdaptPhase - -// protected def genRuleInfo(param: Map[String, Any]): RuleInfo = RuleInfoGen(param) - -// protected def getName(param: Map[String, Any]) = param.getOrElse(_name, RuleStepNameGenerator.genName).toString -// protected def getRule(param: Map[String, Any]) = param.getOrElse(_rule, "").toString -// protected def getDetails(param: Map[String, Any]) = param.get(_details) match { -// case Some(dt: Map[String, Any]) => dt -// case _ => Map[String, Any]() -// } - - - -// def getPersistNames(steps: Seq[RuleStep]): Seq[String] = steps.map(_.ruleInfo.persistName) -// -// protected def genRuleStep(timeInfo: TimeInfo, param: Map[String, Any]): Seq[RuleStep] -// protected def adaptConcreteRuleStep(ruleStep: RuleStep): Seq[ConcreteRuleStep] -// def genConcreteRuleStep(timeInfo: TimeInfo, param: Map[String, Any] -// ): Seq[ConcreteRuleStep] = { -// genRuleStep(timeInfo, param).flatMap { rs => -// adaptConcreteRuleStep(rs) -// } -// } - - - -// def genRuleInfos(param: Map[String, Any], timeInfo: TimeInfo): Seq[RuleInfo] = { -// RuleInfoGen(param) :: Nil -// } - protected def getRuleName(param: Map[String, Any]): String = { RuleParamKeys.getName(param, RuleStepNameGenerator.genName) } @@ -124,66 +67,25 @@ trait RuleAdaptor extends Loggable with Serializable { mode: ExportMode ): 
Seq[RuleExport] = { val metricOpt = RuleParamKeys.getMetricOpt(param) - val metricExportSeq = metricOpt.map(genMetricExport(_, defName, stepName, defTimestamp, mode)).toSeq + val metricExportSeq = metricOpt.map( + RuleExportFactory.genMetricExport(_, defName, stepName, defTimestamp, mode) + ).toSeq val recordOpt = RuleParamKeys.getRecordOpt(param) - val recordExportSeq = recordOpt.map(genRecordExport(_, defName, stepName, defTimestamp, mode)).toSeq + val recordExportSeq = recordOpt.map( + RuleExportFactory.genRecordExport(_, defName, stepName, defTimestamp, mode) + ).toSeq metricExportSeq ++ recordExportSeq } - protected def genMetricExport(param: Map[String, Any], name: String, stepName: String, - defTimestamp: Long, mode: ExportMode - ): MetricExport = { - MetricExport( - ExportParamKeys.getName(param, name), - stepName, - ExportParamKeys.getCollectType(param), - defTimestamp, - mode - ) - } - protected def genRecordExport(param: Map[String, Any], name: String, stepName: String, - defTimestamp: Long, mode: ExportMode - ): RecordExport = { - RecordExport( - ExportParamKeys.getName(param, name), - stepName, - ExportParamKeys.getDataSourceCacheOpt(param), - ExportParamKeys.getOriginDFOpt(param), - defTimestamp, - mode - ) - } - + protected def genDsUpdates(param: Map[String, Any], defDsName: String, + stepName: String + ): Seq[DsUpdate] = { + val dsUpdateOpt = RuleParamKeys.getDsUpdateOpt(param) + dsUpdateOpt.map(DsUpdateFactory.genDsUpdate(_, defDsName, stepName)).toSeq + } } - - -//object RuleInfoGen { -// def apply(param: Map[String, Any]): RuleInfo = { -// val name = param.get(_name) match { -// case Some(n: String) => n -// case _ => RuleStepNameGenerator.genName -// } -// RuleInfo( -// name, -// None, -// DslType(param.getString(_dslType, "")), -// param.getString(_rule, ""), -// param.getParamMap(_details), -// param.getBoolean(_gatherStep, false) -// ) -// } -// def apply(ri: RuleInfo, timeInfo: TimeInfo): RuleInfo = { -// if (ri.persistType.needPersist) { 
-// val tmstName = TempName.tmstName(ri.name, timeInfo) -// ri.setTmstNameOpt(Some(tmstName)) -// } else ri -// } -// -// def dqType(param: Map[String, Any]): DqType = DqType(param.getString(_dqType, "")) -//} - object RuleStepNameGenerator { private val counter: AtomicLong = new AtomicLong(0L) private val head: String = "rs" http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala index 1fce03b..b7c68b5 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala @@ -45,7 +45,11 @@ case class SparkSqlAdaptor() extends RuleAdaptor { val name = getRuleName(param) val step = SparkSqlStep(name, getRule(param), getDetails(param), getCache(param), getGlobal(param)) val mode = ExportMode.defaultMode(procType) - RulePlan(step :: Nil, genRuleExports(param, name, name, timeInfo.calcTime, mode)) + RulePlan( + step :: Nil, + genRuleExports(param, name, name, timeInfo.calcTime, mode), + genDsUpdates(param, "", name) + ) } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/plan/DsUpdate.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/DsUpdate.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/DsUpdate.scala new file mode 100644 index 0000000..4956b29 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/DsUpdate.scala @@ -0,0 +1,24 @@ +/* +Licensed to the Apache Software Foundation 
(ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.rule.plan + +case class DsUpdate(dsName: String, + stepName: String + ) extends Serializable { +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala index ac14153..84313e4 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala @@ -28,7 +28,4 @@ case class MetricExport(name: String, mode: ExportMode ) extends RuleExport { - def setDefTimestamp(t: Long): RuleExport = - MetricExport(name, stepName, collectType, t, mode) - } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala 
b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala index 6afc836..c69dc55 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala @@ -28,7 +28,4 @@ case class RecordExport(name: String, mode: ExportMode ) extends RuleExport { - def setDefTimestamp(t: Long): RuleExport = - RecordExport(name, stepName, dataSourceCacheOpt, originDFOpt, t, mode) - } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala index 84467c2..da5eb9d 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala @@ -30,6 +30,4 @@ trait RuleExport extends Serializable { val mode: ExportMode // export mode - def setDefTimestamp(t: Long): RuleExport - } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RulePlan.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RulePlan.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RulePlan.scala index 54a6062..678ab3e 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RulePlan.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RulePlan.scala @@ -21,7 +21,8 @@ package org.apache.griffin.measure.rule.plan import scala.reflect.ClassTag case class RulePlan(ruleSteps: Seq[RuleStep], - ruleExports: Seq[RuleExport] + ruleExports: Seq[RuleExport], 
+ dsUpdates: Seq[DsUpdate] = Nil ) extends Serializable { val globalRuleSteps = filterRuleSteps(_.global) @@ -48,7 +49,11 @@ case class RulePlan(ruleSteps: Seq[RuleStep], // } def merge(rp: RulePlan): RulePlan = { - RulePlan(this.ruleSteps ++ rp.ruleSteps, this.ruleExports ++ rp.ruleExports) + RulePlan( + this.ruleSteps ++ rp.ruleSteps, + this.ruleExports ++ rp.ruleExports, + this.dsUpdates ++ rp.dsUpdates + ) } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala new file mode 100644 index 0000000..2ff8feb --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala @@ -0,0 +1,198 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.rule.trans + +import org.apache.griffin.measure.process.engine.DataFrameOprs.AccuracyOprKeys +import org.apache.griffin.measure.process.temp.TableRegisters +import org.apache.griffin.measure.process.{BatchProcessType, ExportMode, ProcessType, StreamingProcessType} +import org.apache.griffin.measure.rule.adaptor._ +import org.apache.griffin.measure.rule.adaptor.RuleParamKeys._ +import org.apache.griffin.measure.rule.dsl.analyzer.AccuracyAnalyzer +import org.apache.griffin.measure.rule.dsl.expr.{Expr, LogicalExpr} +import org.apache.griffin.measure.rule.plan._ +import org.apache.griffin.measure.utils.ParamUtil._ +import org.apache.griffin.measure.rule.trans.RuleExportFactory._ +import org.apache.griffin.measure.rule.trans.DsUpdateFactory._ + +case class AccuracyRulePlanTrans(dataSourceNames: Seq[String], + timeInfo: TimeInfo, name: String, expr: Expr, + param: Map[String, Any], procType: ProcessType + ) extends RulePlanTrans { + + private object AccuracyKeys { + val _source = "source" + val _target = "target" + val _miss = "miss" + val _total = "total" + val _matched = "matched" + } + import AccuracyKeys._ + + def trans(): RulePlan = { + val details = getDetails(param) + val sourceName = details.getString(_source, dataSourceNames.head) + val targetName = details.getString(_target, dataSourceNames.tail.head) + val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], sourceName, targetName) + + val mode = ExportMode.defaultMode(procType) + + val ct = timeInfo.calcTime + + if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) { + println(s"[${ct}] data source ${sourceName} not exists") + emptyRulePlan + } else { + // 1. 
miss record + val missRecordsTableName = "__missRecords" + val selClause = s"`${sourceName}`.*" + val missRecordsSql = if (!TableRegisters.existRunTempTable(timeInfo.key, targetName)) { + println(s"[${ct}] data source ${targetName} not exists") + s"SELECT ${selClause} FROM `${sourceName}`" + } else { + val onClause = expr.coalesceDesc + val sourceIsNull = analyzer.sourceSelectionExprs.map { sel => + s"${sel.desc} IS NULL" + }.mkString(" AND ") + val targetIsNull = analyzer.targetSelectionExprs.map { sel => + s"${sel.desc} IS NULL" + }.mkString(" AND ") + val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})" + s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}" + } + val missRecordsStep = SparkSqlStep(missRecordsTableName, missRecordsSql, emptyMap, true) + val missRecordsExports = procType match { + case BatchProcessType => { + val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap) + genRecordExport(recordParam, missRecordsTableName, missRecordsTableName, ct, mode) :: Nil + } + case StreamingProcessType => Nil + } + val missRecordsUpdates = procType match { + case BatchProcessType => Nil + case StreamingProcessType => { + val updateParam = emptyMap + genDsUpdate(updateParam, sourceName, missRecordsTableName) :: Nil + } + } + + // 2. miss count + val missCountTableName = "__missCount" + val missColName = details.getStringOrKey(_miss) + val missCountSql = procType match { + case BatchProcessType => s"SELECT COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}`" + case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}` GROUP BY `${InternalColumns.tmst}`" + } + val missCountStep = SparkSqlStep(missCountTableName, missCountSql, emptyMap) + + // 3. 
total count + val totalCountTableName = "__totalCount" + val totalColName = details.getStringOrKey(_total) + val totalCountSql = procType match { + case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`" + case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceName}` GROUP BY `${InternalColumns.tmst}`" + } + val totalCountStep = SparkSqlStep(totalCountTableName, totalCountSql, emptyMap) + + // 4. accuracy metric + val accuracyTableName = name + val matchedColName = details.getStringOrKey(_matched) + val accuracyMetricSql = procType match { + case BatchProcessType => { + s""" + |SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`, + |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`, + |(`${totalColName}` - `${missColName}`) AS `${matchedColName}` + |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}` + """.stripMargin + } + case StreamingProcessType => { + s""" + |SELECT `${totalCountTableName}`.`${InternalColumns.tmst}` AS `${InternalColumns.tmst}`, + |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`, + |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`, + |(`${totalColName}` - `${missColName}`) AS `${matchedColName}` + |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}` + |ON `${totalCountTableName}`.`${InternalColumns.tmst}` = `${missCountTableName}`.`${InternalColumns.tmst}` + """.stripMargin + } + } + val accuracyStep = SparkSqlStep(accuracyTableName, accuracyMetricSql, emptyMap) + val accuracyExports = procType match { + case BatchProcessType => { + val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap) + genMetricExport(metricParam, accuracyTableName, accuracyTableName, ct, mode) :: Nil + } + case StreamingProcessType => Nil + } + + // current accu plan + val accuSteps = missRecordsStep :: missCountStep :: totalCountStep :: accuracyStep :: Nil + val 
accuExports = missRecordsExports ++ accuracyExports + val accuUpdates = missRecordsUpdates + val accuPlan = RulePlan(accuSteps, accuExports, accuUpdates) + + // streaming extra accu plan + val streamingAccuPlan = procType match { + case BatchProcessType => emptyRulePlan + case StreamingProcessType => { + // 5. accuracy metric merge + val accuracyMetricTableName = "__accuracy" + val accuracyMetricRule = "accuracy" + val accuracyMetricDetails = Map[String, Any]( + (AccuracyOprKeys._dfName -> accuracyTableName), + (AccuracyOprKeys._miss -> missColName), + (AccuracyOprKeys._total -> totalColName), + (AccuracyOprKeys._matched -> matchedColName) + ) + val accuracyMetricStep = DfOprStep(accuracyMetricTableName, + accuracyMetricRule, accuracyMetricDetails) + val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap) + val accuracyMetricExports = genMetricExport(metricParam, name, accuracyMetricTableName, ct, mode) :: Nil + + // 6. collect accuracy records + val accuracyRecordTableName = "__accuracyRecords" + val accuracyRecordSql = { + s""" + |SELECT `${InternalColumns.tmst}`, `${InternalColumns.empty}` + |FROM `${accuracyMetricTableName}` WHERE `${InternalColumns.record}` + """.stripMargin + } + val accuracyRecordStep = SparkSqlStep(accuracyRecordTableName, accuracyRecordSql, emptyMap) + val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap) + val accuracyRecordParam = recordParam.addIfNotExist(ExportParamKeys._dataSourceCache, sourceName) + .addIfNotExist(ExportParamKeys._originDF, missRecordsTableName) + val accuracyRecordExports = genRecordExport( + accuracyRecordParam, missRecordsTableName, accuracyRecordTableName, ct, mode) :: Nil + + // gen accu plan + val extraSteps = accuracyMetricStep :: accuracyRecordStep :: Nil + val extraExports = accuracyMetricExports ++ accuracyRecordExports + val extraPlan = RulePlan(extraSteps, extraExports) + + extraPlan + } + } + + // return accu plan + accuPlan.merge(streamingAccuPlan) + + } + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala new file mode 100644 index 0000000..0f4e7c4 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala @@ -0,0 +1,234 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
*/
package org.apache.griffin.measure.rule.trans

import org.apache.griffin.measure.process.temp.{TableRegisters, TimeRange}
import org.apache.griffin.measure.process._
import org.apache.griffin.measure.rule.adaptor.RuleParamKeys._
import org.apache.griffin.measure.rule.adaptor._
import org.apache.griffin.measure.rule.dsl.{ArrayCollectType, EntriesCollectType}
import org.apache.griffin.measure.rule.dsl.analyzer.DistinctnessAnalyzer
import org.apache.griffin.measure.rule.dsl.expr._
import org.apache.griffin.measure.rule.plan._
import org.apache.griffin.measure.rule.trans.RuleExportFactory._
import org.apache.griffin.measure.utils.ParamUtil._

/**
  * Translates a distinctness rule into a [[RulePlan]] made of Spark SQL steps and exports.
  *
  * The generated plan always contains the "total" and "distinct" count metrics. In streaming
  * mode, when accumulation is enabled and the target table is registered, the new data is also
  * compared against the older data of the target table. When a `duplication.array` name is
  * configured, duplicated records and a per-duplication-count metric are produced as well.
  *
  * @param dataSourceNames names of the configured data sources; by position the first is the
  *                        default source and the second the default target
  * @param timeInfo        time info of this calculation (its `calcTime` and `key` are used)
  * @param name            rule name (not read by `trans()` itself)
  * @param expr            parsed rule expression; cast to `DistinctnessClause` in `trans()`
  * @param param           rule parameters (details and record export options are read from it)
  * @param procType        batch or streaming process type
  * @param dsTimeRanges    time range per data source name
  */
case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String],
                                     timeInfo: TimeInfo, name: String, expr: Expr,
                                     param: Map[String, Any], procType: ProcessType,
                                     dsTimeRanges: Map[String, TimeRange]
                                    ) extends RulePlanTrans {

  /** Keys looked up in the rule `details` map. */
  private object DistinctnessKeys {
    val _source = "source"
    val _target = "target"
    val _distinct = "distinct"
    val _total = "total"
    val _dup = "dup"
    val _accu_dup = "accu_dup"
    val _num = "num"

    val _duplicationArray = "duplication.array"
    val _withAccumulate = "with.accumulate"
  }
  import DistinctnessKeys._

  /**
    * Builds the distinctness rule plan.
    *
    * @return an empty plan when the source table is not registered for this run, otherwise the
    *         merged plan of all generated steps and exports
    */
  def trans(): RulePlan = {
    val details = getDetails(param)

    // Positional defaults. The defaults are evaluated before the lookup, so `head` /
    // `tail.head` used to throw whenever fewer than two data sources were configured, even if
    // the names were given explicitly in the details or the target was never needed.
    // An empty name is used instead.
    // NOTE(review): assumes an empty table name is simply reported as "not registered" by
    // TableRegisters.existRunTempTable -- confirm.
    val defSourceName = dataSourceNames.headOption.getOrElse("")
    val defTargetName = dataSourceNames.lift(1).getOrElse("")
    val sourceName = details.getString(_source, defSourceName)
    val targetName = details.getString(_target, defTargetName)
    val analyzer = DistinctnessAnalyzer(expr.asInstanceOf[DistinctnessClause], sourceName)

    val mode = SimpleMode

    val ct = timeInfo.calcTime

    // metrics and records are exported with the begin time of the source time range,
    // falling back to a range built from the calculation time
    val sourceTimeRange = dsTimeRanges.getOrElse(sourceName, TimeRange(ct))
    val beginTime = sourceTimeRange.begin

    if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
      println(s"[${ct}] data source ${sourceName} not exists")
      emptyRulePlan
    } else {
      // compare with older data only if accumulation is requested (default: true)
      // and the target table is registered for this run
      val withOlderTable = {
        details.getBoolean(_withAccumulate, true) &&
          TableRegisters.existRunTempTable(timeInfo.key, targetName)
      }

      val selClause = analyzer.selectionPairs.map { pair =>
        val (expr, alias) = pair
        s"${expr.desc} AS `${alias}`"
      }.mkString(", ")
      val aliases = analyzer.selectionPairs.map(_._2)
      val aliasesClause = aliases.map( a => s"`${a}`" ).mkString(", ")

      // 1. source alias: project the selected expressions of the source under their aliases
      val sourceAliasTableName = "__sourceAlias"
      val sourceAliasSql = {
        s"SELECT ${selClause} FROM `${sourceName}`"
      }
      val sourceAliasStep = SparkSqlStep(sourceAliasTableName, sourceAliasSql, emptyMap, true)

      // 2. total metric: number of rows in the aliased source
      val totalTableName = "__totalMetric"
      val totalColName = details.getStringOrKey(_total)
      val totalSql = {
        s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}`"
      }
      val totalStep = SparkSqlStep(totalTableName, totalSql, emptyMap)
      val totalMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
      val totalMetricExport = genMetricExport(totalMetricParam, totalColName, totalTableName, beginTime, mode)

      // 3. group by self: one row per distinct value combination, with the number of
      // extra occurrences (count - 1) as the duplication count
      val selfGroupTableName = "__selfGroup"
      val dupColName = details.getStringOrKey(_dup)
      val accuDupColName = details.getStringOrKey(_accu_dup)
      val selfGroupSql = {
        s"""
           |SELECT ${aliasesClause}, (COUNT(*) - 1) AS `${dupColName}`,
           |TRUE AS `${InternalColumns.distinct}`
           |FROM `${sourceAliasTableName}` GROUP BY ${aliasesClause}
         """.stripMargin
      }
      val selfGroupStep = SparkSqlStep(selfGroupTableName, selfGroupSql, emptyMap, true)

      val selfDistRulePlan = RulePlan(
        sourceAliasStep :: totalStep :: selfGroupStep :: Nil,
        totalMetricExport :: Nil
      )

      // the table holding the duplication counts is either the self-grouped table, or,
      // in streaming mode with older data, the table merged with the older data
      val (distRulePlan, dupCountTableName) = procType match {
        case StreamingProcessType if (withOlderTable) => {
          // 4. older alias: same projection over target rows older than the begin time
          val olderAliasTableName = "__older"
          val olderAliasSql = {
            s"SELECT ${selClause} FROM `${targetName}` WHERE `${InternalColumns.tmst}` < ${beginTime}"
          }
          val olderAliasStep = SparkSqlStep(olderAliasTableName, olderAliasSql, emptyMap)

          // 5. join with older data; both sides are coalesced to '' so that NULL values
          // compare as equal in the join condition
          val joinedTableName = "__joined"
          val selfSelClause = (aliases :+ dupColName).map { alias =>
            s"`${selfGroupTableName}`.`${alias}`"
          }.mkString(", ")
          val onClause = aliases.map { alias =>
            s"coalesce(`${selfGroupTableName}`.`${alias}`, '') = coalesce(`${olderAliasTableName}`.`${alias}`, '')"
          }.mkString(" AND ")
          // NOTE(review): a matched older row whose selected columns are all NULL also
          // satisfies this predicate, so it looks like such rows are still flagged as
          // distinct -- confirm whether that is intended.
          val olderIsNull = aliases.map { alias =>
            s"`${olderAliasTableName}`.`${alias}` IS NULL"
          }.mkString(" AND ")
          val joinedSql = {
            s"""
               |SELECT ${selfSelClause}, (${olderIsNull}) AS `${InternalColumns.distinct}`
               |FROM `${olderAliasTableName}` RIGHT JOIN `${selfGroupTableName}`
               |ON ${onClause}
             """.stripMargin
          }
          val joinedStep = SparkSqlStep(joinedTableName, joinedSql, emptyMap)

          // 6. group by joined data: count the joined rows per new value combination
          val groupTableName = "__group"
          val moreDupColName = "_more_dup"
          val groupSql = {
            s"""
               |SELECT ${aliasesClause}, `${dupColName}`, `${InternalColumns.distinct}`,
               |COUNT(*) AS `${moreDupColName}`
               |FROM `${joinedTableName}`
               |GROUP BY ${aliasesClause}, `${dupColName}`, `${InternalColumns.distinct}`
             """.stripMargin
          }
          val groupStep = SparkSqlStep(groupTableName, groupSql, emptyMap)

          // 7. final duplicate count: rows flagged distinct keep their own count; otherwise
          // the duplication count is increased by one and the accumulated count adds the
          // number of joined older rows
          val finalDupCountTableName = "__finalDupCount"
          val finalDupCountSql = {
            s"""
               |SELECT ${aliasesClause}, `${InternalColumns.distinct}`,
               |CASE WHEN `${InternalColumns.distinct}` THEN `${dupColName}`
               |ELSE (`${dupColName}` + 1) END AS `${dupColName}`,
               |CASE WHEN `${InternalColumns.distinct}` THEN `${dupColName}`
               |ELSE (`${dupColName}` + `${moreDupColName}`) END AS `${accuDupColName}`
               |FROM `${groupTableName}`
             """.stripMargin
          }
          val finalDupCountStep = SparkSqlStep(finalDupCountTableName, finalDupCountSql, emptyMap, true)

          val rulePlan = RulePlan(olderAliasStep :: joinedStep :: groupStep :: finalDupCountStep :: Nil, Nil)
          (rulePlan, finalDupCountTableName)
        }
        case _ => {
          (emptyRulePlan, selfGroupTableName)
        }
      }

      // 8. distinct metric: number of rows flagged as distinct
      val distTableName = "__distMetric"
      val distColName = details.getStringOrKey(_distinct)
      val distSql = {
        s"""
           |SELECT COUNT(*) AS `${distColName}`
           |FROM `${dupCountTableName}` WHERE `${InternalColumns.distinct}`
         """.stripMargin
      }
      val distStep = SparkSqlStep(distTableName, distSql, emptyMap)
      val distMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
      val distMetricExport = genMetricExport(distMetricParam, distColName, distTableName, beginTime, mode)

      val distMetricRulePlan = RulePlan(distStep :: Nil, distMetricExport :: Nil)

      // duplicate records and their metric are generated only when a name is configured
      val duplicationArrayName = details.getString(_duplicationArray, "")
      val dupRulePlan = if (duplicationArrayName.nonEmpty) {
        // 9. duplicate record: rows with a positive duplication count; the accumulated
        // count column only exists when the older data was joined
        val dupRecordTableName = "__dupRecords"
        val dupRecordSelClause = procType match {
          case StreamingProcessType if (withOlderTable) => s"${aliasesClause}, `${dupColName}`, `${accuDupColName}`"
          case _ => s"${aliasesClause}, `${dupColName}`"
        }
        val dupRecordSql = {
          s"""
             |SELECT ${dupRecordSelClause}
             |FROM `${dupCountTableName}` WHERE `${dupColName}` > 0
           """.stripMargin
        }
        val dupRecordStep = SparkSqlStep(dupRecordTableName, dupRecordSql, emptyMap, true)
        val dupRecordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
        val dupRecordExport = genRecordExport(dupRecordParam, dupRecordTableName, dupRecordTableName, beginTime, mode)

        // 10. duplicate metric: number of records per duplication count
        val dupMetricTableName = "__dupMetric"
        val numColName = details.getStringOrKey(_num)
        val dupMetricSql = {
          s"""
             |SELECT `${dupColName}`, COUNT(*) AS `${numColName}`
             |FROM `${dupRecordTableName}` GROUP BY `${dupColName}`
           """.stripMargin
        }
        val dupMetricStep = SparkSqlStep(dupMetricTableName, dupMetricSql, emptyMap)
        val dupMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
        val dupMetricExport = genMetricExport(dupMetricParam, duplicationArrayName, dupMetricTableName, beginTime, mode)

        RulePlan(dupRecordStep :: dupMetricStep :: Nil, dupRecordExport :: dupMetricExport :: Nil)
      } else emptyRulePlan

      selfDistRulePlan.merge(distRulePlan).merge(distMetricRulePlan).merge(dupRulePlan)

    }
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DsUpdateFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DsUpdateFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DsUpdateFactory.scala
new file mode 100644
index 0000000..772163e
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DsUpdateFactory.scala
@@ -0,0 +1,37 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.
You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.rule.trans

import org.apache.griffin.measure.rule.plan._
import org.apache.griffin.measure.utils.ParamUtil._

/** Factory for [[DsUpdate]] instances. */
object DsUpdateFactory {

  /**
    * Creates a [[DsUpdate]] for the given step.
    *
    * @param param     update parameters; the data source name is read from its "name" entry
    * @param defDsName data source name passed as the default for that lookup
    * @param stepName  name of the step, passed through to the [[DsUpdate]]
    * @return the [[DsUpdate]] built from the resolved data source name and the step name
    */
  def genDsUpdate(param: Map[String, Any], defDsName: String,
                  stepName: String): DsUpdate = {
    val dsName = UpdateParamKeys.getName(param, defDsName)
    DsUpdate(dsName, stepName)
  }

}

/** Keys of the update parameter map, with their accessors. */
object UpdateParamKeys {
  val _name = "name"

  /** Looks up the "name" entry of `param` as a string, with `defName` as the default. */
  def getName(param: Map[String, Any], defName: String): String = {
    param.getString(_name, defName)
  }
}
\ No newline at end of file
