http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/connector/cache/DataUpdatable.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/cache/DataUpdatable.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/cache/DataUpdatable.scala
new file mode 100644
index 0000000..07c8187
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/connector/cache/DataUpdatable.scala
@@ -0,0 +1,30 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.connector.cache
+
+import org.apache.spark.rdd.RDD
+
+trait DataUpdatable {
+
+  def cleanOldData(): Unit = {}
+
+  def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = {}
+  def updateAllOldData(oldRdd: RDD[Map[String, Any]]): Unit = {}
+
+}
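
A minimal usage sketch (not part of this commit), assuming only the DataUpdatable trait above; the InMemoryUpdatable class and its map-backed cache are illustrative:

  import org.apache.griffin.measure.connector.cache.DataUpdatable

  // hypothetical connector piece that keeps updated old data in memory, keyed by time group
  class InMemoryUpdatable extends DataUpdatable {
    private var cache: Map[Long, Iterable[Map[String, Any]]] = Map.empty

    override def cleanOldData(): Unit = { cache = Map.empty }

    override def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = {
      cache += (t -> oldData)
    }
  }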

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/connector/cache/HiveCacheDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/cache/HiveCacheDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/cache/HiveCacheDataConnector.scala
new file mode 100644
index 0000000..e241188
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/connector/cache/HiveCacheDataConnector.scala
@@ -0,0 +1,351 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.connector.cache
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
+import org.apache.griffin.measure.config.params.user.DataCacheParam
+import org.apache.griffin.measure.result.TimeStampInfo
+import org.apache.griffin.measure.utils.{HdfsFileDumpUtil, HdfsUtil, JsonUtil, TimeUtil}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.hive.HiveContext
+
+import scala.util.{Success, Try}
+
+case class HiveCacheDataConnector(sqlContext: SQLContext, dataCacheParam: DataCacheParam
+                                 ) extends CacheDataConnector {
+
+  if (!sqlContext.isInstanceOf[HiveContext]) {
+    throw new Exception("hive context not prepared!")
+  }
+
+  val config = dataCacheParam.config
+  val InfoPath = "info.path"
+  val cacheInfoPath: String = config.getOrElse(InfoPath, defCacheInfoPath).toString
+
+  val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new")
+  val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old")
+
+  val timeRangeParam: List[String] = if (dataCacheParam.timeRange != null) dataCacheParam.timeRange else Nil
+  val deltaTimeRange: (Long, Long) = (timeRangeParam ::: List("0", "0")) match {
+    case s :: e :: _ => {
+      val ns = TimeUtil.milliseconds(s) match {
+        case Some(n) if (n < 0) => n
+        case _ => 0
+      }
+      val ne = TimeUtil.milliseconds(e) match {
+        case Some(n) if (n < 0) => n
+        case _ => 0
+      }
+      (ns, ne)
+    }
+    case _ => (0, 0)
+  }
+
+  val Database = "database"
+  val database: String = config.getOrElse(Database, "").toString
+  val TableName = "table.name"
+  val tableName: String = config.get(TableName) match {
+    case Some(s: String) if (s.nonEmpty) => s
+    case _ => throw new Exception("invalid table.name!")
+  }
+  val ParentPath = "parent.path"
+  val parentPath: String = config.get(ParentPath) match {
+    case Some(s: String) => s
+    case _ => throw new Exception("invalid parent.path!")
+  }
+  val tablePath = HdfsUtil.getHdfsFilePath(parentPath, tableName)
+
+  val concreteTableName = if (dbPrefix) s"${database}.${tableName}" else tableName
+
+  val ReadyTimeInterval = "ready.time.interval"
+  val ReadyTimeDelay = "ready.time.delay"
+  val readyTimeInterval: Long = TimeUtil.milliseconds(config.getOrElse(ReadyTimeInterval, "1m").toString).getOrElse(60000L)
+  val readyTimeDelay: Long = TimeUtil.milliseconds(config.getOrElse(ReadyTimeDelay, "1m").toString).getOrElse(60000L)
+
+  val TimeStampColumn: String = TimeStampInfo.key
+  val PayloadColumn: String = "payload"
+
+//  type Schema = (Long, String)
+  val schema: List[(String, String)] = List(
+    (TimeStampColumn, "bigint"),
+    (PayloadColumn, "string")
+  )
+  val schemaName = schema.map(_._1)
+
+//  type Partition = (Long, Long)
+  val partition: List[(String, String, String)] = List(
+    ("hr", "bigint", "hour"),
+    ("min", "bigint", "min")
+  )
+  val partitionName = partition.map(_._1)
+
+  private val fieldSep = """|"""
+  private val rowSep = """\n"""
+  private val rowSepLiteral = "\n"
+
+  private def dbPrefix(): Boolean = {
+    database.nonEmpty && !database.equals("default")
+  }
+
+  private def tableExists(): Boolean = {
+    Try {
+      if (dbPrefix) {
+        sqlContext.tables(database).filter(tableExistsSql).collect.size
+      } else {
+        sqlContext.tables().filter(tableExistsSql).collect.size
+      }
+    } match {
+      case Success(s) => s > 0
+      case _ => false
+    }
+  }
+
+  override def init(): Unit = {
+    try {
+      if (tableExists) {
+        // drop exist table
+        val dropSql = s"""DROP TABLE ${concreteTableName}"""
+        sqlContext.sql(dropSql)
+      }
+
+      val colsSql = schema.map { field =>
+        s"`${field._1}` ${field._2}"
+      }.mkString(", ")
+      val partitionsSql = partition.map { partition =>
+        s"`${partition._1}` ${partition._2}"
+      }.mkString(", ")
+      val sql = s"""CREATE EXTERNAL TABLE IF NOT EXISTS ${concreteTableName}
+                    |(${colsSql}) PARTITIONED BY (${partitionsSql})
+                    |ROW FORMAT DELIMITED
+                    |FIELDS TERMINATED BY '${fieldSep}'
+                    |LINES TERMINATED BY '${rowSep}'
+                    |STORED AS TEXTFILE
+                    |LOCATION '${tablePath}'""".stripMargin
+      sqlContext.sql(sql)
+    } catch {
+      case e: Throwable => throw e
+    }
+  }
+
+  def available(): Boolean = {
+    true
+  }
+
+  private def encode(data: Map[String, Any], ms: Long): Option[List[Any]] = {
+    try {
+      Some(schema.map { field =>
+        val (name, _) = field
+        name match {
+          case TimeStampColumn => ms
+          case PayloadColumn => JsonUtil.toJson(data)
+          case _ => null
+        }
+      })
+    } catch {
+      case _: Throwable => None
+    }
+  }
+
+  private def decode(data: List[Any], updateTimeStamp: Boolean): Option[Map[String, Any]] = {
+    val dataMap = schemaName.zip(data).toMap
+    dataMap.get(PayloadColumn) match {
+      case Some(v: String) => {
+        try {
+          val map = JsonUtil.toAnyMap(v)
+          val resMap = if (updateTimeStamp) {
+            dataMap.get(TimeStampColumn) match {
+              case Some(t) => map + (TimeStampColumn -> t)
+              case _ => map
+            }
+          } else map
+          Some(resMap)
+        } catch {
+          case _: Throwable => None
+        }
+      }
+      case _ => None
+    }
+  }
+
+  def saveData(rdd: RDD[Map[String, Any]], ms: Long): Unit = {
+    val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
+    if (newCacheLocked) {
+      try {
+        val ptns = getPartition(ms)
+        val ptnsPath = genPartitionHdfsPath(ptns)
+        val dirPath = s"${tablePath}/${ptnsPath}"
+        val fileName = s"${ms}"
+        val filePath = HdfsUtil.getHdfsFilePath(dirPath, fileName)
+
+        // encode data
+        val dataRdd: RDD[List[Any]] = rdd.flatMap(encode(_, ms))
+
+        // save data
+        val recordRdd: RDD[String] = dataRdd.map { dt =>
+          dt.map(_.toString).mkString(fieldSep)
+        }
+
+        val dumped = if (!recordRdd.isEmpty) {
+          HdfsFileDumpUtil.dump(filePath, recordRdd, rowSepLiteral)
+        } else false
+
+        // add partition
+        if (dumped) {
+          val sql = addPartitionSql(concreteTableName, ptns)
+          sqlContext.sql(sql)
+        }
+
+        // submit ms
+        submitCacheTime(ms)
+        submitReadyTime(ms)
+      } catch {
+        case e: Throwable => error(s"save data error: ${e.getMessage}")
+      } finally {
+        newCacheLock.unlock()
+      }
+    }
+  }
+
+  def readData(): Try[RDD[Map[String, Any]]] = Try {
+    val timeRange = TimeInfoCache.getTimeRange
+    submitLastProcTime(timeRange._2)
+
+    val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2)
+    submitCleanTime(reviseTimeRange._1)
+
+    // read directly through partition info
+    val partitionRange = getPartitionRange(reviseTimeRange._1, reviseTimeRange._2)
+    val sql = selectSql(concreteTableName, partitionRange)
+    val df = sqlContext.sql(sql)
+
+    // decode data
+    df.flatMap { row =>
+      val dt = schemaName.map { sn =>
+        row.getAs[Any](sn)
+      }
+      decode(dt, true)
+    }
+  }
+
+  override def cleanOldData(): Unit = {
+    val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
+    if (oldCacheLocked) {
+      try {
+        val cleanTime = readCleanTime()
+        cleanTime match {
+          case Some(ct) => {
+            // drop partition
+            val bound = getPartition(ct)
+            val sql = dropPartitionSql(concreteTableName, bound)
+            sqlContext.sql(sql)
+          }
+          case _ => {
+            // do nothing
+          }
+        }
+      } catch {
+        case e: Throwable => error(s"clean old data error: ${e.getMessage}")
+      } finally {
+        oldCacheLock.unlock()
+      }
+    }
+  }
+
+  override def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = {
+    // parallel process different time groups, lock is unnecessary
+    val ptns = getPartition(t)
+    val ptnsPath = genPartitionHdfsPath(ptns)
+    val dirPath = s"${tablePath}/${ptnsPath}"
+    val fileName = s"${t}"
+    val filePath = HdfsUtil.getHdfsFilePath(dirPath, fileName)
+
+    try {
+      // remove out time old data
+      HdfsFileDumpUtil.remove(dirPath, fileName, true)
+
+      // save updated old data
+      if (oldData.size > 0) {
+        val recordDatas = oldData.flatMap { dt =>
+          encode(dt, t)
+        }
+        val records: Iterable[String] = recordDatas.map { dt =>
+          dt.map(_.toString).mkString(fieldSep)
+        }
+        val dumped = HdfsFileDumpUtil.dump(filePath, records, rowSepLiteral)
+      }
+    } catch {
+      case e: Throwable => error(s"update old data error: ${e.getMessage}")
+    }
+  }
+
+  override protected def genCleanTime(ms: Long): Long = {
+    val minPartition = partition.last
+    val t1 = TimeUtil.timeToUnit(ms, minPartition._3)
+    val t2 = TimeUtil.timeFromUnit(t1, minPartition._3)
+    t2
+  }
+
+  private def getPartition(ms: Long): List[(String, Any)] = {
+    partition.map { p =>
+      val (name, _, unit) = p
+      val t = TimeUtil.timeToUnit(ms, unit)
+      (name, t)
+    }
+  }
+  private def getPartitionRange(ms1: Long, ms2: Long): List[(String, (Any, Any))] = {
+    partition.map { p =>
+      val (name, _, unit) = p
+      val t1 = TimeUtil.timeToUnit(ms1, unit)
+      val t2 = TimeUtil.timeToUnit(ms2, unit)
+      (name, (t1, t2))
+    }
+  }
+
+  private def genPartitionHdfsPath(partition: List[(String, Any)]): String = {
+    partition.map(prtn => s"${prtn._1}=${prtn._2}").mkString("/")
+  }
+  private def addPartitionSql(tbn: String, partition: List[(String, Any)]): String = {
+    val partitionSql = partition.map(ptn => (s"`${ptn._1}` = ${ptn._2}")).mkString(", ")
+    val sql = s"""ALTER TABLE ${tbn} ADD IF NOT EXISTS PARTITION (${partitionSql})"""
+    sql
+  }
+  private def selectSql(tbn: String, partitionRange: List[(String, (Any, Any))]): String = {
+    val clause = partitionRange.map { pr =>
+      val (name, (r1, r2)) = pr
+      s"""`${name}` BETWEEN '${r1}' and '${r2}'"""
+    }.mkString(" AND ")
+    val whereClause = if (clause.nonEmpty) s"WHERE ${clause}" else ""
+    val sql = s"""SELECT * FROM ${tbn} ${whereClause}"""
+    sql
+  }
+  private def dropPartitionSql(tbn: String, partition: List[(String, Any)]): String = {
+    val partitionSql = partition.map(ptn => (s"PARTITION ( `${ptn._1}` < '${ptn._2}' ) ")).mkString(", ")
+    val sql = s"""ALTER TABLE ${tbn} DROP ${partitionSql}"""
+    println(sql)
+    sql
+  }
+
+  private def tableExistsSql(): String = {
+    s"tableName LIKE '${tableName}'"
+  }
+
+}
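
A standalone sketch (not part of this commit) of the partition SQL strings generated above; the table name and the hr/min values are made-up example inputs:

  object PartitionSqlDemo {
    def addPartitionSql(tbn: String, partition: List[(String, Any)]): String = {
      val partitionSql = partition.map(ptn => s"`${ptn._1}` = ${ptn._2}").mkString(", ")
      s"ALTER TABLE ${tbn} ADD IF NOT EXISTS PARTITION (${partitionSql})"
    }
    def dropPartitionSql(tbn: String, partition: List[(String, Any)]): String = {
      val partitionSql = partition.map(ptn => s"PARTITION ( `${ptn._1}` < '${ptn._2}' ) ").mkString(", ")
      s"ALTER TABLE ${tbn} DROP ${partitionSql}"
    }
    def main(args: Array[String]): Unit = {
      val ptns = List(("hr", 416520L), ("min", 24991200L))
      println(addPartitionSql("griffin_cache.demo_table", ptns))
      // ALTER TABLE griffin_cache.demo_table ADD IF NOT EXISTS PARTITION (`hr` = 416520, `min` = 24991200)
      println(dropPartitionSql("griffin_cache.demo_table", ptns))
      // ALTER TABLE griffin_cache.demo_table DROP PARTITION ( `hr` < '416520' ) , PARTITION ( `min` < '24991200' )
    }
  }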

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/connector/cache/TextCacheDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/cache/TextCacheDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/cache/TextCacheDataConnector.scala
new file mode 100644
index 0000000..62b6086
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/connector/cache/TextCacheDataConnector.scala
@@ -0,0 +1,311 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.connector.cache
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
+import org.apache.griffin.measure.config.params.user.DataCacheParam
+import org.apache.griffin.measure.result.TimeStampInfo
+import org.apache.griffin.measure.utils.{HdfsFileDumpUtil, HdfsUtil, JsonUtil, TimeUtil}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLContext
+
+import scala.util.Try
+
+case class TextCacheDataConnector(sqlContext: SQLContext, dataCacheParam: DataCacheParam
+                                 ) extends CacheDataConnector {
+
+  val config = dataCacheParam.config
+  val InfoPath = "info.path"
+  val cacheInfoPath: String = config.getOrElse(InfoPath, defCacheInfoPath).toString
+
+  val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new")
+  val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old")
+
+  val timeRangeParam: List[String] = if (dataCacheParam.timeRange != null) dataCacheParam.timeRange else Nil
+  val deltaTimeRange: (Long, Long) = (timeRangeParam ::: List("0", "0")) match {
+    case s :: e :: _ => {
+      val ns = TimeUtil.milliseconds(s) match {
+        case Some(n) if (n < 0) => n
+        case _ => 0
+      }
+      val ne = TimeUtil.milliseconds(e) match {
+        case Some(n) if (n < 0) => n
+        case _ => 0
+      }
+      (ns, ne)
+    }
+    case _ => (0, 0)
+  }
+
+  val FilePath = "file.path"
+  val filePath: String = config.get(FilePath) match {
+    case Some(s: String) => s
+    case _ => throw new Exception("invalid file.path!")
+  }
+
+  val ReadyTimeInterval = "ready.time.interval"
+  val ReadyTimeDelay = "ready.time.delay"
+  val readyTimeInterval: Long = TimeUtil.milliseconds(config.getOrElse(ReadyTimeInterval, "1m").toString).getOrElse(60000L)
+  val readyTimeDelay: Long = TimeUtil.milliseconds(config.getOrElse(ReadyTimeDelay, "1m").toString).getOrElse(60000L)
+
+//  val TimeStampColumn: String = TimeStampInfo.key
+//  val PayloadColumn: String = "payload"
+
+  // cache schema: Long, String
+//  val fields = List[StructField](
+//    StructField(TimeStampColumn, LongType),
+//    StructField(PayloadColumn, StringType)
+//  )
+//  val schema = StructType(fields)
+
+  //  case class CacheData(time: Long, payload: String) {
+  //    def getTime(): Long = time
+  //    def getPayload(): String = payload
+  //  }
+
+  private val rowSepLiteral = "\n"
+
+  val partitionUnits: List[String] = List("hour", "min")
+
+  override def init(): Unit = {
+    // do nothing
+  }
+
+  def available(): Boolean = {
+    true
+  }
+
+  private def encode(data: Map[String, Any], ms: Long): Option[String] = {
+    try {
+      val map = data + (TimeStampInfo.key -> ms)
+      Some(JsonUtil.toJson(map))
+    } catch {
+      case _: Throwable => None
+    }
+  }
+
+  private def decode(data: String): Option[Map[String, Any]] = {
+    try {
+      Some(JsonUtil.toAnyMap(data))
+    } catch {
+      case _: Throwable => None
+    }
+  }
+
+  def saveData(rdd: RDD[Map[String, Any]], ms: Long): Unit = {
+    val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
+    if (newCacheLocked) {
+      try {
+        val ptns = getPartition(ms)
+        val ptnsPath = genPartitionHdfsPath(ptns)
+        val dirPath = s"${filePath}/${ptnsPath}"
+        val dataFileName = s"${ms}"
+        val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
+
+        // encode data
+        val dataRdd: RDD[String] = rdd.flatMap(encode(_, ms))
+
+        // save data
+        val dumped = if (!dataRdd.isEmpty) {
+          HdfsFileDumpUtil.dump(dataFilePath, dataRdd, rowSepLiteral)
+        } else false
+
+        // submit ms
+        submitCacheTime(ms)
+        submitReadyTime(ms)
+      } catch {
+        case e: Throwable => error(s"save data error: ${e.getMessage}")
+      } finally {
+        newCacheLock.unlock()
+      }
+    }
+  }
+
+  def readData(): Try[RDD[Map[String, Any]]] = Try {
+    val timeRange = TimeInfoCache.getTimeRange
+    submitLastProcTime(timeRange._2)
+
+    val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2)
+    submitCleanTime(reviseTimeRange._1)
+
+    // read directly through partition info
+    val partitionRanges = getPartitionRange(reviseTimeRange._1, reviseTimeRange._2)
+    println(s"read time ranges: ${reviseTimeRange}")
+    println(s"read partition ranges: ${partitionRanges}")
+
+    // list partition paths
+    val partitionPaths = listPathsBetweenRanges(filePath :: Nil, partitionRanges)
+
+    if (partitionPaths.isEmpty) {
+      sqlContext.sparkContext.emptyRDD[Map[String, Any]]
+    } else {
+      val filePaths = partitionPaths.mkString(",")
+      val rdd = sqlContext.sparkContext.textFile(filePaths)
+
+      // decode data
+      rdd.flatMap { row =>
+        decode(row)
+      }
+    }
+  }
+
+  override def cleanOldData(): Unit = {
+    val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
+    if (oldCacheLocked) {
+      try {
+        val cleanTime = readCleanTime()
+        cleanTime match {
+          case Some(ct) => {
+            // drop partitions
+            val bounds = getPartition(ct)
+
+            // list partition paths
+            val earlierPaths = listPathsEarlierThanBounds(filePath :: Nil, bounds)
+
+            // delete out time data path
+            earlierPaths.foreach { path =>
+              println(s"delete hdfs path: ${path}")
+              HdfsUtil.deleteHdfsPath(path)
+            }
+          }
+          case _ => {
+            // do nothing
+          }
+        }
+      } catch {
+        case e: Throwable => error(s"clean old data error: ${e.getMessage}")
+      } finally {
+        oldCacheLock.unlock()
+      }
+    }
+  }
+
+  override def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = {
+    // parallel process different time groups, lock is unnecessary
+    val ptns = getPartition(t)
+    val ptnsPath = genPartitionHdfsPath(ptns)
+    val dirPath = s"${filePath}/${ptnsPath}"
+    val dataFileName = s"${t}"
+    val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
+
+    try {
+      // remove out time old data
+      HdfsFileDumpUtil.remove(dirPath, dataFileName, true)
+
+      // save updated old data
+      if (oldData.size > 0) {
+        val recordDatas = oldData.flatMap { dt =>
+          encode(dt, t)
+        }
+        val dumped = HdfsFileDumpUtil.dump(dataFilePath, recordDatas, rowSepLiteral)
+      }
+    } catch {
+      case e: Throwable => error(s"update old data error: ${e.getMessage}")
+    }
+  }
+
+  override protected def genCleanTime(ms: Long): Long = {
+    val minPartitionUnit = partitionUnits.last
+    val t1 = TimeUtil.timeToUnit(ms, minPartitionUnit)
+    val t2 = TimeUtil.timeFromUnit(t1, minPartitionUnit)
+    t2
+  }
+
+  private def getPartition(ms: Long): List[Long] = {
+    partitionUnits.map { unit =>
+      TimeUtil.timeToUnit(ms, unit)
+    }
+  }
+  private def getPartitionRange(ms1: Long, ms2: Long): List[(Long, Long)] = {
+    partitionUnits.map { unit =>
+      val t1 = TimeUtil.timeToUnit(ms1, unit)
+      val t2 = TimeUtil.timeToUnit(ms2, unit)
+      (t1, t2)
+    }
+  }
+
+  private def genPartitionHdfsPath(partition: List[Long]): String = {
+    partition.map(prtn => s"${prtn}").mkString("/")
+  }
+
+  private def str2Long(str: String): Option[Long] = {
+    try {
+      Some(str.toLong)
+    } catch {
+      case e: Throwable => None
+    }
+  }
+
+  // here the range means [min, max], but the best range should be (min, max]
+  private def listPathsBetweenRanges(paths: List[String],
+                                     partitionRanges: List[(Long, Long)]
+                                    ): List[String] = {
+    partitionRanges match {
+      case Nil => paths
+      case head :: tail => {
+        val (lb, ub) = head
+        val curPaths = paths.flatMap { path =>
+          val names = HdfsUtil.listSubPaths(path, "dir").toList
+          names.filter { name =>
+            str2Long(name) match {
+              case Some(t) => (t >= lb) && (t <= ub)
+              case _ => false
+            }
+          }.map(HdfsUtil.getHdfsFilePath(path, _))
+        }
+        listPathsBetweenRanges(curPaths, tail)
+      }
+    }
+  }
+
+  private def listPathsEarlierThanBounds(paths: List[String], bounds: List[Long]
+                                        ): List[String] = {
+    bounds match {
+      case Nil => paths
+      case head :: tail => {
+        val earlierPaths = paths.flatMap { path =>
+          val names = HdfsUtil.listSubPaths(path, "dir").toList
+          names.filter { name =>
+            str2Long(name) match {
+              case Some(t) => (t < head)
+              case _ => false
+            }
+          }.map(HdfsUtil.getHdfsFilePath(path, _))
+        }
+        val equalPaths = paths.flatMap { path =>
+          val names = HdfsUtil.listSubPaths(path, "dir").toList
+          names.filter { name =>
+            str2Long(name) match {
+              case Some(t) => (t == head)
+              case _ => false
+            }
+          }.map(HdfsUtil.getHdfsFilePath(path, _))
+        }
+
+        tail match {
+          case Nil => earlierPaths
+          case _ => earlierPaths ::: listPathsEarlierThanBounds(equalPaths, tail)
+        }
+      }
+    }
+  }
+
+}
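
A standalone sketch (not part of this commit) of the recursive partition-path filtering used by listPathsBetweenRanges above, with the HDFS directory listing replaced by an in-memory map; all paths and values are made up:

  object PathRangeDemo {
    // hypothetical layout: hour directories containing minute directories
    val tree: Map[String, List[String]] = Map(
      "/cache" -> List("416519", "416520", "416521"),
      "/cache/416520" -> List("24991200", "24991201"),
      "/cache/416521" -> List("24991260")
    )
    def listSubDirs(path: String): List[String] = tree.getOrElse(path, Nil)

    def listPathsBetweenRanges(paths: List[String], ranges: List[(Long, Long)]): List[String] =
      ranges match {
        case Nil => paths
        case (lb, ub) :: tail =>
          val curPaths = paths.flatMap { path =>
            listSubDirs(path)
              .filter(name => scala.util.Try(name.toLong).toOption.exists(t => t >= lb && t <= ub))
              .map(name => s"${path}/${name}")
          }
          listPathsBetweenRanges(curPaths, tail)
      }

    def main(args: Array[String]): Unit = {
      // keeps the minute directories of hours 416520..416521 whose minute value is in range
      println(listPathsBetweenRanges(List("/cache"), List((416520L, 416521L), (24991200L, 24991260L))))
    }
  }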

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/connector/direct/AvroDirectDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/direct/AvroDirectDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/direct/AvroDirectDataConnector.scala
new file mode 100644
index 0000000..b45e5a9
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/connector/direct/AvroDirectDataConnector.scala
@@ -0,0 +1,132 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.connector.direct
+
+import org.apache.griffin.measure.result._
+import org.apache.griffin.measure.rule.{ExprValueUtil, RuleExprs}
+import org.apache.griffin.measure.utils.HdfsUtil
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLContext
+
+import scala.util.Try
+
+// data connector for avro file
+case class AvroDirectDataConnector(sqlContext: SQLContext, config: Map[String, Any],
+                                   ruleExprs: RuleExprs, constFinalExprValueMap: Map[String, Any]
+                                 ) extends DirectDataConnector {
+
+  val FilePath = "file.path"
+  val FileName = "file.name"
+
+  val filePath = config.getOrElse(FilePath, "").toString
+  val fileName = config.getOrElse(FileName, "").toString
+
+  val concreteFileFullPath = if (pathPrefix) s"${filePath}${fileName}" else fileName
+
+  private def pathPrefix(): Boolean = {
+    filePath.nonEmpty
+  }
+
+  private def fileExist(): Boolean = {
+    HdfsUtil.existPath(concreteFileFullPath)
+  }
+
+  def available(): Boolean = {
+    (!concreteFileFullPath.isEmpty) && fileExist
+  }
+
+  def init(): Unit = {}
+
+  def metaData(): Try[Iterable[(String, String)]] = {
+    Try {
+      val st = sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath).schema
+      st.fields.map(f => (f.name, f.dataType.typeName))
+    }
+  }
+
+  def data(): Try[RDD[(Product, (Map[String, Any], Map[String, Any]))]] = {
+    Try {
+      loadDataFile.flatMap { row =>
+        // generate cache data
+        val cacheExprValueMaps = ExprValueUtil.genExprValueMaps(Some(row), ruleExprs.cacheExprs, constFinalExprValueMap)
+        val finalExprValueMaps = ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMaps)
+
+        // data info
+        val dataInfoMap: Map[String, Any] = DataInfo.cacheInfoList.map { info =>
+          try {
+            (info.key -> row.getAs[info.T](info.key))
+          } catch {
+            case e: Throwable => info.defWrap
+          }
+        }.toMap
+
+        finalExprValueMaps.flatMap { finalExprValueMap =>
+          val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr =>
+            expr.calculate(finalExprValueMap) match {
+              case Some(v) => Some(v.asInstanceOf[AnyRef])
+              case _ => None
+            }
+          }
+          val key = toTuple(groupbyData)
+
+          Some((key, (finalExprValueMap, dataInfoMap)))
+        }
+
+//        val cacheExprValueMap: Map[String, Any] = ruleExprs.cacheExprs.foldLeft(constFinalExprValueMap) { (cachedMap, expr) =>
+//          ExprValueUtil.genExprValueMaps(Some(row), expr, cachedMap)
+//        }
+//        val finalExprValueMap = ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMap)
+
+        // when clause filter data source
+//        val whenResult = ruleExprs.whenClauseExprOpt match {
+//          case Some(whenClause) => whenClause.calculate(finalExprValueMap)
+//          case _ => None
+//        }
+//
+//        // get groupby data
+//        whenResult match {
+//          case Some(false) => None
+//          case _ => {
+//            val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr =>
+//              expr.calculate(finalExprValueMap) match {
+//                case Some(v) => Some(v.asInstanceOf[AnyRef])
+//                case _ => None
+//              }
+//            }
+//            val key = toTuple(groupbyData)
+//
+//            Some((key, finalExprValueMap))
+//          }
+//        }
+      }
+    }
+  }
+
+  private def loadDataFile() = {
+    sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath)
+  }
+
+  private def toTuple[A <: AnyRef](as: Seq[A]): Product = {
+    if (as.size > 0) {
+      val tupleClass = Class.forName("scala.Tuple" + as.size)
+      tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product]
+    } else None
+  }
+
+}
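
A standalone sketch (not part of this commit) of the reflective toTuple helper above, which packs the group-by values into a TupleN so they can serve as an RDD key; the sample values are arbitrary:

  object ToTupleDemo {
    def toTuple[A <: AnyRef](as: Seq[A]): Product = {
      if (as.size > 0) {
        val tupleClass = Class.forName("scala.Tuple" + as.size)
        tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product]
      } else None
    }
    def main(args: Array[String]): Unit = {
      println(toTuple(Seq("user_1", java.lang.Long.valueOf(42L))))  // (user_1,42)
      println(toTuple(Seq.empty[AnyRef]))                           // None
    }
  }

Note that this approach only covers 1 to 22 group-by values, since scala.TupleN stops at Tuple22.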

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/connector/direct/DirectDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/direct/DirectDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/direct/DirectDataConnector.scala
new file mode 100644
index 0000000..ac1a792
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/connector/direct/DirectDataConnector.scala
@@ -0,0 +1,34 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.connector.direct
+
+import org.apache.griffin.measure.connector.DataConnector
+import org.apache.griffin.measure.connector.cache.DataUpdatable
+import org.apache.spark.rdd.RDD
+
+import scala.util.Try
+
+
+trait DirectDataConnector extends DataConnector with DataUpdatable {
+
+  def metaData(): Try[Iterable[(String, String)]]
+
+  def data(): Try[RDD[(Product, (Map[String, Any], Map[String, Any]))]]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/connector/direct/HiveDirectDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/direct/HiveDirectDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/direct/HiveDirectDataConnector.scala
new file mode 100644
index 0000000..7de2b02
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/connector/direct/HiveDirectDataConnector.scala
@@ -0,0 +1,158 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.connector.direct
+
+import org.apache.griffin.measure.result._
+import org.apache.griffin.measure.rule.{ExprValueUtil, RuleExprs}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLContext
+
+import scala.util.{Success, Try}
+
+// data connector for hive
+case class HiveDirectDataConnector(sqlContext: SQLContext, config: Map[String, Any],
+                                   ruleExprs: RuleExprs, constFinalExprValueMap: Map[String, Any]
+                                 ) extends DirectDataConnector {
+
+  val Database = "database"
+  val TableName = "table.name"
+  val Partitions = "partitions"
+
+  val database = config.getOrElse(Database, "").toString
+  val tableName = config.getOrElse(TableName, "").toString
+  val partitionsString = config.getOrElse(Partitions, "").toString
+
+  val concreteTableName = if (dbPrefix) s"${database}.${tableName}" else tableName
+  val partitions = partitionsString.split(";").map(s => s.split(",").map(_.trim))
+
+  private def dbPrefix(): Boolean = {
+    database.nonEmpty && !database.equals("default")
+  }
+
+  def available(): Boolean = {
+    (!tableName.isEmpty) && {
+      Try {
+        if (dbPrefix) {
+          sqlContext.tables(database).filter(tableExistsSql).collect.size
+        } else {
+          sqlContext.tables().filter(tableExistsSql).collect.size
+        }
+      } match {
+        case Success(s) => s > 0
+        case _ => false
+      }
+    }
+  }
+
+  def init(): Unit = {}
+
+  def metaData(): Try[Iterable[(String, String)]] = {
+    Try {
+      val originRows = sqlContext.sql(metaDataSql).map(r => (r.getString(0), r.getString(1))).collect
+      val partitionPos: Int = originRows.indexWhere(pair => pair._1.startsWith("# "))
+      if (partitionPos < 0) originRows
+      else originRows.take(partitionPos)
+    }
+  }
+
+  def data(): Try[RDD[(Product, (Map[String, Any], Map[String, Any]))]] = {
+    Try {
+      sqlContext.sql(dataSql).flatMap { row =>
+        // generate cache data
+        val cacheExprValueMaps = ExprValueUtil.genExprValueMaps(Some(row), ruleExprs.cacheExprs, constFinalExprValueMap)
+        val finalExprValueMaps = ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMaps)
+
+        // data info
+        val dataInfoMap: Map[String, Any] = DataInfo.cacheInfoList.map { info =>
+          try {
+            (info.key -> row.getAs[info.T](info.key))
+          } catch {
+            case e: Throwable => info.defWrap
+          }
+        }.toMap
+
+        finalExprValueMaps.flatMap { finalExprValueMap =>
+          val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr =>
+            expr.calculate(finalExprValueMap) match {
+              case Some(v) => Some(v.asInstanceOf[AnyRef])
+              case _ => None
+            }
+          }
+          val key = toTuple(groupbyData)
+
+          Some((key, (finalExprValueMap, dataInfoMap)))
+        }
+
+        // generate cache data
+//        val cacheExprValueMap: Map[String, Any] = ruleExprs.cacheExprs.foldLeft(constFinalExprValueMap) { (cachedMap, expr) =>
+//          ExprValueUtil.genExprValueMap(Some(row), expr, cachedMap)
+//        }
+//        val finalExprValueMap = ExprValueUtil.updateExprValueMap(ruleExprs.finalCacheExprs, cacheExprValueMap)
+//
+//        // when clause filter data source
+//        val whenResult = ruleExprs.whenClauseExprOpt match {
+//          case Some(whenClause) => whenClause.calculate(finalExprValueMap)
+//          case _ => None
+//        }
+//
+//        // get groupby data
+//        whenResult match {
+//          case Some(false) => None
+//          case _ => {
+//            val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr =>
+//              expr.calculate(finalExprValueMap) match {
+//                case Some(v) => Some(v.asInstanceOf[AnyRef])
+//                case _ => None
+//              }
+//            }
+//            val key = toTuple(groupbyData)
+//
+//            Some((key, finalExprValueMap))
+//          }
+//        }
+      }
+    }
+  }
+
+  private def tableExistsSql(): String = {
+//    s"SHOW TABLES LIKE '${concreteTableName}'"    // this is hive sql, but 
not work for spark sql
+    s"tableName LIKE '${tableName}'"
+  }
+
+  private def metaDataSql(): String = {
+    s"DESCRIBE ${concreteTableName}"
+  }
+
+  private def dataSql(): String = {
+    val clauses = partitions.map { prtn =>
+      val cls = prtn.mkString(" AND ")
+      if (cls.isEmpty) s"SELECT * FROM ${concreteTableName}"
+      else s"SELECT * FROM ${concreteTableName} WHERE ${cls}"
+    }
+    clauses.mkString(" UNION ALL ")
+  }
+
+  private def toTuple[A <: AnyRef](as: Seq[A]): Product = {
+    if (as.size > 0) {
+      val tupleClass = Class.forName("scala.Tuple" + as.size)
+      tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product]
+    } else None
+  }
+
+}
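
A standalone sketch (not part of this commit) of the SQL that dataSql above builds from the "partitions" config; the table name and partition clauses are made-up examples:

  object HiveDataSqlDemo {
    val concreteTableName = "demo_db.demo_table"
    // partition groups are separated by ';', conditions within a group by ','
    val partitionsString = "dt=20170605, hour=15; dt=20170605, hour=16"
    val partitions = partitionsString.split(";").map(s => s.split(",").map(_.trim))

    def dataSql(): String = {
      val clauses = partitions.map { prtn =>
        val cls = prtn.mkString(" AND ")
        if (cls.isEmpty) s"SELECT * FROM ${concreteTableName}"
        else s"SELECT * FROM ${concreteTableName} WHERE ${cls}"
      }
      clauses.mkString(" UNION ALL ")
    }

    def main(args: Array[String]): Unit = {
      // SELECT * FROM demo_db.demo_table WHERE dt=20170605 AND hour=15 UNION ALL SELECT * FROM demo_db.demo_table WHERE dt=20170605 AND hour=16
      println(dataSql())
    }
  }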

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/connector/direct/KafkaCacheDirectDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/direct/KafkaCacheDirectDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/direct/KafkaCacheDirectDataConnector.scala
new file mode 100644
index 0000000..d2534cc
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/connector/direct/KafkaCacheDirectDataConnector.scala
@@ -0,0 +1,125 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.connector.direct
+
+import org.apache.griffin.measure.config.params.user.DataConnectorParam
+import org.apache.griffin.measure.connector.DataConnectorFactory
+import org.apache.griffin.measure.connector.cache.CacheDataConnector
+import org.apache.griffin.measure.connector.streaming.StreamingDataConnector
+import org.apache.griffin.measure.result._
+import org.apache.griffin.measure.rule._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.streaming.StreamingContext
+
+import scala.util.{Failure, Success, Try}
+
+case class KafkaCacheDirectDataConnector(@transient streamingDataConnectorTry: Try[StreamingDataConnector],
+                                         cacheDataConnectorTry: Try[CacheDataConnector],
+                                         dataConnectorParam: DataConnectorParam,
+                                         ruleExprs: RuleExprs,
+                                         constFinalExprValueMap: Map[String, Any]
+                                        ) extends StreamingCacheDirectDataConnector {
+
+  val cacheDataConnector: CacheDataConnector = cacheDataConnectorTry match {
+    case Success(cntr) => cntr
+    case Failure(ex) => throw ex
+  }
+  @transient val streamingDataConnector: StreamingDataConnector = streamingDataConnectorTry match {
+    case Success(cntr) => cntr
+    case Failure(ex) => throw ex
+  }
+
+  protected def transform(rdd: RDD[(streamingDataConnector.K, streamingDataConnector.V)],
+                          ms: Long
+                         ): RDD[Map[String, Any]] = {
+    val dataInfoMap = DataInfo.cacheInfoList.map(_.defWrap).toMap + TimeStampInfo.wrap(ms)
+
+    rdd.flatMap { kv =>
+      val msg = kv._2
+
+      val cacheExprValueMaps = ExprValueUtil.genExprValueMaps(Some(msg), ruleExprs.cacheExprs, constFinalExprValueMap)
+      val finalExprValueMaps = ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMaps)
+
+      finalExprValueMaps.map { vm =>
+        vm ++ dataInfoMap
+      }
+    }
+  }
+
+  def metaData(): Try[Iterable[(String, String)]] = Try {
+    Map.empty[String, String]
+  }
+
+  def data(): Try[RDD[(Product, (Map[String, Any], Map[String, Any]))]] = Try {
+    cacheDataConnector.readData match {
+      case Success(rdd) => {
+        rdd.flatMap { row =>
+          val finalExprValueMap = ruleExprs.finalCacheExprs.flatMap { expr =>
+            row.get(expr._id).flatMap { d =>
+              Some((expr._id, d))
+            }
+          }.toMap
+
+          val dataInfoMap: Map[String, Any] = DataInfo.cacheInfoList.map { info =>
+            row.get(info.key) match {
+              case Some(d) => (info.key -> d)
+              case _ => info.defWrap
+            }
+          }.toMap
+
+          val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr =>
+            expr.calculate(finalExprValueMap) match {
+              case Some(v) => Some(v.asInstanceOf[AnyRef])
+              case _ => None
+            }
+          }
+          val key = toTuple(groupbyData)
+
+          Some((key, (finalExprValueMap, dataInfoMap)))
+        }
+      }
+      case Failure(ex) => throw ex
+    }
+  }
+
+  override def cleanOldData(): Unit = {
+    cacheDataConnector.cleanOldData
+  }
+
+  override def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = {
+    if (dataConnectorParam.getMatchOnce) {
+      cacheDataConnector.updateOldData(t, oldData)
+    }
+  }
+
+  override def updateAllOldData(oldRdd: RDD[Map[String, Any]]): Unit = {
+    if (dataConnectorParam.getMatchOnce) {
+      cacheDataConnector.updateAllOldData(oldRdd)
+    }
+  }
+
+  private def toTuple[A <: AnyRef](as: Seq[A]): Product = {
+    if (as.size > 0) {
+      val tupleClass = Class.forName("scala.Tuple" + as.size)
+      tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product]
+    } else None
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/connector/direct/StreamingCacheDirectDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/direct/StreamingCacheDirectDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/direct/StreamingCacheDirectDataConnector.scala
new file mode 100644
index 0000000..87139d6
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/connector/direct/StreamingCacheDirectDataConnector.scala
@@ -0,0 +1,60 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.connector.direct
+
+import org.apache.griffin.measure.connector.cache.CacheDataConnector
+import org.apache.griffin.measure.connector.streaming.StreamingDataConnector
+import org.apache.griffin.measure.result.{DataInfo, TimeStampInfo}
+import org.apache.griffin.measure.rule.ExprValueUtil
+import org.apache.spark.rdd.RDD
+
+import scala.util.{Failure, Success}
+
+trait StreamingCacheDirectDataConnector extends DirectDataConnector {
+
+  val cacheDataConnector: CacheDataConnector
+  @transient val streamingDataConnector: StreamingDataConnector
+
+  def available(): Boolean = {
+    cacheDataConnector.available && streamingDataConnector.available
+  }
+
+  def init(): Unit = {
+    cacheDataConnector.init
+
+    val ds = streamingDataConnector.stream match {
+      case Success(dstream) => dstream
+      case Failure(ex) => throw ex
+    }
+
+    ds.foreachRDD((rdd, time) => {
+      val ms = time.milliseconds
+
+      val valueMapRdd = transform(rdd, ms)
+
+      // save data frame
+      cacheDataConnector.saveData(valueMapRdd, ms)
+    })
+  }
+
+  protected def transform(rdd: RDD[(streamingDataConnector.K, streamingDataConnector.V)],
+                          ms: Long
+                         ): RDD[Map[String, Any]]
+
+}
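
A standalone sketch (not part of this commit) of the pattern init() wires up above: each micro-batch is transformed into value maps and handed to the cache connector keyed by the batch time. The queue-backed stream, the parsing, and the key names are stand-ins for the real streaming and cache connectors:

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.rdd.RDD
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import scala.collection.mutable

  object StreamingCacheDemo {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("streaming-cache-demo"))
      val ssc = new StreamingContext(sc, Seconds(1))

      // stand-in for streamingDataConnector.stream(): a queue-backed DStream of (key, message)
      val queue = mutable.Queue(sc.parallelize(Seq(("k1", """{"name":"a"}"""), ("k2", """{"name":"b"}"""))))
      val ds = ssc.queueStream(queue)

      // stand-in for transform(): turn each message into a value map stamped with the batch time
      def transform(rdd: RDD[(String, String)], ms: Long): RDD[Map[String, Any]] =
        rdd.map { case (_, msg) => Map[String, Any]("payload" -> msg, "ts" -> ms) }

      ds.foreachRDD { (rdd, time) =>
        val ms = time.milliseconds
        val valueMapRdd = transform(rdd, ms)
        // stand-in for cacheDataConnector.saveData(valueMapRdd, ms)
        valueMapRdd.collect.foreach(println)
      }

      ssc.start()
      ssc.awaitTerminationOrTimeout(3000)
      ssc.stop()
    }
  }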

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/connector/streaming/KafkaStreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/streaming/KafkaStreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/streaming/KafkaStreamingDataConnector.scala
new file mode 100644
index 0000000..fdd511d
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/connector/streaming/KafkaStreamingDataConnector.scala
@@ -0,0 +1,58 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.connector.streaming
+
+import kafka.serializer.Decoder
+import org.apache.griffin.measure.connector.cache.{CacheDataConnector, DataCacheable}
+import org.apache.griffin.measure.result.{DataInfo, TimeStampInfo}
+import org.apache.griffin.measure.rule.{ExprValueUtil, RuleExprs}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.InputDStream
+
+import scala.util.{Failure, Success, Try}
+
+abstract class KafkaStreamingDataConnector(@transient ssc: StreamingContext,
+                                           config: Map[String, Any]
+                                          ) extends StreamingDataConnector {
+  type KD <: Decoder[K]
+  type VD <: Decoder[V]
+
+  val KafkaConfig = "kafka.config"
+  val Topics = "topics"
+
+  val kafkaConfig = config.get(KafkaConfig) match {
+    case Some(map: Map[String, Any]) => map.mapValues(_.toString).map(identity)
+    case _ => Map[String, String]()
+  }
+  val topics = config.getOrElse(Topics, "").toString
+
+  def available(): Boolean = {
+    true
+  }
+
+  def init(): Unit = {}
+
+  def stream(): Try[InputDStream[(K, V)]] = Try {
+    val topicSet = topics.split(",").toSet
+    createDStream(topicSet)
+  }
+
+  protected def createDStream(topicSet: Set[String]): InputDStream[(K, V)]
+}
\ No newline at end of file
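
A hedged sketch (not part of this commit) of a concrete subclass, assuming the Kafka 0.8 direct-stream API (spark-streaming-kafka) that the Decoder import above suggests; the class name and the String key/value choice are illustrative:

  import kafka.serializer.StringDecoder
  import org.apache.spark.streaming.StreamingContext
  import org.apache.spark.streaming.dstream.InputDStream
  import org.apache.spark.streaming.kafka.KafkaUtils

  case class KafkaStringStreamingDataConnector(@transient ssc: StreamingContext,
                                               config: Map[String, Any]
                                              ) extends KafkaStreamingDataConnector(ssc, config) {
    type K = String
    type KD = StringDecoder
    type V = String
    type VD = StringDecoder

    protected def createDStream(topicSet: Set[String]): InputDStream[(K, V)] = {
      KafkaUtils.createDirectStream[K, V, KD, VD](ssc, kafkaConfig, topicSet)
    }
  }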

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/connector/streaming/StreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/streaming/StreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/streaming/StreamingDataConnector.scala
new file mode 100644
index 0000000..c37caac
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/connector/streaming/StreamingDataConnector.scala
@@ -0,0 +1,34 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.connector.streaming
+
+import org.apache.griffin.measure.connector.DataConnector
+import org.apache.spark.streaming.dstream.InputDStream
+
+import scala.util.Try
+
+
+trait StreamingDataConnector extends DataConnector {
+
+  type K
+  type V
+
+  def stream(): Try[InputDStream[(K, V)]]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/log/Loggable.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/log/Loggable.scala b/measure/src/main/scala/org/apache/griffin/measure/log/Loggable.scala
new file mode 100644
index 0000000..265a8cd
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/log/Loggable.scala
@@ -0,0 +1,43 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.log
+
+import org.slf4j.LoggerFactory
+
+trait Loggable {
+
+  @transient private lazy val logger = LoggerFactory.getLogger(getClass)
+
+  protected def info(msg: String): Unit = {
+    logger.info(msg)
+  }
+
+  protected def debug(msg: String): Unit = {
+    logger.debug(msg)
+  }
+
+  protected def warn(msg: String): Unit = {
+    logger.warn(msg)
+  }
+
+  protected def error(msg: String): Unit = {
+    logger.error(msg)
+  }
+
+}
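
A minimal usage sketch (not part of this commit) of the Loggable trait above; the DemoTask class is illustrative:

  import org.apache.griffin.measure.log.Loggable

  class DemoTask extends Loggable {
    def run(): Unit = {
      info("demo task started")
      try {
        // ... do the actual work here ...
      } catch {
        case e: Throwable => error(s"demo task failed: ${e.getMessage}")
      }
    }
  }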

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala
new file mode 100644
index 0000000..97786c4
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala
@@ -0,0 +1,206 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.persist
+
+import java.util.Date
+
+import org.apache.griffin.measure.result._
+import org.apache.griffin.measure.utils.HdfsUtil
+import org.apache.spark.rdd.RDD
+
+import scala.util.Try
+
+// persist result and data to hdfs
+case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist {
+
+  val Path = "path"
+  val MaxPersistLines = "max.persist.lines"
+  val MaxLinesPerFile = "max.lines.per.file"
+
+  val path = config.getOrElse(Path, "").toString
+  val maxPersistLines = try { config.getOrElse(MaxPersistLines, -1).toString.toInt } catch { case _ => -1 }
+  val maxLinesPerFile = try { config.getOrElse(MaxLinesPerFile, 10000).toString.toLong } catch { case _ => 10000 }
+
+  val separator = "/"
+
+  val StartFile = filePath("_START")
+  val FinishFile = filePath("_FINISH")
+  val ResultFile = filePath("_RESULT")
+
+  val MissRecFile = filePath("_MISSREC")      // optional
+  val MatchRecFile = filePath("_MATCHREC")    // optional
+
+  val LogFile = filePath("_LOG")
+
+  var _init = true
+  private def isInit = {
+    val i = _init
+    _init = false
+    i
+  }
+
+  def available(): Boolean = {
+    (path.nonEmpty) && (maxPersistLines < Int.MaxValue)
+  }
+
+  private def persistHead: String = {
+    val dt = new Date(timeStamp)
+    s"================ log of ${dt} ================\n"
+  }
+
+  private def timeHead(rt: Long): String = {
+    val dt = new Date(rt)
+    s"--- ${dt} ---\n"
+  }
+
+  protected def filePath(file: String): String = {
+    HdfsUtil.getHdfsFilePath(path, s"${metricName}/${timeStamp}/${file}")
+  }
+
+  protected def withSuffix(path: String, suffix: String): String = {
+    s"${path}.${suffix}"
+  }
+
+  def start(msg: String): Unit = {
+    try {
+      HdfsUtil.writeContent(StartFile, msg)
+    } catch {
+      case e: Throwable => error(e.getMessage)
+    }
+  }
+  def finish(): Unit = {
+    try {
+      HdfsUtil.createEmptyFile(FinishFile)
+    } catch {
+      case e: Throwable => error(e.getMessage)
+    }
+  }
+
+  def result(rt: Long, result: Result): Unit = {
+    try {
+      val resStr = result match {
+        case ar: AccuracyResult => {
+          s"match percentage: ${ar.matchPercentage}\ntotal count: 
${ar.getTotal}\nmiss count: ${ar.getMiss}, match count: ${ar.getMatch}"
+        }
+        case pr: ProfileResult => {
+          s"match percentage: ${pr.matchPercentage}\ntotal count: 
${pr.getTotal}\nmiss count: ${pr.getMiss}, match count: ${pr.getMatch}"
+        }
+        case _ => {
+          s"result: ${result}"
+        }
+      }
+      HdfsUtil.writeContent(ResultFile, timeHead(rt) + resStr)
+      log(rt, resStr)
+
+      info(resStr)
+    } catch {
+      case e: Throwable => error(e.getMessage)
+    }
+  }
+
+  // avoid persisting an overly long string at once
+  private def rddRecords(records: RDD[String], path: String): Unit = {
+    try {
+      val recordCount = records.count
+      val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
+      if (count > 0) {
+        val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt
+        if (groupCount <= 1) {
+          val recs = records.take(count.toInt)
+          persistRecords(path, recs)
+        } else {
+          val groupedRecords: RDD[(Long, Iterable[String])] =
+            records.zipWithIndex.flatMap { r =>
+              val gid = r._2 / maxLinesPerFile
+              if (gid < groupCount) Some((gid, r._1)) else None
+            }.groupByKey()
+          groupedRecords.foreach { group =>
+            val (gid, recs) = group
+            val hdfsPath = if (gid == 0) path else withSuffix(path, gid.toString)
+            persistRecords(hdfsPath, recs)
+          }
+        }
+      }
+    } catch {
+      case e: Throwable => error(e.getMessage)
+    }
+  }
+
+  private def iterableRecords(records: Iterable[String], path: String): Unit = {
+    try {
+      val recordCount = records.size
+      val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
+      if (count > 0) {
+        val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt
+        if (groupCount <= 1) {
+          val recs = records.take(count.toInt)
+          persistRecords(path, recs)
+        } else {
+          // split the truncated records into files of at most maxLinesPerFile lines each
+          val groupedRecords = records.take(count.toInt).grouped(maxLinesPerFile.toInt).zipWithIndex
+          groupedRecords.foreach { group =>
+            val (recs, gid) = group
+            val hdfsPath = if (gid == 0) path else withSuffix(path, gid.toString)
+            persistRecords(hdfsPath, recs)
+          }
+        }
+      }
+    } catch {
+      case e: Throwable => error(e.getMessage)
+    }
+  }
+
+  def records(recs: RDD[String], tp: String): Unit = {
+    tp match {
+      case PersistType.MISS => rddRecords(recs, MissRecFile)
+      case PersistType.MATCH => rddRecords(recs, MatchRecFile)
+      case _ => {}
+    }
+  }
+
+  def records(recs: Iterable[String], tp: String): Unit = {
+    tp match {
+      case PersistType.MISS => iterableRecords(recs, MissRecFile)
+      case PersistType.MATCH => iterableRecords(recs, MatchRecFile)
+      case _ => {}
+    }
+  }
+
+//  def missRecords(records: RDD[String]): Unit = {
+//    rddRecords(records, MissRecFile)
+//  }
+//
+//  def matchRecords(records: RDD[String]): Unit = {
+//    rddRecords(records, MatchRecFile)
+//  }
+
+  private def persistRecords(hdfsPath: String, records: Iterable[String]): Unit = {
+    val recStr = records.mkString("\n")
+    HdfsUtil.writeContent(hdfsPath, recStr)
+  }
+
+  def log(rt: Long, msg: String): Unit = {
+    try {
+      val logStr = (if (isInit) persistHead else "") + timeHead(rt) + s"${msg}\n\n"
+      HdfsUtil.appendContent(LogFile, logStr)
+    } catch {
+      case e: Throwable => error(e.getMessage)
+    }
+  }
+
+}
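
For context, a minimal usage sketch of HdfsPersist follows (not part of the patch). It assumes the measure module is on the classpath and that the configured HDFS path is writable; the config keys match the constants read above, while the values and metric name are illustrative.

import org.apache.griffin.measure.persist.{HdfsPersist, PersistType}
import org.apache.griffin.measure.result.AccuracyResult

object HdfsPersistExample {
  def main(args: Array[String]): Unit = {
    // "path", "max.persist.lines" and "max.lines.per.file" are the keys read above;
    // the values here are illustrative
    val config = Map[String, Any](
      "path" -> "hdfs:///griffin/persist",   // hypothetical HDFS root
      "max.persist.lines" -> 10000,
      "max.lines.per.file" -> 1000
    )
    val persist = HdfsPersist(config, "accu_batch", System.currentTimeMillis)

    if (persist.available()) {
      persist.start("accuracy job started")
      // writes _RESULT and appends to _LOG under {path}/{metricName}/{timeStamp}/
      persist.result(System.currentTimeMillis, AccuracyResult(miss = 12, total = 1000))
      // miss records go to _MISSREC, split into suffixed files when they exceed max.lines.per.file
      persist.records(Seq("""{"id": 1}""", """{"id": 2}"""), PersistType.MISS)
      persist.finish()
    }
  }
}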

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/persist/HttpPersist.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/HttpPersist.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/HttpPersist.scala
new file mode 100644
index 0000000..6d5bac3
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/persist/HttpPersist.scala
@@ -0,0 +1,88 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.persist
+
+import org.apache.griffin.measure.result._
+import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil}
+import org.apache.spark.rdd.RDD
+
+import scala.util.Try
+
+// persist result via http
+case class HttpPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist {
+
+  val Api = "api"
+  val Method = "method"
+
+  val api = config.getOrElse(Api, "").toString
+  val method = config.getOrElse(Method, "post").toString
+
+  def available(): Boolean = {
+    api.nonEmpty
+  }
+
+  def start(msg: String): Unit = {}
+  def finish(): Unit = {}
+
+  def result(rt: Long, result: Result): Unit = {
+    result match {
+      case ar: AccuracyResult => {
+        val dataMap = Map[String, Any](("name" -> metricName), ("tmst" -> timeStamp), ("total" -> ar.getTotal), ("matched" -> ar.getMatch))
+        httpResult(dataMap)
+      }
+      case pr: ProfileResult => {
+        val dataMap = Map[String, Any](("name" -> metricName), ("tmst" -> timeStamp), ("total" -> pr.getTotal), ("matched" -> pr.getMatch))
+        httpResult(dataMap)
+      }
+      case _ => {
+        info(s"result: ${result}")
+      }
+    }
+  }
+
+  private def httpResult(dataMap: Map[String, Any]) = {
+    try {
+      val data = JsonUtil.toJson(dataMap)
+      // post
+      val params = Map[String, Object]()
+      val header = Map[String, Object]()
+
+      def func(): Boolean = {
+        HttpUtil.httpRequest(api, method, params, header, data)
+      }
+
+      PersistThreadPool.addTask(func _, 10)
+
+//      val status = HttpUtil.httpRequest(api, method, params, header, data)
+//      info(s"${method} to ${api} response status: ${status}")
+    } catch {
+      case e: Throwable => error(e.getMessage)
+    }
+
+  }
+
+  def records(recs: RDD[String], tp: String): Unit = {}
+  def records(recs: Iterable[String], tp: String): Unit = {}
+
+//  def missRecords(records: RDD[String]): Unit = {}
+//  def matchRecords(records: RDD[String]): Unit = {}
+
+  def log(rt: Long, msg: String): Unit = {}
+
+}
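
HttpPersist only emits metric values. A sketch of how it might be wired up, assuming a hypothetical endpoint and the measure module on the classpath:

import org.apache.griffin.measure.persist.HttpPersist
import org.apache.griffin.measure.result.AccuracyResult

object HttpPersistExample {
  def main(args: Array[String]): Unit = {
    // "api" and "method" are the keys read above; the endpoint is hypothetical
    val config = Map[String, Any](
      "api" -> "http://localhost:8080/griffin/metrics",
      "method" -> "post"
    )
    val persist = HttpPersist(config, "accu_batch", System.currentTimeMillis)

    // serializes {"name": ..., "tmst": ..., "total": 1000, "matched": 988} and
    // posts it asynchronously through PersistThreadPool with up to 10 retries
    persist.result(System.currentTimeMillis, AccuracyResult(miss = 12, total = 1000))
  }
}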

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/persist/LoggerPersist.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/LoggerPersist.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/LoggerPersist.scala
new file mode 100644
index 0000000..00d41ea
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/persist/LoggerPersist.scala
@@ -0,0 +1,118 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.persist
+
+import java.util.Date
+
+import org.apache.griffin.measure.result._
+import org.apache.griffin.measure.utils.HdfsUtil
+import org.apache.spark.rdd.RDD
+
+// persist result and data to the console log
+case class LoggerPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist {
+
+  val MaxLogLines = "max.log.lines"
+
+  val maxLogLines = try { config.getOrElse(MaxLogLines, 100).toString.toInt } catch { case _: Throwable => 100 }
+
+  def available(): Boolean = true
+
+  def start(msg: String): Unit = {
+    println(s"[${timeStamp}] ${metricName} start")
+  }
+  def finish(): Unit = {
+    println(s"[${timeStamp}] ${metricName} finish")
+  }
+
+  def result(rt: Long, result: Result): Unit = {
+    try {
+      val resStr = result match {
+        case ar: AccuracyResult => {
+          s"match percentage: ${ar.matchPercentage}\ntotal count: ${ar.getTotal}\nmiss count: ${ar.getMiss}, match count: ${ar.getMatch}"
+        }
+        case pr: ProfileResult => {
+          s"match percentage: ${pr.matchPercentage}\ntotal count: ${pr.getTotal}\nmiss count: ${pr.getMiss}, match count: ${pr.getMatch}"
+        }
+        case _ => {
+          s"result: ${result}"
+        }
+      }
+      println(s"[${timeStamp}] ${metricName} result: \n${resStr}")
+    } catch {
+      case e: Throwable => error(e.getMessage)
+    }
+  }
+
+  // avoid logging an overly long string at once
+  private def rddRecords(records: RDD[String]): Unit = {
+    try {
+      val recordCount = records.count.toInt
+      val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount)
+      if (count > 0) {
+        val recordsArray = records.take(count)
+//        recordsArray.foreach(println)
+      }
+    } catch {
+      case e: Throwable => error(e.getMessage)
+    }
+  }
+
+  private def iterableRecords(records: Iterable[String]): Unit = {
+    try {
+      val recordCount = records.size
+      val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount)
+      if (count > 0) {
+        val recordsArray = records.take(count)
+//        recordsArray.foreach(println)
+      }
+    } catch {
+      case e: Throwable => error(e.getMessage)
+    }
+  }
+
+  def records(recs: RDD[String], tp: String): Unit = {
+    tp match {
+      case PersistType.MISS => rddRecords(recs)
+      case PersistType.MATCH => rddRecords(recs)
+      case _ => {}
+    }
+  }
+
+  def records(recs: Iterable[String], tp: String): Unit = {
+    tp match {
+      case PersistType.MISS => iterableRecords(recs)
+      case PersistType.MATCH => iterableRecords(recs)
+      case _ => {}
+    }
+  }
+
+//  def missRecords(records: RDD[String]): Unit = {
+//    warn(s"[${timeStamp}] ${metricName} miss records: ")
+//    rddRecords(records)
+//  }
+//  def matchRecords(records: RDD[String]): Unit = {
+//    warn(s"[${timeStamp}] ${metricName} match records: ")
+//    rddRecords(records)
+//  }
+
+  def log(rt: Long, msg: String): Unit = {
+    println(s"[${timeStamp}] ${rt}: ${msg}")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
new file mode 100644
index 0000000..25c8b0b
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
@@ -0,0 +1,52 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.persist
+
+import org.apache.griffin.measure.result._
+import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil}
+import org.apache.spark.rdd.RDD
+
+import scala.util.Try
+
+// persist result and data by multiple persists
+case class MultiPersists(persists: Iterable[Persist]) extends Persist {
+
+  val timeStamp: Long = if (persists.isEmpty) 0 else persists.head.timeStamp
+
+  val config: Map[String, Any] = Map[String, Any]()
+
+  def available(): Boolean = { persists.exists(_.available()) }
+
+  def start(msg: String): Unit = { persists.foreach(_.start(msg)) }
+  def finish(): Unit = { persists.foreach(_.finish()) }
+
+  def result(rt: Long, result: Result): Unit = { persists.foreach(_.result(rt, result)) }
+
+  def records(recs: RDD[String], tp: String): Unit = { persists.foreach(_.records(recs, tp)) }
+  def records(recs: Iterable[String], tp: String): Unit = { persists.foreach(_.records(recs, tp)) }
+
+//  def missRecords(records: RDD[String]): Unit = { persists.foreach(_.missRecords(records)) }
+//  def matchRecords(records: RDD[String]): Unit = { persists.foreach(_.matchRecords(records)) }
+
+  def log(rt: Long, msg: String): Unit = { persists.foreach(_.log(rt, msg)) }
+
+}
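
MultiPersists is a simple composite: every Persist call fans out to each wrapped persist. A sketch combining an HDFS and a console persist, under the same assumptions as the earlier examples:

import org.apache.griffin.measure.persist.{HdfsPersist, LoggerPersist, MultiPersists}
import org.apache.griffin.measure.result.AccuracyResult

object MultiPersistsExample {
  def main(args: Array[String]): Unit = {
    val tmst = System.currentTimeMillis
    val hdfs = HdfsPersist(Map[String, Any]("path" -> "hdfs:///griffin/persist"), "accu_batch", tmst)
    val log  = LoggerPersist(Map[String, Any](), "accu_batch", tmst)

    // each call below is forwarded to both persists
    val multi = MultiPersists(hdfs :: log :: Nil)
    multi.start("job started")
    multi.result(tmst, AccuracyResult(miss = 0, total = 100))
    multi.finish()
  }
}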

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/persist/OldHttpPersist.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/OldHttpPersist.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/OldHttpPersist.scala
new file mode 100644
index 0000000..357d6e1
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/persist/OldHttpPersist.scala
@@ -0,0 +1,87 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.persist
+
+import org.apache.griffin.measure.result._
+import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil}
+import org.apache.spark.rdd.RDD
+
+// persist result via the old http api -- temporary
+case class OldHttpPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist {
+
+  val Api = "api"
+  val Method = "method"
+
+  val api = config.getOrElse(Api, "").toString
+  val method = config.getOrElse(Method, "post").toString
+
+  def available(): Boolean = {
+    api.nonEmpty
+  }
+
+  def start(msg: String): Unit = {}
+  def finish(): Unit = {}
+
+  def result(rt: Long, result: Result): Unit = {
+    result match {
+      case ar: AccuracyResult => {
+        val matchPercentage: Double = if (ar.getTotal <= 0) 0 else (ar.getMatch * 1.0 / ar.getTotal) * 100
+        val dataMap = Map[String, Any](("metricName" -> metricName), ("timestamp" -> timeStamp), ("value" -> matchPercentage), ("count" -> ar.getTotal))
+        httpResult(dataMap)
+      }
+      case pr: ProfileResult => {
+        val dataMap = Map[String, Any](("metricName" -> metricName), ("timestamp" -> timeStamp), ("value" -> pr.getMatch), ("count" -> pr.getTotal))
+        httpResult(dataMap)
+      }
+      case _ => {
+        info(s"result: ${result}")
+      }
+    }
+  }
+
+  private def httpResult(dataMap: Map[String, Any]) = {
+    try {
+      val data = JsonUtil.toJson(dataMap)
+      // post
+      val params = Map[String, Object]()
+      val header = Map[String, Object](("content-type" -> "application/json"))
+
+      def func(): Boolean = {
+        HttpUtil.httpRequest(api, method, params, header, data)
+      }
+
+      PersistThreadPool.addTask(func _, 10)
+
+//      val status = HttpUtil.httpRequest(api, method, params, header, data)
+//      info(s"${method} to ${api} response status: ${status}")
+    } catch {
+      case e: Throwable => error(e.getMessage)
+    }
+
+  }
+
+  def records(recs: RDD[String], tp: String): Unit = {}
+  def records(recs: Iterable[String], tp: String): Unit = {}
+
+//  def missRecords(records: RDD[String]): Unit = {}
+//  def matchRecords(records: RDD[String]): Unit = {}
+
+  def log(rt: Long, msg: String): Unit = {}
+
+}
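
The difference from HttpPersist is the payload: for an accuracy result the old api receives the computed match percentage as "value" together with "count", keyed by metricName and timestamp. A sketch against a hypothetical endpoint:

import org.apache.griffin.measure.persist.OldHttpPersist
import org.apache.griffin.measure.result.AccuracyResult

object OldHttpPersistExample {
  def main(args: Array[String]): Unit = {
    val config = Map[String, Any](
      "api" -> "http://localhost:8080/griffin/old-metrics",  // hypothetical endpoint
      "method" -> "post"
    )
    val persist = OldHttpPersist(config, "accu_batch", System.currentTimeMillis)

    // posts {"metricName": ..., "timestamp": ..., "value": 98.8, "count": 1000},
    // where value is the match percentage rather than the raw matched count
    persist.result(System.currentTimeMillis, AccuracyResult(miss = 12, total = 1000))
  }
}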

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala
new file mode 100644
index 0000000..bc16599
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala
@@ -0,0 +1,52 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.persist
+
+import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.result._
+import org.apache.spark.rdd.RDD
+
+import scala.util.Try
+
+
+trait Persist extends Loggable with Serializable {
+  val timeStamp: Long
+
+  val config: Map[String, Any]
+
+  def available(): Boolean
+
+  def start(msg: String): Unit
+  def finish(): Unit
+
+  def result(rt: Long, result: Result): Unit
+
+  def records(recs: RDD[String], tp: String): Unit
+  def records(recs: Iterable[String], tp: String): Unit
+
+//  def missRecords(records: RDD[String]): Unit
+//  def matchRecords(records: RDD[String]): Unit
+
+  def log(rt: Long, msg: String): Unit
+}
+
+object PersistType {
+  final val MISS = "miss"
+  final val MATCH = "match"
+}
\ No newline at end of file
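
Persist is the extension point for new sinks. A minimal in-memory implementation (not part of the patch, e.g. for unit tests) only needs the members below; it assumes the Loggable trait provides concrete logging helpers:

import org.apache.griffin.measure.persist.Persist
import org.apache.griffin.measure.result.Result
import org.apache.spark.rdd.RDD

// hypothetical in-memory persist; collects everything into a buffer
case class InMemoryPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist {

  val buffer = scala.collection.mutable.ListBuffer[String]()

  def available(): Boolean = true

  def start(msg: String): Unit = { buffer += s"start: ${msg}" }
  def finish(): Unit = { buffer += "finish" }

  def result(rt: Long, result: Result): Unit = { buffer += s"[${rt}] ${result}" }

  // keep only a few records to avoid holding the whole data set in memory
  def records(recs: RDD[String], tp: String): Unit = { recs.take(10).foreach(buffer += _) }
  def records(recs: Iterable[String], tp: String): Unit = { recs.take(10).foreach(buffer += _) }

  def log(rt: Long, msg: String): Unit = { buffer += s"[${rt}] ${msg}" }
}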

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/persist/PersistFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/PersistFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/PersistFactory.scala
new file mode 100644
index 0000000..4330160
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/persist/PersistFactory.scala
@@ -0,0 +1,53 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.persist
+
+import org.apache.griffin.measure.config.params.env._
+
+import scala.util.{Success, Try}
+
+
+case class PersistFactory(persistParams: Iterable[PersistParam], metricName: String) extends Serializable {
+
+  val HDFS_REGEX = """^(?i)hdfs$""".r
+  val HTTP_REGEX = """^(?i)http$""".r
+  val OLDHTTP_REGEX = """^(?i)oldhttp$""".r
+  val LOG_REGEX = """^(?i)log$""".r
+
+  def getPersists(timeStamp: Long): MultiPersists = {
+    MultiPersists(persistParams.flatMap(param => getPersist(timeStamp, param)))
+  }
+
+  // get the persists configured
+  private def getPersist(timeStamp: Long, persistParam: PersistParam): Option[Persist] = {
+    val config = persistParam.config
+    val persistTry = persistParam.persistType match {
+      case HDFS_REGEX() => Try(HdfsPersist(config, metricName, timeStamp))
+      case HTTP_REGEX() => Try(HttpPersist(config, metricName, timeStamp))
+      case OLDHTTP_REGEX() => Try(OldHttpPersist(config, metricName, timeStamp))
+      case LOG_REGEX() => Try(LoggerPersist(config, metricName, timeStamp))
+      case _ => throw new Exception("unsupported persist type")
+    }
+    persistTry match {
+      case Success(persist) if (persist.available) => Some(persist)
+      case _ => None
+    }
+  }
+
+}
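
Callers typically build one PersistFactory per job and ask it for a MultiPersists per timestamp. A small sketch; PersistParam comes from the parsed env config and its constructor is defined outside this patch, so none are built here:

import org.apache.griffin.measure.config.params.env.PersistParam
import org.apache.griffin.measure.persist.PersistFactory

object PersistFactoryExample {
  def main(args: Array[String]): Unit = {
    // normally parsed from the env config (hdfs / http / oldhttp / log entries)
    val persistParams: Iterable[PersistParam] = Seq.empty

    val factory = PersistFactory(persistParams, "accu_batch")

    // keeps only the persists whose construction succeeds and whose available() check passes
    val persists = factory.getPersists(System.currentTimeMillis)
    persists.start("job started")
  }
}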

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/persist/PersistThreadPool.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/PersistThreadPool.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/PersistThreadPool.scala
new file mode 100644
index 0000000..7993aab
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/persist/PersistThreadPool.scala
@@ -0,0 +1,62 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.persist
+
+import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
+
+object PersistThreadPool {
+
+  private val pool: ThreadPoolExecutor = Executors.newFixedThreadPool(10).asInstanceOf[ThreadPoolExecutor]
+  val MAX_RETRY = 100
+
+  def shutdown(): Unit = {
+    pool.shutdown()
+    pool.awaitTermination(10, TimeUnit.SECONDS)
+  }
+
+  def addTask(func: () => Boolean, retry: Int): Unit = {
+    val r = if (retry < 0) MAX_RETRY else retry
+    println(s"add task, current task num: ${pool.getQueue.size}")
+    pool.submit(Task(func, r))
+  }
+
+  case class Task(func: () => Boolean, retry: Int) extends Runnable {
+
+    override def run(): Unit = {
+      try {
+        var i = retry
+        var suc = false
+        while (!suc && i > 0) {
+          if (func()) {
+            println("task success")
+            suc = true
+          } else i = i - 1
+        }
+        if (!suc) fail(s"retried for ${retry} times")
+      } catch {
+        case e: Throwable => fail(s"${e.getMessage}")
+      }
+    }
+
+    def fail(msg: String): Unit = {
+      println(s"task fails: ${msg}")
+    }
+  }
+
+}
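
Tasks are plain () => Boolean functions; a false return triggers a retry, up to the given limit. A small sketch with a hypothetical unit of work:

import org.apache.griffin.measure.persist.PersistThreadPool

object PersistThreadPoolExample {
  def main(args: Array[String]): Unit = {
    // hypothetical unit of work, e.g. an HTTP post; return true on success
    def pushMetric(): Boolean = {
      println("pushing metric ...")
      true
    }

    // retried up to 10 times if it keeps returning false
    PersistThreadPool.addTask(pushMetric _, 10)

    // let queued tasks drain (waits at most 10 seconds) before the process exits
    PersistThreadPool.shutdown()
  }
}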

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/result/AccuracyResult.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/result/AccuracyResult.scala b/measure/src/main/scala/org/apache/griffin/measure/result/AccuracyResult.scala
new file mode 100644
index 0000000..16bb772
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/result/AccuracyResult.scala
@@ -0,0 +1,44 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.result
+
+// result for accuracy: miss count, total count
+case class AccuracyResult(miss: Long, total: Long) extends Result {
+
+  type T = AccuracyResult
+
+  def update(delta: T): T = {
+    AccuracyResult(delta.miss, total)
+  }
+
+  def eventual(): Boolean = {
+    this.miss <= 0
+  }
+
+  def differsFrom(other: T): Boolean = {
+    (this.miss != other.miss) || (this.total != other.total)
+  }
+
+  def getMiss = miss
+  def getTotal = total
+  def getMatch = total - miss
+
+  def matchPercentage: Double = if (getTotal <= 0) 0 else getMatch.toDouble / getTotal * 100
+
+}
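
The accuracy semantics above are worth spelling out: update keeps the original total and adopts the delta's miss count, and the result is eventual once no misses remain. A small illustration:

import org.apache.griffin.measure.result.AccuracyResult

object AccuracyResultExample {
  def main(args: Array[String]): Unit = {
    val first = AccuracyResult(miss = 20, total = 100)
    println(first.matchPercentage)   // 80.0
    println(first.eventual())        // false, misses remain

    // update keeps the total and takes the latest miss count
    val updated = first.update(AccuracyResult(miss = 0, total = 100))
    println(updated)                 // AccuracyResult(0,100)
    println(updated.eventual())      // true
  }
}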

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/result/DataInfo.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/result/DataInfo.scala b/measure/src/main/scala/org/apache/griffin/measure/result/DataInfo.scala
new file mode 100644
index 0000000..7ec0783
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/result/DataInfo.scala
@@ -0,0 +1,50 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.result
+
+
+sealed trait DataInfo {
+  type T
+  val key: String
+  def wrap(value: T) = (key -> value)
+  def defWrap() = wrap(dfv)
+  val dfv: T
+}
+
+final case object TimeStampInfo extends DataInfo {
+  type T = Long
+  val key = "_tmst_"
+  val dfv = 0L
+}
+
+final case object MismatchInfo extends DataInfo {
+  type T = String
+  val key = "_mismatch_"
+  val dfv = ""
+}
+
+final case object ErrorInfo extends DataInfo {
+  type T = String
+  val key = "_error_"
+  val dfv = ""
+}
+
+object DataInfo {
+  val cacheInfoList = List(TimeStampInfo, MismatchInfo, ErrorInfo)
+}
\ No newline at end of file
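
Each DataInfo pairs a well-known key with a typed value, so it can be merged straight into a record map. A small illustration:

import org.apache.griffin.measure.result.{MismatchInfo, TimeStampInfo}

object DataInfoExample {
  def main(args: Array[String]): Unit = {
    // wrap builds the (key -> value) pair for the given info
    val record: Map[String, Any] = Map("id" -> 1) +
      TimeStampInfo.wrap(System.currentTimeMillis) +
      MismatchInfo.wrap("no matched record found")

    println(record)                    // Map(id -> 1, _tmst_ -> ..., _mismatch_ -> ...)
    println(TimeStampInfo.defWrap())   // (_tmst_,0) -- the default value
  }
}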

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/result/ProfileResult.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/result/ProfileResult.scala b/measure/src/main/scala/org/apache/griffin/measure/result/ProfileResult.scala
new file mode 100644
index 0000000..803416e
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/result/ProfileResult.scala
@@ -0,0 +1,44 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.result
+
+// result for profile: match count, total count
+case class ProfileResult(matchCount: Long, totalCount: Long) extends Result {
+
+  type T = ProfileResult
+
+  def update(delta: T): T = {
+    ProfileResult(matchCount + delta.matchCount, totalCount)
+  }
+
+  def eventual(): Boolean = {
+    this.matchCount >= totalCount
+  }
+
+  def differsFrom(other: T): Boolean = {
+    (this.matchCount != other.matchCount) || (this.totalCount != other.totalCount)
+  }
+
+  def getMiss = totalCount - matchCount
+  def getTotal = totalCount
+  def getMatch = matchCount
+
+  def matchPercentage: Double = if (getTotal <= 0) 0 else getMatch.toDouble / getTotal * 100
+
+}
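
In contrast to AccuracyResult, ProfileResult accumulates the match count on update and is eventual once the match count reaches the total. For example:

import org.apache.griffin.measure.result.ProfileResult

object ProfileResultExample {
  def main(args: Array[String]): Unit = {
    val first   = ProfileResult(matchCount = 40, totalCount = 100)
    // update adds the delta's match count while keeping the total
    val updated = first.update(ProfileResult(matchCount = 60, totalCount = 100))

    println(updated.getMatch)          // 100
    println(updated.eventual())        // true, all records matched
    println(updated.matchPercentage)   // 100.0
  }
}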

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/result/Result.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/result/Result.scala b/measure/src/main/scala/org/apache/griffin/measure/result/Result.scala
new file mode 100644
index 0000000..6dcd9a1
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/result/Result.scala
@@ -0,0 +1,32 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.result
+
+
+trait Result extends Serializable {
+
+  type T <: Result
+
+  def update(delta: T): T
+
+  def eventual(): Boolean
+
+  def differsFrom(other: T): Boolean
+
+}

