http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/KafkaCacheDirectDataConnector.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/KafkaCacheDirectDataConnector.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/KafkaCacheDirectDataConnector.scala
new file mode 100644
index 0000000..70ddcde
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/KafkaCacheDirectDataConnector.scala
@@ -0,0 +1,125 @@
+///*
+//Licensed to the Apache Software Foundation (ASF) under one
+//or more contributor license agreements.  See the NOTICE file
+//distributed with this work for additional information
+//regarding copyright ownership.  The ASF licenses this file
+//to you under the Apache License, Version 2.0 (the
+//"License"); you may not use this file except in compliance
+//with the License.  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//Unless required by applicable law or agreed to in writing,
+//software distributed under the License is distributed on an
+//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+//KIND, either express or implied.  See the License for the
+//specific language governing permissions and limitations
+//under the License.
+//*/
+//package org.apache.griffin.measure.data.connector.direct
+//
+//import org.apache.griffin.measure.config.params.user.DataConnectorParam
+//import org.apache.griffin.measure.data.connector.DataConnectorFactory
+//import org.apache.griffin.measure.data.connector.cache.CacheDataConnector
+//import 
org.apache.griffin.measure.data.connector.streaming.StreamingDataConnector
+//import org.apache.griffin.measure.result._
+//import org.apache.griffin.measure.rule._
+//import org.apache.spark.rdd.RDD
+//import org.apache.spark.sql.SQLContext
+//import org.apache.spark.streaming.StreamingContext
+//
+//import scala.util.{Failure, Success, Try}
+//
+//case class KafkaCacheDirectDataConnector(@transient 
streamingDataConnectorTry: Try[StreamingDataConnector],
+//                                         cacheDataConnectorTry: 
Try[CacheDataConnector],
+//                                         dataConnectorParam: 
DataConnectorParam,
+//                                         ruleExprs: RuleExprs,
+//                                         constFinalExprValueMap: Map[String, 
Any]
+//                                        ) extends 
StreamingCacheDirectDataConnector {
+//
+//  val cacheDataConnector: CacheDataConnector = cacheDataConnectorTry match {
+//    case Success(cntr) => cntr
+//    case Failure(ex) => throw ex
+//  }
+//  @transient val streamingDataConnector: StreamingDataConnector = 
streamingDataConnectorTry match {
+//    case Success(cntr) => cntr
+//    case Failure(ex) => throw ex
+//  }
+//
+//  protected def transform(rdd: RDD[(streamingDataConnector.K, 
streamingDataConnector.V)],
+//                          ms: Long
+//                         ): RDD[Map[String, Any]] = {
+//    val dataInfoMap = DataInfo.cacheInfoList.map(_.defWrap).toMap + 
TimeStampInfo.wrap(ms)
+//
+//    rdd.flatMap { kv =>
+//      val msg = kv._2
+//
+//      val cacheExprValueMaps = ExprValueUtil.genExprValueMaps(Some(msg), 
ruleExprs.cacheExprs, constFinalExprValueMap)
+//      val finalExprValueMaps = 
ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMaps)
+//
+//      finalExprValueMaps.map { vm =>
+//        vm ++ dataInfoMap
+//      }
+//    }
+//  }
+//
+//  def metaData(): Try[Iterable[(String, String)]] = Try {
+//    Map.empty[String, String]
+//  }
+//
+//  def data(): Try[RDD[(Product, (Map[String, Any], Map[String, Any]))]] = 
Try {
+//    cacheDataConnector.readData match {
+//      case Success(rdd) => {
+//        rdd.flatMap { row =>
+//          val finalExprValueMap = ruleExprs.finalCacheExprs.flatMap { expr =>
+//            row.get(expr._id).flatMap { d =>
+//              Some((expr._id, d))
+//            }
+//          }.toMap
+//
+//          val dataInfoMap: Map[String, Any] = DataInfo.cacheInfoList.map { 
info =>
+//            row.get(info.key) match {
+//              case Some(d) => (info.key -> d)
+//              case _ => info.defWrap
+//            }
+//          }.toMap
+//
+//          val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { 
expr =>
+//            expr.calculate(finalExprValueMap) match {
+//              case Some(v) => Some(v.asInstanceOf[AnyRef])
+//              case _ => None
+//            }
+//          }
+//          val key = toTuple(groupbyData)
+//
+//          Some((key, (finalExprValueMap, dataInfoMap)))
+//        }
+//      }
+//      case Failure(ex) => throw ex
+//    }
+//  }
+//
+//  override def cleanOldData(): Unit = {
+//    cacheDataConnector.cleanOldData
+//  }
+//
+//  override def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): 
Unit = {
+//    if (dataConnectorParam.getMatchOnce) {
+//      cacheDataConnector.updateOldData(t, oldData)
+//    }
+//  }
+//
+//  override def updateAllOldData(oldRdd: RDD[Map[String, Any]]): Unit = {
+//    if (dataConnectorParam.getMatchOnce) {
+//      cacheDataConnector.updateAllOldData(oldRdd)
+//    }
+//  }
+//
+//  private def toTuple[A <: AnyRef](as: Seq[A]): Product = {
+//    if (as.size > 0) {
+//      val tupleClass = Class.forName("scala.Tuple" + as.size)
+//      tupleClass.getConstructors.apply(0).newInstance(as: 
_*).asInstanceOf[Product]
+//    } else None
+//  }
+//
+//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/StreamingCacheDirectDataConnector.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/StreamingCacheDirectDataConnector.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/StreamingCacheDirectDataConnector.scala
new file mode 100644
index 0000000..dddf430
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/StreamingCacheDirectDataConnector.scala
@@ -0,0 +1,60 @@
+///*
+//Licensed to the Apache Software Foundation (ASF) under one
+//or more contributor license agreements.  See the NOTICE file
+//distributed with this work for additional information
+//regarding copyright ownership.  The ASF licenses this file
+//to you under the Apache License, Version 2.0 (the
+//"License"); you may not use this file except in compliance
+//with the License.  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//Unless required by applicable law or agreed to in writing,
+//software distributed under the License is distributed on an
+//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+//KIND, either express or implied.  See the License for the
+//specific language governing permissions and limitations
+//under the License.
+//*/
+//package org.apache.griffin.measure.data.connector.direct
+//
+//import org.apache.griffin.measure.data.connector.cache.CacheDataConnector
+//import 
org.apache.griffin.measure.data.connector.streaming.StreamingDataConnector
+//import org.apache.griffin.measure.result.{DataInfo, TimeStampInfo}
+//import org.apache.griffin.measure.rule.ExprValueUtil
+//import org.apache.spark.rdd.RDD
+//
+//import scala.util.{Failure, Success}
+//
+//trait StreamingCacheDirectDataConnector extends DirectDataConnector {
+//
+//  val cacheDataConnector: CacheDataConnector
+//  @transient val streamingDataConnector: StreamingDataConnector
+//
+//  def available(): Boolean = {
+//    cacheDataConnector.available && streamingDataConnector.available
+//  }
+//
+//  def init(): Unit = {
+//    cacheDataConnector.init
+//
+//    val ds = streamingDataConnector.stream match {
+//      case Success(dstream) => dstream
+//      case Failure(ex) => throw ex
+//    }
+//
+//    ds.foreachRDD((rdd, time) => {
+//      val ms = time.milliseconds
+//
+//      val valueMapRdd = transform(rdd, ms)
+//
+//      // save data frame
+//      cacheDataConnector.saveData(valueMapRdd, ms)
+//    })
+//  }
+//
+//  protected def transform(rdd: RDD[(streamingDataConnector.K, 
streamingDataConnector.V)],
+//                          ms: Long
+//                         ): RDD[Map[String, Any]]
+//
+//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/TextDirBatchDataConnector.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/TextDirBatchDataConnector.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/TextDirBatchDataConnector.scala
new file mode 100644
index 0000000..abc547b
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/TextDirBatchDataConnector.scala
@@ -0,0 +1,136 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.data.connector.batch
+
+import org.apache.griffin.measure.config.params.user.DataConnectorParam
+import org.apache.griffin.measure.process.engine.DqEngines
+import org.apache.griffin.measure.utils.HdfsUtil
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.griffin.measure.utils.ParamUtil._
+
+// data connector for avro file
+case class TextDirBatchDataConnector(sqlContext: SQLContext, dqEngines: 
DqEngines, dcParam: DataConnectorParam
+                                    ) extends BatchDataConnector {
+
+  val config = dcParam.config
+
+  val DirPath = "dir.path"
+  val DataDirDepth = "data.dir.depth"
+  val SuccessFile = "success.file"
+  val DoneFile = "done.file"
+
+  val dirPath = config.getString(DirPath, "")
+  val dataDirDepth = config.getInt(DataDirDepth, 0)
+  val successFile = config.getString(SuccessFile, "_SUCCESS")
+  val doneFile = config.getString(DoneFile, "_DONE")
+
+  val ignoreFilePrefix = "_"
+
+  private def dirExist(): Boolean = {
+    HdfsUtil.existPath(dirPath)
+  }
+
+  def data(ms: Long): Option[DataFrame] = {
+    try {
+      val dataDirs = listSubDirs(dirPath :: Nil, dataDirDepth, readable)
+      // touch done file for read dirs
+      dataDirs.foreach(dir => touchDone(dir))
+
+      val validDataDirs = dataDirs.filter(dir => !emptyDir(dir))
+
+      if (validDataDirs.size > 0) {
+        val df = sqlContext.read.text(validDataDirs:  _*)
+        val dfOpt = Some(df)
+        val preDfOpt = preProcess(dfOpt, ms)
+        preDfOpt
+      } else {
+        None
+      }
+    } catch {
+      case e: Throwable => {
+        error(s"load text dir ${dirPath} fails: ${e.getMessage}")
+        None
+      }
+    }
+  }
+
+  private def listSubDirs(paths: Seq[String], depth: Int, filteFunc: (String) 
=> Boolean): Seq[String] = {
+    val subDirs = paths.flatMap { path => HdfsUtil.listSubPathsByType(path, 
"dir", true) }
+    if (depth <= 0) {
+      subDirs.filter(filteFunc)
+    } else {
+      listSubDirs(subDirs, depth, filteFunc)
+    }
+  }
+
+  private def readable(dir: String): Boolean = isSuccess(dir) && !isDone(dir)
+  private def isDone(dir: String): Boolean = HdfsUtil.existFileInDir(dir, 
doneFile)
+  private def isSuccess(dir: String): Boolean = HdfsUtil.existFileInDir(dir, 
successFile)
+
+  private def touchDone(dir: String): Unit = 
HdfsUtil.createEmptyFile(HdfsUtil.getHdfsFilePath(dir, doneFile))
+
+  private def emptyDir(dir: String): Boolean = {
+    HdfsUtil.listSubPathsByType(dir, 
"file").filter(!_.startsWith(ignoreFilePrefix)).size == 0
+  }
+
+//  def available(): Boolean = {
+//    (!concreteFileFullPath.isEmpty) && fileExist
+//  }
+
+//  def init(): Unit = {}
+
+//  def metaData(): Try[Iterable[(String, String)]] = {
+//    Try {
+//      val st = 
sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath).schema
+//      st.fields.map(f => (f.name, f.dataType.typeName))
+//    }
+//  }
+
+//  def data(): Try[RDD[(Product, (Map[String, Any], Map[String, Any]))]] = {
+//    Try {
+//      loadDataFile.flatMap { row =>
+//        // generate cache data
+//        val cacheExprValueMaps = ExprValueUtil.genExprValueMaps(Some(row), 
ruleExprs.cacheExprs, constFinalExprValueMap)
+//        val finalExprValueMaps = 
ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMaps)
+//
+//        // data info
+//        val dataInfoMap: Map[String, Any] = DataInfo.cacheInfoList.map { 
info =>
+//          try {
+//            (info.key -> row.getAs[info.T](info.key))
+//          } catch {
+//            case e: Throwable => info.defWrap
+//          }
+//        }.toMap
+//
+//        finalExprValueMaps.flatMap { finalExprValueMap =>
+//          val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { 
expr =>
+//            expr.calculate(finalExprValueMap) match {
+//              case Some(v) => Some(v.asInstanceOf[AnyRef])
+//              case _ => None
+//            }
+//          }
+//          val key = toTuple(groupbyData)
+//
+//          Some((key, (finalExprValueMap, dataInfoMap)))
+//        }
+//      }
+//    }
+//  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/CacheDataConnector.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/CacheDataConnector.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/CacheDataConnector.scala
new file mode 100644
index 0000000..67dcc06
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/CacheDataConnector.scala
@@ -0,0 +1,33 @@
+///*
+//Licensed to the Apache Software Foundation (ASF) under one
+//or more contributor license agreements.  See the NOTICE file
+//distributed with this work for additional information
+//regarding copyright ownership.  The ASF licenses this file
+//to you under the Apache License, Version 2.0 (the
+//"License"); you may not use this file except in compliance
+//with the License.  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//Unless required by applicable law or agreed to in writing,
+//software distributed under the License is distributed on an
+//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+//KIND, either express or implied.  See the License for the
+//specific language governing permissions and limitations
+//under the License.
+//*/
+//package org.apache.griffin.measure.data.connector.cache
+//
+//import org.apache.griffin.measure.data.connector.DataConnector
+//import org.apache.spark.rdd.RDD
+//
+//import scala.util.Try
+//
+//trait CacheDataConnector extends DataConnector with DataCacheable with 
DataUpdatable {
+//
+//  def saveData(rdd: RDD[Map[String, Any]], ms: Long): Unit
+//
+//  def readData(): Try[RDD[Map[String, Any]]]
+//
+//}
+//

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataCacheable.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataCacheable.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataCacheable.scala
new file mode 100644
index 0000000..79162be
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataCacheable.scala
@@ -0,0 +1,86 @@
+///*
+//Licensed to the Apache Software Foundation (ASF) under one
+//or more contributor license agreements.  See the NOTICE file
+//distributed with this work for additional information
+//regarding copyright ownership.  The ASF licenses this file
+//to you under the Apache License, Version 2.0 (the
+//"License"); you may not use this file except in compliance
+//with the License.  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//Unless required by applicable law or agreed to in writing,
+//software distributed under the License is distributed on an
+//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+//KIND, either express or implied.  See the License for the
+//specific language governing permissions and limitations
+//under the License.
+//*/
+//package org.apache.griffin.measure.data.connector.cache
+//
+//import java.util.concurrent.atomic.AtomicLong
+//
+//import org.apache.griffin.measure.cache.info.{InfoCacheInstance, 
TimeInfoCache}
+//
+//trait DataCacheable {
+//
+//  protected val defCacheInfoPath = PathCounter.genPath
+//
+//  val cacheInfoPath: String
+//  val readyTimeInterval: Long
+//  val readyTimeDelay: Long
+//
+//  def selfCacheInfoPath = s"${TimeInfoCache.infoPath}/${cacheInfoPath}"
+//
+//  def selfCacheTime = TimeInfoCache.cacheTime(selfCacheInfoPath)
+//  def selfLastProcTime = TimeInfoCache.lastProcTime(selfCacheInfoPath)
+//  def selfReadyTime = TimeInfoCache.readyTime(selfCacheInfoPath)
+//  def selfCleanTime = TimeInfoCache.cleanTime(selfCacheInfoPath)
+//
+//  protected def submitCacheTime(ms: Long): Unit = {
+//    val map = Map[String, String]((selfCacheTime -> ms.toString))
+//    InfoCacheInstance.cacheInfo(map)
+//  }
+//
+//  protected def submitReadyTime(ms: Long): Unit = {
+//    val curReadyTime = ms - readyTimeDelay
+//    if (curReadyTime % readyTimeInterval == 0) {
+//      val map = Map[String, String]((selfReadyTime -> curReadyTime.toString))
+//      InfoCacheInstance.cacheInfo(map)
+//    }
+//  }
+//
+//  protected def submitLastProcTime(ms: Long): Unit = {
+//    val map = Map[String, String]((selfLastProcTime -> ms.toString))
+//    InfoCacheInstance.cacheInfo(map)
+//  }
+//
+//  protected def submitCleanTime(ms: Long): Unit = {
+//    val cleanTime = genCleanTime(ms)
+//    val map = Map[String, String]((selfCleanTime -> cleanTime.toString))
+//    InfoCacheInstance.cacheInfo(map)
+//  }
+//
+//  protected def genCleanTime(ms: Long): Long = ms
+//
+//  protected def readCleanTime(): Option[Long] = {
+//    val key = selfCleanTime
+//    val keys = key :: Nil
+//    InfoCacheInstance.readInfo(keys).get(key).flatMap { v =>
+//      try {
+//        Some(v.toLong)
+//      } catch {
+//        case _ => None
+//      }
+//    }
+//  }
+//
+//}
+//
+//object PathCounter {
+//  private val counter: AtomicLong = new AtomicLong(0L)
+//  def genPath(): String = s"path_${increment}"
+//  private def increment(): Long = {
+//    counter.incrementAndGet()
+//  }
+//}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataUpdatable.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataUpdatable.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataUpdatable.scala
new file mode 100644
index 0000000..61e8413
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/DataUpdatable.scala
@@ -0,0 +1,30 @@
+///*
+//Licensed to the Apache Software Foundation (ASF) under one
+//or more contributor license agreements.  See the NOTICE file
+//distributed with this work for additional information
+//regarding copyright ownership.  The ASF licenses this file
+//to you under the Apache License, Version 2.0 (the
+//"License"); you may not use this file except in compliance
+//with the License.  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//Unless required by applicable law or agreed to in writing,
+//software distributed under the License is distributed on an
+//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+//KIND, either express or implied.  See the License for the
+//specific language governing permissions and limitations
+//under the License.
+//*/
+//package org.apache.griffin.measure.data.connector.cache
+//
+//import org.apache.spark.rdd.RDD
+//
+//trait DataUpdatable {
+//
+//  def cleanOldData(): Unit = {}
+//
+//  def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = {}
+//  def updateAllOldData(oldRdd: RDD[Map[String, Any]]): Unit = {}
+//
+//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/HiveCacheDataConnector.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/HiveCacheDataConnector.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/HiveCacheDataConnector.scala
new file mode 100644
index 0000000..4c7b45b
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/HiveCacheDataConnector.scala
@@ -0,0 +1,351 @@
+///*
+//Licensed to the Apache Software Foundation (ASF) under one
+//or more contributor license agreements.  See the NOTICE file
+//distributed with this work for additional information
+//regarding copyright ownership.  The ASF licenses this file
+//to you under the Apache License, Version 2.0 (the
+//"License"); you may not use this file except in compliance
+//with the License.  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//Unless required by applicable law or agreed to in writing,
+//software distributed under the License is distributed on an
+//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+//KIND, either express or implied.  See the License for the
+//specific language governing permissions and limitations
+//under the License.
+//*/
+//package org.apache.griffin.measure.data.connector.cache
+//
+//import java.util.concurrent.TimeUnit
+//
+//import org.apache.griffin.measure.cache.info.{InfoCacheInstance, 
TimeInfoCache}
+//import org.apache.griffin.measure.config.params.user.DataCacheParam
+//import org.apache.griffin.measure.result.TimeStampInfo
+//import org.apache.griffin.measure.utils.{HdfsFileDumpUtil, HdfsUtil, 
JsonUtil, TimeUtil}
+//import org.apache.spark.rdd.RDD
+//import org.apache.spark.sql.SQLContext
+//import org.apache.spark.sql.hive.HiveContext
+//
+//import scala.util.{Success, Try}
+//
+//case class HiveCacheDataConnector(sqlContext: SQLContext, dataCacheParam: 
DataCacheParam
+//                                 ) extends CacheDataConnector {
+//
+//  if (!sqlContext.isInstanceOf[HiveContext]) {
+//    throw new Exception("hive context not prepared!")
+//  }
+//
+//  val config = dataCacheParam.config
+//  val InfoPath = "info.path"
+//  val cacheInfoPath: String = config.getOrElse(InfoPath, 
defCacheInfoPath).toString
+//
+//  val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new")
+//  val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old")
+//
+//  val timeRangeParam: List[String] = if (dataCacheParam.timeRange != null) 
dataCacheParam.timeRange else Nil
+//  val deltaTimeRange: (Long, Long) = (timeRangeParam ::: List("0", "0")) 
match {
+//    case s :: e :: _ => {
+//      val ns = TimeUtil.milliseconds(s) match {
+//        case Some(n) if (n < 0) => n
+//        case _ => 0
+//      }
+//      val ne = TimeUtil.milliseconds(e) match {
+//        case Some(n) if (n < 0) => n
+//        case _ => 0
+//      }
+//      (ns, ne)
+//    }
+//    case _ => (0, 0)
+//  }
+//
+//  val Database = "database"
+//  val database: String = config.getOrElse(Database, "").toString
+//  val TableName = "table.name"
+//  val tableName: String = config.get(TableName) match {
+//    case Some(s: String) if (s.nonEmpty) => s
+//    case _ => throw new Exception("invalid table.name!")
+//  }
+//  val ParentPath = "parent.path"
+//  val parentPath: String = config.get(ParentPath) match {
+//    case Some(s: String) => s
+//    case _ => throw new Exception("invalid parent.path!")
+//  }
+//  val tablePath = HdfsUtil.getHdfsFilePath(parentPath, tableName)
+//
+//  val concreteTableName = if (dbPrefix) s"${database}.${tableName}" else 
tableName
+//
+//  val ReadyTimeInterval = "ready.time.interval"
+//  val ReadyTimeDelay = "ready.time.delay"
+//  val readyTimeInterval: Long = 
TimeUtil.milliseconds(config.getOrElse(ReadyTimeInterval, 
"1m").toString).getOrElse(60000L)
+//  val readyTimeDelay: Long = 
TimeUtil.milliseconds(config.getOrElse(ReadyTimeDelay, 
"1m").toString).getOrElse(60000L)
+//
+//  val TimeStampColumn: String = TimeStampInfo.key
+//  val PayloadColumn: String = "payload"
+//
+////  type Schema = (Long, String)
+//  val schema: List[(String, String)] = List(
+//    (TimeStampColumn, "bigint"),
+//    (PayloadColumn, "string")
+//  )
+//  val schemaName = schema.map(_._1)
+//
+////  type Partition = (Long, Long)
+//  val partition: List[(String, String, String)] = List(
+//    ("hr", "bigint", "hour"),
+//    ("min", "bigint", "min")
+//  )
+//  val partitionName = partition.map(_._1)
+//
+//  private val fieldSep = """|"""
+//  private val rowSep = """\n"""
+//  private val rowSepLiteral = "\n"
+//
+//  private def dbPrefix(): Boolean = {
+//    database.nonEmpty && !database.equals("default")
+//  }
+//
+//  private def tableExists(): Boolean = {
+//    Try {
+//      if (dbPrefix) {
+//        sqlContext.tables(database).filter(tableExistsSql).collect.size
+//      } else {
+//        sqlContext.tables().filter(tableExistsSql).collect.size
+//      }
+//    } match {
+//      case Success(s) => s > 0
+//      case _ => false
+//    }
+//  }
+//
+//  override def init(): Unit = {
+//    try {
+//      if (tableExists) {
+//        // drop exist table
+//        val dropSql = s"""DROP TABLE ${concreteTableName}"""
+//        sqlContext.sql(dropSql)
+//      }
+//
+//      val colsSql = schema.map { field =>
+//        s"`${field._1}` ${field._2}"
+//      }.mkString(", ")
+//      val partitionsSql = partition.map { partition =>
+//        s"`${partition._1}` ${partition._2}"
+//      }.mkString(", ")
+//      val sql = s"""CREATE EXTERNAL TABLE IF NOT EXISTS ${concreteTableName}
+//                    |(${colsSql}) PARTITIONED BY (${partitionsSql})
+//                    |ROW FORMAT DELIMITED
+//                    |FIELDS TERMINATED BY '${fieldSep}'
+//                    |LINES TERMINATED BY '${rowSep}'
+//                    |STORED AS TEXTFILE
+//                    |LOCATION '${tablePath}'""".stripMargin
+//      sqlContext.sql(sql)
+//    } catch {
+//      case e: Throwable => throw e
+//    }
+//  }
+//
+//  def available(): Boolean = {
+//    true
+//  }
+//
+//  private def encode(data: Map[String, Any], ms: Long): Option[List[Any]] = {
+//    try {
+//      Some(schema.map { field =>
+//        val (name, _) = field
+//        name match {
+//          case TimeStampColumn => ms
+//          case PayloadColumn => JsonUtil.toJson(data)
+//          case _ => null
+//        }
+//      })
+//    } catch {
+//      case _ => None
+//    }
+//  }
+//
+//  private def decode(data: List[Any], updateTimeStamp: Boolean): 
Option[Map[String, Any]] = {
+//    val dataMap = schemaName.zip(data).toMap
+//    dataMap.get(PayloadColumn) match {
+//      case Some(v: String) => {
+//        try {
+//          val map = JsonUtil.toAnyMap(v)
+//          val resMap = if (updateTimeStamp) {
+//            dataMap.get(TimeStampColumn) match {
+//              case Some(t) => map + (TimeStampColumn -> t)
+//              case _ => map
+//            }
+//          } else map
+//          Some(resMap)
+//        } catch {
+//          case _ => None
+//        }
+//      }
+//      case _ => None
+//    }
+//  }
+//
+//  def saveData(rdd: RDD[Map[String, Any]], ms: Long): Unit = {
+//    val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
+//    if (newCacheLocked) {
+//      try {
+//        val ptns = getPartition(ms)
+//        val ptnsPath = genPartitionHdfsPath(ptns)
+//        val dirPath = s"${tablePath}/${ptnsPath}"
+//        val fileName = s"${ms}"
+//        val filePath = HdfsUtil.getHdfsFilePath(dirPath, fileName)
+//
+//        // encode data
+//        val dataRdd: RDD[List[Any]] = rdd.flatMap(encode(_, ms))
+//
+//        // save data
+//        val recordRdd: RDD[String] = dataRdd.map { dt =>
+//          dt.map(_.toString).mkString(fieldSep)
+//        }
+//
+//        val dumped = if (!recordRdd.isEmpty) {
+//          HdfsFileDumpUtil.dump(filePath, recordRdd, rowSepLiteral)
+//        } else false
+//
+//        // add partition
+//        if (dumped) {
+//          val sql = addPartitionSql(concreteTableName, ptns)
+//          sqlContext.sql(sql)
+//        }
+//
+//        // submit ms
+//        submitCacheTime(ms)
+//        submitReadyTime(ms)
+//      } catch {
+//        case e: Throwable => error(s"save data error: ${e.getMessage}")
+//      } finally {
+//        newCacheLock.unlock()
+//      }
+//    }
+//  }
+//
+//  def readData(): Try[RDD[Map[String, Any]]] = Try {
+//    val timeRange = TimeInfoCache.getTimeRange
+//    submitLastProcTime(timeRange._2)
+//
+//    val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + 
deltaTimeRange._2)
+//    submitCleanTime(reviseTimeRange._1)
+//
+//    // read directly through partition info
+//    val partitionRange = getPartitionRange(reviseTimeRange._1, 
reviseTimeRange._2)
+//    val sql = selectSql(concreteTableName, partitionRange)
+//    val df = sqlContext.sql(sql)
+//
+//    // decode data
+//    df.flatMap { row =>
+//      val dt = schemaName.map { sn =>
+//        row.getAs[Any](sn)
+//      }
+//      decode(dt, true)
+//    }
+//  }
+//
+//  override def cleanOldData(): Unit = {
+//    val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
+//    if (oldCacheLocked) {
+//      try {
+//        val cleanTime = readCleanTime()
+//        cleanTime match {
+//          case Some(ct) => {
+//            // drop partition
+//            val bound = getPartition(ct)
+//            val sql = dropPartitionSql(concreteTableName, bound)
+//            sqlContext.sql(sql)
+//          }
+//          case _ => {
+//            // do nothing
+//          }
+//        }
+//      } catch {
+//        case e: Throwable => error(s"clean old data error: ${e.getMessage}")
+//      } finally {
+//        oldCacheLock.unlock()
+//      }
+//    }
+//  }
+//
+//  override def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): 
Unit = {
+//    // parallel process different time groups, lock is unnecessary
+//    val ptns = getPartition(t)
+//    val ptnsPath = genPartitionHdfsPath(ptns)
+//    val dirPath = s"${tablePath}/${ptnsPath}"
+//    val fileName = s"${t}"
+//    val filePath = HdfsUtil.getHdfsFilePath(dirPath, fileName)
+//
+//    try {
+//      // remove out time old data
+//      HdfsFileDumpUtil.remove(dirPath, fileName, true)
+//
+//      // save updated old data
+//      if (oldData.size > 0) {
+//        val recordDatas = oldData.flatMap { dt =>
+//          encode(dt, t)
+//        }
+//        val records: Iterable[String] = recordDatas.map { dt =>
+//          dt.map(_.toString).mkString(fieldSep)
+//        }
+//        val dumped = HdfsFileDumpUtil.dump(filePath, records, rowSepLiteral)
+//      }
+//    } catch {
+//      case e: Throwable => error(s"update old data error: ${e.getMessage}")
+//    }
+//  }
+//
+//  override protected def genCleanTime(ms: Long): Long = {
+//    val minPartition = partition.last
+//    val t1 = TimeUtil.timeToUnit(ms, minPartition._3)
+//    val t2 = TimeUtil.timeFromUnit(t1, minPartition._3)
+//    t2
+//  }
+//
+//  private def getPartition(ms: Long): List[(String, Any)] = {
+//    partition.map { p =>
+//      val (name, _, unit) = p
+//      val t = TimeUtil.timeToUnit(ms, unit)
+//      (name, t)
+//    }
+//  }
+//  private def getPartitionRange(ms1: Long, ms2: Long): List[(String, (Any, 
Any))] = {
+//    partition.map { p =>
+//      val (name, _, unit) = p
+//      val t1 = TimeUtil.timeToUnit(ms1, unit)
+//      val t2 = TimeUtil.timeToUnit(ms2, unit)
+//      (name, (t1, t2))
+//    }
+//  }
+//
+//  private def genPartitionHdfsPath(partition: List[(String, Any)]): String = 
{
+//    partition.map(prtn => s"${prtn._1}=${prtn._2}").mkString("/")
+//  }
+//  private def addPartitionSql(tbn: String, partition: List[(String, Any)]): 
String = {
+//    val partitionSql = partition.map(ptn => (s"`${ptn._1}` = 
${ptn._2}")).mkString(", ")
+//    val sql = s"""ALTER TABLE ${tbn} ADD IF NOT EXISTS PARTITION 
(${partitionSql})"""
+//    sql
+//  }
+//  private def selectSql(tbn: String, partitionRange: List[(String, (Any, 
Any))]): String = {
+//    val clause = partitionRange.map { pr =>
+//      val (name, (r1, r2)) = pr
+//      s"""`${name}` BETWEEN '${r1}' and '${r2}'"""
+//    }.mkString(" AND ")
+//    val whereClause = if (clause.nonEmpty) s"WHERE ${clause}" else ""
+//    val sql = s"""SELECT * FROM ${tbn} ${whereClause}"""
+//    sql
+//  }
+//  private def dropPartitionSql(tbn: String, partition: List[(String, Any)]): 
String = {
+//    val partitionSql = partition.map(ptn => (s"PARTITION ( `${ptn._1}` < 
'${ptn._2}' ) ")).mkString(", ")
+//    val sql = s"""ALTER TABLE ${tbn} DROP ${partitionSql}"""
+//    println(sql)
+//    sql
+//  }
+//
+//  private def tableExistsSql(): String = {
+//    s"tableName LIKE '${tableName}'"
+//  }
+//
+//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/TextCacheDataConnector.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/TextCacheDataConnector.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/TextCacheDataConnector.scala
new file mode 100644
index 0000000..0daf2d9
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/data/connector/cache/TextCacheDataConnector.scala
@@ -0,0 +1,311 @@
+///*
+//Licensed to the Apache Software Foundation (ASF) under one
+//or more contributor license agreements.  See the NOTICE file
+//distributed with this work for additional information
+//regarding copyright ownership.  The ASF licenses this file
+//to you under the Apache License, Version 2.0 (the
+//"License"); you may not use this file except in compliance
+//with the License.  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//Unless required by applicable law or agreed to in writing,
+//software distributed under the License is distributed on an
+//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+//KIND, either express or implied.  See the License for the
+//specific language governing permissions and limitations
+//under the License.
+//*/
+//package org.apache.griffin.measure.data.connector.cache
+//
+//import java.util.concurrent.TimeUnit
+//
+//import org.apache.griffin.measure.cache.info.{InfoCacheInstance, 
TimeInfoCache}
+//import org.apache.griffin.measure.config.params.user.DataCacheParam
+//import org.apache.griffin.measure.result.TimeStampInfo
+//import org.apache.griffin.measure.utils.{HdfsFileDumpUtil, HdfsUtil, 
JsonUtil, TimeUtil}
+//import org.apache.spark.rdd.RDD
+//import org.apache.spark.sql.SQLContext
+//
+//import scala.util.Try
+//
+//case class TextCacheDataConnector(sqlContext: SQLContext, dataCacheParam: 
DataCacheParam
+//                                 ) extends CacheDataConnector {
+//
+//  val config = dataCacheParam.config
+//  val InfoPath = "info.path"
+//  val cacheInfoPath: String = config.getOrElse(InfoPath, 
defCacheInfoPath).toString
+//
+//  val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new")
+//  val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old")
+//
+//  val timeRangeParam: List[String] = if (dataCacheParam.timeRange != null) 
dataCacheParam.timeRange else Nil
+//  val deltaTimeRange: (Long, Long) = (timeRangeParam ::: List("0", "0")) 
match {
+//    case s :: e :: _ => {
+//      val ns = TimeUtil.milliseconds(s) match {
+//        case Some(n) if (n < 0) => n
+//        case _ => 0
+//      }
+//      val ne = TimeUtil.milliseconds(e) match {
+//        case Some(n) if (n < 0) => n
+//        case _ => 0
+//      }
+//      (ns, ne)
+//    }
+//    case _ => (0, 0)
+//  }
+//
+//  val FilePath = "file.path"
+//  val filePath: String = config.get(FilePath) match {
+//    case Some(s: String) => s
+//    case _ => throw new Exception("invalid file.path!")
+//  }
+//
+//  val ReadyTimeInterval = "ready.time.interval"
+//  val ReadyTimeDelay = "ready.time.delay"
+//  val readyTimeInterval: Long = 
TimeUtil.milliseconds(config.getOrElse(ReadyTimeInterval, 
"1m").toString).getOrElse(60000L)
+//  val readyTimeDelay: Long = 
TimeUtil.milliseconds(config.getOrElse(ReadyTimeDelay, 
"1m").toString).getOrElse(60000L)
+//
+////  val TimeStampColumn: String = TimeStampInfo.key
+////  val PayloadColumn: String = "payload"
+//
+//  // cache schema: Long, String
+////  val fields = List[StructField](
+////    StructField(TimeStampColumn, LongType),
+////    StructField(PayloadColumn, StringType)
+////  )
+////  val schema = StructType(fields)
+//
+//  //  case class CacheData(time: Long, payload: String) {
+//  //    def getTime(): Long = time
+//  //    def getPayload(): String = payload
+//  //  }
+//
+//  private val rowSepLiteral = "\n"
+//
+//  val partitionUnits: List[String] = List("hour", "min")
+//
+//  override def init(): Unit = {
+//    // do nothing
+//  }
+//
+//  def available(): Boolean = {
+//    true
+//  }
+//
+//  private def encode(data: Map[String, Any], ms: Long): Option[String] = {
+//    try {
+//      val map = data + (TimeStampInfo.key -> ms)
+//      Some(JsonUtil.toJson(map))
+//    } catch {
+//      case _: Throwable => None
+//    }
+//  }
+//
+//  private def decode(data: String): Option[Map[String, Any]] = {
+//    try {
+//      Some(JsonUtil.toAnyMap(data))
+//    } catch {
+//      case _: Throwable => None
+//    }
+//  }
+//
+//  def saveData(rdd: RDD[Map[String, Any]], ms: Long): Unit = {
+//    val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
+//    if (newCacheLocked) {
+//      try {
+//        val ptns = getPartition(ms)
+//        val ptnsPath = genPartitionHdfsPath(ptns)
+//        val dirPath = s"${filePath}/${ptnsPath}"
+//        val dataFileName = s"${ms}"
+//        val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
+//
+//        // encode data
+//        val dataRdd: RDD[String] = rdd.flatMap(encode(_, ms))
+//
+//        // save data
+//        val dumped = if (!dataRdd.isEmpty) {
+//          HdfsFileDumpUtil.dump(dataFilePath, dataRdd, rowSepLiteral)
+//        } else false
+//
+//        // submit ms
+//        submitCacheTime(ms)
+//        submitReadyTime(ms)
+//      } catch {
+//        case e: Throwable => error(s"save data error: ${e.getMessage}")
+//      } finally {
+//        newCacheLock.unlock()
+//      }
+//    }
+//  }
+//
+//  def readData(): Try[RDD[Map[String, Any]]] = Try {
+//    val timeRange = TimeInfoCache.getTimeRange
+//    submitLastProcTime(timeRange._2)
+//
+//    val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + 
deltaTimeRange._2)
+//    submitCleanTime(reviseTimeRange._1)
+//
+//    // read directly through partition info
+//    val partitionRanges = getPartitionRange(reviseTimeRange._1, 
reviseTimeRange._2)
+//    println(s"read time ranges: ${reviseTimeRange}")
+//    println(s"read partition ranges: ${partitionRanges}")
+//
+//    // list partition paths
+//    val partitionPaths = listPathsBetweenRanges(filePath :: Nil, 
partitionRanges)
+//
+//    if (partitionPaths.isEmpty) {
+//      sqlContext.sparkContext.emptyRDD[Map[String, Any]]
+//    } else {
+//      val filePaths = partitionPaths.mkString(",")
+//      val rdd = sqlContext.sparkContext.textFile(filePaths)
+//
+//      // decode data
+//      rdd.flatMap { row =>
+//        decode(row)
+//      }
+//    }
+//  }
+//
+//  override def cleanOldData(): Unit = {
+//    val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
+//    if (oldCacheLocked) {
+//      try {
+//        val cleanTime = readCleanTime()
+//        cleanTime match {
+//          case Some(ct) => {
+//            // drop partitions
+//            val bounds = getPartition(ct)
+//
+//            // list partition paths
+//            val earlierPaths = listPathsEarlierThanBounds(filePath :: Nil, 
bounds)
+//
+//            // delete out time data path
+//            earlierPaths.foreach { path =>
+//              println(s"delete hdfs path: ${path}")
+//              HdfsUtil.deleteHdfsPath(path)
+//            }
+//          }
+//          case _ => {
+//            // do nothing
+//          }
+//        }
+//      } catch {
+//        case e: Throwable => error(s"clean old data error: ${e.getMessage}")
+//      } finally {
+//        oldCacheLock.unlock()
+//      }
+//    }
+//  }
+//
+//  override def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): 
Unit = {
+//    // parallel process different time groups, lock is unnecessary
+//    val ptns = getPartition(t)
+//    val ptnsPath = genPartitionHdfsPath(ptns)
+//    val dirPath = s"${filePath}/${ptnsPath}"
+//    val dataFileName = s"${t}"
+//    val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
+//
+//    try {
+//      // remove out time old data
+//      HdfsFileDumpUtil.remove(dirPath, dataFileName, true)
+//
+//      // save updated old data
+//      if (oldData.size > 0) {
+//        val recordDatas = oldData.flatMap { dt =>
+//          encode(dt, t)
+//        }
+//        val dumped = HdfsFileDumpUtil.dump(dataFilePath, recordDatas, 
rowSepLiteral)
+//      }
+//    } catch {
+//      case e: Throwable => error(s"update old data error: ${e.getMessage}")
+//    }
+//  }
+//
+//  override protected def genCleanTime(ms: Long): Long = {
+//    val minPartitionUnit = partitionUnits.last
+//    val t1 = TimeUtil.timeToUnit(ms, minPartitionUnit)
+//    val t2 = TimeUtil.timeFromUnit(t1, minPartitionUnit)
+//    t2
+//  }
+//
+//  private def getPartition(ms: Long): List[Long] = {
+//    partitionUnits.map { unit =>
+//      TimeUtil.timeToUnit(ms, unit)
+//    }
+//  }
+//  private def getPartitionRange(ms1: Long, ms2: Long): List[(Long, Long)] = {
+//    partitionUnits.map { unit =>
+//      val t1 = TimeUtil.timeToUnit(ms1, unit)
+//      val t2 = TimeUtil.timeToUnit(ms2, unit)
+//      (t1, t2)
+//    }
+//  }
+//
+//  private def genPartitionHdfsPath(partition: List[Long]): String = {
+//    partition.map(prtn => s"${prtn}").mkString("/")
+//  }
+//
+//  private def str2Long(str: String): Option[Long] = {
+//    try {
+//      Some(str.toLong)
+//    } catch {
+//      case e: Throwable => None
+//    }
+//  }
+//
+//  // here the range means [min, max], but the best range should be (min, max]
+//  private def listPathsBetweenRanges(paths: List[String],
+//                                     partitionRanges: List[(Long, Long)]
+//                                    ): List[String] = {
+//    partitionRanges match {
+//      case Nil => paths
+//      case head :: tail => {
+//        val (lb, ub) = head
+//        val curPaths = paths.flatMap { path =>
+//          val names = HdfsUtil.listSubPaths(path, "dir").toList
+//          names.filter { name =>
+//            str2Long(name) match {
+//              case Some(t) => (t >= lb) && (t <= ub)
+//              case _ => false
+//            }
+//          }.map(HdfsUtil.getHdfsFilePath(path, _))
+//        }
+//        listPathsBetweenRanges(curPaths, tail)
+//      }
+//    }
+//  }
+//
+//  private def listPathsEarlierThanBounds(paths: List[String], bounds: 
List[Long]
+//                                        ): List[String] = {
+//    bounds match {
+//      case Nil => paths
+//      case head :: tail => {
+//        val earlierPaths = paths.flatMap { path =>
+//          val names = HdfsUtil.listSubPaths(path, "dir").toList
+//          names.filter { name =>
+//            str2Long(name) match {
+//              case Some(t) => (t < head)
+//              case _ => false
+//            }
+//          }.map(HdfsUtil.getHdfsFilePath(path, _))
+//        }
+//        val equalPaths = paths.flatMap { path =>
+//          val names = HdfsUtil.listSubPaths(path, "dir").toList
+//          names.filter { name =>
+//            str2Long(name) match {
+//              case Some(t) => (t == head)
+//              case _ => false
+//            }
+//          }.map(HdfsUtil.getHdfsFilePath(path, _))
+//        }
+//
+//        tail match {
+//          case Nil => earlierPaths
+//          case _ => earlierPaths ::: listPathsEarlierThanBounds(equalPaths, 
tail)
+//        }
+//      }
+//    }
+//  }
+//
+//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala
new file mode 100644
index 0000000..41de217
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala
@@ -0,0 +1,70 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.data.connector.streaming
+
+import kafka.serializer.Decoder
+import org.apache.spark.streaming.dstream.InputDStream
+
+import scala.util.{Failure, Success, Try}
+import org.apache.griffin.measure.utils.ParamUtil._
+
/**
 * Streaming data connector that consumes from Kafka via a Spark direct stream.
 * Subclasses bind the concrete key/value types (K, V) and their Kafka decoders.
 */
trait KafkaStreamingDataConnector extends StreamingDataConnector {

  // decoder types for the kafka message key and value
  type KD <: Decoder[K]
  type VD <: Decoder[V]

  val config = dcParam.config

  val KafkaConfig = "kafka.config"
  val Topics = "topics"

  // kafka client properties (e.g. broker list) taken from the connector config
  val kafkaConfig = config.getAnyRef(KafkaConfig, Map[String, String]())
  // comma-separated list of topic names
  val topics = config.getString(Topics, "")

  def available(): Boolean = {
    true
  }

  /**
   * Start consuming: for each micro-batch, transform the raw RDD into a data
   * frame, pre-process it, and save it into the attached data source cache.
   * Throws immediately if the underlying stream could not be created.
   */
  def init(): Unit = {
    val ds = stream match {
      case Success(dstream) => dstream
      case Failure(ex) => throw ex
    }
    ds.foreachRDD((rdd, time) => {
      val ms = time.milliseconds

      val dfOpt = transform(rdd)

      val preDfOpt = preProcess(dfOpt, ms)

      // save data frame
      dataSourceCacheOpt.foreach(_.saveData(preDfOpt, ms))
    })
  }

  /** Create the direct input stream for the configured topics. */
  def stream(): Try[InputDStream[(K, V)]] = Try {
    // trim whitespace and drop empty entries so "a, b," parses to Set("a", "b")
    // instead of Set("a", " b", "")
    val topicSet = topics.split(",").map(_.trim).filter(_.nonEmpty).toSet
    createDStream(topicSet)
  }

  protected def createDStream(topicSet: Set[String]): InputDStream[(K, V)]
}
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingStringDataConnector.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingStringDataConnector.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingStringDataConnector.scala
new file mode 100644
index 0000000..5e0413e
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingStringDataConnector.scala
@@ -0,0 +1,65 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.data.connector.streaming
+
+import kafka.serializer.StringDecoder
+import org.apache.griffin.measure.config.params.user.DataConnectorParam
+import org.apache.griffin.measure.process.engine.DqEngines
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.types.{LongType, StringType, StructField, 
StructType}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.InputDStream
+import org.apache.spark.streaming.kafka.KafkaUtils
+import org.apache.spark.sql.functions.lit
+
/**
 * Kafka streaming connector with String keys and values: each message value
 * becomes one row of a single-column ("value") data frame.
 */
case class KafkaStreamingStringDataConnector(sqlContext: SQLContext,
                                             @transient ssc: StreamingContext,
                                             dqEngines: DqEngines,
                                             dcParam: DataConnectorParam
                                            ) extends KafkaStreamingDataConnector {
  type K = String
  type KD = StringDecoder
  type V = String
  type VD = StringDecoder

  // name of the single string column holding the raw message payload
  val valueColName = "value"
  val schema = StructType(Array(
    StructField(valueColName, StringType)
  ))

  def createDStream(topicSet: Set[String]): InputDStream[(K, V)] = {
    KafkaUtils.createDirectStream[K, V, KD, VD](ssc, kafkaConfig, topicSet)
  }

  /**
   * Wrap each message value into a Row under the schema above.
   * Returns None for an empty batch or on transform failure.
   */
  def transform(rdd: RDD[(K, V)]): Option[DataFrame] = {
    if (rdd.isEmpty) None else {
      try {
        // keep only the message value (d._2); the key is ignored
        val rowRdd = rdd.map(d => Row(d._2))
        val df = sqlContext.createDataFrame(rowRdd, schema)
        Some(df)
      } catch {
        // catch only non-fatal errors (a bare Throwable would swallow OOM etc.)
        // and include the cause, which the original log line discarded
        case scala.util.control.NonFatal(e) => {
          error(s"streaming data transform fails: ${e.getMessage}")
          None
        }
      }
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala
new file mode 100644
index 0000000..cc21761
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala
@@ -0,0 +1,43 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.data.connector.streaming
+
+import org.apache.griffin.measure.data.connector._
+import org.apache.griffin.measure.data.source.DataSourceCache
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.streaming.dstream.InputDStream
+
+import scala.util.Try
+
+
/**
 * Base trait for streaming data connectors. Concrete connectors fix the
 * key/value types of the stream and supply the transform into a data frame.
 */
trait StreamingDataConnector extends DataConnector {

  // key and value types of the underlying stream
  type K
  type V

  // cache layer attached by the owning data source, if any
  var dataSourceCacheOpt: Option[DataSourceCache] = None

  // streaming connectors provide no direct batch data
  def data(ms: Long): Option[DataFrame] = None

  /** Create the underlying input stream. */
  protected def stream(): Try[InputDStream[(K, V)]]

  /** Turn one micro-batch RDD into a data frame, if non-empty and well-formed. */
  def transform(rdd: RDD[(K, V)]): Option[DataFrame]

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/source/DataCacheable.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataCacheable.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataCacheable.scala
new file mode 100644
index 0000000..3c9106a
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataCacheable.scala
@@ -0,0 +1,76 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.data.source
+
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
+
/**
 * Mixin that persists and reads the timing bookkeeping of a data cache
 * (cache / ready / last-processed / clean times) through InfoCacheInstance.
 */
trait DataCacheable {

  // relative path of this cache's info entries under TimeInfoCache.infoPath
  val cacheInfoPath: String
  // interval (ms) at which ready-time marks are published
  val readyTimeInterval: Long
  // delay (ms) subtracted from a cache time before it counts as ready
  val readyTimeDelay: Long

  def selfCacheInfoPath = s"${TimeInfoCache.infoPath}/${cacheInfoPath}"

  def selfCacheTime = TimeInfoCache.cacheTime(selfCacheInfoPath)
  def selfLastProcTime = TimeInfoCache.lastProcTime(selfCacheInfoPath)
  def selfReadyTime = TimeInfoCache.readyTime(selfCacheInfoPath)
  def selfCleanTime = TimeInfoCache.cleanTime(selfCacheInfoPath)

  /** Record `ms` as the latest cache time. */
  protected def submitCacheTime(ms: Long): Unit = {
    val map = Map[String, String]((selfCacheTime -> ms.toString))
    InfoCacheInstance.cacheInfo(map)
  }

  /** Publish a ready-time mark when the delayed time lands on an interval boundary. */
  protected def submitReadyTime(ms: Long): Unit = {
    val curReadyTime = ms - readyTimeDelay
    // guard against a zero (or negative) interval, which would make the
    // modulo below throw ArithmeticException
    if (readyTimeInterval > 0 && curReadyTime % readyTimeInterval == 0) {
      val map = Map[String, String]((selfReadyTime -> curReadyTime.toString))
      InfoCacheInstance.cacheInfo(map)
    }
  }

  /** Record `ms` as the last processed time. */
  protected def submitLastProcTime(ms: Long): Unit = {
    val map = Map[String, String]((selfLastProcTime -> ms.toString))
    InfoCacheInstance.cacheInfo(map)
  }

  /** Record the clean time derived from `ms` via genCleanTime. */
  protected def submitCleanTime(ms: Long): Unit = {
    val cleanTime = genCleanTime(ms)
    val map = Map[String, String]((selfCleanTime -> cleanTime.toString))
    InfoCacheInstance.cacheInfo(map)
  }

  // subclasses may override to round the clean time down to a partition boundary
  protected def genCleanTime(ms: Long): Long = ms

  /** Read the persisted clean time, if present and parseable as a Long. */
  protected def readCleanTime(): Option[Long] = {
    val key = selfCleanTime
    val keys = key :: Nil
    InfoCacheInstance.readInfo(keys).get(key).flatMap { v =>
      try {
        Some(v.toLong)
      } catch {
        // catch only non-fatal errors: the original bare `case _` also
        // swallowed fatal Throwables such as OutOfMemoryError
        case scala.util.control.NonFatal(_) => None
      }
    }
  }

}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala
new file mode 100644
index 0000000..0927754
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala
@@ -0,0 +1,109 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.data.source
+
+import org.apache.griffin.measure.data.connector._
+import org.apache.griffin.measure.data.connector.batch._
+import org.apache.griffin.measure.data.connector.streaming._
+import org.apache.griffin.measure.log.Loggable
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+
/**
 * A named data source combining batch connectors, streaming connectors and an
 * optional cache. Loaded data is exposed to rules as a temp table named `name`.
 */
case class DataSource(sqlContext: SQLContext,
                      name: String,
                      dataConnectors: Seq[DataConnector],
                      dataSourceCacheOpt: Option[DataSourceCache]
                     ) extends Loggable with Serializable {

  val batchDataConnectors = DataConnectorFactory.filterBatchDataConnectors(dataConnectors)
  val streamingDataConnectors = DataConnectorFactory.filterStreamingDataConnectors(dataConnectors)
  // wire the cache into each streaming connector so incoming batches are persisted
  streamingDataConnectors.foreach(_.dataSourceCacheOpt = dataSourceCacheOpt)

  /** Initialize the cache (if any) and every connector. */
  def init(): Unit = {
    dataSourceCacheOpt.foreach(_.init)
    dataConnectors.foreach(_.init)
  }

  /** Load data at time `ms` and register it as a temp table named after this source. */
  def loadData(ms: Long): Unit = {
    data(ms) match {
      case Some(df) => {
        df.registerTempTable(name)
      }
      case None => {
        // no data this round: log and continue rather than failing the process
        warn(s"load data source [${name}] fails")
      }
    }
  }

  /** Drop this source's temp table; a failure (e.g. table absent) is only logged. */
  def dropTable(): Unit = {
    try {
      sqlContext.dropTempTable(name)
    } catch {
      // NonFatal instead of Throwable: never swallow fatal errors here
      case scala.util.control.NonFatal(e) => warn(s"drop table [${name}] fails")
    }
  }

  /** Union the batch data at `ms` with the cached data, if either exists. */
  private def data(ms: Long): Option[DataFrame] = {
    val batchDataFrameOpt = batchDataConnectors.flatMap { dc =>
      dc.data(ms)
    }.reduceOption((a, b) => unionDataFrames(a, b))

    val cacheDataFrameOpt = dataSourceCacheOpt.flatMap(_.readData())

    (batchDataFrameOpt, cacheDataFrameOpt) match {
      case (Some(bdf), Some(cdf)) => Some(unionDataFrames(bdf, cdf))
      case (Some(bdf), _) => Some(bdf)
      case (_, Some(cdf)) => Some(cdf)
      case _ => None
    }
  }

  /**
   * Union two data frames, re-projecting df2's rows into df1's column order
   * first (plain unionAll is positional, not name-based).
   * On failure keeps df1 only, with a warning instead of a silent drop.
   */
  private def unionDataFrames(df1: DataFrame, df2: DataFrame): DataFrame = {
    try {
      val cols = df1.columns
      val rdd2 = df2.map { row =>
        val values = cols.map { col =>
          row.getAs[Any](col)
        }
        Row(values: _*)
      }
      val ndf2 = sqlContext.createDataFrame(rdd2, df1.schema)
      df1 unionAll ndf2
    } catch {
      // NonFatal instead of Throwable, and surface the failure: the original
      // silently discarded all of df2's data
      case scala.util.control.NonFatal(e) => {
        warn(s"union data frames fails: ${e.getMessage}")
        df1
      }
    }
  }

  /** Push an updated data frame for time `ms` into the cache, if any. */
  def updateData(df: DataFrame, ms: Long): Unit = {
    dataSourceCacheOpt.foreach(_.updateData(df, ms))
  }

  /** Push a map of time -> updated data frame into the cache, if any. */
  def updateDataMap(dfMap: Map[Long, DataFrame]): Unit = {
    dataSourceCacheOpt.foreach(_.updateDataMap(dfMap))
  }

  /** Ask the cache (if any) to purge expired data. */
  def cleanOldData(): Unit = {
    dataSourceCacheOpt.foreach(_.cleanOldData)
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceCache.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceCache.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceCache.scala
new file mode 100644
index 0000000..769550f
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceCache.scala
@@ -0,0 +1,347 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.data.source
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
+import 
org.apache.griffin.measure.data.connector.streaming.StreamingDataConnector
+import org.apache.griffin.measure.data.connector._
+import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.utils.{HdfsFileDumpUtil, HdfsUtil, TimeUtil}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+import scala.util.{Failure, Success}
+import org.apache.griffin.measure.utils.ParamUtil._
+
+/**
+ * HDFS-backed cache of streaming source records for one data source.
+ * Each batch is dumped as JSON lines into a time-partitioned directory tree
+ * (hour/min/sec), read back as a single DataFrame, and deleted once it falls
+ * behind the recorded clean time.
+ * NOTE(review): the cache/ready/clean-time bookkeeping (submitCacheTime,
+ * readCleanTime, ...) comes from DataCacheable, defined outside this view.
+ */
+case class DataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
+                           metricName: String, index: Int
+                          ) extends DataCacheable with 
Loggable with 
Serializable {
+
+  val name = ""
+
+  // configuration keys looked up in `param`
+  val _FilePath = "file.path"
+  val _InfoPath = "info.path"
+  val _ReadyTimeInterval = "ready.time.interval"
+  val _ReadyTimeDelay = "ready.time.delay"
+  val _TimeRange = "time.range"
+
+  val defFilePath = s"hdfs:///griffin/cache/${metricName}/${index}"
+  val defInfoPath = s"${index}"
+
+  val filePath: String = param.getString(_FilePath, defFilePath)
+  val cacheInfoPath: String = param.getString(_InfoPath, defInfoPath)
+  val readyTimeInterval: Long = 
TimeUtil.milliseconds(param.getString(_ReadyTimeInterval, 
"1m")).getOrElse(60000L)
+  val readyTimeDelay: Long = 
TimeUtil.milliseconds(param.getString(_ReadyTimeDelay, "1m")).getOrElse(60000L)
+  // (startDelta, endDelta) applied to the cached time range when reading;
+  // both values are clamped to be non-positive
+  val deltaTimeRange: (Long, Long) = {
+    def negative(n: Long): Long = if (n <= 0) n else 0
+    param.get(_TimeRange) match {
+      // NOTE(review): the Seq[String] element type is unchecked here (erasure)
+      case Some(seq: Seq[String]) => {
+        val nseq = seq.flatMap(TimeUtil.milliseconds(_))
+        val ns = negative(nseq.headOption.getOrElse(0))
+        // NOTE(review): nseq.tail throws UnsupportedOperationException when
+        // nseq is empty — nseq.drop(1).headOption would be safe
+        val ne = negative(nseq.tail.headOption.getOrElse(0))
+        (ns, ne)
+      }
+      case _ => (0, 0)
+    }
+  }
+
+  // separator between JSON records inside one dump file
+  val rowSepLiteral = "\n"
+  val partitionUnits: List[String] = List("hour", "min", "sec")
+
+  // distributed locks guarding new-data writes and old-data cleanup
+  val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new")
+  val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old")
+
+  // no-op initialization hook
+  def init(): Unit = {
+    ;
+  }
+
+  /**
+   * Dumps the DataFrame (if any) as JSON lines into the partition path for
+   * `ms`, then records `ms` as both cache time and ready time — note the
+   * times are submitted even when there was no data or the write failed.
+   */
+  def saveData(dfOpt: Option[DataFrame], ms: Long): Unit = {
+    dfOpt match {
+      case Some(df) => {
+        // NOTE(review): negative timeout — presumably blocks until the lock
+        // is acquired; confirm against the lock implementation
+        val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
+        if (newCacheLocked) {
+          try {
+            val ptns = getPartition(ms)
+            val ptnsPath = genPartitionHdfsPath(ptns)
+            val dirPath = s"${filePath}/${ptnsPath}"
+            val dataFileName = s"${ms}"
+            val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
+
+            // transform data
+            val dataRdd: RDD[String] = df.toJSON
+
+            // save data
+            // NOTE(review): the `dumped` result is never inspected
+            val dumped = if (!dataRdd.isEmpty) {
+              HdfsFileDumpUtil.dump(dataFilePath, dataRdd, rowSepLiteral)
+            } else false
+
+          } catch {
+            case e: Throwable => error(s"save data error: ${e.getMessage}")
+          } finally {
+            newCacheLock.unlock()
+          }
+        }
+      }
+      case _ => {
+        info(s"no data frame to save")
+      }
+    }
+
+    // submit cache time and ready time
+    submitCacheTime(ms)
+    submitReadyTime(ms)
+  }
+
+  /**
+   * Reads every cached record within the (delta-adjusted) cached time range
+   * back as one DataFrame; None when no partition path exists or the read fails.
+   * Also advances the last-processed time and submits the new clean time.
+   */
+  def readData(): Option[DataFrame] = {
+    val timeRange = TimeInfoCache.getTimeRange
+    submitLastProcTime(timeRange._2)
+
+    val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + 
deltaTimeRange._2)
+    submitCleanTime(reviseTimeRange._1)
+
+    // read directly through partition info
+    val partitionRanges = getPartitionRange(reviseTimeRange._1, 
reviseTimeRange._2)
+    println(s"read time ranges: ${reviseTimeRange}")
+    println(s"read partition ranges: ${partitionRanges}")
+
+    // list partition paths
+    val partitionPaths = listPathsBetweenRanges(filePath :: Nil, 
partitionRanges)
+
+    if (partitionPaths.isEmpty) {
+      None
+    } else {
+      try {
+        Some(sqlContext.read.json(partitionPaths: _*))
+      } catch {
+        case e: Throwable => {
+          warn(s"read data source cache warn: ${e.getMessage}")
+          None
+        }
+      }
+    }
+  }
+
+  // -- deprecated --
+  // Rewrites the dump file for timestamp `ms` with the DataFrame's JSON records.
+  // NOTE(review): collects all records to the driver — unbounded memory use
+  // for large frames; `dumped` mixes Unit/Boolean branch types and is unused.
+  def updateData(df: DataFrame, ms: Long): Unit = {
+    val ptns = getPartition(ms)
+    val ptnsPath = genPartitionHdfsPath(ptns)
+    val dirPath = s"${filePath}/${ptnsPath}"
+    val dataFileName = s"${ms}"
+    val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
+
+    try {
+      val records = df.toJSON
+      val arr = records.collect
+      val needSave = !arr.isEmpty
+
+      // remove out time old data
+      HdfsFileDumpUtil.remove(dirPath, dataFileName, true)
+      println(s"remove file path: ${dirPath}/${dataFileName}")
+
+      // save updated data
+      val dumped = if (needSave) {
+        HdfsFileDumpUtil.dump(dataFilePath, arr, rowSepLiteral)
+        println(s"update file path: ${dataFilePath}")
+      } else false
+    } catch {
+      case e: Throwable => error(s"update data error: ${e.getMessage}")
+    }
+  }
+
+  // Rewrites the dump file for `ms` from a pre-counted RDD of JSON records
+  // (cnt avoids an extra isEmpty job); unpersists the RDD when done.
+  def updateData(rdd: RDD[String], ms: Long, cnt: Long): Unit = {
+    val ptns = getPartition(ms)
+    val ptnsPath = genPartitionHdfsPath(ptns)
+    val dirPath = s"${filePath}/${ptnsPath}"
+    val dataFileName = s"${ms}"
+    val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
+
+    try {
+//      val needSave = !rdd.isEmpty
+
+      // remove out time old data
+      HdfsFileDumpUtil.remove(dirPath, dataFileName, true)
+      println(s"remove file path: ${dirPath}/${dataFileName}")
+
+      // save updated data
+      val dumped = if (cnt > 0) {
+        HdfsFileDumpUtil.dump(dataFilePath, rdd, rowSepLiteral)
+        println(s"update file path: ${dataFilePath}")
+      } else false
+    } catch {
+      case e: Throwable => error(s"update data error: ${e.getMessage}")
+    } finally {
+      rdd.unpersist()
+    }
+  }
+
+  // Rewrites the dump file for `ms` from an in-memory collection of JSON records.
+  def updateData(rdd: Iterable[String], ms: Long): Unit = {
+    val ptns = getPartition(ms)
+    val ptnsPath = genPartitionHdfsPath(ptns)
+    val dirPath = s"${filePath}/${ptnsPath}"
+    val dataFileName = s"${ms}"
+    val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
+
+    try {
+      val needSave = !rdd.isEmpty
+
+      // remove out time old data
+      HdfsFileDumpUtil.remove(dirPath, dataFileName, true)
+      println(s"remove file path: ${dirPath}/${dataFileName}")
+
+      // save updated data
+      val dumped = if (needSave) {
+        HdfsFileDumpUtil.dump(dataFilePath, rdd, rowSepLiteral)
+        println(s"update file path: ${dataFilePath}")
+      } else false
+    } catch {
+      case e: Throwable => error(s"update data error: ${e.getMessage}")
+    }
+  }
+
+  // Updates the dump file for every (timestamp -> DataFrame) pair.
+  // NOTE(review): rdd.count triggers a Spark job per entry and rdd.cache is
+  // commented out, so updateData recomputes each RDD when dumping.
+  def updateDataMap(dfMap: Map[Long, DataFrame]): Unit = {
+    val dataMap = dfMap.map { pair =>
+      val (t, recs) = pair
+      val rdd = recs.toJSON
+//      rdd.cache
+      (t, rdd, rdd.count)
+    }
+
+    dataMap.foreach { pair =>
+      val (t, arr, cnt) = pair
+      updateData(arr, t, cnt)
+    }
+  }
+
+  /**
+   * Under the old-cache lock, deletes every partition directory strictly
+   * earlier than the recorded clean time; no-op when no clean time is set.
+   */
+  def cleanOldData(): Unit = {
+    val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
+    if (oldCacheLocked) {
+      try {
+        val cleanTime = readCleanTime()
+        cleanTime match {
+          case Some(ct) => {
+            // drop partitions
+            val bounds = getPartition(ct)
+
+            // list partition paths
+            val earlierPaths = listPathsEarlierThanBounds(filePath :: Nil, 
bounds)
+
+            // delete out time data path
+            earlierPaths.foreach { path =>
+              println(s"delete hdfs path: ${path}")
+              HdfsUtil.deleteHdfsPath(path)
+            }
+          }
+          case _ => {
+            // do nothing
+          }
+        }
+      } catch {
+        case e: Throwable => error(s"clean old data error: ${e.getMessage}")
+      } finally {
+        oldCacheLock.unlock()
+      }
+    }
+  }
+
+  // Truncates `ms` down to the boundary of the finest partition unit (sec).
+  override protected def genCleanTime(ms: Long): Long = {
+    val minPartitionUnit = partitionUnits.last
+    val t1 = TimeUtil.timeToUnit(ms, minPartitionUnit)
+    val t2 = TimeUtil.timeFromUnit(t1, minPartitionUnit)
+    t2
+  }
+
+  // partition values (hour, min, sec) for timestamp `ms`
+  private def getPartition(ms: Long): List[Long] = {
+    partitionUnits.map { unit =>
+      TimeUtil.timeToUnit(ms, unit)
+    }
+  }
+  // per-unit (lower, upper) partition bounds covering [ms1, ms2]
+  private def getPartitionRange(ms1: Long, ms2: Long): List[(Long, Long)] = {
+    partitionUnits.map { unit =>
+      val t1 = TimeUtil.timeToUnit(ms1, unit)
+      val t2 = TimeUtil.timeToUnit(ms2, unit)
+      (t1, t2)
+    }
+  }
+  // "hour/min/sec"-style relative path built from partition values
+  private def genPartitionHdfsPath(partition: List[Long]): String = {
+    partition.map(prtn => s"${prtn}").mkString("/")
+  }
+  // parse a directory name as a partition value; None when not numeric
+  private def str2Long(str: String): Option[Long] = {
+    try {
+      Some(str.toLong)
+    } catch {
+      case e: Throwable => None
+    }
+  }
+
+
+  // here the range means [min, max], but the best range should be (min, max]
+  // Descends one partition level per range entry, keeping only the
+  // sub-directories whose numeric name falls inside that level's bounds.
+  private def listPathsBetweenRanges(paths: List[String],
+                                     partitionRanges: List[(Long, Long)]
+                                    ): List[String] = {
+    partitionRanges match {
+      case Nil => paths
+      case head :: tail => {
+        val (lb, ub) = head
+        val curPaths = paths.flatMap { path =>
+          val names = HdfsUtil.listSubPathsByType(path, "dir").toList
+          names.filter { name =>
+            str2Long(name) match {
+              case Some(t) => (t >= lb) && (t <= ub)
+              case _ => false
+            }
+          }.map(HdfsUtil.getHdfsFilePath(path, _))
+        }
+        listPathsBetweenRanges(curPaths, tail)
+      }
+    }
+  }
+  // Collects paths strictly earlier than the bound at each level, recursing
+  // into equal-valued directories to compare the next finer unit.
+  // NOTE(review): each directory is listed twice (earlier scan + equal scan).
+  private def listPathsEarlierThanBounds(paths: List[String], bounds: 
List[Long]
+                                        ): List[String] = {
+    bounds match {
+      case Nil => paths
+      case head :: tail => {
+        val earlierPaths = paths.flatMap { path =>
+          val names = HdfsUtil.listSubPathsByType(path, "dir").toList
+          names.filter { name =>
+            str2Long(name) match {
+              case Some(t) => (t < head)
+              case _ => false
+            }
+          }.map(HdfsUtil.getHdfsFilePath(path, _))
+        }
+        val equalPaths = paths.flatMap { path =>
+          val names = HdfsUtil.listSubPathsByType(path, "dir").toList
+          names.filter { name =>
+            str2Long(name) match {
+              case Some(t) => (t == head)
+              case _ => false
+            }
+          }.map(HdfsUtil.getHdfsFilePath(path, _))
+        }
+
+        tail match {
+          case Nil => earlierPaths
+          case _ => earlierPaths ::: listPathsEarlierThanBounds(equalPaths, 
tail)
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala
new file mode 100644
index 0000000..6c1b76e
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala
@@ -0,0 +1,80 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.data.source
+
+import org.apache.griffin.measure.config.params.user._
+import org.apache.griffin.measure.data.connector.batch.BatchDataConnector
+import 
org.apache.griffin.measure.data.connector.streaming.StreamingDataConnector
+import org.apache.griffin.measure.data.connector.{DataConnector, 
DataConnectorFactory}
+import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.process.engine.{DqEngine, DqEngines}
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.streaming.StreamingContext
+
+import scala.util.{Success, Try}
+
+/**
+ * Builds DataSource instances (connectors plus optional HDFS cache) from the
+ * user-supplied data source parameters.
+ */
+object DataSourceFactory extends Loggable {
+
+  // NOTE(review): these regexes are not referenced anywhere in this object —
+  // confirm external use before removing
+  val HiveRegex = """^(?i)hive$""".r
+  val TextRegex = """^(?i)text$""".r
+  val AvroRegex = """^(?i)avro$""".r
+
+  // One DataSource per parameter, indexed by position; parameters whose
+  // generation fails are dropped from the result.
+  def genDataSources(sqlContext: SQLContext, ssc: StreamingContext, dqEngines: 
DqEngines,
+                     dataSourceParams: Seq[DataSourceParam], metricName: 
String): Seq[DataSource] = {
+    dataSourceParams.zipWithIndex.flatMap { pair =>
+      val (param, index) = pair
+      genDataSource(sqlContext, ssc, dqEngines, param, metricName, index)
+    }
+  }
+
+  // Builds one DataSource; connectors that fail to construct are silently
+  // skipped. NOTE(review): this always returns Some — the Option return type
+  // currently has no None path.
+  private def genDataSource(sqlContext: SQLContext, ssc: StreamingContext,
+                            dqEngines: DqEngines,
+                            dataSourceParam: DataSourceParam,
+                            metricName: String, index: Int
+                           ): Option[DataSource] = {
+    val name = dataSourceParam.name
+    val connectorParams = dataSourceParam.connectors
+    val cacheParam = dataSourceParam.cache
+    val dataConnectors = connectorParams.flatMap { connectorParam =>
+      DataConnectorFactory.getDataConnector(sqlContext, ssc, dqEngines, 
connectorParam) match {
+        case Success(connector) => Some(connector)
+        case _ => None
+      }
+    }
+    val dataSourceCacheOpt = genDataSourceCache(sqlContext, cacheParam, 
metricName, index)
+
+    Some(DataSource(sqlContext, name, dataConnectors, dataSourceCacheOpt))
+  }
+
+  // Optional cache construction; None when no cache param was given or
+  // construction throws. NOTE(review): null check on a Scala Map — presumably
+  // `cache` may be absent; an Option-typed param would be more idiomatic.
+  private def genDataSourceCache(sqlContext: SQLContext, param: Map[String, 
Any],
+                                 metricName: String, index: Int
+                                ) = {
+    if (param != null) {
+      try {
+        Some(DataSourceCache(sqlContext, param, metricName, index))
+      } catch {
+        case e: Throwable => {
+          error(s"generate data source cache fails")
+          None
+        }
+      }
+    } else None
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala 
b/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala
index 97786c4..431fe10 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala
@@ -21,10 +21,12 @@ package org.apache.griffin.measure.persist
 import java.util.Date
 
 import org.apache.griffin.measure.result._
-import org.apache.griffin.measure.utils.HdfsUtil
+import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
 
 import scala.util.Try
+import org.apache.griffin.measure.utils.ParamUtil._
 
 // persist result and data to hdfs
 case class HdfsPersist(config: Map[String, Any], metricName: String, 
timeStamp: Long) extends Persist {
@@ -34,17 +36,17 @@ case class HdfsPersist(config: Map[String, Any], 
metricName: String, timeStamp:
   val MaxLinesPerFile = "max.lines.per.file"
 
   val path = config.getOrElse(Path, "").toString
-  val maxPersistLines = try { config.getOrElse(MaxPersistLines, 
-1).toString.toInt } catch { case _ => -1 }
-  val maxLinesPerFile = try { config.getOrElse(MaxLinesPerFile, 
10000).toString.toLong } catch { case _ => 10000 }
+  val maxPersistLines = config.getInt(MaxPersistLines, -1)
+  val maxLinesPerFile = config.getLong(MaxLinesPerFile, 10000)
 
   val separator = "/"
 
   val StartFile = filePath("_START")
   val FinishFile = filePath("_FINISH")
-  val ResultFile = filePath("_RESULT")
+  val MetricsFile = filePath("_METRICS")
 
-  val MissRecFile = filePath("_MISSREC")      // optional
-  val MatchRecFile = filePath("_MATCHREC")    // optional
+//  val MissRecFile = filePath("_MISSREC")      // optional
+//  val MatchRecFile = filePath("_MATCHREC")    // optional
 
   val LogFile = filePath("_LOG")
 
@@ -56,7 +58,7 @@ case class HdfsPersist(config: Map[String, Any], metricName: 
String, timeStamp:
   }
 
   def available(): Boolean = {
-    (path.nonEmpty) && (maxPersistLines < Int.MaxValue)
+    path.nonEmpty
   }
 
   private def persistHead: String = {
@@ -92,57 +94,141 @@ case class HdfsPersist(config: Map[String, Any], 
metricName: String, timeStamp:
     }
   }
 
-  def result(rt: Long, result: Result): Unit = {
-    try {
-      val resStr = result match {
-        case ar: AccuracyResult => {
-          s"match percentage: ${ar.matchPercentage}\ntotal count: 
${ar.getTotal}\nmiss count: ${ar.getMiss}, match count: ${ar.getMatch}"
-        }
-        case pr: ProfileResult => {
-          s"match percentage: ${pr.matchPercentage}\ntotal count: 
${pr.getTotal}\nmiss count: ${pr.getMiss}, match count: ${pr.getMatch}"
-        }
-        case _ => {
-          s"result: ${result}"
-        }
-      }
-      HdfsUtil.writeContent(ResultFile, timeHead(rt) + resStr)
-      log(rt, resStr)
+//  def result(rt: Long, result: Result): Unit = {
+//    try {
+//      val resStr = result match {
+//        case ar: AccuracyResult => {
+//          s"match percentage: ${ar.matchPercentage}\ntotal count: 
${ar.getTotal}\nmiss count: ${ar.getMiss}, match count: ${ar.getMatch}"
+//        }
+//        case pr: ProfileResult => {
+//          s"match percentage: ${pr.matchPercentage}\ntotal count: 
${pr.getTotal}\nmiss count: ${pr.getMiss}, match count: ${pr.getMatch}"
+//        }
+//        case _ => {
+//          s"result: ${result}"
+//        }
+//      }
+//      HdfsUtil.writeContent(ResultFile, timeHead(rt) + resStr)
+//      log(rt, resStr)
+//
+//      info(resStr)
+//    } catch {
+//      case e: Throwable => error(e.getMessage)
+//    }
+//  }
 
-      info(resStr)
-    } catch {
-      case e: Throwable => error(e.getMessage)
-    }
+  // need to avoid string too long
+//  private def rddRecords(records: RDD[String], path: String): Unit = {
+//    try {
+//      val recordCount = records.count
+//      val count = if (maxPersistLines < 0) recordCount else 
scala.math.min(maxPersistLines, recordCount)
+//      if (count > 0) {
+//        val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt
+//        if (groupCount <= 1) {
+//          val recs = records.take(count.toInt)
+//          persistRecords(path, recs)
+//        } else {
+//          val groupedRecords: RDD[(Long, Iterable[String])] =
+//            records.zipWithIndex.flatMap { r =>
+//              val gid = r._2 / maxLinesPerFile
+//              if (gid < groupCount) Some((gid, r._1)) else None
+//            }.groupByKey()
+//          groupedRecords.foreach { group =>
+//            val (gid, recs) = group
+//            val hdfsPath = if (gid == 0) path else withSuffix(path, 
gid.toString)
+//            persistRecords(hdfsPath, recs)
+//          }
+//        }
+//      }
+//    } catch {
+//      case e: Throwable => error(e.getMessage)
+//    }
+//  }
+//
+//  private def iterableRecords(records: Iterable[String], path: String): Unit 
= {
+//    try {
+//      val recordCount = records.size
+//      val count = if (maxPersistLines < 0) recordCount else 
scala.math.min(maxPersistLines, recordCount)
+//      if (count > 0) {
+//        val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt
+//        if (groupCount <= 1) {
+//          val recs = records.take(count.toInt)
+//          persistRecords(path, recs)
+//        } else {
+//          val groupedRecords = records.grouped(groupCount).zipWithIndex
+//          groupedRecords.take(groupCount).foreach { group =>
+//            val (recs, gid) = group
+//            val hdfsPath = if (gid == 0) path else withSuffix(path, 
gid.toString)
+//            persistRecords(hdfsPath, recs)
+//          }
+//        }
+//      }
+//    } catch {
+//      case e: Throwable => error(e.getMessage)
+//    }
+//  }
+//
+//  def records(recs: RDD[String], tp: String): Unit = {
+//    tp match {
+//      case PersistDataType.MISS => rddRecords(recs, MissRecFile)
+//      case PersistDataType.MATCH => rddRecords(recs, MatchRecFile)
+//      case _ => {}
+//    }
+//  }
+//
+//  def records(recs: Iterable[String], tp: String): Unit = {
+//    tp match {
+//      case PersistDataType.MISS => iterableRecords(recs, MissRecFile)
+//      case PersistDataType.MATCH => iterableRecords(recs, MatchRecFile)
+//      case _ => {}
+//    }
+//  }
+
+  private def persistRecords(hdfsPath: String, records: Iterable[String]): 
Unit = {
+    val recStr = records.mkString("\n")
+    HdfsUtil.writeContent(hdfsPath, recStr)
   }
 
-  // need to avoid string too long
-  private def rddRecords(records: RDD[String], path: String): Unit = {
+  def log(rt: Long, msg: String): Unit = {
     try {
-      val recordCount = records.count
-      val count = if (maxPersistLines < 0) recordCount else 
scala.math.min(maxPersistLines, recordCount)
-      if (count > 0) {
-        val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt
-        if (groupCount <= 1) {
-          val recs = records.take(count.toInt)
-          persistRecords(path, recs)
-        } else {
-          val groupedRecords: RDD[(Long, Iterable[String])] =
-            records.zipWithIndex.flatMap { r =>
-              val gid = r._2 / maxLinesPerFile
-              if (gid < groupCount) Some((gid, r._1)) else None
-            }.groupByKey()
-          groupedRecords.foreach { group =>
-            val (gid, recs) = group
-            val hdfsPath = if (gid == 0) path else withSuffix(path, 
gid.toString)
-            persistRecords(hdfsPath, recs)
-          }
-        }
-      }
+      val logStr = (if (isInit) persistHead else "") + timeHead(rt) + 
s"${msg}\n\n"
+      HdfsUtil.appendContent(LogFile, logStr)
     } catch {
       case e: Throwable => error(e.getMessage)
     }
   }
 
-  private def iterableRecords(records: Iterable[String], path: String): Unit = 
{
+
+//  def persistRecords(df: DataFrame, name: String): Unit = {
+//    val records = df.toJSON
+//    val path = filePath(name)
+//    try {
+//      val recordCount = records.count
+//      val count = if (maxPersistLines < 0) recordCount else 
scala.math.min(maxPersistLines, recordCount)
+//      if (count > 0) {
+//        val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt
+//        if (groupCount <= 1) {
+//          val recs = records.take(count.toInt)
+//          persistRecords(path, recs)
+//        } else {
+//          val groupedRecords: RDD[(Long, Iterable[String])] =
+//            records.zipWithIndex.flatMap { r =>
+//              val gid = r._2 / maxLinesPerFile
+//              if (gid < groupCount) Some((gid, r._1)) else None
+//            }.groupByKey()
+//          groupedRecords.foreach { group =>
+//            val (gid, recs) = group
+//            val hdfsPath = if (gid == 0) path else withSuffix(path, 
gid.toString)
+//            persistRecords(hdfsPath, recs)
+//          }
+//        }
+//      }
+//    } catch {
+//      case e: Throwable => error(e.getMessage)
+//    }
+//  }
+
+  def persistRecords(records: Iterable[String], name: String): Unit = {
+    val path = filePath(name)
     try {
       val recordCount = records.size
       val count = if (maxPersistLines < 0) recordCount else 
scala.math.min(maxPersistLines, recordCount)
@@ -165,39 +251,35 @@ case class HdfsPersist(config: Map[String, Any], 
metricName: String, timeStamp:
     }
   }
 
-  def records(recs: RDD[String], tp: String): Unit = {
-    tp match {
-      case PersistType.MISS => rddRecords(recs, MissRecFile)
-      case PersistType.MATCH => rddRecords(recs, MatchRecFile)
-      case _ => {}
-    }
-  }
-
-  def records(recs: Iterable[String], tp: String): Unit = {
-    tp match {
-      case PersistType.MISS => iterableRecords(recs, MissRecFile)
-      case PersistType.MATCH => iterableRecords(recs, MatchRecFile)
-      case _ => {}
-    }
-  }
-
-//  def missRecords(records: RDD[String]): Unit = {
-//    rddRecords(records, MissRecFile)
+//  def persistMetrics(metrics: Seq[String], name: String): Unit = {
+//    val path = filePath(name)
+//    try {
+//      val recordCount = metrics.size
+//      val count = if (maxPersistLines < 0) recordCount else 
scala.math.min(maxPersistLines, recordCount)
+//      if (count > 0) {
+//        val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt
+//        if (groupCount <= 1) {
+//          val recs = metrics.take(count.toInt)
+//          persistRecords(path, recs)
+//        } else {
+//          val groupedRecords = metrics.grouped(groupCount).zipWithIndex
+//          groupedRecords.take(groupCount).foreach { group =>
+//            val (recs, gid) = group
+//            val hdfsPath = if (gid == 0) path else withSuffix(path, 
gid.toString)
+//            persistRecords(hdfsPath, recs)
+//          }
+//        }
+//      }
+//    } catch {
+//      case e: Throwable => error(e.getMessage)
+//    }
 //  }
-//
-//  def matchRecords(records: RDD[String]): Unit = {
-//    rddRecords(records, MatchRecFile)
-//  }
-
-  private def persistRecords(hdfsPath: String, records: Iterable[String]): 
Unit = {
-    val recStr = records.mkString("\n")
-    HdfsUtil.writeContent(hdfsPath, recStr)
-  }
 
-  def log(rt: Long, msg: String): Unit = {
+  def persistMetrics(metrics: Map[String, Any]): Unit = {
+    val json = JsonUtil.toJson(metrics)
     try {
-      val logStr = (if (isInit) persistHead else "") + timeHead(rt) + 
s"${msg}\n\n"
-      HdfsUtil.appendContent(LogFile, logStr)
+      info(s"${json}")
+      persistRecords(MetricsFile, json :: Nil)
     } catch {
       case e: Throwable => error(e.getMessage)
     }


Reply via email to