This is an automated email from the ASF dual-hosted git repository.
wankun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git
The following commit(s) were added to refs/heads/master by this push:
new ed47c2b Enum based configs
ed47c2b is described below
commit ed47c2b2232e07adea0a8919786ce835a15f2191
Author: tusharpatil <tus>
AuthorDate: Mon Dec 16 21:22:20 2019 +0800
Enum based configs
**What changes were proposed in this pull request?**
All the predefined types (`DQTypes`, `DSLTypes`, `FlattenType`, `OutputType`,
`ProcessType`, `SinkType` and `WriteMode`) are compared against config values
using a regex-based approach. This adds unnecessary overhead in terms of
execution time and maintainability.
This PR replaces the regex-based approach with a predefined enum-based one
that provides the same functionality, as sketched below.
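As an illustration only (a minimal sketch, not code from this patch; `OldDqType`
and `DqTypeSketch` are hypothetical names), the per-type regex match is replaced
by a single case-insensitive lookup over `Enumeration` values, mirroring the new
`GriffinEnum` trait introduced below:

```scala
import scala.util.matching.Regex

// Before: each predefined type carried a regex, and every lookup
// tried each candidate's pattern in turn.
sealed trait OldDqType { val idPattern: Regex }
case object OldAccuracyType extends OldDqType {
  val idPattern: Regex = "^(?i)accuracy$".r
}

// After: one normalized name lookup over predefined Enumeration values,
// falling back to Unknown for unrecognized config strings.
object DqTypeSketch extends Enumeration {
  val Accuracy, Profiling, Unknown = Value

  def withNameWithDefault(name: String): Value =
    values
      .find(_.toString.toLowerCase == name.replace("-", "").toLowerCase())
      .getOrElse(Unknown)
}
```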
**Does this PR introduce any user-facing change?**
No
**How was this patch tested?**
Griffin test suite.
Author: tusharpatil <tus>
Author: tusharpatil20 <[email protected]>
Closes #558 from tusharpatil20/enum-based-configs.
---
.../org/apache/griffin/measure/Application.scala | 5 +-
.../configuration/dqdefinition/DQConfig.scala | 23 ++-
.../configuration/dqdefinition/EnvConfig.scala | 5 +-
.../measure/configuration/enums/DqType.scala | 123 ++++--------
.../measure/configuration/enums/DslType.scala | 59 +++---
.../measure/configuration/enums/FlattenType.scala | 108 ++++------
.../enums/{WriteMode.scala => GriffinEnum.scala} | 33 ++--
.../measure/configuration/enums/OutputType.scala | 80 ++------
.../measure/configuration/enums/ProcessType.scala | 41 +---
.../measure/configuration/enums/SinkType.scala | 97 +++------
.../measure/configuration/enums/WriteMode.scala | 6 +-
.../apache/griffin/measure/context/DQContext.scala | 3 +-
.../datasource/connector/DataConnector.scala | 2 +-
.../griffin/measure/launch/batch/BatchDQApp.scala | 2 +-
.../measure/launch/streaming/StreamingDQApp.scala | 2 +-
.../apache/griffin/measure/sink/SinkFactory.scala | 12 +-
.../measure/step/builder/DQStepBuilder.scala | 7 +-
.../step/builder/RuleParamStepBuilder.scala | 2 +-
.../step/builder/dsl/parser/GriffinDslParser.scala | 15 +-
.../dsl/transform/AccuracyExpr2DQSteps.scala | 133 +++++++++----
.../dsl/transform/CompletenessExpr2DQSteps.scala | 6 +-
.../dsl/transform/DistinctnessExpr2DQSteps.scala | 4 +-
.../step/builder/dsl/transform/Expr2DQSteps.scala | 14 +-
.../dsl/transform/ProfilingExpr2DQSteps.scala | 6 +-
.../dsl/transform/TimelinessExpr2DQSteps.scala | 6 +-
.../dsl/transform/UniquenessExpr2DQSteps.scala | 4 +-
.../step/builder/preproc/PreProcParamMaker.scala | 2 +-
.../measure/step/write/MetricWriteStep.scala | 3 +-
.../dqdefinition/reader/ParamEnumReaderSpec.scala | 218 +++++++++++++++++++++
.../dqdefinition/reader/ParamFileReaderSpec.scala | 18 +-
.../dqdefinition/reader/ParamJsonReaderSpec.scala | 18 +-
.../org/apache/griffin/measure/job/DQAppTest.scala | 7 +-
.../griffin/measure/sink/CustomSinkTest.scala | 4 +-
.../apache/griffin/measure/sink/SinkTestBase.scala | 2 +-
.../griffin/measure/step/TransformStepTest.scala | 2 +-
.../AccuracyTransformationsIntegrationTest.scala | 3 +-
36 files changed, 580 insertions(+), 495 deletions(-)
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/Application.scala
b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
index 6bad4d5..deb2781 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/Application.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
@@ -22,7 +22,8 @@ import scala.util.{Failure, Success, Try}
import org.apache.griffin.measure.configuration.dqdefinition.{DQConfig,
EnvConfig, GriffinConfig, Param}
import
org.apache.griffin.measure.configuration.dqdefinition.reader.ParamReaderFactory
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.enums.ProcessType
+import org.apache.griffin.measure.configuration.enums.ProcessType._
import org.apache.griffin.measure.launch.DQApp
import org.apache.griffin.measure.launch.batch.BatchDQApp
import org.apache.griffin.measure.launch.streaming.StreamingDQApp
@@ -62,7 +63,7 @@ object Application extends Loggable {
val allParam: GriffinConfig = GriffinConfig(envParam, dqParam)
// choose process
- val procType = ProcessType(allParam.getDqConfig.getProcType)
+ val procType =
ProcessType.withNameWithDefault(allParam.getDqConfig.getProcType)
val dqApp: DQApp = procType match {
case BatchProcessType => BatchDQApp(allParam)
case StreamingProcessType => StreamingDQApp(allParam)
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
index 77b7bac..afa1822 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
@@ -21,7 +21,12 @@ import com.fasterxml.jackson.annotation.{JsonInclude,
JsonProperty}
import com.fasterxml.jackson.annotation.JsonInclude.Include
import org.apache.commons.lang.StringUtils
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.enums.{DqType, DslType,
FlattenType, OutputType, SinkType}
+import org.apache.griffin.measure.configuration.enums.DqType._
+import org.apache.griffin.measure.configuration.enums.DslType.{DslType,
GriffinDsl}
+import
org.apache.griffin.measure.configuration.enums.FlattenType.{DefaultFlattenType,
FlattenType}
+import org.apache.griffin.measure.configuration.enums.OutputType.{OutputType,
UnknownOutputType}
+import org.apache.griffin.measure.configuration.enums.SinkType.SinkType
/**
* dq param
@@ -153,8 +158,8 @@ case class RuleParam(@JsonProperty("dsl.type") private val
dslType: String,
@JsonProperty("out") private val outputs:
List[RuleOutputParam] = null,
@JsonProperty("error.confs") private val errorConfs:
List[RuleErrorConfParam] = null
) extends Param {
- def getDslType: DslType = if (dslType != null) DslType(dslType) else
DslType("")
- def getDqType: DqType = if (dqType != null) DqType(dqType) else DqType("")
+ def getDslType: DslType = if (dslType != null)
DslType.withNameWithDefault(dslType) else GriffinDsl
+ def getDqType: DqType = if (dqType != null)
DqType.withNameWithDefault(dqType) else Unknown
def getCache: Boolean = if (cache) cache else false
def getInDfName(defName: String = ""): String = if (inDfName != null)
inDfName else defName
@@ -185,7 +190,7 @@ case class RuleParam(@JsonProperty("dsl.type") private val
dslType: String,
}
def validate(): Unit = {
- assert(!(getDslType.equals(GriffinDslType) &&
getDqType.equals(UnknownType)),
+ assert(!(getDslType.equals(GriffinDsl) && getDqType.equals(Unknown)),
"unknown dq type for griffin dsl")
getOutputs.foreach(_.validate)
@@ -204,9 +209,15 @@ case class RuleOutputParam( @JsonProperty("type") private
val outputType: String
@JsonProperty("name") private val name: String,
@JsonProperty("flatten") private val flatten:
String
) extends Param {
- def getOutputType: OutputType = if (outputType != null)
OutputType(outputType) else OutputType("")
+ def getOutputType: OutputType = {
+ if (outputType != null) OutputType.withNameWithDefault(outputType)
+ else UnknownOutputType
+ }
def getNameOpt: Option[String] = if (StringUtils.isNotBlank(name))
Some(name) else None
- def getFlatten: FlattenType = if (StringUtils.isNotBlank(flatten))
FlattenType(flatten) else FlattenType("")
+ def getFlatten: FlattenType = {
+ if (StringUtils.isNotBlank(flatten))
FlattenType.withNameWithDefault(flatten)
+ else DefaultFlattenType
+ }
def validate(): Unit = {}
}
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
index 22edf2b..4c5f937 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
@@ -21,7 +21,8 @@ import com.fasterxml.jackson.annotation.{JsonInclude,
JsonProperty}
import com.fasterxml.jackson.annotation.JsonInclude.Include
import org.apache.commons.lang.StringUtils
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.enums.SinkType
+import org.apache.griffin.measure.configuration.enums.SinkType.SinkType
/**
* environment param
@@ -86,7 +87,7 @@ case class SparkParam( @JsonProperty("log.level") private val
logLevel: String,
case class SinkParam(@JsonProperty("type") private val sinkType: String,
@JsonProperty("config") private val config: Map[String,
Any]
) extends Param {
- def getType: SinkType = SinkType(sinkType)
+ def getType: SinkType = SinkType.withNameWithDefault(sinkType)
def getConfig: Map[String, Any] = if (config != null) config else
Map[String, Any]()
def validate(): Unit = {
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala
index cc47728..3176bad 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala
@@ -17,101 +17,46 @@
package org.apache.griffin.measure.configuration.enums
-import scala.util.matching.Regex
+import org.apache.griffin.measure.configuration.enums
/**
* effective when dsl type is "griffin-dsl",
* indicates the dq type of griffin pre-defined measurements
+ * <li>{@link #Accuracy} - The match percentage of items between source and
target
+ * count(source items matched with the ones from
target) / count(source)
+ * e.g.: source [1, 2, 3, 4, 5], target: [1, 2, 3, 4]
+ * metric will be: { "total": 5, "miss": 1,
"matched": 4 } accuracy is 80%.</li>
+ * <li>{@link #Profiling} - The statistic data of data source
+ * e.g.: max, min, average, group by count, ...</li>
+ * <li>{@link #Uniqueness} - The uniqueness of data source comparing with
itself
+ * count(unique items in source) / count(source)
+ * e.g.: [1, 2, 3, 3] -> { "unique": 2, "total": 4,
"dup-arr": [ "dup": 1, "num": 1 ] }
+ * uniqueness indicates the items without any
replica of data</li>
+ * <li>{@link #Distinctness} - The distinctness of data source comparing with
itself
+ * count(distinct items in source) / count(source)
+ * e.g.: [1, 2, 3, 3] -> { "dist": 3, "total": 4,
"dup-arr": [ "dup": 1, "num": 1 ] }
+ * distinctness indicates the valid information
of data
+ * comparing with uniqueness, distinctness is
more meaningful</li>
+ * <li>{@link #Timeliness} - The latency of data source with timestamp
information
+ * e.g.: (receive_time - send_time)
+ * timeliness can get the statistic metric of
latency, like average, max, min,
+ * percentile-value,
+ * even more, it can record the items with latency
above threshold you configured</li>
+ * <li>{@link #Completeness} - The completeness of data source
+ *                              the columns you measure are incomplete if they
are null</li>
*/
-sealed trait DqType {
- val idPattern: Regex
- val desc: String
-}
-
-object DqType {
- private val dqTypes: List[DqType] = List(
- AccuracyType,
- ProfilingType,
- UniquenessType,
- DistinctnessType,
- TimelinessType,
- CompletenessType,
- UnknownType
- )
- def apply(ptn: String): DqType = {
- dqTypes.find(dqType => ptn match {
- case dqType.idPattern() => true
- case _ => false
- }).getOrElse(UnknownType)
- }
- def unapply(pt: DqType): Option[String] = Some(pt.desc)
-}
-
-/**
- * accuracy: the match percentage of items between source and target
- * count(source items matched with the ones from target) / count(source)
- * e.g.: source [1, 2, 3, 4, 5], target: [1, 2, 3, 4]
- * metric will be: { "total": 5, "miss": 1, "matched": 4 }
- * accuracy is 80%.
- */
- case object AccuracyType extends DqType {
- val idPattern = "^(?i)accuracy$".r
- val desc = "accuracy"
-}
+object DqType extends GriffinEnum {
-/**
- * profiling: the statistic data of data source
- * e.g.: max, min, average, group by count, ...
- */
- case object ProfilingType extends DqType {
- val idPattern = "^(?i)profiling$".r
- val desc = "profiling"
-}
+ type DqType = Value
-/**
- * uniqueness: the uniqueness of data source comparing with itself
- * count(unique items in source) / count(source)
- * e.g.: [1, 2, 3, 3] -> { "unique": 2, "total": 4, "dup-arr": [ "dup": 1,
"num": 1 ] }
- * uniqueness indicates the items without any replica of data
- */
- case object UniquenessType extends DqType {
- val idPattern = "^(?i)uniqueness|duplicate$".r
- val desc = "uniqueness"
-}
+ val Accuracy, Profiling, Uniqueness, Duplicate, Distinct, Timeliness,
+ Completeness = Value
-/**
- * distinctness: the distinctness of data source comparing with itself
- * count(distinct items in source) / count(source)
- * e.g.: [1, 2, 3, 3] -> { "dist": 3, "total": 4, "dup-arr": [ "dup": 1,
"num": 1 ] }
- * distinctness indicates the valid information of data
- * comparing with uniqueness, distinctness is more meaningful
- */
- case object DistinctnessType extends DqType {
- val idPattern = "^(?i)distinct$".r
- val desc = "distinct"
-}
-
-/**
- * timeliness: the latency of data source with timestamp information
- * e.g.: (receive_time - send_time)
- * timeliness can get the statistic metric of latency, like average, max,
min, percentile-value,
- * even more, it can record the items with latency above threshold you
configured
- */
- case object TimelinessType extends DqType {
- val idPattern = "^(?i)timeliness$".r
- val desc = "timeliness"
-}
-
-/**
- * completeness: the completeness of data source
- * the columns you measure is incomplete if it is null
- */
- case object CompletenessType extends DqType {
- val idPattern = "^(?i)completeness$".r
- val desc = "completeness"
-}
-
- case object UnknownType extends DqType {
- val idPattern = "".r
- val desc = "unknown"
+ override def withNameWithDefault(name: String): enums.DqType.Value = {
+ val dqType = super.withNameWithDefault(name)
+ dqType match {
+ case Uniqueness | Duplicate => Uniqueness
+ case _ => dqType
+ }
+ }
}
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DslType.scala
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DslType.scala
index 4e2d7cd..1b66f35 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DslType.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DslType.scala
@@ -17,47 +17,34 @@
package org.apache.griffin.measure.configuration.enums
-import scala.util.matching.Regex
+import org.apache.griffin.measure.configuration.enums
/**
* dsl type indicates the language type of rule param
+ * <li>{@link #SparkSql} - spark-sql: rule defined in "SPARK-SQL"
directly</li>
+ * <li>{@link #DfOps} - df-ops|df-opr|df-operations: data frame operations
rule, supports some pre-defined data frame operations</li>
+ * <li>{@link #GriffinDsl} - griffin dsl rule, to define dq measurements
easier</li>
*/
-sealed trait DslType {
- val idPattern: Regex
- val desc: String
-}
+object DslType extends GriffinEnum {
+ type DslType = Value
-object DslType {
- private val dslTypes: List[DslType] = List(SparkSqlType, GriffinDslType,
DataFrameOpsType)
- def apply(ptn: String): DslType = {
- dslTypes.find(tp => ptn match {
- case tp.idPattern() => true
- case _ => false
- }).getOrElse(GriffinDslType)
- }
- def unapply(pt: DslType): Option[String] = Some(pt.desc)
-}
+ val SparkSql, DfOps, DfOpr, DfOperations, GriffinDsl, DataFrameOpsType =
Value
-/**
- * spark-sql: rule defined in "SPARK-SQL" directly
- */
- case object SparkSqlType extends DslType {
- val idPattern = "^(?i)spark-?sql$".r
- val desc = "spark-sql"
-}
+ /**
+ *
+ * @param name Dsltype from config file
+ * @return Enum value corresponding to string
+ */
+ def withNameWithDslType(name: String): Value =
+ values
+ .find(_.toString.toLowerCase == name.replace("-", "").toLowerCase())
+ .getOrElse(GriffinDsl)
-/**
- * df-ops: data frame operations rule, support some pre-defined data frame ops
- */
- case object DataFrameOpsType extends DslType {
- val idPattern = "^(?i)df-?(?:ops|opr|operations)$".r
- val desc = "df-ops"
-}
-
-/**
- * griffin-dsl: griffin dsl rule, to define dq measurements easier
- */
- case object GriffinDslType extends DslType {
- val idPattern = "^(?i)griffin-?dsl$".r
- val desc = "griffin-dsl"
+ override def withNameWithDefault(name: String): enums.DslType.Value = {
+ val dslType = withNameWithDslType(name)
+ dslType match {
+ case DfOps | DfOpr | DfOperations => DataFrameOpsType
+ case _ => dslType
+ }
+ }
}
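For reference, the resolution rules this gives (illustrative asserts, not part
of the patch; behavior inferred from `withNameWithDslType` and the override
above):

```scala
import org.apache.griffin.measure.configuration.enums.DslType

assert(DslType.withNameWithDefault("spark-sql") == DslType.SparkSql)       // "-" is ignored when matching
assert(DslType.withNameWithDefault("df-opr") == DslType.DataFrameOpsType)  // df-ops/df-opr/df-operations fold together
assert(DslType.withNameWithDefault("whatever") == DslType.GriffinDsl)      // unrecognized defaults to griffin-dsl
```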
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
index cde4c41..2586268 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
@@ -17,77 +17,47 @@
package org.apache.griffin.measure.configuration.enums
-import scala.util.matching.Regex
-
/**
* the strategy to flatten metric
+ * <li>{@link #DefaultFlattenType} - default flatten strategy
+ * metrics contains 1 row -> flatten
metric json map
+ * metrics contains n > 1 rows -> flatten
metric json array
+ * n = 0: { }
+ * n = 1: { "col1": "value1", "col2":
"value2", ... }
+ * n > 1: { "arr-name": [ { "col1":
"value1", "col2": "value2", ... }, ... ] }
+ * all rows
+ * </li>
+ * <li>{@link #EntriesFlattenType} - metrics contains n rows -> flatten
metric json map
+ * n = 0: { }
+ * n >= 1: { "col1": "value1", "col2":
"value2", ... }
+ * the first row only
+ * </li>
+ * <li>{@link #ArrayFlattenType} - metrics contains n rows -> flatten
metric json array
+ * n = 0: { "arr-name": [ ] }
+ * n >= 1: { "arr-name": [ { "col1":
"value1", "col2": "value2", ... }, ... ] }
+ * all rows
+ * </li>
+ * <li>{@link #MapFlattenType} - metrics contains n rows -> flatten metric
json wrapped map
+ * n = 0: { "map-name": { } }
+ * n >= 1: { "map-name": { "col1": "value1",
"col2": "value2", ... } }
+ * the first row only
+ * </li>
*/
-sealed trait FlattenType {
- val idPattern: Regex
- val desc: String
-}
-
-object FlattenType {
- private val flattenTypes: List[FlattenType] = List(
- DefaultFlattenType,
- EntriesFlattenType,
- ArrayFlattenType,
- MapFlattenType
- )
-
- val default = DefaultFlattenType
- def apply(ptn: String): FlattenType = {
- flattenTypes.find(tp => ptn match {
- case tp.idPattern() => true
- case _ => false
- }).getOrElse(default)
+object FlattenType extends GriffinEnum {
+ type FlattenType = Value
+
+ val DefaultFlattenType, EntriesFlattenType, ArrayFlattenType, MapFlattenType
=
+ Value
+
+ val List, Array, Entries, Map, Default = Value
+
+ override def withNameWithDefault(name: String): Value = {
+ val flattenType = super.withNameWithDefault(name)
+ flattenType match {
+ case Array | List => ArrayFlattenType
+ case Map => MapFlattenType
+ case Entries => EntriesFlattenType
+ case _ => DefaultFlattenType
+ }
}
- def unapply(pt: FlattenType): Option[String] = Some(pt.desc)
-}
-
-/**
- * default flatten strategy
- * metrics contains 1 row -> flatten metric json map
- * metrics contains n > 1 rows -> flatten metric json array
- * n = 0: { }
- * n = 1: { "col1": "value1", "col2": "value2", ... }
- * n > 1: { "arr-name": [ { "col1": "value1", "col2": "value2", ... }, ... ] }
- * all rows
- */
- case object DefaultFlattenType extends FlattenType {
- val idPattern: Regex = "".r
- val desc: String = "default"
-}
-
-/**
- * metrics contains n rows -> flatten metric json map
- * n = 0: { }
- * n >= 1: { "col1": "value1", "col2": "value2", ... }
- * the first row only
- */
- case object EntriesFlattenType extends FlattenType {
- val idPattern: Regex = "^(?i)entries$".r
- val desc: String = "entries"
-}
-
-/**
- * metrics contains n rows -> flatten metric json array
- * n = 0: { "arr-name": [ ] }
- * n >= 1: { "arr-name": [ { "col1": "value1", "col2": "value2", ... }, ... ]
}
- * all rows
- */
- case object ArrayFlattenType extends FlattenType {
- val idPattern: Regex = "^(?i)array|list$".r
- val desc: String = "array"
-}
-
-/**
- * metrics contains n rows -> flatten metric json wrapped map
- * n = 0: { "map-name": { } }
- * n >= 1: { "map-name": { "col1": "value1", "col2": "value2", ... } }
- * the first row only
- */
- case object MapFlattenType extends FlattenType {
- val idPattern: Regex = "^(?i)map$".r
- val desc: String = "map"
}
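As with DslType, the config aliases fold into the canonical flatten strategies
(illustrative asserts inferred from the override above, not part of the patch):

```scala
import org.apache.griffin.measure.configuration.enums.FlattenType

assert(FlattenType.withNameWithDefault("list") == FlattenType.ArrayFlattenType) // "list" aliases "array"
assert(FlattenType.withNameWithDefault("map") == FlattenType.MapFlattenType)
assert(FlattenType.withNameWithDefault("") == FlattenType.DefaultFlattenType)   // blank falls back to default
```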
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/WriteMode.scala
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/GriffinEnum.scala
similarity index 62%
copy from
measure/src/main/scala/org/apache/griffin/measure/configuration/enums/WriteMode.scala
copy to
measure/src/main/scala/org/apache/griffin/measure/configuration/enums/GriffinEnum.scala
index 9e74dda..b73742d 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/WriteMode.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/GriffinEnum.scala
@@ -17,26 +17,19 @@
package org.apache.griffin.measure.configuration.enums
-/**
- * write mode when write metrics and records
- */
-sealed trait WriteMode {}
+trait GriffinEnum extends Enumeration {
+ type GriffinEnum = Value
-object WriteMode {
- def defaultMode(procType: ProcessType): WriteMode = {
- procType match {
- case BatchProcessType => SimpleMode
- case StreamingProcessType => TimestampMode
- }
- }
-}
+ val Unknown = Value
-/**
- * simple mode: write metrics and records directly
- */
- case object SimpleMode extends WriteMode {}
+ /**
+ *
+ * @param name Constant value in String
+ * @return Enum constant value
+ */
+ def withNameWithDefault(name: String): Value =
+ values
+ .find(_.toString.toLowerCase == name.replace("-", "").toLowerCase())
+ .getOrElse(Unknown)
-/**
- * timestamp mode: write metrics and records with timestamp information
- */
- case object TimestampMode extends WriteMode {}
+}
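A quick sanity check of the shared lookup semantics (illustrative asserts
inferred from the `GriffinEnum` trait and the `DqType` override earlier, not a
test from this patch):

```scala
import org.apache.griffin.measure.configuration.enums.DqType

assert(DqType.withNameWithDefault("Accuracy") == DqType.Accuracy)     // matching is case-insensitive
assert(DqType.withNameWithDefault("duplicate") == DqType.Uniqueness)  // alias folded by the DqType override
assert(DqType.withNameWithDefault("no-such-type") == DqType.Unknown)  // fallback from GriffinEnum
```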
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
index aa7b8d5..dd0b2d1 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
@@ -17,66 +17,28 @@
package org.apache.griffin.measure.configuration.enums
-import scala.util.matching.Regex
-
/**
- * the strategy to flatten metric
+ * the strategy to output metric
+ * <li>{@link #MetricOutputType} - output the rule step result as metric</li>
+ * <li>{@link #RecordOutputType} - output the rule step result as
records</li>
+ * <li>{@link #DscUpdateOutputType} - output the rule step result to update
data source cache</li>
+ * <li>{@link #UnknownOutputType} - will not output the result </li>
*/
-sealed trait OutputType {
- val idPattern: Regex
- val desc: String
-}
-
-object OutputType {
- private val outputTypes: List[OutputType] = List(
- MetricOutputType,
- RecordOutputType,
- DscUpdateOutputType,
- UnknownOutputType
- )
-
- val default = UnknownOutputType
- def apply(ptn: String): OutputType = {
- outputTypes.find(tp => ptn match {
- case tp.idPattern() => true
- case _ => false
- }).getOrElse(default)
+object OutputType extends GriffinEnum {
+ type OutputType = Value
+
+ val MetricOutputType, RecordOutputType, DscUpdateOutputType,
+ UnknownOutputType = Value
+
+ val Metric, Record, Records, DscUpdate = Value
+
+ override def withNameWithDefault(name: String): Value = {
+ val flattenType = super.withNameWithDefault(name)
+ flattenType match {
+ case Metric => MetricOutputType
+ case Record | Records => RecordOutputType
+ case DscUpdate => DscUpdateOutputType
+ case _ => UnknownOutputType
+ }
}
- def unapply(pt: OutputType): Option[String] = Some(pt.desc)
-}
-
-/**
- * metric output type
- * output the rule step result as metric
- */
- case object MetricOutputType extends OutputType {
- val idPattern: Regex = "^(?i)metric$".r
- val desc: String = "metric"
-}
-
-/**
- * record output type
- * output the rule step result as records
- */
- case object RecordOutputType extends OutputType {
- val idPattern: Regex = "^(?i)record|records$".r
- val desc: String = "record"
-}
-
-/**
- * data source cache update output type
- * output the rule step result to update data source cache
- */
- case object DscUpdateOutputType extends OutputType {
- val idPattern: Regex = "^(?i)dsc-update$".r
- val desc: String = "dsc-update"
-}
-
-/**
- * unknown output type
- * will not output the result
- */
- case object UnknownOutputType extends OutputType {
- val idPattern: Regex = "".r
- val desc: String = "unknown"
}
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala
index edfa94e..90e4ae9 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala
@@ -17,43 +17,14 @@
package org.apache.griffin.measure.configuration.enums
-import scala.util.matching.Regex
-
/**
* process type enum
+ * <li>{@link #BatchProcessType} - Process in batch mode </li>
+ * <li>{@link #StreamingProcessType} - Process in streaming mode</li>
*/
-sealed trait ProcessType {
- val idPattern: Regex
- val desc: String
-}
-
-object ProcessType {
- private val procTypes: List[ProcessType] = List(
- BatchProcessType,
- StreamingProcessType
- )
+object ProcessType extends GriffinEnum {
+ type ProcessType = Value
- def apply(ptn: String): ProcessType = {
- procTypes.find(tp => ptn match {
- case tp.idPattern() => true
- case _ => false
- }).getOrElse(BatchProcessType)
- }
- def unapply(pt: ProcessType): Option[String] = Some(pt.desc)
-}
-
-/**
- * process in batch mode
- */
- case object BatchProcessType extends ProcessType {
- val idPattern = """^(?i)batch$""".r
- val desc = "batch"
-}
-
-/**
- * process in streaming mode
- */
- case object StreamingProcessType extends ProcessType {
- val idPattern = """^(?i)streaming$""".r
- val desc = "streaming"
+ val BatchProcessType = Value("Batch")
+ val StreamingProcessType = Value("Streaming")
}
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
index 62bd4b5..1476ae1 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
@@ -17,82 +17,39 @@
package org.apache.griffin.measure.configuration.enums
-import scala.util.matching.Regex
+import org.apache.griffin.measure.configuration.enums
/**
- * sink type
+ * Supported Sink types
+ * <li>{@link #Console #Log} - console sink, will sink metric in console
(alias log)</li>
+ * <li>{@link #Hdfs} - hdfs sink, will sink metric and record in hdfs</li>
+ * <li>{@link #Es #Elasticsearch #Http} - elasticsearch sink, will sink
metric
+ * in elasticsearch (alias Es and Http)</li>
+ * <li>{@link #Mongo #MongoDB} - mongo sink, will sink metric in mongo db
(alias MongoDb)</li>
+ * <li>{@link #Custom} - custom sink (requires an extra jar file with the
sink implementation)</li>
+ * <li>{@link #Unknown} - unknown sink type</li>
*/
-sealed trait SinkType {
- val idPattern: Regex
- val desc: String
-}
-
-object SinkType {
- private val sinkTypes: List[SinkType] = List(
- ConsoleSinkType,
- HdfsSinkType,
- ElasticsearchSinkType,
- MongoSinkType,
- CustomSinkType,
- UnknownSinkType
- )
-
- def apply(ptn: String): SinkType = {
- sinkTypes.find(tp => ptn match {
- case tp.idPattern() => true
- case _ => false
- }).getOrElse(UnknownSinkType)
- }
+object SinkType extends GriffinEnum {
+ type SinkType = Value
- def unapply(pt: SinkType): Option[String] = Some(pt.desc)
+ val Console, Log, Hdfs, Es, Http, ElasticSearch, MongoDB, Mongo, Custom =
+ Value
def validSinkTypes(strs: Seq[String]): Seq[SinkType] = {
- val seq = strs.map(s => SinkType(s)).filter(_ != UnknownSinkType).distinct
- if (seq.size > 0) seq else Seq(ElasticsearchSinkType)
+ val seq = strs
+ .map(s => SinkType.withNameWithDefault(s))
+ .filter(_ != SinkType.Unknown)
+ .distinct
+ if (seq.size > 0) seq else Seq(SinkType.ElasticSearch)
}
-}
-
-/**
- * console sink, will sink metric in console
- */
-case object ConsoleSinkType extends SinkType {
- val idPattern = "^(?i)console|log$".r
- val desc = "console"
-}
-
-/**
- * hdfs sink, will sink metric and record in hdfs
- */
-case object HdfsSinkType extends SinkType {
- val idPattern = "^(?i)hdfs$".r
- val desc = "hdfs"
-}
-/**
- * elasticsearch sink, will sink metric in elasticsearch
- */
-case object ElasticsearchSinkType extends SinkType {
- val idPattern = "^(?i)es|elasticsearch|http$".r
- val desc = "elasticsearch"
-}
-
-/**
- * mongo sink, will sink metric in mongo db
- */
-case object MongoSinkType extends SinkType {
- val idPattern = "^(?i)mongo|mongodb$".r
- val desc = "mongo"
-}
-
-/**
- * custom sink (needs using extra jar-file-extension)
- */
-case object CustomSinkType extends SinkType {
- val idPattern = "^(?i)custom$".r
- val desc = "custom"
-}
-
-case object UnknownSinkType extends SinkType {
- val idPattern = "".r
- val desc = "unknown"
+ override def withNameWithDefault(name: String): enums.SinkType.Value = {
+ val sinkType = super.withNameWithDefault(name)
+ sinkType match {
+ case Console | Log => Console
+ case Es | ElasticSearch | Http => ElasticSearch
+ case MongoDB | Mongo => MongoDB
+ case _ => sinkType
+ }
+ }
}
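Similarly, the sink alias folding and the ElasticSearch fallback in
`validSinkTypes` behave as follows (illustrative asserts based on the diff
above, not part of the patch):

```scala
import org.apache.griffin.measure.configuration.enums.SinkType

assert(SinkType.withNameWithDefault("log") == SinkType.Console)         // "log" aliases the console sink
assert(SinkType.withNameWithDefault("http") == SinkType.ElasticSearch)  // "http"/"es" fold to ElasticSearch
assert(SinkType.validSinkTypes(Seq("bogus")) == Seq(SinkType.ElasticSearch)) // unknowns dropped, ES is the default
```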
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/WriteMode.scala
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/WriteMode.scala
index 9e74dda..7057588 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/WriteMode.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/WriteMode.scala
@@ -23,10 +23,10 @@ package org.apache.griffin.measure.configuration.enums
sealed trait WriteMode {}
object WriteMode {
- def defaultMode(procType: ProcessType): WriteMode = {
+ def defaultMode(procType: ProcessType.ProcessType): WriteMode = {
procType match {
- case BatchProcessType => SimpleMode
- case StreamingProcessType => TimestampMode
+ case ProcessType.BatchProcessType => SimpleMode
+ case ProcessType.StreamingProcessType => TimestampMode
}
}
}
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
index 34acffa..bea7f7d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
@@ -20,7 +20,8 @@ package org.apache.griffin.measure.context
import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.griffin.measure.configuration.dqdefinition._
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.enums.ProcessType._
+import org.apache.griffin.measure.configuration.enums.WriteMode
import org.apache.griffin.measure.datasource._
import org.apache.griffin.measure.sink.{Sink, SinkFactory}
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
index b621aa6..fedf6e3 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.functions._
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
-import org.apache.griffin.measure.configuration.enums.BatchProcessType
+import
org.apache.griffin.measure.configuration.enums.ProcessType.BatchProcessType
import org.apache.griffin.measure.context.{ContextId, DQContext, TimeRange}
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.job.builder.DQJobBuilder
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
index af426d7..3c42094 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
@@ -25,7 +25,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.griffin.measure.configuration.dqdefinition._
-import org.apache.griffin.measure.configuration.enums._
+import
org.apache.griffin.measure.configuration.enums.ProcessType.BatchProcessType
import org.apache.griffin.measure.context._
import org.apache.griffin.measure.datasource.DataSourceFactory
import org.apache.griffin.measure.job.builder.DQJobBuilder
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
index fc10bdb..44dca49 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
@@ -28,7 +28,7 @@ import org.apache.spark.streaming.{Milliseconds,
StreamingContext}
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition._
-import org.apache.griffin.measure.configuration.enums._
+import
org.apache.griffin.measure.configuration.enums.ProcessType.StreamingProcessType
import org.apache.griffin.measure.context._
import
org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointClient
import org.apache.griffin.measure.context.streaming.metric.CacheResults
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
index aab638e..e7806f4 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
@@ -21,7 +21,7 @@ import scala.util.{Failure, Success, Try}
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.SinkParam
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.enums.SinkType._
import org.apache.griffin.measure.utils.ParamUtil._
case class SinkFactory(sinkParamIter: Iterable[SinkParam],
@@ -42,11 +42,11 @@ case class SinkFactory(sinkParamIter: Iterable[SinkParam],
val config = sinkParam.getConfig
val sinkType = sinkParam.getType
val sinkTry = sinkType match {
- case ConsoleSinkType => Try(ConsoleSink(config, metricName, timeStamp))
- case HdfsSinkType => Try(HdfsSink(config, metricName, timeStamp))
- case ElasticsearchSinkType => Try(ElasticSearchSink(config, metricName,
timeStamp, block))
- case MongoSinkType => Try(MongoSink(config, metricName, timeStamp,
block))
- case CustomSinkType => Try(getCustomSink(config, metricName, timeStamp,
block))
+ case Console => Try(ConsoleSink(config, metricName, timeStamp))
+ case Hdfs => Try(HdfsSink(config, metricName, timeStamp))
+ case ElasticSearch => Try(ElasticSearchSink(config, metricName,
timeStamp, block))
+ case MongoDB => Try(MongoSink(config, metricName, timeStamp, block))
+ case Custom => Try(getCustomSink(config, metricName, timeStamp, block))
case _ => throw new Exception(s"sink type ${sinkType} is not supported!")
}
sinkTry match {
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
index 47f740f..efc4537 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
@@ -21,7 +21,8 @@ import org.apache.commons.lang.StringUtils
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.{DataSourceParam,
Param, RuleParam}
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.enums.DslType._
+import org.apache.griffin.measure.configuration.enums.ProcessType._
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step._
@@ -74,9 +75,9 @@ object DQStepBuilder {
private def getRuleParamStepBuilder(dslType: DslType, dsNames: Seq[String],
funcNames: Seq[String]
): Option[RuleParamStepBuilder] = {
dslType match {
- case SparkSqlType => Some(SparkSqlDQStepBuilder())
+ case SparkSql => Some(SparkSqlDQStepBuilder())
case DataFrameOpsType => Some(DataFrameOpsDQStepBuilder())
- case GriffinDslType => Some(GriffinDslDQStepBuilder(dsNames, funcNames))
+ case GriffinDsl => Some(GriffinDslDQStepBuilder(dsNames, funcNames))
case _ => None
}
}
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
index 804c503..c7f6051 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
@@ -18,7 +18,7 @@
package org.apache.griffin.measure.step.builder
import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.enums.OutputType._
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.{DQStep, SeqDQStep}
import org.apache.griffin.measure.step.write.{DataSourceUpdateWriteStep,
MetricWriteStep, RecordWriteStep}
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala
index 9b7269a..baec077 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala
@@ -17,8 +17,7 @@
package org.apache.griffin.measure.step.builder.dsl.parser
-import org.apache.griffin.measure.configuration.enums._
-import org.apache.griffin.measure.step.builder.dsl._
+import org.apache.griffin.measure.configuration.enums.DqType._
import org.apache.griffin.measure.step.builder.dsl.expr._
/**
@@ -82,12 +81,12 @@ case class GriffinDslParser(dataSourceNames: Seq[String],
functionNames: Seq[Str
def parseRule(rule: String, dqType: DqType): ParseResult[Expr] = {
val rootExpr = dqType match {
- case AccuracyType => logicalExpression
- case ProfilingType => profilingClause
- case UniquenessType => uniquenessClause
- case DistinctnessType => distinctnessClause
- case TimelinessType => timelinessClause
- case CompletenessType => completenessClause
+ case Accuracy => logicalExpression
+ case Profiling => profilingClause
+ case Uniqueness => uniquenessClause
+ case Distinct => distinctnessClause
+ case Timeliness => timelinessClause
+ case Completeness => completenessClause
case _ => expression
}
parseAll(rootExpr, rule)
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
index 931032b..c9e3c1c 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
@@ -18,7 +18,9 @@
package org.apache.griffin.measure.step.builder.dsl.transform
import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
-import org.apache.griffin.measure.configuration.enums._
+import
org.apache.griffin.measure.configuration.enums.FlattenType.DefaultFlattenType
+import org.apache.griffin.measure.configuration.enums.OutputType._
+import org.apache.griffin.measure.configuration.enums.ProcessType._
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep
import org.apache.griffin.measure.step.builder.ConstantColumns
@@ -26,7 +28,11 @@ import org.apache.griffin.measure.step.builder.dsl.expr._
import
org.apache.griffin.measure.step.builder.dsl.transform.analyzer.AccuracyAnalyzer
import org.apache.griffin.measure.step.transform.{DataFrameOps,
DataFrameOpsTransformStep, SparkSqlTransformStep}
import org.apache.griffin.measure.step.transform.DataFrameOps.AccuracyOprKeys
-import org.apache.griffin.measure.step.write.{DataSourceUpdateWriteStep,
MetricWriteStep, RecordWriteStep}
+import org.apache.griffin.measure.step.write.{
+ DataSourceUpdateWriteStep,
+ MetricWriteStep,
+ RecordWriteStep
+}
import org.apache.griffin.measure.utils.ParamUtil._
/**
@@ -34,8 +40,8 @@ import org.apache.griffin.measure.utils.ParamUtil._
*/
case class AccuracyExpr2DQSteps(context: DQContext,
expr: Expr,
- ruleParam: RuleParam
- ) extends Expr2DQSteps {
+ ruleParam: RuleParam)
+ extends Expr2DQSteps {
private object AccuracyKeys {
val _source = "source"
@@ -65,36 +71,52 @@ case class AccuracyExpr2DQSteps(context: DQContext,
// 1. miss record
val missRecordsTableName = "__missRecords"
val selClause = s"`${sourceName}`.*"
- val missRecordsSql = if
(!context.runTimeTableRegister.existsTable(targetName)) {
- warn(s"[${timestamp}] data source ${targetName} not exists")
- s"SELECT ${selClause} FROM `${sourceName}`"
- } else {
- val onClause = expr.coalesceDesc
- val sourceIsNull = analyzer.sourceSelectionExprs.map { sel =>
- s"${sel.desc} IS NULL"
- }.mkString(" AND ")
- val targetIsNull = analyzer.targetSelectionExprs.map { sel =>
- s"${sel.desc} IS NULL"
- }.mkString(" AND ")
- val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})"
- s"SELECT ${selClause} FROM `${sourceName}` " +
- s"LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}"
- }
+ val missRecordsSql =
+ if (!context.runTimeTableRegister.existsTable(targetName)) {
+ warn(s"[${timestamp}] data source ${targetName} not exists")
+ s"SELECT ${selClause} FROM `${sourceName}`"
+ } else {
+ val onClause = expr.coalesceDesc
+ val sourceIsNull = analyzer.sourceSelectionExprs
+ .map { sel =>
+ s"${sel.desc} IS NULL"
+ }
+ .mkString(" AND ")
+ val targetIsNull = analyzer.targetSelectionExprs
+ .map { sel =>
+ s"${sel.desc} IS NULL"
+ }
+ .mkString(" AND ")
+ val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})"
+ s"SELECT ${selClause} FROM `${sourceName}` " +
+ s"LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}"
+ }
val missRecordsWriteSteps = procType match {
case BatchProcessType =>
val rwName =
- ruleParam.getOutputOpt(RecordOutputType).
- flatMap(_.getNameOpt).getOrElse(missRecordsTableName)
+ ruleParam
+ .getOutputOpt(RecordOutputType)
+ .flatMap(_.getNameOpt)
+ .getOrElse(missRecordsTableName)
RecordWriteStep(rwName, missRecordsTableName)
case StreamingProcessType =>
val dsName =
-
ruleParam.getOutputOpt(DscUpdateOutputType).flatMap(_.getNameOpt).getOrElse(sourceName)
+ ruleParam
+ .getOutputOpt(DscUpdateOutputType)
+ .flatMap(_.getNameOpt)
+ .getOrElse(sourceName)
DataSourceUpdateWriteStep(dsName, missRecordsTableName)
}
val missRecordsTransStep =
- SparkSqlTransformStep(missRecordsTableName, missRecordsSql, emptyMap,
Some(missRecordsWriteSteps), true)
+ SparkSqlTransformStep(
+ missRecordsTableName,
+ missRecordsSql,
+ emptyMap,
+ Some(missRecordsWriteSteps),
+ true
+ )
// 2. miss count
val missCountTableName = "__missCount"
@@ -106,19 +128,22 @@ case class AccuracyExpr2DQSteps(context: DQContext,
s"SELECT `${ConstantColumns.tmst}`,COUNT(*) AS `${missColName}` " +
s"FROM `${missRecordsTableName}` GROUP BY
`${ConstantColumns.tmst}`"
}
- val missCountTransStep = SparkSqlTransformStep(missCountTableName,
missCountSql, emptyMap)
+ val missCountTransStep =
+ SparkSqlTransformStep(missCountTableName, missCountSql, emptyMap)
missCountTransStep.parentSteps += missRecordsTransStep
// 3. total count
val totalCountTableName = "__totalCount"
val totalColName = details.getStringOrKey(_total)
val totalCountSql = procType match {
- case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM
`${sourceName}`"
+ case BatchProcessType =>
+ s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`"
case StreamingProcessType =>
s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${totalColName}` " +
s"FROM `${sourceName}` GROUP BY `${ConstantColumns.tmst}`"
}
- val totalCountTransStep = SparkSqlTransformStep(totalCountTableName,
totalCountSql, emptyMap)
+ val totalCountTransStep =
+ SparkSqlTransformStep(totalCountTableName, totalCountSql, emptyMap)
// 4. accuracy metric
val accuracyTableName = ruleParam.getOutDfName()
@@ -153,14 +178,22 @@ case class AccuracyExpr2DQSteps(context: DQContext,
val accuracyMetricWriteStep = procType match {
case BatchProcessType =>
val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
- val mwName =
metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
- val flattenType =
metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
+ val mwName =
+ metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
+ val flattenType = metricOpt
+ .map(_.getFlatten)
+ .getOrElse(DefaultFlattenType)
Some(MetricWriteStep(mwName, accuracyTableName, flattenType))
case StreamingProcessType => None
}
val accuracyTransStep =
- SparkSqlTransformStep(accuracyTableName, accuracyMetricSql, emptyMap,
accuracyMetricWriteStep)
+ SparkSqlTransformStep(
+ accuracyTableName,
+ accuracyMetricSql,
+ emptyMap,
+ accuracyMetricWriteStep
+ )
accuracyTransStep.parentSteps += missCountTransStep
accuracyTransStep.parentSteps += totalCountTransStep
@@ -171,22 +204,30 @@ case class AccuracyExpr2DQSteps(context: DQContext,
// 5. accuracy metric merge
val accuracyMetricTableName = "__accuracy"
val accuracyMetricRule = DataFrameOps._accuracy
- val accuracyMetricDetails = Map[String, Any](
- (AccuracyOprKeys._miss -> missColName),
- (AccuracyOprKeys._total -> totalColName),
- (AccuracyOprKeys._matched -> matchedColName)
+ val accuracyMetricDetails: Map[String, Any] = Map(
+ (AccuracyOprKeys._miss, missColName),
+ (AccuracyOprKeys._total, totalColName),
+ (AccuracyOprKeys._matched, matchedColName)
)
val accuracyMetricWriteStep = {
val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
- val mwName =
metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
- val flattenType =
metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
+ val mwName = metricOpt
+ .flatMap(_.getNameOpt)
+ .getOrElse(ruleParam.getOutDfName())
+ val flattenType = metricOpt
+ .map(_.getFlatten)
+ .getOrElse(DefaultFlattenType)
MetricWriteStep(mwName, accuracyMetricTableName, flattenType)
}
- val accuracyMetricTransStep =
DataFrameOpsTransformStep(accuracyMetricTableName,
- accuracyTableName, accuracyMetricRule, accuracyMetricDetails,
Some(accuracyMetricWriteStep))
+ val accuracyMetricTransStep = DataFrameOpsTransformStep(
+ accuracyMetricTableName,
+ accuracyTableName,
+ accuracyMetricRule,
+ accuracyMetricDetails,
+ Some(accuracyMetricWriteStep)
+ )
accuracyMetricTransStep.parentSteps += accuracyTransStep
-
// 6. collect accuracy records
val accuracyRecordTableName = "__accuracyRecords"
val accuracyRecordSql = {
@@ -198,13 +239,23 @@ case class AccuracyExpr2DQSteps(context: DQContext,
val accuracyRecordWriteStep = {
val rwName =
- ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
+ ruleParam
+ .getOutputOpt(RecordOutputType)
+ .flatMap(_.getNameOpt)
.getOrElse(missRecordsTableName)
- RecordWriteStep(rwName, missRecordsTableName,
Some(accuracyRecordTableName))
+ RecordWriteStep(
+ rwName,
+ missRecordsTableName,
+ Some(accuracyRecordTableName)
+ )
}
val accuracyRecordTransStep = SparkSqlTransformStep(
- accuracyRecordTableName, accuracyRecordSql, emptyMap,
Some(accuracyRecordWriteStep))
+ accuracyRecordTableName,
+ accuracyRecordSql,
+ emptyMap,
+ Some(accuracyRecordWriteStep)
+ )
accuracyRecordTransStep.parentSteps += accuracyMetricTransStep
accuracyRecordTransStep :: Nil
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
index 1a611da..360c8e9 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
@@ -20,7 +20,9 @@ package org.apache.griffin.measure.step.builder.dsl.transform
import org.apache.commons.lang.StringUtils
import
org.apache.griffin.measure.configuration.dqdefinition.{RuleErrorConfParam,
RuleParam}
-import org.apache.griffin.measure.configuration.enums._
+import
org.apache.griffin.measure.configuration.enums.FlattenType.DefaultFlattenType
+import org.apache.griffin.measure.configuration.enums.OutputType._
+import org.apache.griffin.measure.configuration.enums.ProcessType._
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep
import org.apache.griffin.measure.step.builder.ConstantColumns
@@ -165,7 +167,7 @@ case class CompletenessExpr2DQSteps(context: DQContext,
val completeWriteStep = {
val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
val mwName =
metricOpt.flatMap(_.getNameOpt).getOrElse(completeTableName)
- val flattenType =
metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
+ val flattenType =
metricOpt.map(_.getFlatten).getOrElse(DefaultFlattenType)
MetricWriteStep(mwName, completeTableName, flattenType)
}
val completeTransStep =
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
index b68b3f7..6be5e17 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
@@ -18,7 +18,9 @@
package org.apache.griffin.measure.step.builder.dsl.transform
import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
-import org.apache.griffin.measure.configuration.enums._
+import
org.apache.griffin.measure.configuration.enums.FlattenType.{ArrayFlattenType,
EntriesFlattenType}
+import org.apache.griffin.measure.configuration.enums.OutputType._
+import org.apache.griffin.measure.configuration.enums.ProcessType._
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep
import org.apache.griffin.measure.step.builder.ConstantColumns
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala
index 278e351..8955a8a 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala
@@ -19,7 +19,7 @@ package org.apache.griffin.measure.step.builder.dsl.transform
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.enums.DqType._
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep
import org.apache.griffin.measure.step.builder.dsl.expr.Expr
@@ -46,12 +46,12 @@ object Expr2DQSteps {
ruleParam: RuleParam
): Expr2DQSteps = {
ruleParam.getDqType match {
- case AccuracyType => AccuracyExpr2DQSteps(context, expr, ruleParam)
- case ProfilingType => ProfilingExpr2DQSteps(context, expr, ruleParam)
- case UniquenessType => UniquenessExpr2DQSteps(context, expr, ruleParam)
- case DistinctnessType => DistinctnessExpr2DQSteps(context, expr,
ruleParam)
- case TimelinessType => TimelinessExpr2DQSteps(context, expr, ruleParam)
- case CompletenessType => CompletenessExpr2DQSteps(context, expr,
ruleParam)
+ case Accuracy => AccuracyExpr2DQSteps(context, expr, ruleParam)
+ case Profiling => ProfilingExpr2DQSteps(context, expr, ruleParam)
+ case Uniqueness => UniquenessExpr2DQSteps(context, expr, ruleParam)
+ case Distinct => DistinctnessExpr2DQSteps(context, expr, ruleParam)
+ case Timeliness => TimelinessExpr2DQSteps(context, expr, ruleParam)
+ case Completeness => CompletenessExpr2DQSteps(context, expr, ruleParam)
case _ => emtptExpr2DQSteps
}
}
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
index 928fbd4..bc111fa 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
@@ -20,7 +20,9 @@ package org.apache.griffin.measure.step.builder.dsl.transform
import org.apache.commons.lang.StringUtils
import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
-import org.apache.griffin.measure.configuration.enums.{BatchProcessType,
FlattenType, MetricOutputType, StreamingProcessType}
+import
org.apache.griffin.measure.configuration.enums.FlattenType.DefaultFlattenType
+import org.apache.griffin.measure.configuration.enums.OutputType._
+import org.apache.griffin.measure.configuration.enums.ProcessType._
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep
import org.apache.griffin.measure.step.builder.ConstantColumns
@@ -99,7 +101,7 @@ case class ProfilingExpr2DQSteps(context: DQContext,
val profilingMetricWriteStep = {
val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
val mwName =
metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
- val flattenType =
metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
+ val flattenType =
metricOpt.map(_.getFlatten).getOrElse(DefaultFlattenType)
MetricWriteStep(mwName, profilingName, flattenType)
}
val profilingTransStep =
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
index ec6a178..aea5dca 100644
---
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
@@ -18,7 +18,9 @@
package org.apache.griffin.measure.step.builder.dsl.transform
import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
-import org.apache.griffin.measure.configuration.enums._
+import
org.apache.griffin.measure.configuration.enums.FlattenType.{ArrayFlattenType,
DefaultFlattenType}
+import org.apache.griffin.measure.configuration.enums.OutputType._
+import org.apache.griffin.measure.configuration.enums.ProcessType._
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep
import org.apache.griffin.measure.step.builder.ConstantColumns
@@ -131,7 +133,7 @@ case class TimelinessExpr2DQSteps(context: DQContext,
val metricWriteStep = {
val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
val mwName =
metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
- val flattenType =
metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
+ val flattenType =
metricOpt.map(_.getFlatten).getOrElse(DefaultFlattenType)
MetricWriteStep(mwName, metricTableName, flattenType)
}
val metricTransStep =
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
index eed07e4..534641b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
@@ -18,7 +18,9 @@
package org.apache.griffin.measure.step.builder.dsl.transform
import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.enums.FlattenType.{ArrayFlattenType, EntriesFlattenType}
+import org.apache.griffin.measure.configuration.enums.OutputType._
+import org.apache.griffin.measure.configuration.enums.ProcessType._
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep
import org.apache.griffin.measure.step.builder.ConstantColumns
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala
index 581c42d..ca93710 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala
@@ -18,7 +18,7 @@
package org.apache.griffin.measure.step.builder.preproc
import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.enums.DslType._
/**
* generate each entity pre-proc params by template defined in pre-proc param
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
index d83f6ca..6b99765 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
@@ -17,7 +17,8 @@
package org.apache.griffin.measure.step.write
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.enums.{SimpleMode, TimestampMode}
+import org.apache.griffin.measure.configuration.enums.FlattenType.{ArrayFlattenType, EntriesFlattenType, FlattenType, MapFlattenType}
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.builder.ConstantColumns
import org.apache.griffin.measure.utils.JsonUtil
diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamEnumReaderSpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamEnumReaderSpec.scala
new file mode 100644
index 0000000..a30d51e
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamEnumReaderSpec.scala
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.griffin.measure.configuration.dqdefinition.reader
+
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.griffin.measure.configuration.dqdefinition.{DQConfig, EvaluateRuleParam, RuleOutputParam, RuleParam}
+
+
+class ParamEnumReaderSpec extends FlatSpec with Matchers {
+ import org.apache.griffin.measure.configuration.enums.DslType._
+ "dsltype" should "be parsed to predefined set of values" in {
+ val validDslSparkSqlValues =
+ Seq("spark-sql", "spark-SQL", "SPARK-SQL", "sparksql")
+ validDslSparkSqlValues foreach { x =>
+ val ruleParam = new RuleParam(x, "accuracy")
+ ruleParam.getDslType should ===(SparkSql)
+ }
+ val invalidDslSparkSqlValues = Seq("spark", "sql", "")
+ invalidDslSparkSqlValues foreach { x =>
+ val ruleParam = new RuleParam(x, "accuracy")
+ ruleParam.getDslType should not be (SparkSql)
+ }
+
+ val validDslGriffinValues =
+ Seq("griffin-dsl", "griffindsl", "griFfin-dsl", "")
+ validDslGriffinValues foreach { x =>
+ val ruleParam = new RuleParam(x, "accuracy")
+ ruleParam.getDslType should ===(GriffinDsl)
+ }
+
+ val validDslDfOpsValues = Seq(
+ "df-ops",
+ "dfops",
+ "DFOPS",
+ "df-opr",
+ "dfopr",
+ "df-operations",
+ "dfoperations"
+ )
+ validDslDfOpsValues foreach { x =>
+ val ruleParam = new RuleParam(x, "accuracy")
+ ruleParam.getDslType should ===(DataFrameOpsType)
+ }
+
+ val invalidDslDfOpsValues = Seq("df-oprts", "-")
+ invalidDslDfOpsValues foreach { x =>
+ val ruleParam = new RuleParam(x, "accuracy")
+ ruleParam.getDslType should not be (DataFrameOpsType)
+ }
+ }
+
+ "griffindsl" should "be returned as default dsl type" in {
+ import org.apache.griffin.measure.configuration.enums.DslType._
+ val dslGriffinDslValues = Seq("griffin", "dsl")
+ dslGriffinDslValues foreach { x =>
+ val ruleParam = new RuleParam(x, "accuracy")
+ ruleParam.getDslType should be(GriffinDsl)
+ }
+ }
+
+ "dqtype" should "be parsed to predefined set of values" in {
+ import org.apache.griffin.measure.configuration.enums.DqType._
+ var ruleParam = new RuleParam("griffin-dsl", "accuracy")
+ ruleParam.getDqType should be(Accuracy)
+ ruleParam = new RuleParam("griffin-dsl", "accu")
+ ruleParam.getDqType should not be (Accuracy)
+ ruleParam.getDqType should be(Unknown)
+
+ ruleParam = new RuleParam("griffin-dsl", "profiling")
+ ruleParam.getDqType should be(Profiling)
+ ruleParam = new RuleParam("griffin-dsl", "profilin")
+ ruleParam.getDqType should not be (Profiling)
+ ruleParam.getDqType should be(Unknown)
+
+ ruleParam = new RuleParam("griffin-dsl", "TIMELINESS")
+ ruleParam.getDqType should be(Timeliness)
+ ruleParam = new RuleParam("griffin-dsl", "timeliness ")
+ ruleParam.getDqType should not be (Timeliness)
+ ruleParam.getDqType should be(Unknown)
+
+ ruleParam = new RuleParam("griffin-dsl", "UNIQUENESS")
+ ruleParam.getDqType should be(Uniqueness)
+ ruleParam = new RuleParam("griffin-dsl", "UNIQUE")
+ ruleParam.getDqType should not be (Uniqueness)
+ ruleParam.getDqType should be(Unknown)
+
+ ruleParam = new RuleParam("griffin-dsl", "Duplicate")
+ ruleParam.getDqType should be(Uniqueness)
+ ruleParam = new RuleParam("griffin-dsl", "duplica")
+ ruleParam.getDqType should not be (Duplicate)
+ ruleParam.getDqType should be(Unknown)
+
+ ruleParam = new RuleParam("griffin-dsl", "COMPLETENESS")
+ ruleParam.getDqType should be(Completeness)
+ ruleParam = new RuleParam("griffin-dsl", "complete")
+ ruleParam.getDqType should not be (Completeness)
+ ruleParam.getDqType should be(Unknown)
+
+ ruleParam = new RuleParam("griffin-dsl", "")
+ ruleParam.getDqType should be(Unknown)
+ ruleParam = new RuleParam("griffin-dsl", "duplicate")
+ ruleParam.getDqType should not be (Unknown)
+ }
+
+ "outputtype" should "be valid" in {
+ import org.apache.griffin.measure.configuration.enums.OutputType._
+ var ruleOutputParam = new RuleOutputParam("metric", "", "map")
+ ruleOutputParam.getOutputType should be(MetricOutputType)
+ ruleOutputParam = new RuleOutputParam("metr", "", "map")
+ ruleOutputParam.getOutputType should not be (MetricOutputType)
+ ruleOutputParam.getOutputType should be(UnknownOutputType)
+
+ ruleOutputParam = new RuleOutputParam("record", "", "map")
+ ruleOutputParam.getOutputType should be(RecordOutputType)
+ ruleOutputParam = new RuleOutputParam("rec", "", "map")
+ ruleOutputParam.getOutputType should not be (RecordOutputType)
+ ruleOutputParam.getOutputType should be(UnknownOutputType)
+
+ ruleOutputParam = new RuleOutputParam("dscupdate", "", "map")
+ ruleOutputParam.getOutputType should be(DscUpdateOutputType)
+ ruleOutputParam = new RuleOutputParam("dsc", "", "map")
+ ruleOutputParam.getOutputType should not be (DscUpdateOutputType)
+ ruleOutputParam.getOutputType should be(UnknownOutputType)
+
+ }
+
+ "flattentype" should "be valid" in {
+ import org.apache.griffin.measure.configuration.enums.FlattenType._
+ var ruleOutputParam = new RuleOutputParam("metric", "", "map")
+ ruleOutputParam.getFlatten should be(MapFlattenType)
+ ruleOutputParam = new RuleOutputParam("metric", "", "metr")
+ ruleOutputParam.getFlatten should not be (MapFlattenType)
+ ruleOutputParam.getFlatten should be(DefaultFlattenType)
+
+ ruleOutputParam = new RuleOutputParam("metric", "", "array")
+ ruleOutputParam.getFlatten should be(ArrayFlattenType)
+ ruleOutputParam = new RuleOutputParam("metric", "", "list")
+ ruleOutputParam.getFlatten should be(ArrayFlattenType)
+ ruleOutputParam = new RuleOutputParam("metric", "", "arrays")
+ ruleOutputParam.getFlatten should not be (ArrayFlattenType)
+ ruleOutputParam.getFlatten should be(DefaultFlattenType)
+
+ ruleOutputParam = new RuleOutputParam("metric", "", "entries")
+ ruleOutputParam.getFlatten should be(EntriesFlattenType)
+ ruleOutputParam = new RuleOutputParam("metric", "", "entry")
+ ruleOutputParam.getFlatten should not be (EntriesFlattenType)
+ ruleOutputParam.getFlatten should be(DefaultFlattenType)
+ }
+
+ "sinktype" should "be valid" in {
+ import org.mockito.Mockito._
+ import org.apache.griffin.measure.configuration.enums.SinkType._
+ var dqConfig = new DQConfig(
+ "test",
+ 1234,
+ "",
+ Nil,
+ mock(classOf[EvaluateRuleParam]),
+ List(
+ "Console",
+ "Log",
+ "CONSOLE",
+ "LOG",
+ "Es",
+ "ElasticSearch",
+ "Http",
+ "MongoDB",
+ "mongo",
+ "hdfs"
+ )
+ )
+ dqConfig.getValidSinkTypes should be(
+ Seq(
+ Console,
+ ElasticSearch,
+ MongoDB,
+ Hdfs
+ )
+ )
+ dqConfig = new DQConfig(
+ "test",
+ 1234,
+ "",
+ Nil,
+ mock(classOf[EvaluateRuleParam]),
+ List("Consol", "Logg")
+ )
+ dqConfig.getValidSinkTypes should not be (Seq(Console))
+ dqConfig.getValidSinkTypes should be(Seq(ElasticSearch))
+
+ dqConfig = new DQConfig(
+ "test",
+ 1234,
+ "",
+ Nil,
+ mock(classOf[EvaluateRuleParam]),
+ List("")
+ )
+ dqConfig.getValidSinkTypes should be(Seq(ElasticSearch))
+ }
+
+}
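Read together, these expectations pin down the readers' contract: `DslType` defaults to `GriffinDsl` rather than an unknown marker, `DqType` and the output/flatten enums fall back to their `Unknown` or default members, and `getValidSinkTypes` resolves aliases (`log` to `Console`, `es`/`http` to `ElasticSearch`, `mongo` to `MongoDB`), drops unrecognized names, de-duplicates, and falls back to `ElasticSearch` when nothing valid remains. A sketch of that sink-side behavior as inferred from the spec above, not the committed `SinkType.scala`:

```scala
object SinkType extends Enumeration {
  type SinkType = Value
  val Console, Hdfs, ElasticSearch, MongoDB, Unknown = Value

  // Alias table inferred from the expectations in ParamEnumReaderSpec.
  def withNameWithDefault(name: String): Value = name.toLowerCase match {
    case "console" | "log"               => Console
    case "hdfs"                          => Hdfs
    case "es" | "elasticsearch" | "http" => ElasticSearch
    case "mongo" | "mongodb"             => MongoDB
    case _                               => Unknown
  }

  // Resolve each name, drop unknowns, keep first occurrences in order,
  // and default to ElasticSearch when nothing valid is left.
  def validSinkTypes(names: Seq[String]): Seq[Value] = {
    val resolved = names.map(withNameWithDefault).filterNot(_ == Unknown).distinct
    if (resolved.nonEmpty) resolved else Seq(ElasticSearch)
  }
}
```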
diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
index 024e96b..38d8db3 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
@@ -17,21 +17,22 @@
package org.apache.griffin.measure.configuration.dqdefinition.reader
-import org.apache.griffin.measure.configuration.dqdefinition.DQConfig
import org.scalatest._
+import scala.util.{Failure, Success}
+import org.apache.griffin.measure.configuration.dqdefinition.DQConfig
+import org.apache.griffin.measure.configuration.enums.DslType.GriffinDsl
-import scala.util.{Failure, Success}
class ParamFileReaderSpec extends FlatSpec with Matchers{
"params " should "be parsed from a valid file" in {
-    val reader :ParamReader = ParamFileReader(getClass.getResource("/_accuracy-batch-griffindsl.json").getFile)
+    val reader: ParamReader = ParamFileReader(getClass.getResource("/_accuracy-batch-griffindsl.json").getFile)
val params = reader.readConfig[DQConfig]
params match {
case Success(v) =>
-      v.getEvaluateRule.getRules(0).getDslType.desc should === ("griffin-dsl")
+ v.getEvaluateRule.getRules(0).getDslType should === (GriffinDsl)
v.getEvaluateRule.getRules(0).getOutDfName() should === ("accu")
case Failure(_) =>
fail("it should not happen")
@@ -40,7 +41,8 @@ class ParamFileReaderSpec extends FlatSpec with Matchers{
}
it should "fail for an invalid file" in {
-    val reader :ParamReader = ParamFileReader(getClass.getResource("/invalidconfigs/missingrule_accuracy_batch_sparksql.json").getFile)
+    val reader: ParamReader = ParamFileReader(getClass.
+      getResource("/invalidconfigs/missingrule_accuracy_batch_sparksql.json").getFile)
val params = reader.readConfig[DQConfig]
params match {
case Success(_) =>
@@ -52,7 +54,8 @@ class ParamFileReaderSpec extends FlatSpec with Matchers{
}
it should "fail for an invalid completeness json file" in {
-    val reader: ParamFileReader = ParamFileReader(getClass.getResource("/invalidconfigs/invalidtype_completeness_batch_griffindal.json").getFile)
+    val reader: ParamFileReader = ParamFileReader(getClass.
+      getResource("/invalidconfigs/invalidtype_completeness_batch_griffindal.json").getFile)
val params = reader.readConfig[DQConfig]
params match {
case Success(_) =>
@@ -63,7 +66,8 @@ class ParamFileReaderSpec extends FlatSpec with Matchers{
}
it should "be parsed from a valid errorconf completeness json file" in {
-    val reader :ParamReader = ParamFileReader(getClass.getResource("/_completeness_errorconf-batch-griffindsl.json").getFile)
+ val reader: ParamReader = ParamFileReader(getClass.
+ getResource("/_completeness_errorconf-batch-griffindsl.json").getFile)
val params = reader.readConfig[DQConfig]
params match {
case Success(v) =>
diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
index 6227c20..54bd95b 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
@@ -17,12 +17,15 @@
package org.apache.griffin.measure.configuration.dqdefinition.reader
-import org.apache.griffin.measure.configuration.dqdefinition.DQConfig
-import org.scalatest.{FlatSpec, Matchers}
-
import scala.io.Source
+
+import org.scalatest.{FlatSpec, Matchers}
import scala.util.{Failure, Success}
+import org.apache.griffin.measure.configuration.dqdefinition.DQConfig
+import org.apache.griffin.measure.configuration.enums.DslType.GriffinDsl
+
+
class ParamJsonReaderSpec extends FlatSpec with Matchers{
@@ -31,11 +34,11 @@ class ParamJsonReaderSpec extends FlatSpec with Matchers{
val jsonString = bufferedSource.getLines().mkString
bufferedSource.close
- val reader :ParamReader = ParamJsonReader(jsonString)
+ val reader: ParamReader = ParamJsonReader(jsonString)
val params = reader.readConfig[DQConfig]
params match {
case Success(v) =>
-      v.getEvaluateRule.getRules(0).getDslType.desc should === ("griffin-dsl")
+ v.getEvaluateRule.getRules(0).getDslType should === (GriffinDsl)
v.getEvaluateRule.getRules(0).getOutDfName() should === ("accu")
case Failure(_) =>
fail("it should not happen")
@@ -44,11 +47,12 @@ class ParamJsonReaderSpec extends FlatSpec with Matchers{
}
it should "fail for an invalid file" in {
-    val bufferedSource = Source.fromFile(getClass.getResource("/invalidconfigs/missingrule_accuracy_batch_sparksql.json").getFile)
+    val bufferedSource = Source.fromFile(getClass.
+      getResource("/invalidconfigs/missingrule_accuracy_batch_sparksql.json").getFile)
val jsonString = bufferedSource.getLines().mkString
bufferedSource.close
- val reader :ParamReader = ParamJsonReader(jsonString)
+ val reader: ParamReader = ParamJsonReader(jsonString)
val params = reader.readConfig[DQConfig]
params match {
case Success(_) =>
diff --git a/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala b/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
index bf163aa..952c591 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
@@ -19,16 +19,15 @@ package org.apache.griffin.measure.job
import scala.util.Failure
import scala.util.Success
-
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FlatSpec
import org.scalatest.Matchers
-
import org.apache.griffin.measure.Application._
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.SparkSuiteBase
import org.apache.griffin.measure.configuration.dqdefinition._
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.enums.ProcessType
+import org.apache.griffin.measure.configuration.enums.ProcessType._
import org.apache.griffin.measure.launch.DQApp
import org.apache.griffin.measure.launch.batch.BatchDQApp
import org.apache.griffin.measure.launch.streaming.StreamingDQApp
@@ -55,7 +54,7 @@ class DQAppTest extends FlatSpec with SparkSuiteBase with BeforeAndAfterAll with
val allParam: GriffinConfig = GriffinConfig(envParam, dqParam)
// choose process
- val procType = ProcessType(allParam.getDqConfig.getProcType)
+    val procType = ProcessType.withNameWithDefault(allParam.getDqConfig.getProcType)
dqApp = procType match {
case BatchProcessType => BatchDQApp(allParam)
case StreamingProcessType => StreamingDQApp(allParam)
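`scala.Enumeration.withName` is case-sensitive and throws `NoSuchElementException` on unrecognized input, so a user-supplied `process.type` needs the defaulting lookup used above. One way such a lookup could accept `batch` for a member named `BatchProcessType`; this body is an assumption for illustration, not the committed helper:

```scala
object ProcessType extends Enumeration {
  type ProcessType = Value
  val BatchProcessType, StreamingProcessType, Unknown = Value

  // Match the member name minus its "ProcessType" suffix, ignoring case,
  // so "batch" and "BATCH" both resolve to BatchProcessType.
  def withNameWithDefault(name: String): Value =
    values
      .find(_.toString.stripSuffix("ProcessType").equalsIgnoreCase(name))
      .getOrElse(Unknown)
}
```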
diff --git a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala
index c226299..33886f8 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala
@@ -20,7 +20,7 @@ package org.apache.griffin.measure.sink
import scala.collection.mutable
 import org.apache.griffin.measure.configuration.dqdefinition.{RuleOutputParam, SinkParam}
-import org.apache.griffin.measure.configuration.enums.FlattenType
+import org.apache.griffin.measure.configuration.enums.FlattenType.DefaultFlattenType
 import org.apache.griffin.measure.step.write.{MetricFlushStep, MetricWriteStep, RecordWriteStep}
class CustomSinkTest extends SinkTestBase {
@@ -113,7 +113,7 @@ class CustomSinkTest extends SinkTestBase {
val metricWriteStep = {
val metricOpt = Some(metricsDefaultOutput)
       val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse("default_metrics_name")
-      val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
+      val flattenType = metricOpt.map(_.getFlatten).getOrElse(DefaultFlattenType)
MetricWriteStep(mwName, resultTable, flattenType)
}
diff --git a/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala b/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala
index c474081..5380b4c 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala
@@ -25,7 +25,7 @@ import org.scalatest.Matchers
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.dqdefinition.SinkParam
-import org.apache.griffin.measure.configuration.enums.BatchProcessType
+import org.apache.griffin.measure.configuration.enums.ProcessType.BatchProcessType
import org.apache.griffin.measure.context.{ContextId, DQContext}
import org.apache.griffin.measure.SparkSuiteBase
diff --git a/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala b/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
index b7f1bc3..c3a9330 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
@@ -20,7 +20,7 @@ package org.apache.griffin.measure.step
import org.scalatest._
import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.enums.BatchProcessType
+import org.apache.griffin.measure.configuration.enums.ProcessType.BatchProcessType
import org.apache.griffin.measure.context.ContextId
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.SparkSuiteBase
diff --git a/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala b/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
index 11042ff..2bf5119 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
@@ -19,9 +19,8 @@ package org.apache.griffin.measure.transformations
import org.apache.spark.sql.DataFrame
import org.scalatest._
-
import org.apache.griffin.measure.configuration.dqdefinition._
-import org.apache.griffin.measure.configuration.enums.BatchProcessType
+import org.apache.griffin.measure.configuration.enums.ProcessType.BatchProcessType
import org.apache.griffin.measure.context.{ContextId, DQContext}
import org.apache.griffin.measure.datasource.DataSourceFactory
import org.apache.griffin.measure.job.builder.DQJobBuilder