http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala
deleted file mode 100644
index 5eae2a0..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.data.source
-
-import org.apache.griffin.measure.cache.tmst._
-import org.apache.griffin.measure.data.connector._
-import org.apache.griffin.measure.data.connector.batch._
-import org.apache.griffin.measure.data.connector.streaming._
-import org.apache.griffin.measure.data.source.cache._
-import org.apache.griffin.measure.log.Loggable
-import org.apache.griffin.measure.process.temp.{DataFrameCaches, 
TableRegisters, TimeRange}
-import org.apache.griffin.measure.rule.plan.TimeInfo
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
-import org.apache.griffin.measure.utils.DataFrameUtil._
-
-case class DataSource(sqlContext: SQLContext,
-                      name: String,
-                      baseline: Boolean,
-                      dataConnectors: Seq[DataConnector],
-                      dataSourceCacheOpt: Option[DataSourceCache]
-                     ) extends Loggable with Serializable {
-
-  val batchDataConnectors = 
DataConnectorFactory.filterBatchDataConnectors(dataConnectors)
-  val streamingDataConnectors = 
DataConnectorFactory.filterStreamingDataConnectors(dataConnectors)
-  streamingDataConnectors.foreach(_.dataSourceCacheOpt = dataSourceCacheOpt)
-
-  val tmstCache: TmstCache = TmstCache()
-
-  def init(): Unit = {
-    dataSourceCacheOpt.foreach(_.init)
-    dataConnectors.foreach(_.init)
-
-    dataSourceCacheOpt.foreach(_.tmstCache = tmstCache)
-    dataConnectors.foreach(_.tmstCache = tmstCache)
-  }
-
-  def loadData(timeInfo: TimeInfo): TimeRange = {
-    val calcTime = timeInfo.calcTime
-    println(s"load data [${name}]")
-    val (dfOpt, tmsts) = data(calcTime)
-    dfOpt match {
-      case Some(df) => {
-//        DataFrameCaches.cacheDataFrame(timeInfo.key, name, df)
-        TableRegisters.registerRunTempTable(df, timeInfo.key, name)
-      }
-      case None => {
-        warn(s"load data source [${name}] fails")
-      }
-    }
-    tmsts
-  }
-
-  private def data(ms: Long): (Option[DataFrame], TimeRange) = {
-    val batches = batchDataConnectors.flatMap { dc =>
-      val (dfOpt, timeRange) = dc.data(ms)
-      dfOpt match {
-        case Some(df) => Some((dfOpt, timeRange))
-        case _ => None
-      }
-    }
-    val caches = dataSourceCacheOpt match {
-      case Some(dsc) => dsc.readData() :: Nil
-      case _ => Nil
-    }
-    val pairs = batches ++ caches
-
-    if (pairs.nonEmpty) {
-      pairs.reduce { (a, b) =>
-        (unionDfOpts(a._1, b._1), a._2.merge(b._2))
-      }
-    } else {
-      (None, TimeRange.emptyTimeRange)
-    }
-  }
-
-//  private def unionDfOpts(dfOpt1: Option[DataFrame], dfOpt2: 
Option[DataFrame]
-//                         ): Option[DataFrame] = {
-//    (dfOpt1, dfOpt2) match {
-//      case (Some(df1), Some(df2)) => Some(unionDataFrames(df1, df2))
-//      case (Some(df1), _) => dfOpt1
-//      case (_, Some(df2)) => dfOpt2
-//      case _ => None
-//    }
-//  }
-//
-//  private def unionDataFrames(df1: DataFrame, df2: DataFrame): DataFrame = {
-//    try {
-//      val cols = df1.columns
-//      val rdd2 = df2.map{ row =>
-//        val values = cols.map { col =>
-//          row.getAs[Any](col)
-//        }
-//        Row(values: _*)
-//      }
-//      val ndf2 = sqlContext.createDataFrame(rdd2, df1.schema)
-//      df1 unionAll ndf2
-//    } catch {
-//      case e: Throwable => df1
-//    }
-//  }
-
-  def updateData(df: DataFrame): Unit = {
-    dataSourceCacheOpt.foreach(_.updateData(Some(df)))
-  }
-
-  def updateData(df: DataFrame, ms: Long): Unit = {
-//    dataSourceCacheOpt.foreach(_.updateData(df, ms))
-  }
-
-  def updateDataMap(dfMap: Map[Long, DataFrame]): Unit = {
-//    dataSourceCacheOpt.foreach(_.updateDataMap(dfMap))
-  }
-
-  def cleanOldData(): Unit = {
-//    dataSourceCacheOpt.foreach(_.cleanOldData)
-    dataSourceCacheOpt.foreach(_.cleanOutTimeData)
-  }
-
-  def processFinish(): Unit = {
-    dataSourceCacheOpt.foreach(_.processFinish)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala
deleted file mode 100644
index 831e990..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.data.source
-
-import org.apache.griffin.measure.config.params.user._
-import org.apache.griffin.measure.data.connector.DataConnectorFactory
-import org.apache.griffin.measure.data.source.cache._
-import org.apache.griffin.measure.log.Loggable
-import org.apache.griffin.measure.process.engine._
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.streaming.StreamingContext
-
-import scala.util.{Success, Try}
-
-object DataSourceFactory extends Loggable {
-
-  def genDataSources(sqlContext: SQLContext, ssc: StreamingContext, dqEngines: 
DqEngines,
-                     dataSourceParams: Seq[DataSourceParam]) = {
-    val filteredDsParams = trimDataSourceParams(dataSourceParams)
-    filteredDsParams.zipWithIndex.flatMap { pair =>
-      val (param, index) = pair
-      genDataSource(sqlContext, ssc, dqEngines, param, index)
-    }
-  }
-
-  private def genDataSource(sqlContext: SQLContext, ssc: StreamingContext,
-                            dqEngines: DqEngines,
-                            dataSourceParam: DataSourceParam,
-                            index: Int
-                           ): Option[DataSource] = {
-    val name = dataSourceParam.name
-    val baseline = dataSourceParam.isBaseLine
-    val connectorParams = dataSourceParam.getConnectors
-    val cacheParam = dataSourceParam.cache
-    val dataConnectors = connectorParams.flatMap { connectorParam =>
-      DataConnectorFactory.getDataConnector(sqlContext, ssc, dqEngines, 
connectorParam) match {
-        case Success(connector) => Some(connector)
-        case _ => None
-      }
-    }
-    val dataSourceCacheOpt = 
DataSourceCacheFactory.genDataSourceCache(sqlContext, cacheParam, name, index)
-
-    Some(DataSource(sqlContext, name, baseline, dataConnectors, 
dataSourceCacheOpt))
-  }
-
-  private def trimDataSourceParams(dataSourceParams: Seq[DataSourceParam]): 
Seq[DataSourceParam] = {
-    val (validDsParams, _) =
-      dataSourceParams.foldLeft((Nil: Seq[DataSourceParam], Set[String]())) { 
(ret, dsParam) =>
-        val (seq, names) = ret
-        if (dsParam.hasName && !names.contains(dsParam.name)) {
-          (seq :+ dsParam, names + dsParam.name)
-        } else ret
-      }
-    if (validDsParams.nonEmpty) {
-      val baselineDsParam = 
validDsParams.find(_.isBaseLine).getOrElse(validDsParams.head)
-      validDsParams.map { dsParam =>
-        if (dsParam.name != baselineDsParam.name && dsParam.isBaseLine) {
-          dsParam.falseBaselineClone
-        } else dsParam
-      }
-    } else validDsParams
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataCacheable.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataCacheable.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataCacheable.scala
deleted file mode 100644
index 36c556b..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataCacheable.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.data.source.cache
-
-import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
-
-trait DataCacheable {
-
-  val cacheInfoPath: String
-  val readyTimeInterval: Long
-  val readyTimeDelay: Long
-
-  def selfCacheInfoPath = s"${TimeInfoCache.infoPath}/${cacheInfoPath}"
-
-  def selfCacheTime = TimeInfoCache.cacheTime(selfCacheInfoPath)
-  def selfLastProcTime = TimeInfoCache.lastProcTime(selfCacheInfoPath)
-  def selfReadyTime = TimeInfoCache.readyTime(selfCacheInfoPath)
-  def selfCleanTime = TimeInfoCache.cleanTime(selfCacheInfoPath)
-  def selfOldCacheIndex = TimeInfoCache.oldCacheIndex(selfCacheInfoPath)
-
-  protected def submitCacheTime(ms: Long): Unit = {
-    val map = Map[String, String]((selfCacheTime -> ms.toString))
-    InfoCacheInstance.cacheInfo(map)
-  }
-
-  protected def submitReadyTime(ms: Long): Unit = {
-    val curReadyTime = ms - readyTimeDelay
-    if (curReadyTime % readyTimeInterval == 0) {
-      val map = Map[String, String]((selfReadyTime -> curReadyTime.toString))
-      InfoCacheInstance.cacheInfo(map)
-    }
-  }
-
-  protected def submitLastProcTime(ms: Long): Unit = {
-    val map = Map[String, String]((selfLastProcTime -> ms.toString))
-    InfoCacheInstance.cacheInfo(map)
-  }
-
-  protected def readLastProcTime(): Option[Long] = 
readSelfInfo(selfLastProcTime)
-
-  protected def submitCleanTime(ms: Long): Unit = {
-    val cleanTime = genCleanTime(ms)
-    val map = Map[String, String]((selfCleanTime -> cleanTime.toString))
-    InfoCacheInstance.cacheInfo(map)
-  }
-
-  protected def genCleanTime(ms: Long): Long = ms
-
-  protected def readCleanTime(): Option[Long] = readSelfInfo(selfCleanTime)
-
-  protected def submitOldCacheIndex(index: Long): Unit = {
-    val map = Map[String, String]((selfOldCacheIndex -> index.toString))
-    InfoCacheInstance.cacheInfo(map)
-  }
-
-  protected def readOldCacheIndex(): Option[Long] = 
readSelfInfo(selfOldCacheIndex)
-
-  private def readSelfInfo(key: String): Option[Long] = {
-    InfoCacheInstance.readInfo(key :: Nil).get(key).flatMap { v =>
-      try {
-        Some(v.toLong)
-      } catch {
-        case _ => None
-      }
-    }
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
deleted file mode 100644
index f70bd11..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
+++ /dev/null
@@ -1,380 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.data.source.cache
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
-import org.apache.griffin.measure.cache.tmst.TmstCache
-import org.apache.griffin.measure.log.Loggable
-import org.apache.griffin.measure.process.temp.TimeRange
-import org.apache.griffin.measure.rule.adaptor.InternalColumns
-import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
-import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.spark.sql._
-import org.apache.spark.sql.functions.col
-import org.apache.griffin.measure.utils.DataFrameUtil._
-
-import scala.util.Random
-
-// data source cache process steps
-// dump phase: save
-// process phase: read -> process -> update -> finish -> clean old data
-trait DataSourceCache extends DataCacheable with WithFanIn[Long] with Loggable 
with Serializable {
-
-  val sqlContext: SQLContext
-  val param: Map[String, Any]
-  val dsName: String
-  val index: Int
-
-  var tmstCache: TmstCache = _
-  protected def fromUntilRangeTmsts(from: Long, until: Long) = 
tmstCache.range(from, until)
-  protected def clearTmst(t: Long) = tmstCache.remove(t)
-  protected def clearTmstsUntil(until: Long) = {
-    val outDateTmsts = tmstCache.until(until)
-    tmstCache.remove(outDateTmsts)
-  }
-  protected def afterTilRangeTmsts(after: Long, til: Long) = 
fromUntilRangeTmsts(after + 1, til + 1)
-  protected def clearTmstsTil(til: Long) = clearTmstsUntil(til + 1)
-
-  val _FilePath = "file.path"
-  val _InfoPath = "info.path"
-  val _ReadyTimeInterval = "ready.time.interval"
-  val _ReadyTimeDelay = "ready.time.delay"
-  val _TimeRange = "time.range"
-
-  val rdmStr = Random.alphanumeric.take(10).mkString
-  val defFilePath = s"hdfs:///griffin/cache/${dsName}_${rdmStr}"
-  val defInfoPath = s"${index}"
-
-  val filePath: String = param.getString(_FilePath, defFilePath)
-  val cacheInfoPath: String = param.getString(_InfoPath, defInfoPath)
-  val readyTimeInterval: Long = 
TimeUtil.milliseconds(param.getString(_ReadyTimeInterval, 
"1m")).getOrElse(60000L)
-  val readyTimeDelay: Long = 
TimeUtil.milliseconds(param.getString(_ReadyTimeDelay, "1m")).getOrElse(60000L)
-  val deltaTimeRange: (Long, Long) = {
-    def negative(n: Long): Long = if (n <= 0) n else 0
-    param.get(_TimeRange) match {
-      case Some(seq: Seq[String]) => {
-        val nseq = seq.flatMap(TimeUtil.milliseconds(_))
-        val ns = negative(nseq.headOption.getOrElse(0))
-        val ne = negative(nseq.tail.headOption.getOrElse(0))
-        (ns, ne)
-      }
-      case _ => (0, 0)
-    }
-  }
-
-  val _ReadOnly = "read.only"
-  val readOnly = param.getBoolean(_ReadOnly, false)
-
-  val _Updatable = "updatable"
-  val updatable = param.getBoolean(_Updatable, false)
-
-//  val rowSepLiteral = "\n"
-//  val partitionUnits: List[String] = List("hour", "min", "sec")
-//  val minUnitTime: Long = TimeUtil.timeFromUnit(1, partitionUnits.last)
-
-  val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new")
-  val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old")
-
-  val newFilePath = s"${filePath}/new"
-  val oldFilePath = s"${filePath}/old"
-
-  val defOldCacheIndex = 0L
-
-  protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit
-  protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame
-
-  def init(): Unit = {}
-
-  // save new cache data only, need index for multiple streaming data 
connectors
-  def saveData(dfOpt: Option[DataFrame], ms: Long): Unit = {
-    if (!readOnly) {
-      dfOpt match {
-        case Some(df) => {
-          df.cache
-
-          // cache df
-          val cnt = df.count
-          println(s"save ${dsName} data count: ${cnt}")
-
-          // lock makes it safer when writing new cache data
-          val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
-          if (newCacheLocked) {
-            try {
-              val dfw = 
df.write.mode(SaveMode.Append).partitionBy(InternalColumns.tmst)
-              writeDataFrame(dfw, newFilePath)
-            } catch {
-              case e: Throwable => error(s"save data error: ${e.getMessage}")
-            } finally {
-              newCacheLock.unlock()
-            }
-          }
-
-          // uncache
-          df.unpersist
-        }
-        case _ => {
-          info(s"no data frame to save")
-        }
-      }
-
-      // submit cache time and ready time
-      if (fanIncrement(ms)) {
-        println(s"save data [${ms}] finish")
-        submitCacheTime(ms)
-        submitReadyTime(ms)
-      }
-
-    }
-  }
-
-  // read new cache data and old cache data
-  def readData(): (Option[DataFrame], TimeRange) = {
-    // time range: (a, b]
-    val timeRange = TimeInfoCache.getTimeRange
-    val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + 
deltaTimeRange._2)
-
-    // read partition info
-    val filterStr = if (reviseTimeRange._1 == reviseTimeRange._2) {
-      println(s"read time range: [${reviseTimeRange._1}]")
-      s"`${InternalColumns.tmst}` = ${reviseTimeRange._1}"
-    } else {
-      println(s"read time range: (${reviseTimeRange._1}, 
${reviseTimeRange._2}]")
-      s"`${InternalColumns.tmst}` > ${reviseTimeRange._1} AND 
`${InternalColumns.tmst}` <= ${reviseTimeRange._2}"
-    }
-
-    // new cache data
-    val newDfOpt = try {
-      val dfr = sqlContext.read
-      Some(readDataFrame(dfr, newFilePath).filter(filterStr))
-    } catch {
-      case e: Throwable => {
-        warn(s"read data source cache warn: ${e.getMessage}")
-        None
-      }
-    }
-
-    // old cache data
-    val oldCacheIndexOpt = if (updatable) readOldCacheIndex else None
-    val oldDfOpt = oldCacheIndexOpt.flatMap { idx =>
-      val oldDfPath = s"${oldFilePath}/${idx}"
-      try {
-        val dfr = sqlContext.read
-        Some(readDataFrame(dfr, oldDfPath).filter(filterStr))
-      } catch {
-        case e: Throwable => {
-          warn(s"read old data source cache warn: ${e.getMessage}")
-          None
-        }
-      }
-    }
-
-    // whole cache data
-    val cacheDfOpt = unionDfOpts(newDfOpt, oldDfOpt)
-
-    // from until tmst range
-    val (from, until) = (reviseTimeRange._1, reviseTimeRange._2)
-    val tmstSet = afterTilRangeTmsts(from, until)
-
-    val retTimeRange = TimeRange(reviseTimeRange, tmstSet)
-    (cacheDfOpt, retTimeRange)
-  }
-
-//  private def unionDfOpts(dfOpt1: Option[DataFrame], dfOpt2: 
Option[DataFrame]
-//                         ): Option[DataFrame] = {
-//    (dfOpt1, dfOpt2) match {
-//      case (Some(df1), Some(df2)) => Some(unionByName(df1, df2))
-//      case (Some(df1), _) => dfOpt1
-//      case (_, Some(df2)) => dfOpt2
-//      case _ => None
-//    }
-//  }
-//
-//  private def unionByName(a: DataFrame, b: DataFrame): DataFrame = {
-//    val columns = a.columns.toSet.intersect(b.columns.toSet).map(col).toSeq
-//    a.select(columns: _*).unionAll(b.select(columns: _*))
-//  }
-
-  private def cleanOutTimePartitions(path: String, outTime: Long, 
partitionOpt: Option[String],
-                                     func: (Long, Long) => Boolean
-                                    ): Unit = {
-    val earlierOrEqPaths = listPartitionsByFunc(path: String, outTime, 
partitionOpt, func)
-    // delete out time data path
-    earlierOrEqPaths.foreach { path =>
-      println(s"delete hdfs path: ${path}")
-      HdfsUtil.deleteHdfsPath(path)
-    }
-  }
-  private def listPartitionsByFunc(path: String, bound: Long, partitionOpt: 
Option[String],
-                                        func: (Long, Long) => Boolean
-                                       ): Iterable[String] = {
-    val names = HdfsUtil.listSubPathsByType(path, "dir")
-    val regex = partitionOpt match {
-      case Some(partition) => s"^${partition}=(\\d+)$$".r
-      case _ => "^(\\d+)$".r
-    }
-    names.filter { name =>
-      name match {
-        case regex(value) => {
-          str2Long(value) match {
-            case Some(t) => func(t, bound)
-            case _ => false
-          }
-        }
-        case _ => false
-      }
-    }.map(name => s"${path}/${name}")
-  }
-  private def str2Long(str: String): Option[Long] = {
-    try {
-      Some(str.toLong)
-    } catch {
-      case e: Throwable => None
-    }
-  }
-
-  // clean out time from new cache data and old cache data
-  def cleanOutTimeData(): Unit = {
-    // clean tmst
-    val cleanTime = readCleanTime
-    cleanTime.foreach(clearTmstsTil(_))
-
-    if (!readOnly) {
-      // new cache data
-      val newCacheCleanTime = if (updatable) readLastProcTime else 
readCleanTime
-      newCacheCleanTime match {
-        case Some(nct) => {
-          // clean calculated new cache data
-          val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
-          if (newCacheLocked) {
-            try {
-              cleanOutTimePartitions(newFilePath, nct, 
Some(InternalColumns.tmst),
-                (a: Long, b: Long) => (a <= b))
-            } catch {
-              case e: Throwable => error(s"clean new cache data error: 
${e.getMessage}")
-            } finally {
-              newCacheLock.unlock()
-            }
-          }
-        }
-        case _ => {
-          // do nothing
-        }
-      }
-
-      // old cache data
-      val oldCacheCleanTime = if (updatable) readCleanTime else None
-      oldCacheCleanTime match {
-        case Some(oct) => {
-          val oldCacheIndexOpt = readOldCacheIndex
-          oldCacheIndexOpt.foreach { idx =>
-            val oldDfPath = s"${oldFilePath}/${idx}"
-            val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
-            if (oldCacheLocked) {
-              try {
-                // clean calculated old cache data
-                cleanOutTimePartitions(oldFilePath, idx, None, (a: Long, b: 
Long) => (a < b))
-                // clean out time old cache data not calculated
-//                cleanOutTimePartitions(oldDfPath, oct, 
Some(InternalColumns.tmst))
-              } catch {
-                case e: Throwable => error(s"clean old cache data error: 
${e.getMessage}")
-              } finally {
-                oldCacheLock.unlock()
-              }
-            }
-          }
-        }
-        case _ => {
-          // do nothing
-        }
-      }
-    }
-  }
-
-  // update old cache data
-  def updateData(dfOpt: Option[DataFrame]): Unit = {
-    if (!readOnly && updatable) {
-      dfOpt match {
-        case Some(df) => {
-          // old cache lock
-          val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
-          if (oldCacheLocked) {
-            try {
-              val oldCacheIndexOpt = readOldCacheIndex
-              val nextOldCacheIndex = 
oldCacheIndexOpt.getOrElse(defOldCacheIndex) + 1
-
-              val oldDfPath = s"${oldFilePath}/${nextOldCacheIndex}"
-//              val cleanTime = readCleanTime
-//              val updateDf = cleanTime match {
-//                case Some(ct) => {
-//                  val filterStr = s"`${InternalColumns.tmst}` > ${ct}"
-//                  df.filter(filterStr)
-//                }
-//                case _ => df
-//              }
-              val cleanTime = getNextCleanTime
-              val filterStr = s"`${InternalColumns.tmst}` > ${cleanTime}"
-              val updateDf = df.filter(filterStr)
-
-              val prlCount = sqlContext.sparkContext.defaultParallelism
-              // coalesce
-//              val ptnCount = updateDf.rdd.getNumPartitions
-//              val repartitionedDf = if (prlCount < ptnCount) {
-//                updateDf.coalesce(prlCount)
-//              } else updateDf
-              // repartition
-              val repartitionedDf = updateDf.repartition(prlCount)
-              val dfw = repartitionedDf.write.mode(SaveMode.Overwrite)
-              writeDataFrame(dfw, oldDfPath)
-
-              submitOldCacheIndex(nextOldCacheIndex)
-            } catch {
-              case e: Throwable => error(s"update data error: ${e.getMessage}")
-            } finally {
-              oldCacheLock.unlock()
-            }
-          }
-        }
-        case _ => {
-          info(s"no data frame to update")
-        }
-      }
-    }
-  }
-
-  // process finish
-  def processFinish(): Unit = {
-    // next last proc time
-    val timeRange = TimeInfoCache.getTimeRange
-    submitLastProcTime(timeRange._2)
-
-    // next clean time
-    val nextCleanTime = timeRange._2 + deltaTimeRange._1
-    submitCleanTime(nextCleanTime)
-  }
-
-  // read next clean time
-  private def getNextCleanTime(): Long = {
-    val timeRange = TimeInfoCache.getTimeRange
-    val nextCleanTime = timeRange._2 + deltaTimeRange._1
-    nextCleanTime
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCacheFactory.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCacheFactory.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCacheFactory.scala
deleted file mode 100644
index d03c181..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCacheFactory.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.data.source.cache
-
-import org.apache.griffin.measure.log.Loggable
-import org.apache.spark.sql.SQLContext
-import org.apache.griffin.measure.utils.ParamUtil._
-
-object DataSourceCacheFactory extends Loggable {
-
-  private object DataSourceCacheType {
-    val parquet = "^(?i)parq(uet)?$".r
-    val json = "^(?i)json$".r
-    val orc = "^(?i)orc$".r
-  }
-  import DataSourceCacheType._
-
-  val _type = "type"
-
-  def genDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
-                                 name: String, index: Int
-                                ) = {
-    if (param != null) {
-      try {
-        val tp = param.getString(_type, "")
-        val dsCache = tp match {
-          case parquet() => ParquetDataSourceCache(sqlContext, param, name, 
index)
-          case json() => JsonDataSourceCache(sqlContext, param, name, index)
-          case orc() => OrcDataSourceCache(sqlContext, param, name, index)
-          case _ => ParquetDataSourceCache(sqlContext, param, name, index)
-        }
-        Some(dsCache)
-      } catch {
-        case e: Throwable => {
-          error(s"generate data source cache fails")
-          None
-        }
-      }
-    } else None
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/JsonDataSourceCache.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/JsonDataSourceCache.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/JsonDataSourceCache.scala
deleted file mode 100644
index 2fa5316..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/JsonDataSourceCache.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.data.source.cache
-
-import org.apache.spark.sql._
-
-case class JsonDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
-                               dsName: String, index: Int
-                              ) extends DataSourceCache {
-
-  override def init(): Unit = {
-//    
sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata",
 "false");
-  }
-
-  def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
-    println(s"write path: ${path}")
-    dfw.json(path)
-  }
-
-  def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = {
-    dfr.json(path)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/OrcDataSourceCache.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/OrcDataSourceCache.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/OrcDataSourceCache.scala
deleted file mode 100644
index 5bf2500..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/OrcDataSourceCache.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.data.source.cache
-
-import org.apache.spark.sql._
-
/**
  * Data source cache persisted as ORC files under the cache path.
  */
case class OrcDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
                              dsName: String, index: Int
                             ) extends DataSourceCache {

  /** No format-specific initialization is needed for ORC caches. */
  override def init(): Unit = {}

  /** Persist the prepared writer's data as ORC at the given path. */
  def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
    println(s"write path: ${path}")
    dfw.orc(path)
  }

  /** Load an ORC data set from the given path via the prepared reader. */
  def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = dfr.orc(path)

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala
deleted file mode 100644
index f39d832..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.data.source.cache
-
-import org.apache.spark.sql._
-
/**
  * Data source cache persisted as Parquet files under the cache path.
  */
case class ParquetDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
                                  dsName: String, index: Int
                                 ) extends DataSourceCache {

  /** Disable parquet summary-metadata generation for cache writes. */
  override def init(): Unit = {
    sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
  }

  /** Persist the prepared writer's data as Parquet at the given path. */
  def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
    println(s"write path: ${path}")
    dfw.parquet(path)
  }

  /** Load a Parquet data set from the given path via the prepared reader. */
  def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = dfr.parquet(path)

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/WithFanIn.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/WithFanIn.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/WithFanIn.scala
deleted file mode 100644
index aa5e04d..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/WithFanIn.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.data.source.cache
-
-import java.util.concurrent.atomic.AtomicInteger
-import scala.collection.concurrent.{TrieMap, Map => ConcMap}
-
/**
  * Mix-in tracking "fan in" completion per key.
  *
  * `registerFanIn` raises the number of expected signals; `fanIncrement`
  * records one signal for a key and reports whether all expected signals for
  * that key have arrived, clearing the key's counter when they have.
  */
trait WithFanIn[T] {

  // total number of registered participants expected to signal each key
  val totalNum: AtomicInteger = new AtomicInteger(0)
  // per-key count of signals received so far
  val fanInCountMap: ConcMap[T, Int] = TrieMap[T, Int]()

  /** Register one more expected participant; returns the new total. */
  def registerFanIn(): Int = totalNum.incrementAndGet()

  /**
    * Record one signal for `key`. Returns true (and resets the key) once the
    * signal count reaches the registered total, false otherwise.
    */
  def fanIncrement(key: T): Boolean = {
    fanInc(key)
    val complete = fanInCountMap.get(key).exists(_ >= totalNum.get)
    if (complete) fanInCountMap.remove(key)
    complete
  }

  // lock-free counter increment: retry on concurrent modification (CAS failure)
  private def fanInc(key: T): Unit =
    fanInCountMap.get(key) match {
      case Some(current) =>
        if (!fanInCountMap.replace(key, current, current + 1)) fanInc(key)
      case None =>
        if (fanInCountMap.putIfAbsent(key, 1).nonEmpty) fanInc(key)
    }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/job/DQJob.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/job/DQJob.scala 
b/measure/src/main/scala/org/apache/griffin/measure/job/DQJob.scala
new file mode 100644
index 0000000..c943db9
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/job/DQJob.scala
@@ -0,0 +1,32 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.job
+
+import org.apache.griffin.measure.context.DQContext
+import org.apache.griffin.measure.step.DQStep
+
/**
  * A data quality job: an ordered sequence of dq steps.
  *
  * @param dqSteps steps to execute in order
  */
case class DQJob(dqSteps: Seq[DQStep]) extends Serializable {

  /**
    * Execute the steps in order against the given context.
    * Stops at the first failing step (short-circuit, same as the original
    * `foldLeft` with `&&`) and returns whether every executed step succeeded.
    */
  def execute(context: DQContext): Boolean = dqSteps.forall(_.execute(context))

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala
new file mode 100644
index 0000000..a8e5b26
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala
@@ -0,0 +1,68 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.job.builder
+
+import org.apache.griffin.measure.configuration.enums.DslType
+import org.apache.griffin.measure.configuration.params._
+import org.apache.griffin.measure.context.DQContext
+import org.apache.griffin.measure.job._
+import org.apache.griffin.measure.step.DQStep
+import org.apache.griffin.measure.step.builder.DQStepBuilder
+import org.apache.griffin.measure.step.write.MetricFlushStep
+
/**
  * build dq job based on configuration
  */
object DQJobBuilder {

  /**
    * build dq job with rule param
    * @param context              dq context
    * @param evaluateRuleParam    evaluate rule param
    * @return       dq job
    */
  def buildDQJob(context: DQContext, evaluateRuleParam: EvaluateRuleParam): DQJob =
    buildDQJob(context, evaluateRuleParam.getRules, evaluateRuleParam.getDslType)

  /**
    * build dq job with rules in evaluate rule param or pre-proc param
    * @param context          dq context
    * @param ruleParams       rule params
    * @param defaultDslType   default dsl type in evaluate rule param
    * @return       dq job
    */
  def buildDQJob(context: DQContext, ruleParams: Seq[RuleParam], defaultDslType: DslType): DQJob = {
    // steps reading each configured data source
    val dataSourceSteps = context.dataSources.flatMap { ds =>
      DQStepBuilder.buildStepOptByDataSourceParam(context, ds.dsParam)
    }
    // steps generated for each configured rule
    val ruleSteps = ruleParams.flatMap { rp =>
      DQStepBuilder.buildStepOptByRuleParam(context, rp, defaultDslType)
    }
    // final step flushing the collected metrics
    DQJob((dataSourceSteps ++ ruleSteps) :+ MetricFlushStep())
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala 
b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
new file mode 100644
index 0000000..42364a8
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
@@ -0,0 +1,52 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.launch
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.configuration.params.{DQParam, EnvParam}
+
+import scala.util.Try
+
/**
  * dq application process
  */
trait DQApp extends Loggable with Serializable {

  val envParam: EnvParam
  val dqParam: DQParam

  /** prepare the application (spark context, caches, udfs) */
  def init: Try[_]

  /** run the dq calculation */
  def run: Try[_]

  /** release application resources */
  def close: Try[_]

  /**
    * application will exit if it fails in run phase.
    * if retryable is true, the exception will be threw to spark env,
    * and enable retry strategy of spark application
    */
  def retryable: Boolean

  /**
    * Timestamp of this application run: the configured dq-param timestamp when
    * it is set to a positive value, otherwise the current system time.
    */
  protected def getAppTime: Long =
    if (dqParam.timestamp != null && dqParam.timestamp > 0) dqParam.timestamp
    else System.currentTimeMillis

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
new file mode 100644
index 0000000..fec27b1
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
@@ -0,0 +1,107 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.launch.batch
+
+import java.util.Date
+
+import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.params._
+import org.apache.griffin.measure.context._
+import org.apache.griffin.measure.context.datasource.DataSourceFactory
+import org.apache.griffin.measure.job.builder.DQJobBuilder
+import org.apache.griffin.measure.launch.DQApp
+import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{SQLContext, SparkSession}
+
+import scala.util.Try
+
/**
  * Batch mode dq application: runs the dq job once over batch data sources.
  */
case class BatchDQApp(allParam: AllParam) extends DQApp {

  val envParam: EnvParam = allParam.envParam
  val dqParam: DQParam = allParam.dqParam

  val sparkParam = envParam.sparkParam
  val metricName = dqParam.name
  val dataSourceParams = dqParam.dataSources
  val dataSourceNames = dataSourceParams.map(_.name)
  val persistParams = envParam.persistParams

  var sqlContext: SQLContext = _

  implicit var sparkSession: SparkSession = _

  // batch runs are not retried by the spark environment
  def retryable: Boolean = false

  /** Build the spark session and register griffin udfs. */
  def init: Try[_] = Try {
    // build spark 2.0+ application context
    val conf = new SparkConf().setAppName(metricName)
    conf.setAll(sparkParam.config)
    conf.set("spark.sql.crossJoin.enabled", "true")
    sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    sparkSession.sparkContext.setLogLevel(sparkParam.logLevel)
    sqlContext = sparkSession.sqlContext

    // register udf
    GriffinUDFAgent.register(sqlContext)
  }

  /** Run one batch calculation: build context, execute the dq job, persist timing. */
  def run: Try[_] = Try {
    // start time
    val startTime = new Date().getTime

    val appTime = getAppTime
    val contextId = ContextId(appTime)

    // generate data sources (no streaming context in batch mode, hence null)
    val dataSources = DataSourceFactory.getDataSources(sparkSession, null, dqParam.dataSources)
    dataSources.foreach(_.init)

    // create dq context
    val dqContext: DQContext = DQContext(
      contextId, metricName, dataSources, persistParams, BatchProcessType
    )(sparkSession)

    // record the spark application id at persist start
    val applicationId = sparkSession.sparkContext.applicationId
    dqContext.getPersist().start(applicationId)

    // build job
    val dqJob = DQJobBuilder.buildDQJob(dqContext, dqParam.evaluateRule)

    // dq job execute
    dqJob.execute(dqContext)

    // end time
    val endTime = new Date().getTime
    dqContext.getPersist().log(endTime, s"process using time: ${endTime - startTime} ms")

    // clean context
    dqContext.clean()

    // finish
    dqContext.getPersist().finish()
  }

  def close: Try[_] = Try {
    // SparkSession.close() delegates to stop(); the previous extra stop() call was redundant
    sparkSession.close()
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
new file mode 100644
index 0000000..e4a8108
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
@@ -0,0 +1,177 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.launch.streaming
+
+import java.util.{Timer, TimerTask}
+import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
+
+import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.configuration.params._
+import org.apache.griffin.measure.context._
+import org.apache.griffin.measure.context.datasource.DataSourceFactory
+import org.apache.griffin.measure.context.streaming.info.InfoCacheInstance
+import org.apache.griffin.measure.launch.DQApp
+import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent
+import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{SQLContext, SparkSession}
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+
+import scala.util.Try
+
/**
  * Streaming mode dq application: builds a spark streaming context with
  * checkpointing and schedules StreamingDQApp2 rounds at a fixed interval.
  */
case class StreamingDQApp(allParam: AllParam) extends DQApp {

  val envParam: EnvParam = allParam.envParam
  val dqParam: DQParam = allParam.dqParam

  val sparkParam = envParam.sparkParam
  val metricName = dqParam.name
  val dataSourceParams = dqParam.dataSources
  val dataSourceNames = dataSourceParams.map(_.name)
  val persistParams = envParam.persistParams

  var sqlContext: SQLContext = _

  implicit var sparkSession: SparkSession = _

  // streaming jobs rely on the spark retry strategy after run failures
  def retryable: Boolean = true

  def init: Try[_] = Try {
    // build spark 2.0+ application context
    val conf = new SparkConf().setAppName(metricName)
    conf.setAll(sparkParam.config)
    conf.set("spark.sql.crossJoin.enabled", "true")
    sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    sparkSession.sparkContext.setLogLevel(sparkParam.logLevel)
    sqlContext = sparkSession.sqlContext

    // clear checkpoint directory
    clearCpDir

    // init info cache instance
    InfoCacheInstance.initInstance(envParam.infoCacheParams, metricName)
    InfoCacheInstance.init

    // register udf
    GriffinUDFAgent.register(sqlContext)
  }

  def run: Try[_] = Try {

    // streaming context: restored from checkpoint when present, otherwise created fresh
    val ssc = StreamingContext.getOrCreate(sparkParam.cpDir, () => {
      try {
        createStreamingContext
      } catch {
        case e: Throwable => {
          error(s"create streaming context error: ${e.getMessage}")
          throw e  // rethrown, so construction failures still fail the job
        }
      }
    })

    // start time
    val appTime = getAppTime
    val contextId = ContextId(appTime)

    // generate data sources
    val dataSources = DataSourceFactory.getDataSources(sparkSession, ssc, dqParam.dataSources)
    dataSources.foreach(_.init)

    // create the dq context shared by all calculation rounds
    val globalContext: DQContext = DQContext(
      contextId, metricName, dataSources, persistParams, StreamingProcessType
    )(sparkSession)

    // record the spark application id at persist start
    val applicationId = sparkSession.sparkContext.applicationId
    globalContext.getPersist().start(applicationId)

    // process thread executed once per interval
    val dqThread = StreamingDQApp2(globalContext, dqParam.evaluateRule)

    val processInterval = TimeUtil.milliseconds(sparkParam.processInterval) match {
      case Some(interval) => interval
      case _ => throw new Exception("invalid batch interval")
    }
    val process = TimingProcess(processInterval, dqThread)
    process.startup()

    // blocks until the streaming application terminates
    ssc.start()
    ssc.awaitTermination()
    ssc.stop(stopSparkContext=true, stopGracefully=true)
    // NOTE(review): process.shutdown() is never invoked here — the daemon timer
    // and pool are left to jvm shutdown; confirm whether explicit shutdown is intended

    // clean context
    globalContext.clean()

    // finish
    globalContext.getPersist().finish()

  }

  def close: Try[_] = Try {
    // NOTE(review): SparkSession.close() already delegates to stop(); the
    // second call below appears redundant — confirm against the spark version in use
    sparkSession.close()
    sparkSession.stop()
  }


  // build a fresh streaming context with the configured batch interval and checkpoint dir
  def createStreamingContext: StreamingContext = {
    val batchInterval = TimeUtil.milliseconds(sparkParam.batchInterval) match {
      case Some(interval) => Milliseconds(interval)
      case _ => throw new Exception("invalid batch interval")
    }
    val ssc = new StreamingContext(sparkSession.sparkContext, batchInterval)
    ssc.checkpoint(sparkParam.cpDir)

    ssc
  }

  // remove the checkpoint directory on startup when configured to start clean
  private def clearCpDir: Unit = {
    if (sparkParam.needInitClear) {
      val cpDir = sparkParam.cpDir
      info(s"clear checkpoint directory ${cpDir}")
      HdfsUtil.deleteHdfsPath(cpDir)
    }
  }

  /**
    * Schedules `runnable` on a thread pool every `interval` milliseconds,
    * first run after one full interval.
    */
  case class TimingProcess(interval: Long, runnable: Runnable) {

    val pool: ThreadPoolExecutor = Executors.newFixedThreadPool(5).asInstanceOf[ThreadPoolExecutor]

    // daemon timer so it does not keep the jvm alive on its own
    val timer = new Timer("process", true)

    val timerTask = new TimerTask() {
      override def run(): Unit = {
        pool.submit(runnable)
      }
    }

    def startup(): Unit = {
      timer.schedule(timerTask, interval, interval)
    }

    def shutdown(): Unit = {
      timer.cancel()
      pool.shutdown()
      pool.awaitTermination(10, TimeUnit.SECONDS)
    }

  }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp2.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp2.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp2.scala
new file mode 100644
index 0000000..97ce980
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp2.scala
@@ -0,0 +1,104 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.launch.streaming
+
+import java.util.Date
+import java.util.concurrent.TimeUnit
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.configuration.params._
+import org.apache.griffin.measure.context.streaming.info.{InfoCacheInstance, 
TimeInfoCache}
+import org.apache.griffin.measure.context.streaming.metric.CacheResults
+import org.apache.griffin.measure.context.{ContextId, DQContext}
+import org.apache.griffin.measure.job.builder.DQJobBuilder
+
/**
  * One round of streaming dq calculation, scheduled repeatedly by the
  * streaming app's timing process. A distributed lock ensures only one
  * process runs a round at a time.
  */
case class StreamingDQApp2(globalContext: DQContext,
                           evaluateRuleParam: EvaluateRuleParam
                          ) extends Runnable with Loggable {

  val lock = InfoCacheInstance.genLock("process")
  val appPersist = globalContext.getPersist()

  def run(): Unit = {
    val updateTimeDate = new Date()
    val updateTime = updateTimeDate.getTime
    println(s"===== [${updateTimeDate}] process begins =====")
    val locked = lock.lock(5, TimeUnit.SECONDS)
    if (locked) {
      try {

        TimeInfoCache.startTimeInfoCache

        val startTime = new Date().getTime
        appPersist.log(startTime, s"starting process ...")
        val contextId = ContextId(startTime)

        // create dq context for this round
        val dqContext: DQContext = globalContext.cloneDQContext(contextId)

        // build job
        val dqJob = DQJobBuilder.buildDQJob(dqContext, evaluateRuleParam)

        // dq job execute
        dqJob.execute(dqContext)

        // finish calculation
        finishCalculation(dqContext)

        // end time
        val endTime = new Date().getTime
        appPersist.log(endTime, s"process using time: ${endTime - startTime} ms")

        TimeInfoCache.endTimeInfoCache

        // clean old data
        cleanData(dqContext)

      } catch {
        // was `case e: Throwable`: only swallow non-fatal errors so fatal
        // ones (OOM, interruption) still propagate to the executing thread
        case scala.util.control.NonFatal(e) => error(s"process error: ${e.getMessage}")
      } finally {
        lock.unlock()
      }
    } else {
      println(s"===== [${updateTimeDate}] process ignores =====")
    }
    val endTime = new Date().getTime
    println(s"===== [${updateTimeDate}] process ends, using ${endTime - updateTime} ms =====")
  }

  // finish calculation for this round
  private def finishCalculation(context: DQContext): Unit = {
    context.dataSources.foreach(_.processFinish)
  }

  // clean old data and old result cache; best-effort, non-fatal errors are logged only
  private def cleanData(context: DQContext): Unit = {
    try {
      context.dataSources.foreach(_.cleanOldData)

      context.clean()

      val cleanTime = TimeInfoCache.getCleanTime
      CacheResults.refresh(cleanTime)
    } catch {
      case scala.util.control.NonFatal(e) => error(s"clean data error: ${e.getMessage}")
    }
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/log/Loggable.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/log/Loggable.scala 
b/measure/src/main/scala/org/apache/griffin/measure/log/Loggable.scala
deleted file mode 100644
index 265a8cd..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/log/Loggable.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.log
-
-import org.slf4j.LoggerFactory
-
/**
  * Mix-in providing level-tagged logging methods backed by slf4j.
  */
trait Loggable {

  // one logger per concrete class; transient so it is not serialized with spark tasks
  @transient private lazy val log = LoggerFactory.getLogger(getClass)

  protected def info(msg: String): Unit = log.info(msg)

  protected def debug(msg: String): Unit = log.debug(msg)

  protected def warn(msg: String): Unit = log.warn(msg)

  protected def error(msg: String): Unit = log.error(msg)

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala 
b/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala
deleted file mode 100644
index 11c44d8..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/persist/HdfsPersist.scala
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.persist
-
-import java.util.Date
-
-import org.apache.griffin.measure.result._
-import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
-
-import scala.util.Try
-import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.spark.TaskContext
-
-// persist result and data to hdfs
-case class HdfsPersist(config: Map[String, Any], metricName: String, 
timeStamp: Long) extends Persist {
-
-  val Path = "path"
-  val MaxPersistLines = "max.persist.lines"
-  val MaxLinesPerFile = "max.lines.per.file"
-
-  val path = config.getOrElse(Path, "").toString
-  val maxPersistLines = config.getInt(MaxPersistLines, -1)
-  val maxLinesPerFile = math.min(config.getInt(MaxLinesPerFile, 10000), 
1000000)
-
-  val separator = "/"
-
-  val StartFile = filePath("_START")
-  val FinishFile = filePath("_FINISH")
-  val MetricsFile = filePath("_METRICS")
-
-//  val MissRecFile = filePath("_MISSREC")      // optional
-//  val MatchRecFile = filePath("_MATCHREC")    // optional
-
-  val LogFile = filePath("_LOG")
-
-  val _MetricName = "metricName"
-  val _Timestamp = "timestamp"
-  val _Value = "value"
-
-  var _init = true
-  private def isInit = {
-    val i = _init
-    _init = false
-    i
-  }
-
-  def available(): Boolean = {
-    path.nonEmpty
-  }
-
-  private def persistHead: String = {
-    val dt = new Date(timeStamp)
-    s"================ log of ${dt} ================\n"
-  }
-
-  private def timeHead(rt: Long): String = {
-    val dt = new Date(rt)
-    s"--- ${dt} ---\n"
-  }
-
-  protected def filePath(file: String): String = {
-    HdfsUtil.getHdfsFilePath(path, s"${metricName}/${timeStamp}/${file}")
-  }
-
-  protected def withSuffix(path: String, suffix: String): String = {
-    s"${path}.${suffix}"
-  }
-
-  def start(msg: String): Unit = {
-    try {
-      HdfsUtil.writeContent(StartFile, msg)
-    } catch {
-      case e: Throwable => error(e.getMessage)
-    }
-  }
-  def finish(): Unit = {
-    try {
-      HdfsUtil.createEmptyFile(FinishFile)
-    } catch {
-      case e: Throwable => error(e.getMessage)
-    }
-  }
-
-//  def result(rt: Long, result: Result): Unit = {
-//    try {
-//      val resStr = result match {
-//        case ar: AccuracyResult => {
-//          s"match percentage: ${ar.matchPercentage}\ntotal count: 
${ar.getTotal}\nmiss count: ${ar.getMiss}, match count: ${ar.getMatch}"
-//        }
-//        case pr: ProfileResult => {
-//          s"match percentage: ${pr.matchPercentage}\ntotal count: 
${pr.getTotal}\nmiss count: ${pr.getMiss}, match count: ${pr.getMatch}"
-//        }
-//        case _ => {
-//          s"result: ${result}"
-//        }
-//      }
-//      HdfsUtil.writeContent(ResultFile, timeHead(rt) + resStr)
-//      log(rt, resStr)
-//
-//      info(resStr)
-//    } catch {
-//      case e: Throwable => error(e.getMessage)
-//    }
-//  }
-
-  // need to avoid string too long
-//  private def rddRecords(records: RDD[String], path: String): Unit = {
-//    try {
-//      val recordCount = records.count
-//      val count = if (maxPersistLines < 0) recordCount else 
scala.math.min(maxPersistLines, recordCount)
-//      if (count > 0) {
-//        val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt
-//        if (groupCount <= 1) {
-//          val recs = records.take(count.toInt)
-//          persistRecords(path, recs)
-//        } else {
-//          val groupedRecords: RDD[(Long, Iterable[String])] =
-//            records.zipWithIndex.flatMap { r =>
-//              val gid = r._2 / maxLinesPerFile
-//              if (gid < groupCount) Some((gid, r._1)) else None
-//            }.groupByKey()
-//          groupedRecords.foreach { group =>
-//            val (gid, recs) = group
-//            val hdfsPath = if (gid == 0) path else withSuffix(path, 
gid.toString)
-//            persistRecords(hdfsPath, recs)
-//          }
-//        }
-//      }
-//    } catch {
-//      case e: Throwable => error(e.getMessage)
-//    }
-//  }
-//
-//  private def iterableRecords(records: Iterable[String], path: String): Unit 
= {
-//    try {
-//      val recordCount = records.size
-//      val count = if (maxPersistLines < 0) recordCount else 
scala.math.min(maxPersistLines, recordCount)
-//      if (count > 0) {
-//        val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt
-//        if (groupCount <= 1) {
-//          val recs = records.take(count.toInt)
-//          persistRecords(path, recs)
-//        } else {
-//          val groupedRecords = records.grouped(groupCount).zipWithIndex
-//          groupedRecords.take(groupCount).foreach { group =>
-//            val (recs, gid) = group
-//            val hdfsPath = if (gid == 0) path else withSuffix(path, 
gid.toString)
-//            persistRecords(hdfsPath, recs)
-//          }
-//        }
-//      }
-//    } catch {
-//      case e: Throwable => error(e.getMessage)
-//    }
-//  }
-//
-//  def records(recs: RDD[String], tp: String): Unit = {
-//    tp match {
-//      case PersistDataType.MISS => rddRecords(recs, MissRecFile)
-//      case PersistDataType.MATCH => rddRecords(recs, MatchRecFile)
-//      case _ => {}
-//    }
-//  }
-//
-//  def records(recs: Iterable[String], tp: String): Unit = {
-//    tp match {
-//      case PersistDataType.MISS => iterableRecords(recs, MissRecFile)
-//      case PersistDataType.MATCH => iterableRecords(recs, MatchRecFile)
-//      case _ => {}
-//    }
-//  }
-
-//  private def persistRecords2Hdfs(hdfsPath: String, rdd: RDD[String]): Unit 
= {
-//    try {
-////      rdd.saveAsTextFile(hdfsPath)
-//      val recStr = rdd.collect().mkString("\n")
-//      HdfsUtil.writeContent(hdfsPath, recStr)
-//    } catch {
-//      case e: Throwable => error(e.getMessage)
-//    }
-//  }
-  private def persistRecords2Hdfs(hdfsPath: String, records: 
Iterable[String]): Unit = {
-    try {
-      val recStr = records.mkString("\n")
-      HdfsUtil.writeContent(hdfsPath, recStr)
-    } catch {
-      case e: Throwable => error(e.getMessage)
-    }
-  }
-
-  def log(rt: Long, msg: String): Unit = {
-    try {
-      val logStr = (if (isInit) persistHead else "") + timeHead(rt) + 
s"${msg}\n\n"
-      HdfsUtil.appendContent(LogFile, logStr)
-    } catch {
-      case e: Throwable => error(e.getMessage)
-    }
-  }
-
-  private def getHdfsPath(path: String, groupId: Int): String = {
-    HdfsUtil.getHdfsFilePath(path, s"${groupId}")
-//    if (groupId == 0) path else withSuffix(path, s"${groupId}")
-  }
-  private def getHdfsPath(path: String, ptnId: Int, groupId: Int): String = {
-    HdfsUtil.getHdfsFilePath(path, s"${ptnId}.${groupId}")
-//    if (ptnId == 0 && groupId == 0) path else withSuffix(path, 
s"${ptnId}.${groupId}")
-  }
-
-  private def clearOldRecords(path: String): Unit = {
-    HdfsUtil.deleteHdfsPath(path)
-  }
-
-  def persistRecords(df: DataFrame, name: String): Unit = {
-    val path = filePath(name)
-    clearOldRecords(path)
-    try {
-      val recordCount = df.count
-      val count = if (maxPersistLines < 0) recordCount else 
scala.math.min(maxPersistLines, recordCount)
-      val maxCount = count.toInt
-      if (maxCount > 0) {
-        val recDf = df.limit(maxCount)
-        recDf.toJSON.foreachPartition { ptn =>
-          val ptnid = TaskContext.getPartitionId()
-          val groupedRecords = ptn.grouped(maxLinesPerFile).zipWithIndex
-          groupedRecords.foreach { group =>
-            val (recs, gid) = group
-            val hdfsPath = getHdfsPath(path, ptnid, gid)
-            persistRecords2Hdfs(hdfsPath, recs)
-          }
-        }
-      }
-    } catch {
-      case e: Throwable => error(e.getMessage)
-    }
-  }
-
-  def persistRecords(records: RDD[String], name: String): Unit = {
-    val path = filePath(name)
-    clearOldRecords(path)
-    try {
-      val recordCount = records.count
-      val count = if (maxPersistLines < 0) recordCount else 
scala.math.min(maxPersistLines, recordCount)
-      if (count > 0) {
-        val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt
-        if (groupCount <= 1) {
-          val recs = records.take(count.toInt)
-          persistRecords2Hdfs(path, recs)
-        } else {
-          val groupedRecords: RDD[(Long, Iterable[String])] =
-            records.zipWithIndex.flatMap { r =>
-              val gid = r._2 / maxLinesPerFile
-              if (gid < groupCount) Some((gid, r._1)) else None
-            }.groupByKey()
-          groupedRecords.foreach { group =>
-            val (gid, recs) = group
-            val hdfsPath = if (gid == 0) path else withSuffix(path, 
gid.toString)
-            persistRecords2Hdfs(hdfsPath, recs)
-          }
-        }
-      }
-    } catch {
-      case e: Throwable => error(e.getMessage)
-    }
-  }
-
-  def persistRecords(records: Iterable[String], name: String): Unit = {
-    val path = filePath(name)
-    clearOldRecords(path)
-    try {
-      val recordCount = records.size
-      val count = if (maxPersistLines < 0) recordCount else 
scala.math.min(maxPersistLines, recordCount)
-      if (count > 0) {
-        val groupCount = (count - 1) / maxLinesPerFile + 1
-        if (groupCount <= 1) {
-          val recs = records.take(count.toInt)
-          persistRecords2Hdfs(path, recs)
-        } else {
-          val groupedRecords = records.grouped(maxLinesPerFile).zipWithIndex
-          groupedRecords.take(groupCount).foreach { group =>
-            val (recs, gid) = group
-            val hdfsPath = getHdfsPath(path, gid)
-            persistRecords2Hdfs(hdfsPath, recs)
-          }
-        }
-      }
-    } catch {
-      case e: Throwable => error(e.getMessage)
-    }
-  }
-
-//  def persistMetrics(metrics: Seq[String], name: String): Unit = {
-//    val path = filePath(name)
-//    try {
-//      val recordCount = metrics.size
-//      val count = if (maxPersistLines < 0) recordCount else 
scala.math.min(maxPersistLines, recordCount)
-//      if (count > 0) {
-//        val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt
-//        if (groupCount <= 1) {
-//          val recs = metrics.take(count.toInt)
-//          persistRecords(path, recs)
-//        } else {
-//          val groupedRecords = metrics.grouped(groupCount).zipWithIndex
-//          groupedRecords.take(groupCount).foreach { group =>
-//            val (recs, gid) = group
-//            val hdfsPath = if (gid == 0) path else withSuffix(path, 
gid.toString)
-//            persistRecords(hdfsPath, recs)
-//          }
-//        }
-//      }
-//    } catch {
-//      case e: Throwable => error(e.getMessage)
-//    }
-//  }
-
-  def persistMetrics(metrics: Map[String, Any]): Unit = {
-    val head = Map[String, Any]((_MetricName -> metricName), (_Timestamp -> 
timeStamp))
-    val result = head + (_Value -> metrics)
-    try {
-      val json = JsonUtil.toJson(result)
-      persistRecords2Hdfs(MetricsFile, json :: Nil)
-    } catch {
-      case e: Throwable => error(e.getMessage)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/persist/HttpPersist.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/persist/HttpPersist.scala 
b/measure/src/main/scala/org/apache/griffin/measure/persist/HttpPersist.scala
deleted file mode 100644
index e41ae55..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/persist/HttpPersist.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.persist
-
-import org.apache.griffin.measure.result._
-import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
-
-import scala.util.Try
-import org.apache.griffin.measure.utils.ParamUtil._
-
-// persist result by http way
-case class HttpPersist(config: Map[String, Any], metricName: String, 
timeStamp: Long) extends Persist {
-
-  val Api = "api"
-  val Method = "method"
-
-  val api = config.getString(Api, "")
-  val method = config.getString(Method, "post")
-
-  val _Value = "value"
-
-  def available(): Boolean = {
-    api.nonEmpty
-  }
-
-  def start(msg: String): Unit = {}
-  def finish(): Unit = {}
-
-//  def result(rt: Long, result: Result): Unit = {
-//    result match {
-//      case ar: AccuracyResult => {
-//        val dataMap = Map[String, Any](("name" -> metricName), ("tmst" -> 
timeStamp), ("total" -> ar.getTotal), ("matched" -> ar.getMatch))
-//        httpResult(dataMap)
-//      }
-//      case pr: ProfileResult => {
-//        val dataMap = Map[String, Any](("name" -> metricName), ("tmst" -> 
timeStamp), ("total" -> pr.getTotal), ("matched" -> pr.getMatch))
-//        httpResult(dataMap)
-//      }
-//      case _ => {
-//        info(s"result: ${result}")
-//      }
-//    }
-//  }
-
-  private def httpResult(dataMap: Map[String, Any]) = {
-    try {
-      val data = JsonUtil.toJson(dataMap)
-      // post
-      val params = Map[String, Object]()
-      val header = Map[String, Object](("Content-Type","application/json"))
-
-      def func(): Boolean = {
-        HttpUtil.httpRequest(api, method, params, header, data)
-      }
-
-      PersistThreadPool.addTask(func _, 10)
-
-//      val status = HttpUtil.httpRequest(api, method, params, header, data)
-//      info(s"${method} to ${api} response status: ${status}")
-    } catch {
-      case e: Throwable => error(e.getMessage)
-    }
-
-  }
-
-//  def records(recs: RDD[String], tp: String): Unit = {}
-//  def records(recs: Iterable[String], tp: String): Unit = {}
-
-//  def missRecords(records: RDD[String]): Unit = {}
-//  def matchRecords(records: RDD[String]): Unit = {}
-
-  def log(rt: Long, msg: String): Unit = {}
-
-  def persistRecords(df: DataFrame, name: String): Unit = {}
-  def persistRecords(records: RDD[String], name: String): Unit = {}
-  def persistRecords(records: Iterable[String], name: String): Unit = {}
-
-//  def persistMetrics(metrics: Seq[String], name: String): Unit = {
-//    val maps = metrics.flatMap { m =>
-//      try {
-//        Some(JsonUtil.toAnyMap(m) ++ Map[String, Any](("name" -> 
metricName), ("tmst" -> timeStamp)))
-//      } catch {
-//        case e: Throwable => None
-//      }
-//    }
-//    maps.foreach { map =>
-//      httpResult(map)
-//    }
-//  }
-
-  def persistMetrics(metrics: Map[String, Any]): Unit = {
-    val head = Map[String, Any](("name" -> metricName), ("tmst" -> timeStamp))
-    val result = head + (_Value -> metrics)
-    httpResult(result)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/persist/LoggerPersist.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/persist/LoggerPersist.scala 
b/measure/src/main/scala/org/apache/griffin/measure/persist/LoggerPersist.scala
deleted file mode 100644
index d9a601a..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/persist/LoggerPersist.scala
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.persist
-
-import java.util.Date
-
-import org.apache.griffin.measure.result._
-import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
-import org.apache.griffin.measure.utils.ParamUtil._
-
-// persist result and data to hdfs
-case class LoggerPersist(config: Map[String, Any], metricName: String, 
timeStamp: Long) extends Persist {
-
-  val MaxLogLines = "max.log.lines"
-
-  val maxLogLines = config.getInt(MaxLogLines, 100)
-
-  def available(): Boolean = true
-
-  def start(msg: String): Unit = {
-    println(s"[${timeStamp}] ${metricName} start: ${msg}")
-  }
-  def finish(): Unit = {
-    println(s"[${timeStamp}] ${metricName} finish")
-  }
-
-//  def result(rt: Long, result: Result): Unit = {
-//    try {
-//      val resStr = result match {
-//        case ar: AccuracyResult => {
-//          s"match percentage: ${ar.matchPercentage}\ntotal count: 
${ar.getTotal}\nmiss count: ${ar.getMiss}, match count: ${ar.getMatch}"
-//        }
-//        case pr: ProfileResult => {
-//          s"match percentage: ${pr.matchPercentage}\ntotal count: 
${pr.getTotal}\nmiss count: ${pr.getMiss}, match count: ${pr.getMatch}"
-//        }
-//        case _ => {
-//          s"result: ${result}"
-//        }
-//      }
-//      println(s"[${timeStamp}] ${metricName} result: \n${resStr}")
-//    } catch {
-//      case e: Throwable => error(e.getMessage)
-//    }
-//  }
-//
-//  // need to avoid string too long
-//  private def rddRecords(records: RDD[String]): Unit = {
-//    try {
-//      val recordCount = records.count.toInt
-//      val count = if (maxLogLines < 0) recordCount else 
scala.math.min(maxLogLines, recordCount)
-//      if (count > 0) {
-//        val recordsArray = records.take(count)
-////        recordsArray.foreach(println)
-//      }
-//    } catch {
-//      case e: Throwable => error(e.getMessage)
-//    }
-//  }
-
-//  private def iterableRecords(records: Iterable[String]): Unit = {
-//    try {
-//      val recordCount = records.size
-//      val count = if (maxLogLines < 0) recordCount else 
scala.math.min(maxLogLines, recordCount)
-//      if (count > 0) {
-//        val recordsArray = records.take(count)
-////        recordsArray.foreach(println)
-//      }
-//    } catch {
-//      case e: Throwable => error(e.getMessage)
-//    }
-//  }
-
-//  def records(recs: RDD[String], tp: String): Unit = {
-//    tp match {
-//      case PersistDataType.MISS => rddRecords(recs)
-//      case PersistDataType.MATCH => rddRecords(recs)
-//      case _ => {}
-//    }
-//  }
-//
-//  def records(recs: Iterable[String], tp: String): Unit = {
-//    tp match {
-//      case PersistDataType.MISS => iterableRecords(recs)
-//      case PersistDataType.MATCH => iterableRecords(recs)
-//      case _ => {}
-//    }
-//  }
-
-//  def missRecords(records: RDD[String]): Unit = {
-//    warn(s"[${timeStamp}] ${metricName} miss records: ")
-//    rddRecords(records)
-//  }
-//  def matchRecords(records: RDD[String]): Unit = {
-//    warn(s"[${timeStamp}] ${metricName} match records: ")
-//    rddRecords(records)
-//  }
-
-  def log(rt: Long, msg: String): Unit = {
-    println(s"[${timeStamp}] ${rt}: ${msg}")
-  }
-
-  def persistRecords(df: DataFrame, name: String): Unit = {
-//    println(s"${metricName} [${timeStamp}] records: ")
-//    try {
-//      val recordCount = df.count
-//      val count = if (maxLogLines < 0) recordCount else 
scala.math.min(maxLogLines, recordCount)
-//      val maxCount = count.toInt
-//      if (maxCount > 0) {
-//        val recDf = df.limit(maxCount)
-//        val recordsArray = recDf.toJSON.collect()
-//        recordsArray.foreach(println)
-//      }
-//    } catch {
-//      case e: Throwable => error(e.getMessage)
-//    }
-  }
-
-  def persistRecords(records: RDD[String], name: String): Unit = {
-//    println(s"${metricName} [${timeStamp}] records: ")
-//    try {
-//      val recordCount = records.count
-//      val count = if (maxLogLines < 0) recordCount else 
scala.math.min(maxLogLines, recordCount)
-//      val maxCount = count.toInt
-//      if (maxCount > 0) {
-//        val recordsArray = records.take(maxCount)
-//        recordsArray.foreach(println)
-//      }
-//    } catch {
-//      case e: Throwable => error(e.getMessage)
-//    }
-  }
-
-  def persistRecords(records: Iterable[String], name: String): Unit = {
-//    println(s"${metricName} [${timeStamp}] records: ")
-//    try {
-//      val recordCount = records.size
-//      val count = if (maxLogLines < 0) recordCount else 
scala.math.min(maxLogLines, recordCount)
-//      if (count > 0) {
-//        records.foreach(println)
-//      }
-//    } catch {
-//      case e: Throwable => error(e.getMessage)
-//    }
-  }
-
-//  def persistMetrics(metrics: Seq[String], name: String): Unit = {
-//    try {
-//      val recordCount = metrics.size
-//      val count = if (maxLogLines < 0) recordCount else 
scala.math.min(maxLogLines, recordCount)
-//      if (count > 0) {
-//        val recordsArray = metrics.take(count)
-//        recordsArray.foreach(println)
-//      }
-//    } catch {
-//      case e: Throwable => error(e.getMessage)
-//    }
-//  }
-
-  def persistMetrics(metrics: Map[String, Any]): Unit = {
-    println(s"${metricName} [${timeStamp}] metrics: ")
-    val json = JsonUtil.toJson(metrics)
-    println(json)
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/persist/MongoPersist.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/persist/MongoPersist.scala 
b/measure/src/main/scala/org/apache/griffin/measure/persist/MongoPersist.scala
deleted file mode 100644
index b5923ce..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/persist/MongoPersist.scala
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.persist
-
-import org.mongodb.scala._
-import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
-import org.mongodb.scala.model.{Filters, UpdateOptions, Updates}
-import org.mongodb.scala.result.UpdateResult
-
-import scala.concurrent.Future
-import scala.util.{Failure, Success}
-
-
-case class MongoPersist(config: Map[String, Any], metricName: String, 
timeStamp: Long) extends Persist {
-
-  MongoConnection.init(config)
-
-  val _MetricName = "metricName"
-  val _Timestamp = "timestamp"
-  val _Value = "value"
-
-  def available(): Boolean = MongoConnection.dataConf.available
-
-  def start(msg: String): Unit = {}
-  def finish(): Unit = {}
-
-  def log(rt: Long, msg: String): Unit = {}
-
-  def persistRecords(df: DataFrame, name: String): Unit = {}
-  def persistRecords(records: RDD[String], name: String): Unit = {}
-  def persistRecords(records: Iterable[String], name: String): Unit = {}
-
-  def persistMetrics(metrics: Map[String, Any]): Unit = {
-    mongoInsert(metrics)
-  }
-
-  private val filter = Filters.and(
-    Filters.eq(_MetricName, metricName),
-    Filters.eq(_Timestamp, timeStamp)
-  )
-
-  private def mongoInsert(dataMap: Map[String, Any]): Unit = {
-    try {
-      val update = Updates.set(_Value, dataMap)
-      def func(): (Long, Future[UpdateResult]) = {
-        (timeStamp, MongoConnection.getDataCollection.updateOne(
-          filter, update, UpdateOptions().upsert(true)).toFuture)
-      }
-      MongoThreadPool.addTask(func _, 10)
-    } catch {
-      case e: Throwable => error(e.getMessage)
-    }
-  }
-
-}
-
-case class MongoConf(url: String, database: String, collection: String) {
-  def available: Boolean = url.nonEmpty && database.nonEmpty && 
collection.nonEmpty
-}
-
-object MongoConnection {
-
-  val _MongoHead = "mongodb://"
-
-  val Url = "url"
-  val Database = "database"
-  val Collection = "collection"
-
-  private var initialed = false
-
-  var dataConf: MongoConf = _
-  private var dataCollection: MongoCollection[Document] = _
-
-  def getDataCollection = dataCollection
-
-  def init(config: Map[String, Any]): Unit = {
-    if (!initialed) {
-      dataConf = mongoConf(config)
-      dataCollection = mongoCollection(dataConf)
-      initialed = true
-    }
-  }
-
-  private def mongoConf(cfg: Map[String, Any]): MongoConf = {
-    val url = cfg.getString(Url, "").trim
-    val mongoUrl = if (url.startsWith(_MongoHead)) url else {
-      _MongoHead + url
-    }
-    MongoConf(
-      mongoUrl,
-      cfg.getString(Database, ""),
-      cfg.getString(Collection, "")
-    )
-  }
-  private def mongoCollection(mongoConf: MongoConf): MongoCollection[Document] 
= {
-    val mongoClient: MongoClient = MongoClient(mongoConf.url)
-    val database: MongoDatabase = mongoClient.getDatabase(mongoConf.database)
-    database.getCollection(mongoConf.collection)
-  }
-
-}
\ No newline at end of file


Reply via email to