http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala index 237902a..1e077b1 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala @@ -18,11 +18,12 @@ under the License. */ package org.apache.griffin.measure.rule.adaptor +import org.apache.griffin.measure.cache.tmst.TempName import org.apache.griffin.measure.config.params.user._ import org.apache.griffin.measure.process.ProcessType -import org.apache.griffin.measure.process.check.DataChecker +import org.apache.griffin.measure.process.temp.TableRegisters import org.apache.griffin.measure.rule.dsl._ -import org.apache.griffin.measure.rule.step._ +import org.apache.griffin.measure.rule.plan._ import org.apache.spark.sql.SQLContext import scala.collection.mutable.{Map => MutableMap} @@ -30,33 +31,39 @@ import scala.collection.mutable.{Map => MutableMap} object RuleAdaptorGroup { val _dslType = "dsl.type" +// import RuleInfoKeys._ - var dataSourceNames: Seq[String] = _ - var functionNames: Seq[String] = _ + var dataSourceNames: Seq[String] = Nil + var functionNames: Seq[String] = Nil - var dataChecker: DataChecker = _ + var baselineDsName: String = "" - def init(sqlContext: SQLContext, dsNames: Seq[String]): Unit = { + private val emptyRulePlan = RulePlan(Nil, Nil) + + def init(dsNames: Seq[String], blDsName: String, funcNames: Seq[String]): Unit = { + dataSourceNames = dsNames + baselineDsName = blDsName + functionNames = funcNames + } + + def init(sqlContext: SQLContext, dsNames: Seq[String], blDsName: String): Unit = { val functions = sqlContext.sql("show functions") - functionNames = functions.map(_.getString(0)).collect + functionNames = functions.map(_.getString(0)).collect.toSeq dataSourceNames = dsNames - dataChecker = DataChecker(sqlContext) + baselineDsName = blDsName } private def getDslType(param: Map[String, Any], defDslType: DslType) = { - val dt = DslType(param.getOrElse(_dslType, "").toString) - dt match { - case UnknownDslType => defDslType - case _ => dt - } + DslType(param.getOrElse(_dslType, defDslType.desc).toString) } - private def genRuleAdaptor(dslType: DslType, dsNames: Seq[String], adaptPhase: AdaptPhase): Option[RuleAdaptor] = { + private def genRuleAdaptor(dslType: DslType, dsNames: Seq[String] + ): Option[RuleAdaptor] = { dslType match { - case SparkSqlType => Some(SparkSqlAdaptor(adaptPhase)) - case DfOprType => Some(DataFrameOprAdaptor(adaptPhase)) - case GriffinDslType => Some(GriffinDslAdaptor(dsNames, functionNames, adaptPhase)) + case SparkSqlType => Some(SparkSqlAdaptor()) + case DfOprType => Some(DataFrameOprAdaptor()) + case GriffinDslType => Some(GriffinDslAdaptor(dsNames, functionNames)) case _ => None } } @@ -77,29 +84,217 @@ object RuleAdaptorGroup { // steps // } - def genConcreteRuleSteps(evaluateRuleParam: EvaluateRuleParam, - adaptPhase: AdaptPhase - ): Seq[ConcreteRuleStep] = { +// def genConcreteRuleSteps(timeInfo: TimeInfo, evaluateRuleParam: EvaluateRuleParam, +// dsTmsts: Map[String, Set[Long]], procType: ProcessType, +// adaptPhase: AdaptPhase +// ): Seq[ConcreteRuleStep] = { +// val dslTypeStr = if (evaluateRuleParam.dslType == null) "" else evaluateRuleParam.dslType +// val defaultDslType = DslType(dslTypeStr) +// val ruleParams = evaluateRuleParam.rules +// genConcreteRuleSteps(timeInfo, ruleParams, dsTmsts, defaultDslType, procType, adaptPhase) +// } +// +// def genConcreteRuleSteps(timeInfo: TimeInfo, ruleParams: Seq[Map[String, Any]], +// dsTmsts: Map[String, Set[Long]], defDslType: DslType, +// procType: ProcessType, adaptPhase: AdaptPhase +// ): Seq[ConcreteRuleStep] = { +// val (steps, dsNames) = ruleParams.foldLeft((Seq[ConcreteRuleStep](), dataSourceNames)) { (res, param) => +// val (preSteps, preNames) = res +// val dslType = getDslType(param, defDslType) +// val (curSteps, curNames) = genRuleAdaptor(dslType, preNames, procType, adaptPhase) match { +// case Some(ruleAdaptor) => { +// val concreteSteps = ruleAdaptor.genConcreteRuleStep(timeInfo, param, dsTmsts) +// (concreteSteps, preNames ++ ruleAdaptor.getPersistNames(concreteSteps)) +// } +// case _ => (Nil, preNames) +// } +// (preSteps ++ curSteps, curNames) +// } +// steps +// } + + // -- gen rule plan -- + def genRulePlan(timeInfo: TimeInfo, evaluateRuleParam: EvaluateRuleParam, procType: ProcessType + ): RulePlan = { val dslTypeStr = if (evaluateRuleParam.dslType == null) "" else evaluateRuleParam.dslType val defaultDslType = DslType(dslTypeStr) val ruleParams = evaluateRuleParam.rules - genConcreteRuleSteps(ruleParams, defaultDslType, adaptPhase) + genRulePlan(timeInfo, ruleParams, defaultDslType, procType) } - def genConcreteRuleSteps(ruleParams: Seq[Map[String, Any]], - defDslType: DslType, adaptPhase: AdaptPhase - ): Seq[ConcreteRuleStep] = { - val (steps, dsNames) = ruleParams.foldLeft((Seq[ConcreteRuleStep](), dataSourceNames)) { (res, param) => - val (preSteps, preNames) = res - val dslType = getDslType(param, defDslType) - val (curSteps, curNames) = genRuleAdaptor(dslType, preNames, adaptPhase) match { - case Some(ruleAdaptor) => (ruleAdaptor.genConcreteRuleStep(param), preNames ++ ruleAdaptor.getTempSourceNames(param)) - case _ => (Nil, preNames) + def genRulePlan(timeInfo: TimeInfo, ruleParams: Seq[Map[String, Any]], + defaultDslType: DslType, procType: ProcessType + ): RulePlan = { + val (rulePlan, dsNames) = ruleParams.foldLeft((emptyRulePlan, dataSourceNames)) { (res, param) => + val (plan, names) = res + val dslType = getDslType(param, defaultDslType) + val curPlan: RulePlan = genRuleAdaptor(dslType, names) match { + case Some(adaptor) => adaptor.genRulePlan(timeInfo, param, procType) + case _ => emptyRulePlan } - (preSteps ++ curSteps, curNames) + val globalNames = curPlan.globalRuleSteps.map(_.name) + globalNames.foreach(TableRegisters.registerCompileGlobalTable(_)) + val curNames = curPlan.normalRuleSteps.map(_.name) + curNames.foreach(TableRegisters.registerCompileTempTable(timeInfo.key, _)) + + val retPlan = plan.merge(curPlan) + (retPlan, names ++ globalNames ++ curNames) } - steps + + rulePlan } + // -- gen steps -- +// def genRuleSteps(timeInfo: TimeInfo, evaluateRuleParam: EvaluateRuleParam, dsTmsts: Map[String, Set[Long]] +// ): Seq[ConcreteRuleStep] = { +// val dslTypeStr = if (evaluateRuleParam.dslType == null) "" else evaluateRuleParam.dslType +// val defaultDslType = DslType(dslTypeStr) +// val ruleParams = evaluateRuleParam.rules +// val tmsts = dsTmsts.getOrElse(baselineDsName, Set[Long]()).toSeq +// genRuleSteps(timeInfo, ruleParams, tmsts, defaultDslType) +// } +// +// def genRuleSteps(timeInfo: TimeInfo, ruleParams: Seq[Map[String, Any]], +// tmsts: Seq[Long], defaultDslType: DslType, +// adapthase: AdaptPhase = RunPhase +// ): Seq[ConcreteRuleStep] = { +// val calcTime = timeInfo.calcTime +// val (ruleInfos, dsNames) = ruleParams.foldLeft((Seq[RuleInfo](), dataSourceNames)) { (res, param) => +// val (preRuleInfos, preNames) = res +// val dslType = getDslType(param, defaultDslType) +// val (curRuleInfos, curNames) = genRuleAdaptor(dslType, preNames) match { +// case Some(adaptor) => { +// val ris = adaptor.genRuleInfos(param, timeInfo) +// val rins = ris.filter(!_.global).map(_.name) +// (ris, rins) +// } +// case _ => (Nil, Nil) +// } +// if (adapthase == RunPhase) { +// curNames.foreach(TempTables.registerTempTableNameOnly(timeInfo.key, _)) +// } +// (preRuleInfos ++ curRuleInfos, preNames ++ curNames) +// } +// +// adapthase match { +// case PreProcPhase => { +// ruleInfos.flatMap { ri => +// genConcRuleSteps(timeInfo, ri) +// } +// } +// case RunPhase => { +// val riGroups = ruleInfos.foldRight(List[(List[RuleInfo], Boolean)]()) { (ri, groups) => +// groups match { +// case head :: tail if (ri.gather == head._2) => (ri :: head._1, head._2) :: tail +// case _ => (ri :: Nil, ri.gather) :: groups +// } +// }.foldLeft(List[(List[RuleInfo], Boolean, List[String], List[RuleInfo])]()) { (groups, rigs) => +// val preGatherNames = groups.lastOption match { +// case Some(t) => if (t._2) t._3 ::: t._1.map(_.name) else t._3 +// case _ => baselineDsName :: Nil +// } +// val persistRuleInfos = groups.lastOption match { +// case Some(t) if (t._2) => t._1.filter(_.persistType.needPersist) +// case _ => Nil +// } +// groups :+ (rigs._1, rigs._2, preGatherNames, persistRuleInfos) +// } +// +// riGroups.flatMap { group => +// val (ris, gather, srcNames, persistRis) = group +// if (gather) { +// ris.flatMap { ri => +// genConcRuleSteps(timeInfo, ri) +// } +// } else { +// tmsts.flatMap { tmst => +// val concTimeInfo = TmstTimeInfo(calcTime, tmst) +// val tmstInitRuleInfos = genTmstInitRuleInfo(concTimeInfo, srcNames, persistRis) +// (tmstInitRuleInfos ++ ris).flatMap { ri => +// genConcRuleSteps(concTimeInfo, ri) +// } +// } +// } +// } +// } +// } +// +// +// } +// +// private def genConcRuleSteps(timeInfo: TimeInfo, ruleInfo: RuleInfo): Seq[ConcreteRuleStep] = { +// val nri = if (ruleInfo.persistType.needPersist && ruleInfo.tmstNameOpt.isEmpty) { +// val tmstName = if (ruleInfo.gather) { +// TempName.tmstName(ruleInfo.name, timeInfo.calcTime) +// } else { +// TempName.tmstName(ruleInfo.name, timeInfo) +// } +// ruleInfo.setTmstNameOpt(Some(tmstName)) +// } else ruleInfo +// ruleInfo.dslType match { +// case SparkSqlType => SparkSqlStep(timeInfo, nri) :: Nil +// case DfOprType => DfOprStep(timeInfo, nri) :: Nil +// case _ => Nil +// } +// } +// +// private def genTmstInitRuleInfo(timeInfo: TmstTimeInfo, srcNames: Seq[String], +// persistRis: Seq[RuleInfo]): Seq[RuleInfo] = { +// val TmstTimeInfo(calcTime, tmst, _) = timeInfo +// srcNames.map { srcName => +// val srcTmstName = TempName.tmstName(srcName, calcTime) +// val filterSql = { +// s"SELECT * FROM `${srcTmstName}` WHERE `${InternalColumns.tmst}` = ${tmst}" +// } +// val params = persistRis.filter(_.name == srcName).headOption match { +// case Some(ri) => ri.details +// case _ => Map[String, Any]() +// } +// RuleInfo(srcName, None, SparkSqlType, filterSql, params, false) +// } +// } + +// def genRuleSteps(timeInfo: TimeInfo, ruleParams: Seq[Map[String, Any]], +// tmsts: Seq[Long], defaultDslType: DslType, +// adapthase: AdaptPhase = RunPhase +// ): Seq[ConcreteRuleStep] = { +// tmsts.flatMap { tmst => +// val newTimeInfo = TimeInfo(timeInfo.calcTime, tmst) +// val initSteps: Seq[ConcreteRuleStep] = adapthase match { +// case RunPhase => genTmstInitStep(newTimeInfo) +// case PreProcPhase => Nil +// } +// val (steps, dsNames) = ruleParams.foldLeft((initSteps, dataSourceNames)) { (res, param) => +// val (preSteps, preNames) = res +// val dslType = getDslType(param, defaultDslType) +// val (curSteps, curNames) = genRuleAdaptor(dslType, preNames) match { +// case Some(ruleAdaptor) => { +// val concreteSteps = ruleAdaptor.genConcreteRuleStep(newTimeInfo, param) +// val persistNames = ruleAdaptor.getPersistNames(concreteSteps) +// (concreteSteps, persistNames) +// } +// case _ => (Nil, Nil) +// } +// (preSteps ++ curSteps, preNames ++ curNames) +// } +// steps +// } +// } + + + +// private def genTmstInitStep(timeInfo: TimeInfo): Seq[ConcreteRuleStep] = { +// val TimeInfo(calcTime, tmst) = timeInfo +// val tmstDsName = TempName.tmstName(baselineDsName, calcTime) +// val filterSql = { +// s"SELECT * FROM `${tmstDsName}` WHERE `${InternalColumns.tmst}` = ${tmst}" +// } +// SparkSqlStep( +// timeInfo, +// RuleInfo(baselineDsName, None, filterSql, Map[String, Any]()) +// ) :: Nil +// } + + }
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala index 78121fa..6b3b7cb 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/SparkSqlAdaptor.scala @@ -18,37 +18,31 @@ under the License. */ package org.apache.griffin.measure.rule.adaptor -import org.apache.griffin.measure.data.connector.GroupByColumn -import org.apache.griffin.measure.rule.step._ +import org.apache.griffin.measure.cache.tmst.TempName +import org.apache.griffin.measure.process.ProcessType +import org.apache.griffin.measure.rule.dsl.MetricPersistType +import org.apache.griffin.measure.rule.plan.{TimeInfo, _} +import org.apache.griffin.measure.utils.ParamUtil._ -case class SparkSqlAdaptor(adaptPhase: AdaptPhase) extends RuleAdaptor { +case class SparkSqlAdaptor() extends RuleAdaptor { - def genRuleStep(param: Map[String, Any]): Seq[RuleStep] = { - SparkSqlStep(getName(param), getRule(param), getDetails(param), - getPersistType(param), getUpdateDataSource(param)) :: Nil - } - def adaptConcreteRuleStep(ruleStep: RuleStep): Seq[ConcreteRuleStep] = { - ruleStep match { - case rs @ SparkSqlStep(name, rule, details, persistType, udsOpt) => { - adaptPhase match { - case PreProcPhase => rs :: Nil - case RunPhase => { - val repSel = rule.replaceFirst("(?i)select", s"SELECT `${GroupByColumn.tmst}` AS `${GroupByColumn.tmst}`,") - val groupbyRule = repSel.concat(s" GROUP BY `${GroupByColumn.tmst}`") - val nrs = SparkSqlStep(name, groupbyRule, details, persistType, udsOpt) - nrs :: Nil - } - } - } - case _ => Nil - } - } +// def genRuleStep(timeInfo: TimeInfo, param: Map[String, Any]): Seq[RuleStep] = { +// val ruleInfo = RuleInfoGen(param, timeInfo) +// SparkSqlStep(timeInfo, ruleInfo) :: Nil +// } +// def adaptConcreteRuleStep(ruleStep: RuleStep): Seq[ConcreteRuleStep] = { +// ruleStep match { +// case rs @ SparkSqlStep(ti, ri) => rs :: Nil +// case _ => Nil +// } +// } + + import RuleParamKeys._ - def getTempSourceNames(param: Map[String, Any]): Seq[String] = { - param.get(_name) match { - case Some(name) => name.toString :: Nil - case _ => Nil - } + def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any], procType: ProcessType): RulePlan = { + val name = getRuleName(param) + val step = SparkSqlStep(name, getRule(param), getDetails(param), getCache(param), getGlobal(param)) + RulePlan(step :: Nil, genRuleExports(param, name, name)) } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/CollectType.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/CollectType.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/CollectType.scala new file mode 100644 index 0000000..03a43d6 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/CollectType.scala @@ -0,0 +1,57 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.rule.dsl + +import scala.util.matching.Regex + +sealed trait CollectType { + val regex: Regex + val desc: String +} + +object CollectType { + private val collectTypes: List[CollectType] = List(DefaultCollectType, EntriesCollectType, ArrayCollectType, MapCollectType) + def apply(ptn: String): CollectType = { + collectTypes.filter(tp => ptn match { + case tp.regex() => true + case _ => false + }).headOption.getOrElse(DefaultCollectType) + } + def unapply(pt: CollectType): Option[String] = Some(pt.desc) +} + +final case object DefaultCollectType extends CollectType { + val regex: Regex = "".r + val desc: String = "default" +} + +final case object EntriesCollectType extends CollectType { + val regex: Regex = "^(?i)entries$".r + val desc: String = "entries" +} + +final case object ArrayCollectType extends CollectType { + val regex: Regex = "^(?i)array|list$".r + val desc: String = "array" +} + +final case object MapCollectType extends CollectType { + val regex: Regex = "^(?i)map$".r + val desc: String = "map" +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala index ac27403..da59348 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DqType.scala @@ -27,7 +27,9 @@ sealed trait DqType { } object DqType { - private val dqTypes: List[DqType] = List(AccuracyType, ProfilingType, TimelinessType, UnknownType) + private val dqTypes: List[DqType] = List( + AccuracyType, ProfilingType, DuplicateType, TimelinessType, UnknownType + ) def apply(ptn: String): DqType = { dqTypes.filter(tp => ptn match { case tp.regex() => true @@ -44,7 +46,12 @@ final case object AccuracyType extends DqType { final case object ProfilingType extends DqType { val regex = "^(?i)profiling$".r - val desc = "profiling$" + val desc = "profiling" +} + +final case object DuplicateType extends DqType { + val regex = "^(?i)duplicate$".r + val desc = "duplicate" } final case object TimelinessType extends DqType { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DslType.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DslType.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DslType.scala index cfda393..27ab2ac 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DslType.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/DslType.scala @@ -27,12 +27,12 @@ sealed trait DslType { } object DslType { - private val dslTypes: List[DslType] = List(SparkSqlType, GriffinDslType, DfOprType, UnknownDslType) + private val dslTypes: List[DslType] = List(SparkSqlType, GriffinDslType, DfOprType) def apply(ptn: String): DslType = { dslTypes.filter(tp => ptn match { case tp.regex() => true case _ => false - }).headOption.getOrElse(UnknownDslType) + }).headOption.getOrElse(GriffinDslType) } def unapply(pt: DslType): Option[String] = Some(pt.desc) } @@ -50,9 +50,4 @@ final case object DfOprType extends DslType { final case object GriffinDslType extends DslType { val regex = "^(?i)griffin-?dsl$".r val desc = "griffin-dsl" -} - -final case object UnknownDslType extends DslType { - val regex = "".r - val desc = "unknown" } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/PersistType.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/PersistType.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/PersistType.scala index 10b83c8..f2857e3 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/PersistType.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/PersistType.scala @@ -26,6 +26,7 @@ sealed trait PersistType { // def temp: Boolean = false // def persist: Boolean = false // def collect: Boolean = false + def needPersist: Boolean = true } object PersistType { @@ -42,6 +43,7 @@ object PersistType { final case object NonePersistType extends PersistType { val regex: Regex = "".r val desc: String = "none" + override def needPersist: Boolean = false } final case object RecordPersistType extends PersistType { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/BasicAnalyzer.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/BasicAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/BasicAnalyzer.scala index 063eb7b..e14e0da 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/BasicAnalyzer.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/BasicAnalyzer.scala @@ -18,7 +18,7 @@ under the License. */ package org.apache.griffin.measure.rule.dsl.analyzer -import org.apache.griffin.measure.rule.dsl.expr._ +import org.apache.griffin.measure.rule.dsl.expr.{MathExpr, _} trait BasicAnalyzer extends Serializable { @@ -35,7 +35,7 @@ trait BasicAnalyzer extends Serializable { val seqSelectionExprs = (dsName: String) => (expr: Expr, v: Seq[SelectionExpr]) => { expr match { - case se @ SelectionExpr(head: DataSourceHeadExpr, _, _) if (head.desc == dsName) => v :+ se + case se @ SelectionExpr(head: DataSourceHeadExpr, _, _) if (head.name == dsName) => v :+ se case _ => v } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DuplicateAnalyzer.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DuplicateAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DuplicateAnalyzer.scala new file mode 100644 index 0000000..1ca2b76 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DuplicateAnalyzer.scala @@ -0,0 +1,46 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.rule.dsl.analyzer + +import org.apache.griffin.measure.rule.dsl.expr.{AliasableExpr, _} + + +case class DuplicateAnalyzer(expr: DuplicateClause, sourceName: String, targetName: String) extends BasicAnalyzer { + + val seqAlias = (expr: Expr, v: Seq[String]) => { + expr match { + case apr: AliasableExpr => v ++ apr.alias + case _ => v + } + } + val combAlias = (a: Seq[String], b: Seq[String]) => a ++ b + + private val exprs = expr.exprs + private def genAlias(idx: Int): String = s"alias_${idx}" + val selectionPairs = exprs.zipWithIndex.map { pair => + val (pr, idx) = pair + val res = pr.preOrderTraverseDepthFirst(Seq[String]())(seqAlias, combAlias) + (pr, res.headOption.getOrElse(genAlias(idx))) + } + + if (selectionPairs.isEmpty) { + throw new Exception(s"duplicate analyzer error: empty selection") + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/ProfilingAnalyzer.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/ProfilingAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/ProfilingAnalyzer.scala index 34bdbd3..6872977 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/ProfilingAnalyzer.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/ProfilingAnalyzer.scala @@ -25,24 +25,14 @@ case class ProfilingAnalyzer(expr: ProfilingClause, sourceName: String) extends val dataSourceNames = expr.preOrderTraverseDepthFirst(Set[String]())(seqDataSourceNames, combDataSourceNames) - val sourceSelectionExprs = { - val seq = seqSelectionExprs(sourceName) - expr.selectClause.preOrderTraverseDepthFirst(Seq[SelectionExpr]())(seq, combSelectionExprs) - } - - val selectionExprs = expr.selectClause.exprs.map(_.extractSelf) - def containsAllSelectionExpr = { - selectionExprs.filter { expr => + val selectionExprs: Seq[Expr] = { + expr.selectClause.exprs.map(_.extractSelf).flatMap { expr => expr match { - case SelectionExpr(head: ALLSelectHeadExpr, selectors: Seq[SelectExpr], _) => { - selectors.isEmpty - } - case SelectionExpr(head: DataSourceHeadExpr, selectors: Seq[SelectExpr], _) => { - (head == sourceName) && (selectors.size == 1) && (selectors.head.isInstanceOf[AllFieldsSelectExpr]) - } - case _ => false + case e: SelectionExpr => Some(e) + case e: FunctionExpr => Some(e) + case _ => None } - }.size > 0 + } } val groupbyExprOpt = expr.groupbyClauseOpt http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/TimelinessAnalyzer.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/TimelinessAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/TimelinessAnalyzer.scala new file mode 100644 index 0000000..37d4651 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/TimelinessAnalyzer.scala @@ -0,0 +1,65 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.rule.dsl.analyzer + +import org.apache.griffin.measure.rule.dsl.expr._ + + +case class TimelinessAnalyzer(expr: TimelinessClause, sourceName: String) extends BasicAnalyzer { + +// val tsExpr = expr.desc + +// val seqAlias = (expr: Expr, v: Seq[String]) => { +// expr match { +// case apr: AliasableExpr => v ++ apr.alias +// case _ => v +// } +// } +// val combAlias = (a: Seq[String], b: Seq[String]) => a ++ b +// +// private val exprs = expr.exprs.toList +// val selectionPairs = exprs.map { pr => +// val res = pr.preOrderTraverseDepthFirst(Seq[String]())(seqAlias, combAlias) +// println(res) +// println(pr) +// (pr, res.headOption) +// } +// +// val (tsExprPair, endTsPairOpt) = selectionPairs match { +// case Nil => throw new Exception(s"timeliness analyzer error: ts column not set") +// case tsPair :: Nil => (tsPair, None) +// case tsPair :: endTsPair :: _ => (tsPair, Some(endTsPair)) +// } +// +// def getSelAlias(pair: (Expr, Option[String]), defAlias: String): (String, String) = { +// val (pr, aliasOpt) = pair +// val alias = aliasOpt.getOrElse(defAlias) +// (pr.desc, alias) +// } + + + private val exprs = expr.exprs.map(_.desc).toList + + val (btsExpr, etsExprOpt) = exprs match { + case Nil => throw new Exception(s"timeliness analyzer error: ts column not set") + case btsExpr :: Nil => (btsExpr, None) + case btsExpr :: etsExpr :: _ => (btsExpr, Some(etsExpr)) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala index c0986e1..bc7af42 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala @@ -21,13 +21,23 @@ package org.apache.griffin.measure.rule.dsl.expr trait ClauseExpression extends Expr { } -case class SelectClause(exprs: Seq[Expr]) extends ClauseExpression { +case class SelectClause(exprs: Seq[Expr], extraConditionOpt: Option[ExtraConditionExpr] + ) extends ClauseExpression { addChildren(exprs) - def desc: String = s"${exprs.map(_.desc).mkString(", ")}" + def desc: String = { + extraConditionOpt match { + case Some(cdtn) => s"${cdtn.desc} ${exprs.map(_.desc).mkString(", ")}" + case _ => s"${exprs.map(_.desc).mkString(", ")}" + } + } def coalesceDesc: String = desc + override def map(func: (Expr) => Expr): SelectClause = { + SelectClause(exprs.map(func(_)), extraConditionOpt.map(func(_).asInstanceOf[ExtraConditionExpr])) + } + } case class FromClause(dataSource: String) extends ClauseExpression { @@ -44,6 +54,10 @@ case class WhereClause(expr: Expr) extends ClauseExpression { def desc: String = s"WHERE ${expr.desc}" def coalesceDesc: String = s"WHERE ${expr.coalesceDesc}" + override def map(func: (Expr) => Expr): WhereClause = { + WhereClause(func(expr)) + } + } case class GroupbyClause(exprs: Seq[Expr], havingClauseOpt: Option[Expr]) extends ClauseExpression { @@ -79,6 +93,10 @@ case class GroupbyClause(exprs: Seq[Expr], havingClauseOpt: Option[Expr]) extend GroupbyClause(exprs ++ other.exprs, newHavingClauseOpt) } + override def map(func: (Expr) => Expr): GroupbyClause = { + GroupbyClause(exprs.map(func(_)), havingClauseOpt.map(func(_))) + } + } case class OrderbyItem(expr: Expr, orderOpt: Option[String]) extends Expr { @@ -90,6 +108,10 @@ case class OrderbyItem(expr: Expr, orderOpt: Option[String]) extends Expr { } } def coalesceDesc: String = desc + + override def map(func: (Expr) => Expr): OrderbyItem = { + OrderbyItem(func(expr), orderOpt) + } } case class OrderbyClause(items: Seq[OrderbyItem]) extends ClauseExpression { @@ -104,6 +126,10 @@ case class OrderbyClause(items: Seq[OrderbyItem]) extends ClauseExpression { val obs = items.map(_.desc).mkString(", ") s"ORDER BY ${obs}" } + + override def map(func: (Expr) => Expr): OrderbyClause = { + OrderbyClause(items.map(func(_).asInstanceOf[OrderbyItem])) + } } case class LimitClause(expr: Expr) extends ClauseExpression { @@ -112,6 +138,10 @@ case class LimitClause(expr: Expr) extends ClauseExpression { def desc: String = s"LIMIT ${expr.desc}" def coalesceDesc: String = s"LIMIT ${expr.coalesceDesc}" + + override def map(func: (Expr) => Expr): LimitClause = { + LimitClause(func(expr)) + } } case class CombinedClause(selectClause: SelectClause, fromClauseOpt: Option[FromClause], @@ -139,6 +169,13 @@ case class CombinedClause(selectClause: SelectClause, fromClauseOpt: Option[From s"${head} ${tail.coalesceDesc}" } } + + override def map(func: (Expr) => Expr): CombinedClause = { + CombinedClause(func(selectClause).asInstanceOf[SelectClause], + fromClauseOpt.map(func(_).asInstanceOf[FromClause]), + tails.map(func(_).asInstanceOf[ClauseExpression]) + ) + } } case class ProfilingClause(selectClause: SelectClause, @@ -171,4 +208,29 @@ case class ProfilingClause(selectClause: SelectClause, val postDesc = postGroupbyClauses.map(_.coalesceDesc).mkString(" ") s"${selectDesc} ${fromDesc} ${preDesc} ${groupbyDesc} ${postDesc}" } + + override def map(func: (Expr) => Expr): ProfilingClause = { + ProfilingClause(func(selectClause).asInstanceOf[SelectClause], + fromClauseOpt.map(func(_).asInstanceOf[FromClause]), + groupbyClauseOpt.map(func(_).asInstanceOf[GroupbyClause]), + preGroupbyClauses.map(func(_).asInstanceOf[ClauseExpression]), + postGroupbyClauses.map(func(_).asInstanceOf[ClauseExpression]) + ) + } +} + +case class DuplicateClause(exprs: Seq[Expr]) extends ClauseExpression { + addChildren(exprs) + + def desc: String = exprs.map(_.desc).mkString(", ") + def coalesceDesc: String = exprs.map(_.coalesceDesc).mkString(", ") + override def map(func: (Expr) => Expr): DuplicateClause = DuplicateClause(exprs.map(func(_))) +} + +case class TimelinessClause(exprs: Seq[Expr]) extends ClauseExpression { + addChildren(exprs) + + def desc: String = exprs.map(_.desc).mkString(", ") + def coalesceDesc: String = exprs.map(_.coalesceDesc).mkString(", ") + override def map(func: (Expr) => Expr): TimelinessClause = TimelinessClause(exprs.map(func(_))) } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/Expr.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/Expr.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/Expr.scala index 850579c..c089e81 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/Expr.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/Expr.scala @@ -26,4 +26,7 @@ trait Expr extends TreeNode with Serializable { def extractSelf: Expr = this + // execution + def map(func: (Expr) => Expr): Expr = func(this) + } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ExtraConditionExpr.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ExtraConditionExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ExtraConditionExpr.scala new file mode 100644 index 0000000..eb7ba48 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ExtraConditionExpr.scala @@ -0,0 +1,27 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.rule.dsl.expr + +case class ExtraConditionExpr(cdtn: String) extends Expr { + + def desc: String = cdtn.toUpperCase + + def coalesceDesc: String = desc + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/FunctionExpr.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/FunctionExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/FunctionExpr.scala index e33b03d..1bbed83 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/FunctionExpr.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/FunctionExpr.scala @@ -18,16 +18,28 @@ under the License. */ package org.apache.griffin.measure.rule.dsl.expr -case class FunctionExpr(functionName: String, args: Seq[Expr], aliasOpt: Option[String] +case class FunctionExpr(functionName: String, args: Seq[Expr], + extraConditionOpt: Option[ExtraConditionExpr], + aliasOpt: Option[String] ) extends Expr with AliasableExpr { addChildren(args) - def desc: String = s"${functionName}(${args.map(_.desc).mkString(", ")})" + def desc: String = { + extraConditionOpt match { + case Some(cdtn) => s"${functionName}(${cdtn.desc} ${args.map(_.desc).mkString(", ")})" + case _ => s"${functionName}(${args.map(_.desc).mkString(", ")})" + } + } def coalesceDesc: String = desc def alias: Option[String] = { if (aliasOpt.isEmpty) { Some(functionName) } else aliasOpt } + + override def map(func: (Expr) => Expr): FunctionExpr = { + FunctionExpr(functionName, args.map(func(_)), + extraConditionOpt.map(func(_).asInstanceOf[ExtraConditionExpr]), aliasOpt) + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/LogicalExpr.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/LogicalExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/LogicalExpr.scala index 4b16219..b4c35f5 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/LogicalExpr.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/LogicalExpr.scala @@ -33,6 +33,10 @@ case class InExpr(head: Expr, is: Boolean, range: Seq[Expr]) extends LogicalExpr val notStr = if (is) "" else " NOT" s"${head.coalesceDesc}${notStr} IN (${range.map(_.coalesceDesc).mkString(", ")})" } + + override def map(func: (Expr) => Expr): InExpr = { + InExpr(func(head), is, range.map(func(_))) + } } case class BetweenExpr(head: Expr, is: Boolean, range: Seq[Expr]) extends LogicalExpr { @@ -58,6 +62,10 @@ case class BetweenExpr(head: Expr, is: Boolean, range: Seq[Expr]) extends Logica } s"${head.coalesceDesc}${notStr} BETWEEN ${rangeStr}" } + + override def map(func: (Expr) => Expr): BetweenExpr = { + BetweenExpr(func(head), is, range.map(func(_))) + } } case class LikeExpr(head: Expr, is: Boolean, value: Expr) extends LogicalExpr { @@ -72,6 +80,10 @@ case class LikeExpr(head: Expr, is: Boolean, value: Expr) extends LogicalExpr { val notStr = if (is) "" else " NOT" s"${head.coalesceDesc}${notStr} LIKE ${value.coalesceDesc}" } + + override def map(func: (Expr) => Expr): LikeExpr = { + LikeExpr(func(head), is, func(value)) + } } case class IsNullExpr(head: Expr, is: Boolean) extends LogicalExpr { @@ -83,6 +95,10 @@ case class IsNullExpr(head: Expr, is: Boolean) extends LogicalExpr { s"${head.desc} IS${notStr} NULL" } def coalesceDesc: String = desc + + override def map(func: (Expr) => Expr): IsNullExpr = { + IsNullExpr(func(head), is) + } } case class IsNanExpr(head: Expr, is: Boolean) extends LogicalExpr { @@ -94,6 +110,10 @@ case class IsNanExpr(head: Expr, is: Boolean) extends LogicalExpr { s"${notStr}isnan(${head.desc})" } def coalesceDesc: String = desc + + override def map(func: (Expr) => Expr): IsNanExpr = { + IsNanExpr(func(head), is) + } } // ----------- @@ -110,6 +130,10 @@ case class LogicalFactorExpr(factor: Expr, withBracket: Boolean, aliasOpt: Optio if (aliasOpt.nonEmpty) this else factor.extractSelf } + + override def map(func: (Expr) => Expr): LogicalFactorExpr = { + LogicalFactorExpr(func(factor), withBracket, aliasOpt) + } } case class UnaryLogicalExpr(oprs: Seq[String], factor: LogicalExpr) extends LogicalExpr { @@ -136,6 +160,10 @@ case class UnaryLogicalExpr(oprs: Seq[String], factor: LogicalExpr) extends Logi if (oprs.nonEmpty) this else factor.extractSelf } + + override def map(func: (Expr) => Expr): UnaryLogicalExpr = { + UnaryLogicalExpr(oprs, func(factor).asInstanceOf[LogicalExpr]) + } } case class BinaryLogicalExpr(factor: LogicalExpr, tails: Seq[(String, LogicalExpr)]) extends LogicalExpr { @@ -167,4 +195,10 @@ case class BinaryLogicalExpr(factor: LogicalExpr, tails: Seq[(String, LogicalExp if (tails.nonEmpty) this else factor.extractSelf } + + override def map(func: (Expr) => Expr): BinaryLogicalExpr = { + BinaryLogicalExpr(func(factor).asInstanceOf[LogicalExpr], tails.map{ pair => + (pair._1, func(pair._2).asInstanceOf[LogicalExpr]) + }) + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/MathExpr.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/MathExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/MathExpr.scala index b3d3db4..4217e44 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/MathExpr.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/MathExpr.scala @@ -33,6 +33,10 @@ case class MathFactorExpr(factor: Expr, withBracket: Boolean, aliasOpt: Option[S if (aliasOpt.nonEmpty) this else factor.extractSelf } + + override def map(func: (Expr) => Expr): MathFactorExpr = { + MathFactorExpr(func(factor), withBracket, aliasOpt) + } } case class UnaryMathExpr(oprs: Seq[String], factor: MathExpr) extends MathExpr { @@ -53,6 +57,10 @@ case class UnaryMathExpr(oprs: Seq[String], factor: MathExpr) extends MathExpr { if (oprs.nonEmpty) this else factor.extractSelf } + + override def map(func: (Expr) => Expr): UnaryMathExpr = { + UnaryMathExpr(oprs, func(factor).asInstanceOf[MathExpr]) + } } case class BinaryMathExpr(factor: MathExpr, tails: Seq[(String, MathExpr)]) extends MathExpr { @@ -77,4 +85,10 @@ case class BinaryMathExpr(factor: MathExpr, tails: Seq[(String, MathExpr)]) exte if (tails.nonEmpty) this else factor.extractSelf } + + override def map(func: (Expr) => Expr): BinaryMathExpr = { + BinaryMathExpr(func(factor).asInstanceOf[MathExpr], tails.map{ pair => + (pair._1, func(pair._2).asInstanceOf[MathExpr]) + }) + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/SelectExpr.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/SelectExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/SelectExpr.scala index 6525c88..d6e350b 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/SelectExpr.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/SelectExpr.scala @@ -23,22 +23,17 @@ trait HeadExpr extends Expr with AliasableExpr { } case class DataSourceHeadExpr(name: String) extends HeadExpr { - def desc: String = name + def desc: String = s"`${name}`" def coalesceDesc: String = desc } case class FieldNameHeadExpr(field: String) extends HeadExpr { - def desc: String = field + def desc: String = s"`${field}`" def coalesceDesc: String = desc - override def alias: Option[String] = { - val innerField = if (field.startsWith("`") && field.endsWith("`")) { - field.substring(1, field.length - 1) - } else field - Some(innerField) - } + override def alias: Option[String] = Some(field) } -case class ALLSelectHeadExpr() extends HeadExpr { +case class AllSelectHeadExpr() extends HeadExpr { def desc: String = "*" def coalesceDesc: String = desc } @@ -50,6 +45,10 @@ case class OtherHeadExpr(expr: Expr) extends HeadExpr { def desc: String = expr.desc def coalesceDesc: String = expr.coalesceDesc override def alias: Option[String] = Some(expr.desc) + + override def map(func: (Expr) => Expr): OtherHeadExpr = { + OtherHeadExpr(func(expr)) + } } // ------------- @@ -64,14 +63,9 @@ case class AllFieldsSelectExpr() extends SelectExpr { } case class FieldSelectExpr(field: String) extends SelectExpr { - def desc: String = s".${field}" + def desc: String = s".`${field}`" def coalesceDesc: String = desc - override def alias: Option[String] = { - val innerField = if (field.startsWith("`") && field.endsWith("`")) { - field.substring(1, field.length - 1) - } else field - Some(innerField) - } + override def alias: Option[String] = Some(field) } case class IndexSelectExpr(index: Expr) extends SelectExpr { @@ -81,6 +75,10 @@ case class IndexSelectExpr(index: Expr) extends SelectExpr { def desc: String = s"[${index.desc}]" def coalesceDesc: String = desc def alias: Option[String] = Some(index.desc) + + override def map(func: (Expr) => Expr): IndexSelectExpr = { + IndexSelectExpr(func(index)) + } } case class FunctionSelectExpr(functionName: String, args: Seq[Expr]) extends SelectExpr { @@ -90,6 +88,10 @@ case class FunctionSelectExpr(functionName: String, args: Seq[Expr]) extends Sel def desc: String = "" def coalesceDesc: String = desc def alias: Option[String] = Some(functionName) + + override def map(func: (Expr) => Expr): FunctionSelectExpr = { + FunctionSelectExpr(functionName, args.map(func(_))) + } } // ------------- @@ -122,4 +124,9 @@ case class SelectionExpr(head: HeadExpr, selectors: Seq[SelectExpr], aliasOpt: O if (aliasSeq.size > 0) Some(aliasSeq.mkString("_")) else None } else aliasOpt } + + override def map(func: (Expr) => Expr): SelectionExpr = { + SelectionExpr(func(head).asInstanceOf[HeadExpr], + selectors.map(func(_).asInstanceOf[SelectExpr]), aliasOpt) + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala index 6415a02..846770b 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala @@ -145,6 +145,8 @@ trait BasicParser extends JavaTokenParsers with Serializable { val COMMA: Parser[String] = "," val SELECT: Parser[String] = """(?i)select\s""".r + val DISTINCT: Parser[String] = """(?i)distinct""".r +// val ALL: Parser[String] = """(?i)all""".r val FROM: Parser[String] = """(?i)from\s""".r val AS: Parser[String] = """(?i)as\s""".r val WHERE: Parser[String] = """(?i)where\s""".r @@ -159,8 +161,6 @@ trait BasicParser extends JavaTokenParsers with Serializable { import Operator._ object Strings { - def innerString(s: String): String = s.substring(1, s.size - 1) - def AnyString: Parser[String] = """"(?:\"|[^\"])*"""".r | """'(?:\'|[^'])*'""".r def SimpleTableFieldName: Parser[String] = """[a-zA-Z_]\w*""".r def UnQuoteTableFieldName: Parser[String] = """`(?:[\\][`]|[^`])*`""".r @@ -209,23 +209,23 @@ trait BasicParser extends JavaTokenParsers with Serializable { case head ~ sels ~ aliasOpt => SelectionExpr(head, sels, aliasOpt) } def selectionHead: Parser[HeadExpr] = DataSourceName ^^ { - DataSourceHeadExpr(_) + ds => DataSourceHeadExpr(trim(ds)) } | function ^^ { OtherHeadExpr(_) } | SimpleTableFieldName ^^ { FieldNameHeadExpr(_) } | UnQuoteTableFieldName ^^ { s => - FieldNameHeadExpr(innerString(s)) + FieldNameHeadExpr(trim(s)) } | ALLSL ^^ { _ => - ALLSelectHeadExpr() + AllSelectHeadExpr() } def selector: Parser[SelectExpr] = functionSelect | allFieldsSelect | fieldSelect | indexSelect def allFieldsSelect: Parser[AllFieldsSelectExpr] = DOT ~> ALLSL ^^ { _ => AllFieldsSelectExpr() } def fieldSelect: Parser[FieldSelectExpr] = DOT ~> ( SimpleTableFieldName ^^ { FieldSelectExpr(_) - } | UnQuoteTableFieldName ^^ {s => - FieldSelectExpr(innerString(s)) + } | UnQuoteTableFieldName ^^ { s => + FieldSelectExpr(trim(s)) }) def indexSelect: Parser[IndexSelectExpr] = LSQBR ~> argument <~ RSQBR ^^ { IndexSelectExpr(_) } def functionSelect: Parser[FunctionSelectExpr] = DOT ~ FunctionName ~ LBR ~ repsep(argument, COMMA) ~ RBR ^^ { @@ -236,7 +236,7 @@ trait BasicParser extends JavaTokenParsers with Serializable { * -- as alias -- * <as-alias> ::= <as> <field-name> */ - def asAlias: Parser[String] = AS ~> (SimpleTableFieldName | UnQuoteTableFieldName ^^ { innerString(_) }) + def asAlias: Parser[String] = AS ~> (SimpleTableFieldName | UnQuoteTableFieldName ^^ { trim(_) }) /** * -- math expr -- @@ -333,8 +333,9 @@ trait BasicParser extends JavaTokenParsers with Serializable { * <arg> ::= <expr> */ - def function: Parser[FunctionExpr] = FunctionName ~ LBR ~ repsep(argument, COMMA) ~ RBR ~ opt(asAlias) ^^ { - case name ~ _ ~ args ~ _ ~ aliasOpt => FunctionExpr(name, args, aliasOpt) + def function: Parser[FunctionExpr] = FunctionName ~ LBR ~ opt(DISTINCT) ~ repsep(argument, COMMA) ~ RBR ~ opt(asAlias) ^^ { + case name ~ _ ~ extraCdtnOpt ~ args ~ _ ~ aliasOpt => + FunctionExpr(name, args, extraCdtnOpt.map(ExtraConditionExpr(_)), aliasOpt) } def argument: Parser[Expr] = expression @@ -350,7 +351,9 @@ trait BasicParser extends JavaTokenParsers with Serializable { * <limit-clause> = <limit> <expr> */ - def selectClause: Parser[SelectClause] = opt(SELECT) ~> rep1sep(expression, COMMA) ^^ { SelectClause(_) } + def selectClause: Parser[SelectClause] = opt(SELECT) ~> opt(DISTINCT) ~ rep1sep(expression, COMMA) ^^ { + case extraCdtnOpt ~ exprs => SelectClause(exprs, extraCdtnOpt.map(ExtraConditionExpr(_))) + } def fromClause: Parser[FromClause] = FROM ~> DataSourceName ^^ { ds => FromClause(trim(ds)) } def whereClause: Parser[WhereClause] = WHERE ~> expression ^^ { WhereClause(_) } def havingClause: Parser[Expr] = HAVING ~> expression http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala index 0800f45..8d04e76 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala @@ -38,10 +38,28 @@ case class GriffinDslParser(dataSourceNames: Seq[String], functionNames: Seq[Str } } + /** + * -- duplicate clauses -- + * <duplicate-clauses> = <expr> [, <expr>]+ + */ + def duplicateClause: Parser[DuplicateClause] = rep1sep(expression, Operator.COMMA) ^^ { + case exprs => DuplicateClause(exprs) + } + + /** + * -- timeliness clauses -- + * <timeliness-clauses> = <expr> [, <expr>]+ + */ + def timelinessClause: Parser[TimelinessClause] = rep1sep(expression, Operator.COMMA) ^^ { + case exprs => TimelinessClause(exprs) + } + def parseRule(rule: String, dqType: DqType): ParseResult[Expr] = { val rootExpr = dqType match { case AccuracyType => logicalExpression case ProfilingType => profilingClause + case DuplicateType => duplicateClause + case TimelinessType => timelinessClause case _ => expression } parseAll(rootExpr, rule) http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/plan/DfOprStep.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/DfOprStep.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/DfOprStep.scala new file mode 100644 index 0000000..f0afc6c --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/DfOprStep.scala @@ -0,0 +1,32 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.rule.plan + +import org.apache.griffin.measure.rule.dsl._ + +case class DfOprStep(name: String, + rule: String, + details: Map[String, Any], + cache: Boolean = false, + global: Boolean = false + ) extends RuleStep { + + val dslType: DslType = DfOprType + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala new file mode 100644 index 0000000..10f1f9b --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala @@ -0,0 +1,28 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.rule.plan + +import org.apache.griffin.measure.rule.dsl._ + +case class MetricExport(name: String, + stepName: String, + collectType: CollectType + ) extends RuleExport { + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala new file mode 100644 index 0000000..a467543 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala @@ -0,0 +1,27 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.rule.plan + +case class RecordExport(name: String, + stepName: String, + dataSourceCacheOpt: Option[String], + originDFOpt: Option[String] + ) extends RuleExport { + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala new file mode 100644 index 0000000..26a962a --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala @@ -0,0 +1,27 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.rule.plan + +trait RuleExport extends Serializable { + + val name: String // export name + + val stepName: String // the dependant step name + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RulePlan.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RulePlan.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RulePlan.scala new file mode 100644 index 0000000..54a6062 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RulePlan.scala @@ -0,0 +1,54 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.rule.plan + +import scala.reflect.ClassTag + +case class RulePlan(ruleSteps: Seq[RuleStep], + ruleExports: Seq[RuleExport] + ) extends Serializable { + + val globalRuleSteps = filterRuleSteps(_.global) + val normalRuleSteps = filterRuleSteps(!_.global) + + val metricExports = filterRuleExports[MetricExport](ruleExports) + val recordExports = filterRuleExports[RecordExport](ruleExports) + + private def filterRuleSteps(func: (RuleStep) => Boolean): Seq[RuleStep] = { + ruleSteps.filter(func) + } + + private def filterRuleExports[T <: RuleExport: ClassTag](exports: Seq[RuleExport]): Seq[T] = { + exports.flatMap { exp => + exp match { + case e: T => Some(e) + case _ => None + } + } + } + +// def ruleStepNames(func: (RuleStep) => Boolean): Seq[String] = { +// ruleSteps.filter(func).map(_.name) +// } + + def merge(rp: RulePlan): RulePlan = { + RulePlan(this.ruleSteps ++ rp.ruleSteps, this.ruleExports ++ rp.ruleExports) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleStep.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleStep.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleStep.scala new file mode 100644 index 0000000..dbdb2d5 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleStep.scala @@ -0,0 +1,40 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.rule.plan + +import org.apache.griffin.measure.rule.dsl.DslType + +trait RuleStep extends Serializable { + + val dslType: DslType + + val name: String + + val rule: String + + val cache: Boolean + + val global: Boolean + + val details: Map[String, Any] + + def needCache: Boolean = cache || global + + def isGlobal: Boolean = global +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/plan/SparkSqlStep.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/SparkSqlStep.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/SparkSqlStep.scala new file mode 100644 index 0000000..16da9a5 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/SparkSqlStep.scala @@ -0,0 +1,32 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.rule.plan + +import org.apache.griffin.measure.rule.dsl._ + +case class SparkSqlStep(name: String, + rule: String, + details: Map[String, Any], + cache: Boolean = false, + global: Boolean = false + ) extends RuleStep { + + val dslType: DslType = SparkSqlType + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/plan/TimeInfo.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/TimeInfo.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/TimeInfo.scala new file mode 100644 index 0000000..129d068 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/TimeInfo.scala @@ -0,0 +1,37 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.rule.plan + +trait TimeInfo extends Serializable { + val calcTime: Long +// val tmst: Long + val head: String + + def key: String = if (head.nonEmpty) s"${head}_${calcTime}" else s"${calcTime}" + def setHead(h: String): TimeInfo +} + +case class CalcTimeInfo(calcTime: Long, head: String = "") extends TimeInfo { +// val tmst: Long = calcTime + def setHead(h: String): TimeInfo = CalcTimeInfo(calcTime, h) +} + +//case class TmstTimeInfo(calcTime: Long, tmst: Long, head: String = "") extends TimeInfo { +// def setHead(h: String): TimeInfo = TmstTimeInfo(calcTime, tmst, h) +//} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/step/ConcreteRuleStep.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/step/ConcreteRuleStep.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/step/ConcreteRuleStep.scala deleted file mode 100644 index 4b3a4d4..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/step/ConcreteRuleStep.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.rule.step - -import org.apache.griffin.measure.rule.dsl._ - -trait ConcreteRuleStep extends RuleStep { - - val persistType: PersistType - - val updateDataSource: Option[String] - -// def isGroupMetric: Boolean = { -// val _GroupMetric = "group.metric" -// details.get(_GroupMetric) match { -// case Some(b: Boolean) => b -// case _ => false -// } -// } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/step/DfOprStep.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/step/DfOprStep.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/step/DfOprStep.scala deleted file mode 100644 index 86f0bf3..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/step/DfOprStep.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.rule.step - -import org.apache.griffin.measure.rule.dsl._ - -case class DfOprStep(name: String, rule: String, details: Map[String, Any], - persistType: PersistType, updateDataSource: Option[String] - ) extends ConcreteRuleStep { - - val dslType: DslType = DfOprType - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/step/GriffinDslStep.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/step/GriffinDslStep.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/step/GriffinDslStep.scala deleted file mode 100644 index 21db8cf..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/step/GriffinDslStep.scala +++ /dev/null @@ -1,28 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.rule.step - -import org.apache.griffin.measure.rule.dsl._ - -case class GriffinDslStep(name: String, rule: String, dqType: DqType, details: Map[String, Any] - ) extends RuleStep { - - val dslType: DslType = GriffinDslType - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/step/RuleStep.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/step/RuleStep.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/step/RuleStep.scala deleted file mode 100644 index 4675ffe..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/step/RuleStep.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.rule.step - -import org.apache.griffin.measure.rule.dsl.{DslType, PersistType} - -trait RuleStep extends Serializable { - - val dslType: DslType - - val name: String - val rule: String - val details: Map[String, Any] - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/step/SparkSqlStep.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/step/SparkSqlStep.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/step/SparkSqlStep.scala deleted file mode 100644 index 62c3c35..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/step/SparkSqlStep.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.rule.step - -import org.apache.griffin.measure.persist._ -import org.apache.griffin.measure.rule.dsl._ - -case class SparkSqlStep(name: String, rule: String, details: Map[String, Any], - persistType: PersistType, updateDataSource: Option[String] - ) extends ConcreteRuleStep { - - val dslType: DslType = SparkSqlType - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdfs.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdfs.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdfs.scala index 11e8c8f..37d2a5a 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdfs.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdfs.scala @@ -24,10 +24,15 @@ object GriffinUdfs { def register(sqlContext: SQLContext): Unit = { sqlContext.udf.register("index_of", indexOf) + sqlContext.udf.register("matches", matches) } private val indexOf = (arr: Seq[String], v: String) => { arr.indexOf(v) } + private val matches = (s: String, regex: String) => { + s.matches(regex) + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala index 416f567..8e0d9a3 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala @@ -18,11 +18,12 @@ under the License. */ package org.apache.griffin.measure.utils +import org.apache.spark.TaskContext import org.apache.spark.rdd.RDD object HdfsFileDumpUtil { - val sepCount = 5000 + val sepCount = 50000 private def suffix(i: Long): String = { if (i == 0) "" else s".${i}" @@ -32,8 +33,15 @@ object HdfsFileDumpUtil { } def splitRdd[T](rdd: RDD[T])(implicit m: Manifest[T]): RDD[(Long, Iterable[T])] = { - val indexRdd = rdd.zipWithIndex - indexRdd.map(p => ((p._2 / sepCount), p._1)).groupByKey() +// val indexRdd = rdd.zipWithIndex // slow process +// indexRdd.map(p => ((p._2 / sepCount), p._1)).groupByKey() // slow process + val count = rdd.count + val splitCount = count / sepCount + 1 + val splitRdd = rdd.mapPartitionsWithIndex { (n, itr) => + val idx = n % splitCount + itr.map((idx, _)) + } + splitRdd.groupByKey() } def splitIterable[T](datas: Iterable[T])(implicit m: Manifest[T]): Iterator[(Int, Iterable[T])] = { val groupedData = datas.grouped(sepCount).zipWithIndex @@ -47,23 +55,33 @@ object HdfsFileDumpUtil { HdfsUtil.writeContent(path, strRecords) } - def dump(path: String, recordsRdd: RDD[String], lineSep: String): Boolean = { + def dump(path: String, recordsRdd: RDD[String], lineSep: String): Unit = { val groupedRdd = splitRdd(recordsRdd) - groupedRdd.aggregate(true)({ (res, pair) => + groupedRdd.foreach { pair => val (idx, list) = pair val filePath = path + suffix(idx) directDump(filePath, list, lineSep) - true - }, _ && _) + } +// groupedRdd.aggregate(true)({ (res, pair) => +// val (idx, list) = pair +// val filePath = path + suffix(idx) +// directDump(filePath, list, lineSep) +// true +// }, _ && _) } - def dump(path: String, records: Iterable[String], lineSep: String): Boolean = { + def dump(path: String, records: Iterable[String], lineSep: String): Unit = { val groupedRecords = splitIterable(records) - groupedRecords.aggregate(true)({ (res, pair) => + groupedRecords.foreach { pair => val (idx, list) = pair val filePath = path + suffix(idx) directDump(filePath, list, lineSep) - true - }, _ && _) + } +// groupedRecords.aggregate(true)({ (res, pair) => +// val (idx, list) = pair +// val filePath = path + suffix(idx) +// directDump(filePath, list, lineSep) +// true +// }, _ && _) } def remove(path: String, filename: String, withSuffix: Boolean): Unit = { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala index 9fa6bcf..aa5643b 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala @@ -27,7 +27,7 @@ object HdfsUtil extends Loggable { private val seprator = "/" private val conf = new Configuration() - conf.set("dfs.support.append", "true") + conf.setBoolean("dfs.support.append", true) // conf.set("fs.defaultFS", "hdfs://localhost") // debug @localhost private val dfs = FileSystem.get(conf) @@ -54,7 +54,9 @@ object HdfsUtil extends Loggable { def appendOrCreateFile(filePath: String): FSDataOutputStream = { val path = new Path(filePath) - if (dfs.exists(path)) dfs.append(path) else createFile(filePath) + if (dfs.getConf.getBoolean("dfs.support.append", false) && dfs.exists(path)) { + dfs.append(path) + } else createFile(filePath) } def openFile(filePath: String): FSDataInputStream = { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala index 7954b6d..1ca32b3 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala @@ -47,6 +47,8 @@ object ParamUtil { } } + def getStringOrKey(key: String): String = getString(key, key) + def getByte(key: String, defValue: Byte): Byte = { try { params.get(key) match { @@ -153,12 +155,46 @@ object ParamUtil { try { params.get(key) match { case Some(v: String) => v.toBoolean + case Some(v: Boolean) => v + case _ => defValue + } + } catch { + case _: Throwable => defValue + } + } + + def getParamMap(key: String, defValue: Map[String, Any] = Map[String, Any]()): Map[String, Any] = { + try { + params.get(key) match { + case Some(v: Map[String, Any]) => v case _ => defValue } } catch { case _: Throwable => defValue } } + + def getParamMapOpt(key: String): Option[Map[String, Any]] = { + try { + params.get(key) match { + case Some(v: Map[String, Any]) => Some(v) + case _ => None + } + } catch { + case _: Throwable => None + } + } + + def addIfNotExist(key: String, value: Any): Map[String, Any] = { + params.get(key) match { + case Some(v) => params + case _ => params + (key -> value) + } + } + + def removeKeys(keys: Iterable[String]): Map[String, Any] = { + params -- keys + } } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala index a8c079b..42a140f 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala @@ -18,9 +18,11 @@ under the License. */ package org.apache.griffin.measure.utils +import org.apache.griffin.measure.log.Loggable + import scala.util.{Failure, Success, Try} -object TimeUtil { +object TimeUtil extends Loggable { final val TimeRegex = """^([+\-]?\d+)(ms|s|m|h|d)$""".r final val PureTimeRegex = """^([+\-]?\d+)$""".r @@ -48,7 +50,7 @@ object TimeUtil { } } match { case Success(v) => Some(v) - case Failure(ex) => throw ex + case Failure(ex) => None } } value http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/test/resources/_accuracy-batch-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_accuracy-batch-griffindsl.json b/measure/src/test/resources/_accuracy-batch-griffindsl.json new file mode 100644 index 0000000..c702d46 --- /dev/null +++ b/measure/src/test/resources/_accuracy-batch-griffindsl.json @@ -0,0 +1,63 @@ +{ + "name": "accu_batch", + + "process.type": "batch", + + "data.sources": [ + { + "name": "source", + "baseline": true, + "connectors": [ + { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/users_info_src.avro" + }, + "pre.proc": [ + { + "dsl.type": "spark-sql", + "name": "${this}", + "rule": "select * from ${this} where user_id > 10010" + } + ] + } + ] + }, { + "name": "target", + "connectors": [ + { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/users_info_target.avro" + } + } + ] + } + ], + + "evaluate.rule": { + "rules": [ + { + "dsl.type": "griffin-dsl", + "dq.type": "accuracy", + "name": "accu", + "rule": "source.user_id = target.user_id AND upper(source.first_name) = upper(target.first_name) AND source.last_name = target.last_name AND source.address = target.address AND source.email = target.email AND source.phone = target.phone AND source.post_code = target.post_code", + "details": { + "source": "source", + "target": "target", + "miss": "miss_count", + "total": "total_count", + "matched": "matched_count" + }, + "metric": { + "name": "accu" + }, + "record": { + "name": "missRecords" + } + } + ] + } +} \ No newline at end of file
