http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/expr/MathExpr.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/expr/MathExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/MathExpr.scala deleted file mode 100644 index 661e8f4..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/expr/MathExpr.scala +++ /dev/null @@ -1,99 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.rule.expr - -import org.apache.griffin.measure.rule.CalculationUtil._ -import org.apache.griffin.measure.rule.DataTypeCalculationUtil._ -import org.apache.spark.sql.types.DataType - -trait MathExpr extends Expr { - -} - -case class MathFactorExpr(self: Expr) extends MathExpr { - def calculateOnly(values: Map[String, Any]): Option[Any] = self.calculate(values) - val desc: String = self.desc - val dataSources: Set[String] = self.dataSources - override def getSubCacheExprs(ds: String): Iterable[Expr] = { - self.getCacheExprs(ds) - } - override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = { - self.getFinalCacheExprs(ds) - } - override def getSubPersistExprs(ds: String): Iterable[Expr] = { - self.getPersistExprs(ds) - } -} - -case class UnaryMathExpr(oprList: Iterable[String], factor: Expr) extends MathExpr { - private val (posOpr, negOpr) = ("+", "-") - def calculateOnly(values: Map[String, Any]): Option[Any] = { - val fv = factor.calculate(values) - oprList.foldRight(fv) { (opr, v) => - opr match { - case this.posOpr => v - case this.negOpr => -v - case _ => None - } - } - } - val desc: String = oprList.foldRight(factor.desc) { (prev, ex) => s"${prev}${ex}" } - val dataSources: Set[String] = factor.dataSources - override def cacheUnit: Boolean = true - override def getSubCacheExprs(ds: String): Iterable[Expr] = { - factor.getCacheExprs(ds) - } - override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = { - factor.getFinalCacheExprs(ds) - } - override def getSubPersistExprs(ds: String): Iterable[Expr] = { - factor.getPersistExprs(ds) - } -} - -case class BinaryMathExpr(first: MathExpr, others: Iterable[(String, MathExpr)]) extends MathExpr { - private val (addOpr, subOpr, mulOpr, divOpr, modOpr) = ("+", "-", "*", "/", "%") - def calculateOnly(values: Map[String, Any]): Option[Any] = { - val fv = first.calculate(values) - others.foldLeft(fv) { (v, pair) => - val (opr, next) = pair - val nv = 
next.calculate(values) - opr match { - case this.addOpr => v + nv - case this.subOpr => v - nv - case this.mulOpr => v * nv - case this.divOpr => v / nv - case this.modOpr => v % nv - case _ => None - } - } - } - val desc: String = others.foldLeft(first.desc) { (ex, next) => s"${ex} ${next._1} ${next._2.desc}" } - val dataSources: Set[String] = first.dataSources ++ others.flatMap(_._2.dataSources).toSet - override def cacheUnit: Boolean = true - override def getSubCacheExprs(ds: String): Iterable[Expr] = { - first.getCacheExprs(ds) ++ others.flatMap(_._2.getCacheExprs(ds)) - } - override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = { - first.getFinalCacheExprs(ds) ++ others.flatMap(_._2.getFinalCacheExprs(ds)) - } - override def getSubPersistExprs(ds: String): Iterable[Expr] = { - first.getPersistExprs(ds) ++ others.flatMap(_._2.getPersistExprs(ds)) - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/expr/SelectExpr.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/expr/SelectExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/SelectExpr.scala deleted file mode 100644 index 5b7f1b0..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/expr/SelectExpr.scala +++ /dev/null @@ -1,88 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.rule.expr - -import org.apache.spark.sql.types.DataType -import org.apache.griffin.measure.rule.CalculationUtil._ - -trait SelectExpr extends Expr { - def calculateOnly(values: Map[String, Any]): Option[Any] = None -} - -case class IndexFieldRangeSelectExpr(fields: Iterable[FieldDescOnly]) extends SelectExpr { - val desc: String = s"[${fields.map(_.desc).mkString(", ")}]" - val dataSources: Set[String] = Set.empty[String] -} - -case class FunctionOperationExpr(func: String, args: Iterable[MathExpr]) extends SelectExpr { - val desc: String = s".${func}(${args.map(_.desc).mkString(", ")})" - val dataSources: Set[String] = args.flatMap(_.dataSources).toSet - override def getSubCacheExprs(ds: String): Iterable[Expr] = args.flatMap(_.getCacheExprs(ds)) - override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = args.flatMap(_.getFinalCacheExprs(ds)) - override def getSubPersistExprs(ds: String): Iterable[Expr] = args.flatMap(_.getPersistExprs(ds)) -} - -case class FilterSelectExpr(field: FieldDesc, compare: String, value: MathExpr) extends SelectExpr { - val desc: String = s"[${field.desc} ${compare} ${value.desc}]" - val dataSources: Set[String] = value.dataSources - override def getSubCacheExprs(ds: String): Iterable[Expr] = value.getCacheExprs(ds) - override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = value.getFinalCacheExprs(ds) - override def getSubPersistExprs(ds: String): Iterable[Expr] = value.getPersistExprs(ds) - private val (eqOpr, neqOpr, btOpr, bteOpr, ltOpr, lteOpr) = ("""==?""".r, """!==?""".r, ">", ">=", "<", "<=") - override def calculateOnly(values: Map[String, Any]): Option[Any] = { - val (lv, rv) = (values.get(fieldKey), value.calculate(values)) - compare match { - case this.eqOpr() => lv === rv - case this.neqOpr() => lv =!= rv - case this.btOpr => lv > rv - case this.bteOpr => lv >= rv - case this.ltOpr => lv < rv - case this.lteOpr => lv <= rv - case _ => None - } - } - def fieldKey: 
String = s"__${field.field}" -} - -// -- selection -- -case class SelectionExpr(head: SelectionHead, selectors: Iterable[SelectExpr]) extends Expr { - def calculateOnly(values: Map[String, Any]): Option[Any] = values.get(_id) - - val desc: String = { - val argsString = selectors.map(_.desc).mkString("") - s"${head.desc}${argsString}" - } - val dataSources: Set[String] = { - val selectorDataSources = selectors.flatMap(_.dataSources).toSet - selectorDataSources + head.head - } - - override def cacheUnit: Boolean = true - override def getSubCacheExprs(ds: String): Iterable[Expr] = { - selectors.flatMap(_.getCacheExprs(ds)) - } - override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = { - selectors.flatMap(_.getFinalCacheExprs(ds)) - } - - override def persistUnit: Boolean = true - override def getSubPersistExprs(ds: String): Iterable[Expr] = { - selectors.flatMap(_.getPersistExprs(ds)) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/func/DefaultFunctionDefine.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/func/DefaultFunctionDefine.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/func/DefaultFunctionDefine.scala deleted file mode 100644 index 15161c3..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/func/DefaultFunctionDefine.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.rule.func - -import org.apache.griffin.measure.utils.JsonUtil - -class DefaultFunctionDefine extends FunctionDefine { - - def json(strOpt: Option[_]): Map[String, Any] = { - try { - strOpt match { - case Some(str: String) => JsonUtil.toAnyMap(str) - case _ => throw new Exception("json function param should be string") - } - } catch { - case e: Throwable => throw e - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionDefine.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionDefine.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionDefine.scala deleted file mode 100644 index d23fc7a..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionDefine.scala +++ /dev/null @@ -1,25 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.rule.func - -trait FunctionDefine extends Serializable { - -} - -class UnKnown {} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionUtil.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionUtil.scala deleted file mode 100644 index 57e934d..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/func/FunctionUtil.scala +++ /dev/null @@ -1,75 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.rule.func - -import java.lang.reflect.Method - -import org.apache.griffin.measure.log.Loggable - -import scala.collection.mutable.{Map => MutableMap} - -object FunctionUtil extends Loggable { - - val functionDefines: MutableMap[String, FunctionDefine] = MutableMap[String, FunctionDefine]() - - registerFunctionDefine(Array(classOf[DefaultFunctionDefine].getCanonicalName)) - - def registerFunctionDefine(classes: Iterable[String]): Unit = { - for (cls <- classes) { - try { - val clz: Class[_] = Class.forName(cls) - if (classOf[FunctionDefine].isAssignableFrom(clz)) { - functionDefines += (cls -> clz.newInstance.asInstanceOf[FunctionDefine]) - } else { - warn(s"${cls} register fails: ${cls} is not sub class of ${classOf[FunctionDefine].getCanonicalName}") - } - } catch { - case e: Throwable => warn(s"${cls} register fails: ${e.getMessage}") - } - } - } - - def invoke(methodName: String, params: Array[Option[Any]]): Seq[Option[Any]] = { -// val paramTypes = params.map { param => -// try { -// param match { -// case Some(v) => v.getClass -// case _ => classOf[UnKnown] -// } -// } catch { -// case e: Throwable => classOf[UnKnown] -// } -// } - val paramTypes = params.map(a => classOf[Option[_]]) - - functionDefines.values.foldLeft(Nil: Seq[Option[Any]]) { (res, funcDef) => - if (res.isEmpty) { - val clz = funcDef.getClass - try { - val method = clz.getMethod(methodName, paramTypes: _*) - Seq(Some(method.invoke(funcDef, params: _*))) - } catch { - case e: Throwable => res - } - } else res - } - } - -} - http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/preproc/PreProcRuleGenerator.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/preproc/PreProcRuleGenerator.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/preproc/PreProcRuleGenerator.scala new file 
mode 100644 index 0000000..22d64d8 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/preproc/PreProcRuleGenerator.scala @@ -0,0 +1,72 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.rule.preproc + +object PreProcRuleGenerator { + + val _name = "name" + + def genPreProcRules(rules: Seq[Map[String, Any]], suffix: String): Seq[Map[String, Any]] = { + if (rules == null) Nil else { + rules.map { rule => + genPreProcRule(rule, suffix) + } + } + } + + def getRuleNames(rules: Seq[Map[String, Any]]): Seq[String] = { + if (rules == null) Nil else { + rules.flatMap { rule => + rule.get(_name) match { + case Some(s: String) => Some(s) + case _ => None + } + } + } + } + + private def genPreProcRule(param: Map[String, Any], suffix: String + ): Map[String, Any] = { + val keys = param.keys + keys.foldLeft(param) { (map, key) => + map.get(key) match { + case Some(s: String) => map + (key -> genNewString(s, suffix)) + case Some(subMap: Map[String, Any]) => map + (key -> genPreProcRule(subMap, suffix)) + case Some(arr: Seq[_]) => map + (key -> genPreProcRule(arr, suffix)) + case _ => map + } + } + } + + private def genPreProcRule(paramArr: Seq[Any], suffix: String): Seq[Any] = { + paramArr.foldLeft(Nil: 
Seq[Any]) { (res, param) => + param match { + case s: String => res :+ genNewString(s, suffix) + case map: Map[String, Any] => res :+ genPreProcRule(map, suffix) + case arr: Seq[_] => res :+ genPreProcRule(arr, suffix) + case _ => res :+ param + } + } + } + + private def genNewString(str: String, suffix: String): String = { + str.replaceAll("""\$\{(.*)\}""", s"$$1_${suffix}") + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/step/ConcreteRuleStep.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/step/ConcreteRuleStep.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/step/ConcreteRuleStep.scala new file mode 100644 index 0000000..4b3a4d4 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/step/ConcreteRuleStep.scala @@ -0,0 +1,37 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.rule.step + +import org.apache.griffin.measure.rule.dsl._ + +trait ConcreteRuleStep extends RuleStep { + + val persistType: PersistType + + val updateDataSource: Option[String] + +// def isGroupMetric: Boolean = { +// val _GroupMetric = "group.metric" +// details.get(_GroupMetric) match { +// case Some(b: Boolean) => b +// case _ => false +// } +// } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/step/DfOprStep.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/step/DfOprStep.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/step/DfOprStep.scala new file mode 100644 index 0000000..86f0bf3 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/step/DfOprStep.scala @@ -0,0 +1,29 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.rule.step + +import org.apache.griffin.measure.rule.dsl._ + +case class DfOprStep(name: String, rule: String, details: Map[String, Any], + persistType: PersistType, updateDataSource: Option[String] + ) extends ConcreteRuleStep { + + val dslType: DslType = DfOprType + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/step/GriffinDslStep.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/step/GriffinDslStep.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/step/GriffinDslStep.scala new file mode 100644 index 0000000..21db8cf --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/step/GriffinDslStep.scala @@ -0,0 +1,28 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.rule.step + +import org.apache.griffin.measure.rule.dsl._ + +case class GriffinDslStep(name: String, rule: String, dqType: DqType, details: Map[String, Any] + ) extends RuleStep { + + val dslType: DslType = GriffinDslType + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/step/RuleStep.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/step/RuleStep.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/step/RuleStep.scala new file mode 100644 index 0000000..4675ffe --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/step/RuleStep.scala @@ -0,0 +1,31 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.rule.step + +import org.apache.griffin.measure.rule.dsl.{DslType, PersistType} + +trait RuleStep extends Serializable { + + val dslType: DslType + + val name: String + val rule: String + val details: Map[String, Any] + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/step/SparkSqlStep.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/step/SparkSqlStep.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/step/SparkSqlStep.scala new file mode 100644 index 0000000..62c3c35 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/step/SparkSqlStep.scala @@ -0,0 +1,30 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.rule.step + +import org.apache.griffin.measure.persist._ +import org.apache.griffin.measure.rule.dsl._ + +case class SparkSqlStep(name: String, rule: String, details: Map[String, Any], + persistType: PersistType, updateDataSource: Option[String] + ) extends ConcreteRuleStep { + + val dslType: DslType = SparkSqlType + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdfs.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdfs.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdfs.scala new file mode 100644 index 0000000..11e8c8f --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdfs.scala @@ -0,0 +1,33 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.rule.udf + +import org.apache.spark.sql.SQLContext + +object GriffinUdfs { + + def register(sqlContext: SQLContext): Unit = { + sqlContext.udf.register("index_of", indexOf) + } + + private val indexOf = (arr: Seq[String], v: String) => { + arr.indexOf(v) + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala index 8a608ff..416f567 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala @@ -68,7 +68,7 @@ object HdfsFileDumpUtil { def remove(path: String, filename: String, withSuffix: Boolean): Unit = { if (withSuffix) { - val files = HdfsUtil.listSubPaths(path, "file") + val files = HdfsUtil.listSubPathsByType(path, "file") val patternFiles = files.filter(samePattern(_, filename)) patternFiles.foreach { f => val rmPath = HdfsUtil.getHdfsFilePath(path, f) http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala index 6dd54b7..9fa6bcf 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala @@ -18,10 +18,11 @@ under the License. 
*/ package org.apache.griffin.measure.utils +import org.apache.griffin.measure.log.Loggable import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path} -object HdfsUtil { +object HdfsUtil extends Loggable { private val seprator = "/" @@ -32,8 +33,17 @@ object HdfsUtil { private val dfs = FileSystem.get(conf) def existPath(filePath: String): Boolean = { - val path = new Path(filePath) - dfs.exists(path) + try { + val path = new Path(filePath) + dfs.exists(path) + } catch { + case e: Throwable => false + } + } + + def existFileInDir(dirPath: String, fileName: String): Boolean = { + val filePath = getHdfsFilePath(dirPath, fileName) + existPath(filePath) } def createFile(filePath: String): FSDataOutputStream = { @@ -75,8 +85,12 @@ object HdfsUtil { } def deleteHdfsPath(dirPath: String): Unit = { - val path = new Path(dirPath) - if (dfs.exists(path)) dfs.delete(path, true) + try { + val path = new Path(dirPath) + if (dfs.exists(path)) dfs.delete(path, true) + } catch { + case e: Throwable => error(s"delete path [${dirPath}] error: ${e.getMessage}") + } } // def listPathFiles(dirPath: String): Iterable[String] = { @@ -96,25 +110,38 @@ object HdfsUtil { // } // } - def listSubPaths(dirPath: String, subType: String, fullPath: Boolean = false): Iterable[String] = { - val path = new Path(dirPath) - try { - val fileStatusArray = dfs.listStatus(path) - fileStatusArray.filter { fileStatus => - subType match { - case "dir" => fileStatus.isDirectory - case "file" => fileStatus.isFile - case _ => true + def listSubPathsByType(dirPath: String, subType: String, fullPath: Boolean = false): Iterable[String] = { + if (existPath(dirPath)) { + try { + val path = new Path(dirPath) + val fileStatusArray = dfs.listStatus(path) + fileStatusArray.filter { fileStatus => + subType match { + case "dir" => fileStatus.isDirectory + case "file" => fileStatus.isFile + case _ => true + } + }.map { fileStatus => + val fname = 
fileStatus.getPath.getName + if (fullPath) getHdfsFilePath(dirPath, fname) else fname + } + } catch { + case e: Throwable => { + warn(s"list path [${dirPath}] warn: ${e.getMessage}") + Nil } - }.map { fileStatus => - val fname = fileStatus.getPath.getName - if (fullPath) getHdfsFilePath(dirPath, fname) else fname - } - } catch { - case e: Throwable => { - println(s"list path files error: ${e.getMessage}") - Nil } + } else Nil + } + + def listSubPathsByTypes(dirPath: String, subTypes: Iterable[String], fullPath: Boolean = false): Iterable[String] = { + subTypes.flatMap { subType => + listSubPathsByType(dirPath, subType, fullPath) } } + + def fileNameFromPath(filePath: String): String = { + val path = new Path(filePath) + path.getName + } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala new file mode 100644 index 0000000..7954b6d --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala @@ -0,0 +1,164 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. 
See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.utils + +object ParamUtil { + + implicit class ParamMap(params: Map[String, Any]) { + def getAny(key: String, defValue: Any): Any = { + params.get(key) match { + case Some(v) => v + case _ => defValue + } + } + + def getAnyRef[T](key: String, defValue: T)(implicit m: Manifest[T]): T = { + params.get(key) match { + case Some(v: T) => v + case _ => defValue + } + } + + def getString(key: String, defValue: String): String = { + try { + params.get(key) match { + case Some(v: String) => v.toString + case Some(v) => v.toString + case _ => defValue + } + } catch { + case _: Throwable => defValue + } + } + + def getByte(key: String, defValue: Byte): Byte = { + try { + params.get(key) match { + case Some(v: String) => v.toByte + case Some(v: Byte) => v.toByte + case Some(v: Short) => v.toByte + case Some(v: Int) => v.toByte + case Some(v: Long) => v.toByte + case Some(v: Float) => v.toByte + case Some(v: Double) => v.toByte + case _ => defValue + } + } catch { + case _: Throwable => defValue + } + } + + def getShort(key: String, defValue: Short): Short = { + try { + params.get(key) match { + case Some(v: String) => v.toShort + case Some(v: Byte) => v.toShort + case Some(v: Short) => v.toShort + case Some(v: Int) => v.toShort + case Some(v: Long) => v.toShort + case Some(v: Float) => v.toShort + case Some(v: Double) => v.toShort + case _ => defValue + } + } catch { + case _: Throwable => defValue + } + } + + def getInt(key: String, defValue: Int): Int = { + try { + params.get(key) match { + case Some(v: String) => v.toInt + case Some(v: Byte) => v.toInt + case Some(v: Short) => v.toInt + case Some(v: Int) => v.toInt + case Some(v: Long) => v.toInt + case Some(v: Float) => v.toInt + case Some(v: Double) => v.toInt + case _ => defValue + } + } catch { + case _: Throwable => defValue + } + } + + def getLong(key: String, defValue: Long): 
Long = { + try { + params.get(key) match { + case Some(v: String) => v.toLong + case Some(v: Byte) => v.toLong + case Some(v: Short) => v.toLong + case Some(v: Int) => v.toLong + case Some(v: Long) => v.toLong + case Some(v: Float) => v.toLong + case Some(v: Double) => v.toLong + case _ => defValue + } + } catch { + case _: Throwable => defValue + } + } + + def getFloat(key: String, defValue: Float): Float = { + try { + params.get(key) match { + case Some(v: String) => v.toFloat + case Some(v: Byte) => v.toFloat + case Some(v: Short) => v.toFloat + case Some(v: Int) => v.toFloat + case Some(v: Long) => v.toFloat + case Some(v: Float) => v.toFloat + case Some(v: Double) => v.toFloat + case _ => defValue + } + } catch { + case _: Throwable => defValue + } + } + + def getDouble(key: String, defValue: Double): Double = { + try { + params.get(key) match { + case Some(v: String) => v.toDouble + case Some(v: Byte) => v.toDouble + case Some(v: Short) => v.toDouble + case Some(v: Int) => v.toDouble + case Some(v: Long) => v.toDouble + case Some(v: Float) => v.toDouble + case Some(v: Double) => v.toDouble + case _ => defValue + } + } catch { + case _: Throwable => defValue + } + } + + def getBoolean(key: String, defValue: Boolean): Boolean = { + try { + params.get(key) match { + case Some(v: String) => v.toBoolean + case _ => defValue + } + } catch { + case _: Throwable => defValue + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala index 0079d10..fe721d2 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala @@ -22,8 +22,8 @@ import 
scala.util.{Failure, Success, Try} object TimeUtil { - final val TimeRegex = """([+\-]?\d+)(d|h|m|s|ms)""".r - final val PureTimeRegex = """([+\-]?\d+)""".r + final val TimeRegex = """^([+\-]?\d+)(d|h|m|s|ms)$""".r + final val PureTimeRegex = """^([+\-]?\d+)$""".r def milliseconds(timeString: String): Option[Long] = { val value: Option[Long] = { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/config-test-accuracy-streaming-multids.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/config-test-accuracy-streaming-multids.json b/measure/src/test/resources/config-test-accuracy-streaming-multids.json new file mode 100644 index 0000000..18532b0 --- /dev/null +++ b/measure/src/test/resources/config-test-accuracy-streaming-multids.json @@ -0,0 +1,144 @@ +{ + "name": "accu_streaming", + + "process.type": "streaming", + + "data.sources": [ + { + "name": "source", + "connectors": [ + { + "type": "kafka", + "version": "0.8", + "config": { + "kafka.config": { + "bootstrap.servers": "10.149.247.156:9092", + "group.id": "group1", + "auto.offset.reset": "smallest", + "auto.commit.enable": "false" + }, + "topics": "sss", + "key.type": "java.lang.String", + "value.type": "java.lang.String" + }, + "pre.proc": [ + { + "dsl.type": "df-opr", + "name": "${s1}", + "rule": "from_json", + "persist.type": "cache", + "details": { + "df.name": "${this}" + } + }, + { + "dsl.type": "spark-sql", + "name": "${this}", + "rule": "select name, age from ${s1}" + } + ] + }, + { + "type": "text-dir", + "config": { + "dir.path": "hdfs://localhost/griffin/text", + "data.dir.depth": 0, + "success.file": "_SUCCESS", + "done.file": "_DONE" + }, + "pre.proc": [ + { + "dsl.type": "df-opr", + "name": "${s1}", + "rule": "from_json", + "persist.type": "cache", + "details": { + "df.name": "${this}" + } + }, + { + "dsl.type": "spark-sql", + "name": "${this}", + "rule": "select name, age from ${s1}" + } 
+ ] + } + ], + "cache": { + "file.path": "hdfs://localhost/griffin/streaming/dump/source", + "info.path": "source", + "ready.time.interval": "10s", + "ready.time.delay": "0", + "time.range": ["-2m", "0"] + } + }, { + "name": "target", + "connectors": [ + { + "type": "kafka", + "version": "0.8", + "config": { + "kafka.config": { + "bootstrap.servers": "10.149.247.156:9092", + "group.id": "group1", + "auto.offset.reset": "smallest", + "auto.commit.enable": "false" + }, + "topics": "ttt", + "key.type": "java.lang.String", + "value.type": "java.lang.String" + }, + "pre.proc": [ + { + "dsl.type": "df-opr", + "name": "${t1}", + "rule": "from_json", + "persist.type": "cache", + "details": { + "df.name": "${this}" + } + }, + { + "dsl.type": "spark-sql", + "name": "${this}", + "rule": "select name, age from ${t1}" + } + ] + } + ], + "cache": { + "file.path": "hdfs://localhost/griffin/streaming/dump/target", + "info.path": "target", + "ready.time.interval": "10s", + "ready.time.delay": "0", + "time.range": ["-2m", "0"] + } + } + ], + + "evaluateRule": { + "rules": [ + { + "dsl.type": "griffin-dsl", + "dq.type": "accuracy", + "rule": "source.name = target.name and source.age = target.age", + "details": { + "source": "source", + "target": "target", + "miss.records": { + "name": "miss.records", + "persist.type": "record", + "update.data.source": "source" + }, + "accuracy": { + "name": "accu", + "persist.type": "metric" + }, + "miss": "miss_count", + "total": "total_count", + "matched": "matched_count" + } + } + ] + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/config-test-accuracy-streaming.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/config-test-accuracy-streaming.json b/measure/src/test/resources/config-test-accuracy-streaming.json new file mode 100644 index 0000000..276f8dd --- /dev/null +++ 
b/measure/src/test/resources/config-test-accuracy-streaming.json @@ -0,0 +1,119 @@ +{ + "name": "accu_streaming", + + "process.type": "streaming", + + "data.sources": [ + { + "name": "source", + "connectors": [ + { + "type": "kafka", + "version": "0.8", + "config": { + "kafka.config": { + "bootstrap.servers": "10.149.247.156:9092", + "group.id": "group1", + "auto.offset.reset": "smallest", + "auto.commit.enable": "false" + }, + "topics": "sss", + "key.type": "java.lang.String", + "value.type": "java.lang.String" + }, + "pre.proc": [ + { + "dsl.type": "df-opr", + "name": "${s1}", + "rule": "from_json", + "persist.type": "cache", + "details": { + "df.name": "${this}" + } + }, + { + "dsl.type": "spark-sql", + "name": "${this}", + "rule": "select name, age from ${s1}" + } + ] + } + ], + "cache": { + "file.path": "hdfs://localhost/griffin/streaming/dump/source", + "info.path": "source", + "ready.time.interval": "10s", + "ready.time.delay": "0", + "time.range": ["-2m", "0"] + } + }, { + "name": "target", + "connectors": [ + { + "type": "kafka", + "version": "0.8", + "config": { + "kafka.config": { + "bootstrap.servers": "10.149.247.156:9092", + "group.id": "group1", + "auto.offset.reset": "smallest", + "auto.commit.enable": "false" + }, + "topics": "ttt", + "key.type": "java.lang.String", + "value.type": "java.lang.String" + }, + "pre.proc": [ + { + "dsl.type": "df-opr", + "name": "${t1}", + "rule": "from_json", + "persist.type": "cache", + "details": { + "df.name": "${this}" + } + }, + { + "dsl.type": "spark-sql", + "name": "${this}", + "rule": "select name, age from ${t1}" + } + ] + } + ], + "cache": { + "file.path": "hdfs://localhost/griffin/streaming/dump/target", + "info.path": "target", + "ready.time.interval": "10s", + "ready.time.delay": "0", + "time.range": ["-2m", "0"] + } + } + ], + + "evaluateRule": { + "rules": [ + { + "dsl.type": "griffin-dsl", + "dq.type": "accuracy", + "rule": "source.name = target.name and source.age = target.age", + "details": { + 
"source": "source", + "target": "target", + "miss.records": { + "name": "miss.records", + "persist.type": "record", + "update.data.source": "source" + }, + "accuracy": { + "name": "accu", + "persist.type": "metric" + }, + "miss": "miss_count", + "total": "total_count", + "matched": "matched_count" + } + } + ] + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/config-test-accuracy.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/config-test-accuracy.json b/measure/src/test/resources/config-test-accuracy.json new file mode 100644 index 0000000..ecbdaaa --- /dev/null +++ b/measure/src/test/resources/config-test-accuracy.json @@ -0,0 +1,56 @@ +{ + "name": "accu_batch_test", + + "process.type": "batch", + + "data.sources": [ + { + "name": "src", + "connectors": [ + { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/users_info_src.avro" + } + } + ] + }, { + "name": "tgt", + "connectors": [ + { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/users_info_target.avro" + } + } + ] + } + ], + + "evaluateRule": { + "rules": [ + { + "dsl.type": "griffin-dsl", + "dq.type": "accuracy", + "rule": "src.user_id = tgt.user_id AND upper(src.first_name) = upper(tgt.first_name) AND src.last_name = tgt.last_name AND src.address = tgt.address AND src.email = tgt.email AND src.phone = tgt.phone AND src.post_code = tgt.post_code", + "details": { + "source": "src", + "target": "tgt", + "miss.records": { + "name": "miss.records", + "persist.type": "record" + }, + "accuracy": { + "name": "accu", + "persist.type": "metric" + }, + "miss": "miss_count", + "total": "total_count", + "matched": "matched_count" + } + } + ] + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/config-test-profiling-streaming.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/config-test-profiling-streaming.json b/measure/src/test/resources/config-test-profiling-streaming.json new file mode 100644 index 0000000..b2a74b8 --- /dev/null +++ b/measure/src/test/resources/config-test-profiling-streaming.json @@ -0,0 +1,68 @@ +{ + "name": "prof_streaming", + + "process.type": "streaming", + + "data.sources": [ + { + "name": "source", + "connectors": [ + { + "type": "kafka", + "version": "0.8", + "config": { + "kafka.config": { + "bootstrap.servers": "10.149.247.156:9092", + "group.id": "group1", + "auto.offset.reset": "smallest", + "auto.commit.enable": "false" + }, + "topics": "sss", + "key.type": "java.lang.String", + "value.type": "java.lang.String" + }, + "pre.proc": [ + { + "dsl.type": "df-opr", + "name": "${s1}", + "rule": "from_json", + "persist.type": "cache", + "details": { + "df.name": "${this}" + } + }, + { + "dsl.type": "spark-sql", + "name": "${this}", + "rule": "select name, age from ${s1}" + } + ] + } + ], + "cache": { + "file.path": "hdfs://localhost/griffin/streaming/dump/source", + "info.path": "source", + "ready.time.interval": "10s", + "ready.time.delay": "0", + "time.range": ["0", "0"] + } + } + ], + + "evaluateRule": { + "rules": [ + { + "dsl.type": "griffin-dsl", + "dq.type": "profiling", + "rule": "source.name.count(), source.age.avg(), source.age.max(), source.age.min() group by source.name", + "details": { + "source": "source", + "profiling": { + "name": "prof", + "persist.type": "metric" + } + } + } + ] + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/config-test-profiling.json ---------------------------------------------------------------------- diff --git 
a/measure/src/test/resources/config-test-profiling.json b/measure/src/test/resources/config-test-profiling.json new file mode 100644 index 0000000..187e88a --- /dev/null +++ b/measure/src/test/resources/config-test-profiling.json @@ -0,0 +1,37 @@ +{ + "name": "prof_batch_test", + + "process.type": "batch", + + "data.sources": [ + { + "name": "source", + "connectors": [ + { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/users_info_src.avro" + } + } + ] + } + ], + + "evaluateRule": { + "rules": [ + { + "dsl.type": "griffin-dsl", + "dq.type": "profiling", + "rule": "user_id as id, user_id.approx_count_distinct() as cnt group by user_id order by cnt desc, id desc limit 3", + "details": { + "source": "source", + "profiling": { + "name": "count", + "persist.type": "metric" + } + } + } + ] + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/config-test.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/config-test.json b/measure/src/test/resources/config-test.json new file mode 100644 index 0000000..23eb5ff --- /dev/null +++ b/measure/src/test/resources/config-test.json @@ -0,0 +1,55 @@ +{ + "name": "accu batch test", + + "process.type": "batch", + + "data.sources": [ + { + "name": "source", + "connectors": [ + { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/users_info_src.avro" + } + } + ] + }, { + "name": "target", + "connectors": [ + { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/users_info_target.avro" + } + } + ] + } + ], + + "evaluateRule": { + "rules": [ + { + "dsl.type": "spark-sql", + "name": "miss.records", + "rule": "SELECT source.user_id, source.first_name, source.last_name, source.address, source.email, source.phone, source.post_code FROM source LEFT JOIN target ON coalesce(source.user_id, 
'null') = coalesce(target.user_id, 'null') AND coalesce(source.first_name, 'null') = coalesce(target.first_name, 'null') AND coalesce(source.last_name, 'null') = coalesce(target.last_name, 'null') AND coalesce(source.address, 'null') = coalesce(target.address, 'null') AND coalesce(source.email, 'null') = coalesce(target.email, 'null') AND coalesce(source.phone, 'null') = coalesce(target.phone, 'null') AND coalesce(source.post_code, 'null') = coalesce(target.post_code, 'null') WHERE (NOT (source.user_id IS NULL AND source.first_name IS NULL AND source.last_name IS NULL AND source.address IS NULL AND source.email IS NULL AND source.phone IS NULL AND source.post_code IS NULL)) AND (target.user_id IS NULL AND target.first_name IS NULL AND target.last_name IS NULL AND target.address IS NULL AND target.email IS NULL AND target.phone IS NULL AND target.post_code IS NULL)", + "persist.type": "record" + }, { + "dsl.type": "spark-sql", + "name": "miss", + "rule": "SELECT COUNT(*) AS `miss` FROM `miss.records`", + }, { + "dsl.type": "spark-sql", + "name": "total", + "rule": "SELECT COUNT(*) AS `total` FROM source", + }, { + "dsl.type": "spark-sql", + "name": "accuracy", + "rule": "SELECT `total`.`total` AS `total`, `miss`.`miss` AS `miss`, (`total`.`total` - `miss`.`miss`) AS `matched` FROM total JOIN miss", + "persist.type": "metric" + } + ] + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/config-test1.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/config-test1.json b/measure/src/test/resources/config-test1.json new file mode 100644 index 0000000..53a8765 --- /dev/null +++ b/measure/src/test/resources/config-test1.json @@ -0,0 +1,96 @@ +{ + "name": "accu batch test", + + "process.type": "batch", + + "data.sources": [ + { + "name": "source", + "connectors": [ + { + "type": "hive", + "version": "1.2", + "config": { +
"database": "default", + "table.name": "src" + } + } + ] + }, { + "name": "target", + "connectors": [ + { + "type": "hive", + "version": "1.2", + "config": { + "database": "default", + "table.name": "tgt" + } + } + ] + } + ], + + "evaluateRule": { + "rules": [ + { + "dsl.type": "df-opr", + "name": "source", + "rule": "from_json", + "details": { + "df.name": "source" + } + }, + { + "dsl.type": "spark-sql", + "name": "seeds", + "rule": "SELECT explode(seeds) as seed FROM source" + }, + { + "dsl.type": "df-opr", + "name": "seeds", + "rule": "from_json", + "details": { + "df.name": "seeds", + "col.name": "seed" + } + }, + { + "dsl.type": "spark-sql", + "name": "source", + "rule": "SELECT url, get_json_object(metadata, '$.tracker.crawlRequestCreateTS') AS ts FROM seeds" + }, + { + "dsl.type": "spark-opr", + "name": "target", + "rule": "from_json(target.value)" + }, + { + "dsl.type": "spark-sql", + "name": "attrs", + "rule": "SELECT groups[0].attrsList AS attrs FROM target" + }, + { + "dsl.type": "spark-sql", + "name": "target", + "rule": "SELECT attrs.values[index_of(attrs.name, 'URL')][0] AS url, get_json_object(attrs.values[index_of(attrs.name, 'CRAWLMETADATA')][0], '$.tracker.crawlRequestCreateTS') AS ts FROM df2" + }, + { + "dsl.type": "spark-sql", + "name": "miss.record", + "rule": "SELECT source.url, source.ts FROM source LEFT JOIN target ON coalesce(source.url, '') = coalesce(target.url, '') AND coalesce(source.ts, '') = coalesce(target.ts, '') WHERE (NOT (source.url IS NULL AND source.ts IS NULL)) AND (target.url IS NULL AND target.ts IS NULL)", + "persist.type": "record" + }, { + "dsl.type": "spark-sql", + "name": "miss.count", + "rule": "SELECT COUNT(*) AS `miss.count` FROM `miss.record`", + "persist.type": "metric" + }, { + "dsl.type": "spark-sql", + "name": "total.count", + "rule": "SELECT COUNT(*) AS `total.count` FROM source", + "persist.type": "metric" + } + ] + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/config.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/config.json b/measure/src/test/resources/config.json index 08a6021..0a17474 100644 --- a/measure/src/test/resources/config.json +++ b/measure/src/test/resources/config.json @@ -22,6 +22,6 @@ "evaluateRule": { "sampleRatio": 1, - "rules": "$source.user_id + 5 = $target.user_id + (2 + 3) AND $source.first_name + 12 = $target.first_name + (10 + 2) AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code WHEN $source.user_id > 10015" + "rules": "$source.user_id = $target.user_id AND $source.first_name = $target.first_name AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code" } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/env-streaming.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/env-streaming.json b/measure/src/test/resources/env-streaming.json index 42b4aa9..a01348f 100644 --- a/measure/src/test/resources/env-streaming.json +++ b/measure/src/test/resources/env-streaming.json @@ -5,6 +5,7 @@ "batch.interval": "2s", "process.interval": "10s", "config": { + "spark.master": "local[*]", "spark.task.maxFailures": 5, "spark.streaming.kafkaMaxRatePerPartition": 1000, "spark.streaming.concurrentJobs": 4, http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/env-test.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/env-test.json 
b/measure/src/test/resources/env-test.json new file mode 100644 index 0000000..603fad8 --- /dev/null +++ b/measure/src/test/resources/env-test.json @@ -0,0 +1,38 @@ +{ + "spark": { + "log.level": "WARN", + "checkpoint.dir": "hdfs:///griffin/batch/cp", + "batch.interval": "10s", + "process.interval": "10m", + "config": { + "spark.master": "local[*]" + } + }, + + "persist": [ + { + "type": "log", + "config": { + "max.log.lines": 100 + } + } + ], + + "info.cache": [ + { + "type": "zk", + "config": { + "hosts": "localhost:2181", + "namespace": "griffin/infocache", + "lock.path": "lock", + "mode": "persist", + "init.clear": true, + "close.clear": false + } + } + ], + + "cleaner": { + + } +} \ No newline at end of file
