This is an automated email from the ASF dual-hosted git repository.
guoyp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git
The following commit(s) were added to refs/heads/master by this push:
new 72b0447 [GRIFFIN-289] New feature for griffin COMPLETENESS dq type
72b0447 is described below
commit 72b0447d38ebd7aec23ad1494967716e84b4c367
Author: Zhao <[email protected]>
AuthorDate: Thu Oct 10 23:13:27 2019 +0800
[GRIFFIN-289] New feature for griffin COMPLETENESS dq type
As described in GRIFFIN-289, add two new ways to check 'incompleteness'
records: regular expression and enumeration.
Add 'error.confs' in the dq JSON file. Each JSON object in the 'error.confs'
list represents the configuration for one column.
If 'error.confs' is absent, the old 'incompleteness' process is used, which
keeps compatibility with existing JSON files.
Add unit tests for the new JSON format.
Author: Zhao <[email protected]>
Author: Zhao Li <[email protected]>
Closes #538 from LittleZhao/griffin-289.
---
.../configuration/dqdefinition/DQConfig.scala | 28 ++++++++++-
.../dsl/transform/CompletenessExpr2DQSteps.scala | 56 ++++++++++++++++++++--
.../_completeness_errorconf-batch-griffindsl.json | 51 ++++++++++++++++++++
.../invalidtype_completeness_batch_griffindal.json | 46 ++++++++++++++++++
.../dqdefinition/reader/ParamFileReaderSpec.scala | 23 +++++++++
5 files changed, 200 insertions(+), 4 deletions(-)
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
index d41abf3..ffbf8d1 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
@@ -141,6 +141,7 @@ case class EvaluateRuleParam( @JsonProperty("rules")
private val rules: List[Rul
* @param details detail config of rule (optional)
* @param cache cache the result for multiple usage (optional, valid for
"spark-sql" and "df-ops" mode)
* @param outputs output ways configuration (optional)
+ * @param errorConfs error configuration (valid for 'COMPLETENESS' mode)
*/
@JsonInclude(Include.NON_NULL)
case class RuleParam(@JsonProperty("dsl.type") private val dslType: String,
@@ -150,7 +151,8 @@ case class RuleParam(@JsonProperty("dsl.type") private val
dslType: String,
@JsonProperty("rule") private val rule: String = null,
@JsonProperty("details") private val details: Map[String,
Any] = null,
@JsonProperty("cache") private val cache: Boolean = false,
- @JsonProperty("out") private val outputs:
List[RuleOutputParam] = null
+ @JsonProperty("out") private val outputs:
List[RuleOutputParam] = null,
+ @JsonProperty("error.confs") private val errorConfs:
List[RuleErrorConfParam] = null
) extends Param {
def getDslType: DslType = if (dslType != null) DslType(dslType) else
DslType("")
def getDqType: DqType = if (dqType != null) DqType(dqType) else DqType("")
@@ -164,6 +166,8 @@ case class RuleParam(@JsonProperty("dsl.type") private val
dslType: String,
def getOutputs: Seq[RuleOutputParam] = if (outputs != null) outputs else Nil
def getOutputOpt(tp: OutputType): Option[RuleOutputParam] =
getOutputs.filter(_.getOutputType == tp).headOption
+ def getErrorConfs: Seq[RuleErrorConfParam] = if (errorConfs != null)
errorConfs else Nil
+
def replaceInDfName(newName: String): RuleParam = {
if (StringUtils.equals(newName, inDfName)) this
else RuleParam(dslType, dqType, newName, outDfName, rule, details, cache,
outputs)
@@ -186,6 +190,7 @@ case class RuleParam(@JsonProperty("dsl.type") private val
dslType: String,
"unknown dq type for griffin dsl")
getOutputs.foreach(_.validate)
+ getErrorConfs.foreach(_.validate)
}
}
@@ -206,3 +211,24 @@ case class RuleOutputParam( @JsonProperty("type") private
val outputType: String
def validate(): Unit = {}
}
+
+/**
+ * error configuration parameter
+ * @param columnName the name of the column
+ * @param errorType the way to match error, regex or enumeration
+ * @param values error value list
+ */
+@JsonInclude(Include.NON_NULL)
+case class RuleErrorConfParam( @JsonProperty("column.name") private val
columnName: String,
+ @JsonProperty("type") private val errorType:
String,
+ @JsonProperty("values") private val values:
List[String]
+ ) extends Param {
+ def getColumnName: Option[String] = if (StringUtils.isNotBlank(columnName))
Some(columnName) else None
+ def getErrorType: Option[String] = if (StringUtils.isNotBlank(errorType))
Some(errorType) else None
+ def getValues: Seq[String] = if (values != null) values else Nil
+
+ def validate(): Unit = {
+ assert("regex".equalsIgnoreCase(getErrorType.get) ||
+ "enumeration".equalsIgnoreCase(getErrorType.get), "error error.conf
type")
+ }
+}
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
index 7312f29..4d3344e 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
@@ -18,7 +18,7 @@ under the License.
*/
package org.apache.griffin.measure.step.builder.dsl.transform
-import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
+import
org.apache.griffin.measure.configuration.dqdefinition.{RuleErrorConfParam,
RuleParam}
import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep
@@ -85,8 +85,16 @@ case class CompletenessExpr2DQSteps(context: DQContext,
// 2. incomplete record
val incompleteRecordsTableName = "__incompleteRecords"
- val completeWhereClause = aliases.map(a => s"`${a}` IS NOT
NULL").mkString(" AND ")
- val incompleteWhereClause = s"NOT (${completeWhereClause})"
+ val errorConfs: Seq[RuleErrorConfParam] = ruleParam.getErrorConfs
+ var incompleteWhereClause: String = ""
+ if (errorConfs.size == 0) {
+ // without errorConfs
+ val completeWhereClause = aliases.map(a => s"`${a}` IS NOT
NULL").mkString(" AND ")
+ incompleteWhereClause = s"NOT (${completeWhereClause})"
+ } else {
+ // with errorConfs
+ incompleteWhereClause =
this.getErrorConfCompleteWhereClause(errorConfs)
+ }
val incompleteRecordsSql =
s"SELECT * FROM `${sourceAliasTableName}` WHERE
${incompleteWhereClause}"
@@ -167,4 +175,46 @@ case class CompletenessExpr2DQSteps(context: DQContext,
}
}
+ /**
+ * get 'error' where clause
+ * @param errorConfs error configuration list
+ * @return 'error' where clause
+ */
+ def getErrorConfCompleteWhereClause(errorConfs: Seq[RuleErrorConfParam]):
String = {
+ errorConfs.map(errorConf =>
this.getEachErrorWhereClause(errorConf)).mkString(" OR ")
+ }
+
+ /**
+ * get error sql for each column
+ * @param errorConf error configuration
+ * @return 'error' sql for each column
+ */
+ def getEachErrorWhereClause(errorConf: RuleErrorConfParam): String = {
+ val errorType: Option[String] = errorConf.getErrorType
+ val columnName: String = errorConf.getColumnName.get
+ if ("regex".equalsIgnoreCase(errorType.get)) {
+ // only have one regular expression
+ val regexValue: String = errorConf.getValues.apply(0)
+ val afterReplace: String = regexValue.replaceAll("""\\""", """\\\\""")
+ val result: String = s"`${columnName}` REGEXP '${afterReplace}'"
+ return result
+ } else if ("enumeration".equalsIgnoreCase(errorType.get)) {
+ val values: Seq[String] = errorConf.getValues
+ // hive_none means None
+ var hasNone: Boolean = false
+ if (values.contains("hive_none")) {
+ hasNone = true
+ }
+
+ val valueWithQuote: String = values.filter(value =>
!"hive_none".equals(value))
+ .map(value => s"'${value}'").mkString(", ")
+
+ var result = s"(`${columnName}` IN (${valueWithQuote}))"
+ if (hasNone) {
+ result = s"((${result}) OR (`${columnName}` IS NULL))"
+ }
+ return result
+ }
+ throw new IllegalArgumentException("type in error.confs only supports
regex and enumeration way")
+ }
}
diff --git
a/measure/src/test/resources/_completeness_errorconf-batch-griffindsl.json
b/measure/src/test/resources/_completeness_errorconf-batch-griffindsl.json
new file mode 100644
index 0000000..e3b1f1c
--- /dev/null
+++ b/measure/src/test/resources/_completeness_errorconf-batch-griffindsl.json
@@ -0,0 +1,51 @@
+{
+ "data.sources": [
+ {
+ "connectors": [
+ {
+ "dataframe.name": "prof_table",
+ "config": {
+ "table.name": "efg",
+ "database": "abc",
+ "where": "`date`=\"20190825\""
+ },
+ "type": "hive"
+ }
+ ],
+ "name": "source"
+ }
+ ],
+ "sinks": [
+ "CONSOLE"
+ ],
+ "name": "test_griffin_complete_lizhao.bd",
+ "evaluate.rule": {
+ "rules": [
+ {
+ "rule": "user",
+ "out.dataframe.name": "prof",
+ "dsl.type": "griffin-dsl",
+ "dq.type": "completeness",
+ "error.confs":[
+ {
+ "column.name": "user",
+ "type": "enumeration",
+ "values":["1", "2", "hive_none", ""]
+ },
+ {
+ "column.name": "name",
+ "type": "regex",
+ "values":["^zhanglei.natur\\w{1}$"]
+ }
+ ],
+ "out": [
+ {
+ "type": "metric",
+ "flatten": "map"
+ }
+ ]
+ }
+ ]
+ },
+ "process.type": "batch"
+}
diff --git
a/measure/src/test/resources/invalidconfigs/invalidtype_completeness_batch_griffindal.json
b/measure/src/test/resources/invalidconfigs/invalidtype_completeness_batch_griffindal.json
new file mode 100644
index 0000000..be6435d
--- /dev/null
+++
b/measure/src/test/resources/invalidconfigs/invalidtype_completeness_batch_griffindal.json
@@ -0,0 +1,46 @@
+{
+ "data.sources": [
+ {
+ "connectors": [
+ {
+ "dataframe.name": "prof_table",
+ "config": {
+ "table.name": "efg",
+ "database": "abc",
+ "where": "`date`=\"20190825\""
+ },
+ "type": "hive"
+ }
+ ],
+ "name": "source"
+ }
+ ],
+ "sinks": [
+ "CONSOLE"
+ ],
+ "name": "test_griffin_complete",
+ "evaluate.rule": {
+ "rules": [
+ {
+ "rule": "user",
+ "out.dataframe.name": "prof",
+ "dsl.type": "griffin-dsl",
+ "dq.type": "completeness",
+ "error.confs":[
+ {
+ "column.name": "user",
+ "type": "abc",
+ "values":["1", "2", "hive_none", ""]
+ }
+ ],
+ "out": [
+ {
+ "type": "metric",
+ "flatten": "map"
+ }
+ ]
+ }
+ ]
+ },
+ "process.type": "batch"
+}
diff --git
a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
index dfa2598..e2135ea 100644
---
a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
+++
b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
@@ -52,4 +52,27 @@ class ParamFileReaderSpec extends FlatSpec with Matchers{
}
+ it should "fail for an invalid completeness json file" in {
+ val reader: ParamFileReader =
ParamFileReader(getClass.getResource("/invalidconfigs/invalidtype_completeness_batch_griffindal.json").getFile)
+ val params = reader.readConfig[DQConfig]
+ params match {
+ case Success(_) =>
+ fail("it is an invalid config file")
+ case Failure(e) =>
+ e.getMessage contains ("error error.conf type")
+ }
+ }
+
+ it should "be parsed from a valid errorconf completeness json file" in {
+ val reader :ParamReader =
ParamFileReader(getClass.getResource("/_completeness_errorconf-batch-griffindsl.json").getFile)
+ val params = reader.readConfig[DQConfig]
+ params match {
+ case Success(v) =>
+ v.getEvaluateRule.getRules(0).getErrorConfs.length should === (2)
+ v.getEvaluateRule.getRules(0).getErrorConfs(0).getColumnName.get
should === ("user")
+ v.getEvaluateRule.getRules(0).getErrorConfs(1).getColumnName.get
should === ("name")
+ case Failure(_) =>
+ fail("it should not happen")
+ }
+ }
}