http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/griffin-doc/measure/dsl-guide.md
----------------------------------------------------------------------
diff --git a/griffin-doc/measure/dsl-guide.md b/griffin-doc/measure/dsl-guide.md
new file mode 100644
index 0000000..e7f8569
--- /dev/null
+++ b/griffin-doc/measure/dsl-guide.md
@@ -0,0 +1,181 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Apache Griffin DSL Guide
+Griffin DSL is designed for DQ measurement, as a SQL-like language, trying to 
describe the DQ domain request.
+
+## Griffin DSL Syntax Description
+Griffin DSL is SQL-like, case insensitive, and easy to learn.
+
+### Supporting process
+- logical operation: not, and, or, in, between, like, is null, is nan, =, !=, 
<=, >=, <, >
+- mathematical operation: +, -, *, /, %
+- sql statement: as, where, group by, having, order by, limit
+
+
+### Keywords
+- `null, nan, true, false`
+- `not, and, or`
+- `in, between, like, is`
+- `select, from, as, where, group, by, having, order, desc, asc, limit`
+
+### Operators
+- `!, &&, ||, =, !=, <, >, <=, >=, <>`
+- `+, -, *, /, %`
+- `(, )`
+- `., [, ]`
+
+### Literals
+- **string**: any string surrounded with a pair of " or ', with escape 
charactor \ if any request.  
+       e.g. `"test"`, `'string 1'`, `"hello \" world \" "`
+- **number**: double or integer number.  
+       e.g. `123`, `33.5`
+- **time**: a integer with unit in a string, will be translated to a integer 
number in millisecond.  
+       e.g. `3d`, `5h`, `4ms`
+- **boolean**: boolean value directly.  
+       e.g. `true`, `false`
+
+### Selections
+- **selection head**: data source name.  
+       e.g. `source`, `target`, `` `my table name` ``
+- **all field selection**: * or with data source name ahead.  
+       e.g. `*`, `source.*`, `target.*`
+- **field selection**: field name or with data source name ahead.  
+       e.g. `source.age`, `target.name`, `user_id`
+- **index selection**: interget between square brackets "[]" with field name 
ahead.  
+       e.g. `source.attributes[3]`
+- **function selection**: function name with brackets "()", with field name 
ahead or not.  
+       e.g. `count(*)`, `*.count()`, `source.user_id.count()`, 
`max(source.age)`
+- **alias**: declare an alias after a selection.  
+       e.g. `source.user_id as id`, `target.user_name as name`
+
+### Math expressions
+- **math factor**: literal or function or selection or math exression with 
brackets.  
+       e.g. `123`, `max(1, 2, 3, 4)`, `source.age`, `(source.age + 13)`
+- **unary math expression**: unary math operator with factor.  
+       e.g. `-(100 - source.score)`
+- **binary math expression**: math factors with binary math operators.  
+       e.g. `source.age + 13`, `score * 2 + ratio`
+
+### Logical expression
+- **in**: in clause like sql.  
+       e.g. `source.country in ("USA", "CHN", "RSA")`
+- **between**: between clause like sql.  
+       e.g. `source.age between 3 and 30`, `source.age between (3, 30)`
+- **like**: like clause like sql.  
+       e.g. `source.name like "%abc%"`
+- **is null**: is null operator like sql.  
+       e.g. `source.desc is not null`
+- **is nan**: check if the value is not a number, the syntax like `is null`  
+       e.g. `source.age is not nan`
+- **logical factor**: math expression or logical expressions above or other 
logical expressions with brackets.  
+       e.g. `(source.user_id = target.user_id AND source.age > target.age)`
+- **unary logical expression**: unary logical operator with factor.  
+       e.g. `NOT source.has_data`, `!(source.age = target.age)`
+- **binary logical expression**: logical factors with binary logical 
operators, including `and`, `or` and comparison operators.  
+       e.g. `source.age = target.age OR source.ticket = target.tck`
+
+
+### Expression
+- **expression**: logical expression and math expression.
+
+### Function
+- **argument**: expression.
+- **function**: function name with arguments between brackets.  
+       e.g. `max(source.age, target.age)`, `count(*)`
+
+### Clause
+- **select clause**: the result columns like sql select clause, we can ignore 
the word "select" in Griffin DSL.  
+       e.g. `select user_id.count(), age.max() as max`, 
`source.user_id.count() as cnt, source.age.min()`
+- **from clause**: the table name like sql from clause, in which the data 
source name must be one of data source names or the output table name of the 
former rule steps, we can ignore this clause by configoring the data source 
name.  
+       e.g. `from source`, ``from `target` ``
+- **where clause**: the filter condition like sql where clause, optional.  
+       e.g. `where source.id = target.id and source.age = target.age`
+- **group-by clause**: like the group-by clause in sql, optional. Optional 
having clause could be following.  
+       e.g. `group by cntry`, `group by gender having count(*) > 50`
+- **order-by clause**: like the order-by clause, optional.  
+       e.g. `order by name`, `order by first_name desc, age asc`
+- **limit clause**: like the limit clause in sql, optional.  
+       e.g. `limit 5`
+
+### Accuracy Rule
+Accuracy rule expression in Griffin DSL is a logical expression, telling the 
mapping relation between data sources.  
+       e.g. `source.id = target.id and source.name = target.name and 
source.age between (target.age, target.age + 5)`
+
+### Profiling Rule
+Profiling rule expression in Griffin DSL is a sql-like expression, with select 
clause ahead, following optional from clause, where clause, group-by clause, 
order-by clause, limit clause in order.  
+       e.g. `source.gender, source.id.count() where source.age > 20 group by 
source.gender`, `select country, max(age), min(age), count(*) as cnt from 
source group by country order by cnt desc limit 5`
+
+## Griffin DSL translation to SQL
+Griffin DSL is defined for DQ measurement, to describe DQ domain problem.  
+Actually, in Griffin, we get Griffin DSL rules, translate them into spark-sql 
rules for calculation in spark-sql engine.  
+In DQ domain, there're multiple dimensions, we need to translate them in 
different ways.
+
+### Accuracy
+For accuracy, we need to get the match count between source and target, the 
rule describes the mapping relation between data sources. Griffin needs to 
translate the dsl rule into multiple sql rules.  
+For example, the dsl rule is `source.id = target.id and source.name = 
target.name`, which represents the match condition of accuracy. After the 
translation, the sql rules are as below:  
+- **get miss items from source**: `SELECT source.* FROM source LEFT JOIN 
target ON coalesce(source.id, '') = coalesce(target.id, '') and 
coalesce(source.name, '') = coalesce(target.name, '') WHERE (NOT (source.id IS 
NULL AND source.name IS NULL)) AND (target.id IS NULL AND target.name IS 
NULL)`, save as table `miss_items`.
+- **get miss count**: `SELECT COUNT(*) AS miss FROM miss_items`, save as table 
`miss_count`.
+- **get total count from source**: `SELECT COUNT(*) AS total FROM source`, 
save as table `total_count`.
+- **get accuracy metric**: `SELECT miss_count.miss AS miss, total_count.total 
AS total, (total_count.total - miss_count.miss) AS matched FROM miss_count FULL 
JOIN total_count`, save as table `accuracy`.  
+
+After the translation, the metrics will be persisted in table `accuracy`.
+
+### Profiling
+For profiling, the request is always the aggregation function of data, the 
rule is mainly the same as sql, but only supporting `select`, `from`, `where`, 
`group-by`, `having`, `order-by`, `limit` clauses, which can describe most of 
the profiling requests. If any complicate request, you can use sql rule 
directly to describe it.  
+For example, the dsl rule is `source.cntry, source.id.count(), 
source.age.max() group by source.cntry`, which represents the profiling 
requests. After the translation, the sql rule is as below:  
+- **profiling sql rule**: `SELECT source.cntry, count(source.id), 
max(source.age) FROM source GROUP BY source.cntry`, save as table `profiling`.  
+
+After the translation, the metrics will be persisted in table `profiling`.  
+
+## Alternative Rules
+You can simply use Griffin DSL rule to describe your problem in DQ domain, for 
some complicate requirement, you can also use some alternative rules supported 
by Griffin.  
+
+### Spark sql
+Griffin supports spark-sql directly, you can write rule in sql like this:  
+```
+{
+       "dsl.type": "spark-sql",
+       "name": "source",
+       "rule": "SELECT count(id) AS cnt, max(timestamp) AS fresh_time FROM 
source"
+}
+```
+Griffin will calculate it in spark-sql engine directly.  
+
+### Data frame operation
+Griffin supports some other operations on data frame in spark, like converting 
json string data frame into extracted data frame with extracted object schema. 
For example:  
+```
+{
+       "dsl.type": "df-opr",
+       "name": "ext_source",
+       "rule": "from_json",
+       "details": {
+               "df.name": "json_source"
+       }
+}
+```
+Griffin will do the operation to extract json strings.  
+Actually, you can also extend the df-opr engine and df-opr adaptor in Griffin 
to support more types of data frame operations.  
+
+## Tips
+Griffin engine runs on spark, it might works in two phases, pre-proc phase and 
run phase.  
+- **Pre-proc phase**: Griffin calculates data source directly, to get 
appropriate data format, as a preparation for DQ calculation. In this phase, 
you can use df-opr and spark-sql rules.  
+After preparation, to support streaming DQ calculation, a timestamp column 
will be added in each row of data, so the data frame in run phase contains an 
extra column named "__tmst".  
+- **Run phase**: Griffin calculates with prepared data, to get the DQ metrics. 
In this phase, you can use griffin-dsl, spark-sql rules, and a part of df-opr 
rules.  
+For griffin-dsl rule, griffin translates it into spark-sql rule with a 
group-by condition for column "__tmst", it's useful for especially streaming DQ 
calculation. But for spark-sql rule, griffin use it directly, you need to add 
the "__tmst" column in your spark-sql rule explicitly, or you can't get correct 
metrics result after calculation.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/griffin-doc/measure/measure-batch-sample.md
----------------------------------------------------------------------
diff --git a/griffin-doc/measure/measure-batch-sample.md 
b/griffin-doc/measure/measure-batch-sample.md
new file mode 100644
index 0000000..3783f94
--- /dev/null
+++ b/griffin-doc/measure/measure-batch-sample.md
@@ -0,0 +1,140 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Measure Batch Sample
+Measures consists of batch measure and streaming measure. This document is for 
the batch measure sample.
+
+## Batch Accuracy Sample
+```
+{
+  "name": "accu_batch",
+
+  "process.type": "batch",
+
+  "data.sources": [
+    {
+      "name": "src",
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "users_info_src.avro"
+          }
+        }
+      ]
+    }, {
+      "name": "tgt",
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "users_info_target.avro"
+          }
+        }
+      ]
+    }
+  ],
+
+  "evaluateRule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "accuracy",
+        "rule": "src.user_id = tgt.user_id AND upper(src.first_name) = 
upper(tgt.first_name) AND src.last_name = tgt.last_name",
+        "details": {
+          "source": "src",
+          "target": "tgt",
+          "miss.records": {
+            "name": "miss.records",
+            "persist.type": "record"
+          },
+          "accuracy": {
+            "name": "accu",
+            "persist.type": "metric"
+          },
+          "miss": "miss_count",
+          "total": "total_count",
+          "matched": "matched_count"
+        }
+      }
+    ]
+  }
+}
+```
+Above is the configure file of batch accuracy job.  
+
+### Data source
+In this sample, we use avro file as source and target.  
+
+### Evaluate rule
+In this accuracy sample, the rule describes the match condition: `src.user_id 
= tgt.user_id AND upper(src.first_name) = upper(tgt.first_name) AND 
src.last_name = tgt.last_name`.  
+The accuracy metrics will be persisted as metric, with miss column named 
"miss_count", total column named "total_count", matched column named 
"matched_count".  
+The miss records of source will be persisted as record.  
+
+## Batch Profiling Sample
+```
+{
+  "name": "prof_batch_test",
+
+  "process.type": "batch",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "connectors": [
+        {
+          "type": "hive",
+          "version": "1.2",
+          "config": {
+               "database": "griffin",
+               "table.name": "demo_src"
+          }
+        }
+      ]
+    }
+  ],
+
+  "evaluateRule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "profiling",
+        "rule": "country, country.count() as cnt group by country order by cnt 
desc limit 3",
+        "details": {
+          "source": "source",
+          "profiling": {
+            "name": "cntry-group",
+            "persist.type": "metric"
+          }
+        }
+      }
+    ]
+  }
+}
+```
+Above is the configure file of batch profiling job.  
+
+### Data source
+In this sample, we use hive table as source.  
+
+### Evaluate rule
+In this profiling sample, the rule describes the profiling request: `country, 
country.count() as cnt group by country order by cnt desc limit 3`.  
+The profiling metrics will be persisted as metric, listing the most 3 groups 
of items in same country.  
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/griffin-doc/measure/measure-configuration-guide.md
----------------------------------------------------------------------
diff --git a/griffin-doc/measure/measure-configuration-guide.md 
b/griffin-doc/measure/measure-configuration-guide.md
new file mode 100644
index 0000000..0632927
--- /dev/null
+++ b/griffin-doc/measure/measure-configuration-guide.md
@@ -0,0 +1,211 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Griffin Measure Configuration Guide
+Griffin measure module needs two configuration files to define the parameters 
of execution, one is for environment, the other is for dq job.
+
+## Environment Parameters
+```
+{
+  "spark": {
+    "log.level": "WARN",
+    "checkpoint.dir": "hdfs:///griffin/streaming/cp",
+    "batch.interval": "5s",
+    "process.interval": "30s",
+    "config": {
+      "spark.task.maxFailures": 5,
+      "spark.streaming.kafkaMaxRatePerPartition": 1000,
+      "spark.streaming.concurrentJobs": 4
+    }
+  },
+
+  "persist": [
+    {
+      "type": "log",
+      "config": {
+        "max.log.lines": 100
+      }
+    }, {
+      "type": "hdfs",
+      "config": {
+        "path": "hdfs:///griffin/streaming/persist",
+        "max.persist.lines": 10000,
+        "max.lines.per.file": 10000
+      }
+    }
+  ],
+
+  "info.cache": [
+    {
+      "type": "zk",
+      "config": {
+        "hosts": "<zookeeper host ip>:2181",
+        "namespace": "griffin/infocache",
+        "lock.path": "lock",
+        "mode": "persist",
+        "init.clear": true,
+        "close.clear": false
+      }
+    }
+  ]
+}
+```
+Above lists environment parameters.  
+
+- **spark**: This field configures spark and spark streaming parameters.  
+       + log.level: Level of spark log.
+       + checkpoint.dir: Check point directory of spark streaming, for 
streaming mode.
+       + batch.interval: Interval of dumping streaming data, for streaming 
mode.
+       + process.interval: Interval of processing dumped streaming data, for 
streaming mode.
+       + config: Configuration of spark parameters.
+- **persist**: This field configures list of metrics persist parameters, 
multiple persist ways are supported. Details of persist configuration 
[here](#persist).
+- **info.cache**: This field configures list of information cache parameters, 
multiple cache ways are supported. It is only for streaming dq case. Details of 
info cache configuration [here](#info-cache).
+
+### <a name="persist"></a>Persist
+- **type**: Metrics persist type, "log", "hdfs" and "http". 
+- **config**: Configure parameters of each persist type.
+       + log persist
+               * max.log.lines: the max lines of log.
+       + hdfs persist
+               * path: hdfs path to persist metrics
+               * max.persist.lines: the max lines of total persist data.
+               * max.lines.per.file: the max lines of each persist file.
+       + http persist
+               * api: api to submit persist metrics.
+               * method: http method, "post" default.
+
+### <a name="info-cache"></a>Info Cache
+- **type**: Information cache type, "zk" for zookeeper cache.
+- **config**: Configure parameters of info cache type.
+       + zookeeper cache
+               * hosts: zookeeper hosts list as a string, separated by comma.
+               * namespace: namespace of cache info, "" as default.
+               * lock.path: path of lock info, "lock" as default.
+               * mode: create mode of zookeeper node, "persist" as default.
+               * init.clear: clear cache info when initialize, true default.
+               * close.clear: clear cache info when close connection, false 
default.
+
+## DQ Job Parameters
+```
+{
+  "name": "accu_batch",
+
+  "process.type": "batch",
+
+  "data.sources": [
+    {
+      "name": "src",
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+               "file.path": "<path>/<to>",
+            "file.name": "<source-file>.avro"
+          }
+        }
+      ]
+    }, {
+      "name": "tgt",
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+               "file.path": "<path>/<to>",
+            "file.name": "<target-file>.avro"
+          }
+        }
+      ]
+    }
+  ],
+
+  "evaluateRule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "accuracy",
+        "rule": "src.user_id = tgt.user_id AND upper(src.first_name) = 
upper(tgt.first_name) AND src.last_name = tgt.last_name",
+        "details": {
+          "source": "src",
+          "target": "tgt",
+          "miss.records": {
+            "name": "miss.records",
+            "persist.type": "record"
+          },
+          "accuracy": {
+            "name": "accu",
+            "persist.type": "metric"
+          },
+          "miss": "miss_count",
+          "total": "total_count",
+          "matched": "matched_count"
+        }
+      }
+    ]
+  }
+}
+```
+Above lists DQ job configure parameters.  
+
+- **name**: Name of DQ job.
+- **process.type**: Process type of DQ job, "batch" or "streaming".
+- **data.sources**: List of data sources in this DQ job.
+       + name: Name of this data source, it should be different from other 
data sources.
+       + connectors: List of data connectors combined as the same data source. 
Details of data connector configuration [here](#data-connector).
+- **evaluateRule**: Evaluate rule parameters of this DQ job.
+       + dsl.type: Default dsl type of all the rules.
+       + rules: List of rules, to define every rule step. Details of rule 
configuration [here](#rule).
+
+### <a name="data-connector"></a>Data Connector
+- **type**: Data connector type, "avro", "hive", "text-dir" for batch mode, 
"kafka" for streaming mode.
+- **version**: Version string of data connector type.
+- **config**: Configure parameters of each data connector type.
+       + avro data connector
+               * file.path: avro file path, optional, "" as default.
+               * file.name: avro file name.
+       + hive data connector
+               * database: data base name, optional, "default" as default.
+               * table.name: table name.
+               * partitions: partition conditions string, split by ";" and 
",", optional. 
+                       e.g. `dt=20170410, hour=15; dt=20170411, hour=15; 
dt=20170412, hour=15`
+       + text dir data connector
+               * dir.path: parent directory path.
+               * data.dir.depth: integer, depth of data directories, 0 as 
default.
+               * success.file: success file name, 
+               * done.file: 
+
+### <a name="rule"></a>Rule
+- **dsl.type**: Rule dsl type, "spark-sql", "df-opr" and "griffin-dsl".
+- **name** (step information): Result table name of this rule, optional for 
"griffin-dsl" type.
+- **persist.type** (step information): Persist type of result table, optional 
for "griffin-dsl" type. Supporting "metric", "record" and "none" type, "metric" 
type indicates the result will be persisted as metrics, "record" type indicates 
the result will be persisted as record only, "none" type indicates the result 
will not be persisted. Default is "none" type.
+- **update.data.source** (step information): If the result table needs to 
update the data source, this parameter is the data source name, for streaming 
accuracy case, optional.
+- **dq.type**: DQ type of this rule, only for "griffin-dsl" type, supporting 
"accuracy" and "profiling".
+- **details**: Details of this rule, optional.
+       + accuracy dq type detail configuration
+               * source: the data source name which as source in accuracy, 
default is the name of first data source in "data.sources" if not configured.
+               * target: the data source name which as target in accuracy, 
default is the name of second data source in "data.sources" if not configured.
+               * miss.records: step information of miss records result table 
step in accuracy.
+               * accuracy: step information of accuracy result table step in 
accuracy.
+               * miss: alias of miss column in result table.
+               * total: alias of total column in result table.
+               * matched: alias of matched column in result table.
+       + profiling dq type detail configuration
+               * source: the data source name which as source in profiling, 
default is the name of first data source in "data.sources" if not configured. 
If the griffin-dsl rule contains from clause, this parameter is ignored.
+               * profiling: step information of profiling result table step in 
profiling.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/griffin-doc/measure/measure-streaming-sample-old.md
----------------------------------------------------------------------
diff --git a/griffin-doc/measure/measure-streaming-sample-old.md 
b/griffin-doc/measure/measure-streaming-sample-old.md
new file mode 100644
index 0000000..004ed3b
--- /dev/null
+++ b/griffin-doc/measure/measure-streaming-sample-old.md
@@ -0,0 +1,204 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+# Measure streaming sample
+Measures consists of batch measure and streaming measure. This document is for 
the streaming measure sample.
+
+### Data source
+At current, we support kafka as streaming data source.  
+In this sample, we also need a kafka as data source.
+
+### Measure type
+At current, we support accuracy measure in streaming mode.
+
+### Kafka decoder
+In kafka, data always needs encode and decode, we support String type kafka 
data currently, you can also implement and use your decoder for kafka case.
+
+### Environment
+For current griffin streaming case, we need some necessary environment 
dependencies, zookeeper and hdfs.  
+We use zookeeper to cache some checkpoint information, it's optional, but we 
recommend it.  
+We use hdfs to save the temporary data, it's also a recommend selection.
+
+### Streaming accuracy result
+The streaming data will be separated into mini-batches of data, for each 
mini-batch data, there should be an accuracy result. Therefore, the streaming 
accuracy result should be a bunch of batch accuracy results with timestamp.  
+Considering the latency of streaming data, which means the source data and the 
matching target data will not exactly reach exactly at the same time, we have 
to accept some delay of data in streaming mode, by holding unmatched data in 
memory or disk, and try to match them later until the data is out-time.
+
+## How to run streaming sample
+### Environment Preparation
+At first, we need some environment preparation.  
+- Zookeeper: Zookeeper 3.4.10
+- Hadoop: Hadoop 2.6
+- Spark: Spark 1.6
+- Kafka: Kafka 0.8
+
+### Data Preparation
+Create two topics in kafka, for source and target data. For example, topic 
"source" for source data, and topic "target" for target data.  
+Streaming data should also be prepared, the format could be json string, for 
example:  
+Source data could be:
+```
+{"name": "kevin", "age": 24}
+{"name": "jason", "age": 25}
+{"name": "jhon", "age": 28}
+{"name": "steve", "age": 31}
+```
+Target data could be:
+```
+{"name": "kevin", "age": 24}
+{"name": "jason", "age": 25}
+{"name": "steve", "age": 20}
+```
+You need to input the source data and target data into these two topics, 
through console producer might be a good choice for experimental purpose.
+
+### Configuration Preparation
+Two configuration files are required.
+Environment configuration file: env.json
+```
+{
+  "spark": {
+    "log.level": "WARN",
+    "checkpoint.dir": "hdfs:///griffin/streaming/cp",
+    "batch.interval": "5s",
+    "process.interval": "30s",
+    "config": {
+      "spark.task.maxFailures": 5,
+      "spark.streaming.kafkaMaxRatePerPartition": 1000,
+      "spark.streaming.concurrentJobs": 4
+    }
+  },
+
+  "persist": [
+    {
+      "type": "log",
+      "config": {
+        "max.log.lines": 100
+      }
+    }, {
+      "type": "hdfs",
+      "config": {
+        "path": "hdfs:///griffin/streaming/persist",
+        "max.persist.lines": 10000,
+        "max.lines.per.file": 10000
+      }
+    }
+  ],
+
+  "info.cache": [
+    {
+      "type": "zk",
+      "config": {
+        "hosts": "<zookeeper host ip>:2181",
+        "namespace": "griffin/infocache",
+        "lock.path": "lock",
+        "mode": "persist",
+        "init.clear": true,
+        "close.clear": false
+      }
+    }
+  ]
+}
+```
+In env.json, "spark" field configures the spark and spark streaming 
parameters, "persist" field configures the persist ways, we support "log", 
"hdfs" and "http" ways at current, "info.cache" field configures the 
information cache parameters, we support zookeeper only at current.  
+
+Process configuration file: config.json
+```
+{
+  "name": "streaming-accu-sample",
+  "type": "accuracy",
+  "process.type": "streaming",
+
+  "source": {
+    "type": "kafka",
+    "version": "0.8",
+    "config": {
+      "kafka.config": {
+        "bootstrap.servers": "<kafka host ip>:9092",
+        "group.id": "group1",
+        "auto.offset.reset": "smallest",
+        "auto.commit.enable": "false"
+      },
+      "topics": "source",
+      "key.type": "java.lang.String",
+      "value.type": "java.lang.String"
+    },
+    "cache": {
+      "type": "text",
+      "config": {
+        "file.path": "hdfs:///griffin/streaming/dump/source",
+        "info.path": "source",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0"
+      },
+      "time.range": ["-5m", "0"]
+    },
+    "match.once": true
+  },
+
+  "target": {
+    "type": "kafka",
+    "version": "0.8",
+    "config": {
+      "kafka.config": {
+        "bootstrap.servers": "<kafka host ip>:9092",
+        "group.id": "group1",
+        "auto.offset.reset": "smallest",
+        "auto.commit.enable": "false"
+      },
+      "topics": "target",
+      "key.type": "java.lang.String",
+      "value.type": "java.lang.String"
+    },
+    "cache": {
+      "type": "text",
+      "config": {
+        "file.path": "hdfs:///griffin/streaming/dump/target",
+        "info.path": "target",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0"
+      },
+      "time.range": ["-5m", "0"]
+    },
+    "match.once": false
+  },
+
+  "evaluateRule": {
+    "rules": "$source.json().name = $target.json().name AND $source.json().age 
= $target.json().age"
+  }
+}
+```
+In config.json, "source" and "target" fields configure the data source 
parameters.  
+The "cache" field in data source configuration represents the temporary data 
cache way, at current we support "text" and "hive" ways. We recommend "text" 
way, it only depends on hdfs. "time.range" means that the data older than the 
lower bound should be considered as out-time, and the out-time data will not be 
calculated any more.   
+"match.once" represents the data from this data source could be matched only 
once or more times.  
+"evaluateRule.rule" configures the match rule between each source and target 
data.
+
+### Run
+Build the measure package.
+```
+mvn clean install
+```
+Get the measure package ```measure-<version>-incubating-SNAPSHOT.jar```, 
rename it to ```griffin-measure.jar```.  
+Put measure package together with env.json and config.json.
+Run the following command:
+```
+spark-submit --class org.apache.griffin.measure.Application \
+--master yarn-client --queue default \
+griffin-measure.jar \
+env.json config.json local,local
+```
+The first two parameters are the paths of env.json and config.json, the third 
parameter represents the file system type of the two configuration files, 
"local" or "hdfs" are both supported.  
+
+The spark streaming application will be long-time running, you can get the 
results of each mini-batch of data, during the run-time, you can also input 
more data into source and target topics, to check the results of the later 
mini-batches.

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/griffin-doc/measure/measures.md
----------------------------------------------------------------------
diff --git a/griffin-doc/measure/measures.md b/griffin-doc/measure/measures.md
new file mode 100644
index 0000000..2f6680e
--- /dev/null
+++ b/griffin-doc/measure/measures.md
@@ -0,0 +1,173 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+# Measures
+measures to calculate data quality metrics.
+
+### Accuracy measure
+accuracy measure is to compare source and target content, given corresponding 
mapping relationship.
+
+#### Introduction
+How to measure accuracy dimension of one target dataset T, given source of 
truth as golden dataset S.
+To measure accuracy quality of target dataset T,
+basic approach is to calculate discrepancy between target and source datasets 
by going through their contents,
+examining whether all fields are exactly matched as below,
+```
+                Count(source.field1 == target.field1 && source.field2 == 
target.field2 && ...source.fieldN == target.fieldN)
+Accuracy  =     
---------------------------------------------------------------------------------------------------------------
+                Count(source)
+
+```
+
+Since two datasets are too big to fit in one box, so our approach is to 
leverage map reduce programming model by distributed computing.
+
+The real challenge is how to make this comparing algorithm generic enough to 
release data analysts and data scientists from coding burdens, and at the same 
time, it keeps flexibility to cover most of accuracy requirements.
+
+Traditional way is to use SQL based join to calculate this, like scripts in 
hive.
+
+But this SQL based solution can be improved since it has not considered unique 
natures of source dataset and target dataset in this context.
+
+Our approach is to provide a generic accuracy measure, after taking into 
consideration of special natures of source dataset and target dataset.
+
+Our implementation is in scala, leveraging scala's declarative capability to 
cater for various requirements, and running in spark cluster.
+
+To make it concrete, schema for Source is as below
+
+```
+|-- uid: string (nullable = true)
+|-- site_id: string (nullable = true)
+|-- page_id: string (nullable = true)
+|-- curprice: string (nullable = true)
+|-- itm: string (nullable = true)
+|-- itmcond: string (nullable = true)
+|-- itmtitle: string (nullable = true)
+|-- l1: string (nullable = true)
+|-- l2: string (nullable = true)
+|-- leaf: string (nullable = true)
+|-- meta: string (nullable = true)
+|-- st: string (nullable = true)
+|-- dc: string (nullable = true)
+|-- tr: string (nullable = true)
+|-- eventtimestamp: string (nullable = true)
+|-- cln: string (nullable = true)
+|-- siid: string (nullable = true)
+|-- ciid: string (nullable = true)
+|-- sellerid: string (nullable = true)
+|-- pri: string (nullable = true)
+|-- pt: string (nullable = true)
+|-- dt: string (nullable = true)
+|-- hour: string (nullable = true)
+```
+
+and schema for target is below as
+
+```
+|-- uid: string (nullable = true)
+|-- page_id: string (nullable = true)
+|-- site_id: string (nullable = true)
+|-- js_ev_mak: string (nullable = true)
+|-- js_ev_orgn: string (nullable = true)
+|-- curprice: string (nullable = true)
+|-- itm: string (nullable = true)
+|-- itmcond: string (nullable = true)
+|-- itmtitle: string (nullable = true)
+|-- l1: string (nullable = true)
+|-- l2: string (nullable = true)
+|-- leaf: string (nullable = true)
+|-- meta: string (nullable = true)
+|-- st: string (nullable = true)
+|-- dc: string (nullable = true)
+|-- tr: string (nullable = true)
+|-- eventtimestamp: string (nullable = true)
+|-- cln: string (nullable = true)
+|-- siid: string (nullable = true)
+|-- ciid: string (nullable = true)
+|-- sellerid: string (nullable = true)
+|-- product_ref_id: string (nullable = true)
+|-- product_type: string (nullable = true)
+|-- is_bu: string (nullable = true)
+|-- is_udid: string (nullable = true)
+|-- is_userid: string (nullable = true)
+|-- is_cguid: string (nullable = true)
+|-- dt: string (nullable = true)
+|-- hour: string (nullable = true)
+```
+
+
+#### Accuracy Measure In Deep
+
+##### Pre-Process phase (transform raw data)
+For efficient, we will convert our raw record to some key-value pair , after 
that, we just need to compare values which have the same key.
+Since two dataset might have different names for the same field, and fields 
might come in different order, we will keep original information in associative 
map for later process.
+
+The records will look like,
+```
+((uid,eventtimestamp)->(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...)
+```
+and to track where are the data from, we add one labeling tag here.
+for source dataset, we add label tag "\_\_source\_\_" and for target dataset, 
we add label tag "\_\_target\_\_".
+```
+((uid,eventtimestamp)->("__source__",(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...)))
+((uid,eventtimestamp)->("__target__",(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...)))
+```
+Ideally, in dataset, applying those composite keys, we should be able to get 
unique records for every composite key.
+but the reality is , for various unknown reasons, dataset might have duplicate 
records given one unique composite key.
+To cover this problem, and to track all records from source node, we will 
append all duplicate records in a list during this step.
+The record will look like after pre process ,
+```
+((uid,eventtimestamp)->List(("__source__",(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...)),...,("__source__",(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...))))
+```
+To save all records from target node, we will insert all records in a set 
during this step.
+The record will look like after pre process ,
+```
+((uid,eventtimestamp)->Set(("__target__",(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...)),...,("__target__",(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...))))
+```
+##### Aggregate and Comparing phase
+Union source and target together, execute one aggregate for all, we can apply 
rules defined by users to check whether records in source and target are 
matched or not.
+
+```
+aggregate { (List(sources),Set(targets)) =>
+ if(foreach element from List(sources) in Set(targets)) emit true
+ else emit false
+}
+```
+We can also execute one aggregate to count the mismatch records in source
+```
+aggregate (missedCount = 0) { (List(sources), Set(targets)) =>
+ foreach (element in List(sources)) {
+  if (element in Set(targets)) continue
+  else missedCount += 1
+ }
+}
+```
+#### Benefits
+ + It is two times faster than traditional SQL JOIN based solution, since it 
is using algorithm customized for this special accuracy problem.
+ + It is easily to iterate new accuracy metric as it is packaged as a common 
library as a basic service, previously it took us one week to develop and 
deploy one new metrics from scratch, but after applying this approach , it only 
need several hours to get all done.
+
+
+
+
+#### Further discussion
+ + How to select keys?
+       How many keys we should use, if we use too many keys, it will reduce 
our calculation performance, otherwise, it might have too many duplicate 
records, which will make our comparison logic complex.
+ + How to define content equation?
+       For some data, it is straightforward, but for some data, it might 
require transform by some UDFS, how can we make our system extensible to 
support different raw data.
+ + How to fix data latency issue?
+       To compare, we have to have data available, but how to handle data 
latency issue which happens often in real enterprise environment.
+ + How to restore lost data?
+       Detect data lost is good, but the further action is how can we restore 
those lost data?

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/e7e4c3a7/griffin-doc/measures.md
----------------------------------------------------------------------
diff --git a/griffin-doc/measures.md b/griffin-doc/measures.md
deleted file mode 100644
index 2f6680e..0000000
--- a/griffin-doc/measures.md
+++ /dev/null
@@ -1,173 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-# Measures
-measures to calculate data quality metrics.
-
-### Accuracy measure
-accuracy measure is to compare source and target content, given corresponding 
mapping relationship.
-
-#### Introduction
-How to measure accuracy dimension of one target dataset T, given source of 
truth as golden dataset S.
-To measure accuracy quality of target dataset T,
-basic approach is to calculate discrepancy between target and source datasets 
by going through their contents,
-examining whether all fields are exactly matched as below,
-```
-                Count(source.field1 == target.field1 && source.field2 == 
target.field2 && ...source.fieldN == target.fieldN)
-Accuracy  =     
---------------------------------------------------------------------------------------------------------------
-                Count(source)
-
-```
-
-Since two datasets are too big to fit in one box, so our approach is to 
leverage map reduce programming model by distributed computing.
-
-The real challenge is how to make this comparing algorithm generic enough to 
release data analysts and data scientists from coding burdens, and at the same 
time, it keeps flexibility to cover most of accuracy requirements.
-
-Traditional way is to use SQL based join to calculate this, like scripts in 
hive.
-
-But this SQL based solution can be improved since it has not considered unique 
natures of source dataset and target dataset in this context.
-
-Our approach is to provide a generic accuracy measure, after taking into 
consideration of special natures of source dataset and target dataset.
-
-Our implementation is in scala, leveraging scala's declarative capability to 
cater for various requirements, and running in spark cluster.
-
-To make it concrete, schema for Source is as below
-
-```
-|-- uid: string (nullable = true)
-|-- site_id: string (nullable = true)
-|-- page_id: string (nullable = true)
-|-- curprice: string (nullable = true)
-|-- itm: string (nullable = true)
-|-- itmcond: string (nullable = true)
-|-- itmtitle: string (nullable = true)
-|-- l1: string (nullable = true)
-|-- l2: string (nullable = true)
-|-- leaf: string (nullable = true)
-|-- meta: string (nullable = true)
-|-- st: string (nullable = true)
-|-- dc: string (nullable = true)
-|-- tr: string (nullable = true)
-|-- eventtimestamp: string (nullable = true)
-|-- cln: string (nullable = true)
-|-- siid: string (nullable = true)
-|-- ciid: string (nullable = true)
-|-- sellerid: string (nullable = true)
-|-- pri: string (nullable = true)
-|-- pt: string (nullable = true)
-|-- dt: string (nullable = true)
-|-- hour: string (nullable = true)
-```
-
-and schema for target is below as
-
-```
-|-- uid: string (nullable = true)
-|-- page_id: string (nullable = true)
-|-- site_id: string (nullable = true)
-|-- js_ev_mak: string (nullable = true)
-|-- js_ev_orgn: string (nullable = true)
-|-- curprice: string (nullable = true)
-|-- itm: string (nullable = true)
-|-- itmcond: string (nullable = true)
-|-- itmtitle: string (nullable = true)
-|-- l1: string (nullable = true)
-|-- l2: string (nullable = true)
-|-- leaf: string (nullable = true)
-|-- meta: string (nullable = true)
-|-- st: string (nullable = true)
-|-- dc: string (nullable = true)
-|-- tr: string (nullable = true)
-|-- eventtimestamp: string (nullable = true)
-|-- cln: string (nullable = true)
-|-- siid: string (nullable = true)
-|-- ciid: string (nullable = true)
-|-- sellerid: string (nullable = true)
-|-- product_ref_id: string (nullable = true)
-|-- product_type: string (nullable = true)
-|-- is_bu: string (nullable = true)
-|-- is_udid: string (nullable = true)
-|-- is_userid: string (nullable = true)
-|-- is_cguid: string (nullable = true)
-|-- dt: string (nullable = true)
-|-- hour: string (nullable = true)
-```
-
-
-#### Accuracy Measure In Deep
-
-##### Pre-Process phase (transform raw data)
-For efficient, we will convert our raw record to some key-value pair , after 
that, we just need to compare values which have the same key.
-Since two dataset might have different names for the same field, and fields 
might come in different order, we will keep original information in associative 
map for later process.
-
-The records will look like,
-```
-((uid,eventtimestamp)->(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...)
-```
-and to track where are the data from, we add one labeling tag here.
-for source dataset, we add label tag "\_\_source\_\_" and for target dataset, 
we add label tag "\_\_target\_\_".
-```
-((uid,eventtimestamp)->("__source__",(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...)))
-((uid,eventtimestamp)->("__target__",(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...)))
-```
-Ideally, in dataset, applying those composite keys, we should be able to get 
unique records for every composite key.
-but the reality is , for various unknown reasons, dataset might have duplicate 
records given one unique composite key.
-To cover this problem, and to track all records from source node, we will 
append all duplicate records in a list during this step.
-The record will look like after pre process ,
-```
-((uid,eventtimestamp)->List(("__source__",(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...)),...,("__source__",(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...))))
-```
-To save all records from target node, we will insert all records in a set 
during this step.
-The record will look like after pre process ,
-```
-((uid,eventtimestamp)->Set(("__target__",(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...)),...,("__target__",(curprice->value(curprice),itm->value(itm),itmcond->value(itmcond),itmtitle->value(itmtitle),...))))
-```
-##### Aggregate and Comparing phase
-Union source and target together, execute one aggregate for all, we can apply 
rules defined by users to check whether records in source and target are 
matched or not.
-
-```
-aggregate { (List(sources),Set(targets)) =>
- if(foreach element from List(sources) in Set(targets)) emit true
- else emit false
-}
-```
-We can also execute one aggregate to count the mismatch records in source
-```
-aggregate (missedCount = 0) { (List(sources), Set(targets)) =>
- foreach (element in List(sources)) {
-  if (element in Set(targets)) continue
-  else missedCount += 1
- }
-}
-```
-#### Benefits
- + It is two times faster than traditional SQL JOIN based solution, since it 
is using algorithm customized for this special accuracy problem.
- + It is easily to iterate new accuracy metric as it is packaged as a common 
library as a basic service, previously it took us one week to develop and 
deploy one new metrics from scratch, but after applying this approach , it only 
need several hours to get all done.
-
-
-
-
-#### Further discussion
- + How to select keys?
-       How many keys we should use, if we use too many keys, it will reduce 
our calculation performance, otherwise, it might have too many duplicate 
records, which will make our comparison logic complex.
- + How to define content equation?
-       For some data, it is straightforward, but for some data, it might 
require transform by some UDFS, how can we make our system extensible to 
support different raw data.
- + How to fix data latency issue?
-       To compare, we have to have data available, but how to handle data 
latency issue which happens often in real enterprise environment.
- + How to restore lost data?
-       Detect data lost is good, but the further action is how can we restore 
those lost data?

Reply via email to