Streaming

Add a streaming accuracy measure, supporting kafka as the data source and
depending on zookeeper and hdfs as the information and data caches.
Add parameters to configure the streaming process.
Include a streaming accuracy sample document.

Author: Lionel Liu <bhlx3l...@163.com>

Closes #117 from bhlx3lyx7/streaming.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/6fd22ae7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/6fd22ae7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/6fd22ae7

Branch: refs/heads/master
Commit: 6fd22ae71c438ed25d6177deb79707de1edae76f
Parents: df44514
Author: Lionel Liu <bhlx3l...@163.com>
Authored: Fri Aug 11 09:39:47 2017 +0800
Committer: William Guo <guo...@icloud.com>
Committed: Fri Aug 11 09:39:47 2017 +0800

----------------------------------------------------------------------
 griffin-doc/measure-streaming-sample.md         | 204 +++++++++++
 measure/pom.xml                                 |   8 +
 measure/src/main/resources/config-old.json      |  45 ---
 .../src/main/resources/config-streaming.json    |  61 ++++
 measure/src/main/resources/config.json          |   2 +
 measure/src/main/resources/env.json             |  16 +
 .../apache/griffin/measure/Application.scala    | 133 +++++++
 .../griffin/measure/algo/AccuracyAlgo.scala     |  24 ++
 .../org/apache/griffin/measure/algo/Algo.scala  |  34 ++
 .../griffin/measure/algo/MeasureType.scala      |  26 ++
 .../griffin/measure/algo/ProcessType.scala      |  26 ++
 .../griffin/measure/algo/ProfileAlgo.scala      |  23 ++
 .../measure/algo/batch/BatchAccuracyAlgo.scala  | 190 ++++++++++
 .../measure/algo/batch/BatchProfileAlgo.scala   | 162 +++++++++
 .../measure/algo/core/AccuracyCore.scala        | 103 ++++++
 .../griffin/measure/algo/core/ProfileCore.scala |  73 ++++
 .../algo/streaming/StreamingAccuracyAlgo.scala  | 358 +++++++++++++++++++
 .../streaming/StreamingAccuracyProcess.scala    | 234 ++++++++++++
 .../measure/algo/streaming/TimingProcess.scala  |  46 +++
 .../griffin/measure/batch/Application.scala     | 113 ------
 .../measure/batch/algo/AccuracyAlgo.scala       |  24 --
 .../griffin/measure/batch/algo/Algo.scala       |  34 --
 .../measure/batch/algo/BatchAccuracyAlgo.scala  | 183 ----------
 .../measure/batch/algo/BatchProfileAlgo.scala   | 155 --------
 .../measure/batch/algo/MeasureType.scala        |  26 --
 .../measure/batch/algo/ProfileAlgo.scala        |  23 --
 .../measure/batch/algo/core/AccuracyCore.scala  | 101 ------
 .../measure/batch/algo/core/ProfileCore.scala   |  73 ----
 .../measure/batch/config/params/AllParam.scala  |  32 --
 .../measure/batch/config/params/Param.scala     |  25 --
 .../batch/config/params/env/CleanerParam.scala  |  28 --
 .../batch/config/params/env/EnvParam.scala      |  31 --
 .../batch/config/params/env/PersistParam.scala  |  30 --
 .../batch/config/params/env/SparkParam.scala    |  31 --
 .../config/params/user/DataConnectorParam.scala |  31 --
 .../config/params/user/EvaluateRuleParam.scala  |  30 --
 .../batch/config/params/user/UserParam.scala    |  33 --
 .../batch/config/reader/ParamFileReader.scala   |  38 --
 .../config/reader/ParamHdfsFileReader.scala     |  38 --
 .../config/reader/ParamRawStringReader.scala    |  35 --
 .../batch/config/reader/ParamReader.scala       |  30 --
 .../config/reader/ParamReaderFactory.scala      |  40 ---
 .../config/validator/AllParamValidator.scala    |  34 --
 .../batch/config/validator/ParamValidator.scala |  30 --
 .../batch/connector/AvroDataConnector.scala     | 109 ------
 .../measure/batch/connector/DataConnector.scala |  34 --
 .../batch/connector/DataConnectorFactory.scala  |  49 ---
 .../batch/connector/HiveDataConnector.scala     | 131 -------
 .../griffin/measure/batch/log/Loggable.scala    |  43 ---
 .../measure/batch/persist/HdfsPersist.scala     | 171 ---------
 .../measure/batch/persist/HttpPersist.scala     |  78 ----
 .../measure/batch/persist/LoggerPersist.scala   |  89 -----
 .../measure/batch/persist/MultiPersists.scala   |  49 ---
 .../griffin/measure/batch/persist/Persist.scala |  44 ---
 .../measure/batch/persist/PersistFactory.scala  |  51 ---
 .../measure/batch/result/AccuracyResult.scala   |  44 ---
 .../measure/batch/result/ProfileResult.scala    |  44 ---
 .../griffin/measure/batch/result/Result.scala   |  32 --
 .../measure/batch/result/ResultInfo.scala       |  57 ---
 .../measure/batch/rule/CalculationUtil.scala    | 315 ----------------
 .../measure/batch/rule/ExprValueUtil.scala      |  93 -----
 .../measure/batch/rule/RuleAnalyzer.scala       |  78 ----
 .../measure/batch/rule/RuleFactory.scala        |  52 ---
 .../griffin/measure/batch/rule/RuleParser.scala | 240 -------------
 .../batch/rule/expr/AnalyzableExpr.scala        |  25 --
 .../measure/batch/rule/expr/Cacheable.scala     |  33 --
 .../measure/batch/rule/expr/Calculatable.scala  |  25 --
 .../batch/rule/expr/DataSourceable.scala        |  28 --
 .../measure/batch/rule/expr/Describable.scala   |  33 --
 .../griffin/measure/batch/rule/expr/Expr.scala  |  51 ---
 .../measure/batch/rule/expr/ExprDescOnly.scala  |  40 ---
 .../measure/batch/rule/expr/ExprIdCounter.scala |  60 ----
 .../measure/batch/rule/expr/FieldDescOnly.scala |  58 ---
 .../measure/batch/rule/expr/LiteralExpr.scala   |  96 -----
 .../measure/batch/rule/expr/LogicalExpr.scala   | 177 ---------
 .../measure/batch/rule/expr/MathExpr.scala      |  97 -----
 .../measure/batch/rule/expr/SelectExpr.scala    |  71 ----
 .../measure/batch/rule/expr/StatementExpr.scala |  70 ----
 .../griffin/measure/batch/utils/HdfsUtil.scala  |  80 -----
 .../griffin/measure/batch/utils/HttpUtil.scala  |  48 ---
 .../griffin/measure/batch/utils/JsonUtil.scala  |  50 ---
 .../griffin/measure/cache/info/InfoCache.scala  |  39 ++
 .../measure/cache/info/InfoCacheFactory.scala   |  41 +++
 .../measure/cache/info/InfoCacheInstance.scala  |  53 +++
 .../measure/cache/info/TimeInfoCache.scala      | 116 ++++++
 .../measure/cache/info/ZKInfoCache.scala        | 210 +++++++++++
 .../griffin/measure/cache/lock/CacheLock.scala  |  31 ++
 .../measure/cache/lock/MultiCacheLock.scala     |  39 ++
 .../measure/cache/lock/ZKCacheLock.scala        |  53 +++
 .../measure/cache/result/CacheResult.scala      |  29 ++
 .../cache/result/CacheResultProcesser.scala     |  71 ++++
 .../measure/config/params/AllParam.scala        |  32 ++
 .../griffin/measure/config/params/Param.scala   |  25 ++
 .../config/params/env/CleanerParam.scala        |  29 ++
 .../measure/config/params/env/EnvParam.scala    |  32 ++
 .../config/params/env/InfoCacheParam.scala      |  30 ++
 .../config/params/env/PersistParam.scala        |  30 ++
 .../measure/config/params/env/SparkParam.scala  |  33 ++
 .../config/params/user/DataCacheParam.scala     |  31 ++
 .../config/params/user/DataConnectorParam.scala |  37 ++
 .../config/params/user/EvaluateRuleParam.scala  |  30 ++
 .../measure/config/params/user/UserParam.scala  |  34 ++
 .../measure/config/reader/ParamFileReader.scala |  38 ++
 .../config/reader/ParamHdfsFileReader.scala     |  38 ++
 .../config/reader/ParamRawStringReader.scala    |  35 ++
 .../measure/config/reader/ParamReader.scala     |  30 ++
 .../config/reader/ParamReaderFactory.scala      |  40 +++
 .../config/validator/AllParamValidator.scala    |  34 ++
 .../config/validator/ParamValidator.scala       |  30 ++
 .../measure/connector/DataConnector.scala       |  32 ++
 .../connector/DataConnectorFactory.scala        | 139 +++++++
 .../connector/cache/CacheDataConnector.scala    |  33 ++
 .../measure/connector/cache/DataCacheable.scala |  86 +++++
 .../measure/connector/cache/DataUpdatable.scala |  30 ++
 .../cache/HiveCacheDataConnector.scala          | 351 ++++++++++++++++++
 .../cache/TextCacheDataConnector.scala          | 311 ++++++++++++++++
 .../direct/AvroDirectDataConnector.scala        | 132 +++++++
 .../connector/direct/DirectDataConnector.scala  |  34 ++
 .../direct/HiveDirectDataConnector.scala        | 158 ++++++++
 .../direct/KafkaCacheDirectDataConnector.scala  | 125 +++++++
 .../StreamingCacheDirectDataConnector.scala     |  60 ++++
 .../streaming/KafkaStreamingDataConnector.scala |  58 +++
 .../streaming/StreamingDataConnector.scala      |  34 ++
 .../apache/griffin/measure/log/Loggable.scala   |  43 +++
 .../griffin/measure/persist/HdfsPersist.scala   | 206 +++++++++++
 .../griffin/measure/persist/HttpPersist.scala   |  88 +++++
 .../griffin/measure/persist/LoggerPersist.scala | 118 ++++++
 .../griffin/measure/persist/MultiPersists.scala |  52 +++
 .../measure/persist/OldHttpPersist.scala        |  87 +++++
 .../griffin/measure/persist/Persist.scala       |  52 +++
 .../measure/persist/PersistFactory.scala        |  53 +++
 .../measure/persist/PersistThreadPool.scala     |  62 ++++
 .../griffin/measure/result/AccuracyResult.scala |  44 +++
 .../griffin/measure/result/DataInfo.scala       |  50 +++
 .../griffin/measure/result/ProfileResult.scala  |  44 +++
 .../apache/griffin/measure/result/Result.scala  |  32 ++
 .../griffin/measure/rule/CalculationUtil.scala  | 315 ++++++++++++++++
 .../measure/rule/DataTypeCalculationUtil.scala  | 159 ++++++++
 .../griffin/measure/rule/ExprValueUtil.scala    | 263 ++++++++++++++
 .../griffin/measure/rule/RuleAnalyzer.scala     |  72 ++++
 .../griffin/measure/rule/RuleFactory.scala      |  52 +++
 .../griffin/measure/rule/RuleParser.scala       | 244 +++++++++++++
 .../measure/rule/SchemaValueCombineUtil.scala   | 187 ++++++++++
 .../measure/rule/expr/AnalyzableExpr.scala      |  24 ++
 .../griffin/measure/rule/expr/Cacheable.scala   |  33 ++
 .../measure/rule/expr/Calculatable.scala        |  25 ++
 .../griffin/measure/rule/expr/ClauseExpr.scala  | 109 ++++++
 .../measure/rule/expr/DataSourceable.scala      |  28 ++
 .../griffin/measure/rule/expr/Describable.scala |  33 ++
 .../apache/griffin/measure/rule/expr/Expr.scala |  53 +++
 .../measure/rule/expr/ExprDescOnly.scala        |  40 +++
 .../measure/rule/expr/ExprIdCounter.scala       |  60 ++++
 .../measure/rule/expr/FieldDescOnly.scala       |  58 +++
 .../griffin/measure/rule/expr/LiteralExpr.scala |  83 +++++
 .../griffin/measure/rule/expr/LogicalExpr.scala | 178 +++++++++
 .../griffin/measure/rule/expr/MathExpr.scala    |  99 +++++
 .../griffin/measure/rule/expr/SelectExpr.scala  |  88 +++++
 .../rule/func/DefaultFunctionDefine.scala       |  36 ++
 .../measure/rule/func/FunctionDefine.scala      |  25 ++
 .../measure/rule/func/FunctionUtil.scala        |  75 ++++
 .../measure/utils/HdfsFileDumpUtil.scala        |  83 +++++
 .../apache/griffin/measure/utils/HdfsUtil.scala | 120 +++++++
 .../apache/griffin/measure/utils/HttpUtil.scala |  54 +++
 .../apache/griffin/measure/utils/JsonUtil.scala |  54 +++
 .../apache/griffin/measure/utils/TimeUtil.scala |  79 ++++
 measure/src/test/resources/config-profile.json  |   2 +
 .../src/test/resources/config-streaming.json    |  69 ++++
 .../src/test/resources/config-streaming1.json   |  65 ++++
 .../src/test/resources/config-streaming2.json   |  65 ++++
 .../src/test/resources/config-streaming3.json   |  65 ++++
 measure/src/test/resources/config.json          |   4 +-
 measure/src/test/resources/config1.json         |   2 +
 measure/src/test/resources/env-streaming.json   |  45 +++
 measure/src/test/resources/env.json             |  24 +-
 .../algo/batch/BatchAccuracyAlgoTest.scala      | 198 ++++++++++
 .../algo/batch/BatchProfileAlgoTest.scala       | 173 +++++++++
 .../measure/algo/batch/DataFrameSaveTest.scala  | 172 +++++++++
 .../measure/algo/core/AccuracyCoreTest.scala    |  89 +++++
 .../measure/algo/core/ProfileCoreTest.scala     |  79 ++++
 .../streaming/StreamingAccuracyAlgoTest.scala   | 267 ++++++++++++++
 .../batch/algo/BatchAccuracyAlgoTest.scala      | 192 ----------
 .../batch/algo/BatchProfileAlgoTest.scala       | 167 ---------
 .../batch/algo/core/AccuracyCoreTest.scala      |  89 -----
 .../batch/algo/core/ProfileCoreTest.scala       |  79 ----
 .../reader/ParamRawStringReaderTest.scala       |  38 --
 .../validator/AllParamValidatorTest.scala       |  40 ---
 .../measure/batch/persist/HdfsPersistTest.scala |  48 ---
 .../measure/batch/persist/HttpPersistTest.scala |  42 ---
 .../batch/result/AccuracyResultTest.scala       |  57 ---
 .../batch/result/ProfileResultTest.scala        |  57 ---
 .../measure/batch/rule/RuleAnalyzerTest.scala   |  64 ----
 .../measure/batch/rule/RuleFactoryTest.scala    |  44 ---
 .../measure/batch/rule/RuleParserTest.scala     | 202 -----------
 .../measure/batch/utils/JsonUtilTest.scala      |  60 ----
 .../measure/cache/InfoCacheInstanceTest.scala   |  78 ++++
 .../griffin/measure/cache/ZKCacheLockTest.scala |  84 +++++
 .../griffin/measure/cache/ZKInfoCacheTest.scala |  90 +++++
 .../reader/ParamRawStringReaderTest.scala       |  38 ++
 .../validator/AllParamValidatorTest.scala       |  40 +++
 .../measure/connector/ConnectorTest.scala       |  70 ++++
 .../measure/persist/HdfsPersistTest.scala       |  48 +++
 .../measure/persist/HttpPersistTest.scala       |  42 +++
 .../measure/result/AccuracyResultTest.scala     |  57 +++
 .../measure/result/ProfileResultTest.scala      |  57 +++
 .../measure/rule/ExprValueUtilTest.scala        |  86 +++++
 .../griffin/measure/rule/RuleAnalyzerTest.scala |  60 ++++
 .../griffin/measure/rule/RuleFactoryTest.scala  |  44 +++
 .../griffin/measure/rule/RuleParserTest.scala   | 213 +++++++++++
 .../griffin/measure/utils/JsonUtilTest.scala    |  60 ++++
 service/src/main/resources/sparkJob.properties  |   2 +-
 210 files changed, 10619 insertions(+), 5322 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/griffin-doc/measure-streaming-sample.md
----------------------------------------------------------------------
diff --git a/griffin-doc/measure-streaming-sample.md 
b/griffin-doc/measure-streaming-sample.md
new file mode 100644
index 0000000..004ed3b
--- /dev/null
+++ b/griffin-doc/measure-streaming-sample.md
@@ -0,0 +1,204 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+# Measure streaming sample
+Measures consist of batch measures and streaming measures. This document describes the streaming measure sample.
+
+### Data source
+Currently, we support kafka as the streaming data source.  
+This sample also requires a kafka cluster as the data source.
+
+### Measure type
+Currently, we support the accuracy measure in streaming mode.
+
+### Kafka decoder
+Kafka data always needs to be encoded and decoded. We currently support String type kafka data; you can also implement and use your own decoder for the kafka case.
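
For reference, here is a minimal sketch of what a custom decoder could look like with the Kafka 0.8 Scala API used in this sample; the class name is hypothetical, and the built-in `kafka.serializer.StringDecoder` already covers the String case.
```
import java.nio.charset.StandardCharsets

import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties

// Hypothetical example: decode each kafka message payload as UTF-8 text.
class Utf8Decoder(props: VerifiableProperties = null) extends Decoder[String] {
  override def fromBytes(bytes: Array[Byte]): String =
    new String(bytes, StandardCharsets.UTF_8)
}
```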
+
+### Environment
+For the current griffin streaming case, we need some environment dependencies: zookeeper and hdfs.  
+We use zookeeper to cache checkpoint information; it is optional, but we recommend it.  
+We use hdfs to save temporary data; it is also the recommended choice.
+
+### Streaming accuracy result
+The streaming data is separated into mini-batches, and each mini-batch should produce an accuracy result. Therefore, the streaming accuracy result is a series of batch accuracy results, each with a timestamp.  
+Considering the latency of streaming data, the source data and the matching target data will not arrive at exactly the same time. We therefore have to accept some data delay in streaming mode, by holding unmatched data in memory or on disk and trying to match it later, until the data is out of time.
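
As an informal illustration only (these names are hypothetical, not the project's API), each mini-batch yields one timestamped accuracy record, while unmatched source records are held and retried against later mini-batches until they exceed the allowed delay:
```
// Hypothetical sketch: per-mini-batch accuracy plus the hold-and-retry idea.
case class BatchAccuracy(timestampMs: Long, matched: Long, total: Long) {
  def accuracy: Double = if (total == 0) 1.0 else matched.toDouble / total
}

// Keep unmatched records with their arrival time, re-check them each mini-batch,
// and drop the ones that exceed the allowed delay (they are out of time).
def retryUnmatched[A](pending: Seq[(Long, A)], nowMs: Long, maxDelayMs: Long)
                     (matches: A => Boolean): (Seq[A], Seq[(Long, A)]) = {
  val alive = pending.filter { case (ts, _) => nowMs - ts <= maxDelayMs }
  val (matchedNow, stillPending) = alive.partition { case (_, rec) => matches(rec) }
  (matchedNow.map(_._2), stillPending)
}
```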
+
+## How to run streaming sample
+### Environment Preparation
+First, we need some environment preparation:  
+- Zookeeper: Zookeeper 3.4.10
+- Hadoop: Hadoop 2.6
+- Spark: Spark 1.6
+- Kafka: Kafka 0.8
+
+### Data Preparation
+Create two topics in kafka, one for source data and one for target data. For example, use topic "source" for source data and topic "target" for target data.  
+Streaming data should also be prepared; the format could be a json string, for example:  
+Source data could be:
+```
+{"name": "kevin", "age": 24}
+{"name": "jason", "age": 25}
+{"name": "jhon", "age": 28}
+{"name": "steve", "age": 31}
+```
+Target data could be:
+```
+{"name": "kevin", "age": 24}
+{"name": "jason", "age": 25}
+{"name": "steve", "age": 20}
+```
+You need to input the source data and target data into these two topics; using the console producer might be a good choice for experimental purposes.
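
As an alternative to the console producer, a small Scala sketch using the Kafka 0.8 producer API could push the sample records; the object name is hypothetical, and the broker address must be replaced with your own:
```
import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

// Hypothetical helper: send a few of the sample json records into the two topics.
object SampleDataProducer extends App {
  val props = new Properties()
  props.put("metadata.broker.list", "<kafka host ip>:9092") // your kafka broker list
  props.put("serializer.class", "kafka.serializer.StringEncoder")
  val producer = new Producer[String, String](new ProducerConfig(props))

  producer.send(new KeyedMessage[String, String]("source", """{"name": "kevin", "age": 24}"""))
  producer.send(new KeyedMessage[String, String]("target", """{"name": "kevin", "age": 24}"""))
  producer.close()
}
```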
+
+### Configuration Preparation
+Two configuration files are required.
+Environment configuration file: env.json
+```
+{
+  "spark": {
+    "log.level": "WARN",
+    "checkpoint.dir": "hdfs:///griffin/streaming/cp",
+    "batch.interval": "5s",
+    "process.interval": "30s",
+    "config": {
+      "spark.task.maxFailures": 5,
+      "spark.streaming.kafkaMaxRatePerPartition": 1000,
+      "spark.streaming.concurrentJobs": 4
+    }
+  },
+
+  "persist": [
+    {
+      "type": "log",
+      "config": {
+        "max.log.lines": 100
+      }
+    }, {
+      "type": "hdfs",
+      "config": {
+        "path": "hdfs:///griffin/streaming/persist",
+        "max.persist.lines": 10000,
+        "max.lines.per.file": 10000
+      }
+    }
+  ],
+
+  "info.cache": [
+    {
+      "type": "zk",
+      "config": {
+        "hosts": "<zookeeper host ip>:2181",
+        "namespace": "griffin/infocache",
+        "lock.path": "lock",
+        "mode": "persist",
+        "init.clear": true,
+        "close.clear": false
+      }
+    }
+  ]
+}
+```
+In env.json, "spark" field configures the spark and spark streaming 
parameters, "persist" field configures the persist ways, we support "log", 
"hdfs" and "http" ways at current, "info.cache" field configures the 
information cache parameters, we support zookeeper only at current.  
+
+Process configuration file: config.json
+```
+{
+  "name": "streaming-accu-sample",
+  "type": "accuracy",
+  "process.type": "streaming",
+
+  "source": {
+    "type": "kafka",
+    "version": "0.8",
+    "config": {
+      "kafka.config": {
+        "bootstrap.servers": "<kafka host ip>:9092",
+        "group.id": "group1",
+        "auto.offset.reset": "smallest",
+        "auto.commit.enable": "false"
+      },
+      "topics": "source",
+      "key.type": "java.lang.String",
+      "value.type": "java.lang.String"
+    },
+    "cache": {
+      "type": "text",
+      "config": {
+        "file.path": "hdfs:///griffin/streaming/dump/source",
+        "info.path": "source",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0"
+      },
+      "time.range": ["-5m", "0"]
+    },
+    "match.once": true
+  },
+
+  "target": {
+    "type": "kafka",
+    "version": "0.8",
+    "config": {
+      "kafka.config": {
+        "bootstrap.servers": "<kafka host ip>:9092",
+        "group.id": "group1",
+        "auto.offset.reset": "smallest",
+        "auto.commit.enable": "false"
+      },
+      "topics": "target",
+      "key.type": "java.lang.String",
+      "value.type": "java.lang.String"
+    },
+    "cache": {
+      "type": "text",
+      "config": {
+        "file.path": "hdfs:///griffin/streaming/dump/target",
+        "info.path": "target",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0"
+      },
+      "time.range": ["-5m", "0"]
+    },
+    "match.once": false
+  },
+
+  "evaluateRule": {
+    "rules": "$source.json().name = $target.json().name AND $source.json().age 
= $target.json().age"
+  }
+}
+```
+In config.json, "source" and "target" fields configure the data source 
parameters.  
+The "cache" field in data source configuration represents the temporary data 
cache way, at current we support "text" and "hive" ways. We recommend "text" 
way, it only depends on hdfs. "time.range" means that the data older than the 
lower bound should be considered as out-time, and the out-time data will not be 
calculated any more.   
+"match.once" represents the data from this data source could be matched only 
once or more times.  
+"evaluateRule.rule" configures the match rule between each source and target 
data.
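
For intuition only (the function below is hypothetical, not the project's API), a "time.range" of ["-5m", "0"] can be read as keeping cached records whose timestamp falls within that window relative to the current processing time:
```
// Hypothetical sketch of the "time.range" bound: a record is still usable if its
// timestamp lies within [processTs - 5 min, processTs]; otherwise it is out of
// time and no longer calculated.
def withinTimeRange(recordTs: Long, processTs: Long,
                    lowerOffsetMs: Long = -5 * 60 * 1000L,
                    upperOffsetMs: Long = 0L): Boolean = {
  recordTs >= processTs + lowerOffsetMs && recordTs <= processTs + upperOffsetMs
}
```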
+
+### Run
+Build the measure package.
+```
+mvn clean install
+```
+Get the measure package ```measure-<version>-incubating-SNAPSHOT.jar``` and rename it to ```griffin-measure.jar```.  
+Put the measure package together with env.json and config.json.
+Run the following command:
+```
+spark-submit --class org.apache.griffin.measure.Application \
+--master yarn-client --queue default \
+griffin-measure.jar \
+env.json config.json local,local
+```
+The first two parameters are the paths of env.json and config.json; the third parameter indicates the file system type of the two configuration files, and both "local" and "hdfs" are supported.  
+
+The spark streaming application is long-running. You can get the results for each mini-batch of data, and during run time you can also input more data into the source and target topics to check the results of later mini-batches.

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/pom.xml
----------------------------------------------------------------------
diff --git a/measure/pom.xml b/measure/pom.xml
index f80ff83..7be88f5 100644
--- a/measure/pom.xml
+++ b/measure/pom.xml
@@ -49,6 +49,7 @@ under the License.
     <scalatest.version>3.0.0</scalatest.version>
     <slf4j.version>1.7.21</slf4j.version>
     <log4j.version>1.2.16</log4j.version>
+    <curator.version>2.10.0</curator.version>
     <scalamock.version>3.6.0</scalamock.version>
   </properties>
 
@@ -139,6 +140,13 @@ under the License.
       <version>${log4j.version}</version>
     </dependency>
 
+    <!--curator-->
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-recipes</artifactId>
+      <version>${curator.version}</version>
+    </dependency>
+
     <!--junit-->
     <dependency>
       <groupId>junit</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/resources/config-old.json
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/config-old.json 
b/measure/src/main/resources/config-old.json
deleted file mode 100644
index 63dee69..0000000
--- a/measure/src/main/resources/config-old.json
+++ /dev/null
@@ -1,45 +0,0 @@
-{
-  "name": "accu1",
-  "type": "accuracy",
-
-  "source": {
-    "connector": {
-      "type": "hive",
-      "version": "1.2",
-      "config": {
-        "table.name": "users_info_src",
-        "partitions": "dt=20170410, hour=14"
-      }
-    }
-  },
-
-  "target": {
-    "connector": {
-      "type": "hive",
-      "version": "1.2",
-      "config": {
-        "database": "default",
-        "table.name": "users_info_target",
-        "partitions": "dt=20170410, hour=14; dt=20170410, hour=15"
-      }
-    }
-  },
-
-  "evaluateRule": {
-    "sampleRatio": 1,
-    "assertion": {
-      "type": "DSL-griffin",
-      "rules": [
-        {
-          "rule": "@Key ${source}['user_id'] === ${target}['user_id']"
-        },
-        {
-          "rule": "${source}['first_name'] === ${target}['first_name']; 
${source}['last_name'] === ${target}['last_name']; ${source}['address'] === 
${target}['address']"
-        },
-        {
-          "rule": "${source}['email'] === ${target}['email']; 
${source}['phone'] === ${target}['phone']; ${source}['post_code'] === 
${target}['post_code']"
-        }
-      ]
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/resources/config-streaming.json
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/config-streaming.json 
b/measure/src/main/resources/config-streaming.json
new file mode 100644
index 0000000..1705174
--- /dev/null
+++ b/measure/src/main/resources/config-streaming.json
@@ -0,0 +1,61 @@
+{
+  "name": "accu2",
+  "type": "accuracy",
+
+  "process.type": "steaming",
+
+  "source": {
+    "type": "kafka",
+    "version": "0.8",
+    "config": {
+      "kafka.config": {
+        "bootstrap.servers": "a.b.c.d:9092",
+        "group.id": "group1",
+        "auto.offset.reset": "smallest",
+        "auto.commit.enable": "false",
+      },
+      "topics": "src",
+      "key.type": "java.lang.String",
+      "value.type": "java.lang.String",
+      "cache": {
+        "type": "df",
+        "config": {
+          "table.name": "source",
+          "info.path": "src",
+          "ready.time.interval": "1m",
+          "ready.time.delay": "1m"
+        }
+      }
+    }
+  },
+
+  "target": {
+    "type": "kafka",
+    "version": "0.8",
+    "config": {
+      "kafka.config": {
+        "bootstrap.servers": "a.b.c.d:9092",
+        "group.id": "group1",
+        "auto.offset.reset": "smallest",
+        "auto.commit.enable": "false",
+      },
+      "topics": "tgt",
+      "key.type": "java.lang.String",
+      "value.type": "java.lang.String",
+      "cache": {
+        "type": "hive",
+        "version": 1.2,
+        "config": {
+          "database": "default",
+          "table.name": "target_table",
+          "info.path": "tgt"
+        }
+      }
+    }
+  },
+
+  "evaluateRule": {
+    "sampleRatio": 0.2,
+    "rules": "$source.json().seeds[*].json().url = 
$target.json().groups[0].attrsList['name' = 'URL'].values[0] AND 
$source.json().seeds[*].json().metadata.json().tracker.crawlRequestCreateTS = 
$target.json().groups[0].attrsList['name' = 
'CRAWLMETADATA'].values[0].json().tracker.crawlRequestCreateTS WHEN 
$source._timestamp_ + 24h < $target._timestamp_"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/resources/config.json
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/config.json 
b/measure/src/main/resources/config.json
index edd2e6a..ab32b75 100644
--- a/measure/src/main/resources/config.json
+++ b/measure/src/main/resources/config.json
@@ -2,6 +2,8 @@
   "name": "accu1",
   "type": "accuracy",
 
+  "process.type": "batch",
+
   "source": {
     "type": "hive",
     "version": "1.2",

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/resources/env.json
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/env.json 
b/measure/src/main/resources/env.json
index 57da895..14e3b75 100644
--- a/measure/src/main/resources/env.json
+++ b/measure/src/main/resources/env.json
@@ -2,6 +2,8 @@
   "spark": {
     "log.level": "INFO",
     "checkpoint.dir": "hdfs:///griffin/batch/cp",
+    "batch.interval": "10s",
+    "process.interval": "10m",
     "config": {}
   },
 
@@ -23,6 +25,20 @@
     }
   ],
 
+  "info.cache": [
+    {
+      "type": "zk",
+      "config": {
+        "hosts": "localhost:2181",
+        "namespace": "griffin/infocache",
+        "lock.path": "lock",
+        "mode": "persist",
+        "init.clear": true,
+        "close.clear": false
+      }
+    }
+  ],
+
   "cleaner": {
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/Application.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/Application.scala 
b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
new file mode 100644
index 0000000..af8c830
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
@@ -0,0 +1,133 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure
+
+import org.apache.griffin.measure.algo._
+import org.apache.griffin.measure.algo.batch._
+import org.apache.griffin.measure.algo.streaming._
+import org.apache.griffin.measure.config.params._
+import org.apache.griffin.measure.config.params.env._
+import org.apache.griffin.measure.config.params.user._
+import org.apache.griffin.measure.config.reader._
+import org.apache.griffin.measure.config.validator.AllParamValidator
+import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.persist.PersistThreadPool
+
+import scala.util.{Failure, Success, Try}
+
+object Application extends Loggable {
+
+  def main(args: Array[String]): Unit = {
+    info(args.toString)
+    if (args.length < 2) {
+      error("Usage: class <env-param> <user-param> [List of String split by 
comma: raw | local | hdfs(default)]")
+      sys.exit(-1)
+    }
+
+    val envParamFile = args(0)
+    val userParamFile = args(1)
+    val (envFsType, userFsType) = if (args.length > 2) {
+      val fsTypes = args(2).trim.split(",")
+      if (fsTypes.length == 1) (fsTypes(0).trim, fsTypes(0).trim)
+      else if (fsTypes.length >= 2) (fsTypes(0).trim, fsTypes(1).trim)
+      else ("hdfs", "hdfs")
+    } else ("hdfs", "hdfs")
+
+    info(envParamFile)
+    info(userParamFile)
+
+    // read param files
+    val envParam = readParamFile[EnvParam](envParamFile, envFsType) match {
+      case Success(p) => p
+      case Failure(ex) => {
+        error(ex.getMessage)
+        sys.exit(-2)
+      }
+    }
+    val userParam = readParamFile[UserParam](userParamFile, userFsType) match {
+      case Success(p) => p
+      case Failure(ex) => {
+        error(ex.getMessage)
+        sys.exit(-2)
+      }
+    }
+    val allParam: AllParam = AllParam(envParam, userParam)
+
+    // validate param files
+    validateParams(allParam) match {
+      case Failure(ex) => {
+        error(ex.getMessage)
+        sys.exit(-3)
+      }
+      case _ => {
+        info("params validation pass")
+      }
+    }
+
+    // choose algorithm
+    val dqType = allParam.userParam.dqType
+    val procType = allParam.userParam.procType
+    val algo: Algo = (dqType, procType) match {
+      case (MeasureType.accuracy(), ProcessType.batch()) => 
BatchAccuracyAlgo(allParam)
+      case (MeasureType.profile(), ProcessType.batch()) => 
BatchProfileAlgo(allParam)
+      case (MeasureType.accuracy(), ProcessType.streaming()) => 
StreamingAccuracyAlgo(allParam)
+//      case (MeasureType.profile(), ProcessType.streaming()) => 
StreamingProfileAlgo(allParam)
+      case _ => {
+        error(s"${dqType} with ${procType} is unsupported dq type!")
+        sys.exit(-4)
+      }
+    }
+
+    // algorithm run
+    algo.run match {
+      case Failure(ex) => {
+        error(s"app error: ${ex.getMessage}")
+
+        procType match {
+          case ProcessType.streaming() => {
+            // streaming need to attempt more times by spark streaming itself
+            throw ex
+          }
+          case _ => {
+            shutdown
+            sys.exit(-5)
+          }
+        }
+      }
+      case _ => {
+        info("app finished and success")
+      }
+    }
+  }
+
+  private def readParamFile[T <: Param](file: String, fsType: String)(implicit 
m : Manifest[T]): Try[T] = {
+    val paramReader = ParamReaderFactory.getParamReader(file, fsType)
+    paramReader.readConfig[T]
+  }
+
+  private def validateParams(allParam: AllParam): Try[Boolean] = {
+    val allParamValidator = AllParamValidator()
+    allParamValidator.validate(allParam)
+  }
+
+  private def shutdown(): Unit = {
+    PersistThreadPool.shutdown
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/algo/AccuracyAlgo.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/algo/AccuracyAlgo.scala 
b/measure/src/main/scala/org/apache/griffin/measure/algo/AccuracyAlgo.scala
new file mode 100644
index 0000000..7e0a563
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/algo/AccuracyAlgo.scala
@@ -0,0 +1,24 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.algo
+
+
+trait AccuracyAlgo extends Algo {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/algo/Algo.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/algo/Algo.scala 
b/measure/src/main/scala/org/apache/griffin/measure/algo/Algo.scala
new file mode 100644
index 0000000..82b71f1
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/algo/Algo.scala
@@ -0,0 +1,34 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.algo
+
+import org.apache.griffin.measure.config.params.env._
+import org.apache.griffin.measure.config.params.user._
+import org.apache.griffin.measure.log.Loggable
+
+import scala.util.Try
+
+trait Algo extends Loggable with Serializable {
+
+  val envParam: EnvParam
+  val userParam: UserParam
+
+  def run(): Try[_]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/algo/MeasureType.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/algo/MeasureType.scala 
b/measure/src/main/scala/org/apache/griffin/measure/algo/MeasureType.scala
new file mode 100644
index 0000000..23d4dac
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/algo/MeasureType.scala
@@ -0,0 +1,26 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.algo
+
+object MeasureType {
+
+  val accuracy = """^(?i)accuracy$""".r
+  val profile = """^(?i)profile$""".r
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/algo/ProcessType.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/algo/ProcessType.scala 
b/measure/src/main/scala/org/apache/griffin/measure/algo/ProcessType.scala
new file mode 100644
index 0000000..5a85c7c
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/algo/ProcessType.scala
@@ -0,0 +1,26 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.algo
+
+object ProcessType {
+
+  val batch = """^(?i)batch$""".r
+  val streaming = """^(?i)streaming$""".r
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/algo/ProfileAlgo.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/algo/ProfileAlgo.scala 
b/measure/src/main/scala/org/apache/griffin/measure/algo/ProfileAlgo.scala
new file mode 100644
index 0000000..6ffc87a
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/algo/ProfileAlgo.scala
@@ -0,0 +1,23 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.algo
+
+trait ProfileAlgo extends Algo {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/algo/batch/BatchAccuracyAlgo.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/algo/batch/BatchAccuracyAlgo.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/algo/batch/BatchAccuracyAlgo.scala
new file mode 100644
index 0000000..241f456
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/algo/batch/BatchAccuracyAlgo.scala
@@ -0,0 +1,190 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.algo.batch
+
+import java.util.Date
+
+import org.apache.griffin.measure.algo.AccuracyAlgo
+import org.apache.griffin.measure.algo.core.AccuracyCore
+import org.apache.griffin.measure.config.params.AllParam
+import org.apache.griffin.measure.connector._
+import org.apache.griffin.measure.connector.direct.DirectDataConnector
+import org.apache.griffin.measure.persist._
+import org.apache.griffin.measure.result._
+import org.apache.griffin.measure.rule._
+import org.apache.griffin.measure.rule.expr._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.{SparkConf, SparkContext}
+
+import scala.util.{Failure, Success, Try}
+
+// accuracy algorithm for batch mode
+case class BatchAccuracyAlgo(allParam: AllParam) extends AccuracyAlgo {
+  val envParam = allParam.envParam
+  val userParam = allParam.userParam
+
+  def run(): Try[_] = {
+    Try {
+      val metricName = userParam.name
+
+      val sparkParam = envParam.sparkParam
+
+      val conf = new SparkConf().setAppName(metricName)
+      conf.setAll(sparkParam.config)
+      val sc = new SparkContext(conf)
+      sc.setLogLevel(sparkParam.logLevel)
+      val sqlContext = new HiveContext(sc)
+
+      // start time
+      val startTime = new Date().getTime()
+
+      // get persists to persist measure result
+      val persist: Persist = PersistFactory(envParam.persistParams, 
metricName).getPersists(startTime)
+
+      // get spark application id
+      val applicationId = sc.applicationId
+
+      // persist start id
+      persist.start(applicationId)
+
+      // generate rule from rule param, generate rule analyzer
+      val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
+      val rule: StatementExpr = ruleFactory.generateRule()
+      val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
+
+      // const expr value map
+      val constExprValueMap = ExprValueUtil.genExprValueMaps(None, 
ruleAnalyzer.constCacheExprs, Map[String, Any]())
+      val finalConstExprValueMap = 
ExprValueUtil.updateExprValueMaps(ruleAnalyzer.constFinalCacheExprs, 
constExprValueMap)
+      val finalConstMap = finalConstExprValueMap.headOption match {
+        case Some(m) => m
+        case _ => Map[String, Any]()
+      }
+
+      // data connector
+      val sourceDataConnector: DirectDataConnector =
+        DataConnectorFactory.getDirectDataConnector(sqlContext, null, 
userParam.sourceParam,
+          ruleAnalyzer.sourceRuleExprs, finalConstMap
+        ) match {
+          case Success(cntr) => {
+            if (cntr.available) cntr
+            else throw new Exception("source data connection error!")
+          }
+          case Failure(ex) => throw ex
+        }
+      val targetDataConnector: DirectDataConnector =
+        DataConnectorFactory.getDirectDataConnector(sqlContext, null, 
userParam.targetParam,
+          ruleAnalyzer.targetRuleExprs, finalConstMap
+        ) match {
+          case Success(cntr) => {
+            if (cntr.available) cntr
+            else throw new Exception("target data connection error!")
+          }
+          case Failure(ex) => throw ex
+        }
+
+      // get metadata
+//      val sourceMetaData: Iterable[(String, String)] = 
sourceDataConnector.metaData() match {
+//        case Success(md) => md
+//        case _ => throw new Exception("source metadata error!")
+//      }
+//      val targetMetaData: Iterable[(String, String)] = 
targetDataConnector.metaData() match {
+//        case Success(md) => md
+//        case _ => throw new Exception("target metadata error!")
+//      }
+
+      // get data
+      val sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = 
sourceDataConnector.data() match {
+        case Success(dt) => dt
+        case Failure(ex) => throw ex
+      }
+      val targetData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = 
targetDataConnector.data() match {
+        case Success(dt) => dt
+        case Failure(ex) => throw ex
+      }
+
+      // accuracy algorithm
+      val (accuResult, missingRdd, matchedRdd) = accuracy(sourceData, 
targetData, ruleAnalyzer)
+
+      // end time
+      val endTime = new Date().getTime
+      persist.log(endTime, s"calculation using time: ${endTime - startTime} 
ms")
+
+      // persist result
+      persist.result(endTime, accuResult)
+      val missingRecords = missingRdd.map(record2String(_, 
ruleAnalyzer.sourceRuleExprs.persistExprs, 
ruleAnalyzer.targetRuleExprs.persistExprs))
+//      persist.missRecords(missingRecords)
+      persist.records(missingRecords, PersistType.MISS)
+
+      // persist end time
+      val persistEndTime = new Date().getTime
+      persist.log(persistEndTime, s"persist using time: ${persistEndTime - 
endTime} ms")
+
+      // finish
+      persist.finish()
+
+      // context stop
+      sc.stop
+
+    }
+  }
+
+  def wrapInitData(data: Map[String, Any]): (Map[String, Any], Map[String, 
Any]) = {
+    (data, Map[String, Any]())
+  }
+
+  // calculate accuracy between source data and target data
+  def accuracy(sourceData: RDD[(Product, (Map[String, Any], Map[String, 
Any]))],
+               targetData: RDD[(Product, (Map[String, Any], Map[String, 
Any]))],
+               ruleAnalyzer: RuleAnalyzer) = {
+    // 1. cogroup
+    val allKvs = sourceData.cogroup(targetData)
+
+    // 2. accuracy calculation
+    val (accuResult, missingRdd, matchedRdd) = AccuracyCore.accuracy(allKvs, 
ruleAnalyzer)
+
+    (accuResult, missingRdd, matchedRdd)
+  }
+
+  // convert data into a string
+  def record2String(rec: (Product, (Map[String, Any], Map[String, Any])), 
sourcePersist: Iterable[Expr], targetPersist: Iterable[Expr]): String = {
+    val (key, (data, info)) = rec
+    val persistData = getPersistMap(data, sourcePersist)
+    val persistInfo = info.mapValues { value =>
+      value match {
+        case vd: Map[String, Any] => getPersistMap(vd, targetPersist)
+        case v => v
+      }
+    }.map(identity)
+    s"${persistData} [${persistInfo}]"
+  }
+
+  // get the expr value map of the persist expressions
+  private def getPersistMap(data: Map[String, Any], persist: Iterable[Expr]): 
Map[String, Any] = {
+    val persistMap = persist.map(e => (e._id, e.desc)).toMap
+    data.flatMap { pair =>
+      val (k, v) = pair
+      persistMap.get(k) match {
+        case Some(d) => Some((d -> v))
+        case _ => None
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/algo/batch/BatchProfileAlgo.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/algo/batch/BatchProfileAlgo.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/algo/batch/BatchProfileAlgo.scala
new file mode 100644
index 0000000..163a0b7
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/algo/batch/BatchProfileAlgo.scala
@@ -0,0 +1,162 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.algo.batch
+
+import java.util.Date
+
+import org.apache.griffin.measure.algo.ProfileAlgo
+import org.apache.griffin.measure.algo.core.ProfileCore
+import org.apache.griffin.measure.config.params._
+import org.apache.griffin.measure.connector._
+import org.apache.griffin.measure.connector.direct.DirectDataConnector
+import org.apache.griffin.measure.persist.{Persist, PersistFactory, 
PersistType}
+import org.apache.griffin.measure.result._
+import org.apache.griffin.measure.rule.expr._
+import org.apache.griffin.measure.rule.{ExprValueUtil, RuleAnalyzer, 
RuleFactory}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.{SparkConf, SparkContext}
+
+import scala.util.{Failure, Success, Try}
+
+// profile algorithm for batch mode
+case class BatchProfileAlgo(allParam: AllParam) extends ProfileAlgo {
+  val envParam = allParam.envParam
+  val userParam = allParam.userParam
+
+  def run(): Try[_] = {
+    Try {
+      val metricName = userParam.name
+
+      val sparkParam = envParam.sparkParam
+
+      val conf = new SparkConf().setAppName(metricName)
+      conf.setAll(sparkParam.config)
+      val sc = new SparkContext(conf)
+      sc.setLogLevel(sparkParam.logLevel)
+      val sqlContext = new HiveContext(sc)
+
+      // start time
+      val startTime = new Date().getTime()
+
+      // get persists to persist measure result
+      val persist: Persist = PersistFactory(envParam.persistParams, 
metricName).getPersists(startTime)
+
+      // get spark application id
+      val applicationId = sc.applicationId
+
+      // persist start id
+      persist.start(applicationId)
+
+      // generate rule from rule param, generate rule analyzer
+      val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
+      val rule: StatementExpr = ruleFactory.generateRule()
+      val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
+
+      // const expr value map
+      val constExprValueMap = ExprValueUtil.genExprValueMaps(None, 
ruleAnalyzer.constCacheExprs, Map[String, Any]())
+      val finalConstExprValueMap = 
ExprValueUtil.updateExprValueMaps(ruleAnalyzer.constFinalCacheExprs, 
constExprValueMap)
+      val finalConstMap = finalConstExprValueMap.headOption match {
+        case Some(m) => m
+        case _ => Map[String, Any]()
+      }
+
+      // data connector
+      val sourceDataConnector: DirectDataConnector =
+      DataConnectorFactory.getDirectDataConnector(sqlContext, null, 
userParam.sourceParam,
+        ruleAnalyzer.sourceRuleExprs, finalConstMap
+      ) match {
+        case Success(cntr) => {
+          if (cntr.available) cntr
+          else throw new Exception("source data connection error!")
+        }
+        case Failure(ex) => throw ex
+      }
+
+      // get metadata
+      //      val sourceMetaData: Iterable[(String, String)] = 
sourceDataConnector.metaData() match {
+      //        case Success(md) => md
+      //        case _ => throw new Exception("source metadata error!")
+      //      }
+
+      // get data
+      val sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = 
sourceDataConnector.data() match {
+        case Success(dt) => dt
+        case Failure(ex) => throw ex
+      }
+
+      // profile algorithm
+      val (profileResult, missingRdd, matchedRdd) = profile(sourceData, 
ruleAnalyzer)
+
+      // end time
+      val endTime = new Date().getTime
+      persist.log(endTime, s"calculation using time: ${endTime - startTime} 
ms")
+
+      // persist result
+      persist.result(endTime, profileResult)
+      val matchedRecords = matchedRdd.map(record2String(_, 
ruleAnalyzer.sourceRuleExprs.persistExprs))
+//      persist.matchRecords(matchedRecords)
+      persist.records(matchedRecords, PersistType.MATCH)
+
+      // persist end time
+      val persistEndTime = new Date().getTime
+      persist.log(persistEndTime, s"persist using time: ${persistEndTime - 
endTime} ms")
+
+      // finish
+      persist.finish()
+
+      // context stop
+      sc.stop
+    }
+  }
+
+  def wrapInitData(data: Map[String, Any]): (Map[String, Any], Map[String, 
Any]) = {
+    (data, Map[String, Any]())
+  }
+
+  // calculate profile from source data
+  def profile(sourceData: RDD[(Product, (Map[String, Any], Map[String, 
Any]))], ruleAnalyzer: RuleAnalyzer
+              ) = {
+    // 1. profile calculation
+    val (profileResult, missingRdd, matchedRdd) = 
ProfileCore.profile(sourceData, ruleAnalyzer)
+
+    (profileResult, missingRdd, matchedRdd)
+  }
+
+  // convert data into a string
+  def record2String(rec: (Product, (Map[String, Any], Map[String, Any])), 
sourcePersist: Iterable[Expr]): String = {
+    val (key, (data, info)) = rec
+    val persistData = getPersistMap(data, sourcePersist)
+    val persistInfo = info
+    if (persistInfo.size > 0) s"${persistData} [${persistInfo}]" else 
s"${persistData}"
+  }
+
+  // get the expr value map of the persist expressions
+  private def getPersistMap(data: Map[String, Any], persist: Iterable[Expr]): 
Map[String, Any] = {
+    val persistMap = persist.map(e => (e._id, e.desc)).toMap
+    data.flatMap { pair =>
+      val (k, v) = pair
+      persistMap.get(k) match {
+        case Some(d) => Some((d -> v))
+        case _ => None
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/algo/core/AccuracyCore.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/algo/core/AccuracyCore.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/algo/core/AccuracyCore.scala
new file mode 100644
index 0000000..4ec6505
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/algo/core/AccuracyCore.scala
@@ -0,0 +1,103 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.algo.core
+
+import org.apache.griffin.measure.rule.RuleAnalyzer
+import org.apache.griffin.measure.result._
+import org.apache.spark.rdd.RDD
+
+
+object AccuracyCore {
+
+  type V = Map[String, Any]
+  type T = Map[String, Any]
+
+  // allKvs: rdd of (key, (List[(sourceData, sourceInfo)], List[(targetData, 
targetInfo)]))
+  // output: accuracy result, missing source data rdd, matched source data rdd
+  def accuracy(allKvs: RDD[(Product, (Iterable[(V, T)], Iterable[(V, T)]))], 
ruleAnalyzer: RuleAnalyzer
+              ): (AccuracyResult, RDD[(Product, (V, T))], RDD[(Product, (V, 
T))]) = {
+    val result: RDD[(Long, Long, List[(Product, (V, T))], List[(Product, (V, 
T))])] = allKvs.map { kv =>
+      val (key, (sourceDatas, targetDatas)) = kv
+
+      // result: (missCount, matchCount, missDataList, matchDataList)
+      val rslt = sourceDatas.foldLeft((0L, 0L, List[(Product, (V, T))](), List[(Product, (V, T))]())) { (sr, sourcePair) =>
+        val matchResult = if (targetDatas.isEmpty) {
+          (false, Map[String, Any](MismatchInfo.wrap("no target")))
+        } else {
+          targetDatas.foldLeft((false, Map[String, Any]())) { (tr, targetPair) =>
+            if (tr._1) tr
+            else matchData(sourcePair, targetPair, ruleAnalyzer)
+          }
+        }
+
+        if (matchResult._1) {
+          val matchItem = (key, sourcePair)
+          (sr._1, sr._2 + 1, sr._3, sr._4 :+ matchItem)
+        } else {
+          val missItem = (key, (sourcePair._1, sourcePair._2 ++ matchResult._2))
+          (sr._1 + 1, sr._2, sr._3 :+ missItem, sr._4)
+        }
+      }
+
+      rslt
+    }
+
+    val missRdd = result.flatMap(_._3)
+    val matchRdd = result.flatMap(_._4)
+
+    def seq(cnt: (Long, Long), rcd: (Long, Long, Any, Any)): (Long, Long) = {
+      (cnt._1 + rcd._1, cnt._2 + rcd._2)
+    }
+    def comb(c1: (Long, Long), c2: (Long, Long)): (Long, Long) = {
+      (c1._1 + c2._1, c1._2 + c2._2)
+    }
+    val countPair = result.aggregate((0L, 0L))(seq, comb)
+
+    (AccuracyResult(countPair._1, (countPair._1 + countPair._2)), missRdd, matchRdd)
+  }
+
+  // try to match source and target data, return true if matched, false if unmatched, also with some matching info
+  private def matchData(source: (V, T), target: (V, T), ruleAnalyzer: RuleAnalyzer): (Boolean, T) = {
+
+    // 1. merge source and target cached data
+    val mergedExprValueMap: Map[String, Any] = mergeExprValueMap(source, target)
+
+    // 2. check valid
+    if (ruleAnalyzer.rule.valid(mergedExprValueMap)) {
+      // 3. substitute the cached data into statement, get the statement value
+      val matched = ruleAnalyzer.rule.calculate(mergedExprValueMap) match {
+        case Some(b: Boolean) => b
+        case _ => false
+      }
+      // currently we cannot get the mismatch reason; we need to add such information to figure out how it mismatches
+      if (matched) (matched, Map[String, Any]())
+      else (matched, Map[String, Any](MismatchInfo.wrap("not matched")))
+    } else {
+      (false, Map[String, Any](MismatchInfo.wrap("invalid to compare")))
+    }
+
+  }
+
+//  private def when
+
+  private def mergeExprValueMap(source: (V, T), target: (V, T)): Map[String, Any] = {
+    source._1 ++ target._1
+  }
+
+}
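
To illustrate the per-key fold in AccuracyCore.accuracy above: each source record is counted as matched if any target record in the same key group matches it, otherwise it is counted as missing. The sketch below runs on plain collections; the matches predicate and the sample maps are assumptions standing in for RuleAnalyzer and the cogrouped RDD data.

    // local sketch of the (missCount, matchCount, missList, matchList) accumulation
    object AccuracyFoldSketch extends App {
      type Rec = Map[String, Any]

      // illustrative match rule, standing in for RuleAnalyzer-based matching
      def matches(source: Rec, target: Rec): Boolean =
        source.get("id") == target.get("id") && source.get("amount") == target.get("amount")

      val sourceRecs: Seq[Rec] = Seq(
        Map("id" -> 1, "amount" -> 10),
        Map("id" -> 2, "amount" -> 20),
        Map("id" -> 3, "amount" -> 30)
      )
      val targetRecs: Seq[Rec] = Seq(
        Map("id" -> 1, "amount" -> 10),
        Map("id" -> 2, "amount" -> 25)
      )

      // fold each source record: matched if any target record matches it, missing otherwise
      val (missCount, matchCount, missList, _) =
        sourceRecs.foldLeft((0L, 0L, List[Rec](), List[Rec]())) { (acc, src) =>
          val matched = targetRecs.exists(tgt => matches(src, tgt))
          if (matched) (acc._1, acc._2 + 1, acc._3, acc._4 :+ src)
          else (acc._1 + 1, acc._2, acc._3 :+ src, acc._4)
        }

      println(s"AccuracyResult(miss = $missCount, total = ${missCount + matchCount})")
      println(s"missing records: $missList")
    }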

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/algo/core/ProfileCore.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/algo/core/ProfileCore.scala b/measure/src/main/scala/org/apache/griffin/measure/algo/core/ProfileCore.scala
new file mode 100644
index 0000000..2987f2f
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/algo/core/ProfileCore.scala
@@ -0,0 +1,73 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.algo.core
+
+import org.apache.griffin.measure.rule.RuleAnalyzer
+import org.apache.griffin.measure.result._
+import org.apache.spark.rdd.RDD
+
+
+object ProfileCore {
+
+  type V = Map[String, Any]
+  type T = Map[String, Any]
+
+  // dataRdd: rdd of (key, (sourceData, sourceInfo))
+  // output: profile result, missing source data rdd, matched source data rdd
+  def profile(dataRdd: RDD[(Product, (V, T))], ruleAnalyzer: RuleAnalyzer
+              ): (ProfileResult, RDD[(Product, (V, T))], RDD[(Product, (V, T))]) = {
+
+    val resultRdd: RDD[((Product, (V, T)), Boolean)] = dataRdd.map { kv =>
+      val (key, (data, info)) = kv
+      val (matched, missInfo) = matchData((data, info), ruleAnalyzer)
+      ((key, (data, info ++ missInfo)), matched)
+    }
+
+    val totalCount = resultRdd.count
+    val matchRdd = resultRdd.filter(_._2).map(_._1)
+    val matchCount = matchRdd.count
+    val missRdd = resultRdd.filter(!_._2).map(_._1)
+    val missCount = missRdd.count
+
+    (ProfileResult(matchCount, totalCount), missRdd, matchRdd)
+
+  }
+
+  // try to match data against the rule, return true if matched, false if unmatched
+  private def matchData(dataPair: (V, T), ruleAnalyzer: RuleAnalyzer): (Boolean, T) = {
+
+    val data: Map[String, Any] = dataPair._1
+
+    // 1. check valid
+    if (ruleAnalyzer.rule.valid(data)) {
+      // 2. substitute the cached data into statement, get the statement value
+      val matched = ruleAnalyzer.rule.calculate(data) match {
+        case Some(b: Boolean) => b
+        case _ => false
+      }
+      // currently we cannot get the mismatch reason; we need to add such information to figure out how it mismatches
+      if (matched) (matched, Map[String, Any]())
+      else (matched, Map[String, Any](MismatchInfo.wrap("not matched")))
+    } else {
+      (false, Map[String, Any](MismatchInfo.wrap("invalid to compare")))
+    }
+
+  }
+
+}
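
ProfileCore.profile above reduces to flagging each record against the rule and counting matches versus total. A local sketch follows, with an assumed rule predicate standing in for ruleAnalyzer.rule.calculate and made-up records.

    // sketch only: rule and sample records are illustrative
    object ProfileCountSketch extends App {
      type Rec = Map[String, Any]

      // illustrative rule: the "age" field must be a non-negative Int
      def rule(data: Rec): Boolean = data.get("age") match {
        case Some(a: Int) => a >= 0
        case _ => false
      }

      val records: Seq[Rec] = Seq(
        Map("name" -> "x", "age" -> 23),
        Map("name" -> "y", "age" -> -1),
        Map("name" -> "z")
      )

      // flag each record, then count matched vs total (mirrors the RDD filter/count)
      val flagged = records.map(r => (r, rule(r)))
      val totalCount = flagged.size
      val matchCount = flagged.count(_._2)
      val missCount  = totalCount - matchCount

      println(s"ProfileResult(match = $matchCount, total = $totalCount), miss = $missCount")
    }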

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyAlgo.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyAlgo.scala b/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyAlgo.scala
new file mode 100644
index 0000000..bdac64e
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyAlgo.scala
@@ -0,0 +1,358 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.algo.streaming
+
+import java.util.Date
+import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
+
+import org.apache.griffin.measure.algo.AccuracyAlgo
+import org.apache.griffin.measure.algo.core.AccuracyCore
+import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
+import org.apache.griffin.measure.cache.result.CacheResultProcesser
+import org.apache.griffin.measure.config.params.AllParam
+import org.apache.griffin.measure.connector._
+import org.apache.griffin.measure.connector.direct.DirectDataConnector
+import org.apache.griffin.measure.persist.{Persist, PersistFactory, PersistType}
+import org.apache.griffin.measure.result.{AccuracyResult, MismatchInfo, TimeStampInfo}
+import org.apache.griffin.measure.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory}
+import org.apache.griffin.measure.rule.expr._
+import org.apache.griffin.measure.utils.TimeUtil
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
+import org.apache.spark.{SparkConf, SparkContext}
+
+import scala.util.{Failure, Success, Try}
+
+
+case class StreamingAccuracyAlgo(allParam: AllParam) extends AccuracyAlgo {
+  val envParam = allParam.envParam
+  val userParam = allParam.userParam
+
+  def run(): Try[_] = {
+    Try {
+      val metricName = userParam.name
+
+      val sparkParam = envParam.sparkParam
+
+      val conf = new SparkConf().setAppName(metricName)
+      conf.setAll(sparkParam.config)
+      val sc = new SparkContext(conf)
+      sc.setLogLevel(sparkParam.logLevel)
+      val sqlContext = new HiveContext(sc)
+//      val sqlContext = new SQLContext(sc)
+
+      val batchInterval = TimeUtil.milliseconds(sparkParam.batchInterval) match {
+        case Some(interval) => Milliseconds(interval)
+        case _ => throw new Exception("invalid batch interval")
+      }
+      val ssc = new StreamingContext(sc, batchInterval)
+      ssc.checkpoint(sparkParam.cpDir)
+
+      // init info cache instance
+      InfoCacheInstance.initInstance(envParam.infoCacheParams, metricName)
+      InfoCacheInstance.init
+
+      // start time
+      val startTime = new Date().getTime()
+
+      val persistFactory = PersistFactory(envParam.persistParams, metricName)
+
+      // get persists to persist measure result
+      val appPersist: Persist = persistFactory.getPersists(startTime)
+
+      // get spark application id
+      val applicationId = sc.applicationId
+
+      // persist start id
+      appPersist.start(applicationId)
+
+      // generate rule from rule param, generate rule analyzer
+      val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
+      val rule: StatementExpr = ruleFactory.generateRule()
+      val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
+
+      // const expr value map
+      val constExprValueMap = ExprValueUtil.genExprValueMaps(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
+      val finalConstExprValueMap = ExprValueUtil.updateExprValueMaps(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
+      val finalConstMap = finalConstExprValueMap.headOption match {
+        case Some(m) => m
+        case _ => Map[String, Any]()
+      }
+
+      // data connector
+      val sourceDataConnector: DirectDataConnector =
+      DataConnectorFactory.getDirectDataConnector(sqlContext, ssc, userParam.sourceParam,
+        ruleAnalyzer.sourceRuleExprs, finalConstMap
+      ) match {
+        case Success(cntr) => {
+          if (cntr.available) cntr
+          else throw new Exception("source data connection error!")
+        }
+        case Failure(ex) => throw ex
+      }
+      val targetDataConnector: DirectDataConnector =
+        DataConnectorFactory.getDirectDataConnector(sqlContext, ssc, userParam.targetParam,
+          ruleAnalyzer.targetRuleExprs, finalConstMap
+        ) match {
+          case Success(cntr) => {
+            if (cntr.available) cntr
+            else throw new Exception("target data connection error!")
+          }
+          case Failure(ex) => throw ex
+        }
+
+      val cacheResultProcesser = CacheResultProcesser()
+
+      // init data stream
+      sourceDataConnector.init()
+      targetDataConnector.init()
+
+      val streamingAccuracyProcess = StreamingAccuracyProcess(
+        sourceDataConnector, targetDataConnector,
+        ruleAnalyzer, cacheResultProcesser, persistFactory, appPersist)
+
+      // process thread
+//      case class Process() extends Runnable {
+//        val lock = InfoCacheInstance.genLock("process")
+//        def run(): Unit = {
+//          val updateTime = new Date().getTime
+//          val locked = lock.lock(5, TimeUnit.SECONDS)
+//          if (locked) {
+//            try {
+//              val st = new Date().getTime
+//
+//              TimeInfoCache.startTimeInfoCache
+//
+//              // get data
+//              val sourceData = sourceDataConnector.data match {
+//                case Success(dt) => dt
+//                case Failure(ex) => throw ex
+//              }
+//              val targetData = targetDataConnector.data match {
+//                case Success(dt) => dt
+//                case Failure(ex) => throw ex
+//              }
+//
+//              sourceData.cache
+//              targetData.cache
+//
+//              println(s"sourceData.count: ${sourceData.count}")
+//              println(s"targetData.count: ${targetData.count}")
+//
+//              // accuracy algorithm
+//              val (accuResult, missingRdd, matchedRdd) = accuracy(sourceData, targetData, ruleAnalyzer)
+//              println(s"accuResult: ${accuResult}")
+//
+//              val ct = new Date().getTime
+//              appPersist.log(ct, s"calculation using time: ${ct - st} ms")
+//
+//              sourceData.unpersist()
+//              targetData.unpersist()
+//
+//              // result of every group
+//              val matchedGroups = reorgByTimeGroup(matchedRdd)
+//              val matchedGroupCount = matchedGroups.count
+//              println(s"===== matchedGroupCount: ${matchedGroupCount} =====")
+//
+//              // get missing results
+//              val missingGroups = reorgByTimeGroup(missingRdd)
+//              val missingGroupCount = missingGroups.count
+//              println(s"===== missingGroupCount: ${missingGroupCount} =====")
+//
+//              val groups = matchedGroups.cogroup(missingGroups)
+//              val groupCount = groups.count
+//              println(s"===== groupCount: ${groupCount} =====")
+//
+//              val updateResults = groups.flatMap { group =>
+//                val (t, (matchData, missData)) = group
+//
+//                val matchSize = matchData.size
+//                val missSize = missData.size
+//                val res = AccuracyResult(missSize, matchSize + missSize)
+//
+//                val updatedCacheResultOpt = cacheResultProcesser.genUpdateCacheResult(t, updateTime, res)
+//
+//                updatedCacheResultOpt.flatMap { updatedCacheResult =>
+//                  Some((updatedCacheResult, (t, missData)))
+//                }
+//              }
+//
+//              updateResults.cache
+//
+//              val updateResultsPart =  updateResults.map(_._1)
+//              val updateDataPart =  updateResults.map(_._2)
+//
+//              val updateResultsArray = updateResultsPart.collect()
+//
+//              // update results cache (in driver)
+//              // collect action is traversable once action, it will make rdd updateResults empty
+//              updateResultsArray.foreach { updateResult =>
+//                println(s"update result: ${updateResult}")
+//                cacheResultProcesser.update(updateResult)
+//                // persist result
+//                val persist: Persist = persistFactory.getPersists(updateResult.timeGroup)
+//                persist.result(updateTime, updateResult.result)
+//              }
+//
+//              // record missing data and update old data (in executor)
+//              updateDataPart.foreach { grp =>
+//                val (t, datas) = grp
+//                val persist: Persist = persistFactory.getPersists(t)
+//                // persist missing data
+//                val missStrings = datas.map { row =>
+//                  val (_, (value, info)) = row
+//                  s"${value} [${info.getOrElse(MismatchInfo.key, 
"unknown")}]"
+//                }
+//                persist.records(missStrings, PersistType.MISS)
+//                // data connector update old data
+//                val dumpDatas = datas.map { r =>
+//                  val (_, (v, i)) = r
+//                  v ++ i
+//                }
+//
+//                println(t)
+//                dumpDatas.foreach(println)
+//
+//                sourceDataConnector.updateOldData(t, dumpDatas)
+//                targetDataConnector.updateOldData(t, dumpDatas)    // not correct
+//              }
+//
+//              updateResults.unpersist()
+//
+//              // dump missing rdd   (this part not need for future version, only for current df cache data version)
+//              val dumpRdd: RDD[Map[String, Any]] = missingRdd.map { r =>
+//                val (_, (v, i)) = r
+//                v ++ i
+//              }
+//              sourceDataConnector.updateAllOldData(dumpRdd)
+//              targetDataConnector.updateAllOldData(dumpRdd)    // not correct
+//
+//              TimeInfoCache.endTimeInfoCache
+//
+//              val et = new Date().getTime
+//              appPersist.log(et, s"persist using time: ${et - ct} ms")
+//
+//            } catch {
+//              case e: Throwable => error(s"process error: ${e.getMessage}")
+//            } finally {
+//              lock.unlock()
+//            }
+//          }
+//        }
+//      }
+
+      val processInterval = TimeUtil.milliseconds(sparkParam.processInterval) match {
+        case Some(interval) => interval
+        case _ => throw new Exception("invalid batch interval")
+      }
+      val process = TimingProcess(processInterval, streamingAccuracyProcess)
+
+      // clean thread
+//    case class Clean() extends Runnable {
+//      val lock = InfoCacheInstance.genLock("clean")
+//      def run(): Unit = {
+//        val locked = lock.lock(5, TimeUnit.SECONDS)
+//        if (locked) {
+//          try {
+//            sourceDataConnector.cleanData
+//            targetDataConnector.cleanData
+//          } finally {
+//            lock.unlock()
+//          }
+//        }
+//      }
+//    }
+//    val cleanInterval = TimeUtil.milliseconds(cleanerParam.cleanInterval) match {
+//      case Some(interval) => interval
+//      case _ => throw new Exception("invalid batch interval")
+//    }
+//    val clean = TimingProcess(cleanInterval, Clean())
+
+      process.startup()
+//    clean.startup()
+
+      ssc.start()
+      ssc.awaitTermination()
+      ssc.stop(stopSparkContext=true, stopGracefully=true)
+
+      // context stop
+      sc.stop
+
+      InfoCacheInstance.close
+
+      appPersist.finish()
+
+      process.shutdown()
+//    clean.shutdown()
+    }
+  }
+
+  // calculate accuracy between source data and target data
+//  def accuracy(sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))],
+//               targetData: RDD[(Product, (Map[String, Any], Map[String, Any]))],
+//               ruleAnalyzer: RuleAnalyzer) = {
+//    // 1. cogroup
+//    val allKvs = sourceData.cogroup(targetData)
+//
+//    // 2. accuracy calculation
+//    val (accuResult, missingRdd, matchedRdd) = AccuracyCore.accuracy(allKvs, ruleAnalyzer)
+//
+//    (accuResult, missingRdd, matchedRdd)
+//  }
+
+//  // convert data into a string
+//  def record2String(rec: (Product, (Map[String, Any], Map[String, Any])), sourcePersist: Iterable[Expr], targetPersist: Iterable[Expr]): String = {
+//    val (key, (data, info)) = rec
+//    val persistData = getPersistMap(data, sourcePersist)
+//    val persistInfo = info.mapValues { value =>
+//      value match {
+//        case vd: Map[String, Any] => getPersistMap(vd, targetPersist)
+//        case v => v
+//      }
+//    }.map(identity)
+//    s"${persistData} [${persistInfo}]"
+//  }
+//
+//  // get the expr value map of the persist expressions
+//  private def getPersistMap(data: Map[String, Any], persist: Iterable[Expr]): Map[String, Any] = {
+//    val persistMap = persist.map(e => (e._id, e.desc)).toMap
+//    data.flatMap { pair =>
+//      val (k, v) = pair
+//      persistMap.get(k) match {
+//        case Some(d) => Some((d -> v))
+//        case _ => None
+//      }
+//    }
+//  }
+
+//  def reorgByTimeGroup(rdd: RDD[(Product, (Map[String, Any], Map[String, Any]))]
+//                      ): RDD[(Long, (Product, (Map[String, Any], Map[String, Any])))] = {
+//    rdd.flatMap { row =>
+//      val (key, (value, info)) = row
+//      val b: Option[(Long, (Product, (Map[String, Any], Map[String, Any])))] = info.get(TimeStampInfo.key) match {
+//        case Some(t: Long) => Some((t, row))
+//        case _ => None
+//      }
+//      b
+//    }
+//  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyProcess.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyProcess.scala
new file mode 100644
index 0000000..be1f846
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyProcess.scala
@@ -0,0 +1,234 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.algo.streaming
+
+import java.util.Date
+import java.util.concurrent.TimeUnit
+
+import org.apache.griffin.measure.algo.core.AccuracyCore
+import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
+import org.apache.griffin.measure.cache.result.CacheResultProcesser
+import org.apache.griffin.measure.connector.direct.DirectDataConnector
+import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.persist._
+import org.apache.griffin.measure.result.{AccuracyResult, MismatchInfo, TimeStampInfo}
+import org.apache.griffin.measure.rule._
+import org.apache.griffin.measure.rule.expr._
+import org.apache.spark.rdd.RDD
+
+import scala.util.{Failure, Success}
+
+case class StreamingAccuracyProcess(sourceDataConnector: DirectDataConnector,
+                                    targetDataConnector: DirectDataConnector,
+                                    ruleAnalyzer: RuleAnalyzer,
+                                    cacheResultProcesser: CacheResultProcesser,
+                                    persistFactory: PersistFactory,
+                                    appPersist: Persist
+                                   ) extends Runnable with Loggable {
+
+  val lock = InfoCacheInstance.genLock("process")
+
+  def run(): Unit = {
+//    println(s"cache count: ${cacheResultProcesser.cacheGroup.size}")
+    val updateTimeDate = new Date()
+    val updateTime = updateTimeDate.getTime
+    println(s"===== [${updateTimeDate}] process begins =====")
+    val locked = lock.lock(5, TimeUnit.SECONDS)
+    if (locked) {
+      try {
+        val st = new Date().getTime
+
+        TimeInfoCache.startTimeInfoCache
+
+        // get data
+        val sourceData = sourceDataConnector.data match {
+          case Success(dt) => dt
+          case Failure(ex) => throw ex
+        }
+        val targetData = targetDataConnector.data match {
+          case Success(dt) => dt
+          case Failure(ex) => throw ex
+        }
+
+        sourceData.cache
+        targetData.cache
+
+        println(s"sourceData.count: ${sourceData.count}")
+        println(s"targetData.count: ${targetData.count}")
+
+        // accuracy algorithm
+        val (accuResult, missingRdd, matchedRdd) = accuracy(sourceData, targetData, ruleAnalyzer)
+//        println(s"accuResult: ${accuResult}")
+
+        val ct = new Date().getTime
+        appPersist.log(ct, s"calculation using time: ${ct - st} ms")
+
+        sourceData.unpersist()
+        targetData.unpersist()
+
+        // result of every group
+        val matchedGroups = reorgByTimeGroup(matchedRdd)
+//        val matchedGroupCount = matchedGroups.count
+//        println(s"===== matchedGroupCount: ${matchedGroupCount} =====")
+
+        // get missing results
+        val missingGroups = reorgByTimeGroup(missingRdd)
+//        val missingGroupCount = missingGroups.count
+//        println(s"===== missingGroupCount: ${missingGroupCount} =====")
+
+        val groups = matchedGroups.cogroup(missingGroups)
+//        val groupCount = groups.count
+//        println(s"===== groupCount: ${groupCount} =====")
+
+        val updateResults = groups.flatMap { group =>
+          val (t, (matchData, missData)) = group
+
+          val matchSize = matchData.size
+          val missSize = missData.size
+          val res = AccuracyResult(missSize, matchSize + missSize)
+
+          val updatedCacheResultOpt = cacheResultProcesser.genUpdateCacheResult(t, updateTime, res)
+
+          updatedCacheResultOpt.flatMap { updatedCacheResult =>
+            Some((updatedCacheResult, (t, missData)))
+          }
+        }
+
+        updateResults.cache
+
+        val updateResultsPart =  updateResults.map(_._1)
+        val updateDataPart =  updateResults.map(_._2)
+
+        val updateResultsArray = updateResultsPart.collect()
+
+        // update results cache (in driver)
+        // collect action is traversable once action, it will make rdd updateResults empty
+        updateResultsArray.foreach { updateResult =>
+//          println(s"update result: ${updateResult}")
+          cacheResultProcesser.update(updateResult)
+          // persist result
+          val persist: Persist = persistFactory.getPersists(updateResult.timeGroup)
+          persist.result(updateTime, updateResult.result)
+        }
+
+        // record missing data and dump old data (in executor)
+        updateDataPart.foreach { grp =>
+          val (t, datas) = grp
+          val persist: Persist = persistFactory.getPersists(t)
+          // persist missing data
+          val missStrings = datas.map { row =>
+            record2String(row, ruleAnalyzer.sourceRuleExprs.persistExprs, ruleAnalyzer.targetRuleExprs.persistExprs)
+          }
+          persist.records(missStrings, PersistType.MISS)
+
+          // data connector update old data
+          val dumpDatas = datas.map { r =>
+            val (_, (v, i)) = r
+            v ++ i
+          }
+          sourceDataConnector.updateOldData(t, dumpDatas)
+//          targetDataConnector.updateOldData(t, dumpDatas)    // not correct
+        }
+
+        updateResults.unpersist()
+
+        TimeInfoCache.endTimeInfoCache
+
+        // clean old data
+        cleanData()
+
+        val et = new Date().getTime
+        appPersist.log(et, s"persist using time: ${et - ct} ms")
+
+      } catch {
+        case e: Throwable => error(s"process error: ${e.getMessage}")
+      } finally {
+        lock.unlock()
+      }
+    } else {
+      println(s"===== [${updateTimeDate}] process ignores =====")
+    }
+    val endTime = new Date().getTime
+    println(s"===== [${updateTimeDate}] process ends, using ${endTime - 
updateTime} ms =====")
+  }
+
+  // clean old data and old result cache
+  def cleanData(): Unit = {
+    try {
+      sourceDataConnector.cleanOldData
+      targetDataConnector.cleanOldData
+
+      val cleanTime = TimeInfoCache.getCleanTime
+      cacheResultProcesser.refresh(cleanTime)
+    } catch {
+      case e: Throwable => error(s"clean data error: ${e.getMessage}")
+    }
+  }
+
+  // calculate accuracy between source data and target data
+  private def accuracy(sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))],
+               targetData: RDD[(Product, (Map[String, Any], Map[String, Any]))],
+               ruleAnalyzer: RuleAnalyzer) = {
+    // 1. cogroup
+    val allKvs = sourceData.cogroup(targetData)
+
+    // 2. accuracy calculation
+    val (accuResult, missingRdd, matchedRdd) = AccuracyCore.accuracy(allKvs, ruleAnalyzer)
+
+    (accuResult, missingRdd, matchedRdd)
+  }
+
+  private def reorgByTimeGroup(rdd: RDD[(Product, (Map[String, Any], Map[String, Any]))]
+                      ): RDD[(Long, (Product, (Map[String, Any], Map[String, Any])))] = {
+    rdd.flatMap { row =>
+      val (key, (value, info)) = row
+      val b: Option[(Long, (Product, (Map[String, Any], Map[String, Any])))] = info.get(TimeStampInfo.key) match {
+        case Some(t: Long) => Some((t, row))
+        case _ => None
+      }
+      b
+    }
+  }
+
+  // convert data into a string
+  def record2String(rec: (Product, (Map[String, Any], Map[String, Any])), dataPersist: Iterable[Expr], infoPersist: Iterable[Expr]): String = {
+    val (key, (data, info)) = rec
+    val persistData = getPersistMap(data, dataPersist)
+    val persistInfo = info.mapValues { value =>
+      value match {
+        case vd: Map[String, Any] => getPersistMap(vd, infoPersist)
+        case v => v
+      }
+    }.map(identity)
+    s"${persistData} [${persistInfo}]"
+  }
+
+  // get the expr value map of the persist expressions
+  private def getPersistMap(data: Map[String, Any], persist: Iterable[Expr]): Map[String, Any] = {
+    val persistMap = persist.map(e => (e._id, e.desc)).toMap
+    data.flatMap { pair =>
+      val (k, v) = pair
+      persistMap.get(k) match {
+        case Some(d) => Some((d -> v))
+        case _ => None
+      }
+    }
+  }
+
+}
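
The per-time-group accounting in StreamingAccuracyProcess (reorgByTimeGroup, then cogroup, then one AccuracyResult per time group) can be pictured with plain collections; the timestamps and record ids below are invented for illustration.

    // local sketch: plain Seqs stand in for the matched/missing RDDs
    object TimeGroupSketch extends App {
      // (timestamp, record id) pairs standing in for (time, (key, (data, info)))
      val matched = Seq((1000L, "a"), (1000L, "b"), (2000L, "c"))
      val missing = Seq((1000L, "d"), (3000L, "e"))

      val matchedByTime = matched.groupBy(_._1)
      val missingByTime = missing.groupBy(_._1)

      // mirror cogroup: every time group seen on either side gets a result
      val results = (matchedByTime.keySet ++ missingByTime.keySet).toSeq.sorted.map { t =>
        val matchSize = matchedByTime.getOrElse(t, Seq()).size
        val missSize = missingByTime.getOrElse(t, Seq()).size
        (t, missSize, matchSize + missSize)   // (timeGroup, miss, total)
      }

      results.foreach { case (t, miss, total) =>
        println(s"time group $t: AccuracyResult(miss = $miss, total = $total)")
      }
    }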

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/TimingProcess.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/TimingProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/TimingProcess.scala
new file mode 100644
index 0000000..e5bd7de
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/algo/streaming/TimingProcess.scala
@@ -0,0 +1,46 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.algo.streaming
+
+import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
+import java.util.{Timer, TimerTask}
+
+case class TimingProcess(interval: Long, runnable: Runnable) {
+
+  val pool: ThreadPoolExecutor = Executors.newFixedThreadPool(5).asInstanceOf[ThreadPoolExecutor]
+
+  val timer = new Timer("process", true)
+
+  val timerTask = new TimerTask() {
+    override def run(): Unit = {
+      pool.submit(runnable)
+    }
+  }
+
+  def startup(): Unit = {
+    timer.schedule(timerTask, interval, interval)
+  }
+
+  def shutdown(): Unit = {
+    timer.cancel()
+    pool.shutdown()
+    pool.awaitTermination(10, TimeUnit.SECONDS)
+  }
+
+}
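
StreamingAccuracyAlgo drives TimingProcess through startup() and shutdown(); a hedged usage sketch follows, in which the heartbeat Runnable and the 5-second interval are illustrative assumptions and only the TimingProcess API comes from the commit.

    // sketch only: schedules a trivial task the way StreamingAccuracyAlgo schedules its process
    import org.apache.griffin.measure.algo.streaming.TimingProcess

    object TimingProcessSketch extends App {
      // a trivial task standing in for StreamingAccuracyProcess
      val heartbeatTask = new Runnable {
        def run(): Unit = println(s"tick at ${System.currentTimeMillis}")
      }

      val process = TimingProcess(5000L, heartbeatTask)   // run every 5 seconds
      process.startup()

      Thread.sleep(16000L)   // let a few ticks fire

      process.shutdown()     // cancel the timer and drain the thread pool
    }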

