[GRIFFIN-95] Enhance dump performance in streaming mode

1. In streaming mode, dump cached data as parquet files (see the sketch below)
2. Repartition data to spark.default.parallelism when dumping
3. Add a percentile feature to the timeliness measure
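The parquet dump and the repartition step (items 1 and 2) can be pictured with the minimal sketch below. It is not the patch code itself: it assumes a Spark 1.6-style API, and the object name, the "__tmst" partition column, the cachePath argument, and the "latency"/"timeliness_records" names in the comment are illustrative only.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SaveMode}

    object StreamingDumpSketch {

      // Bound the partition count of a micro-batch by spark.default.parallelism,
      // so small batches are not written out as many tiny files.
      def boundPartitions(rdd: RDD[String]): RDD[String] = {
        val prlCount = rdd.sparkContext.defaultParallelism
        if (prlCount < rdd.getNumPartitions) rdd.coalesce(prlCount) else rdd
      }

      // Append one micro-batch to the cache directory as parquet,
      // partitioned by a timestamp column ("__tmst" here is illustrative).
      def dumpBatch(df: DataFrame, cachePath: String): Unit = {
        df.write
          .mode(SaveMode.Append)
          .partitionBy("__tmst")
          .parquet(s"${cachePath}/new")
      }

      // The percentile feature of the timeliness measure (item 3) could rely on a
      // Hive UDAF such as percentile_approx, e.g. (assuming a "latency" column):
      //   sqlContext.sql("SELECT percentile_approx(latency, 0.95) FROM timeliness_records")
    }

In the patch itself the corresponding logic lives in KafkaStreamingDataConnector (coalesce of the input RDD), DataSourceCache.saveData together with ParquetDataSourceCache (parquet append partitioned by the internal tmst column), and TimelinessRulePlanTrans (percentile support).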

Author: Lionel Liu <[email protected]>

Closes #205 from bhlx3lyx7/tmst.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/b83c5870
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/b83c5870
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/b83c5870

Branch: refs/heads/master
Commit: b83c58706e1298cc6ad837137552da7dcd8f9e6e
Parents: 530c2da
Author: Lionel Liu <[email protected]>
Authored: Thu Feb 1 16:09:57 2018 +0800
Committer: Lionel Liu <[email protected]>
Committed: Thu Feb 1 16:09:57 2018 +0800

----------------------------------------------------------------------
 .../measure/cache/info/TimeInfoCache.scala      |   2 +
 .../streaming/KafkaStreamingDataConnector.scala |   9 +-
 .../streaming/StreamingDataConnector.scala      |   2 +-
 .../measure/data/source/DataCacheable.scala     |  76 --
 .../measure/data/source/DataSource.scala        |  16 +-
 .../measure/data/source/DataSourceCache.scala   | 403 ----------
 .../measure/data/source/DataSourceFactory.scala |  25 +-
 .../data/source/cache/DataCacheable.scala       |  84 ++
 .../data/source/cache/DataSourceCache.scala     | 333 ++++++++
 .../source/cache/DataSourceCacheFactory.scala   |  58 ++
 .../data/source/cache/JsonDataSourceCache.scala |  40 +
 .../data/source/cache/OrcDataSourceCache.scala  |  40 +
 .../source/cache/ParquetDataSourceCache.scala   |  40 +
 .../griffin/measure/persist/MultiPersists.scala |   8 -
 .../measure/process/BatchDqProcess.scala        |  47 +-
 .../measure/process/StreamingDqProcess.scala    |   6 +-
 .../measure/process/StreamingDqThread.scala     |  95 +--
 .../measure/process/engine/DqEngine.scala       |   2 +
 .../measure/process/engine/DqEngines.scala      | 307 ++------
 .../measure/process/engine/SparkDqEngine.scala  | 229 +-----
 .../measure/process/temp/TimeRange.scala        |   2 +-
 .../rule/adaptor/DataFrameOprAdaptor.scala      |   6 +-
 .../measure/rule/adaptor/GlobalKeys.scala       |  48 --
 .../rule/adaptor/GriffinDslAdaptor.scala        | 759 +------------------
 .../measure/rule/adaptor/RuleAdaptor.scala      | 128 +---
 .../measure/rule/adaptor/SparkSqlAdaptor.scala  |   6 +-
 .../griffin/measure/rule/plan/DsUpdate.scala    |  24 +
 .../measure/rule/plan/MetricExport.scala        |   3 -
 .../measure/rule/plan/RecordExport.scala        |   3 -
 .../griffin/measure/rule/plan/RuleExport.scala  |   2 -
 .../griffin/measure/rule/plan/RulePlan.scala    |   9 +-
 .../rule/trans/AccuracyRulePlanTrans.scala      | 198 +++++
 .../rule/trans/DistinctnessRulePlanTrans.scala  | 234 ++++++
 .../measure/rule/trans/DsUpdateFactory.scala    |  37 +
 .../rule/trans/ProfilingRulePlanTrans.scala     |  98 +++
 .../measure/rule/trans/RuleExportFactory.scala  |  65 ++
 .../measure/rule/trans/RulePlanTrans.scala      |  57 ++
 .../rule/trans/TimelinessRulePlanTrans.scala    | 279 +++++++
 .../rule/trans/UniquenessRulePlanTrans.scala    | 198 +++++
 .../griffin/measure/rule/udf/GriffinUdafs.scala |  29 +
 .../griffin/measure/rule/udf/MeanUdaf.scala     |  58 ++
 .../griffin/measure/utils/ParamUtil.scala       |  11 +
 .../_accuracy-streaming-griffindsl.json         |   8 +-
 .../resources/_timeliness-batch-griffindsl.json |   6 +-
 .../_timeliness-streaming-griffindsl.json       |   4 +-
 45 files changed, 2073 insertions(+), 2021 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
index aefd390..efd12b9 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala
@@ -26,11 +26,13 @@ object TimeInfoCache extends Loggable with Serializable {
   private val LastProcTime = "last.proc.time"
   private val ReadyTime = "ready.time"
   private val CleanTime = "clean.time"
+  private val OldCacheIndex = "old.cache.index"
 
   def cacheTime(path: String): String = s"${path}/${CacheTime}"
   def lastProcTime(path: String): String = s"${path}/${LastProcTime}"
   def readyTime(path: String): String = s"${path}/${ReadyTime}"
   def cleanTime(path: String): String = s"${path}/${CleanTime}"
+  def oldCacheIndex(path: String): String = s"${path}/${OldCacheIndex}"
 
   val infoPath = "info"
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala
index 41de217..f973f3f 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala
@@ -49,7 +49,14 @@ trait KafkaStreamingDataConnector extends StreamingDataConnector {
     ds.foreachRDD((rdd, time) => {
       val ms = time.milliseconds
 
-      val dfOpt = transform(rdd)
+      // coalesce partition number
+      val prlCount = rdd.sparkContext.defaultParallelism
+      val ptnCount = rdd.getNumPartitions
+      val repartitionedRdd = if (prlCount < ptnCount) {
+        rdd.coalesce(prlCount)
+      } else rdd
+
+      val dfOpt = transform(repartitionedRdd)
 
       val preDfOpt = preProcess(dfOpt, ms)
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala
index f65b0d2..39f4995 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala
@@ -19,7 +19,7 @@ under the License.
 package org.apache.griffin.measure.data.connector.streaming
 
 import org.apache.griffin.measure.data.connector._
-import org.apache.griffin.measure.data.source.DataSourceCache
+import org.apache.griffin.measure.data.source.cache._
 import org.apache.griffin.measure.process.temp.TimeRange
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.DataFrame

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/data/source/DataCacheable.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataCacheable.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataCacheable.scala
deleted file mode 100644
index 3c9106a..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataCacheable.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.data.source
-
-import java.util.concurrent.atomic.AtomicLong
-
-import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
-
-trait DataCacheable {
-
-  val cacheInfoPath: String
-  val readyTimeInterval: Long
-  val readyTimeDelay: Long
-
-  def selfCacheInfoPath = s"${TimeInfoCache.infoPath}/${cacheInfoPath}"
-
-  def selfCacheTime = TimeInfoCache.cacheTime(selfCacheInfoPath)
-  def selfLastProcTime = TimeInfoCache.lastProcTime(selfCacheInfoPath)
-  def selfReadyTime = TimeInfoCache.readyTime(selfCacheInfoPath)
-  def selfCleanTime = TimeInfoCache.cleanTime(selfCacheInfoPath)
-
-  protected def submitCacheTime(ms: Long): Unit = {
-    val map = Map[String, String]((selfCacheTime -> ms.toString))
-    InfoCacheInstance.cacheInfo(map)
-  }
-
-  protected def submitReadyTime(ms: Long): Unit = {
-    val curReadyTime = ms - readyTimeDelay
-    if (curReadyTime % readyTimeInterval == 0) {
-      val map = Map[String, String]((selfReadyTime -> curReadyTime.toString))
-      InfoCacheInstance.cacheInfo(map)
-    }
-  }
-
-  protected def submitLastProcTime(ms: Long): Unit = {
-    val map = Map[String, String]((selfLastProcTime -> ms.toString))
-    InfoCacheInstance.cacheInfo(map)
-  }
-
-  protected def submitCleanTime(ms: Long): Unit = {
-    val cleanTime = genCleanTime(ms)
-    val map = Map[String, String]((selfCleanTime -> cleanTime.toString))
-    InfoCacheInstance.cacheInfo(map)
-  }
-
-  protected def genCleanTime(ms: Long): Long = ms
-
-  protected def readCleanTime(): Option[Long] = {
-    val key = selfCleanTime
-    val keys = key :: Nil
-    InfoCacheInstance.readInfo(keys).get(key).flatMap { v =>
-      try {
-        Some(v.toLong)
-      } catch {
-        case _ => None
-      }
-    }
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala
index fc8c646..9a4b640 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala
@@ -22,6 +22,7 @@ import org.apache.griffin.measure.cache.tmst._
 import org.apache.griffin.measure.data.connector._
 import org.apache.griffin.measure.data.connector.batch._
 import org.apache.griffin.measure.data.connector.streaming._
+import org.apache.griffin.measure.data.source.cache._
 import org.apache.griffin.measure.log.Loggable
 import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters, TimeRange}
 import org.apache.griffin.measure.rule.plan.TimeInfo
@@ -114,16 +115,25 @@ case class DataSource(sqlContext: SQLContext,
     }
   }
 
+  def updateData(df: DataFrame): Unit = {
+    dataSourceCacheOpt.foreach(_.updateData(Some(df)))
+  }
+
   def updateData(df: DataFrame, ms: Long): Unit = {
-    dataSourceCacheOpt.foreach(_.updateData(df, ms))
+//    dataSourceCacheOpt.foreach(_.updateData(df, ms))
   }
 
   def updateDataMap(dfMap: Map[Long, DataFrame]): Unit = {
-    dataSourceCacheOpt.foreach(_.updateDataMap(dfMap))
+//    dataSourceCacheOpt.foreach(_.updateDataMap(dfMap))
   }
 
   def cleanOldData(): Unit = {
-    dataSourceCacheOpt.foreach(_.cleanOldData)
+//    dataSourceCacheOpt.foreach(_.cleanOldData)
+    dataSourceCacheOpt.foreach(_.cleanOutTimeData)
+  }
+
+  def processFinish(): Unit = {
+    dataSourceCacheOpt.foreach(_.processFinish)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceCache.scala
deleted file mode 100644
index fff186f..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceCache.scala
+++ /dev/null
@@ -1,403 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.data.source
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
-import org.apache.griffin.measure.cache.tmst.TmstCache
-import org.apache.griffin.measure.data.connector.streaming.StreamingDataConnector
-import org.apache.griffin.measure.data.connector._
-import org.apache.griffin.measure.log.Loggable
-import org.apache.griffin.measure.process.temp.TimeRange
-import org.apache.griffin.measure.utils.{HdfsFileDumpUtil, HdfsUtil, TimeUtil}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, SQLContext}
-
-import scala.util.{Failure, Success}
-import org.apache.griffin.measure.utils.ParamUtil._
-
-case class DataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
-                           dsName: String, index: Int
-                          ) extends DataCacheable with Loggable with Serializable {
-
-  var tmstCache: TmstCache = _
-  protected def rangeTmsts(from: Long, until: Long) = tmstCache.range(from, until)
-  protected def clearTmst(t: Long) = tmstCache.remove(t)
-  protected def clearTmstsUntil(until: Long) = {
-    val outDateTmsts = tmstCache.until(until)
-    tmstCache.remove(outDateTmsts)
-  }
-
-  val _FilePath = "file.path"
-  val _InfoPath = "info.path"
-  val _ReadyTimeInterval = "ready.time.interval"
-  val _ReadyTimeDelay = "ready.time.delay"
-  val _TimeRange = "time.range"
-
-  val defFilePath = s"hdfs:///griffin/cache/${dsName}/${index}"
-  val defInfoPath = s"${index}"
-
-  val filePath: String = param.getString(_FilePath, defFilePath)
-  val cacheInfoPath: String = param.getString(_InfoPath, defInfoPath)
-  val readyTimeInterval: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeInterval, "1m")).getOrElse(60000L)
-  val readyTimeDelay: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeDelay, "1m")).getOrElse(60000L)
-  val deltaTimeRange: (Long, Long) = {
-    def negative(n: Long): Long = if (n <= 0) n else 0
-    param.get(_TimeRange) match {
-      case Some(seq: Seq[String]) => {
-        val nseq = seq.flatMap(TimeUtil.milliseconds(_))
-        val ns = negative(nseq.headOption.getOrElse(0))
-        val ne = negative(nseq.tail.headOption.getOrElse(0))
-        (ns, ne)
-      }
-      case _ => (0, 0)
-    }
-  }
-
-//  val _WriteInfoPath = "write.info.path"
-//  val _ReadInfoPath = "read.info.path"
-//  val writeCacheInfoPath = param.getString(_WriteInfoPath, defInfoPath)
-//  val readCacheInfoPath = param.getString(_ReadInfoPath, defInfoPath)
-
-  val _ReadOnly = "read.only"
-  val readOnly = param.getBoolean(_ReadOnly, false)
-
-  val rowSepLiteral = "\n"
-  val partitionUnits: List[String] = List("hour", "min", "sec")
-  val minUnitTime: Long = TimeUtil.timeFromUnit(1, partitionUnits.last)
-
-  val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new")
-  val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old")
-
-  def init(): Unit = {
-    ;
-  }
-
-  def saveData(dfOpt: Option[DataFrame], ms: Long): Unit = {
-    if (!readOnly) {
-      dfOpt match {
-        case Some(df) => {
-          val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
-          if (newCacheLocked) {
-            try {
-              val ptns = getPartition(ms)
-              val ptnsPath = genPartitionHdfsPath(ptns)
-              val dirPath = s"${filePath}/${ptnsPath}"
-              val dataFileName = s"${ms}"
-              val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
-
-              // transform data
-              val dataRdd: RDD[String] = df.toJSON
-
-              // save data
-              //            val dumped = if (!dataRdd.isEmpty) {
-              //              HdfsFileDumpUtil.dump(dataFilePath, dataRdd, 
rowSepLiteral)
-              //            } else false
-
-              if (!dataRdd.isEmpty) {
-                HdfsFileDumpUtil.dump(dataFilePath, dataRdd, rowSepLiteral)
-              }
-
-            } catch {
-              case e: Throwable => error(s"save data error: ${e.getMessage}")
-            } finally {
-              newCacheLock.unlock()
-            }
-          }
-        }
-        case _ => {
-          info(s"no data frame to save")
-        }
-      }
-
-      // submit cache time and ready time
-      submitCacheTime(ms)
-      submitReadyTime(ms)
-    }
-  }
-
-  // return: (data frame option, time range)
-  def readData(): (Option[DataFrame], TimeRange) = {
-    val tr = TimeInfoCache.getTimeRange
-    val timeRange = (tr._1 + minUnitTime, tr._2)
-    submitLastProcTime(timeRange._2)
-
-    val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2)
-    submitCleanTime(reviseTimeRange._1)
-
-    // read directly through partition info
-    val partitionRanges = getPartitionRange(reviseTimeRange._1, reviseTimeRange._2)
-    println(s"read time ranges: ${reviseTimeRange}")
-    println(s"read partition ranges: ${partitionRanges}")
-
-    // list partition paths
-    val partitionPaths = listPathsBetweenRanges(filePath :: Nil, partitionRanges)
-//    println(partitionPaths)
-
-    val dfOpt = if (partitionPaths.isEmpty) {
-      None
-    } else {
-      try {
-        Some(sqlContext.read.json(partitionPaths: _*))
-      } catch {
-        case e: Throwable => {
-          warn(s"read data source cache warn: ${e.getMessage}")
-          None
-        }
-      }
-    }
-
-    // from until tmst range
-    val (from, until) = (reviseTimeRange._1, reviseTimeRange._2 + 1)
-    val tmstSet = rangeTmsts(from, until)
-
-    val retTimeRange = TimeRange(reviseTimeRange, tmstSet)
-    (dfOpt, retTimeRange)
-  }
-
-  def updateData(df: DataFrame, ms: Long): Unit = {
-    if (!readOnly) {
-      val ptns = getPartition(ms)
-      val ptnsPath = genPartitionHdfsPath(ptns)
-      val dirPath = s"${filePath}/${ptnsPath}"
-      val dataFileName = s"${ms}"
-      val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
-
-      try {
-        val records = df.toJSON
-        val arr = records.collect
-        val needSave = !arr.isEmpty
-
-        // remove out time old data
-        HdfsFileDumpUtil.remove(dirPath, dataFileName, true)
-        println(s"remove file path: ${dirPath}/${dataFileName}")
-
-        // save updated data
-        if (needSave) {
-          HdfsFileDumpUtil.dump(dataFilePath, arr, rowSepLiteral)
-          println(s"update file path: ${dataFilePath}")
-        } else {
-          clearTmst(ms)
-          println(s"data source [${dsName}] timestamp [${ms}] cleared")
-        }
-      } catch {
-        case e: Throwable => error(s"update data error: ${e.getMessage}")
-      }
-    }
-  }
-
-  def updateData(rdd: RDD[String], ms: Long, cnt: Long): Unit = {
-    if (!readOnly) {
-      val ptns = getPartition(ms)
-      val ptnsPath = genPartitionHdfsPath(ptns)
-      val dirPath = s"${filePath}/${ptnsPath}"
-      val dataFileName = s"${ms}"
-      val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
-
-      try {
-        //      val needSave = !rdd.isEmpty
-
-        // remove out time old data
-        HdfsFileDumpUtil.remove(dirPath, dataFileName, true)
-        println(s"remove file path: ${dirPath}/${dataFileName}")
-
-        // save updated data
-        if (cnt > 0) {
-          HdfsFileDumpUtil.dump(dataFilePath, rdd, rowSepLiteral)
-          println(s"update file path: ${dataFilePath}")
-        } else {
-          clearTmst(ms)
-          println(s"data source [${dsName}] timestamp [${ms}] cleared")
-        }
-      } catch {
-        case e: Throwable => error(s"update data error: ${e.getMessage}")
-      } finally {
-        rdd.unpersist()
-      }
-    }
-  }
-
-  def updateData(arr: Iterable[String], ms: Long): Unit = {
-    if (!readOnly) {
-      val ptns = getPartition(ms)
-      val ptnsPath = genPartitionHdfsPath(ptns)
-      val dirPath = s"${filePath}/${ptnsPath}"
-      val dataFileName = s"${ms}"
-      val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
-
-      try {
-        val needSave = !arr.isEmpty
-
-        // remove out time old data
-        HdfsFileDumpUtil.remove(dirPath, dataFileName, true)
-        println(s"remove file path: ${dirPath}/${dataFileName}")
-
-        // save updated data
-        if (needSave) {
-          HdfsFileDumpUtil.dump(dataFilePath, arr, rowSepLiteral)
-          println(s"update file path: ${dataFilePath}")
-        } else {
-          clearTmst(ms)
-          println(s"data source [${dsName}] timestamp [${ms}] cleared")
-        }
-      } catch {
-        case e: Throwable => error(s"update data error: ${e.getMessage}")
-      }
-    }
-  }
-
-  def updateDataMap(dfMap: Map[Long, DataFrame]): Unit = {
-    if (!readOnly) {
-      val dataMap = dfMap.map { pair =>
-        val (t, recs) = pair
-        val rdd = recs.toJSON
-        //      rdd.cache
-        (t, rdd, rdd.count)
-      }
-
-      dataMap.foreach { pair =>
-        val (t, arr, cnt) = pair
-        updateData(arr, t, cnt)
-      }
-    }
-  }
-
-  def cleanOldData(): Unit = {
-    if (!readOnly) {
-      val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
-      if (oldCacheLocked) {
-        try {
-          val cleanTime = readCleanTime()
-          cleanTime match {
-            case Some(ct) => {
-              println(s"data source [${dsName}] old timestamps clear until [${ct}]")
-
-              // clear out date tmsts
-              clearTmstsUntil(ct)
-
-              // drop partitions
-              val bounds = getPartition(ct)
-
-              // list partition paths
-              val earlierPaths = listPathsEarlierThanBounds(filePath :: Nil, bounds)
-
-              // delete out time data path
-              earlierPaths.foreach { path =>
-                println(s"delete hdfs path: ${path}")
-                HdfsUtil.deleteHdfsPath(path)
-              }
-            }
-            case _ => {
-              // do nothing
-            }
-          }
-        } catch {
-          case e: Throwable => error(s"clean old data error: ${e.getMessage}")
-        } finally {
-          oldCacheLock.unlock()
-        }
-      }
-    }
-  }
-
-  override protected def genCleanTime(ms: Long): Long = {
-    val minPartitionUnit = partitionUnits.last
-    val t1 = TimeUtil.timeToUnit(ms, minPartitionUnit)
-    val t2 = TimeUtil.timeFromUnit(t1, minPartitionUnit)
-    t2
-  }
-
-  private def getPartition(ms: Long): List[Long] = {
-    partitionUnits.map { unit =>
-      TimeUtil.timeToUnit(ms, unit)
-    }
-  }
-  private def getPartitionRange(ms1: Long, ms2: Long): List[(Long, Long)] = {
-    partitionUnits.map { unit =>
-      val t1 = TimeUtil.timeToUnit(ms1, unit)
-      val t2 = TimeUtil.timeToUnit(ms2, unit)
-      (t1, t2)
-    }
-  }
-  private def genPartitionHdfsPath(partition: List[Long]): String = {
-    partition.map(prtn => s"${prtn}").mkString("/")
-  }
-  private def str2Long(str: String): Option[Long] = {
-    try {
-      Some(str.toLong)
-    } catch {
-      case e: Throwable => None
-    }
-  }
-
-
-  // here the range means [min, max]
-  private def listPathsBetweenRanges(paths: List[String],
-                                     partitionRanges: List[(Long, Long)]
-                                    ): List[String] = {
-    partitionRanges match {
-      case Nil => paths
-      case head :: tail => {
-        val (lb, ub) = head
-        val curPaths = paths.flatMap { path =>
-          val names = HdfsUtil.listSubPathsByType(path, "dir").toList
-          names.filter { name =>
-            str2Long(name) match {
-              case Some(t) => (t >= lb) && (t <= ub)
-              case _ => false
-            }
-          }.map(HdfsUtil.getHdfsFilePath(path, _))
-        }
-        listPathsBetweenRanges(curPaths, tail)
-      }
-    }
-  }
-  private def listPathsEarlierThanBounds(paths: List[String], bounds: List[Long]
-                                        ): List[String] = {
-    bounds match {
-      case Nil => paths
-      case head :: tail => {
-        val earlierPaths = paths.flatMap { path =>
-          val names = HdfsUtil.listSubPathsByType(path, "dir").toList
-          names.filter { name =>
-            str2Long(name) match {
-              case Some(t) => (t < head)
-              case _ => false
-            }
-          }.map(HdfsUtil.getHdfsFilePath(path, _))
-        }
-        val equalPaths = paths.flatMap { path =>
-          val names = HdfsUtil.listSubPathsByType(path, "dir").toList
-          names.filter { name =>
-            str2Long(name) match {
-              case Some(t) => (t == head)
-              case _ => false
-            }
-          }.map(HdfsUtil.getHdfsFilePath(path, _))
-        }
-
-        tail match {
-          case Nil => earlierPaths
-          case _ => earlierPaths ::: listPathsEarlierThanBounds(equalPaths, tail)
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala
index b83e2fb..e18c852 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala
@@ -19,11 +19,10 @@ under the License.
 package org.apache.griffin.measure.data.source
 
 import org.apache.griffin.measure.config.params.user._
-import org.apache.griffin.measure.data.connector.batch.BatchDataConnector
-import org.apache.griffin.measure.data.connector.streaming.StreamingDataConnector
-import org.apache.griffin.measure.data.connector.{DataConnector, DataConnectorFactory}
+import org.apache.griffin.measure.data.connector.DataConnectorFactory
+import org.apache.griffin.measure.data.source.cache._
 import org.apache.griffin.measure.log.Loggable
-import org.apache.griffin.measure.process.engine.{DqEngine, DqEngines}
+import org.apache.griffin.measure.process.engine._
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.streaming.StreamingContext
 
@@ -55,27 +54,11 @@ object DataSourceFactory extends Loggable {
         case _ => None
       }
     }
-    val dataSourceCacheOpt = genDataSourceCache(sqlContext, cacheParam, name, index)
+    val dataSourceCacheOpt = DataSourceCacheFactory.genDataSourceCache(sqlContext, cacheParam, name, index)
 
     Some(DataSource(sqlContext, name, baseline, dataConnectors, dataSourceCacheOpt))
   }
 
-  private def genDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
-                                 name: String, index: Int
-                                ) = {
-    if (param != null) {
-      try {
-        Some(DataSourceCache(sqlContext, param, name, index))
-      } catch {
-        case e: Throwable => {
-          error(s"generate data source cache fails")
-          None
-        }
-      }
-    } else None
-  }
-
-
   private def trimDataSourceParams(dataSourceParams: Seq[DataSourceParam]): Seq[DataSourceParam] = {
     val (validDsParams, _) =
       dataSourceParams.foldLeft((Nil: Seq[DataSourceParam], Set[String]())) { (ret, dsParam) =>

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataCacheable.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataCacheable.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataCacheable.scala
new file mode 100644
index 0000000..36c556b
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataCacheable.scala
@@ -0,0 +1,84 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.data.source.cache
+
+import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
+
+trait DataCacheable {
+
+  val cacheInfoPath: String
+  val readyTimeInterval: Long
+  val readyTimeDelay: Long
+
+  def selfCacheInfoPath = s"${TimeInfoCache.infoPath}/${cacheInfoPath}"
+
+  def selfCacheTime = TimeInfoCache.cacheTime(selfCacheInfoPath)
+  def selfLastProcTime = TimeInfoCache.lastProcTime(selfCacheInfoPath)
+  def selfReadyTime = TimeInfoCache.readyTime(selfCacheInfoPath)
+  def selfCleanTime = TimeInfoCache.cleanTime(selfCacheInfoPath)
+  def selfOldCacheIndex = TimeInfoCache.oldCacheIndex(selfCacheInfoPath)
+
+  protected def submitCacheTime(ms: Long): Unit = {
+    val map = Map[String, String]((selfCacheTime -> ms.toString))
+    InfoCacheInstance.cacheInfo(map)
+  }
+
+  protected def submitReadyTime(ms: Long): Unit = {
+    val curReadyTime = ms - readyTimeDelay
+    if (curReadyTime % readyTimeInterval == 0) {
+      val map = Map[String, String]((selfReadyTime -> curReadyTime.toString))
+      InfoCacheInstance.cacheInfo(map)
+    }
+  }
+
+  protected def submitLastProcTime(ms: Long): Unit = {
+    val map = Map[String, String]((selfLastProcTime -> ms.toString))
+    InfoCacheInstance.cacheInfo(map)
+  }
+
+  protected def readLastProcTime(): Option[Long] = readSelfInfo(selfLastProcTime)
+
+  protected def submitCleanTime(ms: Long): Unit = {
+    val cleanTime = genCleanTime(ms)
+    val map = Map[String, String]((selfCleanTime -> cleanTime.toString))
+    InfoCacheInstance.cacheInfo(map)
+  }
+
+  protected def genCleanTime(ms: Long): Long = ms
+
+  protected def readCleanTime(): Option[Long] = readSelfInfo(selfCleanTime)
+
+  protected def submitOldCacheIndex(index: Long): Unit = {
+    val map = Map[String, String]((selfOldCacheIndex -> index.toString))
+    InfoCacheInstance.cacheInfo(map)
+  }
+
+  protected def readOldCacheIndex(): Option[Long] = readSelfInfo(selfOldCacheIndex)
+
+  private def readSelfInfo(key: String): Option[Long] = {
+    InfoCacheInstance.readInfo(key :: Nil).get(key).flatMap { v =>
+      try {
+        Some(v.toLong)
+      } catch {
+        case _ => None
+      }
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
new file mode 100644
index 0000000..1a0366d
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
@@ -0,0 +1,333 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.data.source.cache
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
+import org.apache.griffin.measure.cache.tmst.TmstCache
+import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.process.temp.TimeRange
+import org.apache.griffin.measure.rule.adaptor.InternalColumns
+import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
+import org.apache.griffin.measure.utils.ParamUtil._
+import org.apache.spark.sql._
+
+// data source cache process steps
+// dump phase: save
+// process phase: read -> process -> update -> finish -> clean old data
+trait DataSourceCache extends DataCacheable with Loggable with Serializable {
+
+  val sqlContext: SQLContext
+  val param: Map[String, Any]
+  val dsName: String
+  val index: Int
+
+  var tmstCache: TmstCache = _
+  protected def rangeTmsts(from: Long, until: Long) = tmstCache.range(from, until)
+  protected def clearTmst(t: Long) = tmstCache.remove(t)
+  protected def clearTmstsUntil(until: Long) = {
+    val outDateTmsts = tmstCache.until(until)
+    tmstCache.remove(outDateTmsts)
+  }
+
+  val _FilePath = "file.path"
+  val _InfoPath = "info.path"
+  val _ReadyTimeInterval = "ready.time.interval"
+  val _ReadyTimeDelay = "ready.time.delay"
+  val _TimeRange = "time.range"
+
+  val defFilePath = s"hdfs:///griffin/cache/${dsName}/${index}"
+  val defInfoPath = s"${index}"
+
+  val filePath: String = param.getString(_FilePath, defFilePath)
+  val cacheInfoPath: String = param.getString(_InfoPath, defInfoPath)
+  val readyTimeInterval: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeInterval, "1m")).getOrElse(60000L)
+  val readyTimeDelay: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeDelay, "1m")).getOrElse(60000L)
+  val deltaTimeRange: (Long, Long) = {
+    def negative(n: Long): Long = if (n <= 0) n else 0
+    param.get(_TimeRange) match {
+      case Some(seq: Seq[String]) => {
+        val nseq = seq.flatMap(TimeUtil.milliseconds(_))
+        val ns = negative(nseq.headOption.getOrElse(0))
+        val ne = negative(nseq.tail.headOption.getOrElse(0))
+        (ns, ne)
+      }
+      case _ => (0, 0)
+    }
+  }
+
+  val _ReadOnly = "read.only"
+  val readOnly = param.getBoolean(_ReadOnly, false)
+
+  val _Updatable = "updatable"
+  val updatable = param.getBoolean(_Updatable, false)
+
+//  val rowSepLiteral = "\n"
+//  val partitionUnits: List[String] = List("hour", "min", "sec")
+//  val minUnitTime: Long = TimeUtil.timeFromUnit(1, partitionUnits.last)
+
+  val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new")
+  val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old")
+
+  val newFilePath = s"${filePath}/new"
+  val oldFilePath = s"${filePath}/old"
+
+  val defOldCacheIndex = 0L
+
+  protected def writeDataFrame(dfw: DataFrameWriter, path: String): Unit
+  protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame
+
+  def init(): Unit = {}
+
+  // save new cache data only
+  def saveData(dfOpt: Option[DataFrame], ms: Long): Unit = {
+    if (!readOnly) {
+      dfOpt match {
+        case Some(df) => {
+          // lock makes it safer when writing new cache data
+          val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
+          if (newCacheLocked) {
+            try {
+              val dfw = df.write.mode(SaveMode.Append).partitionBy(InternalColumns.tmst)
+              writeDataFrame(dfw, newFilePath)
+            } catch {
+              case e: Throwable => error(s"save data error: ${e.getMessage}")
+            } finally {
+              newCacheLock.unlock()
+            }
+          }
+        }
+        case _ => {
+          info(s"no data frame to save")
+        }
+      }
+
+      // submit cache time and ready time
+      submitCacheTime(ms)
+      submitReadyTime(ms)
+    }
+  }
+
+  // read new cache data and old cache data
+  def readData(): (Option[DataFrame], TimeRange) = {
+    // time range: [a, b)
+    val timeRange = TimeInfoCache.getTimeRange
+    val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2)
+
+    // read partition info
+    val filterStr = s"`${InternalColumns.tmst}` >= ${reviseTimeRange._1} AND `${InternalColumns.tmst}` < ${reviseTimeRange._2}"
+    println(s"read time range: [${reviseTimeRange._1}, ${reviseTimeRange._2})")
+
+    // new cache data
+    val newDfOpt = try {
+      val dfr = sqlContext.read
+      Some(readDataFrame(dfr, newFilePath).filter(filterStr))
+    } catch {
+      case e: Throwable => {
+        warn(s"read data source cache warn: ${e.getMessage}")
+        None
+      }
+    }
+
+    // old cache data
+    val oldCacheIndexOpt = if (updatable) readOldCacheIndex else None
+    val oldDfOpt = oldCacheIndexOpt.flatMap { idx =>
+      val oldDfPath = s"${oldFilePath}/${idx}"
+      try {
+        val dfr = sqlContext.read
+//        Some(readDataFrame(dfr, oldDfPath).filter(filterStr))
+        Some(readDataFrame(dfr, oldDfPath))   // not need to filter, has filtered in update phase
+      } catch {
+        case e: Throwable => {
+          warn(s"read old data source cache warn: ${e.getMessage}")
+          None
+        }
+      }
+    }
+
+    // whole cache data
+    val cacheDfOpt = unionDfOpts(newDfOpt, oldDfOpt)
+
+    // from until tmst range
+    val (from, until) = (reviseTimeRange._1, reviseTimeRange._2)
+    val tmstSet = rangeTmsts(from, until)
+
+    val retTimeRange = TimeRange(reviseTimeRange, tmstSet)
+    (cacheDfOpt, retTimeRange)
+  }
+
+  private def unionDfOpts(dfOpt1: Option[DataFrame], dfOpt2: Option[DataFrame]
+                         ): Option[DataFrame] = {
+    (dfOpt1, dfOpt2) match {
+      case (Some(df1), Some(df2)) => Some(df1 unionAll df2)
+      case (Some(df1), _) => dfOpt1
+      case (_, Some(df2)) => dfOpt2
+      case _ => None
+    }
+  }
+
+  private def cleanOutTimePartitions(path: String, outTime: Long, partitionOpt: Option[String]): Unit = {
+    val earlierPaths = listEarlierPartitions(path: String, outTime, partitionOpt)
+    // delete out time data path
+    earlierPaths.foreach { path =>
+      println(s"delete hdfs path: ${path}")
+      HdfsUtil.deleteHdfsPath(path)
+    }
+  }
+  private def listEarlierPartitions(path: String, bound: Long, partitionOpt: Option[String]): Iterable[String] = {
+    val names = HdfsUtil.listSubPathsByType(path, "dir")
+    val regex = partitionOpt match {
+      case Some(partition) => s"^${partition}=(\\d+)$$".r
+      case _ => "^(\\d+)$".r
+    }
+    names.filter { name =>
+      name match {
+        case regex(value) => {
+          str2Long(value) match {
+            case Some(t) => (t < bound)
+            case _ => false
+          }
+        }
+        case _ => false
+      }
+    }.map(name => s"${path}/${name}")
+  }
+  private def str2Long(str: String): Option[Long] = {
+    try {
+      Some(str.toLong)
+    } catch {
+      case e: Throwable => None
+    }
+  }
+
+  // clean out time from new cache data and old cache data
+  def cleanOutTimeData(): Unit = {
+    if (!readOnly) {
+      // new cache data
+      val newCacheCleanTime = if (updatable) readLastProcTime else readCleanTime
+      newCacheCleanTime match {
+        case Some(nct) => {
+          // clean calculated new cache data
+          val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
+          if (newCacheLocked) {
+            try {
+              cleanOutTimePartitions(newFilePath, nct, Some(InternalColumns.tmst))
+            } catch {
+              case e: Throwable => error(s"clean new cache data error: ${e.getMessage}")
+            } finally {
+              newCacheLock.unlock()
+            }
+          }
+        }
+        case _ => {
+          // do nothing
+        }
+      }
+
+      // old cache data
+      val oldCacheCleanTime = if (updatable) readCleanTime else None
+      oldCacheCleanTime match {
+        case Some(oct) => {
+          val oldCacheIndexOpt = readOldCacheIndex
+          oldCacheIndexOpt.foreach { idx =>
+            val oldDfPath = s"${oldFilePath}/${idx}"
+            val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
+            if (oldCacheLocked) {
+              try {
+                // clean calculated old cache data
+                cleanOutTimePartitions(oldFilePath, idx, None)
+                // clean out time old cache data not calculated
+//                cleanOutTimePartitions(oldDfPath, oct, Some(InternalColumns.tmst))
+              } catch {
+                case e: Throwable => error(s"clean old cache data error: ${e.getMessage}")
+              } finally {
+                oldCacheLock.unlock()
+              }
+            }
+          }
+        }
+        case _ => {
+          // do nothing
+        }
+      }
+    }
+  }
+
+  // update old cache data
+  def updateData(dfOpt: Option[DataFrame]): Unit = {
+    if (!readOnly && updatable) {
+      dfOpt match {
+        case Some(df) => {
+          // old cache lock
+          val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
+          if (oldCacheLocked) {
+            try {
+              val oldCacheIndexOpt = readOldCacheIndex
+              val nextOldCacheIndex = oldCacheIndexOpt.getOrElse(defOldCacheIndex) + 1
+
+              val oldDfPath = s"${oldFilePath}/${nextOldCacheIndex}"
+//              val dfw = df.write.mode(SaveMode.Overwrite).partitionBy(InternalColumns.tmst)
+              val cleanTime = readCleanTime
+              val updateDf = cleanTime match {
+                case Some(ct) => {
+                  val filterStr = s"`${InternalColumns.tmst}` >= ${ct}"
+                  df.filter(filterStr)
+                }
+                case _ => df
+              }
+
+              val prlCount = sqlContext.sparkContext.defaultParallelism
+              // coalesce
+//              val ptnCount = updateDf.rdd.getNumPartitions
+//              val repartitionedDf = if (prlCount < ptnCount) {
+//                updateDf.coalesce(prlCount)
+//              } else updateDf
+              // repartition
+              val repartitionedDf = updateDf.repartition(prlCount)
+              val dfw = repartitionedDf.write.mode(SaveMode.Overwrite)
+              writeDataFrame(dfw, oldDfPath)
+
+              submitOldCacheIndex(nextOldCacheIndex)
+            } catch {
+              case e: Throwable => error(s"update data error: ${e.getMessage}")
+            } finally {
+              oldCacheLock.unlock()
+            }
+          }
+        }
+        case _ => {
+          info(s"no data frame to update")
+        }
+      }
+    }
+  }
+
+  // process finish
+  def processFinish(): Unit = {
+    // next last proc time
+    val timeRange = TimeInfoCache.getTimeRange
+    submitLastProcTime(timeRange._2)
+
+    // next clean time
+    val nextCleanTime = timeRange._2 + deltaTimeRange._1
+    submitCleanTime(nextCleanTime)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCacheFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCacheFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCacheFactory.scala
new file mode 100644
index 0000000..d03c181
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCacheFactory.scala
@@ -0,0 +1,58 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.data.source.cache
+
+import org.apache.griffin.measure.log.Loggable
+import org.apache.spark.sql.SQLContext
+import org.apache.griffin.measure.utils.ParamUtil._
+
+object DataSourceCacheFactory extends Loggable {
+
+  private object DataSourceCacheType {
+    val parquet = "^(?i)parq(uet)?$".r
+    val json = "^(?i)json$".r
+    val orc = "^(?i)orc$".r
+  }
+  import DataSourceCacheType._
+
+  val _type = "type"
+
+  def genDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
+                                 name: String, index: Int
+                                ) = {
+    if (param != null) {
+      try {
+        val tp = param.getString(_type, "")
+        val dsCache = tp match {
+          case parquet() => ParquetDataSourceCache(sqlContext, param, name, index)
+          case json() => JsonDataSourceCache(sqlContext, param, name, index)
+          case orc() => OrcDataSourceCache(sqlContext, param, name, index)
+          case _ => ParquetDataSourceCache(sqlContext, param, name, index)
+        }
+        Some(dsCache)
+      } catch {
+        case e: Throwable => {
+          error(s"generate data source cache fails")
+          None
+        }
+      }
+    } else None
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/JsonDataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/JsonDataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/JsonDataSourceCache.scala
new file mode 100644
index 0000000..e284d47
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/JsonDataSourceCache.scala
@@ -0,0 +1,40 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.data.source.cache
+
+import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, SQLContext}
+
+case class JsonDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
+                               dsName: String, index: Int
+                              ) extends DataSourceCache {
+
+  override def init(): Unit = {
+//    sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false");
+  }
+
+  def writeDataFrame(dfw: DataFrameWriter, path: String): Unit = {
+    println(s"write path: ${path}")
+    dfw.json(path)
+  }
+
+  def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = {
+    dfr.json(path)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/OrcDataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/OrcDataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/OrcDataSourceCache.scala
new file mode 100644
index 0000000..7b92bef
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/OrcDataSourceCache.scala
@@ -0,0 +1,40 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.data.source.cache
+
+import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, SQLContext}
+
+case class OrcDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
+                              dsName: String, index: Int
+                             ) extends DataSourceCache {
+
+  override def init(): Unit = {
+//    sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false");
+  }
+
+  def writeDataFrame(dfw: DataFrameWriter, path: String): Unit = {
+    println(s"write path: ${path}")
+    dfw.orc(path)
+  }
+
+  def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = {
+    dfr.orc(path)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala
new file mode 100644
index 0000000..1761f56
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala
@@ -0,0 +1,40 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.data.source.cache
+
+import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, SQLContext}
+
+case class ParquetDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
+                                  dsName: String, index: Int
+                                 ) extends DataSourceCache {
+
+  override def init(): Unit = {
+    sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false");
+  }
+
+  def writeDataFrame(dfw: DataFrameWriter, path: String): Unit = {
+    println(s"write path: ${path}")
+    dfw.parquet(path)
+  }
+
+  def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = {
+    dfr.parquet(path)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
index aa97afa..bed28fd 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
@@ -40,14 +40,6 @@ case class MultiPersists(persists: Iterable[Persist]) extends Persist {
   def start(msg: String): Unit = { persists.foreach(_.start(msg)) }
   def finish(): Unit = { persists.foreach(_.finish()) }
 
-//  def result(rt: Long, result: Result): Unit = { persists.foreach(_.result(rt, result)) }
-//
-//  def records(recs: RDD[String], tp: String): Unit = { persists.foreach(_.records(recs, tp)) }
-//  def records(recs: Iterable[String], tp: String): Unit = { persists.foreach(_.records(recs, tp)) }
-
-//  def missRecords(records: RDD[String]): Unit = { persists.foreach(_.missRecords(records)) }
-//  def matchRecords(records: RDD[String]): Unit = { persists.foreach(_.matchRecords(records)) }
-
   def log(rt: Long, msg: String): Unit = {
     persists.foreach { persist =>
       try {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
index 44cca9a..8c95a39 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
@@ -20,24 +20,20 @@ package org.apache.griffin.measure.process
 
 import java.util.Date
 
-import org.apache.griffin.measure.cache.info.TimeInfoCache
-import org.apache.griffin.measure.cache.result.CacheResultProcesser
 import org.apache.griffin.measure.config.params._
 import org.apache.griffin.measure.config.params.env._
 import org.apache.griffin.measure.config.params.user._
 import org.apache.griffin.measure.data.source.DataSourceFactory
 import org.apache.griffin.measure.persist.{Persist, PersistFactory}
-import org.apache.griffin.measure.process.engine.{DqEngineFactory, SparkSqlEngine}
-import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters}
-import org.apache.griffin.measure.rule.adaptor.{RuleAdaptorGroup, RunPhase}
+import org.apache.griffin.measure.process.engine._
+import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters, TimeRange}
+import org.apache.griffin.measure.rule.adaptor._
 import org.apache.griffin.measure.rule.plan._
-import org.apache.griffin.measure.rule.udf.GriffinUdfs
-import org.apache.griffin.measure.utils.JsonUtil
+import org.apache.griffin.measure.rule.udf._
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.{SparkConf, SparkContext}
 
-import scala.concurrent.Await
 import scala.util.Try
 
 case class BatchDqProcess(allParam: AllParam) extends DqProcess {
@@ -64,6 +60,7 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess {
 
     // register udf
     GriffinUdfs.register(sqlContext)
+    GriffinUdafs.register(sqlContext)
 
     // init adaptors
     RuleAdaptorGroup.init(sqlContext, dataSourceNames, baselineDsName)
@@ -93,25 +90,11 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess {
 
     // init data sources
     val dsTimeRanges = dqEngines.loadData(dataSources, calcTimeInfo)
-
-    println(s"data source timeRanges: ${dsTimeRanges}")
-
-    // generate rule steps
-//    val ruleSteps = RuleAdaptorGroup.genConcreteRuleSteps(
-//      TimeInfo(appTime, appTime), userParam.evaluateRuleParam, dsTmsts, BatchProcessType, RunPhase)
-//    val ruleSteps = RuleAdaptorGroup.genRuleSteps(
-//      CalcTimeInfo(appTime), userParam.evaluateRuleParam, dsTmsts)
+    printTimeRanges(dsTimeRanges)
 
     val rulePlan = RuleAdaptorGroup.genRulePlan(
       calcTimeInfo, userParam.evaluateRuleParam, BatchProcessType, dsTimeRanges)
 
-//    rulePlan.ruleSteps.foreach(println)
-//    println("====")
-//    rulePlan.metricExports.foreach(println)
-//    println("====")
-//    rulePlan.recordExports.foreach(println)
-//    println("====")
-
     // run rules
     dqEngines.runRuleSteps(calcTimeInfo, rulePlan.ruleSteps)
 
@@ -119,11 +102,6 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess {
     dqEngines.persistAllMetrics(rulePlan.metricExports, persistFactory)
 
     dqEngines.persistAllRecords(rulePlan.recordExports, persistFactory, dataSources)
-//    dfs.foreach(_._2.cache())
-//
-//    dqEngines.persistAllRecords(dfs, persistFactory)
-
-//    dfs.foreach(_._2.unpersist())
 
     // end time
     val endTime = new Date().getTime
@@ -132,11 +110,6 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess {
     // finish
     persist.finish()
 
-//    sqlContext.tables().show(50)
-//    println(sqlContext.tableNames().size)
-
-//    sqlContext.tables().show(50)
-
     // clean data
     cleanData(calcTimeInfo)
 
@@ -190,4 +163,12 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess {
 //    }
 //  }
 
+  private def printTimeRanges(timeRanges: Map[String, TimeRange]): Unit = {
+    val timeRangesStr = timeRanges.map { pair =>
+      val (name, timeRange) = pair
+      s"${name} -> [${timeRange.begin}, ${timeRange.end})"
+    }.mkString(", ")
+    println(s"data source timeRanges: ${timeRangesStr}")
+  }
+
 }
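
Besides the tidier time-range logging, the batch path now also registers the new aggregate functions via GriffinUdafs.register(sqlContext) (the streaming path below does the same). The actual aggregate lives in rule/udf/MeanUdaf.scala and is not part of this hunk; the sketch below only illustrates the SQLContext-era Spark UDAF registration pattern such a class follows, with hypothetical names:

  import org.apache.spark.sql.{Row, SQLContext}
  import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
  import org.apache.spark.sql.types._

  // hypothetical mean UDAF: accumulate a (sum, count) buffer, divide at the end
  class MeanUdafSketch extends UserDefinedAggregateFunction {
    def inputSchema: StructType = StructType(StructField("value", DoubleType) :: Nil)
    def bufferSchema: StructType =
      StructType(StructField("sum", DoubleType) :: StructField("count", LongType) :: Nil)
    def dataType: DataType = DoubleType
    def deterministic: Boolean = true
    def initialize(buffer: MutableAggregationBuffer): Unit = {
      buffer(0) = 0.0
      buffer(1) = 0L
    }
    def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      if (!input.isNullAt(0)) {
        buffer(0) = buffer.getDouble(0) + input.getDouble(0)
        buffer(1) = buffer.getLong(1) + 1L
      }
    }
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
      buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
    }
    def evaluate(buffer: Row): Any =
      if (buffer.getLong(1) == 0L) null else buffer.getDouble(0) / buffer.getLong(1)
  }

  // registration, in the spirit of GriffinUdfs/GriffinUdafs.register(sqlContext)
  def registerUdafs(sqlContext: SQLContext): Unit = {
    sqlContext.udf.register("mean_sketch", new MeanUdafSketch)
  }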

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
index 1cc2ab7..3c2376a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
@@ -18,8 +18,6 @@ under the License.
 */
 package org.apache.griffin.measure.process
 
-import java.util.Date
-
 import org.apache.griffin.measure.cache.info.InfoCacheInstance
 import org.apache.griffin.measure.config.params._
 import org.apache.griffin.measure.config.params.env._
@@ -29,8 +27,7 @@ import org.apache.griffin.measure.persist.{Persist, PersistFactory}
 import org.apache.griffin.measure.process.engine.DqEngineFactory
 import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters}
 import org.apache.griffin.measure.rule.adaptor.RuleAdaptorGroup
-import org.apache.griffin.measure.rule.plan.TimeInfo
-import org.apache.griffin.measure.rule.udf.GriffinUdfs
+import org.apache.griffin.measure.rule.udf._
 import org.apache.griffin.measure.utils.TimeUtil
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.hive.HiveContext
@@ -67,6 +64,7 @@ case class StreamingDqProcess(allParam: AllParam) extends DqProcess {
 
     // register udf
     GriffinUdfs.register(sqlContext)
+    GriffinUdafs.register(sqlContext)
 
     // init adaptors
     val dataSourceNames = userParam.dataSources.map(_.name)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
index c3c4f09..dc49df0 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
@@ -28,7 +28,7 @@ import org.apache.griffin.measure.data.source.DataSource
 import org.apache.griffin.measure.log.Loggable
 import org.apache.griffin.measure.persist.{Persist, PersistFactory}
 import org.apache.griffin.measure.process.engine.DqEngines
-import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters}
+import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters, TimeRange}
 import org.apache.griffin.measure.rule.adaptor.{ProcessDetailsKeys, RuleAdaptorGroup, RunPhase}
 import org.apache.griffin.measure.rule.plan._
 import org.apache.spark.sql.SQLContext
@@ -59,80 +59,41 @@ case class StreamingDqThread(sqlContext: SQLContext,
 
         // init data sources
         val dsTimeRanges = dqEngines.loadData(dataSources, calcTimeInfo)
-
-        println(s"data source timeRanges: ${dsTimeRanges}")
+        printTimeRanges(dsTimeRanges)
 
         // generate rule steps
-//        val ruleSteps = RuleAdaptorGroup.genRuleSteps(
-//          CalcTimeInfo(st), evaluateRuleParam, dsTmsts)
         val rulePlan = RuleAdaptorGroup.genRulePlan(
           calcTimeInfo, evaluateRuleParam, StreamingProcessType, dsTimeRanges)
 
-        // optimize rule plan
-//        val optRulePlan = optimizeRulePlan(rulePlan, dsTmsts)
-        val optRulePlan = rulePlan
-
-//        ruleSteps.foreach(println)
-
         // run rules
-//        dqEngines.runRuleSteps(ruleSteps)
-        dqEngines.runRuleSteps(calcTimeInfo, optRulePlan.ruleSteps)
+        dqEngines.runRuleSteps(calcTimeInfo, rulePlan.ruleSteps)
 
         val ct = new Date().getTime
         val calculationTimeStr = s"calculation using time: ${ct - st} ms"
-//        println(calculationTimeStr)
         appPersist.log(ct, calculationTimeStr)
 
         // persist results
-//        val timeGroups = dqEngines.persistAllMetrics(ruleSteps, persistFactory)
-        dqEngines.persistAllMetrics(optRulePlan.metricExports, persistFactory)
-//        println(s"--- timeGroups: ${timeGroups}")
+        dqEngines.persistAllMetrics(rulePlan.metricExports, persistFactory)
 
         val rt = new Date().getTime
         val persistResultTimeStr = s"persist result using time: ${rt - ct} ms"
         appPersist.log(rt, persistResultTimeStr)
 
         // persist records
-        dqEngines.persistAllRecords(optRulePlan.recordExports, persistFactory, dataSources)
+        dqEngines.persistAllRecords(rulePlan.recordExports, persistFactory, dataSources)
+
+        // update data sources
+        dqEngines.updateDataSources(rulePlan.dsUpdates, dataSources)
+
+        // finish calculation
+        finishCalculation()
 
         val et = new Date().getTime
         val persistTimeStr = s"persist records using time: ${et - rt} ms"
         appPersist.log(et, persistTimeStr)
 
-//        val dfs = dqEngines.collectUpdateRDDs(ruleSteps, timeGroups.toSet)
-//        dfs.foreach(_._2.cache())
-//        dfs.foreach { pr =>
-//          val (step, df) = pr
-//          val cnt = df.count
-//          println(s"step [${step.name}] group count: ${cnt}")
-//        }
-//
-//        val lt = new Date().getTime
-//        val collectRddTimeStr = s"collect records using time: ${lt - rt} ms"
-////        println(collectRddTimeStr)
-//        appPersist.log(lt, collectRddTimeStr)
-//
-//        // persist records
-//        dqEngines.persistAllRecords(dfs, persistFactory)
-////        dqEngines.persistAllRecords(ruleSteps, persistFactory, timeGroups)
-//
-//        // update data source
-//        dqEngines.updateDataSources(dfs, dataSources)
-////        dqEngines.updateDataSources(ruleSteps, dataSources, timeGroups)
-//
-//        dfs.foreach(_._2.unpersist())
-
         TimeInfoCache.endTimeInfoCache
 
-//        sqlContext.tables().show(20)
-
-        // cache global data
-//        val globalTables = TableRegisters.getRunGlobalTables
-//        globalTables.foreach { gt =>
-//          val df = sqlContext.table(gt)
-//          df.cache
-//        }
-
         // clean old data
         cleanData(calcTimeInfo)
 
@@ -150,6 +111,11 @@ case class StreamingDqThread(sqlContext: SQLContext,
     println(s"===== [${updateTimeDate}] process ends, using ${endTime - updateTime} ms =====")
   }
 
+  // finish calculation for this round
+  private def finishCalculation(): Unit = {
+    dataSources.foreach(_.processFinish)
+  }
+
   // clean old data and old result cache
   private def cleanData(timeInfo: TimeInfo): Unit = {
     try {
@@ -169,29 +135,12 @@ case class StreamingDqThread(sqlContext: SQLContext,
     }
   }
 
-  private def optimizeRulePlan(rulePlan: RulePlan, dsTmsts: Map[String, Set[Long]]): RulePlan = {
-    val steps = rulePlan.ruleSteps
-    val optExports = rulePlan.ruleExports.flatMap { export =>
-      findRuleStepByName(steps, export.stepName).map { rs =>
-        rs.details.get(ProcessDetailsKeys._baselineDataSource) match {
-          case Some(dsname: String) => {
-            val defTmstOpt = (dsTmsts.get(dsname)).flatMap { set =>
-              try { Some(set.max) } catch { case _: Throwable => None }
-            }
-            defTmstOpt match {
-              case Some(t) => export.setDefTimestamp(t)
-              case _ => export
-            }
-          }
-          case _ => export
-        }
-      }
-    }
-    RulePlan(steps, optExports)
-  }
-
-  private def findRuleStepByName(steps: Seq[RuleStep], name: String): Option[RuleStep] = {
-    steps.filter(_.name == name).headOption
+  private def printTimeRanges(timeRanges: Map[String, TimeRange]): Unit = {
+    val timeRangesStr = timeRanges.map { pair =>
+      val (name, timeRange) = pair
+      s"${name} -> [${timeRange.begin}, ${timeRange.end})"
+    }.mkString(", ")
+    println(s"data source timeRanges: ${timeRangesStr}")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
index ee3a65e..3d77458 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
@@ -46,4 +46,6 @@ trait DqEngine extends Loggable with Serializable {
 
   def collectBatchRecords(recordExport: RecordExport): Option[RDD[String]]
   def collectStreamingRecords(recordExport: RecordExport): (Option[RDD[(Long, Iterable[String])]], Set[Long])
+
+  def collectUpdateDf(dsUpdate: DsUpdate): Option[DataFrame]
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
index 8f17764..6b9a215 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
@@ -23,20 +23,20 @@ import java.util.concurrent.atomic.AtomicInteger
 import org.apache.griffin.measure.config.params.user.DataSourceParam
 import org.apache.griffin.measure.data.source._
 import org.apache.griffin.measure.log.Loggable
-import org.apache.griffin.measure.persist.{Persist, PersistFactory}
+import org.apache.griffin.measure.persist._
 import org.apache.griffin.measure.process.temp.TimeRange
 import org.apache.griffin.measure.process._
 import org.apache.griffin.measure.rule.adaptor.InternalColumns
 import org.apache.griffin.measure.rule.dsl._
-import org.apache.griffin.measure.rule.plan._
+import org.apache.griffin.measure.rule.plan.{DsUpdate, _}
 import org.apache.griffin.measure.utils.JsonUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Row}
 
-import scala.concurrent._
-import scala.concurrent.duration.Duration
-import scala.util.{Failure, Success, Try}
-import ExecutionContext.Implicits.global
+//import scala.concurrent._
+//import scala.concurrent.duration.Duration
+//import scala.util.{Failure, Success, Try}
+//import ExecutionContext.Implicits.global
 
 case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
 
@@ -76,41 +76,40 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
     }
   }
 
-  private def persistCollectedRecords(recordExport: RecordExport, records: Map[Long, DataFrame],
-                                      persistFactory: PersistFactory, dataSources: Seq[DataSource]): Unit = {
-    val pc = ParallelCounter(records.size)
-    val pro = promise[Boolean]
-    if (records.size > 0) {
-      records.foreach { pair =>
-        val (tmst, df) = pair
-        val persist = persistFactory.getPersists(tmst)
-        val updateDsCaches = recordExport.dataSourceCacheOpt match {
-          case Some(dsName) => dataSources.filter(_.name == dsName).flatMap(_.dataSourceCacheOpt)
-          case _ => Nil
-        }
-        val future = Future {
-//        df.cache
-          persist.persistRecords(df, recordExport.name)
-          updateDsCaches.foreach(_.updateData(df, tmst))
-//        df.unpersist
-          true
-        }
-        future.onComplete {
-          case Success(v) => {
-            pc.finishOne(v)
-            if (pc.checkDone) pro.trySuccess(pc.checkResult)
-          }
-          case Failure(ex) => {
-            println(s"plan step failure: ${ex.getMessage}")
-            pc.finishOne(false)
-            if (pc.checkDone) pro.trySuccess(pc.checkResult)
-          }
-        }
-      }
-    } else pro.trySuccess(true)
-
-    Await.result(pro.future, Duration.Inf)
-  }
+//  private def persistCollectedRecords(recordExport: RecordExport, records: Map[Long, DataFrame],
+//                                      persistFactory: PersistFactory, dataSources: Seq[DataSource]): Unit = {
+//    val pc = ParallelCounter(records.size)
+//    val pro = promise[Boolean]
+//    if (records.size > 0) {
+//      records.foreach { pair =>
+//        val (tmst, df) = pair
+//        val persist = persistFactory.getPersists(tmst)
+//        val updateDsCaches = recordExport.dataSourceCacheOpt match {
+//          case Some(dsName) => dataSources.filter(_.name == dsName).flatMap(_.dataSourceCacheOpt)
+//          case _ => Nil
+//        }
+//        val future = Future {
+//          persist.persistRecords(df, recordExport.name)
+////          updateDsCaches.foreach(_.updateData(df, tmst))
+//          updateDsCaches.foreach(_.updateData(Some(df)))
+//          true
+//        }
+//        future.onComplete {
+//          case Success(v) => {
+//            pc.finishOne(v)
+//            if (pc.checkDone) pro.trySuccess(pc.checkResult)
+//          }
+//          case Failure(ex) => {
+//            println(s"plan step failure: ${ex.getMessage}")
+//            pc.finishOne(false)
+//            if (pc.checkDone) pro.trySuccess(pc.checkResult)
+//          }
+//        }
+//      }
+//    } else pro.trySuccess(true)
+//
+//    Await.result(pro.future, Duration.Inf)
+//  }
 
   def persistAllRecords(recordExports: Seq[RecordExport],
                         persistFactory: PersistFactory, dataSources: Seq[DataSource]
@@ -123,7 +122,6 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
 
     // method 2: multi thread persist multi iterable
     recordExports.foreach { recordExport =>
-//      val records = collectRecords(timeInfo, recordExport, procType)
       recordExport.mode match {
         case SimpleMode => {
           collectBatchRecords(recordExport).foreach { rdd =>
@@ -133,9 +131,6 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
         case TimestampMode => {
           val (rddOpt, emptySet) = collectStreamingRecords(recordExport)
           persistCollectedStreamingRecords(recordExport, rddOpt, emptySet, persistFactory, dataSources)
-//          collectStreamingRecords(recordExport).foreach { rddPair =>
-//            persistCollectedStreamingRecords(recordExport, rddPair._1, rddPair._2, persistFactory, dataSources)
-//          }
         }
       }
     }
@@ -165,10 +160,10 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
                                                emtpyRecordKeys: Set[Long], persistFactory: PersistFactory,
                                                dataSources: Seq[DataSource]
                                               ): Unit = {
-    val updateDsCaches = recordExport.dataSourceCacheOpt match {
-      case Some(dsName) => dataSources.filter(_.name == dsName).flatMap(_.dataSourceCacheOpt)
-      case _ => Nil
-    }
+//    val updateDsCaches = recordExport.dataSourceCacheOpt match {
+//      case Some(dsName) => dataSources.filter(_.name == dsName).flatMap(_.dataSourceCacheOpt)
+//      case _ => Nil
+//    }
 
     recordsOpt.foreach { records =>
       records.foreach { pair =>
@@ -176,90 +171,17 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
         val persist = persistFactory.getPersists(tmst)
 
         persist.persistRecords(strs, recordExport.name)
-        updateDsCaches.foreach(_.updateData(strs, tmst))
+//        updateDsCaches.foreach(_.updateData(strs, tmst))
       }
     }
 
     emtpyRecordKeys.foreach { t =>
       val persist = persistFactory.getPersists(t)
       persist.persistRecords(Nil, recordExport.name)
-      updateDsCaches.foreach(_.updateData(Nil, t))
+//      updateDsCaches.foreach(_.updateData(Nil, t))
     }
   }
 
-//  private def persistCollectedStreamingRecords(recordExport: RecordExport, records: RDD[(Long, Iterable[String])],
-//                                               emtpyRecordKeys: Set[Long], persistFactory: PersistFactory,
-//                                               dataSources: Seq[DataSource]
-//                                              ): Unit = {
-//    val updateDsCaches = recordExport.dataSourceCacheOpt match {
-//      case Some(dsName) => dataSources.filter(_.name == dsName).flatMap(_.dataSourceCacheOpt)
-//      case _ => Nil
-//    }
-//
-//    records.foreach { pair =>
-//      val (tmst, strs) = pair
-//      val persist = persistFactory.getPersists(tmst)
-//
-//      persist.persistRecords(strs, recordExport.name)
-//      updateDsCaches.foreach(_.updateData(strs, tmst))
-//    }
-//
-//    emtpyRecordKeys.foreach { t =>
-//      val persist = persistFactory.getPersists(t)
-//      persist.persistRecords(Nil, recordExport.name)
-//      updateDsCaches.foreach(_.updateData(Nil, t))
-//    }
-//  }
-
-//  def persistAllRecords(ruleSteps: Seq[ConcreteRuleStep], persistFactory: PersistFactory,
-//                        timeGroups: Iterable[Long]): Unit = {
-//    val recordSteps = ruleSteps.filter(_.persistType == RecordPersistType)
-//    recordSteps.foreach { step =>
-//      collectRecords(step, timeGroups) match {
-//        case Some(rdd) => {
-//          val name = step.name
-//          rdd.foreach { pair =>
-//            val (t, items) = pair
-//            val persist = persistFactory.getPersists(t)
-//            persist.persistRecords(items, name)
-//          }
-//        }
-//        case _ => {
-//          println(s"empty records to persist")
-//        }
-//      }
-//    }
-//  }
-//
-//  def updateDataSources(ruleSteps: Seq[ConcreteRuleStep], dataSources: Seq[DataSource],
-//                        timeGroups: Iterable[Long]): Unit = {
-//    val updateSteps = ruleSteps.filter(_.updateDataSource.nonEmpty)
-//    updateSteps.foreach { step =>
-//      collectUpdateCacheDatas(step, timeGroups) match {
-//        case Some(rdd) => {
-//          val udpateDataSources = dataSources.filter { ds =>
-//            step.updateDataSource match {
-//              case Some(dsName) if (dsName == ds.name) => true
-//              case _ => false
-//            }
-//          }
-//          if (udpateDataSources.size > 0) {
-//            val name = step.name
-//            rdd.foreach { pair =>
-//              val (t, items) = pair
-//              udpateDataSources.foreach { ds =>
-//                ds.dataSourceCacheOpt.foreach(_.updateData(items, t))
-//              }
-//            }
-//          }
-//        }
-//        case _ => {
-//          println(s"empty data source to update")
-//        }
-//      }
-//    }
-//  }
-
   ///////////////////////////
 
   def runRuleStep(timeInfo: TimeInfo, ruleStep: RuleStep): Boolean = {
@@ -272,16 +194,6 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
 
   ///////////////////////////
 
-//  def collectRecords(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]] = {
-//    engines.flatMap { engine =>
-//      engine.collectRecords(ruleStep, timeGroups)
-//    }.headOption
-//  }
-//  def collectUpdateCacheDatas(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]] = {
-//    engines.flatMap { engine =>
-//      engine.collectUpdateCacheDatas(ruleStep, timeGroups)
-//    }.headOption
-//  }
   def collectMetrics(metricExport: MetricExport
                     ): Map[Long, Map[String, Any]] = {
     val ret = engines.foldLeft(Map[Long, Map[String, Any]]()) { (ret, engine) =>
@@ -290,121 +202,28 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
     ret
   }
 
-//  def collectRecords(timeInfo: TimeInfo, recordExport: RecordExport): Map[Long, DataFrame] = {
-//    val ret = engines.foldLeft(Map[Long, DataFrame]()) { (ret, engine) =>
-//      if (ret.nonEmpty) ret else engine.collectRecords(timeInfo, recordExport)
-//    }
-//    ret
-//  }
-
-  def collectUpdateRDD(ruleStep: RuleStep): Option[DataFrame] = {
-//    engines.flatMap { engine =>
-//      engine.collectUpdateRDD(ruleStep)
-//    }.headOption
-    None
-  }
-
-//  def collectUpdateRDD(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]
-//                      ): Option[RDD[(Long, Iterable[String])]] = {
-//    engines.flatMap { engine =>
-//      engine.collectUpdateRDD(ruleStep, timeGroups)
-//    }.headOption
-//  }
-
-  ////////////////////////////
-
-  def collectUpdateRDDs(ruleSteps: Seq[RuleStep], timeGroups: Set[Long]
-                       ): Seq[(RuleStep, DataFrame)] = {
-//    ruleSteps.flatMap { rs =>
-//      val t = rs.timeInfo.tmst
-//      if (timeGroups.contains(t)) {
-//        collectUpdateRDD(rs).map((rs, _))
-//      } else None
-//    }
-    Nil
-  }
-
-//  def collectUpdateRDDs(ruleSteps: Seq[ConcreteRuleStep], timeGroups: Iterable[Long]
-//                       ): Seq[(ConcreteRuleStep, RDD[(Long, Iterable[String])])] = {
-//    ruleSteps.flatMap { rs =>
-//      collectUpdateRDD(rs, timeGroups) match {
-//        case Some(rdd) => Some((rs, rdd))
-//        case _ => None
-//      }
-//    }
-//  }
-
-  def persistAllRecords(stepRdds: Seq[(RuleStep, DataFrame)],
-                        persistFactory: PersistFactory): Unit = {
-//    stepRdds.foreach { stepRdd =>
-//      val (step, df) = stepRdd
-//      if (step.ruleInfo.persistType == RecordPersistType) {
-//        val name = step.ruleInfo.name
-//        val t = step.timeInfo.tmst
-//        val persist = persistFactory.getPersists(t)
-//        persist.persistRecords(df, name)
-//      }
-//    }
+  def collectUpdateDf(dsUpdate: DsUpdate): Option[DataFrame] = {
+    val ret = engines.foldLeft(None: Option[DataFrame]) { (ret, engine) =>
+      if (ret.nonEmpty) ret else engine.collectUpdateDf(dsUpdate)
+    }
+    ret
   }
 
-//  def persistAllRecords(stepRdds: Seq[(ConcreteRuleStep, RDD[(Long, Iterable[String])])],
-//                        persistFactory: PersistFactory): Unit = {
-//    stepRdds.foreach { stepRdd =>
-//      val (step, rdd) = stepRdd
-//      if (step.ruleInfo.persistType == RecordPersistType) {
-//        val name = step.name
-//        rdd.foreach { pair =>
-//          val (t, items) = pair
-//          val persist = persistFactory.getPersists(t)
-//          persist.persistRecords(items, name)
-//        }
-//      }
-//    }
-//  }
-
-  def updateDataSources(stepRdds: Seq[(RuleStep, DataFrame)],
+  def updateDataSources(dsUpdates: Seq[DsUpdate],
                         dataSources: Seq[DataSource]): Unit = {
-//    stepRdds.foreach { stepRdd =>
-//      val (step, df) = stepRdd
-//      if (step.ruleInfo.cacheDataSourceOpt.nonEmpty) {
-//        val udpateDsCaches = dataSources.filter { ds =>
-//          step.ruleInfo.cacheDataSourceOpt match {
-//            case Some(dsName) if (dsName == ds.name) => true
-//            case _ => false
-//          }
-//        }.flatMap(_.dataSourceCacheOpt)
-//        if (udpateDsCaches.size > 0) {
-//          val t = step.timeInfo.tmst
-//          udpateDsCaches.foreach(_.updateData(df, t))
-//        }
-//      }
-//    }
+    dsUpdates.foreach { dsUpdate =>
+      val dsName = dsUpdate.dsName
+      collectUpdateDf(dsUpdate) match {
+        case Some(df) => {
+          dataSources.filter(_.name == dsName).headOption.foreach(_.updateData(df))
+        }
+        case _ => {
+          // do nothing
+        }
+      }
+    }
   }
 
-//  def updateDataSources(stepRdds: Seq[(ConcreteRuleStep, RDD[(Long, Iterable[String])])],
-//                        dataSources: Seq[DataSource]): Unit = {
-//    stepRdds.foreach { stepRdd =>
-//      val (step, rdd) = stepRdd
-//      if (step.ruleInfo.cacheDataSourceOpt.nonEmpty) {
-//        val udpateDataSources = dataSources.filter { ds =>
-//          step.ruleInfo.cacheDataSourceOpt match {
-//            case Some(dsName) if (dsName == ds.name) => true
-//            case _ => false
-//          }
-//        }
-//        if (udpateDataSources.size > 0) {
-//          val name = step.name
-//          rdd.foreach { pair =>
-//            val (t, items) = pair
-//            udpateDataSources.foreach { ds =>
-//              ds.dataSourceCacheOpt.foreach(_.updateData(items, t))
-//            }
-//          }
-//        }
-//      }
-//    }
-//  }
-
 }
 
 case class ParallelCounter(total: Int) extends Serializable {
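
The new collectUpdateDf/updateDataSources pair mirrors collectMetrics: fold over the engines, keep the first non-empty answer, then hand the resulting DataFrame to the data source whose name matches the DsUpdate. A stripped-down sketch of that lookup pattern (generic names, not the actual Griffin types):

  // ask each engine in turn; the first non-empty answer wins
  def firstDefined[E, R](engines: Seq[E])(ask: E => Option[R]): Option[R] =
    engines.foldLeft(None: Option[R]) { (found, engine) =>
      if (found.nonEmpty) found else ask(engine)
    }

  // usage in the spirit of DqEngines.collectUpdateDf:
  //   firstDefined(engines)(_.collectUpdateDf(dsUpdate))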
