http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala index 1e3ecb1..6ba0cf8 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala @@ -18,339 +18,1198 @@ under the License. */ package org.apache.griffin.measure.rule.adaptor -import org.apache.griffin.measure.data.connector.GroupByColumn +import org.apache.griffin.measure.cache.tmst.{TempName, TmstCache} +import org.apache.griffin.measure.process.engine.DataFrameOprs.AccuracyOprKeys +import org.apache.griffin.measure.process.temp.TableRegisters +import org.apache.griffin.measure.process._ import org.apache.griffin.measure.rule.dsl._ import org.apache.griffin.measure.rule.dsl.analyzer._ import org.apache.griffin.measure.rule.dsl.expr._ import org.apache.griffin.measure.rule.dsl.parser.GriffinDslParser -import org.apache.griffin.measure.rule.step._ +import org.apache.griffin.measure.rule.plan.{TimeInfo, _} import org.apache.griffin.measure.utils.ParamUtil._ +import org.apache.griffin.measure.utils.TimeUtil -case class GriffinDslAdaptor(dataSourceNames: Seq[String], - functionNames: Seq[String], - adaptPhase: AdaptPhase - ) extends RuleAdaptor { +object AccuracyKeys { + val _source = "source" + val _target = "target" + val _miss = "miss" + val _total = "total" + val _matched = "matched" +// val _missRecords = "missRecords" +} - object StepInfo { - val _Name = "name" - val _PersistType = "persist.type" - val _UpdateDataSource = "update.data.source" - def getNameOpt(param: Map[String, Any]): Option[String] = param.get(_Name).map(_.toString) - def getPersistType(param: Map[String, Any]): PersistType = PersistType(param.getString(_PersistType, "")) - def getUpdateDataSourceOpt(param: Map[String, Any]): Option[String] = param.get(_UpdateDataSource).map(_.toString) - } - object AccuracyInfo { - val _Source = "source" - val _Target = "target" - val _MissRecords = "miss.records" - val _Accuracy = "accuracy" - val _Miss = "miss" - val _Total = "total" - val _Matched = "matched" - } - object ProfilingInfo { - val _Source = "source" - val _Profiling = "profiling" - } +object ProfilingKeys { + val _source = "source" +} - def getNameOpt(param: Map[String, Any], key: String): Option[String] = param.get(key).map(_.toString) - def resultName(param: Map[String, Any], key: String): String = { - val nameOpt = param.get(key) match { - case Some(prm: Map[String, Any]) => StepInfo.getNameOpt(prm) - case _ => None - } - nameOpt.getOrElse(key) - } - def resultPersistType(param: Map[String, Any], key: String, defPersistType: PersistType): PersistType = { - param.get(key) match { - case Some(prm: Map[String, Any]) => StepInfo.getPersistType(prm) - case _ => defPersistType - } - } - def resultUpdateDataSourceOpt(param: Map[String, Any], key: String): Option[String] = { - param.get(key) match { - case Some(prm: Map[String, Any]) => StepInfo.getUpdateDataSourceOpt(prm) - case _ => None - } - } +object DuplicateKeys { + val _source = "source" + val _target = "target" + val _dup = "dup" + val _num = "num" +} + +object TimelinessKeys { + val _source = "source" + val _latency = "latency" + val _threshold = "threshold" +} + +object GlobalKeys { + val _initRule = "init.rule" +// val _globalMetricKeep = "global.metric.keep" +} - val _dqType = "dq.type" +case class GriffinDslAdaptor(dataSourceNames: Seq[String], + functionNames: Seq[String] + ) extends RuleAdaptor { - protected def getDqType(param: Map[String, Any]) = DqType(param.getString(_dqType, "")) + import RuleParamKeys._ val filteredFunctionNames = functionNames.filter { fn => fn.matches("""^[a-zA-Z_]\w*$""") } val parser = GriffinDslParser(dataSourceNames, filteredFunctionNames) - def genRuleStep(param: Map[String, Any]): Seq[RuleStep] = { - GriffinDslStep(getName(param), getRule(param), getDqType(param), getDetails(param)) :: Nil - } + private val emptyRulePlan = RulePlan(Nil, Nil) + private val emptyMap = Map[String, Any]() - def getTempSourceNames(param: Map[String, Any]): Seq[String] = { + override def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any], processType: ProcessType + ): RulePlan = { + val name = getRuleName(param) + val rule = getRule(param) val dqType = getDqType(param) - param.get(_name) match { - case Some(name) => { + try { + val result = parser.parseRule(rule, dqType) + if (result.successful) { + val expr = result.get dqType match { - case AccuracyType => { - Seq[String]( - resultName(param, AccuracyInfo._MissRecords), - resultName(param, AccuracyInfo._Accuracy) - ) - } - case ProfilingType => { - Seq[String]( - resultName(param, ProfilingInfo._Profiling) - ) - } - case TimelinessType => { - Nil - } - case _ => Nil + case AccuracyType => accuracyRulePlan(timeInfo, name, expr, param, processType) + case ProfilingType => profilingRulePlan(timeInfo, name, expr, param, processType) + case DuplicateType => duplicateRulePlan(timeInfo, name, expr, param, processType) + case TimelinessType => timelinessRulePlan(timeInfo, name, expr, param, processType) + case _ => emptyRulePlan } + } else { + warn(s"parse rule [ ${rule} ] fails: \n${result}") + emptyRulePlan + } + } catch { + case e: Throwable => { + error(s"generate rule plan ${name} fails: ${e.getMessage}") + emptyRulePlan } - case _ => Nil } } - def adaptConcreteRuleStep(ruleStep: RuleStep): Seq[ConcreteRuleStep] = { - ruleStep match { - case rs @ GriffinDslStep(_, rule, dqType, _) => { - val exprOpt = try { - val result = parser.parseRule(rule, dqType) - if (result.successful) Some(result.get) - else { - println(result) - warn(s"adapt concrete rule step warn: parse rule [ ${rule} ] fails") - None - } - } catch { - case e: Throwable => { - error(s"adapt concrete rule step error: ${e.getMessage}") - None - } - } + // with accuracy opr + private def accuracyRulePlan(timeInfo: TimeInfo, name: String, expr: Expr, + param: Map[String, Any], processType: ProcessType + ): RulePlan = { + val details = getDetails(param) + val sourceName = details.getString(AccuracyKeys._source, dataSourceNames.head) + val targetName = details.getString(AccuracyKeys._target, dataSourceNames.tail.head) + val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], sourceName, targetName) - exprOpt match { - case Some(expr) => { - try { - transConcreteRuleSteps(rs, expr) - } catch { - case e: Throwable => { - error(s"trans concrete rule step error: ${e.getMessage}") - Nil - } - } - } - case _ => Nil + if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) { + println(s"[${timeInfo.calcTime}] data source ${sourceName} not exists") + emptyRulePlan + } else { + // 1. miss record + val missRecordsTableName = "__missRecords" + val selClause = s"`${sourceName}`.*" + val missRecordsSql = if (!TableRegisters.existRunTempTable(timeInfo.key, targetName)) { + println(s"[${timeInfo.calcTime}] data source ${targetName} not exists") + s"SELECT ${selClause} FROM `${sourceName}`" + } else { + val onClause = expr.coalesceDesc + val sourceIsNull = analyzer.sourceSelectionExprs.map { sel => + s"${sel.desc} IS NULL" + }.mkString(" AND ") + val targetIsNull = analyzer.targetSelectionExprs.map { sel => + s"${sel.desc} IS NULL" + }.mkString(" AND ") + val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})" + s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}" + } + val missRecordsStep = SparkSqlStep(missRecordsTableName, missRecordsSql, emptyMap, true) + val missRecordsExports = processType match { + case BatchProcessType => { + val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap) + genRecordExport(recordParam, missRecordsTableName, missRecordsTableName) :: Nil } + case StreamingProcessType => Nil } - case _ => Nil - } - } - private def transConcreteRuleSteps(ruleStep: GriffinDslStep, expr: Expr - ): Seq[ConcreteRuleStep] = { - val details = ruleStep.details - ruleStep.dqType match { - case AccuracyType => { - val sourceName = getNameOpt(details, AccuracyInfo._Source) match { - case Some(name) => name - case _ => dataSourceNames.head + // 2. miss count + val missCountTableName = "__missCount" + val missColName = details.getStringOrKey(AccuracyKeys._miss) + val missCountSql = processType match { + case BatchProcessType => s"SELECT COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}`" + case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}` GROUP BY `${InternalColumns.tmst}`" + } + val missCountStep = SparkSqlStep(missCountTableName, missCountSql, emptyMap) + + // 3. total count + val totalCountTableName = "__totalCount" + val totalColName = details.getStringOrKey(AccuracyKeys._total) + val totalCountSql = processType match { + case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`" + case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceName}` GROUP BY `${InternalColumns.tmst}`" + } + val totalCountStep = SparkSqlStep(totalCountTableName, totalCountSql, emptyMap) + + // 4. accuracy metric + val accuracyTableName = name + val matchedColName = details.getStringOrKey(AccuracyKeys._matched) + val accuracyMetricSql = processType match { + case BatchProcessType => { + s""" + |SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`, + |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`, + |(`${totalColName}` - `${missColName}`) AS `${matchedColName}` + |FROM `${totalCountTableName}` FULL JOIN `${missCountTableName}` + """.stripMargin } - val targetName = getNameOpt(details, AccuracyInfo._Target) match { - case Some(name) => name - case _ => dataSourceNames.tail.head + case StreamingProcessType => { + s""" + |SELECT `${totalCountTableName}`.`${InternalColumns.tmst}` AS `${InternalColumns.tmst}`, + |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`, + |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`, + |(`${totalColName}` - `${missColName}`) AS `${matchedColName}` + |FROM `${totalCountTableName}` FULL JOIN `${missCountTableName}` + |ON `${totalCountTableName}`.`${InternalColumns.tmst}` = `${missCountTableName}`.`${InternalColumns.tmst}` + """.stripMargin } - val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], sourceName, targetName) - - - if (!checkDataSourceExists(sourceName)) { - Nil - } else { - // 1. miss record - val missRecordsSql = if (!checkDataSourceExists(targetName)) { - val selClause = s"`${sourceName}`.*" - s"SELECT ${selClause} FROM `${sourceName}`" - } else { - val selClause = s"`${sourceName}`.*" - val onClause = expr.coalesceDesc - val sourceIsNull = analyzer.sourceSelectionExprs.map { sel => - s"${sel.desc} IS NULL" - }.mkString(" AND ") - val targetIsNull = analyzer.targetSelectionExprs.map { sel => - s"${sel.desc} IS NULL" - }.mkString(" AND ") - val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})" - s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}" - } - val missRecordsName = resultName(details, AccuracyInfo._MissRecords) - val missRecordsStep = SparkSqlStep( - missRecordsName, - missRecordsSql, - Map[String, Any](), - resultPersistType(details, AccuracyInfo._MissRecords, RecordPersistType), - resultUpdateDataSourceOpt(details, AccuracyInfo._MissRecords) - ) + } + val accuracyStep = SparkSqlStep(accuracyTableName, accuracyMetricSql, emptyMap) + val accuracyExports = processType match { + case BatchProcessType => { + val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap) + genMetricExport(metricParam, accuracyTableName, accuracyTableName) :: Nil + } + case StreamingProcessType => Nil + } - // 2. miss count - val missTableName = "_miss_" - val missColName = getNameOpt(details, AccuracyInfo._Miss).getOrElse(AccuracyInfo._Miss) - val missSql = { - s"SELECT `${GroupByColumn.tmst}` AS `${GroupByColumn.tmst}`, COUNT(*) AS `${missColName}` FROM `${missRecordsName}` GROUP BY `${GroupByColumn.tmst}`" - } - val missStep = SparkSqlStep( - missTableName, - missSql, - Map[String, Any](), - NonePersistType, - None - ) + // current accu plan + val accuSteps = missRecordsStep :: missCountStep :: totalCountStep :: accuracyStep :: Nil + val accuExports = missRecordsExports ++ accuracyExports + val accuPlan = RulePlan(accuSteps, accuExports) - // 3. total count - val totalTableName = "_total_" - val totalColName = getNameOpt(details, AccuracyInfo._Total).getOrElse(AccuracyInfo._Total) - val totalSql = { - s"SELECT `${GroupByColumn.tmst}` AS `${GroupByColumn.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceName}` GROUP BY `${GroupByColumn.tmst}`" - } - val totalStep = SparkSqlStep( - totalTableName, - totalSql, - Map[String, Any](), - NonePersistType, - None + // streaming extra accu plan + val streamingAccuPlan = processType match { + case BatchProcessType => emptyRulePlan + case StreamingProcessType => { + // 5. accuracy metric merge + val accuracyMetricTableName = "__accuracy" + val accuracyMetricRule = "accuracy" + val accuracyMetricDetails = Map[String, Any]( + (AccuracyOprKeys._dfName -> accuracyTableName), + (AccuracyOprKeys._miss -> missColName), + (AccuracyOprKeys._total -> totalColName), + (AccuracyOprKeys._matched -> matchedColName) ) + val accuracyMetricStep = DfOprStep(accuracyMetricTableName, + accuracyMetricRule, accuracyMetricDetails) + val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap) + val accuracyMetricExports = genMetricExport(metricParam, name, accuracyMetricTableName) :: Nil - // 4. accuracy metric - val matchedColName = getNameOpt(details, AccuracyInfo._Matched).getOrElse(AccuracyInfo._Matched) - val accuracyMetricSql = { + // 6. collect accuracy records + val accuracyRecordTableName = "__accuracyRecords" + val accuracyRecordSql = { s""" - |SELECT `${totalTableName}`.`${GroupByColumn.tmst}` AS `${GroupByColumn.tmst}`, - |`${missTableName}`.`${missColName}` AS `${missColName}`, - |`${totalTableName}`.`${totalColName}` AS `${totalColName}` - |FROM `${totalTableName}` FULL JOIN `${missTableName}` - |ON `${totalTableName}`.`${GroupByColumn.tmst}` = `${missTableName}`.`${GroupByColumn.tmst}` - """.stripMargin + |SELECT `${InternalColumns.tmst}`, `${InternalColumns.empty}` + |FROM `${accuracyMetricTableName}` WHERE `${InternalColumns.record}` + """.stripMargin } - val accuracyMetricName = resultName(details, AccuracyInfo._Accuracy) - val accuracyMetricStep = SparkSqlStep( - accuracyMetricName, - accuracyMetricSql, - details, - // resultPersistType(details, AccuracyInfo._Accuracy, MetricPersistType) - NonePersistType, - None - ) + val accuracyRecordStep = SparkSqlStep(accuracyRecordTableName, accuracyRecordSql, emptyMap) + val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap) + val accuracyRecordParam = recordParam.addIfNotExist(ExportParamKeys._dataSourceCache, sourceName) + .addIfNotExist(ExportParamKeys._originDF, missRecordsTableName) + val accuracyRecordExports = genRecordExport( + accuracyRecordParam, missRecordsTableName, accuracyRecordTableName) :: Nil - // 5. accuracy metric filter - val accuracyStep = DfOprStep( - accuracyMetricName, - "accuracy", - Map[String, Any]( - ("df.name" -> accuracyMetricName), - ("miss" -> missColName), - ("total" -> totalColName), - ("matched" -> matchedColName) - ), - resultPersistType(details, AccuracyInfo._Accuracy, MetricPersistType), - None - ) + // gen accu plan + val extraSteps = accuracyMetricStep :: accuracyRecordStep :: Nil + val extraExports = accuracyMetricExports ++ accuracyRecordExports + val extraPlan = RulePlan(extraSteps, extraExports) - missRecordsStep :: missStep :: totalStep :: accuracyMetricStep :: accuracyStep :: Nil + extraPlan } } - case ProfilingType => { - val profilingClause = expr.asInstanceOf[ProfilingClause] - val sourceName = profilingClause.fromClauseOpt match { - case Some(fc) => fc.dataSource - case _ => { - getNameOpt(details, ProfilingInfo._Source) match { - case Some(name) => name - case _ => dataSourceNames.head - } - } - } - val analyzer = ProfilingAnalyzer(profilingClause, sourceName) -// analyzer.selectionExprs.foreach(println) + // return accu plan + accuPlan.merge(streamingAccuPlan) - val selExprDescs = analyzer.selectionExprs.map { sel => - val alias = sel match { - case s: AliasableExpr if (s.alias.nonEmpty) => s" AS `${s.alias.get}`" - case _ => "" - } - s"${sel.desc}${alias}" - } + } + } + +// private def accuracyRulePlan(timeInfo: TimeInfo, name: String, expr: Expr, +// param: Map[String, Any], processType: ProcessType +// ): RulePlan = { +// val details = getDetails(param) +// val sourceName = details.getString(AccuracyKeys._source, dataSourceNames.head) +// val targetName = details.getString(AccuracyKeys._target, dataSourceNames.tail.head) +// val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], sourceName, targetName) +// +// if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) { +// emptyRulePlan +// } else { +// // 1. miss record +// val missRecordsTableName = "__missRecords" +// val selClause = s"`${sourceName}`.*" +// val missRecordsSql = if (!TableRegisters.existRunTempTable(timeInfo.key, targetName)) { +// s"SELECT ${selClause} FROM `${sourceName}`" +// } else { +// val onClause = expr.coalesceDesc +// val sourceIsNull = analyzer.sourceSelectionExprs.map { sel => +// s"${sel.desc} IS NULL" +// }.mkString(" AND ") +// val targetIsNull = analyzer.targetSelectionExprs.map { sel => +// s"${sel.desc} IS NULL" +// }.mkString(" AND ") +// val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})" +// s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}" +// } +// val missRecordsStep = SparkSqlStep(missRecordsTableName, missRecordsSql, emptyMap, true) +// val missRecordsExports = processType match { +// case BatchProcessType => { +// val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap) +// genRecordExport(recordParam, missRecordsTableName, missRecordsTableName) :: Nil +// } +// case StreamingProcessType => Nil +// } +// +// // 2. miss count +// val missCountTableName = "__missCount" +// val missColName = details.getStringOrKey(AccuracyKeys._miss) +// val missCountSql = processType match { +// case BatchProcessType => s"SELECT COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}`" +// case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}` GROUP BY `${InternalColumns.tmst}`" +// } +// val missCountStep = SparkSqlStep(missCountTableName, missCountSql, emptyMap) +// +// // 3. total count +// val totalCountTableName = "__totalCount" +// val totalColName = details.getStringOrKey(AccuracyKeys._total) +// val totalCountSql = processType match { +// case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`" +// case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceName}` GROUP BY `${InternalColumns.tmst}`" +// } +// val totalCountStep = SparkSqlStep(totalCountTableName, totalCountSql, emptyMap) +// +// // 4. accuracy metric +// val accuracyTableName = name +// val matchedColName = details.getStringOrKey(AccuracyKeys._matched) +// val accuracyMetricSql = processType match { +// case BatchProcessType => { +// s""" +// |SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`, +// |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`, +// |(`${totalColName}` - `${missColName}`) AS `${matchedColName}` +// |FROM `${totalCountTableName}` FULL JOIN `${missCountTableName}` +// """.stripMargin +// } +// case StreamingProcessType => { +// s""" +// |SELECT `${totalCountTableName}`.`${InternalColumns.tmst}` AS `${InternalColumns.tmst}`, +// |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`, +// |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`, +// |(`${totalColName}` - `${missColName}`) AS `${matchedColName}` +// |FROM `${totalCountTableName}` FULL JOIN `${missCountTableName}` +// |ON `${totalCountTableName}`.`${InternalColumns.tmst}` = `${missCountTableName}`.`${InternalColumns.tmst}` +// """.stripMargin +// } +// } +// val accuracyStep = SparkSqlStep(accuracyTableName, accuracyMetricSql, emptyMap, true) +// val accuracyExports = processType match { +// case BatchProcessType => { +// val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap) +// genMetricExport(metricParam, accuracyTableName, accuracyTableName) :: Nil +// } +// case StreamingProcessType => Nil +// } +// +// // current accu plan +// val accuSteps = missRecordsStep :: missCountStep :: totalCountStep :: accuracyStep :: Nil +// val accuExports = missRecordsExports ++ accuracyExports +// val accuPlan = RulePlan(accuSteps, accuExports) +// +// // streaming extra accu plan +// val streamingAccuPlan = processType match { +// case BatchProcessType => emptyRulePlan +// case StreamingProcessType => { +// // 5. global accuracy metric merge +// val globalAccuracyTableName = "__globalAccuracy" +// val globalAccuracySql = { +// s""" +// |SELECT coalesce(`${globalAccuracyTableName}`.`${InternalColumns.tmst}`, `${accuracyTableName}`.`${InternalColumns.tmst}`) AS `${InternalColumns.tmst}`, +// |coalesce(`${accuracyTableName}`.`${missColName}`, `${globalAccuracyTableName}`.`${missColName}`) AS `${missColName}`, +// |coalesce(`${globalAccuracyTableName}`.`${totalColName}`, `${accuracyTableName}`.`${totalColName}`) AS `${totalColName}`, +// |((`${accuracyTableName}`.`${missColName}` IS NOT NULL) AND ((`${globalAccuracyTableName}`.`${missColName}` IS NULL) OR (`${accuracyTableName}`.`${missColName}` < `${globalAccuracyTableName}`.`${missColName}`))) AS `${InternalColumns.metric}` +// |FROM `${globalAccuracyTableName}` FULL JOIN `${accuracyTableName}` +// |ON `${globalAccuracyTableName}`.`${InternalColumns.tmst}` = `${accuracyTableName}`.`${InternalColumns.tmst}` +// """.stripMargin +// } +// val globalAccuracyInitSql = { +// s""" +// |SELECT `${InternalColumns.tmst}`, `${totalColName}`, `${missColName}`, +// |(true) AS `${InternalColumns.metric}` +// |FROM `${accuracyTableName}` +// """.stripMargin +// } +// val globalAccuracyDetails = Map[String, Any](GlobalKeys._initRule -> globalAccuracyInitSql) +// val globalAccuracyStep = SparkSqlStep(globalAccuracyTableName, +// globalAccuracySql, globalAccuracyDetails, true, true) +// +// // 6. collect accuracy metrics +// val accuracyMetricTableName = name +// val accuracyMetricSql = { +// s""" +// |SELECT `${InternalColumns.tmst}`, `${totalColName}`, `${missColName}`, +// |(`${totalColName}` - `${missColName}`) AS `${matchedColName}` +// |FROM `${globalAccuracyTableName}` WHERE `${InternalColumns.metric}` +// """.stripMargin +// } +// val accuracyMetricStep = SparkSqlStep(accuracyMetricTableName, accuracyMetricSql, emptyMap) +// val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap) +// val accuracyMetricExports = genMetricExport(metricParam, accuracyMetricTableName, accuracyMetricTableName) :: Nil +// +// // 7. collect accuracy records +// val accuracyRecordTableName = "__accuracyRecords" +// val accuracyRecordSql = { +// s""" +// |SELECT `${InternalColumns.tmst}` +// |FROM `${accuracyMetricTableName}` WHERE `${matchedColName}` > 0 +// """.stripMargin +// } +// val accuracyRecordStep = SparkSqlStep(accuracyRecordTableName, accuracyRecordSql, emptyMap) +// val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap) +// val accuracyRecordParam = recordParam.addIfNotExist(ExportParamKeys._dataSourceCache, sourceName) +// .addIfNotExist(ExportParamKeys._originDF, missRecordsTableName) +// val accuracyRecordExports = genRecordExport( +// accuracyRecordParam, missRecordsTableName, accuracyRecordTableName) :: Nil +// +// // 8. update global accuracy metric +// val updateGlobalAccuracyTableName = globalAccuracyTableName +// val globalMetricKeepTime = details.getString(GlobalKeys._globalMetricKeep, "") +// val updateGlobalAccuracySql = TimeUtil.milliseconds(globalMetricKeepTime) match { +// case Some(kt) => { +// s""" +// |SELECT * FROM `${globalAccuracyTableName}` +// |WHERE (`${missColName}` > 0) AND (`${InternalColumns.tmst}` > ${timeInfo.calcTime - kt}) +// """.stripMargin +// } +// case _ => { +// s""" +// |SELECT * FROM `${globalAccuracyTableName}` +// |WHERE (`${missColName}` > 0) +// """.stripMargin +// } +// } +// val updateGlobalAccuracyStep = SparkSqlStep(updateGlobalAccuracyTableName, +// updateGlobalAccuracySql, emptyMap, true, true) +// +// // gen accu plan +// val extraSteps = globalAccuracyStep :: accuracyMetricStep :: accuracyRecordStep :: updateGlobalAccuracyStep :: Nil +// val extraExports = accuracyMetricExports ++ accuracyRecordExports +// val extraPlan = RulePlan(extraSteps, extraExports) +// +// extraPlan +// } +// } +// +// // return accu plan +// accuPlan.merge(streamingAccuPlan) +// +// } +// } + + private def profilingRulePlan(timeInfo: TimeInfo, name: String, expr: Expr, + param: Map[String, Any], processType: ProcessType + ): RulePlan = { + val details = getDetails(param) + val profilingClause = expr.asInstanceOf[ProfilingClause] + val sourceName = profilingClause.fromClauseOpt match { + case Some(fc) => fc.dataSource + case _ => details.getString(ProfilingKeys._source, dataSourceNames.head) + } + val fromClause = profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc -// val selClause = (s"`${GroupByColumn.tmst}`" +: selExprDescs).mkString(", ") - val selClause = if (analyzer.containsAllSelectionExpr) { - selExprDescs.mkString(", ") - } else { - (s"`${GroupByColumn.tmst}`" +: selExprDescs).mkString(", ") + if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) { + emptyRulePlan + } else { + val analyzer = ProfilingAnalyzer(profilingClause, sourceName) + val selExprDescs = analyzer.selectionExprs.map { sel => + val alias = sel match { + case s: AliasableExpr if (s.alias.nonEmpty) => s" AS `${s.alias.get}`" + case _ => "" } + s"${sel.desc}${alias}" + } + val selCondition = profilingClause.selectClause.extraConditionOpt.map(_.desc).mkString + val selClause = processType match { + case BatchProcessType => selExprDescs.mkString(", ") + case StreamingProcessType => (s"`${InternalColumns.tmst}`" +: selExprDescs).mkString(", ") + } + val groupByClauseOpt = analyzer.groupbyExprOpt + val groupbyClause = processType match { + case BatchProcessType => groupByClauseOpt.map(_.desc).getOrElse("") + case StreamingProcessType => { + val tmstGroupbyClause = GroupbyClause(LiteralStringExpr(s"`${InternalColumns.tmst}`") :: Nil, None) + val mergedGroubbyClause = tmstGroupbyClause.merge(groupByClauseOpt match { + case Some(gbc) => gbc + case _ => GroupbyClause(Nil, None) + }) + mergedGroubbyClause.desc + } + } + val preGroupbyClause = analyzer.preGroupbyExprs.map(_.desc).mkString(" ") + val postGroupbyClause = analyzer.postGroupbyExprs.map(_.desc).mkString(" ") - val fromClause = profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc - -// val tailClause = analyzer.tailsExprs.map(_.desc).mkString(" ") - val tmstGroupbyClause = GroupbyClause(LiteralStringExpr(s"`${GroupByColumn.tmst}`") :: Nil, None) - val mergedGroubbyClause = tmstGroupbyClause.merge(analyzer.groupbyExprOpt match { - case Some(gbc) => gbc - case _ => GroupbyClause(Nil, None) - }) - val groupbyClause = mergedGroubbyClause.desc - val preGroupbyClause = analyzer.preGroupbyExprs.map(_.desc).mkString(" ") - val postGroupbyClause = analyzer.postGroupbyExprs.map(_.desc).mkString(" ") - - if (!checkDataSourceExists(sourceName)) { - Nil - } else { - // 1. select statement - val profilingSql = { -// s"SELECT `${GroupByColumn.tmst}`, ${selClause} FROM ${sourceName} ${tailClause} GROUP BY `${GroupByColumn.tmst}`" - s"SELECT ${selClause} ${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}" - } - val profilingMetricName = resultName(details, ProfilingInfo._Profiling) - val profilingStep = SparkSqlStep( - profilingMetricName, - profilingSql, - details, - resultPersistType(details, ProfilingInfo._Profiling, MetricPersistType), - None - ) + // 1. select statement + val profilingSql = { + s"SELECT ${selCondition} ${selClause} ${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}" + } + val profilingName = name + val profilingStep = SparkSqlStep(profilingName, profilingSql, details) + val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap) + val profilingExports = genMetricExport(metricParam, name, profilingName) :: Nil - // 2. clear processed data -// val clearDataSourceStep = DfOprStep( -// s"${sourceName}_clear", -// "clear", -// Map[String, Any]( -// ("df.name" -> sourceName) -// ), -// NonePersistType, -// Some(sourceName) -// ) -// -// profilingStep :: clearDataSourceStep :: Nil + RulePlan(profilingStep :: Nil, profilingExports) + } + } - profilingStep:: Nil - } + private def duplicateRulePlan(timeInfo: TimeInfo, name: String, expr: Expr, + param: Map[String, Any], processType: ProcessType + ): RulePlan = { + val details = getDetails(param) + val sourceName = details.getString(DuplicateKeys._source, dataSourceNames.head) + val targetName = details.getString(DuplicateKeys._target, dataSourceNames.tail.head) + val analyzer = DuplicateAnalyzer(expr.asInstanceOf[DuplicateClause], sourceName, targetName) + + if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) { + println(s"[${timeInfo.calcTime}] data source ${sourceName} not exists") + emptyRulePlan + } else if (!TableRegisters.existRunTempTable(timeInfo.key, targetName)) { + println(s"[${timeInfo.calcTime}] data source ${targetName} not exists") + emptyRulePlan + } else { + val selItemsClause = analyzer.selectionPairs.map { pair => + val (expr, alias) = pair + s"${expr.desc} AS `${alias}`" + }.mkString(", ") + val aliases = analyzer.selectionPairs.map(_._2) + + val selClause = processType match { + case BatchProcessType => selItemsClause + case StreamingProcessType => s"`${InternalColumns.tmst}`, ${selItemsClause}" + } + val selAliases = processType match { + case BatchProcessType => aliases + case StreamingProcessType => InternalColumns.tmst +: aliases + } + + // 1. source mapping + val sourceTableName = "__source" + val sourceSql = s"SELECT DISTINCT ${selClause} FROM ${sourceName}" + val sourceStep = SparkSqlStep(sourceTableName, sourceSql, emptyMap) + + // 2. target mapping + val targetTableName = "__target" + val targetSql = s"SELECT ${selClause} FROM ${targetName}" + val targetStep = SparkSqlStep(targetTableName, targetSql, emptyMap) + + // 3. joined + val joinedTableName = "__joined" + val joinedSelClause = selAliases.map { alias => + s"`${sourceTableName}`.`${alias}` AS `${alias}`" + }.mkString(", ") + val onClause = aliases.map { alias => + s"`${sourceTableName}`.`${alias}` = `${targetTableName}`.`${alias}`" + }.mkString(" AND ") + val joinedSql = { + s"SELECT ${joinedSelClause} FROM `${targetTableName}` RIGHT JOIN `${sourceTableName}` ON ${onClause}" + } + val joinedStep = SparkSqlStep(joinedTableName, joinedSql, emptyMap) + // 4. group + val groupTableName = "__group" + val groupSelClause = selAliases.map { alias => + s"`${alias}`" + }.mkString(", ") + val dupColName = details.getStringOrKey(DuplicateKeys._dup) + val groupSql = { + s"SELECT ${groupSelClause}, (COUNT(*) - 1) AS `${dupColName}` FROM `${joinedTableName}` GROUP BY ${groupSelClause}" } - case TimelinessType => { - Nil + val groupStep = SparkSqlStep(groupTableName, groupSql, emptyMap) + + // 5. duplicate record + val dupRecordTableName = "__dupRecords" + val dupRecordSql = { + s""" + |SELECT * FROM `${groupTableName}` WHERE `${dupColName}` > 0 + """.stripMargin + } + val dupRecordStep = SparkSqlStep(dupRecordTableName, dupRecordSql, emptyMap, true) + val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap) + val dupRecordxports = genRecordExport(recordParam, dupRecordTableName, dupRecordTableName) :: Nil + + // 6. duplicate metric + val dupMetricTableName = name + val numColName = details.getStringOrKey(DuplicateKeys._num) + val dupMetricSelClause = processType match { + case BatchProcessType => s"`${dupColName}`, COUNT(*) AS `${numColName}`" + case StreamingProcessType => s"`${InternalColumns.tmst}`, `${dupColName}`, COUNT(*) AS `${numColName}`" + } + val dupMetricGroupbyClause = processType match { + case BatchProcessType => s"`${dupColName}`" + case StreamingProcessType => s"`${InternalColumns.tmst}`, `${dupColName}`" + } + val dupMetricSql = { + s""" + |SELECT ${dupMetricSelClause} FROM `${dupRecordTableName}` + |GROUP BY ${dupMetricGroupbyClause} + """.stripMargin } - case _ => Nil + val dupMetricStep = SparkSqlStep(dupMetricTableName, dupMetricSql, emptyMap) + val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap) + .addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc) + val dupMetricExports = genMetricExport(metricParam, name, dupMetricTableName) :: Nil + + val dupSteps = sourceStep :: targetStep :: joinedStep :: groupStep :: dupRecordStep :: dupMetricStep :: Nil + val dupExports = dupRecordxports ++ dupMetricExports + + RulePlan(dupSteps, dupExports) } } - private def checkDataSourceExists(name: String): Boolean = { - try { - RuleAdaptorGroup.dataChecker.existDataSourceName(name) - } catch { - case e: Throwable => { - error(s"check data source exists error: ${e.getMessage}") - false + private def timelinessRulePlan(timeInfo: TimeInfo, name: String, expr: Expr, + param: Map[String, Any], processType: ProcessType + ): RulePlan = { + val details = getDetails(param) + val timelinessClause = expr.asInstanceOf[TimelinessClause] + val sourceName = details.getString(TimelinessKeys._source, dataSourceNames.head) + + if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) { + emptyRulePlan + } else { + val analyzer = TimelinessAnalyzer(timelinessClause, sourceName) + val btsSel = analyzer.btsExpr + val etsSelOpt = analyzer.etsExprOpt + + // 1. in time + val inTimeTableName = "__inTime" + val inTimeSql = etsSelOpt match { + case Some(etsSel) => { + s""" + |SELECT *, (${btsSel}) AS `${InternalColumns.beginTs}`, + |(${etsSel}) AS `${InternalColumns.endTs}` + |FROM ${sourceName} WHERE (${btsSel}) IS NOT NULL AND (${etsSel}) IS NOT NULL + """.stripMargin + } + case _ => { + s""" + |SELECT *, (${btsSel}) AS `${InternalColumns.beginTs}` + |FROM ${sourceName} WHERE (${btsSel}) IS NOT NULL + """.stripMargin + } + } + val inTimeStep = SparkSqlStep(inTimeTableName, inTimeSql, emptyMap) + + // 2. latency + val latencyTableName = "__lat" + val latencyColName = details.getStringOrKey(TimelinessKeys._latency) + val etsColName = etsSelOpt match { + case Some(_) => InternalColumns.endTs + case _ => InternalColumns.tmst + } + val latencySql = { + s"SELECT *, (`${etsColName}` - `${InternalColumns.beginTs}`) AS `${latencyColName}` FROM `${inTimeTableName}`" + } + val latencyStep = SparkSqlStep(latencyTableName, latencySql, emptyMap, true) + + // 3. timeliness metric + val metricTableName = name + val metricSql = processType match { + case BatchProcessType => { + s""" + |SELECT CAST(AVG(`${latencyColName}`) AS BIGINT) AS `avg`, + |MAX(`${latencyColName}`) AS `max`, + |MIN(`${latencyColName}`) AS `min` + |FROM `${latencyTableName}` + """.stripMargin + } + case StreamingProcessType => { + s""" + |SELECT `${InternalColumns.tmst}`, + |CAST(AVG(`${latencyColName}`) AS BIGINT) AS `avg`, + |MAX(`${latencyColName}`) AS `max`, + |MIN(`${latencyColName}`) AS `min` + |FROM `${latencyTableName}` + |GROUP BY `${InternalColumns.tmst}` + """.stripMargin + } + } + val metricStep = SparkSqlStep(metricTableName, metricSql, emptyMap) + val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap) + val metricExports = genMetricExport(metricParam, name, metricTableName) :: Nil + + // current timeliness plan + val timeSteps = inTimeStep :: latencyStep :: metricStep :: Nil + val timeExports = metricExports + val timePlan = RulePlan(timeSteps, timeExports) + + // 4. timeliness record + val recordPlan = TimeUtil.milliseconds(details.getString(TimelinessKeys._threshold, "")) match { + case Some(tsh) => { + val recordTableName = "__lateRecords" + val recordSql = { + s"SELECT * FROM `${latencyTableName}` WHERE `${latencyColName}` > ${tsh}" + } + val recordStep = SparkSqlStep(recordTableName, recordSql, emptyMap) + val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap) + val recordExports = genRecordExport(recordParam, recordTableName, recordTableName) :: Nil + RulePlan(recordStep :: Nil, recordExports) + } + case _ => emptyRulePlan } + + // return timeliness plan + timePlan.merge(recordPlan) } } + // override def genRuleInfos(param: Map[String, Any], timeInfo: TimeInfo): Seq[RuleInfo] = { +// val ruleInfo = RuleInfoGen(param) +// val dqType = RuleInfoGen.dqType(param) +// try { +// val result = parser.parseRule(ruleInfo.rule, dqType) +// if (result.successful) { +// val expr = result.get +// dqType match { +// case AccuracyType => accuracyRuleInfos(ruleInfo, expr, timeInfo) +// case ProfilingType => profilingRuleInfos(ruleInfo, expr, timeInfo) +// case TimelinessType => Nil +// case _ => Nil +// } +// } else { +// warn(s"parse rule [ ${ruleInfo.rule} ] fails: \n${result}") +// Nil +// } +// } catch { +// case e: Throwable => { +// error(s"generate rule info ${ruleInfo} fails: ${e.getMessage}") +// Nil +// } +// } +// } + + // group by version +// private def accuracyRuleInfos(ruleInfo: RuleInfo, expr: Expr, timeInfo: TimeInfo): Seq[RuleInfo] = { +// val calcTime = timeInfo.calcTime +// val details = ruleInfo.details +// val sourceName = details.getString(AccuracyKeys._source, dataSourceNames.head) +// val targetName = details.getString(AccuracyKeys._target, dataSourceNames.tail.head) +// val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], sourceName, targetName) +// +// if (!TempTables.existTable(timeInfo.key, sourceName)) { +// Nil +// } else { +// // 1. miss record +// val missRecordsSql = if (!TempTables.existTable(timeInfo.key, targetName)) { +// val selClause = s"`${sourceName}`.*" +// s"SELECT ${selClause} FROM `${sourceName}`" +// } else { +// val selClause = s"`${sourceName}`.*" +// val onClause = expr.coalesceDesc +// val sourceIsNull = analyzer.sourceSelectionExprs.map { sel => +// s"${sel.desc} IS NULL" +// }.mkString(" AND ") +// val targetIsNull = analyzer.targetSelectionExprs.map { sel => +// s"${sel.desc} IS NULL" +// }.mkString(" AND ") +// val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})" +// s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}" +// } +// val missRecordsName = AccuracyKeys._missRecords +// // val tmstMissRecordsName = TempName.tmstName(missRecordsName, timeInfo) +// val missRecordsParams = details.getParamMap(AccuracyKeys._missRecords) +// .addIfNotExist(RuleDetailKeys._persistType, RecordPersistType.desc) +// .addIfNotExist(RuleDetailKeys._persistName, missRecordsName) +// val missRecordsRuleInfo = RuleInfo(missRecordsName, None, SparkSqlType, +// missRecordsSql, missRecordsParams, true) +// // val missRecordsStep = SparkSqlStep( +// // timeInfo, +// // RuleInfo(missRecordsName, Some(tmstMissRecordsName), missRecordsSql, missRecordsParams) +// // ) +// +// // 2. miss count +// val missTableName = "_miss_" +// // val tmstMissTableName = TempName.tmstName(missTableName, timeInfo) +// val missColName = details.getStringOrKey(AccuracyKeys._miss) +// val missSql = { +// s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${missColName}` FROM `${missRecordsName}` GROUP BY `${InternalColumns.tmst}`" +// } +// val missRuleInfo = RuleInfo(missTableName, None, SparkSqlType, +// missSql, Map[String, Any](), true) +// // val missStep = SparkSqlStep( +// // timeInfo, +// // RuleInfo(missTableName, None, missSql, Map[String, Any]()) +// // ) +// +// // 3. total count +// val totalTableName = "_total_" +// // val tmstTotalTableName = TempName.tmstName(totalTableName, timeInfo) +// val totalColName = details.getStringOrKey(AccuracyKeys._total) +// val totalSql = { +// s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceName}` GROUP BY `${InternalColumns.tmst}`" +// } +// val totalRuleInfo = RuleInfo(totalTableName, None, SparkSqlType, +// totalSql, Map[String, Any](), true) +// // val totalStep = SparkSqlStep( +// // timeInfo, +// // RuleInfo(totalTableName, None, totalSql, Map[String, Any]()) +// // ) +// +// // 4. accuracy metric +// val accuracyMetricName = details.getString(RuleDetailKeys._persistName, ruleInfo.name) +// // val tmstAccuracyMetricName = TempName.tmstName(accuracyMetricName, timeInfo) +// val matchedColName = details.getStringOrKey(AccuracyKeys._matched) +// val accuracyMetricSql = { +// s""" +// |SELECT `${totalTableName}`.`${InternalColumns.tmst}` AS `${InternalColumns.tmst}`, +// |`${missTableName}`.`${missColName}` AS `${missColName}`, +// |`${totalTableName}`.`${totalColName}` AS `${totalColName}`, +// |(`${totalColName}` - `${missColName}`) AS `${matchedColName}` +// |FROM `${totalTableName}` FULL JOIN `${missTableName}` +// |ON `${totalTableName}`.`${InternalColumns.tmst}` = `${missTableName}`.`${InternalColumns.tmst}` +// """.stripMargin +// } +// // val accuracyParams = details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc) +//// val accuracyMetricRuleInfo = RuleInfo(accuracyMetricName, None, SparkSqlType, +//// accuracyMetricSql, Map[String, Any](), true) +// val accuracyParams = details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc) +// .addIfNotExist(RuleDetailKeys._persistName, accuracyMetricName) +// val accuracyMetricRuleInfo = RuleInfo(accuracyMetricName, None, SparkSqlType, +// accuracyMetricSql, Map[String, Any](), true) +// +// // 5. accuracy metric merge +// val globalMetricName = "accu_global" +// val globalAccuSql = if (TempTables.existGlobalTable(globalMetricName)) { +// s""" +// |SELECT coalesce(`${globalMetricName}`.`${InternalColumns.tmst}`, `${accuracyMetricName}`.`${InternalColumns.tmst}`) AS `${InternalColumns.tmst}`, +// |coalesce(`${accuracyMetricName}`.`${missColName}`, `${globalMetricName}`.`${missColName}`) AS `${missColName}`, +// |coalesce(`${globalMetricName}`.`${totalColName}`, `${accuracyMetricName}`.`${totalColName}`) AS `${totalColName}`, +// |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`, +// |(`${totalColName}` = 0) AS `empty`, +// |(`${missColName}` = 0) AS `no_miss`, +// |(`${accuracyMetricName}`.`${missColName}` < `${globalMetricName}`.`${missColName}`) AS `update` +// |FROM `${globalMetricName}` FULL JOIN `${accuracyMetricName}` +// |ON `${globalMetricName}`.`${InternalColumns.tmst}` = `${accuracyMetricName}`.`${InternalColumns.tmst}` +// """.stripMargin +// } else { +// s""" +// |SELECT `${accuracyMetricName}`.`${InternalColumns.tmst}` AS `${InternalColumns.tmst}`, +// |`${accuracyMetricName}`.`${missColName}` AS `${missColName}`, +// |`${accuracyMetricName}`.`${totalColName}` AS `${totalColName}`, +// |(`${totalColName}` - `${missColName}`) AS `${matchedColName}`, +// |(`${totalColName}` = 0) AS `empty`, +// |(`${missColName}` = 0) AS `no_miss`, +// |true AS `update` +// |FROM `${accuracyMetricName}` +// """.stripMargin +// } +// val globalAccuParams = Map[String, Any]( +// ("global" -> true) +// ) +// val mergeRuleInfo = RuleInfo(globalMetricName, None, SparkSqlType, +// globalAccuSql, globalAccuParams, true) +// +// // 6. persist metrics +// val persistMetricName = "persist" +// val persistSql = { +// s""" +// |SELECT `${InternalColumns.tmst}`, `${missColName}`, `${totalColName}`, `${matchedColName}` +// |FROM `${globalMetricName}` +// |WHERE `update` +// """.stripMargin +// } +// val persistParams = details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc) +// .addIfNotExist(RuleDetailKeys._persistName, accuracyMetricName) +// val persistRuleInfo = RuleInfo(persistMetricName, None, SparkSqlType, +// persistSql, persistParams, true) +// +// // 5. accuracy metric filter +//// val accuracyParams = details.addIfNotExist("df.name", accuracyMetricName) +//// .addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc) +//// .addIfNotExist(RuleDetailKeys._persistName, accuracyMetricName) +//// val accuracyRuleInfo = RuleInfo(accuracyMetricName, None, DfOprType, +//// "accuracy", accuracyParams, true) +// +//// missRecordsRuleInfo :: missRuleInfo :: totalRuleInfo :: +//// accuracyMetricRuleInfo :: accuracyRuleInfo :: Nil +// missRecordsRuleInfo :: missRuleInfo :: totalRuleInfo :: +// accuracyMetricRuleInfo :: mergeRuleInfo :: persistRuleInfo :: Nil +// } +// } + +// private def accuracyRuleInfos(ruleInfo: RuleInfo, expr: Expr, timeInfo: TimeInfo): Seq[RuleInfo] = { +// val calcTime = timeInfo.calcTime +// val details = ruleInfo.details +// val sourceName = details.getString(AccuracyKeys._source, dataSourceNames.head) +// val targetName = details.getString(AccuracyKeys._target, dataSourceNames.tail.head) +// val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], sourceName, targetName) +// +// if (!TempTables.existTable(timeInfo.key, sourceName)) { +// Nil +// } else { +// // 1. miss record +// val missRecordsSql = if (!TempTables.existTable(timeInfo.key, targetName)) { +// val selClause = s"`${sourceName}`.*" +// s"SELECT ${selClause} FROM `${sourceName}`" +// } else { +// val selClause = s"`${sourceName}`.*" +// val onClause = expr.coalesceDesc +// val sourceIsNull = analyzer.sourceSelectionExprs.map { sel => +// s"${sel.desc} IS NULL" +// }.mkString(" AND ") +// val targetIsNull = analyzer.targetSelectionExprs.map { sel => +// s"${sel.desc} IS NULL" +// }.mkString(" AND ") +// val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})" +// s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}" +// } +// val missRecordsName = AccuracyKeys._missRecords +//// val tmstMissRecordsName = TempName.tmstName(missRecordsName, timeInfo) +// val missRecordsParams = details.getParamMap(AccuracyKeys._missRecords) +// .addIfNotExist(RuleDetailKeys._persistType, RecordPersistType.desc) +// .addIfNotExist(RuleDetailKeys._persistName, missRecordsName) +// val missRecordsRuleInfo = RuleInfo(missRecordsName, None, SparkSqlType, +// missRecordsSql, missRecordsParams, true) +//// val missRecordsStep = SparkSqlStep( +//// timeInfo, +//// RuleInfo(missRecordsName, Some(tmstMissRecordsName), missRecordsSql, missRecordsParams) +//// ) +// +// // 2. miss count +// val missTableName = "_miss_" +// // val tmstMissTableName = TempName.tmstName(missTableName, timeInfo) +// val missColName = details.getStringOrKey(AccuracyKeys._miss) +// val missSql = { +// s"SELECT COUNT(*) AS `${missColName}` FROM `${missRecordsName}`" +// } +// val missRuleInfo = RuleInfo(missTableName, None, SparkSqlType, +// missSql, Map[String, Any](), false) +//// val missStep = SparkSqlStep( +//// timeInfo, +//// RuleInfo(missTableName, None, missSql, Map[String, Any]()) +//// ) +// +// // 3. total count +// val totalTableName = "_total_" +// // val tmstTotalTableName = TempName.tmstName(totalTableName, timeInfo) +// val totalColName = details.getStringOrKey(AccuracyKeys._total) +// val totalSql = { +// s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`" +// } +// val totalRuleInfo = RuleInfo(totalTableName, None, SparkSqlType, +// totalSql, Map[String, Any](), false) +//// val totalStep = SparkSqlStep( +//// timeInfo, +//// RuleInfo(totalTableName, None, totalSql, Map[String, Any]()) +//// ) +// +// // 4. accuracy metric +// val accuracyMetricName = details.getString(RuleDetailKeys._persistName, ruleInfo.name) +//// val tmstAccuracyMetricName = TempName.tmstName(accuracyMetricName, timeInfo) +// val matchedColName = details.getStringOrKey(AccuracyKeys._matched) +// val accuracyMetricSql = { +// s""" +// |SELECT `${missTableName}`.`${missColName}` AS `${missColName}`, +// |`${totalTableName}`.`${totalColName}` AS `${totalColName}` +// |FROM `${totalTableName}` FULL JOIN `${missTableName}` +// """.stripMargin +// } +// // val accuracyParams = details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc) +// val accuracyMetricRuleInfo = RuleInfo(accuracyMetricName, None, SparkSqlType, +// accuracyMetricSql, Map[String, Any](), false) +//// val accuracyMetricStep = SparkSqlStep( +//// timeInfo, +//// RuleInfo(accuracyMetricName, Some(tmstAccuracyMetricName), accuracyMetricSql, Map[String, Any]()) +//// ) +// +// // 5. accuracy metric filter +// val accuracyParams = details.addIfNotExist("df.name", accuracyMetricName) +// .addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc) +// .addIfNotExist(RuleDetailKeys._persistName, accuracyMetricName) +// val accuracyRuleInfo = RuleInfo(accuracyMetricName, None, DfOprType, +// "accuracy", accuracyParams, false) +//// val accuracyStep = DfOprStep( +//// timeInfo, +//// RuleInfo(accuracyMetricName, Some(tmstAccuracyMetricName), "accuracy", accuracyParams) +//// ) +// +// missRecordsRuleInfo :: missRuleInfo :: totalRuleInfo :: +// accuracyMetricRuleInfo :: accuracyRuleInfo :: Nil +// } +// } + +// private def profilingRuleInfos(ruleInfo: RuleInfo, expr: Expr, timeInfo: TimeInfo): Seq[RuleInfo] = { +// val details = ruleInfo.details +// val profilingClause = expr.asInstanceOf[ProfilingClause] +// val sourceName = profilingClause.fromClauseOpt match { +// case Some(fc) => fc.dataSource +// case _ => details.getString(ProfilingKeys._source, dataSourceNames.head) +// } +// val fromClause = profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc +// +// if (!TempTables.existTable(timeInfo.key, sourceName)) { +// Nil +// } else { +// val tmstAnalyzer = ProfilingAnalyzer(profilingClause, sourceName) +// +// val selExprDescs = tmstAnalyzer.selectionExprs.map { sel => +// val alias = sel match { +// case s: AliasableExpr if (s.alias.nonEmpty) => s" AS `${s.alias.get}`" +// case _ => "" +// } +// s"${sel.desc}${alias}" +// } +// val selCondition = profilingClause.selectClause.extraConditionOpt.map(_.desc).mkString +// val selClause = selExprDescs.mkString(", ") +//// val tmstFromClause = profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc +// val groupByClauseOpt = tmstAnalyzer.groupbyExprOpt +// val groupbyClause = groupByClauseOpt.map(_.desc).getOrElse("") +// val preGroupbyClause = tmstAnalyzer.preGroupbyExprs.map(_.desc).mkString(" ") +// val postGroupbyClause = tmstAnalyzer.postGroupbyExprs.map(_.desc).mkString(" ") +// +// // 1. select statement +// val profilingSql = { +// s"SELECT ${selCondition} ${selClause} ${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}" +// } +// // println(profilingSql) +// val metricName = details.getString(RuleDetailKeys._persistName, ruleInfo.name) +// // val tmstMetricName = TempName.tmstName(metricName, timeInfo) +// val profilingParams = details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc) +// .addIfNotExist(RuleDetailKeys._persistName, metricName) +// val profilingRuleInfo = ruleInfo.setDslType(SparkSqlType) +// .setRule(profilingSql).setDetails(profilingParams) +//// val profilingStep = SparkSqlStep( +//// timeInfo, +//// ruleInfo.setRule(profilingSql).setDetails(profilingParams) +//// ) +// +// // filterStep :: profilingStep :: Nil +// profilingRuleInfo :: Nil +// } +// } + +// def genRuleStep(timeInfo: TimeInfo, param: Map[String, Any]): Seq[RuleStep] = { +// val ruleInfo = RuleInfoGen(param, timeInfo) +// val dqType = RuleInfoGen.dqType(param) +// GriffinDslStep(timeInfo, ruleInfo, dqType) :: Nil +// } +// +// def adaptConcreteRuleStep(ruleStep: RuleStep +// ): Seq[ConcreteRuleStep] = { +// ruleStep match { +// case rs @ GriffinDslStep(_, ri, dqType) => { +// try { +// val result = parser.parseRule(ri.rule, dqType) +// if (result.successful) { +// val expr = result.get +// transConcreteRuleStep(rs, expr) +// } else { +// println(result) +// warn(s"adapt concrete rule step warn: parse rule [ ${ri.rule} ] fails") +// Nil +// } +// } catch { +// case e: Throwable => { +// error(s"adapt concrete rule step error: ${e.getMessage}") +// Nil +// } +// } +// } +// case _ => Nil +// } +// } +// +// private def transConcreteRuleStep(ruleStep: GriffinDslStep, expr: Expr +// ): Seq[ConcreteRuleStep] = { +// ruleStep.dqType match { +// case AccuracyType => transAccuracyRuleStep(ruleStep, expr) +// case ProfilingType => transProfilingRuleStep(ruleStep, expr) +// case TimelinessType => Nil +// case _ => Nil +// } +// } + +// private def transAccuracyRuleStep(ruleStep: GriffinDslStep, expr: Expr +// ): Seq[ConcreteRuleStep] = { +// val timeInfo = ruleStep.timeInfo +// val ruleInfo = ruleStep.ruleInfo +// val calcTime = timeInfo.calcTime +// val tmst = timeInfo.tmst +// +// val details = ruleInfo.details +// val sourceName = details.getString(AccuracyKeys._source, dataSourceNames.head) +// val targetName = details.getString(AccuracyKeys._target, dataSourceNames.tail.head) +// val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], sourceName, targetName) +// +// if (!TempTables.existTable(key(calcTime), sourceName)) { +// Nil +// } else { +// // 1. miss record +// val missRecordsSql = if (!TempTables.existTable(key(calcTime), targetName)) { +// val selClause = s"`${sourceName}`.*" +// s"SELECT ${selClause} FROM `${sourceName}`" +// } else { +// val selClause = s"`${sourceName}`.*" +// val onClause = expr.coalesceDesc +// val sourceIsNull = analyzer.sourceSelectionExprs.map { sel => +// s"${sel.desc} IS NULL" +// }.mkString(" AND ") +// val targetIsNull = analyzer.targetSelectionExprs.map { sel => +// s"${sel.desc} IS NULL" +// }.mkString(" AND ") +// val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})" +// s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}" +// } +// val missRecordsName = AccuracyKeys._missRecords +// val tmstMissRecordsName = TempName.tmstName(missRecordsName, timeInfo) +// val missRecordsParams = details.getParamMap(AccuracyKeys._missRecords) +// .addIfNotExist(RuleDetailKeys._persistType, RecordPersistType.desc) +// .addIfNotExist(RuleDetailKeys._persistName, missRecordsName) +// val missRecordsStep = SparkSqlStep( +// timeInfo, +// RuleInfo(missRecordsName, Some(tmstMissRecordsName), missRecordsSql, missRecordsParams) +// ) +// +// // 2. miss count +// val missTableName = "_miss_" +//// val tmstMissTableName = TempName.tmstName(missTableName, timeInfo) +// val missColName = details.getStringOrKey(AccuracyKeys._miss) +// val missSql = { +// s"SELECT COUNT(*) AS `${missColName}` FROM `${missRecordsName}`" +// } +// val missStep = SparkSqlStep( +// timeInfo, +// RuleInfo(missTableName, None, missSql, Map[String, Any]()) +// ) +// +// // 3. total count +// val totalTableName = "_total_" +//// val tmstTotalTableName = TempName.tmstName(totalTableName, timeInfo) +// val totalColName = details.getStringOrKey(AccuracyKeys._total) +// val totalSql = { +// s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`" +// } +// val totalStep = SparkSqlStep( +// timeInfo, +// RuleInfo(totalTableName, None, totalSql, Map[String, Any]()) +// ) +// +// // 4. accuracy metric +// val accuracyMetricName = details.getString(RuleDetailKeys._persistName, ruleStep.name) +// val tmstAccuracyMetricName = TempName.tmstName(accuracyMetricName, timeInfo) +// val matchedColName = details.getStringOrKey(AccuracyKeys._matched) +// val accuracyMetricSql = { +// s""" +// |SELECT `${missTableName}`.`${missColName}` AS `${missColName}`, +// |`${totalTableName}`.`${totalColName}` AS `${totalColName}` +// |FROM `${totalTableName}` FULL JOIN `${missTableName}` +// """.stripMargin +// } +//// val accuracyParams = details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc) +// val accuracyMetricStep = SparkSqlStep( +// timeInfo, +// RuleInfo(accuracyMetricName, Some(tmstAccuracyMetricName), accuracyMetricSql, Map[String, Any]()) +// ) +// +// // 5. accuracy metric filter +// val accuracyParams = details.addIfNotExist("df.name", accuracyMetricName) +// .addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc) +// .addIfNotExist(RuleDetailKeys._persistName, accuracyMetricName) +// val accuracyStep = DfOprStep( +// timeInfo, +// RuleInfo(accuracyMetricName, Some(tmstAccuracyMetricName), "accuracy", accuracyParams) +// ) +// +// missRecordsStep :: missStep :: totalStep :: accuracyMetricStep :: accuracyStep :: Nil +// } +// } + +// private def transProfilingRuleStep(ruleStep: GriffinDslStep, expr: Expr +// ): Seq[ConcreteRuleStep] = { +// val calcTime = ruleStep.timeInfo.calcTime +// val details = ruleStep.ruleInfo.details +// val profilingClause = expr.asInstanceOf[ProfilingClause] +// val sourceName = profilingClause.fromClauseOpt match { +// case Some(fc) => fc.dataSource +// case _ => details.getString(ProfilingKeys._source, dataSourceNames.head) +// } +// val fromClause = profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc +// +// if (!TempTables.existTable(key(calcTime), sourceName)) { +// Nil +// } else { +// val timeInfo = ruleStep.timeInfo +// val ruleInfo = ruleStep.ruleInfo +// val tmst = timeInfo.tmst +// +//// val tmstSourceName = TempName.tmstName(sourceName, timeInfo) +// +//// val tmstProfilingClause = profilingClause.map(dsHeadReplace(sourceName, tmstSourceName)) +// val tmstAnalyzer = ProfilingAnalyzer(profilingClause, sourceName) +// +// val selExprDescs = tmstAnalyzer.selectionExprs.map { sel => +// val alias = sel match { +// case s: AliasableExpr if (s.alias.nonEmpty) => s" AS `${s.alias.get}`" +// case _ => "" +// } +// s"${sel.desc}${alias}" +// } +// val selCondition = profilingClause.selectClause.extraConditionOpt.map(_.desc).mkString +// val selClause = selExprDescs.mkString(", ") +//// val tmstFromClause = profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc +// val groupByClauseOpt = tmstAnalyzer.groupbyExprOpt +// val groupbyClause = groupByClauseOpt.map(_.desc).getOrElse("") +// val preGroupbyClause = tmstAnalyzer.preGroupbyExprs.map(_.desc).mkString(" ") +// val postGroupbyClause = tmstAnalyzer.postGroupbyExprs.map(_.desc).mkString(" ") +// +// // 1. select statement +// val profilingSql = { +// s"SELECT ${selCondition} ${selClause} ${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}" +// } +//// println(profilingSql) +// val metricName = details.getString(RuleDetailKeys._persistName, ruleStep.name) +//// val tmstMetricName = TempName.tmstName(metricName, timeInfo) +// val profilingParams = details.addIfNotExist(RuleDetailKeys._persistType, MetricPersistType.desc) +// .addIfNotExist(RuleDetailKeys._persistName, metricName) +// val profilingStep = SparkSqlStep( +// timeInfo, +// ruleInfo.setRule(profilingSql).setDetails(profilingParams) +// ) +// +//// filterStep :: profilingStep :: Nil +// profilingStep :: Nil +// } +// +// } + +// private def dsHeadReplace(originName: String, replaceName: String): (Expr) => Expr = { expr: Expr => +// expr match { +// case DataSourceHeadExpr(sn) if (sn == originName) => { +// DataSourceHeadExpr(replaceName) +// } +// case FromClause(sn) if (sn == originName) => { +// FromClause(replaceName) +// } +// case _ => expr.map(dsHeadReplace(originName, replaceName)) +// } +// } + }
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala new file mode 100644 index 0000000..bd344b1 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala @@ -0,0 +1,31 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.rule.adaptor + +object InternalColumns { + val tmst = "__tmst" + val metric = "__metric" + val record = "__record" + val empty = "__empty" + + val beginTs = "__begin_ts" + val endTs = "__end_ts" + + val columns = List[String](tmst, metric, record, empty, beginTs, endTs) +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala index 744f52a..ebc8fdb 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptor.scala @@ -20,44 +20,159 @@ package org.apache.griffin.measure.rule.adaptor import java.util.concurrent.atomic.AtomicLong +import org.apache.griffin.measure.cache.tmst.TempName import scala.collection.mutable.{Set => MutableSet} import org.apache.griffin.measure.config.params.user._ import org.apache.griffin.measure.log.Loggable -import org.apache.griffin.measure.rule.step.{ConcreteRuleStep, RuleStep} -import org.apache.griffin.measure.rule.dsl.{DslType, PersistType} +import org.apache.griffin.measure.process.ProcessType +import org.apache.griffin.measure.rule.dsl._ +import org.apache.griffin.measure.rule.plan.{TimeInfo, _} -trait RuleAdaptor extends Loggable with Serializable { - - val adaptPhase: AdaptPhase +//object RuleInfoKeys { +// val _name = "name" +// val _rule = "rule" +// val _details = "details" +// val _dslType = "dsl.type" +// val _dqType = "dq.type" +// val _global = "global" +//// val _gatherStep = "gather.step" +// +// val _metric = "metric" +// val _record = "record" +//} +//import RuleInfoKeys._ +import org.apache.griffin.measure.utils.ParamUtil._ +object RuleParamKeys { val _name = "name" val _rule = "rule" - val _persistType = "persist.type" - val _updateDataSource = "update.data.source" + val _dslType = "dsl.type" + val _dqType = "dq.type" + val _cache = "cache" + val _global = "global" val _details = "details" - protected def getName(param: Map[String, Any]) = param.getOrElse(_name, RuleStepNameGenerator.genName).toString - protected def getRule(param: Map[String, Any]) = param.getOrElse(_rule, "").toString - protected def getPersistType(param: Map[String, Any]) = PersistType(param.getOrElse(_persistType, "").toString) - protected def getUpdateDataSource(param: Map[String, Any]) = param.get(_updateDataSource).map(_.toString) - protected def getDetails(param: Map[String, Any]) = param.get(_details) match { - case Some(dt: Map[String, Any]) => dt - case _ => Map[String, Any]() + val _metric = "metric" + val _record = "record" + + def getName(param: Map[String, Any], defName: String): String = param.getString(_name, defName) + def getRule(param: Map[String, Any]): String = param.getString(_rule, "") + def getDqType(param: Map[String, Any]): DqType = DqType(param.getString(_dqType, "")) + def getCache(param: Map[String, Any]): Boolean = param.getBoolean(_cache, false) + def getGlobal(param: Map[String, Any]): Boolean = param.getBoolean(_global, false) + def getDetails(param: Map[String, Any]): Map[String, Any] = param.getParamMap(_details) + + def getMetricOpt(param: Map[String, Any]): Option[Map[String, Any]] = param.getParamMapOpt(_metric) + def getRecordOpt(param: Map[String, Any]): Option[Map[String, Any]] = param.getParamMapOpt(_record) +} + +object ExportParamKeys { + val _name = "name" + val _collectType = "collect.type" + val _dataSourceCache = "data.source.cache" + val _originDF = "origin.DF" + + def getName(param: Map[String, Any], defName: String): String = param.getString(_name, defName) + def getCollectType(param: Map[String, Any]): CollectType = CollectType(param.getString(_collectType, "")) + def getDataSourceCacheOpt(param: Map[String, Any]): Option[String] = param.get(_dataSourceCache).map(_.toString) + def getOriginDFOpt(param: Map[String, Any]): Option[String] = param.get(_originDF).map(_.toString) +} + +trait RuleAdaptor extends Loggable with Serializable { + +// val adaptPhase: AdaptPhase + +// protected def genRuleInfo(param: Map[String, Any]): RuleInfo = RuleInfoGen(param) + +// protected def getName(param: Map[String, Any]) = param.getOrElse(_name, RuleStepNameGenerator.genName).toString +// protected def getRule(param: Map[String, Any]) = param.getOrElse(_rule, "").toString +// protected def getDetails(param: Map[String, Any]) = param.get(_details) match { +// case Some(dt: Map[String, Any]) => dt +// case _ => Map[String, Any]() +// } + + + +// def getPersistNames(steps: Seq[RuleStep]): Seq[String] = steps.map(_.ruleInfo.persistName) +// +// protected def genRuleStep(timeInfo: TimeInfo, param: Map[String, Any]): Seq[RuleStep] +// protected def adaptConcreteRuleStep(ruleStep: RuleStep): Seq[ConcreteRuleStep] +// def genConcreteRuleStep(timeInfo: TimeInfo, param: Map[String, Any] +// ): Seq[ConcreteRuleStep] = { +// genRuleStep(timeInfo, param).flatMap { rs => +// adaptConcreteRuleStep(rs) +// } +// } + + + +// def genRuleInfos(param: Map[String, Any], timeInfo: TimeInfo): Seq[RuleInfo] = { +// RuleInfoGen(param) :: Nil +// } + + protected def getRuleName(param: Map[String, Any]): String = { + RuleParamKeys.getName(param, RuleStepNameGenerator.genName) } - def getTempSourceNames(param: Map[String, Any]): Seq[String] + def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any], procType: ProcessType): RulePlan - def genRuleStep(param: Map[String, Any]): Seq[RuleStep] - def genConcreteRuleStep(param: Map[String, Any]): Seq[ConcreteRuleStep] = { - genRuleStep(param).flatMap { rs => - adaptConcreteRuleStep(rs) - } + protected def genRuleExports(param: Map[String, Any], defName: String, stepName: String): Seq[RuleExport] = { + val metricOpt = RuleParamKeys.getMetricOpt(param) + val metricExportSeq = metricOpt.map(genMetricExport(_, defName, stepName)).toSeq + val recordOpt = RuleParamKeys.getRecordOpt(param) + val recordExportSeq = recordOpt.map(genRecordExport(_, defName, stepName)).toSeq + metricExportSeq ++ recordExportSeq } - protected def adaptConcreteRuleStep(ruleStep: RuleStep): Seq[ConcreteRuleStep] + protected def genMetricExport(param: Map[String, Any], name: String, stepName: String + ): MetricExport = { + MetricExport( + ExportParamKeys.getName(param, name), + stepName, + ExportParamKeys.getCollectType(param) + ) + } + protected def genRecordExport(param: Map[String, Any], name: String, stepName: String + ): RecordExport = { + RecordExport( + ExportParamKeys.getName(param, name), + stepName, + ExportParamKeys.getDataSourceCacheOpt(param), + ExportParamKeys.getOriginDFOpt(param) + ) + } + + } + + +//object RuleInfoGen { +// def apply(param: Map[String, Any]): RuleInfo = { +// val name = param.get(_name) match { +// case Some(n: String) => n +// case _ => RuleStepNameGenerator.genName +// } +// RuleInfo( +// name, +// None, +// DslType(param.getString(_dslType, "")), +// param.getString(_rule, ""), +// param.getParamMap(_details), +// param.getBoolean(_gatherStep, false) +// ) +// } +// def apply(ri: RuleInfo, timeInfo: TimeInfo): RuleInfo = { +// if (ri.persistType.needPersist) { +// val tmstName = TempName.tmstName(ri.name, timeInfo) +// ri.setTmstNameOpt(Some(tmstName)) +// } else ri +// } +// +// def dqType(param: Map[String, Any]): DqType = DqType(param.getString(_dqType, "")) +//} + object RuleStepNameGenerator { private val counter: AtomicLong = new AtomicLong(0L) private val head: String = "rs"
