http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/Application.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/batch/Application.scala 
b/measure/src/main/scala/org/apache/griffin/measure/batch/Application.scala
deleted file mode 100644
index b792e13..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/batch/Application.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch
-
-import org.apache.griffin.measure.batch.algo._
-import org.apache.griffin.measure.batch.config.params._
-import org.apache.griffin.measure.batch.config.params.env._
-import org.apache.griffin.measure.batch.config.params.user._
-import org.apache.griffin.measure.batch.config.reader._
-import org.apache.griffin.measure.batch.config.validator.AllParamValidator
-import org.apache.griffin.measure.batch.log.Loggable
-
-import scala.util.{Failure, Success, Try}
-
-/**
- * Command-line entry point of the batch measure job.
- *
- * Arguments: <env-param> <user-param> [fs types], where the optional third
- * argument is a comma separated list giving the file system type used to read
- * the env and the user param file (a single value applies to both).
- *
- * Each failure is logged and terminates the JVM with its own exit code:
- * -1 usage error, -2 unreadable param file, -3 invalid params,
- * -4 unsupported dq type, -5 algorithm failure.
- */
-object Application extends Loggable {
-
-  // file system type used when no type is given on the command line
-  private val DefaultFsType = "hdfs"
-
-  def main(args: Array[String]): Unit = {
-    // Array.toString only prints the JVM identity of the array, so join the elements instead
-    info(args.mkString(" "))
-    if (args.length < 2) {
-      error("Usage: class <env-param> <user-param> [List of String split by comma: raw | local | hdfs(default)]")
-      sys.exit(-1)
-    }
-
-    val envParamFile = args(0)
-    val userParamFile = args(1)
-    val (envFsType, userFsType) =
-      if (args.length > 2) parseFsTypes(args(2)) else (DefaultFsType, DefaultFsType)
-
-    info(envParamFile)
-    info(userParamFile)
-
-    // read param files
-    val envParam = getOrExit(readParamFile[EnvParam](envParamFile, envFsType), -2)
-    val userParam = getOrExit(readParamFile[UserParam](userParamFile, userFsType), -2)
-    val allParam: AllParam = AllParam(envParam, userParam)
-
-    // validate param files
-    // NOTE(review): only a Failure aborts here; a Success(false) is also treated as a pass - confirm
-    validateParams(allParam) match {
-      case Failure(ex) =>
-        error(ex.getMessage)
-        sys.exit(-3)
-      case _ =>
-        info("params validation pass")
-    }
-
-    // choose algorithm
-    val dqType = allParam.userParam.dqType
-    val algo: Algo = dqType match {
-      case MeasureType.accuracy() => BatchAccuracyAlgo(allParam)
-      case MeasureType.profile() => BatchProfileAlgo(allParam)
-      case _ =>
-        error(s"${dqType} is unsupported dq type!")
-        sys.exit(-4)
-    }
-
-    // algorithm run
-    algo.run() match {
-      case Failure(ex) =>
-        error(ex.getMessage)
-        sys.exit(-5)
-      case _ =>
-        info("calculation finished")
-    }
-  }
-
-  // split the optional third argument into (env fs type, user fs type)
-  private def parseFsTypes(arg: String): (String, String) = {
-    arg.trim.split(",").map(_.trim) match {
-      case Array() => (DefaultFsType, DefaultFsType)
-      case Array(single) => (single, single)
-      case Array(env, user, _*) => (env, user)
-    }
-  }
-
-  // unwrap a successful result, or log the failure message and exit with the given code
-  private def getOrExit[T](result: Try[T], exitCode: Int): T = {
-    result match {
-      case Success(value) => value
-      case Failure(ex) =>
-        error(ex.getMessage)
-        sys.exit(exitCode)
-    }
-  }
-
-  private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {
-    val paramReader = ParamReaderFactory.getParamReader(file, fsType)
-    paramReader.readConfig[T]
-  }
-
-  private def validateParams(allParam: AllParam): Try[Boolean] = {
-    val allParamValidator = AllParamValidator()
-    allParamValidator.validate(allParam)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/algo/AccuracyAlgo.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/batch/algo/AccuracyAlgo.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/batch/algo/AccuracyAlgo.scala
deleted file mode 100644
index e4ccc7e..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/batch/algo/AccuracyAlgo.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.algo
-
-
-// marker trait for accuracy algorithms, adds no member to Algo
-trait AccuracyAlgo extends Algo

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/algo/Algo.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/batch/algo/Algo.scala 
b/measure/src/main/scala/org/apache/griffin/measure/batch/algo/Algo.scala
deleted file mode 100644
index 15142ba..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/batch/algo/Algo.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.algo
-
-import org.apache.griffin.measure.batch.config.params.env._
-import org.apache.griffin.measure.batch.config.params.user._
-import org.apache.griffin.measure.batch.log.Loggable
-
-import scala.util.Try
-
-/**
- * Base contract of a measure algorithm: it carries the env and user params
- * it was configured with and exposes a single `run` entry point.
- */
-trait Algo extends Loggable with Serializable {
-
-  // environment level configuration
-  val envParam: EnvParam
-
-  // user level configuration
-  val userParam: UserParam
-
-  // execute the algorithm; the outcome is reported through the returned Try
-  def run(): Try[_]
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgo.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgo.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgo.scala
deleted file mode 100644
index 196fb3b..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgo.scala
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.algo
-
-import java.util.Date
-
-import org.apache.griffin.measure.batch.algo.core.AccuracyCore
-import org.apache.griffin.measure.batch.config.params.AllParam
-import org.apache.griffin.measure.batch.connector._
-import org.apache.griffin.measure.batch.rule._
-import org.apache.griffin.measure.batch.rule.expr._
-import org.apache.griffin.measure.batch.persist._
-import org.apache.griffin.measure.batch.result._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.{SparkConf, SparkContext}
-
-import scala.util.{Failure, Success, Try}
-
-/**
- * Accuracy algorithm for batch mode.
- *
- * `run` creates a Spark context, loads source and target data through their
- * data connectors, cogroups both sides by key, lets AccuracyCore decide which
- * source records are matched, and hands the result and the missing records to
- * the configured persists.
- */
-case class BatchAccuracyAlgo(allParam: AllParam) extends AccuracyAlgo {
-  val envParam = allParam.envParam
-  val userParam = allParam.userParam
-
-  def run(): Try[_] = {
-    Try {
-      val metricName = userParam.name
-
-      val conf = new SparkConf().setAppName(metricName)
-      val sc = new SparkContext(conf)
-
-      // stop the context on every exit path, not only after a successful run
-      try {
-        val sqlContext = new HiveContext(sc)
-
-        // start time
-        val startTime = new Date().getTime()
-
-        // get persists to persist measure result
-        val persist: Persist = PersistFactory(envParam.persistParams, metricName).getPersists(startTime)
-
-        // get spark application id
-        val applicationId = sc.applicationId
-
-        // persist start id
-        persist.start(applicationId)
-
-        // generate rule from rule param, generate rule analyzer
-        val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
-        val rule: StatementExpr = ruleFactory.generateRule()
-        val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
-
-        // const expr value map
-        val constExprValueMap = ExprValueUtil.genExprValueMap(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
-        val finalConstExprValueMap = ExprValueUtil.updateExprValueMap(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
-
-        // data connector
-        val sourceDataConnector: DataConnector = availableConnector(
-          DataConnectorFactory.getDataConnector(sqlContext, userParam.sourceParam,
-            ruleAnalyzer.sourceRuleExprs, finalConstExprValueMap),
-          "source")
-        val targetDataConnector: DataConnector = availableConnector(
-          DataConnectorFactory.getDataConnector(sqlContext, userParam.targetParam,
-            ruleAnalyzer.targetRuleExprs, finalConstExprValueMap),
-          "target")
-
-        // get metadata
-//      val sourceMetaData: Iterable[(String, String)] = sourceDataConnector.metaData() match {
-//        case Success(md) => md
-//        case _ => throw new Exception("source metadata error!")
-//      }
-//      val targetMetaData: Iterable[(String, String)] = targetDataConnector.metaData() match {
-//        case Success(md) => md
-//        case _ => throw new Exception("target metadata error!")
-//      }
-
-        // get data, rethrowing the connector failure if there is one
-        val sourceData: RDD[(Product, Map[String, Any])] = sourceDataConnector.data() match {
-          case Success(dt) => dt
-          case Failure(ex) => throw ex
-        }
-        val targetData: RDD[(Product, Map[String, Any])] = targetDataConnector.data() match {
-          case Success(dt) => dt
-          case Failure(ex) => throw ex
-        }
-
-        // accuracy algorithm, the matched records are not persisted by this algorithm
-        val (accuResult, missingRdd, _) = accuracy(sourceData, targetData, ruleAnalyzer)
-
-        // end time
-        val endTime = new Date().getTime
-        persist.log(endTime, s"calculation using time: ${endTime - startTime} ms")
-
-        // persist result
-        persist.result(endTime, accuResult)
-        val missingRecords = missingRdd.map(record2String(_, ruleAnalyzer.sourceRuleExprs.persistExprs, ruleAnalyzer.targetRuleExprs.persistExprs))
-        persist.missRecords(missingRecords)
-
-        // persist end time
-        val persistEndTime = new Date().getTime
-        persist.log(persistEndTime, s"persist using time: ${persistEndTime - endTime} ms")
-
-        // finish
-        persist.finish()
-      } finally {
-        // context stop
-        sc.stop()
-      }
-    }
-  }
-
-  // unwrap a connector creation result, failing when the connector is not available
-  private def availableConnector(created: Try[DataConnector], side: String): DataConnector = {
-    created match {
-      case Success(cntr) if cntr.available => cntr
-      case Success(_) => throw new Exception(s"${side} data connection error!")
-      case Failure(ex) => throw ex
-    }
-  }
-
-  // pair the data map with an initially empty info map
-  def wrapInitData(data: Map[String, Any]): (Map[String, Any], Map[String, Any]) = {
-    (data, Map[String, Any]())
-  }
-
-  // calculate accuracy between source data and target data
-  def accuracy(sourceData: RDD[(Product, Map[String, Any])], targetData: RDD[(Product, Map[String, Any])], ruleAnalyzer: RuleAnalyzer
-              ): (AccuracyResult, RDD[(Product, (Map[String, Any], Map[String, Any]))], RDD[(Product, (Map[String, Any], Map[String, Any]))]) = {
-
-    // 1. wrap data
-    val sourceWrappedData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = sourceData.map(r => (r._1, wrapInitData(r._2)))
-    val targetWrappedData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = targetData.map(r => (r._1, wrapInitData(r._2)))
-
-    // 2. cogroup
-    val allKvs = sourceWrappedData.cogroup(targetWrappedData)
-
-    // 3. accuracy calculation
-    AccuracyCore.accuracy(allKvs, ruleAnalyzer)
-  }
-
-  // convert data into a string
-  def record2String(rec: (Product, (Map[String, Any], Map[String, Any])), sourcePersist: Iterable[Expr], targetPersist: Iterable[Expr]): String = {
-    val (_, (data, info)) = rec
-    val persistData = getPersistMap(data, sourcePersist)
-    val persistInfo = info.mapValues {
-      // type arguments are erased at runtime, so this only checks that the value is a Map
-      case vd: Map[String, Any] @unchecked => getPersistMap(vd, targetPersist)
-      case v => v
-    }
-    s"${persistData} [${persistInfo}]"
-  }
-
-  // get the expr value map of the persist expressions, keyed by expr desc instead of expr id
-  private def getPersistMap(data: Map[String, Any], persist: Iterable[Expr]): Map[String, Any] = {
-    val persistMap = persist.map(e => (e._id, e.desc)).toMap
-    data.flatMap { pair =>
-      val (k, v) = pair
-      persistMap.get(k) match {
-        case Some(d) => Some((d -> v))
-        case _ => None
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgo.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgo.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgo.scala
deleted file mode 100644
index 711ed78..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/batch/algo/BatchProfileAlgo.scala
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.algo
-
-import java.util.Date
-
-import org.apache.griffin.measure.batch.algo.core.ProfileCore
-import org.apache.griffin.measure.batch.config.params._
-import org.apache.griffin.measure.batch.connector.{DataConnector, 
DataConnectorFactory}
-import org.apache.griffin.measure.batch.persist.{Persist, PersistFactory}
-import org.apache.griffin.measure.batch.result._
-import org.apache.griffin.measure.batch.rule.{ExprValueUtil, RuleAnalyzer, 
RuleFactory}
-import org.apache.griffin.measure.batch.rule.expr.{Expr, StatementExpr}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.{SparkConf, SparkContext}
-
-import scala.util.{Failure, Success, Try}
-
-/**
- * Profile algorithm for batch mode.
- *
- * `run` creates a Spark context, loads the source data through its data
- * connector, lets ProfileCore decide which records match the rule, and hands
- * the result and the matched records to the configured persists.
- */
-case class BatchProfileAlgo(allParam: AllParam) extends ProfileAlgo {
-  val envParam = allParam.envParam
-  val userParam = allParam.userParam
-
-  def run(): Try[_] = {
-    Try {
-      val metricName = userParam.name
-
-      val conf = new SparkConf().setAppName(metricName)
-      val sc = new SparkContext(conf)
-
-      // stop the context on every exit path, not only after a successful run
-      try {
-        val sqlContext = new HiveContext(sc)
-
-        // start time
-        val startTime = new Date().getTime()
-
-        // get persists to persist measure result
-        val persist: Persist = PersistFactory(envParam.persistParams, metricName).getPersists(startTime)
-
-        // get spark application id
-        val applicationId = sc.applicationId
-
-        // persist start id
-        persist.start(applicationId)
-
-        // generate rule from rule param, generate rule analyzer
-        val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
-        val rule: StatementExpr = ruleFactory.generateRule()
-        val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
-
-        // const expr value map
-        val constExprValueMap = ExprValueUtil.genExprValueMap(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
-        val finalConstExprValueMap = ExprValueUtil.updateExprValueMap(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
-
-        // data connector
-        val sourceDataConnector: DataConnector =
-          DataConnectorFactory.getDataConnector(sqlContext, userParam.sourceParam,
-            ruleAnalyzer.sourceRuleExprs, finalConstExprValueMap
-          ) match {
-            case Success(cntr) if cntr.available => cntr
-            case Success(_) => throw new Exception("source data connection error!")
-            case Failure(ex) => throw ex
-          }
-
-        // get metadata
-        //      val sourceMetaData: Iterable[(String, String)] = sourceDataConnector.metaData() match {
-        //        case Success(md) => md
-        //        case _ => throw new Exception("source metadata error!")
-        //      }
-
-        // get data, rethrowing the connector failure if there is one
-        val sourceData: RDD[(Product, Map[String, Any])] = sourceDataConnector.data() match {
-          case Success(dt) => dt
-          case Failure(ex) => throw ex
-        }
-
-        // profile algorithm, the missing records are not persisted by this algorithm
-        val (profileResult, _, matchedRdd) = profile(sourceData, ruleAnalyzer)
-
-        // end time
-        val endTime = new Date().getTime
-        persist.log(endTime, s"calculation using time: ${endTime - startTime} ms")
-
-        // persist result
-        persist.result(endTime, profileResult)
-        val matchedRecords = matchedRdd.map(record2String(_, ruleAnalyzer.sourceRuleExprs.persistExprs))
-        persist.matchRecords(matchedRecords)
-
-        // persist end time
-        val persistEndTime = new Date().getTime
-        persist.log(persistEndTime, s"persist using time: ${persistEndTime - endTime} ms")
-
-        // finish
-        persist.finish()
-      } finally {
-        // context stop
-        sc.stop()
-      }
-    }
-  }
-
-  // pair the data map with an initially empty info map
-  def wrapInitData(data: Map[String, Any]): (Map[String, Any], Map[String, Any]) = {
-    (data, Map[String, Any]())
-  }
-
-  // calculate profile from source data
-  def profile(sourceData: RDD[(Product, Map[String, Any])], ruleAnalyzer: RuleAnalyzer
-              ): (ProfileResult, RDD[(Product, (Map[String, Any], Map[String, Any]))], RDD[(Product, (Map[String, Any], Map[String, Any]))]) = {
-
-    // 1. wrap data
-    val sourceWrappedData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = sourceData.map(r => (r._1, wrapInitData(r._2)))
-
-    // 2. profile calculation
-    ProfileCore.profile(sourceWrappedData, ruleAnalyzer)
-  }
-
-  // convert data into a string, the info part is only printed when it is not empty
-  def record2String(rec: (Product, (Map[String, Any], Map[String, Any])), sourcePersist: Iterable[Expr]): String = {
-    val (_, (data, info)) = rec
-    val persistData = getPersistMap(data, sourcePersist)
-    if (info.size > 0) s"${persistData} [${info}]" else s"${persistData}"
-  }
-
-  // get the expr value map of the persist expressions, keyed by expr desc instead of expr id
-  private def getPersistMap(data: Map[String, Any], persist: Iterable[Expr]): Map[String, Any] = {
-    val persistMap = persist.map(e => (e._id, e.desc)).toMap
-    data.flatMap { pair =>
-      val (k, v) = pair
-      persistMap.get(k) match {
-        case Some(d) => Some((d -> v))
-        case _ => None
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/algo/MeasureType.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/batch/algo/MeasureType.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/batch/algo/MeasureType.scala
deleted file mode 100644
index 9c3c6ca..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/batch/algo/MeasureType.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.algo
-
-/**
- * Regular expressions recognising the supported dq types, case-insensitively.
- * Both patterns are anchored at start and end so that they accept the exact
- * type name only, whichever Regex method is used to apply them.
- */
-object MeasureType {
-
-  val accuracy = """^(?i)accuracy$""".r
-  // the end anchor was missing here, which let e.g. "profiler" pass a findFirstIn check
-  val profile = """^(?i)profile$""".r
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/algo/ProfileAlgo.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/batch/algo/ProfileAlgo.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/batch/algo/ProfileAlgo.scala
deleted file mode 100644
index 31daa2d..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/batch/algo/ProfileAlgo.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.algo
-
-// marker trait for profile algorithms, adds no member to Algo
-trait ProfileAlgo extends Algo

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCore.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCore.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCore.scala
deleted file mode 100644
index 87c5f7d..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCore.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.algo.core
-
-import org.apache.griffin.measure.batch.rule.RuleAnalyzer
-import org.apache.griffin.measure.batch.result._
-import org.apache.spark.rdd.RDD
-
-
-/**
- * Core of the accuracy measure: for every key decides which source records
- * have a matching target record according to the rule.
- */
-object AccuracyCore {
-
-  type V = Map[String, Any]
-  type T = Map[String, Any]
-
-  // allKvs: rdd of (key, (List[(sourceData, sourceInfo)], List[(targetData, targetInfo)]))
-  // output: accuracy result, missing source data rdd, matched source data rdd
-  def accuracy(allKvs: RDD[(Product, (Iterable[(V, T)], Iterable[(V, T)]))], ruleAnalyzer: RuleAnalyzer
-              ): (AccuracyResult, RDD[(Product, (V, T))], RDD[(Product, (V, T))]) = {
-    // NOTE(review): result is not persisted here, so the aggregate below and each of the
-    // two returned rdds evaluate this map again - consider caching at the caller
-    val result: RDD[(Long, Long, List[(Product, (V, T))], List[(Product, (V, T))])] = allKvs.map { kv =>
-      val (key, (sourceDatas, targetDatas)) = kv
-
-      // accumulator: (missCount, matchCount, missDataList, matchDataList)
-      // the lists are built by prepending (constant time) and reversed once at the end,
-      // appending with :+ would copy the whole list for every source record
-      val init = (0L, 0L, List.empty[(Product, (V, T))], List.empty[(Product, (V, T))])
-      val (missCount, matchCount, missReversed, matchReversed) = sourceDatas.foldLeft(init) { (sr, sourcePair) =>
-        val matchResult = if (targetDatas.isEmpty) {
-          (false, Map[String, Any](MismatchInfo.wrap("no target")))
-        } else {
-          // keep the first matching result, otherwise end up with the result of the last target
-          targetDatas.foldLeft((false, Map[String, Any]())) { (tr, targetPair) =>
-            if (tr._1) tr
-            else matchData(sourcePair, targetPair, ruleAnalyzer)
-          }
-        }
-
-        if (matchResult._1) {
-          val matchItem = (key, sourcePair)
-          (sr._1, sr._2 + 1, sr._3, matchItem :: sr._4)
-        } else {
-          // attach the mismatch info to the info part of the source record
-          val missItem = (key, (sourcePair._1, sourcePair._2 ++ matchResult._2))
-          (sr._1 + 1, sr._2, missItem :: sr._3, sr._4)
-        }
-      }
-
-      // restore the original source record order
-      (missCount, matchCount, missReversed.reverse, matchReversed.reverse)
-    }
-
-    val missRdd = result.flatMap(_._3)
-    val matchRdd = result.flatMap(_._4)
-
-    // sum up (missCount, matchCount) over all keys
-    def seq(cnt: (Long, Long), rcd: (Long, Long, Any, Any)): (Long, Long) = {
-      (cnt._1 + rcd._1, cnt._2 + rcd._2)
-    }
-    def comb(c1: (Long, Long), c2: (Long, Long)): (Long, Long) = {
-      (c1._1 + c2._1, c1._2 + c2._2)
-    }
-    val countPair = result.aggregate((0L, 0L))(seq, comb)
-
-    // first argument is the miss count, second the total count
-    (AccuracyResult(countPair._1, (countPair._1 + countPair._2)), missRdd, matchRdd)
-  }
-
-  // try to match source and target data, return true if matched, false if unmatched, also with some matching info
-  private def matchData(source: (V, T), target: (V, T), ruleAnalyzer: RuleAnalyzer): (Boolean, T) = {
-
-    // 1. merge source and target cached data
-    val mergedExprValueMap: Map[String, Any] = mergeExprValueMap(source, target)
-
-    // 2. check valid
-    if (ruleAnalyzer.rule.valid(mergedExprValueMap)) {
-      // 3. substitute the cached data into statement, get the statement value
-      val matched = ruleAnalyzer.rule.calculate(mergedExprValueMap) match {
-        case Some(b: Boolean) => b
-        case _ => false
-      }
-      // currently we can not get the mismatch reason, we need to add such information to figure out how it mismatches
-      if (matched) (matched, Map[String, Any]())
-      else (matched, Map[String, Any](MismatchInfo.wrap("not matched"), TargetInfo.wrap(target._1)))
-    } else {
-      (false, Map[String, Any](MismatchInfo.wrap("invalid to compare"), TargetInfo.wrap(target._1)))
-    }
-
-  }
-
-  // target values win over source values when both sides share a key
-  private def mergeExprValueMap(source: (V, T), target: (V, T)): Map[String, Any] = {
-    source._1 ++ target._1
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/algo/core/ProfileCore.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/batch/algo/core/ProfileCore.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/batch/algo/core/ProfileCore.scala
deleted file mode 100644
index 59b3f2f..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/batch/algo/core/ProfileCore.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.algo.core
-
-import org.apache.griffin.measure.batch.rule.RuleAnalyzer
-import org.apache.griffin.measure.batch.result._
-import org.apache.spark.rdd.RDD
-
-
-/**
- * Core of the profile measure: checks every record against the rule and
- * splits the data into matched and missing records.
- */
-object ProfileCore {
-
-  type V = Map[String, Any]
-  type T = Map[String, Any]
-
-  // dataRdd: rdd of (key, (sourceData, sourceInfo))
-  // output: profile result, missing source data rdd, matched source data rdd
-  def profile(dataRdd: RDD[(Product, (V, T))], ruleAnalyzer: RuleAnalyzer
-              ): (ProfileResult, RDD[(Product, (V, T))], RDD[(Product, (V, T))]) = {
-
-    // tag every record with its match flag, the mismatch info is merged into the info part
-    val resultRdd: RDD[((Product, (V, T)), Boolean)] = dataRdd.map { kv =>
-      val (key, (data, info)) = kv
-      val (matched, missInfo) = matchData((data, info), ruleAnalyzer)
-      ((key, (data, info ++ missInfo)), matched)
-    }
-
-    val totalCount = resultRdd.count
-    val matchRdd = resultRdd.filter(_._2).map(_._1)
-    val matchCount = matchRdd.count
-    // the miss count is not part of the result, so the missing records are not counted here:
-    // counting them would run one more job over the data for a value nobody reads
-    val missRdd = resultRdd.filter(!_._2).map(_._1)
-
-    (ProfileResult(matchCount, totalCount), missRdd, matchRdd)
-
-  }
-
-  // try to match data as rule, return true if matched, false if unmatched
-  private def matchData(dataPair: (V, T), ruleAnalyzer: RuleAnalyzer): (Boolean, T) = {
-
-    val data: Map[String, Any] = dataPair._1
-
-    // 1. check valid
-    if (ruleAnalyzer.rule.valid(data)) {
-      // 2. substitute the cached data into statement, get the statement value
-      val matched = ruleAnalyzer.rule.calculate(data) match {
-        case Some(b: Boolean) => b
-        case _ => false
-      }
-      // currently we can not get the mismatch reason, we need to add such information to figure out how it mismatches
-      if (matched) (matched, Map[String, Any]())
-      else (matched, Map[String, Any](MismatchInfo.wrap("not matched")))
-    } else {
-      (false, Map[String, Any](MismatchInfo.wrap("invalid to compare")))
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/AllParam.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/AllParam.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/AllParam.scala
deleted file mode 100644
index 7ddcaff..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/AllParam.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.config.params
-
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import org.apache.griffin.measure.batch.config.params.env._
-import org.apache.griffin.measure.batch.config.params.user._
-
-// simply composite of env and user params, for convenient usage
-@JsonInclude(Include.NON_NULL)
-case class AllParam( @JsonProperty("env") envParam: EnvParam,
-                     @JsonProperty("user") userParam: UserParam
-                   ) extends Param {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/Param.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/Param.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/Param.scala
deleted file mode 100644
index 5d2b39f..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/Param.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.config.params
-
-trait Param extends Serializable {
-
-  def validate(): Boolean = true
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/env/CleanerParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/env/CleanerParam.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/env/CleanerParam.scala
deleted file mode 100644
index ae8deae..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/env/CleanerParam.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.config.params.env
-
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import org.apache.griffin.measure.batch.config.params.Param
-
-@JsonInclude(Include.NON_NULL)
-case class CleanerParam() extends Param {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/env/EnvParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/env/EnvParam.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/env/EnvParam.scala
deleted file mode 100644
index 3c2429e..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/env/EnvParam.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.config.params.env
-
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import org.apache.griffin.measure.batch.config.params.Param
-
-@JsonInclude(Include.NON_NULL)
-case class EnvParam( @JsonProperty("spark") sparkParam: SparkParam,
-                     @JsonProperty("persist") persistParams: List[PersistParam],
-                     @JsonProperty("cleaner") cleanerParam: CleanerParam
-                   ) extends Param {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/env/PersistParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/env/PersistParam.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/env/PersistParam.scala
deleted file mode 100644
index dc8f9ef..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/env/PersistParam.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.config.params.env
-
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import org.apache.griffin.measure.batch.config.params.Param
-
-@JsonInclude(Include.NON_NULL)
-case class PersistParam( @JsonProperty("type") persistType: String,
-                         @JsonProperty("config") config: Map[String, Any]
-                       ) extends Param {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/env/SparkParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/env/SparkParam.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/env/SparkParam.scala
deleted file mode 100644
index 0e8d859..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/env/SparkParam.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.config.params.env
-
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import org.apache.griffin.measure.batch.config.params.Param
-
-@JsonInclude(Include.NON_NULL)
-case class SparkParam( @JsonProperty("log.level") logLevel: String,
-                       @JsonProperty("checkpoint.dir") cpDir: String,
-                       @JsonProperty("config") config: Map[String, Any]
-                     ) extends Param {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/user/DataConnectorParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/user/DataConnectorParam.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/user/DataConnectorParam.scala
deleted file mode 100644
index f560201..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/user/DataConnectorParam.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.config.params.user
-
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import org.apache.griffin.measure.batch.config.params.Param
-
-@JsonInclude(Include.NON_NULL)
-case class DataConnectorParam( @JsonProperty("type") conType: String,
-                               @JsonProperty("version") version: String,
-                               @JsonProperty("config") config: Map[String, Any]
-                             ) extends Param {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/user/EvaluateRuleParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/user/EvaluateRuleParam.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/user/EvaluateRuleParam.scala
deleted file mode 100644
index db3f48f..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/user/EvaluateRuleParam.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.config.params.user
-
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import org.apache.griffin.measure.batch.config.params.Param
-
-@JsonInclude(Include.NON_NULL)
-case class EvaluateRuleParam( @JsonProperty("sampleRatio") sampleRatio: Double,
-                              @JsonProperty("rules") rules: String
-                            ) extends Param {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/user/UserParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/user/UserParam.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/user/UserParam.scala
deleted file mode 100644
index de8372b..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/batch/config/params/user/UserParam.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.config.params.user
-
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-import org.apache.griffin.measure.batch.config.params.Param
-
-@JsonInclude(Include.NON_NULL)
-case class UserParam(@JsonProperty("name") name: String,
-                     @JsonProperty("type") dqType: String,
-                     @JsonProperty("source") sourceParam: DataConnectorParam,
-                     @JsonProperty("target") targetParam: DataConnectorParam,
-                     @JsonProperty("evaluateRule") evaluateRuleParam: EvaluateRuleParam
-                    ) extends Param {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReader.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReader.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReader.scala
deleted file mode 100644
index 7ed537e..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReader.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.config.reader
-
-import org.apache.griffin.measure.batch.config.params.Param
-import org.apache.griffin.measure.batch.utils.JsonUtil
-
-import scala.util.Try
-
-case class ParamFileReader(file: String) extends ParamReader {
-
-  def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] = {
-    Try {
-      val source = scala.io.Source.fromFile(file)
-      val lines = source.mkString
-      val param = JsonUtil.fromJson[T](lines)
-      source.close
-      param
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamHdfsFileReader.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamHdfsFileReader.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamHdfsFileReader.scala
deleted file mode 100644
index d4af8da..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamHdfsFileReader.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.config.reader
-
-import org.apache.griffin.measure.batch.config.params.Param
-import org.apache.griffin.measure.batch.utils.JsonUtil
-import org.apache.griffin.measure.batch.utils.HdfsUtil
-
-import scala.util.Try
-
-case class ParamHdfsFileReader(filePath: String) extends ParamReader {
-
-  def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] = {
-    Try {
-      val source = HdfsUtil.openFile(filePath)
-      val param = JsonUtil.fromJson[T](source)
-      source.close
-      param
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamRawStringReader.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamRawStringReader.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamRawStringReader.scala
deleted file mode 100644
index e02675e..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamRawStringReader.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.config.reader
-
-import org.apache.griffin.measure.batch.config.params.Param
-import org.apache.griffin.measure.batch.utils.JsonUtil
-
-import scala.util.Try
-
-case class ParamRawStringReader(rawString: String) extends ParamReader {
-
-  def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] = {
-    Try {
-      val param = JsonUtil.fromJson[T](rawString)
-      param
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReader.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReader.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReader.scala
deleted file mode 100644
index 46731be..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReader.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.config.reader
-
-import org.apache.griffin.measure.batch.log.Loggable
-import org.apache.griffin.measure.batch.config.params.Param
-
-import scala.util.Try
-
-trait ParamReader extends Loggable with Serializable {
-
-  def readConfig[T <: Param](implicit m : Manifest[T]): Try[T]
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReaderFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReaderFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReaderFactory.scala
deleted file mode 100644
index ea787da..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReaderFactory.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.config.reader
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-
-
-object ParamReaderFactory {
-
-  val RawStringRegex = """^(?i)raw$""".r
-  val LocalFsRegex = """^(?i)local$""".r
-  val HdfsFsRegex = """^(?i)hdfs$""".r
-
-  def getParamReader(filePath: String, fsType: String): ParamReader = {
-    fsType match {
-      case RawStringRegex() => ParamRawStringReader(filePath)
-      case LocalFsRegex() => ParamFileReader(filePath)
-      case HdfsFsRegex() => ParamHdfsFileReader(filePath)
-      case _ => ParamHdfsFileReader(filePath)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidator.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidator.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidator.scala
deleted file mode 100644
index 0314bd7..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidator.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.config.validator
-
-import org.apache.griffin.measure.batch.config.params.Param
-
-import scala.util.Try
-
-// need to validate params
-case class AllParamValidator() extends ParamValidator {
-
-  def validate[T <: Param](param: Param): Try[Boolean] = {
-    Try {
-      param.validate
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/config/validator/ParamValidator.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/config/validator/ParamValidator.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/config/validator/ParamValidator.scala
deleted file mode 100644
index a7e38ea..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/batch/config/validator/ParamValidator.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.config.validator
-
-import org.apache.griffin.measure.batch.log.Loggable
-import org.apache.griffin.measure.batch.config.params.Param
-
-import scala.util.Try
-
-trait ParamValidator extends Loggable with Serializable {
-
-  def validate[T <: Param](param: Param): Try[Boolean]
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/connector/AvroDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/batch/connector/AvroDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/batch/connector/AvroDataConnector.scala
deleted file mode 100644
index 1f5d462..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/batch/connector/AvroDataConnector.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.connector
-
-import org.apache.griffin.measure.batch.rule.expr._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
-import com.databricks.spark.avro._
-
-import scala.util.{Success, Try}
-import java.nio.file.{Files, Paths}
-
-import org.apache.griffin.measure.batch.rule.{ExprValueUtil, RuleExprs}
-import org.apache.griffin.measure.batch.utils.HdfsUtil
-
-// data connector for avro file
-case class AvroDataConnector(sqlContext: SQLContext, config: Map[String, Any],
-                             ruleExprs: RuleExprs, constFinalExprValueMap: 
Map[String, Any]
-                            ) extends DataConnector {
-
-  val FilePath = "file.path"
-  val FileName = "file.name"
-
-  val filePath = config.getOrElse(FilePath, "").toString
-  val fileName = config.getOrElse(FileName, "").toString
-
-  val concreteFileFullPath = if (pathPrefix) s"${filePath}${fileName}" else 
fileName
-
-  private def pathPrefix(): Boolean = {
-    filePath.nonEmpty
-  }
-
-  private def fileExist(): Boolean = {
-    HdfsUtil.existPath(concreteFileFullPath)
-  }
-
-  def available(): Boolean = {
-    (!concreteFileFullPath.isEmpty) && fileExist
-  }
-
-  def metaData(): Try[Iterable[(String, String)]] = {
-    Try {
-      val st = 
sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath).schema
-      st.fields.map(f => (f.name, f.dataType.typeName))
-    }
-  }
-
-  def data(): Try[RDD[(Product, Map[String, Any])]] = {
-    Try {
-      loadDataFile.flatMap { row =>
-        // generate cache data
-        val cacheExprValueMap: Map[String, Any] = 
ruleExprs.cacheExprs.foldLeft(constFinalExprValueMap) { (cachedMap, expr) =>
-          ExprValueUtil.genExprValueMap(Some(row), expr, cachedMap)
-        }
-        val finalExprValueMap = 
ExprValueUtil.updateExprValueMap(ruleExprs.finalCacheExprs, cacheExprValueMap)
-
-        // when clause filter data source
-        val whenResult = ruleExprs.whenClauseExprOpt match {
-          case Some(whenClause) => whenClause.calculate(finalExprValueMap)
-          case _ => None
-        }
-
-        // get groupby data
-        whenResult match {
-          case Some(false) => None
-          case _ => {
-            val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { 
expr =>
-              expr.calculate(finalExprValueMap) match {
-                case Some(v) => Some(v.asInstanceOf[AnyRef])
-                case _ => None
-              }
-            }
-            val key = toTuple(groupbyData)
-
-            Some((key, finalExprValueMap))
-          }
-        }
-      }
-    }
-  }
-
-  private def loadDataFile() = {
-    
sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath)
-  }
-
-  private def toTuple[A <: AnyRef](as: Seq[A]): Product = {
-    if (as.size > 0) {
-      val tupleClass = Class.forName("scala.Tuple" + as.size)
-      tupleClass.getConstructors.apply(0).newInstance(as: 
_*).asInstanceOf[Product]
-    } else None
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnector.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnector.scala
deleted file mode 100644
index 4c7f976..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnector.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.connector
-
-import org.apache.spark.rdd.RDD
-
-import scala.util.Try
-
-
-trait DataConnector extends Serializable {
-
-  def available(): Boolean
-
-  def metaData(): Try[Iterable[(String, String)]]
-
-  def data(): Try[RDD[(Product, Map[String, Any])]]
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnectorFactory.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnectorFactory.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnectorFactory.scala
deleted file mode 100644
index 1806d87..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnectorFactory.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.connector
-
-import org.apache.griffin.measure.batch.config.params.user._
-import org.apache.griffin.measure.batch.rule.RuleExprs
-import org.apache.griffin.measure.batch.rule.expr._
-import org.apache.spark.sql.SQLContext
-
-import scala.util.Try
-
-object DataConnectorFactory {
-
-  val HiveRegex = """^(?i)hive$""".r
-  val AvroRegex = """^(?i)avro$""".r
-
-  def getDataConnector(sqlContext: SQLContext,
-                       dataConnectorParam: DataConnectorParam,
-                       ruleExprs: RuleExprs,
-                       globalFinalCacheMap: Map[String, Any]
-                      ): Try[DataConnector] = {
-    val conType = dataConnectorParam.conType
-    val version = dataConnectorParam.version
-    Try {
-      conType match {
-        case HiveRegex() => HiveDataConnector(sqlContext, 
dataConnectorParam.config, ruleExprs, globalFinalCacheMap)
-        case AvroRegex() => AvroDataConnector(sqlContext, 
dataConnectorParam.config, ruleExprs, globalFinalCacheMap)
-        case _ => throw new Exception("connector creation error!")
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/connector/HiveDataConnector.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/batch/connector/HiveDataConnector.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/batch/connector/HiveDataConnector.scala
deleted file mode 100644
index f0395f5..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/batch/connector/HiveDataConnector.scala
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.connector
-
-import org.apache.griffin.measure.batch.rule.{ExprValueUtil, RuleExprs}
-import org.apache.griffin.measure.batch.rule.expr._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
-
-import scala.util.{Success, Try}
-
-// data connector for hive
-case class HiveDataConnector(sqlContext: SQLContext, config: Map[String, Any],
-                             ruleExprs: RuleExprs, constFinalExprValueMap: 
Map[String, Any]
-                            ) extends DataConnector {
-
-  val Database = "database"
-  val TableName = "table.name"
-  val Partitions = "partitions"
-
-  val database = config.getOrElse(Database, "").toString
-  val tableName = config.getOrElse(TableName, "").toString
-  val partitionsString = config.getOrElse(Partitions, "").toString
-
-  val concreteTableName = if (dbPrefix) s"${database}.${tableName}" else 
tableName
-  val partitions = partitionsString.split(";").map(s => 
s.split(",").map(_.trim))
-
-  private def dbPrefix(): Boolean = {
-    database.nonEmpty && !database.equals("default")
-  }
-
-  def available(): Boolean = {
-    (!tableName.isEmpty) && {
-      Try {
-        if (dbPrefix) {
-          sqlContext.tables(database).filter(tableExistsSql).collect.size
-        } else {
-          sqlContext.tables().filter(tableExistsSql).collect.size
-        }
-      } match {
-        case Success(s) => s > 0
-        case _ => false
-      }
-    }
-  }
-
-  def metaData(): Try[Iterable[(String, String)]] = {
-    Try {
-      val originRows = sqlContext.sql(metaDataSql).map(r => (r.getString(0), 
r.getString(1))).collect
-      val partitionPos: Int = originRows.indexWhere(pair => 
pair._1.startsWith("# "))
-      if (partitionPos < 0) originRows
-      else originRows.take(partitionPos)
-    }
-  }
-
-  def data(): Try[RDD[(Product, Map[String, Any])]] = {
-    Try {
-      sqlContext.sql(dataSql).flatMap { row =>
-        // generate cache data
-        val cacheExprValueMap: Map[String, Any] = 
ruleExprs.cacheExprs.foldLeft(constFinalExprValueMap) { (cachedMap, expr) =>
-          ExprValueUtil.genExprValueMap(Some(row), expr, cachedMap)
-        }
-        val finalExprValueMap = 
ExprValueUtil.updateExprValueMap(ruleExprs.finalCacheExprs, cacheExprValueMap)
-
-        // when clause filter data source
-        val whenResult = ruleExprs.whenClauseExprOpt match {
-          case Some(whenClause) => whenClause.calculate(finalExprValueMap)
-          case _ => None
-        }
-
-        // get groupby data
-        whenResult match {
-          case Some(false) => None
-          case _ => {
-            val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { 
expr =>
-              expr.calculate(finalExprValueMap) match {
-                case Some(v) => Some(v.asInstanceOf[AnyRef])
-                case _ => None
-              }
-            }
-            val key = toTuple(groupbyData)
-
-            Some((key, finalExprValueMap))
-          }
-        }
-      }
-    }
-  }
-
-  private def tableExistsSql(): String = {
-//    s"SHOW TABLES LIKE '${concreteTableName}'"    // this is hive sql, but 
not work for spark sql
-    s"tableName LIKE '${tableName}'"
-  }
-
-  private def metaDataSql(): String = {
-    s"DESCRIBE ${concreteTableName}"
-  }
-
-  private def dataSql(): String = {
-    val clauses = partitions.map { prtn =>
-      val cls = prtn.mkString(" AND ")
-      if (cls.isEmpty) s"SELECT * FROM ${concreteTableName}"
-      else s"SELECT * FROM ${concreteTableName} WHERE ${cls}"
-    }
-    clauses.mkString(" UNION ALL ")
-  }
-
-  private def toTuple[A <: AnyRef](as: Seq[A]): Product = {
-    if (as.size > 0) {
-      val tupleClass = Class.forName("scala.Tuple" + as.size)
-      tupleClass.getConstructors.apply(0).newInstance(as: 
_*).asInstanceOf[Product]
-    } else None
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/log/Loggable.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/batch/log/Loggable.scala 
b/measure/src/main/scala/org/apache/griffin/measure/batch/log/Loggable.scala
deleted file mode 100644
index af04760..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/batch/log/Loggable.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.log
-
-import org.slf4j.LoggerFactory
-
-trait Loggable {
-
-  @transient private lazy val logger = LoggerFactory.getLogger(getClass)
-
-  protected def info(msg: String): Unit = {
-    logger.info(msg)
-  }
-
-  protected def debug(msg: String): Unit = {
-    logger.debug(msg)
-  }
-
-  protected def warn(msg: String): Unit = {
-    logger.warn(msg)
-  }
-
-  protected def error(msg: String): Unit = {
-    logger.error(msg)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala
deleted file mode 100644
index ed06ad0..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.persist
-
-import java.util.Date
-
-import org.apache.griffin.measure.batch.result._
-import org.apache.griffin.measure.batch.utils.HdfsUtil
-import org.apache.spark.rdd.RDD
-
-import scala.util.Try
-
-// persist result and data to hdfs
-case class HdfsPersist(config: Map[String, Any], metricName: String, 
timeStamp: Long) extends Persist {
-
-  val Path = "path"
-  val MaxPersistLines = "max.persist.lines"
-  val MaxLinesPerFile = "max.lines.per.file"
-
-  val path = config.getOrElse(Path, "").toString
-  val maxPersistLines = try { config.getOrElse(MaxPersistLines, 
-1).toString.toInt } catch { case _ => -1 }
-  val maxLinesPerFile = try { config.getOrElse(MaxLinesPerFile, 
10000).toString.toLong } catch { case _ => 10000 }
-
-  val separator = "/"
-
-  val StartFile = filePath("_START")
-  val FinishFile = filePath("_FINISH")
-  val ResultFile = filePath("_RESULT")
-
-  val MissRecFile = filePath("_MISSREC")      // optional
-  val MatchRecFile = filePath("_MATCHREC")    // optional
-
-  val LogFile = filePath("_LOG")
-
-  var _init = true
-  private def isInit = {
-    val i = _init
-    _init = false
-    i
-  }
-
-  def available(): Boolean = {
-    (path.nonEmpty) && (maxPersistLines < Int.MaxValue)
-  }
-
-  private def persistHead: String = {
-    val dt = new Date(timeStamp)
-    s"================ log of ${dt} ================\n"
-  }
-
-  private def timeHead(rt: Long): String = {
-    val dt = new Date(rt)
-    s"--- ${dt} ---\n"
-  }
-
-  protected def getFilePath(parentPath: String, fileName: String): String = {
-    if (parentPath.endsWith(separator)) parentPath + fileName else parentPath 
+ separator + fileName
-  }
-
-  protected def filePath(file: String): String = {
-    getFilePath(path, s"${metricName}/${timeStamp}/${file}")
-  }
-
-  protected def withSuffix(path: String, suffix: String): String = {
-    s"${path}.${suffix}"
-  }
-
-  def start(msg: String): Unit = {
-    try {
-      HdfsUtil.writeContent(StartFile, msg)
-    } catch {
-      case e: Throwable => error(e.getMessage)
-    }
-  }
-  def finish(): Unit = {
-    try {
-      HdfsUtil.createEmptyFile(FinishFile)
-    } catch {
-      case e: Throwable => error(e.getMessage)
-    }
-  }
-
-  def result(rt: Long, result: Result): Unit = {
-    try {
-      val resStr = result match {
-        case ar: AccuracyResult => {
-          s"match percentage: ${ar.matchPercentage}\ntotal count: 
${ar.getTotal}\nmiss count: ${ar.getMiss}, match count: ${ar.getMatch}"
-        }
-        case pr: ProfileResult => {
-          s"match percentage: ${pr.matchPercentage}\ntotal count: 
${pr.getTotal}\nmiss count: ${pr.getMiss}, match count: ${pr.getMatch}"
-        }
-        case _ => {
-          s"result: ${result}"
-        }
-      }
-      HdfsUtil.writeContent(ResultFile, timeHead(rt) + resStr)
-      log(rt, resStr)
-
-      info(resStr)
-    } catch {
-      case e: Throwable => error(e.getMessage)
-    }
-  }
-
-  // need to avoid string too long
-  private def rddRecords(records: RDD[String], path: String): Unit = {
-    try {
-      val recordCount = records.count
-      val count = if (maxPersistLines < 0) recordCount else 
scala.math.min(maxPersistLines, recordCount)
-      if (count > 0) {
-        val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt
-        if (groupCount <= 1) {
-          val recs = records.take(count.toInt)
-          persistRecords(path, recs)
-        } else {
-          val groupedRecords: RDD[(Long, Iterable[String])] =
-            records.zipWithIndex.flatMap { r =>
-              val gid = r._2 / maxLinesPerFile
-              if (gid < groupCount) Some((gid, r._1)) else None
-            }.groupByKey()
-          groupedRecords.foreach { group =>
-            val (gid, recs) = group
-            val hdfsPath = if (gid == 0) path else withSuffix(path, 
gid.toString)
-            persistRecords(hdfsPath, recs)
-          }
-        }
-      }
-    } catch {
-      case e: Throwable => error(e.getMessage)
-    }
-  }
-
-  def missRecords(records: RDD[String]): Unit = {
-    rddRecords(records, MissRecFile)
-  }
-
-  def matchRecords(records: RDD[String]): Unit = {
-    rddRecords(records, MatchRecFile)
-  }
-
-  private def persistRecords(hdfsPath: String, records: Iterable[String]): 
Unit = {
-    val recStr = records.mkString("\n")
-    HdfsUtil.appendContent(hdfsPath, recStr)
-  }
-
-  def log(rt: Long, msg: String): Unit = {
-    try {
-      val logStr = (if (isInit) persistHead else "") + timeHead(rt) + 
s"${msg}\n\n"
-      HdfsUtil.appendContent(LogFile, logStr)
-    } catch {
-      case e: Throwable => error(e.getMessage)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala
deleted file mode 100644
index 47cb785..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.batch.persist
-
-import org.apache.griffin.measure.batch.result._
-import org.apache.griffin.measure.batch.utils.{HttpUtil, JsonUtil}
-import org.apache.spark.rdd.RDD
-
-import scala.util.Try
-
-// persist result by http way
-case class HttpPersist(config: Map[String, Any], metricName: String, 
timeStamp: Long) extends Persist {
-
-  val Api = "api"
-  val Method = "method"
-
-  val api = config.getOrElse(Api, "").toString
-  val method = config.getOrElse(Method, "post").toString
-
-  def available(): Boolean = {
-    api.nonEmpty
-  }
-
-  def start(msg: String): Unit = {}
-  def finish(): Unit = {}
-
-  def result(rt: Long, result: Result): Unit = {
-    result match {
-      case ar: AccuracyResult => {
-        val dataMap = Map[String, Any](("name" -> metricName), ("tmst" -> 
timeStamp), ("total" -> ar.getTotal), ("matched" -> ar.getMatch))
-        httpResult(dataMap)
-      }
-      case pr: ProfileResult => {
-        val dataMap = Map[String, Any](("name" -> metricName), ("tmst" -> 
timeStamp), ("total" -> pr.getTotal), ("matched" -> pr.getMatch))
-        httpResult(dataMap)
-      }
-      case _ => {
-        info(s"result: ${result}")
-      }
-    }
-  }
-
-  private def httpResult(dataMap: Map[String, Any]) = {
-    try {
-      val data = JsonUtil.toJson(dataMap)
-      // post
-      val params = Map[String, Object]()
-      val header = Map[String, Object]()
-      val status = HttpUtil.httpRequest(api, method, params, header, data)
-      info(s"${method} to ${api} response status: ${status}")
-    } catch {
-      case e: Throwable => error(e.getMessage)
-    }
-
-  }
-
-  def missRecords(records: RDD[String]): Unit = {}
-  def matchRecords(records: RDD[String]): Unit = {}
-
-  def log(rt: Long, msg: String): Unit = {}
-
-}

Reply via email to