http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/test-data.jsonFile
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/test-data.jsonFile b/measure/src/test/resources/test-data.jsonFile
new file mode 100644
index 0000000..73707f4
--- /dev/null
+++ b/measure/src/test/resources/test-data.jsonFile
@@ -0,0 +1,3 @@
+{ "name": "emily", "age": 5, "map": { "a": 1, "b": 2 }, "list": [ { "c": 1, "d": 2 }, { "c": 3, "d": 4 } ], "t": [1, 2, 3] }
+{ "name": "white", "age": 15, "map": { "a": 11, "b": 12 }, "list": [ { "c": 11, "d": 2 }, { "c": 23, "d": 4 } ], "t": [1, 2, 3] }
+{ "name": "west", "age": 25, "map": { "a": 21, "b": 22 }, "list": [ { "c": 11, "d": 2 }, { "c": 23, "d": 4 } ], "t": [1, 2, 3] }
\ No newline at end of file
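The test-data.jsonFile fixture above stores one JSON object per line (newline-delimited JSON) with nested map and array fields. Below is a minimal sketch, assuming the Spark 1.6-style SQLContext used throughout these tests, of how such a file can be loaded and its nested fields queried; the object name, table name and query are illustrative only.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object TestDataReadSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("test-data-read")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Each line is a self-contained JSON object, so the line-oriented JSON
    // reader can infer the schema (name, age, map, list, t) directly.
    val df = sqlContext.read.json("src/test/resources/test-data.jsonFile")
    df.printSchema()
    df.registerTempTable("people")

    // Nested fields are addressable with dot and index syntax in Spark SQL;
    // `map` is backtick-quoted because it collides with a SQL keyword.
    sqlContext.sql("SELECT name, `map`.a, list[0].c FROM people WHERE age > 10").show()

    sc.stop()
  }
}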
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/test-data0.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/test-data0.json b/measure/src/test/resources/test-data0.json
new file mode 100644
index 0000000..406acb8
--- /dev/null
+++ b/measure/src/test/resources/test-data0.json
@@ -0,0 +1,56 @@
+[
+  {
+    "name": "emily",
+    "age": 5,
+    "map": {
+      "a": 1,
+      "b": 2
+    },
+    "list": [
+      {
+        "c": 1,
+        "d": 2
+      },
+      {
+        "c": 3,
+        "d": 4
+      }
+    ]
+  },
+  {
+    "name": "white",
+    "age": 15,
+    "map": {
+      "a": 11,
+      "b": 12
+    },
+    "list": [
+      {
+        "c": 11,
+        "d": 2
+      },
+      {
+        "c": 23,
+        "d": 4
+      }
+    ]
+  },
+  {
+    "name": "west",
+    "age": 25,
+    "map": {
+      "a": 21,
+      "b": 22
+    },
+    "list": [
+      {
+        "c": 11,
+        "d": 2
+      },
+      {
+        "c": 23,
+        "d": 4
+      }
+    ]
+  }
+]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/test-data1.jsonFile
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/test-data1.jsonFile b/measure/src/test/resources/test-data1.jsonFile
new file mode 100644
index 0000000..1e1f28a
--- /dev/null
+++ b/measure/src/test/resources/test-data1.jsonFile
@@ -0,0 +1,31 @@
+[{
+  "Year": "2013",
+  "First Name": "DAVID",
+  "County": "KINGS",
+  "Sex": "M",
+  "Count": "272"
+}, {
+  "Year": "2013",
+  "First Name": "JAYDEN",
+  "County": "KINGS",
+  "Sex": "M",
+  "Count": "268"
+}, {
+  "Year": "2013",
+  "First Name": "JAYDEN",
+  "County": "QUEENS",
+  "Sex": "M",
+  "Count": "219"
+}, {
+  "Year": "2013",
+  "First Name": "MOSHE",
+  "County": "KINGS",
+  "Sex": "M",
+  "Count": "219"
+}, {
+  "Year": "2013",
+  "First Name": "ETHAN",
+  "County": "QUEENS",
+  "Sex": "M",
+  "Count": "216"
+}]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchAccuracyAlgoTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchAccuracyAlgoTest.scala b/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchAccuracyAlgoTest.scala
deleted file mode 100644
index 6a60326..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchAccuracyAlgoTest.scala
+++ /dev/null
@@ -1,198 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements. See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership. The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied. See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/ -//package org.apache.griffin.measure.algo -// -//import java.util.Date -// -//import org.apache.griffin.measure.algo.batch.BatchAccuracyAlgo -//import org.apache.griffin.measure.config.params._ -//import org.apache.griffin.measure.config.params.env._ -//import org.apache.griffin.measure.config.params.user._ -//import org.apache.griffin.measure.config.reader._ -//import org.apache.griffin.measure.config.validator._ -//import org.apache.griffin.measure.connector.direct.DirectDataConnector -//import org.apache.griffin.measure.connector.{DataConnector, DataConnectorFactory} -//import org.apache.griffin.measure.log.Loggable -//import org.apache.griffin.measure.rule.expr._ -//import org.apache.griffin.measure.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory} -//import org.apache.spark.rdd.RDD -//import org.apache.spark.sql.SQLContext -//import org.apache.spark.{SparkConf, SparkContext} -//import org.junit.runner.RunWith -//import org.scalatest.junit.JUnitRunner -//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} -// -//import scala.util.{Failure, Success, Try} -// -// -//@RunWith(classOf[JUnitRunner]) -//class BatchAccuracyAlgoTest extends FunSuite with Matchers with BeforeAndAfter with Loggable { -// -// val envFile = "src/test/resources/env.json" -// val confFile = "src/test/resources/config.json" -//// val confFile = "{\"name\":\"accu1\",\"type\":\"accuracy\",\"source\":{\"type\":\"avro\",\"version\":\"1.7\",\"config\":{\"file.name\":\"src/test/resources/users_info_src.avro\"}},\"target\":{\"type\":\"avro\",\"version\":\"1.7\",\"config\":{\"file.name\":\"src/test/resources/users_info_target.avro\"}},\"evaluateRule\":{\"sampleRatio\":1,\"rules\":\"$source.user_id + 5 = $target.user_id + (2 + 3) AND $source.first_name + 12 = $target.first_name + (10 + 2) AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code AND (15 OR true) WHEN true AND $source.user_id > 10020\"}}" -// val envFsType = "local" -// val userFsType = "local" -// -// val args = Array(envFile, confFile) -// -// var sc: SparkContext = _ -// var sqlContext: SQLContext = _ -// -// var allParam: AllParam = _ -// -// before { -// // read param files -// val envParam = readParamFile[EnvParam](envFile, envFsType) match { -// case Success(p) => p -// case Failure(ex) => { -// error(ex.getMessage) -// sys.exit(-2) -// } -// } -// val userParam = readParamFile[UserParam](confFile, userFsType) match { -// case Success(p) => p -// case Failure(ex) => { -// error(ex.getMessage) -// sys.exit(-2) -// } -// } -// allParam = AllParam(envParam, userParam) -// -// // validate param files -// validateParams(allParam) match { -// case Failure(ex) => { -// error(ex.getMessage) -// sys.exit(-3) -// } -// case _ => { -// info("params validation pass") -// } -// } -// -// val metricName = userParam.name -// val conf = new SparkConf().setMaster("local[*]").setAppName(metricName) -// sc = new SparkContext(conf) -// sqlContext = new SQLContext(sc) -// } -// -// test("algorithm") { -// Try { -// val envParam = allParam.envParam -// val userParam = allParam.userParam -// -// // start time -// val startTime = new Date().getTime() -// -// // get spark application id -// val applicationId = sc.applicationId -// -// // rules -// val ruleFactory = RuleFactory(userParam.evaluateRuleParam) -// val rule: StatementExpr = ruleFactory.generateRule() -// val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule) -// -// 
ruleAnalyzer.constCacheExprs.foreach(println) -// ruleAnalyzer.constFinalCacheExprs.foreach(println) -// -// // global cache data -// val constExprValueMap = ExprValueUtil.genExprValueMaps(None, ruleAnalyzer.constCacheExprs, Map[String, Any]()) -// val finalConstExprValueMap = ExprValueUtil.updateExprValueMaps(ruleAnalyzer.constFinalCacheExprs, constExprValueMap) -// val finalConstMap = finalConstExprValueMap.headOption match { -// case Some(m) => m -// case _ => Map[String, Any]() -// } -// -// // data connector -// val sourceDataConnector: DirectDataConnector = -// DataConnectorFactory.getDirectDataConnector(sqlContext, null, userParam.sourceParam, -// ruleAnalyzer.sourceRuleExprs, finalConstMap -// ) match { -// case Success(cntr) => { -// if (cntr.available) cntr -// else throw new Exception("source data not available!") -// } -// case Failure(ex) => throw ex -// } -// val targetDataConnector: DirectDataConnector = -// DataConnectorFactory.getDirectDataConnector(sqlContext, null, userParam.targetParam, -// ruleAnalyzer.targetRuleExprs, finalConstMap -// ) match { -// case Success(cntr) => { -// if (cntr.available) cntr -// else throw new Exception("target data not available!") -// } -// case Failure(ex) => throw ex -// } -// -// // get metadata -//// val sourceMetaData: Iterable[(String, String)] = sourceDataConnector.metaData() match { -//// case Success(md) => md -//// case Failure(ex) => throw ex -//// } -//// val targetMetaData: Iterable[(String, String)] = targetDataConnector.metaData() match { -//// case Success(md) => md -//// case Failure(ex) => throw ex -//// } -// -// // get data -// val sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = sourceDataConnector.data() match { -// case Success(dt) => dt -// case Failure(ex) => throw ex -// } -// val targetData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = targetDataConnector.data() match { -// case Success(dt) => dt -// case Failure(ex) => throw ex -// } -// -// // my algo -// val algo = BatchAccuracyAlgo(allParam) -// -// // accuracy algorithm -// val (accuResult, missingRdd, matchedRdd) = algo.accuracy(sourceData, targetData, ruleAnalyzer) -// -// println(s"match percentage: ${accuResult.matchPercentage}, total count: ${accuResult.total}") -// -// missingRdd.map(rec => algo.record2String(rec, ruleAnalyzer.sourceRuleExprs.persistExprs, ruleAnalyzer.targetRuleExprs.persistExprs)).foreach(println) -// -// // end time -// val endTime = new Date().getTime -// println(s"using time: ${endTime - startTime} ms") -// } match { -// case Failure(ex) => { -// error(ex.getMessage) -// sys.exit(-4) -// } -// case _ => { -// info("calculation finished") -// } -// } -// } -// -// private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = { -// val paramReader = ParamReaderFactory.getParamReader(file, fsType) -// paramReader.readConfig[T] -// } -// -// private def validateParams(allParam: AllParam): Try[Boolean] = { -// val allParamValidator = AllParamValidator() -// allParamValidator.validate(allParam) -// } -// -//} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchProfileAlgoTest.scala ---------------------------------------------------------------------- diff --git a/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchProfileAlgoTest.scala b/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchProfileAlgoTest.scala deleted file mode 100644 index 
e0f500a..0000000 --- a/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchProfileAlgoTest.scala +++ /dev/null @@ -1,173 +0,0 @@ -///* -//Licensed to the Apache Software Foundation (ASF) under one -//or more contributor license agreements. See the NOTICE file -//distributed with this work for additional information -//regarding copyright ownership. The ASF licenses this file -//to you under the Apache License, Version 2.0 (the -//"License"); you may not use this file except in compliance -//with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -//Unless required by applicable law or agreed to in writing, -//software distributed under the License is distributed on an -//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -//KIND, either express or implied. See the License for the -//specific language governing permissions and limitations -//under the License. -//*/ -//package org.apache.griffin.measure.algo -// -//import java.util.Date -// -//import org.apache.griffin.measure.algo.batch.BatchProfileAlgo -//import org.apache.griffin.measure.config.params._ -//import org.apache.griffin.measure.config.params.env._ -//import org.apache.griffin.measure.config.params.user._ -//import org.apache.griffin.measure.config.reader._ -//import org.apache.griffin.measure.config.validator._ -//import org.apache.griffin.measure.connector.direct.DirectDataConnector -//import org.apache.griffin.measure.connector.{DataConnector, DataConnectorFactory} -//import org.apache.griffin.measure.log.Loggable -//import org.apache.griffin.measure.rule.expr._ -//import org.apache.griffin.measure.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory} -//import org.apache.spark.rdd.RDD -//import org.apache.spark.sql.SQLContext -//import org.apache.spark.{SparkConf, SparkContext} -//import org.junit.runner.RunWith -//import org.scalatest.junit.JUnitRunner -//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} -// -//import scala.util.{Failure, Success, Try} -// -// -//@RunWith(classOf[JUnitRunner]) -//class BatchProfileAlgoTest extends FunSuite with Matchers with BeforeAndAfter with Loggable { -// -// val envFile = "src/test/resources/env.json" -// val confFile = "src/test/resources/config-profile.json" -// val envFsType = "local" -// val userFsType = "local" -// -// val args = Array(envFile, confFile) -// -// var sc: SparkContext = _ -// var sqlContext: SQLContext = _ -// -// var allParam: AllParam = _ -// -// before { -// // read param files -// val envParam = readParamFile[EnvParam](envFile, envFsType) match { -// case Success(p) => p -// case Failure(ex) => { -// error(ex.getMessage) -// sys.exit(-2) -// } -// } -// val userParam = readParamFile[UserParam](confFile, userFsType) match { -// case Success(p) => p -// case Failure(ex) => { -// error(ex.getMessage) -// sys.exit(-2) -// } -// } -// allParam = AllParam(envParam, userParam) -// -// // validate param files -// validateParams(allParam) match { -// case Failure(ex) => { -// error(ex.getMessage) -// sys.exit(-3) -// } -// case _ => { -// info("params validation pass") -// } -// } -// -// val metricName = userParam.name -// val conf = new SparkConf().setMaster("local[*]").setAppName(metricName) -// sc = new SparkContext(conf) -// sqlContext = new SQLContext(sc) -// } -// -// test("algorithm") { -// Try { -// val envParam = allParam.envParam -// val userParam = allParam.userParam -// -// // start time -// val startTime = new Date().getTime() -// -// // get spark application id -// val 
applicationId = sc.applicationId -// -// // rules -// val ruleFactory = RuleFactory(userParam.evaluateRuleParam) -// val rule: StatementExpr = ruleFactory.generateRule() -// val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule) -// -// ruleAnalyzer.constCacheExprs.foreach(println) -// ruleAnalyzer.constFinalCacheExprs.foreach(println) -// -// // global cache data -// val constExprValueMap = ExprValueUtil.genExprValueMaps(None, ruleAnalyzer.constCacheExprs, Map[String, Any]()) -// val finalConstExprValueMap = ExprValueUtil.updateExprValueMaps(ruleAnalyzer.constFinalCacheExprs, constExprValueMap) -// val finalConstMap = finalConstExprValueMap.headOption match { -// case Some(m) => m -// case _ => Map[String, Any]() -// } -// -// // data connector -// val sourceDataConnector: DirectDataConnector = -// DataConnectorFactory.getDirectDataConnector(sqlContext, null, userParam.sourceParam, -// ruleAnalyzer.sourceRuleExprs, finalConstMap -// ) match { -// case Success(cntr) => { -// if (cntr.available) cntr -// else throw new Exception("source data not available!") -// } -// case Failure(ex) => throw ex -// } -// -// // get data -// val sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = sourceDataConnector.data() match { -// case Success(dt) => dt -// case Failure(ex) => throw ex -// } -// -// // my algo -// val algo = BatchProfileAlgo(allParam) -// -// // profile algorithm -// val (profileResult, missingRdd, matchedRdd) = algo.profile(sourceData, ruleAnalyzer) -// -// println(s"match percentage: ${profileResult.matchPercentage}, match count: ${profileResult.matchCount}, total count: ${profileResult.totalCount}") -// -// matchedRdd.map(rec => algo.record2String(rec, ruleAnalyzer.sourceRuleExprs.persistExprs)).foreach(println) -// -// // end time -// val endTime = new Date().getTime -// println(s"using time: ${endTime - startTime} ms") -// } match { -// case Failure(ex) => { -// error(ex.getMessage) -// sys.exit(-4) -// } -// case _ => { -// info("calculation finished") -// } -// } -// } -// -// private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = { -// val paramReader = ParamReaderFactory.getParamReader(file, fsType) -// paramReader.readConfig[T] -// } -// -// private def validateParams(allParam: AllParam): Try[Boolean] = { -// val allParamValidator = AllParamValidator() -// allParamValidator.validate(allParam) -// } -// -//} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/algo/batch/DataFrameSaveTest.scala ---------------------------------------------------------------------- diff --git a/measure/src/test/scala/org/apache/griffin/measure/algo/batch/DataFrameSaveTest.scala b/measure/src/test/scala/org/apache/griffin/measure/algo/batch/DataFrameSaveTest.scala deleted file mode 100644 index a76712f..0000000 --- a/measure/src/test/scala/org/apache/griffin/measure/algo/batch/DataFrameSaveTest.scala +++ /dev/null @@ -1,172 +0,0 @@ -///* -//Licensed to the Apache Software Foundation (ASF) under one -//or more contributor license agreements. See the NOTICE file -//distributed with this work for additional information -//regarding copyright ownership. The ASF licenses this file -//to you under the Apache License, Version 2.0 (the -//"License"); you may not use this file except in compliance -//with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -//Unless required by applicable law or agreed to in writing, -//software distributed under the License is distributed on an -//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -//KIND, either express or implied. See the License for the -//specific language governing permissions and limitations -//under the License. -//*/ -//package org.apache.griffin.measure.algo.batch -// -//import java.util.Date -// -//import org.apache.griffin.measure.config.params._ -//import org.apache.griffin.measure.config.params.env._ -//import org.apache.griffin.measure.config.params.user._ -//import org.apache.griffin.measure.config.reader._ -//import org.apache.griffin.measure.config.validator._ -//import org.apache.griffin.measure.connector.DataConnectorFactory -//import org.apache.griffin.measure.connector.direct.DirectDataConnector -//import org.apache.griffin.measure.log.Loggable -//import org.apache.griffin.measure.rule.expr._ -//import org.apache.griffin.measure.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory} -//import org.apache.spark.rdd.RDD -//import org.apache.spark.sql.SQLContext -//import org.apache.spark.{SparkConf, SparkContext} -//import org.junit.runner.RunWith -//import org.scalatest.junit.JUnitRunner -//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} -// -//import scala.util.{Failure, Success, Try} -// -// -//@RunWith(classOf[JUnitRunner]) -//class DataFrameSaveTest extends FunSuite with Matchers with BeforeAndAfter with Loggable { -// -// val envFile = "src/test/resources/env.json" -// val confFile = "src/test/resources/config-profile.json" -// val envFsType = "local" -// val userFsType = "local" -// -// val args = Array(envFile, confFile) -// -// var sc: SparkContext = _ -// var sqlContext: SQLContext = _ -// -// var allParam: AllParam = _ -// -// before { -// // read param files -// val envParam = readParamFile[EnvParam](envFile, envFsType) match { -// case Success(p) => p -// case Failure(ex) => { -// error(ex.getMessage) -// sys.exit(-2) -// } -// } -// val userParam = readParamFile[UserParam](confFile, userFsType) match { -// case Success(p) => p -// case Failure(ex) => { -// error(ex.getMessage) -// sys.exit(-2) -// } -// } -// allParam = AllParam(envParam, userParam) -// -// // validate param files -// validateParams(allParam) match { -// case Failure(ex) => { -// error(ex.getMessage) -// sys.exit(-3) -// } -// case _ => { -// info("params validation pass") -// } -// } -// -// val metricName = userParam.name -// val conf = new SparkConf().setMaster("local[*]").setAppName(metricName) -// sc = new SparkContext(conf) -// sqlContext = new SQLContext(sc) -// } -// -// test("algorithm") { -// Try { -// val envParam = allParam.envParam -// val userParam = allParam.userParam -// -// // start time -// val startTime = new Date().getTime() -// -// // get spark application id -// val applicationId = sc.applicationId -// -// // rules -// val ruleFactory = RuleFactory(userParam.evaluateRuleParam) -// val rule: StatementExpr = ruleFactory.generateRule() -// val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule) -// -// ruleAnalyzer.constCacheExprs.foreach(println) -// ruleAnalyzer.constFinalCacheExprs.foreach(println) -// -// // global cache data -// val constExprValueMap = ExprValueUtil.genExprValueMaps(None, ruleAnalyzer.constCacheExprs, Map[String, Any]()) -// val finalConstExprValueMap = ExprValueUtil.updateExprValueMaps(ruleAnalyzer.constFinalCacheExprs, constExprValueMap) -// val 
finalConstMap = finalConstExprValueMap.headOption match { -// case Some(m) => m -// case _ => Map[String, Any]() -// } -// -// // data connector -// val sourceDataConnector: DirectDataConnector = -// DataConnectorFactory.getDirectDataConnector(sqlContext, null, userParam.sourceParam, -// ruleAnalyzer.sourceRuleExprs, finalConstMap -// ) match { -// case Success(cntr) => { -// if (cntr.available) cntr -// else throw new Exception("source data not available!") -// } -// case Failure(ex) => throw ex -// } -// -// // get data -// val sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = sourceDataConnector.data() match { -// case Success(dt) => dt -// case Failure(ex) => throw ex -// } -// -// // my algo -// val algo = BatchProfileAlgo(allParam) -// -// // profile algorithm -// val (profileResult, missingRdd, matchedRdd) = algo.profile(sourceData, ruleAnalyzer) -// -// println(s"match percentage: ${profileResult.matchPercentage}, match count: ${profileResult.matchCount}, total count: ${profileResult.totalCount}") -// -// matchedRdd.map(rec => algo.record2String(rec, ruleAnalyzer.sourceRuleExprs.persistExprs)).foreach(println) -// -// // end time -// val endTime = new Date().getTime -// println(s"using time: ${endTime - startTime} ms") -// } match { -// case Failure(ex) => { -// error(ex.getMessage) -// sys.exit(-4) -// } -// case _ => { -// info("calculation finished") -// } -// } -// } -// -// private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = { -// val paramReader = ParamReaderFactory.getParamReader(file, fsType) -// paramReader.readConfig[T] -// } -// -// private def validateParams(allParam: AllParam): Try[Boolean] = { -// val allParamValidator = AllParamValidator() -// allParamValidator.validate(allParam) -// } -// -//} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/algo/core/AccuracyCoreTest.scala ---------------------------------------------------------------------- diff --git a/measure/src/test/scala/org/apache/griffin/measure/algo/core/AccuracyCoreTest.scala b/measure/src/test/scala/org/apache/griffin/measure/algo/core/AccuracyCoreTest.scala deleted file mode 100644 index 2179fba..0000000 --- a/measure/src/test/scala/org/apache/griffin/measure/algo/core/AccuracyCoreTest.scala +++ /dev/null @@ -1,89 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.algo.core - -import org.apache.griffin.measure.config.params.user.EvaluateRuleParam -import org.apache.griffin.measure.rule.expr._ -import org.apache.griffin.measure.rule.{RuleAnalyzer, RuleFactory} -import org.junit.runner.RunWith -import org.scalatest.junit.JUnitRunner -import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} -import org.scalatest.PrivateMethodTester - -@RunWith(classOf[JUnitRunner]) -class AccuracyCoreTest extends FunSuite with Matchers with BeforeAndAfter with PrivateMethodTester { - - def findExprId(exprs: Iterable[Expr], desc: String): String = { - exprs.find(_.desc == desc) match { - case Some(expr) => expr._id - case _ => "" - } - } - - test ("match data success") { - val rule = "$source.name = $target.name AND $source.age < $target.age" - val evaluateRuleParam = EvaluateRuleParam(1.0, rule) - val ruleFactory = RuleFactory(evaluateRuleParam) - val statement = ruleFactory.generateRule - val ruleAnalyzer = RuleAnalyzer(statement) - - val sourcePersistExprs = ruleAnalyzer.sourceRuleExprs.persistExprs - val targetPersistExprs = ruleAnalyzer.targetRuleExprs.persistExprs - - val source = (Map[String, Any]( - (findExprId(sourcePersistExprs, "$source['name']") -> "jack"), - (findExprId(sourcePersistExprs, "$source['age']") -> 26) - ), Map[String, Any]()) - val target = (Map[String, Any]( - (findExprId(targetPersistExprs, "$target['name']") -> "jack"), - (findExprId(targetPersistExprs, "$target['age']") -> 27) - ), Map[String, Any]()) - - val matchData = PrivateMethod[(Boolean, Map[String, Any])]('matchData) - val result = AccuracyCore invokePrivate matchData(source, target, ruleAnalyzer) - result._1 should be (true) - result._2.size should be (0) - } - - test ("match data fail") { - val rule = "$source.name = $target.name AND $source.age = $target.age" - val evaluateRuleParam = EvaluateRuleParam(1.0, rule) - val ruleFactory = RuleFactory(evaluateRuleParam) - val statement = ruleFactory.generateRule - val ruleAnalyzer = RuleAnalyzer(statement) - - val sourcePersistExprs = ruleAnalyzer.sourceRuleExprs.persistExprs - val targetPersistExprs = ruleAnalyzer.targetRuleExprs.persistExprs - - val source = (Map[String, Any]( - (findExprId(sourcePersistExprs, "$source['name']") -> "jack"), - (findExprId(sourcePersistExprs, "$source['age']") -> 26) - ), Map[String, Any]()) - val target = (Map[String, Any]( - (findExprId(targetPersistExprs, "$target['name']") -> "jack"), - (findExprId(targetPersistExprs, "$target['age']") -> 27) - ), Map[String, Any]()) - - val matchData = PrivateMethod[(Boolean, Map[String, Any])]('matchData) - val result = AccuracyCore invokePrivate matchData(source, target, ruleAnalyzer) - result._1 should be (false) - result._2.size shouldNot be (0) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/algo/core/ProfileCoreTest.scala ---------------------------------------------------------------------- diff --git a/measure/src/test/scala/org/apache/griffin/measure/algo/core/ProfileCoreTest.scala b/measure/src/test/scala/org/apache/griffin/measure/algo/core/ProfileCoreTest.scala deleted file mode 100644 index 087e8e5..0000000 --- a/measure/src/test/scala/org/apache/griffin/measure/algo/core/ProfileCoreTest.scala +++ /dev/null @@ -1,79 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. 
The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.algo.core - -import org.apache.griffin.measure.config.params.user.EvaluateRuleParam -import org.apache.griffin.measure.rule.expr._ -import org.apache.griffin.measure.rule.{RuleAnalyzer, RuleFactory} -import org.junit.runner.RunWith -import org.scalatest.junit.JUnitRunner -import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} -import org.scalatest.PrivateMethodTester - -@RunWith(classOf[JUnitRunner]) -class ProfileCoreTest extends FunSuite with Matchers with BeforeAndAfter with PrivateMethodTester { - - def findExprId(exprs: Iterable[Expr], desc: String): String = { - exprs.find(_.desc == desc) match { - case Some(expr) => expr._id - case _ => "" - } - } - - test ("match data success") { - val rule = "$source.name = 'jack' AND $source.age = null" - val evaluateRuleParam = EvaluateRuleParam(1.0, rule) - val ruleFactory = RuleFactory(evaluateRuleParam) - val statement = ruleFactory.generateRule - val ruleAnalyzer = RuleAnalyzer(statement) - - val sourcePersistExprs = ruleAnalyzer.sourceRuleExprs.persistExprs - - val source = (Map[String, Any]( - (findExprId(sourcePersistExprs, "$source['name']") -> "jack"), - (findExprId(sourcePersistExprs, "$source['age']") -> null) - ), Map[String, Any]()) - - val matchData = PrivateMethod[(Boolean, Map[String, Any])]('matchData) - val result = ProfileCore invokePrivate matchData(source, ruleAnalyzer) - result._1 should be (true) - result._2.size should be (0) - } - - test ("match data fail") { - val rule = "$source.name = 'jack' AND $source.age != null" - val evaluateRuleParam = EvaluateRuleParam(1.0, rule) - val ruleFactory = RuleFactory(evaluateRuleParam) - val statement = ruleFactory.generateRule - val ruleAnalyzer = RuleAnalyzer(statement) - - val sourcePersistExprs = ruleAnalyzer.sourceRuleExprs.persistExprs - - val source = (Map[String, Any]( - (findExprId(sourcePersistExprs, "$source['name']") -> "jack"), - (findExprId(sourcePersistExprs, "$source['age']") -> null) - ), Map[String, Any]()) - - val matchData = PrivateMethod[(Boolean, Map[String, Any])]('matchData) - val result = ProfileCore invokePrivate matchData(source, ruleAnalyzer) - result._1 should be (false) - result._2.size shouldNot be (0) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyAlgoTest.scala ---------------------------------------------------------------------- diff --git a/measure/src/test/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyAlgoTest.scala b/measure/src/test/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyAlgoTest.scala deleted file mode 100644 index a22f91f..0000000 --- a/measure/src/test/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyAlgoTest.scala +++ /dev/null @@ -1,267 +0,0 @@ -///* -//Licensed to the Apache Software Foundation (ASF) under one -//or more contributor license agreements. 
See the NOTICE file -//distributed with this work for additional information -//regarding copyright ownership. The ASF licenses this file -//to you under the Apache License, Version 2.0 (the -//"License"); you may not use this file except in compliance -//with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -//Unless required by applicable law or agreed to in writing, -//software distributed under the License is distributed on an -//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -//KIND, either express or implied. See the License for the -//specific language governing permissions and limitations -//under the License. -//*/ -//package org.apache.griffin.measure.algo.streaming -// -//import java.util.Date -//import java.util.concurrent.TimeUnit -// -//import org.apache.griffin.measure.algo.batch.BatchAccuracyAlgo -//import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache} -//import org.apache.griffin.measure.cache.result._ -//import org.apache.griffin.measure.config.params._ -//import org.apache.griffin.measure.config.params.env._ -//import org.apache.griffin.measure.config.params.user._ -//import org.apache.griffin.measure.config.reader._ -//import org.apache.griffin.measure.config.validator._ -//import org.apache.griffin.measure.connector.direct.DirectDataConnector -//import org.apache.griffin.measure.connector.{DataConnector, DataConnectorFactory} -//import org.apache.griffin.measure.log.Loggable -//import org.apache.griffin.measure.persist.{Persist, PersistFactory, PersistType} -//import org.apache.griffin.measure.result._ -//import org.apache.griffin.measure.rule.expr._ -//import org.apache.griffin.measure.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory} -//import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil} -//import org.apache.spark.rdd.RDD -//import org.apache.spark.sql.SQLContext -//import org.apache.spark.sql.hive.HiveContext -//import org.apache.spark.streaming.{Milliseconds, StreamingContext} -//import org.apache.spark.{SparkConf, SparkContext} -//import org.junit.runner.RunWith -//import org.scalatest.junit.JUnitRunner -//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} -// -//import scala.util.{Failure, Success, Try} -// -// -//@RunWith(classOf[JUnitRunner]) -//class StreamingAccuracyAlgoTest extends FunSuite with Matchers with BeforeAndAfter with Loggable { -// -// val envFile = "src/test/resources/env-streaming.json" -// val confFile = "src/test/resources/config-streaming3.json" -// val envFsType = "local" -// val userFsType = "local" -// -// val args = Array(envFile, confFile) -// -// var sc: SparkContext = _ -// var sqlContext: SQLContext = _ -//// val ssc: StreamingContext = _ -// -// var allParam: AllParam = _ -// -// before { -// // read param files -// val envParam = readParamFile[EnvParam](envFile, envFsType) match { -// case Success(p) => p -// case Failure(ex) => { -// error(ex.getMessage) -// sys.exit(-2) -// } -// } -// val userParam = readParamFile[UserParam](confFile, userFsType) match { -// case Success(p) => p -// case Failure(ex) => { -// error(ex.getMessage) -// sys.exit(-2) -// } -// } -// allParam = AllParam(envParam, userParam) -// -// // validate param files -// validateParams(allParam) match { -// case Failure(ex) => { -// error(ex.getMessage) -// sys.exit(-3) -// } -// case _ => { -// info("params validation pass") -// } -// } -// -// val metricName = userParam.name -// val sparkParam = envParam.sparkParam -// val conf = new 
SparkConf().setMaster("local[*]").setAppName(metricName) -// conf.setAll(sparkParam.config) -// sc = new SparkContext(conf) -// sc.setLogLevel(envParam.sparkParam.logLevel) -// sqlContext = new SQLContext(sc) -//// sqlContext = new HiveContext(sc) -// -//// val a = sqlContext.sql("select * from s1 limit 10") -//// // val a = sqlContext.sql("show tables") -//// a.show(10) -//// -//// val b = HdfsUtil.existPath("/griffin/streaming") -//// println(b) -// } -// -// test("algorithm") { -// val envParam = allParam.envParam -// val userParam = allParam.userParam -// val metricName = userParam.name -// val sparkParam = envParam.sparkParam -// val cleanerParam = envParam.cleanerParam -// -//// val ssc = StreamingContext.getOrCreate(sparkParam.cpDir, -//// ( ) => { -//// try { -//// val batchInterval = TimeUtil.milliseconds(sparkParam.batchInterval) match { -//// case Some(interval) => Milliseconds(interval) -//// case _ => throw new Exception("invalid batch interval") -//// } -//// val ssc = new StreamingContext(sc, batchInterval) -//// ssc.checkpoint(sparkParam.cpDir) -//// ssc -//// } catch { -//// case runtime: RuntimeException => { -//// throw runtime -//// } -//// } -//// }) -// -// val batchInterval = TimeUtil.milliseconds(sparkParam.batchInterval) match { -// case Some(interval) => Milliseconds(interval) -// case _ => throw new Exception("invalid batch interval") -// } -// val ssc = new StreamingContext(sc, batchInterval) -// ssc.checkpoint(sparkParam.cpDir) -// -// // start time -// val startTime = new Date().getTime() -// -// val persistFactory = PersistFactory(envParam.persistParams, metricName) -// -// // get persists to persist measure result -// val appPersist: Persist = persistFactory.getPersists(startTime) -// -// // get spark application id -// val applicationId = sc.applicationId -// -// // persist start id -// appPersist.start(applicationId) -// -// InfoCacheInstance.initInstance(envParam.infoCacheParams, metricName) -// InfoCacheInstance.init -// -// // generate rule from rule param, generate rule analyzer -// val ruleFactory = RuleFactory(userParam.evaluateRuleParam) -// val rule: StatementExpr = ruleFactory.generateRule() -// val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule) -// -// // const expr value map -// val constExprValueMap = ExprValueUtil.genExprValueMaps(None, ruleAnalyzer.constCacheExprs, Map[String, Any]()) -// val finalConstExprValueMap = ExprValueUtil.updateExprValueMaps(ruleAnalyzer.constFinalCacheExprs, constExprValueMap) -// val finalConstMap = finalConstExprValueMap.headOption match { -// case Some(m) => m -// case _ => Map[String, Any]() -// } -// -// // data connector -// val sourceDataConnector: DirectDataConnector = -// DataConnectorFactory.getDirectDataConnector(sqlContext, ssc, userParam.sourceParam, -// ruleAnalyzer.sourceRuleExprs, finalConstMap -// ) match { -// case Success(cntr) => { -// if (cntr.available) cntr -// else throw new Exception("source data connection error!") -// } -// case Failure(ex) => throw ex -// } -// val targetDataConnector: DirectDataConnector = -// DataConnectorFactory.getDirectDataConnector(sqlContext, ssc, userParam.targetParam, -// ruleAnalyzer.targetRuleExprs, finalConstMap -// ) match { -// case Success(cntr) => { -// if (cntr.available) cntr -// else throw new Exception("target data connection error!") -// } -// case Failure(ex) => throw ex -// } -// -// val cacheResultProcesser = CacheResultProcesser() -// -// // init data stream -// sourceDataConnector.init() -// targetDataConnector.init() -// -// // my algo -// val 
algo = StreamingAccuracyAlgo(allParam) -// -// val streamingAccuracyProcess = StreamingAccuracyProcess( -// sourceDataConnector, targetDataConnector, -// ruleAnalyzer, cacheResultProcesser, persistFactory, appPersist) -// -// val processInterval = TimeUtil.milliseconds(sparkParam.processInterval) match { -// case Some(interval) => interval -// case _ => throw new Exception("invalid batch interval") -// } -// val process = TimingProcess(processInterval, streamingAccuracyProcess) -// -// // clean thread -//// case class Clean() extends Runnable { -//// val lock = InfoCacheInstance.genLock("clean") -//// def run(): Unit = { -//// val locked = lock.lock(5, TimeUnit.SECONDS) -//// if (locked) { -//// try { -//// sourceDataConnector.cleanData -//// targetDataConnector.cleanData -//// } finally { -//// lock.unlock() -//// } -//// } -//// } -//// } -//// val cleanInterval = TimeUtil.milliseconds(cleanerParam.cleanInterval) match { -//// case Some(interval) => interval -//// case _ => throw new Exception("invalid batch interval") -//// } -//// val clean = TimingProcess(cleanInterval, Clean()) -// -// process.startup() -//// clean.startup() -// -// ssc.start() -// ssc.awaitTermination() -// ssc.stop(stopSparkContext=true, stopGracefully=true) -// -// println("================ end ================") -// -// // context stop -// sc.stop -// -// InfoCacheInstance.close -// -// appPersist.finish() -// -// process.shutdown() -//// clean.shutdown() -// } -// -// private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = { -// val paramReader = ParamReaderFactory.getParamReader(file, fsType) -// paramReader.readConfig[T] -// } -// -// private def validateParams(allParam: AllParam): Try[Boolean] = { -// val allParamValidator = AllParamValidator() -// allParamValidator.validate(allParam) -// } -// -//} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/config/reader/ParamRawStringReaderTest.scala ---------------------------------------------------------------------- diff --git a/measure/src/test/scala/org/apache/griffin/measure/config/reader/ParamRawStringReaderTest.scala b/measure/src/test/scala/org/apache/griffin/measure/config/reader/ParamRawStringReaderTest.scala index b3c94e5..9e5d380 100644 --- a/measure/src/test/scala/org/apache/griffin/measure/config/reader/ParamRawStringReaderTest.scala +++ b/measure/src/test/scala/org/apache/griffin/measure/config/reader/ParamRawStringReaderTest.scala @@ -26,7 +26,7 @@ import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} @RunWith(classOf[JUnitRunner]) class ParamRawStringReaderTest extends FunSuite with Matchers with BeforeAndAfter { - test("read config") { + test("read raw config") { val rawString = """{"type": "hdfs", "config": {"path": "/path/to", "time": 1234567}}""" val reader = ParamRawStringReader(rawString) @@ -34,5 +34,4 @@ class ParamRawStringReaderTest extends FunSuite with Matchers with BeforeAndAfte paramTry.isSuccess should be (true) paramTry.get should be (PersistParam("hdfs", Map[String, Any](("path" -> "/path/to"), ("time" -> 1234567)))) } - } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/connector/ConnectorTest.scala ---------------------------------------------------------------------- diff --git a/measure/src/test/scala/org/apache/griffin/measure/connector/ConnectorTest.scala 
b/measure/src/test/scala/org/apache/griffin/measure/connector/ConnectorTest.scala deleted file mode 100644 index 2139ff7..0000000 --- a/measure/src/test/scala/org/apache/griffin/measure/connector/ConnectorTest.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.connector - -import java.util.Date -import java.util.concurrent.TimeUnit - -import kafka.serializer.StringDecoder -import org.apache.griffin.measure.algo.streaming.TimingProcess -import org.apache.griffin.measure.cache.info.InfoCacheInstance -import org.apache.griffin.measure.config.params.env._ -import org.apache.griffin.measure.config.params.user.{DataCacheParam, DataConnectorParam, EvaluateRuleParam} -import org.apache.griffin.measure.config.reader.ParamRawStringReader -import org.apache.griffin.measure.result.{DataInfo, TimeStampInfo} -import org.apache.griffin.measure.rule.expr.{Expr, StatementExpr} -import org.apache.griffin.measure.rule._ -import org.apache.griffin.measure.utils.{HdfsFileDumpUtil, HdfsUtil, TimeUtil} -import org.apache.griffin.measure.rule.{DataTypeCalculationUtil, ExprValueUtil, RuleExprs} -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.types.{DataType, StructField, StructType} -import org.apache.spark.sql.{DataFrame, Row, SQLContext} -import org.apache.spark.streaming.dstream.InputDStream -import org.apache.spark.streaming.kafka.KafkaUtils -import org.apache.spark.streaming.{Milliseconds, StreamingContext} -import org.apache.spark.{SparkConf, SparkContext} -import org.junit.runner.RunWith -import org.scalatest.junit.JUnitRunner -import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} - -import scala.reflect.ClassTag -import scala.util.{Failure, Success, Try} - -@RunWith(classOf[JUnitRunner]) -class ConnectorTest extends FunSuite with Matchers with BeforeAndAfter { - - test("read config") { - - val a = "java.lang.String" - val at = getClassTag(a) - println(at) - - at match { - case ClassTag(m) => println(m) - case _ => println("no") - } - - } - - private def getClassTag(tp: String): ClassTag[_] = { - val clazz = Class.forName(tp) - ClassTag(clazz) - } -} - http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/data/connector/ConnectorTest.scala ---------------------------------------------------------------------- diff --git a/measure/src/test/scala/org/apache/griffin/measure/data/connector/ConnectorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/data/connector/ConnectorTest.scala new file mode 100644 index 0000000..ead84f7 --- /dev/null +++ b/measure/src/test/scala/org/apache/griffin/measure/data/connector/ConnectorTest.scala @@ -0,0 +1,71 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor 
license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.data.connector + +import java.util.Date +import java.util.concurrent.TimeUnit + +import kafka.serializer.StringDecoder +import org.apache.griffin.measure.cache.info.InfoCacheInstance +import org.apache.griffin.measure.config.params.env._ +import org.apache.griffin.measure.config.params.user.{DataConnectorParam, EvaluateRuleParam} +import org.apache.griffin.measure.config.reader.ParamRawStringReader +import org.apache.griffin.measure.data.connector.batch.TextDirBatchDataConnector +import org.apache.griffin.measure.process.TimingProcess +import org.apache.griffin.measure.process.engine.DqEngines +import org.apache.griffin.measure.result.{DataInfo, TimeStampInfo} +import org.apache.griffin.measure.rule._ +import org.apache.griffin.measure.utils.{HdfsFileDumpUtil, HdfsUtil, TimeUtil} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.sql.{DataFrame, Row, SQLContext} +import org.apache.spark.streaming.dstream.InputDStream +import org.apache.spark.streaming.kafka.KafkaUtils +import org.apache.spark.streaming.{Milliseconds, StreamingContext} +import org.apache.spark.{SparkConf, SparkContext} +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} + +import scala.reflect.ClassTag +import scala.util.{Failure, Success, Try} + +@RunWith(classOf[JUnitRunner]) +class ConnectorTest extends FunSuite with Matchers with BeforeAndAfter { + + test("read config") { + + val a = "java.lang.String" + val at = getClassTag(a) + println(at) + + at match { + case ClassTag(m) => println(m) + case _ => println("no") + } + + } + + private def getClassTag(tp: String): ClassTag[_] = { + val clazz = Class.forName(tp) + ClassTag(clazz) + } +} + http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/process/BatchProcessTest.scala ---------------------------------------------------------------------- diff --git a/measure/src/test/scala/org/apache/griffin/measure/process/BatchProcessTest.scala b/measure/src/test/scala/org/apache/griffin/measure/process/BatchProcessTest.scala new file mode 100644 index 0000000..a1e4854 --- /dev/null +++ b/measure/src/test/scala/org/apache/griffin/measure/process/BatchProcessTest.scala @@ -0,0 +1,146 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.process + +import org.apache.griffin.measure.config.params.env._ +import org.apache.griffin.measure.config.params.user._ +import org.apache.griffin.measure.config.params._ +import org.apache.griffin.measure.config.reader.ParamReaderFactory +import org.apache.griffin.measure.config.validator.AllParamValidator +import org.apache.griffin.measure.log.Loggable +import org.apache.griffin.measure.persist.PersistThreadPool +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} + +import scala.util.{Failure, Success, Try} + +@RunWith(classOf[JUnitRunner]) +class BatchProcessTest extends FunSuite with Matchers with BeforeAndAfter with Loggable { + + val envFile = "src/test/resources/env-test.json" + val confFile = "src/test/resources/config-test-profiling.json" +// val confFile = "src/test/resources/config-test-accuracy.json" + + val envFsType = "local" + val userFsType = "local" + + val args = Array(envFile, confFile) + + var allParam: AllParam = _ + + before { + // read param files + val envParam = readParamFile[EnvParam](envFile, envFsType) match { + case Success(p) => p + case Failure(ex) => { + error(ex.getMessage) + sys.exit(-2) + } + } + val userParam = readParamFile[UserParam](confFile, userFsType) match { + case Success(p) => p + case Failure(ex) => { + error(ex.getMessage) + sys.exit(-2) + } + } + allParam = AllParam(envParam, userParam) + + // validate param files + validateParams(allParam) match { + case Failure(ex) => { + error(ex.getMessage) + sys.exit(-3) + } + case _ => { + info("params validation pass") + } + } + } + + test ("batch process") { + val procType = ProcessType(allParam.userParam.procType) + val proc: DqProcess = procType match { + case BatchProcessType => BatchDqProcess(allParam) + case StreamingProcessType => StreamingDqProcess(allParam) + case _ => { + error(s"${procType} is unsupported process type!") + sys.exit(-4) + } + } + + // process init + proc.init match { + case Success(_) => { + info("process init success") + } + case Failure(ex) => { + error(s"process init error: ${ex.getMessage}") + shutdown + sys.exit(-5) + } + } + + // process run + proc.run match { + case Success(_) => { + info("process run success") + } + case Failure(ex) => { + error(s"process run error: ${ex.getMessage}") + + if (proc.retriable) { + throw ex + } else { + shutdown + sys.exit(-5) + } + } + } + + // process end + proc.end match { + case Success(_) => { + info("process end success") + } + case Failure(ex) => { + error(s"process end error: ${ex.getMessage}") + shutdown + sys.exit(-5) + } + } + + shutdown + } + + private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = { + val paramReader = ParamReaderFactory.getParamReader(file, fsType) + paramReader.readConfig[T] + } + + private def validateParams(allParam: AllParam): Try[Boolean] = { + val allParamValidator = AllParamValidator() + allParamValidator.validate(allParam) + } + + private def shutdown(): Unit = { + PersistThreadPool.shutdown + } +} 
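The BatchProcessTest added above drives the generic DqProcess lifecycle: ProcessType selects a BatchDqProcess or StreamingDqProcess from the user parameters, then init, run and end each return a Try that is pattern-matched before PersistThreadPool is shut down. Below is a condensed sketch of that control flow under the same assumptions; the runStep helper is hypothetical, not part of the Griffin codebase, and it drops the retriable check the test applies after a failed run.

import scala.util.{Failure, Success, Try}

import org.apache.griffin.measure.persist.PersistThreadPool

object LifecycleSketch {
  // Factors out the repeated Success/Failure handling applied to
  // proc.init, proc.run and proc.end in the test above.
  def runStep(name: String)(body: => Try[_]): Unit = body match {
    case Success(_) =>
      println(s"process $name success")
    case Failure(ex) =>
      println(s"process $name error: ${ex.getMessage}")
      PersistThreadPool.shutdown
      sys.exit(-5)
  }

  // Intended usage, mirroring the test's init -> run -> end ordering:
  //   runStep("init") { proc.init }
  //   runStep("run")  { proc.run }
  //   runStep("end")  { proc.end }
  //   PersistThreadPool.shutdown
}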
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/process/JsonParseTest.scala ---------------------------------------------------------------------- diff --git a/measure/src/test/scala/org/apache/griffin/measure/process/JsonParseTest.scala b/measure/src/test/scala/org/apache/griffin/measure/process/JsonParseTest.scala new file mode 100644 index 0000000..b119d76 --- /dev/null +++ b/measure/src/test/scala/org/apache/griffin/measure/process/JsonParseTest.scala @@ -0,0 +1,531 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.process + +import org.apache.griffin.measure.config.params._ +import org.apache.griffin.measure.config.params.env._ +import org.apache.griffin.measure.config.params.user._ +import org.apache.griffin.measure.config.reader.ParamReaderFactory +import org.apache.griffin.measure.config.validator.AllParamValidator +import org.apache.griffin.measure.log.Loggable +import org.apache.griffin.measure.persist.PersistThreadPool +import org.apache.griffin.measure.process.engine.DataFrameOprs +import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil} +import org.apache.hadoop.hive.ql.exec.UDF +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema +import org.apache.spark.sql.expressions.UserDefinedAggregateFunction +import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.sql.types._ +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} + +import scala.collection.mutable.WrappedArray +import scala.util.{Failure, Success, Try} + +@RunWith(classOf[JUnitRunner]) +class JsonParseTest extends FunSuite with Matchers with BeforeAndAfter with Loggable { + + var sparkContext: SparkContext = _ + var sqlContext: SQLContext = _ + + before { + val conf = new SparkConf().setAppName("test json").setMaster("local[*]") + sparkContext = new SparkContext(conf) + sparkContext.setLogLevel("WARN") +// sqlContext = new HiveContext(sparkContext) + sqlContext = new SQLContext(sparkContext) + } + + test ("json test") { + // 0. 
prepare data +// val dt = +// """ +// |{"name": "s1", "age": 12, "items": [1, 2, 3], +// |"subs": [{"id": 1, "type": "seed"}, {"id": 2, "type": "frog"}], +// |"inner": {"a": 1, "b": 2}, "jstr": "{\"s1\": \"aaa\", \"s2\": 123}" +// |}""".stripMargin +// val rdd0 = sparkContext.parallelize(Seq(dt)).map(Row(_)) + val rdd0 = sparkContext.textFile("src/test/resources/input.msg").map(Row(_)) + + val vtp = StructField("value", StringType) + val df0 = sqlContext.createDataFrame(rdd0, StructType(Array(vtp))) + df0.registerTempTable("src") + +// val fromJson2Array = (s: String) => { +// JsonUtil.fromJson[Seq[String]](s) +// } +// sqlContext.udf.register("from_json_to_array", fromJson2Array) +// +// val df2 = sqlContext.sql("SELECT explode(from_json_to_array(get_json_object(value, '$.seeds'))) as value FROM src") +// df2.printSchema +// df2.show(10) +// df2.registerTempTable("df2") + + + + // 1. read from json string to extracted json row +// val readSql = "SELECT value FROM src" +// val df = sqlContext.sql(readSql) +// val df = sqlContext.table("src") +// val rdd = df.map { row => +// row.getAs[String]("value") +// } +// val df1 = sqlContext.read.json(rdd) +// df1.printSchema +// df1.show(10) +// df1.registerTempTable("df1") + val details = Map[String, Any](("df.name" -> "src")) + val df1 = DataFrameOprs.fromJson(sqlContext, details) + df1.registerTempTable("df1") + + // 2. extract json array into lines +// val rdd2 = df1.flatMap { row => +// row.getAs[WrappedArray[String]]("seeds") +// } +// val df2 = sqlContext.read.json(rdd2) + val df2 = sqlContext.sql("select explode(seeds) as value from df1") +// val tdf = sqlContext.sql("select name, age, explode(items) as item from df1") +// tdf.registerTempTable("tdf") +// val df2 = sqlContext.sql("select struct(name, age, item) as ttt from tdf") + df2.printSchema + df2.show(10) + df2.registerTempTable("df2") + println(df2.count) + + val sql1 = "SELECT value FROM df2" + val df22 = sqlContext.sql(sql1) + val rdd22 = df22.map { row => + row.getAs[String]("value") + } + import org.apache.spark.sql.functions._ + val df23 = sqlContext.read.json(rdd22) + df23.registerTempTable("df23") +// df23.withColumn("par", monotonicallyIncreasingId) + + val df24 = sqlContext.sql("SELECT url, cast(get_json_object(metadata, '$.tracker.crawlRequestCreateTS') as bigint) as ts FROM df23") + df24.printSchema + df24.show(10) + df24.registerTempTable("df24") + println(df24.count) + +// val df25 = sqlContext.sql("select ") + +// +// // 3. extract json string into row +//// val df3 = sqlContext.sql("select cast(get_json_object(metadata, '$.tracker.crawlRequestCreateTS') as bigint), url from df2") +// val df3 = sqlContext.sql("select cast(get_json_object(get_json_object(value, '$.metadata'), '$.tracker.crawlRequestCreateTS') as bigint), get_json_object(value, '$.url') from df2") +// df3.printSchema() +// df3.show(10) +// println(df3.count) + + + +// val df5 = sqlContext.sql("select get_json_object(value, '$.subs') as subs from src") +// df5.printSchema() +// df5.show(10) +// df5.registerTempTable("df5") +// val rdd5 = df5.map { row => +// row.getAs[String]("subs") +// } +// val df6 = sqlContext.read.json(rdd5) +// df6.printSchema +// df6.show(10) + + // 2. 
extract json string to row +// val df2 = sqlContext.sql("select jstr from df1") +// val rdd2 = df2.map { row => +// row.getAs[String]("jstr") +// } +// val df22 = sqlContext.read.json(rdd2) +// df22.printSchema +// df22.show(100) +// df22.registerTempTable("df2") +// +// val df23 = sqlContext.sql("select json_tuple(jstr, 's1', 's2') from df1") +// df23.printSchema() +// df23.show(100) + + // 3. extract json array into lines ?? + + // 3. flatmap from json row to json row +// val df3 = sqlContext.sql("select explode(subs) as sub, items from df1") +// df3.printSchema() +// df3.show(10) +// df3.registerTempTable("df3") +// +// val df4 = sqlContext.sql("select explode(items) as item, sub from df3") +// df4.printSchema() +// df4.show(10) + +// sqlContext.udf.register("length", (s: WrappedArray[_]) => s.length) + // + // val df2 = sqlContext.sql("SELECT inner from df1") + // df2.registerTempTable("df2") + // df2.printSchema + // df2.show(100) + +// def children(colname: String, df: DataFrame): Array[DataFrame] = { +// val parent = df.schema.fields.filter(_.name == colname).head +// println(parent) +// val fields: Array[StructField] = parent.dataType match { +// case x: StructType => x.fields +// case _ => Array.empty[StructField] +// } +// fields.map(x => col(s"$colname.${x.name}")) +//// fields.foreach(println) +// } +//// +// children("inner", df2) +// +// df2.select(children("bar", df): _*).printSchema + +// val df3 = sqlContext.sql("select inline(subs) from df1") +// df3.printSchema() +// df3.show(100) + +// val rdd2 = df2.flatMap { row => +// row.getAs[GenericRowWithSchema]("inner") :: Nil +// } +// +// rdd2. + +// val funcs = sqlContext.sql("show functions") +// funcs.printSchema() +// funcs.show(1000) +// +// val desc = sqlContext.sql("describe function inline") +// desc.printSchema() +// desc.show(100) + + // + + } + + test ("json test 2") { + val rdd0 = sparkContext.textFile("src/test/resources/output.msg").map(Row(_)) + + val vtp = StructField("value", StringType) + val df0 = sqlContext.createDataFrame(rdd0, StructType(Array(vtp))) + df0.registerTempTable("tgt") + +// val fromJson2StringArray = (s: String) => { +// val seq = JsonUtil.fromJson[Seq[Any]](s) +// seq.map(i => JsonUtil.toJson(i)) +// } +// sqlContext.udf.register("from_json_to_string_array", fromJson2StringArray) +// +// val df2 = sqlContext.sql("SELECT from_json_to_string_array(get_json_object(value, '$.groups[0].attrsList')) as value FROM tgt") +// df2.printSchema() +// df2.show(10) +// df2.registerTempTable("df2") +// +// val indexOfStringArray = (sa: String, ) + + + // 1. 
read from json string to extracted json row + val readSql = "SELECT value FROM tgt" + val df = sqlContext.sql(readSql) + val rdd = df.map { row => + row.getAs[String]("value") + } + val df1 = sqlContext.read.json(rdd) + df1.printSchema + df1.show(10) + df1.registerTempTable("df1") + + + val df2 = sqlContext.sql("select groups[0].attrsList as attrs from df1") + df2.printSchema + df2.show(10) + df2.registerTempTable("df2") + println(df2.count) + + val indexOf = (arr: Seq[String], v: String) => { + arr.indexOf(v) + } + sqlContext.udf.register("index_of", indexOf) + + val df3 = sqlContext.sql("select attrs.values[index_of(attrs.name, 'URL')][0] as url, cast(get_json_object(attrs.values[index_of(attrs.name, 'CRAWLMETADATA')][0], '$.tracker.crawlRequestCreateTS') as bigint) as ts from df2") + df3.printSchema() + df3.show(10) + df3.registerTempTable("df3") + } + + test ("testing") { + val dt = + """ + |{"name": "age", "age": 12, "items": [1, 2, 3], + |"subs": [{"id": 1, "type": "seed"}, {"id": 2, "type": "frog"}], + |"inner": {"a": 1, "b": 2}, "jstr": "{\"s1\": \"aaa\", \"s2\": 123}", "b": true + |}""".stripMargin + val rdd = sparkContext.parallelize(Seq(dt)).map(Row(_)) + val vtp = StructField("value", StringType) + val df = sqlContext.createDataFrame(rdd, StructType(Array(vtp))) + df.registerTempTable("df") + + val df1 = sqlContext.read.json(sqlContext.sql("select * from df").map(r => r.getAs[String]("value"))) + df1.printSchema() + df1.show(10) + df1.registerTempTable("df1") + + val test = (s: String) => { + s.toInt + } + sqlContext.udf.register("to_int", test) + + val df2 = sqlContext.sql("select (b) as aa, inner.a from df1 where age = to_int(\"12\")") + df2.printSchema() + df2.show(10) + } + + test ("test input only sql") { + val rdd0 = sparkContext.textFile("src/test/resources/input.msg").map(Row(_)) + + val vtp = StructField("value", StringType) + val df0 = sqlContext.createDataFrame(rdd0, StructType(Array(vtp))) + df0.registerTempTable("src") + df0.show(10) + + // 1. 
read from json string to extracted json row + val df1 = sqlContext.sql("SELECT get_json_object(value, '$.seeds') as seeds FROM src") + df1.printSchema + df1.show(10) + df1.registerTempTable("df1") + + val json2StringArray: (String) => Seq[String] = (s: String) => { + val seq = JsonUtil.fromJson[Seq[String]](s) +// seq.map(i => JsonUtil.toJson(i)) + seq + } + sqlContext.udf.register("json_to_string_array", json2StringArray) + + val df2 = sqlContext.sql("SELECT explode(json_to_string_array(seeds)) as seed FROM df1") + df2.printSchema + df2.show(10) + df2.registerTempTable("df2") + + + val df3 = sqlContext.sql("SELECT get_json_object(seed, '$.url') as url, cast(get_json_object(get_json_object(seed, '$.metadata'), '$.tracker.crawlRequestCreateTS') as bigint) as ts FROM df2") + df3.printSchema + df3.show(10) + } + + test ("test output only sql") { + val rdd0 = sparkContext.textFile("src/test/resources/output.msg").map(Row(_)) + + val vtp = StructField("value", StringType) + val df0 = sqlContext.createDataFrame(rdd0, StructType(Array(vtp))) + df0.registerTempTable("tgt") + df0.printSchema() + df0.show(10) + + val json2StringArray: (String) => Seq[String] = (s: String) => { + JsonUtil.fromJson[Seq[String]](s) + } + sqlContext.udf.register("json_to_string_array", json2StringArray) + + val json2StringJsonArray: (String) => Seq[String] = (s: String) => { + val seq = JsonUtil.fromJson[Seq[Any]](s) + seq.map(i => JsonUtil.toJson(i)) + } + sqlContext.udf.register("json_to_string_json_array", json2StringJsonArray) + + val indexOf = (arr: Seq[String], v: String) => { + arr.indexOf(v) + } + sqlContext.udf.register("index_of", indexOf) + + val indexOfField = (arr: Seq[String], k: String, v: String) => { + val seq = arr.flatMap { item => + JsonUtil.fromJson[Map[String, Any]](item).get(k) + } + seq.indexOf(v) + } + sqlContext.udf.register("index_of_field", indexOfField) + + // 1. 
read from json string to extracted json row + val df1 = sqlContext.sql("SELECT get_json_object(value, '$.groups[0].attrsList') as attrs FROM tgt") + df1.printSchema + df1.show(10) + df1.registerTempTable("df1") + + val df2 = sqlContext.sql("SELECT json_to_string_json_array(attrs) as attrs FROM df1") + df2.printSchema() + df2.show(10) + df2.registerTempTable("df2") + + val df3 = sqlContext.sql("SELECT attrs[index_of_field(attrs, 'name', 'URL')] as attr1, attrs[index_of_field(attrs, 'name', 'CRAWLMETADATA')] as attr2 FROM df2") + df3.printSchema() + df3.show(10) + df3.registerTempTable("df3") + + val df4 = sqlContext.sql("SELECT json_to_string_array(get_json_object(attr1, '$.values'))[0], cast(get_json_object(json_to_string_array(get_json_object(attr2, '$.values'))[0], '$.tracker.crawlRequestCreateTS') as bigint) FROM df3") + df4.printSchema() + df4.show(10) + } + + test ("test from json") { + val fromJson2Map = (str: String) => { + val a = JsonUtil.fromJson[Map[String, Any]](str) + a.mapValues { v => + v match { + case t: String => t + case _ => JsonUtil.toJson(v) + } + } + } + sqlContext.udf.register("from_json_to_map", fromJson2Map) + + val fromJson2Array = (str: String) => { + val a = JsonUtil.fromJson[Seq[Any]](str) + a.map { v => + v match { + case t: String => t + case _ => JsonUtil.toJson(v) + } + } + } + sqlContext.udf.register("from_json_to_array", fromJson2Array) + + // ======================== + + val srdd = sparkContext.textFile("src/test/resources/input.msg").map(Row(_)) + val svtp = StructField("value", StringType) + val sdf0 = sqlContext.createDataFrame(srdd, StructType(Array(svtp))) + sdf0.registerTempTable("sdf0") + sdf0.show(10) + + // 1. read from json string to extracted json row + val sdf1 = sqlContext.sql("SELECT explode(from_json_to_array(get_json_object(value, '$.seeds'))) as seed FROM sdf0") + sdf1.printSchema + sdf1.show(10) + sdf1.registerTempTable("sdf1") + + val sdf2 = sqlContext.sql("SELECT get_json_object(seed, '$.url') as url, cast(get_json_object(get_json_object(seed, '$.metadata'), '$.tracker.crawlRequestCreateTS') as bigint) as ts FROM sdf1") + sdf2.printSchema + sdf2.show(10) + + // --------------------------------------- + + val trdd = sparkContext.textFile("src/test/resources/output.msg").map(Row(_)) + val tvtp = StructField("value", StringType) + val tdf0 = sqlContext.createDataFrame(trdd, StructType(Array(tvtp))) + tdf0.registerTempTable("tdf0") + tdf0.printSchema() + tdf0.show(10) + +// val json2StringArray: (String) => Seq[String] = (s: String) => { +// JsonUtil.fromJson[Seq[String]](s) +// } +// sqlContext.udf.register("json_to_string_array", json2StringArray) +// +// val json2StringJsonArray: (String) => Seq[String] = (s: String) => { +// val seq = JsonUtil.fromJson[Seq[Any]](s) +// seq.map(i => JsonUtil.toJson(i)) +// } +// sqlContext.udf.register("json_to_string_json_array", json2StringJsonArray) +// +// val indexOf = (arr: Seq[String], v: String) => { +// arr.indexOf(v) +// } +// sqlContext.udf.register("index_of", indexOf) +// + val indexOfField = (arr: Seq[String], k: String, v: String) => { + val seq = arr.flatMap { item => + JsonUtil.fromJson[Map[String, Any]](item).get(k) + } + seq.indexOf(v) + } + sqlContext.udf.register("index_of_field", indexOfField) + + // 1. 
read from json string to extracted json row +// val df1 = sqlContext.sql("SELECT get_json_object(value, '$.groups[0].attrsList') as attrs FROM tdf0") + val tdf1 = sqlContext.sql("SELECT from_json_to_array(get_json_object(value, '$.groups[0].attrsList')) as attrs FROM tdf0") + tdf1.printSchema + tdf1.show(10) + tdf1.registerTempTable("tdf1") + +// val tdf2 = sqlContext.sql("SELECT attrs[index_of_field(attrs, 'name', 'URL')] as attr1, attrs[index_of_field(attrs, 'name', 'CRAWLMETADATA')] as attr2 FROM tdf1") +// tdf2.printSchema() +// tdf2.show(10) +// tdf2.registerTempTable("tdf2") + + val tdf3 = sqlContext.sql("SELECT from_json_to_array(get_json_object(attrs[index_of_field(attrs, 'name', 'URL')], '$.values'))[0] as url, cast(get_json_object(from_json_to_array(get_json_object(attrs[index_of_field(attrs, 'name', 'CRAWLMETADATA')], '$.values'))[0], '$.tracker.crawlRequestCreateTS') as bigint) as ts FROM tdf1") + tdf3.printSchema() + tdf3.show(10) + } + + test ("sql functions") { + val functions = sqlContext.sql("show functions") + functions.printSchema() + functions.show(10) + + val functionNames = functions.map(_.getString(0)).collect + functionNames.foreach(println) + } + + test ("test text file read") { + val partitionPaths = Seq[String]( + "hdfs://localhost/griffin/streaming/dump/source/418010/25080625/1504837518000", + "hdfs://localhost/griffin/streaming/dump/target/418010/25080625/1504837518000") + val df = sqlContext.read.json(partitionPaths: _*) + df.printSchema() + df.show(10) + } + + test ("list paths") { + val filePath = "hdfs://localhost/griffin/streaming/dump/source" + val partitionRanges = List[(Long, Long)]((0, 0), (-2, 0)) + val partitionPaths = listPathsBetweenRanges(filePath :: Nil, partitionRanges) + println(partitionPaths) + } + + private def listPathsBetweenRanges(paths: List[String], + partitionRanges: List[(Long, Long)] + ): List[String] = { + partitionRanges match { + case Nil => paths + case head :: tail => { + val (lb, ub) = head + val curPaths = paths.flatMap { path => + val names = HdfsUtil.listSubPathsByType(path, "dir").toList + println(names) + names.filter { name => + str2Long(name) match { + case Some(t) => (t >= lb) && (t <= ub) + case _ => false + } + }.map(HdfsUtil.getHdfsFilePath(path, _)) + } + listPathsBetweenRanges(curPaths, tail) + } + } + } + + private def str2Long(str: String): Option[Long] = { + try { + Some(str.toLong) + } catch { + case e: Throwable => None + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/process/JsonToStructs.scala ---------------------------------------------------------------------- diff --git a/measure/src/test/scala/org/apache/griffin/measure/process/JsonToStructs.scala b/measure/src/test/scala/org/apache/griffin/measure/process/JsonToStructs.scala new file mode 100644 index 0000000..394917c --- /dev/null +++ b/measure/src/test/scala/org/apache/griffin/measure/process/JsonToStructs.scala @@ -0,0 +1,85 @@ +package org.apache.griffin.measure.process + +import org.apache.griffin.measure.utils.JsonUtil +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData} +import 
org.apache.spark.sql.execution.datasources.json.JSONOptions +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + + +case class JsonToStructs( +// schema: DataType, +// options: Map[String, String], + child: Expression) + extends UnaryExpression with CodegenFallback with ExpectsInputTypes { + override def nullable: Boolean = true + +// def this(schema: DataType, options: Map[String, String], child: Expression) = +// this(schema, options, child, None) + + // Used in `FunctionRegistry` +// def this(child: Expression, schema: Expression) = +// this( +// schema = JsonExprUtils.validateSchemaLiteral(schema), +// options = Map.empty[String, String], +// child = child, +// timeZoneId = None) +// +// def this(child: Expression, schema: Expression, options: Expression) = +// this( +// schema = JsonExprUtils.validateSchemaLiteral(schema), +// options = JsonExprUtils.convertToMapData(options), +// child = child, +// timeZoneId = None) +// +// override def checkInputDataTypes(): TypeCheckResult = schema match { +// case _: StructType | ArrayType(_: StructType, _) => +// super.checkInputDataTypes() +// case _ => TypeCheckResult.TypeCheckFailure( +// s"Input schema ${schema.simpleString} must be a struct or an array of structs.") +// } + + override def dataType: DataType = MapType(StringType, StringType) + +// override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = +// copy(timeZoneId = Option(timeZoneId)) + + override def nullSafeEval(json: Any): Any = { + if (json.toString.trim.isEmpty) return null + + try { + JsonUtil.fromJson[Map[String, Any]](json.toString) + } catch { + case _: Throwable => null + } + } + + override def inputTypes: Seq[DataType] = StringType :: Nil +} +// +//object JsonExprUtils { +// +// def validateSchemaLiteral(exp: Expression): StructType = exp match { +// case Literal(s, StringType) => CatalystSqlParser.parseTableSchema(s.toString) +// case e => throw new AnalysisException(s"Expected a string literal instead of $e") +// } +// +// def convertToMapData(exp: Expression): Map[String, String] = exp match { +// case m: CreateMap +// if m.dataType.acceptsType(MapType(StringType, StringType, valueContainsNull = false)) => +// val arrayMap = m.eval().asInstanceOf[ArrayBasedMapData] +// ArrayBasedMapData.toScalaMap(arrayMap).map { case (key, value) => +// key.toString -> value.toString +// } +// case m: CreateMap => +// throw new AnalysisException( +// s"A type of keys and values in map() must be string, but got ${m.dataType}") +// case _ => +// throw new AnalysisException("Must use a map() function for options") +// } +//} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/process/StreamingProcessTest.scala ---------------------------------------------------------------------- diff --git a/measure/src/test/scala/org/apache/griffin/measure/process/StreamingProcessTest.scala b/measure/src/test/scala/org/apache/griffin/measure/process/StreamingProcessTest.scala new file mode 100644 index 0000000..07b7c5e --- /dev/null +++ b/measure/src/test/scala/org/apache/griffin/measure/process/StreamingProcessTest.scala @@ -0,0 +1,147 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.process + +import org.apache.griffin.measure.config.params._ +import org.apache.griffin.measure.config.params.env._ +import org.apache.griffin.measure.config.params.user._ +import org.apache.griffin.measure.config.reader.ParamReaderFactory +import org.apache.griffin.measure.config.validator.AllParamValidator +import org.apache.griffin.measure.log.Loggable +import org.apache.griffin.measure.persist.PersistThreadPool +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} + +import scala.util.{Failure, Success, Try} + +@RunWith(classOf[JUnitRunner]) +class StreamingProcessTest extends FunSuite with Matchers with BeforeAndAfter with Loggable { + + val envFile = "src/test/resources/env-streaming.json" +// val confFile = "src/test/resources/config-test-accuracy-streaming-multids.json" + val confFile = "src/test/resources/config-test-accuracy-streaming.json" +// val confFile = "src/test/resources/config-test-profiling-streaming.json" + + val envFsType = "local" + val userFsType = "local" + + val args = Array(envFile, confFile) + + var allParam: AllParam = _ + + before { + // read param files + val envParam = readParamFile[EnvParam](envFile, envFsType) match { + case Success(p) => p + case Failure(ex) => { + error(ex.getMessage) + sys.exit(-2) + } + } + val userParam = readParamFile[UserParam](confFile, userFsType) match { + case Success(p) => p + case Failure(ex) => { + error(ex.getMessage) + sys.exit(-2) + } + } + allParam = AllParam(envParam, userParam) + + // validate param files + validateParams(allParam) match { + case Failure(ex) => { + error(ex.getMessage) + sys.exit(-3) + } + case _ => { + info("params validation pass") + } + } + } + + test ("streaming process") { + val procType = ProcessType(allParam.userParam.procType) + val proc: DqProcess = procType match { + case BatchProcessType => BatchDqProcess(allParam) + case StreamingProcessType => StreamingDqProcess(allParam) + case _ => { + error(s"${procType} is unsupported process type!") + sys.exit(-4) + } + } + + // process init + proc.init match { + case Success(_) => { + info("process init success") + } + case Failure(ex) => { + error(s"process init error: ${ex.getMessage}") + shutdown + sys.exit(-5) + } + } + + // process run + proc.run match { + case Success(_) => { + info("process run success") + } + case Failure(ex) => { + error(s"process run error: ${ex.getMessage}") + + if (proc.retriable) { + throw ex + } else { + shutdown + sys.exit(-5) + } + } + } + + // process end + proc.end match { + case Success(_) => { + info("process end success") + } + case Failure(ex) => { + error(s"process end error: ${ex.getMessage}") + shutdown + sys.exit(-5) + } + } + + shutdown + } + + private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = { + val paramReader = ParamReaderFactory.getParamReader(file, fsType) + paramReader.readConfig[T] + 
} + + private def validateParams(allParam: AllParam): Try[Boolean] = { + val allParamValidator = AllParamValidator() + allParamValidator.validate(allParam) + } + + private def shutdown(): Unit = { + PersistThreadPool.shutdown + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/rule/ExprValueUtilTest.scala ---------------------------------------------------------------------- diff --git a/measure/src/test/scala/org/apache/griffin/measure/rule/ExprValueUtilTest.scala b/measure/src/test/scala/org/apache/griffin/measure/rule/ExprValueUtilTest.scala deleted file mode 100644 index dd8d4a0..0000000 --- a/measure/src/test/scala/org/apache/griffin/measure/rule/ExprValueUtilTest.scala +++ /dev/null @@ -1,86 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.rule - -import org.apache.griffin.measure.config.params.user.EvaluateRuleParam -import org.apache.griffin.measure.rule.expr.{Expr, StatementExpr} -import org.junit.runner.RunWith -import org.scalatest.junit.JUnitRunner -import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} - -@RunWith(classOf[JUnitRunner]) -class ExprValueUtilTest extends FunSuite with BeforeAndAfter with Matchers { - - test ("rule calculation") { - // val rules = "$source.json().name = 's2' and $source.json().age[*] = 32" - // val rules = "$source.json().items[*] = 202 AND $source.json().age[*] = 32 AND $source.json().df[*].a = 1" - val rules = "$source.json().items[*] = 202 AND $source.json().age[*] = 32 AND $source.json().df['a' = 1].b = 4" - // val rules = "$source.json().df[0].a = 1" - val ep = EvaluateRuleParam(1, rules) - - val ruleFactory = RuleFactory(ep) - val rule: StatementExpr = ruleFactory.generateRule() - val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule) - - val ruleExprs = ruleAnalyzer.sourceRuleExprs - val constFinalExprValueMap = Map[String, Any]() - - val data = List[String]( - ("""{"name": "s1", "age": [22, 23], "items": [102, 104, 106], "df": [{"a": 1, "b": 3}, {"a": 2, "b": 4}]}"""), - ("""{"name": "s2", "age": [32, 33], "items": [202, 204, 206], "df": [{"a": 1, "b": 4}, {"a": 2, "b": 4}]}"""), - ("""{"name": "s3", "age": [42, 43], "items": [302, 304, 306], "df": [{"a": 1, "b": 5}, {"a": 2, "b": 4}]}""") - ) - - def str(expr: Expr) = { - s"${expr._id}: ${expr.desc} [${expr.getClass.getSimpleName}]" - } - println("====") - ruleExprs.finalCacheExprs.foreach { expr => - println(str(expr)) - } - println("====") - ruleExprs.cacheExprs.foreach { expr => - println(str(expr)) - } - - val constExprValueMap = ExprValueUtil.genExprValueMaps(None, ruleAnalyzer.constCacheExprs, Map[String, Any]()) - val finalConstExprValueMap = ExprValueUtil.updateExprValueMaps(ruleAnalyzer.constFinalCacheExprs, constExprValueMap) - val finalConstMap = 
finalConstExprValueMap.headOption match { - case Some(m) => m - case _ => Map[String, Any]() - } - println("====") - println(ruleAnalyzer.constCacheExprs) - println(ruleAnalyzer.constFinalCacheExprs) - println(finalConstMap) - - println("====") - val valueMaps = data.flatMap { msg => - val cacheExprValueMaps = ExprValueUtil.genExprValueMaps(Some(msg), ruleExprs.cacheExprs, finalConstMap) - val finalExprValueMaps = ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMaps) - - finalExprValueMaps - } - - valueMaps.foreach(println) - println(valueMaps.size) - - } - -}
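
The new JsonParseTest suite above exercises one source-side pattern over and over: load raw JSON lines into a single string column, register it as a temp table, then peel the payload apart in SQL with get_json_object, a small JSON-array UDF, and explode. Below is a minimal standalone sketch of that pattern; it assumes the Spark 1.6-era SQLContext API used in these tests and the project's JsonUtil helper, and it substitutes one invented inline record for src/test/resources/input.msg (the URL and timestamp are illustrative only).

import org.apache.griffin.measure.utils.JsonUtil
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object JsonSourceExtractSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("json source sketch").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)

    // Invented one-record stand-in for src/test/resources/input.msg:
    // a "seeds" array whose elements are JSON documents encoded as strings.
    val raw = """{"seeds": ["{\"url\": \"http://example.com/1\", \"metadata\": {\"tracker\": {\"crawlRequestCreateTS\": 1504837518000}}}"]}"""

    // 1. wrap the raw text in a one-column DataFrame, as the tests do
    val rdd = sc.parallelize(Seq(raw)).map(Row(_))
    sqlContext.createDataFrame(rdd, StructType(Array(StructField("value", StringType))))
      .registerTempTable("src")

    // 2. UDF: parse a JSON array string into a Seq so SQL explode can flatten it
    sqlContext.udf.register("json_to_string_array",
      (s: String) => JsonUtil.fromJson[Seq[String]](s))

    // 3. extract the seeds array and explode it into one row per seed document
    sqlContext.sql(
      "SELECT explode(json_to_string_array(get_json_object(value, '$.seeds'))) as seed FROM src")
      .registerTempTable("seeds")

    // 4. pull scalar fields out of each seed document
    val result = sqlContext.sql(
      "SELECT get_json_object(seed, '$.url') as url, " +
        "cast(get_json_object(get_json_object(seed, '$.metadata'), " +
        "'$.tracker.crawlRequestCreateTS') as bigint) as ts FROM seeds")
    result.show(10)

    sc.stop()
  }
}

These are the same three steps (one-column DataFrame, array-parsing UDF, explode plus get_json_object) that the "test input only sql" and "test from json" cases verify.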

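On the target side, the "test output only sql" and "test from json" cases select attributes out of groups[0].attrsList by name rather than by position, via an index_of_field UDF that can be used as an array index in SQL. A compact sketch of that lookup follows, under the same assumptions as above (Spark 1.6 SQLContext, the project's JsonUtil, and an invented one-record stand-in for src/test/resources/output.msg).

import org.apache.griffin.measure.utils.JsonUtil
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object JsonTargetExtractSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("json target sketch").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)

    // Invented one-record stand-in for src/test/resources/output.msg:
    // attrsList holds name/values pairs, CRAWLMETADATA carrying JSON as a string.
    val raw = """{"groups": [{"attrsList": [{"name": "URL", "values": ["http://example.com/1"]}, {"name": "CRAWLMETADATA", "values": ["{\"tracker\": {\"crawlRequestCreateTS\": 1504837518000}}"]}]}]}"""

    val rdd = sc.parallelize(Seq(raw)).map(Row(_))
    sqlContext.createDataFrame(rdd, StructType(Array(StructField("value", StringType))))
      .registerTempTable("tgt")

    // UDF: parse a JSON array string, keeping string elements as-is and
    // re-serializing object elements, so every array slot becomes a JSON string.
    sqlContext.udf.register("from_json_to_array", (s: String) =>
      JsonUtil.fromJson[Seq[Any]](s).map {
        case t: String => t
        case v => JsonUtil.toJson(v)
      })

    // UDF: position of the element whose field k equals v, usable as an array index.
    sqlContext.udf.register("index_of_field", (arr: Seq[String], k: String, v: String) =>
      arr.flatMap(item => JsonUtil.fromJson[Map[String, Any]](item).get(k)).indexOf(v))

    sqlContext.sql(
      "SELECT from_json_to_array(get_json_object(value, '$.groups[0].attrsList')) as attrs FROM tgt")
      .registerTempTable("attrs")

    // Look attributes up by name, then dig into their first value.
    val result = sqlContext.sql(
      "SELECT from_json_to_array(get_json_object(attrs[index_of_field(attrs, 'name', 'URL')], '$.values'))[0] as url, " +
        "cast(get_json_object(from_json_to_array(get_json_object(attrs[index_of_field(attrs, 'name', 'CRAWLMETADATA')], '$.values'))[0], " +
        "'$.tracker.crawlRequestCreateTS') as bigint) as ts FROM attrs")
    result.show(10)

    sc.stop()
  }
}

The final query mirrors the tdf3 statement in the "test from json" case; the only difference is the invented sample record and table names.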