docker version Author: Liu <[email protected]> Author: Liu <[email protected]> Author: Liu <[email protected]> Author: bhlx3lyx7 <[email protected]>
Closes #34 from bhlx3lyx7/master. Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/a0117811 Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/a0117811 Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/a0117811 Branch: refs/heads/master Commit: a0117811c2e1ee101632458bb7a01d22171300d7 Parents: 38978a8 Author: Liu <[email protected]> Authored: Tue May 23 11:10:57 2017 +0800 Committer: Liu <[email protected]> Committed: Tue May 23 11:10:57 2017 +0800 ---------------------------------------------------------------------- griffin-measure/griffin-measure-batch/pom.xml | 16 + .../src/main/resources/config-old.json | 45 +++ .../src/main/resources/config.json | 29 ++ .../src/main/resources/env.json | 29 ++ .../src/main/resources/log4j.properties | 5 + .../griffin/measure/batch/Application.scala | 87 ++++++ .../measure/batch/algo/AccuracyAlgo.scala | 6 + .../griffin/measure/batch/algo/Algo.scala | 16 + .../measure/batch/algo/BatchAccuracyAlgo.scala | 144 +++++++++ .../measure/batch/algo/core/AccuracyCore.scala | 78 +++++ .../measure/batch/config/params/AllParam.scala | 14 + .../measure/batch/config/params/Param.scala | 7 + .../batch/config/params/env/CleanerParam.scala | 10 + .../batch/config/params/env/EnvParam.scala | 13 + .../batch/config/params/env/PersistParam.scala | 12 + .../batch/config/params/env/SparkParam.scala | 13 + .../config/params/user/DataConnectorParam.scala | 13 + .../config/params/user/EvaluateRuleParam.scala | 12 + .../batch/config/params/user/UserParam.scala | 15 + .../batch/config/reader/ParamFileReader.scala | 20 ++ .../config/reader/ParamHdfsFileReader.scala | 20 ++ .../config/reader/ParamRawStringReader.scala | 17 ++ .../batch/config/reader/ParamReader.scala | 12 + .../config/reader/ParamReaderFactory.scala | 22 ++ .../config/validator/AllParamValidator.scala | 16 + .../batch/config/validator/ParamValidator.scala | 12 + 
.../batch/connector/AvroDataConnector.scala | 79 +++++ .../measure/batch/connector/CacheDataUtil.scala | 63 ++++ .../measure/batch/connector/DataConnector.scala | 16 + .../batch/connector/DataConnectorFactory.scala | 32 ++ .../batch/connector/HiveDataConnector.scala | 101 +++++++ .../griffin/measure/batch/log/Loggable.scala | 25 ++ .../measure/batch/persist/HdfsPersist.scala | 119 ++++++++ .../measure/batch/persist/HttpPersist.scala | 44 +++ .../measure/batch/persist/MultiPersists.scala | 27 ++ .../griffin/measure/batch/persist/Persist.scala | 23 ++ .../measure/batch/persist/PersistFactory.scala | 30 ++ .../measure/batch/result/AccuracyResult.scala | 26 ++ .../griffin/measure/batch/result/Result.scala | 14 + .../measure/batch/result/ResultInfo.scala | 33 ++ .../measure/batch/rule/RuleAnalyzer.scala | 28 ++ .../measure/batch/rule/RuleAnalyzerOld.scala | 26 ++ .../measure/batch/rule/RuleFactory.scala | 34 +++ .../griffin/measure/batch/rule/RuleParser.scala | 298 +++++++++++++++++++ .../batch/rule/RuleParserDescription.scala | 84 ++++++ .../batch/rule/expr/AnalyzableExpr.scala | 7 + .../measure/batch/rule/expr/Cacheable.scala | 15 + .../measure/batch/rule/expr/Calculatable.scala | 7 + .../batch/rule/expr/DataSourceable.scala | 10 + .../measure/batch/rule/expr/Describable.scala | 15 + .../griffin/measure/batch/rule/expr/Expr.scala | 33 ++ .../measure/batch/rule/expr/ExprDescOnly.scala | 22 ++ .../measure/batch/rule/expr/ExprIdCounter.scala | 42 +++ .../measure/batch/rule/expr/FieldDescOnly.scala | 40 +++ .../measure/batch/rule/expr/LiteralExpr.scala | 68 +++++ .../measure/batch/rule/expr/LogicalExpr.scala | 149 ++++++++++ .../measure/batch/rule/expr/MathExpr.scala | 79 +++++ .../measure/batch/rule/expr/SelectExpr.scala | 53 ++++ .../measure/batch/rule/expr/StatementExpr.scala | 50 ++++ .../batch/rule/expr_old/AnnotationExpr.scala | 16 + .../batch/rule/expr_old/Calculatable.scala | 7 + .../measure/batch/rule/expr_old/ConstExpr.scala | 59 ++++ 
.../measure/batch/rule/expr_old/DataExpr.scala | 27 ++ .../batch/rule/expr_old/ElementExpr.scala | 52 ++++ .../measure/batch/rule/expr_old/Expr.scala | 11 + .../batch/rule/expr_old/ExprAnalyzable.scala | 8 + .../batch/rule/expr_old/ExprIdCounter.scala | 39 +++ .../batch/rule/expr_old/Recordable.scala | 15 + .../batch/rule/expr_old/SelectExpr.scala | 48 +++ .../rule/expr_old/StatementAnalyzable.scala | 14 + .../batch/rule/expr_old/StatementExpr.scala | 85 ++++++ .../batch/rule/expr_old/VariableExpr.scala | 26 ++ .../measure/batch/utils/CalculationUtil.scala | 259 ++++++++++++++++ .../griffin/measure/batch/utils/HdfsUtil.scala | 62 ++++ .../griffin/measure/batch/utils/HttpUtil.scala | 30 ++ .../griffin/measure/batch/utils/JsonUtil.scala | 32 ++ .../measure/batch/utils/StringParseUtil.scala | 10 + .../src/test/resources/config.json | 25 ++ .../src/test/resources/config1.json | 27 ++ .../src/test/resources/env.json | 27 ++ .../src/test/resources/env1.json | 21 ++ .../src/test/resources/log4j.properties | 5 + .../src/test/resources/users_info_src.avro | Bin 0 -> 3850 bytes .../src/test/resources/users_info_src.dat | 50 ++++ .../src/test/resources/users_info_target.avro | Bin 0 -> 3852 bytes .../src/test/resources/users_info_target.dat | 50 ++++ .../batch/algo/BatchAccuracyAlgoTest.scala | 219 ++++++++++++++ .../config/reader/ParamFileReaderTest.scala | 35 +++ .../measure/batch/rule/RuleParserTest.scala | 77 +++++ griffin-measure/pom.xml | 197 ++++++++++++ .../data/test/dataFile/users_info_src.avro | Bin 0 -> 3799 bytes .../data/test/dataFile/users_info_target.avro | Bin 0 -> 3799 bytes .../data/test/recordFile/_RESULT_ACCU | 12 + .../data/test/reqJson/accuUsersInfo.json | 55 ++++ .../src/test/scala/modelTest/AccuTest.scala | 86 ++++++ .../scala/org/apache/griffin/Application.scala | 3 +- ui/js/controllers/metrics-ctrl.js | 5 +- ui/js/services/services.js | 6 +- 98 files changed, 4067 insertions(+), 8 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/pom.xml ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/pom.xml b/griffin-measure/griffin-measure-batch/pom.xml new file mode 100644 index 0000000..6fd72ed --- /dev/null +++ b/griffin-measure/griffin-measure-batch/pom.xml @@ -0,0 +1,16 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>griffin-measure</artifactId> + <groupId>org.apache.griffin</groupId> + <version>0.0.1-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>griffin-measure-batch</artifactId> + <version>0.0.1-SNAPSHOT</version> + + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/resources/config-old.json ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/resources/config-old.json b/griffin-measure/griffin-measure-batch/src/main/resources/config-old.json new file mode 100644 index 0000000..63dee69 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/resources/config-old.json @@ -0,0 +1,45 @@ +{ + "name": "accu1", + "type": "accuracy", + + "source": { + "connector": { + "type": "hive", + "version": "1.2", + "config": { + "table.name": "users_info_src", + "partitions": "dt=20170410, hour=14" + } + } + }, + + "target": { + "connector": { + "type": "hive", + "version": "1.2", + "config": { + "database": "default", + "table.name": "users_info_target", + "partitions": "dt=20170410, hour=14; dt=20170410, 
hour=15" + } + } + }, + + "evaluateRule": { + "sampleRatio": 1, + "assertion": { + "type": "DSL-griffin", + "rules": [ + { + "rule": "@Key ${source}['user_id'] === ${target}['user_id']" + }, + { + "rule": "${source}['first_name'] === ${target}['first_name']; ${source}['last_name'] === ${target}['last_name']; ${source}['address'] === ${target}['address']" + }, + { + "rule": "${source}['email'] === ${target}['email']; ${source}['phone'] === ${target}['phone']; ${source}['post_code'] === ${target}['post_code']" + } + ] + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/resources/config.json ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/resources/config.json b/griffin-measure/griffin-measure-batch/src/main/resources/config.json new file mode 100644 index 0000000..edd2e6a --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/resources/config.json @@ -0,0 +1,29 @@ +{ + "name": "accu1", + "type": "accuracy", + + "source": { + "type": "hive", + "version": "1.2", + "config": { + "database": "default", + "table.name": "users_info_src", + "partitions": "dt=23123, hour=432; dt=35464, hour=4657" + } + }, + + "target": { + "type": "hive", + "version": "1.2", + "config": { + "database": "default", + "table.name": "users_info_target", + "partitions": "dt=23123, hour=432; dt=35464, hour=4657" + } + }, + + "evaluateRule": { + "sampleRatio": 0.2, + "rules": "$source.user_id = $target.user_id AND $source.first_name = $target.first_name AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code" + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/resources/env.json ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/resources/env.json b/griffin-measure/griffin-measure-batch/src/main/resources/env.json new file mode 100644 index 0000000..2e72601 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/resources/env.json @@ -0,0 +1,29 @@ +{ + "spark": { + "log.level": "INFO", + "checkpoint.dir": "hdfs:///griffin/batch/cp", + "config": {} + }, + + "persist": [ + { + "type": "hdfs", + "config": { + "path": "hdfs:///griffin/streaming/persist", + "max.persist.lines": 10000, + "max.lines.per.file": 10000 + } + }, + { + "type": "http", + "config": { + "method": "post", + "api": "http://phxbark4dq-360935.stratus.phx.ebay.com:8080/" + } + } + ], + + "cleaner": { + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/resources/log4j.properties b/griffin-measure/griffin-measure-batch/src/main/resources/log4j.properties new file mode 100644 index 0000000..bd31e15 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/resources/log4j.properties @@ -0,0 +1,5 @@ +log4j.rootLogger=INFO, stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p [%c] - %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/Application.scala 
---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/Application.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/Application.scala new file mode 100644 index 0000000..6fe6bcb --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/Application.scala @@ -0,0 +1,87 @@ +package org.apache.griffin.measure.batch + +import org.apache.griffin.measure.batch.algo.{Algo, BatchAccuracyAlgo} +import org.apache.griffin.measure.batch.config.params._ +import org.apache.griffin.measure.batch.config.params.env._ +import org.apache.griffin.measure.batch.config.params.user._ +import org.apache.griffin.measure.batch.config.reader._ +import org.apache.griffin.measure.batch.config.validator.AllParamValidator +import org.apache.griffin.measure.batch.log.Loggable + +import scala.util.{Failure, Success, Try} + +object Application extends Loggable { + + def main(args: Array[String]): Unit = { + info(args.toString) + if (args.length < 2) { + error("Usage: class <env-param> <user-param> [List of String split by comma: raw | local | hdfs(default)]") + sys.exit(-1) + } + + val envParamFile = args(0) + val userParamFile = args(1) + val (envFsType, userFsType) = if (args.length > 2) { + val fsTypes = args(2).trim.split(",") + if (fsTypes.length == 1) (fsTypes(0).trim, fsTypes(0).trim) + else if (fsTypes.length >= 2) (fsTypes(0).trim, fsTypes(1).trim) + else ("hdfs", "hdfs") + } else ("hdfs", "hdfs") + + info(envParamFile) + info(userParamFile) + + // read param files + val envParam = readParamFile[EnvParam](envParamFile, envFsType) match { + case Success(p) => p + case Failure(ex) => { + error(ex.getMessage) + sys.exit(-2) + } + } + val userParam = readParamFile[UserParam](userParamFile, userFsType) match { + case Success(p) => p + case Failure(ex) => { + error(ex.getMessage) + 
sys.exit(-2) + } + } + val allParam: AllParam = AllParam(envParam, userParam) + + // validate param files + validateParams(allParam) match { + case Failure(ex) => { + error(ex.getMessage) + sys.exit(-3) + } + case _ => { + info("params validation pass") + } + } + + // choose algorithm // fixme: not done, need to choose algorithm by param + val dqType = allParam.userParam.dqType + val algo: Algo = BatchAccuracyAlgo(allParam) + + algo.run match { + case Failure(ex) => { + error(ex.getMessage) + sys.exit(-4) + } + case _ => { + info("calculation finished") + } + } + } + + private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = { + val paramReader = ParamReaderFactory.getParamReader(file, fsType) + paramReader.readConfig[T] + } + + private def validateParams(allParam: AllParam): Try[Boolean] = { + val allParamValidator = AllParamValidator() + allParamValidator.validate(allParam) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/AccuracyAlgo.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/AccuracyAlgo.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/AccuracyAlgo.scala new file mode 100644 index 0000000..d22add2 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/AccuracyAlgo.scala @@ -0,0 +1,6 @@ +package org.apache.griffin.measure.batch.algo + + +trait AccuracyAlgo extends Algo { + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/Algo.scala ---------------------------------------------------------------------- diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/Algo.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/Algo.scala new file mode 100644 index 0000000..d7047ee --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/Algo.scala @@ -0,0 +1,16 @@ +package org.apache.griffin.measure.batch.algo + +import org.apache.griffin.measure.batch.config.params.env._ +import org.apache.griffin.measure.batch.config.params.user._ +import org.apache.griffin.measure.batch.log.Loggable + +import scala.util.Try + +trait Algo extends Loggable with Serializable { + + val envParam: EnvParam + val userParam: UserParam + + def run(): Try[_] + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgo.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgo.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgo.scala new file mode 100644 index 0000000..24564a4 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgo.scala @@ -0,0 +1,144 @@ +package org.apache.griffin.measure.batch.algo + +import java.util.Date + +import org.apache.griffin.measure.batch.algo.core.AccuracyCore +import org.apache.griffin.measure.batch.config.params.AllParam +import org.apache.griffin.measure.batch.connector._ +import org.apache.griffin.measure.batch.rule._ +import org.apache.griffin.measure.batch.rule.expr._ +import org.apache.griffin.measure.batch.persist._ +import org.apache.griffin.measure.batch.result._ +import org.apache.spark.rdd.RDD +import 
org.apache.spark.sql.hive.HiveContext +import org.apache.spark.{SparkConf, SparkContext} + +import scala.util.{Failure, Success, Try} + + +case class BatchAccuracyAlgo(allParam: AllParam) extends AccuracyAlgo { + val envParam = allParam.envParam + val userParam = allParam.userParam + + def run(): Try[_] = { + Try { + val metricName = userParam.name + + val conf = new SparkConf().setAppName(metricName) + val sc = new SparkContext(conf) + val sqlContext = new HiveContext(sc) + + // start time + val startTime = new Date().getTime() + + // get persists + val persist: Persist = PersistFactory(envParam.persistParams, metricName).getPersists(startTime) + + // get spark application id + val applicationId = sc.applicationId + + // start + persist.start(applicationId) + + // rules + val ruleFactory = RuleFactory(userParam.evaluateRuleParam) + val rule: StatementExpr = ruleFactory.generateRule() + val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule) + + // global cache data + val globalCachedData = CacheDataUtil.genCachedMap(None, ruleAnalyzer.globalCacheExprs, Map[String, Any]()) + val globalFinalCachedData = CacheDataUtil.filterCachedMap(ruleAnalyzer.globalFinalCacheExprs, globalCachedData) + + // data connector + val sourceDataConnector: DataConnector = + DataConnectorFactory.getDataConnector(sqlContext, userParam.sourceParam, + ruleAnalyzer.sourceGroupbyExprs, ruleAnalyzer.sourceCacheExprs, + ruleAnalyzer.sourceFinalCacheExprs, globalFinalCachedData) match { + case Success(cntr) => { + if (cntr.available) cntr + else throw new Exception("source data connection error!") + } + case Failure(ex) => throw ex + } + val targetDataConnector: DataConnector = + DataConnectorFactory.getDataConnector(sqlContext, userParam.targetParam, + ruleAnalyzer.targetGroupbyExprs, ruleAnalyzer.targetCacheExprs, + ruleAnalyzer.targetFinalCacheExprs, globalFinalCachedData) match { + case Success(cntr) => { + if (cntr.available) cntr + else throw new Exception("target data connection error!") + } + 
case Failure(ex) => throw ex + } + + // get metadata +// val sourceMetaData: Iterable[(String, String)] = sourceDataConnector.metaData() match { +// case Success(md) => md +// case _ => throw new Exception("source metadata error!") +// } +// val targetMetaData: Iterable[(String, String)] = targetDataConnector.metaData() match { +// case Success(md) => md +// case _ => throw new Exception("target metadata error!") +// } + + // get data + val sourceData: RDD[(Product, Map[String, Any])] = sourceDataConnector.data() match { + case Success(dt) => dt + case Failure(ex) => throw ex + } + val targetData: RDD[(Product, Map[String, Any])] = targetDataConnector.data() match { + case Success(dt) => dt + case Failure(ex) => throw ex + } + + // accuracy algorithm + val (accuResult, missingRdd, matchingRdd) = accuracy(sourceData, targetData, ruleAnalyzer) + + // end time + val endTime = new Date().getTime + persist.log(endTime, s"calculation using time: ${endTime - startTime} ms") + + // persist result + persist.result(endTime, accuResult) + val missingRecords = missingRdd.map(record2String(_)) + persist.missRecords(missingRecords) + + // persist end time + val persistEndTime = new Date().getTime + persist.log(persistEndTime, s"persist using time: ${persistEndTime - endTime} ms") + + // finish + persist.finish() + + // context stop + sc.stop + + } + } + + def wrapInitData(data: Map[String, Any]): (Map[String, Any], Map[String, Any]) = { + (data, Map[String, Any]()) + } + + def accuracy(sourceData: RDD[(Product, Map[String, Any])], targetData: RDD[(Product, Map[String, Any])], ruleAnalyzer: RuleAnalyzer + ): (AccuracyResult, RDD[(Product, (Map[String, Any], Map[String, Any]))], RDD[(Product, (Map[String, Any], Map[String, Any]))]) = { + + // 1. 
wrap data + val sourceWrappedData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = sourceData.map(r => (r._1, wrapInitData(r._2))) + val targetWrappedData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = targetData.map(r => (r._1, wrapInitData(r._2))) + + // 2. cogroup + val allKvs = sourceWrappedData.cogroup(targetWrappedData) + + // 3. accuracy calculation + val (accuResult, missingRdd, matchingRdd) = AccuracyCore.accuracy(allKvs, ruleAnalyzer) + + (accuResult, missingRdd, matchingRdd) + } + + def record2String(rec: (Product, (Map[String, Any], Map[String, Any]))): String = { + val (key, (data, info)) = rec + s"${data} [${info}]" + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCore.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCore.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCore.scala new file mode 100644 index 0000000..2989538 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCore.scala @@ -0,0 +1,78 @@ +package org.apache.griffin.measure.batch.algo.core + +import org.apache.griffin.measure.batch.rule.RuleAnalyzer +import org.apache.griffin.measure.batch.result._ +import org.apache.spark.rdd.RDD + + +object AccuracyCore { + + type V = Map[String, Any] + type T = Map[String, Any] + + def accuracy(allKvs: RDD[(Product, (Iterable[(V, T)], Iterable[(V, T)]))], ruleAnalyzer: RuleAnalyzer + ): (AccuracyResult, RDD[(Product, (V, T))], RDD[(Product, (V, T))]) = { + val result: RDD[(Long, Long, List[(Product, (V, T))], List[(Product, (V, T))])] = allKvs.map { kv => + val (key, (sourceDatas, targetDatas)) = kv + + // result: (missCount, 
matchCount, missDataList, matchDataList) + val rslt = sourceDatas.foldLeft((0L, 0L, List[(Product, (V, T))](), List[(Product, (V, T))]())) { (sr, sourcePair) => + val matchResult = if (targetDatas.isEmpty) { + (false, Map[String, Any]((MismatchInfo.key -> "no target"))) + } else { + targetDatas.foldLeft((false, Map[String, Any]())) { (tr, targetPair) => + if (tr._1) tr + else matchData(sourcePair, targetPair, ruleAnalyzer) + } + } + + if (matchResult._1) { + val matchItem = (key, sourcePair) + (sr._1, sr._2 + 1, sr._3, sr._4 :+ matchItem) + } else { + val missItem = (key, (sourcePair._1, sourcePair._2 ++ matchResult._2)) + (sr._1 + 1, sr._2, sr._3 :+ missItem, sr._4) + } + } + + rslt + } + + val missRdd = result.flatMap(_._3) + val matchRdd = result.flatMap(_._4) + + def seq(cnt: (Long, Long), rcd: (Long, Long, Any, Any)): (Long, Long) = { + (cnt._1 + rcd._1, cnt._2 + rcd._2) + } + def comb(c1: (Long, Long), c2: (Long, Long)): (Long, Long) = { + (c1._1 + c2._1, c1._2 + c2._2) + } + val countPair = result.aggregate((0L, 0L))(seq, comb) + + (AccuracyResult(countPair._1, (countPair._1 + countPair._2)), missRdd, matchRdd) + } + + private def matchData(source: (V, T), target: (V, T), ruleAnalyzer: RuleAnalyzer): (Boolean, T) = { + + // 1. merge source and target cached data + val mergedData: Map[String, Any] = mergeData(source, target) + + // 2. check valid + if (ruleAnalyzer.rule.valid(mergedData)) { + // 3. 
substitute the cached data into statement, get the statement value + // currently we can not get the mismatch reason, we need to add such information to figure out how it mismatches + ((ruleAnalyzer.rule.calculate(mergedData) match { + case Some(b: Boolean) => b + case _ => false + }), Map[String, Any]((MismatchInfo.key -> s"not matched with ${target._1}"))) + } else { + (false, Map[String, Any]((MismatchInfo.key -> s"not valid to compare with ${target._1}"))) + } + + } + + private def mergeData(source: (V, T), target: (V, T)): Map[String, Any] = { + source._1 ++ target._1 + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/AllParam.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/AllParam.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/AllParam.scala new file mode 100644 index 0000000..b5618a8 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/AllParam.scala @@ -0,0 +1,14 @@ +package org.apache.griffin.measure.batch.config.params + +import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} +import com.fasterxml.jackson.annotation.JsonInclude.Include +import org.apache.griffin.measure.batch.config.params.env._ +import org.apache.griffin.measure.batch.config.params.user._ + +// simply composite of env and user params, for convenient usage +@JsonInclude(Include.NON_NULL) +case class AllParam( @JsonProperty("env") envParam: EnvParam, + @JsonProperty("user") userParam: UserParam + ) extends Param { + +} 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/Param.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/Param.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/Param.scala new file mode 100644 index 0000000..ce94c5c --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/Param.scala @@ -0,0 +1,7 @@ +package org.apache.griffin.measure.batch.config.params + +trait Param extends Serializable { + + def validate(): Boolean = true + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/CleanerParam.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/CleanerParam.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/CleanerParam.scala new file mode 100644 index 0000000..a29004c --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/CleanerParam.scala @@ -0,0 +1,10 @@ +package org.apache.griffin.measure.batch.config.params.env + +import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} +import com.fasterxml.jackson.annotation.JsonInclude.Include +import org.apache.griffin.measure.batch.config.params.Param + +@JsonInclude(Include.NON_NULL) +case class CleanerParam() extends Param { + +} 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/EnvParam.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/EnvParam.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/EnvParam.scala new file mode 100644 index 0000000..79e2575 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/EnvParam.scala @@ -0,0 +1,13 @@ +package org.apache.griffin.measure.batch.config.params.env + +import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} +import com.fasterxml.jackson.annotation.JsonInclude.Include +import org.apache.griffin.measure.batch.config.params.Param + +@JsonInclude(Include.NON_NULL) +case class EnvParam( @JsonProperty("spark") sparkParam: SparkParam, + @JsonProperty("persist") persistParams: List[PersistParam], + @JsonProperty("cleaner") cleanerParam: CleanerParam + ) extends Param { + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/PersistParam.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/PersistParam.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/PersistParam.scala new file mode 100644 index 0000000..655f19b --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/PersistParam.scala @@ -0,0 +1,12 @@ +package 
org.apache.griffin.measure.batch.config.params.env + +import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} +import com.fasterxml.jackson.annotation.JsonInclude.Include +import org.apache.griffin.measure.batch.config.params.Param + +@JsonInclude(Include.NON_NULL) +case class PersistParam( @JsonProperty("type") persistType: String, + @JsonProperty("config") config: Map[String, Any] + ) extends Param { + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/SparkParam.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/SparkParam.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/SparkParam.scala new file mode 100644 index 0000000..ba6f9a6 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/SparkParam.scala @@ -0,0 +1,13 @@ +package org.apache.griffin.measure.batch.config.params.env + +import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} +import com.fasterxml.jackson.annotation.JsonInclude.Include +import org.apache.griffin.measure.batch.config.params.Param + +@JsonInclude(Include.NON_NULL) +case class SparkParam( @JsonProperty("log.level") logLevel: String, + @JsonProperty("checkpoint.dir") cpDir: String, + @JsonProperty("config") config: Map[String, Any] + ) extends Param { + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/DataConnectorParam.scala ---------------------------------------------------------------------- diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/DataConnectorParam.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/DataConnectorParam.scala new file mode 100644 index 0000000..7a8b2b2 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/DataConnectorParam.scala @@ -0,0 +1,13 @@ +package org.apache.griffin.measure.batch.config.params.user + +import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} +import com.fasterxml.jackson.annotation.JsonInclude.Include +import org.apache.griffin.measure.batch.config.params.Param + +@JsonInclude(Include.NON_NULL) +case class DataConnectorParam( @JsonProperty("type") conType: String, + @JsonProperty("version") version: String, + @JsonProperty("config") config: Map[String, Any] + ) extends Param { + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/EvaluateRuleParam.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/EvaluateRuleParam.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/EvaluateRuleParam.scala new file mode 100644 index 0000000..edf0a38 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/EvaluateRuleParam.scala @@ -0,0 +1,12 @@ +package org.apache.griffin.measure.batch.config.params.user + +import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} +import com.fasterxml.jackson.annotation.JsonInclude.Include +import org.apache.griffin.measure.batch.config.params.Param + +@JsonInclude(Include.NON_NULL) 
+case class EvaluateRuleParam( @JsonProperty("sampleRatio") sampleRatio: Double, + @JsonProperty("rules") rules: String + ) extends Param { + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/UserParam.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/UserParam.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/UserParam.scala new file mode 100644 index 0000000..be0d3c8 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/UserParam.scala @@ -0,0 +1,15 @@ +package org.apache.griffin.measure.batch.config.params.user + +import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} +import com.fasterxml.jackson.annotation.JsonInclude.Include +import org.apache.griffin.measure.batch.config.params.Param + +@JsonInclude(Include.NON_NULL) +case class UserParam(@JsonProperty("name") name: String, + @JsonProperty("type") dqType: String, + @JsonProperty("source") sourceParam: DataConnectorParam, + @JsonProperty("target") targetParam: DataConnectorParam, + @JsonProperty("evaluateRule") evaluateRuleParam: EvaluateRuleParam + ) extends Param { + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReader.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReader.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReader.scala new file 
mode 100644 index 0000000..e30513f --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReader.scala @@ -0,0 +1,20 @@ +package org.apache.griffin.measure.batch.config.reader + +import org.apache.griffin.measure.batch.config.params.Param +import org.apache.griffin.measure.batch.utils.JsonUtil + +import scala.util.Try + +case class ParamFileReader(file: String) extends ParamReader { + + def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] = { + Try { + val source = scala.io.Source.fromFile(file) + val lines = source.mkString + val param = JsonUtil.fromJson[T](lines) + source.close + param + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamHdfsFileReader.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamHdfsFileReader.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamHdfsFileReader.scala new file mode 100644 index 0000000..473b428 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamHdfsFileReader.scala @@ -0,0 +1,20 @@ +package org.apache.griffin.measure.batch.config.reader + +import org.apache.griffin.measure.batch.config.params.Param +import org.apache.griffin.measure.batch.utils.JsonUtil +import org.apache.griffin.measure.batch.utils.HdfsUtil + +import scala.util.Try + +case class ParamHdfsFileReader(filePath: String) extends ParamReader { + + def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] = { + Try { + val source = HdfsUtil.openFile(filePath) + val param = JsonUtil.fromJson[T](source) + source.close + param + } + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamRawStringReader.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamRawStringReader.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamRawStringReader.scala new file mode 100644 index 0000000..1e931de --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamRawStringReader.scala @@ -0,0 +1,17 @@ +package org.apache.griffin.measure.batch.config.reader + +import org.apache.griffin.measure.batch.config.params.Param +import org.apache.griffin.measure.batch.utils.JsonUtil + +import scala.util.Try + +case class ParamRawStringReader(rawString: String) extends ParamReader { + + def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] = { + Try { + val param = JsonUtil.fromJson[T](rawString) + param + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReader.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReader.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReader.scala new file mode 100644 index 0000000..2f42b09 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReader.scala @@ -0,0 +1,12 @@ +package org.apache.griffin.measure.batch.config.reader + +import org.apache.griffin.measure.batch.log.Loggable +import 
org.apache.griffin.measure.batch.config.params.Param + +import scala.util.Try + +trait ParamReader extends Loggable with Serializable { + + def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReaderFactory.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReaderFactory.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReaderFactory.scala new file mode 100644 index 0000000..dd77e8c --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReaderFactory.scala @@ -0,0 +1,22 @@ +package org.apache.griffin.measure.batch.config.reader + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} + + +object ParamReaderFactory { + + val RawStringRegex = """^(?i)raw$""".r + val LocalFsRegex = """^(?i)local$""".r + val HdfsFsRegex = """^(?i)hdfs$""".r + + def getParamReader(filePath: String, fsType: String): ParamReader = { + fsType match { + case RawStringRegex() => ParamRawStringReader(filePath) + case LocalFsRegex() => ParamFileReader(filePath) + case HdfsFsRegex() => ParamHdfsFileReader(filePath) + case _ => ParamHdfsFileReader(filePath) + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidator.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidator.scala 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidator.scala new file mode 100644 index 0000000..b0ab8b5 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidator.scala @@ -0,0 +1,16 @@ +package org.apache.griffin.measure.batch.config.validator + +import org.apache.griffin.measure.batch.config.params.Param + +import scala.util.Try + +case class AllParamValidator() extends ParamValidator { + + def validate[T <: Param](param: Param): Try[Boolean] = { + Try { + // fixme: not done, need to validate param + true + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/ParamValidator.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/ParamValidator.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/ParamValidator.scala new file mode 100644 index 0000000..9dc9e60 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/ParamValidator.scala @@ -0,0 +1,12 @@ +package org.apache.griffin.measure.batch.config.validator + +import org.apache.griffin.measure.batch.log.Loggable +import org.apache.griffin.measure.batch.config.params.Param + +import scala.util.Try + +trait ParamValidator extends Loggable with Serializable { + + def validate[T <: Param](param: Param): Try[Boolean] + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/AvroDataConnector.scala ---------------------------------------------------------------------- 
diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/AvroDataConnector.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/AvroDataConnector.scala new file mode 100644 index 0000000..d271e17 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/AvroDataConnector.scala @@ -0,0 +1,79 @@ +package org.apache.griffin.measure.batch.connector + +import org.apache.griffin.measure.batch.rule.expr._ +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Row, SQLContext} +import com.databricks.spark.avro._ + +import scala.util.{Success, Try} +import java.nio.file.{Files, Paths} + +import org.apache.griffin.measure.batch.utils.HdfsUtil + +case class AvroDataConnector(sqlContext: SQLContext, config: Map[String, Any], + groupbyExprs: Seq[MathExpr], cacheExprs: Iterable[Expr], + finalCacheExprs: Iterable[Expr], globalFinalCacheMap: Map[String, Any] + ) extends DataConnector { + + val FilePath = "file.path" + val FileName = "file.name" + + val filePath = config.getOrElse(FilePath, "").toString + val fileName = config.getOrElse(FileName, "").toString + + val concreteFileFullPath = if (pathPrefix) s"${filePath}${fileName}" else fileName + + private def pathPrefix(): Boolean = { + filePath.nonEmpty + } + + private def fileExist(): Boolean = { + HdfsUtil.existPath(concreteFileFullPath) + } + + def available(): Boolean = { + (!concreteFileFullPath.isEmpty) && fileExist + } + + def metaData(): Try[Iterable[(String, String)]] = { + Try { + val st = sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath).schema + st.fields.map(f => (f.name, f.dataType.typeName)) + } + } + + def data(): Try[RDD[(Product, Map[String, Any])]] = { + Try { + loadDataFile.map { row => + // generate cache data + val cacheData: Map[String, Any] = cacheExprs.foldLeft(globalFinalCacheMap) { (cachedMap, 
expr) => + CacheDataUtil.genCachedMap(Some(row), expr, cachedMap) + } + val finalCacheData = CacheDataUtil.filterCachedMap(finalCacheExprs, cacheData) + + // get groupby data + val groupbyData: Seq[AnyRef] = groupbyExprs.flatMap { expr => + expr.calculate(finalCacheData) match { + case Some(v) => Some(v.asInstanceOf[AnyRef]) + case _ => None + } + } + val key = toTuple(groupbyData) + + (key, finalCacheData) + } + } + } + + private def loadDataFile() = { + sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath) + } + + private def toTuple[A <: AnyRef](as: Seq[A]): Product = { + if (as.size > 0) { + val tupleClass = Class.forName("scala.Tuple" + as.size) + tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product] + } else None + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/CacheDataUtil.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/CacheDataUtil.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/CacheDataUtil.scala new file mode 100644 index 0000000..884df5d --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/CacheDataUtil.scala @@ -0,0 +1,63 @@ +package org.apache.griffin.measure.batch.connector + +import org.apache.griffin.measure.batch.rule.expr._ +import org.apache.spark.sql.Row + +import scala.util.{Success, Try} + +object CacheDataUtil { + + // for now, one expr only get one value, not supporting one expr get multiple values + private def getCacheData(data: Option[Any], expr: Expr, cachedMap: Map[String, Any]): Option[Any] = { + Try { + expr match { + case selection: SelectionExpr => { + selection.selectors.foldLeft(data) { (dt, selector) 
=> + getCacheData(dt, selector, cachedMap) + } + } + case selector: IndexFieldRangeSelectExpr => { + data match { + case Some(row: Row) => { + if (selector.fields.size == 1) { + selector.fields.head match { + case i: IndexDesc => Some(row.getAs[Any](i.index)) + case f: FieldDesc => Some(row.getAs[Any](f.field)) + case _ => None + } + } else None + } + case _ => None + } + } + case _ => expr.calculate(cachedMap) + } + } match { + case Success(v) => v + case _ => None + } + } + + def genCachedMap(data: Option[Any], expr: Expr, initialCachedMap: Map[String, Any]): Map[String, Any] = { + val valueOpt = getCacheData(data, expr, initialCachedMap) + if (valueOpt.nonEmpty) { + initialCachedMap + (expr._id -> valueOpt.get) + } else initialCachedMap + } + + def genCachedMap(data: Option[Any], exprs: Iterable[Expr], initialCachedMap: Map[String, Any]): Map[String, Any] = { + exprs.foldLeft(initialCachedMap) { (cachedMap, expr) => + CacheDataUtil.genCachedMap(None, expr, cachedMap) + } + } + + def filterCachedMap(exprs: Iterable[Expr], cachedMap: Map[String, Any]): Map[String, Any] = { + exprs.foldLeft(Map[String, Any]()) { (newMap, expr) => + val valueOpt = expr.calculate(cachedMap) + if (valueOpt.nonEmpty) { + newMap + (expr._id -> valueOpt.get) + } else newMap + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnector.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnector.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnector.scala new file mode 100644 index 0000000..9cc9be6 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnector.scala @@ -0,0 +1,16 @@ +package 
org.apache.griffin.measure.batch.connector + +import org.apache.spark.rdd.RDD + +import scala.util.Try + + +trait DataConnector extends Serializable { + + def available(): Boolean + + def metaData(): Try[Iterable[(String, String)]] + + def data(): Try[RDD[(Product, Map[String, Any])]] + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnectorFactory.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnectorFactory.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnectorFactory.scala new file mode 100644 index 0000000..ffa8d9c --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnectorFactory.scala @@ -0,0 +1,32 @@ +package org.apache.griffin.measure.batch.connector + +import org.apache.griffin.measure.batch.config.params.user._ +import org.apache.griffin.measure.batch.rule.expr._ +import org.apache.spark.sql.SQLContext + +import scala.util.Try + +object DataConnectorFactory { + + val HiveRegex = """^(?i)hive$""".r + val AvroRegex = """^(?i)avro$""".r + + def getDataConnector(sqlContext: SQLContext, + dataConnectorParam: DataConnectorParam, + groupbyExprs: Seq[MathExpr], + cacheExprs: Iterable[Expr], + finalCacheExprs: Iterable[Expr], + globalFinalCacheMap: Map[String, Any] + ): Try[DataConnector] = { + val conType = dataConnectorParam.conType + val version = dataConnectorParam.version + Try { + conType match { + case HiveRegex() => HiveDataConnector(sqlContext, dataConnectorParam.config, groupbyExprs, cacheExprs, finalCacheExprs, globalFinalCacheMap) + case AvroRegex() => AvroDataConnector(sqlContext, dataConnectorParam.config, groupbyExprs, cacheExprs, finalCacheExprs, 
globalFinalCacheMap) + case _ => throw new Exception("connector creation error!") + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/HiveDataConnector.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/HiveDataConnector.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/HiveDataConnector.scala new file mode 100644 index 0000000..5c80e30 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/HiveDataConnector.scala @@ -0,0 +1,101 @@ +package org.apache.griffin.measure.batch.connector + +import org.apache.griffin.measure.batch.rule.expr._ +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Row, SQLContext} + +import scala.util.{Success, Try} + +case class HiveDataConnector(sqlContext: SQLContext, config: Map[String, Any], + groupbyExprs: Seq[MathExpr], cacheExprs: Iterable[Expr], + finalCacheExprs: Iterable[Expr], globalFinalCacheMap: Map[String, Any] + ) extends DataConnector { + + val Database = "database" + val TableName = "table.name" + val Partitions = "partitions" + + val database = config.getOrElse(Database, "").toString + val tableName = config.getOrElse(TableName, "").toString + val partitionsString = config.getOrElse(Partitions, "").toString + + val concreteTableName = if (dbPrefix) s"${database}.${tableName}" else tableName + val partitions = partitionsString.split(";").map(s => s.split(",").map(_.trim)) + + private def dbPrefix(): Boolean = { + database.nonEmpty && !database.equals("default") + } + + def available(): Boolean = { + (!tableName.isEmpty) && { + Try { + if (dbPrefix) { + sqlContext.tables(database).filter(tableExistsSql).collect.size + } else { + 
sqlContext.tables().filter(tableExistsSql).collect.size + } + } match { + case Success(s) => s > 0 + case _ => false + } + } + } + + def metaData(): Try[Iterable[(String, String)]] = { + Try { + val originRows = sqlContext.sql(metaDataSql).map(r => (r.getString(0), r.getString(1))).collect + val partitionPos: Int = originRows.indexWhere(pair => pair._1.startsWith("# ")) + if (partitionPos < 0) originRows + else originRows.take(partitionPos) + } + } + + def data(): Try[RDD[(Product, Map[String, Any])]] = { + Try { + sqlContext.sql(dataSql).map { row => + // generate cache data + val cacheData: Map[String, Any] = cacheExprs.foldLeft(globalFinalCacheMap) { (cachedMap, expr) => + CacheDataUtil.genCachedMap(Some(row), expr, cachedMap) + } + val finalCacheData = CacheDataUtil.filterCachedMap(finalCacheExprs, cacheData) + + // get groupby data + val groupbyData: Seq[AnyRef] = groupbyExprs.flatMap { expr => + expr.calculate(finalCacheData) match { + case Some(v) => Some(v.asInstanceOf[AnyRef]) + case _ => None + } + } + val key = toTuple(groupbyData) + + (key, finalCacheData) + } + } + } + + private def tableExistsSql(): String = { +// s"SHOW TABLES LIKE '${concreteTableName}'" // this is hive sql, but not work for spark sql + s"tableName LIKE '${tableName}'" + } + + private def metaDataSql(): String = { + s"DESCRIBE ${concreteTableName}" + } + + private def dataSql(): String = { + val clauses = partitions.map { prtn => + val cls = prtn.mkString(" AND ") + if (cls.isEmpty) s"SELECT * FROM ${concreteTableName}" + else s"SELECT * FROM ${concreteTableName} WHERE ${cls}" + } + clauses.mkString(" UNION ALL ") + } + + private def toTuple[A <: AnyRef](as: Seq[A]): Product = { + if (as.size > 0) { + val tupleClass = Class.forName("scala.Tuple" + as.size) + tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product] + } else None + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/log/Loggable.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/log/Loggable.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/log/Loggable.scala new file mode 100644 index 0000000..f73e86c --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/log/Loggable.scala @@ -0,0 +1,25 @@ +package org.apache.griffin.measure.batch.log + +import org.slf4j.LoggerFactory + +trait Loggable { + + @transient private lazy val logger = LoggerFactory.getLogger(getClass) + + protected def info(msg: String): Unit = { + logger.info(msg) + } + + protected def debug(msg: String): Unit = { + logger.debug(msg) + } + + protected def warn(msg: String): Unit = { + logger.warn(msg) + } + + protected def error(msg: String): Unit = { + logger.error(msg) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala new file mode 100644 index 0000000..7bff3b6 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala @@ -0,0 +1,119 @@ +package org.apache.griffin.measure.batch.persist + +import java.util.Date + +import org.apache.griffin.measure.batch.result._ +import org.apache.griffin.measure.batch.utils.HdfsUtil +import 
org.apache.spark.rdd.RDD + + +case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist { + + val Path = "path" + val MaxPersistLines = "max.persist.lines" + val MaxLinesPerFile = "max.lines.per.file" + + val path = config.getOrElse(Path, "").toString + val maxPersistLines = config.getOrElse(MaxPersistLines, -1).toString.toLong + val maxLinesPerFile = config.getOrElse(MaxLinesPerFile, 10000).toString.toLong + + val separator = "/" + + val StartFile = filePath("_START") + val FinishFile = filePath("_FINISH") + val ResultFile = filePath("_RESULT") + + val MissRecFile = filePath("_MISSREC") // optional + + val LogFile = filePath("_LOG") + + var _init = true + private def isInit = { + val i = _init + _init = false + i + } + + def available(): Boolean = { + (path.nonEmpty) && (maxPersistLines < Int.MaxValue) + } + + private def persistHead: String = { + val dt = new Date(timeStamp) + s"================ log of ${dt} ================\n" + } + + private def timeHead(rt: Long): String = { + val dt = new Date(rt) + s"--- ${dt} ---\n" + } + + protected def getFilePath(parentPath: String, fileName: String): String = { + if (parentPath.endsWith(separator)) parentPath + fileName else parentPath + separator + fileName + } + + protected def filePath(file: String): String = { + getFilePath(path, s"${metricName}/${timeStamp}/${file}") + } + + protected def withSuffix(path: String, suffix: String): String = { + s"${path}.${suffix}" + } + + def start(msg: String): Unit = { + HdfsUtil.writeContent(StartFile, msg) + } + def finish(): Unit = { + HdfsUtil.createEmptyFile(FinishFile) + } + + def result(rt: Long, result: Result): Unit = { + val resStr = result match { + case ar: AccuracyResult => { + s"match percentage: ${ar.matchPercentage}\ntotal count: ${ar.getTotal}\nmiss count: ${ar.getMiss}, match count: ${ar.getMatch}" + } + case _ => { + s"result: ${result}" + } + } + HdfsUtil.writeContent(ResultFile, timeHead(rt) + resStr) + log(rt, 
resStr) + + info(resStr) + } + + // need to avoid string too long + def missRecords(records: RDD[String]): Unit = { + val recordCount = records.count + val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount) + if (count > 0) { + val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt + if (groupCount <= 1) { + val recs = records.take(count.toInt) + persistRecords(MissRecFile, recs) + } else { + val groupedRecords: RDD[(Long, Iterable[String])] = + records.zipWithIndex.flatMap { r => + val gid = r._2 / maxLinesPerFile + if (gid < groupCount) Some((gid, r._1)) else None + }.groupByKey() + groupedRecords.foreach { group => + val (gid, recs) = group + val hdfsPath = if (gid == 0) MissRecFile else withSuffix(MissRecFile, gid.toString) + persistRecords(hdfsPath, recs) + } + } + } + } + + private def persistRecords(hdfsPath: String, records: Iterable[String]): Unit = { + val recStr = records.mkString("\n") + HdfsUtil.appendContent(hdfsPath, recStr) + } + + def log(rt: Long, msg: String): Unit = { + val logStr = (if (isInit) persistHead else "") + timeHead(rt) + s"${msg}\n\n" + HdfsUtil.appendContent(LogFile, logStr) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala new file mode 100644 index 0000000..5765927 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala @@ -0,0 +1,44 @@ +package org.apache.griffin.measure.batch.persist + +import org.apache.griffin.measure.batch.result._ +import 
org.apache.griffin.measure.batch.utils.{HttpUtil, JsonUtil} +import org.apache.spark.rdd.RDD + +case class HttpPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist { + + val Api = "api" + val Method = "method" + + val api = config.getOrElse(Api, "").toString + val method = config.getOrElse(Method, "post").toString + + def available(): Boolean = { + api.nonEmpty + } + + def start(msg: String): Unit = {} + def finish(): Unit = {} + + def result(rt: Long, result: Result): Unit = { + result match { + case ar: AccuracyResult => { + val dataMap = Map[String, Any](("metricName" -> metricName), ("timestamp" -> timeStamp), ("value" -> ar.matchPercentage)) + val data = JsonUtil.toJson(dataMap) + + // post + val params = Map[String, Object]() + val header = Map[String, Object](("content-type" -> "application/json")) + val status = HttpUtil.httpRequest(api, method, params, header, data) + info(s"${method} to ${api} response status: ${status}") + } + case _ => { + info(s"result: ${result}") + } + } + } + + def missRecords(records: RDD[String]): Unit = {} + + def log(rt: Long, msg: String): Unit = {} + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala new file mode 100644 index 0000000..2fa6942 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala @@ -0,0 +1,27 @@ +package org.apache.griffin.measure.batch.persist + +import org.apache.griffin.measure.batch.result._ +import org.apache.griffin.measure.batch.utils.{HttpUtil, 
JsonUtil} +import org.apache.spark.rdd.RDD + +case class MultiPersists(persists: Iterable[Persist]) extends Persist { + + val timeStamp: Long = persists match { + case Nil => 0 + case _ => persists.head.timeStamp + } + + val config: Map[String, Any] = Map[String, Any]() + + def available(): Boolean = { persists.exists(_.available()) } + + def start(msg: String): Unit = { persists.foreach(_.start(msg)) } + def finish(): Unit = {persists.foreach(_.finish())} + + def result(rt: Long, result: Result): Unit = { persists.foreach(_.result(rt, result)) } + + def missRecords(records: RDD[String]): Unit = { persists.foreach(_.missRecords(records)) } + + def log(rt: Long, msg: String): Unit = { persists.foreach(_.log(rt, msg)) } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/Persist.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/Persist.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/Persist.scala new file mode 100644 index 0000000..7398c24 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/Persist.scala @@ -0,0 +1,23 @@ +package org.apache.griffin.measure.batch.persist + +import org.apache.griffin.measure.batch.log.Loggable +import org.apache.griffin.measure.batch.result._ +import org.apache.spark.rdd.RDD + + +trait Persist extends Loggable with Serializable { + val timeStamp: Long + + val config: Map[String, Any] + + def available(): Boolean + + def start(msg: String): Unit + def finish(): Unit + + def result(rt: Long, result: Result): Unit + + def missRecords(records: RDD[String]): Unit + + def log(rt: Long, msg: String): Unit +} 
package org.apache.griffin.measure.batch.persist

import org.apache.griffin.measure.batch.config.params.env._

import scala.util.{Failure, Success, Try}


// Builds the configured persists (hdfs / http) for a given batch timestamp.
case class PersistFactory(persistParams: Iterable[PersistParam], metricName: String) extends Serializable {

  // case-insensitive persist-type matchers
  val HDFS_REGEX = """^(?i)hdfs$""".r
  val HTTP_REGEX = """^(?i)http$""".r

  /** Wraps every available configured persist into one composite. */
  def getPersists(timeStamp: Long): MultiPersists = {
    MultiPersists(persistParams.flatMap(param => getPersist(timeStamp, param)))
  }

  // Returns None for unavailable or unsupported persist types.
  // The original threw for unsupported types *outside* the Try, crashing the
  // factory instead of skipping the entry like every other failure path.
  private def getPersist(timeStamp: Long, persistParam: PersistParam): Option[Persist] = {
    val persistConfig = persistParam.config
    val persistTry: Try[Persist] = persistParam.persistType match {
      case HDFS_REGEX() => Try(HdfsPersist(persistConfig, metricName, timeStamp))
      case HTTP_REGEX() => Try(HttpPersist(persistConfig, metricName, timeStamp))
      case _ => Failure(new Exception("not supported persist type"))
    }
    persistTry match {
      case Success(persist) if (persist.available()) => Some(persist)
      case _ => None
    }
  }

}
/** Accuracy measure outcome: `miss` unmatched records out of `total`. */
case class AccuracyResult(miss: Long, total: Long) extends Result {

  type T = AccuracyResult

  /** Merge in a delta: take the delta's miss count, keep this total. */
  def update(delta: T): T = AccuracyResult(delta.miss, total)

  /** Final once nothing is missing any more. */
  def eventual(): Boolean = miss <= 0

  /** True when either counter disagrees with `other`. */
  def differsFrom(other: T): Boolean = {
    if (miss != other.miss) true
    else total != other.total
  }

  // java-bean style accessors
  def getMiss = miss
  def getTotal = total
  def getMatch = total - miss

  /** Matched percentage in [0, 100]; 0 when the total is not positive. */
  def matchPercentage: Double = {
    if (getTotal <= 0) 0
    else getMatch.toDouble / getTotal * 100
  }

}

/** Base contract for a measure result that can be merged and compared. */
trait Result extends Serializable {

  // concrete result type, for typed update/compare
  type T <: Result

  def update(delta: T): T

  def eventual(): Boolean

  def differsFrom(other: T): Boolean

}
/** Descriptor of a reserved result field: its key name and hive column type. */
sealed trait ResultInfo {
  type T
  val key: String
  val tp: String
  /** Pairs the reserved key with a concrete value. */
  def wrap(value: T) = (key -> value)
}

// case objects are final already, so the redundant `final` modifier is dropped

/** Timestamp group of the measured batch. */
case object TimeGroupInfo extends ResultInfo {
  type T = Long
  val key = "__time__"
  val tp = "bigint"
}

/** Next scheduled fire time. */
case object NextFireTimeInfo extends ResultInfo {
  type T = Long
  val key = "__next_fire_time__"
  val tp = "bigint"
}

/** Mismatch description attached to a record. */
case object MismatchInfo extends ResultInfo {
  type T = String
  val key = "__mismatch__"
  val tp = "string"
}

/** Error description attached to a record. */
case object ErrorInfo extends ResultInfo {
  type T = String
  val key = "__error__"
  val tp = "string"
}
/** Pre-computes, from a parsed rule, the expression groups each phase needs. */
case class RuleAnalyzer(rule: StatementExpr) extends Serializable {

  // data-source qualifiers used when querying the rule tree
  val GlobalData = ""
  val SourceData = "source"
  val TargetData = "target"

  // expressions whose values must be cached, per data source
  val globalCacheExprs: Iterable[Expr] = rule.getCacheExprs(GlobalData)
  val sourceCacheExprs: Iterable[Expr] = rule.getCacheExprs(SourceData)
  val targetCacheExprs: Iterable[Expr] = rule.getCacheExprs(TargetData)

  // expressions whose values are persisted for inspection
  val sourcePersistExprs: Iterable[Expr] = rule.getPersistExprs(SourceData)
  val targetPersistExprs: Iterable[Expr] = rule.getPersistExprs(TargetData)

  // final cache set = final cache expressions plus everything persisted
  val globalFinalCacheExprs: Iterable[Expr] = rule.getFinalCacheExprs(GlobalData).toSet
  val sourceFinalCacheExprs: Iterable[Expr] = rule.getFinalCacheExprs(SourceData).toSet ++ sourcePersistExprs.toSet
  val targetFinalCacheExprs: Iterable[Expr] = rule.getFinalCacheExprs(TargetData).toSet ++ targetPersistExprs.toSet

  // (source, target) key pairs used to group records before comparison
  val groupbyExprPairs: Seq[(MathExpr, MathExpr)] = rule.getGroupbyExprPairs((SourceData, TargetData))
  val sourceGroupbyExprs: Seq[MathExpr] = groupbyExprPairs.map(pair => pair._1)
  val targetGroupbyExprs: Seq[MathExpr] = groupbyExprPairs.map(pair => pair._2)

  // optional when-clause filter of the rule
  val whenClauseExpr: Option[LogicalExpr] = rule.getWhenClauseExpr

}
import org.apache.griffin.measure.batch.rule.expr_old._

/** Legacy analyzer over the old expression tree, kept for the old rule syntax. */
case class RuleAnalyzerOld(rule: StatementExpr) extends Serializable {

  // data-source qualifiers used when querying the rule tree
  val SourceData = "source"
  val TargetData = "target"

  // expressions that touch each data source
  val sourceDataExprs: Iterable[DataExpr] = rule.getDataRelatedExprs(SourceData)
  val targetDataExprs: Iterable[DataExpr] = rule.getDataRelatedExprs(TargetData)

  // key mappings between source and target
  val keyMappings: Iterable[MappingExpr] = rule.getKeyMappings()

  // NOTE(review): "Datakey" casing below looks like a typo for "DataKey",
  // but it is a public member, so renaming would break callers — confirm first.
  val sourceDataKeyExprs: Seq[DataExpr] = keyMappings.flatMap(_.getDataRelatedExprs(SourceData)).toSeq
  val targetDatakeyExprs: Seq[DataExpr] = keyMappings.flatMap(_.getDataRelatedExprs(TargetData)).toSeq

  // remaining rule clauses by kind
  val assigns: Iterable[AssignExpr] = rule.getAssigns()
  val conditions: Iterable[ConditionExpr] = rule.getConditions()
  val mappings: Iterable[MappingExpr] = rule.getMappings()

}
import scala.util.{Failure, Success, Try}


/** Builds the rule AST from the user-configured rule string. */
case class RuleFactory(evaluateRuleParam: EvaluateRuleParam) {

  val ruleParser: RuleParser = RuleParser()

  /**
   * Parses the configured rule text into a statement expression.
   * @throws Exception when the rule text cannot be parsed
   */
  def generateRule(): StatementExpr = {
    parseExpr(evaluateRuleParam.rules) match {
      case Success(se) => se
      case Failure(ex) => throw ex
    }
  }

  // Wraps the parser-combinator run; on failure, include the parser diagnostic
  // (position and expected token) instead of the original bare "parse rule error!",
  // so a bad rule reports where parsing failed.
  private def parseExpr(rules: String): Try[StatementExpr] = {
    Try {
      val result = ruleParser.parseAll(ruleParser.rule, rules)
      if (result.successful) result.get
      else throw new Exception(s"parse rule error: ${result}")
    }
  }

}
