Repository: incubator-griffin Updated Branches: refs/heads/master 530c2dafc -> b83c58706
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/trans/ProfilingRulePlanTrans.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/ProfilingRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/ProfilingRulePlanTrans.scala
new file mode 100644
index 0000000..d9d2d4e
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/ProfilingRulePlanTrans.scala
@@ -0,0 +1,98 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.trans
+
+import org.apache.griffin.measure.process.temp.TableRegisters
+import org.apache.griffin.measure.process.{BatchProcessType, ExportMode, ProcessType, StreamingProcessType}
+import org.apache.griffin.measure.rule.adaptor.RuleParamKeys._
+import org.apache.griffin.measure.rule.adaptor._
+import org.apache.griffin.measure.rule.dsl.analyzer.ProfilingAnalyzer
+import org.apache.griffin.measure.rule.dsl.expr._
+import org.apache.griffin.measure.rule.plan._
+import org.apache.griffin.measure.rule.trans.RuleExportFactory._
+import org.apache.griffin.measure.utils.ParamUtil._
+
+case class ProfilingRulePlanTrans(dataSourceNames: Seq[String],
+                                  timeInfo: TimeInfo, name: String, expr: Expr,
+                                  param: Map[String, Any], procType: ProcessType
+                                 ) extends RulePlanTrans {
+
+  private object ProfilingKeys {
+    val _source = "source"
+  }
+  import ProfilingKeys._
+
+  def trans(): RulePlan = {
+    val details = getDetails(param)
+    val profilingClause = expr.asInstanceOf[ProfilingClause]
+    val sourceName = profilingClause.fromClauseOpt match {
+      case Some(fc) => fc.dataSource
+      case _ => details.getString(_source, dataSourceNames.head)
+    }
+    val fromClause = profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc
+
+    val mode = ExportMode.defaultMode(procType)
+
+    val ct = timeInfo.calcTime
+
+    if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
+      emptyRulePlan
+    } else {
+      val analyzer = ProfilingAnalyzer(profilingClause, sourceName)
+      val selExprDescs = analyzer.selectionExprs.map { sel =>
+        val alias = sel match {
+          case s: AliasableExpr if (s.alias.nonEmpty) => s" AS `${s.alias.get}`"
+          case _ => ""
+        }
+        s"${sel.desc}${alias}"
+      }
+      val selCondition = profilingClause.selectClause.extraConditionOpt.map(_.desc).mkString
+      val selClause = procType match {
+        case BatchProcessType => selExprDescs.mkString(", ")
+        case StreamingProcessType => (s"`${InternalColumns.tmst}`" +: selExprDescs).mkString(", ")
+      }
+      val groupByClauseOpt = analyzer.groupbyExprOpt
+      val groupbyClause = procType match {
+        case BatchProcessType => groupByClauseOpt.map(_.desc).getOrElse("")
+        case StreamingProcessType => {
+          val tmstGroupbyClause = GroupbyClause(LiteralStringExpr(s"`${InternalColumns.tmst}`") :: Nil, None)
+          val mergedGroupbyClause = tmstGroupbyClause.merge(groupByClauseOpt match {
+            case Some(gbc) => gbc
+            case _ => GroupbyClause(Nil, None)
+          })
+          mergedGroupbyClause.desc
+        }
+      }
+      val preGroupbyClause = analyzer.preGroupbyExprs.map(_.desc).mkString(" ")
+      val postGroupbyClause = analyzer.postGroupbyExprs.map(_.desc).mkString(" ")
+
+      // 1. select statement
+      val profilingSql = {
+        s"SELECT ${selCondition} ${selClause} ${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}"
+      }
+      val profilingName = name
+      val profilingStep = SparkSqlStep(profilingName, profilingSql, details)
+      val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
+      val profilingExports = genMetricExport(metricParam, name, profilingName, ct, mode) :: Nil
+
+      RulePlan(profilingStep :: Nil, profilingExports)
+    }
+  }
+
+}
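
A minimal standalone sketch (not Griffin's API) of what the streaming branch above does with the GROUP BY clause: the internal tmst column is always grouped on first, and any user-specified grouping columns follow. The `__tmst` column name is an assumption for illustration, standing in for InternalColumns.tmst.

    object GroupbyMergeSketch {
      // stand-in for InternalColumns.tmst; the actual value is assumed here
      val tmstCol = "`__tmst`"

      def streamingGroupby(userCols: Seq[String]): String =
        s"GROUP BY ${(tmstCol +: userCols).mkString(", ")}"

      def main(args: Array[String]): Unit = {
        println(streamingGroupby(Nil))              // GROUP BY `__tmst`
        println(streamingGroupby(Seq("`country`"))) // GROUP BY `__tmst`, `country`
      }
    }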
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RuleExportFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RuleExportFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RuleExportFactory.scala
new file mode 100644
index 0000000..915e654
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RuleExportFactory.scala
@@ -0,0 +1,65 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.trans
+
+import org.apache.griffin.measure.process.ExportMode
+import org.apache.griffin.measure.rule.dsl.CollectType
+import org.apache.griffin.measure.rule.plan._
+
+import org.apache.griffin.measure.utils.ParamUtil._
+
+object RuleExportFactory {
+
+  def genMetricExport(param: Map[String, Any], name: String, stepName: String,
+                      defTimestamp: Long, mode: ExportMode
+                     ): MetricExport = {
+    MetricExport(
+      ExportParamKeys.getName(param, name),
+      stepName,
+      ExportParamKeys.getCollectType(param),
+      defTimestamp,
+      mode
+    )
+  }
+
+  def genRecordExport(param: Map[String, Any], name: String, stepName: String,
+                      defTimestamp: Long, mode: ExportMode
+                     ): RecordExport = {
+    RecordExport(
+      ExportParamKeys.getName(param, name),
+      stepName,
+      ExportParamKeys.getDataSourceCacheOpt(param),
+      ExportParamKeys.getOriginDFOpt(param),
+      defTimestamp,
+      mode
+    )
+  }
+
+}
+
+object ExportParamKeys {
+  val _name = "name"
+  val _collectType = "collect.type"
+  val _dataSourceCache = "data.source.cache"
+  val _originDF = "origin.DF"
+
+  def getName(param: Map[String, Any], defName: String): String = param.getString(_name, defName)
+  def getCollectType(param: Map[String, Any]): CollectType = CollectType(param.getString(_collectType, ""))
+  def getDataSourceCacheOpt(param: Map[String, Any]): Option[String] = param.get(_dataSourceCache).map(_.toString)
+  def getOriginDFOpt(param: Map[String, Any]): Option[String] = param.get(_originDF).map(_.toString)
+}
\ No newline at end of file
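
The two factories differ only in which optional keys they read. A sketch of the name fallback they share, using a plain Map in place of ParamUtil's implicit getString (the helper below is hypothetical, same intent):

    object ExportNameSketch {
      // mirrors param.getString(_name, defName): an explicit "name" wins, else the rule name
      def getName(param: Map[String, Any], defName: String): String =
        param.get("name").map(_.toString).getOrElse(defName)

      def main(args: Array[String]): Unit = {
        println(getName(Map("name" -> "accu_metric"), "accu")) // accu_metric
        println(getName(Map.empty, "accu"))                    // accu
      }
    }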
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala
new file mode 100644
index 0000000..b7226ba
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala
@@ -0,0 +1,57 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.trans
+
+import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.process.ProcessType
+import org.apache.griffin.measure.process.temp.TimeRange
+import org.apache.griffin.measure.rule.dsl._
+import org.apache.griffin.measure.rule.dsl.expr.Expr
+import org.apache.griffin.measure.rule.plan._
+
+trait RulePlanTrans extends Loggable with Serializable {
+
+  protected val emptyRulePlan = RulePlan(Nil, Nil)
+  protected val emptyMap = Map[String, Any]()
+
+  def trans(): RulePlan
+
+}
+
+object RulePlanTrans {
+  private val emptyRulePlanTrans = new RulePlanTrans {
+    def trans(): RulePlan = emptyRulePlan
+  }
+
+  def apply(dqType: DqType,
+            dsNames: Seq[String],
+            ti: TimeInfo, name: String, expr: Expr,
+            param: Map[String, Any], procType: ProcessType,
+            dsTimeRanges: Map[String, TimeRange]
+           ): RulePlanTrans = {
+    dqType match {
+      case AccuracyType => AccuracyRulePlanTrans(dsNames, ti, name, expr, param, procType)
+      case ProfilingType => ProfilingRulePlanTrans(dsNames, ti, name, expr, param, procType)
+      case UniquenessType => UniquenessRulePlanTrans(dsNames, ti, name, expr, param, procType)
+      case DistinctnessType => DistinctnessRulePlanTrans(dsNames, ti, name, expr, param, procType, dsTimeRanges)
+      case TimelinessType => TimelinessRulePlanTrans(dsNames, ti, name, expr, param, procType)
+      case _ => emptyRulePlanTrans
+    }
+  }
+}
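
The companion object is a plain type-directed factory with a shared empty fallback. A compressed sketch of the same pattern with stand-in types (none of these names are Griffin's):

    sealed trait Kind
    case object KAccuracy extends Kind
    case object KOther extends Kind

    trait Trans { def trans(): String }

    object Trans {
      private val empty = new Trans { def trans(): String = "<empty plan>" }

      def apply(kind: Kind): Trans = kind match {
        case KAccuracy => new Trans { def trans(): String = "accuracy plan" }
        case _         => empty // unknown DQ types fall through to the empty plan
      }
    }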
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala
new file mode 100644
index 0000000..9a01553
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala
@@ -0,0 +1,279 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.trans
+
+import org.apache.griffin.measure.process.temp.TableRegisters
+import org.apache.griffin.measure.process.{BatchProcessType, ExportMode, ProcessType, StreamingProcessType}
+import org.apache.griffin.measure.rule.adaptor.RuleParamKeys._
+import org.apache.griffin.measure.rule.adaptor._
+import org.apache.griffin.measure.rule.dsl.ArrayCollectType
+import org.apache.griffin.measure.rule.dsl.analyzer.TimelinessAnalyzer
+import org.apache.griffin.measure.rule.dsl.expr._
+import org.apache.griffin.measure.rule.plan._
+import org.apache.griffin.measure.rule.trans.RuleExportFactory._
+import org.apache.griffin.measure.utils.ParamUtil._
+import org.apache.griffin.measure.utils.TimeUtil
+
+case class TimelinessRulePlanTrans(dataSourceNames: Seq[String],
+                                   timeInfo: TimeInfo, name: String, expr: Expr,
+                                   param: Map[String, Any], procType: ProcessType
+                                  ) extends RulePlanTrans {
+
+  private object TimelinessKeys {
+    val _source = "source"
+    val _latency = "latency"
+    val _total = "total"
+    val _avg = "avg"
+    val _threshold = "threshold"
+    val _step = "step"
+    val _count = "count"
+    val _stepSize = "step.size"
+    val _percentileColPrefix = "percentile"
+    val _percentileValues = "percentile.values"
+  }
+  import TimelinessKeys._
+
+  def trans(): RulePlan = {
+    val details = getDetails(param)
+    val timelinessClause = expr.asInstanceOf[TimelinessClause]
+    val sourceName = details.getString(_source, dataSourceNames.head)
+
+    val mode = ExportMode.defaultMode(procType)
+
+    val ct = timeInfo.calcTime
+
+    if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
+      emptyRulePlan
+    } else {
+      val analyzer = TimelinessAnalyzer(timelinessClause, sourceName)
+      val btsSel = analyzer.btsExpr
+      val etsSelOpt = analyzer.etsExprOpt
+
+      // 1. in time
+      val inTimeTableName = "__inTime"
+      val inTimeSql = etsSelOpt match {
+        case Some(etsSel) => {
+          s"""
+             |SELECT *, (${btsSel}) AS `${InternalColumns.beginTs}`,
+             |(${etsSel}) AS `${InternalColumns.endTs}`
+             |FROM ${sourceName} WHERE (${btsSel}) IS NOT NULL AND (${etsSel}) IS NOT NULL
+           """.stripMargin
+        }
+        case _ => {
+          s"""
+             |SELECT *, (${btsSel}) AS `${InternalColumns.beginTs}`
+             |FROM ${sourceName} WHERE (${btsSel}) IS NOT NULL
+           """.stripMargin
+        }
+      }
+      val inTimeStep = SparkSqlStep(inTimeTableName, inTimeSql, emptyMap)
+
+      // 2. latency
+      val latencyTableName = "__lat"
+      val latencyColName = details.getStringOrKey(_latency)
+      val etsColName = etsSelOpt match {
+        case Some(_) => InternalColumns.endTs
+        case _ => InternalColumns.tmst
+      }
+      val latencySql = {
+        s"SELECT *, (`${etsColName}` - `${InternalColumns.beginTs}`) AS `${latencyColName}` FROM `${inTimeTableName}`"
+      }
+      val latencyStep = SparkSqlStep(latencyTableName, latencySql, emptyMap, true)
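
Concretely, for a hypothetical rule whose begin/end timestamp selectors are ts and ets, steps 1 and 2 would emit SQL along these lines. The internal column names `__begin_ts` and `__end_ts` are assumptions for illustration, standing in for InternalColumns.beginTs and endTs:

    object TimelinessSqlSketch {
      def main(args: Array[String]): Unit = {
        // step 1: keep rows whose timestamps are present, aliased to internal columns
        println(
          """SELECT *, (ts) AS `__begin_ts`, (ets) AS `__end_ts`
            |FROM source WHERE (ts) IS NOT NULL AND (ets) IS NOT NULL""".stripMargin)
        // step 2: latency is the plain difference of the two internal columns
        println("SELECT *, (`__end_ts` - `__begin_ts`) AS `latency` FROM `__inTime`")
      }
    }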
+
+      // 3. timeliness metric
+      val metricTableName = name
+      val totalColName = details.getStringOrKey(_total)
+      val avgColName = details.getStringOrKey(_avg)
+      val metricSql = procType match {
+        case BatchProcessType => {
+          s"""
+             |SELECT COUNT(*) AS `${totalColName}`,
+             |CAST(AVG(`${latencyColName}`) AS BIGINT) AS `${avgColName}`
+             |FROM `${latencyTableName}`
+           """.stripMargin
+        }
+        case StreamingProcessType => {
+          s"""
+             |SELECT `${InternalColumns.tmst}`,
+             |COUNT(*) AS `${totalColName}`,
+             |CAST(AVG(`${latencyColName}`) AS BIGINT) AS `${avgColName}`
+             |FROM `${latencyTableName}`
+             |GROUP BY `${InternalColumns.tmst}`
+           """.stripMargin
+        }
+      }
+      val metricStep = SparkSqlStep(metricTableName, metricSql, emptyMap)
+      val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
+      val metricExports = genMetricExport(metricParam, name, metricTableName, ct, mode) :: Nil
+
+      // current timeliness plan
+      val timeSteps = inTimeStep :: latencyStep :: metricStep :: Nil
+      val timeExports = metricExports
+      val timePlan = RulePlan(timeSteps, timeExports)
+
+      // 4. timeliness record
+      val recordPlan = TimeUtil.milliseconds(details.getString(_threshold, "")) match {
+        case Some(tsh) => {
+          val recordTableName = "__lateRecords"
+          val recordSql = {
+            s"SELECT * FROM `${latencyTableName}` WHERE `${latencyColName}` > ${tsh}"
+          }
+          val recordStep = SparkSqlStep(recordTableName, recordSql, emptyMap)
+          val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
+          val recordExports = genRecordExport(recordParam, recordTableName, recordTableName, ct, mode) :: Nil
+          RulePlan(recordStep :: Nil, recordExports)
+        }
+        case _ => emptyRulePlan
+      }
+
+//      // 5. ranges
+//      val rangePlan = details.get(_rangeSplit) match {
+//        case Some(arr: Seq[String]) => {
+//          val ranges = splitTimeRanges(arr)
+//          if (ranges.size > 0) {
+//            try {
+//              // 5.1. range
+//              val rangeTableName = "__range"
+//              val rangeColName = details.getStringOrKey(_range)
+//              val caseClause = {
+//                val whenClause = ranges.map { range =>
+//                  s"WHEN `${latencyColName}` < ${range._1} THEN '<${range._2}'"
+//                }.mkString("\n")
+//                s"CASE ${whenClause} ELSE '>=${ranges.last._2}' END AS `${rangeColName}`"
+//              }
+//              val rangeSql = {
+//                s"SELECT *, ${caseClause} FROM `${latencyTableName}`"
+//              }
+//              val rangeStep = SparkSqlStep(rangeTableName, rangeSql, emptyMap)
+//
+//              // 5.2. range metric
+//              val rangeMetricTableName = "__rangeMetric"
+//              val countColName = details.getStringOrKey(_count)
+//              val rangeMetricSql = procType match {
+//                case BatchProcessType => {
+//                  s"""
+//                     |SELECT `${rangeColName}`, COUNT(*) AS `${countColName}`
+//                     |FROM `${rangeTableName}` GROUP BY `${rangeColName}`
+//                   """.stripMargin
+//                }
+//                case StreamingProcessType => {
+//                  s"""
+//                     |SELECT `${InternalColumns.tmst}`, `${rangeColName}`, COUNT(*) AS `${countColName}`
+//                     |FROM `${rangeTableName}` GROUP BY `${InternalColumns.tmst}`, `${rangeColName}`
+//                   """.stripMargin
+//                }
+//              }
+//              val rangeMetricStep = SparkSqlStep(rangeMetricTableName, rangeMetricSql, emptyMap)
+//              val rangeMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
+//              val rangeMetricExports = genMetricExport(rangeMetricParam, rangeColName, rangeMetricTableName, ct, mode) :: Nil
+//
+//              RulePlan(rangeStep :: rangeMetricStep :: Nil, rangeMetricExports)
+//            } catch {
+//              case _: Throwable => emptyRulePlan
+//            }
+//          } else emptyRulePlan
+//        }
+//        case _ => emptyRulePlan
+//      }
+
+//      return timeliness plan
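
Step 4 only fires when the "threshold" detail parses as a duration. A rough sketch of that parse-then-filter flow (this is not Griffin's TimeUtil, just an illustration of the contract it fulfils):

    object ThresholdSketch {
      private val Dur = """(\d+)(ms|s|m|h|d)""".r

      def milliseconds(text: String): Option[Long] = text match {
        case Dur(n, u) =>
          val factor = Map("ms" -> 1L, "s" -> 1000L, "m" -> 60000L,
                           "h" -> 3600000L, "d" -> 86400000L)(u)
          Some(n.toLong * factor)
        case _ => None
      }

      def main(args: Array[String]): Unit = {
        // "3m" -> 180000 ms; rows slower than that become late records
        milliseconds("3m").foreach { tsh =>
          println(s"SELECT * FROM `__lat` WHERE `latency` > ${tsh}")
        }
      }
    }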
+
+      // 5. ranges
+      val rangePlan = TimeUtil.milliseconds(details.getString(_stepSize, "")) match {
+        case Some(stepSize) => {
+          // 5.1 range
+          val rangeTableName = "__range"
+          val stepColName = details.getStringOrKey(_step)
+          val rangeSql = {
+            s"""
+               |SELECT *, CAST((`${latencyColName}` / ${stepSize}) AS BIGINT) AS `${stepColName}`
+               |FROM `${latencyTableName}`
+             """.stripMargin
+          }
+          val rangeStep = SparkSqlStep(rangeTableName, rangeSql, emptyMap)
+
+          // 5.2 range metric
+          val rangeMetricTableName = "__rangeMetric"
+          val countColName = details.getStringOrKey(_count)
+          val rangeMetricSql = procType match {
+            case BatchProcessType => {
+              s"""
+                 |SELECT `${stepColName}`, COUNT(*) AS `${countColName}`
+                 |FROM `${rangeTableName}` GROUP BY `${stepColName}`
+               """.stripMargin
+            }
+            case StreamingProcessType => {
+              s"""
+                 |SELECT `${InternalColumns.tmst}`, `${stepColName}`, COUNT(*) AS `${countColName}`
+                 |FROM `${rangeTableName}` GROUP BY `${InternalColumns.tmst}`, `${stepColName}`
+               """.stripMargin
+            }
+          }
+          val rangeMetricStep = SparkSqlStep(rangeMetricTableName, rangeMetricSql, emptyMap)
+          val rangeMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
+          val rangeMetricExports = genMetricExport(rangeMetricParam, stepColName, rangeMetricTableName, ct, mode) :: Nil
+
+          RulePlan(rangeStep :: rangeMetricStep :: Nil, rangeMetricExports)
+        }
+        case _ => emptyRulePlan
+      }
+
+      // 6. percentiles
+      val percentiles = getPercentiles(details)
+      val percentilePlan = if (percentiles.size > 0) {
+        val percentileTableName = "__percentile"
+        val percentileColName = details.getStringOrKey(_percentileColPrefix)
+        val percentileCols = percentiles.map { pct =>
+          val pctName = (pct * 100).toInt.toString
+          s"floor(percentile_approx(${latencyColName}, ${pct})) AS `${percentileColName}_${pctName}`"
+        }.mkString(", ")
+        val percentileSql = procType match {
+          case BatchProcessType => {
+            s"""
+               |SELECT ${percentileCols}
+               |FROM `${latencyTableName}`
+             """.stripMargin
+          }
+          case StreamingProcessType => {
+            s"""
+               |SELECT `${InternalColumns.tmst}`, ${percentileCols}
+               |FROM `${latencyTableName}` GROUP BY `${InternalColumns.tmst}`
+             """.stripMargin
+          }
+        }
+        val percentileStep = SparkSqlStep(percentileTableName, percentileSql, emptyMap)
+        val percentileParam = emptyMap
+        val percentileExports = genMetricExport(percentileParam, percentileTableName, percentileTableName, ct, mode) :: Nil
+
+        RulePlan(percentileStep :: Nil, percentileExports)
+      } else emptyRulePlan
+
+      timePlan.merge(recordPlan).merge(rangePlan).merge(percentilePlan)
+    }
+  }
+
+  private def getPercentiles(details: Map[String, Any]): Seq[Double] = {
+//    details.get(_percentiles) match {
+//      case Some(seq: Seq[Double]) => seq
+//      case _ => Nil
+//    }
+    details.getArr[Double](_percentileValues).filter(d => (d >= 0 && d <= 1))
+  }
+
+}
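
The step.size bucketing in section 5 is plain integer division, so a "2m" step puts each latency into a band numbered by how many whole steps it spans:

    object StepBucketSketch {
      def bucket(latencyMs: Long, stepMs: Long): Long = latencyMs / stepMs

      def main(args: Array[String]): Unit = {
        val step = 120000L             // "2m" in milliseconds
        println(bucket(30000L, step))  // 0: under 2m
        println(bucket(150000L, step)) // 1: between 2m and 4m
        println(bucket(250000L, step)) // 2: between 4m and 6m
      }
    }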
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/trans/UniquenessRulePlanTrans.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/UniquenessRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/UniquenessRulePlanTrans.scala
new file mode 100644
index 0000000..326d80b
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/UniquenessRulePlanTrans.scala
@@ -0,0 +1,198 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.trans
+
+import org.apache.griffin.measure.process._
+import org.apache.griffin.measure.process.temp._
+import org.apache.griffin.measure.rule.adaptor.RuleParamKeys._
+import org.apache.griffin.measure.rule.adaptor._
+import org.apache.griffin.measure.rule.dsl.analyzer.UniquenessAnalyzer
+import org.apache.griffin.measure.rule.dsl.expr._
+import org.apache.griffin.measure.rule.dsl._
+import org.apache.griffin.measure.rule.plan._
+import org.apache.griffin.measure.rule.trans.RuleExportFactory._
+import org.apache.griffin.measure.utils.ParamUtil._
+
+case class UniquenessRulePlanTrans(dataSourceNames: Seq[String],
+                                   timeInfo: TimeInfo, name: String, expr: Expr,
+                                   param: Map[String, Any], procType: ProcessType
+                                  ) extends RulePlanTrans {
+
+  private object UniquenessKeys {
+    val _source = "source"
+    val _target = "target"
+    val _unique = "unique"
+    val _total = "total"
+    val _dup = "dup"
+    val _num = "num"
+
+    val _duplicationArray = "duplication.array"
+  }
+  import UniquenessKeys._
+
+  def trans(): RulePlan = {
+    val details = getDetails(param)
+    val sourceName = details.getString(_source, dataSourceNames.head)
+    val targetName = details.getString(_target, dataSourceNames.tail.head)
+    val analyzer = UniquenessAnalyzer(expr.asInstanceOf[UniquenessClause], sourceName, targetName)
+
+    val mode = ExportMode.defaultMode(procType)
+
+    val ct = timeInfo.calcTime
+
+    if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
+      println(s"[${ct}] data source ${sourceName} does not exist")
+      emptyRulePlan
+    } else if (!TableRegisters.existRunTempTable(timeInfo.key, targetName)) {
+      println(s"[${ct}] data source ${targetName} does not exist")
+      emptyRulePlan
+    } else {
+      val selItemsClause = analyzer.selectionPairs.map { pair =>
+        val (expr, alias) = pair
+        s"${expr.desc} AS `${alias}`"
+      }.mkString(", ")
+      val aliases = analyzer.selectionPairs.map(_._2)
+
+      val selClause = procType match {
+        case BatchProcessType => selItemsClause
+        case StreamingProcessType => s"`${InternalColumns.tmst}`, ${selItemsClause}"
+      }
+      val selAliases = procType match {
+        case BatchProcessType => aliases
+        case StreamingProcessType => InternalColumns.tmst +: aliases
+      }
+
+      // 1. source distinct mapping
+      val sourceTableName = "__source"
+      val sourceSql = s"SELECT DISTINCT ${selClause} FROM ${sourceName}"
+      val sourceStep = SparkSqlStep(sourceTableName, sourceSql, emptyMap)
+
+      // 2. target mapping
+      val targetTableName = "__target"
+      val targetSql = s"SELECT ${selClause} FROM ${targetName}"
+      val targetStep = SparkSqlStep(targetTableName, targetSql, emptyMap)
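
As in the profiling translator, the streaming variant threads the internal tmst column through every projection so later steps can group per mini-batch. A sketch of that clause assembly, with `__tmst` again an assumed stand-in for InternalColumns.tmst:

    object SelClauseSketch {
      def selClause(streaming: Boolean, items: Seq[String]): String =
        (if (streaming) "`__tmst`" +: items else items).mkString(", ")

      def main(args: Array[String]): Unit = {
        println(selClause(streaming = false, Seq("user_id AS `id`"))) // user_id AS `id`
        println(selClause(streaming = true,  Seq("user_id AS `id`"))) // `__tmst`, user_id AS `id`
      }
    }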
+
+      // 3. joined
+      val joinedTableName = "__joined"
+      val joinedSelClause = selAliases.map { alias =>
+        s"`${sourceTableName}`.`${alias}` AS `${alias}`"
+      }.mkString(", ")
+      val onClause = aliases.map { alias =>
+        s"coalesce(`${sourceTableName}`.`${alias}`, '') = coalesce(`${targetTableName}`.`${alias}`, '')"
+      }.mkString(" AND ")
+      val joinedSql = {
+        s"SELECT ${joinedSelClause} FROM `${targetTableName}` RIGHT JOIN `${sourceTableName}` ON ${onClause}"
+      }
+      val joinedStep = SparkSqlStep(joinedTableName, joinedSql, emptyMap)
+
+      // 4. group
+      val groupTableName = "__group"
+      val groupSelClause = selAliases.map { alias =>
+        s"`${alias}`"
+      }.mkString(", ")
+      val dupColName = details.getStringOrKey(_dup)
+      val groupSql = {
+        s"SELECT ${groupSelClause}, (COUNT(*) - 1) AS `${dupColName}` FROM `${joinedTableName}` GROUP BY ${groupSelClause}"
+      }
+      val groupStep = SparkSqlStep(groupTableName, groupSql, emptyMap, true)
+
+      // 5. total metric
+      val totalTableName = "__totalMetric"
+      val totalColName = details.getStringOrKey(_total)
+      val totalSql = procType match {
+        case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`"
+        case StreamingProcessType => {
+          s"""
+             |SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${totalColName}`
+             |FROM `${sourceName}` GROUP BY `${InternalColumns.tmst}`
+           """.stripMargin
+        }
+      }
+      val totalStep = SparkSqlStep(totalTableName, totalSql, emptyMap)
+      val totalMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
+      val totalMetricExport = genMetricExport(totalMetricParam, totalColName, totalTableName, ct, mode)
+
+      // 6. unique record
+      val uniqueRecordTableName = "__uniqueRecord"
+      val uniqueRecordSql = {
+        s"SELECT * FROM `${groupTableName}` WHERE `${dupColName}` = 0"
+      }
+      val uniqueRecordStep = SparkSqlStep(uniqueRecordTableName, uniqueRecordSql, emptyMap)
+
+      // 7. unique metric
+      val uniqueTableName = "__uniqueMetric"
+      val uniqueColName = details.getStringOrKey(_unique)
+      val uniqueSql = procType match {
+        case BatchProcessType => s"SELECT COUNT(*) AS `${uniqueColName}` FROM `${uniqueRecordTableName}`"
+        case StreamingProcessType => {
+          s"""
+             |SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${uniqueColName}`
+             |FROM `${uniqueRecordTableName}` GROUP BY `${InternalColumns.tmst}`
+           """.stripMargin
+        }
+      }
+      val uniqueStep = SparkSqlStep(uniqueTableName, uniqueSql, emptyMap)
+      val uniqueMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
+      val uniqueMetricExport = genMetricExport(uniqueMetricParam, uniqueColName, uniqueTableName, ct, mode)
+
+      val uniqueSteps = sourceStep :: targetStep :: joinedStep :: groupStep ::
+        totalStep :: uniqueRecordStep :: uniqueStep :: Nil
+      val uniqueExports = totalMetricExport :: uniqueMetricExport :: Nil
+      val uniqueRulePlan = RulePlan(uniqueSteps, uniqueExports)
+
+      val duplicationArrayName = details.getString(_duplicationArray, "")
+      val dupRulePlan = if (duplicationArrayName.nonEmpty) {
+        // 8. duplicate record
+        val dupRecordTableName = "__dupRecords"
+        val dupRecordSql = {
+          s"SELECT * FROM `${groupTableName}` WHERE `${dupColName}` > 0"
+        }
+        val dupRecordStep = SparkSqlStep(dupRecordTableName, dupRecordSql, emptyMap, true)
+        val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
+        val dupRecordExport = genRecordExport(recordParam, dupRecordTableName, dupRecordTableName, ct, mode)
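
Why (COUNT(*) - 1) in step 4 counts duplicates: after the RIGHT JOIN every distinct source key survives at least once, so a group of n joined rows means the key occurred n times on the target side, n - 1 of them redundant. The same aggregation in collection form:

    object DupCountSketch {
      def main(args: Array[String]): Unit = {
        val joinedKeys = Seq("a", "a", "a", "b", "c", "c")
        // group size minus one is the duplicate count per key
        val dupCounts = joinedKeys.groupBy(identity).map { case (k, v) => (k, v.size - 1) }
        println(dupCounts.toSeq.sortBy(_._1)) // (a,2), (b,0), (c,1)
      }
    }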
+
+        // 9. duplicate metric
+        val dupMetricTableName = "__dupMetric"
+        val numColName = details.getStringOrKey(_num)
+        val dupMetricSelClause = procType match {
+          case BatchProcessType => s"`${dupColName}`, COUNT(*) AS `${numColName}`"
+          case StreamingProcessType => s"`${InternalColumns.tmst}`, `${dupColName}`, COUNT(*) AS `${numColName}`"
+        }
+        val dupMetricGroupbyClause = procType match {
+          case BatchProcessType => s"`${dupColName}`"
+          case StreamingProcessType => s"`${InternalColumns.tmst}`, `${dupColName}`"
+        }
+        val dupMetricSql = {
+          s"""
+             |SELECT ${dupMetricSelClause} FROM `${dupRecordTableName}`
+             |GROUP BY ${dupMetricGroupbyClause}
+           """.stripMargin
+        }
+        val dupMetricStep = SparkSqlStep(dupMetricTableName, dupMetricSql, emptyMap)
+        val dupMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
+        val dupMetricExport = genMetricExport(dupMetricParam, duplicationArrayName, dupMetricTableName, ct, mode)
+
+        RulePlan(dupRecordStep :: dupMetricStep :: Nil, dupRecordExport :: dupMetricExport :: Nil)
+      } else emptyRulePlan
+
+      uniqueRulePlan.merge(dupRulePlan)
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdafs.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdafs.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdafs.scala
new file mode 100644
index 0000000..cb00641
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdafs.scala
@@ -0,0 +1,29 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.udf
+
+import org.apache.spark.sql.SQLContext
+
+object GriffinUdafs {
+
+  def register(sqlContext: SQLContext): Unit = {
+//    sqlContext.udf.register("my_mean", new MeanUdaf)
+  }
+
+}
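
If the commented-out registration above were enabled, it would wire in the MeanUdaf defined in the next file; the Spark 1.x SQLContext API accepts a UserDefinedAggregateFunction directly. A sketch of the enabled hook plus a use site:

    import org.apache.spark.sql.SQLContext
    import org.apache.griffin.measure.rule.udf.MeanUdaf

    object GriffinUdafsSketch {
      def register(sqlContext: SQLContext): Unit = {
        // same call as the commented line above, just enabled
        sqlContext.udf.register("my_mean", new MeanUdaf)
      }
    }
    // after registration: sqlContext.sql("SELECT my_mean(latency) FROM `__lat`")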
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/udf/MeanUdaf.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/udf/MeanUdaf.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/udf/MeanUdaf.scala
new file mode 100644
index 0000000..80b3a02
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/udf/MeanUdaf.scala
@@ -0,0 +1,58 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.udf
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
+import org.apache.spark.sql.types._
+
+class MeanUdaf extends UserDefinedAggregateFunction {
+
+  def inputSchema: StructType = StructType(Array(StructField("item", LongType)))
+
+  def bufferSchema = StructType(Array(
+    StructField("sum", DoubleType),
+    StructField("cnt", LongType)
+  ))
+
+  def dataType: DataType = DoubleType
+
+  def deterministic: Boolean = true
+
+  def initialize(buffer: MutableAggregationBuffer): Unit = {
+    buffer.update(0, 0.toDouble)
+    buffer.update(1, 0L)
+  }
+
+  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    val sum = buffer.getDouble(0)
+    val cnt = buffer.getLong(1)
+    val value = input.getLong(0)
+    buffer.update(0, sum + value)
+    buffer.update(1, cnt + 1)
+  }
+
+  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+    buffer1.update(0, buffer1.getDouble(0) + buffer2.getDouble(0))
+    buffer1.update(1, buffer1.getLong(1) + buffer2.getLong(1))
+  }
+
+  def evaluate(buffer: Row): Any = {
+    buffer.getDouble(0) / buffer.getLong(1).toDouble
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala
index 1ca32b3..d125d87 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala
@@ -185,6 +185,17 @@
       }
     }
 
+    def getArr[T](key: String): Seq[T] = {
+      try {
+        params.get(key) match {
+          case Some(seq: Seq[T]) => seq
+          case _ => Nil
+        }
+      } catch {
+        case _: Throwable => Nil
+      }
+    }
+
     def addIfNotExist(key: String, value: Any): Map[String, Any] = {
       params.get(key) match {
         case Some(v) => params
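
One caveat on the new getArr[T]: because of type erasure the `Seq[T]` match is unchecked, so element types are taken on faith and a mismatch would only surface at a later use site. A standalone sketch of the same lookup with the erasure made explicit:

    object GetArrSketch {
      def getArr[T](params: Map[String, Any], key: String): Seq[T] =
        params.get(key) match {
          case Some(seq: Seq[T] @unchecked) => seq // elements are not actually checked
          case _ => Nil
        }

      def main(args: Array[String]): Unit = {
        val details = Map[String, Any]("percentile.values" -> Seq(0.2, 0.5, 0.8))
        // same filter getPercentiles applies: keep only values in [0, 1]
        println(getArr[Double](details, "percentile.values").filter(d => d >= 0 && d <= 1))
      }
    }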
"file.path": "hdfs://localhost/griffin/streaming/dump/target", "info.path": "target", "ready.time.interval": "10s", @@ -108,8 +111,7 @@ "name": "accu" }, "record": { - "name": "missRecords", - "data.source.cache": "source" + "name": "missRecords" } } ] http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/test/resources/_timeliness-batch-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_timeliness-batch-griffindsl.json b/measure/src/test/resources/_timeliness-batch-griffindsl.json index bd48401..90439df 100644 --- a/measure/src/test/resources/_timeliness-batch-griffindsl.json +++ b/measure/src/test/resources/_timeliness-batch-griffindsl.json @@ -28,10 +28,14 @@ "details": { "source": "source", "latency": "latency", + "total": "total", + "avg": "avg", "threshold": "3m", "step": "step", "count": "cnt", - "step.size": "2m" + "step.size": "2m", + "percentile": "percentile", + "percentile.values": [0.95] }, "metric": { "name": "timeliness" http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/test/resources/_timeliness-streaming-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_timeliness-streaming-griffindsl.json b/measure/src/test/resources/_timeliness-streaming-griffindsl.json index fbaf8d4..5916e5c 100644 --- a/measure/src/test/resources/_timeliness-streaming-griffindsl.json +++ b/measure/src/test/resources/_timeliness-streaming-griffindsl.json @@ -63,7 +63,9 @@ "threshold": "1h", "step": "step", "count": "cnt", - "step.size": "5m" + "step.size": "5m", + "percentile": "percentile", + "percentile.values": [0.2, 0.5, 0.8] }, "metric": { "name": "timeliness"