http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/README.md
----------------------------------------------------------------------
diff --git a/griffin-models/README.md b/griffin-models/README.md
deleted file mode 100644
index 144a275..0000000
--- a/griffin-models/README.md
+++ /dev/null
@@ -1,158 +0,0 @@
-# Models
-Models to calculate data quality metrics.
-
-### Accuracy model
-The accuracy model compares source and target content, given a corresponding mapping relationship.
-
-#### Introduction
-How do we measure the accuracy dimension of a target dataset T, given the source of truth as a golden dataset S?
-To measure the accuracy quality of target dataset T,
-the basic approach is to calculate the discrepancy between the target and source datasets by going through their contents,
-examining whether all fields match exactly, as below:
-```
-             Count(source.field1 == target.field1 && source.field2 == target.field2 && ... && source.fieldN == target.fieldN)
-Accuracy = -------------------------------------------------------------------------------------------------------------------
-             Count(source)
-
-```
-
-Since the two datasets are too big to fit on one machine, our approach is to leverage the MapReduce programming model via distributed computing.
-
-The real challenge is to make this comparison algorithm generic enough to free data analysts and data scientists from coding burdens, while at the same time keeping enough flexibility to cover most accuracy requirements.
-
-The traditional way is to calculate this with a SQL-based join, e.g. scripts in Hive.
-
-But this SQL-based solution can be improved, since it does not consider the particular natures of the source and target datasets in this context.
-
-Our approach is to provide a generic accuracy model that takes those natures of the source and target datasets into consideration.
-
-Our implementation is in Scala, leveraging Scala's declarative capabilities to cater for various requirements, and runs on a Spark cluster.
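
For comparison, here is a minimal sketch of the Hive-style SQL-join baseline mentioned above, written against Spark's HiveContext. The table names (`source`, `target`), the join keys, and the compared fields are illustrative assumptions drawn from the schemas below, not part of this module:

```
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Sketch of the traditional SQL-join baseline, NOT the generic model.
// Table and column names are illustrative assumptions.
object SqlJoinBaseline {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SqlJoinBaseline"))
    val sqlContext = new HiveContext(sc)

    // Matched rows: every compared field must be spelled out by hand.
    // Note: a plain inner join over-counts when the target holds duplicates,
    // one of the issues the generic model addresses.
    val matched = sqlContext.sql(
      """SELECT COUNT(*) FROM source s JOIN target t
        |ON s.uid = t.uid AND s.eventtimestamp = t.eventtimestamp
        |WHERE s.curprice = t.curprice AND s.itm = t.itm AND s.itmtitle = t.itmtitle""".stripMargin)
      .collect()(0).getLong(0)

    val total = sqlContext.sql("SELECT COUNT(*) FROM source").collect()(0).getLong(0)

    println(s"accuracy = ${matched.toDouble / total * 100} %")
    sc.stop()
  }
}
```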
-
-To make it concrete, the schema for the source is as below:
-
-```
-|-- uid: string (nullable = true)
-|-- site_id: string (nullable = true)
-|-- page_id: string (nullable = true)
-|-- curprice: string (nullable = true)
-|-- itm: string (nullable = true)
-|-- itmcond: string (nullable = true)
-|-- itmtitle: string (nullable = true)
-|-- l1: string (nullable = true)
-|-- l2: string (nullable = true)
-|-- leaf: string (nullable = true)
-|-- meta: string (nullable = true)
-|-- st: string (nullable = true)
-|-- dc: string (nullable = true)
-|-- tr: string (nullable = true)
-|-- eventtimestamp: string (nullable = true)
-|-- cln: string (nullable = true)
-|-- siid: string (nullable = true)
-|-- ciid: string (nullable = true)
-|-- sellerid: string (nullable = true)
-|-- pri: string (nullable = true)
-|-- pt: string (nullable = true)
-|-- dt: string (nullable = true)
-|-- hour: string (nullable = true)
-```
-
-and the schema for the target is as below:
-
-```
-|-- uid: string (nullable = true)
-|-- page_id: string (nullable = true)
-|-- site_id: string (nullable = true)
-|-- js_ev_mak: string (nullable = true)
-|-- js_ev_orgn: string (nullable = true)
-|-- curprice: string (nullable = true)
-|-- itm: string (nullable = true)
-|-- itmcond: string (nullable = true)
-|-- itmtitle: string (nullable = true)
-|-- l1: string (nullable = true)
-|-- l2: string (nullable = true)
-|-- leaf: string (nullable = true)
-|-- meta: string (nullable = true)
-|-- st: string (nullable = true)
-|-- dc: string (nullable = true)
-|-- tr: string (nullable = true)
-|-- eventtimestamp: string (nullable = true)
-|-- cln: string (nullable = true)
-|-- siid: string (nullable = true)
-|-- ciid: string (nullable = true)
-|-- sellerid: string (nullable = true)
-|-- product_ref_id: string (nullable = true)
-|-- product_type: string (nullable = true)
-|-- is_bu: string (nullable = true)
-|-- is_udid: string (nullable = true)
-|-- is_userid: string (nullable = true)
-|-- is_cguid: string (nullable = true)
-|-- dt: string (nullable = true)
-|-- hour: string (nullable = true)
-```
-
-
-#### Accuracy Model in Depth
-
-##### Pre-Process phase (transform raw data)
-For efficiency, we first convert each raw record into a key-value pair; after that, we only need to compare values which share the same key.
-Since the two datasets might use different names for the same field, and fields might come in a different order, we keep the original information in an associative map for later processing.
-
-The records will look like:
-```
-((uid,eventtimestamp)->(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...))
-```
-To track where the data comes from, we also add a labeling tag:
-for the source dataset we add the label tag "\_\_source\_\_", and for the target dataset we add the label tag "\_\_target\_\_".
-```
-((uid,eventtimestamp)->("__source__",(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...)))
-((uid,eventtimestamp)->("__target__",(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...)))
-```
-Ideally, applying those composite keys, we should get a unique record for every composite key in each dataset.
-In reality, however, for various unknown reasons, a dataset might have duplicate records for one unique composite key.
-To cover this problem, and to track all records from the source side, we append all duplicate records to a list during this step.
-The record will look like this after pre-processing:
-```
-((uid,eventtimestamp)->List(("__source__",(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...)),...,("__source__",(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...))))
-```
-To save all records from the target side, we insert all records into a set during this step.
-The record will look like this after pre-processing:
-```
-((uid,eventtimestamp)->Set(("__target__",(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...)),...,("__target__",(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...))))
-```
-##### Aggregate and Comparing phase
-We union source and target together and execute one aggregate over all records; within the aggregate we can apply user-defined rules to check whether records in source and target match.
-
-```
-aggregate { (List(sources), Set(targets)) =>
-  if (every element from List(sources) is in Set(targets)) emit true
-  else emit false
-}
-```
-We can also execute one aggregate to count the mismatched records in the source:
-```
-aggregate (missedCount = 0) { (List(sources), Set(targets)) =>
-  foreach (element in List(sources)) {
-    if (element in Set(targets)) continue
-    else missedCount += 1
-  }
-}
-```
-(A concrete Scala sketch of these two phases is given right after this README.)
-#### Benefits
- + It is about two times faster than the traditional SQL-JOIN-based solution, since it uses an algorithm customized for this specific accuracy problem.
- + It is easy to iterate on a new accuracy metric, as the model is packaged as a common library offering a basic service; previously it took us one week to develop and deploy one new metric from scratch, but with this approach it only takes several hours to get it all done.
-
-
-
-
-#### Further discussion
- + How to select keys?
-   How many keys should we use? Too many keys will reduce calculation performance; too few might leave too many duplicate records, which makes the comparison logic complex.
- + How to define content equality?
-   For some data it is straightforward, but other data might require transformation by UDFs; how can we make the system extensible to support different raw data?
- + How to fix the data latency issue?
-   To compare, we must have the data available, but how do we handle the data latency issues which happen often in real enterprise environments?
- + How to restore lost data?
-   Detecting data loss is good, but the follow-up question is: how can we restore the lost data?
-
-
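
To make the two phases concrete, here is a minimal, self-contained Scala sketch of the pre-process and aggregate-and-comparing phases described above. The inline sample rows, the composite key (uid, eventtimestamp), and the two value fields are illustrative assumptions; the production implementation (Accu.scala, later in this diff) generalizes the same idea via a JSON mapping configuration:

```
import org.apache.spark.{SparkConf, SparkContext}

// Sketch of the accuracy algorithm: key both datasets on a composite key,
// cogroup them, and count source records missing from the target.
// The inline sample data and field layout are illustrative assumptions.
object AccuracySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("AccuracySketch").setMaster("local[2]"))

    // rows: (uid, eventtimestamp, curprice, itmtitle)
    val source = sc.parallelize(Seq(
      ("u1", "t1", "9.99", "phone"),
      ("u2", "t2", "5.00", "book")))
    val target = sc.parallelize(Seq(
      ("u1", "t1", "9.99", "phone")))

    // Pre-process: composite key -> associative map of the remaining fields.
    def toKV(r: (String, String, String, String)) =
      ((r._1, r._2), Map("curprice" -> r._3, "itmtitle" -> r._4))

    val srcKV = source.map(toKV)
    val tgtKV = target.map(toKV)

    // Aggregate-and-compare: cogroup by key, then count source values
    // that are absent from the target side under the same key.
    val (missCount, srcCount) = srcKV.cogroup(tgtKV).aggregate((0L, 0L))(
      (acc, kv) => {
        val (srcVals, tgtVals) = kv._2
        val tgtSet = tgtVals.toSet
        (acc._1 + srcVals.count(v => !tgtSet.contains(v)), acc._2 + srcVals.size)
      },
      (a, b) => (a._1 + b._1, a._2 + b._2))

    println(s"accuracy = ${(1 - missCount.toDouble / srcCount) * 100} %")  // 50.0 %
    sc.stop()
  }
}
```

Compared with a SQL join, a single cogroup-plus-aggregate pass touches each keyed group exactly once and handles duplicate source records naturally, since all duplicates land in the same group.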
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/accu_config.json ---------------------------------------------------------------------- diff --git a/griffin-models/accu_config.json b/griffin-models/accu_config.json deleted file mode 100644 index fc9875a..0000000 --- a/griffin-models/accu_config.json +++ /dev/null @@ -1,62 +0,0 @@ -{ - "source": "users_info_src", - "target": "users_info_target", - "accuracyMapping": [ - { - "sourceColId": 0, - "sourceColName": "user_id", - "targetColId": 0, - "targetColName": "user_id", - "matchFunction": "false", - "isPK": true - }, - { - "sourceColId": 1, - "sourceColName": "first_name", - "targetColId": 1, - "targetColName": "first_name", - "matchFunction": "false", - "isPK": false - }, - { - "sourceColId": 2, - "sourceColName": "last_name", - "targetColId": 2, - "targetColName": "last_name", - "matchFunction": "false", - "isPK": false - }, - { - "sourceColId": 3, - "sourceColName": "address", - "targetColId": 3, - "targetColName": "address", - "matchFunction": "false", - "isPK": false - }, - { - "sourceColId": 4, - "sourceColName": "email", - "targetColId": 4, - "targetColName": "email", - "matchFunction": "false", - "isPK": false - }, - { - "sourceColId": 5, - "sourceColName": "phone", - "targetColId": 5, - "targetColName": "phone", - "matchFunction": "false", - "isPK": false - }, - { - "sourceColId": 6, - "sourceColName": "post_code", - "targetColId": 6, - "targetColName": "post_code", - "matchFunction": "false", - "isPK": false - } - ] -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/app.conf.template ---------------------------------------------------------------------- diff --git a/griffin-models/app.conf.template b/griffin-models/app.conf.template deleted file mode 100644 index a0e8795..0000000 --- a/griffin-models/app.conf.template +++ /dev/null @@ -1,4 +0,0 @@ -models { - app.name = "BarkRealtimeApp" - batch.interval.seconds = 10 -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/data/test/dataFile/srcFile.avro ---------------------------------------------------------------------- diff --git a/griffin-models/data/test/dataFile/srcFile.avro b/griffin-models/data/test/dataFile/srcFile.avro deleted file mode 100644 index e8c11ab..0000000 Binary files a/griffin-models/data/test/dataFile/srcFile.avro and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/data/test/dataFile/srcFile.avsc ---------------------------------------------------------------------- diff --git a/griffin-models/data/test/dataFile/srcFile.avsc b/griffin-models/data/test/dataFile/srcFile.avsc deleted file mode 100644 index 1158b8f..0000000 --- a/griffin-models/data/test/dataFile/srcFile.avsc +++ /dev/null @@ -1,18 +0,0 @@ -{ - "type" : "record", - "name" : "src_data", - "namespace" : "org.apache.griffin", - "fields" : [ { - "name" : "id", - "type" : "long" - }, { - "name" : "name", - "type" : ["null", "string"] - }, { - "name" : "age", - "type" : "int" - }, { - "name" : "desc", - "type" : ["null", "string"] - } ] -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/data/test/dataFile/srcFile.json ---------------------------------------------------------------------- diff --git a/griffin-models/data/test/dataFile/srcFile.json b/griffin-models/data/test/dataFile/srcFile.json deleted file mode 100644 index e1b320b..0000000 --- a/griffin-models/data/test/dataFile/srcFile.json +++ 
/dev/null @@ -1,5 +0,0 @@ -{"id": 11111, "name": {"string":"Amy"}, "age": 18, "desc": {"string":"student"}} -{"id": 22222, "name": {"string":"Shawn"}, "age": 28, "desc": {"string":"engineer"}} -{"id": 33333, "name": {"string":"Kitty"}, "age": 12, "desc": {"string":"cat"}} -{"id": 44444, "name": {"string":"Snoooooopy"}, "age": 16, "desc": {"string":"dooooog"}} -{"id": 55555, "name": {"string":"Jason"}, "age": 35, "desc": {"string":"cook"}} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/data/test/dataFile/srcFileCsv ---------------------------------------------------------------------- diff --git a/griffin-models/data/test/dataFile/srcFileCsv b/griffin-models/data/test/dataFile/srcFileCsv deleted file mode 100644 index 6647f51..0000000 --- a/griffin-models/data/test/dataFile/srcFileCsv +++ /dev/null @@ -1,6 +0,0 @@ -id,name,age,desc -11111,Amy,18,student -22222,Shawn,28,engineer -33333,Kitty,12,cat -44444,Snoooooopy,16,dooooog -55555,Jason,35,cook \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/data/test/dataFile/srcFileCsv_type ---------------------------------------------------------------------- diff --git a/griffin-models/data/test/dataFile/srcFileCsv_type b/griffin-models/data/test/dataFile/srcFileCsv_type deleted file mode 100644 index 567c1bc..0000000 --- a/griffin-models/data/test/dataFile/srcFileCsv_type +++ /dev/null @@ -1,5 +0,0 @@ -columnName,columnType -id,long -name,string -age,int -desc,string \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/data/test/dataFile/tgtFile.avro ---------------------------------------------------------------------- diff --git a/griffin-models/data/test/dataFile/tgtFile.avro b/griffin-models/data/test/dataFile/tgtFile.avro deleted file mode 100644 index 9761676..0000000 Binary files a/griffin-models/data/test/dataFile/tgtFile.avro and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/data/test/dataFile/tgtFile.avsc ---------------------------------------------------------------------- diff --git a/griffin-models/data/test/dataFile/tgtFile.avsc b/griffin-models/data/test/dataFile/tgtFile.avsc deleted file mode 100644 index 0e5f86d..0000000 --- a/griffin-models/data/test/dataFile/tgtFile.avsc +++ /dev/null @@ -1,18 +0,0 @@ -{ - "type" : "record", - "name" : "tgt_data", - "namespace" : "org.apache.griffin", - "fields" : [ { - "name" : "id", - "type" : "long" - }, { - "name" : "name", - "type" : ["string", "null"] - }, { - "name" : "age", - "type" : "int" - }, { - "name" : "desc", - "type" : ["string", "null"] - } ] -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/data/test/dataFile/tgtFile.json ---------------------------------------------------------------------- diff --git a/griffin-models/data/test/dataFile/tgtFile.json b/griffin-models/data/test/dataFile/tgtFile.json deleted file mode 100644 index 2d992fa..0000000 --- a/griffin-models/data/test/dataFile/tgtFile.json +++ /dev/null @@ -1,5 +0,0 @@ -{"id": 11111, "name": {"string":"Amy"}, "age": 18, "desc": {"string":"student"}} -{"id": 22222, "name": {"string":"Shawn"}, "age": 28, "desc": {"string":"engineer"}} -{"id": 33333, "name": {"string":"Kitty"}, "age": 12, "desc": {"string":"cat"}} -{"id": 44444, "name": {"string":"Snoopy"}, "age": 16, "desc": {"string":"dog"}} -{"id": 55555, "name": {"string":"Jason"}, "age": 35, "desc": 
{"string":"cook"}} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/data/test/dataFile/tgtFileCsv ---------------------------------------------------------------------- diff --git a/griffin-models/data/test/dataFile/tgtFileCsv b/griffin-models/data/test/dataFile/tgtFileCsv deleted file mode 100644 index 9c2351c..0000000 --- a/griffin-models/data/test/dataFile/tgtFileCsv +++ /dev/null @@ -1,6 +0,0 @@ -id,name,age,desc -11111,Amy,18,student -22222,Shawn,28,engineer -33333,Kitty,12,cat -44444,Snoopy,16,dog -55555,Jason,35,cook \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/data/test/dataFile/tgtFileCsv_type ---------------------------------------------------------------------- diff --git a/griffin-models/data/test/dataFile/tgtFileCsv_type b/griffin-models/data/test/dataFile/tgtFileCsv_type deleted file mode 100644 index 567c1bc..0000000 --- a/griffin-models/data/test/dataFile/tgtFileCsv_type +++ /dev/null @@ -1,5 +0,0 @@ -columnName,columnType -id,long -name,string -age,int -desc,string \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/data/test/recordFile/_RESULT_ACCU ---------------------------------------------------------------------- diff --git a/griffin-models/data/test/recordFile/_RESULT_ACCU b/griffin-models/data/test/recordFile/_RESULT_ACCU deleted file mode 100644 index 65ad8d7..0000000 --- a/griffin-models/data/test/recordFile/_RESULT_ACCU +++ /dev/null @@ -1,5 +0,0 @@ -//========== 1. Test Accuracy model result with request file: data\test\reqJson\accuAvroTest.json ========== -match percentage: 80.0 % - -Map(id -> 44444, name -> Snoooooopy, age -> 16, desc -> dooooog) - http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/data/test/recordFile/_RESULT_VALI ---------------------------------------------------------------------- diff --git a/griffin-models/data/test/recordFile/_RESULT_VALI b/griffin-models/data/test/recordFile/_RESULT_VALI deleted file mode 100644 index 8b3b159..0000000 --- a/griffin-models/data/test/recordFile/_RESULT_VALI +++ /dev/null @@ -1,3 +0,0 @@ -//========== 1. 
Test Validity model result with request file: data\test\reqJson\valiAvroTest.json ========== -dataSet: srcFile, validityReq: List(colId: 0, colName: id, colType: bigint, isNum: true, metrics: List(name: 5, result: 55555.0, name: 6, result: 11111.0, name: 7, result: 33333.0, name: 8, result: 33333.0, name: 1, result: 5), colId: 2, colName: age, colType: int, isNum: true, metrics: List(name: 5, result: 35.0, name: 6, result: 12.0, name: 7, result: 21.8, name: 8, result: 18.0, name: 1, result: 5), colId: 1, colName: name, colType: string, isNum: false, metrics: List(name: 1, result: 5, name: 2, result: 0, name: 3, result: 5, name: 4, result: 0), colId: 3, colName: desc, colType: string, isNum: false, metrics: List(name: 1, result: 5, name: 2, result: 0, name: 3, result: 5, name: 4, result: 0)) - http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/data/test/reqJson/accuAvroTest.json ---------------------------------------------------------------------- diff --git a/griffin-models/data/test/reqJson/accuAvroTest.json b/griffin-models/data/test/reqJson/accuAvroTest.json deleted file mode 100644 index 6f44baa..0000000 --- a/griffin-models/data/test/reqJson/accuAvroTest.json +++ /dev/null @@ -1,34 +0,0 @@ -{ - "source": "srcFile", - "target": "tgtFile", - "accuracyMapping": [ - { - "sourceColId": 0, - "sourceColName": "id", - "targetColId": 0, - "targetColName": "id", - "isPK": true - }, - { - "sourceColId": 1, - "sourceColName": "name", - "targetColId": 1, - "targetColName": "name", - "isPK": false - }, - { - "sourceColId": 2, - "sourceColName": "age", - "targetColId": 2, - "targetColName": "age", - "isPK": false - }, - { - "sourceColId": 3, - "sourceColName": "desc", - "targetColId": 3, - "targetColName": "desc", - "isPK": true - } - ] -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/data/test/reqJson/accuCsvTest.json ---------------------------------------------------------------------- diff --git a/griffin-models/data/test/reqJson/accuCsvTest.json b/griffin-models/data/test/reqJson/accuCsvTest.json deleted file mode 100644 index 85db5ab..0000000 --- a/griffin-models/data/test/reqJson/accuCsvTest.json +++ /dev/null @@ -1,34 +0,0 @@ -{ - "source": "srcFileCsv", - "target": "tgtFileCsv", - "accuracyMapping": [ - { - "sourceColId": 0, - "sourceColName": "id", - "targetColId": 0, - "targetColName": "id", - "isPK": true - }, - { - "sourceColId": 1, - "sourceColName": "name", - "targetColId": 1, - "targetColName": "name", - "isPK": false - }, - { - "sourceColId": 2, - "sourceColName": "age", - "targetColId": 2, - "targetColName": "age", - "isPK": false - }, - { - "sourceColId": 3, - "sourceColName": "desc", - "targetColId": 3, - "targetColName": "desc", - "isPK": true - } - ] -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/data/test/reqJson/valiAvroTest.json ---------------------------------------------------------------------- diff --git a/griffin-models/data/test/reqJson/valiAvroTest.json b/griffin-models/data/test/reqJson/valiAvroTest.json deleted file mode 100644 index 0099ab6..0000000 --- a/griffin-models/data/test/reqJson/valiAvroTest.json +++ /dev/null @@ -1,83 +0,0 @@ -{ - "dataSet": "srcFile", - "validityReq": [ - { - "colId": 0, - "colName": "id", - "metrics": [ - { - "name": 5 - }, - { - "name": 6 - }, - { - "name": 7 - }, - { - "name": 8 - }, - { - "name": 1 - } - ] - }, - { - "colId": 1, - "colName": "name", - "metrics": [ - { - "name": 1 - }, - { - "name": 2 - }, - { - 
"name": 3 - }, - { - "name": 4 - } - ] - }, - { - "colId": 2, - "colName": "age", - "metrics": [ - { - "name": 5 - }, - { - "name": 6 - }, - { - "name": 7 - }, - { - "name": 8 - }, - { - "name": 1 - } - ] - }, - { - "colId": 3, - "colName": "desc", - "metrics": [ - { - "name": 1 - }, - { - "name": 2 - }, - { - "name": 3 - }, - { - "name": 4 - } - ] - } - ] -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/data/test/reqJson/valiCsvTest.json ---------------------------------------------------------------------- diff --git a/griffin-models/data/test/reqJson/valiCsvTest.json b/griffin-models/data/test/reqJson/valiCsvTest.json deleted file mode 100644 index ac75d85..0000000 --- a/griffin-models/data/test/reqJson/valiCsvTest.json +++ /dev/null @@ -1,83 +0,0 @@ -{ - "dataSet": "srcFileCsv", - "validityReq": [ - { - "colId": 0, - "colName": "id", - "metrics": [ - { - "name": 5 - }, - { - "name": 6 - }, - { - "name": 7 - }, - { - "name": 8 - }, - { - "name": 1 - } - ] - }, - { - "colId": 1, - "colName": "name", - "metrics": [ - { - "name": 1 - }, - { - "name": 2 - }, - { - "name": 3 - }, - { - "name": 4 - } - ] - }, - { - "colId": 2, - "colName": "age", - "metrics": [ - { - "name": 5 - }, - { - "name": 6 - }, - { - "name": 7 - }, - { - "name": 8 - }, - { - "name": 1 - } - ] - }, - { - "colId": 3, - "colName": "desc", - "metrics": [ - { - "name": 1 - }, - { - "name": 2 - }, - { - "name": 3 - }, - { - "name": 4 - } - ] - } - ] -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/pom.xml ---------------------------------------------------------------------- diff --git a/griffin-models/pom.xml b/griffin-models/pom.xml deleted file mode 100644 index 851234c..0000000 --- a/griffin-models/pom.xml +++ /dev/null @@ -1,201 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Copyright (c) 2016 eBay Software Foundation. - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
- --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - - <parent> - <groupId>com.ebay.oss</groupId> - <artifactId>griffin-parent</artifactId> - <version>0.1.0-SNAPSHOT</version> - <relativePath>../pom.xml</relativePath> - </parent> - - <modelVersion>4.0.0</modelVersion> - <artifactId>griffin-models</artifactId> - <name>griffin-models</name> - - <!-- <licenses> - <license> - <name>My License</name> - <url>http://....</url> - <distribution>repo</distribution> - </license> - </licenses> --> - - <properties> - <maven.compiler.source>1.7</maven.compiler.source> - <maven.compiler.target>1.7</maven.compiler.target> - <encoding>UTF-8</encoding> - <scala.version>2.10.6</scala.version> - <scala.compat.version>2.10</scala.compat.version> - <spark.version>1.6.0</spark.version> - </properties> - - <dependencies> - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-library</artifactId> - <version>${scala.version}</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-core_${scala.compat.version}</artifactId> - <version>${spark.version}</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_${scala.compat.version}</artifactId> - <version>${spark.version}</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-hive_${scala.compat.version}</artifactId> - <version>${spark.version}</version> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-mllib_${scala.compat.version}</artifactId> - <version>${spark.version}</version> - </dependency> - - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.specs2</groupId> - <artifactId>specs2-core_${scala.compat.version}</artifactId> - <version>2.4.16</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.scalatest</groupId> - <artifactId>scalatest_${scala.compat.version}</artifactId> - <version>2.2.4</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.specs2</groupId> - <artifactId>specs2-junit_${scala.compat.version}</artifactId> - <version>2.4.16</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>com.databricks</groupId> - <artifactId>spark-csv_2.10</artifactId> - <version>1.4.0</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>com.databricks</groupId> - <artifactId>spark-avro_2.10</artifactId> - <version>2.0.1</version> - </dependency> - - <!-- Jersey --> - <dependency> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-core</artifactId> - <version>${jersey.version}</version> - </dependency> - <dependency> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-json</artifactId> - <version>${jersey.version}</version> - </dependency> - <dependency> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-server</artifactId> - <version>${jersey.version}</version> - </dependency> - <dependency> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-bundle</artifactId> - <version>${jersey.version}</version> - </dependency> - <dependency> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-client</artifactId> - <version>${jersey.version}</version> - </dependency> - </dependencies> - - - - <build> - 
<sourceDirectory>src/main/scala</sourceDirectory> - <testSourceDirectory>src/test/scala</testSourceDirectory> - <plugins> - <plugin> - <!-- see http://davidb.github.com/scala-maven-plugin --> - <groupId>net.alchim31.maven</groupId> - <artifactId>scala-maven-plugin</artifactId> - <version>3.2.0</version> - <executions> - <execution> - <goals> - <goal>compile</goal> - <goal>testCompile</goal> - </goals> - <configuration> - <args> - <!--arg>-make:transitive</arg--> - <arg>-dependencyfile</arg> - <arg>${project.build.directory}/.scala_dependencies</arg> - </args> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <version>2.18.1</version> - <configuration> - <useFile>false</useFile> - <disableXmlReport>true</disableXmlReport> - <!-- If you have classpath issue like NoDefClassError,... --> - <!-- useManifestOnlyJar>false</useManifestOnlyJar --> - <includes> - <include>**/*Test.*</include> - <include>**/*Suite.*</include> - </includes> - </configuration> - </plugin> - - <!--<plugin> - <artifactId>maven-assembly-plugin</artifactId> - <version>2.4</version> - <configuration> - <descriptorRefs> - <descriptorRef>jar-with-dependencies</descriptorRef> - </descriptorRefs> - </configuration> - <executions> - <execution> - <id>make-assembly</id> - <phase>package</phase> - <goals> - <goal>single</goal> - </goals> - </execution> - </executions> - </plugin>--> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/src/main/scala/org/apache/griffin/accuracy/Accu.scala ---------------------------------------------------------------------- diff --git a/griffin-models/src/main/scala/org/apache/griffin/accuracy/Accu.scala b/griffin-models/src/main/scala/org/apache/griffin/accuracy/Accu.scala deleted file mode 100644 index bea397d..0000000 --- a/griffin-models/src/main/scala/org/apache/griffin/accuracy/Accu.scala +++ /dev/null @@ -1,195 +0,0 @@ -/** - * Accuracy of source data comparing with target data - * - * Purpose: suppose that each row of source data could be found in target data, - * but there exists some errors resulting the data missing, in this progress - * we count the missing data of source data set, which is not found in the target data set - * - * - * - */ - -package org.apache.griffin.accuracy - -import org.apache.griffin.dataLoaderUtils.DataLoaderFactory -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.scala.DefaultScalaModule -import org.apache.griffin.util.{HdfsUtils, PartitionUtils} -import org.apache.spark.rdd.RDD.rddToPairRDDFunctions -import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.hive.HiveContext -import org.apache.spark.{Logging, SparkConf, SparkContext} - -import scala.collection.immutable.HashSet -import scala.collection.mutable.{MutableList, HashSet => MutableHashSet, Map => MutableMap} - -object Accu extends Logging{ - - def main(args: Array[String]) { - if (args.length < 2) { - logError("Usage: class <input-conf-file> <outputPath>") - logError("For input-conf-file, please use accu_config.json as an template to reflect test dataset accordingly.") - sys.exit(-1) - } - val input = HdfsUtils.openFile(args(0)) - - val outputPath = args(1) + System.getProperty("file.separator") - - //some done files, some are for job scheduling purpose - val startFile = outputPath + "_START" - val resultFile = outputPath + "_RESULT" - val doneFile = outputPath + 
"_FINISHED" - val missingFile = outputPath + "missingRec.txt" - - //deserialize json to bean object - val mapper = new ObjectMapper() - mapper.registerModule(DefaultScalaModule) - - //read the config info of comparison - val configure = mapper.readValue(input, classOf[AccuracyConfEntity]) - - val conf = new SparkConf().setAppName("Accu") - val sc = new SparkContext(conf) - val sqlContext = new HiveContext(sc) - - //add spark applicationId for debugging - val applicationId = sc.applicationId - - //for spark monitoring - HdfsUtils.writeFile(startFile, applicationId) - - //get source data and target data - val dataLoader = DataLoaderFactory.getDataLoader(sqlContext, DataLoaderFactory.hive) - val (sojdf, bedf) = dataLoader.getAccuDataFrame(configure) - - //-- algorithm -- - val ((missCount, srcCount), missedList) = calcAccu(configure, sojdf, bedf) - - //record result and notify done - HdfsUtils.writeFile(resultFile, ((1 - missCount.toDouble / srcCount) * 100).toString()) - - val sb = new StringBuilder - missedList.foreach { item => - sb.append(item) - sb.append("\n") - } - - //for spark monitoring - HdfsUtils.writeFile(missingFile, sb.toString()) - HdfsUtils.createFile(doneFile) - - sc.stop() - - } - - def calcAccu(configure: AccuracyConfEntity, sojdf: DataFrame, bedf: DataFrame): ((Long, Long), List[String]) = { - val mp = configure.accuracyMapping - - //--0. prepare to start job-- - - //the key column info, to match different rows between source and target - val sojKeyIndexList = MutableList[Tuple2[Int, String]]() - val beKeyIndexList = MutableList[Tuple2[Int, String]]() - - //the value column info, to be compared with between the match rows - val sojValueIndexList = MutableList[Tuple2[Int, String]]() - val beValueIndexList = MutableList[Tuple2[Int, String]]() - - //get the key and value column info from config - for (i <- mp) { - if (i.isPK) { - - val sojkey = Tuple2(i.sourceColId, i.sourceColName) - sojKeyIndexList += sojkey - - val bekey = Tuple2(i.targetColId, i.targetColName) - beKeyIndexList += bekey - - } - - val sojValue = Tuple2(i.sourceColId, i.sourceColName) - sojValueIndexList += sojValue - - val beValue = Tuple2(i.targetColId, i.sourceColName) - beValueIndexList += beValue - - } - - def toTuple[A <: Object](as: MutableList[A]): Product = { - val tupleClass = Class.forName("scala.Tuple" + as.size) - tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product] - } - - //--1. convert data into same format (key, value)-- - - //convert source data rows into (key, ("__source__", valMap)), key is the key column value tuple, valMap is the value map of row - val sojkvs = sojdf.map { row => - val kk = sojKeyIndexList map { t => row.get(t._1).asInstanceOf[Object] } - - val kkk = toTuple(kk) - - val len = row.length - val v = sojValueIndexList.foldLeft(Map[String, Any]()) { (c, x) => c + (x._2 -> row.get(x._1)) } - - (kkk -> v) - } - - //convert source data rows into (key, ("__target__", valMap)), key is the key column value tuple, valMap is the value map of row - val bekvs = bedf.map { row => - val kk = beKeyIndexList map { t => row.get(t._1).asInstanceOf[Object] } - - val kkk = toTuple(kk) - - val len = row.length - val v = beValueIndexList.foldLeft(Map[String, Any]()) { (c, x) => c + (x._2 -> row.get(x._1)) } - - (kkk -> v) - } - - //--2. cogroup src RDD[(k, v1)] and tgt RDD[(k, v2)] into RDD[(k, (Iterable[v1], Iterable[v2]))] - val allkvs = sojkvs.cogroup(bekvs) - - //--3. 
get missed count of source data-- - - //with the same key, for each source data in list, if it does not exists in the target set, one missed data found - def seqMissed(cnt: ((Long, Long), List[String]), kv: (Product, (Iterable[Map[String, Any]], Iterable[Map[String, Any]]))) = { - val ls = kv._2._1 - val st = kv._2._2 - - if (ls.size > 2 && st.size > 4) { - val st1 = st.foldLeft(HashSet[Map[String, Any]]())((set, mp) => set + mp) - val ss = ls.foldLeft((0, List[String]())) { (c, mp) => - if (st1.contains(mp)) { - c - } else { - (c._1 + 1, mp.toString :: c._2) - } - } - ((cnt._1._1 + ss._1, cnt._1._2 + ls.size), ss._2 ::: cnt._2) - } else { - val ss = ls.foldLeft((0, List[String]())) { (c, mp) => - if (st.exists(mp.equals(_))) { - c - } else { - (c._1 + 1, mp.toString :: c._2) - } - } - ((cnt._1._1 + ss._1, cnt._1._2 + ls.size), ss._2 ::: cnt._2) - } - } - - //add missed count of each partition - def combMissed(cnt1: ((Long, Long), List[String]), cnt2: ((Long, Long), List[String])) = { - ((cnt1._1._1 + cnt2._1._1, cnt1._1._2 + cnt2._1._2), cnt1._2 ::: cnt2._2) - } - - //count missed source data - val missed = allkvs.aggregate(((0L, 0L), List[String]()))(seqMissed, combMissed) - - //output: need to change - logInfo("source count: " + missed._1._2 + " missed count : " + missed._1._1) - - missed - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/src/main/scala/org/apache/griffin/accuracy/AccuracyConfEntity.scala ---------------------------------------------------------------------- diff --git a/griffin-models/src/main/scala/org/apache/griffin/accuracy/AccuracyConfEntity.scala b/griffin-models/src/main/scala/org/apache/griffin/accuracy/AccuracyConfEntity.scala deleted file mode 100644 index d8eb185..0000000 --- a/griffin-models/src/main/scala/org/apache/griffin/accuracy/AccuracyConfEntity.scala +++ /dev/null @@ -1,17 +0,0 @@ -package org.apache.griffin.accuracy - -import org.apache.griffin.common.PartitionPair - -/** - * Accurarcy configuration entity - */ -class AccuracyConfEntity { - var source: String = _ - var target: String = _ - var dt: String = _ - var hour: String = _ - - var accuracyMapping: List[AccuracyMapping] = List() - var srcPartitions: List[PartitionPair] = List() - var tgtPartitions: List[List[PartitionPair]] = List() -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/src/main/scala/org/apache/griffin/accuracy/AccuracyMapping.scala ---------------------------------------------------------------------- diff --git a/griffin-models/src/main/scala/org/apache/griffin/accuracy/AccuracyMapping.scala b/griffin-models/src/main/scala/org/apache/griffin/accuracy/AccuracyMapping.scala deleted file mode 100644 index 4d4688c..0000000 --- a/griffin-models/src/main/scala/org/apache/griffin/accuracy/AccuracyMapping.scala +++ /dev/null @@ -1,21 +0,0 @@ -package org.apache.griffin.accuracy - -/** - * mapping between source column and target column - * - */ -class AccuracyMapping { - var sourceColId: Int = _ - var sourceColName: String = _ - var sourceConvertingFunctions: List[String] = List() - - var targetColId: Int = _ - var targetColName: String = _ - var targetConvertingFunctions: List[String] = List() - - /** - * matchFunction is still under specification, will implement it after requirement finalized. 
- */ - var matchFunction: String = _ - var isPK: Boolean = _ -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/src/main/scala/org/apache/griffin/common/PartitionPair.scala ---------------------------------------------------------------------- diff --git a/griffin-models/src/main/scala/org/apache/griffin/common/PartitionPair.scala b/griffin-models/src/main/scala/org/apache/griffin/common/PartitionPair.scala deleted file mode 100644 index 081b192..0000000 --- a/griffin-models/src/main/scala/org/apache/griffin/common/PartitionPair.scala +++ /dev/null @@ -1,3 +0,0 @@ -package org.apache.griffin.common - -case class PartitionPair(colName: String, colValue: String) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/src/main/scala/org/apache/griffin/dataLoaderUtils/AvroFileDataLoader.scala ---------------------------------------------------------------------- diff --git a/griffin-models/src/main/scala/org/apache/griffin/dataLoaderUtils/AvroFileDataLoader.scala b/griffin-models/src/main/scala/org/apache/griffin/dataLoaderUtils/AvroFileDataLoader.scala deleted file mode 100644 index 390e7e3..0000000 --- a/griffin-models/src/main/scala/org/apache/griffin/dataLoaderUtils/AvroFileDataLoader.scala +++ /dev/null @@ -1,27 +0,0 @@ -package org.apache.griffin.dataLoaderUtils - -import org.apache.griffin.accuracy.AccuracyConfEntity -import org.apache.griffin.validility.ValidityConfEntity -import org.apache.spark.sql.{DataFrame, SQLContext} - -import com.databricks.spark.avro._ - -case class AvroFileDataLoader (sqlc: SQLContext, dataFilePath: String) extends DataLoader(sqlc) { - final val dataAvroFileSuffix = ".avro" - - def getAccuDataFrame(accu: AccuracyConfEntity) : (DataFrame, DataFrame) = { - val srcDf = loadDataFile(dataFilePath + accu.source) - val tgtDf = loadDataFile(dataFilePath + accu.target) - (srcDf, tgtDf) - } - - def getValiDataFrame(vali: ValidityConfEntity) : DataFrame = { - val srcDf = loadDataFile(dataFilePath + vali.dataSet) - srcDf - } - - def loadDataFile(file: String) = { - val fpath = if (file.endsWith(dataAvroFileSuffix)) file else file + dataAvroFileSuffix - sqlContext.read.avro(fpath) - } -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/src/main/scala/org/apache/griffin/dataLoaderUtils/CsvFileDataLoader.scala ---------------------------------------------------------------------- diff --git a/griffin-models/src/main/scala/org/apache/griffin/dataLoaderUtils/CsvFileDataLoader.scala b/griffin-models/src/main/scala/org/apache/griffin/dataLoaderUtils/CsvFileDataLoader.scala deleted file mode 100644 index f337207..0000000 --- a/griffin-models/src/main/scala/org/apache/griffin/dataLoaderUtils/CsvFileDataLoader.scala +++ /dev/null @@ -1,45 +0,0 @@ -package org.apache.griffin.dataLoaderUtils - -import java.io.File - -import org.apache.griffin.accuracy.AccuracyConfEntity -import org.apache.griffin.util.{DataTypeUtils, PartitionUtils} -import org.apache.griffin.validility.ValidityConfEntity -import org.apache.spark.sql.types.{StructField, StructType} -import org.apache.spark.sql.{DataFrame, DataFrameReader, SQLContext} - -case class CsvFileDataLoader(sqlc: SQLContext, dataFilePath: String) extends DataLoader(sqlc) { - final val dataDescFileSuffix = "_type" - - def getAccuDataFrame(accu: AccuracyConfEntity) : (DataFrame, DataFrame) = { - val srcDf = loadDataFile(dataFilePath + accu.source) - val tgtDf = 
loadDataFile(dataFilePath + accu.target) - (srcDf, tgtDf) - } - - def getValiDataFrame(vali: ValidityConfEntity) : DataFrame = { - val srcDf = loadDataFile(dataFilePath + vali.dataSet) - srcDf - } - - def loadDataFile(file: String, fmt: String = "com.databricks.spark.csv") = { - val reader: DataFrameReader = sqlContext.read - .format(fmt) - .option("header", "true") - .option("treatEmptyValuesAsNulls", "true") - .option("delimiter", ",") - - val typeFile = file + dataDescFileSuffix - val typeExist = new File(typeFile).exists() - val schemaReader = if (typeExist) { - val types = reader.load(typeFile).collect.map( r => (r.getString(0), r.getString(1)) ) - val fields = types.map(kt => StructField(kt._1, DataTypeUtils.str2DataType(kt._2))) - val customSchema = StructType(fields) - reader.schema(customSchema) - } else { - reader.option("inferSchema", "true") - } - - schemaReader.load(file) - } -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/src/main/scala/org/apache/griffin/dataLoaderUtils/DataLoader.scala ---------------------------------------------------------------------- diff --git a/griffin-models/src/main/scala/org/apache/griffin/dataLoaderUtils/DataLoader.scala b/griffin-models/src/main/scala/org/apache/griffin/dataLoaderUtils/DataLoader.scala deleted file mode 100644 index df5a74e..0000000 --- a/griffin-models/src/main/scala/org/apache/griffin/dataLoaderUtils/DataLoader.scala +++ /dev/null @@ -1,11 +0,0 @@ -package org.apache.griffin.dataLoaderUtils - -import org.apache.griffin.validility.ValidityConfEntity -import org.apache.griffin.accuracy.AccuracyConfEntity -import org.apache.spark.Logging -import org.apache.spark.sql.{DataFrame, SQLContext} - -abstract class DataLoader(val sqlContext: SQLContext) extends Logging { - def getAccuDataFrame(accu: AccuracyConfEntity) : (DataFrame, DataFrame) - def getValiDataFrame(vali: ValidityConfEntity) : DataFrame -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/src/main/scala/org/apache/griffin/dataLoaderUtils/DataLoaderFactory.scala ---------------------------------------------------------------------- diff --git a/griffin-models/src/main/scala/org/apache/griffin/dataLoaderUtils/DataLoaderFactory.scala b/griffin-models/src/main/scala/org/apache/griffin/dataLoaderUtils/DataLoaderFactory.scala deleted file mode 100644 index ced1750..0000000 --- a/griffin-models/src/main/scala/org/apache/griffin/dataLoaderUtils/DataLoaderFactory.scala +++ /dev/null @@ -1,18 +0,0 @@ -package org.apache.griffin.dataLoaderUtils - -import org.apache.spark.sql.SQLContext - -object DataLoaderFactory { - final val hive = "hive" - final val avro = "avro" - final val csv = "csv" - - def getDataLoader(sqlc: SQLContext, tp: String, dataFilePath: String = ""): DataLoader = { - tp match { - case this.hive => HiveDataLoader(sqlc) - case this.avro => new AvroFileDataLoader(sqlc, dataFilePath) - case this.csv => CsvFileDataLoader(sqlc, dataFilePath) - case _ => null - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/src/main/scala/org/apache/griffin/dataLoaderUtils/FileLoaderUtil.scala ---------------------------------------------------------------------- diff --git a/griffin-models/src/main/scala/org/apache/griffin/dataLoaderUtils/FileLoaderUtil.scala b/griffin-models/src/main/scala/org/apache/griffin/dataLoaderUtils/FileLoaderUtil.scala deleted file mode 100644 index 5130c16..0000000 --- 
a/griffin-models/src/main/scala/org/apache/griffin/dataLoaderUtils/FileLoaderUtil.scala +++ /dev/null @@ -1,7 +0,0 @@ -package org.apache.griffin.dataLoaderUtils - -object FileLoaderUtil { - def convertPath(path: String) : String = { - path.replace("/", System.getProperty("file.separator")) - } -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/src/main/scala/org/apache/griffin/dataLoaderUtils/HiveDataLoader.scala ---------------------------------------------------------------------- diff --git a/griffin-models/src/main/scala/org/apache/griffin/dataLoaderUtils/HiveDataLoader.scala b/griffin-models/src/main/scala/org/apache/griffin/dataLoaderUtils/HiveDataLoader.scala deleted file mode 100644 index 8379d54..0000000 --- a/griffin-models/src/main/scala/org/apache/griffin/dataLoaderUtils/HiveDataLoader.scala +++ /dev/null @@ -1,20 +0,0 @@ -package org.apache.griffin.dataLoaderUtils - -import org.apache.griffin.accuracy.AccuracyConfEntity -import org.apache.griffin.util.PartitionUtils -import org.apache.griffin.validility.ValidityConfEntity -import org.apache.spark.sql.{DataFrame, SQLContext} - -case class HiveDataLoader(sqlc: SQLContext) extends DataLoader(sqlc) { - - def getAccuDataFrame(accu: AccuracyConfEntity) : (DataFrame, DataFrame) = { - val srcDf = sqlContext.sql(PartitionUtils.generateSourceSQLClause(accu.source, accu.srcPartitions)) - val tgtDf = sqlContext.sql(PartitionUtils.generateTargetSQLClause(accu.target, accu.tgtPartitions)) - (srcDf, tgtDf) - } - - def getValiDataFrame(vali: ValidityConfEntity) : DataFrame = { - val srcDf = sqlContext.sql(PartitionUtils.generateSourceSQLClause(vali.dataSet, vali.timePartitions)) - srcDf - } -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/src/main/scala/org/apache/griffin/package.scala ---------------------------------------------------------------------- diff --git a/griffin-models/src/main/scala/org/apache/griffin/package.scala b/griffin-models/src/main/scala/org/apache/griffin/package.scala deleted file mode 100644 index d0cb7ed..0000000 --- a/griffin-models/src/main/scala/org/apache/griffin/package.scala +++ /dev/null @@ -1,9 +0,0 @@ -package org.apache - -package object griffin { - val Accu = org.apache.griffin.accuracy.Accu - val Vali = org.apache.griffin.validility.Vali - - type AccuracyConfEntity = org.apache.griffin.accuracy.AccuracyConfEntity - type ValidityConfEntity = org.apache.griffin.validility.ValidityConfEntity -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/src/main/scala/org/apache/griffin/util/DataTypeUtils.scala ---------------------------------------------------------------------- diff --git a/griffin-models/src/main/scala/org/apache/griffin/util/DataTypeUtils.scala b/griffin-models/src/main/scala/org/apache/griffin/util/DataTypeUtils.scala deleted file mode 100644 index 64f7fb5..0000000 --- a/griffin-models/src/main/scala/org/apache/griffin/util/DataTypeUtils.scala +++ /dev/null @@ -1,146 +0,0 @@ -package org.apache.griffin.util - -import java.sql.Timestamp -import java.util.Date - -import org.apache.spark.sql.Row -import org.apache.spark.sql.types._ - -object DataTypeUtils { - - final val intType = ("int", """^[Ii]nt(?:eger)?(?:Type)?$""".r, IntegerType, RowGetFunc.getInt _) - final val shortType = ("short", """^[Ss]hort(?:Type)?[Ss]mall(?:[Ii]nt)?$""".r, ShortType, RowGetFunc.getShort _) - final val longType = ("long", """^[Ll]ong(?:Type)?|[Bb]ig(?:[Ii]nt)?$""".r, LongType, RowGetFunc.getLong _) - 
final val byteType = ("byte", """^[Bb]yte(?:Type)?|[Tt]iny(?:[Ii]nt)?$""".r, ByteType, RowGetFunc.getByte _) - final val floatType = ("float", """^[Ff]loat(?:Type)?$""".r, FloatType, RowGetFunc.getFloat _) - final val doubleType = ("double", """^[Dd]ouble(?:Type)?$""".r, DoubleType, RowGetFunc.getDouble _) - final val dateType = ("date", """^[Dd]ate(?:Type)?$""".r, DateType, RowGetFunc.getDate _) - final val timestampType = ("timestamp", """^[Tt]ime(?:[Ss]tamp)?(?:Type)?$""".r, TimestampType, RowGetFunc.getTimestamp _) - final val stringType = ("string", """^[Ss]tr(?:ing)?(?:Type)?|[Vv]ar(?:[Cc]har)?|[Cc]har$""".r, StringType, RowGetFunc.getString _) - final val booleanType = ("boolean", """^[Bb]ool(?:ean)?(?:Type)?|[Bb]inary$""".r, BooleanType, RowGetFunc.getBoolean _) - - def str2DataType(tp: String): DataType = { - tp match { - case intType._2() => intType._3 - case shortType._2() => shortType._3 - case longType._2() => longType._3 - case byteType._2() => byteType._3 - case floatType._2() => floatType._3 - case doubleType._2() => doubleType._3 - case dateType._2() => dateType._3 - case timestampType._2() => timestampType._3 - case stringType._2() => stringType._3 - case booleanType._2() => booleanType._3 - case _ => stringType._3 - } - } - - def str2RowGetFunc(tp: String): (Row, Int) => Any = { - tp match { - case intType._2() => intType._4 - case shortType._2() => shortType._4 - case longType._2() => longType._4 - case byteType._2() => byteType._4 - case floatType._2() => floatType._4 - case doubleType._2() => doubleType._4 - case dateType._2() => dateType._4 - case timestampType._2() => timestampType._4 - case stringType._2() => stringType._4 - case booleanType._2() => booleanType._4 - case _ => stringType._4 - } - } - - def dataType2Str(dt: DataType): String = { - dt match { - case intType._3 => intType._1 - case shortType._3 => shortType._1 - case longType._3 => longType._1 - case byteType._3 => byteType._1 - case floatType._3 => floatType._1 - case doubleType._3 => doubleType._1 - case dateType._3 => dateType._1 - case timestampType._3 => timestampType._1 - case stringType._3 => stringType._1 - case booleanType._3 => booleanType._1 - case _ => stringType._1 - } - } - - def dataType2RowGetFunc(dt: DataType): (Row, Int) => Any = { - dt match { - case intType._3 => intType._4 - case shortType._3 => shortType._4 - case longType._3 => longType._4 - case byteType._3 => byteType._4 - case floatType._3 => floatType._4 - case doubleType._3 => doubleType._4 - case dateType._3 => dateType._4 - case timestampType._3 => timestampType._4 - case stringType._3 => stringType._4 - case booleanType._3 => booleanType._4 - case _ => stringType._4 - } - } - - def isNum(tp: String): Boolean = { - tp match { - case intType._2() => true - case shortType._2() => true - case longType._2() => true - case byteType._2() => true - case floatType._2() => true - case doubleType._2() => true - case _ => false - } - } - -} - - -object RowGetFunc { - def getInt(r: Row, col: Int) = { r.getInt(col) } - def getShort(r: Row, col: Int) = { r.getShort(col) } - def getLong(r: Row, col: Int) = { r.getLong(col) } - def getByte(r: Row, col: Int) = { r.getByte(col) } - def getFloat(r: Row, col: Int) = { r.getFloat(col) } - def getDouble(r: Row, col: Int) = { r.getDouble(col) } - def getDate(r: Row, col: Int) = { r.getDate(col) } - def getTimestamp(r: Row, col: Int) = { r.getTimestamp(col) } - def getString(r: Row, col: Int) = { r.getString(col) } - def getBoolean(r: Row, col: Int) = { r.getBoolean(col) } -} - -object 
DataConverter { - def getDouble(data: Any): Double = { - data match { - case x: Double => x - case x: Int => x.toDouble - case x: Short => x.toDouble - case x: Long => x.toDouble - case x: Byte => x.toDouble - case x: Float => x.toDouble - case x: Date => x.getTime -// case x: Timestamp => x.getTime - case x: String => x.toDouble - case x: Boolean => if (x) 1 else 0 - case _ => 0 - } - } - - def getString(data: Any): String = { - data match { - case x: String => x - case x: Int => x.toString - case x: Short => x.toString - case x: Long => x.toString - case x: Byte => x.toString - case x: Float => x.toString - case x: Double => x.toString - case x: Date => x.toString -// case x: Timestamp => x.toString - case x: Boolean => x.toString - case _ => "" - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/src/main/scala/org/apache/griffin/util/HdfsUtils.scala ---------------------------------------------------------------------- diff --git a/griffin-models/src/main/scala/org/apache/griffin/util/HdfsUtils.scala b/griffin-models/src/main/scala/org/apache/griffin/util/HdfsUtils.scala deleted file mode 100644 index a3d23ec..0000000 --- a/griffin-models/src/main/scala/org/apache/griffin/util/HdfsUtils.scala +++ /dev/null @@ -1,27 +0,0 @@ -package org.apache.griffin.util - -import java.io.File - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path} - -object HdfsUtils { - - private val conf = new Configuration() - - private val dfs = FileSystem.get(conf) - - def createFile(filePath: String): FSDataOutputStream = { - return dfs.create(new Path(filePath)) - } - - def openFile(filePath: String): FSDataInputStream = { - return dfs.open(new Path(filePath)) - } - - def writeFile(filePath: String, message: String): Unit = { - val out = createFile(filePath) - out.write(message.getBytes("utf-8")) - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/src/main/scala/org/apache/griffin/util/PartitionUtils.scala ---------------------------------------------------------------------- diff --git a/griffin-models/src/main/scala/org/apache/griffin/util/PartitionUtils.scala b/griffin-models/src/main/scala/org/apache/griffin/util/PartitionUtils.scala deleted file mode 100644 index a92d6d8..0000000 --- a/griffin-models/src/main/scala/org/apache/griffin/util/PartitionUtils.scala +++ /dev/null @@ -1,31 +0,0 @@ -package org.apache.griffin.util - -import org.apache.griffin.common.PartitionPair - -object PartitionUtils { - def generateWhereClause(partition: List[PartitionPair]): String = { - var first = true - partition.foldLeft("") { (clause, pair) => - if (first) { - first = false - s"where ${pair.colName} = ${pair.colValue}" - } - else s"$clause AND ${pair.colName} = ${pair.colValue}" - } - } - - def generateSourceSQLClause(sourceTable: String, partition: List[PartitionPair]): String = { - s"SELECT * FROM $sourceTable ${generateWhereClause(partition)}" - } - - def generateTargetSQLClause(targetTable: String, partitions: List[List[PartitionPair]]): String = { - var first = true - partitions.foldLeft(s"SELECT * FROM $targetTable") { (clause, partition) => - if (first) { - first = false - s"$clause ${generateWhereClause(partition)}" - } - else s"$clause UNION ALL SELECT * FROM $targetTable ${generateWhereClause(partition)}" - } - } -} \ No newline at end of file 
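
As a quick illustration of the PartitionUtils deleted above, a hypothetical usage sketch follows; the table names and partition values are made up, and the expected outputs in the comments are derived from the fold logic shown in the diff:

```
import org.apache.griffin.common.PartitionPair
import org.apache.griffin.util.PartitionUtils

// Usage sketch of the deleted PartitionUtils; values are illustrative.
// Note that colValue is interpolated verbatim (unquoted) into the SQL.
object PartitionUtilsDemo {
  def main(args: Array[String]): Unit = {
    val src = List(PartitionPair("dt", "20160606"), PartitionPair("hour", "10"))
    // SELECT * FROM src_table where dt = 20160606 AND hour = 10
    println(PartitionUtils.generateSourceSQLClause("src_table", src))

    val tgt = List(
      List(PartitionPair("dt", "20160606"), PartitionPair("hour", "10")),
      List(PartitionPair("dt", "20160606"), PartitionPair("hour", "11")))
    // SELECT * FROM tgt_table where dt = 20160606 AND hour = 10
    //   UNION ALL SELECT * FROM tgt_table where dt = 20160606 AND hour = 11
    println(PartitionUtils.generateTargetSQLClause("tgt_table", tgt))
  }
}
```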
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/src/main/scala/org/apache/griffin/validility/MetricsType.scala
----------------------------------------------------------------------
diff --git a/griffin-models/src/main/scala/org/apache/griffin/validility/MetricsType.scala b/griffin-models/src/main/scala/org/apache/griffin/validility/MetricsType.scala
deleted file mode 100644
index acc831f..0000000
--- a/griffin-models/src/main/scala/org/apache/griffin/validility/MetricsType.scala
+++ /dev/null
@@ -1,16 +0,0 @@
-package org.apache.griffin.validility
-
-object MetricsType extends Enumeration {
-  type MetricsType = Value
-  val DefaultCount = Value(0, "defaultCount")
-  val TotalCount = Value(1, "totalCount")
-  val NullCount = Value(2, "nullCount")
-  val UniqueCount = Value(3, "uniqueCount")
-  val DuplicateCount = Value(4, "duplicateCount")
-  val Maximum = Value(5, "maximum")
-  val Minimum = Value(6, "minimum")
-  val Mean = Value(7, "mean")
-  val Median = Value(8, "median")
-  val RegularExp = Value(9, "regularExp")
-  val PatternFreq = Value(10, "patternFreq")
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/src/main/scala/org/apache/griffin/validility/Vali.scala
----------------------------------------------------------------------
diff --git a/griffin-models/src/main/scala/org/apache/griffin/validility/Vali.scala b/griffin-models/src/main/scala/org/apache/griffin/validility/Vali.scala
deleted file mode 100644
index 4c5b9f4..0000000
--- a/griffin-models/src/main/scala/org/apache/griffin/validility/Vali.scala
+++ /dev/null
@@ -1,211 +0,0 @@
-package org.apache.griffin.validility
-
-import org.apache.griffin.dataLoaderUtils.DataLoaderFactory
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import org.apache.griffin.util.{DataConverter, DataTypeUtils, HdfsUtils, PartitionUtils}
-import org.apache.spark.{Logging, SparkConf, SparkContext}
-import org.apache.spark.mllib.linalg.Vectors
-import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.hive.HiveContext
-
-object Vali extends Logging {
-
-  def main(args: Array[String]): Unit = {
-    if (args.length < 2) {
-      logError("Usage: class <input-conf-file> <outputPath>")
-      logError("For input-conf-file, please use vali_config.json as a template to reflect the test dataset accordingly.")
-      sys.exit(-1)
-    }
-    val input = HdfsUtils.openFile(args(0))
-
-    val outputPath = args(1) + System.getProperty("file.separator")
-
-    //add files for job scheduling
-    val startFile = outputPath + "_START"
-    val resultFile = outputPath + "_RESULT"
-    val doneFile = outputPath + "_FINISHED"
-
-    val mapper = new ObjectMapper()
-    mapper.registerModule(DefaultScalaModule)
-
-    //read the config info of comparison
-    val configure = mapper.readValue(input, classOf[ValidityConfEntity])
-
-    val conf = new SparkConf().setAppName("Vali")
-    val sc: SparkContext = new SparkContext(conf)
-    val sqlContext = new HiveContext(sc)
-
-    //add spark applicationId for debugging
-    val applicationId = sc.applicationId
-
-    //for spark monitoring
-    HdfsUtils.writeFile(startFile, applicationId)
-
-    //get data
-    val dataLoader = DataLoaderFactory.getDataLoader(sqlContext, DataLoaderFactory.hive)
-    val sojdf = dataLoader.getValiDataFrame(configure)
-
-    //-- algorithm --
-    calcVali(configure, sojdf)
-
-    //--output metrics data--
-    val out = HdfsUtils.createFile(resultFile)
-    mapper.writeValue(out, configure)
-
-    //for spark monitoring
-    HdfsUtils.createFile(doneFile)
-
-    sc.stop()
-  }
-
-  def calcVali(configure: ValidityConfEntity, sojdf: DataFrame): Unit = {
-    val dfCount = sojdf.count()
-
-    //--1. get all cols name, and types--
-    val fnts = sojdf.schema.fields.map(x => (x.name, x.dataType.simpleString)).toMap
-
-    //get col type
-    val req: List[ValidityReq] = configure.validityReq.map { r =>
-      val fv = fnts.getOrElse(r.colName, None)
-      if (fv != None) {
-        r.colType = fv.toString
-        r.isNum = DataTypeUtils.isNum(r.colType)
-      }
-      r
-    }
-
-    //--2. calc num cols metrics--
-    val numcols = req.filter(r => r.isNum)
-
-    val numIdx = numcols.map(c => c.colId).toArray
-    val numIdxZip = numIdx.zipWithIndex
-    val numColsCount = numcols.length
-
-    //median number function
-    def funcMedian(df: DataFrame, col: Int): Double = {
-      val dt = sojdf.schema(col).dataType
-      val getFunc = DataTypeUtils.dataType2RowGetFunc(dt)
-
-      val mp = df.map { v =>
-        if (v.isNullAt(col)) (0.0, 0L)
-        else (DataConverter.getDouble(getFunc(v, col)), 1L)
-      }.reduceByKey(_+_)
-      val allCnt = mp.aggregate(0L)((c, m) => c + m._2, _+_)
-      val cnt = mp.sortByKey().collect()
-      var tmp, tmp1 = 0L
-      var median, median1 = cnt(0)._1
-      if (allCnt % 2 != 0) {
-        for (i <- 0 until cnt.length if (tmp < allCnt / 2 + 1)) {
-          tmp += cnt(i)._2
-          median = cnt(i)._1
-        }
-        median
-      } else {
-        for (i <- 0 until cnt.length if (tmp1 < allCnt / 2 + 1)) {
-          tmp1 += cnt(i)._2
-          median1 = cnt(i)._1
-          if (tmp < allCnt / 2) {
-            tmp = tmp1
-            median = median1
-          }
-        }
-        (median + median1) / 2
-      }
-    }
-
-    //match num metrics request
-    def getNumStats(smry: MultivariateStatisticalSummary, df: DataFrame, op: Int, col: Int): Any = {
-      val i = numIdx.indexWhere(_ == col)
-      if (i >= 0) {
-        MetricsType(op) match {
-          case MetricsType.TotalCount => smry.count
-          case MetricsType.Maximum => smry.max(i)
-          case MetricsType.Minimum => smry.min(i)
-          case MetricsType.Mean => smry.mean(i)
-          case MetricsType.Median => funcMedian(df, col)
-//          case MetricsType.Variance => smry.variance(i)
-//          case MetricsType.NumNonZeros => smry.numNonzeros(i)
-          case _ => None
-        }
-      }
-    }
-
-    if (numColsCount > 0) {
-      val idxType = numIdxZip.map(i => (i._2, i._1, sojdf.schema(i._1).dataType))
-
-      //calc metrics of all numeric cols once
-      val numcolVals = sojdf.map { row =>
-        val vals = idxType.foldLeft((List[Int](), List[Double]())) { (arr, i) =>
-          if (row.isNullAt(i._2)) arr
-          else {
-            val v = DataTypeUtils.dataType2RowGetFunc(i._3)(row, i._2)
-            (i._1 :: arr._1, DataConverter.getDouble(v) :: arr._2)
-          }
-        }
-        Vectors.sparse(numColsCount, vals._1.toArray, vals._2.toArray)
-      }
-
-      val summary = Statistics.colStats(numcolVals)
-
-      //get numeric metrics from summary
-      numcols.foreach(vr => vr.metrics.foreach(mc => mc.result = getNumStats(summary, sojdf, mc.name, vr.colId)))
-    }
-
-    //--3. calc str/other cols metrics--
-    val strcols = req.filter(r => !r.isNum)
-
-    //count function
-    def funcCount(df: DataFrame, col: Int): Long = {
-      dfCount
-    }
-    //null count function
-    def funcNullCount(df: DataFrame, col: Int): Long = {
-      val nullRow = df.map(row => if (row.isNullAt(col)) 1L else 0)
-      nullRow.fold(0)((a, b) => a + b)
-    }
-    //unique count function
-    def funcUniqueCount(df: DataFrame, col: Int): Long = {
-      val dt = sojdf.schema(col).dataType
-      val getFunc = DataTypeUtils.dataType2RowGetFunc(dt)
-
-      val mp = df.map(v => (DataConverter.getString(getFunc(v, col)) -> 1L))
-      val rs = mp.reduceByKey(_+_)
-      rs.count()
-    }
-    //duplicate count function
-    def funcDuplicateCount(df: DataFrame, col: Int): Long = {
-      val dt = sojdf.schema(col).dataType
-      val getFunc = DataTypeUtils.dataType2RowGetFunc(dt)
-
-      val mp = df.map(v => (DataConverter.getString(getFunc(v, col)) -> 1L))
-      val rs = mp.reduceByKey(_+_)
-      rs.aggregate(0)((s, v) => if (v._2 == 1) s else s + 1, (s1, s2) => s1 + s2)
-    }
-
-    //regex and match str metrics request
-    def getStrResult(df: DataFrame, op: Int, col: Int): Any = {
-      MetricsType(op) match {
-        case MetricsType.TotalCount => funcCount(df, col)
-        case MetricsType.NullCount => funcNullCount(df, col)
-        case MetricsType.UniqueCount => funcUniqueCount(df, col)
-        case MetricsType.DuplicateCount => funcDuplicateCount(df, col)
-        case _ => None
-      }
-    }
-
-    if (strcols.length > 0) {
-      //calc str metrics one by one
-      strcols.foreach(vr => vr.metrics.foreach(mc => mc.result = getStrResult(sojdf, mc.name, vr.colId)))
-    }
-
-    //union the num cols and str cols metrics, and put the result into configure object
-    val rsltCols = numcols.union(strcols)
-    configure.validityReq = rsltCols
-
-    //output: need to change
-    logInfo("== result ==\n" + rsltCols)
-  }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/src/main/scala/org/apache/griffin/validility/ValidityConfEntity.scala
----------------------------------------------------------------------
diff --git a/griffin-models/src/main/scala/org/apache/griffin/validility/ValidityConfEntity.scala b/griffin-models/src/main/scala/org/apache/griffin/validility/ValidityConfEntity.scala
deleted file mode 100644
index b488a15..0000000
--- a/griffin-models/src/main/scala/org/apache/griffin/validility/ValidityConfEntity.scala
+++ /dev/null
@@ -1,17 +0,0 @@
-package org.apache.griffin.validility
-
-import org.apache.griffin.common.PartitionPair
-
-class ValidityConfEntity {
-  var dataSet: String = _
-
-  var validityReq: List[ValidityReq] = List()
-
-  var timePartitions: List[PartitionPair] = List()
-
-  override def toString = "dataSet: " + dataSet + ", validityReq: " + validityReq
-
-//  {
-//    s"dataSet: $dataSet, validityReq: $validityReq"
-//  }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/src/main/scala/org/apache/griffin/validility/ValidityReq.scala
----------------------------------------------------------------------
diff --git a/griffin-models/src/main/scala/org/apache/griffin/validility/ValidityReq.scala b/griffin-models/src/main/scala/org/apache/griffin/validility/ValidityReq.scala
deleted file mode 100644
index 5345269..0000000
--- a/griffin-models/src/main/scala/org/apache/griffin/validility/ValidityReq.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-package org.apache.griffin.validility
-
-class ValidityReq {
-  var colId: Int = _
-  var colName: String = _
-
-  var colType: String = _
-  var isNum: Boolean = _
-
-  var metrics: List[MetricsReq] = List()
-
-  override def toString = "colId: " + colId + ", colName: " + colName + ", colType: " + colType + ", isNum: " + isNum + ", metrics: " + metrics
-
-}
-
-class MetricsReq {
-  var name: Int = _
-  var result: Any = _
-
-  override def toString = "name: " + name + ", result: " + result
-
-}
\ No newline at end of file
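For orientation: the integer `name` carried by each MetricsReq (see ValidityReq.scala above and vali_config.json further below) is the id of a MetricsType value, which calcVali resolves via `MetricsType(op)`. A minimal sketch of that lookup, using a subset of the enumeration copied from the deleted MetricsType.scala (the object name here is hypothetical):

object MetricsTypeSketch extends Enumeration {
  // Same ids as the deleted MetricsType enum (subset).
  val TotalCount = Value(1, "totalCount")
  val NullCount = Value(2, "nullCount")
  val UniqueCount = Value(3, "uniqueCount")
  val DuplicateCount = Value(4, "duplicateCount")

  def main(args: Array[String]): Unit = {
    // A request like {"name": 3} in vali_config.json resolves to:
    println(MetricsTypeSketch(3)) // prints "uniqueCount"
  }
}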
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/src/test/scala/modelTest/AccuTest.scala
----------------------------------------------------------------------
diff --git a/griffin-models/src/test/scala/modelTest/AccuTest.scala b/griffin-models/src/test/scala/modelTest/AccuTest.scala
deleted file mode 100644
index 7f0699f..0000000
--- a/griffin-models/src/test/scala/modelTest/AccuTest.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-package modelTest
-
-import org.apache.griffin._
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import org.apache.spark.{SparkConf, SparkContext}
-import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-import org.apache.spark.sql.{DataFrame, SQLContext}
-import java.io.{FileInputStream, FileOutputStream}
-
-import org.apache.griffin.dataLoaderUtils.{DataLoaderFactory, FileLoaderUtil}
-
-import scala.collection.mutable.MutableList
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-@RunWith(classOf[JUnitRunner])
-class AccuTest extends FunSuite with Matchers with BeforeAndAfter {
-
-  val dataFilePath = FileLoaderUtil.convertPath("data/test/dataFile/")
-  val reqJsonPath = FileLoaderUtil.convertPath("data/test/reqJson/")
-  val recordFilePath = FileLoaderUtil.convertPath("data/test/recordFile/")
-  val recordFileName = "_RESULT_ACCU"
-
-  case class AccuData() {
-    var cnt: Int = _
-    var reqJson: String = _
-    var configure: AccuracyConfEntity = _
-    var dataFrameSrc: DataFrame = _
-    var dataFrameTgt: DataFrame = _
-    var result: ((Long, Long), List[String]) = _
-  }
-  val accuDatas = MutableList[AccuData]()
-
-  var sc: SparkContext = _
-
-  before {
-    val conf = new SparkConf().setMaster("local[*]").setAppName("AccTest")
-    sc = new SparkContext(conf)
-    val sqlContext = new SQLContext(sc)
-    val mapper = new ObjectMapper()
-    mapper.registerModule(DefaultScalaModule)
-
-    var cnt = 1
-    val accTests = List("accuAvroTest.json")
-    for (tf <- accTests) {
-      val reqJson = reqJsonPath + tf
-      val accuData = new AccuData()
-      accuData.cnt = cnt
-      accuData.reqJson = reqJson
-      val input = new FileInputStream(reqJson)
-      accuData.configure = mapper.readValue(input, classOf[AccuracyConfEntity])
-      val dataLoader = DataLoaderFactory.getDataLoader(sqlContext, DataLoaderFactory.avro, dataFilePath)
-      val dfs = dataLoader.getAccuDataFrame(accuData.configure)
-      accuData.dataFrameSrc = dfs._1
-      accuData.dataFrameTgt = dfs._2
-      accuDatas += accuData
-      cnt += 1
-    }
-  }
-
-  test("test accuracy requests") {
-    for (accuData <- accuDatas) {
-      //-- algorithm --
-      accuData.result = Accu.calcAccu(accuData.configure, accuData.dataFrameSrc, accuData.dataFrameTgt)
-    }
-  }
-
-  after {
-    val out = new FileOutputStream(recordFilePath + recordFileName)
-    for (accuData <- accuDatas) {
-      //output
-      out.write(("//" + "=" * 10).getBytes("utf-8"))
-      out.write((s" ${accuData.cnt}. Test Accuracy model result with request file: ${accuData.reqJson} ").getBytes("utf-8"))
-      out.write(("=" * 10 + "\n").getBytes("utf-8"))
-
-      val ((missCount, srcCount), missedList) = accuData.result
-      val rslt = s"match percentage: ${(1 - missCount.toDouble / srcCount) * 100} %"
-      val rcds = missedList.mkString("\n")
-      val rcd = rslt + "\n\n" + rcds + "\n\n"
-
-      out.write(rcd.getBytes("utf-8"))
-    }
-    out.close()
-    sc.stop()
-  }
-}
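The `after` block above condenses the accuracy result into one match-percentage figure. The formula, isolated (a sketch with made-up counts, not values from the test data):

object MatchPercentageSketch {
  // Mirrors the test's formula: matched fraction = 1 - missCount / srcCount.
  def matchPercentage(missCount: Long, srcCount: Long): Double =
    (1 - missCount.toDouble / srcCount) * 100

  def main(args: Array[String]): Unit = {
    // e.g. 4 of 100 source records unmatched => 96.0 %
    println(s"match percentage: ${matchPercentage(4L, 100L)} %")
  }
}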
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/src/test/scala/modelTest/ValiTest.scala
----------------------------------------------------------------------
diff --git a/griffin-models/src/test/scala/modelTest/ValiTest.scala b/griffin-models/src/test/scala/modelTest/ValiTest.scala
deleted file mode 100644
index b487f66..0000000
--- a/griffin-models/src/test/scala/modelTest/ValiTest.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-package modelTest
-
-import org.apache.griffin._
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import org.apache.spark.{SparkConf, SparkContext}
-import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-import org.apache.spark.sql.{DataFrame, SQLContext}
-import java.io.{FileInputStream, FileOutputStream}
-
-import org.apache.griffin.dataLoaderUtils.{DataLoaderFactory, FileLoaderUtil}
-
-import scala.collection.mutable.MutableList
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-
-
-@RunWith(classOf[JUnitRunner])
-class ValiTest extends FunSuite with Matchers with BeforeAndAfter {
-
-  val dataFilePath = FileLoaderUtil.convertPath("data/test/dataFile/")
-  val reqJsonPath = FileLoaderUtil.convertPath("data/test/reqJson/")
-  val recordFilePath = FileLoaderUtil.convertPath("data/test/recordFile/")
-  val recordFileName = "_RESULT_VALI"
-
-  case class ValiData() {
-    var cnt: Int = _
-    var reqJson: String = _
-    var configure: ValidityConfEntity = _
-    var dataFrame: DataFrame = _
-  }
-  val valiDatas = MutableList[ValiData]()
-
-  var sc: SparkContext = _
-
-  before {
-    val conf = new SparkConf().setMaster("local[*]").setAppName("ValiTest")
-    sc = new SparkContext(conf)
-    val sqlContext = new SQLContext(sc)
-    val mapper = new ObjectMapper()
-    mapper.registerModule(DefaultScalaModule)
-
-    var cnt = 1
-    val valiTests = List("valiAvroTest.json")
-    for (tf <- valiTests) {
-      val reqJson = reqJsonPath + tf
-      val valiData = new ValiData()
-      valiData.cnt = cnt
-      valiData.reqJson = reqJson
-      val input = new FileInputStream(reqJson)
-      valiData.configure = mapper.readValue(input, classOf[ValidityConfEntity])
-      val dataLoader = DataLoaderFactory.getDataLoader(sqlContext, DataLoaderFactory.avro, dataFilePath)
-      valiData.dataFrame = dataLoader.getValiDataFrame(valiData.configure)
-      valiDatas += valiData
-      cnt += 1
-    }
-  }
-
-  test("test validity requests") {
-    for (valiData <- valiDatas) {
-      //-- algorithm --
-      Vali.calcVali(valiData.configure, valiData.dataFrame)
-    }
-  }
-
-  after {
-    val out = new FileOutputStream(recordFilePath + recordFileName)
-    for (valiData <- valiDatas) {
-      //output
-      out.write(("//" + "=" * 10).getBytes("utf-8"))
-      out.write((s" ${valiData.cnt}. Test Validity model result with request file: ${valiData.reqJson} ").getBytes("utf-8"))
-      out.write(("=" * 10 + "\n").getBytes("utf-8"))
-
-      val rcd = valiData.configure.toString + "\n\n"
-      out.write(rcd.getBytes("utf-8"))
-    }
-    out.close()
-    sc.stop()
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/src/test/scala/samples/junit.scala
----------------------------------------------------------------------
diff --git a/griffin-models/src/test/scala/samples/junit.scala b/griffin-models/src/test/scala/samples/junit.scala
deleted file mode 100644
index 89513d5..0000000
--- a/griffin-models/src/test/scala/samples/junit.scala
+++ /dev/null
@@ -1,17 +0,0 @@
-package samples
-
-import org.junit._
-import Assert._
-
-@Test
-class AppTest {
-
-  @Test
-  def testOK() = assertTrue(true)
-
-//  @Test
-//  def testKO() = assertTrue(false)
-
-}
-
-
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/src/test/scala/samples/scalatest.scala
----------------------------------------------------------------------
diff --git a/griffin-models/src/test/scala/samples/scalatest.scala b/griffin-models/src/test/scala/samples/scalatest.scala
deleted file mode 100644
index d326656..0000000
--- a/griffin-models/src/test/scala/samples/scalatest.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Copyright 2001-2009 Artima, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package samples
-
-/*
-ScalaTest facilitates different styles of testing by providing traits you can mix
-together to get the behavior and syntax you prefer. A few examples are
-included here. For more information, visit:
-
-http://www.scalatest.org/
-
-One way to use ScalaTest is to help make JUnit or TestNG tests more
-clear and concise. Here's an example:
-*/
-import scala.collection.mutable.Stack
-import org.scalatest.Assertions
-import org.junit.Test
-
-class StackSuite extends Assertions {
-
-//  @Test def stackShouldPopValuesIinLastInFirstOutOrder() {
-//    val stack = new Stack[Int]
-//    stack.push(1)
-//    stack.push(2)
-//    assert(stack.pop() === 2)
-//    assert(stack.pop() === 1)
-//  }
-
-  @Test def stackShouldThrowNoSuchElementExceptionIfAnEmptyStackIsPopped() {
-    val emptyStack = new Stack[String]
-    intercept[NoSuchElementException] {
-      emptyStack.pop()
-    }
-  }
-}
-
-/*
-Here's an example of a FunSuite with ShouldMatchers mixed in:
-*/
-import org.scalatest.FunSuite
-import org.scalatest.matchers.ShouldMatchers
-
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-@RunWith(classOf[JUnitRunner])
-class ListSuite extends FunSuite with ShouldMatchers {
-
-  test("An empty list should be empty") {
-    List() should be ('empty)
-    Nil should be ('empty)
-  }
-
-  test("A non-empty list should not be empty") {
-    List(1, 2, 3) should not be ('empty)
-    List("fee", "fie", "foe", "fum") should not be ('empty)
-  }
-
-  test("A list's length should equal the number of elements it contains") {
-    List() should have length (0)
-    List(1, 2) should have length (2)
-    List("fee", "fie", "foe", "fum") should have length (4)
-  }
-}
-
-/*
-ScalaTest also supports the behavior-driven development style, in which you
-combine tests with text that specifies the behavior being tested. Here's
-an example whose text output when run looks like:
-
-A Map
-- should only contain keys and values that were added to it
-- should report its size as the number of key/value pairs it contains
-*/
-import org.scalatest.FunSpec
-import scala.collection.mutable.Stack
-
-class ExampleSpec extends FunSpec {
-
-//  describe("A Stack") {
-//
-//    it("should pop values in last-in-first-out order") {
-//      val stack = new Stack[Int]
-//      stack.push(1)
-//      stack.push(2)
-//      assert(stack.pop() === 2)
-//      assert(stack.pop() === 1)
-//    }
-//
-//    it("should throw NoSuchElementException if an empty stack is popped") {
-//      val emptyStack = new Stack[Int]
-//      intercept[NoSuchElementException] {
-//        emptyStack.pop()
-//      }
-//    }
-//  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/src/test/scala/samples/specs.scala
----------------------------------------------------------------------
diff --git a/griffin-models/src/test/scala/samples/specs.scala b/griffin-models/src/test/scala/samples/specs.scala
deleted file mode 100644
index 9e4dfe9..0000000
--- a/griffin-models/src/test/scala/samples/specs.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-package samples
-
-import org.junit.runner.RunWith
-import org.specs2.mutable._
-import org.specs2.runner._
-
-
-/**
- * Sample specification.
- *
- * This specification can be executed with: scala -cp <your classpath> ${package}.SpecsTest
- * Or using maven: mvn test
- *
- * For more information on how to write or run specifications, please visit:
- * http://etorreborre.github.com/specs2/guide/org.specs2.guide.Runners.html
- *
- */
-@RunWith(classOf[JUnitRunner])
-class MySpecTest extends Specification {
-  "The 'Hello world' string" should {
-    "contain 11 characters" in {
-      "Hello world" must have size(11)
-    }
-    "start with 'Hello'" in {
-      "Hello world" must startWith("Hello")
-    }
-    "end with 'world'" in {
-      "Hello world" must endWith("world")
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-models/vali_config.json
----------------------------------------------------------------------
diff --git a/griffin-models/vali_config.json b/griffin-models/vali_config.json
deleted file mode 100644
index a07c4cd..0000000
--- a/griffin-models/vali_config.json
+++ /dev/null
@@ -1,131 +0,0 @@
-{
-  "dataSet": "users_info_src",
-  "validityReq": [
-    {
-      "colId": 0,
-      "colName": "user_id",
-      "metrics": [
-        {
-          "name": 1
-        },
-        {
-          "name": 2
-        },
-        {
-          "name": 3
-        },
-        {
-          "name": 4
-        }
-      ]
-    },
-    {
-      "colId": 1,
-      "colName": "first_name",
-      "metrics": [
-        {
-          "name": 1
-        },
-        {
-          "name": 2
-        },
-        {
-          "name": 3
-        },
-        {
-          "name": 4
-        }
-      ]
-    },
-    {
-      "colId": 2,
-      "colName": "last_name",
-      "metrics": [
-        {
-          "name": 1
-        },
-        {
-          "name": 2
-        },
-        {
-          "name": 3
-        },
-        {
-          "name": 4
-        }
-      ]
-    },
-    {
-      "colId": 3,
-      "colName": "address",
-      "metrics": [
-        {
-          "name": 1
-        },
-        {
-          "name": 2
-        },
-        {
-          "name": 3
-        },
-        {
-          "name": 4
-        }
-      ]
-    },
-    {
-      "colId": 4,
-      "colName": "email",
-      "metrics": [
-        {
-          "name": 1
-        },
-        {
-          "name": 2
-        },
-        {
-          "name": 3
-        },
-        {
-          "name": 4
-        }
-      ]
-    },
-    {
-      "colId": 5,
-      "colName": "phone",
-      "metrics": [
-        {
-          "name": 1
-        },
-        {
-          "name": 2
-        },
-        {
-          "name": 3
-        },
-        {
-          "name": 4
-        }
-      ]
-    },
-    {
-      "colId": 6,
-      "colName": "post_code",
-      "metrics": [
-        {
-          "name": 1
-        },
-        {
-          "name": 2
-        },
-        {
-          "name": 3
-        },
-        {
-          "name": 4
-        }
-      ]
-    }
-  ]
-}
\ No newline at end of file
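The config above drives Vali: each colId/colName block requests metric ids 1-4 (totalCount, nullCount, uniqueCount, duplicateCount per MetricsType) for one column. A minimal sketch of loading such a file the way Vali.main does, assuming the deleted ValidityConfEntity class is on the classpath and reading from a local path instead of HDFS:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import java.io.FileInputStream

object ValiConfigSketch {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)

    // Local stand-in for HdfsUtils.openFile; the real job reads the config from HDFS.
    val input = new FileInputStream("vali_config.json")
    val configure = mapper.readValue(input, classOf[ValidityConfEntity])

    // toString prints dataSet and the per-column metric requests.
    println(configure)
  }
}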
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-scheduler/pom.xml
----------------------------------------------------------------------
diff --git a/griffin-scheduler/pom.xml b/griffin-scheduler/pom.xml
deleted file mode 100644
index 0231c6e..0000000
--- a/griffin-scheduler/pom.xml
+++ /dev/null
@@ -1,83 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!-- Copyright (c) 2016 eBay Software Foundation. Licensed under the Apache
- License, Version 2.0 (the "License"); you may not use this file except in
- compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software distributed
- under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
- OR CONDITIONS OF ANY KIND, either express or implied. See the License for
- the specific language governing permissions and limitations under the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
-  <parent>
-    <groupId>com.ebay.oss</groupId>
-    <artifactId>griffin-parent</artifactId>
-    <version>0.1.0-SNAPSHOT</version>
-    <relativePath>../pom.xml</relativePath>
-  </parent>
-
-  <modelVersion>4.0.0</modelVersion>
-  <artifactId>griffin-scheduler</artifactId>
-  <name>griffin-scheduler</name>
-  <packaging>jar</packaging>
-
-  <dependencies>
-
-<!--    <dependency>
-      <groupId>commons-io</groupId>
-      <artifactId>commons-io</artifactId>
-      <version>2.4</version>
-    </dependency>
- -->
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>org.mongodb</groupId>
-      <artifactId>mongo-java-driver</artifactId>
-      <version>${mongo.version}</version>
-    </dependency>
-
-<!--    <dependency>
-      <groupId>com.google.code.morphia</groupId>
-      <artifactId>morphia</artifactId>
-      <version>0.104</version>
-    </dependency>
-
-    <dependency>
-      <groupId>org.codehaus.jackson</groupId>
-      <artifactId>jackson-core-asl</artifactId>
-    </dependency> -->
-
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <finalName>griffin-scheduler</finalName>
-    <plugins>
-
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <configuration>
-          <testFailureIgnore>true</testFailureIgnore>
-        </configuration>
-      </plugin>
-    </plugins>
-  </build>
-
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-scheduler/src/main/java/org/apache/bark/scheduler/BarkScheduler.java
----------------------------------------------------------------------
diff --git a/griffin-scheduler/src/main/java/org/apache/bark/scheduler/BarkScheduler.java b/griffin-scheduler/src/main/java/org/apache/bark/scheduler/BarkScheduler.java
deleted file mode 100644
index f1cd084..0000000
--- a/griffin-scheduler/src/main/java/org/apache/bark/scheduler/BarkScheduler.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package org.apache.griffin.scheduler;
-/**
- * TODO: This is for the next-version design; currently the scheduler is in bark-core.
- * @author lzhixing
- *
- */
-public class BarkScheduler {
-//    public String sayHello(){
-//        return "Hello";
-//    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-scheduler/src/test/java/org/apache/bark/scheduler/BarkSchedulerTest.java
----------------------------------------------------------------------
diff --git a/griffin-scheduler/src/test/java/org/apache/bark/scheduler/BarkSchedulerTest.java b/griffin-scheduler/src/test/java/org/apache/bark/scheduler/BarkSchedulerTest.java
deleted file mode 100644
index 70b75e2..0000000
--- a/griffin-scheduler/src/test/java/org/apache/bark/scheduler/BarkSchedulerTest.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package org.apache.griffin.scheduler;
-
-
-public class BarkSchedulerTest {
-//    @Test
-//    public void testSayHello(){
-//        BarkScheduler sch = new BarkScheduler();
-//        sch.sayHello();
-//        assertEquals("Hello", sch.sayHello());
-//
-//    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/griffin-ui/.gitignore
----------------------------------------------------------------------
diff --git a/griffin-ui/.gitignore b/griffin-ui/.gitignore
deleted file mode 100644
index 90ad874..0000000
--- a/griffin-ui/.gitignore
+++ /dev/null
@@ -1,6 +0,0 @@
-node_modules
-test-coverage
-build
-/.idea/
-*.iml
-target
\ No newline at end of file
