Modify measure module to support updated env and config json files format

Some important modifications in env.json:
- "info.cache" -> "griffin.checkpoint"
- "persist" -> "sinks"
- "log" -> "console"
- "http" -> "elasticsearch"
- remove "cleaner"

Some important modifications in dq.json:
- add "baseline" in data source
- add "dataframe.name" in data connector
- in data source, "cache" -> "checkpoint"
- in rule and pre-proc rule, add "in.dataframe.name" and "out.dataframe.name", 
remove "name"
- in rule, add "out" param array, move "metric", "record" param inside "out" 
array
- add "sinks" string array to select which of the sinks defined in env.json are used

Author: Lionel Liu <bhlx3l...@163.com>

Closes #382 from bhlx3lyx7/json-update.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/23ff999c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/23ff999c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/23ff999c

Branch: refs/heads/master
Commit: 23ff999cd7d5f97063cc3079c77d9413b5d4b7ec
Parents: fda8222
Author: Lionel Liu <bhlx3l...@163.com>
Authored: Fri Aug 10 18:16:30 2018 +0800
Committer: William Guo <gu...@apache.org>
Committed: Fri Aug 10 18:16:30 2018 +0800

----------------------------------------------------------------------
 .../main/resources/config-batch-advanced.json   |   8 +-
 measure/src/main/resources/config-batch.json    |   2 +-
 .../src/main/resources/config-streaming.json    |  12 +-
 measure/src/main/resources/env-batch.json       |   4 +-
 measure/src/main/resources/env-streaming.json   |   6 +-
 .../configuration/dqdefinition/DQConfig.scala   | 128 ++++++-----
 .../configuration/dqdefinition/EnvConfig.scala  |  50 ++---
 .../measure/configuration/enums/DslType.scala   |   4 +-
 .../configuration/enums/FlattenType.scala       |  88 ++++++++
 .../configuration/enums/NormalizeType.scala     |  87 --------
 .../configuration/enums/OutputType.scala        |  77 +++++++
 .../measure/configuration/enums/SinkType.scala  |  83 +++++++
 .../griffin/measure/context/DQContext.scala     |  42 ++--
 .../checkpoint/lock/CheckpointLock.scala        |  34 +++
 .../checkpoint/lock/CheckpointLockInZK.scala    |  53 +++++
 .../checkpoint/lock/CheckpointLockSeq.scala     |  33 +++
 .../checkpoint/offset/OffsetCheckpoint.scala    |  39 ++++
 .../offset/OffsetCheckpointClient.scala         |  54 +++++
 .../offset/OffsetCheckpointFactory.scala        |  39 ++++
 .../offset/OffsetCheckpointInZK.scala           | 214 +++++++++++++++++++
 .../streaming/checkpoint/offset/OffsetOps.scala | 125 +++++++++++
 .../context/streaming/lock/CacheLock.scala      |  34 ---
 .../context/streaming/lock/CacheLockInZK.scala  |  53 -----
 .../context/streaming/lock/CacheLockSeq.scala   |  33 ---
 .../context/streaming/offset/OffsetCache.scala  |  39 ----
 .../streaming/offset/OffsetCacheClient.scala    |  54 -----
 .../streaming/offset/OffsetCacheFactory.scala   |  39 ----
 .../streaming/offset/OffsetCacheInZK.scala      | 214 -------------------
 .../context/streaming/offset/OffsetOps.scala    | 125 -----------
 .../griffin/measure/datasource/DataSource.scala |   2 +
 .../measure/datasource/DataSourceFactory.scala  |   2 +-
 .../datasource/cache/StreamingCacheClient.scala |  12 +-
 .../cache/StreamingCacheClientFactory.scala     |  16 +-
 .../cache/StreamingOffsetCacheable.scala        |  26 +--
 .../datasource/connector/DataConnector.scala    |   9 +-
 .../apache/griffin/measure/launch/DQApp.scala   |   9 +-
 .../measure/launch/batch/BatchDQApp.scala       |  10 +-
 .../launch/streaming/StreamingDQApp.scala       |  28 +--
 .../griffin/measure/sink/ConsoleSink.scala      |   8 +-
 .../measure/sink/ElasticSearchSink.scala        |  81 +++++++
 .../apache/griffin/measure/sink/HdfsSink.scala  |  20 +-
 .../apache/griffin/measure/sink/HttpSink.scala  |  81 -------
 .../apache/griffin/measure/sink/MongoSink.scala |   8 +-
 .../griffin/measure/sink/MultiSinks.scala       |  40 ++--
 .../org/apache/griffin/measure/sink/Sink.scala  |   8 +-
 .../griffin/measure/sink/SinkFactory.scala      |  43 ++--
 .../griffin/measure/sink/SinkTaskRunner.scala   |   2 +-
 .../builder/DataFrameOpsDQStepBuilder.scala     |   5 +-
 .../step/builder/GriffinDslDQStepBuilder.scala  |   4 +-
 .../step/builder/RuleParamStepBuilder.scala     |  14 +-
 .../step/builder/SparkSqlDQStepBuilder.scala    |   2 +-
 .../dsl/transform/AccuracyExpr2DQSteps.scala    |  29 ++-
 .../transform/CompletenessExpr2DQSteps.scala    |  10 +-
 .../transform/DistinctnessExpr2DQSteps.scala    |  14 +-
 .../dsl/transform/ProfilingExpr2DQSteps.scala   |  12 +-
 .../dsl/transform/TimelinessExpr2DQSteps.scala  |  16 +-
 .../dsl/transform/UniquenessExpr2DQSteps.scala  |   8 +-
 .../builder/preproc/PreProcParamMaker.scala     |  67 ++++++
 .../preproc/PreProcRuleParamGenerator.scala     |  72 -------
 .../measure/step/transform/DataFrameOps.scala   |  23 +-
 .../transform/DataFrameOpsTransformStep.scala   |   7 +-
 .../step/write/DataSourceUpdateWriteStep.scala  |   4 +-
 .../measure/step/write/MetricFlushStep.scala    |   2 +-
 .../measure/step/write/MetricWriteStep.scala    |  16 +-
 .../measure/step/write/RecordWriteStep.scala    |  16 +-
 .../resources/_accuracy-batch-griffindsl.json   |  18 +-
 .../resources/_accuracy-batch-sparksql.json     |  63 ------
 .../_accuracy-streaming-griffindsl.json         |  56 ++---
 .../resources/_accuracy-streaming-sparksql.json | 142 ------------
 .../_completeness-batch-griffindsl.json         |  15 +-
 .../_completeness-streaming-griffindsl.json     |  32 +--
 .../_distinctness-batch-griffindsl.json         |  15 +-
 .../_distinctness-batch-griffindsl1.json        |  73 -------
 .../_distinctness-batch-griffindsl2.json        |  74 -------
 .../_distinctness-streaming-griffindsl.json     |  34 +--
 .../_profiling-batch-griffindsl-hive.json       |  30 ++-
 .../resources/_profiling-batch-griffindsl.json  |  36 ++--
 .../resources/_profiling-batch-sparksql.json    |  28 ++-
 .../_profiling-streaming-griffindsl.json        |  45 ++--
 .../_profiling-streaming-sparksql.json          |  80 -------
 .../resources/_timeliness-batch-griffindsl.json |  22 +-
 .../resources/_timeliness-batch-sparksql.json   |  52 -----
 .../_timeliness-streaming-griffindsl.json       |  39 ++--
 .../_timeliness-streaming-sparksql.json         |  82 -------
 .../resources/_uniqueness-batch-griffindsl.json |  22 +-
 .../_uniqueness-streaming-griffindsl.json       |  56 ++---
 .../_uniqueness-streaming-sparksql.json         | 130 -----------
 measure/src/test/resources/env-batch.json       |  16 +-
 .../src/test/resources/env-streaming-mongo.json |  12 +-
 measure/src/test/resources/env-streaming.json   |  12 +-
 .../reader/ParamFileReaderSpec.scala            |   6 +-
 .../reader/ParamJsonReaderSpec.scala            |   6 +-
 92 files changed, 1600 insertions(+), 2095 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/resources/config-batch-advanced.json
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/config-batch-advanced.json 
b/measure/src/main/resources/config-batch-advanced.json
index 96ae245..9e98efa 100644
--- a/measure/src/main/resources/config-batch-advanced.json
+++ b/measure/src/main/resources/config-batch-advanced.json
@@ -6,7 +6,7 @@
   "data.sources": [
     {
       "name": "source",
-      "as.baseline": true,
+      "baseline": true,
       "connectors": [
         {
           "type": "avro",
@@ -37,7 +37,7 @@
         "dq.type": "accuracy",
         "out.dataframe.name": "accu",
         "rule": "source.user_id = target.user_id AND upper(source.first_name) 
= upper(target.first_name) AND source.last_name = target.last_name AND 
source.address = target.address AND source.email = target.email AND 
source.phone = target.phone AND source.post_code = target.post_code",
-        "alias": {
+        "details": {
           "source": "source",
           "target": "target",
           "miss": "miss_count",
@@ -46,11 +46,11 @@
         },
         "out":[
           {
-            "type":"metric",
+            "type": "metric",
             "name": "accu"
           },
           {
-            "type":"record",
+            "type": "record",
             "name": "missRecords"
           }
         ]

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/resources/config-batch.json
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/config-batch.json 
b/measure/src/main/resources/config-batch.json
index f1b03fb..d2257a0 100644
--- a/measure/src/main/resources/config-batch.json
+++ b/measure/src/main/resources/config-batch.json
@@ -6,7 +6,7 @@
   "data.sources": [
     {
       "name": "source",
-      "as.baseline": true,
+      "baseline": true,
       "connectors": [
         {
           "type": "avro",

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/resources/config-streaming.json
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/config-streaming.json 
b/measure/src/main/resources/config-streaming.json
index 15a91ac..a7c47e8 100644
--- a/measure/src/main/resources/config-streaming.json
+++ b/measure/src/main/resources/config-streaming.json
@@ -6,6 +6,7 @@
   "data.sources": [
     {
       "name": "source",
+      "baseline": true,
       "connectors": [
         {
           "type": "kafka",
@@ -24,11 +25,10 @@
           },
           "pre.proc": [
             {
-              "dsl.type": "df-opr",
+              "dsl.type": "df-ops",
               "in.dataframe.name": "kafka",
               "out.dataframe.name": "out1",
               "rule": "from_json"
-
             },
             {
               "dsl.type": "spark-sql",
@@ -71,13 +71,9 @@
         "rule": "select name, count(*) as `cnt` from source group by name",
         "out":[
           {
-            "type": "array",
+            "type": "metric",
             "name": "name_group",
-            "flatten":"array"
-          },
-          {
-            "type": "record",
-            "name": "missRecords"
+            "flatten": "array"
           }
         ]
       }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/resources/env-batch.json
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/env-batch.json 
b/measure/src/main/resources/env-batch.json
index 024ad80..bed6ed8 100644
--- a/measure/src/main/resources/env-batch.json
+++ b/measure/src/main/resources/env-batch.json
@@ -32,7 +32,5 @@
     }
   ],
 
-  "info.cache": [],
-
-  "cleaner": {}
+  "griffin.checkpoint": []
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/resources/env-streaming.json
----------------------------------------------------------------------
diff --git a/measure/src/main/resources/env-streaming.json 
b/measure/src/main/resources/env-streaming.json
index 83ff6ab..f5e303c 100644
--- a/measure/src/main/resources/env-streaming.json
+++ b/measure/src/main/resources/env-streaming.json
@@ -54,9 +54,5 @@
         "close.clear": false
       }
     }
-  ],
-
-  "cleaner": {
-    "clean.interval": "2m"
-  }
+  ]
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
index 8d69377..79a43c1 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
@@ -25,18 +25,20 @@ import org.apache.griffin.measure.configuration.enums._
 
 /**
   * dq param
-  * @param name         name of dq measurement (must)
-  * @param timestamp    default timestamp of measure in batch mode (optional)
-  * @param procType     batch mode or streaming mode (must)
-  * @param dataSources   data sources (must)
-  * @param evaluateRule  dq measurement (must)
+  * @param name           name of dq measurement (must)
+  * @param timestamp      default timestamp of measure in batch mode (optional)
+  * @param procType       batch mode or streaming mode (must)
+  * @param dataSources    data sources (must)
+  * @param evaluateRule   dq measurement (must)
+  * @param sinks          sink types (optional, by default will be 
elasticsearch)
   */
 @JsonInclude(Include.NON_NULL)
 case class DQConfig(@JsonProperty("name") name: String,
                     @JsonProperty("timestamp") timestamp: Long,
                     @JsonProperty("process.type") procType: String,
                     @JsonProperty("data.sources") dataSources: 
List[DataSourceParam],
-                    @JsonProperty("evaluate.rule") evaluateRule: 
EvaluateRuleParam
+                    @JsonProperty("evaluate.rule") evaluateRule: 
EvaluateRuleParam,
+                    @JsonProperty("sinks") sinks: List[String]
                   ) extends Param {
   def getName: String = name
   def getTimestampOpt: Option[Long] = if (timestamp != 0) Some(timestamp) else 
None
@@ -50,6 +52,7 @@ case class DQConfig(@JsonProperty("name") name: String,
     }._1
   }
   def getEvaluateRule: EvaluateRuleParam = evaluateRule
+  def getValidSinkTypes: Seq[SinkType] = SinkType.validSinkTypes(if (sinks != 
null) sinks else Nil)
 
   def validate(): Unit = {
     assert(StringUtils.isNotBlank(name), "dq config name should not be blank")
@@ -64,17 +67,20 @@ case class DQConfig(@JsonProperty("name") name: String,
 /**
   * data source param
   * @param name         data source name (must)
+  * @param baseline     data source is baseline or not, false by default 
(optional)
   * @param connectors   data connectors (optional)
-  * @param cache        data source cache configuration (must in streaming 
mode with streaming connectors)
+  * @param checkpoint   data source checkpoint configuration (must in 
streaming mode with streaming connectors)
   */
 @JsonInclude(Include.NON_NULL)
 case class DataSourceParam( @JsonProperty("name") name: String,
+                            @JsonProperty("baseline") baseline: Boolean,
                             @JsonProperty("connectors") connectors: 
List[DataConnectorParam],
-                            @JsonProperty("cache") cache: Map[String, Any]
+                            @JsonProperty("checkpoint") checkpoint: 
Map[String, Any]
                           ) extends Param {
   def getName: String = name
+  def isBaseline: Boolean = if (baseline != null) baseline else false
   def getConnectors: Seq[DataConnectorParam] = if (connectors != null) 
connectors else Nil
-  def getCacheOpt: Option[Map[String, Any]] = if (cache != null) Some(cache) 
else None
+  def getCheckpointOpt: Option[Map[String, Any]] = if (checkpoint != null) 
Some(checkpoint) else None
 
   def validate(): Unit = {
     assert(StringUtils.isNotBlank(name), "data source name should not be 
empty")
@@ -86,17 +92,20 @@ case class DataSourceParam( @JsonProperty("name") name: 
String,
   * data connector param
   * @param conType    data connector type, e.g.: hive, avro, kafka (must)
   * @param version    data connector type version (optional)
+  * @param dataFrameName    data connector dataframe name, for pre-process 
input usage (optional)
   * @param config     detail configuration of data connector (must)
   * @param preProc    pre-process rules after load data (optional)
   */
 @JsonInclude(Include.NON_NULL)
 case class DataConnectorParam( @JsonProperty("type") conType: String,
                                @JsonProperty("version") version: String,
+                               @JsonProperty("dataframe.name") dataFrameName: 
String,
                                @JsonProperty("config") config: Map[String, 
Any],
                                @JsonProperty("pre.proc") preProc: 
List[RuleParam]
                              ) extends Param {
   def getType: String = conType
-  def getVersion: String = version
+  def getVersion: String = if (version != null) version else ""
+  def getDataFrameName(defName: String): String = if (dataFrameName != null) 
dataFrameName else defName
   def getConfig: Map[String, Any] = if (config != null) config else 
Map[String, Any]()
   def getPreProcRules: Seq[RuleParam] = if (preProc != null) preProc else Nil
 
@@ -124,94 +133,77 @@ case class EvaluateRuleParam( @JsonProperty("rules") 
rules: List[RuleParam]
   * rule param
   * @param dslType    dsl type of this rule (must)
   * @param dqType     dq type of this rule (must if dsl type is "griffin-dsl")
-  * @param name       name of result calculated by this rule (must if for 
later usage)
+  * @param inDfName       name of input dataframe of this rule, by default 
will be the previous rule output dataframe name
+  * @param outDfName      name of output dataframe of this rule, by default 
will be generated as data connector dataframe name with index suffix
   * @param rule       rule to define dq step calculation (must)
   * @param details    detail config of rule (optional)
-  * @param cache      cache the result for multiple usage (optional, valid for 
"spark-sql" and "df-opr" mode)
-  * @param metric     config for metric output (optional)
-  * @param record     config for record output (optional)
-  * @param dsCacheUpdate    config for data source cache update output 
(optional, valid in streaming mode)
+  * @param cache      cache the result for multiple usage (optional, valid for 
"spark-sql" and "df-ops" mode)
+  * @param outputs    output ways configuration (optional)
+//  * @param metric     config for metric output (optional)
+//  * @param record     config for record output (optional)
+//  * @param dsCacheUpdate    config for data source cache update output 
(optional, valid in streaming mode)
   */
 @JsonInclude(Include.NON_NULL)
-case class RuleParam( @JsonProperty("dsl.type") dslType: String,
-                      @JsonProperty("dq.type") dqType: String,
-                      @JsonProperty("name") name: String,
-                      @JsonProperty("rule") rule: String,
-                      @JsonProperty("details") details: Map[String, Any],
-                      @JsonProperty("cache") cache: Boolean,
-                      @JsonProperty("metric") metric: RuleMetricParam,
-                      @JsonProperty("record") record: RuleRecordParam,
-                      @JsonProperty("ds.cache.update") dsCacheUpdate: 
RuleDsCacheUpdateParam
+case class RuleParam(@JsonProperty("dsl.type") dslType: String,
+                     @JsonProperty("dq.type") dqType: String,
+                     @JsonProperty("in.dataframe.name") inDfName: String,
+                     @JsonProperty("out.dataframe.name") outDfName: String,
+                     @JsonProperty("rule") rule: String,
+                     @JsonProperty("details") details: Map[String, Any],
+                     @JsonProperty("cache") cache: Boolean,
+                     @JsonProperty("out") outputs: List[RuleOutputParam]
                     ) extends Param {
   def getDslType: DslType = if (dslType != null) DslType(dslType) else 
DslType("")
   def getDqType: DqType = if (dqType != null) DqType(dqType) else DqType("")
   def getCache: Boolean = if (cache) cache else false
 
-  def getName: String = if (name != null) name else ""
+  def getInDfName(defName: String = ""): String = if (inDfName != null) 
inDfName else defName
+  def getOutDfName(defName: String = ""): String = if (outDfName != null) 
outDfName else defName
   def getRule: String = if (rule != null) rule else ""
   def getDetails: Map[String, Any] = if (details != null) details else 
Map[String, Any]()
 
-  def getMetricOpt: Option[RuleMetricParam] = if (metric != null) Some(metric) 
else None
-  def getRecordOpt: Option[RuleRecordParam] = if (record != null) Some(record) 
else None
-  def getDsCacheUpdateOpt: Option[RuleDsCacheUpdateParam] = if (dsCacheUpdate 
!= null) Some(dsCacheUpdate) else None
+  def getOutputs: Seq[RuleOutputParam] = if (outputs != null) outputs else Nil
+  def getOutputOpt(tp: OutputType): Option[RuleOutputParam] = 
getOutputs.filter(_.getOutputType == tp).headOption
 
-  def replaceName(newName: String): RuleParam = {
-    if (StringUtils.equals(newName, name)) this
-    else RuleParam(dslType, dqType, newName, rule, details, cache, metric, 
record, dsCacheUpdate)
+  def replaceInDfName(newName: String): RuleParam = {
+    if (StringUtils.equals(newName, inDfName)) this
+    else RuleParam(dslType, dqType, newName, outDfName, rule, details, cache, 
outputs)
+  }
+  def replaceOutDfName(newName: String): RuleParam = {
+    if (StringUtils.equals(newName, outDfName)) this
+    else RuleParam(dslType, dqType, inDfName, newName, rule, details, cache, 
outputs)
+  }
+  def replaceInOutDfName(in: String, out: String): RuleParam = {
+    if (StringUtils.equals(inDfName, in) && StringUtils.equals(outDfName, 
out)) this
+    else RuleParam(dslType, dqType, in, out, rule, details, cache, outputs)
   }
   def replaceRule(newRule: String): RuleParam = {
     if (StringUtils.equals(newRule, rule)) this
-    else RuleParam(dslType, dqType, name, newRule, details, cache, metric, 
record, dsCacheUpdate)
-  }
-  def replaceDetails(newDetails: Map[String, Any]): RuleParam = {
-    RuleParam(dslType, dqType, name, rule, newDetails, cache, metric, record, 
dsCacheUpdate)
+    else RuleParam(dslType, dqType, inDfName, outDfName, newRule, details, 
cache, outputs)
   }
 
   def validate(): Unit = {
     assert(!(getDslType.equals(GriffinDslType) && 
getDqType.equals(UnknownType)),
       "unknown dq type for griffin dsl")
 
-    getMetricOpt.foreach(_.validate)
-    getRecordOpt.foreach(_.validate)
-    getDsCacheUpdateOpt.foreach(_.validate)
+    getOutputs.foreach(_.validate)
   }
 }
 
 /**
-  * metric param of rule
-  * @param name         name of metric to output (optional)
-  * @param collectType  the normalize strategy to collect metric  (optional)
+  * out param of rule
+  * @param outputType     output type (must)
+  * @param name           output name (optional)
+  * @param flatten        flatten type of output metric (optional, available 
in output metric type)
   */
 @JsonInclude(Include.NON_NULL)
-case class RuleMetricParam( @JsonProperty("name") name: String,
-                            @JsonProperty("collect.type") collectType: String
+case class RuleOutputParam( @JsonProperty("type") outputType: String,
+                            @JsonProperty("name") name: String,
+                            @JsonProperty("flatten") flatten: String
                           ) extends Param {
+  def getOutputType: OutputType = if (outputType != null) 
OutputType(outputType) else OutputType("")
   def getNameOpt: Option[String] = if (StringUtils.isNotBlank(name)) 
Some(name) else None
-  def getCollectType: NormalizeType = if (StringUtils.isNotBlank(collectType)) 
NormalizeType(collectType) else NormalizeType("")
-
-  def validate(): Unit = {}
-}
-
-/**
-  * record param of rule
-  * @param name   name of record to output (optional)
-  */
-@JsonInclude(Include.NON_NULL)
-case class RuleRecordParam( @JsonProperty("name") name: String
-                          ) extends Param {
-  def getNameOpt: Option[String] = if (StringUtils.isNotBlank(name)) 
Some(name) else None
-
-  def validate(): Unit = {}
-}
-
-/**
-  * data source cache update param of rule
-  * @param dsName   name of data source to be updated by thie rule result 
(must)
-  */
-@JsonInclude(Include.NON_NULL)
-case class RuleDsCacheUpdateParam( @JsonProperty("ds.name") dsName: String
-                                 ) extends Param {
-  def getDsNameOpt: Option[String] = if (StringUtils.isNotBlank(dsName)) 
Some(dsName) else None
+  def getFlatten: FlattenType = if (StringUtils.isNotBlank(flatten)) 
FlattenType(flatten) else FlattenType("")
 
   def validate(): Unit = {}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
index 9793c01..2ad2837 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
@@ -21,28 +21,28 @@ package 
org.apache.griffin.measure.configuration.dqdefinition
 import com.fasterxml.jackson.annotation.JsonInclude.Include
 import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
 import org.apache.commons.lang.StringUtils
-import org.apache.griffin.measure.utils.TimeUtil
+import org.apache.griffin.measure.configuration.enums._
 
 /**
   * environment param
-  * @param sparkParam       config of spark environment (must)
-  * @param persistParams    config of persist ways (optional)
-  * @param offsetCacheParams  config of information cache ways (required in 
streaming mode)
+  * @param sparkParam         config of spark environment (must)
+  * @param sinkParams         config of sink ways (optional)
+  * @param checkpointParams   config of checkpoint locations (required in 
streaming mode)
   */
 @JsonInclude(Include.NON_NULL)
 case class EnvConfig(@JsonProperty("spark") sparkParam: SparkParam,
-                     @JsonProperty("persist") persistParams: 
List[PersistParam],
-                     @JsonProperty("info.cache") offsetCacheParams: 
List[OffsetCacheParam]
+                     @JsonProperty("sinks") sinkParams: List[SinkParam],
+                     @JsonProperty("griffin.checkpoint") checkpointParams: 
List[CheckpointParam]
                    ) extends Param {
   def getSparkParam: SparkParam = sparkParam
-  def getPersistParams: Seq[PersistParam] = if (persistParams != null) 
persistParams else Nil
-  def getOffsetCacheParams: Seq[OffsetCacheParam] = if (offsetCacheParams != 
null) offsetCacheParams else Nil
+  def getSinkParams: Seq[SinkParam] = if (sinkParams != null) sinkParams else 
Nil
+  def getCheckpointParams: Seq[CheckpointParam] = if (checkpointParams != 
null) checkpointParams else Nil
 
   def validate(): Unit = {
     assert((sparkParam != null), "spark param should not be null")
     sparkParam.validate
-    getPersistParams.foreach(_.validate)
-    getOffsetCacheParams.foreach(_.validate)
+    getSinkParams.foreach(_.validate)
+    getCheckpointParams.foreach(_.validate)
   }
 }
 
@@ -78,35 +78,35 @@ case class SparkParam( @JsonProperty("log.level") logLevel: 
String,
 }
 
 /**
-  * persist param
-  * @param persistType    persist type, e.g.: log, hdfs, http, mongo (must)
-  * @param config         config of persist way (must)
+  * sink param
+  * @param sinkType       sink type, e.g.: log, hdfs, http, mongo (must)
+  * @param config         config of sink way (must)
   */
 @JsonInclude(Include.NON_NULL)
-case class PersistParam( @JsonProperty("type") persistType: String,
-                         @JsonProperty("config") config: Map[String, Any]
-                       ) extends Param {
-  def getType: String = persistType
+case class SinkParam(@JsonProperty("type") sinkType: String,
+                     @JsonProperty("config") config: Map[String, Any]
+                    ) extends Param {
+  def getType: SinkType = SinkType(sinkType)
   def getConfig: Map[String, Any] = if (config != null) config else 
Map[String, Any]()
 
   def validate(): Unit = {
-    assert(StringUtils.isNotBlank(persistType), "persist type should not be 
empty")
+    assert(StringUtils.isNotBlank(sinkType), "sink type should not be empty")
   }
 }
 
 /**
-  * offset cache param
-  * @param cacheType    offset cache type, e.g.: zookeeper (must)
-  * @param config       config of cache way
+  * checkpoint param
+  * @param cpType       checkpoint location type, e.g.: zookeeper (must)
+  * @param config       config of checkpoint location
   */
 @JsonInclude(Include.NON_NULL)
-case class OffsetCacheParam(@JsonProperty("type") cacheType: String,
-                            @JsonProperty("config") config: Map[String, Any]
+case class CheckpointParam(@JsonProperty("type") cpType: String,
+                           @JsonProperty("config") config: Map[String, Any]
                           ) extends Param {
-  def getType: String = cacheType
+  def getType: String = cpType
   def getConfig: Map[String, Any] = if (config != null) config else 
Map[String, Any]()
 
   def validate(): Unit = {
-    assert(StringUtils.isNotBlank(cacheType), "info cache type should not be 
empty")
+    assert(StringUtils.isNotBlank(cpType), "griffin checkpoint type should not 
be empty")
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DslType.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DslType.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DslType.scala
index c9a1172..c72965c 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DslType.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DslType.scala
@@ -51,8 +51,8 @@ object DslType {
   * df-ops: data frame operations rule, support some pre-defined data frame ops
   */
  case object DataFrameOpsType extends DslType {
-  val idPattern = "^(?i)df-?(?:op|opr|ops)$".r
-  val desc = "df-opr"
+  val idPattern = "^(?i)df-?(?:ops|opr|operations)$".r
+  val desc = "df-ops"
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
new file mode 100644
index 0000000..160ecaf
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
@@ -0,0 +1,88 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.configuration.enums
+
+import scala.util.matching.Regex
+
+/**
+  * the strategy to flatten metric
+  */
+sealed trait FlattenType {
+  val idPattern: Regex
+  val desc: String
+}
+
+object FlattenType {
+  // candidates are tried in this order, the first one accepting the id wins
+  private val flattenTypes: List[FlattenType] = List(
+    DefaultFlattenType, EntriesFlattenType, ArrayFlattenType, MapFlattenType
+  )
+  val default = DefaultFlattenType
+
+  /**
+    * resolve the flatten type from its string id
+    * falls back to `default` when no candidate pattern accepts the id
+    */
+  def apply(ptn: String): FlattenType = {
+    def accepts(candidate: FlattenType): Boolean = ptn match {
+      case candidate.idPattern() => true
+      case _ => false
+    }
+    flattenTypes.find(accepts).getOrElse(default)
+  }
+
+  /**
+    * extract the description string of a flatten type
+    */
+  def unapply(pt: FlattenType): Option[String] = Some(pt.desc)
+}
+
+/**
+  * default flatten strategy
+  * metrics contains 1 row -> flatten metric json map
+  * metrics contains n > 1 rows -> flatten metric json array
+  * n = 0: { }
+  * n = 1: { "col1": "value1", "col2": "value2", ... }
+  * n > 1: { "arr-name": [ { "col1": "value1", "col2": "value2", ... }, ... ] }
+  * all rows
+  */
+ case object DefaultFlattenType extends FlattenType {
+  val idPattern: Regex = "".r
+  val desc: String = "default"
+}
+
+/**
+  * metrics contains n rows -> flatten metric json map
+  * n = 0: { }
+  * n >= 1: { "col1": "value1", "col2": "value2", ... }
+  * the first row only
+  */
+ case object EntriesFlattenType extends FlattenType {
+  val idPattern: Regex = "^(?i)entries$".r
+  val desc: String = "entries"
+}
+
+/**
+  * metrics contains n rows -> flatten metric json array
+  * n = 0: { "arr-name": [ ] }
+  * n >= 1: { "arr-name": [ { "col1": "value1", "col2": "value2", ... }, ... ] 
}
+  * all rows
+  */
+ case object ArrayFlattenType extends FlattenType {
+  // alternatives are grouped so that both anchors apply to "array" and to "list"
+  val idPattern: Regex = "^(?i)(?:array|list)$".r
+  val desc: String = "array"
+}
+
+/**
+  * metrics contains n rows -> flatten metric json wrapped map
+  * n = 0: { "map-name": { } }
+  * n >= 1: { "map-name": { "col1": "value1", "col2": "value2", ... } }
+  * the first row only
+  */
+ case object MapFlattenType extends FlattenType {
+  val idPattern: Regex = "^(?i)map$".r
+  val desc: String = "map"
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/NormalizeType.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/NormalizeType.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/NormalizeType.scala
deleted file mode 100644
index 61bf27c..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/NormalizeType.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.configuration.enums
-
-import scala.util.matching.Regex
-
-/**
-  * the normalize strategy to collect metric
-  */
-sealed trait NormalizeType {
-  val idPattern: Regex
-  val desc: String
-}
-
-object NormalizeType {
-  private val normalizeTypes: List[NormalizeType] = List(DefaultNormalizeType, 
EntriesNormalizeType, ArrayNormalizeType, MapNormalizeType)
-  val default = DefaultNormalizeType
-  def apply(ptn: String): NormalizeType = {
-    normalizeTypes.find(tp => ptn match {
-      case tp.idPattern() => true
-      case _ => false
-    }).getOrElse(default)
-  }
-  def unapply(pt: NormalizeType): Option[String] = Some(pt.desc)
-}
-
-/**
-  * default normalize strategy
-  * metrics contains n rows -> normalized metric json map
-  * n = 0: { }
-  * n = 1: { "col1": "value1", "col2": "value2", ... }
-  * n > 1: { "arr-name": [ { "col1": "value1", "col2": "value2", ... }, ... ] }
-  * all rows
-  */
- case object DefaultNormalizeType extends NormalizeType {
-  val idPattern: Regex = "".r
-  val desc: String = "default"
-}
-
-/**
-  * metrics contains n rows -> normalized metric json map
-  * n = 0: { }
-  * n >= 1: { "col1": "value1", "col2": "value2", ... }
-  * the first row only
-  */
- case object EntriesNormalizeType extends NormalizeType {
-  val idPattern: Regex = "^(?i)entries$".r
-  val desc: String = "entries"
-}
-
-/**
-  * metrics contains n rows -> normalized metric json map
-  * n = 0: { "arr-name": [ ] }
-  * n >= 1: { "arr-name": [ { "col1": "value1", "col2": "value2", ... }, ... ] 
}
-  * all rows
-  */
- case object ArrayNormalizeType extends NormalizeType {
-  val idPattern: Regex = "^(?i)array|list$".r
-  val desc: String = "array"
-}
-
-/**
-  * metrics contains n rows -> normalized metric json map
-  * n = 0: { "map-name": { } }
-  * n >= 1: { "map-name": { "col1": "value1", "col2": "value2", ... } }
-  * the first row only
-  */
- case object MapNormalizeType extends NormalizeType {
-  val idPattern: Regex = "^(?i)map$".r
-  val desc: String = "map"
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
new file mode 100644
index 0000000..5b1d261
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
@@ -0,0 +1,77 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.configuration.enums
+
+import scala.util.matching.Regex
+
+/**
+  * the output type of a rule step result
+  */
+sealed trait OutputType {
+  val idPattern: Regex
+  val desc: String
+}
+
+object OutputType {
+  // candidates are tried in this order, the first one accepting the id wins
+  private val outputTypes: List[OutputType] = List(
+    MetricOutputType, RecordOutputType, DscUpdateOutputType, UnknownOutputType
+  )
+  val default = UnknownOutputType
+
+  /**
+    * resolve the output type from its string id
+    * falls back to `default` when no candidate pattern accepts the id
+    */
+  def apply(ptn: String): OutputType = {
+    def accepts(candidate: OutputType): Boolean = ptn match {
+      case candidate.idPattern() => true
+      case _ => false
+    }
+    outputTypes.find(accepts).getOrElse(default)
+  }
+
+  /**
+    * extract the description string of an output type
+    */
+  def unapply(pt: OutputType): Option[String] = Some(pt.desc)
+}
+
+/**
+  * metric output type
+  * output the rule step result as metric
+  */
+ case object MetricOutputType extends OutputType {
+  val idPattern: Regex = "^(?i)metric$".r
+  val desc: String = "metric"
+}
+
+/**
+  * record output type
+  * output the rule step result as records
+  */
+ case object RecordOutputType extends OutputType {
+  // alternatives are grouped so that both anchors apply to "record" and to "records"
+  val idPattern: Regex = "^(?i)(?:record|records)$".r
+  val desc: String = "record"
+}
+
+/**
+  * data source cache update output type
+  * output the rule step result to update data source cache
+  */
+ case object DscUpdateOutputType extends OutputType {
+  val idPattern: Regex = "^(?i)dsc-update$".r
+  val desc: String = "dsc-update"
+}
+
+/**
+  * unknown output type
+  * will not output the result
+  */
+ case object UnknownOutputType extends OutputType {
+  val idPattern: Regex = "".r
+  val desc: String = "unknown"
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
new file mode 100644
index 0000000..5471e83
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
@@ -0,0 +1,83 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.configuration.enums
+
+import scala.util.matching.Regex
+
+/**
+  * sink type
+  */
+sealed trait SinkType {
+  val idPattern: Regex
+  val desc: String
+}
+
+object SinkType {
+  // candidates are tried in this order, the first one accepting the id wins
+  private val sinkTypes: List[SinkType] = List(
+    ConsoleSinkType, HdfsSinkType, ElasticsearchSinkType, MongoSinkType, UnknownSinkType
+  )
+
+  /**
+    * resolve the sink type from its string id
+    * falls back to UnknownSinkType when no candidate pattern accepts the id
+    */
+  def apply(ptn: String): SinkType = {
+    def accepts(candidate: SinkType): Boolean = ptn match {
+      case candidate.idPattern() => true
+      case _ => false
+    }
+    sinkTypes.find(accepts).getOrElse(UnknownSinkType)
+  }
+
+  /**
+    * extract the description string of a sink type
+    */
+  def unapply(pt: SinkType): Option[String] = Some(pt.desc)
+
+  /**
+    * map sink ids to the distinct known sink types, keeping first-seen order
+    * when none of the ids is known, the elasticsearch sink is used alone
+    */
+  def validSinkTypes(strs: Seq[String]): Seq[SinkType] = {
+    val known = strs.map(SinkType(_)).filterNot(_ == UnknownSinkType).distinct
+    if (known.isEmpty) Seq(ElasticsearchSinkType) else known
+  }
+}
+
+/**
+  * console sink, will sink metric in console
+  */
+ case object ConsoleSinkType extends SinkType {
+  // alternatives are grouped so that both anchors apply to "console" and to "log"
+  val idPattern = "^(?i)(?:console|log)$".r
+  val desc = "console"
+}
+
+/**
+  * hdfs sink, will sink metric and record in hdfs
+  */
+ case object HdfsSinkType extends SinkType {
+  val idPattern = "^(?i)hdfs$".r
+  val desc = "hdfs"
+}
+
+/**
+  * elasticsearch sink, will sink metric in elasticsearch
+  */
+ case object ElasticsearchSinkType extends SinkType {
+  // alternatives are grouped so that both anchors apply to every accepted id
+  val idPattern = "^(?i)(?:es|elasticsearch|http)$".r
+  val desc = "elasticsearch"
+}
+
+/**
+  * mongo sink, will sink metric in mongo db
+  */
+ case object MongoSinkType extends SinkType {
+  // alternatives are grouped so that both anchors apply to "mongo" and to "mongodb"
+  val idPattern = "^(?i)(?:mongo|mongodb)$".r
+  // description names the sink itself, like the other sink types
+  val desc = "mongo"
+}
+
+/**
+  * unknown sink type, its own pattern accepts only the empty id
+  */
+ case object UnknownSinkType extends SinkType {
+  val idPattern = "".r
+  val desc = "unknown"
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala 
b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
index eee8917..a9f3da0 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.{Encoders, SQLContext, 
SparkSession}
 case class DQContext(contextId: ContextId,
                      name: String,
                      dataSources: Seq[DataSource],
-                     persistParams: Seq[PersistParam],
+                     sinkParams: Seq[SinkParam],
                      procType: ProcessType
                     )(@transient implicit val sparkSession: SparkSession) {
 
@@ -46,8 +46,22 @@ case class DQContext(contextId: ContextId,
   val metricWrapper: MetricWrapper = MetricWrapper(name)
   val writeMode = WriteMode.defaultMode(procType)
 
-  val dataSourceNames: Seq[String] = dataSources.map(_.name)
+  val dataSourceNames: Seq[String] = {
+    // sort data source names, put baseline data source name to the head
+    val (blOpt, others) = dataSources.foldLeft((None: Option[String], Nil: 
Seq[String])) { (ret, ds) =>
+      val (opt, seq) = ret
+      if (opt.isEmpty && ds.isBaseline) (Some(ds.name), seq) else (opt, seq :+ 
ds.name)
+    }
+    blOpt match {
+      case Some(bl) => bl +: others
+      case _ => others
+    }
+  }
   dataSourceNames.foreach(name => compileTableRegister.registerTable(name))
+  def getDataSourceName(index: Int): String = {
+    if (dataSourceNames.size > index) dataSourceNames(index) else ""
+  }
+
   implicit val encoder = Encoders.STRING
   val functionNames: Seq[String] = 
sparkSession.catalog.listFunctions.map(_.name).collect.toSeq
 
@@ -59,26 +73,22 @@ case class DQContext(contextId: ContextId,
   }
   printTimeRanges
 
-  def getDataSourceName(index: Int): String = {
-    if (dataSourceNames.size > index) dataSourceNames(index) else ""
-  }
-
-  private val persistFactory = SinkFactory(persistParams, name)
-  private val defaultPersist: Sink = createPersist(contextId.timestamp)
-  def getPersist(timestamp: Long): Sink = {
-    if (timestamp == contextId.timestamp) getPersist()
-    else createPersist(timestamp)
+  private val sinkFactory = SinkFactory(sinkParams, name)
+  private val defaultSink: Sink = createSink(contextId.timestamp)
+  def getSink(timestamp: Long): Sink = {
+    if (timestamp == contextId.timestamp) getSink()
+    else createSink(timestamp)
   }
-  def getPersist(): Sink = defaultPersist
-  private def createPersist(t: Long): Sink = {
+  def getSink(): Sink = defaultSink
+  private def createSink(t: Long): Sink = {
     procType match {
-      case BatchProcessType => persistFactory.getPersists(t, true)
-      case StreamingProcessType => persistFactory.getPersists(t, false)
+      case BatchProcessType => sinkFactory.getSinks(t, true)
+      case StreamingProcessType => sinkFactory.getSinks(t, false)
     }
   }
 
   def cloneDQContext(newContextId: ContextId): DQContext = {
-    DQContext(newContextId, name, dataSources, persistParams, 
procType)(sparkSession)
+    DQContext(newContextId, name, dataSources, sinkParams, 
procType)(sparkSession)
   }
 
   def clean(): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLock.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLock.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLock.scala
new file mode 100644
index 0000000..bf0fde0
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLock.scala
@@ -0,0 +1,34 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.checkpoint.lock
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.griffin.measure.Loggable
+
+/**
+  * lock for checkpoint
+  */
+trait CheckpointLock extends Loggable with Serializable {
+
+  // try to get the lock, the boolean result tells whether it was acquired
+  // NOTE(review): outtime/unit presumably bound the waiting time - confirm per implementation
+  def lock(outtime: Long, unit: TimeUnit): Boolean
+
+  // give the lock back
+  def unlock(): Unit
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockInZK.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockInZK.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockInZK.scala
new file mode 100644
index 0000000..b1cbe0f
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockInZK.scala
@@ -0,0 +1,53 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.checkpoint.lock
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.curator.framework.recipes.locks.InterProcessMutex
+
+/**
+  * checkpoint lock backed by a curator InterProcessMutex
+  * NOTE(review): mutex is marked @transient, so it is presumably not available
+  * after deserialization - confirm how instances of this class are shipped
+  */
+case class CheckpointLockInZK(@transient mutex: InterProcessMutex) extends 
CheckpointLock {
+
+  /**
+    * try to acquire the mutex
+    * @param outtime time to wait, a negative value switches to acquire(-1, null)
+    * @param unit    time unit of outtime
+    * @return the result of mutex.acquire, or false when acquiring throws
+    */
+  def lock(outtime: Long, unit: TimeUnit): Boolean = {
+    try {
+      if (outtime >= 0) {
+        mutex.acquire(outtime, unit)
+      } else {
+        // presumably waits without a time limit - confirm against curator docs
+        mutex.acquire(-1, null)
+      }
+    } catch {
+      // any failure is logged and reported as "not locked"
+      case e: Throwable => {
+        error(s"lock error: ${e.getMessage}")
+        false
+      }
+    }
+
+  }
+
+  /**
+    * release the mutex when it is acquired in this process
+    * failures are logged and not rethrown
+    */
+  def unlock(): Unit = {
+    try {
+      if (mutex.isAcquiredInThisProcess) mutex.release
+    } catch {
+      case e: Throwable => {
+        error(s"unlock error: ${e.getMessage}")
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockSeq.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockSeq.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockSeq.scala
new file mode 100644
index 0000000..092d86a
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockSeq.scala
@@ -0,0 +1,33 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.checkpoint.lock
+
+import java.util.concurrent.TimeUnit
+
+/**
+  * a checkpoint lock wrapping a sequence of checkpoint locks
+  * only the first lock of the sequence is used for lock and unlock
+  */
+case class CheckpointLockSeq(locks: Seq[CheckpointLock]) extends 
CheckpointLock {
+
+  // delegates to the first lock, an empty sequence counts as locked (true)
+  def lock(outtime: Long, unit: TimeUnit): Boolean = {
+    locks.headOption.map(_.lock(outtime, unit)).getOrElse(true)
+  }
+
+  // delegates to the first lock, does nothing for an empty sequence
+  def unlock(): Unit = {
+    locks.headOption.foreach(_.unlock)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpoint.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpoint.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpoint.scala
new file mode 100644
index 0000000..8d7b045
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpoint.scala
@@ -0,0 +1,39 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.checkpoint.offset
+
+import org.apache.griffin.measure.Loggable
+import 
org.apache.griffin.measure.context.streaming.checkpoint.lock.CheckpointLock
+
+/**
+  * access to string key/value checkpoint info, with lifecycle and lock support
+  * NOTE(review): the per-method notes below are inferred from the method
+  * signatures only - confirm them against the concrete implementations
+  */
+trait OffsetCheckpoint extends Loggable with Serializable {
+
+  // lifecycle
+  def init(): Unit
+  def available(): Boolean
+  def close(): Unit
+
+  // key/value access, presumably store / fetch / remove by key and remove all
+  def cache(kvs: Map[String, String]): Unit
+  def read(keys: Iterable[String]): Map[String, String]
+  def delete(keys: Iterable[String]): Unit
+  def clear(): Unit
+
+  // presumably lists the keys found under the given path
+  def listKeys(path: String): List[String]
+
+  // create a lock object for the given name
+  def genLock(s: String): CheckpointLock
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointClient.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointClient.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointClient.scala
new file mode 100644
index 0000000..8acfbeb
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointClient.scala
@@ -0,0 +1,54 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.checkpoint.offset
+
+import org.apache.griffin.measure.configuration.dqdefinition.CheckpointParam
+import 
org.apache.griffin.measure.context.streaming.checkpoint.lock.{CheckpointLock, 
CheckpointLockSeq}
+
+/**
+  * client facade over all configured offset checkpoints
+  * every operation is delegated to the checkpoints created by initClient
+  */
+object OffsetCheckpointClient extends OffsetCheckpoint with OffsetOps {
+  var offsetCheckpoints: Seq[OffsetCheckpoint] = Nil
+
+  /**
+    * build the checkpoints from params, params that yield no checkpoint are dropped
+    */
+  def initClient(checkpointParams: Iterable[CheckpointParam], metricName: String) = {
+    val factory = OffsetCheckpointFactory(checkpointParams, metricName)
+    offsetCheckpoints = checkpointParams.flatMap { param =>
+      factory.getOffsetCheckpoint(param)
+    }.toList
+  }
+
+  def init(): Unit = offsetCheckpoints.foreach(_.init)
+
+  // available as soon as one checkpoint is available
+  def available(): Boolean = offsetCheckpoints.exists(_.available)
+
+  def close(): Unit = offsetCheckpoints.foreach(_.close)
+
+  def cache(kvs: Map[String, String]): Unit = offsetCheckpoints.foreach(_.cache(kvs))
+
+  /**
+    * read from every checkpoint and merge the results
+    * on duplicate keys the value of the earlier checkpoint wins
+    */
+  def read(keys: Iterable[String]): Map[String, String] = {
+    val perCheckpoint = offsetCheckpoints.map(_.read(keys))
+    perCheckpoint.foldRight(Map[String, String]()) { (kvs, merged) => merged ++ kvs }
+  }
+
+  def delete(keys: Iterable[String]): Unit = {
+    offsetCheckpoints.foreach(_.delete(keys))
+  }
+
+  def clear(): Unit = offsetCheckpoints.foreach(_.clear)
+
+  /**
+    * keys of the first checkpoint that returns a non-empty list
+    * later checkpoints are not asked once a non-empty list is found
+    */
+  def listKeys(path: String): List[String] = {
+    offsetCheckpoints.iterator.map(_.listKeys(path)).find(_.nonEmpty).getOrElse(Nil)
+  }
+
+  def genLock(s: String): CheckpointLock = {
+    val locks = offsetCheckpoints.map(_.genLock(s))
+    CheckpointLockSeq(locks)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointFactory.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointFactory.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointFactory.scala
new file mode 100644
index 0000000..5fe8e15
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointFactory.scala
@@ -0,0 +1,39 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.checkpoint.offset
+
+import org.apache.griffin.measure.configuration.dqdefinition.CheckpointParam
+
+import scala.util.Try
+
+/**
+  * factory of offset checkpoints, created from checkpoint params
+  */
+case class OffsetCheckpointFactory(checkpointParams: Iterable[CheckpointParam],
+                                   metricName: String
+                                  ) extends Serializable {
+
+  // alternatives are grouped so that both anchors apply to "zk" and to "zookeeper"
+  val ZK_REGEX = """^(?i)(?:zk|zookeeper)$""".r
+
+  /**
+    * create the offset checkpoint described by the param
+    * @param checkpointParam type and config of the checkpoint
+    * @return the checkpoint, or None when its construction fails
+    * @throws Exception when the checkpoint type is not supported
+    */
+  def getOffsetCheckpoint(checkpointParam: CheckpointParam): Option[OffsetCheckpoint] = {
+    val config = checkpointParam.getConfig
+    val offsetCheckpointTry = checkpointParam.getType match {
+      case ZK_REGEX() => Try(OffsetCheckpointInZK(config, metricName))
+      // thrown outside of Try on purpose: an unsupported type is not turned into None
+      case tp => throw new Exception(s"not supported checkpoint type: ${tp}")
+    }
+    offsetCheckpointTry.toOption
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointInZK.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointInZK.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointInZK.scala
new file mode 100644
index 0000000..e051779
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointInZK.scala
@@ -0,0 +1,214 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.checkpoint.offset
+
+import org.apache.curator.framework.imps.CuratorFrameworkState
+import org.apache.curator.framework.recipes.locks.InterProcessMutex
+import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
+import org.apache.curator.retry.ExponentialBackoffRetry
+import org.apache.curator.utils.ZKPaths
+import 
org.apache.griffin.measure.context.streaming.checkpoint.lock.CheckpointLockInZK
+import org.apache.zookeeper.CreateMode
+
+import scala.collection.JavaConverters._
+
+/**
+  * Offset checkpoint that keeps its key/value info in ZooKeeper, accessed
+  * through Apache Curator.
+  *
+  * Every key is stored as a znode below the Curator namespace
+  * `[namespace/]metricName`; values are written and read as UTF-8 strings.
+  *
+  * Recognised `config` entries:
+  *  - `hosts`: ZooKeeper connect string (default `""`)
+  *  - `namespace`: optional prefix of the Curator namespace (default `""`)
+  *  - `mode`: `persist`/`persistent` or `ephemeral`, case-insensitive;
+  *    any other value falls back to persistent
+  *  - `init.clear`: call `clear()` during `init()` (default `true`)
+  *  - `close.clear`: call `clear()` during `close()` (default `false`)
+  *  - `lock.path`: key under which locks are generated (default `lock`)
+  *
+  * @param config      connection and behaviour settings, see above
+  * @param metricName  metric name, used as (the last part of) the namespace
+  */
+case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String)
+  extends OffsetCheckpoint with OffsetOps {
+
+  import java.nio.charset.StandardCharsets
+
+  import scala.util.control.NonFatal
+
+  val Hosts = "hosts"
+  val Namespace = "namespace"
+  val Mode = "mode"
+  val InitClear = "init.clear"
+  val CloseClear = "close.clear"
+  val LockPath = "lock.path"
+
+  // The optional suffix must be a NON-capturing group: a pattern written as
+  // `PersistentRegex()` only matches a regex that has zero capturing groups.
+  val PersistentRegex = """^(?i)persist(?:ent)?$""".r
+  val EphemeralRegex = """^(?i)ephemeral$""".r
+
+  final val separator = ZKPaths.PATH_SEPARATOR
+
+  val hosts = config.getOrElse(Hosts, "").toString
+  val namespace = config.getOrElse(Namespace, "").toString
+  val mode: CreateMode = config.get(Mode) match {
+    case Some(s: String) => s match {
+      case PersistentRegex() => CreateMode.PERSISTENT
+      case EphemeralRegex() => CreateMode.EPHEMERAL
+      case _ => CreateMode.PERSISTENT
+    }
+    case _ => CreateMode.PERSISTENT
+  }
+  val initClear = config.get(InitClear) match {
+    case Some(b: Boolean) => b
+    case _ => true
+  }
+  val closeClear = config.get(CloseClear) match {
+    case Some(b: Boolean) => b
+    case _ => false
+  }
+  val lockPath = config.getOrElse(LockPath, "lock").toString
+
+  private val cacheNamespace: String =
+    if (namespace.isEmpty) metricName else namespace + separator + metricName
+  private val builder = CuratorFrameworkFactory.builder()
+    .connectString(hosts)
+    .retryPolicy(new ExponentialBackoffRetry(1000, 3))
+    .namespace(cacheNamespace)
+  private val client: CuratorFramework = builder.build
+
+  /**
+    * Starts the Curator client, removes the lock key and, when `init.clear`
+    * is set, clears the stored info.
+    */
+  def init(): Unit = {
+    client.start()
+    info("start zk info cache")
+    // The namespace is already fixed on the builder. `usingNamespace` only
+    // returns a new facade, so the discarded call that used to be here had
+    // no effect and has been dropped.
+    info(s"init with namespace: ${cacheNamespace}")
+    delete(lockPath :: Nil)
+    if (initClear) {
+      clear()
+    }
+  }
+
+  /** @return true only while the Curator client is in the STARTED state */
+  def available(): Boolean = {
+    client.getState match {
+      case CuratorFrameworkState.STARTED => true
+      case _ => false
+    }
+  }
+
+  /** Clears the stored info when `close.clear` is set, then closes the client. */
+  def close(): Unit = {
+    if (closeClear) {
+      clear()
+    }
+    info("close zk info cache")
+    client.close()
+  }
+
+  /** Creates or overwrites one znode per entry of `kvs`. */
+  def cache(kvs: Map[String, String]): Unit = {
+    kvs.foreach { case (k, v) => createOrUpdate(path(k), v) }
+  }
+
+  /**
+    * Reads the given keys.
+    * @return the keys that could be read, mapped to their values; keys whose
+    *         read failed are left out
+    */
+  def read(keys: Iterable[String]): Map[String, String] = {
+    keys.flatMap { key => read(path(key)).map(v => (key, v)) }.toMap
+  }
+
+  /** Deletes each key together with its children; failures are only logged. */
+  def delete(keys: Iterable[String]): Unit = {
+    keys.foreach { key => delete(path(key)) }
+  }
+
+  /** Deletes the `info.final` and `info` sub-trees. */
+  def clear(): Unit = {
+//    delete("/")
+    delete(finalCacheInfoPath :: Nil)
+    delete(infoPath :: Nil)
+    info("clear info")
+  }
+
+  /** @return names of the children of key `p`, or Nil when listing fails */
+  def listKeys(p: String): List[String] = {
+    children(path(p))
+  }
+
+  /**
+    * Builds a lock on the lock path, or on the sub-path `s` below it when
+    * `s` is non-empty.
+    */
+  def genLock(s: String): CheckpointLockInZK = {
+    val lpt = if (s.isEmpty) path(lockPath) else path(lockPath) + separator + s
+    CheckpointLockInZK(new InterProcessMutex(client, lpt))
+  }
+
+  // Turns a key into an absolute znode path (leading separator).
+  private def path(k: String): String = {
+    if (k.startsWith(separator)) k else separator + k
+  }
+
+  private def children(path: String): List[String] = {
+    try {
+      client.getChildren().forPath(path).asScala.toList
+    } catch {
+      case NonFatal(e) => {
+        warn(s"list ${path} warn: ${e.getMessage}")
+        Nil
+      }
+    }
+  }
+
+  private def createOrUpdate(path: String, content: String): Boolean = {
+    if (checkExists(path)) {
+      update(path, content)
+    } else {
+      create(path, content)
+    }
+  }
+
+  private def create(path: String, content: String): Boolean = {
+    try {
+      client.create().creatingParentsIfNeeded().withMode(mode)
+        .forPath(path, content.getBytes(StandardCharsets.UTF_8))
+      true
+    } catch {
+      case NonFatal(e) => {
+        error(s"create ( ${path} -> ${content} ) error: ${e.getMessage}")
+        false
+      }
+    }
+  }
+
+  private def update(path: String, content: String): Boolean = {
+    try {
+      client.setData().forPath(path, content.getBytes(StandardCharsets.UTF_8))
+      true
+    } catch {
+      case NonFatal(e) => {
+        error(s"update ( ${path} -> ${content} ) error: ${e.getMessage}")
+        false
+      }
+    }
+  }
+
+  private def read(path: String): Option[String] = {
+    try {
+      Some(new String(client.getData().forPath(path), StandardCharsets.UTF_8))
+    } catch {
+      case NonFatal(e) => {
+        warn(s"read ${path} warn: ${e.getMessage}")
+        None
+      }
+    }
+  }
+
+  private def delete(path: String): Unit = {
+    try {
+      client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path)
+    } catch {
+      case NonFatal(e) => error(s"delete ${path} error: ${e.getMessage}")
+    }
+  }
+
+  private def checkExists(path: String): Boolean = {
+    try {
+      // Curator returns a null Stat when the znode does not exist.
+      Option(client.checkExists().forPath(path)).isDefined
+    } catch {
+      case NonFatal(e) => {
+        warn(s"check exists ${path} warn: ${e.getMessage}")
+        false
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetOps.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetOps.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetOps.scala
new file mode 100644
index 0000000..6be97ef
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetOps.scala
@@ -0,0 +1,125 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.checkpoint.offset
+
+/**
+  * Time bookkeeping on top of an [[OffsetCheckpoint]].
+  *
+  * Times are read from `info/<child>/<time key>` for every child listed
+  * under `info`, and the minimum over all children is stored under
+  * `info.final/<time key>`.
+  */
+trait OffsetOps extends Serializable { this: OffsetCheckpoint =>
+
+  val CacheTime = "cache.time"
+  val LastProcTime = "last.proc.time"
+  val ReadyTime = "ready.time"
+  val CleanTime = "clean.time"
+  val OldCacheIndex = "old.cache.index"
+
+  def cacheTime(path: String): String = s"${path}/${CacheTime}"
+  def lastProcTime(path: String): String = s"${path}/${LastProcTime}"
+  def readyTime(path: String): String = s"${path}/${ReadyTime}"
+  def cleanTime(path: String): String = s"${path}/${CleanTime}"
+  def oldCacheIndex(path: String): String = s"${path}/${OldCacheIndex}"
+
+  val infoPath = "info"
+
+  val finalCacheInfoPath = "info.final"
+  val finalReadyTime = s"${finalCacheInfoPath}/${ReadyTime}"
+  val finalLastProcTime = s"${finalCacheInfoPath}/${LastProcTime}"
+  val finalCleanTime = s"${finalCacheInfoPath}/${CleanTime}"
+
+  /** Stores the minimum ready time of all children as the final ready time. */
+  def startOffsetCheckpoint(): Unit = {
+    cacheMinTime(ReadyTime, finalReadyTime)
+  }
+
+  /**
+    * @return (final last process time, final ready time); a component is
+    *         -1 when its value is missing or is not a valid long
+    */
+  def getTimeRange(): (Long, Long) = {
+    readTimeRange()
+  }
+
+  /** @return final clean time, or -1 when missing or not a valid long */
+  def getCleanTime(): Long = {
+    readCleanTime()
+  }
+
+  /**
+    * Stores the minimum last process time and the minimum clean time of all
+    * children as the final values.
+    */
+  def endOffsetCheckpoint: Unit = {
+    cacheMinTime(LastProcTime, finalLastProcTime)
+    cacheMinTime(CleanTime, finalCleanTime)
+  }
+
+  /**
+    * Reads `info/<child>/<timeKey>` for every child of `info`, and caches
+    * the smallest parsable value under `finalKey`. Nothing is written when
+    * no child holds a parsable value.
+    */
+  private def cacheMinTime(timeKey: String, finalKey: String): Unit = {
+    val keys = listKeys(infoPath).map { p => s"${infoPath}/${p}/${timeKey}" }
+    val result = read(keys)
+    val times = keys.flatMap { k => getLongOpt(result, k) }
+    if (times.nonEmpty) {
+      cache(Map[String, String](finalKey -> times.min.toString))
+    }
+  }
+
+  private def readTimeRange(): (Long, Long) = {
+    val map = read(List(finalLastProcTime, finalReadyTime))
+    // Locals are named so they do not shadow the lastProcTime(path) method.
+    val lastTime = getLong(map, finalLastProcTime)
+    val curReadyTime = getLong(map, finalReadyTime)
+    (lastTime, curReadyTime)
+  }
+
+  private def readCleanTime(): Long = {
+    val map = read(List(finalCleanTime))
+    getLong(map, finalCleanTime)
+  }
+
+  // None when the key is absent or its value is not a valid long. Try only
+  // catches non-fatal errors, so fatal ones are no longer swallowed here.
+  private def getLongOpt(map: Map[String, String],
+                         key: String): Option[Long] = {
+    map.get(key).flatMap { v => scala.util.Try(v.toLong).toOption }
+  }
+
+  private def getLong(map: Map[String, String], key: String): Long = {
+    getLongOpt(map, key).getOrElse(-1L)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLock.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLock.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLock.scala
deleted file mode 100644
index a2cfad9..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLock.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.streaming.lock
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.griffin.measure.Loggable
-
-/**
-  * lock for info cache
-  */
-trait CacheLock extends Loggable with Serializable {
-
-  def lock(outtime: Long, unit: TimeUnit): Boolean
-
-  def unlock(): Unit
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLockInZK.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLockInZK.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLockInZK.scala
deleted file mode 100644
index 5ae4c75..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLockInZK.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.streaming.lock
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.curator.framework.recipes.locks.InterProcessMutex
-
-case class CacheLockInZK(@transient mutex: InterProcessMutex) extends 
CacheLock {
-
-  def lock(outtime: Long, unit: TimeUnit): Boolean = {
-    try {
-      if (outtime >= 0) {
-        mutex.acquire(outtime, unit)
-      } else {
-        mutex.acquire(-1, null)
-      }
-    } catch {
-      case e: Throwable => {
-        error(s"lock error: ${e.getMessage}")
-        false
-      }
-    }
-
-  }
-
-  def unlock(): Unit = {
-    try {
-      if (mutex.isAcquiredInThisProcess) mutex.release
-    } catch {
-      case e: Throwable => {
-        error(s"unlock error: ${e.getMessage}")
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLockSeq.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLockSeq.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLockSeq.scala
deleted file mode 100644
index 6f033bd..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLockSeq.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.streaming.lock
-
-import java.util.concurrent.TimeUnit
-
-case class CacheLockSeq(cacheLocks: Seq[CacheLock]) extends CacheLock {
-
-  def lock(outtime: Long, unit: TimeUnit): Boolean = {
-    cacheLocks.headOption.map(_.lock(outtime, unit)).getOrElse(true)
-  }
-
-  def unlock(): Unit = {
-    cacheLocks.headOption.foreach(_.unlock)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCache.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCache.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCache.scala
deleted file mode 100644
index fc78eda..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCache.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.streaming.offset
-
-import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.context.streaming.lock.CacheLock
-
-trait OffsetCache extends Loggable with Serializable {
-
-  def init(): Unit
-  def available(): Boolean
-  def close(): Unit
-
-  def cache(kvs: Map[String, String]): Unit
-  def read(keys: Iterable[String]): Map[String, String]
-  def delete(keys: Iterable[String]): Unit
-  def clear(): Unit
-
-  def listKeys(path: String): List[String]
-
-  def genLock(s: String): CacheLock
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheClient.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheClient.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheClient.scala
deleted file mode 100644
index 8d4abea..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheClient.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.streaming.offset
-
-import org.apache.griffin.measure.configuration.dqdefinition.OffsetCacheParam
-import org.apache.griffin.measure.context.streaming.lock.{CacheLock, 
CacheLockSeq}
-
-object OffsetCacheClient extends OffsetCache with OffsetOps {
-  var offsetCaches: Seq[OffsetCache] = Nil
-
-  def initClient(offsetCacheParams: Iterable[OffsetCacheParam], metricName: 
String) = {
-    val fac = OffsetCacheFactory(offsetCacheParams, metricName)
-    offsetCaches = offsetCacheParams.flatMap(param => 
fac.getOffsetCache(param)).toList
-  }
-
-  def init(): Unit = offsetCaches.foreach(_.init)
-  def available(): Boolean = offsetCaches.foldLeft(false)(_ || _.available)
-  def close(): Unit = offsetCaches.foreach(_.close)
-
-  def cache(kvs: Map[String, String]): Unit = {
-    offsetCaches.foreach(_.cache(kvs))
-  }
-  def read(keys: Iterable[String]): Map[String, String] = {
-    val maps = offsetCaches.map(_.read(keys)).reverse
-    maps.fold(Map[String, String]())(_ ++ _)
-  }
-  def delete(keys: Iterable[String]): Unit = 
offsetCaches.foreach(_.delete(keys))
-  def clear(): Unit = offsetCaches.foreach(_.clear)
-
-  def listKeys(path: String): List[String] = {
-    offsetCaches.foldLeft(Nil: List[String]) { (res, offsetCache) =>
-      if (res.size > 0) res else offsetCache.listKeys(path)
-    }
-  }
-
-  def genLock(s: String): CacheLock = 
CacheLockSeq(offsetCaches.map(_.genLock(s)))
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheFactory.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheFactory.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheFactory.scala
deleted file mode 100644
index 0fbf335..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheFactory.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.streaming.offset
-
-import org.apache.griffin.measure.configuration.dqdefinition.OffsetCacheParam
-
-import scala.util.{Success, Try}
-
-case class OffsetCacheFactory(offsetCacheParams: Iterable[OffsetCacheParam], 
metricName: String
-                             ) extends Serializable {
-
-  val ZK_REGEX = """^(?i)zk|zookeeper$""".r
-
-  def getOffsetCache(offsetCacheParam: OffsetCacheParam): Option[OffsetCache] 
= {
-    val config = offsetCacheParam.getConfig
-    val offsetCacheTry = offsetCacheParam.getType match {
-      case ZK_REGEX() => Try(OffsetCacheInZK(config, metricName))
-      case _ => throw new Exception("not supported info cache type")
-    }
-    offsetCacheTry.toOption
-  }
-
-}
\ No newline at end of file

Reply via email to