http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/ZKInfoCache.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/ZKInfoCache.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/ZKInfoCache.scala
new file mode 100644
index 0000000..42166ae
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/ZKInfoCache.scala
@@ -0,0 +1,217 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.info
+
+import org.apache.curator.framework.imps.CuratorFrameworkState
+import org.apache.curator.framework.recipes.locks.InterProcessMutex
+import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
+import org.apache.curator.retry.ExponentialBackoffRetry
+import org.apache.curator.utils.ZKPaths
+import org.apache.griffin.measure.context.streaming.lock.ZKCacheLock
+import org.apache.zookeeper.CreateMode
+
+import scala.collection.JavaConverters._
+
+/**
+  * Info cache that stores key/value info in zookeeper through a curator client.
+  *
+  * Config keys read from `config`:
+  *  - `hosts`: zookeeper connect string (default "")
+  *  - `namespace`: optional namespace prefix; the effective namespace is
+  *    `namespace + separator + metricName`, or just `metricName` when it is empty
+  *  - `mode`: `persist` or `ephemeral` (case-insensitive); anything else means persistent
+  *  - `init.clear`: clear the cached info in `init()` (default true)
+  *  - `close.clear`: clear the cached info in `close()` (default false)
+  *  - `lock.path`: node under which locks are generated (default "lock")
+  *
+  * All zookeeper operations are best-effort: failures are logged and reported
+  * through the return value (false / None / Nil) instead of being thrown.
+  *
+  * @param config      configuration of this info cache, see the keys above
+  * @param metricName  metric name, used as (the last part of) the curator namespace
+  */
+case class ZKInfoCache(config: Map[String, Any], metricName: String) extends InfoCache {
+
+  import scala.util.control.NonFatal
+
+  val Hosts = "hosts"
+  val Namespace = "namespace"
+  val Mode = "mode"
+  val InitClear = "init.clear"
+  val CloseClear = "close.clear"
+  val LockPath = "lock.path"
+
+  val PersistRegex = """^(?i)persist$""".r
+  val EphemeralRegex = """^(?i)ephemeral$""".r
+
+  final val separator = ZKPaths.PATH_SEPARATOR
+
+  val hosts = config.getOrElse(Hosts, "").toString
+  val namespace = config.getOrElse(Namespace, "").toString
+  // only a string value matching "ephemeral" selects ephemeral nodes, everything else is persistent
+  val mode: CreateMode = config.get(Mode) match {
+    case Some(s: String) => s match {
+      case PersistRegex() => CreateMode.PERSISTENT
+      case EphemeralRegex() => CreateMode.EPHEMERAL
+      case _ => CreateMode.PERSISTENT
+    }
+    case _ => CreateMode.PERSISTENT
+  }
+  val initClear = config.get(InitClear) match {
+    case Some(b: Boolean) => b
+    case _ => true
+  }
+  val closeClear = config.get(CloseClear) match {
+    case Some(b: Boolean) => b
+    case _ => false
+  }
+  val lockPath = config.getOrElse(LockPath, "lock").toString
+
+  private val cacheNamespace: String =
+    if (namespace.isEmpty) metricName else namespace + separator + metricName
+  private val builder = CuratorFrameworkFactory.builder()
+    .connectString(hosts)
+    .retryPolicy(new ExponentialBackoffRetry(1000, 3))
+    .namespace(cacheNamespace)
+  private val client: CuratorFramework = builder.build
+
+  /**
+    * start the curator client, remove the lock node left under `lockPath`,
+    * and clear the cached info when `init.clear` is set
+    */
+  def init(): Unit = {
+    client.start()
+    info("start zk info cache")
+    // the namespace is already configured on the builder; the former
+    // `client.usingNamespace(cacheNamespace)` call discarded the facade it
+    // returned and therefore had no effect, so it has been removed
+    info(s"init with namespace: ${cacheNamespace}")
+    deleteInfo(lockPath :: Nil)
+    if (initClear) {
+      clearInfo()
+    }
+  }
+
+  /** @return true only while the curator client is in STARTED state */
+  def available(): Boolean = {
+    client.getState match {
+      case CuratorFrameworkState.STARTED => true
+      case _ => false
+    }
+  }
+
+  /** clear the cached info when `close.clear` is set, then close the curator client */
+  def close(): Unit = {
+    if (closeClear) {
+      clearInfo()
+    }
+    info("close zk info cache")
+    client.close()
+  }
+
+  /**
+    * write every key/value pair, creating or updating the node of each key
+    * @return true only if all pairs were written; remaining pairs are still
+    *         attempted after a failure
+    */
+  def cacheInfo(info: Map[String, String]): Boolean = {
+    info.foldLeft(true) { (rs, pair) =>
+      val (k, v) = pair
+      // write first, then combine, so that one failure does not skip later writes
+      createOrUpdate(path(k), v) && rs
+    }
+  }
+
+  /** read the given keys; keys which can not be read are left out of the result */
+  def readInfo(keys: Iterable[String]): Map[String, String] = {
+    keys.flatMap { key =>
+      read(path(key)).map(v => (key, v))
+    }.toMap
+  }
+
+  /** delete the nodes of the given keys, including their children */
+  def deleteInfo(keys: Iterable[String]): Unit = {
+    keys.foreach { key => delete(path(key)) }
+  }
+
+  /** delete the final cache info and the info sub trees of TimeInfoCache */
+  def clearInfo(): Unit = {
+//    delete("/")
+    deleteInfo(TimeInfoCache.finalCacheInfoPath :: Nil)
+    deleteInfo(TimeInfoCache.infoPath :: Nil)
+    info("clear info")
+  }
+
+  /** list the names of the children of node `p`, Nil if listing fails */
+  def listKeys(p: String): List[String] = {
+    children(path(p))
+  }
+
+  /**
+    * generate a lock on the node `lockPath`, or on its child `s` when `s` is not empty
+    */
+  def genLock(s: String): ZKCacheLock = {
+    val lpt = if (s.isEmpty) path(lockPath) else path(lockPath) + separator + s
+    ZKCacheLock(new InterProcessMutex(client, lpt))
+  }
+
+  /** make sure the key starts with the path separator */
+  private def path(k: String): String = {
+    if (k.startsWith(separator)) k else separator + k
+  }
+
+  /**
+    * keep the interrupt status visible to the caller when an operation
+    * was aborted by an InterruptedException which is handled here
+    */
+  private def restoreInterrupt(e: Throwable): Unit = e match {
+    case _: InterruptedException => Thread.currentThread().interrupt()
+    case _ =>
+  }
+
+  // NOTE on the catch clauses below: only non-fatal errors and interrupts are
+  // handled as failures; fatal JVM errors (e.g. OutOfMemoryError) propagate.
+
+  private def children(path: String): List[String] = {
+    try {
+      client.getChildren().forPath(path).asScala.toList
+    } catch {
+      case e @ (NonFatal(_) | _: InterruptedException) =>
+        restoreInterrupt(e)
+        warn(s"list ${path} warn: ${e.getMessage}")
+        Nil
+    }
+  }
+
+  private def createOrUpdate(path: String, content: String): Boolean = {
+    if (checkExists(path)) {
+      update(path, content)
+    } else {
+      create(path, content)
+    }
+  }
+
+  private def create(path: String, content: String): Boolean = {
+    try {
+      client.create().creatingParentsIfNeeded().withMode(mode)
+        .forPath(path, content.getBytes("utf-8"))
+      true
+    } catch {
+      case e @ (NonFatal(_) | _: InterruptedException) =>
+        restoreInterrupt(e)
+        error(s"create ( ${path} -> ${content} ) error: ${e.getMessage}")
+        false
+    }
+  }
+
+  private def update(path: String, content: String): Boolean = {
+    try {
+      client.setData().forPath(path, content.getBytes("utf-8"))
+      true
+    } catch {
+      case e @ (NonFatal(_) | _: InterruptedException) =>
+        restoreInterrupt(e)
+        error(s"update ( ${path} -> ${content} ) error: ${e.getMessage}")
+        false
+    }
+  }
+
+  private def read(path: String): Option[String] = {
+    try {
+      Some(new String(client.getData().forPath(path), "utf-8"))
+    } catch {
+      case e @ (NonFatal(_) | _: InterruptedException) =>
+        restoreInterrupt(e)
+        warn(s"read ${path} warn: ${e.getMessage}")
+        None
+    }
+  }
+
+  private def delete(path: String): Unit = {
+    try {
+      client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path)
+    } catch {
+      case e @ (NonFatal(_) | _: InterruptedException) =>
+        restoreInterrupt(e)
+        error(s"delete ${path} error: ${e.getMessage}")
+    }
+  }
+
+  private def checkExists(path: String): Boolean = {
+    try {
+      // forPath returns null when the node does not exist
+      Option(client.checkExists().forPath(path)).isDefined
+    } catch {
+      case e @ (NonFatal(_) | _: InterruptedException) =>
+        restoreInterrupt(e)
+        warn(s"check exists ${path} warn: ${e.getMessage}")
+        false
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLock.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLock.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLock.scala
new file mode 100644
index 0000000..a2cfad9
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLock.scala
@@ -0,0 +1,34 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.lock
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.griffin.measure.Loggable
+
+/**
+  * lock for info cache
+  *
+  * Implementations visible alongside this trait are ZKCacheLock and MultiCacheLock.
+  */
+trait CacheLock extends Loggable with Serializable {
+
+  /**
+    * try to acquire the lock
+    * @param outtime  amount of time to wait for the lock, in `unit`
+    *                 (ZKCacheLock requests an untimed acquire for negative values)
+    * @param unit     time unit of `outtime`
+    * @return true if the lock was acquired
+    */
+  def lock(outtime: Long, unit: TimeUnit): Boolean
+
+  /**
+    * release the lock
+    */
+  def unlock(): Unit
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/MultiCacheLock.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/MultiCacheLock.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/MultiCacheLock.scala
new file mode 100644
index 0000000..6eea0b6
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/MultiCacheLock.scala
@@ -0,0 +1,39 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.lock
+
+import java.util.concurrent.TimeUnit
+
+/**
+  * cache lock wrapping a list of cache locks
+  *
+  * Only the first lock of the list is acquired and released; with an empty
+  * list locking always succeeds and unlocking does nothing.
+  */
+case class MultiCacheLock(cacheLocks: List[CacheLock]) extends CacheLock {
+
+  /** lock the first cache lock if there is one, otherwise report success */
+  def lock(outtime: Long, unit: TimeUnit): Boolean =
+    cacheLocks.headOption.map(_.lock(outtime, unit)).getOrElse(true)
+
+  /** unlock the first cache lock if there is one */
+  def unlock(): Unit =
+    cacheLocks.headOption.foreach(_.unlock())
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/ZKCacheLock.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/ZKCacheLock.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/ZKCacheLock.scala
new file mode 100644
index 0000000..2c9b717
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/ZKCacheLock.scala
@@ -0,0 +1,53 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.lock
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.curator.framework.recipes.locks.InterProcessMutex
+
+/**
+  * cache lock based on a curator InterProcessMutex
+  *
+  * Failures are logged and never thrown: `lock` reports them as false.
+  * Fatal JVM errors (e.g. OutOfMemoryError) are not handled and propagate.
+  *
+  * @param mutex  the curator mutex to acquire and release; it is marked
+  *               transient, so it is not serialized together with this lock
+  */
+case class ZKCacheLock(@transient mutex: InterProcessMutex) extends CacheLock {
+
+  import scala.util.control.NonFatal
+
+  /**
+    * acquire the mutex
+    * @param outtime  time to wait for the mutex; a negative value requests
+    *                 an acquire without timeout
+    * @param unit     time unit of `outtime`
+    * @return true if the mutex was acquired, false on time out or on any failure
+    */
+  def lock(outtime: Long, unit: TimeUnit): Boolean = {
+    try {
+      if (outtime >= 0) {
+        mutex.acquire(outtime, unit)
+      } else {
+        mutex.acquire(-1, null)
+      }
+    } catch {
+      case e: InterruptedException =>
+        // keep the interrupt visible to the caller instead of silently dropping it
+        Thread.currentThread().interrupt()
+        error(s"lock error: ${e.getMessage}")
+        false
+      case NonFatal(e) =>
+        error(s"lock error: ${e.getMessage}")
+        false
+    }
+  }
+
+  /**
+    * release the mutex if it is currently acquired in this process
+    */
+  def unlock(): Unit = {
+    try {
+      if (mutex.isAcquiredInThisProcess) mutex.release()
+    } catch {
+      case e: InterruptedException =>
+        // keep the interrupt visible to the caller instead of silently dropping it
+        Thread.currentThread().interrupt()
+        error(s"unlock error: ${e.getMessage}")
+      case NonFatal(e) =>
+        error(s"unlock error: ${e.getMessage}")
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/AccuracyMetric.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/AccuracyMetric.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/AccuracyMetric.scala
new file mode 100644
index 0000000..e69716e
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/AccuracyMetric.scala
@@ -0,0 +1,54 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.metric
+
+/**
+  * accuracy metric, described by the number of missed records out of a total
+  * @param miss     miss count
+  * @param total    total count
+  */
+case class AccuracyMetric(miss: Long, total: Long) extends Metric {
+
+  type T = AccuracyMetric
+
+  /** legal only with a positive total count */
+  override def isLegal(): Boolean = total > 0
+
+  /**
+    * take over the miss count of `delta` when it is lower than the current one,
+    * the total count is never changed
+    */
+  def update(delta: T): T = {
+    val fewerMisses = delta.miss < miss
+    if (fewerMisses) copy(miss = delta.miss) else this
+  }
+
+  /** true when no record is matched */
+  def initial(): Boolean = !(getMatch > 0)
+
+  /** true when no record is missed */
+  def eventual(): Boolean = miss <= 0
+
+  /** true when miss count or total count differs from `other` */
+  def differsFrom(other: T): Boolean = {
+    !(miss == other.miss && total == other.total)
+  }
+
+  def getMiss: Long = miss
+  def getTotal: Long = total
+  def getMatch: Long = total - miss
+
+  /** matched records in percent of the total, 0 without a positive total */
+  def matchPercentage: Double = {
+    if (total > 0) getMatch.toDouble / total * 100 else 0
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/CacheResults.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/CacheResults.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/CacheResults.scala
new file mode 100644
index 0000000..cc8e772
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/CacheResults.scala
@@ -0,0 +1,78 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.metric
+
+import org.apache.griffin.measure.Loggable
+
+import scala.collection.mutable.{Map => MutableMap}
+
+/**
+  * in streaming mode metrics of the same timestamp may be updated later,
+  * so the metrics already reported are cached here, keyed by timestamp
+  */
+object CacheResults extends Loggable {
+
+  /**
+    * a cached metric
+    * @param timeStamp   timestamp the metric belongs to, key of the cache
+    * @param updateTime  time of the update which produced `result`
+    * @param result      the metric
+    */
+  case class CacheResult(timeStamp: Long, updateTime: Long, result: Metric) {
+    def olderThan(ut: Long): Boolean = updateTime < ut
+
+    /**
+      * merge metric `r` of update time `ut` into the cached metric
+      * @return the merged metric if this cache result is older than `ut` and
+      *         the merge changed the metric, otherwise None
+      */
+    def update(ut: Long, r: Metric): Option[Metric] = r match {
+      case m: result.T if olderThan(ut) =>
+        val merged = result.update(m)
+        Some(merged).filter(changed => result.differsFrom(changed))
+      case _ => None
+    }
+  }
+
+  private val cacheGroup: MutableMap[Long, CacheResult] = MutableMap()
+
+  private def update(r: CacheResult): Unit = {
+    cacheGroup(r.timeStamp) = r
+  }
+
+  /**
+    * input new metric results, output the updated metric results.
+    * results of timestamps not cached yet are always part of the output,
+    * every output result replaces the cached one of its timestamp.
+    */
+  def update(cacheResults: Iterable[CacheResult]): Iterable[CacheResult] = {
+    val changedResults = cacheResults.flatMap {
+      case CacheResult(t, ut, r) =>
+        val metricOpt: Option[Metric] =
+          cacheGroup.get(t).map(_.update(ut, r)).getOrElse(Some(r))
+        metricOpt.map(m => CacheResult(t, ut, m))
+    }
+    changedResults.foreach(changed => update(changed))
+    changedResults
+  }
+
+  /**
+    * clean the out-time cached results, to avoid memory leak:
+    * results older than `overtime` and results with an eventual metric are dropped
+    */
+  def refresh(overtime: Long): Unit = {
+    val deadKeys = cacheGroup.collect {
+      case (key, cached) if cached.timeStamp < overtime || cached.result.eventual() => key
+    }.toList
+    info(s"=== dead cache group count: ${deadKeys.size} ===")
+    cacheGroup --= deadKeys
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/Metric.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/Metric.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/Metric.scala
new file mode 100644
index 0000000..5d91ee3
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/Metric.scala
@@ -0,0 +1,35 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.metric
+
+/**
+  * a metric value which can be merged with a later value of the same kind
+  */
+trait Metric extends Serializable {
+
+  // the concrete metric type, used as parameter and result type of update/differsFrom
+  type T <: Metric
+
+  /**
+    * whether the metric is legal, true unless overridden
+    */
+  def isLegal(): Boolean = true
+
+  /**
+    * merge `delta` into this metric
+    * @param delta  the metric to merge in
+    * @return the merged metric
+    */
+  def update(delta: T): T
+
+  // NOTE(review): presumably true while the metric is still in its starting state -- confirm against implementations
+  def initial(): Boolean
+
+  // NOTE(review): presumably true once the metric reached its final state and needs no further update -- confirm against implementations
+  def eventual(): Boolean
+
+  /**
+    * whether this metric differs from `other`
+    */
+  def differsFrom(other: T): Boolean
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/writer/HdfsPersist.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/writer/HdfsPersist.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/HdfsPersist.scala
new file mode 100644
index 0000000..8f424e6
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/HdfsPersist.scala
@@ -0,0 +1,190 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.writer
+
+import java.util.Date
+
+import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil}
+import org.apache.griffin.measure.utils.ParamUtil._
+import org.apache.spark.rdd.RDD
+
+/**
+  * persist metric and record to hdfs
+  *
+  * All files are written below `path`/`metricName`/`timeStamp`.
+  * Config keys read from `config`:
+  *  - `path`: hdfs root path, this persist is unavailable when it is empty
+  *  - `max.persist.lines`: max number of records persisted per call,
+  *    negative means unlimited (default -1)
+  *  - `max.lines.per.file`: max number of records per file, capped at 1000000
+  *    (default 10000)
+  *
+  * @param config      configuration of this persist, see the keys above
+  * @param metricName  metric name, part of the output path
+  * @param timeStamp   timestamp of the calculation, part of the output path
+  */
+case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist {
+
+  import scala.util.control.NonFatal
+
+  val Path = "path"
+  val MaxPersistLines = "max.persist.lines"
+  val MaxLinesPerFile = "max.lines.per.file"
+
+  val path = config.getOrElse(Path, "").toString
+  val maxPersistLines = config.getInt(MaxPersistLines, -1)
+  val maxLinesPerFile = math.min(config.getInt(MaxLinesPerFile, 10000), 1000000)
+
+  val StartFile = filePath("_START")
+  val FinishFile = filePath("_FINISH")
+  val MetricsFile = filePath("_METRICS")
+
+  val LogFile = filePath("_LOG")
+
+  val _MetricName = "metricName"
+  val _Timestamp = "timestamp"
+  val _Value = "value"
+
+  // true until the log head line has been produced once
+  var _init = true
+
+  def available(): Boolean = {
+    path.nonEmpty
+  }
+
+  /** head line of the log, only produced for the first log entry */
+  private def logHead: String = {
+    if (_init) {
+      _init = false
+      val dt = new Date(timeStamp)
+      s"================ log of ${dt} ================\n"
+    } else ""
+  }
+
+  private def timeHead(rt: Long): String = {
+    val dt = new Date(rt)
+    s"--- ${dt} ---\n"
+  }
+
+  private def logWrap(rt: Long, msg: String): String = {
+    logHead + timeHead(rt) + s"${msg}\n\n"
+  }
+
+  protected def filePath(file: String): String = {
+    HdfsUtil.getHdfsFilePath(path, s"${metricName}/${timeStamp}/${file}")
+  }
+
+  protected def withSuffix(path: String, suffix: String): String = {
+    s"${path}.${suffix}"
+  }
+
+  /** write `msg` to the _START file */
+  def start(msg: String): Unit = {
+    try {
+      HdfsUtil.writeContent(StartFile, msg)
+    } catch {
+      case NonFatal(e) => error(e.getMessage)
+    }
+  }
+  /** create the empty _FINISH file */
+  def finish(): Unit = {
+    try {
+      HdfsUtil.createEmptyFile(FinishFile)
+    } catch {
+      case NonFatal(e) => error(e.getMessage)
+    }
+  }
+
+  /** append `msg`, headed by the time `rt`, to the _LOG file */
+  def log(rt: Long, msg: String): Unit = {
+    try {
+      val logStr = logWrap(rt, msg)
+      HdfsUtil.appendContent(LogFile, logStr)
+    } catch {
+      case NonFatal(e) => error(e.getMessage)
+    }
+  }
+
+  private def getHdfsPath(path: String, groupId: Int): String = {
+    HdfsUtil.getHdfsFilePath(path, s"${groupId}")
+  }
+  private def getHdfsPath(path: String, ptnId: Int, groupId: Int): String = {
+    HdfsUtil.getHdfsFilePath(path, s"${ptnId}.${groupId}")
+  }
+
+  private def clearOldRecords(path: String): Unit = {
+    HdfsUtil.deleteHdfsPath(path)
+  }
+
+  /**
+    * persist at most `max.persist.lines` records of the rdd to file `name`.
+    * more than `max.lines.per.file` records are split into several files:
+    * the first keeps the plain path, the following ones get the group id as suffix.
+    * the old records are deleted first, outside of the error handling.
+    */
+  def persistRecords(records: RDD[String], name: String): Unit = {
+    val path = filePath(name)
+    clearOldRecords(path)
+    try {
+      val recordCount = records.count
+      val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
+      if (count > 0) {
+        val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt
+        if (groupCount <= 1) {
+          val recs = records.take(count.toInt)
+          persistRecords2Hdfs(path, recs)
+        } else {
+          // only the first `count` records are kept, so that the last file is
+          // truncated and the total never exceeds max.persist.lines
+          val groupedRecords: RDD[(Long, Iterable[String])] =
+            records.zipWithIndex
+              .filter { case (_, idx) => idx < count }
+              .map { case (rec, idx) => (idx / maxLinesPerFile, rec) }
+              .groupByKey()
+          groupedRecords.foreach { group =>
+            val (gid, recs) = group
+            val hdfsPath = if (gid == 0) path else withSuffix(path, gid.toString)
+            persistRecords2Hdfs(hdfsPath, recs)
+          }
+        }
+      }
+    } catch {
+      case NonFatal(e) => error(e.getMessage)
+    }
+  }
+
+  /**
+    * persist at most `max.persist.lines` records to file `name`.
+    * more than `max.lines.per.file` records are split into several files,
+    * named by their group id below the path of `name`.
+    * the old records are deleted first, outside of the error handling.
+    */
+  def persistRecords(records: Iterable[String], name: String): Unit = {
+    val path = filePath(name)
+    clearOldRecords(path)
+    try {
+      val recordCount = records.size
+      val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
+      if (count > 0) {
+        val groupCount = (count - 1) / maxLinesPerFile + 1
+        if (groupCount <= 1) {
+          val recs = records.take(count)
+          persistRecords2Hdfs(path, recs)
+        } else {
+          // truncate before grouping, so that the last file does not push the
+          // total beyond max.persist.lines; this yields exactly groupCount groups
+          val groupedRecords = records.take(count).grouped(maxLinesPerFile).zipWithIndex
+          groupedRecords.foreach { group =>
+            val (recs, gid) = group
+            val hdfsPath = getHdfsPath(path, gid)
+            persistRecords2Hdfs(hdfsPath, recs)
+          }
+        }
+      }
+    } catch {
+      case NonFatal(e) => error(e.getMessage)
+    }
+  }
+
+  /** write the metrics, together with metric name and timestamp, as json to the _METRICS file */
+  def persistMetrics(metrics: Map[String, Any]): Unit = {
+    val head = Map[String, Any]((_MetricName -> metricName), (_Timestamp -> timeStamp))
+    val result = head + (_Value -> metrics)
+    try {
+      val json = JsonUtil.toJson(result)
+      persistRecords2Hdfs(MetricsFile, json :: Nil)
+    } catch {
+      case NonFatal(e) => error(e.getMessage)
+    }
+  }
+
+  /** write the records, one per line, to `hdfsPath`; failures are logged only */
+  private def persistRecords2Hdfs(hdfsPath: String, records: Iterable[String]): Unit = {
+    try {
+      val recStr = records.mkString("\n")
+      HdfsUtil.writeContent(hdfsPath, recStr)
+    } catch {
+      case NonFatal(e) => error(e.getMessage)
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/writer/HttpPersist.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/writer/HttpPersist.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/HttpPersist.scala
new file mode 100644
index 0000000..4c12652
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/HttpPersist.scala
@@ -0,0 +1,76 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.writer
+
+import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil}
+import org.apache.griffin.measure.utils.ParamUtil._
+import org.apache.spark.rdd.RDD
+
+import scala.concurrent.Future
+
+/**
+  * persist metric and record through http request
+  *
+  * Only metrics are sent; start, finish, log and record persisting do nothing.
+  * Config keys read from `config`:
+  *  - `api`: url of the http api, this persist is unavailable when it is empty
+  *  - `method`: http method (default "post")
+  *
+  * @param config      configuration of this persist, see the keys above
+  * @param metricName  metric name, sent as "name"
+  * @param timeStamp   timestamp of the calculation, sent as "tmst"
+  */
+case class HttpPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist {
+
+  import scala.util.control.NonFatal
+
+  val Api = "api"
+  val Method = "method"
+
+  val api = config.getString(Api, "")
+  val method = config.getString(Method, "post")
+
+  val _Value = "value"
+
+  def available(): Boolean = {
+    api.nonEmpty
+  }
+
+  def start(msg: String): Unit = {}
+  def finish(): Unit = {}
+
+  /**
+    * serialize `dataMap` to json and hand the http request over to the
+    * persist thread pool; non-fatal failures are logged, not thrown
+    */
+  private def httpResult(dataMap: Map[String, Any]): Unit = {
+    try {
+      val data = JsonUtil.toJson(dataMap)
+      // http request
+      val params = Map[String, Object]()
+      val header = Map[String, Object](("Content-Type","application/json"))
+
+      // the request itself only starts when the task is run by the pool
+      def func(): (Long, Future[Boolean]) = {
+        import scala.concurrent.ExecutionContext.Implicits.global
+        (timeStamp, Future(HttpUtil.httpRequest(api, method, params, header, data)))
+      }
+      PersistThreadPool.addTask(func _, 10)
+    } catch {
+      case NonFatal(e) => error(e.getMessage)
+    }
+  }
+
+  def log(rt: Long, msg: String): Unit = {}
+
+  def persistRecords(records: RDD[String], name: String): Unit = {}
+  def persistRecords(records: Iterable[String], name: String): Unit = {}
+
+  /** send the metrics, together with metric name and timestamp, as json */
+  def persistMetrics(metrics: Map[String, Any]): Unit = {
+    val head = Map[String, Any](("name" -> metricName), ("tmst" -> timeStamp))
+    val result = head + (_Value -> metrics)
+    httpResult(result)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/writer/LoggerPersist.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/writer/LoggerPersist.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/LoggerPersist.scala
new file mode 100644
index 0000000..3063faf
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/LoggerPersist.scala
@@ -0,0 +1,82 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.writer
+
+import org.apache.griffin.measure.utils.JsonUtil
+import org.apache.griffin.measure.utils.ParamUtil._
+import org.apache.spark.rdd.RDD
+
+/**
+  * persist implementation printing to the console, for debug
+  *
+  * Start, finish, log messages and metrics are printed; records are not.
+  *
+  * @param config      configuration of this persist, `max.log.lines` is read from it
+  * @param metricName  metric name, part of the printed lines
+  * @param timeStamp   timestamp of the calculation, part of the printed lines
+  */
+case class LoggerPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist {
+
+  val MaxLogLines = "max.log.lines"
+
+  val maxLogLines = config.getInt(MaxLogLines, 100)
+
+  def available(): Boolean = true
+
+  def start(msg: String): Unit =
+    println(s"[$timeStamp] $metricName start: $msg")
+
+  def finish(): Unit =
+    println(s"[$timeStamp] $metricName finish")
+
+  def log(rt: Long, msg: String): Unit =
+    println(s"[$timeStamp] $rt: $msg")
+
+  /**
+    * no-op: printing of rdd records is disabled
+    * (the disabled code printed up to `max.log.lines` records)
+    */
+  def persistRecords(records: RDD[String], name: String): Unit = {}
+
+  /**
+    * no-op: printing of records is disabled
+    */
+  def persistRecords(records: Iterable[String], name: String): Unit = {}
+
+  /** print a head line with metric name and timestamp, then the metrics as json */
+  def persistMetrics(metrics: Map[String, Any]): Unit = {
+    val headLine = s"$metricName [$timeStamp] metrics: "
+    println(headLine)
+    println(JsonUtil.toJson(metrics))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/writer/MongoPersist.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/writer/MongoPersist.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/MongoPersist.scala
new file mode 100644
index 0000000..3cfcf04
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/MongoPersist.scala
@@ -0,0 +1,118 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.writer
+
+import org.apache.griffin.measure.utils.ParamUtil._
+import org.apache.spark.rdd.RDD
+import org.mongodb.scala._
+import org.mongodb.scala.model.{Filters, UpdateOptions, Updates}
+import org.mongodb.scala.result.UpdateResult
+
+import scala.concurrent.Future
+
+/**
+  * persist metric and record to mongo
+  */
+case class MongoPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist {
+
+  // set up the shared mongo connection when this persist is created
+  MongoConnection.init(config)
+
+  // field names of the metric document
+  val _MetricName = "metricName"
+  val _Timestamp = "timestamp"
+  val _Value = "value"
+
+  // selects the document of this metric name at this timestamp
+  private val filter = Filters.and(
+    Filters.eq(_MetricName, metricName),
+    Filters.eq(_Timestamp, timeStamp)
+  )
+
+  def available(): Boolean = MongoConnection.dataConf.available
+
+  // start, finish, log and records are not persisted to mongo
+  def start(msg: String): Unit = {}
+  def finish(): Unit = {}
+
+  def log(rt: Long, msg: String): Unit = {}
+
+  def persistRecords(records: RDD[String], name: String): Unit = {}
+  def persistRecords(records: Iterable[String], name: String): Unit = {}
+
+  def persistMetrics(metrics: Map[String, Any]): Unit = mongoInsert(metrics)
+
+  /**
+    * upsert `dataMap` as the value field of the document selected by `filter`,
+    * the write is handed to PersistThreadPool with a retry count of 10
+    */
+  private def mongoInsert(dataMap: Map[String, Any]): Unit = {
+    try {
+      val setValue = Updates.set(_Value, dataMap)
+      val upsertTask: () => (Long, Future[UpdateResult]) = () => {
+        val collection = MongoConnection.getDataCollection
+        val written = collection.updateOne(filter, setValue, UpdateOptions().upsert(true)).toFuture
+        (timeStamp, written)
+      }
+      PersistThreadPool.addTask(upsertTask, 10)
+    } catch {
+      case e: Throwable => error(e.getMessage)
+    }
+  }
+
+}
+
+/**
+  * shared mongo connection, initialized once from the first complete configuration
+  */
+object MongoConnection {
+
+  /**
+    * mongo connection settings,
+    * available only when url, database and collection are all non-empty
+    */
+  case class MongoConf(url: String, database: String, collection: String) {
+    def available: Boolean = url.nonEmpty && database.nonEmpty && collection.nonEmpty
+  }
+
+  val _MongoHead = "mongodb://"
+
+  // config keys
+  val Url = "url"
+  val Database = "database"
+  val Collection = "collection"
+
+  private var initialed = false
+
+  var dataConf: MongoConf = _
+  private var dataCollection: MongoCollection[Document] = _
+
+  def getDataCollection: MongoCollection[Document] = dataCollection
+
+  /**
+    * read the configuration and, if it is complete, open the data collection;
+    * once a collection has been opened, later calls do nothing
+    */
+  def init(config: Map[String, Any]): Unit = {
+    if (!initialed) {
+      dataConf = mongoConf(config)
+      // only connect with a complete configuration, an incomplete one is
+      // reported through dataConf.available instead
+      if (dataConf.available) {
+        dataCollection = mongoCollection(dataConf)
+        initialed = true
+      }
+    }
+  }
+
+  private def mongoConf(cfg: Map[String, Any]): MongoConf = {
+    val url = cfg.getString(Url, "").trim
+    // keep a missing url empty: prepending the head to it would make the
+    // url check in MongoConf.available pass for an unconfigured url
+    val mongoUrl = if (url.isEmpty || url.startsWith(_MongoHead)) url else {
+      _MongoHead + url
+    }
+    MongoConf(
+      mongoUrl,
+      cfg.getString(Database, ""),
+      cfg.getString(Collection, "")
+    )
+  }
+
+  private def mongoCollection(mongoConf: MongoConf): MongoCollection[Document] = {
+    val mongoClient: MongoClient = MongoClient(mongoConf.url)
+    val database: MongoDatabase = mongoClient.getDatabase(mongoConf.database)
+    database.getCollection(mongoConf.collection)
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/writer/MultiPersists.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/writer/MultiPersists.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/MultiPersists.scala
new file mode 100644
index 0000000..ea9133a
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/MultiPersists.scala
@@ -0,0 +1,83 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.writer
+
+import org.apache.spark.rdd.RDD
+
+/**
+  * persist metric and record in multiple ways
+  */
+case class MultiPersists(persists: Iterable[Persist]) extends Persist {
+
+  // metric name and timestamp are taken from the first persist, with defaults when
+  // there is none; headOption is used instead of matching on Nil, as Nil only equals
+  // empty sequences, so an empty Iterable of another kind would reach `head` and throw
+  val metricName: String = persists.headOption.map(_.metricName).getOrElse("")
+
+  val timeStamp: Long = persists.headOption.map(_.timeStamp).getOrElse(0L)
+
+  val config: Map[String, Any] = Map[String, Any]()
+
+  // available when at least one of the persists is available
+  def available(): Boolean = { persists.exists(_.available()) }
+
+  def start(msg: String): Unit = { persists.foreach(_.start(msg)) }
+  def finish(): Unit = { persists.foreach(_.finish()) }
+
+  // in the methods below a failure of one persist is logged and does not stop the others
+
+  def log(rt: Long, msg: String): Unit = {
+    persists.foreach { persist =>
+      try {
+        persist.log(rt, msg)
+      } catch {
+        case e: Throwable => error(s"log error: ${e.getMessage}")
+      }
+    }
+  }
+
+  def persistRecords(records: RDD[String], name: String): Unit = {
+    persists.foreach { persist =>
+      try {
+        persist.persistRecords(records, name)
+      } catch {
+        case e: Throwable => error(s"persist records error: ${e.getMessage}")
+      }
+    }
+  }
+  def persistRecords(records: Iterable[String], name: String): Unit = {
+    persists.foreach { persist =>
+      try {
+        persist.persistRecords(records, name)
+      } catch {
+        case e: Throwable => error(s"persist records error: ${e.getMessage}")
+      }
+    }
+  }
+  def persistMetrics(metrics: Map[String, Any]): Unit = {
+    persists.foreach { persist =>
+      try {
+        persist.persistMetrics(metrics)
+      } catch {
+        case e: Throwable => error(s"persist metrics error: ${e.getMessage}")
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/writer/Persist.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/writer/Persist.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/Persist.scala
new file mode 100644
index 0000000..28eeb64
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/Persist.scala
@@ -0,0 +1,45 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.writer
+
+import org.apache.griffin.measure.Loggable
+import org.apache.spark.rdd.RDD
+
+/**
+  * persist metric and record
+  */
+trait Persist extends Loggable with Serializable {
+  // name of the metric handled by this persist
+  val metricName: String
+  // timestamp of this persist; NOTE(review): presumably the timestamp of the measured data - confirm
+  val timeStamp: Long
+
+  // configuration map of this persist
+  val config: Map[String, Any]
+
+  // whether this persist can be used, PersistFactory only keeps available persists
+  def available(): Boolean
+
+  // start and finish hooks; NOTE(review): presumably called around one job run - confirm with callers
+  def start(msg: String): Unit
+  def finish(): Unit
+
+  // write a log message; NOTE(review): `rt` looks like a runtime timestamp - confirm
+  def log(rt: Long, msg: String): Unit
+
+  // persist records under the given name, from an RDD or from a local collection
+  def persistRecords(records: RDD[String], name: String): Unit
+  def persistRecords(records: Iterable[String], name: String): Unit
+
+  // persist a map of metric values
+  def persistMetrics(metrics: Map[String, Any]): Unit
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistFactory.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistFactory.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistFactory.scala
new file mode 100644
index 0000000..12b5f0b
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistFactory.scala
@@ -0,0 +1,55 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.writer
+
+import org.apache.griffin.measure.configuration.params.PersistParam
+
+import scala.util.{Success, Try}
+
+
+/**
+  * creates the persists described by `persistParams` for the metric `metricName`
+  */
+case class PersistFactory(persistParams: Iterable[PersistParam], metricName: String) extends Serializable {
+
+  // persist types, matched case-insensitively
+  val HDFS_REGEX = """^(?i)hdfs$""".r
+  val HTTP_REGEX = """^(?i)http$""".r
+  val LOG_REGEX = """^(?i)log$""".r
+  val MONGO_REGEX = """^(?i)mongo$""".r
+
+  /**
+    * create all usable persists for the given timestamp, wrapped as one MultiPersists
+    */
+  def getPersists(timeStamp: Long): MultiPersists = {
+    val usable = persistParams.flatMap(persistParam => getPersist(timeStamp, persistParam))
+    MultiPersists(usable)
+  }
+
+  /**
+    * create persist by param,
+    * None if the creation fails or the created persist is not available,
+    * an unsupported persist type throws an exception
+    */
+  private def getPersist(timeStamp: Long, persistParam: PersistParam): Option[Persist] = {
+    val config = persistParam.config
+    val created: Try[Persist] = persistParam.persistType match {
+      case HDFS_REGEX() => Try(HdfsPersist(config, metricName, timeStamp))
+      case HTTP_REGEX() => Try(HttpPersist(config, metricName, timeStamp))
+      case LOG_REGEX() => Try(LoggerPersist(config, metricName, timeStamp))
+      case MONGO_REGEX() => Try(MongoPersist(config, metricName, timeStamp))
+      case _ => throw new Exception("not supported persist type")
+    }
+    created match {
+      case Success(persist) => Some(persist).filter(_.available())
+      case _ => None
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistThreadPool.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistThreadPool.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistThreadPool.scala
new file mode 100644
index 0000000..221fcad
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistThreadPool.scala
@@ -0,0 +1,81 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.writer
+
+import java.util.Date
+import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
+
+import org.apache.griffin.measure.Loggable
+
+import scala.concurrent.Future
+import scala.util.{Failure, Success}
+
+/**
+  * persist thread pool, to persist metrics in parallel mode
+  */
+object PersistThreadPool extends Loggable {
+
+  import scala.concurrent.ExecutionContext.Implicits.global
+
+  private val pool: ThreadPoolExecutor = Executors.newFixedThreadPool(5).asInstanceOf[ThreadPoolExecutor]
+  val MAX_RETRY = 100
+
+  def start(): Unit = {
+  }
+
+  /**
+    * stop accepting tasks and wait up to 10 seconds for the submitted ones
+    */
+  def shutdown(): Unit = {
+    pool.shutdown()
+    pool.awaitTermination(10, TimeUnit.SECONDS)
+  }
+
+  /**
+    * submit a task to the pool
+    * @param func   starts the work, returns an id for logging and the future of the work
+    * @param retry  retry count on failure, a negative value means MAX_RETRY
+    */
+  def addTask(func: () => (Long, Future[_]), retry: Int): Unit = {
+    val r = if (retry < 0) MAX_RETRY else retry
+    info(s"add task, current task num: ${pool.getQueue.size}")
+    pool.submit(Task(func, r))
+  }
+
+  /**
+    * runs `func` and logs the outcome of its future,
+    * a failed task is resubmitted until `retry` is used up
+    */
+  case class Task(func: () => (Long, Future[_]), retry: Int) extends Runnable with Loggable {
+
+    override def run(): Unit = {
+      val st = new Date().getTime
+      // func may throw before it returns a future; without handling it here the
+      // exception would only end up in the unobserved result of pool.submit,
+      // so the task would be lost without any log or retry
+      scala.util.Try(func()) match {
+        case Success((t, res)) =>
+          res.onComplete {
+            case Success(value) => {
+              val et = new Date().getTime
+              info(s"task ${t} success [ using time ${et - st} ms ]")
+            }
+            case Failure(e) => retryOrFail(t.toString, e, st)
+          }
+        case Failure(e) => retryOrFail("[unknown]", e, st)
+      }
+    }
+
+    // log the failure of task `id`, then resubmit it if retries remain, else give up
+    private def retryOrFail(id: String, e: Throwable, st: Long): Unit = {
+      val et = new Date().getTime
+      warn(s"task ${id} fails [ using time ${et - st} ms ] : ${e.getMessage}")
+      if (retry > 0) {
+        info(s"task ${id} retry [ rest retry count: ${retry - 1} ]")
+        pool.submit(Task(func, retry - 1))
+      } else {
+        fail(s"task ${id} retry ends but fails")
+      }
+    }
+
+    def fail(msg: String): Unit = {
+      error(s"task fails: ${msg}")
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala
deleted file mode 100644
index 79d4250..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.data.connector
-
-import java.util.concurrent.atomic.AtomicLong
-
-import org.apache.griffin.measure.cache.tmst.TmstCache
-import org.apache.griffin.measure.config.params.user.DataConnectorParam
-import org.apache.griffin.measure.log.Loggable
-import org.apache.griffin.measure.process.{BatchDqProcess, BatchProcessType}
-import org.apache.griffin.measure.process.engine._
-import org.apache.griffin.measure.process.temp.{DataFrameCaches, 
TableRegisters, TimeRange}
-import org.apache.griffin.measure.rule.adaptor.{InternalColumns, PreProcPhase, 
RuleAdaptorGroup, RunPhase}
-import org.apache.griffin.measure.rule.dsl._
-import org.apache.griffin.measure.rule.plan._
-import org.apache.griffin.measure.rule.preproc.PreProcRuleGenerator
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.functions._
-import org.apache.spark.sql.{DataFrame, SQLContext}
-
-
-trait DataConnector extends Loggable with Serializable {
-
-//  def available(): Boolean
-
-  var tmstCache: TmstCache = _
-  protected def saveTmst(t: Long) = tmstCache.insert(t)
-  protected def readTmst(t: Long) = tmstCache.range(t, t + 1)
-
-  def init(): Unit
-
-  def data(ms: Long): (Option[DataFrame], TimeRange)
-
-  val dqEngines: DqEngines
-
-  val dcParam: DataConnectorParam
-
-  val sqlContext: SQLContext
-
-  val id: String = DataConnectorIdGenerator.genId
-
-  protected def suffix(ms: Long): String = s"${id}_${ms}"
-  protected def thisName(ms: Long): String = s"this_${suffix(ms)}"
-
-  final val tmstColName = InternalColumns.tmst
-
-  def preProcess(dfOpt: Option[DataFrame], ms: Long): Option[DataFrame] = {
-    val timeInfo = CalcTimeInfo(ms, id)
-    val thisTable = thisName(ms)
-
-    try {
-      saveTmst(ms)    // save tmst
-
-      dfOpt.flatMap { df =>
-        val preProcRules = 
PreProcRuleGenerator.genPreProcRules(dcParam.preProc, suffix(ms))
-
-        // init data
-        TableRegisters.registerRunTempTable(df, timeInfo.key, thisTable)
-
-//        val dsTmsts = Map[String, Set[Long]]((thisTable -> Set[Long](ms)))
-//        val tmsts = Seq[Long](ms)
-        val dsTimeRanges = Map[String, TimeRange]((thisTable -> TimeRange(ms)))
-
-        // generate rule steps
-        val rulePlan = RuleAdaptorGroup.genRulePlan(
-          timeInfo, preProcRules, SparkSqlType, BatchProcessType, dsTimeRanges)
-
-        // run rules
-        dqEngines.runRuleSteps(timeInfo, rulePlan.ruleSteps)
-
-        // out data
-        val outDf = sqlContext.table(s"`${thisTable}`")
-
-//        names.foreach { name =>
-//          try {
-//            TempTables.unregisterTempTable(sqlContext, ms, name)
-//          } catch {
-//            case e: Throwable => warn(s"drop temp table ${name} fails")
-//          }
-//        }
-
-//        val range = if (id == "dc1") (0 until 20).toList else (0 until 
1).toList
-//        val withTmstDfs = range.map { i =>
-//          saveTmst(ms + i)
-//          outDf.withColumn(tmstColName, lit(ms + i)).limit(49 - i)
-//        }
-//        Some(withTmstDfs.reduce(_ unionAll _))
-
-        // add tmst column
-        val withTmstDf = outDf.withColumn(tmstColName, lit(ms))
-
-        // tmst cache
-//        saveTmst(ms)
-
-        // drop temp tables
-        cleanData(timeInfo)
-
-        Some(withTmstDf)
-      }
-    } catch {
-      case e: Throwable => {
-        error(s"pre-process of data connector [${id}] error: ${e.getMessage}")
-        None
-      }
-    }
-
-  }
-
-  private def cleanData(timeInfo: TimeInfo): Unit = {
-    TableRegisters.unregisterRunTempTables(sqlContext, timeInfo.key)
-    TableRegisters.unregisterCompileTempTables(timeInfo.key)
-
-    DataFrameCaches.uncacheDataFrames(timeInfo.key)
-    DataFrameCaches.clearTrashDataFrames(timeInfo.key)
-  }
-
-}
-
-object DataConnectorIdGenerator {
-  private val counter: AtomicLong = new AtomicLong(0L)
-  private val head: String = "dc"
-
-  def genId: String = {
-    s"${head}${increment}"
-  }
-
-  private def increment: Long = {
-    counter.incrementAndGet()
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnectorFactory.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnectorFactory.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnectorFactory.scala
deleted file mode 100644
index 27b390a..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnectorFactory.scala
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.data.connector
-
-import kafka.serializer.StringDecoder
-import org.apache.griffin.measure.config.params.user._
-import 
org.apache.griffin.measure.data.connector.streaming.{KafkaStreamingDataConnector,
 KafkaStreamingStringDataConnector, StreamingDataConnector}
-import org.apache.griffin.measure.process.engine.{DqEngine, DqEngines}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
-
-import scala.util.Success
-//import org.apache.griffin.measure.data.connector.cache._
-import org.apache.griffin.measure.data.connector.batch._
-//import org.apache.griffin.measure.data.connector.streaming._
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.dstream.InputDStream
-import org.apache.spark.streaming.kafka.KafkaUtils
-
-import scala.reflect.ClassTag
-import scala.util.Try
-
-object DataConnectorFactory {
-
-  val HiveRegex = """^(?i)hive$""".r
-  val AvroRegex = """^(?i)avro$""".r
-  val TextDirRegex = """^(?i)text-dir$""".r
-
-  val KafkaRegex = """^(?i)kafka$""".r
-
-  val TextRegex = """^(?i)text$""".r
-
-  def getDataConnector(sqlContext: SQLContext,
-                       @transient ssc: StreamingContext,
-                       dqEngines: DqEngines,
-                       dataConnectorParam: DataConnectorParam
-                      ): Try[DataConnector] = {
-    val conType = dataConnectorParam.conType
-    val version = dataConnectorParam.version
-    val config = dataConnectorParam.config
-    Try {
-      conType match {
-        case HiveRegex() => HiveBatchDataConnector(sqlContext, dqEngines, 
dataConnectorParam)
-        case AvroRegex() => AvroBatchDataConnector(sqlContext, dqEngines, 
dataConnectorParam)
-        case TextDirRegex() => TextDirBatchDataConnector(sqlContext, 
dqEngines, dataConnectorParam)
-        case KafkaRegex() => {
-          getStreamingDataConnector(sqlContext, ssc, dqEngines, 
dataConnectorParam)
-        }
-        case _ => throw new Exception("connector creation error!")
-      }
-    }
-  }
-
-  private def getStreamingDataConnector(sqlContext: SQLContext,
-                                        @transient ssc: StreamingContext,
-                                        dqEngines: DqEngines,
-                                        dataConnectorParam: DataConnectorParam
-                                       ): StreamingDataConnector = {
-    if (ssc == null) throw new Exception("streaming context is null!")
-    val conType = dataConnectorParam.conType
-    val version = dataConnectorParam.version
-    conType match {
-      case KafkaRegex() => genKafkaDataConnector(sqlContext, ssc, dqEngines, 
dataConnectorParam)
-      case _ => throw new Exception("streaming connector creation error!")
-    }
-  }
-//
-//  private def getCacheDataConnector(sqlContext: SQLContext,
-//                                    dataCacheParam: DataCacheParam
-//                                   ): Try[CacheDataConnector] = {
-//    if (dataCacheParam == null) {
-//      throw new Exception("invalid data cache param!")
-//    }
-//    val cacheType = dataCacheParam.cacheType
-//    Try {
-//      cacheType match {
-//        case HiveRegex() => HiveCacheDataConnector(sqlContext, 
dataCacheParam)
-//        case TextRegex() => TextCacheDataConnector(sqlContext, 
dataCacheParam)
-//        case _ => throw new Exception("cache connector creation error!")
-//      }
-//    }
-//  }
-//
-  private def genKafkaDataConnector(sqlContext: SQLContext,
-                                    @transient ssc: StreamingContext,
-                                    dqEngines: DqEngines,
-                                    dataConnectorParam: DataConnectorParam
-                                   ) = {
-    val config = dataConnectorParam.config
-    val KeyType = "key.type"
-    val ValueType = "value.type"
-    val keyType = config.getOrElse(KeyType, "java.lang.String").toString
-    val valueType = config.getOrElse(ValueType, "java.lang.String").toString
-    (getClassTag(keyType), getClassTag(valueType)) match {
-      case (ClassTag(k: Class[String]), ClassTag(v: Class[String])) => {
-        KafkaStreamingStringDataConnector(sqlContext, ssc, dqEngines, 
dataConnectorParam)
-      }
-      case _ => {
-        throw new Exception("not supported type kafka data connector")
-      }
-    }
-  }
-
-  private def getClassTag(tp: String): ClassTag[_] = {
-    try {
-      val clazz = Class.forName(tp)
-      ClassTag(clazz)
-    } catch {
-      case e: Throwable => throw e
-    }
-  }
-
-  def filterBatchDataConnectors(connectors: Seq[DataConnector]): 
Seq[BatchDataConnector] = {
-    connectors.flatMap { dc =>
-      dc match {
-        case mdc: BatchDataConnector => Some(mdc)
-        case _ => None
-      }
-    }
-  }
-  def filterStreamingDataConnectors(connectors: Seq[DataConnector]): 
Seq[StreamingDataConnector] = {
-    connectors.flatMap { dc =>
-      dc match {
-        case mdc: StreamingDataConnector => Some(mdc)
-        case _ => None
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/AvroBatchDataConnector.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/AvroBatchDataConnector.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/AvroBatchDataConnector.scala
deleted file mode 100644
index 5a1c22c..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/AvroBatchDataConnector.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.data.connector.batch
-
-import org.apache.griffin.measure.config.params.user.DataConnectorParam
-import org.apache.griffin.measure.data.connector._
-import org.apache.griffin.measure.process.engine.DqEngines
-import org.apache.griffin.measure.process.temp.TimeRange
-import org.apache.griffin.measure.result._
-import org.apache.griffin.measure.utils.HdfsUtil
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.griffin.measure.utils.ParamUtil._
-
-import scala.util.Try
-
-// data connector for avro file
-case class AvroBatchDataConnector(sqlContext: SQLContext, dqEngines: 
DqEngines, dcParam: DataConnectorParam
-                                 ) extends BatchDataConnector {
-
-  val config = dcParam.config
-
-  val FilePath = "file.path"
-  val FileName = "file.name"
-
-  val filePath = config.getString(FilePath, "")
-  val fileName = config.getString(FileName, "")
-
-  val concreteFileFullPath = if (pathPrefix) s"${filePath}${fileName}" else 
fileName
-
-  private def pathPrefix(): Boolean = {
-    filePath.nonEmpty
-  }
-
-  private def fileExist(): Boolean = {
-    HdfsUtil.existPath(concreteFileFullPath)
-  }
-
-  def data(ms: Long): (Option[DataFrame], TimeRange) = {
-    val dfOpt = try {
-      val df = 
sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath)
-      val dfOpt = Some(df)
-      val preDfOpt = preProcess(dfOpt, ms)
-      preDfOpt
-    } catch {
-      case e: Throwable => {
-        error(s"load avro file ${concreteFileFullPath} fails")
-        None
-      }
-    }
-    val tmsts = readTmst(ms)
-    (dfOpt, TimeRange(ms, tmsts))
-  }
-
-//  def available(): Boolean = {
-//    (!concreteFileFullPath.isEmpty) && fileExist
-//  }
-
-//  def init(): Unit = {}
-
-//  def metaData(): Try[Iterable[(String, String)]] = {
-//    Try {
-//      val st = 
sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath).schema
-//      st.fields.map(f => (f.name, f.dataType.typeName))
-//    }
-//  }
-
-//  def data(): Try[RDD[(Product, (Map[String, Any], Map[String, Any]))]] = {
-//    Try {
-//      loadDataFile.flatMap { row =>
-//        // generate cache data
-//        val cacheExprValueMaps = ExprValueUtil.genExprValueMaps(Some(row), 
ruleExprs.cacheExprs, constFinalExprValueMap)
-//        val finalExprValueMaps = 
ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMaps)
-//
-//        // data info
-//        val dataInfoMap: Map[String, Any] = DataInfo.cacheInfoList.map { 
info =>
-//          try {
-//            (info.key -> row.getAs[info.T](info.key))
-//          } catch {
-//            case e: Throwable => info.defWrap
-//          }
-//        }.toMap
-//
-//        finalExprValueMaps.flatMap { finalExprValueMap =>
-//          val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { 
expr =>
-//            expr.calculate(finalExprValueMap) match {
-//              case Some(v) => Some(v.asInstanceOf[AnyRef])
-//              case _ => None
-//            }
-//          }
-//          val key = toTuple(groupbyData)
-//
-//          Some((key, (finalExprValueMap, dataInfoMap)))
-//        }
-//      }
-//    }
-//  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/BatchDataConnector.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/BatchDataConnector.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/BatchDataConnector.scala
deleted file mode 100644
index 4d138ab..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/BatchDataConnector.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.data.connector.batch
-
-import org.apache.griffin.measure.data.connector._
-//import org.apache.griffin.measure.data.connector.cache.DataUpdatable
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.types.StructType
-
-import scala.util.{Failure, Success, Try}
-
-
-trait BatchDataConnector extends DataConnector {
-
-//  def metaData(): Option[StructType]
-
-  def init(): Unit = {}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala
deleted file mode 100644
index e57bd32..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.data.connector.batch
-
-import org.apache.griffin.measure.config.params.user.DataConnectorParam
-import org.apache.griffin.measure.data.connector._
-import org.apache.griffin.measure.process.engine.DqEngines
-import org.apache.griffin.measure.process.temp.TimeRange
-import org.apache.griffin.measure.result._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.sql.{DataFrame, SQLContext}
-
-import scala.util.{Success, Try}
-import org.apache.griffin.measure.utils.ParamUtil._
-
-// data connector for hive
-case class HiveBatchDataConnector(sqlContext: SQLContext, dqEngines: DqEngines, dcParam: DataConnectorParam
-                                  ) extends BatchDataConnector {
-
-  val config = dcParam.config
-
-//  if (!sqlContext.isInstanceOf[HiveContext]) {
-//    throw new Exception("hive context not prepared!")
-//  }
-
-  val Database = "database"
-  val TableName = "table.name"
-  val Where = "where"
-
-  val database = config.getString(Database, "default")
-  val tableName = config.getString(TableName, "")
-  val whereString = config.getString(Where, "")
-
-  val concreteTableName = s"${database}.${tableName}"
-//  val partitions = partitionsString.split(";").map(s => s.split(",").map(_.trim))
-  val wheres = whereString.split(",").map(_.trim).filter(_.nonEmpty)
-//  val wheres: Array[Array[String]] = whereString.split(",").flatMap { s =>
-//    val arr = s.trim.split(",").flatMap { t =>
-//      t.trim match {
-//        case p if (p.nonEmpty) => Some(p)
-//        case _ => None
-//      }
-//    }
-//    if (arr.size > 0) Some(arr) else None
-//  }
-
-  def data(ms: Long): (Option[DataFrame], TimeRange) = {
-    val dfOpt = try {
-      val dtSql = dataSql
-      info(dtSql)
-      val df = sqlContext.sql(dtSql)
-      val dfOpt = Some(df)
-      val preDfOpt = preProcess(dfOpt, ms)
-      preDfOpt
-    } catch {
-      case e: Throwable => {
-        error(s"load hive table ${concreteTableName} fails: ${e.getMessage}")
-        None
-      }
-    }
-    val tmsts = readTmst(ms)
-    (dfOpt, TimeRange(ms, tmsts))
-  }
-
-//  def available(): Boolean = {
-//    (!tableName.isEmpty) && {
-//      Try {
-//        if (dbPrefix) {
-//          sqlContext.tables(database).filter(tableExistsSql).collect.size
-//        } else {
-//          sqlContext.tables().filter(tableExistsSql).collect.size
-//        }
-//      } match {
-//        case Success(s) => s > 0
-//        case _ => false
-//      }
-//    }
-//  }
-
-//  def init(): Unit = {}
-
-//  def metaData(): Try[Iterable[(String, String)]] = {
-//    Try {
-//      val originRows = sqlContext.sql(metaDataSql).map(r => (r.getString(0), r.getString(1))).collect
-//      val partitionPos: Int = originRows.indexWhere(pair => pair._1.startsWith("# "))
-//      if (partitionPos < 0) originRows
-//      else originRows.take(partitionPos)
-//    }
-//  }
-
-//  def data(): Try[RDD[(Product, (Map[String, Any], Map[String, Any]))]] = {
-//    Try {
-//      sqlContext.sql(dataSql).flatMap { row =>
-//        // generate cache data
-//        val cacheExprValueMaps = ExprValueUtil.genExprValueMaps(Some(row), ruleExprs.cacheExprs, constFinalExprValueMap)
-//        val finalExprValueMaps = ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMaps)
-//
-//        // data info
-//        val dataInfoMap: Map[String, Any] = DataInfo.cacheInfoList.map { info =>
-//          try {
-//            (info.key -> row.getAs[info.T](info.key))
-//          } catch {
-//            case e: Throwable => info.defWrap
-//          }
-//        }.toMap
-//
-//        finalExprValueMaps.flatMap { finalExprValueMap =>
-//          val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr =>
-//            expr.calculate(finalExprValueMap) match {
-//              case Some(v) => Some(v.asInstanceOf[AnyRef])
-//              case _ => None
-//            }
-//          }
-//          val key = toTuple(groupbyData)
-//
-//          Some((key, (finalExprValueMap, dataInfoMap)))
-//        }
-//      }
-//    }
-//  }
-
-  private def tableExistsSql(): String = {
-//    s"SHOW TABLES LIKE '${concreteTableName}'"    // this is hive sql, but not work for spark sql
-    s"tableName LIKE '${tableName}'"
-  }
-
-  private def metaDataSql(): String = {
-    s"DESCRIBE ${concreteTableName}"
-  }
-
-  private def dataSql(): String = {
-    val tableClause = s"SELECT * FROM ${concreteTableName}"
-    if (wheres.length > 0) {
-      val clauses = wheres.map { w =>
-        s"${tableClause} WHERE ${w}"
-      }
-      clauses.mkString(" UNION ALL ")
-    } else tableClause
-  }
-
-//  private def toTuple[A <: AnyRef](as: Seq[A]): Product = {
-//    if (as.size > 0) {
-//      val tupleClass = Class.forName("scala.Tuple" + as.size)
-//      tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product]
-//    } else None
-//  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/TextDirBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/TextDirBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/TextDirBatchDataConnector.scala
deleted file mode 100644
index 16e79ae..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/TextDirBatchDataConnector.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.data.connector.batch
-
-import org.apache.griffin.measure.config.params.user.DataConnectorParam
-import org.apache.griffin.measure.process.engine.DqEngines
-import org.apache.griffin.measure.process.temp.TimeRange
-import org.apache.griffin.measure.utils.HdfsUtil
-import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.griffin.measure.utils.ParamUtil._
-
-// data connector for avro file
-case class TextDirBatchDataConnector(sqlContext: SQLContext, dqEngines: DqEngines, dcParam: DataConnectorParam
-                                    ) extends BatchDataConnector {
-
-  val config = dcParam.config
-
-  val DirPath = "dir.path"
-  val DataDirDepth = "data.dir.depth"
-  val SuccessFile = "success.file"
-  val DoneFile = "done.file"
-
-  val dirPath = config.getString(DirPath, "")
-  val dataDirDepth = config.getInt(DataDirDepth, 0)
-  val successFile = config.getString(SuccessFile, "_SUCCESS")
-  val doneFile = config.getString(DoneFile, "_DONE")
-
-  val ignoreFilePrefix = "_"
-
-  private def dirExist(): Boolean = {
-    HdfsUtil.existPath(dirPath)
-  }
-
-  def data(ms: Long): (Option[DataFrame], TimeRange) = {
-    val dfOpt = try {
-      val dataDirs = listSubDirs(dirPath :: Nil, dataDirDepth, readable)
-      // touch done file for read dirs
-      dataDirs.foreach(dir => touchDone(dir))
-
-      val validDataDirs = dataDirs.filter(dir => !emptyDir(dir))
-
-      if (validDataDirs.nonEmpty) {
-        val df = sqlContext.read.text(validDataDirs:  _*)
-        val dfOpt = Some(df)
-        val preDfOpt = preProcess(dfOpt, ms)
-        preDfOpt
-      } else {
-        None
-      }
-    } catch {
-      case e: Throwable => {
-        error(s"load text dir ${dirPath} fails: ${e.getMessage}")
-        None
-      }
-    }
-    val tmsts = readTmst(ms)
-    (dfOpt, TimeRange(ms, tmsts))
-  }
-
-  private def listSubDirs(paths: Seq[String], depth: Int, filteFunc: (String) => Boolean): Seq[String] = {
-    val subDirs = paths.flatMap { path => HdfsUtil.listSubPathsByType(path, "dir", true) }
-    if (depth <= 0) {
-      subDirs.filter(filteFunc)
-    } else {
-      listSubDirs(subDirs, depth - 1, filteFunc)
-    }
-  }
-
-  private def readable(dir: String): Boolean = isSuccess(dir) && !isDone(dir)
-  private def isDone(dir: String): Boolean = HdfsUtil.existFileInDir(dir, doneFile)
-  private def isSuccess(dir: String): Boolean = HdfsUtil.existFileInDir(dir, successFile)
-
-  private def touchDone(dir: String): Unit = HdfsUtil.createEmptyFile(HdfsUtil.getHdfsFilePath(dir, doneFile))
-
-  private def emptyDir(dir: String): Boolean = {
-    !HdfsUtil.listSubPathsByType(dir, "file").exists(!_.startsWith(ignoreFilePrefix))
-  }
-
-//  def available(): Boolean = {
-//    (!concreteFileFullPath.isEmpty) && fileExist
-//  }
-
-//  def init(): Unit = {}
-
-//  def metaData(): Try[Iterable[(String, String)]] = {
-//    Try {
-//      val st = sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath).schema
-//      st.fields.map(f => (f.name, f.dataType.typeName))
-//    }
-//  }
-
-//  def data(): Try[RDD[(Product, (Map[String, Any], Map[String, Any]))]] = {
-//    Try {
-//      loadDataFile.flatMap { row =>
-//        // generate cache data
-//        val cacheExprValueMaps = ExprValueUtil.genExprValueMaps(Some(row), ruleExprs.cacheExprs, constFinalExprValueMap)
-//        val finalExprValueMaps = ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMaps)
-//
-//        // data info
-//        val dataInfoMap: Map[String, Any] = DataInfo.cacheInfoList.map { info =>
-//          try {
-//            (info.key -> row.getAs[info.T](info.key))
-//          } catch {
-//            case e: Throwable => info.defWrap
-//          }
-//        }.toMap
-//
-//        finalExprValueMaps.flatMap { finalExprValueMap =>
-//          val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr =>
-//            expr.calculate(finalExprValueMap) match {
-//              case Some(v) => Some(v.asInstanceOf[AnyRef])
-//              case _ => None
-//            }
-//          }
-//          val key = toTuple(groupbyData)
-//
-//          Some((key, (finalExprValueMap, dataInfoMap)))
-//        }
-//      }
-//    }
-//  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala
deleted file mode 100644
index d46d6b7..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.data.connector.streaming
-
-import kafka.serializer.Decoder
-import org.apache.spark.streaming.dstream.InputDStream
-
-import scala.util.{Failure, Success, Try}
-import org.apache.griffin.measure.utils.ParamUtil._
-
-trait KafkaStreamingDataConnector extends StreamingDataConnector {
-
-  type KD <: Decoder[K]
-  type VD <: Decoder[V]
-  type OUT = (K, V)
-
-  val config = dcParam.config
-
-  val KafkaConfig = "kafka.config"
-  val Topics = "topics"
-
-  val kafkaConfig = config.getAnyRef(KafkaConfig, Map[String, String]())
-  val topics = config.getString(Topics, "")
-
-  def available(): Boolean = {
-    true
-  }
-
-  def init(): Unit = {
-    // register fan in
-    dataSourceCacheOpt.foreach(_.registerFanIn)
-
-    val ds = stream match {
-      case Success(dstream) => dstream
-      case Failure(ex) => throw ex
-    }
-    ds.foreachRDD((rdd, time) => {
-      val ms = time.milliseconds
-      val saveDfOpt = try {
-        // coalesce partition number
-        val prlCount = rdd.sparkContext.defaultParallelism
-        val ptnCount = rdd.getNumPartitions
-        val repartitionedRdd = if (prlCount < ptnCount) {
-          rdd.coalesce(prlCount)
-        } else rdd
-
-        val dfOpt = transform(repartitionedRdd)
-
-        preProcess(dfOpt, ms)
-      } catch {
-        case e: Throwable => {
-          error(s"streaming data connector error: ${e.getMessage}")
-          None
-        }
-      }
-
-      // save data frame
-      dataSourceCacheOpt.foreach(_.saveData(saveDfOpt, ms))
-    })
-  }
-
-  def stream(): Try[InputDStream[OUT]] = Try {
-    val topicSet = topics.split(",").toSet
-    createDStream(topicSet)
-  }
-
-  protected def createDStream(topicSet: Set[String]): InputDStream[OUT]
-}
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingStringDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingStringDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingStringDataConnector.scala
deleted file mode 100644
index 038cb77..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingStringDataConnector.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.data.connector.streaming
-
-import kafka.serializer.StringDecoder
-import org.apache.griffin.measure.config.params.user.DataConnectorParam
-import org.apache.griffin.measure.process.engine.DqEngines
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.dstream.InputDStream
-import org.apache.spark.streaming.kafka.KafkaUtils
-import org.apache.spark.sql.functions.lit
-
-case class KafkaStreamingStringDataConnector(sqlContext: SQLContext,
-                                             @transient ssc: StreamingContext,
-                                             dqEngines: DqEngines,
-                                             dcParam: DataConnectorParam
-                                            ) extends KafkaStreamingDataConnector {
-  type K = String
-  type KD = StringDecoder
-  type V = String
-  type VD = StringDecoder
-
-  val valueColName = "value"
-  val schema = StructType(Array(
-    StructField(valueColName, StringType)
-  ))
-
-  def createDStream(topicSet: Set[String]): InputDStream[OUT] = {
-    KafkaUtils.createDirectStream[K, V, KD, VD](ssc, kafkaConfig, topicSet)
-  }
-
-  def transform(rdd: RDD[OUT]): Option[DataFrame] = {
-    if (rdd.isEmpty) None else {
-      try {
-        val rowRdd = rdd.map(d => Row(d._2))
-        val df = sqlContext.createDataFrame(rowRdd, schema)
-        Some(df)
-      } catch {
-        case e: Throwable => {
-          error(s"streaming data transform fails")
-          None
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala
deleted file mode 100644
index e52ca1b..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.data.connector.streaming
-
-import org.apache.griffin.measure.data.connector._
-import org.apache.griffin.measure.data.source.cache._
-import org.apache.griffin.measure.process.temp.TimeRange
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.streaming.dstream.InputDStream
-
-import scala.util.Try
-
-
-trait StreamingDataConnector extends DataConnector {
-
-  type K
-  type V
-  type OUT
-
-  protected def stream(): Try[InputDStream[OUT]]
-
-  def transform(rdd: RDD[OUT]): Option[DataFrame]
-
-  def data(ms: Long): (Option[DataFrame], TimeRange) = (None, TimeRange.emptyTimeRange)
-
-  var dataSourceCacheOpt: Option[DataSourceCache] = None
-
-}

Reply via email to