/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.step.builder.dsl.transform

import org.apache.griffin.measure.configuration.enums.{ArrayNormalizeType, EntriesNormalizeType, ProcessType, StreamingProcessType}
import org.apache.griffin.measure.configuration.params.RuleParam
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep
import org.apache.griffin.measure.step.builder.ConstantColumns
import org.apache.griffin.measure.step.builder.dsl.expr.{DistinctnessClause, _}
import org.apache.griffin.measure.step.builder.dsl.transform.analyzer.DistinctnessAnalyzer
import org.apache.griffin.measure.step.transform.SparkSqlTransformStep
import org.apache.griffin.measure.step.write.{DsCacheUpdateWriteStep, MetricWriteStep, RecordWriteStep}
import org.apache.griffin.measure.utils.ParamUtil._

/**
 * generate distinctness dq steps
 *
 * Translates a parsed `DistinctnessClause` plus the rule's detail map into a
 * chain of Spark-SQL transform steps and metric/record write steps.
 * Streaming mode with accumulation additionally joins against previously
 * cached target data so duplicates are counted across batches.
 */
case class DistinctnessExpr2DQSteps(context: DQContext,
                                    expr: Expr,
                                    ruleParam: RuleParam
                                   ) extends Expr2DQSteps {

  // Keys looked up in ruleParam details; values double as default metric/column names.
  private object DistinctnessKeys {
    val _source = "source"
    val _target = "target"
    val _distinct = "distinct"
    val _total = "total"
    val _dup = "dup"
    val _accu_dup = "accu_dup"
    val _num = "num"

    val _duplicationArray = "duplication.array"
    val _withAccumulate = "with.accumulate"

    val _recordEnable = "record.enable"
  }
  import DistinctnessKeys._

  /**
   * Build the ordered DQ step sequence for this distinctness rule.
   *
   * @return transform steps followed by write steps; `Nil` when the source
   *         table is not registered in the runtime table register.
   * @throws Exception when the source's time range has no begin or end timestamp.
   */
  def getDQSteps(): Seq[DQStep] = {
    val details = ruleParam.getDetails
    val distinctnessExpr = expr.asInstanceOf[DistinctnessClause]

    // source defaults to the first data source, target (old data cache) to the second
    val sourceName = details.getString(_source, context.getDataSourceName(0))
    val targetName = details.getString(_target, context.getDataSourceName(1))
    val analyzer = DistinctnessAnalyzer(distinctnessExpr, sourceName)

    val procType = context.procType
    val timestamp = context.contextId.timestamp
    val dsTimeRanges = context.dataSourceTimeRanges

    // begin/end of the source's current batch time range; both are mandatory
    val beginTmst = dsTimeRanges.get(sourceName).map(_.begin) match {
      case Some(t) => t
      case _ => throw new Exception(s"empty begin tmst from ${sourceName}")
    }
    val endTmst = dsTimeRanges.get(sourceName).map(_.end) match {
      case Some(t) => t
      case _ => throw new Exception(s"empty end tmst from ${sourceName}")
    }

    // all metrics/records of this rule are written at the batch end timestamp
    val writeTimestampOpt = Some(endTmst)

    if (!context.runTimeTableRegister.existsTable(sourceName)) {
      warn(s"[${timestamp}] data source ${sourceName} not exists")
      Nil
    } else {
      // accumulate across batches only if enabled (default true) and target table exists
      val withOlderTable = {
        details.getBoolean(_withAccumulate, true) &&
        context.runTimeTableRegister.existsTable(targetName)
      }

      // selection triples: (expr, alias, isDistinctColumn)
      val selClause = analyzer.selectionPairs.map { pair =>
        val (expr, alias, _) = pair
        s"${expr.desc} AS `${alias}`"
      }.mkString(", ")
      val distAliases = analyzer.selectionPairs.filter(_._3).map(_._2)
      val distAliasesClause = distAliases.map( a => s"`${a}`" ).mkString(", ")
      val allAliases = analyzer.selectionPairs.map(_._2)
      val allAliasesClause = allAliases.map( a => s"`${a}`" ).mkString(", ")
      val groupAliases = analyzer.selectionPairs.filter(!_._3).map(_._2)
      val groupAliasesClause = groupAliases.map( a => s"`${a}`" ).mkString(", ")

      // 1. source alias: project the source into the analyzer's aliased columns
      val sourceAliasTableName = "__sourceAlias"
      val sourceAliasSql = {
        s"SELECT ${selClause} FROM `${sourceName}`"
      }
      val sourceAliasTransStep = SparkSqlTransformStep(sourceAliasTableName, sourceAliasSql, emptyMap, true)

      // 2. total metric: row count of the aliased source
      val totalTableName = "__totalMetric"
      val totalColName = details.getStringOrKey(_total)
      val totalSql = {
        s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}`"
      }
      val totalTransStep = SparkSqlTransformStep(totalTableName, totalSql, emptyMap)
      val totalMetricWriteStep = {
        MetricWriteStep(totalColName, totalTableName, EntriesNormalizeType, writeTimestampOpt)
      }

      // 3. group by self: per-value duplicate count within the new data only;
      //    `distinct` is provisionally TRUE and may be overturned by the old-data join below
      val selfGroupTableName = "__selfGroup"
      val dupColName = details.getStringOrKey(_dup)
      val accuDupColName = details.getStringOrKey(_accu_dup)
      val selfGroupSql = {
        s"""
           |SELECT ${distAliasesClause}, (COUNT(*) - 1) AS `${dupColName}`,
           |TRUE AS `${ConstantColumns.distinct}`
           |FROM `${sourceAliasTableName}` GROUP BY ${distAliasesClause}
         """.stripMargin
      }
      val selfGroupTransStep = SparkSqlTransformStep(selfGroupTableName, selfGroupSql, emptyMap, true)

      val transSteps1 = sourceAliasTransStep :: totalTransStep :: selfGroupTransStep :: Nil
      val writeSteps1 = totalMetricWriteStep :: Nil

      // dupCountTableName: the table carrying (dist columns, dup count, distinct flag) —
      // __selfGroup in batch mode, __finalDupCount in streaming-with-accumulate mode
      val ((transSteps2, writeSteps2), dupCountTableName) = procType match {
        case StreamingProcessType if (withOlderTable) => {
          // 4.0 update old data: re-cache the target data source
          val targetDsUpdateWriteStep = DsCacheUpdateWriteStep(targetName, targetName)

          // 4. older alias: old data strictly before this batch
          //    NOTE(review): assumes cached target rows carry the `tmst` column — confirmed
          //    only by this WHERE clause, verify against the cache writer
          val olderAliasTableName = "__older"
          val olderAliasSql = {
            s"SELECT ${selClause} FROM `${targetName}` WHERE `${ConstantColumns.tmst}` <= ${beginTmst}"
          }
          val olderAliasTransStep = SparkSqlTransformStep(olderAliasTableName, olderAliasSql, emptyMap)

          // 5. join with older data: a self-group row stays distinct only when
          //    no matching old row exists (all older columns NULL after right join)
          val joinedTableName = "__joined"
          val selfSelClause = (distAliases :+ dupColName).map { alias =>
            s"`${selfGroupTableName}`.`${alias}`"
          }.mkString(", ")
          val onClause = distAliases.map { alias =>
            s"coalesce(`${selfGroupTableName}`.`${alias}`, '') = coalesce(`${olderAliasTableName}`.`${alias}`, '')"
          }.mkString(" AND ")
          val olderIsNull = distAliases.map { alias =>
            s"`${olderAliasTableName}`.`${alias}` IS NULL"
          }.mkString(" AND ")
          val joinedSql = {
            s"""
               |SELECT ${selfSelClause}, (${olderIsNull}) AS `${ConstantColumns.distinct}`
               |FROM `${olderAliasTableName}` RIGHT JOIN `${selfGroupTableName}`
               |ON ${onClause}
             """.stripMargin
          }
          val joinedTransStep = SparkSqlTransformStep(joinedTableName, joinedSql, emptyMap)

          // 6. group by joined data: count how many old rows matched each value
          val groupTableName = "__group"
          val moreDupColName = "_more_dup"
          val groupSql = {
            s"""
               |SELECT ${distAliasesClause}, `${dupColName}`, `${ConstantColumns.distinct}`,
               |COUNT(*) AS `${moreDupColName}`
               |FROM `${joinedTableName}`
               |GROUP BY ${distAliasesClause}, `${dupColName}`, `${ConstantColumns.distinct}`
             """.stripMargin
          }
          val groupTransStep = SparkSqlTransformStep(groupTableName, groupSql, emptyMap)

          // 7. final duplicate count
          val finalDupCountTableName = "__finalDupCount"
          /**
           * dupColName: the duplicate count of duplicated items only occurs in new data,
           *    which means the distinct one in new data is also duplicate
           * accuDupColName: the count of duplicated items accumulated in new data and old data,
           *    which means the accumulated distinct count in all data
           * e.g.: new data [A, A, B, B, C, D], old data [A, A, B, C]
           *    selfGroupTable will be (A, 1, F), (B, 1, F), (C, 0, T), (D, 0, T)
           *    joinedTable will be (A, 1, F), (A, 1, F), (B, 1, F), (C, 0, F), (D, 0, T)
           *    groupTable will be (A, 1, F, 2), (B, 1, F, 1), (C, 0, F, 1), (D, 0, T, 1)
           *    finalDupCountTable will be (A, F, 2, 3), (B, F, 2, 2), (C, F, 1, 1), (D, T, 0, 0)
           * The distinct result of new data only should be: (A, 2), (B, 2), (C, 1), (D, 0),
           *    which means in new data [A, A, B, B, C, D], [A, A, B, B, C] are all duplicated, only [D] is distinct
           */
          val finalDupCountSql = {
            s"""
               |SELECT ${distAliasesClause}, `${ConstantColumns.distinct}`,
               |CASE WHEN `${ConstantColumns.distinct}` THEN `${dupColName}`
               |ELSE (`${dupColName}` + 1) END AS `${dupColName}`,
               |CASE WHEN `${ConstantColumns.distinct}` THEN `${dupColName}`
               |ELSE (`${dupColName}` + `${moreDupColName}`) END AS `${accuDupColName}`
               |FROM `${groupTableName}`
             """.stripMargin
          }
          val finalDupCountTransStep = SparkSqlTransformStep(finalDupCountTableName, finalDupCountSql, emptyMap, true)

          ((olderAliasTransStep :: joinedTransStep :: groupTransStep :: finalDupCountTransStep :: Nil,
            targetDsUpdateWriteStep :: Nil), finalDupCountTableName)
        }
        case _ => {
          // batch (or streaming without accumulation): self-group table is final
          ((Nil, Nil), selfGroupTableName)
        }
      }

      // 8. distinct metric: count of values still flagged distinct
      val distTableName = "__distMetric"
      val distColName = details.getStringOrKey(_distinct)
      val distSql = {
        s"""
           |SELECT COUNT(*) AS `${distColName}`
           |FROM `${dupCountTableName}` WHERE `${ConstantColumns.distinct}`
         """.stripMargin
      }
      val distTransStep = SparkSqlTransformStep(distTableName, distSql, emptyMap)
      val distMetricWriteStep = {
        MetricWriteStep(distColName, distTableName, EntriesNormalizeType, writeTimestampOpt)
      }

      val transSteps3 = distTransStep :: Nil
      val writeSteps3 = distMetricWriteStep :: Nil

      // optional duplication breakdown: only produced when duplication.array is configured
      val duplicationArrayName = details.getString(_duplicationArray, "")
      val (transSteps4, writeSteps4) = if (duplicationArrayName.nonEmpty) {
        val recordEnable = details.getBoolean(_recordEnable, false)
        if (groupAliases.size > 0) {
          // with some group by requirement
          // 9. origin data join with distinct information
          val informedTableName = "__informed"
          val onClause = distAliases.map { alias =>
            s"coalesce(`${sourceAliasTableName}`.`${alias}`, '') = coalesce(`${dupCountTableName}`.`${alias}`, '')"
          }.mkString(" AND ")
          val informedSql = {
            s"""
               |SELECT `${sourceAliasTableName}`.*,
               |`${dupCountTableName}`.`${dupColName}` AS `${dupColName}`,
               |`${dupCountTableName}`.`${ConstantColumns.distinct}` AS `${ConstantColumns.distinct}`
               |FROM `${sourceAliasTableName}` LEFT JOIN `${dupCountTableName}`
               |ON ${onClause}
             """.stripMargin
          }
          val informedTransStep = SparkSqlTransformStep(informedTableName, informedSql, emptyMap)

          // 10. add row number: rank rows within each distinct-value partition so the
          //     first occurrence of a distinct value can be excluded from duplicates
          val rnTableName = "__rowNumber"
          val rnDistClause = distAliasesClause
          val rnSortClause = s"SORT BY `${ConstantColumns.distinct}`"
          val rnSql = {
            s"""
               |SELECT *,
               |ROW_NUMBER() OVER (DISTRIBUTE BY ${rnDistClause} ${rnSortClause}) `${ConstantColumns.rowNumber}`
               |FROM `${informedTableName}`
             """.stripMargin
          }
          val rnTransStep = SparkSqlTransformStep(rnTableName, rnSql, emptyMap)

          // 11. recognize duplicate items: non-distinct rows, or extra rows of a distinct value
          val dupItemsTableName = "__dupItems"
          val dupItemsSql = {
            s"""
               |SELECT ${allAliasesClause}, `${dupColName}` FROM `${rnTableName}`
               |WHERE NOT `${ConstantColumns.distinct}` OR `${ConstantColumns.rowNumber}` > 1
             """.stripMargin
          }
          val dupItemsTransStep = SparkSqlTransformStep(dupItemsTableName, dupItemsSql, emptyMap)
          val dupItemsWriteStep = {
            val rwName = ruleParam.recordOpt.map(_.name).getOrElse(dupItemsTableName)
            RecordWriteStep(rwName, dupItemsTableName, None, writeTimestampOpt)
          }

          // 12. group by dup Record metric
          val groupDupMetricTableName = "__groupDupMetric"
          val numColName = details.getStringOrKey(_num)
          val groupSelClause = groupAliasesClause
          val groupDupMetricSql = {
            s"""
               |SELECT ${groupSelClause}, `${dupColName}`, COUNT(*) AS `${numColName}`
               |FROM `${dupItemsTableName}` GROUP BY ${groupSelClause}, `${dupColName}`
             """.stripMargin
          }
          val groupDupMetricTransStep = SparkSqlTransformStep(groupDupMetricTableName, groupDupMetricSql, emptyMap)
          val groupDupMetricWriteStep = {
            MetricWriteStep(duplicationArrayName, groupDupMetricTableName, ArrayNormalizeType, writeTimestampOpt)
          }

          val msteps = {
            informedTransStep :: rnTransStep :: dupItemsTransStep :: groupDupMetricTransStep :: Nil
          }
          // duplicate records are only persisted when record.enable is set
          val wsteps = if (recordEnable) {
            dupItemsWriteStep :: groupDupMetricWriteStep :: Nil
          } else {
            groupDupMetricWriteStep :: Nil
          }

          (msteps, wsteps)

        } else {
          // no group by requirement
          // 9. duplicate record: values with at least one duplicate
          val dupRecordTableName = "__dupRecords"
          val dupRecordSelClause = procType match {
            case StreamingProcessType if (withOlderTable) => s"${distAliasesClause}, `${dupColName}`, `${accuDupColName}`"
            case _ => s"${distAliasesClause}, `${dupColName}`"
          }
          val dupRecordSql = {
            s"""
               |SELECT ${dupRecordSelClause}
               |FROM `${dupCountTableName}` WHERE `${dupColName}` > 0
             """.stripMargin
          }
          val dupRecordTransStep = SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true)
          val dupRecordWriteStep = {
            val rwName = ruleParam.recordOpt.map(_.name).getOrElse(dupRecordTableName)
            RecordWriteStep(rwName, dupRecordTableName, None, writeTimestampOpt)
          }

          // 10. duplicate metric: histogram of duplicate counts
          val dupMetricTableName = "__dupMetric"
          val numColName = details.getStringOrKey(_num)
          val dupMetricSql = {
            s"""
               |SELECT `${dupColName}`, COUNT(*) AS `${numColName}`
               |FROM `${dupRecordTableName}` GROUP BY `${dupColName}`
             """.stripMargin
          }
          val dupMetricTransStep = SparkSqlTransformStep(dupMetricTableName, dupMetricSql, emptyMap)
          val dupMetricWriteStep = {
            MetricWriteStep(duplicationArrayName, dupMetricTableName, ArrayNormalizeType, writeTimestampOpt)
          }

          val msteps = {
            dupRecordTransStep :: dupMetricTransStep :: Nil
          }
          val wsteps = if (recordEnable) {
            dupRecordWriteStep :: dupMetricWriteStep :: Nil
          } else {
            dupMetricWriteStep :: Nil
          }

          (msteps, wsteps)
        }
      } else (Nil, Nil)

      // full steps: all transform steps first, then all write steps
      transSteps1 ++ transSteps2 ++ transSteps3 ++ transSteps4 ++
        writeSteps1 ++ writeSteps2 ++ writeSteps3 ++ writeSteps4

    }
  }

}
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.step.builder.dsl.transform

import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.configuration.params.RuleParam
import org.apache.griffin.measure.context.{ContextId, DQContext, TimeRange}
import org.apache.griffin.measure.step.DQStep
import org.apache.griffin.measure.step.builder.dsl.expr.Expr

/**
 * Common base for translators that turn a parsed griffin-dsl expression
 * into a sequence of concrete [[DQStep]]s for one rule.
 */
trait Expr2DQSteps extends Loggable with Serializable {

  // empty step sequence shared by subclasses
  // (name keeps the original spelling for compatibility with existing subclasses)
  protected val emtptDQSteps = Seq[DQStep]()
  // empty detail map shared by subclasses when a transform step needs no details
  protected val emptyMap = Map[String, Any]()

  /** Build the DQ steps for this rule; empty when the rule cannot be applied. */
  def getDQSteps(): Seq[DQStep]

}

/**
 * get dq steps generator for griffin dsl rule
 */
object Expr2DQSteps {
  // no-op generator returned for unsupported dq types
  private val emptyExpr2DQSteps = new Expr2DQSteps {
    def getDQSteps(): Seq[DQStep] = emtptDQSteps
  }

  /**
   * Select the step generator matching the rule's dq type.
   *
   * @param context   runtime DQ context shared by all steps
   * @param expr      parsed dsl expression of the rule
   * @param ruleParam rule configuration (dq type, details, metric/record options)
   * @return the matching generator, or a no-op generator for unknown dq types
   */
  def apply(context: DQContext,
            expr: Expr,
            ruleParam: RuleParam
           ): Expr2DQSteps = {
    ruleParam.getDqType match {
      case AccuracyType => AccuracyExpr2DQSteps(context, expr, ruleParam)
      case ProfilingType => ProfilingExpr2DQSteps(context, expr, ruleParam)
      case UniquenessType => UniquenessExpr2DQSteps(context, expr, ruleParam)
      case DistinctnessType => DistinctnessExpr2DQSteps(context, expr, ruleParam)
      case TimelinessType => TimelinessExpr2DQSteps(context, expr, ruleParam)
      case CompletenessType => CompletenessExpr2DQSteps(context, expr, ruleParam)
      case _ => emptyExpr2DQSteps
    }
  }
}
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.step.builder.dsl.transform

import org.apache.griffin.measure.configuration.enums.{BatchProcessType, NormalizeType, StreamingProcessType}
import org.apache.griffin.measure.configuration.params.RuleParam
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep
import org.apache.griffin.measure.step.builder.ConstantColumns
import org.apache.griffin.measure.step.builder.dsl.expr._
import org.apache.griffin.measure.step.builder.dsl.transform.analyzer.ProfilingAnalyzer
import org.apache.griffin.measure.step.transform.SparkSqlTransformStep
import org.apache.griffin.measure.step.write.MetricWriteStep
import org.apache.griffin.measure.utils.ParamUtil._

/**
 * generate profiling dq steps
 *
 * Assembles a single SQL SELECT from the profiling clause and emits one
 * transform step plus one metric write step for its result.
 */
case class ProfilingExpr2DQSteps(context: DQContext,
                                 expr: Expr,
                                 ruleParam: RuleParam
                                ) extends Expr2DQSteps {

  // keys looked up in ruleParam details
  private object ProfilingKeys {
    val _source = "source"
  }
  import ProfilingKeys._

  /**
   * Build the profiling steps.
   *
   * @return one SparkSql transform step and one metric write step, or `Nil`
   *         when the source table is not registered.
   */
  def getDQSteps(): Seq[DQStep] = {
    val details = ruleParam.getDetails
    val profilingExpr = expr.asInstanceOf[ProfilingClause]

    // source name: explicit FROM clause wins over the "source" detail / first data source
    val sourceName = profilingExpr.fromClauseOpt match {
      case Some(fc) => fc.dataSource
      case _ => details.getString(_source, context.getDataSourceName(0))
    }
    val fromClause = profilingExpr.fromClauseOpt.getOrElse(FromClause(sourceName)).desc

    val procType = context.procType
    val timestamp = context.contextId.timestamp

    if (!context.runTimeTableRegister.existsTable(sourceName)) {
      warn(s"[${timestamp}] data source ${sourceName} not exists")
      Nil
    } else {
      val analyzer = ProfilingAnalyzer(profilingExpr, sourceName)
      // render each selection expression, appending its alias when one is declared
      val selExprDescs = analyzer.selectionExprs.map { sel =>
        val alias = sel match {
          case s: AliasableExpr if (s.alias.nonEmpty) => s" AS `${s.alias.get}`"
          case _ => ""
        }
        s"${sel.desc}${alias}"
      }
      // NOTE(review): selCondition is spliced between SELECT and the column list —
      // presumably a modifier like DISTINCT; confirm against the dsl parser
      val selCondition = profilingExpr.selectClause.extraConditionOpt.map(_.desc).mkString
      // streaming mode prepends the tmst column so results stay batch-addressable
      val selClause = procType match {
        case BatchProcessType => selExprDescs.mkString(", ")
        case StreamingProcessType => (s"`${ConstantColumns.tmst}`" +: selExprDescs).mkString(", ")
      }
      val groupByClauseOpt = analyzer.groupbyExprOpt
      // streaming mode forces grouping by tmst, merged with any user GROUP BY
      val groupbyClause = procType match {
        case BatchProcessType => groupByClauseOpt.map(_.desc).getOrElse("")
        case StreamingProcessType => {
          val tmstGroupbyClause = GroupbyClause(LiteralStringExpr(s"`${ConstantColumns.tmst}`") :: Nil, None)
          val mergedGroubbyClause = tmstGroupbyClause.merge(groupByClauseOpt match {
            case Some(gbc) => gbc
            case _ => GroupbyClause(Nil, None)
          })
          mergedGroubbyClause.desc
        }
      }
      val preGroupbyClause = analyzer.preGroupbyExprs.map(_.desc).mkString(" ")
      val postGroupbyClause = analyzer.postGroupbyExprs.map(_.desc).mkString(" ")

      // 1. select statement
      val profilingSql = {
        s"SELECT ${selCondition} ${selClause} ${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}"
      }
      val profilingName = ruleParam.name
      val profilingTransStep = SparkSqlTransformStep(profilingName, profilingSql, details)
      val profilingMetricWriteStep = {
        // metric name and collect type fall back to the rule name / default normalize type
        val mwName = ruleParam.metricOpt.map(_.name).getOrElse(ruleParam.name)
        val collectType = NormalizeType(ruleParam.metricOpt.map(_.collectType).getOrElse(""))
        MetricWriteStep(mwName, profilingName, collectType)
      }
      profilingTransStep :: profilingMetricWriteStep :: Nil
    }
  }

}
See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.step.builder.dsl.transform

import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.configuration.params.RuleParam
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep
import org.apache.griffin.measure.step.builder.ConstantColumns
import org.apache.griffin.measure.step.builder.dsl.expr._
import org.apache.griffin.measure.step.builder.dsl.transform.analyzer.TimelinessAnalyzer
import org.apache.griffin.measure.step.transform.SparkSqlTransformStep
import org.apache.griffin.measure.step.write.{MetricWriteStep, RecordWriteStep}
import org.apache.griffin.measure.utils.ParamUtil._
import org.apache.griffin.measure.utils.TimeUtil

/**
 * generate timeliness dq steps
 *
 * Computes per-record latency (end ts minus begin ts), aggregates total/avg
 * metrics, and optionally emits late-record dumps, latency-range histograms
 * and latency percentiles depending on the configured details.
 */
case class TimelinessExpr2DQSteps(context: DQContext,
                                  expr: Expr,
                                  ruleParam: RuleParam
                                 ) extends Expr2DQSteps {

  // keys looked up in ruleParam details; most values double as default column names
  private object TimelinessKeys {
    val _source = "source"
    val _latency = "latency"
    val _total = "total"
    val _avg = "avg"
    val _threshold = "threshold"
    val _step = "step"
    val _count = "count"
    val _stepSize = "step.size"
    val _percentileColPrefix = "percentile"
    val _percentileValues = "percentile.values"
  }
  import TimelinessKeys._

  /**
   * Build the timeliness steps.
   *
   * @return transform steps followed by write steps; `Nil` when the source
   *         table is not registered.
   * @throws Exception when the source's time range has no minimum timestamp.
   */
  def getDQSteps(): Seq[DQStep] = {
    val details = ruleParam.getDetails
    val timelinessExpr = expr.asInstanceOf[TimelinessClause]

    val sourceName = details.getString(_source, context.getDataSourceName(0))

    val procType = context.procType
    val timestamp = context.contextId.timestamp
    val dsTimeRanges = context.dataSourceTimeRanges

    // NOTE(review): minTmst is validated here but never referenced again in this
    // method — either dead code or validation-by-side-effect; confirm intent
    val minTmstOpt = dsTimeRanges.get(sourceName).flatMap(_.minTmstOpt)
    val minTmst = minTmstOpt match {
      case Some(t) => t
      case _ => throw new Exception(s"empty min tmst from ${sourceName}")
    }

    if (!context.runTimeTableRegister.existsTable(sourceName)) {
      warn(s"[${timestamp}] data source ${sourceName} not exists")
      Nil
    } else {
      val analyzer = TimelinessAnalyzer(timelinessExpr, sourceName)
      // begin-timestamp expression is mandatory; end-timestamp expression is optional
      val btsSel = analyzer.btsExpr
      val etsSelOpt = analyzer.etsExprOpt

      // 1. in time: keep rows whose timestamp expressions evaluate to non-null
      val inTimeTableName = "__inTime"
      val inTimeSql = etsSelOpt match {
        case Some(etsSel) => {
          s"""
             |SELECT *, (${btsSel}) AS `${ConstantColumns.beginTs}`,
             |(${etsSel}) AS `${ConstantColumns.endTs}`
             |FROM ${sourceName} WHERE (${btsSel}) IS NOT NULL AND (${etsSel}) IS NOT NULL
           """.stripMargin
        }
        case _ => {
          s"""
             |SELECT *, (${btsSel}) AS `${ConstantColumns.beginTs}`
             |FROM ${sourceName} WHERE (${btsSel}) IS NOT NULL
           """.stripMargin
        }
      }
      val inTimeTransStep = SparkSqlTransformStep(inTimeTableName, inTimeSql, emptyMap)

      // 2. latency: end ts (or ingestion tmst when no end expression) minus begin ts
      val latencyTableName = "__lat"
      val latencyColName = details.getStringOrKey(_latency)
      val etsColName = etsSelOpt match {
        case Some(_) => ConstantColumns.endTs
        case _ => ConstantColumns.tmst
      }
      val latencySql = {
        s"SELECT *, (`${etsColName}` - `${ConstantColumns.beginTs}`) AS `${latencyColName}` FROM `${inTimeTableName}`"
      }
      val latencyTransStep = SparkSqlTransformStep(latencyTableName, latencySql, emptyMap, true)

      // 3. timeliness metric: total count and average latency (per tmst in streaming)
      val metricTableName = ruleParam.name
      val totalColName = details.getStringOrKey(_total)
      val avgColName = details.getStringOrKey(_avg)
      val metricSql = procType match {
        case BatchProcessType => {
          s"""
             |SELECT COUNT(*) AS `${totalColName}`,
             |CAST(AVG(`${latencyColName}`) AS BIGINT) AS `${avgColName}`
             |FROM `${latencyTableName}`
           """.stripMargin
        }
        case StreamingProcessType => {
          s"""
             |SELECT `${ConstantColumns.tmst}`,
             |COUNT(*) AS `${totalColName}`,
             |CAST(AVG(`${latencyColName}`) AS BIGINT) AS `${avgColName}`
             |FROM `${latencyTableName}`
             |GROUP BY `${ConstantColumns.tmst}`
           """.stripMargin
        }
      }
      val metricTransStep = SparkSqlTransformStep(metricTableName, metricSql, emptyMap)
      val metricWriteStep = {
        val mwName = ruleParam.metricOpt.map(_.name).getOrElse(ruleParam.name)
        val collectType = NormalizeType(ruleParam.metricOpt.map(_.collectType).getOrElse(""))
        MetricWriteStep(mwName, metricTableName, collectType)
      }

      // current steps
      val transSteps1 = inTimeTransStep :: latencyTransStep :: metricTransStep :: Nil
      val writeSteps1 = metricWriteStep :: Nil

      // 4. timeliness record: dump records later than "threshold" (only when configured)
      val (transSteps2, writeSteps2) = TimeUtil.milliseconds(details.getString(_threshold, "")) match {
        case Some(tsh) => {
          val recordTableName = "__lateRecords"
          val recordSql = {
            s"SELECT * FROM `${latencyTableName}` WHERE `${latencyColName}` > ${tsh}"
          }
          val recordTransStep = SparkSqlTransformStep(recordTableName, recordSql, emptyMap)
          val recordWriteStep = {
            val rwName = ruleParam.recordOpt.map(_.name).getOrElse(recordTableName)
            RecordWriteStep(rwName, recordTableName, None)
          }
          (recordTransStep :: Nil, recordWriteStep :: Nil)
        }
        case _ => (Nil, Nil)
      }

      // 5. ranges: latency histogram bucketed by "step.size" (only when configured)
      val (transSteps3, writeSteps3) = TimeUtil.milliseconds(details.getString(_stepSize, "")) match {
        case Some(stepSize) => {
          // 5.1 range: integer bucket index = latency / stepSize
          val rangeTableName = "__range"
          val stepColName = details.getStringOrKey(_step)
          val rangeSql = {
            s"""
               |SELECT *, CAST((`${latencyColName}` / ${stepSize}) AS BIGINT) AS `${stepColName}`
               |FROM `${latencyTableName}`
             """.stripMargin
          }
          val rangeTransStep = SparkSqlTransformStep(rangeTableName, rangeSql, emptyMap)

          // 5.2 range metric: row count per bucket (per tmst in streaming)
          val rangeMetricTableName = "__rangeMetric"
          val countColName = details.getStringOrKey(_count)
          val rangeMetricSql = procType match {
            case BatchProcessType => {
              s"""
                 |SELECT `${stepColName}`, COUNT(*) AS `${countColName}`
                 |FROM `${rangeTableName}` GROUP BY `${stepColName}`
               """.stripMargin
            }
            case StreamingProcessType => {
              s"""
                 |SELECT `${ConstantColumns.tmst}`, `${stepColName}`, COUNT(*) AS `${countColName}`
                 |FROM `${rangeTableName}` GROUP BY `${ConstantColumns.tmst}`, `${stepColName}`
               """.stripMargin
            }
          }
          val rangeMetricTransStep = SparkSqlTransformStep(rangeMetricTableName, rangeMetricSql, emptyMap)
          val rangeMetricWriteStep = {
            MetricWriteStep(stepColName, rangeMetricTableName, ArrayNormalizeType)
          }

          (rangeTransStep :: rangeMetricTransStep :: Nil, rangeMetricWriteStep :: Nil)
        }
        case _ => (Nil, Nil)
      }

      // 6. percentiles: approximate latency percentiles for configured fractions
      val percentiles = getPercentiles(details)
      val (transSteps4, writeSteps4) = if (percentiles.size > 0) {
        val percentileTableName = "__percentile"
        val percentileColName = details.getStringOrKey(_percentileColPrefix)
        val percentileCols = percentiles.map { pct =>
          val pctName = (pct * 100).toInt.toString
          // NOTE(review): latencyColName is not backquoted here unlike every other
          // use in this file — verify it cannot contain characters needing quoting
          s"floor(percentile_approx(${latencyColName}, ${pct})) AS `${percentileColName}_${pctName}`"
        }.mkString(", ")
        val percentileSql = {
          s"""
             |SELECT ${percentileCols}
             |FROM `${latencyTableName}`
           """.stripMargin
        }
        val percentileTransStep = SparkSqlTransformStep(percentileTableName, percentileSql, emptyMap)
        val percentileWriteStep = {
          MetricWriteStep(percentileTableName, percentileTableName, DefaultNormalizeType)
        }

        (percentileTransStep :: Nil, percentileWriteStep :: Nil)
      } else (Nil, Nil)

      // full steps: all transform steps first, then all write steps
      transSteps1 ++ transSteps2 ++ transSteps3 ++ transSteps4 ++
        writeSteps1 ++ writeSteps2 ++ writeSteps3 ++ writeSteps4
    }
  }

  /** Read "percentile.values" from details, keeping only fractions in [0, 1]. */
  private def getPercentiles(details: Map[String, Any]): Seq[Double] = {
    details.getArr[Double](_percentileValues).filter(d => (d >= 0 && d <= 1))
  }

}
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.step.builder.dsl.transform + +import org.apache.griffin.measure.configuration.enums._ +import org.apache.griffin.measure.configuration.params.RuleParam +import org.apache.griffin.measure.context.DQContext +import org.apache.griffin.measure.step.DQStep +import org.apache.griffin.measure.step.builder.ConstantColumns +import org.apache.griffin.measure.step.builder.dsl.expr._ +import org.apache.griffin.measure.step.builder.dsl.transform.analyzer.UniquenessAnalyzer +import org.apache.griffin.measure.step.transform.SparkSqlTransformStep +import org.apache.griffin.measure.step.write.{MetricWriteStep, RecordWriteStep} +import org.apache.griffin.measure.utils.ParamUtil._ + +/** + * generate uniqueness dq steps + */ +case class UniquenessExpr2DQSteps(context: DQContext, + expr: Expr, + ruleParam: RuleParam + ) extends Expr2DQSteps { + + private object UniquenessKeys { + val _source = "source" + val _target = "target" + val _unique = "unique" + val _total = "total" + val _dup = "dup" + val _num = "num" + + val _duplicationArray = "duplication.array" + } + import UniquenessKeys._ + + def getDQSteps(): Seq[DQStep] = { + val details = ruleParam.getDetails + val uniquenessExpr = expr.asInstanceOf[UniquenessClause] + + val sourceName = details.getString(_source, 
context.getDataSourceName(0)) + val targetName = details.getString(_target, context.getDataSourceName(1)) + val analyzer = UniquenessAnalyzer(uniquenessExpr, sourceName, targetName) + + val procType = context.procType + val timestamp = context.contextId.timestamp + + if (!context.runTimeTableRegister.existsTable(sourceName)) { + warn(s"[${timestamp}] data source ${sourceName} not exists") + Nil + } else if (!context.runTimeTableRegister.existsTable(targetName)) { + warn(s"[${timestamp}] data source ${targetName} not exists") + Nil + } else { + val selItemsClause = analyzer.selectionPairs.map { pair => + val (expr, alias) = pair + s"${expr.desc} AS `${alias}`" + }.mkString(", ") + val aliases = analyzer.selectionPairs.map(_._2) + + val selClause = procType match { + case BatchProcessType => selItemsClause + case StreamingProcessType => s"`${ConstantColumns.tmst}`, ${selItemsClause}" + } + val selAliases = procType match { + case BatchProcessType => aliases + case StreamingProcessType => ConstantColumns.tmst +: aliases + } + + // 1. source distinct mapping + val sourceTableName = "__source" + val sourceSql = s"SELECT DISTINCT ${selClause} FROM ${sourceName}" + val sourceTransStep = SparkSqlTransformStep(sourceTableName, sourceSql, emptyMap) + + // 2. target mapping + val targetTableName = "__target" + val targetSql = s"SELECT ${selClause} FROM ${targetName}" + val targetTransStep = SparkSqlTransformStep(targetTableName, targetSql, emptyMap) + + // 3.
joined + val joinedTableName = "__joined" + val joinedSelClause = selAliases.map { alias => + s"`${sourceTableName}`.`${alias}` AS `${alias}`" + }.mkString(", ") + val onClause = aliases.map { alias => + s"coalesce(`${sourceTableName}`.`${alias}`, '') = coalesce(`${targetTableName}`.`${alias}`, '')" + }.mkString(" AND ") + val joinedSql = { + s"SELECT ${joinedSelClause} FROM `${targetTableName}` RIGHT JOIN `${sourceTableName}` ON ${onClause}" + } + val joinedTransStep = SparkSqlTransformStep(joinedTableName, joinedSql, emptyMap) + + // 4. group + val groupTableName = "__group" + val groupSelClause = selAliases.map { alias => + s"`${alias}`" + }.mkString(", ") + val dupColName = details.getStringOrKey(_dup) + val groupSql = { + s"SELECT ${groupSelClause}, (COUNT(*) - 1) AS `${dupColName}` FROM `${joinedTableName}` GROUP BY ${groupSelClause}" + } + val groupTransStep = SparkSqlTransformStep(groupTableName, groupSql, emptyMap, true) + + // 5. total metric + val totalTableName = "__totalMetric" + val totalColName = details.getStringOrKey(_total) + val totalSql = procType match { + case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`" + case StreamingProcessType => { + s""" + |SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${totalColName}` + |FROM `${sourceName}` GROUP BY `${ConstantColumns.tmst}` + """.stripMargin + } + } + val totalTransStep = SparkSqlTransformStep(totalTableName, totalSql, emptyMap) + val totalMetricWriteStep = MetricWriteStep(totalColName, totalTableName, EntriesNormalizeType) + + // 6. unique record + val uniqueRecordTableName = "__uniqueRecord" + val uniqueRecordSql = { + s"SELECT * FROM `${groupTableName}` WHERE `${dupColName}` = 0" + } + val uniqueRecordTransStep = SparkSqlTransformStep(uniqueRecordTableName, uniqueRecordSql, emptyMap) + + // 7. 
unique metric + val uniqueTableName = "__uniqueMetric" + val uniqueColName = details.getStringOrKey(_unique) + val uniqueSql = procType match { + case BatchProcessType => s"SELECT COUNT(*) AS `${uniqueColName}` FROM `${uniqueRecordTableName}`" + case StreamingProcessType => { + s""" + |SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${uniqueColName}` + |FROM `${uniqueRecordTableName}` GROUP BY `${ConstantColumns.tmst}` + """.stripMargin + } + } + val uniqueTransStep = SparkSqlTransformStep(uniqueTableName, uniqueSql, emptyMap) + val uniqueMetricWriteStep = MetricWriteStep(uniqueColName, uniqueTableName, EntriesNormalizeType) + + val transSteps1 = sourceTransStep :: targetTransStep :: joinedTransStep :: groupTransStep :: + totalTransStep :: uniqueRecordTransStep :: uniqueTransStep :: Nil + val writeSteps1 = totalMetricWriteStep :: uniqueMetricWriteStep :: Nil + + val duplicationArrayName = details.getString(_duplicationArray, "") + val (transSteps2, writeSteps2) = if (duplicationArrayName.nonEmpty) { + // 8. duplicate record + val dupRecordTableName = "__dupRecords" + val dupRecordSql = { + s"SELECT * FROM `${groupTableName}` WHERE `${dupColName}` > 0" + } + val dupRecordTransStep = SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true) + val dupRecordWriteStep = { + val rwName = ruleParam.recordOpt.map(_.name).getOrElse(dupRecordTableName) + RecordWriteStep(rwName, dupRecordTableName) + } + + // 9. 
duplicate metric + val dupMetricTableName = "__dupMetric" + val numColName = details.getStringOrKey(_num) + val dupMetricSelClause = procType match { + case BatchProcessType => s"`${dupColName}`, COUNT(*) AS `${numColName}`" + case StreamingProcessType => s"`${ConstantColumns.tmst}`, `${dupColName}`, COUNT(*) AS `${numColName}`" + } + val dupMetricGroupbyClause = procType match { + case BatchProcessType => s"`${dupColName}`" + case StreamingProcessType => s"`${ConstantColumns.tmst}`, `${dupColName}`" + } + val dupMetricSql = { + s""" + |SELECT ${dupMetricSelClause} FROM `${dupRecordTableName}` + |GROUP BY ${dupMetricGroupbyClause} + """.stripMargin + } + val dupMetricTransStep = SparkSqlTransformStep(dupMetricTableName, dupMetricSql, emptyMap) + val dupMetricWriteStep = { + MetricWriteStep(duplicationArrayName, dupMetricTableName, ArrayNormalizeType) + } + + (dupRecordTransStep :: dupMetricTransStep :: Nil, + dupRecordWriteStep :: dupMetricWriteStep :: Nil) + } else (Nil, Nil) + + // full steps + transSteps1 ++ transSteps2 ++ writeSteps1 ++ writeSteps2 + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/AccuracyAnalyzer.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/AccuracyAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/AccuracyAnalyzer.scala new file mode 100644 index 0000000..b2a95ce --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/AccuracyAnalyzer.scala @@ -0,0 +1,41 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.step.builder.dsl.transform.analyzer + +import org.apache.griffin.measure.step.builder.dsl.expr._ + + +case class AccuracyAnalyzer(expr: LogicalExpr, sourceName: String, targetName: String) extends BasicAnalyzer { + + val dataSourceNames = expr.preOrderTraverseDepthFirst(Set[String]())(seqDataSourceNames, combDataSourceNames) + + val sourceSelectionExprs = { + val seq = seqSelectionExprs(sourceName) + expr.preOrderTraverseDepthFirst(Seq[SelectionExpr]())(seq, combSelectionExprs) + } + val targetSelectionExprs = { + val seq = seqSelectionExprs(targetName) + expr.preOrderTraverseDepthFirst(Seq[SelectionExpr]())(seq, combSelectionExprs) + } + + val selectionExprs = sourceSelectionExprs ++ { + expr.preOrderTraverseDepthFirst(Seq[AliasableExpr]())(seqWithAliasExprs, combWithAliasExprs) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/BasicAnalyzer.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/BasicAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/BasicAnalyzer.scala new file mode 100644 index 0000000..2925272 --- /dev/null +++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/BasicAnalyzer.scala @@ -0,0 +1,55 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.step.builder.dsl.transform.analyzer + +import org.apache.griffin.measure.step.builder.dsl.expr._ + +/** + * analyzer of expr, to help generate dq steps by expr + */ +trait BasicAnalyzer extends Serializable { + + val expr: Expr + + val seqDataSourceNames = (expr: Expr, v: Set[String]) => { + expr match { + case DataSourceHeadExpr(name) => v + name + case _ => v + } + } + val combDataSourceNames = (a: Set[String], b: Set[String]) => a ++ b + + val seqSelectionExprs = (dsName: String) => (expr: Expr, v: Seq[SelectionExpr]) => { + expr match { + case se @ SelectionExpr(head: DataSourceHeadExpr, _, _) if (head.name == dsName) => v :+ se + case _ => v + } + } + val combSelectionExprs = (a: Seq[SelectionExpr], b: Seq[SelectionExpr]) => a ++ b + + val seqWithAliasExprs = (expr: Expr, v: Seq[AliasableExpr]) => { + expr match { + case se: SelectExpr => v + case a: AliasableExpr if (a.alias.nonEmpty) => v :+ a + case _ => v + } + } + val combWithAliasExprs = (a: Seq[AliasableExpr], b: Seq[AliasableExpr]) => a ++ b + +} 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/CompletenessAnalyzer.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/CompletenessAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/CompletenessAnalyzer.scala new file mode 100644 index 0000000..eab568c --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/CompletenessAnalyzer.scala @@ -0,0 +1,46 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.step.builder.dsl.transform.analyzer + +import org.apache.griffin.measure.step.builder.dsl.expr._ + + +case class CompletenessAnalyzer(expr: CompletenessClause, sourceName: String) extends BasicAnalyzer { + + val seqAlias = (expr: Expr, v: Seq[String]) => { + expr match { + case apr: AliasableExpr => v ++ apr.alias + case _ => v + } + } + val combAlias = (a: Seq[String], b: Seq[String]) => a ++ b + + private val exprs = expr.exprs + private def genAlias(idx: Int): String = s"alias_${idx}" + val selectionPairs = exprs.zipWithIndex.map { pair => + val (pr, idx) = pair + val res = pr.preOrderTraverseDepthFirst(Seq[String]())(seqAlias, combAlias) + (pr, res.headOption.getOrElse(genAlias(idx))) + } + + if (selectionPairs.isEmpty) { + throw new Exception(s"completeness analyzer error: empty selection") + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/DistinctnessAnalyzer.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/DistinctnessAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/DistinctnessAnalyzer.scala new file mode 100644 index 0000000..efa1754 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/DistinctnessAnalyzer.scala @@ -0,0 +1,47 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.step.builder.dsl.transform.analyzer + +import org.apache.griffin.measure.step.builder.dsl.expr._ + + +//case class DistinctnessAnalyzer(expr: DistinctnessClause, sourceName: String, targetName: String) extends BasicAnalyzer { +case class DistinctnessAnalyzer(expr: DistinctnessClause, sourceName: String) extends BasicAnalyzer { + + val seqAlias = (expr: Expr, v: Seq[String]) => { + expr match { + case apr: AliasableExpr => v ++ apr.alias + case _ => v + } + } + val combAlias = (a: Seq[String], b: Seq[String]) => a ++ b + + private val exprs = expr.exprs + private def genAlias(idx: Int): String = s"alias_${idx}" + val selectionPairs = exprs.zipWithIndex.map { pair => + val (pr, idx) = pair + val res = pr.preOrderTraverseDepthFirst(Seq[String]())(seqAlias, combAlias) + (pr, res.headOption.getOrElse(genAlias(idx)), pr.tag.isEmpty) + } + + if (selectionPairs.isEmpty) { + throw new Exception(s"distinctness analyzer error: empty selection") + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/ProfilingAnalyzer.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/ProfilingAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/ProfilingAnalyzer.scala new file mode 100644 index 0000000..049e6fd --- /dev/null +++
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/ProfilingAnalyzer.scala @@ -0,0 +1,42 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.step.builder.dsl.transform.analyzer + +import org.apache.griffin.measure.step.builder.dsl.expr._ + + +case class ProfilingAnalyzer(expr: ProfilingClause, sourceName: String) extends BasicAnalyzer { + + val dataSourceNames = expr.preOrderTraverseDepthFirst(Set[String]())(seqDataSourceNames, combDataSourceNames) + + val selectionExprs: Seq[Expr] = { + expr.selectClause.exprs.map(_.extractSelf).flatMap { expr => + expr match { + case e: SelectionExpr => Some(e) + case e: FunctionExpr => Some(e) + case _ => None + } + } + } + + val groupbyExprOpt = expr.groupbyClauseOpt + val preGroupbyExprs = expr.preGroupbyClauses.map(_.extractSelf) + val postGroupbyExprs = expr.postGroupbyClauses.map(_.extractSelf) + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/TimelinessAnalyzer.scala ---------------------------------------------------------------------- diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/TimelinessAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/TimelinessAnalyzer.scala new file mode 100644 index 0000000..af00080 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/TimelinessAnalyzer.scala @@ -0,0 +1,65 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.step.builder.dsl.transform.analyzer + +import org.apache.griffin.measure.step.builder.dsl.expr._ + + +case class TimelinessAnalyzer(expr: TimelinessClause, sourceName: String) extends BasicAnalyzer { + +// val tsExpr = expr.desc + +// val seqAlias = (expr: Expr, v: Seq[String]) => { +// expr match { +// case apr: AliasableExpr => v ++ apr.alias +// case _ => v +// } +// } +// val combAlias = (a: Seq[String], b: Seq[String]) => a ++ b +// +// private val exprs = expr.exprs.toList +// val selectionPairs = exprs.map { pr => +// val res = pr.preOrderTraverseDepthFirst(Seq[String]())(seqAlias, combAlias) +// println(res) +// println(pr) +// (pr, res.headOption) +// } +// +// val (tsExprPair, endTsPairOpt) = selectionPairs match { +// case Nil => throw new Exception(s"timeliness analyzer error: ts column not set") +// case tsPair :: Nil => (tsPair, None) +// case tsPair :: endTsPair :: _ => (tsPair, Some(endTsPair)) +// } +// +// def getSelAlias(pair: (Expr, Option[String]), defAlias: String): (String, String) = { +// val (pr, aliasOpt) = pair +// val alias = aliasOpt.getOrElse(defAlias) +// (pr.desc, alias) +// } + + + private val exprs = expr.exprs.map(_.desc).toList + + val (btsExpr, etsExprOpt) = exprs match { + case Nil => throw new Exception(s"timeliness analyzer error: ts column not set") + case btsExpr :: Nil => (btsExpr, None) + case btsExpr :: etsExpr :: _ => (btsExpr, Some(etsExpr)) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/UniquenessAnalyzer.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/UniquenessAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/UniquenessAnalyzer.scala new file mode 100644 index 0000000..21a1628 --- 
/dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/UniquenessAnalyzer.scala @@ -0,0 +1,46 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.step.builder.dsl.transform.analyzer + +import org.apache.griffin.measure.step.builder.dsl.expr.{AliasableExpr, _} + + +case class UniquenessAnalyzer(expr: UniquenessClause, sourceName: String, targetName: String) extends BasicAnalyzer { + + val seqAlias = (expr: Expr, v: Seq[String]) => { + expr match { + case apr: AliasableExpr => v ++ apr.alias + case _ => v + } + } + val combAlias = (a: Seq[String], b: Seq[String]) => a ++ b + + private val exprs = expr.exprs + private def genAlias(idx: Int): String = s"alias_${idx}" + val selectionPairs = exprs.zipWithIndex.map { pair => + val (pr, idx) = pair + val res = pr.preOrderTraverseDepthFirst(Seq[String]())(seqAlias, combAlias) + (pr, res.headOption.getOrElse(genAlias(idx))) + } + + if (selectionPairs.isEmpty) { + throw new Exception(s"uniqueness analyzer error: empty selection") + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala 
---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala new file mode 100644 index 0000000..f1543be --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala @@ -0,0 +1,72 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.step.builder.preproc + +import org.apache.griffin.measure.configuration.params.RuleParam + +/** + * generate rule params by template defined in pre-proc param + */ +object PreProcRuleParamGenerator { + + val _name = "name" + + def getNewPreProcRules(rules: Seq[RuleParam], suffix: String): Seq[RuleParam] = { + if (rules == null) Nil else { + rules.map { rule => + getNewPreProcRule(rule, suffix) + } + } + } + + private def getNewPreProcRule(param: RuleParam, suffix: String): RuleParam = { + val newName = genNewString(param.getName, suffix) + val newRule = genNewString(param.getRule, suffix) + val newDetails = getNewMap(param.getDetails, suffix) + param.replaceName(newName).replaceRule(newRule).replaceDetails(newDetails) + } + + private def getNewMap(details: Map[String, Any], suffix: String): Map[String, Any] = { + val keys = details.keys + keys.foldLeft(details) { (map, key) => + map.get(key) match { + case Some(s: String) => map + (key -> genNewString(s, suffix)) + case Some(subMap: Map[String, Any]) => map + (key -> getNewMap(subMap, suffix)) + case Some(arr: Seq[_]) => map + (key -> getNewArr(arr, suffix)) + case _ => map + } + } + } + + private def getNewArr(paramArr: Seq[Any], suffix: String): Seq[Any] = { + paramArr.foldLeft(Nil: Seq[Any]) { (res, param) => + param match { + case s: String => res :+ genNewString(s, suffix) + case map: Map[String, Any] => res :+ getNewMap(map, suffix) + case arr: Seq[_] => res :+ getNewArr(arr, suffix) + case _ => res :+ param + } + } + } + + private def genNewString(str: String, suffix: String): String = { + str.replaceAll("""\$\{(.*)\}""", s"$$1_${suffix}") + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/udf/GriffinUDFs.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/udf/GriffinUDFs.scala 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/udf/GriffinUDFs.scala new file mode 100644 index 0000000..61b93ab --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/udf/GriffinUDFs.scala @@ -0,0 +1,63 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.step.builder.udf + +import org.apache.spark.sql.SQLContext + +object GriffinUDFAgent { + def register(sqlContext: SQLContext): Unit = { + GriffinUDFs.register(sqlContext) + GriffinUDAggFs.register(sqlContext) + } +} + +/** + * user defined functions extension + */ +object GriffinUDFs { + + def register(sqlContext: SQLContext): Unit = { + sqlContext.udf.register("index_of", indexOf _) + sqlContext.udf.register("matches", matches _) + sqlContext.udf.register("reg_replace", regReplace _) + } + + private def indexOf(arr: Seq[String], v: String) = { + arr.indexOf(v) + } + + private def matches(s: String, regex: String) = { + s.matches(regex) + } + + private def regReplace(s: String, regex: String, replacement: String) = { + s.replaceAll(regex, replacement) + } + +} + +/** + * aggregation functions extension + */ +object GriffinUDAggFs { + + def register(sqlContext: SQLContext): Unit = { + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala new file mode 100644 index 0000000..8b1df82 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala @@ -0,0 +1,48 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.step.read + +import org.apache.griffin.measure.context.DQContext +import org.apache.griffin.measure.step.DQStep +import org.apache.spark.sql._ + +trait ReadStep extends DQStep { + + val config: Map[String, Any] + + val cache: Boolean + + def execute(context: DQContext): Boolean = { + info(s"read data source [${name}]") + read(context) match { + case Some(df) => { +// if (needCache) context.dataFrameCache.cacheDataFrame(name, df) + context.runTimeTableRegister.registerTable(name, df) + true + } + case _ => { + warn(s"read data source [${name}] fails") + false + } + } + } + + def read(context: DQContext): Option[DataFrame] + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/read/UnionReadStep.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/read/UnionReadStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/read/UnionReadStep.scala new file mode 100644 index 0000000..6dae1cb --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/step/read/UnionReadStep.scala @@ -0,0 +1,41 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.step.read + +import org.apache.griffin.measure.context.DQContext +import org.apache.spark.sql._ +import org.apache.griffin.measure.utils.DataFrameUtil._ + +case class UnionReadStep(name: String, + readSteps: Seq[ReadStep] + ) extends ReadStep { + + val config: Map[String, Any] = Map() + val cache: Boolean = false + + def read(context: DQContext): Option[DataFrame] = { + val dfOpts = readSteps.map { readStep => + readStep.read(context) + } + if (dfOpts.size > 0) { + dfOpts.reduce((a, b) => unionDfOpts(a, b)) + } else None + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala new file mode 100644 index 0000000..dc9a3f8 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala @@ -0,0 +1,134 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.step.transform

import java.util.Date

import scala.util.control.NonFatal

import org.apache.griffin.measure.context.ContextId
import org.apache.griffin.measure.context.streaming.metric.CacheResults.CacheResult
import org.apache.griffin.measure.context.streaming.metric._
import org.apache.griffin.measure.step.builder.ConstantColumns
import org.apache.griffin.measure.utils.ParamUtil._
import org.apache.spark.sql.types.{BooleanType, LongType, StructField, StructType}
import org.apache.spark.sql.{Encoders, Row, SQLContext, _}

/**
 * pre-defined data frame operations
 */
object DataFrameOps {

  final val _fromJson = "from_json"
  final val _accuracy = "accuracy"
  final val _clear = "clear"

  /** detail-map keys used by the accuracy operation */
  object AccuracyOprKeys {
    val _dfName = "df.name"
    val _miss = "miss"
    val _total = "total"
    val _matched = "matched"
  }

  /**
   * Parse a string column of the named table as JSON and return the parsed
   * data frame. When "col.name" is absent, the first column is used.
   */
  def fromJson(sqlContext: SQLContext, details: Map[String, Any]): DataFrame = {
    val _dfName = "df.name"
    val _colName = "col.name"
    val dfName = details.getOrElse(_dfName, "").toString
    val colNameOpt = details.get(_colName).map(_.toString)

    implicit val encoder = Encoders.STRING

    val df: DataFrame = sqlContext.table(s"`${dfName}`")
    val jsonStrings = colNameOpt match {
      case Some(colName: String) => df.map(r => r.getAs[String](colName))
      case _ => df.map(_.getAs[String](0))
    }
    sqlContext.read.json(jsonStrings) // slow process
  }

  /**
   * Compute accuracy metrics from the named table's miss/total columns,
   * merge them with previously cached results, and return a metric
   * data frame (tmst, miss, total, matched, record, empty).
   */
  def accuracy(sqlContext: SQLContext, contextId: ContextId, details: Map[String, Any]): DataFrame = {
    import AccuracyOprKeys._

    val dfName = details.getStringOrKey(_dfName)
    val miss = details.getStringOrKey(_miss)
    val total = details.getStringOrKey(_total)
    val matched = details.getStringOrKey(_matched)

    val updateTime = new Date().getTime

    // read a long column of a row; None when the column is absent or not a long.
    // NonFatal keeps fatal errors (OOM, interrupts) propagating instead of
    // being silently swallowed as the old `case e: Throwable` did.
    def getLong(r: Row, k: String): Option[Long] = {
      try {
        Some(r.getAs[Long](k))
      } catch {
        case NonFatal(_) => None
      }
    }

    val df = sqlContext.table(s"`${dfName}`")

    // collect (timestamp, metric) pairs; unparsable rows are skipped.
    // rows without a tmst column fall back to the context timestamp.
    val results = df.rdd.flatMap { row =>
      try {
        val tmst = getLong(row, ConstantColumns.tmst).getOrElse(contextId.timestamp)
        val missCount = getLong(row, miss).getOrElse(0L)
        val totalCount = getLong(row, total).getOrElse(0L)
        val ar = AccuracyMetric(missCount, totalCount)
        if (ar.isLegal) Some((tmst, ar)) else None
      } catch {
        case NonFatal(_) => None
      }
    }.collect

    // cache and update results
    val updatedResults = CacheResults.update(results.map { pair =>
      val (t, r) = pair
      CacheResult(t, updateTime, r)
    })

    // generate metrics
    val schema = StructType(Array(
      StructField(ConstantColumns.tmst, LongType),
      StructField(miss, LongType),
      StructField(total, LongType),
      StructField(matched, LongType),
      StructField(ConstantColumns.record, BooleanType),
      StructField(ConstantColumns.empty, BooleanType)
    ))
    val rows = updatedResults.map { r =>
      val ar = r.result.asInstanceOf[AccuracyMetric]
      Row(r.timeStamp, ar.miss, ar.total, ar.getMatch, !ar.initial, ar.eventual)
    }.toArray
    val rowRdd = sqlContext.sparkContext.parallelize(rows)
    val retDf = sqlContext.createDataFrame(rowRdd, schema)

    retDf
  }

  /**
   * Return an empty data frame with the same schema as the named table.
   */
  def clear(sqlContext: SQLContext, details: Map[String, Any]): DataFrame = {
    val _dfName = "df.name"
    val dfName = details.getOrElse(_dfName, "").toString

    val df = sqlContext.table(s"`${dfName}`")
    val emptyRdd = sqlContext.sparkContext.emptyRDD[Row]
    sqlContext.createDataFrame(emptyRdd, df.schema)
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala new file mode 100644 index 0000000..e2f90f9 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala @@ -0,0 +1,52 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.step.transform

import scala.util.control.NonFatal

import org.apache.griffin.measure.context.DQContext

/**
 * data frame ops transform step
 *
 * Dispatches to a pre-defined DataFrameOps operation named by `rule`,
 * optionally caches the result, and registers it as a run-time table.
 */
case class DataFrameOpsTransformStep(name: String,
                                     rule: String,
                                     details: Map[String, Any],
                                     cache: Boolean = false
                                    ) extends TransformStep {

  def execute(context: DQContext): Boolean = {
    val sqlContext = context.sqlContext
    try {
      val df = rule match {
        case DataFrameOps._fromJson => DataFrameOps.fromJson(sqlContext, details)
        case DataFrameOps._accuracy => DataFrameOps.accuracy(sqlContext, context.contextId, details)
        case DataFrameOps._clear => DataFrameOps.clear(sqlContext, details)
        case _ => throw new Exception(s"df opr [ ${rule} ] not supported")
      }
      if (cache) context.dataFrameCache.cacheDataFrame(name, df)
      context.runTimeTableRegister.registerTable(name, df)
      true
    } catch {
      // NonFatal instead of Throwable: fatal errors (OOM, interrupts)
      // must not be swallowed and reported as a mere step failure
      case NonFatal(e) => {
        error(s"run data frame ops [ ${rule} ] error: ${e.getMessage}")
        false
      }
    }
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala new file mode 100644 index 0000000..ead7344 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala @@ -0,0 +1,47 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.step.transform

import scala.util.control.NonFatal

import org.apache.griffin.measure.context.DQContext

/**
 * spark sql transform step
 *
 * Runs the given SQL text, optionally caches the result, and registers
 * it as a run-time table under this step's name.
 */
case class SparkSqlTransformStep(name: String,
                                 rule: String,
                                 details: Map[String, Any],
                                 cache: Boolean = false
                                ) extends TransformStep {

  def execute(context: DQContext): Boolean = {
    val sqlContext = context.sqlContext
    try {
      val df = sqlContext.sql(rule)
      if (cache) context.dataFrameCache.cacheDataFrame(name, df)
      context.runTimeTableRegister.registerTable(name, df)
      true
    } catch {
      // NonFatal instead of Throwable: fatal errors must propagate
      case NonFatal(e) => {
        error(s"run spark sql [ ${rule} ] error: ${e.getMessage}")
        false
      }
    }
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala new file mode 100644 index 0000000..995ce49 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/TransformStep.scala @@ -0,0 +1,31 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.
The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.step.transform

import org.apache.griffin.measure.step.DQStep

/**
 * Base contract for a DQ step that transforms already-registered data.
 */
trait TransformStep extends DQStep {

  /** transform rule text (SQL or a DataFrameOps operation name) */
  val rule: String

  /** extra parameters of the transform rule */
  val details: Map[String, Any]

  /** whether the transform result should be cached */
  val cache: Boolean

}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/write/DsCacheUpdateWriteStep.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/DsCacheUpdateWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/DsCacheUpdateWriteStep.scala new file mode 100644 index 0000000..27dbb3c --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/DsCacheUpdateWriteStep.scala @@ -0,0 +1,61 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.step.write

import scala.util.control.NonFatal

import org.apache.commons.lang.StringUtils
import org.apache.griffin.measure.context.DQContext
import org.apache.spark.sql.DataFrame

/**
 * update streaming data source cache
 *
 * Looks up the table registered under `inputName` and pushes it as the
 * updated cache data of the data source named `dsName`. Best-effort:
 * always returns true, only warning when the input table is missing.
 */
case class DsCacheUpdateWriteStep(dsName: String,
                                  inputName: String
                                 ) extends WriteStep {

  val name: String = ""
  val writeTimestampOpt: Option[Long] = None

  def execute(context: DQContext): Boolean = {
    collectDsCacheUpdateDf(context) match {
      case Some(df) => {
        context.dataSources.find(ds => StringUtils.equals(ds.name, dsName)).foreach(_.updateData(df))
      }
      case _ => {
        warn(s"update ${dsName} from ${inputName} fails")
      }
    }
    // deliberately best-effort: a missing input never fails the pipeline
    true
  }

  /** Fetch a registered table by name; None when it does not exist. */
  private def getDataFrame(context: DQContext, name: String): Option[DataFrame] = {
    try {
      val df = context.sqlContext.table(s"`${name}`")
      Some(df)
    } catch {
      // NonFatal instead of Throwable: fatal errors must propagate
      case NonFatal(e) => {
        error(s"get data frame ${name} fails")
        None
      }
    }
  }

  private def collectDsCacheUpdateDf(context: DQContext): Option[DataFrame] = getDataFrame(context, inputName)

}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala new file mode 100644 index 0000000..40c9b05 --- /dev/null +++
b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala @@ -0,0 +1,45 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.step.write

import scala.util.control.NonFatal

import org.apache.griffin.measure.context.DQContext

/**
 * flush final metric map in context and write
 *
 * Returns true only if every per-timestamp metric group persisted
 * successfully; persistence of the remaining groups still proceeds
 * after an individual failure.
 */
case class MetricFlushStep() extends WriteStep {

  val name: String = ""
  val inputName: String = ""
  val writeTimestampOpt: Option[Long] = None

  def execute(context: DQContext): Boolean = {
    context.metricWrapper.flush.foldLeft(true) { (ret, pair) =>
      val (t, metric) = pair
      val pr = try {
        context.getPersist(t).persistMetrics(metric)
        true
      } catch {
        // NonFatal instead of Throwable, and log instead of silently
        // swallowing the failure
        case NonFatal(e) => {
          error(s"persist metrics error: ${e.getMessage}")
          false
        }
      }
      ret && pr
    }
  }

}
