http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/params/env/CleanerParam.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/CleanerParam.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/CleanerParam.scala
deleted file mode 100644
index 00302a7..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/CleanerParam.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.config.params.env
-
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import org.apache.griffin.measure.config.params.Param
-
-@JsonInclude(Include.NON_NULL)
-case class CleanerParam( @JsonProperty("clean.interval") cleanInterval: String
-                       ) extends Param {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/params/env/EmailParam.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/EmailParam.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/EmailParam.scala
deleted file mode 100644
index f5f8a4c..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/EmailParam.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package org.apache.griffin.measure.config.params.env
-
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import org.apache.griffin.measure.config.params.Param
-
-/**
-  * Created by xiaoqiu.duan on 2017/10/23.
-  */
-@JsonInclude(Include.NON_NULL)
-case class EmailParam(@JsonProperty("host") host: String,
-                       @JsonProperty("mail") mail: String,
-                       @JsonProperty("user") usr: String,
-                       @JsonProperty("password") pwd: String
-                     ) extends Param {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/params/env/EnvParam.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/EnvParam.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/EnvParam.scala
deleted file mode 100644
index 629d06a..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/EnvParam.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.config.params.env
-
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import org.apache.griffin.measure.config.params.Param
-
-@JsonInclude(Include.NON_NULL)
-case class EnvParam(@JsonProperty("spark") sparkParam: SparkParam,
-                    @JsonProperty("persist") persistParams: List[PersistParam],
-                    @JsonProperty("info.cache") infoCacheParams: 
List[InfoCacheParam],
-                    @JsonProperty("mail") emailParam: EmailParam,
-                    @JsonProperty("sms") smsParam: SMSParam,
-                    @JsonProperty("cleaner") cleanerParam: CleanerParam
-                   ) extends Param {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/params/env/InfoCacheParam.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/InfoCacheParam.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/InfoCacheParam.scala
deleted file mode 100644
index be588f9..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/InfoCacheParam.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.config.params.env
-
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import org.apache.griffin.measure.config.params.Param
-
-@JsonInclude(Include.NON_NULL)
-case class InfoCacheParam( @JsonProperty("type") persistType: String,
-                           @JsonProperty("config") config: Map[String, Any]
-                         ) extends Param {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/params/env/PersistParam.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/PersistParam.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/PersistParam.scala
deleted file mode 100644
index 68b9bc8..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/PersistParam.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.config.params.env
-
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import org.apache.griffin.measure.config.params.Param
-
-@JsonInclude(Include.NON_NULL)
-case class PersistParam( @JsonProperty("type") persistType: String,
-                         @JsonProperty("config") config: Map[String, Any]
-                       ) extends Param {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SMSParam.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SMSParam.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SMSParam.scala
deleted file mode 100644
index 0cc87a7..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SMSParam.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package org.apache.griffin.measure.config.params.env
-
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import org.apache.griffin.measure.config.params.Param
-
-/**
-  * Created by xiaoqiu.duan on 2017/10/23.
-  */
-@JsonInclude(Include.NON_NULL)
-case class SMSParam(@JsonProperty("host") host: String,
-                    @JsonProperty("id") id: String,
-                    @JsonProperty("key") key: String,
-                    @JsonProperty("UUID") uuid: String
-                     ) extends Param {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SparkParam.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SparkParam.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SparkParam.scala
deleted file mode 100644
index a21a64f..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/env/SparkParam.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.config.params.env
-
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import org.apache.griffin.measure.config.params.Param
-
-@JsonInclude(Include.NON_NULL)
-case class SparkParam( @JsonProperty("log.level") logLevel: String,
-                       @JsonProperty("checkpoint.dir") cpDir: String,
-                       @JsonProperty("batch.interval") batchInterval: String,
-                       @JsonProperty("process.interval") processInterval: 
String,
-                       @JsonProperty("config") config: Map[String, String],
-                       @JsonProperty("init.clear") initClear: Boolean
-                     ) extends Param {
-
-  def needInitClear: Boolean = if (initClear != null) initClear else false
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataConnectorParam.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataConnectorParam.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataConnectorParam.scala
deleted file mode 100644
index a819997..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataConnectorParam.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.config.params.user
-
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import org.apache.griffin.measure.config.params.Param
-
-@JsonInclude(Include.NON_NULL)
-case class DataConnectorParam( @JsonProperty("type") conType: String,
-                               @JsonProperty("version") version: String,
-                               @JsonProperty("config") config: Map[String, 
Any],
-                               @JsonProperty("pre.proc") preProc: 
List[Map[String, Any]]
-                             ) extends Param {
-
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataSourceParam.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataSourceParam.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataSourceParam.scala
deleted file mode 100644
index c43ea70..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/DataSourceParam.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.config.params.user
-
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import org.apache.griffin.measure.config.params.Param
-
-@JsonInclude(Include.NON_NULL)
-case class DataSourceParam( @JsonProperty("name") name: String,
-                            @JsonProperty("baseline") baseline: Boolean,
-                            @JsonProperty("connectors") connectors: 
List[DataConnectorParam],
-                            @JsonProperty("cache") cache: Map[String, Any]
-                          ) extends Param {
-  def hasName: Boolean = (name != null)
-  def isBaseLine: Boolean = if (baseline == null) false else baseline
-  def falseBaselineClone: DataSourceParam = DataSourceParam(name, false, 
connectors, cache)
-  def getConnectors: List[DataConnectorParam] = if (connectors != null) 
connectors else Nil
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/params/user/EvaluateRuleParam.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/EvaluateRuleParam.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/EvaluateRuleParam.scala
deleted file mode 100644
index 2abf3e5..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/EvaluateRuleParam.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.config.params.user
-
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import org.apache.griffin.measure.config.params.Param
-
-@JsonInclude(Include.NON_NULL)
-case class EvaluateRuleParam( @JsonProperty("dsl.type") dslType: String,
-                              @JsonProperty("rules") rules: List[Map[String, 
Any]]
-                            ) extends Param {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala
deleted file mode 100644
index b0f7b29..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.config.params.user
-
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import org.apache.griffin.measure.config.params.Param
-
-@JsonInclude(Include.NON_NULL)
-case class UserParam( @JsonProperty("name") name: String,
-                      @JsonProperty("timestamp") timestamp: Long,
-                      @JsonProperty("process.type") procType: String,
-                      @JsonProperty("data.sources") dataSourceParams: 
List[DataSourceParam],
-                      @JsonProperty("evaluate.rule") evaluateRuleParam: 
EvaluateRuleParam
-                    ) extends Param {
-
-  private val validDs = {
-    val (validDsParams, _) = dataSourceParams.foldLeft((Nil: 
Seq[DataSourceParam], Set[String]())) { (ret, dsParam) =>
-      val (seq, names) = ret
-      if (dsParam.hasName && !names.contains(dsParam.name)) {
-        (seq :+ dsParam, names + dsParam.name)
-      } else ret
-    }
-    validDsParams
-  }
-  private val baselineDsOpt = {
-    val baselines = validDs.filter(_.isBaseLine)
-    if (baselines.nonEmpty) baselines.headOption
-    else validDs.headOption
-  }
-
-  val baselineDsName = baselineDsOpt match {
-    case Some(ds) => ds.name
-    case _ => ""
-  }
-  val dataSources = {
-    validDs.map { ds =>
-      if (ds.name != baselineDsName && ds.isBaseLine) {
-        ds.falseBaselineClone
-      } else ds
-    }
-  }
-
-  override def validate(): Boolean = {
-    dataSources.nonEmpty
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamFileReader.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamFileReader.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamFileReader.scala
deleted file mode 100644
index ede68f4..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamFileReader.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.config.reader
-
-import org.apache.griffin.measure.config.params.Param
-import org.apache.griffin.measure.utils.JsonUtil
-
-import scala.util.Try
-
-case class ParamFileReader(file: String) extends ParamReader {
-
-  def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] = {
-    Try {
-      val source = scala.io.Source.fromFile(file)
-      val lines = source.mkString
-      val param = JsonUtil.fromJson[T](lines)
-      source.close
-      param
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamHdfsFileReader.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamHdfsFileReader.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamHdfsFileReader.scala
deleted file mode 100644
index 8b51b11..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamHdfsFileReader.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.config.reader
-
-import org.apache.griffin.measure.config.params.Param
-import org.apache.griffin.measure.utils.JsonUtil
-import org.apache.griffin.measure.utils.HdfsUtil
-
-import scala.util.Try
-
-case class ParamHdfsFileReader(filePath: String) extends ParamReader {
-
-  def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] = {
-    Try {
-      val source = HdfsUtil.openFile(filePath)
-      val param = JsonUtil.fromJson[T](source)
-      source.close
-      param
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamRawStringReader.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamRawStringReader.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamRawStringReader.scala
deleted file mode 100644
index 87e1953..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamRawStringReader.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.config.reader
-
-import org.apache.griffin.measure.config.params.Param
-import org.apache.griffin.measure.utils.JsonUtil
-
-import scala.util.Try
-
-case class ParamRawStringReader(rawString: String) extends ParamReader {
-
-  def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] = {
-    Try {
-      val param = JsonUtil.fromJson[T](rawString)
-      param
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamReader.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamReader.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamReader.scala
deleted file mode 100644
index 3508223..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamReader.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.config.reader
-
-import org.apache.griffin.measure.log.Loggable
-import org.apache.griffin.measure.config.params.Param
-
-import scala.util.Try
-
-trait ParamReader extends Loggable with Serializable {
-
-  def readConfig[T <: Param](implicit m : Manifest[T]): Try[T]
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamReaderFactory.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamReaderFactory.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamReaderFactory.scala
deleted file mode 100644
index 9299247..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/config/reader/ParamReaderFactory.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.config.reader
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-
-
-object ParamReaderFactory {
-
-  val RawStringRegex = """^(?i)raw$""".r
-  val LocalFsRegex = """^(?i)local$""".r
-  val HdfsFsRegex = """^(?i)hdfs$""".r
-
-  def getParamReader(filePath: String, fsType: String): ParamReader = {
-    fsType match {
-      case RawStringRegex() => ParamRawStringReader(filePath)
-      case LocalFsRegex() => ParamFileReader(filePath)
-      case HdfsFsRegex() => ParamHdfsFileReader(filePath)
-      case _ => ParamHdfsFileReader(filePath)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/config/validator/ParamValidator.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/config/validator/ParamValidator.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/config/validator/ParamValidator.scala
deleted file mode 100644
index fd486e9..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/config/validator/ParamValidator.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.config.validator
-
-import org.apache.griffin.measure.log.Loggable
-import org.apache.griffin.measure.config.params._
-
-import scala.util.Try
-
-object ParamValidator extends Loggable with Serializable {
-
-  def validate[T <: Param](param: Param): Try[Boolean] = Try {
-    param.validate
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala
new file mode 100644
index 0000000..154510f
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala
@@ -0,0 +1,112 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.configuration.enums
+
+import scala.util.matching.Regex
+
+/**
+  * effective when dsl type is "griffin-dsl",
+  * indicates the dq type of griffin pre-defined measurements
+  */
+sealed trait DqType {
+  val regex: Regex
+  val desc: String
+}
+
+object DqType {
+  private val dqTypes: List[DqType] = List(
+    AccuracyType, ProfilingType, UniquenessType, DistinctnessType, 
TimelinessType, CompletenessType, UnknownType
+  )
+  def apply(ptn: String): DqType = {
+    dqTypes.find(tp => ptn match {
+      case tp.regex() => true
+      case _ => false
+    }).getOrElse(UnknownType)
+  }
+  def unapply(pt: DqType): Option[String] = Some(pt.desc)
+}
+
+/**
+  * accuracy: the match percentage of items between source and target
+  * count(source items matched with the ones from target) / count(source)
+  * e.g.: source [1, 2, 3, 4, 5], target: [1, 2, 3, 4]
+  *       metric will be: { "total": 5, "miss": 1, "matched": 4 }
+  *       accuracy is 80%.
+  */
+final case object AccuracyType extends DqType {
+  val regex = "^(?i)accuracy$".r
+  val desc = "accuracy"
+}
+
+/**
+  * profiling: the statistic data of data source
+  * e.g.: max, min, average, group by count, ...
+  */
+final case object ProfilingType extends DqType {
+  val regex = "^(?i)profiling$".r
+  val desc = "profiling"
+}
+
+/**
+  * uniqueness: the uniqueness of data source comparing with itself
+  * count(unique items in source) / count(source)
+  * e.g.: [1, 2, 3, 3] -> { "unique": 2, "total": 4, "dup-arr": [ "dup": 1, 
"num": 1 ] }
+  * uniqueness indicates the items without any replica of data
+  */
+final case object UniquenessType extends DqType {
+  val regex = "^(?i)uniqueness|duplicate$".r
+  val desc = "uniqueness"
+}
+
+/**
+  * distinctness: the distinctness of data source comparing with itself
+  * count(distinct items in source) / count(source)
+  * e.g.: [1, 2, 3, 3] -> { "dist": 3, "total": 4, "dup-arr": [ "dup": 1, 
"num": 1 ] }
+  * distinctness indicates the valid information of data
+  * comparing with uniqueness, distinctness is more meaningful
+  */
+final case object DistinctnessType extends DqType {
+  val regex = "^(?i)distinct$".r
+  val desc = "distinct"
+}
+
+/**
+  * timeliness: the latency of data source with timestamp information
+  * e.g.: (receive_time - send_time)
+  * timeliness can get the statistic metric of latency, like average, max, 
min, percentile-value,
+  * even more, it can record the items with latency above threshold you 
configured
+  */
+final case object TimelinessType extends DqType {
+  val regex = "^(?i)timeliness$".r
+  val desc = "timeliness"
+}
+
+/**
+  * completeness: the completeness of data source
+  * the columns you measure is incomplete if it is null
+  */
+final case object CompletenessType extends DqType {
+  val regex = "^(?i)completeness$".r
+  val desc = "completeness"
+}
+
+final case object UnknownType extends DqType {
+  val regex = "".r
+  val desc = "unknown"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DslType.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DslType.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DslType.scala
new file mode 100644
index 0000000..c6b68d3
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DslType.scala
@@ -0,0 +1,64 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.configuration.enums
+
+import scala.util.matching.Regex
+
+/**
+  * dsl type indicates the language type of rule param
+  */
+sealed trait DslType {
+  val regex: Regex
+  val desc: String
+}
+
+object DslType {
+  private val dslTypes: List[DslType] = List(SparkSqlType, GriffinDslType, 
DataFrameOpsType)
+  def apply(ptn: String): DslType = {
+    dslTypes.find(tp => ptn match {
+      case tp.regex() => true
+      case _ => false
+    }).getOrElse(GriffinDslType)
+  }
+  def unapply(pt: DslType): Option[String] = Some(pt.desc)
+}
+
+/**
+  * spark-sql: rule defined in "SPARK-SQL" directly
+  */
+final case object SparkSqlType extends DslType {
+  val regex = "^(?i)spark-?sql$".r
+  val desc = "spark-sql"
+}
+
+/**
+  * df-ops: data frame operations rule, support some pre-defined data frame ops
+  */
+final case object DataFrameOpsType extends DslType {
+  val regex = "^(?i)df-?(?:op|opr|ops)$".r
+  val desc = "df-opr"
+}
+
+/**
+  * griffin-dsl: griffin dsl rule, to define dq measurements easier
+  */
+final case object GriffinDslType extends DslType {
+  val regex = "^(?i)griffin-?dsl$".r
+  val desc = "griffin-dsl"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/NormalizeType.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/NormalizeType.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/NormalizeType.scala
new file mode 100644
index 0000000..d8cfcf8
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/NormalizeType.scala
@@ -0,0 +1,86 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.configuration.enums
+
+import scala.util.matching.Regex
+
+/**
+  * the normalize strategy to collect metric
+  */
+sealed trait NormalizeType {
+  val regex: Regex
+  val desc: String
+}
+
+object NormalizeType {
+  private val normalizeTypes: List[NormalizeType] = List(DefaultNormalizeType, 
EntriesNormalizeType, ArrayNormalizeType, MapNormalizeType)
+  def apply(ptn: String): NormalizeType = {
+    normalizeTypes.find(tp => ptn match {
+      case tp.regex() => true
+      case _ => false
+    }).getOrElse(DefaultNormalizeType)
+  }
+  def unapply(pt: NormalizeType): Option[String] = Some(pt.desc)
+}
+
+/**
+  * default normalize strategy
+  * metrics contains n rows -> normalized metric json map
+  * n = 0: { }
+  * n = 1: { "col1": "value1", "col2": "value2", ... }
+  * n > 1: { "arr-name": [ { "col1": "value1", "col2": "value2", ... }, ... ] }
+  * all rows
+  */
+final case object DefaultNormalizeType extends NormalizeType {
+  val regex: Regex = "".r
+  val desc: String = "default"
+}
+
+/**
+  * metrics contains n rows -> normalized metric json map
+  * n = 0: { }
+  * n >= 1: { "col1": "value1", "col2": "value2", ... }
+  * the first row only
+  */
+final case object EntriesNormalizeType extends NormalizeType {
+  val regex: Regex = "^(?i)entries$".r
+  val desc: String = "entries"
+}
+
+/**
+  * metrics contains n rows -> normalized metric json map
+  * n = 0: { "arr-name": [ ] }
+  * n >= 1: { "arr-name": [ { "col1": "value1", "col2": "value2", ... }, ... ] 
}
+  * all rows
+  */
+final case object ArrayNormalizeType extends NormalizeType {
+  val regex: Regex = "^(?i)array|list$".r
+  val desc: String = "array"
+}
+
+/**
+  * metrics contains n rows -> normalized metric json map
+  * n = 0: { "map-name": { } }
+  * n >= 1: { "map-name": { "col1": "value1", "col2": "value2", ... } }
+  * the first row only
+  */
+final case object MapNormalizeType extends NormalizeType {
+  val regex: Regex = "^(?i)map$".r
+  val desc: String = "map"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala
new file mode 100644
index 0000000..18a82bf
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala
@@ -0,0 +1,56 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.configuration.enums
+
+import scala.util.matching.Regex
+
+/**
+  * process type enum
+  */
+sealed trait ProcessType {
+  val regex: Regex
+  val desc: String
+}
+
+object ProcessType {
+  private val procTypes: List[ProcessType] = List(BatchProcessType, 
StreamingProcessType)
+  def apply(ptn: String): ProcessType = {
+    procTypes.find(tp => ptn match {
+      case tp.regex() => true
+      case _ => false
+    }).getOrElse(BatchProcessType)
+  }
+  def unapply(pt: ProcessType): Option[String] = Some(pt.desc)
+}
+
+/**
+  * process in batch mode
+  */
+final case object BatchProcessType extends ProcessType {
+  val regex = """^(?i)batch$""".r
+  val desc = "batch"
+}
+
+/**
+  * process in streaming mode
+  */
+final case object StreamingProcessType extends ProcessType {
+  val regex = """^(?i)streaming$""".r
+  val desc = "streaming"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/WriteMode.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/WriteMode.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/WriteMode.scala
new file mode 100644
index 0000000..fd73da3
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/WriteMode.scala
@@ -0,0 +1,43 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.configuration.enums
+
+/**
+  * write mode when write metrics and records
+  */
+sealed trait WriteMode {}
+
+object WriteMode {
+  def defaultMode(procType: ProcessType): WriteMode = {
+    procType match {
+      case BatchProcessType => SimpleMode
+      case StreamingProcessType => TimestampMode
+    }
+  }
+}
+
+/**
+  * simple mode: write metrics and records directly
+  */
+final case object SimpleMode extends WriteMode {}
+
+/**
+  * timestamp mode: write metrics and records with timestamp information
+  */
+final case object TimestampMode extends WriteMode {}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/configuration/json/ParamFileReader.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/configuration/json/ParamFileReader.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/json/ParamFileReader.scala
new file mode 100644
index 0000000..5091d49
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/json/ParamFileReader.scala
@@ -0,0 +1,43 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.configuration.json
+
+import org.apache.griffin.measure.configuration.params.Param
+import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil}
+
+import scala.util.Try
+
+/**
+  * read params from config file path
+  * @param filePath:  hdfs path ("hdfs://cluster-name/path")
+  *                   local file path ("file:///path")
+  *                   relative file path ("relative/path")
+  */
+case class ParamFileReader(filePath: String) extends ParamReader {
+
+  def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] = {
+    Try {
+      val source = HdfsUtil.openFile(filePath)
+      val param = JsonUtil.fromJson[T](source)
+      source.close
+      param
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/configuration/json/ParamJsonReader.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/configuration/json/ParamJsonReader.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/json/ParamJsonReader.scala
new file mode 100644
index 0000000..2f4a5e7
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/json/ParamJsonReader.scala
@@ -0,0 +1,39 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.configuration.json
+
+import org.apache.griffin.measure.configuration.params.Param
+import org.apache.griffin.measure.utils.JsonUtil
+
+import scala.util.Try
+
+/**
+  * read params from json string directly
+  * @param jsonString
+  */
+case class ParamJsonReader(jsonString: String) extends ParamReader {
+
+  def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] = {
+    Try {
+      val param = JsonUtil.fromJson[T](jsonString)
+      param
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/configuration/json/ParamReader.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/configuration/json/ParamReader.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/json/ParamReader.scala
new file mode 100644
index 0000000..d923cd5
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/json/ParamReader.scala
@@ -0,0 +1,35 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.configuration.json
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.configuration.params.Param
+
+import scala.util.Try
+
+trait ParamReader extends Loggable with Serializable {
+
+  /**
+    * read config param
+    * @tparam T     param type expected
+    * @return       parsed param
+    */
+  def readConfig[T <: Param](implicit m : Manifest[T]): Try[T]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/configuration/json/ParamReaderFactory.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/configuration/json/ParamReaderFactory.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/json/ParamReaderFactory.scala
new file mode 100644
index 0000000..14fce02
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/json/ParamReaderFactory.scala
@@ -0,0 +1,48 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.configuration.json
+
+import org.apache.griffin.measure.utils.JsonUtil
+
+object ParamReaderFactory {
+
+  val json = "json"
+  val file = "file"
+
+  /**
+    * parse string content to get param reader
+    * @param pathOrJson
+    * @return
+    */
+  def getParamReader(pathOrJson: String): ParamReader = {
+    val strType = paramStrType(pathOrJson)
+    if (json.equals(strType)) ParamJsonReader(pathOrJson)
+    else ParamFileReader(pathOrJson)
+  }
+
+  private def paramStrType(str: String): String = {
+    try {
+      JsonUtil.toAnyMap(str)
+      json
+    } catch {
+      case e: Throwable => file
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/configuration/params/AllParam.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/AllParam.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/AllParam.scala
new file mode 100644
index 0000000..4ba1a15
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/AllParam.scala
@@ -0,0 +1,36 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.configuration.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+/**
+  * full set of griffin configuration
+  * @param envParam   environment configuration (must)
+  * @param dqParam    dq measurement configuration (must)
+  */
+@JsonInclude(Include.NON_NULL)
+case class AllParam( @JsonProperty("env") envParam: EnvParam,
+                     @JsonProperty("dq") dqParam: DQParam
+                   ) extends Param {
+  override def validate(): Boolean = {
+    envParam != null && dqParam != null
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/configuration/params/DQParam.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/DQParam.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/DQParam.scala
new file mode 100644
index 0000000..8d3c354
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/DQParam.scala
@@ -0,0 +1,183 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.configuration.params
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import org.apache.commons.lang.StringUtils
+import org.apache.griffin.measure.configuration.enums.{DqType, DslType}
+
+/**
+  * dq param
+  * @param name         name of dq measurement (must)
+  * @param timestamp    default timestamp of measure in batch mode (optional)
+  * @param procType     batch mode or streaming mode (must)
+  * @param dataSourceParams   data sources (optional)
+  * @param evaluateRuleParam  dq measurement (optional)
+  */
+@JsonInclude(Include.NON_NULL)
+case class DQParam( @JsonProperty("name") name: String,
+                    @JsonProperty("timestamp") timestamp: Long,
+                    @JsonProperty("process.type") procType: String,
+                    @JsonProperty("data.sources") dataSourceParams: 
List[DataSourceParam],
+                    @JsonProperty("evaluate.rule") evaluateRuleParam: 
EvaluateRuleParam
+                  ) extends Param {
+  val dataSources = {
+    val (validDsParams, _) = dataSourceParams.foldLeft((Nil: 
Seq[DataSourceParam], Set[String]())) { (ret, dsParam) =>
+      val (seq, names) = ret
+      if (dsParam.hasName && !names.contains(dsParam.name)) {
+        (seq :+ dsParam, names + dsParam.name)
+      } else ret
+    }
+    validDsParams
+  }
+  val evaluateRule: EvaluateRuleParam = {
+    if (evaluateRuleParam != null) evaluateRuleParam
+    else EvaluateRuleParam("", Nil)
+  }
+
+  override def validate(): Boolean = {
+    dataSources.nonEmpty
+  }
+}
+
+/**
+  * data source param
+  * @param name         data source name (must)
+  * @param connectors   data connectors (optional)
+  * @param cache        data source cache configuration (must in streaming 
mode with streaming connectors)
+  */
+@JsonInclude(Include.NON_NULL)
+case class DataSourceParam( @JsonProperty("name") name: String,
+                            @JsonProperty("connectors") connectors: 
List[DataConnectorParam],
+                            @JsonProperty("cache") cache: Map[String, Any]
+                          ) extends Param {
+  def hasName: Boolean = StringUtils.isNotBlank(name)
+  def getConnectors: List[DataConnectorParam] = if (connectors != null) 
connectors else Nil
+  def hasCache: Boolean = (cache != null)
+
+  override def validate(): Boolean = hasName
+}
+
+/**
+  * data connector param
+  * @param conType    data connector type, e.g.: hive, avro, kafka (must)
+  * @param version    data connector type version (optional)
+  * @param config     detail configuration of data connector (must)
+  * @param preProc    pre-process rules after load data (optional)
+  */
+@JsonInclude(Include.NON_NULL)
+case class DataConnectorParam( @JsonProperty("type") conType: String,
+                               @JsonProperty("version") version: String,
+                               @JsonProperty("config") config: Map[String, 
Any],
+                               @JsonProperty("pre.proc") preProc: 
List[RuleParam]
+                             ) extends Param {
+  override def validate(): Boolean = {
+    StringUtils.isNotBlank(conType)
+  }
+}
+
+/**
+  * evaluate rule param
+  * @param dslType    default dsl type for all rules (optional)
+  * @param rules      rules to define dq measurement (optional)
+  */
+@JsonInclude(Include.NON_NULL)
+case class EvaluateRuleParam( @JsonProperty("dsl.type") dslType: String,
+                              @JsonProperty("rules") rules: List[RuleParam]
+                            ) extends Param {
+  def getDslType: DslType = if (dslType != null) DslType(dslType) else 
DslType("")
+  def getRules: List[RuleParam] = if (rules != null) rules else Nil
+}
+
+/**
+  * rule param
+  * @param dslType    dsl type of this rule (must if default dsl type not set)
+  * @param dqType     dq type of this rule (valid for "griffin-dsl")
+  * @param name       name of result calculated by this rule (must if for 
later usage)
+  * @param rule       rule to define dq step calculation (must)
+  * @param details    detail config of rule (optional)
+  * @param cache      cache the result for multiple usage (optional, valid for 
"spark-sql" and "df-opr" mode)
+  * @param metric     config for metric output (optional)
+  * @param record     config for record output (optional)
+  * @param dsCacheUpdate    config for data source cache update output 
(optional, valid in streaming mode)
+  */
+@JsonInclude(Include.NON_NULL)
+case class RuleParam( @JsonProperty("dsl.type") dslType: String,
+                      @JsonProperty("dq.type") dqType: String,
+                      @JsonProperty("name") name: String,
+                      @JsonProperty("rule") rule: String,
+                      @JsonProperty("details") details: Map[String, Any],
+                      @JsonProperty("cache") cache: Boolean,
+                      @JsonProperty("metric") metric: RuleMetricParam,
+                      @JsonProperty("record") record: RuleRecordParam,
+                      @JsonProperty("ds.cache.update") dsCacheUpdate: 
RuleDsCacheUpdateParam
+                    ) extends Param {
+  def getDslType(defaultDslType: DslType): DslType = if (dslType != null) 
DslType(dslType) else defaultDslType
+  def getDqType: DqType = if (dqType != null) DqType(dqType) else DqType("")
+  def getName: String = if (name != null) name else ""
+  def getRule: String = if (rule != null) rule else ""
+  def getDetails: Map[String, Any] = if (details != null) details else 
Map[String, Any]()
+  def getCache: Boolean = if (cache != null) cache else false
+
+  def metricOpt: Option[RuleMetricParam] = if (metric != null) Some(metric) 
else None
+  def recordOpt: Option[RuleRecordParam] = if (record != null) Some(record) 
else None
+  def dsCacheUpdateOpt: Option[RuleDsCacheUpdateParam] = if (dsCacheUpdate != 
null) Some(dsCacheUpdate) else None
+
+  def replaceName(newName: String): RuleParam = {
+    if (StringUtils.equals(newName, name)) this
+    else RuleParam(dslType, dqType, newName, rule, details, cache, metric, 
record, dsCacheUpdate)
+  }
+  def replaceRule(newRule: String): RuleParam = {
+    if (StringUtils.equals(newRule, rule)) this
+    else RuleParam(dslType, dqType, name, newRule, details, cache, metric, 
record, dsCacheUpdate)
+  }
+  def replaceDetails(newDetails: Map[String, Any]): RuleParam = {
+    RuleParam(dslType, dqType, name, rule, newDetails, cache, metric, record, 
dsCacheUpdate)
+  }
+}
+
+/**
+  * metric param of rule
+  * @param name         name of metric to output (optional)
+  * @param collectType  the normalize strategy to collect metric  (optional)
+  */
+@JsonInclude(Include.NON_NULL)
+case class RuleMetricParam( @JsonProperty("name") name: String,
+                            @JsonProperty("collect.type") collectType: String
+                          ) extends Param {
+}
+
+/**
+  * record param of rule
+  * @param name   name of record to output (optional)
+  */
+@JsonInclude(Include.NON_NULL)
+case class RuleRecordParam( @JsonProperty("name") name: String
+                          ) extends Param {
+}
+
+/**
+  * data source cache update param of rule
+  * @param dsName   name of data source to be updated by thie rule result 
(must)
+  */
+@JsonInclude(Include.NON_NULL)
+case class RuleDsCacheUpdateParam( @JsonProperty("ds.name") dsName: String
+                                 ) extends Param {
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvParam.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvParam.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvParam.scala
new file mode 100644
index 0000000..5ee0610
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvParam.scala
@@ -0,0 +1,97 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.configuration.params
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import org.apache.commons.lang.StringUtils
+
+/**
+  * environment param
+  * @param sparkParam       config of spark environment (must)
+  * @param persistParams    config of persist ways (optional)
+  * @param infoCacheParams  config of information cache ways (must in 
streaming mode)
+  * @param cleanerParam     config of cleaner (optional)
+  */
+@JsonInclude(Include.NON_NULL)
+case class EnvParam( @JsonProperty("spark") sparkParam: SparkParam,
+                     @JsonProperty("persist") persistParams: 
List[PersistParam],
+                     @JsonProperty("info.cache") infoCacheParams: 
List[InfoCacheParam],
+                     @JsonProperty("cleaner") cleanerParam: CleanerParam
+                   ) extends Param {
+}
+
+/**
+  * spark param
+  * @param logLevel         log level of spark application (optional)
+  * @param cpDir            checkpoint directory for spark streaming (must in 
streaming mode)
+  * @param batchInterval    batch interval for spark streaming (must in 
streaming mode)
+  * @param processInterval  process interval for streaming dq calculation 
(must in streaming mode)
+  * @param config           extra config for spark environment (optional)
+  * @param initClear        clear checkpoint directory or not when initial 
(optional)
+  */
+@JsonInclude(Include.NON_NULL)
+case class SparkParam( @JsonProperty("log.level") logLevel: String,
+                       @JsonProperty("checkpoint.dir") cpDir: String,
+                       @JsonProperty("batch.interval") batchInterval: String,
+                       @JsonProperty("process.interval") processInterval: 
String,
+                       @JsonProperty("config") config: Map[String, String],
+                       @JsonProperty("init.clear") initClear: Boolean
+                     ) extends Param {
+  def getLogLevel: String = if (logLevel != null) logLevel else "WARN"
+  def needInitClear: Boolean = if (initClear != null) initClear else false
+}
+
+/**
+  * persist param
+  * @param persistType    persist type, e.g.: log, hdfs, http, mongo (must)
+  * @param config         config of persist way (must)
+  */
+@JsonInclude(Include.NON_NULL)
+case class PersistParam( @JsonProperty("type") persistType: String,
+                         @JsonProperty("config") config: Map[String, Any]
+                       ) extends Param {
+  override def validate(): Boolean = {
+    StringUtils.isNotBlank(persistType)
+  }
+}
+
+/**
+  * info cache param
+  * @param cacheType    information cache type, e.g.: zookeeper (must)
+  * @param config       config of cache way
+  */
+@JsonInclude(Include.NON_NULL)
+case class InfoCacheParam( @JsonProperty("type") cacheType: String,
+                           @JsonProperty("config") config: Map[String, Any]
+                         ) extends Param {
+  override def validate(): Boolean = {
+    StringUtils.isNotBlank(cacheType)
+  }
+}
+
+/**
+  * cleaner param, invalid at current
+  * @param cleanInterval    clean interval (optional)
+  */
+@JsonInclude(Include.NON_NULL)
+case class CleanerParam( @JsonProperty("clean.interval") cleanInterval: String
+                       ) extends Param {
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/configuration/params/Param.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/Param.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/Param.scala
new file mode 100644
index 0000000..87ad246
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/Param.scala
@@ -0,0 +1,29 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.configuration.params
+
+trait Param extends Serializable {
+
+  /**
+    * validate param internally
+    * @return
+    */
+  def validate(): Boolean = true
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/configuration/validator/ParamValidator.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/configuration/validator/ParamValidator.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/validator/ParamValidator.scala
new file mode 100644
index 0000000..4c9a4d6
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/validator/ParamValidator.scala
@@ -0,0 +1,38 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.configuration.validator
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.configuration.params._
+
+import scala.util.Try
+
+object ParamValidator extends Loggable with Serializable {
+
+  /**
+    * validate param
+    * @param param    param to be validated
+    * @tparam T       type of param
+    * @return         param valid or not
+    */
+  def validate[T <: Param](param: T): Try[Boolean] = Try {
+    param.validate
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/ContextId.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/ContextId.scala 
b/measure/src/main/scala/org/apache/griffin/measure/context/ContextId.scala
new file mode 100644
index 0000000..8848a22
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/ContextId.scala
@@ -0,0 +1,28 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context
+
+/**
+  * context id, unique by different timestamp and tag
+  */
+case class ContextId(timestamp: Long, tag: String = "") extends Serializable {
+  def id: String = {
+    if (tag.nonEmpty) s"${tag}_${timestamp}" else s"${timestamp}"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala 
b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
new file mode 100644
index 0000000..e4b5046
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
@@ -0,0 +1,94 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context
+
+import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.params._
+import org.apache.griffin.measure.context.datasource._
+import org.apache.griffin.measure.context.writer._
+import org.apache.spark.sql.{Encoders, SQLContext, SparkSession}
+
+/**
+  * dq context: the context of each calculation
+  */
+case class DQContext(contextId: ContextId,
+                     name: String,
+                     dataSources: Seq[DataSource],
+                     persistParams: Seq[PersistParam],
+                     procType: ProcessType
+                    )(@transient implicit val sparkSession: SparkSession) {
+
+  val sqlContext: SQLContext = sparkSession.sqlContext
+
+  val compileTableRegister: CompileTableRegister = CompileTableRegister()
+  val runTimeTableRegister: RunTimeTableRegister = 
RunTimeTableRegister(sqlContext)
+
+  val dataFrameCache: DataFrameCache = DataFrameCache()
+
+  val metricWrapper: MetricWrapper = MetricWrapper(name)
+  val writeMode = WriteMode.defaultMode(procType)
+
+  val dataSourceNames: Seq[String] = dataSources.map(_.name)
+  dataSourceNames.foreach(name => compileTableRegister.registerTable(name))
+  implicit val encoder = Encoders.STRING
+  val functionNames: Seq[String] = 
sparkSession.catalog.listFunctions.map(_.name).collect.toSeq
+
+  val dataSourceTimeRanges = loadDataSources()
+  def loadDataSources(): Map[String, TimeRange] = {
+    dataSources.map { ds =>
+      (ds.name, ds.loadData(this))
+    }.toMap
+  }
+  printTimeRanges
+
+  def getDataSourceName(index: Int): String = {
+    if (dataSourceNames.size > index) dataSourceNames(index) else ""
+  }
+
+  private val persistFactory = PersistFactory(persistParams, name)
+  private val defaultPersist: Persist = 
persistFactory.getPersists(contextId.timestamp)
+  def getPersist(timestamp: Long): Persist = {
+    if (timestamp == contextId.timestamp) getPersist()
+    else persistFactory.getPersists(timestamp)
+  }
+  def getPersist(): Persist = defaultPersist
+
+  def cloneDQContext(newContextId: ContextId): DQContext = {
+    DQContext(newContextId, name, dataSources, persistParams, 
procType)(sparkSession)
+  }
+
+  def clean(): Unit = {
+    compileTableRegister.unregisterAllTables()
+    runTimeTableRegister.unregisterAllTables()
+
+    dataFrameCache.uncacheAllDataFrames()
+    dataFrameCache.clearAllTrashDataFrames()
+  }
+
+  private def printTimeRanges(): Unit = {
+    if (dataSourceTimeRanges.nonEmpty) {
+      val timeRangesStr = dataSourceTimeRanges.map { pair =>
+        val (name, timeRange) = pair
+        s"${name} -> (${timeRange.begin}, ${timeRange.end}]"
+      }.mkString(", ")
+      println(s"data source timeRanges: ${timeRangesStr}")
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala
new file mode 100644
index 0000000..765dc6f
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala
@@ -0,0 +1,72 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context
+
+import org.apache.griffin.measure.Loggable
+import org.apache.spark.sql.DataFrame
+
+import scala.collection.concurrent.{Map => ConcMap}
+import scala.collection.mutable.{MutableList, Map => MutableMap}
+
+/**
+  * cache and unpersist dataframes
+  */
+case class DataFrameCache() extends Loggable {
+
+  private val dataFrames: MutableMap[String, DataFrame] = MutableMap()
+  private val trashDataFrames: MutableList[DataFrame] = MutableList()
+
+  private def trashDataFrame(df: DataFrame): Unit = {
+    trashDataFrames += df
+  }
+  private def trashDataFrames(dfs: Seq[DataFrame]): Unit = {
+    trashDataFrames ++= dfs
+  }
+
+  def cacheDataFrame(name: String, df: DataFrame): Unit = {
+    info(s"try to cache data frame ${name}")
+    dataFrames.get(name) match {
+      case Some(odf) => {
+        trashDataFrame(odf)
+        dataFrames += (name -> df)
+        df.cache
+        info(s"cache after replace old df")
+      }
+      case _ => {
+        dataFrames += (name -> df)
+        df.cache
+        info(s"cache after replace no old df")
+      }
+    }
+  }
+
+  def uncacheDataFrame(name: String): Unit = {
+    dataFrames.get(name).foreach(df => trashDataFrame(df))
+    dataFrames -= name
+  }
+  def uncacheAllDataFrames(): Unit = {
+    trashDataFrames(dataFrames.values.toSeq)
+    dataFrames.clear
+  }
+
+  def clearAllTrashDataFrames(): Unit = {
+    trashDataFrames.foreach(_.unpersist)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala 
b/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala
new file mode 100644
index 0000000..d98952a
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala
@@ -0,0 +1,49 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context
+
+import scala.collection.mutable.{Map => MutableMap}
+
+/**
+  * wrap metrics into one, each calculation produces one metric map
+  */
+case class MetricWrapper(name: String) extends Serializable {
+
+  val metrics: MutableMap[Long, Map[String, Any]] = MutableMap()
+
+  def insertMetric(timestamp: Long, value: Map[String, Any]): Unit = {
+    val newValue = metrics.get(timestamp) match {
+      case Some(v) => v ++ value
+      case _ => value
+    }
+    metrics += (timestamp -> newValue)
+  }
+
+  def flush: Map[Long, Map[String, Any]] = {
+    metrics.toMap.map { pair =>
+      val (timestamp, value) = pair
+      (timestamp, Map[String, Any](
+        ("name" -> name),
+        ("timestamp" -> timestamp),
+        ("value" -> value)
+      ))
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala 
b/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala
new file mode 100644
index 0000000..8d86170
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala
@@ -0,0 +1,81 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context
+
+import org.apache.griffin.measure.Loggable
+import org.apache.spark.sql._
+
+import scala.collection.mutable.{Set => MutableSet}
+
+/**
+  * register table name
+  */
+trait TableRegister extends Loggable with Serializable {
+
+  protected val tables: MutableSet[String] = MutableSet()
+
+  def registerTable(name: String): Unit = {
+    tables += name
+  }
+
+  def existsTable(name: String): Boolean = {
+    tables.exists(_.equals(name))
+  }
+
+  def unregisterTable(name: String): Unit = {
+    if (existsTable(name)) tables -= name
+  }
+  def unregisterAllTables(): Unit = {
+    tables.clear
+  }
+
+  def getTables(): Set[String] = {
+    tables.toSet
+  }
+
+}
+
+/**
+  * register table name when building dq job
+  */
+case class CompileTableRegister() extends TableRegister {}
+
+/**
+  * register table name and create temp view during calculation
+  */
+case class RunTimeTableRegister(@transient sqlContext: SQLContext) extends 
TableRegister {
+
+  def registerTable(name: String, df: DataFrame): Unit = {
+    registerTable(name)
+    df.createOrReplaceTempView(name)
+  }
+
+  override def unregisterTable(name: String): Unit = {
+    if (existsTable(name)) {
+      sqlContext.dropTempTable(name)
+      tables -= name
+    }
+  }
+  override def unregisterAllTables(): Unit = {
+    val uts = getTables
+    uts.foreach(t => sqlContext.dropTempTable(t))
+    tables.clear
+  }
+
+}
\ No newline at end of file


Reply via email to