This is an automated email from the ASF dual-hosted git repository.

wankun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git


The following commit(s) were added to refs/heads/master by this push:
     new ed47c2b  Enum based configs
ed47c2b is described below

commit ed47c2b2232e07adea0a8919786ce835a15f2191
Author: tusharpatil <tus>
AuthorDate: Mon Dec 16 21:22:20 2019 +0800

    Enum based configs
    
    **What changes were proposed in this pull request?**
    
    All the predefined types (`DqType`, `DslType`, `FlattenType`, `OutputType`,
    `ProcessType`, `SinkType` and `WriteMode`) are matched against config
    values using a regex-based approach. This adds unnecessary overhead in
    terms of execution time and maintainability.
    
    This PR replaces the regex-based approach with predefined enums that
    provide the same functionality.
    
    **Does this PR introduce any user-facing change?**
    No
    
    **How was this patch tested?**
    Griffin test suite.
    
    Author: tusharpatil <tus>
    Author: tusharpatil20 <[email protected]>
    
    Closes #558 from tusharpatil20/enum-based-configs.
---
 .../org/apache/griffin/measure/Application.scala   |   5 +-
 .../configuration/dqdefinition/DQConfig.scala      |  23 ++-
 .../configuration/dqdefinition/EnvConfig.scala     |   5 +-
 .../measure/configuration/enums/DqType.scala       | 123 ++++--------
 .../measure/configuration/enums/DslType.scala      |  59 +++---
 .../measure/configuration/enums/FlattenType.scala  | 108 ++++------
 .../enums/{WriteMode.scala => GriffinEnum.scala}   |  33 ++--
 .../measure/configuration/enums/OutputType.scala   |  80 ++------
 .../measure/configuration/enums/ProcessType.scala  |  41 +---
 .../measure/configuration/enums/SinkType.scala     |  97 +++------
 .../measure/configuration/enums/WriteMode.scala    |   6 +-
 .../apache/griffin/measure/context/DQContext.scala |   3 +-
 .../datasource/connector/DataConnector.scala       |   2 +-
 .../griffin/measure/launch/batch/BatchDQApp.scala  |   2 +-
 .../measure/launch/streaming/StreamingDQApp.scala  |   2 +-
 .../apache/griffin/measure/sink/SinkFactory.scala  |  12 +-
 .../measure/step/builder/DQStepBuilder.scala       |   7 +-
 .../step/builder/RuleParamStepBuilder.scala        |   2 +-
 .../step/builder/dsl/parser/GriffinDslParser.scala |  15 +-
 .../dsl/transform/AccuracyExpr2DQSteps.scala       | 133 +++++++++----
 .../dsl/transform/CompletenessExpr2DQSteps.scala   |   6 +-
 .../dsl/transform/DistinctnessExpr2DQSteps.scala   |   4 +-
 .../step/builder/dsl/transform/Expr2DQSteps.scala  |  14 +-
 .../dsl/transform/ProfilingExpr2DQSteps.scala      |   6 +-
 .../dsl/transform/TimelinessExpr2DQSteps.scala     |   6 +-
 .../dsl/transform/UniquenessExpr2DQSteps.scala     |   4 +-
 .../step/builder/preproc/PreProcParamMaker.scala   |   2 +-
 .../measure/step/write/MetricWriteStep.scala       |   3 +-
 .../dqdefinition/reader/ParamEnumReaderSpec.scala  | 218 +++++++++++++++++++++
 .../dqdefinition/reader/ParamFileReaderSpec.scala  |  18 +-
 .../dqdefinition/reader/ParamJsonReaderSpec.scala  |  18 +-
 .../org/apache/griffin/measure/job/DQAppTest.scala |   7 +-
 .../griffin/measure/sink/CustomSinkTest.scala      |   4 +-
 .../apache/griffin/measure/sink/SinkTestBase.scala |   2 +-
 .../griffin/measure/step/TransformStepTest.scala   |   2 +-
 .../AccuracyTransformationsIntegrationTest.scala   |   3 +-
 36 files changed, 580 insertions(+), 495 deletions(-)
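
The core of the change is a shared GriffinEnum trait (full diff below). As a
minimal, self-contained sketch of the pattern, assuming nothing beyond the
Scala standard library (the `ColorType` and `GriffinEnumDemo` names are
hypothetical, for illustration only):

    trait GriffinEnum extends Enumeration {
      type GriffinEnum = Value

      // Shared fallback for unrecognized config strings.
      val Unknown = Value

      // Case-insensitive, hyphen-insensitive lookup; unlike
      // Enumeration.withName, this never throws -- unmatched names
      // resolve to Unknown instead.
      def withNameWithDefault(name: String): Value =
        values
          .find(_.toString.toLowerCase == name.replace("-", "").toLowerCase())
          .getOrElse(Unknown)
    }

    // Hypothetical enum exercising the trait:
    object ColorType extends GriffinEnum {
      val Red, Green = Value
    }

    object GriffinEnumDemo extends App {
      assert(ColorType.withNameWithDefault("RED") == ColorType.Red)
      assert(ColorType.withNameWithDefault("no-such") == ColorType.Unknown)
    }

Each enum object below extends this trait and, where the old regexes accepted
aliases, overrides withNameWithDefault to fold the aliases into one canonical
value.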

diff --git a/measure/src/main/scala/org/apache/griffin/measure/Application.scala b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
index 6bad4d5..deb2781 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/Application.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
@@ -22,7 +22,8 @@ import scala.util.{Failure, Success, Try}
 
 import org.apache.griffin.measure.configuration.dqdefinition.{DQConfig, EnvConfig, GriffinConfig, Param}
 import org.apache.griffin.measure.configuration.dqdefinition.reader.ParamReaderFactory
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.enums.ProcessType
+import org.apache.griffin.measure.configuration.enums.ProcessType._
 import org.apache.griffin.measure.launch.DQApp
 import org.apache.griffin.measure.launch.batch.BatchDQApp
 import org.apache.griffin.measure.launch.streaming.StreamingDQApp
@@ -62,7 +63,7 @@ object Application extends Loggable {
     val allParam: GriffinConfig = GriffinConfig(envParam, dqParam)
 
     // choose process
-    val procType = ProcessType(allParam.getDqConfig.getProcType)
+    val procType = ProcessType.withNameWithDefault(allParam.getDqConfig.getProcType)
     val dqApp: DQApp = procType match {
       case BatchProcessType => BatchDQApp(allParam)
       case StreamingProcessType => StreamingDQApp(allParam)
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
index 77b7bac..afa1822 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
@@ -21,7 +21,12 @@ import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
 import com.fasterxml.jackson.annotation.JsonInclude.Include
 import org.apache.commons.lang.StringUtils
 
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.enums.{DqType, DslType, FlattenType, OutputType, SinkType}
+import org.apache.griffin.measure.configuration.enums.DqType._
+import org.apache.griffin.measure.configuration.enums.DslType.{DslType, GriffinDsl}
+import org.apache.griffin.measure.configuration.enums.FlattenType.{DefaultFlattenType, FlattenType}
+import org.apache.griffin.measure.configuration.enums.OutputType.{OutputType, UnknownOutputType}
+import org.apache.griffin.measure.configuration.enums.SinkType.SinkType
 
 /**
   * dq param
@@ -153,8 +158,8 @@ case class RuleParam(@JsonProperty("dsl.type") private val dslType: String,
                      @JsonProperty("out") private val outputs: List[RuleOutputParam] = null,
                      @JsonProperty("error.confs") private val errorConfs: List[RuleErrorConfParam] = null
                     ) extends Param {
-  def getDslType: DslType = if (dslType != null) DslType(dslType) else DslType("")
-  def getDqType: DqType = if (dqType != null) DqType(dqType) else DqType("")
+  def getDslType: DslType = if (dslType != null) DslType.withNameWithDefault(dslType) else GriffinDsl
+  def getDqType: DqType = if (dqType != null) DqType.withNameWithDefault(dqType) else Unknown
   def getCache: Boolean = if (cache) cache else false
 
   def getInDfName(defName: String = ""): String = if (inDfName != null) inDfName else defName
@@ -185,7 +190,7 @@ case class RuleParam(@JsonProperty("dsl.type") private val dslType: String,
   }
 
   def validate(): Unit = {
-    assert(!(getDslType.equals(GriffinDslType) && getDqType.equals(UnknownType)),
+    assert(!(getDslType.equals(GriffinDsl) && getDqType.equals(Unknown)),
       "unknown dq type for griffin dsl")
 
     getOutputs.foreach(_.validate)
@@ -204,9 +209,15 @@ case class RuleOutputParam( @JsonProperty("type") private val outputType: String
                             @JsonProperty("name") private val name: String,
                             @JsonProperty("flatten") private val flatten: String
                           ) extends Param {
-  def getOutputType: OutputType = if (outputType != null) OutputType(outputType) else OutputType("")
+  def getOutputType: OutputType = {
+    if (outputType != null) OutputType.withNameWithDefault(outputType)
+    else UnknownOutputType
+  }
   def getNameOpt: Option[String] = if (StringUtils.isNotBlank(name)) Some(name) else None
-  def getFlatten: FlattenType = if (StringUtils.isNotBlank(flatten)) FlattenType(flatten) else FlattenType("")
+  def getFlatten: FlattenType = {
+    if (StringUtils.isNotBlank(flatten)) FlattenType.withNameWithDefault(flatten)
+    else DefaultFlattenType
+  }
 
   def validate(): Unit = {}
 }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
index 22edf2b..4c5f937 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
@@ -21,7 +21,8 @@ import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
 import com.fasterxml.jackson.annotation.JsonInclude.Include
 import org.apache.commons.lang.StringUtils
 
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.enums.SinkType
+import org.apache.griffin.measure.configuration.enums.SinkType.SinkType
 
 /**
   * environment param
@@ -86,7 +87,7 @@ case class SparkParam( @JsonProperty("log.level") private val logLevel: String,
 case class SinkParam(@JsonProperty("type") private val sinkType: String,
                      @JsonProperty("config") private val config: Map[String, 
Any]
                     ) extends Param {
-  def getType: SinkType = SinkType(sinkType)
+  def getType: SinkType = SinkType.withNameWithDefault(sinkType)
   def getConfig: Map[String, Any] = if (config != null) config else Map[String, Any]()
 
   def validate(): Unit = {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala
index cc47728..3176bad 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala
@@ -17,101 +17,46 @@
 
 package org.apache.griffin.measure.configuration.enums
 
-import scala.util.matching.Regex
+import org.apache.griffin.measure.configuration.enums
 
 /**
   * effective when dsl type is "griffin-dsl",
   * indicates the dq type of griffin pre-defined measurements
+  * <li>{@link #Accuracy} - The match percentage of items between source and target
+  *                         count(source items matched with the ones from target) / count(source)
+  *                         e.g.: source [1, 2, 3, 4, 5], target: [1, 2, 3, 4]
+  *                         metric will be: { "total": 5, "miss": 1, "matched": 4 } accuracy is 80%.</li>
+  * <li>{@link #Profiling} - The statistic data of data source
+  *                          e.g.: max, min, average, group by count, ...</li>
+  * <li>{@link #Uniqueness} - The uniqueness of data source comparing with itself
+  *                           count(unique items in source) / count(source)
+  *                           e.g.: [1, 2, 3, 3] -> { "unique": 2, "total": 4, "dup-arr": [ "dup": 1, "num": 1 ] }
+  *                           uniqueness indicates the items without any replica of data</li>
+  * <li>{@link #Distinctness} - The distinctness of data source comparing with itself
+  *                             count(distinct items in source) / count(source)
+  *                             e.g.: [1, 2, 3, 3] -> { "dist": 3, "total": 4, "dup-arr": [ "dup": 1, "num": 1 ] }
+  *                             distinctness indicates the valid information of data
+  *                             comparing with uniqueness, distinctness is more meaningful</li>
+  * <li>{@link #Timeliness} - The latency of data source with timestamp information
+  *                           e.g.: (receive_time - send_time)
+  *                           timeliness can get the statistic metric of latency, like average, max, min,
+  *                            percentile-value,
+  *                           even more, it can record the items with latency above threshold you configured</li>
+  * <li>{@link #Completeness} - The completeness of data source
+  *                             the columns you measure is incomplete if it is null</li>
   */
-sealed trait DqType {
-  val idPattern: Regex
-  val desc: String
-}
-
-object DqType {
-  private val dqTypes: List[DqType] = List(
-    AccuracyType,
-    ProfilingType,
-    UniquenessType,
-    DistinctnessType,
-    TimelinessType,
-    CompletenessType,
-    UnknownType
-  )
-  def apply(ptn: String): DqType = {
-    dqTypes.find(dqType => ptn match {
-      case dqType.idPattern() => true
-      case _ => false
-    }).getOrElse(UnknownType)
-  }
-  def unapply(pt: DqType): Option[String] = Some(pt.desc)
-}
-
-/**
-  * accuracy: the match percentage of items between source and target
-  * count(source items matched with the ones from target) / count(source)
-  * e.g.: source [1, 2, 3, 4, 5], target: [1, 2, 3, 4]
-  *       metric will be: { "total": 5, "miss": 1, "matched": 4 }
-  *       accuracy is 80%.
-  */
- case object AccuracyType extends DqType {
-  val idPattern = "^(?i)accuracy$".r
-  val desc = "accuracy"
-}
+object DqType extends GriffinEnum {
 
-/**
-  * profiling: the statistic data of data source
-  * e.g.: max, min, average, group by count, ...
-  */
- case object ProfilingType extends DqType {
-  val idPattern = "^(?i)profiling$".r
-  val desc = "profiling"
-}
+  type DqType = Value
 
-/**
-  * uniqueness: the uniqueness of data source comparing with itself
-  * count(unique items in source) / count(source)
-  * e.g.: [1, 2, 3, 3] -> { "unique": 2, "total": 4, "dup-arr": [ "dup": 1, "num": 1 ] }
-  * uniqueness indicates the items without any replica of data
-  */
- case object UniquenessType extends DqType {
-  val idPattern = "^(?i)uniqueness|duplicate$".r
-  val desc = "uniqueness"
-}
+  val Accuracy, Profiling, Uniqueness, Duplicate, Distinct, Timeliness,
+  Completeness = Value
 
-/**
-  * distinctness: the distinctness of data source comparing with itself
-  * count(distinct items in source) / count(source)
-  * e.g.: [1, 2, 3, 3] -> { "dist": 3, "total": 4, "dup-arr": [ "dup": 1, "num": 1 ] }
-  * distinctness indicates the valid information of data
-  * comparing with uniqueness, distinctness is more meaningful
-  */
- case object DistinctnessType extends DqType {
-  val idPattern = "^(?i)distinct$".r
-  val desc = "distinct"
-}
-
-/**
-  * timeliness: the latency of data source with timestamp information
-  * e.g.: (receive_time - send_time)
-  * timeliness can get the statistic metric of latency, like average, max, min, percentile-value,
-  * even more, it can record the items with latency above threshold you configured
-  */
- case object TimelinessType extends DqType {
-  val idPattern = "^(?i)timeliness$".r
-  val desc = "timeliness"
-}
-
-/**
-  * completeness: the completeness of data source
-  * the columns you measure is incomplete if it is null
-  */
- case object CompletenessType extends DqType {
-  val idPattern = "^(?i)completeness$".r
-  val desc = "completeness"
-}
-
- case object UnknownType extends DqType {
-  val idPattern = "".r
-  val desc = "unknown"
+  override def withNameWithDefault(name: String): enums.DqType.Value = {
+    val dqType = super.withNameWithDefault(name)
+    dqType match {
+      case Uniqueness | Duplicate => Uniqueness
+      case _ => dqType
+    }
+  }
 }
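
The override above folds both spellings accepted by the old regex
`^(?i)uniqueness|duplicate$` into one canonical value. A quick illustrative
check, assuming the DqType object compiles as in this diff:

    import org.apache.griffin.measure.configuration.enums.DqType

    assert(DqType.withNameWithDefault("duplicate") == DqType.Uniqueness) // alias folded
    assert(DqType.withNameWithDefault("ACCURACY") == DqType.Accuracy)    // case-insensitive
    assert(DqType.withNameWithDefault("bogus") == DqType.Unknown)        // safe fallback
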
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DslType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DslType.scala
index 4e2d7cd..1b66f35 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DslType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DslType.scala
@@ -17,47 +17,34 @@
 
 package org.apache.griffin.measure.configuration.enums
 
-import scala.util.matching.Regex
+import org.apache.griffin.measure.configuration.enums
 
 /**
   * dsl type indicates the language type of rule param
+  * <li>{@link #SparkSql} - spark-sql: rule defined in "SPARK-SQL" directly</li>
+  * <li>{@link #DfOps} - df-ops|df-opr|: data frame operations rule, support some pre-defined data frame ops()</li>
+  * <li>{@link #GriffinDsl} - griffin dsl rule, to define dq measurements easier</li>
   */
-sealed trait DslType {
-  val idPattern: Regex
-  val desc: String
-}
+object DslType extends GriffinEnum {
+  type DslType = Value
 
-object DslType {
-  private val dslTypes: List[DslType] = List(SparkSqlType, GriffinDslType, DataFrameOpsType)
-  def apply(ptn: String): DslType = {
-    dslTypes.find(tp => ptn match {
-      case tp.idPattern() => true
-      case _ => false
-    }).getOrElse(GriffinDslType)
-  }
-  def unapply(pt: DslType): Option[String] = Some(pt.desc)
-}
+  val SparkSql, DfOps, DfOpr, DfOperations, GriffinDsl, DataFrameOpsType = Value
 
-/**
-  * spark-sql: rule defined in "SPARK-SQL" directly
-  */
- case object SparkSqlType extends DslType {
-  val idPattern = "^(?i)spark-?sql$".r
-  val desc = "spark-sql"
-}
+  /**
+    *
+    * @param name Dsltype from config file
+    * @return Enum value corresponding to string
+    */
+  def withNameWithDslType(name: String): Value =
+    values
+      .find(_.toString.toLowerCase == name.replace("-", "").toLowerCase())
+      .getOrElse(GriffinDsl)
 
-/**
-  * df-ops: data frame operations rule, support some pre-defined data frame ops
-  */
- case object DataFrameOpsType extends DslType {
-  val idPattern = "^(?i)df-?(?:ops|opr|operations)$".r
-  val desc = "df-ops"
-}
-
-/**
-  * griffin-dsl: griffin dsl rule, to define dq measurements easier
-  */
- case object GriffinDslType extends DslType {
-  val idPattern = "^(?i)griffin-?dsl$".r
-  val desc = "griffin-dsl"
+  override def withNameWithDefault(name: String): enums.DslType.Value = {
+    val dslType = withNameWithDslType(name)
+    dslType match {
+      case DfOps | DfOpr | DfOperations => DataFrameOpsType
+      case _ => dslType
+    }
+  }
 }
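
Note that DslType keeps the old fallback semantics: an unrecognized name
resolves to GriffinDsl (the former `getOrElse(GriffinDslType)`), not to
Unknown, and the df-ops aliases collapse to DataFrameOpsType. Illustrative
checks, assuming the DslType object compiles as in this diff:

    import org.apache.griffin.measure.configuration.enums.DslType

    assert(DslType.withNameWithDefault("df-ops") == DslType.DataFrameOpsType) // hyphen stripped, alias folded
    assert(DslType.withNameWithDefault("df-opr") == DslType.DataFrameOpsType)
    assert(DslType.withNameWithDefault("") == DslType.GriffinDsl)             // fallback is GriffinDsl
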
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
index cde4c41..2586268 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
@@ -17,77 +17,47 @@
 
 package org.apache.griffin.measure.configuration.enums
 
-import scala.util.matching.Regex
-
 /**
   * the strategy to flatten metric
+  *  <li>{@link #DefaultFlattenType} -  default flatten strategy
+  *                                     metrics contains 1 row -> flatten metric json map
+  *                                     metrics contains n > 1 rows -> flatten metric json array
+  *                                     n = 0: { }
+  *                                     n = 1: { "col1": "value1", "col2": "value2", ... }
+  *                                     n > 1: { "arr-name": [ { "col1": "value1", "col2": "value2", ... }, ... ] }
+  *                                     all rows
+  *  </li>
+  *  <li>{@link #EntriesFlattenType} - metrics contains n rows -> flatten metric json map
+  *                                    n = 0: { }
+  *                                    n >= 1: { "col1": "value1", "col2": "value2", ... }
+  *                                    the first row only
+  *  </li>
+  *  <li>{@link #ArrayFlattenType} -   metrics contains n rows -> flatten metric json array
+  *                                    n = 0: { "arr-name": [ ] }
+  *                                    n >= 1: { "arr-name": [ { "col1": "value1", "col2": "value2", ... }, ... ] }
+  *                                    all rows
+  *  </li>
+  *  <li>{@link #MapFlattenType} - metrics contains n rows -> flatten metric json wrapped map
+  *                                n = 0: { "map-name": { } }
+  *                                n >= 1: { "map-name": { "col1": "value1", "col2": "value2", ... } }
+  *                                the first row only
+  *  </li>
   */
-sealed trait FlattenType {
-  val idPattern: Regex
-  val desc: String
-}
-
-object FlattenType {
-  private val flattenTypes: List[FlattenType] = List(
-    DefaultFlattenType,
-    EntriesFlattenType,
-    ArrayFlattenType,
-    MapFlattenType
-  )
-
-  val default = DefaultFlattenType
-  def apply(ptn: String): FlattenType = {
-    flattenTypes.find(tp => ptn match {
-      case tp.idPattern() => true
-      case _ => false
-    }).getOrElse(default)
+object FlattenType extends GriffinEnum {
+  type FlattenType = Value
+
+  val DefaultFlattenType, EntriesFlattenType, ArrayFlattenType, MapFlattenType =
+    Value
+
+  val List, Array, Entries, Map, Default = Value
+
+  override def withNameWithDefault(name: String): Value = {
+    val flattenType = super.withNameWithDefault(name)
+    flattenType match {
+      case Array | List => ArrayFlattenType
+      case Map => MapFlattenType
+      case Entries => EntriesFlattenType
+      case _ => DefaultFlattenType
+    }
   }
-  def unapply(pt: FlattenType): Option[String] = Some(pt.desc)
-}
-
-/**
-  * default flatten strategy
-  * metrics contains 1 row -> flatten metric json map
-  * metrics contains n > 1 rows -> flatten metric json array
-  * n = 0: { }
-  * n = 1: { "col1": "value1", "col2": "value2", ... }
-  * n > 1: { "arr-name": [ { "col1": "value1", "col2": "value2", ... }, ... ] }
-  * all rows
-  */
- case object DefaultFlattenType extends FlattenType {
-  val idPattern: Regex = "".r
-  val desc: String = "default"
-}
-
-/**
-  * metrics contains n rows -> flatten metric json map
-  * n = 0: { }
-  * n >= 1: { "col1": "value1", "col2": "value2", ... }
-  * the first row only
-  */
- case object EntriesFlattenType extends FlattenType {
-  val idPattern: Regex = "^(?i)entries$".r
-  val desc: String = "entries"
-}
-
-/**
-  * metrics contains n rows -> flatten metric json array
-  * n = 0: { "arr-name": [ ] }
-  * n >= 1: { "arr-name": [ { "col1": "value1", "col2": "value2", ... }, ... ] }
-  * all rows
-  */
- case object ArrayFlattenType extends FlattenType {
-  val idPattern: Regex = "^(?i)array|list$".r
-  val desc: String = "array"
-}
-
-/**
-  * metrics contains n rows -> flatten metric json wrapped map
-  * n = 0: { "map-name": { } }
-  * n >= 1: { "map-name": { "col1": "value1", "col2": "value2", ... } }
-  * the first row only
-  */
- case object MapFlattenType extends FlattenType {
-  val idPattern: Regex = "^(?i)map$".r
-  val desc: String = "map"
 }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/WriteMode.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/GriffinEnum.scala
similarity index 62%
copy from measure/src/main/scala/org/apache/griffin/measure/configuration/enums/WriteMode.scala
copy to measure/src/main/scala/org/apache/griffin/measure/configuration/enums/GriffinEnum.scala
index 9e74dda..b73742d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/WriteMode.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/GriffinEnum.scala
@@ -17,26 +17,19 @@
 
 package org.apache.griffin.measure.configuration.enums
 
-/**
-  * write mode when write metrics and records
-  */
-sealed trait WriteMode {}
+trait GriffinEnum extends Enumeration {
+  type GriffinEnum = Value
 
-object WriteMode {
-  def defaultMode(procType: ProcessType): WriteMode = {
-    procType match {
-      case BatchProcessType => SimpleMode
-      case StreamingProcessType => TimestampMode
-    }
-  }
-}
+  val Unknown = Value
 
-/**
-  * simple mode: write metrics and records directly
-  */
- case object SimpleMode extends WriteMode {}
+  /**
+    *
+    * @param name Constant value in String
+    * @return Enum constant value
+    */
+  def withNameWithDefault(name: String): Value =
+    values
+      .find(_.toString.toLowerCase == name.replace("-", "").toLowerCase())
+      .getOrElse(Unknown)
 
-/**
-  * timestamp mode: write metrics and records with timestamp information
-  */
- case object TimestampMode extends WriteMode {}
+}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
index aa7b8d5..dd0b2d1 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
@@ -17,66 +17,28 @@
 
 package org.apache.griffin.measure.configuration.enums
 
-import scala.util.matching.Regex
-
 /**
-  * the strategy to flatten metric
+  * the strategy to output metric
+  *  <li>{@link #MetricOutputType} - output the rule step result as metric</li>
+  *  <li>{@link #RecordOutputType} - output the rule step result as records</li>
+  *  <li>{@link #DscUpdateOutputType} - output the rule step result to update data source cache</li>
+  *  <li>{@link #UnknownOutputType} - will not output the result </li>
   */
-sealed trait OutputType {
-  val idPattern: Regex
-  val desc: String
-}
-
-object OutputType {
-  private val outputTypes: List[OutputType] = List(
-    MetricOutputType,
-    RecordOutputType,
-    DscUpdateOutputType,
-    UnknownOutputType
-  )
-
-  val default = UnknownOutputType
-  def apply(ptn: String): OutputType = {
-    outputTypes.find(tp => ptn match {
-      case tp.idPattern() => true
-      case _ => false
-    }).getOrElse(default)
+object OutputType extends GriffinEnum {
+  type OutputType = Value
+
+  val MetricOutputType, RecordOutputType, DscUpdateOutputType,
+  UnknownOutputType = Value
+
+  val Metric, Record, Records, DscUpdate = Value
+
+  override def withNameWithDefault(name: String): Value = {
+    val flattenType = super.withNameWithDefault(name)
+    flattenType match {
+      case Metric => MetricOutputType
+      case Record | Records => RecordOutputType
+      case DscUpdate => DscUpdateOutputType
+      case _ => UnknownOutputType
+    }
   }
-  def unapply(pt: OutputType): Option[String] = Some(pt.desc)
-}
-
-/**
-  * metric output type
-  * output the rule step result as metric
-  */
- case object MetricOutputType extends OutputType {
-  val idPattern: Regex = "^(?i)metric$".r
-  val desc: String = "metric"
-}
-
-/**
-  * record output type
-  * output the rule step result as records
-  */
- case object RecordOutputType extends OutputType {
-  val idPattern: Regex = "^(?i)record|records$".r
-  val desc: String = "record"
-}
-
-/**
-  * data source cache update output type
-  * output the rule step result to update data source cache
-  */
- case object DscUpdateOutputType extends OutputType {
-  val idPattern: Regex = "^(?i)dsc-update$".r
-  val desc: String = "dsc-update"
-}
-
-/**
-  * unknown output type
-  * will not output the result
-  */
- case object UnknownOutputType extends OutputType {
-  val idPattern: Regex = "".r
-  val desc: String = "unknown"
 }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala
index edfa94e..90e4ae9 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala
@@ -17,43 +17,14 @@
 
 package org.apache.griffin.measure.configuration.enums
 
-import scala.util.matching.Regex
-
 /**
   * process type enum
+  *  <li>{@link #BatchProcessType} - Process in batch mode </li>
+  *  <li>{@link #StreamingProcessType} - Process in streaming mode</li>
   */
-sealed trait ProcessType {
-  val idPattern: Regex
-  val desc: String
-}
-
-object ProcessType {
-  private val procTypes: List[ProcessType] = List(
-    BatchProcessType,
-    StreamingProcessType
-  )
+object ProcessType extends GriffinEnum {
+  type ProcessType = Value
 
-  def apply(ptn: String): ProcessType = {
-    procTypes.find(tp => ptn match {
-      case tp.idPattern() => true
-      case _ => false
-    }).getOrElse(BatchProcessType)
-  }
-  def unapply(pt: ProcessType): Option[String] = Some(pt.desc)
-}
-
-/**
-  * process in batch mode
-  */
- case object BatchProcessType extends ProcessType {
-  val idPattern = """^(?i)batch$""".r
-  val desc = "batch"
-}
-
-/**
-  * process in streaming mode
-  */
- case object StreamingProcessType extends ProcessType {
-  val idPattern = """^(?i)streaming$""".r
-  val desc = "streaming"
+  val BatchProcessType = Value("Batch")
+  val StreamingProcessType = Value("Streaming")
 }
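
ProcessType is the one enum that assigns explicit string names via
Value("Batch") and Value("Streaming"), so the inherited case-insensitive
lookup still accepts the lowercase spellings the old regexes matched.
Illustrative checks, assuming the ProcessType object compiles as in this diff:

    import org.apache.griffin.measure.configuration.enums.ProcessType

    assert(ProcessType.withNameWithDefault("batch") == ProcessType.BatchProcessType)
    assert(ProcessType.withNameWithDefault("STREAMING") == ProcessType.StreamingProcessType)
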
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
index 62bd4b5..1476ae1 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
@@ -17,82 +17,39 @@
 
 package org.apache.griffin.measure.configuration.enums
 
-import scala.util.matching.Regex
+import org.apache.griffin.measure.configuration.enums
 
 /**
-  * sink type
+  * Supported Sink types
+  *  <li>{@link #Console #Log} -  console sink, will sink metric in console (alias log)</li>
+  *  <li>{@link #Hdfs} - hdfs sink, will sink metric and record in hdfs</li>
+  *  <li>{@link #Es #Elasticsearch #Http} - elasticsearch sink, will sink metric
+  *  in elasticsearch (alias Es and Http)</li>
+  *  <li>{@link #Mongo #MongoDB} - mongo sink, will sink metric in mongo db (alias MongoDb)</li>
+  *  <li>{@link #Custom} - custom sink (needs using extra jar-file-extension)</li>
+  *  <li>{@link #Unknown} - </li>
   */
-sealed trait SinkType {
-  val idPattern: Regex
-  val desc: String
-}
-
-object SinkType {
-  private val sinkTypes: List[SinkType] = List(
-    ConsoleSinkType,
-    HdfsSinkType,
-    ElasticsearchSinkType,
-    MongoSinkType,
-    CustomSinkType,
-    UnknownSinkType
-  )
-
-  def apply(ptn: String): SinkType = {
-    sinkTypes.find(tp => ptn match {
-      case tp.idPattern() => true
-      case _ => false
-    }).getOrElse(UnknownSinkType)
-  }
+object SinkType extends GriffinEnum {
+  type SinkType = Value
 
-  def unapply(pt: SinkType): Option[String] = Some(pt.desc)
+  val Console, Log, Hdfs, Es, Http, ElasticSearch, MongoDB, Mongo, Custom =
+    Value
 
   def validSinkTypes(strs: Seq[String]): Seq[SinkType] = {
-    val seq = strs.map(s => SinkType(s)).filter(_ != UnknownSinkType).distinct
-    if (seq.size > 0) seq else Seq(ElasticsearchSinkType)
+    val seq = strs
+      .map(s => SinkType.withNameWithDefault(s))
+      .filter(_ != SinkType.Unknown)
+      .distinct
+    if (seq.size > 0) seq else Seq(SinkType.ElasticSearch)
   }
-}
-
-/**
-  * console sink, will sink metric in console
-  */
-case object ConsoleSinkType extends SinkType {
-  val idPattern = "^(?i)console|log$".r
-  val desc = "console"
-}
-
-/**
-  * hdfs sink, will sink metric and record in hdfs
-  */
-case object HdfsSinkType extends SinkType {
-  val idPattern = "^(?i)hdfs$".r
-  val desc = "hdfs"
-}
 
-/**
-  * elasticsearch sink, will sink metric in elasticsearch
-  */
-case object ElasticsearchSinkType extends SinkType {
-  val idPattern = "^(?i)es|elasticsearch|http$".r
-  val desc = "elasticsearch"
-}
-
-/**
-  * mongo sink, will sink metric in mongo db
-  */
-case object MongoSinkType extends SinkType {
-  val idPattern = "^(?i)mongo|mongodb$".r
-  val desc = "mongo"
-}
-
-/**
-  * custom sink (needs using extra jar-file-extension)
-  */
-case object CustomSinkType extends SinkType {
-  val idPattern = "^(?i)custom$".r
-  val desc = "custom"
-}
-
-case object UnknownSinkType extends SinkType {
-  val idPattern = "".r
-  val desc = "unknown"
+  override def withNameWithDefault(name: String): enums.SinkType.Value = {
+    val sinkType = super.withNameWithDefault(name)
+    sinkType match {
+      case Console | Log => Console
+      case Es | ElasticSearch | Http => ElasticSearch
+      case MongoDB | Mongo => MongoDB
+      case _ => sinkType
+    }
+  }
 }
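
validSinkTypes keeps its old behavior on top of the new lookup: aliases fold
into one canonical value per sink, unrecognized names are dropped, and an
empty result falls back to ElasticSearch. Illustrative checks, assuming the
SinkType object compiles as in this diff:

    import org.apache.griffin.measure.configuration.enums.SinkType

    assert(SinkType.validSinkTypes(Seq("log", "CONSOLE")) == Seq(SinkType.Console))
    assert(SinkType.validSinkTypes(Seq("bogus")) == Seq(SinkType.ElasticSearch))
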
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/WriteMode.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/WriteMode.scala
index 9e74dda..7057588 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/WriteMode.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/WriteMode.scala
@@ -23,10 +23,10 @@ package org.apache.griffin.measure.configuration.enums
 sealed trait WriteMode {}
 
 object WriteMode {
-  def defaultMode(procType: ProcessType): WriteMode = {
+  def defaultMode(procType: ProcessType.ProcessType): WriteMode = {
     procType match {
-      case BatchProcessType => SimpleMode
-      case StreamingProcessType => TimestampMode
+      case ProcessType.BatchProcessType => SimpleMode
+      case ProcessType.StreamingProcessType => TimestampMode
     }
   }
 }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
index 34acffa..bea7f7d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
@@ -20,7 +20,8 @@ package org.apache.griffin.measure.context
 import org.apache.spark.sql.{Encoders, SparkSession}
 
 import org.apache.griffin.measure.configuration.dqdefinition._
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.enums.ProcessType._
+import org.apache.griffin.measure.configuration.enums.WriteMode
 import org.apache.griffin.measure.datasource._
 import org.apache.griffin.measure.sink.{Sink, SinkFactory}
 
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
index b621aa6..fedf6e3 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.functions._
 
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
-import org.apache.griffin.measure.configuration.enums.BatchProcessType
+import org.apache.griffin.measure.configuration.enums.ProcessType.BatchProcessType
 import org.apache.griffin.measure.context.{ContextId, DQContext, TimeRange}
 import org.apache.griffin.measure.datasource.TimestampStorage
 import org.apache.griffin.measure.job.builder.DQJobBuilder
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
index af426d7..3c42094 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
@@ -25,7 +25,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql.SparkSession
 
 import org.apache.griffin.measure.configuration.dqdefinition._
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.enums.ProcessType.BatchProcessType
 import org.apache.griffin.measure.context._
 import org.apache.griffin.measure.datasource.DataSourceFactory
 import org.apache.griffin.measure.job.builder.DQJobBuilder
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
index fc10bdb..44dca49 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
@@ -28,7 +28,7 @@ import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.dqdefinition._
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.enums.ProcessType.StreamingProcessType
 import org.apache.griffin.measure.context._
 import org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointClient
 import org.apache.griffin.measure.context.streaming.metric.CacheResults
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
index aab638e..e7806f4 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
@@ -21,7 +21,7 @@ import scala.util.{Failure, Success, Try}
 
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.dqdefinition.SinkParam
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.enums.SinkType._
 import org.apache.griffin.measure.utils.ParamUtil._
 
 case class SinkFactory(sinkParamIter: Iterable[SinkParam],
@@ -42,11 +42,11 @@ case class SinkFactory(sinkParamIter: Iterable[SinkParam],
     val config = sinkParam.getConfig
     val sinkType = sinkParam.getType
     val sinkTry = sinkType match {
-      case ConsoleSinkType => Try(ConsoleSink(config, metricName, timeStamp))
-      case HdfsSinkType => Try(HdfsSink(config, metricName, timeStamp))
-      case ElasticsearchSinkType => Try(ElasticSearchSink(config, metricName, timeStamp, block))
-      case MongoSinkType => Try(MongoSink(config, metricName, timeStamp, block))
-      case CustomSinkType => Try(getCustomSink(config, metricName, timeStamp, block))
+      case Console => Try(ConsoleSink(config, metricName, timeStamp))
+      case Hdfs => Try(HdfsSink(config, metricName, timeStamp))
+      case ElasticSearch => Try(ElasticSearchSink(config, metricName, timeStamp, block))
+      case MongoDB => Try(MongoSink(config, metricName, timeStamp, block))
+      case Custom => Try(getCustomSink(config, metricName, timeStamp, block))
       case _ => throw new Exception(s"sink type ${sinkType} is not supported!")
     }
     sinkTry match {
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
index 47f740f..efc4537 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
@@ -21,7 +21,8 @@ import org.apache.commons.lang.StringUtils
 
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.dqdefinition.{DataSourceParam, Param, RuleParam}
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.enums.DslType._
+import org.apache.griffin.measure.configuration.enums.ProcessType._
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step._
 
@@ -74,9 +75,9 @@ object DQStepBuilder {
   private def getRuleParamStepBuilder(dslType: DslType, dsNames: Seq[String], funcNames: Seq[String]
                                      ): Option[RuleParamStepBuilder] = {
     dslType match {
-      case SparkSqlType => Some(SparkSqlDQStepBuilder())
+      case SparkSql => Some(SparkSqlDQStepBuilder())
       case DataFrameOpsType => Some(DataFrameOpsDQStepBuilder())
-      case GriffinDslType => Some(GriffinDslDQStepBuilder(dsNames, funcNames))
+      case GriffinDsl => Some(GriffinDslDQStepBuilder(dsNames, funcNames))
       case _ => None
     }
   }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
index 804c503..c7f6051 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
@@ -18,7 +18,7 @@
 package org.apache.griffin.measure.step.builder
 
 import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.enums.OutputType._
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.{DQStep, SeqDQStep}
 import org.apache.griffin.measure.step.write.{DataSourceUpdateWriteStep, MetricWriteStep, RecordWriteStep}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala
index 9b7269a..baec077 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala
@@ -17,8 +17,7 @@
 
 package org.apache.griffin.measure.step.builder.dsl.parser
 
-import org.apache.griffin.measure.configuration.enums._
-import org.apache.griffin.measure.step.builder.dsl._
+import org.apache.griffin.measure.configuration.enums.DqType._
 import org.apache.griffin.measure.step.builder.dsl.expr._
 
 /**
@@ -82,12 +81,12 @@ case class GriffinDslParser(dataSourceNames: Seq[String], functionNames: Seq[Str
 
   def parseRule(rule: String, dqType: DqType): ParseResult[Expr] = {
     val rootExpr = dqType match {
-      case AccuracyType => logicalExpression
-      case ProfilingType => profilingClause
-      case UniquenessType => uniquenessClause
-      case DistinctnessType => distinctnessClause
-      case TimelinessType => timelinessClause
-      case CompletenessType => completenessClause
+      case Accuracy => logicalExpression
+      case Profiling => profilingClause
+      case Uniqueness => uniquenessClause
+      case Distinct => distinctnessClause
+      case Timeliness => timelinessClause
+      case Completeness => completenessClause
       case _ => expression
     }
     parseAll(rootExpr, rule)
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
index 931032b..c9e3c1c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
@@ -18,7 +18,9 @@
 package org.apache.griffin.measure.step.builder.dsl.transform
 
 import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.enums.FlattenType.DefaultFlattenType
+import org.apache.griffin.measure.configuration.enums.OutputType._
+import org.apache.griffin.measure.configuration.enums.ProcessType._
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.DQStep
 import org.apache.griffin.measure.step.builder.ConstantColumns
@@ -26,7 +28,11 @@ import org.apache.griffin.measure.step.builder.dsl.expr._
 import org.apache.griffin.measure.step.builder.dsl.transform.analyzer.AccuracyAnalyzer
 import org.apache.griffin.measure.step.transform.{DataFrameOps, DataFrameOpsTransformStep, SparkSqlTransformStep}
 import org.apache.griffin.measure.step.transform.DataFrameOps.AccuracyOprKeys
-import org.apache.griffin.measure.step.write.{DataSourceUpdateWriteStep, MetricWriteStep, RecordWriteStep}
+import org.apache.griffin.measure.step.write.{
+  DataSourceUpdateWriteStep,
+  MetricWriteStep,
+  RecordWriteStep
+}
 import org.apache.griffin.measure.utils.ParamUtil._
 
 /**
@@ -34,8 +40,8 @@ import org.apache.griffin.measure.utils.ParamUtil._
   */
 case class AccuracyExpr2DQSteps(context: DQContext,
                                 expr: Expr,
-                                ruleParam: RuleParam
-                               ) extends Expr2DQSteps {
+                                ruleParam: RuleParam)
+    extends Expr2DQSteps {
 
   private object AccuracyKeys {
     val _source = "source"
@@ -65,36 +71,52 @@ case class AccuracyExpr2DQSteps(context: DQContext,
       // 1. miss record
       val missRecordsTableName = "__missRecords"
       val selClause = s"`${sourceName}`.*"
-      val missRecordsSql = if (!context.runTimeTableRegister.existsTable(targetName)) {
-        warn(s"[${timestamp}] data source ${targetName} not exists")
-        s"SELECT ${selClause} FROM `${sourceName}`"
-      } else {
-        val onClause = expr.coalesceDesc
-        val sourceIsNull = analyzer.sourceSelectionExprs.map { sel =>
-          s"${sel.desc} IS NULL"
-        }.mkString(" AND ")
-        val targetIsNull = analyzer.targetSelectionExprs.map { sel =>
-          s"${sel.desc} IS NULL"
-        }.mkString(" AND ")
-        val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})"
-        s"SELECT ${selClause} FROM `${sourceName}` " +
-          s"LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}"
-      }
+      val missRecordsSql =
+        if (!context.runTimeTableRegister.existsTable(targetName)) {
+          warn(s"[${timestamp}] data source ${targetName} not exists")
+          s"SELECT ${selClause} FROM `${sourceName}`"
+        } else {
+          val onClause = expr.coalesceDesc
+          val sourceIsNull = analyzer.sourceSelectionExprs
+            .map { sel =>
+              s"${sel.desc} IS NULL"
+            }
+            .mkString(" AND ")
+          val targetIsNull = analyzer.targetSelectionExprs
+            .map { sel =>
+              s"${sel.desc} IS NULL"
+            }
+            .mkString(" AND ")
+          val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})"
+          s"SELECT ${selClause} FROM `${sourceName}` " +
+            s"LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}"
+        }
 
       val missRecordsWriteSteps = procType match {
         case BatchProcessType =>
           val rwName =
-            ruleParam.getOutputOpt(RecordOutputType).
-              flatMap(_.getNameOpt).getOrElse(missRecordsTableName)
+            ruleParam
+              .getOutputOpt(RecordOutputType)
+              .flatMap(_.getNameOpt)
+              .getOrElse(missRecordsTableName)
           RecordWriteStep(rwName, missRecordsTableName)
         case StreamingProcessType =>
           val dsName =
-            ruleParam.getOutputOpt(DscUpdateOutputType).flatMap(_.getNameOpt).getOrElse(sourceName)
+            ruleParam
+              .getOutputOpt(DscUpdateOutputType)
+              .flatMap(_.getNameOpt)
+              .getOrElse(sourceName)
           DataSourceUpdateWriteStep(dsName, missRecordsTableName)
       }
 
       val missRecordsTransStep =
-        SparkSqlTransformStep(missRecordsTableName, missRecordsSql, emptyMap, Some(missRecordsWriteSteps), true)
+        SparkSqlTransformStep(
+          missRecordsTableName,
+          missRecordsSql,
+          emptyMap,
+          Some(missRecordsWriteSteps),
+          true
+        )
 
       // 2. miss count
       val missCountTableName = "__missCount"
@@ -106,19 +128,22 @@ case class AccuracyExpr2DQSteps(context: DQContext,
           s"SELECT `${ConstantColumns.tmst}`,COUNT(*) AS `${missColName}` " +
             s"FROM `${missRecordsTableName}` GROUP BY 
`${ConstantColumns.tmst}`"
       }
-      val missCountTransStep = SparkSqlTransformStep(missCountTableName, missCountSql, emptyMap)
+      val missCountTransStep =
+        SparkSqlTransformStep(missCountTableName, missCountSql, emptyMap)
       missCountTransStep.parentSteps += missRecordsTransStep
 
       // 3. total count
       val totalCountTableName = "__totalCount"
       val totalColName = details.getStringOrKey(_total)
       val totalCountSql = procType match {
-        case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`"
+        case BatchProcessType =>
+          s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`"
         case StreamingProcessType =>
           s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${totalColName}` " +
             s"FROM `${sourceName}` GROUP BY `${ConstantColumns.tmst}`"
       }
-      val totalCountTransStep = SparkSqlTransformStep(totalCountTableName, totalCountSql, emptyMap)
+      val totalCountTransStep =
+        SparkSqlTransformStep(totalCountTableName, totalCountSql, emptyMap)
 
       // 4. accuracy metric
       val accuracyTableName = ruleParam.getOutDfName()
@@ -153,14 +178,22 @@ case class AccuracyExpr2DQSteps(context: DQContext,
       val accuracyMetricWriteStep = procType match {
         case BatchProcessType =>
           val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
-          val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
-          val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
+          val mwName =
+            metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
+          val flattenType = metricOpt
+            .map(_.getFlatten)
+            .getOrElse(DefaultFlattenType)
           Some(MetricWriteStep(mwName, accuracyTableName, flattenType))
         case StreamingProcessType => None
       }
 
       val accuracyTransStep =
-        SparkSqlTransformStep(accuracyTableName, accuracyMetricSql, emptyMap, accuracyMetricWriteStep)
+        SparkSqlTransformStep(
+          accuracyTableName,
+          accuracyMetricSql,
+          emptyMap,
+          accuracyMetricWriteStep
+        )
       accuracyTransStep.parentSteps += missCountTransStep
       accuracyTransStep.parentSteps += totalCountTransStep
 
@@ -171,22 +204,30 @@ case class AccuracyExpr2DQSteps(context: DQContext,
           // 5. accuracy metric merge
           val accuracyMetricTableName = "__accuracy"
           val accuracyMetricRule = DataFrameOps._accuracy
-          val accuracyMetricDetails = Map[String, Any](
-            (AccuracyOprKeys._miss -> missColName),
-            (AccuracyOprKeys._total -> totalColName),
-            (AccuracyOprKeys._matched -> matchedColName)
+          val accuracyMetricDetails: Map[String, Any] = Map(
+            (AccuracyOprKeys._miss, missColName),
+            (AccuracyOprKeys._total, totalColName),
+            (AccuracyOprKeys._matched, matchedColName)
           )
           val accuracyMetricWriteStep = {
             val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
-            val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
-            val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
+            val mwName = metricOpt
+              .flatMap(_.getNameOpt)
+              .getOrElse(ruleParam.getOutDfName())
+            val flattenType = metricOpt
+              .map(_.getFlatten)
+              .getOrElse(DefaultFlattenType)
             MetricWriteStep(mwName, accuracyMetricTableName, flattenType)
           }
-          val accuracyMetricTransStep = DataFrameOpsTransformStep(accuracyMetricTableName,
-            accuracyTableName, accuracyMetricRule, accuracyMetricDetails, Some(accuracyMetricWriteStep))
+          val accuracyMetricTransStep = DataFrameOpsTransformStep(
+            accuracyMetricTableName,
+            accuracyTableName,
+            accuracyMetricRule,
+            accuracyMetricDetails,
+            Some(accuracyMetricWriteStep)
+          )
           accuracyMetricTransStep.parentSteps += accuracyTransStep
 
-
           // 6. collect accuracy records
           val accuracyRecordTableName = "__accuracyRecords"
           val accuracyRecordSql = {
@@ -198,13 +239,23 @@ case class AccuracyExpr2DQSteps(context: DQContext,
 
           val accuracyRecordWriteStep = {
             val rwName =
-              ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
+              ruleParam
+                .getOutputOpt(RecordOutputType)
+                .flatMap(_.getNameOpt)
                 .getOrElse(missRecordsTableName)
 
-            RecordWriteStep(rwName, missRecordsTableName, Some(accuracyRecordTableName))
+            RecordWriteStep(
+              rwName,
+              missRecordsTableName,
+              Some(accuracyRecordTableName)
+            )
           }
           val accuracyRecordTransStep = SparkSqlTransformStep(
-            accuracyRecordTableName, accuracyRecordSql, emptyMap, Some(accuracyRecordWriteStep))
+            accuracyRecordTableName,
+            accuracyRecordSql,
+            emptyMap,
+            Some(accuracyRecordWriteStep)
+          )
           accuracyRecordTransStep.parentSteps += accuracyMetricTransStep
 
           accuracyRecordTransStep :: Nil
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
index 1a611da..360c8e9 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
@@ -20,7 +20,9 @@ package org.apache.griffin.measure.step.builder.dsl.transform
 import org.apache.commons.lang.StringUtils
 
 import org.apache.griffin.measure.configuration.dqdefinition.{RuleErrorConfParam, RuleParam}
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.enums.FlattenType.DefaultFlattenType
+import org.apache.griffin.measure.configuration.enums.OutputType._
+import org.apache.griffin.measure.configuration.enums.ProcessType._
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.DQStep
 import org.apache.griffin.measure.step.builder.ConstantColumns
@@ -165,7 +167,7 @@ case class CompletenessExpr2DQSteps(context: DQContext,
       val completeWriteStep = {
         val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
         val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(completeTableName)
-        val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
+        val flattenType = metricOpt.map(_.getFlatten).getOrElse(DefaultFlattenType)
         MetricWriteStep(mwName, completeTableName, flattenType)
       }
       val completeTransStep =
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
index b68b3f7..6be5e17 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
@@ -18,7 +18,9 @@
 package org.apache.griffin.measure.step.builder.dsl.transform
 
 import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.enums.FlattenType.{ArrayFlattenType, EntriesFlattenType}
+import org.apache.griffin.measure.configuration.enums.OutputType._
+import org.apache.griffin.measure.configuration.enums.ProcessType._
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.DQStep
 import org.apache.griffin.measure.step.builder.ConstantColumns
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala
index 278e351..8955a8a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala
@@ -19,7 +19,7 @@ package org.apache.griffin.measure.step.builder.dsl.transform
 
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.enums.DqType._
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.DQStep
 import org.apache.griffin.measure.step.builder.dsl.expr.Expr
@@ -46,12 +46,12 @@ object Expr2DQSteps {
             ruleParam: RuleParam
            ): Expr2DQSteps = {
     ruleParam.getDqType match {
-      case AccuracyType => AccuracyExpr2DQSteps(context, expr, ruleParam)
-      case ProfilingType => ProfilingExpr2DQSteps(context, expr, ruleParam)
-      case UniquenessType => UniquenessExpr2DQSteps(context, expr, ruleParam)
-      case DistinctnessType => DistinctnessExpr2DQSteps(context, expr, ruleParam)
-      case TimelinessType => TimelinessExpr2DQSteps(context, expr, ruleParam)
-      case CompletenessType => CompletenessExpr2DQSteps(context, expr, ruleParam)
+      case Accuracy => AccuracyExpr2DQSteps(context, expr, ruleParam)
+      case Profiling => ProfilingExpr2DQSteps(context, expr, ruleParam)
+      case Uniqueness => UniquenessExpr2DQSteps(context, expr, ruleParam)
+      case Distinct => DistinctnessExpr2DQSteps(context, expr, ruleParam)
+      case Timeliness => TimelinessExpr2DQSteps(context, expr, ruleParam)
+      case Completeness => CompletenessExpr2DQSteps(context, expr, ruleParam)
       case _ => emtptExpr2DQSteps
     }
   }
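
For context on the hunk above: each rule's `dq.type` string is now resolved once into an Enumeration value, so the match dispatches on enum members instead of regex checks. A minimal sketch of the pattern, assuming a `GriffinEnum` base trait with a lenient `withNameWithDefault` lookup (the names follow this diff; the actual implementation in GriffinEnum.scala is not reproduced here):

    // Sketch only: a shared Enumeration base with a case-insensitive lookup
    // that falls back to Unknown instead of throwing on unrecognized input.
    trait GriffinEnum extends Enumeration {
      val Unknown: Value = Value

      def withNameWithDefault(name: String): Value =
        values
          .find(_.toString.equalsIgnoreCase(Option(name).getOrElse("")))
          .getOrElse(Unknown)
    }

    object DqType extends GriffinEnum {
      type DqType = Value
      val Accuracy, Profiling, Uniqueness, Distinct, Timeliness, Completeness = Value
    }

With this shape, the `case _ =>` arm above absorbs `Unknown` (and any future members), and each comparison is a plain value check rather than a regex scan per config read.
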
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
index 928fbd4..bc111fa 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
@@ -20,7 +20,9 @@ package org.apache.griffin.measure.step.builder.dsl.transform
 import org.apache.commons.lang.StringUtils
 
 import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
-import org.apache.griffin.measure.configuration.enums.{BatchProcessType, FlattenType, MetricOutputType, StreamingProcessType}
+import org.apache.griffin.measure.configuration.enums.FlattenType.DefaultFlattenType
+import org.apache.griffin.measure.configuration.enums.OutputType._
+import org.apache.griffin.measure.configuration.enums.ProcessType._
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.DQStep
 import org.apache.griffin.measure.step.builder.ConstantColumns
@@ -99,7 +101,7 @@ case class ProfilingExpr2DQSteps(context: DQContext,
       val profilingMetricWriteStep = {
         val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
        val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
-        val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
+        val flattenType = metricOpt.map(_.getFlatten).getOrElse(DefaultFlattenType)
         MetricWriteStep(mwName, profilingName, flattenType)
       }
       val profilingTransStep =
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
index ec6a178..aea5dca 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
@@ -18,7 +18,9 @@
 package org.apache.griffin.measure.step.builder.dsl.transform
 
 import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.enums.FlattenType.{ArrayFlattenType, DefaultFlattenType}
+import org.apache.griffin.measure.configuration.enums.OutputType._
+import org.apache.griffin.measure.configuration.enums.ProcessType._
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.DQStep
 import org.apache.griffin.measure.step.builder.ConstantColumns
@@ -131,7 +133,7 @@ case class TimelinessExpr2DQSteps(context: DQContext,
       val metricWriteStep = {
         val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
        val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
-        val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
+        val flattenType = metricOpt.map(_.getFlatten).getOrElse(DefaultFlattenType)
         MetricWriteStep(mwName, metricTableName, flattenType)
       }
       val metricTransStep =
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
index eed07e4..534641b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
@@ -18,7 +18,9 @@
 package org.apache.griffin.measure.step.builder.dsl.transform
 
 import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.enums.FlattenType.{ArrayFlattenType, EntriesFlattenType}
+import org.apache.griffin.measure.configuration.enums.OutputType._
+import org.apache.griffin.measure.configuration.enums.ProcessType._
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.DQStep
 import org.apache.griffin.measure.step.builder.ConstantColumns
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala
index 581c42d..ca93710 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala
@@ -18,7 +18,7 @@
 package org.apache.griffin.measure.step.builder.preproc
 
 import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.enums.DslType._
 
 /**
   * generate each entity pre-proc params by template defined in pre-proc param
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
index d83f6ca..6b99765 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
@@ -17,7 +17,8 @@
 
 package org.apache.griffin.measure.step.write
 
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.enums.{SimpleMode, TimestampMode}
+import org.apache.griffin.measure.configuration.enums.FlattenType.{ArrayFlattenType, EntriesFlattenType, FlattenType, MapFlattenType}
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.builder.ConstantColumns
 import org.apache.griffin.measure.utils.JsonUtil
diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamEnumReaderSpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamEnumReaderSpec.scala
new file mode 100644
index 0000000..a30d51e
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamEnumReaderSpec.scala
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.griffin.measure.configuration.dqdefinition.reader
+
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.griffin.measure.configuration.dqdefinition.{DQConfig, EvaluateRuleParam, RuleOutputParam, RuleParam}
+
+
+class ParamEnumReaderSpec extends FlatSpec with Matchers {
+  import org.apache.griffin.measure.configuration.enums.DslType._
+  "dsltype" should "be parsed to predefined set of values" in {
+    val validDslSparkSqlValues =
+      Seq("spark-sql", "spark-SQL", "SPARK-SQL", "sparksql")
+    validDslSparkSqlValues foreach { x =>
+      val ruleParam = new RuleParam(x, "accuracy")
+      ruleParam.getDslType should ===(SparkSql)
+    }
+    val invalidDslSparkSqlValues = Seq("spark", "sql", "")
+    invalidDslSparkSqlValues foreach { x =>
+      val ruleParam = new RuleParam(x, "accuracy")
+      ruleParam.getDslType should not be (SparkSql)
+    }
+
+    val validDslGriffinValues =
+      Seq("griffin-dsl", "griffindsl", "griFfin-dsl", "")
+    validDslGriffinValues foreach { x =>
+      val ruleParam = new RuleParam(x, "accuracy")
+      ruleParam.getDslType should ===(GriffinDsl)
+    }
+
+    val validDslDfOpsValues = Seq(
+      "df-ops",
+      "dfops",
+      "DFOPS",
+      "df-opr",
+      "dfopr",
+      "df-operations",
+      "dfoperations"
+    )
+    validDslDfOpsValues foreach { x =>
+      val ruleParam = new RuleParam(x, "accuracy")
+      ruleParam.getDslType should ===(DataFrameOpsType)
+    }
+
+    val invalidDslDfOpsValues = Seq("df-oprts", "-")
+    invalidDslDfOpsValues foreach { x =>
+      val ruleParam = new RuleParam(x, "accuracy")
+      ruleParam.getDslType should not be (DataFrameOpsType)
+    }
+  }
+
+  "griffindsl" should "be returned as default dsl type" in {
+    import org.apache.griffin.measure.configuration.enums.DslType._
+    val dslGriffinDslValues = Seq("griffin", "dsl")
+    dslGriffinDslValues foreach { x =>
+      val ruleParam = new RuleParam(x, "accuracy")
+      ruleParam.getDslType should be(GriffinDsl)
+    }
+  }
+
+  "dqtype" should "be parsed to predefined set of values" in {
+    import org.apache.griffin.measure.configuration.enums.DqType._
+    var ruleParam = new RuleParam("griffin-dsl", "accuracy")
+    ruleParam.getDqType should be(Accuracy)
+    ruleParam = new RuleParam("griffin-dsl", "accu")
+    ruleParam.getDqType should not be (Accuracy)
+    ruleParam.getDqType should be(Unknown)
+
+    ruleParam = new RuleParam("griffin-dsl", "profiling")
+    ruleParam.getDqType should be(Profiling)
+    ruleParam = new RuleParam("griffin-dsl", "profilin")
+    ruleParam.getDqType should not be (Profiling)
+    ruleParam.getDqType should be(Unknown)
+
+    ruleParam = new RuleParam("griffin-dsl", "TIMELINESS")
+    ruleParam.getDqType should be(Timeliness)
+    ruleParam = new RuleParam("griffin-dsl", "timeliness ")
+    ruleParam.getDqType should not be (Timeliness)
+    ruleParam.getDqType should be(Unknown)
+
+    ruleParam = new RuleParam("griffin-dsl", "UNIQUENESS")
+    ruleParam.getDqType should be(Uniqueness)
+    ruleParam = new RuleParam("griffin-dsl", "UNIQUE")
+    ruleParam.getDqType should not be (Uniqueness)
+    ruleParam.getDqType should be(Unknown)
+
+    ruleParam = new RuleParam("griffin-dsl", "Duplicate")
+    ruleParam.getDqType should be(Uniqueness)
+    ruleParam = new RuleParam("griffin-dsl", "duplica")
+    ruleParam.getDqType should not be (Duplicate)
+    ruleParam.getDqType should be(Unknown)
+
+    ruleParam = new RuleParam("griffin-dsl", "COMPLETENESS")
+    ruleParam.getDqType should be(Completeness)
+    ruleParam = new RuleParam("griffin-dsl", "complete")
+    ruleParam.getDqType should not be (Completeness)
+    ruleParam.getDqType should be(Unknown)
+
+    ruleParam = new RuleParam("griffin-dsl", "")
+    ruleParam.getDqType should be(Unknown)
+    ruleParam = new RuleParam("griffin-dsl", "duplicate")
+    ruleParam.getDqType should not be (Unknown)
+  }
+
+  "outputtype" should "be valid" in {
+    import org.apache.griffin.measure.configuration.enums.OutputType._
+    var ruleOutputParam = new RuleOutputParam("metric", "", "map")
+    ruleOutputParam.getOutputType should be(MetricOutputType)
+    ruleOutputParam = new RuleOutputParam("metr", "", "map")
+    ruleOutputParam.getOutputType should not be (MetricOutputType)
+    ruleOutputParam.getOutputType should be(UnknownOutputType)
+
+    ruleOutputParam = new RuleOutputParam("record", "", "map")
+    ruleOutputParam.getOutputType should be(RecordOutputType)
+    ruleOutputParam = new RuleOutputParam("rec", "", "map")
+    ruleOutputParam.getOutputType should not be (RecordOutputType)
+    ruleOutputParam.getOutputType should be(UnknownOutputType)
+
+    ruleOutputParam = new RuleOutputParam("dscupdate", "", "map")
+    ruleOutputParam.getOutputType should be(DscUpdateOutputType)
+    ruleOutputParam = new RuleOutputParam("dsc", "", "map")
+    ruleOutputParam.getOutputType should not be (DscUpdateOutputType)
+    ruleOutputParam.getOutputType should be(UnknownOutputType)
+
+  }
+
+  "flattentype" should "be valid" in {
+    import org.apache.griffin.measure.configuration.enums.FlattenType._
+    var ruleOutputParam = new RuleOutputParam("metric", "", "map")
+    ruleOutputParam.getFlatten should be(MapFlattenType)
+    ruleOutputParam = new RuleOutputParam("metric", "", "metr")
+    ruleOutputParam.getFlatten should not be (MapFlattenType)
+    ruleOutputParam.getFlatten should be(DefaultFlattenType)
+
+    ruleOutputParam = new RuleOutputParam("metric", "", "array")
+    ruleOutputParam.getFlatten should be(ArrayFlattenType)
+    ruleOutputParam = new RuleOutputParam("metric", "", "list")
+    ruleOutputParam.getFlatten should be(ArrayFlattenType)
+    ruleOutputParam = new RuleOutputParam("metric", "", "arrays")
+    ruleOutputParam.getFlatten should not be (ArrayFlattenType)
+    ruleOutputParam.getFlatten should be(DefaultFlattenType)
+
+    ruleOutputParam = new RuleOutputParam("metric", "", "entries")
+    ruleOutputParam.getFlatten should be(EntriesFlattenType)
+    ruleOutputParam = new RuleOutputParam("metric", "", "entry")
+    ruleOutputParam.getFlatten should not be (EntriesFlattenType)
+    ruleOutputParam.getFlatten should be(DefaultFlattenType)
+  }
+
+  "sinktype" should "be valid" in {
+    import org.mockito.Mockito._
+    import org.apache.griffin.measure.configuration.enums.SinkType._
+    var dqConfig = new DQConfig(
+      "test",
+      1234,
+      "",
+      Nil,
+      mock(classOf[EvaluateRuleParam]),
+      List(
+        "Console",
+        "Log",
+        "CONSOLE",
+        "LOG",
+        "Es",
+        "ElasticSearch",
+        "Http",
+        "MongoDB",
+        "mongo",
+        "hdfs"
+      )
+    )
+    dqConfig.getValidSinkTypes should be(
+      Seq(
+        Console,
+        ElasticSearch,
+        MongoDB,
+        Hdfs
+      )
+    )
+    dqConfig = new DQConfig(
+      "test",
+      1234,
+      "",
+      Nil,
+      mock(classOf[EvaluateRuleParam]),
+      List("Consol", "Logg")
+    )
+    dqConfig.getValidSinkTypes should not be (Seq(Console))
+    dqConfig.getValidSinkTypes should be(Seq(ElasticSearch))
+
+    dqConfig = new DQConfig(
+      "test",
+      1234,
+      "",
+      Nil,
+      mock(classOf[EvaluateRuleParam]),
+      List("")
+    )
+    dqConfig.getValidSinkTypes should be(Seq(ElasticSearch))
+  }
+
+}
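
The sink assertions above pin down what this spec expects from `DQConfig.getValidSinkTypes`: resolve each configured name, drop unrecognized ones, deduplicate while preserving order, and fall back to ElasticSearch when nothing valid remains. A hedged sketch of that filtering, reusing the `GriffinEnum` sketch shown earlier (alias handling such as "log" -> Console or "http" -> ElasticSearch is assumed to happen inside the lookup, and `validSinkTypes` is an illustrative helper name; neither is shown by this diff):

    object SinkType extends GriffinEnum {
      type SinkType = Value
      val Console, Hdfs, ElasticSearch, MongoDB = Value
    }

    // Sketch only: mirrors the expectations asserted in the spec above.
    def validSinkTypes(names: Seq[String]): Seq[SinkType.Value] = {
      val resolved = names
        .map(SinkType.withNameWithDefault)   // assumed to also normalize aliases
        .filter(_ != SinkType.Unknown)       // drop names that resolve to nothing
        .distinct                            // dedupe, keeping first-seen order
      if (resolved.nonEmpty) resolved else Seq(SinkType.ElasticSearch)
    }
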
diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
index 024e96b..38d8db3 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReaderSpec.scala
@@ -17,21 +17,22 @@
 
 package org.apache.griffin.measure.configuration.dqdefinition.reader
 
-import org.apache.griffin.measure.configuration.dqdefinition.DQConfig
 import org.scalatest._
+import scala.util.{Failure, Success}
 
+import org.apache.griffin.measure.configuration.dqdefinition.DQConfig
+import org.apache.griffin.measure.configuration.enums.DslType.GriffinDsl
 
-import scala.util.{Failure, Success}
 
 class ParamFileReaderSpec extends FlatSpec with Matchers{
 
 
   "params " should "be parsed from a valid file" in {
-    val reader :ParamReader = ParamFileReader(getClass.getResource("/_accuracy-batch-griffindsl.json").getFile)
+    val reader: ParamReader = ParamFileReader(getClass.getResource("/_accuracy-batch-griffindsl.json").getFile)
     val params = reader.readConfig[DQConfig]
     params match {
       case Success(v) =>
-        v.getEvaluateRule.getRules(0).getDslType.desc should === ("griffin-dsl")
+        v.getEvaluateRule.getRules(0).getDslType should === (GriffinDsl)
         v.getEvaluateRule.getRules(0).getOutDfName() should === ("accu")
       case Failure(_) =>
         fail("it should not happen")
@@ -40,7 +41,8 @@ class ParamFileReaderSpec extends FlatSpec with Matchers{
   }
 
   it should "fail for an invalid file" in {
-    val reader :ParamReader = ParamFileReader(getClass.getResource("/invalidconfigs/missingrule_accuracy_batch_sparksql.json").getFile)
+    val reader: ParamReader = ParamFileReader(getClass.
+      getResource("/invalidconfigs/missingrule_accuracy_batch_sparksql.json").getFile)
     val params = reader.readConfig[DQConfig]
     params match {
       case Success(_) =>
@@ -52,7 +54,8 @@ class ParamFileReaderSpec extends FlatSpec with Matchers{
   }
 
   it should "fail for an invalid completeness json file" in {
-    val reader: ParamFileReader = ParamFileReader(getClass.getResource("/invalidconfigs/invalidtype_completeness_batch_griffindal.json").getFile)
+    val reader: ParamFileReader = ParamFileReader(getClass.
+      getResource("/invalidconfigs/invalidtype_completeness_batch_griffindal.json").getFile)
     val params = reader.readConfig[DQConfig]
     params match {
       case Success(_) =>
@@ -63,7 +66,8 @@ class ParamFileReaderSpec extends FlatSpec with Matchers{
   }
 
   it should "be parsed from a valid errorconf completeness json file" in {
-    val reader :ParamReader = ParamFileReader(getClass.getResource("/_completeness_errorconf-batch-griffindsl.json").getFile)
+    val reader: ParamReader = ParamFileReader(getClass.
+      getResource("/_completeness_errorconf-batch-griffindsl.json").getFile)
     val params = reader.readConfig[DQConfig]
     params match {
       case Success(v) =>
diff --git a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
index 6227c20..54bd95b 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReaderSpec.scala
@@ -17,12 +17,15 @@
 
 package org.apache.griffin.measure.configuration.dqdefinition.reader
 
-import org.apache.griffin.measure.configuration.dqdefinition.DQConfig
-import org.scalatest.{FlatSpec, Matchers}
-
 import scala.io.Source
+
+import org.scalatest.{FlatSpec, Matchers}
 import scala.util.{Failure, Success}
 
+import org.apache.griffin.measure.configuration.dqdefinition.DQConfig
+import org.apache.griffin.measure.configuration.enums.DslType.GriffinDsl
+
+
 class ParamJsonReaderSpec extends FlatSpec with Matchers{
 
 
@@ -31,11 +34,11 @@ class ParamJsonReaderSpec extends FlatSpec with Matchers{
     val jsonString = bufferedSource.getLines().mkString
     bufferedSource.close
 
-    val reader :ParamReader = ParamJsonReader(jsonString)
+    val reader: ParamReader = ParamJsonReader(jsonString)
     val params = reader.readConfig[DQConfig]
     params match {
       case Success(v) =>
-        v.getEvaluateRule.getRules(0).getDslType.desc should === ("griffin-dsl")
+        v.getEvaluateRule.getRules(0).getDslType should === (GriffinDsl)
         v.getEvaluateRule.getRules(0).getOutDfName() should === ("accu")
       case Failure(_) =>
         fail("it should not happen")
@@ -44,11 +47,12 @@ class ParamJsonReaderSpec extends FlatSpec with Matchers{
   }
 
   it should "fail for an invalid file" in {
-    val bufferedSource = Source.fromFile(getClass.getResource("/invalidconfigs/missingrule_accuracy_batch_sparksql.json").getFile)
+    val bufferedSource = Source.fromFile(getClass.
+      getResource("/invalidconfigs/missingrule_accuracy_batch_sparksql.json").getFile)
     val jsonString = bufferedSource.getLines().mkString
     bufferedSource.close
 
-    val reader :ParamReader = ParamJsonReader(jsonString)
+    val reader: ParamReader = ParamJsonReader(jsonString)
     val params = reader.readConfig[DQConfig]
     params match {
       case Success(_) =>
diff --git a/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala b/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
index bf163aa..952c591 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/job/DQAppTest.scala
@@ -19,16 +19,15 @@ package org.apache.griffin.measure.job
 
 import scala.util.Failure
 import scala.util.Success
-
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FlatSpec
 import org.scalatest.Matchers
-
 import org.apache.griffin.measure.Application._
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.SparkSuiteBase
 import org.apache.griffin.measure.configuration.dqdefinition._
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.enums.ProcessType
+import org.apache.griffin.measure.configuration.enums.ProcessType._
 import org.apache.griffin.measure.launch.DQApp
 import org.apache.griffin.measure.launch.batch.BatchDQApp
 import org.apache.griffin.measure.launch.streaming.StreamingDQApp
@@ -55,7 +54,7 @@ class DQAppTest extends FlatSpec with SparkSuiteBase with BeforeAndAfterAll with
     val allParam: GriffinConfig = GriffinConfig(envParam, dqParam)
 
     // choose process
-    val procType = ProcessType(allParam.getDqConfig.getProcType)
+    val procType = ProcessType.withNameWithDefault(allParam.getDqConfig.getProcType)
     dqApp = procType match {
       case BatchProcessType => BatchDQApp(allParam)
       case StreamingProcessType => StreamingDQApp(allParam)
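
A design note on the change above: the old `ProcessType(...)` apply parsed the string through a regex, while `withNameWithDefault` resolves it once into an enum value before the match. Illustrative only, building on the earlier `GriffinEnum` sketch; the real mapping from config strings such as "batch" to `BatchProcessType` lives in ProcessType.scala, which this section does not show:

    object ProcessType extends GriffinEnum {
      type ProcessType = Value
      val BatchProcessType, StreamingProcessType = Value
    }

    // Unrecognized input degrades to the Unknown default instead of throwing:
    ProcessType.withNameWithDefault("BatchProcessType")  // => BatchProcessType
    ProcessType.withNameWithDefault("no-such-mode")      // => Unknown
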
diff --git a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala
index c226299..33886f8 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/sink/CustomSinkTest.scala
@@ -20,7 +20,7 @@ package org.apache.griffin.measure.sink
 import scala.collection.mutable
 
import org.apache.griffin.measure.configuration.dqdefinition.{RuleOutputParam, SinkParam}
-import org.apache.griffin.measure.configuration.enums.FlattenType
+import org.apache.griffin.measure.configuration.enums.FlattenType.DefaultFlattenType
import org.apache.griffin.measure.step.write.{MetricFlushStep, MetricWriteStep, RecordWriteStep}
 
 class CustomSinkTest extends SinkTestBase {
@@ -113,7 +113,7 @@ class CustomSinkTest extends SinkTestBase {
     val metricWriteStep = {
       val metricOpt = Some(metricsDefaultOutput)
      val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse("default_metrics_name")
-      val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
+      val flattenType = metricOpt.map(_.getFlatten).getOrElse(DefaultFlattenType)
       MetricWriteStep(mwName, resultTable, flattenType)
     }
 
diff --git a/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala b/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala
index c474081..5380b4c 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/sink/SinkTestBase.scala
@@ -25,7 +25,7 @@ import org.scalatest.Matchers
 
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.dqdefinition.SinkParam
-import org.apache.griffin.measure.configuration.enums.BatchProcessType
+import org.apache.griffin.measure.configuration.enums.ProcessType.BatchProcessType
 import org.apache.griffin.measure.context.{ContextId, DQContext}
 import org.apache.griffin.measure.SparkSuiteBase
 
diff --git a/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala b/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
index b7f1bc3..c3a9330 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/step/TransformStepTest.scala
@@ -20,7 +20,7 @@ package org.apache.griffin.measure.step
 import org.scalatest._
 
 import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.enums.BatchProcessType
+import org.apache.griffin.measure.configuration.enums.ProcessType.BatchProcessType
 import org.apache.griffin.measure.context.ContextId
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.SparkSuiteBase
diff --git a/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala b/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
index 11042ff..2bf5119 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/transformations/AccuracyTransformationsIntegrationTest.scala
@@ -19,9 +19,8 @@ package org.apache.griffin.measure.transformations
 
 import org.apache.spark.sql.DataFrame
 import org.scalatest._
-
 import org.apache.griffin.measure.configuration.dqdefinition._
-import org.apache.griffin.measure.configuration.enums.BatchProcessType
+import org.apache.griffin.measure.configuration.enums.ProcessType.BatchProcessType
 import org.apache.griffin.measure.context.{ContextId, DQContext}
 import org.apache.griffin.measure.datasource.DataSourceFactory
 import org.apache.griffin.measure.job.builder.DQJobBuilder
