Repository: incubator-griffin Updated Branches: refs/heads/master 38978a8c6 -> a0117811c
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/test/resources/config1.json ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/test/resources/config1.json b/griffin-measure/griffin-measure-batch/src/test/resources/config1.json new file mode 100644 index 0000000..d7290ba --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/test/resources/config1.json @@ -0,0 +1,27 @@ +{ + "name": "accu-test", + "type": "accuracy", + + "source": { + "type": "hive", + "version": "1.2", + "config": { + "table.name": "rheos_view_event", + "partitions": "dt=20170410, hour=15" + } + }, + + "target": { + "type": "hive", + "version": "1.2", + "config": { + "table.name": "be_view_event_queue", + "partitions": "dt=20170410, hour=15; dt=20170410, hour=16" + } + }, + + "evaluateRule": { + "sampleRatio": 1, + "rules": "@Key ${source}['uid'] === ${target}['uid']; @Key ${source}['eventtimestamp'] === ${target}['eventtimestamp']; ${source}['page_id'] === ${target}['page_id']; ${source}['site_id'] === ${target}['site_id']; ${source}['itm'] === ${target}['itm']" + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/test/resources/env.json ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/test/resources/env.json b/griffin-measure/griffin-measure-batch/src/test/resources/env.json new file mode 100644 index 0000000..3a9e38c --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/test/resources/env.json @@ -0,0 +1,27 @@ +{ + "spark": { + "log.level": "ERROR", + "checkpoint.dir": "hdfs:///griffin/batch/cp", + "config": {} + }, + + "persist": [ + { + "type": "hdfs", + "config": { + "path": "hdfs:///griffin/streaming/persist" + } + }, + { + "type": "http", + "config": { + "method": "post", + "api": "http://phxbark4dq-360935.stratus.phx.ebay.com:8080/" + } + } + ], + + "cleaner": { + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/test/resources/env1.json ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/test/resources/env1.json b/griffin-measure/griffin-measure-batch/src/test/resources/env1.json new file mode 100644 index 0000000..a059715 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/test/resources/env1.json @@ -0,0 +1,21 @@ +{ + "spark": { + "log.level": "INFO", + "checkpoint.dir": "hdfs:///griffin/batch/cp", + "config": {} + }, + + "persist": [ + { + "type": "hdfs", + "config": { + "path": "hdfs:///user/b_des/bark/griffin-batch/test", + "max.lines.per.file": 10000 + } + } + ], + + "cleaner": { + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/test/resources/log4j.properties b/griffin-measure/griffin-measure-batch/src/test/resources/log4j.properties new file mode 100644 index 0000000..bd31e15 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/test/resources/log4j.properties @@ -0,0 +1,5 @@ +log4j.rootLogger=INFO, stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p [%c] - %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/test/resources/users_info_src.avro ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/test/resources/users_info_src.avro b/griffin-measure/griffin-measure-batch/src/test/resources/users_info_src.avro new file mode 100644 index 0000000..3d5c939 Binary files /dev/null and b/griffin-measure/griffin-measure-batch/src/test/resources/users_info_src.avro differ http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/test/resources/users_info_src.dat ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/test/resources/users_info_src.dat b/griffin-measure/griffin-measure-batch/src/test/resources/users_info_src.dat new file mode 100644 index 0000000..ce49443 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/test/resources/users_info_src.dat @@ -0,0 +1,50 @@ +10001|Tom001|Jerrya|201 DisneyCity|[email protected]|10000001|94022 +10002|Tom002|Jerrya|202 DisneyCity|[email protected]|10000002|94022 +10003|Tom003|Jerrya|203 DisneyCity|[email protected]|10000003|94022 +10004|Tom004|Jerrya|204 DisneyCity|[email protected]|10000004|94022 +10005|Tom005|Jerrya|205 DisneyCity|[email protected]|10000005|94022 +10006|Tom006|Jerrya|206 DisneyCity|[email protected]|10000006|94022 +10007|Tom007|Jerrya|207 DisneyCity|[email protected]|10000007|94022 +10008|Tom008|Jerrya|208 DisneyCity|[email protected]|10000008|94022 +10009|Tom009|Jerrya|209 DisneyCity|[email protected]|10000009|94022 +10010|Tom010|Jerrya|210 DisneyCity|[email protected]|10000010|94022 +10011|Tom011|Jerrya|211 DisneyCity|[email protected]|10000011|94022 +10012|Tom012|Jerrya|212 DisneyCity|[email protected]|10000012|94022 +10013|Tom013|Jerrya|213 DisneyCity|[email protected]|10000013|94022 +10014|Tom014|Jerrya|214 DisneyCity|[email protected]|10000014|94022 +10015|Tom015|Jerrya|215 DisneyCity|[email protected]|10000015|94022 +10016|Tom016|Jerrya|216 DisneyCity|[email protected]|10000016|94022 +10017|Tom017|Jerrya|217 DisneyCity|[email protected]|10000017|94022 +10018|Tom018|Jerrya|218 DisneyCity|[email protected]|10000018|94022 +10019|Tom019|Jerrya|219 DisneyCity|[email protected]|10000019|94022 +10020|Tom020|Jerrya|220 DisneyCity|[email protected]|10000020|94022 +10021|Tom021|Jerrya|221 DisneyCity|[email protected]|10000021|94022 +10022|Tom022|Jerrya|222 DisneyCity|[email protected]|10000022|94022 +10023|Tom023|Jerrya|223 DisneyCity|[email protected]|10000023|94022 +10024|Tom024|Jerrya|224 DisneyCity|[email protected]|10000024|94022 +10025|Tom025|Jerrya|225 DisneyCity|[email protected]|10000025|94022 +10026|Tom026|Jerrya|226 DisneyCity|[email protected]|10000026|94022 +10027|Tom027|Jerrya|227 DisneyCity|[email protected]|10000027|94022 +10028|Tom028|Jerrya|228 DisneyCity|[email protected]|10000028|94022 +10029|Tom029|Jerrya|229 DisneyCity|[email protected]|10000029|94022 +10030|Tom030|Jerrya|230 DisneyCity|[email protected]|10000030|94022 +10031|Tom031|Jerrya|231 DisneyCity|[email protected]|10000031|94022 +10032|Tom032|Jerrya|232 DisneyCity|[email protected]|10000032|94022 +10033|Tom033|Jerrya|233 DisneyCity|[email protected]|10000033|94022 +10034|Tom034|Jerrya|234 DisneyCity|[email protected]|10000034|94022 +10035|Tom035|Jerrya|235 DisneyCity|[email protected]|10000035|94022 +10036|Tom036|Jerrya|236 DisneyCity|[email protected]|10000036|94022 +10037|Tom037|Jerrya|237 DisneyCity|[email protected]|10000037|94022 +10038|Tom038|Jerrya|238 DisneyCity|[email protected]|10000038|94022 +10039|Tom039|Jerrya|239 DisneyCity|[email protected]|10000039|94022 +10040|Tom040|Jerrya|240 DisneyCity|[email protected]|10000040|94022 +10041|Tom041|Jerrya|241 DisneyCity|[email protected]|10000041|94022 +10042|Tom042|Jerrya|242 DisneyCity|[email protected]|10000042|94022 +10043|Tom043|Jerrya|243 DisneyCity|[email protected]|10000043|94022 +10044|Tom044|Jerrya|244 DisneyCity|[email protected]|10000044|94022 +10045|Tom045|Jerrya|245 DisneyCity|[email protected]|10000045|94022 +10046|Tom046|Jerrya|246 DisneyCity|[email protected]|10000046|94022 +10047|Tom047|Jerrya|247 DisneyCity|[email protected]|10000047|94022 +10048|Tom048|Jerrya|248 DisneyCity|[email protected]|10000048|94022 +10049|Tom049|Jerrya|249 DisneyCity|[email protected]|10000049|94022 +10050|Tom050|Jerrya|250 DisneyCity|[email protected]|10000050|94022 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/test/resources/users_info_target.avro ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/test/resources/users_info_target.avro b/griffin-measure/griffin-measure-batch/src/test/resources/users_info_target.avro new file mode 100644 index 0000000..104dd6c Binary files /dev/null and b/griffin-measure/griffin-measure-batch/src/test/resources/users_info_target.avro differ http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/test/resources/users_info_target.dat ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/test/resources/users_info_target.dat b/griffin-measure/griffin-measure-batch/src/test/resources/users_info_target.dat new file mode 100644 index 0000000..07a6b40 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/test/resources/users_info_target.dat @@ -0,0 +1,50 @@ +10001|Tom001|Jerrya|201 DisneyCity|[email protected]|10000101|94022 +10002|Tom002|Jerrya|202 DisneyCity|[email protected]|10000102|94022 +10003|Tom003|Jerrya|203 DisneyCity|[email protected]|10000003|94022 +10004|Tom004|Jerrya|204 DisneyCity|[email protected]|10000004|94022 +10005|Tom005|Jerrya|205 DisneyCity|[email protected]|10000005|94022 +10006|Tom006|Jerrya|206 DisneyCity|[email protected]|10000006|94022 +10007|Tom007|Jerrya|207 DisneyCity|[email protected]|10000007|94022 +10008|Tom008|Jerrya|208 DisneyCity|[email protected]|10000008|94022 +10009|Tom009|Jerrya|209 DisneyCity|[email protected]|10000009|94022 +10010|Tom010|Jerrya|210 DisneyCity|[email protected]|10000010|94022 +10011|Tom011|Jerrya|211 DisneyCity|[email protected]|10000011|94022 +10012|Tom012|Jerrya|212 DisneyCity|[email protected]|10000012|94022 +10013|Tom013|Jerrya|213 DisneyCity|[email protected]|10000013|94022 +10014|Tom014|Jerrya|214 DisneyCity|[email protected]|10000014|94022 +10015|Tom015|Jerrya|215 DisneyCity|[email protected]|10000015|94022 +10016|Tom016|Jerrya|216 DisneyCity|[email protected]|10000016|94022 +10017|Tom017|Jerrya|217 DisneyCity|[email protected]|10000017|94022 +10018|Tom018|Jerrya|218 DisneyCity|[email protected]|10000018|94022 +10019|Tom019|Jerrya|219 DisneyCity|[email protected]|10000019|94022 +10020|Tom020|Jerrya|220 DisneyCity|[email protected]|10000020|94022 +10021|Tom021|Jerrya|221 DisneyCity|[email protected]|10000021|94022 +10022|Tom022|Jerrya|222 DisneyCity|[email protected]|10000022|94022 +10023|Tom023|Jerrya|223 DisneyCity|[email protected]|10000023|94022 +10024|Tom024|Jerrya|224 DisneyCity|[email protected]|10000024|94022 +10025|Tom025|Jerrya|225 DisneyCity|[email protected]|10000025|94022 +10026|Tom026|Jerrya|226 DisneyCity|[email protected]|10000026|94022 +10027|Tom027|Jerrya|227 DisneyCity|[email protected]|10000027|94022 +10028|Tom028|Jerrya|228 DisneyCity|[email protected]|10000028|94022 +10029|Tom029|Jerrya|229 DisneyCity|[email protected]|10000029|94022 +10030|Tom030|Jerrya|230 DisneyCity|[email protected]|10000030|94022 +10031|Tom031|Jerrya|231 DisneyCity|[email protected]|10000031|94022 +10032|Tom032|Jerrya|232 DisneyCity|[email protected]|10000032|94022 +10033|Tom033|Jerrya|233 DisneyCity|[email protected]|10000033|94022 +10034|Tom034|Jerrya|234 DisneyCity|[email protected]|10000034|94022 +10035|Tom035|Jerrya|235 DisneyCity|[email protected]|10000035|94022 +10036|Tom036|Jerrya|236 DisneyCity|[email protected]|10000036|94022 +10037|Tom037|Jerrya|237 DisneyCity|[email protected]|10000037|94022 +10038|Tom038|Jerrya|238 DisneyCity|[email protected]|10000038|94022 +10039|Tom039|Jerrya|239 DisneyCity|[email protected]|10000039|94022 +10040|Tom040|Jerrya|240 DisneyCity|[email protected]|10000040|94022 +10041|Tom041|Jerrya|241 DisneyCity|[email protected]|10000041|94022 +10042|Tom042|Jerrya|242 DisneyCity|[email protected]|10000042|94022 +10043|Tom043|Jerrya|243 DisneyCity|[email protected]|10000043|94022 +10044|Tom044|Jerrya|244 DisneyCity|[email protected]|10000044|94022 +10045|Tom045|Jerrya|245 DisneyCity|[email protected]|10000045|94022 +10046|Tom046|Jerrya|246 DisneyCity|[email protected]|10000046|94022 +10047|Tom047|Jerrya|247 DisneyCity|[email protected]|10000047|94022 +10048|Tom048|Jerrya|248 DisneyCity|[email protected]|10000048|94022 +10049|Tom049|Jerrya|249 DisneyCity|[email protected]|10000049|94022 +10050|Tom050|Jerrya|250 DisneyCity|[email protected]|10000050|94022 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgoTest.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgoTest.scala b/griffin-measure/griffin-measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgoTest.scala new file mode 100644 index 0000000..5149847 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/test/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgoTest.scala @@ -0,0 +1,219 @@ +package org.apache.griffin.measure.batch.algo + +import java.util.Date + +import org.junit.runner.RunWith +import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} +import org.scalatest.junit.JUnitRunner +import org.apache.griffin.measure.batch.config.params.user._ +import org.apache.griffin.measure.batch.config.params.env._ +import org.apache.griffin.measure.batch.config.params._ +import org.apache.griffin.measure.batch.config.reader._ +import org.apache.griffin.measure.batch.config.validator._ +import org.apache.griffin.measure.batch.connector.{DataConnector, DataConnectorFactory, CacheDataUtil} +import org.apache.griffin.measure.batch.rule.{RuleAnalyzer, RuleFactory} +import org.apache.griffin.measure.batch.rule.expr._ + +import scala.util.{Failure, Success, Try} +import org.apache.griffin.measure.batch.log.Loggable +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SQLContext +import org.apache.spark.{SparkConf, SparkContext} + + +@RunWith(classOf[JUnitRunner]) +class BatchAccuracyAlgoTest extends FunSuite with Matchers with BeforeAndAfter with Loggable { + + val envFile = "src/test/resources/env.json" +// val confFile = "src/test/resources/config.json" + val confFile = "{\"name\":\"accu1\",\"type\":\"accuracy\",\"source\":{\"type\":\"avro\",\"version\":\"1.7\",\"config\":{\"file.name\":\"src/test/resources/users_info_src.avro\"}},\"target\":{\"type\":\"avro\",\"version\":\"1.7\",\"config\":{\"file.name\":\"src/test/resources/users_info_target.avro\"}},\"evaluateRule\":{\"sampleRatio\":1,\"rules\":\"$source.user_id > 10020 AND $source.user_id + 5 = $target.user_id + (2 + 3) AND $source.first_name + 12 = $target.first_name + (10 + 2) AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code\"}}" + val envFsType = "local" + val userFsType = "raw" + + val args = Array(envFile, confFile) + + var sc: SparkContext = _ + var sqlContext: SQLContext = _ + + var allParam: AllParam = _ + + before { + // read param files + val envParam = readParamFile[EnvParam](envFile, envFsType) match { + case Success(p) => p + case Failure(ex) => { + error(ex.getMessage) + sys.exit(-2) + } + } + val userParam = readParamFile[UserParam](confFile, userFsType) match { + case Success(p) => p + case Failure(ex) => { + error(ex.getMessage) + sys.exit(-2) + } + } + allParam = AllParam(envParam, userParam) + + // validate param files + validateParams(allParam) match { + case Failure(ex) => { + error(ex.getMessage) + sys.exit(-3) + } + case _ => { + info("params validation pass") + } + } + + val metricName = userParam.name + val conf = new SparkConf().setMaster("local[*]").setAppName(metricName) + sc = new SparkContext(conf) + sqlContext = new SQLContext(sc) + } + + test("algorithm") { + Try { + val envParam = allParam.envParam + val userParam = allParam.userParam + + // start time + val startTime = new Date().getTime() + + // get spark application id + val applicationId = sc.applicationId + + // rules + val ruleFactory = RuleFactory(userParam.evaluateRuleParam) + val rule: StatementExpr = ruleFactory.generateRule() + val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule) + + ruleAnalyzer.globalCacheExprs.foreach(println) + ruleAnalyzer.globalFinalCacheExprs.foreach(println) + + // global cache data + val globalCachedData = CacheDataUtil.genCachedMap(None, ruleAnalyzer.globalCacheExprs, Map[String, Any]()) + val globalFinalCachedData = CacheDataUtil.filterCachedMap(ruleAnalyzer.globalFinalCacheExprs, globalCachedData) + + // data connector + val sourceDataConnector: DataConnector = + DataConnectorFactory.getDataConnector(sqlContext, userParam.sourceParam, + ruleAnalyzer.sourceGroupbyExprs, ruleAnalyzer.sourceCacheExprs, + ruleAnalyzer.sourceFinalCacheExprs, globalFinalCachedData) match { + case Success(cntr) => { + if (cntr.available) cntr + else throw new Exception("source data not available!") + } + case Failure(ex) => throw ex + } + val targetDataConnector: DataConnector = + DataConnectorFactory.getDataConnector(sqlContext, userParam.targetParam, + ruleAnalyzer.targetGroupbyExprs, ruleAnalyzer.targetCacheExprs, + ruleAnalyzer.targetFinalCacheExprs, globalFinalCachedData) match { + case Success(cntr) => { + if (cntr.available) cntr + else throw new Exception("target data not available!") + } + case Failure(ex) => throw ex + } + + // get metadata +// val sourceMetaData: Iterable[(String, String)] = sourceDataConnector.metaData() match { +// case Success(md) => md +// case Failure(ex) => throw ex +// } +// val targetMetaData: Iterable[(String, String)] = targetDataConnector.metaData() match { +// case Success(md) => md +// case Failure(ex) => throw ex +// } + + // get data + val sourceData: RDD[(Product, Map[String, Any])] = sourceDataConnector.data() match { + case Success(dt) => dt + case Failure(ex) => throw ex + } + val targetData: RDD[(Product, Map[String, Any])] = targetDataConnector.data() match { + case Success(dt) => dt + case Failure(ex) => throw ex + } + +// println("-- global cache exprs --") +// ruleAnalyzer.globalCacheExprs.foreach(a => println(s"${a._id} ${a.desc}")) +// println("-- global cache data --") +// globalCachedData.foreach(println) +// println("-- global final cache data --") +// globalFinalCachedData.foreach(println) +// +// println("-- source persist exprs --") +// ruleAnalyzer.sourcePersistExprs.foreach(a => println(s"${a._id} ${a.desc}")) +// println("-- source cache exprs --") +// ruleAnalyzer.sourceCacheExprs.foreach(a => println(s"${a._id} ${a.desc}")) +// +// println("-- source --") +// sourceData.foreach { a => +// val printMap = ruleAnalyzer.sourcePersistExprs.flatMap { expr => +// a._2.get(expr._id) match { +// case Some(v) => Some((expr._id + expr.desc, v)) +// case _ => None +// } +// }.toMap +// println(printMap) +// +// val cacheMap = ruleAnalyzer.sourceCacheExprs.flatMap { expr => +// a._2.get(expr._id) match { +// case Some(v) => Some((expr._id + expr.desc, v)) +// case _ => None +// } +// }.toMap +// println(cacheMap) +// +// println(a) +// println(a._2.size) +// } + +// println("-- target --") +// targetData.foreach { a => +// val printMap = ruleAnalyzer.targetPersistExprs.flatMap { expr => +// a._2.get(expr._id) match { +// case Some(v) => Some((expr.desc, v)) +// case _ => None +// } +// }.toMap +// println(printMap) +// } + + // my algo + val algo = BatchAccuracyAlgo(allParam) + + // accuracy algorithm + val (accuResult, missingRdd, matchingRdd) = algo.accuracy(sourceData, targetData, ruleAnalyzer) + + println(s"match percentage: ${accuResult.matchPercentage}, total count: ${accuResult.total}") + + missingRdd.foreach(println) + + // end time + val endTime = new Date().getTime + println(s"using time: ${endTime - startTime} ms") + } match { + case Failure(ex) => { + error(ex.getMessage) + sys.exit(-4) + } + case _ => { + info("calculation finished") + } + } + } + + private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = { + val paramReader = ParamReaderFactory.getParamReader(file, fsType) + paramReader.readConfig[T] + } + + private def validateParams(allParam: AllParam): Try[Boolean] = { + val allParamValidator = AllParamValidator() + allParamValidator.validate(allParam) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/test/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReaderTest.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/test/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReaderTest.scala b/griffin-measure/griffin-measure-batch/src/test/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReaderTest.scala new file mode 100644 index 0000000..f1ccefb --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/test/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReaderTest.scala @@ -0,0 +1,35 @@ +package org.apache.griffin.measure.batch.config.reader + +import org.junit.runner.RunWith +import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} +import org.scalatest.junit.JUnitRunner +import org.apache.griffin.measure.batch.config.params.user._ +import org.apache.griffin.measure.batch.config.params.env._ + +import scala.util.{Failure, Success} +import org.apache.griffin.measure.batch.log.Loggable +import org.apache.griffin.measure.batch.persist.{HdfsPersist, PersistFactory} + + +@RunWith(classOf[JUnitRunner]) +class ParamFileReaderTest extends FunSuite with Matchers with BeforeAndAfter with Loggable { + + test("test file reader") { + val userReader = ParamFileReader("src/test/resources/config1.json") + val envReader = ParamFileReader("src/test/resources/env1.json") + + val p1 = userReader.readConfig[UserParam] + val p2 = envReader.readConfig[EnvParam] + + p1 match { + case Success(v) => println(v) + case Failure(ex) => error(ex.getMessage) + } + + p2 match { + case Success(v) => println(v) + case Failure(ex) => error(ex.getMessage) + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/test/scala/org/apache/griffin/measure/batch/rule/RuleParserTest.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/test/scala/org/apache/griffin/measure/batch/rule/RuleParserTest.scala b/griffin-measure/griffin-measure-batch/src/test/scala/org/apache/griffin/measure/batch/rule/RuleParserTest.scala new file mode 100644 index 0000000..f964bcd --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/test/scala/org/apache/griffin/measure/batch/rule/RuleParserTest.scala @@ -0,0 +1,77 @@ +package org.apache.griffin.measure.batch.rule + +import org.junit.runner.RunWith +import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} +import org.scalatest.junit.JUnitRunner + +import scala.util.{Success, Failure} + +import org.apache.griffin.measure.batch.log.Loggable + + +@RunWith(classOf[JUnitRunner]) +class RuleParserTest extends FunSuite with Matchers with BeforeAndAfter with Loggable { + +// test("test rule parser") { +// val ruleParser = RuleParser() +// +//// val rules = "outTime = 24h; @Invalid ${source}['__time'] + ${outTime} > ${target}['__time']" +// val rules = "@Key ${source}.json()['seeds'][*].json()['metadata'].json()['tracker']['crawlRequestCreateTS'] === ${target}.json()['groups'][0][\"attrsList\"]['name'=\"CRAWLMETADATA\"]['values'][0].json()['tracker']['crawlRequestCreateTS']" +//// val rules = "${source}['__time'] + ${outTime} > ${target}['__time']" +//// val rules = "${source}['__time'] > ${target}['__time']" +//// val rules = "432" +//// val rules = "${target}.json()['groups'][0]['attrsList']['name'='URL']['values'][0]" +// +// val result = ruleParser.parseAll(ruleParser.statementsExpr, rules) +// +// println(result) +// } + +// test("treat escape") { +// val es = """Hello\tworld\nmy name is \"ABC\"""" +// val un = StringContext treatEscapes es +// +// println(es) +// println(un) +// } + + test("test rule parser") { + val ruleParser = RuleParser() + +// val rules = "$SOUrce['tgt' < $source['tag' != 2] - -+-++---1] between ( -$target['32a'] + 9, 100, ----1000 ) and (45 > 9 or $target.type + 8 == 9 and $source['a'] >= 0) when not not not not $source._time + 24h < $target._time" + val rules = "$source['aaaf fd', (21, 43), '12']" + + val result = ruleParser.parseAll(ruleParser.selection, rules) + + println(result) + } + + test("test rule analyzer") { + val ruleParser = RuleParser() + +// val rules = "$source.tag == $target['take' >= 5] and $source.price + $source.price1 > $target['kk' < $target.age] and $source.ee = $target.fe + $target.a when $target.ggg = 1" + val rules = "$source." + val result = ruleParser.parseAll(ruleParser.rule, rules) + println(result) + + if (result.successful) { + val ruleAnalyzer = RuleAnalyzer(result.get) + + println("source") + ruleAnalyzer.sourceCacheExprs.foreach(a => println(a.desc)) + println("source final") + ruleAnalyzer.sourceFinalCacheExprs.foreach(a => println(a.desc)) + println("target") + ruleAnalyzer.targetCacheExprs.foreach(a => println(a.desc)) + println("target final") + ruleAnalyzer.targetFinalCacheExprs.foreach(a => println(a.desc)) + println("groupby") + ruleAnalyzer.sourceGroupbyExprs.foreach(println) + ruleAnalyzer.targetGroupbyExprs.foreach(println) + } + + + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/pom.xml ---------------------------------------------------------------------- diff --git a/griffin-measure/pom.xml b/griffin-measure/pom.xml new file mode 100644 index 0000000..01475da --- /dev/null +++ b/griffin-measure/pom.xml @@ -0,0 +1,197 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!--<parent>--> + <!--<artifactId>griffin-parent</artifactId>--> + <!--<groupId>com.ebay.oss</groupId>--> + <!--<version>0.1.0-SNAPSHOT</version>--> + <!--</parent>--> + <modelVersion>4.0.0</modelVersion> + + <groupId>org.apache.griffin</groupId> + <artifactId>griffin-measure</artifactId> + <packaging>pom</packaging> + <version>0.0.1-SNAPSHOT</version> + <modules> + <module>griffin-measure-batch</module> + </modules> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + + <maven.compiler.source>1.8</maven.compiler.source> + <maven.compiler.target>1.8</maven.compiler.target> + + <scala.version>2.10.6</scala.version> + <spark.version>1.6.0</spark.version> + <scala.binary.version>2.10</scala.binary.version> + + <avro.version>1.7.7</avro.version> + <jackson.version>2.8.7</jackson.version> + <scalaj.version>2.3.0</scalaj.version> + <junit.version>4.11</junit.version> + <scalatest.version>2.2.4</scalatest.version> + <slf4j.version>1.7.21</slf4j.version> + <log4j.version>1.2.16</log4j.version> + </properties> + + <dependencies> + <!--scala--> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>${scala.version}</version> + </dependency> + + <!--spark, spark streaming, spark hive--> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-hive_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + </dependency> + + <!--jackson--> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${jackson.version}</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.module</groupId> + <artifactId>jackson-module-scala_${scala.binary.version}</artifactId> + <version>${jackson.version}</version> + </dependency> + + <!--scalaj for http request--> + <dependency> + <groupId>org.scalaj</groupId> + <artifactId>scalaj-http_${scala.binary.version}</artifactId> + <version>${scalaj.version}</version> + </dependency> + + <!--avro--> + <dependency> + <groupId>com.databricks</groupId> + <artifactId>spark-avro_${scala.binary.version}</artifactId> + <version>2.0.1</version> + </dependency> + <!--csv--> + <dependency> + <groupId>com.databricks</groupId> + <artifactId>spark-csv_${scala.binary.version}</artifactId> + <version>1.5.0</version> + </dependency> + + <!--log4j--> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>${slf4j.version}</version> + </dependency> + <!--<dependency>--> + <!--<groupId>org.slf4j</groupId>--> + <!--<artifactId>slf4j-simple</artifactId>--> + <!--<version>${slf4j.version}</version>--> + <!--</dependency>--> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>${slf4j.version}</version> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <version>${log4j.version}</version> + </dependency> + + <!--junit--> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>${junit.version}</version> + <scope>test</scope> + </dependency> + + <!--scala test--> + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.binary.version}</artifactId> + <version>${scalatest.version}</version> + <scope>test</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <!-- or whatever version you use --> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + <plugin> + <groupId>org.scala-tools</groupId> + <artifactId>maven-scala-plugin</artifactId> + <version>2.15.2</version> + <executions> + <execution> + <id>compile</id> + <goals> + <goal>compile</goal> + </goals> + <phase>compile</phase> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.5.1</version> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-models/data/test/dataFile/users_info_src.avro ---------------------------------------------------------------------- diff --git a/griffin-models/data/test/dataFile/users_info_src.avro b/griffin-models/data/test/dataFile/users_info_src.avro new file mode 100644 index 0000000..615f538 Binary files /dev/null and b/griffin-models/data/test/dataFile/users_info_src.avro differ http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-models/data/test/dataFile/users_info_target.avro ---------------------------------------------------------------------- diff --git a/griffin-models/data/test/dataFile/users_info_target.avro b/griffin-models/data/test/dataFile/users_info_target.avro new file mode 100644 index 0000000..c2b267b Binary files /dev/null and b/griffin-models/data/test/dataFile/users_info_target.avro differ http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-models/data/test/recordFile/_RESULT_ACCU ---------------------------------------------------------------------- diff --git a/griffin-models/data/test/recordFile/_RESULT_ACCU b/griffin-models/data/test/recordFile/_RESULT_ACCU new file mode 100644 index 0000000..77cb87b --- /dev/null +++ b/griffin-models/data/test/recordFile/_RESULT_ACCU @@ -0,0 +1,12 @@ +//========== 1. Test Accuracy model result with request file: data\test\reqJson\accuAvroTest.json ========== +match percentage: 80.0 % + +Map(id -> 44444, name -> Snoooooopy, age -> 16, desc -> dooooog) + +//========== 2. Test Accuracy model result with request file: data\test\reqJson\accuUsersInfo.json ========== +match percentage: 94.0 % + +Map(first_name -> Tom049, email -> [email protected], post_code -> 94022, user_id -> 10044, last_name -> Jerrya, address -> 244 DisneyCity, phone -> 10000044) +Map(first_name -> Tom049, email -> [email protected], post_code -> 94022, user_id -> 10050, last_name -> Jerrya, address -> 249 DisneyCity, phone -> 10000049) +Map(first_name -> Tom039, email -> [email protected], post_code -> 94022, user_id -> 10039, last_name -> Jerrya, address -> 239 DisneyCity123, phone -> 10000039) + http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-models/data/test/reqJson/accuUsersInfo.json ---------------------------------------------------------------------- diff --git a/griffin-models/data/test/reqJson/accuUsersInfo.json b/griffin-models/data/test/reqJson/accuUsersInfo.json new file mode 100644 index 0000000..02d5dd2 --- /dev/null +++ b/griffin-models/data/test/reqJson/accuUsersInfo.json @@ -0,0 +1,55 @@ +{ + "source": "users_info_src", + "target": "users_info_target", + "accuracyMapping": [ + { + "sourceColId": 0, + "sourceColName": "user_id", + "targetColId": 0, + "targetColName": "user_id", + "isPK": true + }, + { + "sourceColId": 1, + "sourceColName": "first_name", + "targetColId": 1, + "targetColName": "first_name", + "isPK": false + }, + { + "sourceColId": 2, + "sourceColName": "last_name", + "targetColId": 2, + "targetColName": "last_name", + "isPK": false + }, + { + "sourceColId": 3, + "sourceColName": "address", + "targetColId": 3, + "targetColName": "address", + "isPK": false + }, + { + "sourceColId": 4, + "sourceColName": "email", + "targetColId": 4, + "targetColName": "email", + "isPK": false + }, + { + "sourceColId": 5, + "sourceColName": "phone", + "targetColId": 5, + "targetColName": "phone", + "isPK": false + }, + { + "sourceColId": 6, + "sourceColName": "post_code", + "targetColId": 6, + "targetColName": "post_code", + "isPK": false + } + ] +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-models/src/test/scala/modelTest/AccuTest.scala ---------------------------------------------------------------------- diff --git a/griffin-models/src/test/scala/modelTest/AccuTest.scala b/griffin-models/src/test/scala/modelTest/AccuTest.scala new file mode 100644 index 0000000..b7ebed8 --- /dev/null +++ b/griffin-models/src/test/scala/modelTest/AccuTest.scala @@ -0,0 +1,86 @@ +package modelTest + +import org.apache.griffin.accuracy._ +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import org.apache.spark.{SparkConf, SparkContext} +import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} +import org.apache.spark.sql.{DataFrame, SQLContext} +import java.io.{FileInputStream, FileOutputStream} + +import org.apache.griffin.dataLoaderUtils.{DataLoaderFactory, FileLoaderUtil} + +import scala.collection.mutable.MutableList +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +@RunWith(classOf[JUnitRunner]) +class AccuTest extends FunSuite with Matchers with BeforeAndAfter { + + val dataFilePath = FileLoaderUtil.convertPath("data/test/dataFile/") + val reqJsonPath = FileLoaderUtil.convertPath("data/test/reqJson/") + val recordFilePath = FileLoaderUtil.convertPath("data/test/recordFile/") + val recordFileName = "_RESULT_ACCU" + + case class AccuData() { + var cnt: Int = _ + var reqJson: String = _ + var configure: AccuracyConfEntity = _ + var dataFrameSrc: DataFrame = _ + var dataFrameTgt: DataFrame = _ + var result: ((Long, Long), List[String]) = _ + } + val accuDatas = MutableList[AccuData]() + + var sc: SparkContext = _ + + before { + val conf = new SparkConf().setMaster("local[*]").setAppName("AccTest") + sc = new SparkContext(conf) + val sqlContext = new SQLContext(sc) + val mapper = new ObjectMapper() + mapper.registerModule(DefaultScalaModule) + + var cnt = 1; + val accTests = List("accuAvroTest.json", "accuUsersInfo.json") + for (tf <- accTests) { + val reqJson = reqJsonPath + tf + val accuData = new AccuData() + accuData.cnt = cnt + accuData.reqJson = reqJson + val input = new FileInputStream(reqJson) + accuData.configure = mapper.readValue(input, classOf[AccuracyConfEntity]) + val dataLoader = DataLoaderFactory.getDataLoader(sqlContext, DataLoaderFactory.avro, dataFilePath) + val dfs = dataLoader.getAccuDataFrame(accuData.configure) + accuData.dataFrameSrc = dfs._1 + accuData.dataFrameTgt = dfs._2 + accuDatas += accuData + cnt += 1 + } + } + + test("test accuracy requests") { + for (accuData <- accuDatas) { + //-- algorithm -- + accuData.result = Accu.calcAccu(accuData.configure, accuData.dataFrameSrc, accuData.dataFrameTgt) + } + } + + after { + val out = new FileOutputStream(recordFilePath + recordFileName) + for (accuData <- accuDatas) { + //output + out.write(("//" + "=" * 10).getBytes("utf-8")) + out.write((s" ${accuData.cnt}. Test Accuracy model result with request file: ${accuData.reqJson} ").getBytes("utf-8")) + out.write(("=" * 10 + "\n").getBytes("utf-8")) + + val ((missCount, srcCount), missedList) = accuData.result + val rslt = s"match percentage: ${((1 - missCount.toDouble / srcCount) * 100)} %" + val rcds = missedList.mkString("\n") + val rcd = rslt + "\n\n" + rcds + "\n\n"; + + out.write(rcd.getBytes("utf-8")) + } + out.close() + sc.stop() + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-streaming-model/src/main/scala/org/apache/griffin/Application.scala ---------------------------------------------------------------------- diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/Application.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/Application.scala index 1ab4df8..38247eb 100644 --- a/griffin-streaming-model/src/main/scala/org/apache/griffin/Application.scala +++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/Application.scala @@ -54,11 +54,10 @@ object Application extends Logging { // case AccuracyType => { // dataAssetTypeMap.get("source") match { // case Some("kafka") => StreamingAccuracyAlgo(sparkParam, dataAssetParamMap.get("source").get, dataAssetParamMap.get("target").get) -// case _ => StreamingAccuracyAlgo(sparkParam, dataAssetParamMap.get("source").get, dataAssetParamMap.get("target").get) // fixme: it's wrong type here +// case _ => StreamingAccuracyAlgo(sparkParam, dataAssetParamMap.get("source").get, dataAssetParamMap.get("target").get) // } // } // case _ => { -// // fixme: it's wrong type here // StreamingAccuracyAlgo(sparkParam, dataAssetParamMap.get("source").get, dataAssetParamMap.get("target").get) // } // } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/ui/js/controllers/metrics-ctrl.js ---------------------------------------------------------------------- diff --git a/ui/js/controllers/metrics-ctrl.js b/ui/js/controllers/metrics-ctrl.js index b2ca82e..5452f27 100644 --- a/ui/js/controllers/metrics-ctrl.js +++ b/ui/js/controllers/metrics-ctrl.js @@ -37,7 +37,10 @@ define(['./module'], function(controllers) { orgNode.assetMap = value; }); $scope.originalOrgs = angular.copy($scope.orgs); - $http.post(url_dashboard, {"query": {"match_all":{}}, "sort": [{"tmst": {"order": "asc"}}],"size":1000}).success(function(data) { + + var url_briefmetrics = $config.uri.dashboard; + $http.post(url_dashboard, {"query": {"match_all":{}}, "sort": [{"tmst": {"order": "asc"}}], "size": 1000}).success(function(data) { + $scope.briefmetrics = data; angular.forEach(data.hits.hits, function(sys) { var chartData = sys._source; chartData.sort = function(a,b){ http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/ui/js/services/services.js ---------------------------------------------------------------------- diff --git a/ui/js/services/services.js b/ui/js/services/services.js index 1083e17..39b0df7 100644 --- a/ui/js/services/services.js +++ b/ui/js/services/services.js @@ -3,10 +3,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -62,8 +59,7 @@ define(['./module'], function (services) { metricsByOrg:'', // organization:'/org.json', // dashboard:'/dashboard.json', - - organization:BACKEND_SERVER + '/orgWithMetrics', + organization: BACKEND_SERVER + '/orgWithMetrics', dashboard:ES_SERVER+'/griffin/accuracy/_search?pretty&filter_path=hits.hits._source', metricsample: BACKEND_SERVER + API_ROOT_PATH + '/metrics/sample', metricdownload: BACKEND_SERVER + API_ROOT_PATH + '/metrics/download',
