http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala deleted file mode 100644 index 3ed6839..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala +++ /dev/null @@ -1,348 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.rule.trans - -import org.apache.griffin.measure.process.temp.{TableRegisters, TimeRange} -import org.apache.griffin.measure.process._ -import org.apache.griffin.measure.rule.adaptor.RuleParamKeys._ -import org.apache.griffin.measure.rule.adaptor._ -import org.apache.griffin.measure.rule.dsl.{ArrayCollectType, EntriesCollectType} -import org.apache.griffin.measure.rule.dsl.analyzer.DistinctnessAnalyzer -import org.apache.griffin.measure.rule.dsl.expr._ -import org.apache.griffin.measure.rule.plan._ -import org.apache.griffin.measure.rule.trans.RuleExportFactory._ -import org.apache.griffin.measure.rule.trans.DsUpdateFactory._ -import org.apache.griffin.measure.utils.ParamUtil._ - -import scala.util.Try - -case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String], - timeInfo: TimeInfo, name: String, expr: Expr, - param: Map[String, Any], procType: ProcessType, - dsTimeRanges: Map[String, TimeRange] - ) extends RulePlanTrans { - - private object DistinctnessKeys { - val _source = "source" - val _target = "target" - val _distinct = "distinct" - val _total = "total" - val _dup = "dup" - val _accu_dup = "accu_dup" - val _num = "num" - - val _duplicationArray = "duplication.array" - val _withAccumulate = "with.accumulate" - - val _recordEnable = "record.enable" - } - import DistinctnessKeys._ - - def trans(): Try[RulePlan] = Try { - val details = getDetails(param) - val sourceName = details.getString(_source, dataSourceNames.head) - val targetName = details.getString(_target, dataSourceNames.tail.head) - val analyzer = DistinctnessAnalyzer(expr.asInstanceOf[DistinctnessClause], sourceName) - - val mode = SimpleMode - - val ct = timeInfo.calcTime - - val beginTmst = dsTimeRanges.get(sourceName).map(_.begin) match { - case Some(t) => t - case _ => throw new Exception(s"empty begin tmst from ${sourceName}") - } - val endTmst = dsTimeRanges.get(sourceName).map(_.end) match { - case Some(t) => t - case _ => throw new 
Exception(s"empty end tmst from ${sourceName}") - } - - if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) { - println(s"[${ct}] data source ${sourceName} not exists") - emptyRulePlan - } else { - val withOlderTable = { - details.getBoolean(_withAccumulate, true) && - TableRegisters.existRunTempTable(timeInfo.key, targetName) - } - - val selClause = analyzer.selectionPairs.map { pair => - val (expr, alias, _) = pair - s"${expr.desc} AS `${alias}`" - }.mkString(", ") - val distAliases = analyzer.selectionPairs.filter(_._3).map(_._2) - val distAliasesClause = distAliases.map( a => s"`${a}`" ).mkString(", ") - val allAliases = analyzer.selectionPairs.map(_._2) - val allAliasesClause = allAliases.map( a => s"`${a}`" ).mkString(", ") - val groupAliases = analyzer.selectionPairs.filter(!_._3).map(_._2) - val groupAliasesClause = groupAliases.map( a => s"`${a}`" ).mkString(", ") - - // 1. source alias - val sourceAliasTableName = "__sourceAlias" - val sourceAliasSql = { - s"SELECT ${selClause} FROM `${sourceName}`" - } - val sourceAliasStep = SparkSqlStep(sourceAliasTableName, sourceAliasSql, emptyMap, true) - - // 2. total metric - val totalTableName = "__totalMetric" - val totalColName = details.getStringOrKey(_total) - val totalSql = { - s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}`" - } - val totalStep = SparkSqlStep(totalTableName, totalSql, emptyMap) - val totalMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc) - val totalMetricExport = genMetricExport(totalMetricParam, totalColName, totalTableName, endTmst, mode) - - // 3. 
group by self - val selfGroupTableName = "__selfGroup" - val dupColName = details.getStringOrKey(_dup) - val accuDupColName = details.getStringOrKey(_accu_dup) - val selfGroupSql = { - s""" - |SELECT ${distAliasesClause}, (COUNT(*) - 1) AS `${dupColName}`, - |TRUE AS `${InternalColumns.distinct}` - |FROM `${sourceAliasTableName}` GROUP BY ${distAliasesClause} - """.stripMargin - } - val selfGroupStep = SparkSqlStep(selfGroupTableName, selfGroupSql, emptyMap, true) - - val selfDistRulePlan = RulePlan( - sourceAliasStep :: totalStep :: selfGroupStep :: Nil, - totalMetricExport :: Nil - ) - - val (distRulePlan, dupCountTableName) = procType match { - case StreamingProcessType if (withOlderTable) => { - // 4.0 update old data -// val updateOldTableName = "__updateOld" -// val updateOldSql = { -// s"SELECT * FROM `${targetName}`" -// } - val updateParam = emptyMap - val targetDsUpdate = genDsUpdate(updateParam, targetName, targetName) - - // 4. older alias - val olderAliasTableName = "__older" - val olderAliasSql = { - s"SELECT ${selClause} FROM `${targetName}` WHERE `${InternalColumns.tmst}` <= ${beginTmst}" - } - val olderAliasStep = SparkSqlStep(olderAliasTableName, olderAliasSql, emptyMap) - - // 5. join with older data - val joinedTableName = "__joined" - val selfSelClause = (distAliases :+ dupColName).map { alias => - s"`${selfGroupTableName}`.`${alias}`" - }.mkString(", ") - val onClause = distAliases.map { alias => - s"coalesce(`${selfGroupTableName}`.`${alias}`, '') = coalesce(`${olderAliasTableName}`.`${alias}`, '')" - }.mkString(" AND ") - val olderIsNull = distAliases.map { alias => - s"`${olderAliasTableName}`.`${alias}` IS NULL" - }.mkString(" AND ") - val joinedSql = { - s""" - |SELECT ${selfSelClause}, (${olderIsNull}) AS `${InternalColumns.distinct}` - |FROM `${olderAliasTableName}` RIGHT JOIN `${selfGroupTableName}` - |ON ${onClause} - """.stripMargin - } - val joinedStep = SparkSqlStep(joinedTableName, joinedSql, emptyMap) - - // 6. 
group by joined data - val groupTableName = "__group" - val moreDupColName = "_more_dup" - val groupSql = { - s""" - |SELECT ${distAliasesClause}, `${dupColName}`, `${InternalColumns.distinct}`, - |COUNT(*) AS `${moreDupColName}` - |FROM `${joinedTableName}` - |GROUP BY ${distAliasesClause}, `${dupColName}`, `${InternalColumns.distinct}` - """.stripMargin - } - val groupStep = SparkSqlStep(groupTableName, groupSql, emptyMap) - - // 7. final duplicate count - val finalDupCountTableName = "__finalDupCount" - // dupColName: the duplicate count of duplicated items only occurs in new data, - // which means the distinct one in new data is also duplicate - // accuDupColName: the count of duplicated items accumulated in new data and old data, - // which means the accumulated distinct count in all data - // e.g.: new data [A, A, B, B, C, D], old data [A, A, B, C] - // selfGroupTable will be (A, 1, F), (B, 1, F), (C, 0, T), (D, 0, T) - // joinedTable will be (A, 1, F), (A, 1, F), (B, 1, F), (C, 0, F), (D, 0, T) - // groupTable will be (A, 1, F, 2), (B, 1, F, 1), (C, 0, F, 1), (D, 0, T, 1) - // finalDupCountTable will be (A, F, 2, 3), (B, F, 2, 2), (C, F, 1, 1), (D, T, 0, 0) - // The distinct result of new data only should be: (A, 2), (B, 2), (C, 1), (D, 0), - // which means in new data [A, A, B, B, C, D], [A, A, B, B, C] are all duplicated, only [D] is distinct - val finalDupCountSql = { - s""" - |SELECT ${distAliasesClause}, `${InternalColumns.distinct}`, - |CASE WHEN `${InternalColumns.distinct}` THEN `${dupColName}` - |ELSE (`${dupColName}` + 1) END AS `${dupColName}`, - |CASE WHEN `${InternalColumns.distinct}` THEN `${dupColName}` - |ELSE (`${dupColName}` + `${moreDupColName}`) END AS `${accuDupColName}` - |FROM `${groupTableName}` - """.stripMargin - } - val finalDupCountStep = SparkSqlStep(finalDupCountTableName, finalDupCountSql, emptyMap, true) - - val rulePlan = RulePlan( - olderAliasStep :: joinedStep :: groupStep :: finalDupCountStep :: Nil, - Nil, - 
targetDsUpdate :: Nil - ) - (rulePlan, finalDupCountTableName) - } - case _ => { - (emptyRulePlan, selfGroupTableName) - } - } - - // 8. distinct metric - val distTableName = "__distMetric" - val distColName = details.getStringOrKey(_distinct) - val distSql = { - s""" - |SELECT COUNT(*) AS `${distColName}` - |FROM `${dupCountTableName}` WHERE `${InternalColumns.distinct}` - """.stripMargin - } - val distStep = SparkSqlStep(distTableName, distSql, emptyMap) - val distMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc) - val distMetricExport = genMetricExport(distMetricParam, distColName, distTableName, endTmst, mode) - - val distMetricRulePlan = RulePlan(distStep :: Nil, distMetricExport :: Nil) - - val duplicationArrayName = details.getString(_duplicationArray, "") - val dupRulePlan = if (duplicationArrayName.nonEmpty) { - val recordEnable = details.getBoolean(_recordEnable, false) - if (groupAliases.nonEmpty) { - // with some group by requirement - // 9. origin data join with distinct information - val informedTableName = "__informed" - val onClause = distAliases.map { alias => - s"coalesce(`${sourceAliasTableName}`.`${alias}`, '') = coalesce(`${dupCountTableName}`.`${alias}`, '')" - }.mkString(" AND ") - val informedSql = { - s""" - |SELECT `${sourceAliasTableName}`.*, - |`${dupCountTableName}`.`${dupColName}` AS `${dupColName}`, - |`${dupCountTableName}`.`${InternalColumns.distinct}` AS `${InternalColumns.distinct}` - |FROM `${sourceAliasTableName}` LEFT JOIN `${dupCountTableName}` - |ON ${onClause} - """.stripMargin - } - val informedStep = SparkSqlStep(informedTableName, informedSql, emptyMap) - - // 10. 
add row number - val rnTableName = "__rowNumber" - val rnDistClause = distAliasesClause - val rnSortClause = s"SORT BY `${InternalColumns.distinct}`" - val rnSql = { - s""" - |SELECT *, - |ROW_NUMBER() OVER (DISTRIBUTE BY ${rnDistClause} ${rnSortClause}) `${InternalColumns.rowNumber}` - |FROM `${informedTableName}` - """.stripMargin - } - val rnStep = SparkSqlStep(rnTableName, rnSql, emptyMap) - - // 11. recognize duplicate items - val dupItemsTableName = "__dupItems" - val dupItemsSql = { - s""" - |SELECT ${allAliasesClause}, `${dupColName}` FROM `${rnTableName}` - |WHERE NOT `${InternalColumns.distinct}` OR `${InternalColumns.rowNumber}` > 1 - """.stripMargin - } - val dupItemsStep = SparkSqlStep(dupItemsTableName, dupItemsSql, emptyMap) - val dupItemsParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap) - val dupItemsExport = genRecordExport(dupItemsParam, dupItemsTableName, dupItemsTableName, endTmst, mode) - - // 12. group by dup Record metric - val groupDupMetricTableName = "__groupDupMetric" - val numColName = details.getStringOrKey(_num) - val groupSelClause = groupAliasesClause - val groupDupMetricSql = { - s""" - |SELECT ${groupSelClause}, `${dupColName}`, COUNT(*) AS `${numColName}` - |FROM `${dupItemsTableName}` GROUP BY ${groupSelClause}, `${dupColName}` - """.stripMargin - } - val groupDupMetricStep = SparkSqlStep(groupDupMetricTableName, groupDupMetricSql, emptyMap) - val groupDupMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc) - val groupDupMetricExport = genMetricExport(groupDupMetricParam, duplicationArrayName, groupDupMetricTableName, endTmst, mode) - - val exports = if (recordEnable) { - dupItemsExport :: groupDupMetricExport :: Nil - } else { - groupDupMetricExport :: Nil - } - RulePlan( - informedStep :: rnStep :: dupItemsStep :: groupDupMetricStep :: Nil, - exports - ) - - } else { - // no group by requirement - // 9. 
duplicate record - val dupRecordTableName = "__dupRecords" - val dupRecordSelClause = procType match { - case StreamingProcessType if (withOlderTable) => s"${distAliasesClause}, `${dupColName}`, `${accuDupColName}`" - case _ => s"${distAliasesClause}, `${dupColName}`" - } - val dupRecordSql = { - s""" - |SELECT ${dupRecordSelClause} - |FROM `${dupCountTableName}` WHERE `${dupColName}` > 0 - """.stripMargin - } - val dupRecordStep = SparkSqlStep(dupRecordTableName, dupRecordSql, emptyMap, true) - val dupRecordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap) - val dupRecordExport = genRecordExport(dupRecordParam, dupRecordTableName, dupRecordTableName, endTmst, mode) - - // 10. duplicate metric - val dupMetricTableName = "__dupMetric" - val numColName = details.getStringOrKey(_num) - val dupMetricSql = { - s""" - |SELECT `${dupColName}`, COUNT(*) AS `${numColName}` - |FROM `${dupRecordTableName}` GROUP BY `${dupColName}` - """.stripMargin - } - val dupMetricStep = SparkSqlStep(dupMetricTableName, dupMetricSql, emptyMap) - val dupMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc) - val dupMetricExport = genMetricExport(dupMetricParam, duplicationArrayName, dupMetricTableName, endTmst, mode) - - val exports = if (recordEnable) { - dupRecordExport :: dupMetricExport :: Nil - } else { - dupMetricExport :: Nil - } - RulePlan(dupRecordStep :: dupMetricStep :: Nil, exports) - } - } else emptyRulePlan - - selfDistRulePlan.merge(distRulePlan).merge(distMetricRulePlan).merge(dupRulePlan) - - } - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DsUpdateFactory.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DsUpdateFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DsUpdateFactory.scala deleted file mode 100644 index 772163e..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DsUpdateFactory.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.rule.trans - -import org.apache.griffin.measure.rule.plan._ -import org.apache.griffin.measure.utils.ParamUtil._ - -object DsUpdateFactory { - - def genDsUpdate(param: Map[String, Any], defDsName: String, - stepName: String): DsUpdate = { - DsUpdate(UpdateParamKeys.getName(param, defDsName), stepName) - } - -} - -object UpdateParamKeys { - val _name = "name" - - def getName(param: Map[String, Any], defName: String): String = param.getString(_name, defName) -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/trans/ProfilingRulePlanTrans.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/ProfilingRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/ProfilingRulePlanTrans.scala deleted file mode 100644 index f80f3c1..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/ProfilingRulePlanTrans.scala +++ /dev/null @@ -1,100 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.rule.trans - -import org.apache.griffin.measure.process.temp.TableRegisters -import org.apache.griffin.measure.process.{BatchProcessType, ExportMode, ProcessType, StreamingProcessType} -import org.apache.griffin.measure.rule.adaptor.RuleParamKeys._ -import org.apache.griffin.measure.rule.adaptor._ -import org.apache.griffin.measure.rule.dsl.analyzer.ProfilingAnalyzer -import org.apache.griffin.measure.rule.dsl.expr._ -import org.apache.griffin.measure.rule.plan._ -import org.apache.griffin.measure.rule.trans.RuleExportFactory._ -import org.apache.griffin.measure.utils.ParamUtil._ - -import scala.util.Try - -case class ProfilingRulePlanTrans(dataSourceNames: Seq[String], - timeInfo: TimeInfo, name: String, expr: Expr, - param: Map[String, Any], procType: ProcessType - ) extends RulePlanTrans { - - private object ProfilingKeys { - val _source = "source" - } - import ProfilingKeys._ - - def trans(): Try[RulePlan] = Try { - val details = getDetails(param) - val profilingClause = expr.asInstanceOf[ProfilingClause] - val sourceName = profilingClause.fromClauseOpt match { - case Some(fc) => fc.dataSource - case _ => details.getString(_source, dataSourceNames.head) - } - val fromClause = profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc - - val mode = ExportMode.defaultMode(procType) - - val ct = timeInfo.calcTime - - if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) { - emptyRulePlan - } else { - val analyzer = ProfilingAnalyzer(profilingClause, sourceName) - val selExprDescs = analyzer.selectionExprs.map { sel => - val alias = sel match { - case s: AliasableExpr if (s.alias.nonEmpty) => s" AS `${s.alias.get}`" - case _ => "" - } - s"${sel.desc}${alias}" - } - val selCondition = profilingClause.selectClause.extraConditionOpt.map(_.desc).mkString - val selClause = procType match { - case BatchProcessType => selExprDescs.mkString(", ") - case StreamingProcessType => (s"`${InternalColumns.tmst}`" +: 
selExprDescs).mkString(", ") - } - val groupByClauseOpt = analyzer.groupbyExprOpt - val groupbyClause = procType match { - case BatchProcessType => groupByClauseOpt.map(_.desc).getOrElse("") - case StreamingProcessType => { - val tmstGroupbyClause = GroupbyClause(LiteralStringExpr(s"`${InternalColumns.tmst}`") :: Nil, None) - val mergedGroubbyClause = tmstGroupbyClause.merge(groupByClauseOpt match { - case Some(gbc) => gbc - case _ => GroupbyClause(Nil, None) - }) - mergedGroubbyClause.desc - } - } - val preGroupbyClause = analyzer.preGroupbyExprs.map(_.desc).mkString(" ") - val postGroupbyClause = analyzer.postGroupbyExprs.map(_.desc).mkString(" ") - - // 1. select statement - val profilingSql = { - s"SELECT ${selCondition} ${selClause} ${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}" - } - val profilingName = name - val profilingStep = SparkSqlStep(profilingName, profilingSql, details) - val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap) - val profilingExports = genMetricExport(metricParam, name, profilingName, ct, mode) :: Nil - - RulePlan(profilingStep :: Nil, profilingExports) - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RuleExportFactory.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RuleExportFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RuleExportFactory.scala deleted file mode 100644 index 915e654..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RuleExportFactory.scala +++ /dev/null @@ -1,65 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. 
The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.rule.trans - -import org.apache.griffin.measure.process.ExportMode -import org.apache.griffin.measure.rule.dsl.CollectType -import org.apache.griffin.measure.rule.plan._ - -import org.apache.griffin.measure.utils.ParamUtil._ - -object RuleExportFactory { - - def genMetricExport(param: Map[String, Any], name: String, stepName: String, - defTimestamp: Long, mode: ExportMode - ): MetricExport = { - MetricExport( - ExportParamKeys.getName(param, name), - stepName, - ExportParamKeys.getCollectType(param), - defTimestamp, - mode - ) - } - def genRecordExport(param: Map[String, Any], name: String, stepName: String, - defTimestamp: Long, mode: ExportMode - ): RecordExport = { - RecordExport( - ExportParamKeys.getName(param, name), - stepName, - ExportParamKeys.getDataSourceCacheOpt(param), - ExportParamKeys.getOriginDFOpt(param), - defTimestamp, - mode - ) - } - -} - -object ExportParamKeys { - val _name = "name" - val _collectType = "collect.type" - val _dataSourceCache = "data.source.cache" - val _originDF = "origin.DF" - - def getName(param: Map[String, Any], defName: String): String = param.getString(_name, defName) - def getCollectType(param: Map[String, Any]): CollectType = CollectType(param.getString(_collectType, "")) - def getDataSourceCacheOpt(param: Map[String, Any]): Option[String] = param.get(_dataSourceCache).map(_.toString) - def getOriginDFOpt(param: Map[String, Any]): 
Option[String] = param.get(_originDF).map(_.toString) -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala deleted file mode 100644 index ba9565f..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.rule.trans - -import org.apache.griffin.measure.log.Loggable -import org.apache.griffin.measure.process.ProcessType -import org.apache.griffin.measure.process.temp.TimeRange -import org.apache.griffin.measure.rule.dsl._ -import org.apache.griffin.measure.rule.dsl.expr.Expr -import org.apache.griffin.measure.rule.plan._ - -import scala.util.Try - -trait RulePlanTrans extends Loggable with Serializable { - - protected val emptyRulePlan = RulePlan(Nil, Nil) - protected val emptyMap = Map[String, Any]() - - def trans(): Try[RulePlan] - -} - -object RulePlanTrans { - private val emptyRulePlanTrans = new RulePlanTrans { - def trans(): Try[RulePlan] = Try(emptyRulePlan) - } - - def apply(dqType: DqType, - dsNames: Seq[String], - ti: TimeInfo, name: String, expr: Expr, - param: Map[String, Any], procType: ProcessType, - dsTimeRanges: Map[String, TimeRange] - ): RulePlanTrans = { - dqType match { - case AccuracyType => AccuracyRulePlanTrans(dsNames, ti, name, expr, param, procType) - case ProfilingType => ProfilingRulePlanTrans(dsNames, ti, name, expr, param, procType) - case UniquenessType => UniquenessRulePlanTrans(dsNames, ti, name, expr, param, procType) - case DistinctnessType => DistinctnessRulePlanTrans(dsNames, ti, name, expr, param, procType, dsTimeRanges) - case TimelinessType => TimelinessRulePlanTrans(dsNames, ti, name, expr, param, procType, dsTimeRanges) - case CompletenessType => CompletenessRulePlanTrans(dsNames, ti, name, expr, param, procType) - case _ => emptyRulePlanTrans - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala deleted 
file mode 100644 index deee65b..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala +++ /dev/null @@ -1,224 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.rule.trans - -import org.apache.griffin.measure.process.temp.{TableRegisters, TimeRange} -import org.apache.griffin.measure.process.{BatchProcessType, ExportMode, ProcessType, StreamingProcessType} -import org.apache.griffin.measure.rule.adaptor.RuleParamKeys._ -import org.apache.griffin.measure.rule.adaptor._ -import org.apache.griffin.measure.rule.dsl.ArrayCollectType -import org.apache.griffin.measure.rule.dsl.analyzer.TimelinessAnalyzer -import org.apache.griffin.measure.rule.dsl.expr._ -import org.apache.griffin.measure.rule.plan._ -import org.apache.griffin.measure.rule.trans.RuleExportFactory._ -import org.apache.griffin.measure.utils.ParamUtil._ -import org.apache.griffin.measure.utils.TimeUtil - -import scala.util.Try - -case class TimelinessRulePlanTrans(dataSourceNames: Seq[String], - timeInfo: TimeInfo, name: String, expr: Expr, - param: Map[String, Any], procType: ProcessType, - dsTimeRanges: Map[String, TimeRange] - ) extends RulePlanTrans { - - private object TimelinessKeys { - val _source 
= "source" - val _latency = "latency" - val _total = "total" - val _avg = "avg" - val _threshold = "threshold" - val _step = "step" - val _count = "count" - val _stepSize = "step.size" - val _percentileColPrefix = "percentile" - val _percentileValues = "percentile.values" - } - import TimelinessKeys._ - - def trans(): Try[RulePlan] = Try { - val details = getDetails(param) - val timelinessClause = expr.asInstanceOf[TimelinessClause] - val sourceName = details.getString(_source, dataSourceNames.head) - - val mode = ExportMode.defaultMode(procType) - -// val ct = timeInfo.calcTime - - val minTmstOpt = dsTimeRanges.get(sourceName).flatMap(_.minTmstOpt) - val minTmst = minTmstOpt match { - case Some(t) => t - case _ => throw new Exception(s"empty min tmst from ${sourceName}") - } - - if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) { - emptyRulePlan - } else { - val analyzer = TimelinessAnalyzer(timelinessClause, sourceName) - val btsSel = analyzer.btsExpr - val etsSelOpt = analyzer.etsExprOpt - - // 1. in time - val inTimeTableName = "__inTime" - val inTimeSql = etsSelOpt match { - case Some(etsSel) => { - s""" - |SELECT *, (${btsSel}) AS `${InternalColumns.beginTs}`, - |(${etsSel}) AS `${InternalColumns.endTs}` - |FROM ${sourceName} WHERE (${btsSel}) IS NOT NULL AND (${etsSel}) IS NOT NULL - """.stripMargin - } - case _ => { - s""" - |SELECT *, (${btsSel}) AS `${InternalColumns.beginTs}` - |FROM ${sourceName} WHERE (${btsSel}) IS NOT NULL - """.stripMargin - } - } - val inTimeStep = SparkSqlStep(inTimeTableName, inTimeSql, emptyMap) - - // 2. 
latency - val latencyTableName = "__lat" - val latencyColName = details.getStringOrKey(_latency) - val etsColName = etsSelOpt match { - case Some(_) => InternalColumns.endTs - case _ => InternalColumns.tmst - } - val latencySql = { - s"SELECT *, (`${etsColName}` - `${InternalColumns.beginTs}`) AS `${latencyColName}` FROM `${inTimeTableName}`" - } - val latencyStep = SparkSqlStep(latencyTableName, latencySql, emptyMap, true) - - // 3. timeliness metric - val metricTableName = name - val totalColName = details.getStringOrKey(_total) - val avgColName = details.getStringOrKey(_avg) - val metricSql = procType match { - case BatchProcessType => { - s""" - |SELECT COUNT(*) AS `${totalColName}`, - |CAST(AVG(`${latencyColName}`) AS BIGINT) AS `${avgColName}` - |FROM `${latencyTableName}` - """.stripMargin - } - case StreamingProcessType => { - s""" - |SELECT `${InternalColumns.tmst}`, - |COUNT(*) AS `${totalColName}`, - |CAST(AVG(`${latencyColName}`) AS BIGINT) AS `${avgColName}` - |FROM `${latencyTableName}` - |GROUP BY `${InternalColumns.tmst}` - """.stripMargin - } - } - val metricStep = SparkSqlStep(metricTableName, metricSql, emptyMap) - val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap) - val metricExports = genMetricExport(metricParam, name, metricTableName, minTmst, mode) :: Nil - - // current timeliness plan - val timeSteps = inTimeStep :: latencyStep :: metricStep :: Nil - val timeExports = metricExports - val timePlan = RulePlan(timeSteps, timeExports) - - // 4. 
timeliness record - val recordPlan = TimeUtil.milliseconds(details.getString(_threshold, "")) match { - case Some(tsh) => { - val recordTableName = "__lateRecords" - val recordSql = { - s"SELECT * FROM `${latencyTableName}` WHERE `${latencyColName}` > ${tsh}" - } - val recordStep = SparkSqlStep(recordTableName, recordSql, emptyMap) - val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap) - val recordExports = genRecordExport(recordParam, recordTableName, recordTableName, minTmst, mode) :: Nil - RulePlan(recordStep :: Nil, recordExports) - } - case _ => emptyRulePlan - } - - // 5. ranges - val rangePlan = TimeUtil.milliseconds(details.getString(_stepSize, "")) match { - case Some(stepSize) => { - // 5.1 range - val rangeTableName = "__range" - val stepColName = details.getStringOrKey(_step) - val rangeSql = { - s""" - |SELECT *, CAST((`${latencyColName}` / ${stepSize}) AS BIGINT) AS `${stepColName}` - |FROM `${latencyTableName}` - """.stripMargin - } - val rangeStep = SparkSqlStep(rangeTableName, rangeSql, emptyMap) - - // 5.2 range metric - val rangeMetricTableName = "__rangeMetric" - val countColName = details.getStringOrKey(_count) - val rangeMetricSql = procType match { - case BatchProcessType => { - s""" - |SELECT `${stepColName}`, COUNT(*) AS `${countColName}` - |FROM `${rangeTableName}` GROUP BY `${stepColName}` - """.stripMargin - } - case StreamingProcessType => { - s""" - |SELECT `${InternalColumns.tmst}`, `${stepColName}`, COUNT(*) AS `${countColName}` - |FROM `${rangeTableName}` GROUP BY `${InternalColumns.tmst}`, `${stepColName}` - """.stripMargin - } - } - val rangeMetricStep = SparkSqlStep(rangeMetricTableName, rangeMetricSql, emptyMap) - val rangeMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc) - val rangeMetricExports = genMetricExport(rangeMetricParam, stepColName, rangeMetricTableName, minTmst, mode) :: Nil - - RulePlan(rangeStep :: rangeMetricStep :: Nil, rangeMetricExports) - } - case _ 
=> emptyRulePlan - } - - // 6. percentiles - val percentiles = getPercentiles(details) - val percentilePlan = if (percentiles.nonEmpty) { - val percentileTableName = "__percentile" - val percentileColName = details.getStringOrKey(_percentileColPrefix) - val percentileCols = percentiles.map { pct => - val pctName = (pct * 100).toInt.toString - s"floor(percentile_approx(${latencyColName}, ${pct})) AS `${percentileColName}_${pctName}`" - }.mkString(", ") - val percentileSql = { - s""" - |SELECT ${percentileCols} - |FROM `${latencyTableName}` - """.stripMargin - } - val percentileStep = SparkSqlStep(percentileTableName, percentileSql, emptyMap) - val percentileParam = emptyMap - val percentileExports = genMetricExport(percentileParam, percentileTableName, percentileTableName, minTmst, mode) :: Nil - - RulePlan(percentileStep :: Nil, percentileExports) - } else emptyRulePlan - - timePlan.merge(recordPlan).merge(rangePlan).merge(percentilePlan) - } - } - - private def getPercentiles(details: Map[String, Any]): Seq[Double] = { - details.getArr[Double](_percentileValues).filter(d => (d >= 0 && d <= 1)) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/trans/UniquenessRulePlanTrans.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/UniquenessRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/UniquenessRulePlanTrans.scala deleted file mode 100644 index baa5572..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/UniquenessRulePlanTrans.scala +++ /dev/null @@ -1,200 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. 
/**
 * Translates a uniqueness rule into a concrete [[RulePlan]] of Spark SQL steps.
 *
 * The generated plan de-duplicates the source, right-joins it back against the
 * target on null-safe key equality, groups to count duplicates per key, and
 * emits total/unique metrics (plus optional duplicate records/metrics when a
 * "duplication.array" name is configured).
 */
case class UniquenessRulePlanTrans(dataSourceNames: Seq[String],
                                   timeInfo: TimeInfo, name: String, expr: Expr,
                                   param: Map[String, Any], procType: ProcessType
                                  ) extends RulePlanTrans {

  // Keys looked up in the rule's "details" map; values double as default column names.
  private object UniquenessKeys {
    val _source = "source"
    val _target = "target"
    val _unique = "unique"
    val _total = "total"
    val _dup = "dup"
    val _num = "num"

    val _duplicationArray = "duplication.array"
  }
  import UniquenessKeys._

  /**
   * Builds the rule plan; returns an empty plan when either data source has no
   * registered runtime temp table for this calculation window.
   */
  def trans(): Try[RulePlan] = Try {
    val details = getDetails(param)
    // source/target default to the first two configured data source names
    val sourceName = details.getString(_source, dataSourceNames.head)
    val targetName = details.getString(_target, dataSourceNames.tail.head)
    val analyzer = UniquenessAnalyzer(expr.asInstanceOf[UniquenessClause], sourceName, targetName)

    val mode = ExportMode.defaultMode(procType)

    val ct = timeInfo.calcTime

    if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
      println(s"[${ct}] data source ${sourceName} not exists")
      emptyRulePlan
    } else if (!TableRegisters.existRunTempTable(timeInfo.key, targetName)) {
      println(s"[${ct}] data source ${targetName} not exists")
      emptyRulePlan
    } else {
      // selection expressions with their aliases, e.g. "expr AS `alias`"
      val selItemsClause = analyzer.selectionPairs.map { pair =>
        val (expr, alias) = pair
        s"${expr.desc} AS `${alias}`"
      }.mkString(", ")
      val aliases = analyzer.selectionPairs.map(_._2)

      // streaming mode additionally carries the internal tmst column through every step
      val selClause = procType match {
        case BatchProcessType => selItemsClause
        case StreamingProcessType => s"`${InternalColumns.tmst}`, ${selItemsClause}"
      }
      val selAliases = procType match {
        case BatchProcessType => aliases
        case StreamingProcessType => InternalColumns.tmst +: aliases
      }

      // 1. source distinct mapping
      val sourceTableName = "__source"
      val sourceSql = s"SELECT DISTINCT ${selClause} FROM ${sourceName}"
      val sourceStep = SparkSqlStep(sourceTableName, sourceSql, emptyMap)

      // 2. target mapping
      val targetTableName = "__target"
      val targetSql = s"SELECT ${selClause} FROM ${targetName}"
      val targetStep = SparkSqlStep(targetTableName, targetSql, emptyMap)

      // 3. joined: keep every distinct source row, matched against target on
      // null-safe key equality (coalesce to '' so NULL keys still compare)
      val joinedTableName = "__joined"
      val joinedSelClause = selAliases.map { alias =>
        s"`${sourceTableName}`.`${alias}` AS `${alias}`"
      }.mkString(", ")
      val onClause = aliases.map { alias =>
        s"coalesce(`${sourceTableName}`.`${alias}`, '') = coalesce(`${targetTableName}`.`${alias}`, '')"
      }.mkString(" AND ")
      val joinedSql = {
        s"SELECT ${joinedSelClause} FROM `${targetTableName}` RIGHT JOIN `${sourceTableName}` ON ${onClause}"
      }
      val joinedStep = SparkSqlStep(joinedTableName, joinedSql, emptyMap)

      // 4. group: COUNT(*) - 1 is the number of duplicate matches per key
      val groupTableName = "__group"
      val groupSelClause = selAliases.map { alias =>
        s"`${alias}`"
      }.mkString(", ")
      val dupColName = details.getStringOrKey(_dup)
      val groupSql = {
        s"SELECT ${groupSelClause}, (COUNT(*) - 1) AS `${dupColName}` FROM `${joinedTableName}` GROUP BY ${groupSelClause}"
      }
      // last flag enables caching of this intermediate table
      val groupStep = SparkSqlStep(groupTableName, groupSql, emptyMap, true)

      // 5. total metric: row count of the (non-distinct) source
      val totalTableName = "__totalMetric"
      val totalColName = details.getStringOrKey(_total)
      val totalSql = procType match {
        case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`"
        case StreamingProcessType => {
          s"""
             |SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${totalColName}`
             |FROM `${sourceName}` GROUP BY `${InternalColumns.tmst}`
           """.stripMargin
        }
      }
      val totalStep = SparkSqlStep(totalTableName, totalSql, emptyMap)
      val totalMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
      val totalMetricExport = genMetricExport(totalMetricParam, totalColName, totalTableName, ct, mode)

      // 6. unique record: keys that matched the target exactly once
      val uniqueRecordTableName = "__uniqueRecord"
      val uniqueRecordSql = {
        s"SELECT * FROM `${groupTableName}` WHERE `${dupColName}` = 0"
      }
      val uniqueRecordStep = SparkSqlStep(uniqueRecordTableName, uniqueRecordSql, emptyMap)

      // 7. unique metric
      val uniqueTableName = "__uniqueMetric"
      val uniqueColName = details.getStringOrKey(_unique)
      val uniqueSql = procType match {
        case BatchProcessType => s"SELECT COUNT(*) AS `${uniqueColName}` FROM `${uniqueRecordTableName}`"
        case StreamingProcessType => {
          s"""
             |SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${uniqueColName}`
             |FROM `${uniqueRecordTableName}` GROUP BY `${InternalColumns.tmst}`
           """.stripMargin
        }
      }
      val uniqueStep = SparkSqlStep(uniqueTableName, uniqueSql, emptyMap)
      val uniqueMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
      val uniqueMetricExport = genMetricExport(uniqueMetricParam, uniqueColName, uniqueTableName, ct, mode)

      val uniqueSteps = sourceStep :: targetStep :: joinedStep :: groupStep ::
        totalStep :: uniqueRecordStep :: uniqueStep :: Nil
      val uniqueExports = totalMetricExport :: uniqueMetricExport :: Nil
      val uniqueRulePlan = RulePlan(uniqueSteps, uniqueExports)

      // optional duplication sub-plan, enabled by the "duplication.array" detail
      val duplicationArrayName = details.getString(_duplicationArray, "")
      val dupRulePlan = if (duplicationArrayName.nonEmpty) {
        // 8. duplicate record: keys matched more than once
        val dupRecordTableName = "__dupRecords"
        val dupRecordSql = {
          s"SELECT * FROM `${groupTableName}` WHERE `${dupColName}` > 0"
        }
        val dupRecordStep = SparkSqlStep(dupRecordTableName, dupRecordSql, emptyMap, true)
        val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
        val dupRecordExport = genRecordExport(recordParam, dupRecordTableName, dupRecordTableName, ct, mode)

        // 9. duplicate metric: histogram of duplicate counts
        val dupMetricTableName = "__dupMetric"
        val numColName = details.getStringOrKey(_num)
        val dupMetricSelClause = procType match {
          case BatchProcessType => s"`${dupColName}`, COUNT(*) AS `${numColName}`"
          case StreamingProcessType => s"`${InternalColumns.tmst}`, `${dupColName}`, COUNT(*) AS `${numColName}`"
        }
        val dupMetricGroupbyClause = procType match {
          case BatchProcessType => s"`${dupColName}`"
          case StreamingProcessType => s"`${InternalColumns.tmst}`, `${dupColName}`"
        }
        val dupMetricSql = {
          s"""
             |SELECT ${dupMetricSelClause} FROM `${dupRecordTableName}`
             |GROUP BY ${dupMetricGroupbyClause}
           """.stripMargin
        }
        val dupMetricStep = SparkSqlStep(dupMetricTableName, dupMetricSql, emptyMap)
        val dupMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
        val dupMetricExport = genMetricExport(dupMetricParam, duplicationArrayName, dupMetricTableName, ct, mode)

        RulePlan(dupRecordStep :: dupMetricStep :: Nil, dupRecordExport :: dupMetricExport :: Nil)
      } else emptyRulePlan

      uniqueRulePlan.merge(dupRulePlan)
    }
  }

}
object GriffinUdafs {

  // Registration hook for user-defined aggregate functions.
  // Currently a no-op: the MeanUdaf registration below is intentionally disabled.
  def register(sqlContext: SQLContext): Unit = {
//    sqlContext.udf.register("my_mean", new MeanUdaf)
  }

}
object GriffinUdfs {

  /** Registers Griffin's custom string/array SQL functions on the given context. */
  def register(sqlContext: SQLContext): Unit = {
    sqlContext.udf.register("index_of", indexOf _)
    sqlContext.udf.register("matches", matches _)
    sqlContext.udf.register("reg_replace", regReplace _)
  }

  // position of value within the array, -1 when absent
  private def indexOf(items: Seq[String], value: String): Int =
    items.indexOf(value)

  // whole-string regex match
  private def matches(text: String, regex: String): Boolean =
    text.matches(regex)

  // replace every occurrence of the regex with the replacement
  private def regReplace(text: String, regex: String, replacement: String): String =
    text.replaceAll(regex, replacement)

}
/**
 * Arithmetic mean over a LongType column, implemented as a Spark UDAF.
 *
 * Buffer layout: slot 0 = running sum (Double), slot 1 = row count (Long).
 * Null inputs are skipped (matching SQL AVG semantics), and an empty group
 * evaluates to null rather than NaN.
 */
class MeanUdaf extends UserDefinedAggregateFunction {

  def inputSchema: StructType = StructType(Array(StructField("item", LongType)))

  def bufferSchema = StructType(Array(
    StructField("sum", DoubleType),
    StructField("cnt", LongType)
  ))

  def dataType: DataType = DoubleType

  // same input always yields same mean
  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer.update(0, 0.toDouble)
    buffer.update(1, 0L)
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    // Guard against null input values: Row.getLong on a null slot throws,
    // and SQL AVG ignores nulls anyway.
    if (!input.isNullAt(0)) {
      val sum = buffer.getDouble(0)
      val cnt = buffer.getLong(1)
      val value = input.getLong(0)
      buffer.update(0, sum + value)
      buffer.update(1, cnt + 1)
    }
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1.update(0, buffer1.getDouble(0) + buffer2.getDouble(0))
    buffer1.update(1, buffer1.getLong(1) + buffer2.getLong(1))
  }

  def evaluate(buffer: Row): Any = {
    val cnt = buffer.getLong(1)
    // empty group: return null (SQL AVG convention) instead of 0.0/0 == NaN
    if (cnt == 0L) null
    else buffer.getDouble(0) / cnt.toDouble
  }
}
trait DQStep extends Loggable {

  /** identifying name of this step */
  val name: String

  /** Runs this step against the given context; false signals failure. */
  def execute(context: DQContext): Boolean

  /** Names produced by this step; a single-name sequence by default. */
  def getNames(): Seq[String] = Seq(name)

}
/**
 * A composite step that runs a sequence of dq steps in order.
 */
case class SeqDQStep(dqSteps: Seq[DQStep]) extends DQStep {

  val name: String = ""
  val rule: String = ""
  val details: Map[String, Any] = Map()

  /**
   * Executes sub-steps in order; once one returns false, later steps are not
   * executed (identical short-circuiting to a foldLeft with &&).
   */
  def execute(context: DQContext): Boolean =
    dqSteps.forall(_.execute(context))

  /** Concatenation of every sub-step's names, in step order. */
  override def getNames(): Seq[String] =
    dqSteps.flatMap(_.getNames)

}
// Step builder for batch-mode data sources.
case class BatchDataSourceStepBuilder() extends DataSourceParamStepBuilder {

  // Stub: batch connectors currently produce no read step here.
  // NOTE(review): presumably batch reading happens elsewhere — confirm against callers.
  def buildReadSteps(context: DQContext, dcParam: DataConnectorParam): Option[ReadStep] = {
    None
  }

}
/**
 * Internal column names that griffin dsl rules may add during calculation.
 * All are prefixed with "__" to avoid clashing with user data columns.
 */
object ConstantColumns {

  val tmst = "__tmst"
  val metric = "__metric"
  val record = "__record"
  val empty = "__empty"

  val beginTs = "__begin_ts"
  val endTs = "__end_ts"

  val distinct = "__distinct"

  val rowNumber = "__rn"

  /** every constant column, in declaration order */
  val columns: List[String] =
    tmst :: metric :: record :: empty :: beginTs :: endTs :: distinct :: rowNumber :: Nil

}
/**
 * build dq step by param
 */
trait DQStepBuilder extends Loggable with Serializable {

  // concrete builders fix the param type they accept
  type ParamType <: Param

  /** Builds a dq step from the given param, or None when it cannot be built. */
  def buildDQStep(context: DQContext, param: ParamType): Option[DQStep]

  // use the configured name when non-blank, otherwise generate a fresh one
  protected def getStepName(name: String): String = {
    if (StringUtils.isNotBlank(name)) name
    else DQStepNameGenerator.genName
  }

}

object DQStepBuilder {

  /** Builds the read step(s) for a data source param, dispatching on process type. */
  def buildStepOptByDataSourceParam(context: DQContext, dsParam: DataSourceParam
                                   ): Option[DQStep] = {
    getDataSourceParamStepBuilder(context.procType)
      .flatMap(_.buildDQStep(context, dsParam))
  }

  private def getDataSourceParamStepBuilder(procType: ProcessType): Option[DataSourceParamStepBuilder] = {
    procType match {
      case BatchProcessType => Some(BatchDataSourceStepBuilder())
      case StreamingProcessType => Some(StreamingDataSourceStepBuilder())
      case _ => None
    }
  }

  /**
   * Builds a dq step for a rule param, dispatching on the rule's dsl type.
   * Side effect: registers every produced step name in the context's compile
   * table register so later rules can reference those tables.
   */
  def buildStepOptByRuleParam(context: DQContext, ruleParam: RuleParam, defaultDslType: DslType
                             ): Option[DQStep] = {
    val dslType = ruleParam.getDslType(defaultDslType)
    val dsNames = context.dataSourceNames
    val funcNames = context.functionNames
    val dqStepOpt = getRuleParamStepBuilder(dslType, dsNames, funcNames)
      .flatMap(_.buildDQStep(context, ruleParam))
    dqStepOpt.toSeq.flatMap(_.getNames).foreach(name => context.compileTableRegister.registerTable(name))
    dqStepOpt
  }

  private def getRuleParamStepBuilder(dslType: DslType, dsNames: Seq[String], funcNames: Seq[String]
                                     ): Option[RuleParamStepBuilder] = {
    dslType match {
      case SparkSqlType => Some(SparkSqlDQStepBuilder())
      case DataFrameOpsType => Some(DataFrameOpsDQStepBuilder())
      case GriffinDslType => Some(GriffinDslDQStepBuilder(dsNames, funcNames))
      case _ => None
    }
  }

}
/** Generates unique default names for dq steps that have none configured. */
object DQStepNameGenerator {

  // monotonically increasing id; AtomicLong makes generation thread-safe
  private val counter: AtomicLong = new AtomicLong(0L)
  private val head: String = "step"

  /** Next generated name: "step1", "step2", ... */
  def genName: String = s"${head}${counter.incrementAndGet()}"

}
/** Builds the steps for a rule written as data-frame operations. */
case class DataFrameOpsDQStepBuilder() extends RuleParamStepBuilder {

  def buildSteps(context: DQContext, ruleParam: RuleParam): Seq[DQStep] = {
    val stepName = getStepName(ruleParam.getName)
    // one transform step, followed by any direct-write steps the rule declares
    val transform = DataFrameOpsTransformStep(
      stepName, ruleParam.getRule, ruleParam.getDetails, ruleParam.getCache)
    Seq(transform) ++ buildDirectWriteSteps(ruleParam)
  }

}
/**
 * build dq step by data source param
 */
trait DataSourceParamStepBuilder extends DQStepBuilder {

  type ParamType = DataSourceParam

  /**
   * Builds one read step per connector and unions them under a single step;
   * None when no connector yields a read step.
   */
  def buildDQStep(context: DQContext, param: ParamType): Option[DQStep] = {
    val stepName = getStepName(param.name)
    val readSteps = param.getConnectors.flatMap(dc => buildReadSteps(context, dc))
    if (readSteps.isEmpty) None
    else Some(UnionReadStep(stepName, readSteps))
  }

  /** Turns one connector param into a read step, when supported. */
  protected def buildReadSteps(context: DQContext, dcParam: DataConnectorParam): Option[ReadStep]

}
/**
 * Builds DQ steps from a griffin-dsl rule: parses the rule text with the
 * griffin DSL parser, then translates the parsed expression into DQ steps
 * according to the rule's DQ type. Returns Nil (with a log entry) when the
 * rule fails to parse or the translation throws.
 */
case class GriffinDslDQStepBuilder(dataSourceNames: Seq[String],
                                   functionNames: Seq[String]
                                  ) extends RuleParamStepBuilder {

  // only syntactically valid identifiers are usable as UDF names by the parser
  val filteredFunctionNames = functionNames.filter { fn =>
    fn.matches("""^[a-zA-Z_]\w*$""")
  }
  val parser = GriffinDslParser(dataSourceNames, filteredFunctionNames)

  def buildSteps(context: DQContext, ruleParam: RuleParam): Seq[DQStep] = {
    val name = getStepName(ruleParam.getName)
    val rule = ruleParam.getRule
    val dqType = ruleParam.getDqType
    try {
      val result = parser.parseRule(rule, dqType)
      if (result.successful) {
        val expr = result.get
        val expr2DQSteps = Expr2DQSteps(context, expr, ruleParam.replaceName(name))
        expr2DQSteps.getDQSteps()
      } else {
        warn(s"parse rule [ ${rule} ] fails: \n${result}")
        Nil
      }
    } catch {
      // catch only non-fatal exceptions so fatal JVM errors (OutOfMemoryError,
      // InterruptedException, etc.) still propagate instead of being swallowed
      case scala.util.control.NonFatal(e) =>
        error(s"generate rule plan ${name} fails: ${e.getMessage}")
        Nil
    }
  }

}
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala new file mode 100644 index 0000000..9a2c7b5 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala @@ -0,0 +1,61 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
/**
 * Builds a DQ step from a rule param. The concrete builder supplies the step
 * sequence; a multi-step sequence is wrapped into one SeqDQStep, a single step
 * is returned as-is, and an empty sequence yields no step.
 */
trait RuleParamStepBuilder extends DQStepBuilder {

  type ParamType = RuleParam

  def buildDQStep(context: DQContext, param: ParamType): Option[DQStep] = {
    buildSteps(context, param) match {
      case Nil         => None
      case Seq(single) => Some(single)
      case many        => Some(SeqDQStep(many))
    }
  }

  /** Concrete builders translate the rule param into an ordered step sequence. */
  def buildSteps(context: DQContext, ruleParam: RuleParam): Seq[DQStep]

  /**
   * Build the write steps declared directly on the rule param:
   * metric writer, record writer and data-source-cache update writer.
   */
  protected def buildDirectWriteSteps(ruleParam: RuleParam): Seq[DQStep] = {
    val stepName = getStepName(ruleParam.getName)
    // metric writer, if a metric output is declared
    val metricSteps = ruleParam.metricOpt.toSeq.map { metric =>
      MetricWriteStep(metric.name, stepName, NormalizeType(metric.collectType))
    }
    // record writer, if a record output is declared
    val recordSteps = ruleParam.recordOpt.toSeq.map { record =>
      RecordWriteStep(record.name, stepName)
    }
    // data source cache update writer, if declared
    val dsCacheUpdateSteps = ruleParam.dsCacheUpdateOpt.toSeq.map { update =>
      DsCacheUpdateWriteStep(update.dsName, stepName)
    }
    metricSteps ++ recordSteps ++ dsCacheUpdateSteps
  }

}
/**
 * Builds the DQ steps for a spark-sql rule: a single SparkSqlTransformStep
 * followed by any write steps declared directly on the rule param.
 */
case class SparkSqlDQStepBuilder() extends RuleParamStepBuilder {

  def buildSteps(context: DQContext, ruleParam: RuleParam): Seq[DQStep] = {
    val stepName = getStepName(ruleParam.getName)
    val transform = SparkSqlTransformStep(
      stepName, ruleParam.getRule, ruleParam.getDetails, ruleParam.getCache)
    // transform runs first, then the direct writers, preserving step order
    Seq(transform) ++ buildDirectWriteSteps(ruleParam)
  }

}
/**
 * Data source step builder for streaming sources.
 *
 * NOTE(review): this builder produces no read step for any connector —
 * presumably streaming reads are driven elsewhere in the pipeline; confirm
 * before relying on it to read data.
 */
case class StreamingDataSourceStepBuilder() extends DataSourceParamStepBuilder {

  def buildReadSteps(context: DQContext, dcParam: DataConnectorParam): Option[ReadStep] = None

}
/**
 * An expression that may carry an alias name.
 */
trait AliasableExpr extends Expr {

  /** The alias declared for this expression, if any. */
  def alias: Option[String]

}
