completeness streaming pass
Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/18bbf241 Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/18bbf241 Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/18bbf241 Branch: refs/heads/griffin-0.2.0-incubating-rc4 Commit: 18bbf241f2988a942b228a5f971d389ebd47f84b Parents: 2e77168 Author: Lionel Liu <[email protected]> Authored: Tue Apr 10 12:52:48 2018 +0800 Committer: Lionel Liu <[email protected]> Committed: Tue Apr 10 12:52:48 2018 +0800 ---------------------------------------------------------------------- .../measure/process/engine/SparkSqlEngine.scala | 4 +- .../rule/trans/AccuracyRulePlanTrans.scala | 4 +- .../rule/trans/CompletenessRulePlanTrans.scala | 8 +-- .../_completeness-streaming-griffindsl.json | 64 ++++++++++++++++++++ 4 files changed, 72 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18bbf241/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala index 438595b..ce85e7a 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala @@ -47,8 +47,8 @@ case class SparkSqlEngine(sqlContext: SQLContext) extends SparkDqEngine { } } else sqlContext.sql(rule) -// println(name) -// rdf.show(3) + println(name) + rdf.show(30) if (rs.isGlobal) { if (rs.needCache) DataFrameCaches.cacheGlobalDataFrame(name, rdf) http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18bbf241/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala index 904b087..ec746d2 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala @@ -119,7 +119,7 @@ case class AccuracyRulePlanTrans(dataSourceNames: Seq[String], s""" |SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`, |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`, - |(`${totalColName}` - `${missColName}`) AS `${matchedColName}` + |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${missCountTableName}`.`${missColName}`, 0)) AS `${matchedColName}` |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}` """.stripMargin } @@ -128,7 +128,7 @@ case class AccuracyRulePlanTrans(dataSourceNames: Seq[String], |SELECT `${totalCountTableName}`.`${InternalColumns.tmst}` AS `${InternalColumns.tmst}`, |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`, |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`, - |(`${totalColName}` - `${missColName}`) AS `${matchedColName}` + |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${missCountTableName}`.`${missColName}`, 0)) AS `${matchedColName}` |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}` |ON `${totalCountTableName}`.`${InternalColumns.tmst}` = `${missCountTableName}`.`${InternalColumns.tmst}` """.stripMargin http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18bbf241/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala index 5b1a893..1b89587 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala @@ -89,11 +89,11 @@ case class CompletenessRulePlanTrans(dataSourceNames: Seq[String], val incompleteRecordExport = genRecordExport(recordParam, incompleteRecordsTableName, incompleteRecordsTableName, ct, mode) // 3. incomplete count - val incompleteCountTableName = "__missCount" + val incompleteCountTableName = "__incompleteCount" val incompleteColName = details.getStringOrKey(_incomplete) val incompleteCountSql = procType match { case BatchProcessType => s"SELECT COUNT(*) AS `${incompleteColName}` FROM `${incompleteRecordsTableName}`" - case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${incompleteCountTableName}` FROM `${incompleteRecordsTableName}` GROUP BY `${InternalColumns.tmst}`" + case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${incompleteColName}` FROM `${incompleteRecordsTableName}` GROUP BY `${InternalColumns.tmst}`" } val incompleteCountStep = SparkSqlStep(incompleteCountTableName, incompleteCountSql, emptyMap) @@ -114,7 +114,7 @@ case class CompletenessRulePlanTrans(dataSourceNames: Seq[String], s""" |SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`, |coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0) AS `${incompleteColName}`, - |(`${totalColName}` - `${incompleteColName}`) AS `${completeColName}` + |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0)) AS `${completeColName}` |FROM `${totalCountTableName}` LEFT JOIN `${incompleteCountTableName}` """.stripMargin } @@ -123,7 +123,7 @@ case class CompletenessRulePlanTrans(dataSourceNames: Seq[String], |SELECT `${totalCountTableName}`.`${InternalColumns.tmst}` AS `${InternalColumns.tmst}`, |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`, |coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0) AS `${incompleteColName}`, - |(`${totalColName}` - `${incompleteColName}`) AS `${completeColName}` + |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0)) AS `${completeColName}` |FROM `${totalCountTableName}` LEFT JOIN `${incompleteCountTableName}` |ON `${totalCountTableName}`.`${InternalColumns.tmst}` = `${incompleteCountTableName}`.`${InternalColumns.tmst}` """.stripMargin http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18bbf241/measure/src/test/resources/_completeness-streaming-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_completeness-streaming-griffindsl.json b/measure/src/test/resources/_completeness-streaming-griffindsl.json new file mode 100644 index 0000000..df1b889 --- /dev/null +++ b/measure/src/test/resources/_completeness-streaming-griffindsl.json @@ -0,0 +1,64 @@ +{ + "name": "comp_streaming", + + "process.type": "streaming", + + "data.sources": [ + { + "name": "source", + "connectors": [ + { + "type": "kafka", + "version": "0.8", + "config": { + "kafka.config": { + "bootstrap.servers": "10.147.177.107:9092", + "group.id": "source", + "auto.offset.reset": "smallest", + "auto.commit.enable": "false" + }, + "topics": "test", + "key.type": "java.lang.String", + "value.type": "java.lang.String" + }, + "pre.proc": [ + { + "dsl.type": "df-opr", + "name": "${s1}", + "rule": "from_json", + "details": { + "df.name": "${this}" + } + }, + { + "dsl.type": "spark-sql", + "name": "${this}", + "rule": "select name, age from ${s1}" + } + ] + } + ], + "cache": { + "file.path": "hdfs://localhost/griffin/streaming/dump/old", + "info.path": "old", + "ready.time.interval": "10s", + "ready.time.delay": "0", + "time.range": ["0", "0"] + } + } + ], + + "evaluate.rule": { + "rules": [ + { + "dsl.type": "griffin-dsl", + "dq.type": "completeness", + "name": "comp", + "rule": "name, age", + "metric": { + "name": "comp" + } + } + ] + } +} \ No newline at end of file
