Repository: incubator-griffin Updated Branches: refs/heads/master 1d7acd57a -> 299aa476d
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/config-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/config-griffindsl.json b/measure/src/test/resources/config-griffindsl.json deleted file mode 100644 index 10167cd..0000000 --- a/measure/src/test/resources/config-griffindsl.json +++ /dev/null @@ -1,56 +0,0 @@ -{ - "name": "accu_batch", - - "process.type": "batch", - - "data.sources": [ - { - "name": "source", - "baseline": true, - "connectors": [ - { - "type": "avro", - "version": "1.7", - "config": { - "file.name": "src/test/resources/users_info_src.avro" - } - } - ] - }, { - "name": "target", - "connectors": [ - { - "type": "avro", - "version": "1.7", - "config": { - "file.name": "src/test/resources/users_info_target.avro" - } - } - ] - } - ], - - "evaluate.rule": { - "rules": [ - { - "dsl.type": "griffin-dsl", - "dq.type": "accuracy", - "name": "accu", - "rule": "source.user_id = target.user_id AND upper(source.first_name) = upper(target.first_name) AND source.last_name = target.last_name AND source.address = target.address AND source.email = target.email AND source.phone = target.phone AND source.post_code = target.post_code", - "details": { - "source": "source", - "target": "target", - "miss": "miss_count", - "total": "total_count", - "matched": "matched_count" - }, - "metric": { - "name": "accu" - }, - "record": { - "name": "missRecords" - } - } - ] - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/config-streaming-accuracy.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/config-streaming-accuracy.json b/measure/src/test/resources/config-streaming-accuracy.json deleted file mode 100644 index 240d768..0000000 --- a/measure/src/test/resources/config-streaming-accuracy.json +++ /dev/null @@ -1,121 +0,0 @@ -{ - "name": "accu_streaming", - - "process.type": "streaming", - - "data.sources": [ - { - "name": "source", - "baseline": true, - "connectors": [ - { - "type": "kafka", - "version": "0.8", - "config": { - "kafka.config": { - "bootstrap.servers": "10.147.177.107:9092", - "group.id": "group1", - "auto.offset.reset": "smallest", - "auto.commit.enable": "false" - }, - "topics": "sss", - "key.type": "java.lang.String", - "value.type": "java.lang.String" - }, - "pre.proc": [ - { - "dsl.type": "df-opr", - "name": "${s1}", - "rule": "from_json", - "details": { - "df.name": "${this}" - } - }, - { - "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select name, age from ${s1}" - } - ] - } - ], - "cache": { - "type": "parquet", - "file.path": "hdfs://localhost/griffin/streaming/dump/source", - "info.path": "source", - "ready.time.interval": "10s", - "ready.time.delay": "0", - "time.range": ["-2m", "0"], - "init.clear": true, - "updatable": true - } - }, { - "name": "target", - "connectors": [ - { - "type": "kafka", - "version": "0.8", - "config": { - "kafka.config": { - "bootstrap.servers": "10.147.177.107:9092", - "group.id": "group1", - "auto.offset.reset": "smallest", - "auto.commit.enable": "false" - }, - "topics": "ttt", - "key.type": "java.lang.String", - "value.type": "java.lang.String" - }, - "pre.proc": [ - { - "dsl.type": "df-opr", - "name": "${t1}", - "rule": "from_json", - "details": { - "df.name": "${this}" - } - }, - { - "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select name, age from ${t1}" - } - ] - } - ], - "cache": { - "type": "parquet", - "file.path": "hdfs://localhost/griffin/streaming/dump/target", - "info.path": "target", - "ready.time.interval": "10s", - "ready.time.delay": "0", - "time.range": ["-2m", "0"], - "init.clear": true - } - } - ], - - "evaluate.rule": { - "rules": [ - { - "dsl.type": "griffin-dsl", - "dq.type": "accuracy", - "name": "accu", - "rule": "source.name = target.name and source.age = target.age", - "details": { - "source": "source", - "target": "target", - "miss": "miss_count", - "total": "total_count", - "matched": "matched_count" - }, - "metric": { - "name": "accu" - }, - "record": { - "name": "missRecords" - } - } - ] - } -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/config-streaming.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/config-streaming.json b/measure/src/test/resources/config-streaming.json deleted file mode 100644 index 243a691..0000000 --- a/measure/src/test/resources/config-streaming.json +++ /dev/null @@ -1,75 +0,0 @@ -{ - "name": "prof_streaming", - - "process.type": "streaming", - - "data.sources": [ - { - "name": "source", - "connectors": [ - { - "type": "kafka", - "version": "0.8", - "config": { - "kafka.config": { - "bootstrap.servers": "10.147.177.107:9092", - "group.id": "group1", - "auto.offset.reset": "smallest", - "auto.commit.enable": "false" - }, - "topics": "sss", - "key.type": "java.lang.String", - "value.type": "java.lang.String" - }, - "pre.proc": [ - { - "dsl.type": "df-opr", - "name": "${s1}", - "rule": "from_json", - "details": { - "df.name": "${this}" - } - }, - { - "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select name, age from ${s1}" - } - ] - } - ], - "cache": { - "file.path": "hdfs://localhost/griffin/streaming/dump/source", - "info.path": "source", - "ready.time.interval": "10s", - "ready.time.delay": "0", - "time.range": ["0", "0"], - "init.clear": true - } - } - ], - - "evaluate.rule": { - "rules": [ - { - "dsl.type": "griffin-dsl", - "dq.type": "profiling", - "name": "prof", - "rule": "select count(name) as `cnt`, max(age) as `max`, min(age) as `min` from source", - "metric": { - "name": "prof" - } - }, - { - "dsl.type": "griffin-dsl", - "dq.type": "profiling", - "name": "grp", - "rule": "select name, count(*) as `cnt` from source group by name", - "metric": { - "name": "name_group", - "collect.type": "array" - } - } - ] - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/config.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/config.json b/measure/src/test/resources/config.json deleted file mode 100644 index 99687b3..0000000 --- a/measure/src/test/resources/config.json +++ /dev/null @@ -1,71 +0,0 @@ -{ - "name": "accu_batch", - - "process.type": "batch", - - "data.sources": [ - { - "name": "source", - "baseline": true, - "connectors": [ - { - "type": "avro", - "version": "1.7", - "config": { - "file.name": "src/test/resources/users_info_src.avro" - }, - "pre.proc": [ - { - "dsl.type": "spark-sql", - "name": "${this}", - "rule": "select * from ${this} where user_id >= 10044" - } - ] - } - ] - }, { - "name": "target", - "connectors": [ - { - "type": "avro", - "version": "1.7", - "config": { - "file.name": "src/test/resources/users_info_target.avro" - } - } - ] - } - ], - - "evaluate.rule": { - "rules": [ - { - "dsl.type": "spark-sql", - "name": "missRecords", - "rule": "SELECT source.* FROM source LEFT JOIN target ON coalesce(source.user_id, '') = coalesce(target.user_id, '') AND coalesce(source.first_name, '') = coalesce(target.first_name, '') AND coalesce(source.post_code, '') = coalesce(target.post_code, '') WHERE (NOT (source.user_id IS NULL AND source.first_name IS NULL AND source.post_code IS NULL)) AND (target.user_id IS NULL AND target.first_name IS NULL AND target.post_code IS NULL)", - "record": { - "name": "miss" - }, - "cache": true - }, - { - "dsl.type": "spark-sql", - "name": "miss_count", - "rule": "SELECT count(*) as miss FROM `missRecords`" - }, - { - "dsl.type": "spark-sql", - "name": "total_count", - "rule": "SELECT count(*) as total FROM source" - }, - { - "dsl.type": "spark-sql", - "name": "accu", - "rule": "SELECT `total_count`.`total` AS `total`, coalesce(`miss_count`.`miss`, 0) AS `miss`, (`total` - `miss`) AS `matched` FROM `total_count` FULL JOIN `miss_count`", - "metric": { - "name": "accu" - } - } - ] - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/env-batch.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/env-batch.json b/measure/src/test/resources/env-batch.json new file mode 100644 index 0000000..3e8aa80 --- /dev/null +++ b/measure/src/test/resources/env-batch.json @@ -0,0 +1,38 @@ +{ + "spark": { + "log.level": "WARN", + "config": { + "spark.master": "local[*]" + } + }, + + "persist": [ + { + "type": "log", + "config": { + "max.log.lines": 10 + } + }, + { + "type": "hdfs", + "config": { + "path": "hdfs://localhost/griffin/batch/persist", + "max.persist.lines": 10000, + "max.lines.per.file": 10000 + } + }, + { + "type": "http", + "config": { + "method": "post", + "api": "http://10.148.181.248:39200/griffin/accuracy", + "over.time": "1m", + "retry": 10 + } + } + ], + + "info.cache": [], + + "cleaner": {} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/env-streaming-mongo.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/env-streaming-mongo.json b/measure/src/test/resources/env-streaming-mongo.json new file mode 100644 index 0000000..0d50462 --- /dev/null +++ b/measure/src/test/resources/env-streaming-mongo.json @@ -0,0 +1,54 @@ +{ + "spark": { + "log.level": "WARN", + "checkpoint.dir": "hdfs://localhost/test/griffin/cp", + "batch.interval": "2s", + "process.interval": "10s", + "config": { + "spark.master": "local[*]", + "spark.task.maxFailures": 5, + "spark.streaming.kafkaMaxRatePerPartition": 1000, + "spark.streaming.concurrentJobs": 4, + "spark.yarn.maxAppAttempts": 5, + "spark.yarn.am.attemptFailuresValidityInterval": "1h", + "spark.yarn.max.executor.failures": 120, + "spark.yarn.executor.failuresValidityInterval": "1h", + "spark.hadoop.fs.hdfs.impl.disable.cache": true + } + }, + + "persist": [ + { + "type": "log", + "config": { + "max.log.lines": 100 + } + }, + { + "type": "mongo", + "config": { + "url": "10.149.247.156", + "database": "test", + "collection": "sss" + } + } + ], + + "info.cache": [ + { + "type": "zk", + "config": { + "hosts": "localhost:2181", + "namespace": "griffin/infocache", + "lock.path": "lock", + "mode": "persist", + "init.clear": true, + "close.clear": false + } + } + ], + + "cleaner": { + "clean.interval": "2m" + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/env.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/env.json b/measure/src/test/resources/env.json deleted file mode 100644 index 4a8e3d0..0000000 --- a/measure/src/test/resources/env.json +++ /dev/null @@ -1,39 +0,0 @@ -{ - "spark": { - "log.level": "WARN", - "checkpoint.dir": "hdfs:///griffin/batch/cp", - "batch.interval": "10s", - "process.interval": "10m", - "config": { - "spark.master": "local[*]" - }, - "init.clear": true - }, - - "persist": [ - { - "type": "log", - "config": { - "max.log.lines": 10 - } - } - ], - - "info.cache": [ - { - "type": "zk", - "config": { - "hosts": "localhost:2181", - "namespace": "griffin/infocache", - "lock.path": "lock", - "mode": "persist", - "init.clear": true, - "close.clear": false - } - } - ], - - "cleaner": { - - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/log4j.properties b/measure/src/test/resources/log4j.properties new file mode 100644 index 0000000..ff9399b --- /dev/null +++ b/measure/src/test/resources/log4j.properties @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +log4j.rootLogger=INFO, stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %-5p [%c] - %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/scala/org/apache/griffin/measure/ApplicationTest.scala ---------------------------------------------------------------------- diff --git a/measure/src/test/scala/org/apache/griffin/measure/ApplicationTest.scala b/measure/src/test/scala/org/apache/griffin/measure/ApplicationTest.scala index edf1134..7576969 100644 --- a/measure/src/test/scala/org/apache/griffin/measure/ApplicationTest.scala +++ b/measure/src/test/scala/org/apache/griffin/measure/ApplicationTest.scala @@ -9,13 +9,11 @@ import scala.util.{Failure, Success, Try} @RunWith(classOf[JUnitRunner]) class ApplicationTest extends FunSuite with Matchers with BeforeAndAfter { - val envFile = "src/test/resources/env.json" + val envFile = "src/test/resources/env-batch.json" // val envFile = "src/test/resources/env-streaming.json" -// val confFile = "src/test/resources/config.json" - val confFile = "src/test/resources/config-griffindsl.json" -// val confFile = "src/test/resources/config-streaming.json" -// val confFile = "src/test/resources/config-streaming-accuracy.json" + val confFile = "src/test/resources/_accuracy-batch-griffindsl.json" +// val confFile = "src/test/resources/_accuracy-streaming-griffindsl.json" test("test application") { val args = Array[String](envFile, confFile)
