http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/TimeRange.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/TimeRange.scala 
b/measure/src/main/scala/org/apache/griffin/measure/context/TimeRange.scala
new file mode 100644
index 0000000..4df700b
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/TimeRange.scala
@@ -0,0 +1,48 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context
+
+import scala.math.{max, min}
+
+case class TimeRange(begin: Long, end: Long, tmsts: Set[Long]) extends 
Serializable {
+  def merge(tr: TimeRange): TimeRange = {
+    TimeRange(min(begin, tr.begin), max(end, tr.end), tmsts ++ tr.tmsts)
+  }
+  def minTmstOpt: Option[Long] = {
+    try {
+      if (tmsts.nonEmpty) Some(tmsts.min) else None
+    } catch {
+      case _: Throwable => None
+    }
+  }
+}
+
+object TimeRange {
+  val emptyTimeRange = TimeRange(0, 0, Set[Long]())
+  def apply(range: (Long, Long), tmsts: Set[Long]): TimeRange = 
TimeRange(range._1, range._2, tmsts)
+  def apply(ts: Long, tmsts: Set[Long]): TimeRange = TimeRange(ts, ts, tmsts)
+  def apply(ts: Long): TimeRange = TimeRange(ts, ts, Set[Long](ts))
+  def apply(tmsts: Set[Long]): TimeRange = {
+    try {
+      TimeRange(tmsts.min, tmsts.max, tmsts)
+    } catch {
+      case _: Throwable => emptyTimeRange
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSource.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSource.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSource.scala
new file mode 100644
index 0000000..09ab9ea
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSource.scala
@@ -0,0 +1,97 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.configuration.params.DataSourceParam
+import org.apache.griffin.measure.context.datasource.cache.DataSourceCache
+import org.apache.griffin.measure.context.{ContextId, DQContext, TimeRange}
+import org.apache.griffin.measure.context.datasource.connector.DataConnector
+import org.apache.griffin.measure.context.datasource.info.TmstCache
+import org.apache.griffin.measure.utils.DataFrameUtil._
+import org.apache.spark.sql._
+
+/**
+  * data source
+  * @param name     name of data source
+  * @param dsParam  param of this data source
+  * @param dataConnectors       list of data connectors
+  * @param dataSourceCacheOpt   data source cache option in streaming mode
+  */
+case class DataSource(name: String,
+                      dsParam: DataSourceParam,
+                      dataConnectors: Seq[DataConnector],
+                      dataSourceCacheOpt: Option[DataSourceCache]
+                     ) extends Loggable with Serializable {
+
+  def init(): Unit = {
+    dataConnectors.foreach(_.init)
+  }
+
+  def loadData(context: DQContext): TimeRange = {
+    info(s"load data [${name}]")
+    val timestamp = context.contextId.timestamp
+    val (dfOpt, timeRange) = data(timestamp)
+    dfOpt match {
+      case Some(df) => {
+        context.runTimeTableRegister.registerTable(name, df)
+      }
+      case None => {
+        warn(s"load data source [${name}] fails")
+      }
+    }
+    timeRange
+  }
+
+  private def data(timestamp: Long): (Option[DataFrame], TimeRange) = {
+    val batches = dataConnectors.flatMap { dc =>
+      val (dfOpt, timeRange) = dc.data(timestamp)
+      dfOpt match {
+        case Some(df) => Some((dfOpt, timeRange))
+        case _ => None
+      }
+    }
+    val caches = dataSourceCacheOpt match {
+      case Some(dsc) => dsc.readData() :: Nil
+      case _ => Nil
+    }
+    val pairs = batches ++ caches
+
+    if (pairs.size > 0) {
+      pairs.reduce { (a, b) =>
+        (unionDfOpts(a._1, b._1), a._2.merge(b._2))
+      }
+    } else {
+      (None, TimeRange.emptyTimeRange)
+    }
+  }
+
+  def updateData(df: DataFrame): Unit = {
+    dataSourceCacheOpt.foreach(_.updateData(Some(df)))
+  }
+
+  def cleanOldData(): Unit = {
+    dataSourceCacheOpt.foreach(_.cleanOutTimeData)
+  }
+
+  def processFinish(): Unit = {
+    dataSourceCacheOpt.foreach(_.processFinish)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala
new file mode 100644
index 0000000..a22b856
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala
@@ -0,0 +1,66 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.configuration.params.DataSourceParam
+import 
org.apache.griffin.measure.context.datasource.cache.DataSourceCacheFactory
+import org.apache.griffin.measure.context.datasource.connector.{DataConnector, 
DataConnectorFactory}
+import org.apache.griffin.measure.context.datasource.info.TmstCache
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.StreamingContext
+
+import scala.util.Success
+
+object DataSourceFactory extends Loggable {
+
+  def getDataSources(sparkSession: SparkSession,
+                     ssc: StreamingContext,
+                     dataSourceParams: Seq[DataSourceParam]
+                    ): Seq[DataSource] = {
+    dataSourceParams.zipWithIndex.flatMap { pair =>
+      val (param, index) = pair
+      getDataSource(sparkSession, ssc, param, index)
+    }
+  }
+
+  private def getDataSource(sparkSession: SparkSession,
+                            ssc: StreamingContext,
+                            dataSourceParam: DataSourceParam,
+                            index: Int
+                           ): Option[DataSource] = {
+    val name = dataSourceParam.name
+    val connectorParams = dataSourceParam.getConnectors
+    val tmstCache = TmstCache()
+
+    val dataSourceCacheOpt = DataSourceCacheFactory.getDataSourceCacheOpt(
+      sparkSession.sqlContext, dataSourceParam.cache, name, index, tmstCache)
+
+    val dataConnectors: Seq[DataConnector] = connectorParams.flatMap { 
connectorParam =>
+      DataConnectorFactory.getDataConnector(sparkSession, ssc, connectorParam,
+        tmstCache, dataSourceCacheOpt) match {
+          case Success(connector) => Some(connector)
+          case _ => None
+        }
+    }
+
+    Some(DataSource(name, dataSourceParam, dataConnectors, dataSourceCacheOpt))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCache.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCache.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCache.scala
new file mode 100644
index 0000000..c70fd20
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCache.scala
@@ -0,0 +1,366 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.cache
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.context.TimeRange
+import 
org.apache.griffin.measure.context.datasource.info.{DataSourceCacheable, 
TmstCache}
+import org.apache.griffin.measure.context.streaming.info.{InfoCacheInstance, 
TimeInfoCache}
+import org.apache.griffin.measure.step.builder.ConstantColumns
+import org.apache.griffin.measure.utils.DataFrameUtil._
+import org.apache.griffin.measure.utils.ParamUtil._
+import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
+import org.apache.spark.sql._
+
+import scala.util.Random
+
+/**
+  * data source cache in streaming mode
+  * save data frame into hdfs in dump phase
+  * read data frame from hdfs in calculate phase
+  * with update and clean actions for the cache data
+  */
+trait DataSourceCache extends DataSourceCacheable with WithFanIn[Long] with 
Loggable with Serializable {
+
+  val sqlContext: SQLContext
+  val param: Map[String, Any]
+  val dsName: String
+  val index: Int
+
+  val tmstCache: TmstCache
+  protected def fromUntilRangeTmsts(from: Long, until: Long) = 
tmstCache.fromUntil(from, until)
+  protected def clearTmst(t: Long) = tmstCache.remove(t)
+  protected def clearTmstsUntil(until: Long) = {
+    val outDateTmsts = tmstCache.until(until)
+    tmstCache.remove(outDateTmsts)
+  }
+  protected def afterTilRangeTmsts(after: Long, til: Long) = 
fromUntilRangeTmsts(after + 1, til + 1)
+  protected def clearTmstsTil(til: Long) = clearTmstsUntil(til + 1)
+
+  val _FilePath = "file.path"
+  val _InfoPath = "info.path"
+  val _ReadyTimeInterval = "ready.time.interval"
+  val _ReadyTimeDelay = "ready.time.delay"
+  val _TimeRange = "time.range"
+
+  val rdmStr = Random.alphanumeric.take(10).mkString
+  val defFilePath = s"hdfs:///griffin/cache/${dsName}_${rdmStr}"
+  val defInfoPath = s"${index}"
+
+  val filePath: String = param.getString(_FilePath, defFilePath)
+  val cacheInfoPath: String = param.getString(_InfoPath, defInfoPath)
+  val readyTimeInterval: Long = 
TimeUtil.milliseconds(param.getString(_ReadyTimeInterval, 
"1m")).getOrElse(60000L)
+  val readyTimeDelay: Long = 
TimeUtil.milliseconds(param.getString(_ReadyTimeDelay, "1m")).getOrElse(60000L)
+  val deltaTimeRange: (Long, Long) = {
+    def negative(n: Long): Long = if (n <= 0) n else 0
+    param.get(_TimeRange) match {
+      case Some(seq: Seq[String]) => {
+        val nseq = seq.flatMap(TimeUtil.milliseconds(_))
+        val ns = negative(nseq.headOption.getOrElse(0))
+        val ne = negative(nseq.tail.headOption.getOrElse(0))
+        (ns, ne)
+      }
+      case _ => (0, 0)
+    }
+  }
+
+  val _ReadOnly = "read.only"
+  val readOnly = param.getBoolean(_ReadOnly, false)
+
+  val _Updatable = "updatable"
+  val updatable = param.getBoolean(_Updatable, false)
+
+  val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new")
+  val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old")
+
+  val newFilePath = s"${filePath}/new"
+  val oldFilePath = s"${filePath}/old"
+
+  val defOldCacheIndex = 0L
+
+  protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit
+  protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame
+
+  /**
+    * save data frame in dump phase
+    * @param dfOpt    data frame to be saved
+    * @param ms       timestamp of this data frame
+    */
+  def saveData(dfOpt: Option[DataFrame], ms: Long): Unit = {
+    if (!readOnly) {
+      dfOpt match {
+        case Some(df) => {
+          // cache df
+          df.cache
+
+          // cache df
+          val cnt = df.count
+          info(s"save ${dsName} data count: ${cnt}")
+
+          if (cnt > 0) {
+            // lock makes it safer when writing new cache data
+            val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
+            if (newCacheLocked) {
+              try {
+                val dfw = 
df.write.mode(SaveMode.Append).partitionBy(ConstantColumns.tmst)
+                writeDataFrame(dfw, newFilePath)
+              } catch {
+                case e: Throwable => error(s"save data error: ${e.getMessage}")
+              } finally {
+                newCacheLock.unlock()
+              }
+            }
+          }
+
+          // uncache df
+          df.unpersist
+        }
+        case _ => {
+          info(s"no data frame to save")
+        }
+      }
+
+      // submit cache time and ready time
+      if (fanIncrement(ms)) {
+        info(s"save data [${ms}] finish")
+        submitCacheTime(ms)
+        submitReadyTime(ms)
+      }
+
+    }
+  }
+
+  /**
+    * read data frame in calculation phase
+    * @return   data frame to calculate, with the time range of data
+    */
+  def readData(): (Option[DataFrame], TimeRange) = {
+    // time range: (a, b]
+    val timeRange = TimeInfoCache.getTimeRange
+    val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + 
deltaTimeRange._2)
+
+    // read partition info
+    val filterStr = if (reviseTimeRange._1 == reviseTimeRange._2) {
+      info(s"read time range: [${reviseTimeRange._1}]")
+      s"`${ConstantColumns.tmst}` = ${reviseTimeRange._1}"
+    } else {
+      info(s"read time range: (${reviseTimeRange._1}, ${reviseTimeRange._2}]")
+      s"`${ConstantColumns.tmst}` > ${reviseTimeRange._1} AND 
`${ConstantColumns.tmst}` <= ${reviseTimeRange._2}"
+    }
+
+    // new cache data
+    val newDfOpt = try {
+      val dfr = sqlContext.read
+      Some(readDataFrame(dfr, newFilePath).filter(filterStr))
+    } catch {
+      case e: Throwable => {
+        warn(s"read data source cache warn: ${e.getMessage}")
+        None
+      }
+    }
+
+    // old cache data
+    val oldCacheIndexOpt = if (updatable) readOldCacheIndex else None
+    val oldDfOpt = oldCacheIndexOpt.flatMap { idx =>
+      val oldDfPath = s"${oldFilePath}/${idx}"
+      try {
+        val dfr = sqlContext.read
+        Some(readDataFrame(dfr, oldDfPath).filter(filterStr))
+      } catch {
+        case e: Throwable => {
+          warn(s"read old data source cache warn: ${e.getMessage}")
+          None
+        }
+      }
+    }
+
+    // whole cache data
+    val cacheDfOpt = unionDfOpts(newDfOpt, oldDfOpt)
+
+    // from until tmst range
+    val (from, until) = (reviseTimeRange._1, reviseTimeRange._2)
+    val tmstSet = afterTilRangeTmsts(from, until)
+
+    val retTimeRange = TimeRange(reviseTimeRange, tmstSet)
+    (cacheDfOpt, retTimeRange)
+  }
+
+  private def cleanOutTimePartitions(path: String, outTime: Long, 
partitionOpt: Option[String],
+                                     func: (Long, Long) => Boolean
+                                    ): Unit = {
+    val earlierOrEqPaths = listPartitionsByFunc(path: String, outTime, 
partitionOpt, func)
+    // delete out time data path
+    earlierOrEqPaths.foreach { path =>
+      info(s"delete hdfs path: ${path}")
+      HdfsUtil.deleteHdfsPath(path)
+    }
+  }
+  private def listPartitionsByFunc(path: String, bound: Long, partitionOpt: 
Option[String],
+                                        func: (Long, Long) => Boolean
+                                       ): Iterable[String] = {
+    val names = HdfsUtil.listSubPathsByType(path, "dir")
+    val regex = partitionOpt match {
+      case Some(partition) => s"^${partition}=(\\d+)$$".r
+      case _ => "^(\\d+)$".r
+    }
+    names.filter { name =>
+      name match {
+        case regex(value) => {
+          str2Long(value) match {
+            case Some(t) => func(t, bound)
+            case _ => false
+          }
+        }
+        case _ => false
+      }
+    }.map(name => s"${path}/${name}")
+  }
+  private def str2Long(str: String): Option[Long] = {
+    try {
+      Some(str.toLong)
+    } catch {
+      case e: Throwable => None
+    }
+  }
+
+  /**
+    * clean out-time cached data on hdfs
+    */
+  def cleanOutTimeData(): Unit = {
+    // clean tmst
+    val cleanTime = readCleanTime
+    cleanTime.foreach(clearTmstsTil(_))
+
+    if (!readOnly) {
+      // new cache data
+      val newCacheCleanTime = if (updatable) readLastProcTime else 
readCleanTime
+      newCacheCleanTime match {
+        case Some(nct) => {
+          // clean calculated new cache data
+          val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
+          if (newCacheLocked) {
+            try {
+              cleanOutTimePartitions(newFilePath, nct, 
Some(ConstantColumns.tmst),
+                (a: Long, b: Long) => (a <= b))
+            } catch {
+              case e: Throwable => error(s"clean new cache data error: 
${e.getMessage}")
+            } finally {
+              newCacheLock.unlock()
+            }
+          }
+        }
+        case _ => {
+          // do nothing
+        }
+      }
+
+      // old cache data
+      val oldCacheCleanTime = if (updatable) readCleanTime else None
+      oldCacheCleanTime match {
+        case Some(oct) => {
+          val oldCacheIndexOpt = readOldCacheIndex
+          oldCacheIndexOpt.foreach { idx =>
+            val oldDfPath = s"${oldFilePath}/${idx}"
+            val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
+            if (oldCacheLocked) {
+              try {
+                // clean calculated old cache data
+                cleanOutTimePartitions(oldFilePath, idx, None, (a: Long, b: 
Long) => (a < b))
+                // clean out time old cache data not calculated
+//                cleanOutTimePartitions(oldDfPath, oct, 
Some(InternalColumns.tmst))
+              } catch {
+                case e: Throwable => error(s"clean old cache data error: 
${e.getMessage}")
+              } finally {
+                oldCacheLock.unlock()
+              }
+            }
+          }
+        }
+        case _ => {
+          // do nothing
+        }
+      }
+    }
+  }
+
+  /**
+    * update old cached data by new data frame
+    * @param dfOpt    data frame to update old cached data
+    */
+  def updateData(dfOpt: Option[DataFrame]): Unit = {
+    if (!readOnly && updatable) {
+      dfOpt match {
+        case Some(df) => {
+          // old cache lock
+          val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
+          if (oldCacheLocked) {
+            try {
+              val oldCacheIndexOpt = readOldCacheIndex
+              val nextOldCacheIndex = 
oldCacheIndexOpt.getOrElse(defOldCacheIndex) + 1
+
+              val oldDfPath = s"${oldFilePath}/${nextOldCacheIndex}"
+              val cleanTime = getNextCleanTime
+              val filterStr = s"`${ConstantColumns.tmst}` > ${cleanTime}"
+              val updateDf = df.filter(filterStr)
+
+              val prlCount = sqlContext.sparkContext.defaultParallelism
+              // repartition
+              val repartitionedDf = updateDf.repartition(prlCount)
+              val dfw = repartitionedDf.write.mode(SaveMode.Overwrite)
+              writeDataFrame(dfw, oldDfPath)
+
+              submitOldCacheIndex(nextOldCacheIndex)
+            } catch {
+              case e: Throwable => error(s"update data error: ${e.getMessage}")
+            } finally {
+              oldCacheLock.unlock()
+            }
+          }
+        }
+        case _ => {
+          info(s"no data frame to update")
+        }
+      }
+    }
+  }
+
+  /**
+    * each time calculation phase finishes,
+    * data source cache needs to submit some cache information
+    */
+  def processFinish(): Unit = {
+    // next last proc time
+    val timeRange = TimeInfoCache.getTimeRange
+    submitLastProcTime(timeRange._2)
+
+    // next clean time
+    val nextCleanTime = timeRange._2 + deltaTimeRange._1
+    submitCleanTime(nextCleanTime)
+  }
+
+  // read next clean time
+  private def getNextCleanTime(): Long = {
+    val timeRange = TimeInfoCache.getTimeRange
+    val nextCleanTime = timeRange._2 + deltaTimeRange._1
+    nextCleanTime
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCacheFactory.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCacheFactory.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCacheFactory.scala
new file mode 100644
index 0000000..ca882e0
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCacheFactory.scala
@@ -0,0 +1,68 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.cache
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.context.datasource.info.TmstCache
+import org.apache.griffin.measure.utils.ParamUtil._
+import org.apache.spark.sql.SQLContext
+
+object DataSourceCacheFactory extends Loggable {
+
+  private object DataSourceCacheType {
+    val ParquetRegex = "^(?i)parq(uet)?$".r
+    val JsonRegex = "^(?i)json$".r
+    val OrcRegex = "^(?i)orc$".r
+  }
+  import DataSourceCacheType._
+
+  val _type = "type"
+
+  /**
+    * create data source cache
+    * @param sqlContext   sqlContext in spark environment
+    * @param param        data source cache config
+    * @param name         data source name
+    * @param index        data source index
+    * @param tmstCache    the same tmstCache instance inside a data source
+    * @return             data source option
+    */
+  def getDataSourceCacheOpt(sqlContext: SQLContext, param: Map[String, Any],
+                            name: String, index: Int, tmstCache: TmstCache
+                           ): Option[DataSourceCache] = {
+    if (param != null) {
+      try {
+        val tp = param.getString(_type, "")
+        val dsCache = tp match {
+          case ParquetRegex() => ParquetDataSourceCache(sqlContext, param, 
name, index, tmstCache)
+          case JsonRegex() => JsonDataSourceCache(sqlContext, param, name, 
index, tmstCache)
+          case OrcRegex() => OrcDataSourceCache(sqlContext, param, name, 
index, tmstCache)
+          case _ => ParquetDataSourceCache(sqlContext, param, name, index, 
tmstCache)
+        }
+        Some(dsCache)
+      } catch {
+        case e: Throwable => {
+          error(s"generate data source cache fails")
+          None
+        }
+      }
+    } else None
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/JsonDataSourceCache.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/JsonDataSourceCache.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/JsonDataSourceCache.scala
new file mode 100644
index 0000000..cb01274
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/JsonDataSourceCache.scala
@@ -0,0 +1,40 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.cache
+
+import org.apache.griffin.measure.context.datasource.info.TmstCache
+import org.apache.spark.sql._
+
+/**
+  * data source cache in json format
+  */
+case class JsonDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
+                               dsName: String, index: Int, tmstCache: TmstCache
+                              ) extends DataSourceCache {
+
+  protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit 
= {
+    info(s"write path: ${path}")
+    dfw.json(path)
+  }
+
+  protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = 
{
+    dfr.json(path)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/OrcDataSourceCache.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/OrcDataSourceCache.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/OrcDataSourceCache.scala
new file mode 100644
index 0000000..daba15f
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/OrcDataSourceCache.scala
@@ -0,0 +1,40 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.cache
+
+import org.apache.griffin.measure.context.datasource.info.TmstCache
+import org.apache.spark.sql._
+
+/**
+  * data source cache in orc format
+  */
+case class OrcDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
+                              dsName: String, index: Int, tmstCache: TmstCache
+                             ) extends DataSourceCache {
+
+  protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit 
= {
+    info(s"write path: ${path}")
+    dfw.orc(path)
+  }
+
+  protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = 
{
+    dfr.orc(path)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/ParquetDataSourceCache.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/ParquetDataSourceCache.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/ParquetDataSourceCache.scala
new file mode 100644
index 0000000..f00c6a3
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/ParquetDataSourceCache.scala
@@ -0,0 +1,42 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.cache
+
+import org.apache.griffin.measure.context.datasource.info.TmstCache
+import org.apache.spark.sql._
+
+/**
+  * data source cache in parquet format
+  */
+case class ParquetDataSourceCache(sqlContext: SQLContext, param: Map[String, 
Any],
+                                  dsName: String, index: Int, tmstCache: 
TmstCache
+                                 ) extends DataSourceCache {
+
+  
sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata",
 "false")
+
+  protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit 
= {
+    info(s"write path: ${path}")
+    dfw.parquet(path)
+  }
+
+  protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = 
{
+    dfr.parquet(path)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/WithFanIn.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/WithFanIn.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/WithFanIn.scala
new file mode 100644
index 0000000..ebd2e55
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/WithFanIn.scala
@@ -0,0 +1,69 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.cache
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.concurrent.{TrieMap, Map => ConcMap}
+
+/**
+  * fan in trait, for multiple input and one output
+  * to support multiple parallel data connectors in one data source
+  */
+trait WithFanIn[T] {
+
+  // total input number
+  val totalNum: AtomicInteger = new AtomicInteger(0)
+  // concurrent map of fan in count for each key
+  val fanInCountMap: ConcMap[T, Int] = TrieMap[T, Int]()
+
+  def registerFanIn(): Int = {
+    totalNum.incrementAndGet()
+  }
+
+  /**
+    * increment for a key, to test if all parallel inputs finished
+    * @param key
+    * @return
+    */
+  def fanIncrement(key: T): Boolean = {
+    fanInc(key)
+    fanInCountMap.get(key) match {
+      case Some(n) if (n >= totalNum.get) => {
+        fanInCountMap.remove(key)
+        true
+      }
+      case _ => false
+    }
+  }
+
+  private def fanInc(key: T): Unit = {
+    fanInCountMap.get(key) match {
+      case Some(n) => {
+        val suc = fanInCountMap.replace(key, n, n + 1)
+        if (!suc) fanInc(key)
+      }
+      case _ => {
+        val oldOpt = fanInCountMap.putIfAbsent(key, 1)
+        if (oldOpt.nonEmpty) fanInc(key)
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnector.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnector.scala
new file mode 100644
index 0000000..6dc17d1
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnector.scala
@@ -0,0 +1,112 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.connector
+
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.configuration.enums.{BatchProcessType, 
DslType, SparkSqlType}
+import org.apache.griffin.measure.configuration.params.DataConnectorParam
+import org.apache.griffin.measure.context.datasource.info.TmstCache
+import org.apache.griffin.measure.context.{ContextId, DQContext, TimeRange}
+import org.apache.griffin.measure.job.builder.DQJobBuilder
+import org.apache.griffin.measure.step.builder.ConstantColumns
+import 
org.apache.griffin.measure.step.builder.preproc.PreProcRuleParamGenerator
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.functions._
+
+trait DataConnector extends Loggable with Serializable {
+
+  @transient val sparkSession: SparkSession
+
+  val dcParam: DataConnectorParam
+
+  val id: String = DataConnectorIdGenerator.genId
+  protected def thisName(suffix: String): String = s"this_${suffix}"
+
+  val tmstCache: TmstCache
+  protected def saveTmst(t: Long) = tmstCache.insert(t)
+  protected def readTmst(t: Long) = tmstCache.fromUntil(t, t + 1)
+
+  def init(): Unit
+
+  // get data frame in batch mode
+  def data(ms: Long): (Option[DataFrame], TimeRange)
+
+  private def createContext(t: Long): DQContext = {
+    DQContext(ContextId(t, id), id, Nil, Nil, BatchProcessType)(sparkSession)
+  }
+
+  def preProcess(dfOpt: Option[DataFrame], ms: Long): Option[DataFrame] = {
+    // new context
+    val context = createContext(ms)
+
+    val timestamp = context.contextId.timestamp
+    val suffix = context.contextId.id
+    val thisTable = thisName(suffix)
+
+    try {
+      saveTmst(timestamp)    // save timestamp
+
+      dfOpt.flatMap { df =>
+        val preProcRules = 
PreProcRuleParamGenerator.getNewPreProcRules(dcParam.preProc, suffix)
+
+        // init data
+        context.compileTableRegister.registerTable(thisTable)
+        context.runTimeTableRegister.registerTable(thisTable, df)
+
+        // build job
+        val preprocJob = DQJobBuilder.buildDQJob(context, preProcRules, 
SparkSqlType)
+
+        // job execute
+        preprocJob.execute(context)
+
+        // out data
+        val outDf = context.sparkSession.table(s"`${thisTable}`")
+
+        // add tmst column
+        val withTmstDf = outDf.withColumn(ConstantColumns.tmst, lit(timestamp))
+
+        // clean context
+        context.clean()
+
+        Some(withTmstDf)
+      }
+
+    } catch {
+      case e: Throwable => {
+        error(s"pre-process of data connector [${id}] error: ${e.getMessage}")
+        None
+      }
+    }
+  }
+}
+
+object DataConnectorIdGenerator {
+  private val counter: AtomicLong = new AtomicLong(0L)
+  private val head: String = "dc"
+
+  def genId: String = {
+    s"${head}${increment}"
+  }
+
+  private def increment: Long = {
+    counter.incrementAndGet()
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala
new file mode 100644
index 0000000..ea22309
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala
@@ -0,0 +1,125 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.connector
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.configuration.params.DataConnectorParam
+import org.apache.griffin.measure.context.datasource.cache.DataSourceCache
+import org.apache.griffin.measure.context.datasource.connector.batch._
+import org.apache.griffin.measure.context.datasource.connector.streaming._
+import org.apache.griffin.measure.context.datasource.info.TmstCache
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.StreamingContext
+
+import scala.reflect.ClassTag
+import scala.util.Try
+
+object DataConnectorFactory extends Loggable {
+
+  val HiveRegex = """^(?i)hive$""".r
+  val AvroRegex = """^(?i)avro$""".r
+  val TextDirRegex = """^(?i)text-dir$""".r
+
+  val KafkaRegex = """^(?i)kafka$""".r
+
+  /**
+    * create data connector
+    * @param sparkSession     spark env
+    * @param ssc              spark streaming env
+    * @param dcParam          data connector param
+    * @param tmstCache        same tmst cache in one data source
+    * @param dataSourceCacheOpt   for streaming data connector
+    * @return   data connector
+    */
+  def getDataConnector(sparkSession: SparkSession,
+                       ssc: StreamingContext,
+                       dcParam: DataConnectorParam,
+                       tmstCache: TmstCache,
+                       dataSourceCacheOpt: Option[DataSourceCache]
+                      ): Try[DataConnector] = {
+    val conType = dcParam.conType
+    val version = dcParam.version
+    Try {
+      conType match {
+        case HiveRegex() => HiveBatchDataConnector(sparkSession, dcParam, 
tmstCache)
+        case AvroRegex() => AvroBatchDataConnector(sparkSession, dcParam, 
tmstCache)
+        case TextDirRegex() => TextDirBatchDataConnector(sparkSession, 
dcParam, tmstCache)
+        case KafkaRegex() => {
+          getStreamingDataConnector(sparkSession, ssc, dcParam, tmstCache, 
dataSourceCacheOpt)
+        }
+        case _ => throw new Exception("connector creation error!")
+      }
+    }
+  }
+
+  private def getStreamingDataConnector(sparkSession: SparkSession,
+                                        ssc: StreamingContext,
+                                        dcParam: DataConnectorParam,
+                                        tmstCache: TmstCache,
+                                        dataSourceCacheOpt: 
Option[DataSourceCache]
+                                       ): StreamingDataConnector = {
+    if (ssc == null) throw new Exception("streaming context is null!")
+    val conType = dcParam.conType
+    val version = dcParam.version
+    conType match {
+      case KafkaRegex() => getKafkaDataConnector(sparkSession, ssc, dcParam, 
tmstCache, dataSourceCacheOpt)
+      case _ => throw new Exception("streaming connector creation error!")
+    }
+  }
+
+  private def getKafkaDataConnector(sparkSession: SparkSession,
+                                    ssc: StreamingContext,
+                                    dcParam: DataConnectorParam,
+                                    tmstCache: TmstCache,
+                                    dataSourceCacheOpt: Option[DataSourceCache]
+                                   ): KafkaStreamingDataConnector  = {
+    val KeyType = "key.type"
+    val ValueType = "value.type"
+    val config = dcParam.config
+    val keyType = config.getOrElse(KeyType, "java.lang.String").toString
+    val valueType = config.getOrElse(ValueType, "java.lang.String").toString
+    (getClassTag(keyType), getClassTag(valueType)) match {
+      case (ClassTag(k: Class[String]), ClassTag(v: Class[String])) => {
+        KafkaStreamingStringDataConnector(sparkSession, ssc, dcParam, 
tmstCache, dataSourceCacheOpt)
+      }
+      case _ => {
+        throw new Exception("not supported type kafka data connector")
+      }
+    }
+  }
+
+  private def getClassTag(tp: String): ClassTag[_] = {
+    try {
+      val clazz = Class.forName(tp)
+      ClassTag(clazz)
+    } catch {
+      case e: Throwable => throw e
+    }
+  }
+
+//  def filterDataConnectors[T <: DataConnector : ClassTag](connectors: 
Seq[DataConnector]): Seq[T] = {
+//    connectors.flatMap { dc =>
+//      dc match {
+//        case mdc: T => Some(mdc)
+//        case _ => None
+//      }
+//    }
+//  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/AvroBatchDataConnector.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/AvroBatchDataConnector.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/AvroBatchDataConnector.scala
new file mode 100644
index 0000000..1ee6e78
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/AvroBatchDataConnector.scala
@@ -0,0 +1,71 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.connector.batch
+
+import org.apache.griffin.measure.configuration.params.DataConnectorParam
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.context.datasource.info.TmstCache
+import org.apache.griffin.measure.utils.HdfsUtil
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.griffin.measure.utils.ParamUtil._
+
+/**
+  * batch data connector for avro file
+  */
+case class AvroBatchDataConnector(@transient sparkSession: SparkSession,
+                                  dcParam: DataConnectorParam,
+                                  tmstCache: TmstCache
+                                 ) extends BatchDataConnector {
+
+  val config = dcParam.config
+
+  val FilePath = "file.path"
+  val FileName = "file.name"
+
+  val filePath = config.getString(FilePath, "")
+  val fileName = config.getString(FileName, "")
+
+  val concreteFileFullPath = if (pathPrefix) s"${filePath}${fileName}" else 
fileName
+
+  private def pathPrefix(): Boolean = {
+    filePath.nonEmpty
+  }
+
+  private def fileExist(): Boolean = {
+    HdfsUtil.existPath(concreteFileFullPath)
+  }
+
+  def data(ms: Long): (Option[DataFrame], TimeRange) = {
+    val dfOpt = try {
+      val df = 
sparkSession.read.format("com.databricks.spark.avro").load(concreteFileFullPath)
+      val dfOpt = Some(df)
+      val preDfOpt = preProcess(dfOpt, ms)
+      preDfOpt
+    } catch {
+      case e: Throwable => {
+        error(s"load avro file ${concreteFileFullPath} fails")
+        None
+      }
+    }
+    val tmsts = readTmst(ms)
+    (dfOpt, TimeRange(ms, tmsts))
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/BatchDataConnector.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/BatchDataConnector.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/BatchDataConnector.scala
new file mode 100644
index 0000000..8f32687
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/BatchDataConnector.scala
@@ -0,0 +1,27 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.connector.batch
+
+import org.apache.griffin.measure.context.datasource.connector.DataConnector
+
+trait BatchDataConnector extends DataConnector {
+
+  def init(): Unit = {}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/HiveBatchDataConnector.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/HiveBatchDataConnector.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/HiveBatchDataConnector.scala
new file mode 100644
index 0000000..85cd120
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/HiveBatchDataConnector.scala
@@ -0,0 +1,86 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.connector.batch
+
+import org.apache.griffin.measure.configuration.params.DataConnectorParam
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.context.datasource.info.TmstCache
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.griffin.measure.utils.ParamUtil._
+
+/**
+  * batch data connector for hive table
+  */
+case class HiveBatchDataConnector(@transient sparkSession: SparkSession,
+                                  dcParam: DataConnectorParam,
+                                  tmstCache: TmstCache
+                                 ) extends BatchDataConnector {
+
+  val config = dcParam.config
+
+  val Database = "database"
+  val TableName = "table.name"
+  val Where = "where"
+
+  val database = config.getString(Database, "default")
+  val tableName = config.getString(TableName, "")
+  val whereString = config.getString(Where, "")
+
+  val concreteTableName = s"${database}.${tableName}"
+  val wheres = whereString.split(",").map(_.trim).filter(_.nonEmpty)
+
+  def data(ms: Long): (Option[DataFrame], TimeRange) = {
+    val dfOpt = try {
+      val dtSql = dataSql
+      info(dtSql)
+      val df = sparkSession.sql(dtSql)
+      val dfOpt = Some(df)
+      val preDfOpt = preProcess(dfOpt, ms)
+      preDfOpt
+    } catch {
+      case e: Throwable => {
+        error(s"load hive table ${concreteTableName} fails: ${e.getMessage}")
+        None
+      }
+    }
+    val tmsts = readTmst(ms)
+    (dfOpt, TimeRange(ms, tmsts))
+  }
+
+
+  private def tableExistsSql(): String = {
+//    s"SHOW TABLES LIKE '${concreteTableName}'"    // this is hive sql, but 
not work for spark sql
+    s"tableName LIKE '${tableName}'"
+  }
+
+  private def metaDataSql(): String = {
+    s"DESCRIBE ${concreteTableName}"
+  }
+
+  private def dataSql(): String = {
+    val tableClause = s"SELECT * FROM ${concreteTableName}"
+    if (wheres.length > 0) {
+      val clauses = wheres.map { w =>
+        s"${tableClause} WHERE ${w}"
+      }
+      clauses.mkString(" UNION ALL ")
+    } else tableClause
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/TextDirBatchDataConnector.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/TextDirBatchDataConnector.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/TextDirBatchDataConnector.scala
new file mode 100644
index 0000000..ca5b7b5
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/TextDirBatchDataConnector.scala
@@ -0,0 +1,106 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.connector.batch
+
+import org.apache.griffin.measure.configuration.params.DataConnectorParam
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.context.datasource.info.TmstCache
+import org.apache.griffin.measure.utils.HdfsUtil
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.griffin.measure.utils.ParamUtil._
+
+/**
+  * batch data connector for directory with text format data in the nth depth 
sub-directories
+  */
+case class TextDirBatchDataConnector(@transient sparkSession: SparkSession,
+                                     dcParam: DataConnectorParam,
+                                     tmstCache: TmstCache
+                                    ) extends BatchDataConnector {
+
+  val config = dcParam.config
+
+  val DirPath = "dir.path"
+  val DataDirDepth = "data.dir.depth"
+  val SuccessFile = "success.file"
+  val DoneFile = "done.file"
+
+  val dirPath = config.getString(DirPath, "")
+  val dataDirDepth = config.getInt(DataDirDepth, 0)
+  val successFile = config.getString(SuccessFile, "_SUCCESS")
+  val doneFile = config.getString(DoneFile, "_DONE")
+
+  val ignoreFilePrefix = "_"
+
+  private def dirExist(): Boolean = {
+    HdfsUtil.existPath(dirPath)
+  }
+
+  def data(ms: Long): (Option[DataFrame], TimeRange) = {
+    val dfOpt = try {
+      val dataDirs = listSubDirs(dirPath :: Nil, dataDirDepth, readable)
+      // touch done file for read dirs
+      dataDirs.foreach(dir => touchDone(dir))
+
+      val validDataDirs = dataDirs.filter(dir => !emptyDir(dir))
+
+      if (validDataDirs.nonEmpty) {
+        val df = sparkSession.read.text(validDataDirs:  _*)
+        val dfOpt = Some(df)
+        val preDfOpt = preProcess(dfOpt, ms)
+        preDfOpt
+      } else {
+        None
+      }
+    } catch {
+      case e: Throwable => {
+        error(s"load text dir ${dirPath} fails: ${e.getMessage}")
+        None
+      }
+    }
+    val tmsts = readTmst(ms)
+    (dfOpt, TimeRange(ms, tmsts))
+  }
+
+  private def listSubDirs(paths: Seq[String], depth: Int, filteFunc: (String) 
=> Boolean): Seq[String] = {
+    val subDirs = paths.flatMap { path => HdfsUtil.listSubPathsByType(path, 
"dir", true) }
+    if (depth <= 0) {
+      subDirs.filter(filteFunc)
+    } else {
+      listSubDirs(subDirs, depth - 1, filteFunc)
+    }
+  }
+
+  private def readable(dir: String): Boolean = isSuccess(dir) && !isDone(dir)
+  private def isDone(dir: String): Boolean = HdfsUtil.existFileInDir(dir, 
doneFile)
+  private def isSuccess(dir: String): Boolean = HdfsUtil.existFileInDir(dir, 
successFile)
+
+  private def touchDone(dir: String): Unit = 
HdfsUtil.createEmptyFile(HdfsUtil.getHdfsFilePath(dir, doneFile))
+
+  private def emptyDir(dir: String): Boolean = {
+    HdfsUtil.listSubPathsByType(dir, 
"file").filter(!_.startsWith(ignoreFilePrefix)).size == 0
+  }
+
+//  def metaData(): Try[Iterable[(String, String)]] = {
+//    Try {
+//      val st = 
sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath).schema
+//      st.fields.map(f => (f.name, f.dataType.typeName))
+//    }
+//  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala
new file mode 100644
index 0000000..9fe4876
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala
@@ -0,0 +1,85 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.connector.streaming
+
+import kafka.serializer.Decoder
+import org.apache.spark.streaming.dstream.InputDStream
+
+import scala.util.{Failure, Success, Try}
+import org.apache.griffin.measure.utils.ParamUtil._
+
+/**
+  * streaming data connector for kafka
+  */
+trait KafkaStreamingDataConnector extends StreamingDataConnector {
+
+  type KD <: Decoder[K]
+  type VD <: Decoder[V]
+  type OUT = (K, V)
+
+  val config = dcParam.config
+
+  val KafkaConfig = "kafka.config"
+  val Topics = "topics"
+
+  val kafkaConfig = config.getAnyRef(KafkaConfig, Map[String, String]())
+  val topics = config.getString(Topics, "")
+
+  def init(): Unit = {
+    // register fan in
+    dataSourceCacheOpt.foreach(_.registerFanIn)
+
+    val ds = stream match {
+      case Success(dstream) => dstream
+      case Failure(ex) => throw ex
+    }
+    ds.foreachRDD((rdd, time) => {
+      val ms = time.milliseconds
+      val saveDfOpt = try {
+        // coalesce partition number
+        val prlCount = rdd.sparkContext.defaultParallelism
+        val ptnCount = rdd.getNumPartitions
+        val repartitionedRdd = if (prlCount < ptnCount) {
+          rdd.coalesce(prlCount)
+        } else rdd
+
+        val dfOpt = transform(repartitionedRdd)
+
+        // pre-process
+        preProcess(dfOpt, ms)
+      } catch {
+        case e: Throwable => {
+          error(s"streaming data connector error: ${e.getMessage}")
+          None
+        }
+      }
+
+      // save data frame
+      dataSourceCacheOpt.foreach(_.saveData(saveDfOpt, ms))
+    })
+  }
+
+  def stream(): Try[InputDStream[OUT]] = Try {
+    val topicSet = topics.split(",").toSet
+    createDStream(topicSet)
+  }
+
+  protected def createDStream(topicSet: Set[String]): InputDStream[OUT]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
new file mode 100644
index 0000000..b483933
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
@@ -0,0 +1,71 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.connector.streaming
+
+import kafka.serializer.StringDecoder
+import org.apache.griffin.measure.configuration.params.DataConnectorParam
+import org.apache.griffin.measure.context.datasource.cache.DataSourceCache
+import org.apache.griffin.measure.context.datasource.info.TmstCache
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{Row, _}
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.InputDStream
+import org.apache.spark.streaming.kafka.KafkaUtils
+
+/**
+  * streaming data connector for kafka with string format key and value
+  */
+case class KafkaStreamingStringDataConnector(@transient sparkSession: 
SparkSession,
+                                             @transient ssc: StreamingContext,
+                                             dcParam: DataConnectorParam,
+                                             tmstCache: TmstCache,
+                                             dataSourceCacheOpt: 
Option[DataSourceCache]
+                                            ) extends 
KafkaStreamingDataConnector {
+
+  type K = String
+  type KD = StringDecoder
+  type V = String
+  type VD = StringDecoder
+
+  val valueColName = "value"
+  val schema = StructType(Array(
+    StructField(valueColName, StringType)
+  ))
+
+  def createDStream(topicSet: Set[String]): InputDStream[OUT] = {
+    KafkaUtils.createDirectStream[K, V, KD, VD](ssc, kafkaConfig, topicSet)
+  }
+
+  def transform(rdd: RDD[OUT]): Option[DataFrame] = {
+    if (rdd.isEmpty) None else {
+      try {
+        val rowRdd = rdd.map(d => Row(d._2))
+        val df = sparkSession.createDataFrame(rowRdd, schema)
+        Some(df)
+      } catch {
+        case e: Throwable => {
+          error(s"streaming data transform fails")
+          None
+        }
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/StreamingDataConnector.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/StreamingDataConnector.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/StreamingDataConnector.scala
new file mode 100644
index 0000000..3b2c355
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/StreamingDataConnector.scala
@@ -0,0 +1,46 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.connector.streaming
+
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.context.datasource.cache.DataSourceCache
+import org.apache.griffin.measure.context.datasource.connector.DataConnector
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+import org.apache.spark.streaming.dstream.InputDStream
+
+import scala.util.Try
+
+trait StreamingDataConnector extends DataConnector {
+
+  type K
+  type V
+  type OUT
+
+  protected def stream(): Try[InputDStream[OUT]]
+
+  // transform rdd to dataframe
+  def transform(rdd: RDD[OUT]): Option[DataFrame]
+
+  // streaming data connector cannot directly read data frame
+  def data(ms: Long): (Option[DataFrame], TimeRange) = (None, 
TimeRange.emptyTimeRange)
+
+  val dataSourceCacheOpt: Option[DataSourceCache]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/info/DataSourceCacheable.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/info/DataSourceCacheable.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/info/DataSourceCacheable.scala
new file mode 100644
index 0000000..f721a1e
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/info/DataSourceCacheable.scala
@@ -0,0 +1,88 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.info
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.context.streaming.info.{InfoCacheInstance, 
TimeInfoCache}
+
+/**
+  * timestamp info of data source cache
+  */
+trait DataSourceCacheable extends Loggable with Serializable {
+
+  val cacheInfoPath: String
+  val readyTimeInterval: Long
+  val readyTimeDelay: Long
+
+  def selfCacheInfoPath = s"${TimeInfoCache.infoPath}/${cacheInfoPath}"
+
+  def selfCacheTime = TimeInfoCache.cacheTime(selfCacheInfoPath)
+  def selfLastProcTime = TimeInfoCache.lastProcTime(selfCacheInfoPath)
+  def selfReadyTime = TimeInfoCache.readyTime(selfCacheInfoPath)
+  def selfCleanTime = TimeInfoCache.cleanTime(selfCacheInfoPath)
+  def selfOldCacheIndex = TimeInfoCache.oldCacheIndex(selfCacheInfoPath)
+
+  protected def submitCacheTime(ms: Long): Unit = {
+    val map = Map[String, String]((selfCacheTime -> ms.toString))
+    InfoCacheInstance.cacheInfo(map)
+  }
+
+  protected def submitReadyTime(ms: Long): Unit = {
+    val curReadyTime = ms - readyTimeDelay
+    if (curReadyTime % readyTimeInterval == 0) {
+      val map = Map[String, String]((selfReadyTime -> curReadyTime.toString))
+      InfoCacheInstance.cacheInfo(map)
+    }
+  }
+
+  protected def submitLastProcTime(ms: Long): Unit = {
+    val map = Map[String, String]((selfLastProcTime -> ms.toString))
+    InfoCacheInstance.cacheInfo(map)
+  }
+
+  protected def readLastProcTime(): Option[Long] = 
readSelfInfo(selfLastProcTime)
+
+  protected def submitCleanTime(ms: Long): Unit = {
+    val cleanTime = genCleanTime(ms)
+    val map = Map[String, String]((selfCleanTime -> cleanTime.toString))
+    InfoCacheInstance.cacheInfo(map)
+  }
+
+  protected def genCleanTime(ms: Long): Long = ms
+
+  protected def readCleanTime(): Option[Long] = readSelfInfo(selfCleanTime)
+
+  protected def submitOldCacheIndex(index: Long): Unit = {
+    val map = Map[String, String]((selfOldCacheIndex -> index.toString))
+    InfoCacheInstance.cacheInfo(map)
+  }
+
+  def readOldCacheIndex(): Option[Long] = readSelfInfo(selfOldCacheIndex)
+
+  private def readSelfInfo(key: String): Option[Long] = {
+    InfoCacheInstance.readInfo(key :: Nil).get(key).flatMap { v =>
+      try {
+        Some(v.toLong)
+      } catch {
+        case _ => None
+      }
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/info/TmstCache.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/info/TmstCache.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/info/TmstCache.scala
new file mode 100644
index 0000000..4b1c410
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/info/TmstCache.scala
@@ -0,0 +1,47 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.info
+
+import org.apache.griffin.measure.Loggable
+
+import scala.collection.mutable.{SortedSet => MutableSortedSet}
+
+/**
+  * tmst cache, CRUD of timestamps
+  */
+case class TmstCache() extends Loggable {
+
+  private val tmstGroup: MutableSortedSet[Long] = MutableSortedSet.empty[Long]
+
+  //-- insert tmst into tmst group --
+  def insert(tmst: Long) = tmstGroup += tmst
+  def insert(tmsts: Iterable[Long]) = tmstGroup ++= tmsts
+
+  //-- remove tmst from tmst group --
+  def remove(tmst: Long) = tmstGroup -= tmst
+  def remove(tmsts: Iterable[Long]) = tmstGroup --= tmsts
+
+  //-- get subset of tmst group --
+  def fromUntil(from: Long, until: Long) = tmstGroup.range(from, until).toSet
+  def afterTil(after: Long, til: Long) = tmstGroup.range(after + 1, til + 
1).toSet
+  def until(until: Long) = tmstGroup.until(until).toSet
+  def from(from: Long) = tmstGroup.from(from).toSet
+  def all = tmstGroup.toSet
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCache.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCache.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCache.scala
new file mode 100644
index 0000000..e1d498b
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCache.scala
@@ -0,0 +1,39 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.info
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.context.streaming.lock.CacheLock
+
+trait InfoCache extends Loggable with Serializable {
+
+  def init(): Unit
+  def available(): Boolean
+  def close(): Unit
+
+  def cacheInfo(info: Map[String, String]): Boolean
+  def readInfo(keys: Iterable[String]): Map[String, String]
+  def deleteInfo(keys: Iterable[String]): Unit
+  def clearInfo(): Unit
+
+  def listKeys(path: String): List[String]
+
+  def genLock(s: String): CacheLock
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheFactory.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheFactory.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheFactory.scala
new file mode 100644
index 0000000..85106b4
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheFactory.scala
@@ -0,0 +1,41 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.info
+
+import org.apache.griffin.measure.configuration.params.InfoCacheParam
+
+import scala.util.{Success, Try}
+
+case class InfoCacheFactory(infoCacheParams: Iterable[InfoCacheParam], 
metricName: String) extends Serializable {
+
+  val ZK_REGEX = """^(?i)zk|zookeeper$""".r
+
+  def getInfoCache(infoCacheParam: InfoCacheParam): Option[InfoCache] = {
+    val config = infoCacheParam.config
+    val infoCacheTry = infoCacheParam.cacheType match {
+      case ZK_REGEX() => Try(ZKInfoCache(config, metricName))
+      case _ => throw new Exception("not supported info cache type")
+    }
+    infoCacheTry match {
+      case Success(infoCache) => Some(infoCache)
+      case _ => None
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheInstance.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheInstance.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheInstance.scala
new file mode 100644
index 0000000..9e8a9f6
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheInstance.scala
@@ -0,0 +1,53 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.info
+
+import org.apache.griffin.measure.configuration.params.InfoCacheParam
+import org.apache.griffin.measure.context.streaming.lock.{CacheLock, 
MultiCacheLock}
+
+object InfoCacheInstance extends InfoCache {
+  var infoCaches: List[InfoCache] = Nil
+
+  def initInstance(infoCacheParams: Iterable[InfoCacheParam], metricName: 
String) = {
+    val fac = InfoCacheFactory(infoCacheParams, metricName)
+    infoCaches = infoCacheParams.flatMap(param => 
fac.getInfoCache(param)).toList
+  }
+
+  def init(): Unit = infoCaches.foreach(_.init)
+  def available(): Boolean = infoCaches.foldLeft(false)(_ || _.available)
+  def close(): Unit = infoCaches.foreach(_.close)
+
+  def cacheInfo(info: Map[String, String]): Boolean = {
+    infoCaches.foldLeft(false) { (res, infoCache) => res || 
infoCache.cacheInfo(info) }
+  }
+  def readInfo(keys: Iterable[String]): Map[String, String] = {
+    val maps = infoCaches.map(_.readInfo(keys)).reverse
+    maps.fold(Map[String, String]())(_ ++ _)
+  }
+  def deleteInfo(keys: Iterable[String]): Unit = 
infoCaches.foreach(_.deleteInfo(keys))
+  def clearInfo(): Unit = infoCaches.foreach(_.clearInfo)
+
+  def listKeys(path: String): List[String] = {
+    infoCaches.foldLeft(Nil: List[String]) { (res, infoCache) =>
+      if (res.size > 0) res else infoCache.listKeys(path)
+    }
+  }
+
+  def genLock(s: String): CacheLock = 
MultiCacheLock(infoCaches.map(_.genLock(s)))
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/TimeInfoCache.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/TimeInfoCache.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/TimeInfoCache.scala
new file mode 100644
index 0000000..b1e6764
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/TimeInfoCache.scala
@@ -0,0 +1,127 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.info
+
+import org.apache.griffin.measure.Loggable
+
+object TimeInfoCache extends Loggable with Serializable {
+
+  private val CacheTime = "cache.time"
+  private val LastProcTime = "last.proc.time"
+  private val ReadyTime = "ready.time"
+  private val CleanTime = "clean.time"
+  private val OldCacheIndex = "old.cache.index"
+
+  def cacheTime(path: String): String = s"${path}/${CacheTime}"
+  def lastProcTime(path: String): String = s"${path}/${LastProcTime}"
+  def readyTime(path: String): String = s"${path}/${ReadyTime}"
+  def cleanTime(path: String): String = s"${path}/${CleanTime}"
+  def oldCacheIndex(path: String): String = s"${path}/${OldCacheIndex}"
+
+  val infoPath = "info"
+
+  val finalCacheInfoPath = "info.final"
+  val finalReadyTime = s"${finalCacheInfoPath}/${ReadyTime}"
+  val finalLastProcTime = s"${finalCacheInfoPath}/${LastProcTime}"
+  val finalCleanTime = s"${finalCacheInfoPath}/${CleanTime}"
+
+  def startTimeInfoCache(): Unit = {
+    genFinalReadyTime
+  }
+
+  def getTimeRange(): (Long, Long) = {
+    readTimeRange
+  }
+
+  def getCleanTime(): Long = {
+    readCleanTime
+  }
+
+  def endTimeInfoCache: Unit = {
+    genFinalLastProcTime
+    genFinalCleanTime
+  }
+
+  private def genFinalReadyTime(): Unit = {
+    val subPath = InfoCacheInstance.listKeys(infoPath)
+    val keys = subPath.map { p => s"${infoPath}/${p}/${ReadyTime}" }
+    val result = InfoCacheInstance.readInfo(keys)
+    val times = keys.flatMap { k =>
+      getLongOpt(result, k)
+    }
+    if (times.nonEmpty) {
+      val time = times.min
+      val map = Map[String, String]((finalReadyTime -> time.toString))
+      InfoCacheInstance.cacheInfo(map)
+    }
+  }
+
+  private def genFinalLastProcTime(): Unit = {
+    val subPath = InfoCacheInstance.listKeys(infoPath)
+    val keys = subPath.map { p => s"${infoPath}/${p}/${LastProcTime}" }
+    val result = InfoCacheInstance.readInfo(keys)
+    val times = keys.flatMap { k =>
+      getLongOpt(result, k)
+    }
+    if (times.nonEmpty) {
+      val time = times.min
+      val map = Map[String, String]((finalLastProcTime -> time.toString))
+      InfoCacheInstance.cacheInfo(map)
+    }
+  }
+
+  private def genFinalCleanTime(): Unit = {
+    val subPath = InfoCacheInstance.listKeys(infoPath)
+    val keys = subPath.map { p => s"${infoPath}/${p}/${CleanTime}" }
+    val result = InfoCacheInstance.readInfo(keys)
+    val times = keys.flatMap { k =>
+      getLongOpt(result, k)
+    }
+    if (times.nonEmpty) {
+      val time = times.min
+      val map = Map[String, String]((finalCleanTime -> time.toString))
+      InfoCacheInstance.cacheInfo(map)
+    }
+  }
+
+  private def readTimeRange(): (Long, Long) = {
+    val map = InfoCacheInstance.readInfo(List(finalLastProcTime, 
finalReadyTime))
+    val lastProcTime = getLong(map, finalLastProcTime)
+    val curReadyTime = getLong(map, finalReadyTime)
+    (lastProcTime, curReadyTime)
+  }
+
+  private def readCleanTime(): Long = {
+    val map = InfoCacheInstance.readInfo(List(finalCleanTime))
+    val cleanTime = getLong(map, finalCleanTime)
+    cleanTime
+  }
+
+  private def getLongOpt(map: Map[String, String], key: String): Option[Long] 
= {
+    try {
+      map.get(key).map(_.toLong)
+    } catch {
+      case e: Throwable => None
+    }
+  }
+  private def getLong(map: Map[String, String], key: String) = {
+    getLongOpt(map, key).getOrElse(-1L)
+  }
+
+}

Reply via email to