docker version

Author: Liu <[email protected]>
Author: Liu <[email protected]>
Author: Liu <[email protected]>
Author: bhlx3lyx7 <[email protected]>

Closes #34 from bhlx3lyx7/master.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/a0117811
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/a0117811
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/a0117811

Branch: refs/heads/master
Commit: a0117811c2e1ee101632458bb7a01d22171300d7
Parents: 38978a8
Author: Liu <[email protected]>
Authored: Tue May 23 11:10:57 2017 +0800
Committer: Liu <[email protected]>
Committed: Tue May 23 11:10:57 2017 +0800

----------------------------------------------------------------------
 griffin-measure/griffin-measure-batch/pom.xml   |  16 +
 .../src/main/resources/config-old.json          |  45 +++
 .../src/main/resources/config.json              |  29 ++
 .../src/main/resources/env.json                 |  29 ++
 .../src/main/resources/log4j.properties         |   5 +
 .../griffin/measure/batch/Application.scala     |  87 ++++++
 .../measure/batch/algo/AccuracyAlgo.scala       |   6 +
 .../griffin/measure/batch/algo/Algo.scala       |  16 +
 .../measure/batch/algo/BatchAccuracyAlgo.scala  | 144 +++++++++
 .../measure/batch/algo/core/AccuracyCore.scala  |  78 +++++
 .../measure/batch/config/params/AllParam.scala  |  14 +
 .../measure/batch/config/params/Param.scala     |   7 +
 .../batch/config/params/env/CleanerParam.scala  |  10 +
 .../batch/config/params/env/EnvParam.scala      |  13 +
 .../batch/config/params/env/PersistParam.scala  |  12 +
 .../batch/config/params/env/SparkParam.scala    |  13 +
 .../config/params/user/DataConnectorParam.scala |  13 +
 .../config/params/user/EvaluateRuleParam.scala  |  12 +
 .../batch/config/params/user/UserParam.scala    |  15 +
 .../batch/config/reader/ParamFileReader.scala   |  20 ++
 .../config/reader/ParamHdfsFileReader.scala     |  20 ++
 .../config/reader/ParamRawStringReader.scala    |  17 ++
 .../batch/config/reader/ParamReader.scala       |  12 +
 .../config/reader/ParamReaderFactory.scala      |  22 ++
 .../config/validator/AllParamValidator.scala    |  16 +
 .../batch/config/validator/ParamValidator.scala |  12 +
 .../batch/connector/AvroDataConnector.scala     |  79 +++++
 .../measure/batch/connector/CacheDataUtil.scala |  63 ++++
 .../measure/batch/connector/DataConnector.scala |  16 +
 .../batch/connector/DataConnectorFactory.scala  |  32 ++
 .../batch/connector/HiveDataConnector.scala     | 101 +++++++
 .../griffin/measure/batch/log/Loggable.scala    |  25 ++
 .../measure/batch/persist/HdfsPersist.scala     | 119 ++++++++
 .../measure/batch/persist/HttpPersist.scala     |  44 +++
 .../measure/batch/persist/MultiPersists.scala   |  27 ++
 .../griffin/measure/batch/persist/Persist.scala |  23 ++
 .../measure/batch/persist/PersistFactory.scala  |  30 ++
 .../measure/batch/result/AccuracyResult.scala   |  26 ++
 .../griffin/measure/batch/result/Result.scala   |  14 +
 .../measure/batch/result/ResultInfo.scala       |  33 ++
 .../measure/batch/rule/RuleAnalyzer.scala       |  28 ++
 .../measure/batch/rule/RuleAnalyzerOld.scala    |  26 ++
 .../measure/batch/rule/RuleFactory.scala        |  34 +++
 .../griffin/measure/batch/rule/RuleParser.scala | 298 +++++++++++++++++++
 .../batch/rule/RuleParserDescription.scala      |  84 ++++++
 .../batch/rule/expr/AnalyzableExpr.scala        |   7 +
 .../measure/batch/rule/expr/Cacheable.scala     |  15 +
 .../measure/batch/rule/expr/Calculatable.scala  |   7 +
 .../batch/rule/expr/DataSourceable.scala        |  10 +
 .../measure/batch/rule/expr/Describable.scala   |  15 +
 .../griffin/measure/batch/rule/expr/Expr.scala  |  33 ++
 .../measure/batch/rule/expr/ExprDescOnly.scala  |  22 ++
 .../measure/batch/rule/expr/ExprIdCounter.scala |  42 +++
 .../measure/batch/rule/expr/FieldDescOnly.scala |  40 +++
 .../measure/batch/rule/expr/LiteralExpr.scala   |  68 +++++
 .../measure/batch/rule/expr/LogicalExpr.scala   | 149 ++++++++++
 .../measure/batch/rule/expr/MathExpr.scala      |  79 +++++
 .../measure/batch/rule/expr/SelectExpr.scala    |  53 ++++
 .../measure/batch/rule/expr/StatementExpr.scala |  50 ++++
 .../batch/rule/expr_old/AnnotationExpr.scala    |  16 +
 .../batch/rule/expr_old/Calculatable.scala      |   7 +
 .../measure/batch/rule/expr_old/ConstExpr.scala |  59 ++++
 .../measure/batch/rule/expr_old/DataExpr.scala  |  27 ++
 .../batch/rule/expr_old/ElementExpr.scala       |  52 ++++
 .../measure/batch/rule/expr_old/Expr.scala      |  11 +
 .../batch/rule/expr_old/ExprAnalyzable.scala    |   8 +
 .../batch/rule/expr_old/ExprIdCounter.scala     |  39 +++
 .../batch/rule/expr_old/Recordable.scala        |  15 +
 .../batch/rule/expr_old/SelectExpr.scala        |  48 +++
 .../rule/expr_old/StatementAnalyzable.scala     |  14 +
 .../batch/rule/expr_old/StatementExpr.scala     |  85 ++++++
 .../batch/rule/expr_old/VariableExpr.scala      |  26 ++
 .../measure/batch/utils/CalculationUtil.scala   | 259 ++++++++++++++++
 .../griffin/measure/batch/utils/HdfsUtil.scala  |  62 ++++
 .../griffin/measure/batch/utils/HttpUtil.scala  |  30 ++
 .../griffin/measure/batch/utils/JsonUtil.scala  |  32 ++
 .../measure/batch/utils/StringParseUtil.scala   |  10 +
 .../src/test/resources/config.json              |  25 ++
 .../src/test/resources/config1.json             |  27 ++
 .../src/test/resources/env.json                 |  27 ++
 .../src/test/resources/env1.json                |  21 ++
 .../src/test/resources/log4j.properties         |   5 +
 .../src/test/resources/users_info_src.avro      | Bin 0 -> 3850 bytes
 .../src/test/resources/users_info_src.dat       |  50 ++++
 .../src/test/resources/users_info_target.avro   | Bin 0 -> 3852 bytes
 .../src/test/resources/users_info_target.dat    |  50 ++++
 .../batch/algo/BatchAccuracyAlgoTest.scala      | 219 ++++++++++++++
 .../config/reader/ParamFileReaderTest.scala     |  35 +++
 .../measure/batch/rule/RuleParserTest.scala     |  77 +++++
 griffin-measure/pom.xml                         | 197 ++++++++++++
 .../data/test/dataFile/users_info_src.avro      | Bin 0 -> 3799 bytes
 .../data/test/dataFile/users_info_target.avro   | Bin 0 -> 3799 bytes
 .../data/test/recordFile/_RESULT_ACCU           |  12 +
 .../data/test/reqJson/accuUsersInfo.json        |  55 ++++
 .../src/test/scala/modelTest/AccuTest.scala     |  86 ++++++
 .../scala/org/apache/griffin/Application.scala  |   3 +-
 ui/js/controllers/metrics-ctrl.js               |   5 +-
 ui/js/services/services.js                      |   6 +-
 98 files changed, 4067 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/pom.xml
----------------------------------------------------------------------
diff --git a/griffin-measure/griffin-measure-batch/pom.xml 
b/griffin-measure/griffin-measure-batch/pom.xml
new file mode 100644
index 0000000..6fd72ed
--- /dev/null
+++ b/griffin-measure/griffin-measure-batch/pom.xml
@@ -0,0 +1,16 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>griffin-measure</artifactId>
+        <groupId>org.apache.griffin</groupId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>griffin-measure-batch</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/resources/config-old.json
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/resources/config-old.json 
b/griffin-measure/griffin-measure-batch/src/main/resources/config-old.json
new file mode 100644
index 0000000..63dee69
--- /dev/null
+++ b/griffin-measure/griffin-measure-batch/src/main/resources/config-old.json
@@ -0,0 +1,45 @@
+{
+  "name": "accu1",
+  "type": "accuracy",
+
+  "source": {
+    "connector": {
+      "type": "hive",
+      "version": "1.2",
+      "config": {
+        "table.name": "users_info_src",
+        "partitions": "dt=20170410, hour=14"
+      }
+    }
+  },
+
+  "target": {
+    "connector": {
+      "type": "hive",
+      "version": "1.2",
+      "config": {
+        "database": "default",
+        "table.name": "users_info_target",
+        "partitions": "dt=20170410, hour=14; dt=20170410, hour=15"
+      }
+    }
+  },
+
+  "evaluateRule": {
+    "sampleRatio": 1,
+    "assertion": {
+      "type": "DSL-griffin",
+      "rules": [
+        {
+          "rule": "@Key ${source}['user_id'] === ${target}['user_id']"
+        },
+        {
+          "rule": "${source}['first_name'] === ${target}['first_name']; 
${source}['last_name'] === ${target}['last_name']; ${source}['address'] === 
${target}['address']"
+        },
+        {
+          "rule": "${source}['email'] === ${target}['email']; 
${source}['phone'] === ${target}['phone']; ${source}['post_code'] === 
${target}['post_code']"
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/resources/config.json
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/resources/config.json 
b/griffin-measure/griffin-measure-batch/src/main/resources/config.json
new file mode 100644
index 0000000..edd2e6a
--- /dev/null
+++ b/griffin-measure/griffin-measure-batch/src/main/resources/config.json
@@ -0,0 +1,29 @@
+{
+  "name": "accu1",
+  "type": "accuracy",
+
+  "source": {
+    "type": "hive",
+    "version": "1.2",
+    "config": {
+      "database": "default",
+      "table.name": "users_info_src",
+      "partitions": "dt=23123, hour=432; dt=35464, hour=4657"
+    }
+  },
+
+  "target": {
+    "type": "hive",
+    "version": "1.2",
+    "config": {
+      "database": "default",
+      "table.name": "users_info_target",
+      "partitions": "dt=23123, hour=432; dt=35464, hour=4657"
+    }
+  },
+
+  "evaluateRule": {
+    "sampleRatio": 0.2,
+    "rules": "$source.user_id = $target.user_id AND $source.first_name = 
$target.first_name AND $source.last_name = $target.last_name AND 
$source.address = $target.address AND $source.email = $target.email AND 
$source.phone = $target.phone AND $source.post_code = $target.post_code"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/resources/env.json
----------------------------------------------------------------------
diff --git a/griffin-measure/griffin-measure-batch/src/main/resources/env.json 
b/griffin-measure/griffin-measure-batch/src/main/resources/env.json
new file mode 100644
index 0000000..2e72601
--- /dev/null
+++ b/griffin-measure/griffin-measure-batch/src/main/resources/env.json
@@ -0,0 +1,29 @@
+{
+  "spark": {
+    "log.level": "INFO",
+    "checkpoint.dir": "hdfs:///griffin/batch/cp",
+    "config": {}
+  },
+
+  "persist": [
+    {
+      "type": "hdfs",
+      "config": {
+        "path": "hdfs:///griffin/streaming/persist",
+        "max.persist.lines": 10000,
+        "max.lines.per.file": 10000
+      }
+    },
+    {
+      "type": "http",
+      "config": {
+        "method": "post",
+        "api": "http://phxbark4dq-360935.stratus.phx.ebay.com:8080/"
+      }
+    }
+  ],
+
+  "cleaner": {
+
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/resources/log4j.properties 
b/griffin-measure/griffin-measure-batch/src/main/resources/log4j.properties
new file mode 100644
index 0000000..bd31e15
--- /dev/null
+++ b/griffin-measure/griffin-measure-batch/src/main/resources/log4j.properties
@@ -0,0 +1,5 @@
+log4j.rootLogger=INFO, stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} 
%-5p [%c] - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/Application.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/Application.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/Application.scala
new file mode 100644
index 0000000..6fe6bcb
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/Application.scala
@@ -0,0 +1,87 @@
+package org.apache.griffin.measure.batch
+
+import org.apache.griffin.measure.batch.algo.{Algo, BatchAccuracyAlgo}
+import org.apache.griffin.measure.batch.config.params._
+import org.apache.griffin.measure.batch.config.params.env._
+import org.apache.griffin.measure.batch.config.params.user._
+import org.apache.griffin.measure.batch.config.reader._
+import org.apache.griffin.measure.batch.config.validator.AllParamValidator
+import org.apache.griffin.measure.batch.log.Loggable
+
+import scala.util.{Failure, Success, Try}
+
+object Application extends Loggable {
+
+  def main(args: Array[String]): Unit = {
+    info(args.toString)
+    if (args.length < 2) {
+      error("Usage: class <env-param> <user-param> [List of String split by 
comma: raw | local | hdfs(default)]")
+      sys.exit(-1)
+    }
+
+    val envParamFile = args(0)
+    val userParamFile = args(1)
+    val (envFsType, userFsType) = if (args.length > 2) {
+      val fsTypes = args(2).trim.split(",")
+      if (fsTypes.length == 1) (fsTypes(0).trim, fsTypes(0).trim)
+      else if (fsTypes.length >= 2) (fsTypes(0).trim, fsTypes(1).trim)
+      else ("hdfs", "hdfs")
+    } else ("hdfs", "hdfs")
+
+    info(envParamFile)
+    info(userParamFile)
+
+    // read param files
+    val envParam = readParamFile[EnvParam](envParamFile, envFsType) match {
+      case Success(p) => p
+      case Failure(ex) => {
+        error(ex.getMessage)
+        sys.exit(-2)
+      }
+    }
+    val userParam = readParamFile[UserParam](userParamFile, userFsType) match {
+      case Success(p) => p
+      case Failure(ex) => {
+        error(ex.getMessage)
+        sys.exit(-2)
+      }
+    }
+    val allParam: AllParam = AllParam(envParam, userParam)
+
+    // validate param files
+    validateParams(allParam) match {
+      case Failure(ex) => {
+        error(ex.getMessage)
+        sys.exit(-3)
+      }
+      case _ => {
+        info("params validation pass")
+      }
+    }
+
+    // choose algorithm   // fixme: not done, need to choose algorithm by param
+    val dqType = allParam.userParam.dqType
+    val algo: Algo = BatchAccuracyAlgo(allParam)
+
+    algo.run match {
+      case Failure(ex) => {
+        error(ex.getMessage)
+        sys.exit(-4)
+      }
+      case _ => {
+        info("calculation finished")
+      }
+    }
+  }
+
+  private def readParamFile[T <: Param](file: String, fsType: String)(implicit 
m : Manifest[T]): Try[T] = {
+    val paramReader = ParamReaderFactory.getParamReader(file, fsType)
+    paramReader.readConfig[T]
+  }
+
+  private def validateParams(allParam: AllParam): Try[Boolean] = {
+    val allParamValidator = AllParamValidator()
+    allParamValidator.validate(allParam)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/AccuracyAlgo.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/AccuracyAlgo.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/AccuracyAlgo.scala
new file mode 100644
index 0000000..d22add2
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/AccuracyAlgo.scala
@@ -0,0 +1,6 @@
+package org.apache.griffin.measure.batch.algo
+
+
+trait AccuracyAlgo extends Algo {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/Algo.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/Algo.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/Algo.scala
new file mode 100644
index 0000000..d7047ee
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/Algo.scala
@@ -0,0 +1,16 @@
+package org.apache.griffin.measure.batch.algo
+
+import org.apache.griffin.measure.batch.config.params.env._
+import org.apache.griffin.measure.batch.config.params.user._
+import org.apache.griffin.measure.batch.log.Loggable
+
+import scala.util.Try
+
+trait Algo extends Loggable with Serializable {
+
+  val envParam: EnvParam
+  val userParam: UserParam
+
+  def run(): Try[_]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgo.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgo.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgo.scala
new file mode 100644
index 0000000..24564a4
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/BatchAccuracyAlgo.scala
@@ -0,0 +1,144 @@
+package org.apache.griffin.measure.batch.algo
+
+import java.util.Date
+
+import org.apache.griffin.measure.batch.algo.core.AccuracyCore
+import org.apache.griffin.measure.batch.config.params.AllParam
+import org.apache.griffin.measure.batch.connector._
+import org.apache.griffin.measure.batch.rule._
+import org.apache.griffin.measure.batch.rule.expr._
+import org.apache.griffin.measure.batch.persist._
+import org.apache.griffin.measure.batch.result._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.{SparkConf, SparkContext}
+
+import scala.util.{Failure, Success, Try}
+
+
+case class BatchAccuracyAlgo(allParam: AllParam) extends AccuracyAlgo {
+  val envParam = allParam.envParam
+  val userParam = allParam.userParam
+
+  def run(): Try[_] = {
+    Try {
+      val metricName = userParam.name
+
+      val conf = new SparkConf().setAppName(metricName)
+      val sc = new SparkContext(conf)
+      val sqlContext = new HiveContext(sc)
+
+      // start time
+      val startTime = new Date().getTime()
+
+      // get persists
+      val persist: Persist = PersistFactory(envParam.persistParams, 
metricName).getPersists(startTime)
+
+      // get spark application id
+      val applicationId = sc.applicationId
+
+      // start
+      persist.start(applicationId)
+
+      // rules
+      val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
+      val rule: StatementExpr = ruleFactory.generateRule()
+      val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
+
+      // global cache data
+      val globalCachedData = CacheDataUtil.genCachedMap(None, 
ruleAnalyzer.globalCacheExprs, Map[String, Any]())
+      val globalFinalCachedData = 
CacheDataUtil.filterCachedMap(ruleAnalyzer.globalFinalCacheExprs, 
globalCachedData)
+
+      // data connector
+      val sourceDataConnector: DataConnector =
+        DataConnectorFactory.getDataConnector(sqlContext, 
userParam.sourceParam,
+          ruleAnalyzer.sourceGroupbyExprs, ruleAnalyzer.sourceCacheExprs,
+          ruleAnalyzer.sourceFinalCacheExprs, globalFinalCachedData) match {
+          case Success(cntr) => {
+            if (cntr.available) cntr
+            else throw new Exception("source data connection error!")
+          }
+          case Failure(ex) => throw ex
+        }
+      val targetDataConnector: DataConnector =
+        DataConnectorFactory.getDataConnector(sqlContext, 
userParam.targetParam,
+          ruleAnalyzer.targetGroupbyExprs, ruleAnalyzer.targetCacheExprs,
+          ruleAnalyzer.targetFinalCacheExprs, globalFinalCachedData) match {
+          case Success(cntr) => {
+            if (cntr.available) cntr
+            else throw new Exception("target data connection error!")
+          }
+          case Failure(ex) => throw ex
+        }
+
+      // get metadata
+//      val sourceMetaData: Iterable[(String, String)] = 
sourceDataConnector.metaData() match {
+//        case Success(md) => md
+//        case _ => throw new Exception("source metadata error!")
+//      }
+//      val targetMetaData: Iterable[(String, String)] = 
targetDataConnector.metaData() match {
+//        case Success(md) => md
+//        case _ => throw new Exception("target metadata error!")
+//      }
+
+      // get data
+      val sourceData: RDD[(Product, Map[String, Any])] = 
sourceDataConnector.data() match {
+        case Success(dt) => dt
+        case Failure(ex) => throw ex
+      }
+      val targetData: RDD[(Product, Map[String, Any])] = 
targetDataConnector.data() match {
+        case Success(dt) => dt
+        case Failure(ex) => throw ex
+      }
+
+      // accuracy algorithm
+      val (accuResult, missingRdd, matchingRdd) = accuracy(sourceData, 
targetData, ruleAnalyzer)
+
+      // end time
+      val endTime = new Date().getTime
+      persist.log(endTime, s"calculation using time: ${endTime - startTime} 
ms")
+
+      // persist result
+      persist.result(endTime, accuResult)
+      val missingRecords = missingRdd.map(record2String(_))
+      persist.missRecords(missingRecords)
+
+      // persist end time
+      val persistEndTime = new Date().getTime
+      persist.log(persistEndTime, s"persist using time: ${persistEndTime - 
endTime} ms")
+
+      // finish
+      persist.finish()
+
+      // context stop
+      sc.stop
+
+    }
+  }
+
+  def wrapInitData(data: Map[String, Any]): (Map[String, Any], Map[String, 
Any]) = {
+    (data, Map[String, Any]())
+  }
+
+  def accuracy(sourceData: RDD[(Product, Map[String, Any])], targetData: 
RDD[(Product, Map[String, Any])], ruleAnalyzer: RuleAnalyzer
+              ): (AccuracyResult, RDD[(Product, (Map[String, Any], Map[String, 
Any]))], RDD[(Product, (Map[String, Any], Map[String, Any]))]) = {
+
+    // 1. wrap data
+    val sourceWrappedData: RDD[(Product, (Map[String, Any], Map[String, 
Any]))] = sourceData.map(r => (r._1, wrapInitData(r._2)))
+    val targetWrappedData: RDD[(Product, (Map[String, Any], Map[String, 
Any]))] = targetData.map(r => (r._1, wrapInitData(r._2)))
+
+    // 2. cogroup
+    val allKvs = sourceWrappedData.cogroup(targetWrappedData)
+
+    // 3. accuracy calculation
+    val (accuResult, missingRdd, matchingRdd) = AccuracyCore.accuracy(allKvs, 
ruleAnalyzer)
+
+    (accuResult, missingRdd, matchingRdd)
+  }
+
+  def record2String(rec: (Product, (Map[String, Any], Map[String, Any]))): 
String = {
+    val (key, (data, info)) = rec
+    s"${data} [${info}]"
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCore.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCore.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCore.scala
new file mode 100644
index 0000000..2989538
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/algo/core/AccuracyCore.scala
@@ -0,0 +1,78 @@
+package org.apache.griffin.measure.batch.algo.core
+
+import org.apache.griffin.measure.batch.rule.RuleAnalyzer
+import org.apache.griffin.measure.batch.result._
+import org.apache.spark.rdd.RDD
+
+
+object AccuracyCore {
+
+  type V = Map[String, Any]
+  type T = Map[String, Any]
+
+  def accuracy(allKvs: RDD[(Product, (Iterable[(V, T)], Iterable[(V, T)]))], 
ruleAnalyzer: RuleAnalyzer
+              ): (AccuracyResult, RDD[(Product, (V, T))], RDD[(Product, (V, 
T))]) = {
+    val result: RDD[(Long, Long, List[(Product, (V, T))], List[(Product, (V, 
T))])] = allKvs.map { kv =>
+      val (key, (sourceDatas, targetDatas)) = kv
+
+      // result: (missCount, matchCount, missDataList, matchDataList)
+      val rslt = sourceDatas.foldLeft((0L, 0L, List[(Product, (V, T))](), 
List[(Product, (V, T))]())) { (sr, sourcePair) =>
+        val matchResult = if (targetDatas.isEmpty) {
+          (false, Map[String, Any]((MismatchInfo.key -> "no target")))
+        } else {
+          targetDatas.foldLeft((false, Map[String, Any]())) { (tr, targetPair) 
=>
+            if (tr._1) tr
+            else matchData(sourcePair, targetPair, ruleAnalyzer)
+          }
+        }
+
+        if (matchResult._1) {
+          val matchItem = (key, sourcePair)
+          (sr._1, sr._2 + 1, sr._3, sr._4 :+ matchItem)
+        } else {
+          val missItem = (key, (sourcePair._1, sourcePair._2 ++ 
matchResult._2))
+          (sr._1 + 1, sr._2, sr._3 :+ missItem, sr._4)
+        }
+      }
+
+      rslt
+    }
+
+    val missRdd = result.flatMap(_._3)
+    val matchRdd = result.flatMap(_._4)
+
+    def seq(cnt: (Long, Long), rcd: (Long, Long, Any, Any)): (Long, Long) = {
+      (cnt._1 + rcd._1, cnt._2 + rcd._2)
+    }
+    def comb(c1: (Long, Long), c2: (Long, Long)): (Long, Long) = {
+      (c1._1 + c2._1, c1._2 + c2._2)
+    }
+    val countPair = result.aggregate((0L, 0L))(seq, comb)
+
+    (AccuracyResult(countPair._1, (countPair._1 + countPair._2)), missRdd, 
matchRdd)
+  }
+
+  private def matchData(source: (V, T), target: (V, T), ruleAnalyzer: 
RuleAnalyzer): (Boolean, T) = {
+
+    // 1. merge source and target cached data
+    val mergedData: Map[String, Any] = mergeData(source, target)
+
+    // 2. check valid
+    if (ruleAnalyzer.rule.valid(mergedData)) {
+      // 3. substitute the cached data into statement, get the statement value
+      // currently we can not get the mismatch reason, we need to add such 
information to figure out how it mismatches
+      ((ruleAnalyzer.rule.calculate(mergedData) match {
+        case Some(b: Boolean) => b
+        case _ => false
+      }), Map[String, Any]((MismatchInfo.key -> s"not matched with 
${target._1}")))
+    } else {
+      (false, Map[String, Any]((MismatchInfo.key -> s"not valid to compare 
with ${target._1}")))
+    }
+
+  }
+
+  private def mergeData(source: (V, T), target: (V, T)): Map[String, Any] = {
+    source._1 ++ target._1
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/AllParam.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/AllParam.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/AllParam.scala
new file mode 100644
index 0000000..b5618a8
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/AllParam.scala
@@ -0,0 +1,14 @@
+package org.apache.griffin.measure.batch.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import org.apache.griffin.measure.batch.config.params.env._
+import org.apache.griffin.measure.batch.config.params.user._
+
+// simply composite of env and user params, for convenient usage
+@JsonInclude(Include.NON_NULL)
+case class AllParam( @JsonProperty("env") envParam: EnvParam,
+                     @JsonProperty("user") userParam: UserParam
+                   ) extends Param {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/Param.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/Param.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/Param.scala
new file mode 100644
index 0000000..ce94c5c
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/Param.scala
@@ -0,0 +1,7 @@
+package org.apache.griffin.measure.batch.config.params
+
+trait Param extends Serializable {
+
+  def validate(): Boolean = true
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/CleanerParam.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/CleanerParam.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/CleanerParam.scala
new file mode 100644
index 0000000..a29004c
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/CleanerParam.scala
@@ -0,0 +1,10 @@
+package org.apache.griffin.measure.batch.config.params.env
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import org.apache.griffin.measure.batch.config.params.Param
+
+@JsonInclude(Include.NON_NULL)
+case class CleanerParam() extends Param {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/EnvParam.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/EnvParam.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/EnvParam.scala
new file mode 100644
index 0000000..79e2575
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/EnvParam.scala
@@ -0,0 +1,13 @@
package org.apache.griffin.measure.batch.config.params.env

import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
import com.fasterxml.jackson.annotation.JsonInclude.Include
import org.apache.griffin.measure.batch.config.params.Param

/**
 * Root of the environment configuration (env.json).
 *
 * @param sparkParam    spark runtime settings ("spark" section)
 * @param persistParams persist targets, applied in order ("persist" section)
 * @param cleanerParam  cleaner settings ("cleaner" section)
 */
@JsonInclude(Include.NON_NULL)
case class EnvParam(@JsonProperty("spark") sparkParam: SparkParam,
                    @JsonProperty("persist") persistParams: List[PersistParam],
                    @JsonProperty("cleaner") cleanerParam: CleanerParam
                   ) extends Param

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/PersistParam.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/PersistParam.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/PersistParam.scala
new file mode 100644
index 0000000..655f19b
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/PersistParam.scala
@@ -0,0 +1,12 @@
package org.apache.griffin.measure.batch.config.params.env

import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
import com.fasterxml.jackson.annotation.JsonInclude.Include
import org.apache.griffin.measure.batch.config.params.Param

/**
 * One persist target in the environment configuration.
 *
 * @param persistType target type identifier ("type" field)
 * @param config      free-form, target-specific settings ("config" field)
 */
@JsonInclude(Include.NON_NULL)
case class PersistParam(@JsonProperty("type") persistType: String,
                        @JsonProperty("config") config: Map[String, Any]
                       ) extends Param

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/SparkParam.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/SparkParam.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/SparkParam.scala
new file mode 100644
index 0000000..ba6f9a6
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/env/SparkParam.scala
@@ -0,0 +1,13 @@
package org.apache.griffin.measure.batch.config.params.env

import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
import com.fasterxml.jackson.annotation.JsonInclude.Include
import org.apache.griffin.measure.batch.config.params.Param

/**
 * Spark runtime settings from the environment configuration.
 *
 * @param logLevel spark log level ("log.level" field)
 * @param cpDir    checkpoint directory ("checkpoint.dir" field)
 * @param config   additional spark conf key/value pairs ("config" field)
 */
@JsonInclude(Include.NON_NULL)
case class SparkParam(@JsonProperty("log.level") logLevel: String,
                      @JsonProperty("checkpoint.dir") cpDir: String,
                      @JsonProperty("config") config: Map[String, Any]
                     ) extends Param

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/DataConnectorParam.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/DataConnectorParam.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/DataConnectorParam.scala
new file mode 100644
index 0000000..7a8b2b2
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/DataConnectorParam.scala
@@ -0,0 +1,13 @@
package org.apache.griffin.measure.batch.config.params.user

import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
import com.fasterxml.jackson.annotation.JsonInclude.Include
import org.apache.griffin.measure.batch.config.params.Param

/**
 * Description of one data source or target connector.
 *
 * @param conType connector type, e.g. "hive" or "avro" ("type" field)
 * @param version connector version string ("version" field)
 * @param config  connector-specific settings ("config" field)
 */
@JsonInclude(Include.NON_NULL)
case class DataConnectorParam(@JsonProperty("type") conType: String,
                              @JsonProperty("version") version: String,
                              @JsonProperty("config") config: Map[String, Any]
                             ) extends Param

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/EvaluateRuleParam.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/EvaluateRuleParam.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/EvaluateRuleParam.scala
new file mode 100644
index 0000000..edf0a38
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/EvaluateRuleParam.scala
@@ -0,0 +1,12 @@
package org.apache.griffin.measure.batch.config.params.user

import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
import com.fasterxml.jackson.annotation.JsonInclude.Include
import org.apache.griffin.measure.batch.config.params.Param

/**
 * Rule-evaluation settings of a measure.
 *
 * @param sampleRatio fraction of records to sample ("sampleRatio" field)
 * @param rules       rule expression text ("rules" field)
 */
@JsonInclude(Include.NON_NULL)
case class EvaluateRuleParam(@JsonProperty("sampleRatio") sampleRatio: Double,
                             @JsonProperty("rules") rules: String
                            ) extends Param

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/UserParam.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/UserParam.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/UserParam.scala
new file mode 100644
index 0000000..be0d3c8
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/params/user/UserParam.scala
@@ -0,0 +1,15 @@
package org.apache.griffin.measure.batch.config.params.user

import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
import com.fasterxml.jackson.annotation.JsonInclude.Include
import org.apache.griffin.measure.batch.config.params.Param

/**
 * Root of the user/measure configuration (config.json).
 *
 * @param name              measure name ("name" field)
 * @param dqType            data-quality type, e.g. accuracy ("type" field)
 * @param sourceParam       source connector ("source" field)
 * @param targetParam       target connector ("target" field)
 * @param evaluateRuleParam rule evaluation settings ("evaluateRule" field)
 */
@JsonInclude(Include.NON_NULL)
case class UserParam(@JsonProperty("name") name: String,
                     @JsonProperty("type") dqType: String,
                     @JsonProperty("source") sourceParam: DataConnectorParam,
                     @JsonProperty("target") targetParam: DataConnectorParam,
                     @JsonProperty("evaluateRule") evaluateRuleParam: EvaluateRuleParam
                    ) extends Param

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReader.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReader.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReader.scala
new file mode 100644
index 0000000..e30513f
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamFileReader.scala
@@ -0,0 +1,20 @@
package org.apache.griffin.measure.batch.config.reader

import org.apache.griffin.measure.batch.config.params.Param
import org.apache.griffin.measure.batch.utils.JsonUtil

import scala.util.Try

/**
 * ParamReader that loads a JSON configuration file from the local file system.
 *
 * @param file path of the local JSON config file
 */
case class ParamFileReader(file: String) extends ParamReader {

  /**
   * Reads and parses the file content as JSON into an instance of T.
   *
   * @return Success(param), or Failure carrying the I/O or parse error
   */
  def readConfig[T <: Param](implicit m: Manifest[T]): Try[T] = {
    Try {
      val source = scala.io.Source.fromFile(file)
      // bug fix: close the source in a finally block so the file handle is
      // released even when reading or JSON parsing throws (the original only
      // closed it on the success path)
      try {
        JsonUtil.fromJson[T](source.mkString)
      } finally {
        source.close()
      }
    }
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamHdfsFileReader.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamHdfsFileReader.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamHdfsFileReader.scala
new file mode 100644
index 0000000..473b428
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamHdfsFileReader.scala
@@ -0,0 +1,20 @@
package org.apache.griffin.measure.batch.config.reader

import org.apache.griffin.measure.batch.config.params.Param
import org.apache.griffin.measure.batch.utils.JsonUtil
import org.apache.griffin.measure.batch.utils.HdfsUtil

import scala.util.Try

/**
 * ParamReader that loads a JSON configuration file from HDFS.
 *
 * @param filePath HDFS path of the JSON config file
 */
case class ParamHdfsFileReader(filePath: String) extends ParamReader {

  /**
   * Opens the HDFS file and parses its content as JSON into an instance of T.
   *
   * @return Success(param), or Failure carrying the I/O or parse error
   */
  def readConfig[T <: Param](implicit m: Manifest[T]): Try[T] = {
    Try {
      val source = HdfsUtil.openFile(filePath)
      // bug fix: close the stream in a finally block so it is released even
      // when JSON parsing throws (the original only closed it on success)
      try {
        JsonUtil.fromJson[T](source)
      } finally {
        source.close()
      }
    }
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamRawStringReader.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamRawStringReader.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamRawStringReader.scala
new file mode 100644
index 0000000..1e931de
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamRawStringReader.scala
@@ -0,0 +1,17 @@
package org.apache.griffin.measure.batch.config.reader

import org.apache.griffin.measure.batch.config.params.Param
import org.apache.griffin.measure.batch.utils.JsonUtil

import scala.util.Try

/**
 * ParamReader that parses an in-memory JSON string.
 *
 * @param rawString the JSON document itself (not a file path)
 */
case class ParamRawStringReader(rawString: String) extends ParamReader {

  /** Parses rawString as JSON into an instance of T. */
  def readConfig[T <: Param](implicit m: Manifest[T]): Try[T] =
    Try(JsonUtil.fromJson[T](rawString))

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReader.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReader.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReader.scala
new file mode 100644
index 0000000..2f42b09
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReader.scala
@@ -0,0 +1,12 @@
package org.apache.griffin.measure.batch.config.reader

import org.apache.griffin.measure.batch.log.Loggable
import org.apache.griffin.measure.batch.config.params.Param

import scala.util.Try

/**
 * Contract for reading a typed parameter object from some configuration
 * source (raw string, local file, HDFS file, ...).
 */
trait ParamReader extends Loggable with Serializable {

  /**
   * Reads and parses the configuration into an instance of T.
   *
   * @return Success(param), or Failure carrying the read/parse error
   */
  def readConfig[T <: Param](implicit m: Manifest[T]): Try[T]

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReaderFactory.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReaderFactory.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReaderFactory.scala
new file mode 100644
index 0000000..dd77e8c
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/reader/ParamReaderFactory.scala
@@ -0,0 +1,22 @@
package org.apache.griffin.measure.batch.config.reader

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}


/**
 * Chooses the ParamReader implementation for a given file-system type.
 */
object ParamReaderFactory {

  val RawStringRegex = """^(?i)raw$""".r
  val LocalFsRegex = """^(?i)local$""".r
  val HdfsFsRegex = """^(?i)hdfs$""".r

  /**
   * @param filePath config file path (or, for "raw", the JSON string itself)
   * @param fsType   "raw", "local" or "hdfs", case-insensitive
   * @return the matching reader; unknown types fall back to the HDFS reader
   */
  def getParamReader(filePath: String, fsType: String): ParamReader = {
    fsType match {
      case RawStringRegex() => ParamRawStringReader(filePath)
      case LocalFsRegex()   => ParamFileReader(filePath)
      case _                => ParamHdfsFileReader(filePath) // "hdfs" and unknown types alike
    }
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidator.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidator.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidator.scala
new file mode 100644
index 0000000..b0ab8b5
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/AllParamValidator.scala
@@ -0,0 +1,16 @@
package org.apache.griffin.measure.batch.config.validator

import org.apache.griffin.measure.batch.config.params.Param

import scala.util.Try

/**
 * Placeholder validator that accepts every parameter set.
 */
case class AllParamValidator() extends ParamValidator {

  /** Always yields Success(true). */
  def validate[T <: Param](param: Param): Try[Boolean] = Try {
    // fixme: not done, need to validate param
    true
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/ParamValidator.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/ParamValidator.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/ParamValidator.scala
new file mode 100644
index 0000000..9dc9e60
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/config/validator/ParamValidator.scala
@@ -0,0 +1,12 @@
package org.apache.griffin.measure.batch.config.validator

import org.apache.griffin.measure.batch.log.Loggable
import org.apache.griffin.measure.batch.config.params.Param

import scala.util.Try

/**
 * Contract for validating a parameter object after it has been read.
 */
trait ParamValidator extends Loggable with Serializable {

  /**
   * Checks the given parameter set.
   *
   * @return Success(true) when valid, Success(false) or Failure otherwise
   */
  def validate[T <: Param](param: Param): Try[Boolean]

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/AvroDataConnector.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/AvroDataConnector.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/AvroDataConnector.scala
new file mode 100644
index 0000000..d271e17
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/AvroDataConnector.scala
@@ -0,0 +1,79 @@
package org.apache.griffin.measure.batch.connector

import org.apache.griffin.measure.batch.rule.expr._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import com.databricks.spark.avro._

import scala.util.{Success, Try}
import java.nio.file.{Files, Paths}

import org.apache.griffin.measure.batch.utils.HdfsUtil

/**
 * DataConnector backed by an Avro file, loaded through spark-avro.
 *
 * Config keys:
 *  - "file.path": optional directory prefix
 *  - "file.name": file name (used alone when no prefix is configured)
 */
case class AvroDataConnector(sqlContext: SQLContext, config: Map[String, Any],
                             groupbyExprs: Seq[MathExpr], cacheExprs: Iterable[Expr],
                             finalCacheExprs: Iterable[Expr], globalFinalCacheMap: Map[String, Any]
                            ) extends DataConnector {

  // config keys
  val FilePath = "file.path"
  val FileName = "file.name"

  val filePath = config.getOrElse(FilePath, "").toString
  val fileName = config.getOrElse(FileName, "").toString

  // prepend the directory only when one is configured
  val concreteFileFullPath = if (pathPrefix) s"${filePath}${fileName}" else fileName

  private def pathPrefix(): Boolean = filePath.nonEmpty

  private def fileExist(): Boolean = HdfsUtil.existPath(concreteFileFullPath)

  /** True when a non-empty path is configured and the file exists. */
  def available(): Boolean = concreteFileFullPath.nonEmpty && fileExist

  /** Reads the Avro schema and returns (field name, type name) pairs. */
  def metaData(): Try[Iterable[(String, String)]] = Try {
    loadDataFile.schema.fields.map(f => (f.name, f.dataType.typeName))
  }

  /** Loads the file rows keyed by the group-by expression values. */
  def data(): Try[RDD[(Product, Map[String, Any])]] = Try {
    loadDataFile.map { row =>
      // evaluate cache expressions against the row, then keep only the final ones
      val cached = cacheExprs.foldLeft(globalFinalCacheMap) { (m, expr) =>
        CacheDataUtil.genCachedMap(Some(row), expr, m)
      }
      val finalCached = CacheDataUtil.filterCachedMap(finalCacheExprs, cached)

      // evaluate the group-by expressions to form the key tuple
      val keyParts: Seq[AnyRef] = groupbyExprs.flatMap { expr =>
        expr.calculate(finalCached).map(_.asInstanceOf[AnyRef])
      }
      (toTuple(keyParts), finalCached)
    }
  }

  private def loadDataFile() = {
    sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath)
  }

  // reflectively builds a scala.TupleN as the grouping key; None when the key is empty
  private def toTuple[A <: AnyRef](as: Seq[A]): Product = {
    if (as.isEmpty) None
    else {
      val tupleClass = Class.forName("scala.Tuple" + as.size)
      tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product]
    }
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/CacheDataUtil.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/CacheDataUtil.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/CacheDataUtil.scala
new file mode 100644
index 0000000..884df5d
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/CacheDataUtil.scala
@@ -0,0 +1,63 @@
package org.apache.griffin.measure.batch.connector

import org.apache.griffin.measure.batch.rule.expr._
import org.apache.spark.sql.Row

import scala.util.{Success, Try}

/**
 * Helpers for evaluating rule expressions against row data and maintaining a
 * map of cached expression results, keyed by expression id.
 */
object CacheDataUtil {

  // for now, one expr only gets one value, not supporting one expr yielding multiple values
  private def getCacheData(data: Option[Any], expr: Expr, cachedMap: Map[String, Any]): Option[Any] = {
    Try {
      expr match {
        case selection: SelectionExpr => {
          // apply the selectors in sequence, threading the selected value through
          selection.selectors.foldLeft(data) { (dt, selector) =>
            getCacheData(dt, selector, cachedMap)
          }
        }
        case selector: IndexFieldRangeSelectExpr => {
          data match {
            case Some(row: Row) => {
              // only single-field selection on a Row is supported
              if (selector.fields.size == 1) {
                selector.fields.head match {
                  case i: IndexDesc => Some(row.getAs[Any](i.index))
                  case f: FieldDesc => Some(row.getAs[Any](f.field))
                  case _ => None
                }
              } else None
            }
            case _ => None
          }
        }
        case _ => expr.calculate(cachedMap)
      }
    } match {
      case Success(v) => v
      case _ => None // evaluation errors are swallowed: the expression simply yields no cache entry
    }
  }

  /** Evaluates expr against data; when it yields a value, records it under expr._id. */
  def genCachedMap(data: Option[Any], expr: Expr, initialCachedMap: Map[String, Any]): Map[String, Any] = {
    val valueOpt = getCacheData(data, expr, initialCachedMap)
    if (valueOpt.nonEmpty) {
      initialCachedMap + (expr._id -> valueOpt.get)
    } else initialCachedMap
  }

  /**
   * Folds every expr's cached value into the map.
   *
   * Bug fix: the original always passed None instead of its `data` argument,
   * so row-based expressions could never contribute through this overload;
   * `data` is now threaded through to each expression.
   */
  def genCachedMap(data: Option[Any], exprs: Iterable[Expr], initialCachedMap: Map[String, Any]): Map[String, Any] = {
    exprs.foldLeft(initialCachedMap) { (cachedMap, expr) =>
      genCachedMap(data, expr, cachedMap)
    }
  }

  /** Keeps only the given exprs' results, re-evaluated against cachedMap. */
  def filterCachedMap(exprs: Iterable[Expr], cachedMap: Map[String, Any]): Map[String, Any] = {
    exprs.foldLeft(Map[String, Any]()) { (newMap, expr) =>
      val valueOpt = expr.calculate(cachedMap)
      if (valueOpt.nonEmpty) {
        newMap + (expr._id -> valueOpt.get)
      } else newMap
    }
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnector.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnector.scala
new file mode 100644
index 0000000..9cc9be6
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnector.scala
@@ -0,0 +1,16 @@
package org.apache.griffin.measure.batch.connector

import org.apache.spark.rdd.RDD

import scala.util.Try


/**
 * Contract for a batch data source (e.g. Hive table, Avro file).
 */
trait DataConnector extends Serializable {

  /** Whether the underlying source is reachable and sufficiently configured. */
  def available(): Boolean

  /** (field name, type name) pairs describing the source schema. */
  def metaData(): Try[Iterable[(String, String)]]

  /** Source records as (group-by key tuple, cached expression values) pairs. */
  def data(): Try[RDD[(Product, Map[String, Any])]]

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnectorFactory.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnectorFactory.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnectorFactory.scala
new file mode 100644
index 0000000..ffa8d9c
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/DataConnectorFactory.scala
@@ -0,0 +1,32 @@
package org.apache.griffin.measure.batch.connector

import org.apache.griffin.measure.batch.config.params.user._
import org.apache.griffin.measure.batch.rule.expr._
import org.apache.spark.sql.SQLContext

import scala.util.Try

/**
 * Creates DataConnector instances from a DataConnectorParam.
 */
object DataConnectorFactory {

  val HiveRegex = """^(?i)hive$""".r
  val AvroRegex = """^(?i)avro$""".r

  /**
   * Instantiates the connector matching dataConnectorParam.conType
   * ("hive" or "avro", case-insensitive).
   *
   * @return Success(connector), or Failure when the type is unsupported
   */
  def getDataConnector(sqlContext: SQLContext,
                       dataConnectorParam: DataConnectorParam,
                       groupbyExprs: Seq[MathExpr],
                       cacheExprs: Iterable[Expr],
                       finalCacheExprs: Iterable[Expr],
                       globalFinalCacheMap: Map[String, Any]
                      ): Try[DataConnector] = {
    // cleanup: dropped the unused `version` local from the original;
    // dataConnectorParam.version remains available for future version dispatch
    Try {
      dataConnectorParam.conType match {
        case HiveRegex() => HiveDataConnector(sqlContext, dataConnectorParam.config, groupbyExprs, cacheExprs, finalCacheExprs, globalFinalCacheMap)
        case AvroRegex() => AvroDataConnector(sqlContext, dataConnectorParam.config, groupbyExprs, cacheExprs, finalCacheExprs, globalFinalCacheMap)
        case _ => throw new Exception("connector creation error!")
      }
    }
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/HiveDataConnector.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/HiveDataConnector.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/HiveDataConnector.scala
new file mode 100644
index 0000000..5c80e30
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/connector/HiveDataConnector.scala
@@ -0,0 +1,101 @@
package org.apache.griffin.measure.batch.connector

import org.apache.griffin.measure.batch.rule.expr._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}

import scala.util.{Success, Try}

/**
 * DataConnector backed by a Hive table, queried through the given SQLContext.
 *
 * Config keys:
 *  - "database":   database name; empty or "default" means no table prefix
 *  - "table.name": table name (required)
 *  - "partitions": partition clauses, ';' separating alternatives that are
 *                  UNION ALL-ed, ',' separating AND-ed conditions within one
 */
case class HiveDataConnector(sqlContext: SQLContext, config: Map[String, Any],
                             groupbyExprs: Seq[MathExpr], cacheExprs: Iterable[Expr],
                             finalCacheExprs: Iterable[Expr], globalFinalCacheMap: Map[String, Any]
                            ) extends DataConnector {

  // config keys
  val Database = "database"
  val TableName = "table.name"
  val Partitions = "partitions"

  val database = config.getOrElse(Database, "").toString
  val tableName = config.getOrElse(TableName, "").toString
  val partitionsString = config.getOrElse(Partitions, "").toString

  val concreteTableName = if (dbPrefix) s"${database}.${tableName}" else tableName
  val partitions = partitionsString.split(";").map(s => s.split(",").map(_.trim))

  // prefix the table with its database unless it lives in "default"
  private def dbPrefix(): Boolean = database.nonEmpty && !database.equals("default")

  /** True when a table name is configured and the table exists in the catalog. */
  def available(): Boolean = {
    tableName.nonEmpty && {
      val lookup = Try {
        if (dbPrefix) {
          sqlContext.tables(database).filter(tableExistsSql).collect.size
        } else {
          sqlContext.tables().filter(tableExistsSql).collect.size
        }
      }
      lookup match {
        case Success(n) => n > 0
        case _ => false
      }
    }
  }

  /** Returns (column name, type name) pairs, excluding the partition section of DESCRIBE. */
  def metaData(): Try[Iterable[(String, String)]] = Try {
    val rows = sqlContext.sql(metaDataSql).map(r => (r.getString(0), r.getString(1))).collect
    // DESCRIBE appends a "# ..." partition-information section; cut it off when present
    val partitionPos = rows.indexWhere(_._1.startsWith("# "))
    if (partitionPos < 0) rows else rows.take(partitionPos)
  }

  /** Loads the (partition-filtered) table rows keyed by the group-by expression values. */
  def data(): Try[RDD[(Product, Map[String, Any])]] = Try {
    sqlContext.sql(dataSql).map { row =>
      // evaluate cache expressions against the row, then keep only the final ones
      val cached = cacheExprs.foldLeft(globalFinalCacheMap) { (m, expr) =>
        CacheDataUtil.genCachedMap(Some(row), expr, m)
      }
      val finalCached = CacheDataUtil.filterCachedMap(finalCacheExprs, cached)

      // evaluate the group-by expressions to form the key tuple
      val keyParts: Seq[AnyRef] = groupbyExprs.flatMap { expr =>
        expr.calculate(finalCached).map(_.asInstanceOf[AnyRef])
      }
      (toTuple(keyParts), finalCached)
    }
  }

  private def tableExistsSql(): String = {
//    s"SHOW TABLES LIKE '${concreteTableName}'"    // this is hive sql, but not work for spark sql
    s"tableName LIKE '${tableName}'"
  }

  private def metaDataSql(): String = s"DESCRIBE ${concreteTableName}"

  // one SELECT per partition clause, unioned together; an empty clause means a full scan
  private def dataSql(): String = {
    val clauses = partitions.map { prtn =>
      val cls = prtn.mkString(" AND ")
      if (cls.isEmpty) s"SELECT * FROM ${concreteTableName}"
      else s"SELECT * FROM ${concreteTableName} WHERE ${cls}"
    }
    clauses.mkString(" UNION ALL ")
  }

  // reflectively builds a scala.TupleN as the grouping key; None when the key is empty
  private def toTuple[A <: AnyRef](as: Seq[A]): Product = {
    if (as.isEmpty) None
    else {
      val tupleClass = Class.forName("scala.Tuple" + as.size)
      tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product]
    }
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/log/Loggable.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/log/Loggable.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/log/Loggable.scala
new file mode 100644
index 0000000..f73e86c
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/log/Loggable.scala
@@ -0,0 +1,25 @@
package org.apache.griffin.measure.batch.log

import org.slf4j.LoggerFactory

/**
 * Mixin providing slf4j logging helpers.
 *
 * The logger is @transient and lazy so classes mixing this in remain
 * serializable and the logger is re-created after deserialization.
 */
trait Loggable {

  @transient private lazy val logger = LoggerFactory.getLogger(getClass)

  protected def info(msg: String): Unit = logger.info(msg)

  protected def debug(msg: String): Unit = logger.debug(msg)

  protected def warn(msg: String): Unit = logger.warn(msg)

  protected def error(msg: String): Unit = logger.error(msg)

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala
new file mode 100644
index 0000000..7bff3b6
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HdfsPersist.scala
@@ -0,0 +1,119 @@
+package org.apache.griffin.measure.batch.persist
+
+import java.util.Date
+
+import org.apache.griffin.measure.batch.result._
+import org.apache.griffin.measure.batch.utils.HdfsUtil
+import org.apache.spark.rdd.RDD
+
+
/**
 * Persist implementation that writes measure progress, results and missed
 * records to HDFS under <path>/<metricName>/<timeStamp>/, one well-known
 * file per artifact.
 */
case class HdfsPersist(config: Map[String, Any], metricName: String,
                       timeStamp: Long) extends Persist {

  // config keys
  val Path = "path"
  val MaxPersistLines = "max.persist.lines"
  val MaxLinesPerFile = "max.lines.per.file"

  val path = config.getOrElse(Path, "").toString
  // a negative value means "no limit" — persist every record
  val maxPersistLines = config.getOrElse(MaxPersistLines, -1).toString.toLong
  // Clamped to >= 1: this value is used as a divisor and group size in
  // missRecords below, so an explicit 0 or negative config value would
  // otherwise raise ArithmeticException. Default stays 10000 when absent.
  val maxLinesPerFile = scala.math.max(config.getOrElse(MaxLinesPerFile, 10000).toString.toLong, 1L)

  val separator = "/"

  // well-known marker / content files inside the run directory
  val StartFile = filePath("_START")
  val FinishFile = filePath("_FINISH")
  val ResultFile = filePath("_RESULT")

  val MissRecFile = filePath("_MISSREC")      // optional

  val LogFile = filePath("_LOG")

  // one-shot flag: the first log() call also writes the persist header
  var _init = true
  private def isInit = {
    val i = _init
    _init = false
    i
  }

  /** Usable when a path is configured (and the line cap fits in an Int). */
  def available(): Boolean = {
    (path.nonEmpty) && (maxPersistLines < Int.MaxValue)
  }

  // header written once at the top of the log file
  private def persistHead: String = {
    val dt = new Date(timeStamp)
    s"================ log of ${dt} ================\n"
  }

  // timestamp line prepended to each result / log entry
  private def timeHead(rt: Long): String = {
    val dt = new Date(rt)
    s"--- ${dt} ---\n"
  }

  // join parent path and file name without doubling the separator
  protected def getFilePath(parentPath: String, fileName: String): String = {
    if (parentPath.endsWith(separator)) parentPath + fileName else parentPath + separator + fileName
  }

  // full path of a file inside this run's <metricName>/<timeStamp>/ directory
  protected def filePath(file: String): String = {
    getFilePath(path, s"${metricName}/${timeStamp}/${file}")
  }

  protected def withSuffix(path: String, suffix: String): String = {
    s"${path}.${suffix}"
  }

  /** Mark the run as started by writing `msg` into the start file. */
  def start(msg: String): Unit = {
    HdfsUtil.writeContent(StartFile, msg)
  }
  /** Mark the run as finished with an empty marker file. */
  def finish(): Unit = {
    HdfsUtil.createEmptyFile(FinishFile)
  }

  /** Write the formatted result to the result file, the log, and the app log. */
  def result(rt: Long, result: Result): Unit = {
    val resStr = result match {
      case ar: AccuracyResult => {
        s"match percentage: ${ar.matchPercentage}\ntotal count: ${ar.getTotal}\nmiss count: ${ar.getMiss}, match count: ${ar.getMatch}"
      }
      case _ => {
        s"result: ${result}"
      }
    }
    HdfsUtil.writeContent(ResultFile, timeHead(rt) + resStr)
    log(rt, resStr)

    info(resStr)
  }

  // need to avoid string too long: output is capped at maxPersistLines records
  // and split across files of at most maxLinesPerFile lines each
  def missRecords(records: RDD[String]): Unit = {
    val recordCount = records.count
    val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
    if (count > 0) {
      // maxLinesPerFile is guaranteed >= 1 above, so this division is safe
      val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt
      if (groupCount <= 1) {
        // small enough for one file: collect on the driver and write once
        val recs = records.take(count.toInt)
        persistRecords(MissRecFile, recs)
      } else {
        // bucket records by line index, one bucket per output file;
        // NOTE(review): the writes below run inside RDD.foreach, i.e. on
        // executors — assumes HdfsUtil works from executor JVMs; confirm
        val groupedRecords: RDD[(Long, Iterable[String])] =
          records.zipWithIndex.flatMap { r =>
            val gid = r._2 / maxLinesPerFile
            if (gid < groupCount) Some((gid, r._1)) else None
          }.groupByKey()
        groupedRecords.foreach { group =>
          val (gid, recs) = group
          val hdfsPath = if (gid == 0) MissRecFile else withSuffix(MissRecFile, gid.toString)
          persistRecords(hdfsPath, recs)
        }
      }
    }
  }

  // append one newline-joined batch of records to the given file
  private def persistRecords(hdfsPath: String, records: Iterable[String]): Unit = {
    val recStr = records.mkString("\n")
    HdfsUtil.appendContent(hdfsPath, recStr)
  }

  /** Append a timestamped message to the log file (header on first call only). */
  def log(rt: Long, msg: String): Unit = {
    val logStr = (if (isInit) persistHead else "") + timeHead(rt) + s"${msg}\n\n"
    HdfsUtil.appendContent(LogFile, logStr)
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala
new file mode 100644
index 0000000..5765927
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/HttpPersist.scala
@@ -0,0 +1,44 @@
+package org.apache.griffin.measure.batch.persist
+
+import org.apache.griffin.measure.batch.result._
+import org.apache.griffin.measure.batch.utils.{HttpUtil, JsonUtil}
+import org.apache.spark.rdd.RDD
+
/**
 * Persist implementation that pushes accuracy metrics to an external HTTP
 * endpoint. All other persist operations (start/finish/records/log) are
 * intentionally no-ops for this sink.
 */
case class HttpPersist(config: Map[String, Any], metricName: String,
                       timeStamp: Long) extends Persist {

  // config keys
  val Api = "api"
  val Method = "method"

  val api = config.getOrElse(Api, "").toString
  val method = config.getOrElse(Method, "post").toString

  /** Usable only when an api url is configured. */
  def available(): Boolean = api.nonEmpty

  def start(msg: String): Unit = {}
  def finish(): Unit = {}

  /** Send an accuracy result as a json metric; other result types are only logged. */
  def result(rt: Long, result: Result): Unit = {
    result match {
      case ar: AccuracyResult =>
        val metricMap = Map[String, Any](("metricName" -> metricName), ("timestamp" -> timeStamp), ("value" -> ar.matchPercentage))
        val body = JsonUtil.toJson(metricMap)

        // fire the request; the response status is logged but not checked
        val params = Map[String, Object]()
        val header = Map[String, Object](("content-type" -> "application/json"))
        val status = HttpUtil.httpRequest(api, method, params, header, body)
        info(s"${method} to ${api} response status: ${status}")
      case _ =>
        info(s"result: ${result}")
    }
  }

  def missRecords(records: RDD[String]): Unit = {}

  def log(rt: Long, msg: String): Unit = {}

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala
new file mode 100644
index 0000000..2fa6942
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/MultiPersists.scala
@@ -0,0 +1,27 @@
+package org.apache.griffin.measure.batch.persist
+
+import org.apache.griffin.measure.batch.result._
+import org.apache.griffin.measure.batch.utils.{HttpUtil, JsonUtil}
+import org.apache.spark.rdd.RDD
+
/**
 * Composite Persist that fans every operation out to a group of underlying
 * persists (e.g. hdfs + http together).
 */
case class MultiPersists(persists: Iterable[Persist]) extends Persist {

  // Timestamp of the first wrapped persist, or 0 when there are none.
  // headOption replaces the previous `match { case Nil => ... }`: a stable
  // identifier pattern on Nil only matches empty Seq-like collections, so an
  // empty Set/Map-backed Iterable fell through to persists.head and threw
  // NoSuchElementException.
  val timeStamp: Long = persists.headOption.map(_.timeStamp).getOrElse(0L)

  // a composite has no configuration of its own
  val config: Map[String, Any] = Map[String, Any]()

  /** Usable when at least one wrapped persist is usable. */
  def available(): Boolean = { persists.exists(_.available()) }

  def start(msg: String): Unit = { persists.foreach(_.start(msg)) }
  def finish(): Unit = { persists.foreach(_.finish()) }

  def result(rt: Long, result: Result): Unit = { persists.foreach(_.result(rt, result)) }

  def missRecords(records: RDD[String]): Unit = { persists.foreach(_.missRecords(records)) }

  def log(rt: Long, msg: String): Unit = { persists.foreach(_.log(rt, msg)) }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/Persist.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/Persist.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/Persist.scala
new file mode 100644
index 0000000..7398c24
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/Persist.scala
@@ -0,0 +1,23 @@
+package org.apache.griffin.measure.batch.persist
+
+import org.apache.griffin.measure.batch.log.Loggable
+import org.apache.griffin.measure.batch.result._
+import org.apache.spark.rdd.RDD
+
+
/**
 * Contract for persisting a measure run: lifecycle markers, results,
 * missed records and log messages. Serializable so instances can travel
 * inside Spark closures.
 */
trait Persist extends Loggable with Serializable {
  // timestamp identifying the measure run this persist belongs to
  val timeStamp: Long

  // raw configuration this persist was built from
  val config: Map[String, Any]

  /** Whether this persist is configured well enough to be used. */
  def available(): Boolean

  /** Mark the run as started, with an initial message. */
  def start(msg: String): Unit
  /** Mark the run as finished. */
  def finish(): Unit

  /** Persist a result computed at time `rt`. */
  def result(rt: Long, result: Result): Unit

  /** Persist the records that failed to match. */
  def missRecords(records: RDD[String]): Unit

  /** Persist a log message produced at time `rt`. */
  def log(rt: Long, msg: String): Unit
}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/PersistFactory.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/PersistFactory.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/PersistFactory.scala
new file mode 100644
index 0000000..84dc4ce
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/persist/PersistFactory.scala
@@ -0,0 +1,30 @@
+package org.apache.griffin.measure.batch.persist
+
+import org.apache.griffin.measure.batch.config.params.env._
+
+import scala.util.{Success, Try}
+
+
/**
 * Builds Persist instances from configuration. Unknown types, failed
 * constructions and unavailable persists are all skipped, so getPersists
 * yields only usable sinks.
 */
case class PersistFactory(persistParams: Iterable[PersistParam], metricName: String) extends Serializable {

  // case-insensitive matchers for the supported persist types
  val HDFS_REGEX = """^(?i)hdfs$""".r
  val HTTP_REGEX = """^(?i)http$""".r

  /** Compose every configured, available persist for the given run timestamp. */
  def getPersists(timeStamp: Long): MultiPersists = {
    MultiPersists(persistParams.flatMap(param => getPersist(timeStamp, param)))
  }

  // Instantiate one persist from its param; None when the type is unsupported,
  // construction fails, or the persist reports itself unavailable.
  private def getPersist(timeStamp: Long, persistParam: PersistParam): Option[Persist] = {
    val persistConfig = persistParam.config
    // The whole dispatch lives inside Try so an unsupported type is captured
    // as a Failure and skipped — consistent with how a failing constructor is
    // handled — instead of the exception escaping the factory (previously the
    // throw sat outside the Try and crashed getPersist).
    val persistTry = Try {
      persistParam.persistType match {
        case HDFS_REGEX() => HdfsPersist(persistConfig, metricName, timeStamp)
        case HTTP_REGEX() => HttpPersist(persistConfig, metricName, timeStamp)
        case _ => throw new Exception("not supported persist type")
      }
    }
    persistTry match {
      case Success(persist) if (persist.available()) => Some(persist)
      case _ => None
    }
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/AccuracyResult.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/AccuracyResult.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/AccuracyResult.scala
new file mode 100644
index 0000000..55505ac
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/AccuracyResult.scala
@@ -0,0 +1,26 @@
+package org.apache.griffin.measure.batch.result
+
+
/**
 * Accuracy measure outcome: `miss` records unmatched out of `total`.
 */
case class AccuracyResult(miss: Long, total: Long) extends Result {

  type T = AccuracyResult

  /** Merge a delta: adopt its miss count while keeping this total. */
  def update(delta: T): T = AccuracyResult(delta.miss, total)

  /** Final once nothing is missing any more. */
  def eventual(): Boolean = miss <= 0

  /** True when either counter deviates from the other result's. */
  def differsFrom(other: T): Boolean =
    (miss != other.miss) || (total != other.total)

  def getMiss = miss
  def getTotal = total
  def getMatch = total - miss

  // percentage of matched records; 0 when there is nothing to measure
  def matchPercentage: Double =
    if (getTotal <= 0) 0 else getMatch.toDouble / getTotal * 100

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/Result.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/Result.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/Result.scala
new file mode 100644
index 0000000..8d529db
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/Result.scala
@@ -0,0 +1,14 @@
+package org.apache.griffin.measure.batch.result
+
+
/**
 * Contract for a measure result that can be updated incrementally.
 *
 * Each implementation fixes the abstract type member T to its own concrete
 * type, so update/differsFrom operate on results of the same kind.
 */
trait Result extends Serializable {

  // concrete result type of the implementation
  type T <: Result

  /** Merge a delta result into this one, returning the combined result. */
  def update(delta: T): T

  /** True when this result is final and will not change further. */
  def eventual(): Boolean

  /** True when this result carries different values than `other`. */
  def differsFrom(other: T): Boolean

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/ResultInfo.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/ResultInfo.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/ResultInfo.scala
new file mode 100644
index 0000000..a430318
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/result/ResultInfo.scala
@@ -0,0 +1,33 @@
+package org.apache.griffin.measure.batch.result
+
+
/**
 * Descriptor of an extra info field attached to results: a reserved key
 * name, its storage type name, and a typed helper to build (key, value)
 * pairs for info maps.
 */
sealed trait ResultInfo {
  // value type carried under this key
  type T
  // reserved field key (double-underscore delimited)
  val key: String
  // storage type name of the value, e.g. "bigint" or "string"
  val tp: String
  /** Pair this key with a value, ready to be put into an info map. */
  def wrap(value: T) = (key -> value)
}

// "__time__": time-group field of a result (bigint)
final case object TimeGroupInfo extends ResultInfo {
  type T = Long
  val key = "__time__"
  val tp = "bigint"
}

// "__next_fire_time__": next scheduled fire time (bigint)
final case object NextFireTimeInfo extends ResultInfo {
  type T = Long
  val key = "__next_fire_time__"
  val tp = "bigint"
}

// "__mismatch__": description of a mismatched record (string)
final case object MismatchInfo extends ResultInfo {
  type T = String
  val key = "__mismatch__"
  val tp = "string"
}

// "__error__": error message attached to a record (string)
final case object ErrorInfo extends ResultInfo {
  type T = String
  val key = "__error__"
  val tp = "string"
}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleAnalyzer.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleAnalyzer.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleAnalyzer.scala
new file mode 100644
index 0000000..78665f9
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleAnalyzer.scala
@@ -0,0 +1,28 @@
+package org.apache.griffin.measure.batch.rule
+
+import org.apache.griffin.measure.batch.rule.expr._
+
/**
 * Derives the expression sets a rule evaluation needs from its parsed
 * statement: what to cache and persist per data side, the group-by key
 * pairs for matching, and the optional when-clause.
 */
case class RuleAnalyzer(rule: StatementExpr) extends Serializable {

  // data-source qualifiers used to slice the rule's expressions
  val GlobalData = ""
  val SourceData = "source"
  val TargetData = "target"

  // expressions to cache during evaluation, per data source
  val globalCacheExprs: Iterable[Expr] = rule.getCacheExprs(GlobalData)
  val sourceCacheExprs: Iterable[Expr] = rule.getCacheExprs(SourceData)
  val targetCacheExprs: Iterable[Expr] = rule.getCacheExprs(TargetData)

  // expressions whose values must be persisted, per data source
  val sourcePersistExprs: Iterable[Expr] = rule.getPersistExprs(SourceData)
  val targetPersistExprs: Iterable[Expr] = rule.getPersistExprs(TargetData)

  // final cache set = final cache exprs plus the persisted ones, deduplicated via Set
  val globalFinalCacheExprs: Iterable[Expr] = rule.getFinalCacheExprs(GlobalData).toSet
  val sourceFinalCacheExprs: Iterable[Expr] = rule.getFinalCacheExprs(SourceData).toSet ++ sourcePersistExprs.toSet
  val targetFinalCacheExprs: Iterable[Expr] = rule.getFinalCacheExprs(TargetData).toSet ++ targetPersistExprs.toSet

  // (source, target) expression pairs used as group-by keys for matching
  val groupbyExprPairs: Seq[(MathExpr, MathExpr)] = rule.getGroupbyExprPairs((SourceData, TargetData))
  val sourceGroupbyExprs: Seq[MathExpr] = groupbyExprPairs.map(_._1)
  val targetGroupbyExprs: Seq[MathExpr] = groupbyExprPairs.map(_._2)

  // optional filter condition of the rule
  val whenClauseExpr: Option[LogicalExpr] = rule.getWhenClauseExpr

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleAnalyzerOld.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleAnalyzerOld.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleAnalyzerOld.scala
new file mode 100644
index 0000000..dd90c81
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleAnalyzerOld.scala
@@ -0,0 +1,26 @@
+package org.apache.griffin.measure.batch.rule
+
+//import org.apache.griffin.measure.batch.rule.expr._
+import org.apache.griffin.measure.batch.rule.expr_old._
+
/**
 * Legacy analyzer for the old rule expression model (expr_old package):
 * extracts data-related, key-mapping, assignment, condition and mapping
 * expressions from a parsed statement.
 */
case class RuleAnalyzerOld(rule: StatementExpr) extends Serializable {

  // data-source qualifiers
  val SourceData = "source"
  val TargetData = "target"

//  val sourceCacheExprs: Iterable[Expr] = rule.getCacheExprs(SourceData)
//  val targetCacheExprs: Iterable[Expr] = rule.getCacheExprs(TargetData)

  // expressions touching each data source
  val sourceDataExprs: Iterable[DataExpr] = rule.getDataRelatedExprs(SourceData)
  val targetDataExprs: Iterable[DataExpr] = rule.getDataRelatedExprs(TargetData)

  // mappings defining the match keys between source and target
  val keyMappings: Iterable[MappingExpr] = rule.getKeyMappings()

  // per-side key expressions extracted from the key mappings
  // NOTE(review): lowercase 'k' in targetDatakeyExprs kept — renaming would break callers
  val sourceDataKeyExprs: Seq[DataExpr] = keyMappings.flatMap(_.getDataRelatedExprs(SourceData)).toSeq
  val targetDatakeyExprs: Seq[DataExpr] = keyMappings.flatMap(_.getDataRelatedExprs(TargetData)).toSeq

  // remaining rule pieces: assignments, filter conditions, value mappings
  val assigns: Iterable[AssignExpr] = rule.getAssigns()
  val conditions: Iterable[ConditionExpr] = rule.getConditions()
  val mappings: Iterable[MappingExpr] = rule.getMappings()

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleFactory.scala
----------------------------------------------------------------------
diff --git 
a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleFactory.scala
 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleFactory.scala
new file mode 100644
index 0000000..cc3e8b3
--- /dev/null
+++ 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleFactory.scala
@@ -0,0 +1,34 @@
+package org.apache.griffin.measure.batch.rule
+
+import org.apache.griffin.measure.batch.config.params.user._
+
+import scala.util.Failure
+//import org.apache.griffin.measure.batch.rule.expr_old._
+import org.apache.griffin.measure.batch.rule.expr._
+
+import scala.util.{Success, Try}
+
+
/**
 * Builds the rule AST for a measure job from its evaluate-rule parameter.
 */
case class RuleFactory(evaluateRuleParam: EvaluateRuleParam) {

  val ruleParser: RuleParser = RuleParser()

  /** Parse the configured rule text into a statement tree, or throw the parse error. */
  def generateRule(): StatementExpr = {
    parseExpr(evaluateRuleParam.rules) match {
      case Success(se) => se
      case Failure(ex) => throw ex
    }
  }

  // Run the parser inside Try so both thrown errors and unsuccessful parse
  // results surface uniformly as Failure.
  private def parseExpr(rules: String): Try[StatementExpr] = {
    Try {
      val result = ruleParser.parseAll(ruleParser.rule, rules)
      if (result.successful) result.get
      else throw new Exception("parse rule error!")
    }
  }

}

Reply via email to