fix bug of non-block persist in batch mode

Author: Lionel Liu <[email protected]>
Author: dodobel <[email protected]>

Closes #288 from bhlx3lyx7/spark2.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/299aa476
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/299aa476
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/299aa476

Branch: refs/heads/master
Commit: 299aa476df8739e5fd0bdfd4a3482cb6e38f1a40
Parents: 1d7acd5
Author: Lionel Liu <[email protected]>
Authored: Tue May 29 16:18:27 2018 +0800
Committer: Lionel Liu <[email protected]>
Committed: Tue May 29 16:18:27 2018 +0800

----------------------------------------------------------------------
 measure/src/main/resources/config-batch.json    |  56 ++++++++
 .../src/main/resources/config-streaming.json    |  75 ++++++++++
 measure/src/main/resources/env-batch.json       |  38 +++++
 measure/src/main/resources/env-streaming.json   |  62 ++++++++
 .../apache/griffin/measure/Application.scala    |   4 +-
 .../griffin/measure/context/DQContext.scala     |  10 +-
 .../griffin/measure/context/MetricWrapper.scala |  10 +-
 .../measure/context/writer/HdfsPersist.scala    |  10 +-
 .../measure/context/writer/HttpPersist.scala    |  20 ++-
 .../measure/context/writer/LoggerPersist.scala  |   2 +
 .../measure/context/writer/MongoPersist.scala   |  14 +-
 .../measure/context/writer/MultiPersists.scala  |   2 +
 .../measure/context/writer/Persist.scala        |   2 +
 .../measure/context/writer/PersistFactory.scala |  21 +--
 .../context/writer/PersistTaskRunner.scala      | 101 +++++++++++++
 .../context/writer/PersistThreadPool.scala      |  81 -----------
 .../measure/step/write/MetricFlushStep.scala    |   5 +-
 .../measure/step/write/MetricWriteStep.scala    |   1 -
 .../resources/_accuracy-batch-griffindsl.json   |  56 ++++++++
 .../resources/_accuracy-batch-sparksql.json     |  63 ++++++++
 .../_accuracy-streaming-griffindsl.json         | 121 ++++++++++++++++
 .../resources/_accuracy-streaming-sparksql.json | 142 +++++++++++++++++++
 .../_completeness-batch-griffindsl.json         |  36 +++++
 .../_completeness-streaming-griffindsl.json     |  65 +++++++++
 .../_distinctness-batch-griffindsl.json         |  57 ++++++++
 .../_distinctness-batch-griffindsl1.json        |  73 ++++++++++
 .../_distinctness-batch-griffindsl2.json        |  74 ++++++++++
 .../_distinctness-streaming-griffindsl.json     |  85 +++++++++++
 .../_profiling-batch-griffindsl-hive.json       |  48 +++++++
 .../resources/_profiling-batch-griffindsl.json  |  54 +++++++
 .../resources/_profiling-batch-sparksql.json    |  44 ++++++
 .../_profiling-streaming-griffindsl.json        |  75 ++++++++++
 .../_profiling-streaming-sparksql.json          |  80 +++++++++++
 .../resources/_timeliness-batch-griffindsl.json |  49 +++++++
 .../resources/_timeliness-batch-sparksql.json   |  52 +++++++
 .../_timeliness-streaming-griffindsl.json       |  79 +++++++++++
 .../_timeliness-streaming-sparksql.json         |  82 +++++++++++
 .../resources/_uniqueness-batch-griffindsl.json |  58 ++++++++
 .../_uniqueness-streaming-griffindsl.json       | 119 ++++++++++++++++
 .../_uniqueness-streaming-sparksql.json         | 130 +++++++++++++++++
 .../src/test/resources/config-griffindsl.json   |  56 --------
 .../resources/config-streaming-accuracy.json    | 121 ----------------
 .../src/test/resources/config-streaming.json    |  75 ----------
 measure/src/test/resources/config.json          |  71 ----------
 measure/src/test/resources/env-batch.json       |  38 +++++
 .../src/test/resources/env-streaming-mongo.json |  54 +++++++
 measure/src/test/resources/env.json             |  39 -----
 measure/src/test/resources/log4j.properties     |  25 ++++
 .../griffin/measure/ApplicationTest.scala       |   8 +-
 49 files changed, 2160 insertions(+), 483 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/resources/config-batch.json
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/config-batch.json 
b/measure/src/main/resources/config-batch.json
new file mode 100644
index 0000000..10167cd
--- /dev/null
+++ b/measure/src/main/resources/config-batch.json
@@ -0,0 +1,56 @@
+{
+  "name": "accu_batch",
+
+  "process.type": "batch",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/users_info_src.avro"
+          }
+        }
+      ]
+    }, {
+      "name": "target",
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/users_info_target.avro"
+          }
+        }
+      ]
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "accuracy",
+        "name": "accu",
+        "rule": "source.user_id = target.user_id AND upper(source.first_name) 
= upper(target.first_name) AND source.last_name = target.last_name AND 
source.address = target.address AND source.email = target.email AND 
source.phone = target.phone AND source.post_code = target.post_code",
+        "details": {
+          "source": "source",
+          "target": "target",
+          "miss": "miss_count",
+          "total": "total_count",
+          "matched": "matched_count"
+        },
+        "metric": {
+          "name": "accu"
+        },
+        "record": {
+          "name": "missRecords"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/resources/config-streaming.json
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/config-streaming.json 
b/measure/src/main/resources/config-streaming.json
new file mode 100644
index 0000000..243a691
--- /dev/null
+++ b/measure/src/main/resources/config-streaming.json
@@ -0,0 +1,75 @@
+{
+  "name": "prof_streaming",
+
+  "process.type": "streaming",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.147.177.107:9092",
+              "group.id": "group1",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "sss",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${s1}",
+              "rule": "from_json",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${s1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/source",
+        "info.path": "source",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["0", "0"],
+        "init.clear": true
+      }
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "profiling",
+        "name": "prof",
+        "rule": "select count(name) as `cnt`, max(age) as `max`, min(age) as 
`min` from source",
+        "metric": {
+          "name": "prof"
+        }
+      },
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "profiling",
+        "name": "grp",
+        "rule": "select name, count(*) as `cnt` from source group by name",
+        "metric": {
+          "name": "name_group",
+          "collect.type": "array"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/resources/env-batch.json
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/env-batch.json 
b/measure/src/main/resources/env-batch.json
new file mode 100644
index 0000000..3e8aa80
--- /dev/null
+++ b/measure/src/main/resources/env-batch.json
@@ -0,0 +1,38 @@
+{
+  "spark": {
+    "log.level": "WARN",
+    "config": {
+      "spark.master": "local[*]"
+    }
+  },
+
+  "persist": [
+    {
+      "type": "log",
+      "config": {
+        "max.log.lines": 10
+      }
+    },
+    {
+      "type": "hdfs",
+      "config": {
+        "path": "hdfs://localhost/griffin/batch/persist",
+        "max.persist.lines": 10000,
+        "max.lines.per.file": 10000
+      }
+    },
+    {
+      "type": "http",
+      "config": {
+        "method": "post",
+        "api": "http://10.148.181.248:39200/griffin/accuracy",
+        "over.time": "1m",
+        "retry": 10
+      }
+    }
+  ],
+
+  "info.cache": [],
+
+  "cleaner": {}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/resources/env-streaming.json
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/env-streaming.json 
b/measure/src/main/resources/env-streaming.json
new file mode 100644
index 0000000..6871bb9
--- /dev/null
+++ b/measure/src/main/resources/env-streaming.json
@@ -0,0 +1,62 @@
+{
+  "spark": {
+    "log.level": "WARN",
+    "checkpoint.dir": "hdfs://localhost/test/griffin/cp",
+    "batch.interval": "2s",
+    "process.interval": "10s",
+    "init.clear": true,
+    "config": {
+      "spark.master": "local[*]",
+      "spark.task.maxFailures": 5,
+      "spark.streaming.kafkaMaxRatePerPartition": 1000,
+      "spark.streaming.concurrentJobs": 4,
+      "spark.yarn.maxAppAttempts": 5,
+      "spark.yarn.am.attemptFailuresValidityInterval": "1h",
+      "spark.yarn.max.executor.failures": 120,
+      "spark.yarn.executor.failuresValidityInterval": "1h",
+      "spark.hadoop.fs.hdfs.impl.disable.cache": true
+    }
+  },
+
+  "persist": [
+    {
+      "type": "log",
+      "config": {
+        "max.log.lines": 100
+      }
+    },
+    {
+      "type": "hdfs",
+      "config": {
+        "path": "hdfs://localhost/griffin/streaming/persist",
+        "max.persist.lines": 10000,
+        "max.lines.per.file": 10000
+      }
+    },
+    {
+      "type": "http",
+      "config": {
+        "method": "post",
+        "api": "http://localhost:9200/griffin/accuracy"
+      }
+    }
+  ],
+
+  "info.cache": [
+    {
+      "type": "zk",
+      "config": {
+        "hosts": "localhost:2181",
+        "namespace": "griffin/infocache",
+        "lock.path": "lock",
+        "mode": "persist",
+        "init.clear": true,
+        "close.clear": false
+      }
+    }
+  ],
+
+  "cleaner": {
+    "clean.interval": "2m"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/scala/org/apache/griffin/measure/Application.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/Application.scala 
b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
index 25dc34e..893ba2c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/Application.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
@@ -22,7 +22,7 @@ import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.configuration.json.ParamReaderFactory
 import org.apache.griffin.measure.configuration.params.{AllParam, DQParam, 
EnvParam, Param}
 import org.apache.griffin.measure.configuration.validator.ParamValidator
-import org.apache.griffin.measure.context.writer.PersistThreadPool
+import org.apache.griffin.measure.context.writer.PersistTaskRunner
 import org.apache.griffin.measure.launch.DQApp
 import org.apache.griffin.measure.launch.batch.BatchDQApp
 import org.apache.griffin.measure.launch.streaming.StreamingDQApp
@@ -138,11 +138,9 @@ object Application extends Loggable {
   }
 
   private def startup(): Unit = {
-    PersistThreadPool.start
   }
 
   private def shutdown(): Unit = {
-    PersistThreadPool.shutdown
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala 
b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
index e4b5046..43b61aa 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
@@ -62,12 +62,18 @@ case class DQContext(contextId: ContextId,
   }
 
   private val persistFactory = PersistFactory(persistParams, name)
-  private val defaultPersist: Persist = 
persistFactory.getPersists(contextId.timestamp)
+  private val defaultPersist: Persist = createPersist(contextId.timestamp)
   def getPersist(timestamp: Long): Persist = {
     if (timestamp == contextId.timestamp) getPersist()
-    else persistFactory.getPersists(timestamp)
+    else createPersist(timestamp)
   }
   def getPersist(): Persist = defaultPersist
+  private def createPersist(t: Long): Persist = {
+    procType match {
+      case BatchProcessType => persistFactory.getPersists(t, true)
+      case StreamingProcessType => persistFactory.getPersists(t, false)
+    }
+  }
 
   def cloneDQContext(newContextId: ContextId): DQContext = {
     DQContext(newContextId, name, dataSources, persistParams, 
procType)(sparkSession)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala 
b/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala
index d98952a..cec737f 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala
@@ -25,6 +25,10 @@ import scala.collection.mutable.{Map => MutableMap}
   */
 case class MetricWrapper(name: String) extends Serializable {
 
+  val _Name = "name"
+  val _Timestamp = "tmst"
+  val _Value = "value"
+
   val metrics: MutableMap[Long, Map[String, Any]] = MutableMap()
 
   def insertMetric(timestamp: Long, value: Map[String, Any]): Unit = {
@@ -39,9 +43,9 @@ case class MetricWrapper(name: String) extends Serializable {
     metrics.toMap.map { pair =>
       val (timestamp, value) = pair
       (timestamp, Map[String, Any](
-        ("name" -> name),
-        ("timestamp" -> timestamp),
-        ("value" -> value)
+        (_Name -> name),
+        (_Timestamp -> timestamp),
+        (_Value -> value)
       ))
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/scala/org/apache/griffin/measure/context/writer/HdfsPersist.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/writer/HdfsPersist.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/HdfsPersist.scala
index 8f424e6..b6cabb3 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/context/writer/HdfsPersist.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/HdfsPersist.scala
@@ -29,6 +29,8 @@ import org.apache.spark.rdd.RDD
   */
 case class HdfsPersist(config: Map[String, Any], metricName: String, 
timeStamp: Long) extends Persist {
 
+  val block: Boolean = true
+
   val Path = "path"
   val MaxPersistLines = "max.persist.lines"
   val MaxLinesPerFile = "max.lines.per.file"
@@ -43,10 +45,6 @@ case class HdfsPersist(config: Map[String, Any], metricName: 
String, timeStamp:
 
   val LogFile = filePath("_LOG")
 
-  val _MetricName = "metricName"
-  val _Timestamp = "timestamp"
-  val _Value = "value"
-
   var _init = true
 
   def available(): Boolean = {
@@ -168,10 +166,8 @@ case class HdfsPersist(config: Map[String, Any], 
metricName: String, timeStamp:
   }
 
   def persistMetrics(metrics: Map[String, Any]): Unit = {
-    val head = Map[String, Any]((_MetricName -> metricName), (_Timestamp -> 
timeStamp))
-    val result = head + (_Value -> metrics)
     try {
-      val json = JsonUtil.toJson(result)
+      val json = JsonUtil.toJson(metrics)
       persistRecords2Hdfs(MetricsFile, json :: Nil)
     } catch {
       case e: Throwable => error(e.getMessage)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/scala/org/apache/griffin/measure/context/writer/HttpPersist.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/writer/HttpPersist.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/HttpPersist.scala
index 4c12652..a072fa5 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/context/writer/HttpPersist.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/HttpPersist.scala
@@ -18,22 +18,29 @@ under the License.
 */
 package org.apache.griffin.measure.context.writer
 
-import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil}
+import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil, TimeUtil}
 import org.apache.griffin.measure.utils.ParamUtil._
 import org.apache.spark.rdd.RDD
 
-import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
 
 /**
   * persist metric and record through http request
   */
-case class HttpPersist(config: Map[String, Any], metricName: String, 
timeStamp: Long) extends Persist {
+case class HttpPersist(config: Map[String, Any], metricName: String,
+                       timeStamp: Long, block: Boolean
+                      ) extends Persist {
 
   val Api = "api"
   val Method = "method"
+  val OverTime = "over.time"
+  val Retry = "retry"
 
   val api = config.getString(Api, "")
   val method = config.getString(Method, "post")
+  val overTime = TimeUtil.milliseconds(config.getString(OverTime, 
"")).getOrElse(-1L)
+  val retry = config.getInt(Retry, 10)
 
   val _Value = "value"
 
@@ -55,7 +62,8 @@ case class HttpPersist(config: Map[String, Any], metricName: 
String, timeStamp:
         import scala.concurrent.ExecutionContext.Implicits.global
         (timeStamp, Future(HttpUtil.httpRequest(api, method, params, header, 
data)))
       }
-      PersistThreadPool.addTask(func _, 10)
+      if (block) PersistTaskRunner.addBlockTask(func _, retry, overTime)
+      else PersistTaskRunner.addNonBlockTask(func _, retry)
     } catch {
       case e: Throwable => error(e.getMessage)
     }
@@ -68,9 +76,7 @@ case class HttpPersist(config: Map[String, Any], metricName: 
String, timeStamp:
   def persistRecords(records: Iterable[String], name: String): Unit = {}
 
   def persistMetrics(metrics: Map[String, Any]): Unit = {
-    val head = Map[String, Any](("name" -> metricName), ("tmst" -> timeStamp))
-    val result = head + (_Value -> metrics)
-    httpResult(result)
+    httpResult(metrics)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/scala/org/apache/griffin/measure/context/writer/LoggerPersist.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/writer/LoggerPersist.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/LoggerPersist.scala
index 3063faf..eff4c62 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/context/writer/LoggerPersist.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/LoggerPersist.scala
@@ -27,6 +27,8 @@ import org.apache.spark.rdd.RDD
   */
 case class LoggerPersist(config: Map[String, Any], metricName: String, 
timeStamp: Long) extends Persist {
 
+  val block: Boolean = true
+
   val MaxLogLines = "max.log.lines"
 
   val maxLogLines = config.getInt(MaxLogLines, 100)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/scala/org/apache/griffin/measure/context/writer/MongoPersist.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/writer/MongoPersist.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/MongoPersist.scala
index 3cfcf04..14b86e4 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/context/writer/MongoPersist.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/MongoPersist.scala
@@ -19,6 +19,7 @@ under the License.
 package org.apache.griffin.measure.context.writer
 
 import org.apache.griffin.measure.utils.ParamUtil._
+import org.apache.griffin.measure.utils.TimeUtil
 import org.apache.spark.rdd.RDD
 import org.mongodb.scala._
 import org.mongodb.scala.model.{Filters, UpdateOptions, Updates}
@@ -29,10 +30,18 @@ import scala.concurrent.Future
 /**
   * persist metric and record to mongo
   */
-case class MongoPersist(config: Map[String, Any], metricName: String, 
timeStamp: Long) extends Persist {
+case class MongoPersist(config: Map[String, Any], metricName: String,
+                        timeStamp: Long, block: Boolean
+                       ) extends Persist {
 
   MongoConnection.init(config)
 
+  val OverTime = "over.time"
+  val Retry = "retry"
+
+  val overTime = TimeUtil.milliseconds(config.getString(OverTime, 
"")).getOrElse(-1L)
+  val retry = config.getInt(Retry, 10)
+
   val _MetricName = "metricName"
   val _Timestamp = "timestamp"
   val _Value = "value"
@@ -63,7 +72,8 @@ case class MongoPersist(config: Map[String, Any], metricName: 
String, timeStamp:
         (timeStamp, MongoConnection.getDataCollection.updateOne(
           filter, update, UpdateOptions().upsert(true)).toFuture)
       }
-      PersistThreadPool.addTask(func _, 10)
+      if (block) PersistTaskRunner.addBlockTask(func _, retry, overTime)
+      else PersistTaskRunner.addNonBlockTask(func _, retry)
     } catch {
       case e: Throwable => error(e.getMessage)
     }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/scala/org/apache/griffin/measure/context/writer/MultiPersists.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/writer/MultiPersists.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/MultiPersists.scala
index ea9133a..4c7d0f6 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/context/writer/MultiPersists.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/MultiPersists.scala
@@ -25,6 +25,8 @@ import org.apache.spark.rdd.RDD
   */
 case class MultiPersists(persists: Iterable[Persist]) extends Persist {
 
+  val block: Boolean = false
+
   val metricName: String = persists match {
     case Nil => ""
     case _ => persists.head.metricName

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/scala/org/apache/griffin/measure/context/writer/Persist.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/writer/Persist.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/Persist.scala
index 28eeb64..8adbcc3 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/context/writer/Persist.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/Persist.scala
@@ -30,6 +30,8 @@ trait Persist extends Loggable with Serializable {
 
   val config: Map[String, Any]
 
+  val block: Boolean
+
   def available(): Boolean
 
   def start(msg: String): Unit

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistFactory.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistFactory.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistFactory.scala
index 12b5f0b..9314876 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistFactory.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistFactory.scala
@@ -30,20 +30,23 @@ case class PersistFactory(persistParams: 
Iterable[PersistParam], metricName: Str
   val LOG_REGEX = """^(?i)log$""".r
   val MONGO_REGEX = """^(?i)mongo$""".r
 
-  def getPersists(timeStamp: Long): MultiPersists = {
-    MultiPersists(persistParams.flatMap(param => getPersist(timeStamp, param)))
-  }
-
   /**
-    * create persist by param
+    * create persist
+    * @param timeStamp    the timestamp of persist
+    * @param block        persist write metric in block or non-block way
+    * @return   persist
     */
-  private def getPersist(timeStamp: Long, persistParam: PersistParam): 
Option[Persist] = {
+  def getPersists(timeStamp: Long, block: Boolean): MultiPersists = {
+    MultiPersists(persistParams.flatMap(param => getPersist(timeStamp, param, 
block)))
+  }
+
+  private def getPersist(timeStamp: Long, persistParam: PersistParam, block: 
Boolean): Option[Persist] = {
     val config = persistParam.config
     val persistTry = persistParam.persistType match {
-      case HDFS_REGEX() => Try(HdfsPersist(config, metricName, timeStamp))
-      case HTTP_REGEX() => Try(HttpPersist(config, metricName, timeStamp))
       case LOG_REGEX() => Try(LoggerPersist(config, metricName, timeStamp))
-      case MONGO_REGEX() => Try(MongoPersist(config, metricName, timeStamp))
+      case HDFS_REGEX() => Try(HdfsPersist(config, metricName, timeStamp))
+      case HTTP_REGEX() => Try(HttpPersist(config, metricName, timeStamp, 
block))
+      case MONGO_REGEX() => Try(MongoPersist(config, metricName, timeStamp, 
block))
       case _ => throw new Exception("not supported persist type")
     }
     persistTry match {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistTaskRunner.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistTaskRunner.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistTaskRunner.scala
new file mode 100644
index 0000000..af4f36b
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistTaskRunner.scala
@@ -0,0 +1,101 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.writer
+
+import java.util.Date
+import java.util.concurrent.TimeUnit
+
+import org.apache.griffin.measure.Loggable
+
+import scala.concurrent.duration._
+import scala.concurrent._
+import scala.util.{Failure, Success}
+
+/**
+  * persist task runner, to persist metrics in block or non-block mode
+  */
+object PersistTaskRunner extends Loggable {
+
+  import scala.concurrent.ExecutionContext.Implicits.global
+
+  val MAX_RETRY = 100
+
+  def addNonBlockTask(func: () => (Long, Future[_]), retry: Int): Unit = {
+    val r = validRetryNum(retry)
+    nonBlockExecute(func, r)
+  }
+
+  def addBlockTask(func: () => (Long, Future[_]), retry: Int, wait: Long): 
Unit = {
+    val r = validRetryNum(retry)
+    val duration = if (wait >= 0) Duration(wait, TimeUnit.MILLISECONDS) else 
Duration.Inf
+    blockExecute(func, r, duration)
+  }
+
+  private def nonBlockExecute(func: () => (Long, Future[_]), retry: Int): Unit 
= {
+    val nextRetry = nextRetryCount(retry)
+    val st = new Date().getTime
+    val (t, res) = func()
+    res.onComplete {
+      case Success(value) => {
+        val et = new Date().getTime
+        info(s"task ${t} success with (${value}) [ using time ${et - st} ms ]")
+      }
+      case Failure(e) => {
+        val et = new Date().getTime
+        warn(s"task ${t} fails [ using time ${et - st} ms ] : ${e.getMessage}")
+        if (nextRetry >= 0) {
+          info(s"task ${t} retry [ rest retry count: ${nextRetry} ]")
+          nonBlockExecute(func, nextRetry)
+        } else {
+          error(s"task fails: task ${t} retry ends but fails")
+        }
+      }
+    }
+  }
+
+  private def blockExecute(func: () => (Long, Future[_]), retry: Int, 
waitDuration: Duration): Unit = {
+    val nextRetry = nextRetryCount(retry)
+    val st = new Date().getTime
+    val (t, res) = func()
+    try {
+      val value = Await.result(res, waitDuration)
+      val et = new Date().getTime
+      info(s"task ${t} success with (${value}) [ using time ${et - st} ms ]")
+    } catch {
+      case e: Throwable => {
+        val et = new Date().getTime
+        warn(s"task ${t} fails [ using time ${et - st} ms ] : ${e.getMessage}")
+        if (nextRetry >= 0) {
+          info(s"task ${t} retry [ rest retry count: ${nextRetry} ]")
+          blockExecute(func, nextRetry, waitDuration)
+        } else {
+          error(s"task fails: task ${t} retry ends but fails")
+        }
+      }
+    }
+  }
+
+  private def validRetryNum(retry: Int): Int = {
+    if (retry > MAX_RETRY) MAX_RETRY else retry
+  }
+  private def nextRetryCount(retry: Int): Int = {
+    if (retry >= 0) retry - 1 else -1
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistThreadPool.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistThreadPool.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistThreadPool.scala
deleted file mode 100644
index 221fcad..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistThreadPool.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.writer
-
-import java.util.Date
-import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
-
-import org.apache.griffin.measure.Loggable
-
-import scala.concurrent.Future
-import scala.util.{Failure, Success}
-
-/**
-  * persist thread pool, to persist metrics in parallel mode
-  */
-object PersistThreadPool extends Loggable {
-
-  import scala.concurrent.ExecutionContext.Implicits.global
-
-  private val pool: ThreadPoolExecutor = 
Executors.newFixedThreadPool(5).asInstanceOf[ThreadPoolExecutor]
-  val MAX_RETRY = 100
-
-  def start(): Unit = {
-  }
-
-  def shutdown(): Unit = {
-    pool.shutdown()
-    pool.awaitTermination(10, TimeUnit.SECONDS)
-  }
-
-  def addTask(func: () => (Long, Future[_]), retry: Int): Unit = {
-    val r = if (retry < 0) MAX_RETRY else retry
-    info(s"add task, current task num: ${pool.getQueue.size}")
-    pool.submit(Task(func, r))
-  }
-
-  case class Task(func: () => (Long, Future[_]), retry: Int) extends Runnable 
with Loggable {
-
-    override def run(): Unit = {
-      val st = new Date().getTime
-      val (t, res) = func()
-      res.onComplete {
-        case Success(value) => {
-          val et = new Date().getTime
-          info(s"task ${t} success [ using time ${et - st} ms ]")
-        }
-        case Failure(e) => {
-          val et = new Date().getTime
-          warn(s"task ${t} fails [ using time ${et - st} ms ] : 
${e.getMessage}")
-          if (retry > 0) {
-            info(s"task ${t} retry [ rest retry count: ${retry - 1} ]")
-            pool.submit(Task(func, retry - 1))
-          } else {
-            fail(s"task ${t} retry ends but fails")
-          }
-        }
-      }
-    }
-
-    def fail(msg: String): Unit = {
-      error(s"task fails: ${msg}")
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
index 40c9b05..6b7944d 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
@@ -36,7 +36,10 @@ case class MetricFlushStep() extends WriteStep {
         context.getPersist(t).persistMetrics(metric)
         true
       } catch {
-        case e: Throwable => false
+        case e: Throwable => {
+          error(s"flush metrics error: ${e.getMessage}")
+          false
+        }
       }
       ret && pr
     }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
index c29c072..2f34d63 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
@@ -46,7 +46,6 @@ case class MetricWriteStep(name: String,
     val writeMode = writeTimestampOpt.map(_ => 
SimpleMode).getOrElse(context.writeMode)
     val timestampMetricMap: Map[Long, Map[String, Any]] = writeMode match {
       case SimpleMode => {
-        println(metricMaps)
         val metrics: Map[String, Any] = normalizeMetric(metricMaps, name, 
collectType)
         emptyMetricMap + (timestamp -> metrics)
       }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_accuracy-batch-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_accuracy-batch-griffindsl.json 
b/measure/src/test/resources/_accuracy-batch-griffindsl.json
new file mode 100644
index 0000000..10167cd
--- /dev/null
+++ b/measure/src/test/resources/_accuracy-batch-griffindsl.json
@@ -0,0 +1,56 @@
+{
+  "name": "accu_batch",
+
+  "process.type": "batch",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/users_info_src.avro"
+          }
+        }
+      ]
+    }, {
+      "name": "target",
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/users_info_target.avro"
+          }
+        }
+      ]
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "accuracy",
+        "name": "accu",
+        "rule": "source.user_id = target.user_id AND upper(source.first_name) 
= upper(target.first_name) AND source.last_name = target.last_name AND 
source.address = target.address AND source.email = target.email AND 
source.phone = target.phone AND source.post_code = target.post_code",
+        "details": {
+          "source": "source",
+          "target": "target",
+          "miss": "miss_count",
+          "total": "total_count",
+          "matched": "matched_count"
+        },
+        "metric": {
+          "name": "accu"
+        },
+        "record": {
+          "name": "missRecords"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_accuracy-batch-sparksql.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_accuracy-batch-sparksql.json 
b/measure/src/test/resources/_accuracy-batch-sparksql.json
new file mode 100644
index 0000000..2eef9f1
--- /dev/null
+++ b/measure/src/test/resources/_accuracy-batch-sparksql.json
@@ -0,0 +1,63 @@
+{
+  "name": "accu_batch",
+
+  "process.type": "batch",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/users_info_src.avro"
+          }
+        }
+      ]
+    }, {
+      "name": "target",
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/users_info_target.avro"
+          }
+        }
+      ]
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "spark-sql",
+        "name": "missRecords",
+        "rule": "SELECT source.* FROM source LEFT JOIN target ON 
coalesce(source.user_id, '') = coalesce(target.user_id, '') AND 
coalesce(source.first_name, '') = coalesce(target.first_name, '') AND 
coalesce(source.post_code, '') = coalesce(target.post_code, '') WHERE (NOT 
(source.user_id IS NULL AND source.first_name IS NULL AND source.post_code IS 
NULL)) AND (target.user_id IS NULL AND target.first_name IS NULL AND 
target.post_code IS NULL)",
+        "record": {
+          "name": "miss"
+        }
+      },
+      {
+        "dsl.type": "spark-sql",
+        "name": "miss_count",
+        "rule": "SELECT count(*) as miss FROM `missRecords`"
+      },
+      {
+        "dsl.type": "spark-sql",
+        "name": "total_count",
+        "rule": "SELECT count(*) as total FROM source"
+      },
+      {
+        "dsl.type": "spark-sql",
+        "name": "accu",
+        "rule": "SELECT `total_count`.`total` AS `total`, 
coalesce(`miss_count`.`miss`, 0) AS `miss`, (`total` - `miss`) AS `matched` 
FROM `total_count` FULL JOIN `miss_count`",
+        "metric": {
+          "name": "accu"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_accuracy-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_accuracy-streaming-griffindsl.json 
b/measure/src/test/resources/_accuracy-streaming-griffindsl.json
new file mode 100644
index 0000000..240d768
--- /dev/null
+++ b/measure/src/test/resources/_accuracy-streaming-griffindsl.json
@@ -0,0 +1,121 @@
+{
+  "name": "accu_streaming",
+
+  "process.type": "streaming",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.147.177.107:9092",
+              "group.id": "group1",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "sss",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${s1}",
+              "rule": "from_json",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${s1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "type": "parquet",
+        "file.path": "hdfs://localhost/griffin/streaming/dump/source",
+        "info.path": "source",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["-2m", "0"],
+        "init.clear": true,
+        "updatable": true
+      }
+    }, {
+      "name": "target",
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.147.177.107:9092",
+              "group.id": "group1",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "ttt",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${t1}",
+              "rule": "from_json",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${t1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "type": "parquet",
+        "file.path": "hdfs://localhost/griffin/streaming/dump/target",
+        "info.path": "target",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["-2m", "0"],
+        "init.clear": true
+      }
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "accuracy",
+        "name": "accu",
+        "rule": "source.name = target.name and source.age = target.age",
+        "details": {
+          "source": "source",
+          "target": "target",
+          "miss": "miss_count",
+          "total": "total_count",
+          "matched": "matched_count"
+        },
+        "metric": {
+          "name": "accu"
+        },
+        "record": {
+          "name": "missRecords"
+        }
+      }
+    ]
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_accuracy-streaming-sparksql.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_accuracy-streaming-sparksql.json 
b/measure/src/test/resources/_accuracy-streaming-sparksql.json
new file mode 100644
index 0000000..0824cb8
--- /dev/null
+++ b/measure/src/test/resources/_accuracy-streaming-sparksql.json
@@ -0,0 +1,142 @@
+{
+  "name": "accu_streaming",
+
+  "process.type": "streaming",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.149.247.156:9092",
+              "group.id": "group1",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "sss",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${s1}",
+              "rule": "from_json",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${s1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/source",
+        "info.path": "source",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["-2m", "0"]
+      }
+    }, {
+      "name": "target",
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.149.247.156:9092",
+              "group.id": "group1",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "ttt",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${t1}",
+              "rule": "from_json",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${t1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/target",
+        "info.path": "target",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["-2m", "0"]
+      }
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "spark-sql",
+        "name": "missRecords",
+        "cache": true,
+        "rule": "SELECT source.* FROM source LEFT JOIN target ON 
coalesce(source.name, '') = coalesce(target.name, '') AND coalesce(source.age, 
'') = coalesce(target.age, '') WHERE (NOT (source.name IS NULL AND source.age 
IS NULL)) AND (target.name IS NULL AND target.age IS NULL)"
+      },
+      {
+        "dsl.type": "spark-sql",
+        "name": "miss_count",
+        "rule": "SELECT `__tmst`, count(*) as miss FROM `missRecords` GROUP BY 
`__tmst`"
+      },
+      {
+        "dsl.type": "spark-sql",
+        "name": "total_count",
+        "rule": "SELECT `__tmst`, count(*) as total FROM source GROUP BY 
`__tmst`"
+      },
+      {
+        "dsl.type": "spark-sql",
+        "name": "accu",
+        "rule": "SELECT `total_count`.`__tmst` AS `__tmst`, 
`total_count`.`total` AS `total`, coalesce(`miss_count`.`miss`, 0) AS `miss` 
FROM `total_count` FULL JOIN `miss_count` ON `total_count`.`__tmst` = 
`miss_count`.`__tmst`"
+      },
+      {
+        "dsl.type": "df-opr",
+        "name": "metric_accu",
+        "rule": "accuracy",
+        "details": {
+          "df.name": "accu",
+          "miss": "miss",
+          "total": "total",
+          "matched": "matched"
+        },
+        "metric": {
+          "name": "accuracy"
+        }
+      },
+      {
+        "dsl.type": "spark-sql",
+        "name": "accu_miss_records",
+        "rule": "SELECT `__tmst`, `__empty` FROM `metric_accu` WHERE 
`__record`",
+        "record": {
+          "name": "missRecords",
+          "data.source.cache": "source",
+          "origin.DF": "missRecords"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_completeness-batch-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_completeness-batch-griffindsl.json 
b/measure/src/test/resources/_completeness-batch-griffindsl.json
new file mode 100644
index 0000000..9c00444
--- /dev/null
+++ b/measure/src/test/resources/_completeness-batch-griffindsl.json
@@ -0,0 +1,36 @@
+{
+  "name": "comp_batch",
+
+  "process.type": "batch",
+
+  "timestamp": 123456,
+
+  "data.sources": [
+    {
+      "name": "source",
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/users_info_src.avro"
+          }
+        }
+      ]
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "completeness",
+        "name": "comp",
+        "rule": "email, post_code, first_name",
+        "metric": {
+          "name": "comp"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_completeness-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_completeness-streaming-griffindsl.json 
b/measure/src/test/resources/_completeness-streaming-griffindsl.json
new file mode 100644
index 0000000..ba8bdce
--- /dev/null
+++ b/measure/src/test/resources/_completeness-streaming-griffindsl.json
@@ -0,0 +1,65 @@
+{
+  "name": "comp_streaming",
+
+  "process.type": "streaming",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.147.177.107:9092",
+              "group.id": "source",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "test",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${s1}",
+              "rule": "from_json",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${s1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/source",
+        "info.path": "source",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["0", "0"],
+        "init.clear": true
+      }
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "completeness",
+        "name": "comp",
+        "rule": "name, age",
+        "metric": {
+          "name": "comp"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_distinctness-batch-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_distinctness-batch-griffindsl.json 
b/measure/src/test/resources/_distinctness-batch-griffindsl.json
new file mode 100644
index 0000000..af0c91e
--- /dev/null
+++ b/measure/src/test/resources/_distinctness-batch-griffindsl.json
@@ -0,0 +1,57 @@
+{
+  "name": "dist_batch",
+
+  "process.type": "batch",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/users_info_src.avro"
+          }
+        }
+      ]
+    },
+    {
+      "name": "target",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/users_info_src.avro"
+          }
+        }
+      ]
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "distinct",
+        "name": "dist",
+        "rule": "user_id",
+        "details": {
+          "source": "source",
+          "target": "target",
+          "total": "total",
+          "distinct": "distinct",
+          "dup": "dup",
+          "num": "num",
+          "duplication.array": "dup"
+        },
+        "metric": {
+          "name": "distinct"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_distinctness-batch-griffindsl1.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_distinctness-batch-griffindsl1.json 
b/measure/src/test/resources/_distinctness-batch-griffindsl1.json
new file mode 100644
index 0000000..4d94d8e
--- /dev/null
+++ b/measure/src/test/resources/_distinctness-batch-griffindsl1.json
@@ -0,0 +1,73 @@
+{
+  "name": "dist_batch",
+
+  "process.type": "batch",
+
+  "timestamp": 123456,
+
+  "data.sources": [
+    {
+      "name": "source",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/dupdata.avro"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${this}"
+            }
+          ]
+        }
+      ]
+    },
+    {
+      "name": "target",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/dupdata.avro"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select DISTINCT name, age from ${this}"
+            }
+          ]
+        }
+      ]
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "distinct",
+        "name": "dist",
+        "rule": "name",
+        "details": {
+          "source": "source",
+          "target": "target",
+          "total": "total",
+          "distinct": "distinct",
+          "dup": "dup",
+          "num": "num",
+          "duplication.array": "dup"
+        },
+        "metric": {
+          "name": "distinct"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_distinctness-batch-griffindsl2.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_distinctness-batch-griffindsl2.json 
b/measure/src/test/resources/_distinctness-batch-griffindsl2.json
new file mode 100644
index 0000000..6a12719
--- /dev/null
+++ b/measure/src/test/resources/_distinctness-batch-griffindsl2.json
@@ -0,0 +1,74 @@
+{
+  "name": "dist_batch",
+
+  "process.type": "batch",
+
+  "timestamp": 123456,
+
+  "data.sources": [
+    {
+      "name": "source",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/dupdata.avro"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${this}"
+            }
+          ]
+        }
+      ]
+    },
+    {
+      "name": "target",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/dupdata.avro"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select DISTINCT name, age from ${this}"
+            }
+          ]
+        }
+      ]
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "distinct",
+        "name": "dist",
+        "rule": "name, [age]",
+        "details": {
+          "source": "source",
+          "target": "target",
+          "total": "total",
+          "distinct": "distinct",
+          "dup": "dup",
+          "num": "num",
+          "duplication.array": "dup",
+          "record.enable": true
+        },
+        "metric": {
+          "name": "distinct"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_distinctness-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_distinctness-streaming-griffindsl.json 
b/measure/src/test/resources/_distinctness-streaming-griffindsl.json
new file mode 100644
index 0000000..c36e7ba
--- /dev/null
+++ b/measure/src/test/resources/_distinctness-streaming-griffindsl.json
@@ -0,0 +1,85 @@
+{
+  "name": "dist_streaming",
+
+  "process.type": "streaming",
+
+  "data.sources": [
+    {
+      "name": "new",
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/old",
+        "info.path": "new",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["0", "0"],
+        "read.only": true
+      }
+    },
+    {
+      "name": "old",
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.149.247.156:9092",
+              "group.id": "old",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "ttt",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${s1}",
+              "rule": "from_json",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${s1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/old",
+        "info.path": "old",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["-24h", "0"]
+      }
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "distinct",
+        "name": "dist",
+        "rule": "name, age",
+        "details": {
+          "source": "new",
+          "target": "old",
+          "total": "total",
+          "distinct": "distinct",
+          "dup": "dup",
+          "accu_dup": "accu_dup",
+          "num": "num",
+          "duplication.array": "dup"
+        },
+        "metric": {
+          "name": "distinct"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_profiling-batch-griffindsl-hive.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_profiling-batch-griffindsl-hive.json 
b/measure/src/test/resources/_profiling-batch-griffindsl-hive.json
new file mode 100644
index 0000000..03b0405
--- /dev/null
+++ b/measure/src/test/resources/_profiling-batch-griffindsl-hive.json
@@ -0,0 +1,48 @@
+{
+  "name": "prof_batch",
+
+  "process.type": "batch",
+
+  "timestamp": 123456,
+
+  "data.sources": [
+    {
+      "name": "source",
+      "connectors": [
+        {
+          "type": "hive",
+          "version": "1.2",
+          "config": {
+            "database": "default",
+            "table.name": "s1"
+          }
+        }
+      ]
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "profiling",
+        "name": "prof",
+        "rule": "name, count(*) as cnt from source group by name",
+        "metric": {
+          "name": "name_group",
+          "collect.type": "array"
+        }
+      },
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "profiling",
+        "name": "grp",
+        "rule": "age, count(*) as cnt from source group by age order by cnt",
+        "metric": {
+          "name": "age_group",
+          "collect.type": "array"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_profiling-batch-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_profiling-batch-griffindsl.json 
b/measure/src/test/resources/_profiling-batch-griffindsl.json
new file mode 100644
index 0000000..ec082c4
--- /dev/null
+++ b/measure/src/test/resources/_profiling-batch-griffindsl.json
@@ -0,0 +1,54 @@
+{
+  "name": "prof_batch",
+
+  "process.type": "batch",
+
+  "timestamp": 123456,
+
+  "data.sources": [
+    {
+      "name": "source",
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/users_info_src.avro"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select reg_replace(email, 
'^([^@0-9]+)([0-9]+)@(dc)(?:\\\\.[^@]+)$', '$1@$3') as email, post_code from 
${this}"
+            }
+          ]
+        }
+      ]
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "profiling",
+        "name": "prof",
+        "rule": "email, count(*) as cnt from source group by email",
+        "metric": {
+          "name": "prof",
+          "collect.type": "array"
+        }
+      },
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "profiling",
+        "name": "grp",
+        "rule": "source.post_code, count(*) as cnt from source group by 
source.post_code order by cnt desc",
+        "metric": {
+          "name": "post_group",
+          "collect.type": "array"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_profiling-batch-sparksql.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_profiling-batch-sparksql.json 
b/measure/src/test/resources/_profiling-batch-sparksql.json
new file mode 100644
index 0000000..fdfd812
--- /dev/null
+++ b/measure/src/test/resources/_profiling-batch-sparksql.json
@@ -0,0 +1,44 @@
+{
+  "name": "prof_batch",
+
+  "process.type": "batch",
+
+  "timestamp": 123456,
+
+  "data.sources": [
+    {
+      "name": "source",
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/users_info_src.avro"
+          }
+        }
+      ]
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "spark-sql",
+        "name": "prof",
+        "rule": "select count(*) as `cnt`, count(distinct `post_code`) as `dis-cnt`, max(user_id) as `max` from source",
+        "metric": {
+          "name": "prof"
+        }
+      },
+      {
+        "dsl.type": "spark-sql",
+        "name": "grp",
+        "rule": "select post_code as `pc`, count(*) as `cnt` from source group by post_code",
+        "metric": {
+          "name": "post_group",
+          "collect.type": "array"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_profiling-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_profiling-streaming-griffindsl.json b/measure/src/test/resources/_profiling-streaming-griffindsl.json
new file mode 100644
index 0000000..b6feb5a
--- /dev/null
+++ b/measure/src/test/resources/_profiling-streaming-griffindsl.json
@@ -0,0 +1,75 @@
+{
+  "name": "prof_streaming",
+
+  "process.type": "streaming",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.147.177.107:9092",
+              "group.id": "group1",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "test",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${s1}",
+              "rule": "from_json",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${s1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/source",
+        "info.path": "source",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["0", "0"],
+        "init.clear": true
+      }
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "profiling",
+        "name": "prof",
+        "rule": "select count(name) as `cnt`, max(age) as `max`, min(age) as `min` from source",
+        "metric": {
+          "name": "prof"
+        }
+      },
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "profiling",
+        "name": "grp",
+        "rule": "select name, count(*) as `cnt` from source group by name",
+        "metric": {
+          "name": "name_group",
+          "collect.type": "array"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_profiling-streaming-sparksql.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_profiling-streaming-sparksql.json b/measure/src/test/resources/_profiling-streaming-sparksql.json
new file mode 100644
index 0000000..4f0b0ee
--- /dev/null
+++ b/measure/src/test/resources/_profiling-streaming-sparksql.json
@@ -0,0 +1,80 @@
+{
+  "name": "prof_streaming",
+
+  "process.type": "streaming",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.149.247.156:9092",
+              "group.id": "group1",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "sss",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${s1}",
+              "rule": "from_json",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${s1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/source",
+        "info.path": "source",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["0", "0"]
+      }
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "spark-sql",
+        "name": "prof",
+        "rule": "select count(name) as `cnt`, max(age) as `max`, min(age) as `min` from source",
+        "metric": {
+          "name": "prof"
+        }
+      },
+      {
+        "dsl.type": "spark-sql",
+        "name": "grp",
+        "rule": "select name, count(*) as `cnt` from source group by name",
+        "metric": {
+          "name": "name_group",
+          "collect.type": "array"
+        }
+      },
+      {
+        "dsl.type": "spark-sql",
+        "name": "tmst_grp",
+        "rule": "select `__tmst`, count(*) as `cnt` from source group by `__tmst`",
+        "metric": {
+          "name": "tmst_group"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_timeliness-batch-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_timeliness-batch-griffindsl.json b/measure/src/test/resources/_timeliness-batch-griffindsl.json
new file mode 100644
index 0000000..90439df
--- /dev/null
+++ b/measure/src/test/resources/_timeliness-batch-griffindsl.json
@@ -0,0 +1,49 @@
+{
+  "name": "timeliness_batch",
+
+  "process.type": "batch",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/timeliness_data.avro"
+          }
+        }
+      ]
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "timeliness",
+        "name": "timeliness",
+        "rule": "ts, end_ts",
+        "details": {
+          "source": "source",
+          "latency": "latency",
+          "total": "total",
+          "avg": "avg",
+          "threshold": "3m",
+          "step": "step",
+          "count": "cnt",
+          "step.size": "2m",
+          "percentile": "percentile",
+          "percentile.values": [0.95]
+        },
+        "metric": {
+          "name": "timeliness"
+        },
+        "record": {
+          "name": "lateRecords"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_timeliness-batch-sparksql.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_timeliness-batch-sparksql.json b/measure/src/test/resources/_timeliness-batch-sparksql.json
new file mode 100644
index 0000000..f9cb368
--- /dev/null
+++ b/measure/src/test/resources/_timeliness-batch-sparksql.json
@@ -0,0 +1,52 @@
+{
+  "name": "timeliness_batch",
+
+  "process.type": "batch",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/timeliness_data.avro"
+          }
+        }
+      ]
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "spark-sql",
+        "name": "in_time",
+        "rule": "select *, (ts) as `_in_ts`, (end_ts) as `_out_ts` from source where (ts) IS NOT NULL"
+      },
+      {
+        "dsl.type": "spark-sql",
+        "name": "lat",
+        "cache": true,
+        "rule": "select *, (`_out_ts` - `_in_ts`) as `latency` from `in_time`"
+      },
+      {
+        "dsl.type": "spark-sql",
+        "name": "metric",
+        "rule": "select cast(avg(`latency`) as bigint) as `avg`, max(`latency`) as `max`, min(`latency`) as `min` from `lat`",
+        "metric": {
+          "name": "timeliness"
+        }
+      },
+      {
+        "dsl.type": "spark-sql",
+        "name": "slows",
+        "rule": "select * from `lat` where `latency` > 60000",
+        "record": {
+          "name": "lateRecords"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_timeliness-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_timeliness-streaming-griffindsl.json b/measure/src/test/resources/_timeliness-streaming-griffindsl.json
new file mode 100644
index 0000000..5916e5c
--- /dev/null
+++ b/measure/src/test/resources/_timeliness-streaming-griffindsl.json
@@ -0,0 +1,79 @@
+{
+  "name": "timeliness_streaming",
+
+  "process.type": "streaming",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.149.247.156:9092",
+              "group.id": "group1",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "fff",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${s1}",
+              "rule": "from_json",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select ts, end_ts, name, age from ${s1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/source",
+        "info.path": "source",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["0", "0"]
+      }
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "timeliness",
+        "name": "timeliness",
+        "rule": "ts, end_ts",
+        "details": {
+          "source": "source",
+          "latency": "latency",
+          "total": "total",
+          "avg": "avg",
+          "threshold": "1h",
+          "step": "step",
+          "count": "cnt",
+          "step.size": "5m",
+          "percentile": "percentile",
+          "percentile.values": [0.2, 0.5, 0.8]
+        },
+        "metric": {
+          "name": "timeliness"
+        },
+        "record": {
+          "name": "lateRecords"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_timeliness-streaming-sparksql.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_timeliness-streaming-sparksql.json b/measure/src/test/resources/_timeliness-streaming-sparksql.json
new file mode 100644
index 0000000..dc736ab
--- /dev/null
+++ b/measure/src/test/resources/_timeliness-streaming-sparksql.json
@@ -0,0 +1,82 @@
+{
+  "name": "timeliness_streaming",
+
+  "process.type": "streaming",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.149.247.156:9092",
+              "group.id": "group1",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "fff",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${s1}",
+              "rule": "from_json",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select ts, name, age from ${s1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/source",
+        "info.path": "source",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["0", "0"]
+      }
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "spark-sql",
+        "name": "in_time",
+        "rule": "select *, (ts) as `_in_ts` from source where (ts) IS NOT NULL"
+      },
+      {
+        "dsl.type": "spark-sql",
+        "name": "lat",
+        "cache": true,
+        "rule": "select *, (`__tmst` - `_in_ts`) as `latency` from `in_time`"
+      },
+      {
+        "dsl.type": "spark-sql",
+        "name": "metric",
+        "rule": "select `__tmst`, cast(avg(`latency`) as bigint) as `avg`, max(`latency`) as `max`, min(`latency`) as `min` from `lat`",
+        "metric": {
+          "name": "timeliness"
+        }
+      },
+      {
+        "dsl.type": "spark-sql",
+        "name": "slows",
+        "rule": "select * from `lat` where `latency` > 60000",
+        "record": {
+          "name": "lateRecords"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_uniqueness-batch-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_uniqueness-batch-griffindsl.json b/measure/src/test/resources/_uniqueness-batch-griffindsl.json
new file mode 100644
index 0000000..28009e8
--- /dev/null
+++ b/measure/src/test/resources/_uniqueness-batch-griffindsl.json
@@ -0,0 +1,58 @@
+{
+  "name": "unique_batch",
+
+  "process.type": "batch",
+
+  "data.sources": [
+    {
+      "name": "source",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/users_info_src.avro"
+          }
+        }
+      ]
+    },
+    {
+      "name": "target",
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/users_info_src.avro"
+          }
+        }
+      ]
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "uniqueness",
+        "name": "dup",
+        "rule": "user_id",
+        "details": {
+          "source": "source",
+          "target": "target",
+          "total": "total",
+          "unique": "unique",
+          "dup": "dup",
+          "num": "num"
+        },
+        "metric": {
+          "name": "unique"
+        },
+        "record": {
+          "name": "dupRecords"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_uniqueness-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_uniqueness-streaming-griffindsl.json b/measure/src/test/resources/_uniqueness-streaming-griffindsl.json
new file mode 100644
index 0000000..bc5cbd2
--- /dev/null
+++ b/measure/src/test/resources/_uniqueness-streaming-griffindsl.json
@@ -0,0 +1,119 @@
+{
+  "name": "unique_streaming",
+
+  "process.type": "streaming",
+
+  "data.sources": [
+    {
+      "name": "new",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.149.247.156:9092",
+              "group.id": "new",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "ttt",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${s1}",
+              "rule": "from_json",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${s1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/new",
+        "info.path": "new",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["0", "0"]
+      }
+    },
+    {
+      "name": "old",
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.149.247.156:9092",
+              "group.id": "old",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "ttt",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${s1}",
+              "rule": "from_json",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${s1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/old",
+        "info.path": "old",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["-24h", "0"]
+      }
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "uniqueness",
+        "name": "dup",
+        "rule": "name, age",
+        "details": {
+          "source": "new",
+          "target": "old",
+          "total": "total",
+          "unique": "unique",
+          "dup": "dup",
+          "num": "num",
+          "duplication.array": "dup"
+        },
+        "metric": {
+          "name": "unique"
+        },
+        "record": {
+          "name": "dupRecords"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_uniqueness-streaming-sparksql.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_uniqueness-streaming-sparksql.json b/measure/src/test/resources/_uniqueness-streaming-sparksql.json
new file mode 100644
index 0000000..7d13215
--- /dev/null
+++ b/measure/src/test/resources/_uniqueness-streaming-sparksql.json
@@ -0,0 +1,130 @@
+{
+  "name": "unique_streaming",
+
+  "process.type": "streaming",
+
+  "data.sources": [
+    {
+      "name": "new",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.149.247.156:9092",
+              "group.id": "new",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "sss",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${s1}",
+              "rule": "from_json",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${s1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/new",
+        "info.path": "new",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["0", "0"]
+      }
+    },
+    {
+      "name": "old",
+      "connectors": [
+        {
+          "type": "kafka",
+          "version": "0.8",
+          "config": {
+            "kafka.config": {
+              "bootstrap.servers": "10.149.247.156:9092",
+              "group.id": "old",
+              "auto.offset.reset": "smallest",
+              "auto.commit.enable": "false"
+            },
+            "topics": "sss",
+            "key.type": "java.lang.String",
+            "value.type": "java.lang.String"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "df-opr",
+              "name": "${s1}",
+              "rule": "from_json",
+              "details": {
+                "df.name": "${this}"
+              }
+            },
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${s1}"
+            }
+          ]
+        }
+      ],
+      "cache": {
+        "file.path": "hdfs://localhost/griffin/streaming/dump/old",
+        "info.path": "old",
+        "ready.time.interval": "10s",
+        "ready.time.delay": "0",
+        "time.range": ["-24h", "0"]
+      }
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "spark-sql",
+        "name": "dist",
+        "rule": "SELECT DISTINCT * FROM new"
+      },
+      {
+        "dsl.type": "spark-sql",
+        "name": "joined",
+        "rule": "SELECT dist.* FROM old RIGHT JOIN dist ON coalesce(old.name, '') = coalesce(dist.name, '') AND coalesce(old.age, '') = coalesce(dist.age, '')"
+      },
+      {
+        "dsl.type": "spark-sql",
+        "name": "grouped",
+        "rule": "SELECT `__tmst`, `name`, `age`, count(*) as `dup_cnt` FROM joined GROUP BY `__tmst`, `name`, `age`"
+      },
+      {
+        "dsl.type": "spark-sql",
+        "name": "dupRecs",
+        "cache": true,
+        "rule": "SELECT * FROM grouped WHERE `dup_cnt` > 1",
+        "record": {
+          "name": "dupRecords"
+        }
+      },
+      {
+        "dsl.type": "spark-sql",
+        "name": "dupMetric",
+        "rule": "SELECT `__tmst`, `dup_cnt`, count(*) as `item_cnt` FROM dupRecs GROUP BY `__tmst`, `dup_cnt`"
+        "metric": {
+          "name": "dup"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file


Reply via email to