Repository: incubator-griffin Updated Branches: refs/heads/master 1374427fb -> 00cbd1e56
update dq job spec Author: William Guo <gu...@apache.org> Closes #376 from guoyuepeng/update_griffin_job_spec. Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/00cbd1e5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/00cbd1e5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/00cbd1e5 Branch: refs/heads/master Commit: 00cbd1e56c2a3247aae1033903f7f68a0e26e8b5 Parents: 1374427 Author: William Guo <gu...@apache.org> Authored: Tue Jul 31 15:04:58 2018 +0800 Committer: Lionel Liu <bhlx3l...@163.com> Committed: Tue Jul 31 15:04:58 2018 +0800 ---------------------------------------------------------------------- .../main/resources/config-batch-advanced.json | 61 ++++++++++++++++++++ measure/src/main/resources/config-batch.json | 24 ++------ .../src/main/resources/config-streaming.json | 50 ++++++++++------ measure/src/main/resources/env-batch.json | 12 ++-- measure/src/main/resources/env-streaming.json | 12 ++-- 5 files changed, 110 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/00cbd1e5/measure/src/main/resources/config-batch-advanced.json ---------------------------------------------------------------------- diff --git a/measure/src/main/resources/config-batch-advanced.json b/measure/src/main/resources/config-batch-advanced.json new file mode 100644 index 0000000..96ae245 --- /dev/null +++ b/measure/src/main/resources/config-batch-advanced.json @@ -0,0 +1,61 @@ +{ + "name": "accu_batch", + + "process.type": "batch", + + "data.sources": [ + { + "name": "source", + "as.baseline": true, + "connectors": [ + { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/users_info_src.avro" + } + } + ] + }, { + "name": "target", + "connectors": [ + { + "type": "avro", + "version": "1.7", + "config": { + 
"file.name": "src/test/resources/users_info_target.avro" + } + } + ] + } + ], + + "evaluate.rule": { + "rules": [ + { + "dsl.type": "griffin-dsl", + "dq.type": "accuracy", + "out.dataframe.name": "accu", + "rule": "source.user_id = target.user_id AND upper(source.first_name) = upper(target.first_name) AND source.last_name = target.last_name AND source.address = target.address AND source.email = target.email AND source.phone = target.phone AND source.post_code = target.post_code", + "alias": { + "source": "source", + "target": "target", + "miss": "miss_count", + "total": "total_count", + "matched": "matched_count" + }, + "out":[ + { + "type":"metric", + "name": "accu" + }, + { + "type":"record", + "name": "missRecords" + } + ] + } + ] + }, + "sinks": ["CONSOLE","ELASTICSEARCH"] +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/00cbd1e5/measure/src/main/resources/config-batch.json ---------------------------------------------------------------------- diff --git a/measure/src/main/resources/config-batch.json b/measure/src/main/resources/config-batch.json index 10167cd..f1b03fb 100644 --- a/measure/src/main/resources/config-batch.json +++ b/measure/src/main/resources/config-batch.json @@ -6,7 +6,7 @@ "data.sources": [ { "name": "source", - "baseline": true, + "as.baseline": true, "connectors": [ { "type": "avro", @@ -35,22 +35,10 @@ { "dsl.type": "griffin-dsl", "dq.type": "accuracy", - "name": "accu", - "rule": "source.user_id = target.user_id AND upper(source.first_name) = upper(target.first_name) AND source.last_name = target.last_name AND source.address = target.address AND source.email = target.email AND source.phone = target.phone AND source.post_code = target.post_code", - "details": { - "source": "source", - "target": "target", - "miss": "miss_count", - "total": "total_count", - "matched": "matched_count" - }, - "metric": { - "name": "accu" - }, - "record": { - "name": "missRecords" - } + "out.dataframe.name": "accu", + "rule": "source.user_id = 
target.user_id AND upper(source.first_name) = upper(target.first_name) AND source.last_name = target.last_name AND source.address = target.address AND source.email = target.email AND source.phone = target.phone AND source.post_code = target.post_code" } ] - } -} \ No newline at end of file + }, + "sinks": ["CONSOLE","ELASTICSEARCH"] +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/00cbd1e5/measure/src/main/resources/config-streaming.json ---------------------------------------------------------------------- diff --git a/measure/src/main/resources/config-streaming.json b/measure/src/main/resources/config-streaming.json index 243a691..15a91ac 100644 --- a/measure/src/main/resources/config-streaming.json +++ b/measure/src/main/resources/config-streaming.json @@ -10,6 +10,7 @@ { "type": "kafka", "version": "0.8", + "dataframe.name" : "kafka", "config": { "kafka.config": { "bootstrap.servers": "10.147.177.107:9092", @@ -24,21 +25,21 @@ "pre.proc": [ { "dsl.type": "df-opr", - "name": "${s1}", - "rule": "from_json", - "details": { - "df.name": "${this}" - } + "in.dataframe.name": "kafka", + "out.dataframe.name": "out1", + "rule": "from_json" + }, { "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select name, age from ${s1}" + "in.dataframe.name":"out1", + "out.dataframe.name": "out3", + "rule": "select name, age from out1" } ] } ], - "cache": { + "checkpoint": { "file.path": "hdfs://localhost/griffin/streaming/dump/source", "info.path": "source", "ready.time.interval": "10s", @@ -54,22 +55,33 @@ { "dsl.type": "griffin-dsl", "dq.type": "profiling", - "name": "prof", + "out.dataframe.name": "prof", "rule": "select count(name) as `cnt`, max(age) as `max`, min(age) as `min` from source", - "metric": { - "name": "prof" - } + "out":[ + { + "type": "metric", + "name": "prof" + } + ] }, { "dsl.type": "griffin-dsl", "dq.type": "profiling", - "name": "grp", + "out.dataframe.name": "grp", "rule": "select name, count(*) as `cnt` from source group by name", - 
"metric": { - "name": "name_group", - "collect.type": "array" - } + "out":[ + { + "type": "array", + "name": "name_group", + "flatten":"array" + }, + { + "type": "record", + "name": "missRecords" + } + ] } ] - } -} \ No newline at end of file + }, + "sinks": ["ELASTICSEARCH"] +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/00cbd1e5/measure/src/main/resources/env-batch.json ---------------------------------------------------------------------- diff --git a/measure/src/main/resources/env-batch.json b/measure/src/main/resources/env-batch.json index 3e8aa80..024ad80 100644 --- a/measure/src/main/resources/env-batch.json +++ b/measure/src/main/resources/env-batch.json @@ -6,15 +6,15 @@ } }, - "persist": [ + "sinks": [ { - "type": "log", + "type": "CONSOLE", "config": { "max.log.lines": 10 } }, { - "type": "hdfs", + "type": "HDFS", "config": { "path": "hdfs://localhost/griffin/batch/persist", "max.persist.lines": 10000, @@ -22,11 +22,11 @@ } }, { - "type": "http", + "type": "ELASTICSEARCH", "config": { "method": "post", "api": "http://10.148.181.248:39200/griffin/accuracy", - "over.time": "1m", + "connection.timeout": "1m", "retry": 10 } } @@ -35,4 +35,4 @@ "info.cache": [], "cleaner": {} -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/00cbd1e5/measure/src/main/resources/env-streaming.json ---------------------------------------------------------------------- diff --git a/measure/src/main/resources/env-streaming.json b/measure/src/main/resources/env-streaming.json index 6871bb9..83ff6ab 100644 --- a/measure/src/main/resources/env-streaming.json +++ b/measure/src/main/resources/env-streaming.json @@ -18,15 +18,15 @@ } }, - "persist": [ + "sinks": [ { - "type": "log", + "type": "CONSOLE", "config": { "max.log.lines": 100 } }, { - "type": "hdfs", + "type": "HDFS", "config": { "path": "hdfs://localhost/griffin/streaming/persist", "max.persist.lines": 10000, @@ -34,7 +34,7 @@ } }, { - "type": "http", + "type": 
"ELASTICSEARCH", "config": { "method": "post", "api": "http://localhost:9200/griffin/accuracy" @@ -42,7 +42,7 @@ } ], - "info.cache": [ + "griffin.checkpoint": [ { "type": "zk", "config": { @@ -59,4 +59,4 @@ "cleaner": { "clean.interval": "2m" } -} \ No newline at end of file +}