Repository: incubator-griffin
Updated Branches:
  refs/heads/master e7e4c3a76 -> 2d268aaf1
update dsl-guide document

Author: Lionel Liu <[email protected]>

Closes #186 from bhlx3lyx7/tmst.

Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/2d268aaf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/2d268aaf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/2d268aaf

Branch: refs/heads/master
Commit: 2d268aaf1c4489fc18ad76d8b5ed3160acf62113
Parents: e7e4c3a
Author: Lionel Liu <[email protected]>
Authored: Tue Jan 9 17:58:25 2018 +0800
Committer: Lionel Liu <[email protected]>
Committed: Tue Jan 9 17:58:25 2018 +0800

----------------------------------------------------------------------
 griffin-doc/measure/dsl-guide.md                 | 39 +++++++++++++++++---
 .../rule/adaptor/GriffinDslAdaptor.scala         |  2 +-
 .../resources/_accuracy-batch-griffindsl.json    |  9 +----
 .../resources/_accuracy-batch-sparksql.json      |  2 +-
 4 files changed, 37 insertions(+), 15 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2d268aaf/griffin-doc/measure/dsl-guide.md
----------------------------------------------------------------------
diff --git a/griffin-doc/measure/dsl-guide.md b/griffin-doc/measure/dsl-guide.md
index e7f8569..fb2eeb9 100644
--- a/griffin-doc/measure/dsl-guide.md
+++ b/griffin-doc/measure/dsl-guide.md
@@ -24,16 +24,15 @@ Griffin DSL is designed for DQ measurement, as a SQL-like language, trying to de
 Griffin DSL is SQL-like, case insensitive, and easy to learn.
 ### Supporting process
-- logical operation: not, and, or, in, between, like, is null, is nan, =, !=, <=, >=, <, >
+- logical operation: not, and, or, in, between, like, is null, is nan, =, !=, <>, <=, >=, <, >
 - mathematical operation: +, -, *, /, %
 - sql statement: as, where, group by, having, order by, limit
-
 ### Keywords
 - `null, nan, true, false`
 - `not, and, or`
 - `in, between, like, is`
-- `select, from, as, where, group, by, having, order, desc, asc, limit`
+- `select, distinct, from, as, where, group, by, having, order, desc, asc, limit`
 
 ### Operators
 - `!, &&, ||, =, !=, <, >, <=, >=, <>`

@@ -122,6 +121,14 @@ Accuracy rule expression in Griffin DSL is a logical expression, telling the map
 Profiling rule expression in Griffin DSL is a sql-like expression, with select clause ahead, following optional from clause, where clause, group-by clause, order-by clause, limit clause in order.
 e.g. `source.gender, source.id.count() where source.age > 20 group by source.gender`, `select country, max(age), min(age), count(*) as cnt from source group by country order by cnt desc limit 5`
 
+### Duplicate Rule
+Duplicate rule expression in Griffin DSL is a list of selection expressions separated by commas, indicating the columns to measure for duplication.
+e.g. `name, age`, `name, (age + 1) as next_age`
+
+### Timeliness Rule
+Timeliness rule expression in Griffin DSL is a list of selection expressions separated by commas, indicating the input time and output time columns (the calculation time is used as the default output time if not set).
+e.g. `ts`, `ts, end_ts`
+
 ## Griffin DSL translation to SQL
 Griffin DSL is defined for DQ measurement, to describe DQ domain problem. Actually, in Griffin, we get Griffin DSL rules, translate them into spark-sql rules for calculation in spark-sql engine.

@@ -144,6 +151,27 @@ For example, the dsl rule is `source.cntry, source.id.count(), source.age.max()
 After the translation, the metrics will be persisted in table `profiling`.
+### Duplicate
+Duplicate, also called uniqueness, finds the duplicate items in the data, and rolls up the item counts grouped by duplicate times.
+For example, the dsl rule is `name, age`, which represents the duplicate request; in this case, source and target are the same data set. After the translation, the sql rule is as below:
+- **get distinct items from source**: `SELECT name, age FROM source`, saved as table `src`.
+- **get all items from target**: `SELECT name, age FROM target`, saved as table `tgt`.
+- **join the two tables**: `SELECT src.name, src.age FROM tgt RIGHT JOIN src ON coalesce(src.name, '') = coalesce(tgt.name, '') AND coalesce(src.age, '') = coalesce(tgt.age, '')`, saved as table `joined`.
+- **get duplicate counts**: `SELECT name, age, (count(*) - 1) AS dup FROM joined GROUP BY name, age`, saved as table `grouped`.
+- **get duplicate records**: `SELECT * FROM grouped WHERE dup > 0`, saved as table `dup_record`.
+- **get duplicate metric**: `SELECT dup, count(*) AS num FROM dup_record GROUP BY dup`, saved as table `dup_metric`.
+
+After the translation, the metrics will be persisted in table `dup_metric`.
+
+### Timeliness
+Timeliness measures the latency of each item, and gets statistics of the latencies.
+For example, the dsl rule is `ts, out_ts`: the first column is the item's input time, the second its output time (if not set, `__tmst` will be the default output time column). After the translation, the sql rule is as below:
+- **get input and output time columns**: `SELECT *, ts AS _bts, out_ts AS _ets FROM source`, saved as table `origin_time`.
+- **get latency**: `SELECT *, (_ets - _bts) AS latency FROM origin_time`, saved as table `lat`.
+- **get timeliness metric**: `SELECT CAST(AVG(latency) AS BIGINT) AS avg, MAX(latency) AS max, MIN(latency) AS min FROM lat`, saved as table `time_metric`.
+
+After the translation, the metrics will be persisted in table `time_metric`.
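As an editorial aside (not part of the commit), the three timeliness SQL steps above can be sketched in plain Scala, without Spark, to show what they compute; the `TimelinessSketch` object, the `Metric` case class, and the sample timestamps are made up for illustration:

```scala
// Plain-Scala sketch of the timeliness metric described above (illustration
// only, not Griffin code): pair each item's input time with its output time,
// derive the latency, then take avg/max/min over all latencies.
object TimelinessSketch {
  final case class Metric(avg: Long, max: Long, min: Long)

  def timeliness(times: Seq[(Long, Long)]): Metric = {
    // like: SELECT *, (_ets - _bts) AS latency FROM origin_time
    val latencies = times.map { case (bts, ets) => ets - bts }
    // like: SELECT CAST(AVG(latency) AS BIGINT), MAX(latency), MIN(latency) FROM lat
    Metric(latencies.sum / latencies.size, latencies.max, latencies.min)
  }

  def main(args: Array[String]): Unit = {
    // made-up (ts, out_ts) pairs; latencies are 30, 10, 50
    val rows = Seq((100L, 130L), (200L, 210L), (300L, 350L))
    println(timeliness(rows)) // prints Metric(30,50,10)
  }
}
```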
+
 ## Alternative Rules
 You can simply use Griffin DSL rule to describe your problem in DQ domain, for some complicate requirement, you can also use some alternative rules supported by Griffin.

@@ -174,8 +202,9 @@ Griffin will do the operation to extract json strings.
 Actually, you can also extend the df-opr engine and df-opr adaptor in Griffin to support more types of data frame operations.

 ## Tips
-Griffin engine runs on spark, it might works in two phases, pre-proc phase and run phase.
+Griffin engine runs on spark, it might work in two phases, pre-proc phase and run phase.
 - **Pre-proc phase**: Griffin calculates data source directly, to get appropriate data format, as a preparation for DQ calculation. In this phase, you can use df-opr and spark-sql rules. After preparation, to support streaming DQ calculation, a timestamp column will be added in each row of data, so the data frame in run phase contains an extra column named "__tmst".
 - **Run phase**: Griffin calculates with prepared data, to get the DQ metrics. In this phase, you can use griffin-dsl, spark-sql rules, and a part of df-opr rules.
-For griffin-dsl rule, griffin translates it into spark-sql rule with a group-by condition for column "__tmst", it's useful for especially streaming DQ calculation. But for spark-sql rule, griffin use it directly, you need to add the "__tmst" column in your spark-sql rule explicitly, or you can't get correct metrics result after calculation.
\ No newline at end of file
+For griffin-dsl rules, griffin translates them into spark-sql rules with a group-by condition on column "__tmst", which is especially useful for streaming DQ calculation.
+But for spark-sql rules, griffin uses them directly; you need to add the "__tmst" column in your spark-sql rule explicitly, or you can't get a correct metrics result after calculation.
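As an editorial aside (not part of the commit), the duplicate translation described in the doc change above can also be sketched in plain Scala, without Spark; the `DupMetricSketch` object and the `Person` sample records are made up for illustration:

```scala
// Plain-Scala sketch of the duplicate-metric rollup described above
// (illustration only, not Griffin code): group the data by the duplicate
// columns, count occurrences minus one as "dup", keep dup > 0, then roll up
// how many distinct items share each dup count.
final case class Person(name: String, age: Int)

object DupMetricSketch {
  def dupMetric(target: Seq[Person]): Map[Int, Int] = {
    // like: SELECT name, age, (count(*) - 1) AS dup FROM joined GROUP BY name, age
    val grouped = target.groupBy(p => (p.name, p.age))
    // like: SELECT * FROM grouped WHERE dup > 0
    val dups = grouped.values.map(_.size - 1).filter(_ > 0)
    // like: SELECT dup, count(*) AS num FROM dup_record GROUP BY dup
    dups.groupBy(identity).map { case (dup, xs) => dup -> xs.size }
  }

  def main(args: Array[String]): Unit = {
    // made-up data: ("a",1) appears twice (dup = 1), ("b",2) three times (dup = 2)
    val data = Seq(Person("a", 1), Person("a", 1), Person("b", 2),
                   Person("b", 2), Person("b", 2), Person("c", 3))
    println(dupMetric(data))
  }
}
```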
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2d268aaf/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
index 6ba0cf8..a02335a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
@@ -527,7 +527,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
       s"`${sourceTableName}`.`${alias}` AS `${alias}`"
     }.mkString(", ")
     val onClause = aliases.map { alias =>
-      s"`${sourceTableName}`.`${alias}` = `${targetTableName}`.`${alias}`"
+      s"coalesce(`${sourceTableName}`.`${alias}`, '') = coalesce(`${targetTableName}`.`${alias}`, '')"
     }.mkString(" AND ")
     val joinedSql = {
       s"SELECT ${joinedSelClause} FROM `${targetTableName}` RIGHT JOIN `${sourceTableName}` ON ${onClause}"

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2d268aaf/measure/src/test/resources/_accuracy-batch-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_accuracy-batch-griffindsl.json b/measure/src/test/resources/_accuracy-batch-griffindsl.json
index c702d46..10167cd 100644
--- a/measure/src/test/resources/_accuracy-batch-griffindsl.json
+++ b/measure/src/test/resources/_accuracy-batch-griffindsl.json
@@ -13,14 +13,7 @@
         "version": "1.7",
         "config": {
           "file.name": "src/test/resources/users_info_src.avro"
-        },
-        "pre.proc": [
-          {
-            "dsl.type": "spark-sql",
-            "name": "${this}",
-            "rule": "select * from ${this} where user_id > 10010"
-          }
-        ]
+        }
       }
     ]
   }, {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/2d268aaf/measure/src/test/resources/_accuracy-batch-sparksql.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_accuracy-batch-sparksql.json b/measure/src/test/resources/_accuracy-batch-sparksql.json
index a24ffbe..2eef9f1 100644
--- a/measure/src/test/resources/_accuracy-batch-sparksql.json
+++ b/measure/src/test/resources/_accuracy-batch-sparksql.json
@@ -35,7 +35,7 @@
     {
       "dsl.type": "spark-sql",
       "name": "missRecords",
-      "rule": "SELECT source.* FROM source LEFT JOIN target ON coalesce(source.user_id, '') = coalesce(target.user_id, '') AND coalesce(source.first_name, '') = coalesce(target.first_name, '') AND coalesce(source.post_code, '') = coalesce(target.post_code, '') WHERE (NOT (source.user_id IS NULL AND source.user_id IS NULL AND source.post_code IS NULL)) AND (target.user_id IS NULL AND target.user_id IS NULL AND target.post_code IS NULL)",
+      "rule": "SELECT source.* FROM source LEFT JOIN target ON coalesce(source.user_id, '') = coalesce(target.user_id, '') AND coalesce(source.first_name, '') = coalesce(target.first_name, '') AND coalesce(source.post_code, '') = coalesce(target.post_code, '') WHERE (NOT (source.user_id IS NULL AND source.first_name IS NULL AND source.post_code IS NULL)) AND (target.user_id IS NULL AND target.first_name IS NULL AND target.post_code IS NULL)",
       "record": {
         "name": "miss"
       }
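As an editorial aside (not part of the commit), the reason both the Scala adaptor change and the JSON rule wrap join keys in `coalesce(col, '')` is that in SQL `NULL = NULL` evaluates to NULL rather than true, so rows with null keys would never match. A plain-Scala sketch of the idea, modeling a nullable column as `Option[String]` (the `CoalesceJoinSketch` object is made up for illustration):

```scala
// Why the adaptor coalesces join keys: SQL equality never matches two NULLs.
// coalesce(col, '') maps a null key to the empty string, so two null keys
// compare equal, mirroring the generated ON clause in the diff above.
object CoalesceJoinSketch {
  // like SQL: coalesce(col, '')
  def coalesce(v: Option[String], default: String = ""): String =
    v.getOrElse(default)

  // SQL three-valued equality collapsed to "is this a match": any comparison
  // involving NULL is not a match.
  def sqlEquals(a: Option[String], b: Option[String]): Boolean =
    (a, b) match {
      case (Some(x), Some(y)) => x == y
      case _                  => false // NULL = anything -> not a match
    }

  def main(args: Array[String]): Unit = {
    val src: Option[String] = None
    val tgt: Option[String] = None
    println(sqlEquals(src, tgt))            // prints false: nulls never match
    println(coalesce(src) == coalesce(tgt)) // prints true: coalesced keys match
  }
}
```

One trade-off of this scheme, worth noting: a genuinely null key now also matches an actual empty-string key, which is usually acceptable for accuracy matching but is a deliberate design choice.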
