http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/persist/HttpPersist.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/persist/HttpPersist.scala 
b/measure/src/main/scala/org/apache/griffin/measure/persist/HttpPersist.scala
index 6d5bac3..225ee41 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/persist/HttpPersist.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/persist/HttpPersist.scala
@@ -21,8 +21,10 @@ package org.apache.griffin.measure.persist
 import org.apache.griffin.measure.result._
 import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
 
 import scala.util.Try
+import org.apache.griffin.measure.utils.ParamUtil._
 
 // persist result by http way
 case class HttpPersist(config: Map[String, Any], metricName: String, 
timeStamp: Long) extends Persist {
@@ -30,8 +32,10 @@ case class HttpPersist(config: Map[String, Any], metricName: 
String, timeStamp:
   val Api = "api"
   val Method = "method"
 
-  val api = config.getOrElse(Api, "").toString
-  val method = config.getOrElse(Method, "post").toString
+  val api = config.getString(Api, "")
+  val method = config.getString(Method, "post")
+
+  val _Value = "value"
 
   def available(): Boolean = {
     api.nonEmpty
@@ -40,21 +44,21 @@ case class HttpPersist(config: Map[String, Any], 
metricName: String, timeStamp:
   def start(msg: String): Unit = {}
   def finish(): Unit = {}
 
-  def result(rt: Long, result: Result): Unit = {
-    result match {
-      case ar: AccuracyResult => {
-        val dataMap = Map[String, Any](("name" -> metricName), ("tmst" -> 
timeStamp), ("total" -> ar.getTotal), ("matched" -> ar.getMatch))
-        httpResult(dataMap)
-      }
-      case pr: ProfileResult => {
-        val dataMap = Map[String, Any](("name" -> metricName), ("tmst" -> 
timeStamp), ("total" -> pr.getTotal), ("matched" -> pr.getMatch))
-        httpResult(dataMap)
-      }
-      case _ => {
-        info(s"result: ${result}")
-      }
-    }
-  }
+//  def result(rt: Long, result: Result): Unit = {
+//    result match {
+//      case ar: AccuracyResult => {
+//        val dataMap = Map[String, Any](("name" -> metricName), ("tmst" -> 
timeStamp), ("total" -> ar.getTotal), ("matched" -> ar.getMatch))
+//        httpResult(dataMap)
+//      }
+//      case pr: ProfileResult => {
+//        val dataMap = Map[String, Any](("name" -> metricName), ("tmst" -> 
timeStamp), ("total" -> pr.getTotal), ("matched" -> pr.getMatch))
+//        httpResult(dataMap)
+//      }
+//      case _ => {
+//        info(s"result: ${result}")
+//      }
+//    }
+//  }
 
   private def httpResult(dataMap: Map[String, Any]) = {
     try {
@@ -77,12 +81,34 @@ case class HttpPersist(config: Map[String, Any], 
metricName: String, timeStamp:
 
   }
 
-  def records(recs: RDD[String], tp: String): Unit = {}
-  def records(recs: Iterable[String], tp: String): Unit = {}
+//  def records(recs: RDD[String], tp: String): Unit = {}
+//  def records(recs: Iterable[String], tp: String): Unit = {}
 
 //  def missRecords(records: RDD[String]): Unit = {}
 //  def matchRecords(records: RDD[String]): Unit = {}
 
   def log(rt: Long, msg: String): Unit = {}
 
+//  def persistRecords(df: DataFrame, name: String): Unit = {}
+  def persistRecords(records: Iterable[String], name: String): Unit = {}
+
+//  def persistMetrics(metrics: Seq[String], name: String): Unit = {
+//    val maps = metrics.flatMap { m =>
+//      try {
+//        Some(JsonUtil.toAnyMap(m) ++ Map[String, Any](("name" -> 
metricName), ("tmst" -> timeStamp)))
+//      } catch {
+//        case e: Throwable => None
+//      }
+//    }
+//    maps.foreach { map =>
+//      httpResult(map)
+//    }
+//  }
+
+  // Builds one payload map holding the metric name ("name"), the timestamp
+  // ("tmst") and the given metrics nested under the `_Value` key, and hands
+  // that map to `httpResult`.
+  def persistMetrics(metrics: Map[String, Any]): Unit = {
+    val payload = Map[String, Any](
+      "name" -> metricName,
+      "tmst" -> timeStamp,
+      _Value -> metrics
+    )
+    httpResult(payload)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/persist/LoggerPersist.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/persist/LoggerPersist.scala 
b/measure/src/main/scala/org/apache/griffin/measure/persist/LoggerPersist.scala
index 00d41ea..0cd6f6b 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/persist/LoggerPersist.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/persist/LoggerPersist.scala
@@ -21,98 +21,151 @@ package org.apache.griffin.measure.persist
 import java.util.Date
 
 import org.apache.griffin.measure.result._
-import org.apache.griffin.measure.utils.HdfsUtil
+import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
+import org.apache.griffin.measure.utils.ParamUtil._
 
 // persist result and data to hdfs
 case class LoggerPersist(config: Map[String, Any], metricName: String, 
timeStamp: Long) extends Persist {
 
   val MaxLogLines = "max.log.lines"
 
-  val maxLogLines = try { config.getOrElse(MaxLogLines, 100).toString.toInt } 
catch { case _ => 100 }
+  val maxLogLines = config.getInt(MaxLogLines, 100)
 
   def available(): Boolean = true
 
   def start(msg: String): Unit = {
-    println(s"[${timeStamp}] ${metricName} start")
+    println(s"[${timeStamp}] ${metricName} start: ${msg}")
   }
   def finish(): Unit = {
     println(s"[${timeStamp}] ${metricName} finish")
   }
 
-  def result(rt: Long, result: Result): Unit = {
-    try {
-      val resStr = result match {
-        case ar: AccuracyResult => {
-          s"match percentage: ${ar.matchPercentage}\ntotal count: 
${ar.getTotal}\nmiss count: ${ar.getMiss}, match count: ${ar.getMatch}"
-        }
-        case pr: ProfileResult => {
-          s"match percentage: ${pr.matchPercentage}\ntotal count: 
${pr.getTotal}\nmiss count: ${pr.getMiss}, match count: ${pr.getMatch}"
-        }
-        case _ => {
-          s"result: ${result}"
-        }
-      }
-      println(s"[${timeStamp}] ${metricName} result: \n${resStr}")
-    } catch {
-      case e: Throwable => error(e.getMessage)
-    }
+//  def result(rt: Long, result: Result): Unit = {
+//    try {
+//      val resStr = result match {
+//        case ar: AccuracyResult => {
+//          s"match percentage: ${ar.matchPercentage}\ntotal count: 
${ar.getTotal}\nmiss count: ${ar.getMiss}, match count: ${ar.getMatch}"
+//        }
+//        case pr: ProfileResult => {
+//          s"match percentage: ${pr.matchPercentage}\ntotal count: 
${pr.getTotal}\nmiss count: ${pr.getMiss}, match count: ${pr.getMatch}"
+//        }
+//        case _ => {
+//          s"result: ${result}"
+//        }
+//      }
+//      println(s"[${timeStamp}] ${metricName} result: \n${resStr}")
+//    } catch {
+//      case e: Throwable => error(e.getMessage)
+//    }
+//  }
+//
+//  // need to avoid string too long
+//  private def rddRecords(records: RDD[String]): Unit = {
+//    try {
+//      val recordCount = records.count.toInt
+//      val count = if (maxLogLines < 0) recordCount else 
scala.math.min(maxLogLines, recordCount)
+//      if (count > 0) {
+//        val recordsArray = records.take(count)
+////        recordsArray.foreach(println)
+//      }
+//    } catch {
+//      case e: Throwable => error(e.getMessage)
+//    }
+//  }
+
+//  private def iterableRecords(records: Iterable[String]): Unit = {
+//    try {
+//      val recordCount = records.size
+//      val count = if (maxLogLines < 0) recordCount else 
scala.math.min(maxLogLines, recordCount)
+//      if (count > 0) {
+//        val recordsArray = records.take(count)
+////        recordsArray.foreach(println)
+//      }
+//    } catch {
+//      case e: Throwable => error(e.getMessage)
+//    }
+//  }
+
+//  def records(recs: RDD[String], tp: String): Unit = {
+//    tp match {
+//      case PersistDataType.MISS => rddRecords(recs)
+//      case PersistDataType.MATCH => rddRecords(recs)
+//      case _ => {}
+//    }
+//  }
+//
+//  def records(recs: Iterable[String], tp: String): Unit = {
+//    tp match {
+//      case PersistDataType.MISS => iterableRecords(recs)
+//      case PersistDataType.MATCH => iterableRecords(recs)
+//      case _ => {}
+//    }
+//  }
+
+//  def missRecords(records: RDD[String]): Unit = {
+//    warn(s"[${timeStamp}] ${metricName} miss records: ")
+//    rddRecords(records)
+//  }
+//  def matchRecords(records: RDD[String]): Unit = {
+//    warn(s"[${timeStamp}] ${metricName} match records: ")
+//    rddRecords(records)
+//  }
+
+  def log(rt: Long, msg: String): Unit = {
+    println(s"[${timeStamp}] ${rt}: ${msg}")
   }
 
-  // need to avoid string too long
-  private def rddRecords(records: RDD[String]): Unit = {
-    try {
-      val recordCount = records.count.toInt
-      val count = if (maxLogLines < 0) recordCount else 
scala.math.min(maxLogLines, recordCount)
-      if (count > 0) {
-        val recordsArray = records.take(count)
+//  def persistRecords(df: DataFrame, name: String): Unit = {
+//    val records = df.toJSON
+//    println(s"${name} [${timeStamp}] records: ")
+//    try {
+//      val recordCount = records.count.toInt
+//      val count = if (maxLogLines < 0) recordCount else 
scala.math.min(maxLogLines, recordCount)
+//      if (count > 0) {
+//        val recordsArray = records.take(count)
 //        recordsArray.foreach(println)
-      }
-    } catch {
-      case e: Throwable => error(e.getMessage)
-    }
-  }
+//      }
+//    } catch {
+//      case e: Throwable => error(e.getMessage)
+//    }
+//  }
 
-  private def iterableRecords(records: Iterable[String]): Unit = {
+  // Prints the given records to stdout, one per line, capped at `maxLogLines`
+  // entries; a negative `maxLogLines` disables the cap. `name` is accepted for
+  // interface compatibility but is not used here. Any failure is reported
+  // through `error` instead of being propagated to the caller.
+  def persistRecords(records: Iterable[String], name: String): Unit = {
     try {
       val recordCount = records.size
       val count = if (maxLogLines < 0) recordCount else 
scala.math.min(maxLogLines, recordCount)
       if (count > 0) {
-        val recordsArray = records.take(count)
-//        recordsArray.foreach(println)
+        // only the first `count` records are printed so the configured cap is honored
+        records.take(count).foreach(println)
       }
     } catch {
       case e: Throwable => error(e.getMessage)
     }
   }
 
-  def records(recs: RDD[String], tp: String): Unit = {
-    tp match {
-      case PersistType.MISS => rddRecords(recs)
-      case PersistType.MATCH => rddRecords(recs)
-      case _ => {}
-    }
-  }
-
-  def records(recs: Iterable[String], tp: String): Unit = {
-    tp match {
-      case PersistType.MISS => iterableRecords(recs)
-      case PersistType.MATCH => iterableRecords(recs)
-      case _ => {}
-    }
-  }
-
-//  def missRecords(records: RDD[String]): Unit = {
-//    warn(s"[${timeStamp}] ${metricName} miss records: ")
-//    rddRecords(records)
-//  }
-//  def matchRecords(records: RDD[String]): Unit = {
-//    warn(s"[${timeStamp}] ${metricName} match records: ")
-//    rddRecords(records)
+//  def persistMetrics(metrics: Seq[String], name: String): Unit = {
+//    try {
+//      val recordCount = metrics.size
+//      val count = if (maxLogLines < 0) recordCount else 
scala.math.min(maxLogLines, recordCount)
+//      if (count > 0) {
+//        val recordsArray = metrics.take(count)
+//        recordsArray.foreach(println)
+//      }
+//    } catch {
+//      case e: Throwable => error(e.getMessage)
+//    }
 //  }
 
-  def log(rt: Long, msg: String): Unit = {
-    println(s"[${timeStamp}] ${rt}: ${msg}")
+  // Prints a "<metricName> [<timeStamp>] metrics: " header line to stdout,
+  // followed by the metrics map serialized with JsonUtil.toJson.
+  def persistMetrics(metrics: Map[String, Any]): Unit = {
+    val header = s"${metricName} [${timeStamp}] metrics: "
+    println(header)
+    println(JsonUtil.toJson(metrics))
   }
 
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala 
b/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
index 25c8b0b..0b7c98c 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
@@ -21,6 +21,7 @@ package org.apache.griffin.measure.persist
 import org.apache.griffin.measure.result._
 import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
 
 import scala.util.Try
 
@@ -39,14 +40,19 @@ case class MultiPersists(persists: Iterable[Persist]) 
extends Persist {
   def start(msg: String): Unit = { persists.foreach(_.start(msg)) }
   def finish(): Unit = { persists.foreach(_.finish()) }
 
-  def result(rt: Long, result: Result): Unit = { persists.foreach(_.result(rt, 
result)) }
-
-  def records(recs: RDD[String], tp: String): Unit = { 
persists.foreach(_.records(recs, tp)) }
-  def records(recs: Iterable[String], tp: String): Unit = { 
persists.foreach(_.records(recs, tp)) }
+//  def result(rt: Long, result: Result): Unit = { 
persists.foreach(_.result(rt, result)) }
+//
+//  def records(recs: RDD[String], tp: String): Unit = { 
persists.foreach(_.records(recs, tp)) }
+//  def records(recs: Iterable[String], tp: String): Unit = { 
persists.foreach(_.records(recs, tp)) }
 
 //  def missRecords(records: RDD[String]): Unit = { 
persists.foreach(_.missRecords(records)) }
 //  def matchRecords(records: RDD[String]): Unit = { 
persists.foreach(_.matchRecords(records)) }
 
   def log(rt: Long, msg: String): Unit = { persists.foreach(_.log(rt, msg)) }
 
+//  def persistRecords(df: DataFrame, name: String): Unit = { 
persists.foreach(_.persistRecords(df, name)) }
+  def persistRecords(records: Iterable[String], name: String): Unit = { 
persists.foreach(_.persistRecords(records, name)) }
+//  def persistMetrics(metrics: Seq[String], name: String): Unit = { 
persists.foreach(_.persistMetrics(metrics, name)) }
+  def persistMetrics(metrics: Map[String, Any]): Unit = { 
persists.foreach(_.persistMetrics(metrics)) }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/persist/OldHttpPersist.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/persist/OldHttpPersist.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/persist/OldHttpPersist.scala
index 357d6e1..84316b3 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/persist/OldHttpPersist.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/persist/OldHttpPersist.scala
@@ -1,87 +1,87 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.persist
-
-import org.apache.griffin.measure.result._
-import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil}
-import org.apache.spark.rdd.RDD
-
-// persist result by old http way -- temporary way
-case class OldHttpPersist(config: Map[String, Any], metricName: String, 
timeStamp: Long) extends Persist {
-
-  val Api = "api"
-  val Method = "method"
-
-  val api = config.getOrElse(Api, "").toString
-  val method = config.getOrElse(Method, "post").toString
-
-  def available(): Boolean = {
-    api.nonEmpty
-  }
-
-  def start(msg: String): Unit = {}
-  def finish(): Unit = {}
-
-  def result(rt: Long, result: Result): Unit = {
-    result match {
-      case ar: AccuracyResult => {
-        val matchPercentage: Double = if (ar.getTotal <= 0) 0 else 
(ar.getMatch * 1.0 / ar.getTotal) * 100
-        val dataMap = Map[String, Any](("metricName" -> metricName), 
("timestamp" -> timeStamp), ("value" -> matchPercentage), ("count" -> 
ar.getTotal))
-        httpResult(dataMap)
-      }
-      case pr: ProfileResult => {
-        val dataMap = Map[String, Any](("metricName" -> metricName), 
("timestamp" -> timeStamp), ("value" -> pr.getMatch), ("count" -> pr.getTotal))
-        httpResult(dataMap)
-      }
-      case _ => {
-        info(s"result: ${result}")
-      }
-    }
-  }
-
-  private def httpResult(dataMap: Map[String, Any]) = {
-    try {
-      val data = JsonUtil.toJson(dataMap)
-      // post
-      val params = Map[String, Object]()
-      val header = Map[String, Object](("content-type" -> "application/json"))
-
-      def func(): Boolean = {
-        HttpUtil.httpRequest(api, method, params, header, data)
-      }
-
-      PersistThreadPool.addTask(func _, 10)
-
-//      val status = HttpUtil.httpRequest(api, method, params, header, data)
-//      info(s"${method} to ${api} response status: ${status}")
-    } catch {
-      case e: Throwable => error(e.getMessage)
-    }
-
-  }
-
-  def records(recs: RDD[String], tp: String): Unit = {}
-  def records(recs: Iterable[String], tp: String): Unit = {}
-
-//  def missRecords(records: RDD[String]): Unit = {}
-//  def matchRecords(records: RDD[String]): Unit = {}
-
-  def log(rt: Long, msg: String): Unit = {}
-
-}
+///*
+//Licensed to the Apache Software Foundation (ASF) under one
+//or more contributor license agreements.  See the NOTICE file
+//distributed with this work for additional information
+//regarding copyright ownership.  The ASF licenses this file
+//to you under the Apache License, Version 2.0 (the
+//"License"); you may not use this file except in compliance
+//with the License.  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//Unless required by applicable law or agreed to in writing,
+//software distributed under the License is distributed on an
+//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+//KIND, either express or implied.  See the License for the
+//specific language governing permissions and limitations
+//under the License.
+//*/
+//package org.apache.griffin.measure.persist
+//
+//import org.apache.griffin.measure.result._
+//import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil}
+//import org.apache.spark.rdd.RDD
+//
+//// persist result by old http way -- temporary way
+//case class OldHttpPersist(config: Map[String, Any], metricName: String, 
timeStamp: Long) extends Persist {
+//
+//  val Api = "api"
+//  val Method = "method"
+//
+//  val api = config.getOrElse(Api, "").toString
+//  val method = config.getOrElse(Method, "post").toString
+//
+//  def available(): Boolean = {
+//    api.nonEmpty
+//  }
+//
+//  def start(msg: String): Unit = {}
+//  def finish(): Unit = {}
+//
+//  def result(rt: Long, result: Result): Unit = {
+//    result match {
+//      case ar: AccuracyResult => {
+//        val matchPercentage: Double = if (ar.getTotal <= 0) 0 else 
(ar.getMatch * 1.0 / ar.getTotal) * 100
+//        val dataMap = Map[String, Any](("metricName" -> metricName), 
("timestamp" -> timeStamp), ("value" -> matchPercentage), ("count" -> 
ar.getTotal))
+//        httpResult(dataMap)
+//      }
+//      case pr: ProfileResult => {
+//        val dataMap = Map[String, Any](("metricName" -> metricName), 
("timestamp" -> timeStamp), ("value" -> pr.getMatch), ("count" -> pr.getTotal))
+//        httpResult(dataMap)
+//      }
+//      case _ => {
+//        info(s"result: ${result}")
+//      }
+//    }
+//  }
+//
+//  private def httpResult(dataMap: Map[String, Any]) = {
+//    try {
+//      val data = JsonUtil.toJson(dataMap)
+//      // post
+//      val params = Map[String, Object]()
+//      val header = Map[String, Object](("content-type" -> 
"application/json"))
+//
+//      def func(): Boolean = {
+//        HttpUtil.httpRequest(api, method, params, header, data)
+//      }
+//
+//      PersistThreadPool.addTask(func _, 10)
+//
+////      val status = HttpUtil.httpRequest(api, method, params, header, data)
+////      info(s"${method} to ${api} response status: ${status}")
+//    } catch {
+//      case e: Throwable => error(e.getMessage)
+//    }
+//
+//  }
+//
+//  def records(recs: RDD[String], tp: String): Unit = {}
+//  def records(recs: Iterable[String], tp: String): Unit = {}
+//
+////  def missRecords(records: RDD[String]): Unit = {}
+////  def matchRecords(records: RDD[String]): Unit = {}
+//
+//  def log(rt: Long, msg: String): Unit = {}
+//
+//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala 
b/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala
index bc16599..2884fa6 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala
@@ -21,6 +21,7 @@ package org.apache.griffin.measure.persist
 import org.apache.griffin.measure.log.Loggable
 import org.apache.griffin.measure.result._
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
 
 import scala.util.Try
 
@@ -35,18 +36,21 @@ trait Persist extends Loggable with Serializable {
   def start(msg: String): Unit
   def finish(): Unit
 
-  def result(rt: Long, result: Result): Unit
+  def log(rt: Long, msg: String): Unit
 
-  def records(recs: RDD[String], tp: String): Unit
-  def records(recs: Iterable[String], tp: String): Unit
+//  def result(rt: Long, result: Result): Unit
+//
+//  def records(recs: RDD[String], tp: String): Unit
+//  def records(recs: Iterable[String], tp: String): Unit
 
-//  def missRecords(records: RDD[String]): Unit
-//  def matchRecords(records: RDD[String]): Unit
+//  def persistRecords(df: DataFrame, name: String): Unit
+  def persistRecords(records: Iterable[String], name: String): Unit
+//  def persistMetrics(metrics: Seq[String], name: String): Unit
+  def persistMetrics(metrics: Map[String, Any]): Unit
 
-  def log(rt: Long, msg: String): Unit
 }
 
-object PersistType {
-  final val MISS = "miss"
-  final val MATCH = "match"
-}
\ No newline at end of file
+//object PersistDataType {
+//  final val MISS = "miss"
+//  final val MATCH = "match"
+//}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/persist/PersistFactory.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/persist/PersistFactory.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/persist/PersistFactory.scala
index 4330160..3a74343 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/persist/PersistFactory.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/persist/PersistFactory.scala
@@ -27,7 +27,7 @@ case class PersistFactory(persistParams: 
Iterable[PersistParam], metricName: Str
 
   val HDFS_REGEX = """^(?i)hdfs$""".r
   val HTTP_REGEX = """^(?i)http$""".r
-  val OLDHTTP_REGEX = """^(?i)oldhttp$""".r
+//  val OLDHTTP_REGEX = """^(?i)oldhttp$""".r
   val LOG_REGEX = """^(?i)log$""".r
 
   def getPersists(timeStamp: Long): MultiPersists = {
@@ -40,7 +40,7 @@ case class PersistFactory(persistParams: 
Iterable[PersistParam], metricName: Str
     val persistTry = persistParam.persistType match {
       case HDFS_REGEX() => Try(HdfsPersist(config, metricName, timeStamp))
       case HTTP_REGEX() => Try(HttpPersist(config, metricName, timeStamp))
-      case OLDHTTP_REGEX() => Try(OldHttpPersist(config, metricName, 
timeStamp))
+//      case OLDHTTP_REGEX() => Try(OldHttpPersist(config, metricName, 
timeStamp))
       case LOG_REGEX() => Try(LoggerPersist(config, metricName, timeStamp))
       case _ => throw new Exception("not supported persist type")
     }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/process/Algo.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/process/Algo.scala 
b/measure/src/main/scala/org/apache/griffin/measure/process/Algo.scala
new file mode 100644
index 0000000..7f1b153
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/Algo.scala
@@ -0,0 +1,34 @@
+///*
+//Licensed to the Apache Software Foundation (ASF) under one
+//or more contributor license agreements.  See the NOTICE file
+//distributed with this work for additional information
+//regarding copyright ownership.  The ASF licenses this file
+//to you under the Apache License, Version 2.0 (the
+//"License"); you may not use this file except in compliance
+//with the License.  You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+//Unless required by applicable law or agreed to in writing,
+//software distributed under the License is distributed on an
+//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+//KIND, either express or implied.  See the License for the
+//specific language governing permissions and limitations
+//under the License.
+//*/
+//package org.apache.griffin.measure.algo
+//
+//import org.apache.griffin.measure.config.params.env._
+//import org.apache.griffin.measure.config.params.user._
+//import org.apache.griffin.measure.log.Loggable
+//
+//import scala.util.Try
+//
+//trait Algo extends Loggable with Serializable {
+//
+//  val envParam: EnvParam
+//  val userParam: UserParam
+//
+//  def run(): Try[_]
+//
+//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
new file mode 100644
index 0000000..737a43f
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
@@ -0,0 +1,117 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.process
+
+import java.util.Date
+
+import org.apache.griffin.measure.config.params._
+import org.apache.griffin.measure.config.params.env._
+import org.apache.griffin.measure.config.params.user._
+import org.apache.griffin.measure.data.source.DataSourceFactory
+import org.apache.griffin.measure.persist.{Persist, PersistFactory}
+import org.apache.griffin.measure.process.engine.{DqEngineFactory, 
SparkSqlEngine}
+import org.apache.griffin.measure.rule.adaptor.{RuleAdaptorGroup, RunPhase}
+import org.apache.griffin.measure.rule.udf.GriffinUdfs
+import org.apache.griffin.measure.utils.JsonUtil
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.{SparkConf, SparkContext}
+
+import scala.util.Try
+
+// Batch implementation of DqProcess: `init` builds the Spark and Hive contexts,
+// `run` loads the data sources, evaluates the rule steps and persists metrics
+// and records, and `end` stops the Spark context. Each step reports its
+// outcome as a Try.
+case class BatchDqProcess(allParam: AllParam) extends DqProcess {
+
+  val envParam: EnvParam = allParam.envParam
+  val userParam: UserParam = allParam.userParam
+
+  val metricName = userParam.name
+  val sparkParam = envParam.sparkParam
+
+  // both contexts are assigned in `init` and stay unset (null) until then
+  var sparkContext: SparkContext = _
+  var sqlContext: SQLContext = _
+
+  def retriable: Boolean = false
+
+  // Creates the SparkContext (application name = metricName, extra settings
+  // from sparkParam.config), applies the configured log level, wraps it in a
+  // HiveContext, registers the Griffin UDFs and initialises the rule adaptors
+  // with the names of the configured data sources.
+  def init: Try[_] = Try {
+    val conf = new SparkConf().setAppName(metricName)
+    conf.setAll(sparkParam.config)
+    sparkContext = new SparkContext(conf)
+    sparkContext.setLogLevel(sparkParam.logLevel)
+    sqlContext = new HiveContext(sparkContext)
+
+    // register udf
+    GriffinUdfs.register(sqlContext)
+
+    // init adaptors
+    val dataSourceNames = userParam.dataSources.map(_.name)
+    RuleAdaptorGroup.init(sqlContext, dataSourceNames)
+  }
+
+  // Executes one batch measurement. The start time doubles as the timestamp
+  // handed to the persists and to the data loading step.
+  def run: Try[_] = Try {
+    // start time
+    val startTime = new Date().getTime()
+
+    // get persists to persist measure result
+    val persistFactory = PersistFactory(envParam.persistParams, metricName)
+    val persist: Persist = persistFactory.getPersists(startTime)
+
+    // persist start id
+    val applicationId = sparkContext.applicationId
+    persist.start(applicationId)
+
+    // get dq engines
+    val dqEngines = DqEngineFactory.genDqEngines(sqlContext)
+
+    // generate data sources
+    // NOTE(review): the second argument is passed as null; presumably it is a
+    // streaming context that batch mode does not need -- confirm against
+    // DataSourceFactory.genDataSources
+    val dataSources = DataSourceFactory.genDataSources(
+      sqlContext, null, dqEngines, userParam.dataSources, metricName)
+    dataSources.foreach(_.init)
+
+    // init data sources
+    dqEngines.loadData(dataSources, startTime)
+
+    // generate rule steps
+    val ruleSteps =
+      RuleAdaptorGroup.genConcreteRuleSteps(userParam.evaluateRuleParam, RunPhase)
+
+    // run rules
+    dqEngines.runRuleSteps(ruleSteps)
+
+    // persist results
+    val timeGroups = dqEngines.persistAllMetrics(ruleSteps, persistFactory)
+
+    val rdds = dqEngines.collectUpdateRDDs(ruleSteps, timeGroups)
+    try {
+      rdds.foreach(_._2.cache())
+      dqEngines.persistAllRecords(rdds, persistFactory)
+    } finally {
+      // release the cached RDDs even when persisting the records fails
+      rdds.foreach(_._2.unpersist())
+    }
+
+    // end time
+    val endTime = new Date().getTime
+    persist.log(endTime, s"process using time: ${endTime - startTime} ms")
+
+    // finish
+    persist.finish()
+  }
+
+  // Stops the Spark context created by `init`.
+  def end: Try[_] = Try {
+    sparkContext.stop()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/process/DqProcess.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/process/DqProcess.scala 
b/measure/src/main/scala/org/apache/griffin/measure/process/DqProcess.scala
new file mode 100644
index 0000000..50b04a8
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/DqProcess.scala
@@ -0,0 +1,40 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.process
+
+import org.apache.griffin.measure.config.params.env._
+import org.apache.griffin.measure.config.params.user._
+import org.apache.griffin.measure.log.Loggable
+
+import scala.util.Try
+
+// contract of a dq process: it exposes the init / run / end operations, each of which
+// reports its outcome as a Try instead of throwing
+trait DqProcess extends Loggable with Serializable {
+
+  // env and user parameters, provided by the concrete process
+  val envParam: EnvParam
+  val userParam: UserParam
+
+  // initialises the process
+  def init: Try[_]
+
+  // runs the process
+  def run: Try[_]
+
+  // ends the process
+  def end: Try[_]
+
+  // NOTE(review): presumably tells the caller whether the process may be run again after a failure - confirm against the caller
+  def retriable: Boolean
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/process/ProcessType.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/process/ProcessType.scala 
b/measure/src/main/scala/org/apache/griffin/measure/process/ProcessType.scala
new file mode 100644
index 0000000..36f88e1
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/process/ProcessType.scala
@@ -0,0 +1,47 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.process
+
+import scala.util.matching.Regex
+
+// kind of dq process, recognised from its textual name
+sealed trait ProcessType {
+  // pattern that a name has to match entirely to denote this process type
+  val regex: Regex
+  // canonical name of this process type
+  val desc: String
+}
+
+object ProcessType {
+  // candidates, tried in this order
+  private val procTypes: List[ProcessType] = List(BatchProcessType, StreamingProcessType)
+
+  // resolves the process type named by ptn: the first candidate whose regex matches wins,
+  // any unrecognised name is treated as batch
+  def apply(ptn: String): ProcessType =
+    procTypes.find(accepts(_, ptn)).getOrElse(BatchProcessType)
+
+  // extracts the canonical name of a process type
+  def unapply(pt: ProcessType): Option[String] = Some(pt.desc)
+
+  // true when the regex of tp matches name
+  private def accepts(tp: ProcessType, name: String): Boolean = name match {
+    case tp.regex() => true
+    case _ => false
+  }
+}
+
+// matches "batch", ignoring case
+final case object BatchProcessType extends ProcessType {
+  val regex: Regex = """^(?i)batch$""".r
+  val desc: String = "batch"
+}
+
+// matches "streaming", ignoring case
+final case object StreamingProcessType extends ProcessType {
+  val regex: Regex = """^(?i)streaming$""".r
+  val desc: String = "streaming"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
new file mode 100644
index 0000000..a567941
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
@@ -0,0 +1,157 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.process
+
+import java.util.Date
+
+import org.apache.griffin.measure.cache.info.InfoCacheInstance
+import org.apache.griffin.measure.config.params._
+import org.apache.griffin.measure.config.params.env._
+import org.apache.griffin.measure.config.params.user._
+import org.apache.griffin.measure.data.source.DataSourceFactory
+import org.apache.griffin.measure.persist.{Persist, PersistFactory}
+import org.apache.griffin.measure.process.engine.DqEngineFactory
+import org.apache.griffin.measure.rule.adaptor.RuleAdaptorGroup
+import org.apache.griffin.measure.rule.udf.GriffinUdfs
+import org.apache.griffin.measure.utils.TimeUtil
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+import org.apache.spark.{SparkConf, SparkContext}
+
+import scala.util.Try
+
+// streaming dq process: creates the spark and streaming contexts, then runs the dq
+// calculation (StreamingDqThread) at a fixed interval until the streaming context terminates
+case class StreamingDqProcess(allParam: AllParam) extends DqProcess {
+
+  val envParam: EnvParam = allParam.envParam
+  val userParam: UserParam = allParam.userParam
+
+  val metricName = userParam.name
+  val sparkParam = envParam.sparkParam
+
+  // both contexts are assigned by init
+  var sparkContext: SparkContext = _
+  var sqlContext: SQLContext = _
+
+  def retriable: Boolean = true
+
+  // creates the spark and hive contexts, then initialises the info cache, the udfs and the rule adaptors
+  def init: Try[_] = Try {
+    val conf = new SparkConf().setAppName(metricName)
+    conf.setAll(sparkParam.config)
+    sparkContext = new SparkContext(conf)
+    sparkContext.setLogLevel(sparkParam.logLevel)
+    sqlContext = new HiveContext(sparkContext)
+
+    // init info cache instance
+    InfoCacheInstance.initInstance(envParam.infoCacheParams, metricName)
+    InfoCacheInstance.init
+
+    // register udf
+    GriffinUdfs.register(sqlContext)
+
+    // init adaptors
+    val dataSourceNames = userParam.dataSources.map(_.name)
+    RuleAdaptorGroup.init(sqlContext, dataSourceNames)
+  }
+
+  // starts the periodic dq calculation and the streaming context, and blocks until the
+  // streaming context terminates; any failure is captured in the returned Try
+  def run: Try[_] = Try {
+    // restore the streaming context from the checkpoint dir, or create a new one
+    val ssc = StreamingContext.getOrCreate(sparkParam.cpDir, () => {
+      try {
+        createStreamingContext
+      } catch {
+        case e: Throwable => {
+          error(s"create streaming context error: ${e.getMessage}")
+          throw e
+        }
+      }
+    })
+
+    // start time
+    val startTime = new Date().getTime()
+
+    // get persists to persist measure result
+    val persistFactory = PersistFactory(envParam.persistParams, metricName)
+    val persist: Persist = persistFactory.getPersists(startTime)
+
+    // persist start id
+    val applicationId = sparkContext.applicationId
+    persist.start(applicationId)
+
+    // get dq engines
+    val dqEngines = DqEngineFactory.genDqEngines(sqlContext)
+
+    // generate data sources
+    val dataSources = DataSourceFactory.genDataSources(sqlContext, ssc, dqEngines, userParam.dataSources, metricName)
+    dataSources.foreach(_.init)
+
+    // process thread
+    val dqThread = StreamingDqThread(dqEngines, dataSources, userParam.evaluateRuleParam, persistFactory, persist)
+
+    // init data sources
+//    dqEngines.loadData(dataSources)
+//
+//    // generate rule steps
+//    val ruleSteps = RuleAdaptorGroup.genConcreteRuleSteps(userParam.evaluateRuleParam)
+//
+//    // run rules
+//    dqEngines.runRuleSteps(ruleSteps)
+//
+//    // persist results
+//    dqEngines.persistAllResults(ruleSteps, persist)
+
+    // end time
+//    val endTime = new Date().getTime
+//    persist.log(endTime, s"process using time: ${endTime - startTime} ms")
+
+    // interval between two runs of the dq thread; this is the process interval, not the
+    // batch interval of the streaming context, and the error message has to say so
+    val processInterval = TimeUtil.milliseconds(sparkParam.processInterval) match {
+      case Some(interval) => interval
+      case _ => throw new Exception("invalid process interval")
+    }
+    val process = TimingProcess(processInterval, dqThread)
+    process.startup()
+
+    ssc.start()
+    ssc.awaitTermination()
+    ssc.stop(stopSparkContext=true, stopGracefully=true)
+
+    // finish
+    persist.finish()
+
+    // NOTE(review): the timing process is never shut down, so its timer keeps submitting the
+    // dq thread after the streaming context has stopped - confirm whether this is intended
+//    process.shutdown()
+  }
+
+  // stops the spark context and closes the info cache
+  def end: Try[_] = Try {
+    try {
+      sparkContext.stop()
+    } finally {
+      // close the info cache even when stopping the spark context fails
+      InfoCacheInstance.close
+    }
+  }
+
+  // creates a streaming context on the spark context, using the configured batch interval
+  // and checkpoint dir; throws when the batch interval can not be parsed
+  def createStreamingContext: StreamingContext = {
+    val batchInterval = TimeUtil.milliseconds(sparkParam.batchInterval) match {
+      case Some(interval) => Milliseconds(interval)
+      case _ => throw new Exception("invalid batch interval")
+    }
+    val ssc = new StreamingContext(sparkContext, batchInterval)
+    ssc.checkpoint(sparkParam.cpDir)
+
+    ssc
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
new file mode 100644
index 0000000..df1cc1b
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
@@ -0,0 +1,185 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.process
+
+import java.util.Date
+import java.util.concurrent.TimeUnit
+
+import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
+import org.apache.griffin.measure.cache.result.CacheResultProcesser
+import org.apache.griffin.measure.config.params.user.EvaluateRuleParam
+import org.apache.griffin.measure.data.source.DataSource
+import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.persist.{Persist, PersistFactory}
+import org.apache.griffin.measure.process.engine.DqEngines
+import org.apache.griffin.measure.rule.adaptor.{RuleAdaptorGroup, RunPhase}
+
+// one round of the streaming dq calculation: loads the data, runs the rule steps, persists
+// metrics and records, updates the data sources and cleans old data.
+// a round only runs when the "process" lock is obtained within 5 seconds
+case class StreamingDqThread(dqEngines: DqEngines,
+                             dataSources: Seq[DataSource],
+                             evaluateRuleParam: EvaluateRuleParam,
+                             persistFactory: PersistFactory,
+                             appPersist: Persist
+                            ) extends Runnable with Loggable {
+
+  val lock = InfoCacheInstance.genLock("process")
+
+  // runs one round; errors are logged and never propagate to the caller
+  def run(): Unit = {
+    val updateTimeDate = new Date()
+    val updateTime = updateTimeDate.getTime
+    println(s"===== [${updateTimeDate}] process begins =====")
+    val locked = lock.lock(5, TimeUnit.SECONDS)
+    if (locked) {
+      try {
+
+        val st = new Date().getTime
+        appPersist.log(st, s"starting process ...")
+
+        TimeInfoCache.startTimeInfoCache
+
+        // init data sources
+        dqEngines.loadData(dataSources, st)
+
+        // generate rule steps
+        val ruleSteps = RuleAdaptorGroup.genConcreteRuleSteps(evaluateRuleParam, RunPhase)
+
+        // run rules
+        dqEngines.runRuleSteps(ruleSteps)
+
+        val ct = new Date().getTime
+        val calculationTimeStr = s"calculation using time: ${ct - st} ms"
+        println(calculationTimeStr)
+        appPersist.log(ct, calculationTimeStr)
+
+        // persist results
+        val timeGroups = dqEngines.persistAllMetrics(ruleSteps, persistFactory)
+
+        val rt = new Date().getTime
+        val persistResultTimeStr = s"persist result using time: ${rt - ct} ms"
+        println(persistResultTimeStr)
+        appPersist.log(rt, persistResultTimeStr)
+
+        val rdds = dqEngines.collectUpdateRDDs(ruleSteps, timeGroups)
+        val lt = try {
+          rdds.foreach(_._2.cache())
+          rdds.foreach { pr =>
+            val (step, rdd) = pr
+            val cnt = rdd.count
+            println(s"step [${step.name}] group count: ${cnt}")
+          }
+
+          val collectedTime = new Date().getTime
+          val collectRddTimeStr = s"collect records using time: ${collectedTime - rt} ms"
+          println(collectRddTimeStr)
+          appPersist.log(collectedTime, collectRddTimeStr)
+
+          // persist records
+          dqEngines.persistAllRecords(rdds, persistFactory)
+//          dqEngines.persistAllRecords(ruleSteps, persistFactory, timeGroups)
+
+          // update data source
+          dqEngines.updateDataSources(rdds, dataSources)
+//          dqEngines.updateDataSources(ruleSteps, dataSources, timeGroups)
+
+          collectedTime
+        } finally {
+          // release the cached rdds even when persisting records or updating data sources fails
+          rdds.foreach(_._2.unpersist())
+        }
+
+        TimeInfoCache.endTimeInfoCache
+
+        // clean old data
+        cleanData()
+
+        val et = new Date().getTime
+        val persistTimeStr = s"persist records using time: ${et - lt} ms"
+        println(persistTimeStr)
+        appPersist.log(et, persistTimeStr)
+
+      } catch {
+        case e: Throwable => error(s"process error: ${e.getMessage}")
+      } finally {
+        lock.unlock()
+      }
+    } else {
+      // lock not obtained in time, this round is skipped
+      println(s"===== [${updateTimeDate}] process ignores =====")
+    }
+    val endTime = new Date().getTime
+    println(s"===== [${updateTimeDate}] process ends, using ${endTime - updateTime} ms =====")
+  }
+
+  // clean old data and old result cache; errors are logged only
+  private def cleanData(): Unit = {
+    try {
+      dataSources.foreach(_.cleanOldData)
+      dataSources.foreach(_.dropTable)
+
+      val cleanTime = TimeInfoCache.getCleanTime
+      CacheResultProcesser.refresh(cleanTime)
+    } catch {
+      case e: Throwable => error(s"clean data error: ${e.getMessage}")
+    }
+  }
+
+//  // calculate accuracy between source data and target data
+//  private def accuracy(sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))],
+//               targetData: RDD[(Product, (Map[String, Any], Map[String, Any]))],
+//               ruleAnalyzer: RuleAnalyzer) = {
+//    // 1. cogroup
+//    val allKvs = sourceData.cogroup(targetData)
+//
+//    // 2. accuracy calculation
+//    val (accuResult, missingRdd, matchedRdd) = AccuracyCore.accuracy(allKvs, ruleAnalyzer)
+//
+//    (accuResult, missingRdd, matchedRdd)
+//  }
+//
+//  private def reorgByTimeGroup(rdd: RDD[(Product, (Map[String, Any], Map[String, Any]))]
+//                      ): RDD[(Long, (Product, (Map[String, Any], Map[String, Any])))] = {
+//    rdd.flatMap { row =>
+//      val (key, (value, info)) = row
+//      val b: Option[(Long, (Product, (Map[String, Any], Map[String, Any])))] = info.get(TimeStampInfo.key) match {
+//        case Some(t: Long) => Some((t, row))
+//        case _ => None
+//      }
+//      b
+//    }
+//  }
+//
+//  // convert data into a string
+//  def record2String(rec: (Product, (Map[String, Any], Map[String, Any])), dataPersist: Iterable[Expr], infoPersist: Iterable[Expr]): String = {
+//    val (key, (data, info)) = rec
+//    val persistData = getPersistMap(data, dataPersist)
+//    val persistInfo = info.mapValues { value =>
+//      value match {
+//        case vd: Map[String, Any] => getPersistMap(vd, infoPersist)
+//        case v => v
+//      }
+//    }.map(identity)
+//    s"${persistData} [${persistInfo}]"
+//  }
+//
+//  // get the expr value map of the persist expressions
+//  private def getPersistMap(data: Map[String, Any], persist: Iterable[Expr]): Map[String, Any] = {
+//    val persistMap = persist.map(e => (e._id, e.desc)).toMap
+//    data.flatMap { pair =>
+//      val (k, v) = pair
+//      persistMap.get(k) match {
+//        case Some(d) => Some((d -> v))
+//        case _ => None
+//      }
+//    }
+//  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/process/TimingProcess.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/process/TimingProcess.scala 
b/measure/src/main/scala/org/apache/griffin/measure/process/TimingProcess.scala
new file mode 100644
index 0000000..8d9bcb2
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/process/TimingProcess.scala
@@ -0,0 +1,46 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.process
+
+import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
+import java.util.{Timer, TimerTask}
+
+// submits the runnable to a worker pool every `interval` milliseconds
+case class TimingProcess(interval: Long, runnable: Runnable) {
+
+  // fixed pool of 5 threads on which the runnable is executed
+  val pool: ThreadPoolExecutor = Executors.newFixedThreadPool(5).asInstanceOf[ThreadPoolExecutor]
+
+  // daemon timer named "process"; it only triggers the submissions
+  val timer = new Timer("process", true)
+
+  // each tick hands the runnable over to the pool instead of running it on the timer thread
+  val timerTask = new TimerTask() {
+    override def run(): Unit = pool.submit(runnable)
+  }
+
+  // schedules the task: the first run happens after `interval` ms, then repeats every `interval` ms
+  def startup(): Unit = timer.schedule(timerTask, interval, interval)
+
+  // cancels the timer, then shuts the pool down and waits up to 10 seconds for running tasks
+  def shutdown(): Unit = {
+    timer.cancel()
+    pool.shutdown()
+    pool.awaitTermination(10, TimeUnit.SECONDS)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/process/check/DataChecker.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/process/check/DataChecker.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/process/check/DataChecker.scala
new file mode 100644
index 0000000..91855c2
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/process/check/DataChecker.scala
@@ -0,0 +1,29 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.process.check
+
+import org.apache.spark.sql.SQLContext
+
+// checks data against the tables known to the sql context
+case class DataChecker(sqlContext: SQLContext) {
+
+  // true when a table with exactly this name is listed by the sql context
+  def existDataSourceName(name: String): Boolean =
+    sqlContext.tableNames().contains(name)
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
new file mode 100644
index 0000000..b409b8d
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
@@ -0,0 +1,165 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.process.engine
+
+import java.util.Date
+
+import org.apache.griffin.measure.cache.result.CacheResultProcesser
+import org.apache.griffin.measure.config.params.user.DataSourceParam
+import org.apache.griffin.measure.data.connector.GroupByColumn
+import org.apache.griffin.measure.data.source.{DataSource, DataSourceFactory}
+import org.apache.griffin.measure.persist.{Persist, PersistFactory}
+import org.apache.griffin.measure.result.AccuracyResult
+import org.apache.griffin.measure.rule.dsl._
+import org.apache.griffin.measure.rule.step._
+import org.apache.griffin.measure.utils.JsonUtil
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.types.{LongType, StringType, StructField, 
StructType}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.streaming.StreamingContext
+
+// dq engine that runs data frame operation steps
+case class DataFrameOprEngine(sqlContext: SQLContext) extends SparkDqEngine {
+
+  // runs a df opr step and registers its result as a temp table named after the step.
+  // returns true on success; false when the step fails (the error is logged) or when the
+  // step is not a df opr step
+  def runRuleStep(ruleStep: ConcreteRuleStep): Boolean = ruleStep match {
+    case DfOprStep(name, rule, details, _, _) =>
+      try {
+        // pick the operation named by the rule
+        val df = rule match {
+          case DataFrameOprs._fromJson => DataFrameOprs.fromJson(sqlContext, details)
+          case DataFrameOprs._accuracy => DataFrameOprs.accuracy(sqlContext, details)
+          case DataFrameOprs._clear => DataFrameOprs.clear(sqlContext, details)
+          case _ => throw new Exception(s"df opr [ ${rule} ] not supported")
+        }
+        df.registerTempTable(name)
+        true
+      } catch {
+        case e: Throwable =>
+          error(s"run df opr [ ${rule} ] error: ${e.getMessage}")
+          false
+      }
+    case _ => false
+  }
+
+}
+
+// data frame operations that a df opr step can refer to by name
+object DataFrameOprs {
+
+  // names of the supported operations
+  final val _fromJson = "from_json"
+  final val _accuracy = "accuracy"
+  final val _clear = "clear"
+
+  // reads one string column of the table named by "df.name" as json.
+  // the column is the one named by "col.name" when given, otherwise the first column
+  def fromJson(sqlContext: SQLContext, details: Map[String, Any]): DataFrame = {
+    val _dfName = "df.name"
+    val _colName = "col.name"
+    val dfName = details.getOrElse(_dfName, "").toString
+    val colNameOpt = details.get(_colName).map(_.toString)
+
+    val sourceDf = sqlContext.table(s"`${dfName}`")
+    val jsonRdd = colNameOpt match {
+      case Some(colName) => sourceDf.map(_.getAs[String](colName))
+      case _ => sourceDf.map(_.getAs[String](0))
+    }
+    sqlContext.read.json(jsonRdd)
+  }
+
+  // merges the miss / total counts of the table named by "df.name" into the cached accuracy
+  // results, and returns the results that got updated as (tmst, miss, total, matched) rows.
+  // the column names are read from details, each falling back to its own key
+  def accuracy(sqlContext: SQLContext, details: Map[String, Any]): DataFrame = {
+    val _dfName = "df.name"
+    val _miss = "miss"
+    val _total = "total"
+    val _matched = "matched"
+    val _tmst = "tmst"
+
+    // a missing detail falls back to the key itself
+    def detail(key: String): String = details.getOrElse(key, key).toString
+    val dfName = detail(_dfName)
+    val miss = detail(_miss)
+    val total = detail(_total)
+    val matched = detail(_matched)
+    val tmst = detail(_tmst)
+
+    val updateTime = new Date().getTime
+
+    // reads a long column, any failure counts as 0
+    def longOrZero(r: Row, k: String): Long = {
+      try {
+        r.getAs[Long](k)
+      } catch {
+        case _: Throwable => 0L
+      }
+    }
+
+    // rows without a positive timestamp are dropped
+    val sourceDf = sqlContext.table(s"`${dfName}`")
+    val results = sourceDf.flatMap { row =>
+      val t = longOrZero(row, tmst)
+      if (t > 0) {
+        val ar = AccuracyResult(longOrZero(row, miss), longOrZero(row, total))
+        Some((t, ar))
+      } else None
+    }.collect
+
+    // generate all updated cache results first, and only then apply them to the cache
+    val updateResults = results.flatMap { case (t, result) =>
+      CacheResultProcesser.genUpdateCacheResult(t, updateTime, result)
+    }
+
+    // update
+    for (r <- updateResults) CacheResultProcesser.update(r)
+
+    val schema = StructType(Array(
+      StructField(tmst, LongType),
+      StructField(miss, LongType),
+      StructField(total, LongType),
+      StructField(matched, LongType)
+    ))
+    val rows = updateResults.map { r =>
+      val ar = r.result.asInstanceOf[AccuracyResult]
+      Row(r.timeGroup, ar.miss, ar.total, ar.getMatch)
+    }
+    sqlContext.createDataFrame(sqlContext.sparkContext.parallelize(rows), schema)
+  }
+
+  // returns an empty data frame with the schema of the table named by "df.name"
+  def clear(sqlContext: SQLContext, details: Map[String, Any]): DataFrame = {
+    val dfName = details.getOrElse("df.name", "").toString
+
+    val schema = sqlContext.table(s"`${dfName}`").schema
+    sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], schema)
+  }
+
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
new file mode 100644
index 0000000..84d5917
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
@@ -0,0 +1,41 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.process.engine
+
+import org.apache.griffin.measure.config.params.user.DataSourceParam
+import org.apache.griffin.measure.data.source.DataSource
+import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.persist.{Persist, PersistFactory}
+import org.apache.griffin.measure.rule.dsl._
+import org.apache.griffin.measure.rule.step._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
+
+// contract of a dq engine: runs concrete rule steps and collects what they produced
+trait DqEngine extends Loggable with Serializable {
+
+  // runs the rule step and returns a success flag; the DataFrameOprEngine implementation
+  // returns false both for a failed step and for a step it does not handle
+  def runRuleStep(ruleStep: ConcreteRuleStep): Boolean
+
+  // collects the metrics of the rule step
+  // NOTE(review): the Long key is presumably the time group of the metrics - confirm
+  def collectMetrics(ruleStep: ConcreteRuleStep): Map[Long, Map[String, Any]]
+
+//  def collectRecords(ruleStep: ConcreteRuleStep, timeGroups: 
Iterable[Long]): Option[RDD[(Long, Iterable[String])]]
+//
+//  def collectUpdateCacheDatas(ruleStep: ConcreteRuleStep, timeGroups: 
Iterable[Long]): Option[RDD[(Long, Iterable[String])]]
+
+  // collects the rdd of the rule step for the given time groups, if there is one
+  // NOTE(review): the pairs are presumably (time group, records as strings) - confirm
+  def collectUpdateRDD(ruleStep: ConcreteRuleStep, timeGroups: 
Iterable[Long]): Option[RDD[(Long, Iterable[String])]]
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngineFactory.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngineFactory.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngineFactory.scala
new file mode 100644
index 0000000..e075584
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngineFactory.scala
@@ -0,0 +1,47 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.process.engine
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.streaming.StreamingContext
+
+
+// creates the dq engines used by a dq process
+object DqEngineFactory {
+
+  private final val SparkSqlEngineType = "spark-sql"
+  private final val DataFrameOprEngineType = "df-opr"
+
+  // engine types to create, in this order
+  private val engineTypes = List(SparkSqlEngineType, DataFrameOprEngineType)
+
+  // creates one engine per known engine type, wrapped into a DqEngines
+  def genDqEngines(sqlContext: SQLContext): DqEngines =
+    DqEngines(engineTypes.flatMap(genDqEngine(_, sqlContext)))
+
+  // creates the engine of the given type, None for an unknown type
+  private def genDqEngine(engineType: String, sqlContext: SQLContext): Option[DqEngine] =
+    engineType match {
+      case SparkSqlEngineType => Some(SparkSqlEngine(sqlContext))
+      case DataFrameOprEngineType => Some(DataFrameOprEngine(sqlContext))
+      case _ => None
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
new file mode 100644
index 0000000..1bafa15
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
@@ -0,0 +1,208 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.process.engine
+
+import org.apache.griffin.measure.config.params.user.DataSourceParam
+import org.apache.griffin.measure.data.connector.GroupByColumn
+import org.apache.griffin.measure.data.source._
+import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.persist.{Persist, PersistFactory}
+import org.apache.griffin.measure.rule.dsl._
+import org.apache.griffin.measure.rule.step._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
+
+// composite engine: delegates each operation to the wrapped engines
+case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
+
+  val persistOrder: List[PersistType] = List(MetricPersistType, RecordPersistType)
+
+  // calls loadData(ms) on every data source
+  def loadData(dataSources: Seq[DataSource], ms: Long): Unit = {
+    dataSources.foreach(_.loadData(ms))
+  }
+
+  // runs the rule steps one by one, in the given order
+  def runRuleSteps(ruleSteps: Seq[ConcreteRuleStep]): Unit = {
+    ruleSteps.foreach(step => runRuleStep(step))
+  }
+
+  // collects the metrics of all metric-persist steps, merges them per time key,
+  // persists each merged metric and returns the time keys that were persisted
+  def persistAllMetrics(ruleSteps: Seq[ConcreteRuleStep], persistFactory: PersistFactory
+                       ): Iterable[Long] = {
+    val noMetrics = Map[Long, Map[String, Any]]()
+    val allMetrics: Map[Long, Map[String, Any]] =
+      ruleSteps
+        .filter(_.persistType == MetricPersistType)
+        .foldLeft(noMetrics) { (merged, step) =>
+          collectMetrics(step).foldLeft(merged) { case (acc, (tmst, metric)) =>
+            // metrics sharing a time key are combined, later values win on key clash
+            val combined = acc.get(tmst).map(_ ++ metric).getOrElse(metric)
+            acc + (tmst -> combined)
+          }
+        }
+    allMetrics.foreach { case (tmst, metric) =>
+      persistFactory.getPersists(tmst).persistMetrics(metric)
+    }
+    allMetrics.keys
+  }
+
+//  def persistAllRecords(ruleSteps: Seq[ConcreteRuleStep], persistFactory: PersistFactory,
+//                        timeGroups: Iterable[Long]): Unit = {
+//    val recordSteps = ruleSteps.filter(_.persistType == RecordPersistType)
+//    recordSteps.foreach { step =>
+//      collectRecords(step, timeGroups) match {
+//        case Some(rdd) => {
+//          val name = step.name
+//          rdd.foreach { pair =>
+//            val (t, items) = pair
+//            val persist = persistFactory.getPersists(t)
+//            persist.persistRecords(items, name)
+//          }
+//        }
+//        case _ => {
+//          println(s"empty records to persist")
+//        }
+//      }
+//    }
+//  }
+//
+//  def updateDataSources(ruleSteps: Seq[ConcreteRuleStep], dataSources: Seq[DataSource],
+//                        timeGroups: Iterable[Long]): Unit = {
+//    val updateSteps = ruleSteps.filter(_.updateDataSource.nonEmpty)
+//    updateSteps.foreach { step =>
+//      collectUpdateCacheDatas(step, timeGroups) match {
+//        case Some(rdd) => {
+//          val udpateDataSources = dataSources.filter { ds =>
+//            step.updateDataSource match {
+//              case Some(dsName) if (dsName == ds.name) => true
+//              case _ => false
+//            }
+//          }
+//          if (udpateDataSources.size > 0) {
+//            val name = step.name
+//            rdd.foreach { pair =>
+//              val (t, items) = pair
+//              udpateDataSources.foreach { ds =>
+//                ds.dataSourceCacheOpt.foreach(_.updateData(items, t))
+//              }
+//            }
+//          }
+//        }
+//        case _ => {
+//          println(s"empty data source to update")
+//        }
+//      }
+//    }
+//  }
+
+  ///////////////////////////
+
+  // offers the step to the engines in order and stops at the first one that
+  // reports success; warns when none of them does
+  def runRuleStep(ruleStep: ConcreteRuleStep): Boolean = {
+    val handled = engines.exists(_.runRuleStep(ruleStep))
+    if (!handled) warn(s"run rule step warn: no dq engine support ${ruleStep}")
+    handled
+  }
+
+  ///////////////////////////
+
+//  def collectRecords(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]] = {
+//    engines.flatMap { engine =>
+//      engine.collectRecords(ruleStep, timeGroups)
+//    }.headOption
+//  }
+//  def collectUpdateCacheDatas(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]] = {
+//    engines.flatMap { engine =>
+//      engine.collectUpdateCacheDatas(ruleStep, timeGroups)
+//    }.headOption
+//  }
+
+  // unions the metrics reported by every engine; for a repeated time key the
+  // entry of the later engine replaces the earlier one
+  def collectMetrics(ruleStep: ConcreteRuleStep): Map[Long, Map[String, Any]] = {
+    val metrics = engines
+      .map(_.collectMetrics(ruleStep))
+      .foldLeft(Map[Long, Map[String, Any]]())(_ ++ _)
+//    if (metrics.isEmpty) warn(s"collect metrics warn: no metrics collected for ${ruleStep}")
+    metrics
+  }
+
+  // asks every engine for the step's update rdd and keeps the first one returned
+  def collectUpdateRDD(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]
+                      ): Option[RDD[(Long, Iterable[String])]] = {
+    engines.flatMap(_.collectUpdateRDD(ruleStep, timeGroups)).headOption
+  }
+
+  ////////////////////////////
+
+  // pairs each rule step with its update rdd, dropping steps that have none
+  def collectUpdateRDDs(ruleSteps: Seq[ConcreteRuleStep], timeGroups: Iterable[Long]
+                       ): Seq[(ConcreteRuleStep, RDD[(Long, Iterable[String])])] = {
+    for {
+      step <- ruleSteps
+      rdd <- collectUpdateRDD(step, timeGroups)
+    } yield (step, rdd)
+  }
+
+  // for every record-persist step, persists the items of each rdd entry
+  // under the step name, using the persist obtained for the entry's time key
+  def persistAllRecords(stepRdds: Seq[(ConcreteRuleStep, RDD[(Long, Iterable[String])])],
+                        persistFactory: PersistFactory): Unit = {
+    stepRdds.foreach {
+      case (step, rdd) if step.persistType == RecordPersistType =>
+        val name = step.name
+        rdd.foreach { case (t, items) =>
+          persistFactory.getPersists(t).persistRecords(items, name)
+        }
+      case _ =>
+    }
+  }
+
+  // for every step that names a data source to update, pushes the items of
+  // each rdd entry into the cache (if any) of the data sources with that name
+  def updateDataSources(stepRdds: Seq[(ConcreteRuleStep, RDD[(Long, Iterable[String])])],
+                        dataSources: Seq[DataSource]): Unit = {
+    stepRdds.foreach { case (step, rdd) =>
+      step.updateDataSource.foreach { dsName =>
+        val targets = dataSources.filter(ds => dsName == ds.name)
+        if (targets.nonEmpty) {
+          rdd.foreach { case (t, items) =>
+            targets.foreach { ds =>
+              ds.dataSourceCacheOpt.foreach(_.updateData(items, t))
+            }
+          }
+        }
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
new file mode 100644
index 0000000..ee994fd
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
@@ -0,0 +1,167 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.process.engine
+
+import org.apache.griffin.measure.data.connector.GroupByColumn
+import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.rule.dsl.{MetricPersistType, 
RecordPersistType}
+import org.apache.griffin.measure.rule.step._
+import org.apache.griffin.measure.utils.JsonUtil
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+trait SparkDqEngine extends DqEngine {
+
+  val sqlContext: SQLContext
+
+  // Collects the metrics of a metric-persist step, keyed by the tmst value of each row.
+  //
+  // The table registered under the step name is read, every row is turned into
+  // json and parsed back into a map. Rows without a tmst entry, or rows that fail
+  // to parse, are skipped. Rows sharing a tmst are grouped: a single row is
+  // returned as its own map, several rows are returned as Map(name -> rows).
+  // Any non-fatal failure is logged and gives an empty result; other kinds of
+  // step also give an empty result.
+  def collectMetrics(ruleStep: ConcreteRuleStep): Map[Long, Map[String, Any]] = {
+    val emptyMap = Map[String, Any]()
+    ruleStep match {
+      case step: ConcreteRuleStep if (step.persistType == MetricPersistType) => {
+        val name = step.name
+        try {
+          val pdf = sqlContext.table(s"`${name}`")
+          val records = pdf.toJSON.collect()
+
+          val pairs = records.flatMap { rec =>
+            try {
+              val value = JsonUtil.toAnyMap(rec)
+              value.get(GroupByColumn.tmst).map { t => (t.toString.toLong, value) }
+            } catch {
+              // best effort: a record that can not be parsed is skipped,
+              // fatal errors (OOM, interrupts, ...) are left to propagate
+              case scala.util.control.NonFatal(_) => None
+            }
+          }
+          // group by tmst, keeping the records of a group in their original order
+          val groupedPairs = pairs.foldLeft(Map[Long, Seq[Map[String, Any]]]()) { (ret, pair) =>
+            val (k, v) = pair
+            ret.get(k) match {
+              case Some(seq) => ret + (k -> (seq :+ v))
+              case _ => ret + (k -> (v :: Nil))
+            }
+          }
+          // strict map instead of mapValues: mapValues only gives a lazy view that
+          // is recomputed on every access and is not serializable
+          groupedPairs.map { case (t, vs) =>
+            val metric = if (vs.size > 1) {
+              Map[String, Any]((name -> vs))
+            } else {
+              vs.headOption.getOrElse(emptyMap)
+            }
+            (t, metric)
+          }
+        } catch {
+          case scala.util.control.NonFatal(e) => {
+            error(s"collect metrics ${name} error: ${e.getMessage}")
+            Map[Long, Map[String, Any]]()
+          }
+        }
+      }
+      case _ => Map[Long, Map[String, Any]]()
+    }
+  }
+
+  // Builds, for a record-persist step or a step that updates a data source, an rdd
+  // of (tmst, json rows) from the table registered under the step name.
+  //
+  // Only rows whose tmst value is a Long contained in timeGroups are kept; each
+  // kept row is written as json of its column -> value map, then grouped by tmst.
+  // Returns None for other kinds of step, or when a non-fatal error is raised
+  // while the rdd is being defined (the error is logged).
+  // NOTE(review): the rdd is only defined here, not evaluated, so failures inside
+  // the row mapping presumably surface later at the caller's action - confirm.
+  def collectUpdateRDD(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]
+                      ): Option[RDD[(Long, Iterable[String])]] = {
+    ruleStep match {
+      case step: ConcreteRuleStep if ((step.persistType == RecordPersistType)
+        || (step.updateDataSource.nonEmpty)) => {
+        val name = step.name
+        try {
+          val pdf = sqlContext.table(s"`${name}`")
+          val cols = pdf.columns
+          // built once, so the per-row membership test is a set lookup
+          // instead of a scan over timeGroups
+          val timeGroupSet = timeGroups.toSet
+          val rdd = pdf.flatMap { row =>
+            val values = cols.map { col => (col, row.getAs[Any](col)) }.toMap
+            values.get(GroupByColumn.tmst) match {
+              case Some(t: Long) if (timeGroupSet.contains(t)) => Some((t, JsonUtil.toJson(values)))
+              case _ => None
+            }
+          }.groupByKey()
+          Some(rdd)
+        } catch {
+          case scala.util.control.NonFatal(e) => {
+            error(s"collect records ${name} error: ${e.getMessage}")
+            None
+          }
+        }
+      }
+      case _ => None
+    }
+  }
+
+//  def collectRecords(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]] = {
+//    ruleStep match {
+//      case step: ConcreteRuleStep if (step.persistType == RecordPersistType) => {
+//        val name = step.name
+//        try {
+//          val pdf = sqlContext.table(s"`${name}`")
+//          val cols = pdf.columns
+//          val rdd = pdf.flatMap { row =>
+//            val values = cols.flatMap { col =>
+//              Some((col, row.getAs[Any](col)))
+//            }.toMap
+//            values.get(GroupByColumn.tmst) match {
+//              case Some(t: Long) if (timeGroups.exists(_ == t)) => Some((t, JsonUtil.toJson(values)))
+//              case _ => None
+//            }
+//          }.groupByKey()
+//          Some(rdd)
+//        } catch {
+//          case e: Throwable => {
+//            error(s"collect records ${name} error: ${e.getMessage}")
+//            None
+//          }
+//        }
+//      }
+//      case _ => None
+//    }
+//  }
+//
+//  def collectUpdateCacheDatas(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]] = {
+//    ruleStep match {
+//      case step: ConcreteRuleStep if (step.updateDataSource.nonEmpty) => {
+//        val name = step.name
+//        try {
+//          val pdf = sqlContext.table(s"`${name}`")
+//          val cols = pdf.columns
+//          val rdd = pdf.flatMap { row =>
+//            val values = cols.flatMap { col =>
+//              Some((col, row.getAs[Any](col)))
+//            }.toMap
+//            values.get(GroupByColumn.tmst) match {
+//              case Some(t: Long) if (timeGroups.exists(_ == t)) => Some((t, JsonUtil.toJson(values)))
+//              case _ => None
+//            }
+//          }.groupByKey()
+//          Some(rdd)
+//        } catch {
+//          case e: Throwable => {
+//            error(s"collect update cache datas ${name} error: ${e.getMessage}")
+//            None
+//          }
+//        }
+//      }
+//      case _ => None
+//    }
+//  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkRowFormatter.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkRowFormatter.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkRowFormatter.scala
new file mode 100644
index 0000000..6ed0559
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkRowFormatter.scala
@@ -0,0 +1,62 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.process.engine
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{ArrayType, DataType, StructField, 
StructType}
+
+import scala.collection.mutable.ArrayBuffer
+
+// converts a spark sql Row into a plain Map of field name -> value, recursing
+// into nested structs (turned into maps) and arrays (elements converted in place)
+object SparkRowFormatter {
+
+  // formats the row according to its own schema
+  def formatRow(row: Row): Map[String, Any] = {
+    formatRowWithSchema(row, row.schema)
+  }
+
+  private def formatRowWithSchema(row: Row, schema: StructType): Map[String, Any] = {
+    formatStruct(schema.fields, row)
+  }
+
+  // pairs every field with the value at the same position of the row
+  private def formatStruct(schema: Seq[StructField], r: Row): Map[String, Any] = {
+    schema.zip(r.toSeq).foldLeft(Map[String, Any]()) { (acc, p) => acc ++ formatItem(p) }
+  }
+
+  private def formatItem(p: (StructField, Any)): Map[String, Any] = {
+    val (sf, a) = p
+    Map(sf.name -> formatValue(sf.dataType, a))
+  }
+
+  // converts one value according to its declared type; null stays null, and a
+  // value whose runtime class does not fit the declared type is returned as is
+  private def formatValue(dataType: DataType, value: Any): Any = {
+    (dataType, value) match {
+      case (_, null) => null
+      // array values are matched as a general Seq rather than cast to
+      // ArrayBuffer, so any Seq implementation handed over by spark is accepted
+      case (ArrayType(et, _), arr: scala.collection.Seq[_]) => formatArray(et, arr)
+      case (StructType(s), r: Row) => formatStruct(s, r)
+      case _ => value
+    }
+  }
+
+  // arrays of structs or of arrays are converted element by element (null
+  // elements are kept as null); arrays of any other type are returned untouched
+  private def formatArray(et: DataType, arr: scala.collection.Seq[Any]): scala.collection.Seq[Any] = {
+    et match {
+      case _: StructType | _: ArrayType => arr.map(e => formatValue(et, e))
+      case _ => arr
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
new file mode 100644
index 0000000..15df3b5
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
@@ -0,0 +1,58 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.process.engine
+
+import java.util.Date
+
+import org.apache.griffin.measure.config.params.user.DataSourceParam
+import org.apache.griffin.measure.data.connector.GroupByColumn
+import org.apache.griffin.measure.data.source._
+import org.apache.griffin.measure.persist.{Persist, PersistFactory}
+import org.apache.griffin.measure.rule.dsl._
+import org.apache.griffin.measure.rule.step._
+import org.apache.griffin.measure.utils.JsonUtil
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, GroupedData, SQLContext}
+import org.apache.spark.streaming.StreamingContext
+
+// dq engine that handles SparkSqlStep rule steps through the given sql context
+case class SparkSqlEngine(sqlContext: SQLContext) extends SparkDqEngine {
+
+  // Runs the rule of a SparkSqlStep as a sql statement and registers the result
+  // as a temp table under the step name.
+  // Returns true on success; returns false for any other kind of step, or when
+  // a non-fatal error is raised (the error is logged). Fatal errors are left
+  // to propagate instead of being reported as a plain failure.
+  def runRuleStep(ruleStep: ConcreteRuleStep): Boolean = {
+    ruleStep match {
+      case SparkSqlStep(name, rule, _, _, _) => {
+        try {
+          val rdf = sqlContext.sql(rule)
+          rdf.registerTempTable(name)
+          true
+        } catch {
+          case scala.util.control.NonFatal(e) => {
+            error(s"run spark sql [ ${rule} ] error: ${e.getMessage}")
+            false
+          }
+        }
+      }
+      case _ => false
+    }
+  }
+
+}
+
+
+
+

Reply via email to