http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/SchemaFieldParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/SchemaFieldParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/SchemaFieldParam.scala
new file mode 100644
index 0000000..56bdc40
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/SchemaFieldParam.scala
@@ -0,0 +1,20 @@
+package org.apache.griffin.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+@JsonInclude(Include.NON_NULL)
+case class SchemaFieldParam( @JsonProperty("name") name: String,
+                             @JsonProperty("type") fieldType: String,
+                             @JsonProperty("default.value") defaultValue: String,
+                             @JsonProperty("extract.steps") extractSteps: List[String]
+                           ) extends ConfigParam {
+
+  override def equals(x: Any): Boolean = {
+    if (x.isInstanceOf[SchemaFieldParam]) {
+      val s = x.asInstanceOf[SchemaFieldParam]
+      s.name == this.name
+    } else false
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/SparkParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/SparkParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/SparkParam.scala
new file mode 100644
index 0000000..e460fb6
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/SparkParam.scala
@@ -0,0 +1,15 @@
+package org.apache.griffin.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+@JsonInclude(Include.NON_NULL)
+case class SparkParam( @JsonProperty("app.name") appName: String,
+                       @JsonProperty("log.level") logLevel: String,
+                       @JsonProperty("config") config: Map[String, String],
+                       @JsonProperty("streaming.checkpoint.dir") cpDir: String,
+                       @JsonProperty("streaming.batch.interval.seconds") batchInterval: Int,
+                       @JsonProperty("streaming.sample.interval.seconds") sampleInterval: Int
+                     ) extends ConfigParam {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/TimeRangeParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/TimeRangeParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/TimeRangeParam.scala
new file mode 100644
index 0000000..6dfcd22
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/TimeRangeParam.scala
@@ -0,0 +1,12 @@
+package org.apache.griffin.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+@JsonInclude(Include.NON_NULL)
+case class TimeRangeParam( @JsonProperty("begin") begin: Int,
+                           @JsonProperty("end") end: Int,
+                           @JsonProperty("unit") unit: String
+                         ) extends ConfigParam {
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/TypeConfigParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/TypeConfigParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/TypeConfigParam.scala
new file mode 100644
index 0000000..a78163b
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/TypeConfigParam.scala
@@ -0,0 +1,11 @@
+package org.apache.griffin.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+@JsonInclude(Include.NON_NULL)
+case class TypeConfigParam( @JsonProperty("dq") dqType: String,
+                            @JsonProperty("dataAsset") dataAssetTypeMap: Map[String, String]
+                          ) extends ConfigParam {
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/dump/DumpWrapper.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/dump/DumpWrapper.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/dump/DumpWrapper.scala
new file mode 100644
index 0000000..54ebef3
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/dump/DumpWrapper.scala
@@ -0,0 +1,21 @@
+package org.apache.griffin.dump
+
+case class DumpWrapper(infos: Iterable[DumpWrapperInfo]) extends Serializable {
+
+  def wrapSchema(): List[(String, String)] = {
+    infos.map { info =>
+      (info.key, info.tp)
+    }.toList
+  }
+
+  def wrapValues[T](data: Map[String, T]): List[Option[T]] = {
+    infos.map { i =>
+      data.get(i.key)
+    }.toList
+  }
+
+  def wrapKeys(): List[String] = {
+    infos.map(_.key).toList
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/dump/DumpWrapperInfo.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/dump/DumpWrapperInfo.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/dump/DumpWrapperInfo.scala
new file mode 100644
index 0000000..a974d54
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/dump/DumpWrapperInfo.scala
@@ -0,0 +1,32 @@
+package org.apache.griffin.dump
+
+sealed trait DumpWrapperInfo {
+  type T
+  val key: String
+  val tp: String
+  def wrap(value: T) = (key -> value)
+}
+
+final case object TimeGroupInfo extends DumpWrapperInfo {
+  type T = Long
+  val key = "__time__"
+  val tp = "bigint"
+}
+
+final case object NextFireTimeInfo extends DumpWrapperInfo {
+  type T = Long
+  val key = "__next_fire_time__"
+  val tp = "bigint"
+}
+
+final case object MismatchInfo extends DumpWrapperInfo {
+  type T = String
+  val key = "__mismatch__"
+  val tp = "string"
+}
+
+final case object ErrorInfo extends DumpWrapperInfo {
+  type T = String
+  val key = "__error__"
+  val tp = "string"
+}
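
For reference, a minimal sketch of how DumpWrapper combines the DumpWrapperInfo case objects above (the sample record map is hypothetical):

import org.apache.griffin.dump._

object DumpWrapperSketch {
  def main(args: Array[String]): Unit = {
    // choose the info columns to dump, in order
    val wrapper = DumpWrapper(List(TimeGroupInfo, MismatchInfo))

    // hive-style (column name, column type) pairs
    println(wrapper.wrapSchema())   // List((__time__,bigint), (__mismatch__,string))

    // pick the wrapped values out of a record map; absent keys become None
    val record = Map[String, Any]("__time__" -> 1468986240000L)
    println(wrapper.wrapValues(record))   // List(Some(1468986240000), None)
  }
}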

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/dump/HdfsFileDumpUtil.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/dump/HdfsFileDumpUtil.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/dump/HdfsFileDumpUtil.scala
new file mode 100644
index 0000000..c76e621
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/dump/HdfsFileDumpUtil.scala
@@ -0,0 +1,37 @@
+package org.apache.griffin.dump
+
+import org.apache.griffin.utils.HdfsUtil
+import org.apache.spark.rdd.RDD
+
+object HdfsFileDumpUtil {
+
+  val sepCount = 5000
+
+  private def suffix(i: Long): String = {
+    if (i == 0) "" else s".${i}"
+  }
+
+  def splitRdd[T](rdd: RDD[T])(implicit m: Manifest[T]): RDD[(Long, Iterable[T])] = {
+    val indexRdd = rdd.zipWithIndex
+    indexRdd.map(p => ((p._2 / sepCount), p._1)).groupByKey()
+  }
+
+  private def directDump(path: String, list: Iterable[String], lineSep: String): Unit = {
+    // collect and save
+    val strRecords = list.mkString(lineSep)
+    // save into hdfs
+    HdfsUtil.writeContent(path, strRecords)
+  }
+
+  def dump(path: String, recordsRdd: RDD[String], lineSep: String): Unit = {
+    val groupedRdd = splitRdd(recordsRdd)
+
+    // dump
+    groupedRdd.foreach { pair =>
+      val (idx, list) = pair
+      val filePath = path + suffix(idx)
+      directDump(filePath, list, lineSep)
+    }
+  }
+
+}
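
A minimal usage sketch for the utility above: splitRdd groups a record RDD into sepCount-sized chunks by index, and dump writes them to path, path.1, path.2, ... (the SparkContext setup and output path are illustrative only):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.griffin.dump.HdfsFileDumpUtil

object DumpSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("dump-sketch"))
    // 12000 records -> _missing (5000), _missing.1 (5000), _missing.2 (2000)
    val records = sc.parallelize(1 to 12000).map(i => s"record-${i}")
    HdfsFileDumpUtil.dump("hdfs:///tmp/griffin/_missing", records, "\n")
    sc.stop()
  }
}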

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/prep/parse/DataParser.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/prep/parse/DataParser.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/prep/parse/DataParser.scala
new file mode 100644
index 0000000..da2b239
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/prep/parse/DataParser.scala
@@ -0,0 +1,10 @@
+package org.apache.griffin.prep.parse
+
+trait DataParser extends Serializable {
+
+  type In
+  type Out
+
+  def parse(data: In): Seq[Out]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/prep/parse/Json2MapParser.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/prep/parse/Json2MapParser.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/prep/parse/Json2MapParser.scala
new file mode 100644
index 0000000..643d0f0
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/prep/parse/Json2MapParser.scala
@@ -0,0 +1,59 @@
+package org.apache.griffin.prep.parse
+
+import org.apache.griffin.config.params._
+import org.apache.griffin.utils.{DataTypeUtil, ExtractJsonUtil}
+
+import scala.collection.mutable.MutableList
+import scala.util.{Failure, Success, Try}
+
+
+case class Json2MapParser(param: ParseConfigParam) extends DataParser {
+
+  type In = String
+  type Out = Map[String, Any]
+
+  def parse(data: In): Seq[Out] = {
+    val datas = Some(data) :: Nil
+
+    val schema = param.schema
+    val stepsMap = schema.map { field =>
+      (field.name, field.extractSteps)
+    }.toMap
+
+    val resultList: List[Map[String, Option[_]]] = ExtractJsonUtil.extractDataListWithSchemaMap(datas, stepsMap)
+
+    resultList.map { mp =>
+      mp.map { pair =>
+        val (name, opt) = pair
+        val field = findFieldByName(name, schema)
+        val value = field match {
+          case Some(f) => parseValue(opt, f)
+          case _ => opt.getOrElse(null)
+        }
+        (name, value)
+      }
+    }
+  }
+
+  private def findFieldByName(name: String, fields: List[SchemaFieldParam]): Option[SchemaFieldParam] = {
+    fields.filter(_.name == name) match {
+      case Nil => None
+      case head :: _ => Some(head)
+    }
+  }
+
+  private def parseValue(valueOpt: Option[_], fieldSchema: SchemaFieldParam): Any = {
+    valueOpt match {
+      case Some(v) => {
+        val convertFunc = DataTypeUtil.str2ConvertFunc(fieldSchema.fieldType)
+        convertFunc(v)
+      }
+      case _ => {
+        val defValue = fieldSchema.defaultValue
+        val convertFunc = DataTypeUtil.str2StrValConvertFunc(fieldSchema.fieldType)
+        convertFunc(defValue)
+      }
+    }
+  }
+
+}
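
Assuming ParseConfigParam simply carries the schema as a List[SchemaFieldParam] (its definition is not part of this diff, so the constructor below is hypothetical), the parser could be driven like this:

import org.apache.griffin.config.params.{ParseConfigParam, SchemaFieldParam}
import org.apache.griffin.prep.parse.Json2MapParser

object ParseSketch {
  def main(args: Array[String]): Unit = {
    // "json" parses the raw line, ".key" walks into the resulting map
    val fields = List(
      SchemaFieldParam("uid", "int", "0", List("json", ".uid")),
      SchemaFieldParam("site", "string", "unknown", List("json", ".site"))
    )
    val parser = Json2MapParser(ParseConfigParam(fields))  // hypothetical constructor

    // roughly Seq(Map(uid -> 12, site -> US)); a missing field falls back to default.value
    parser.parse("""{"uid": 12, "site": "US"}""").foreach(println)
  }
}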

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/record/HdfsRecorder.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/record/HdfsRecorder.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/HdfsRecorder.scala
new file mode 100644
index 0000000..210d5b4
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/HdfsRecorder.scala
@@ -0,0 +1,87 @@
+package org.apache.griffin.record
+
+import java.util.Date
+
+import org.apache.griffin.record.result.AccuResult
+import org.apache.griffin.utils.HdfsUtil
+
+case class HdfsRecorder(recPath: String, recMetricName: String, recTime: Long) extends Recorder(recTime) {
+
+  val startFile = "_START"
+  val outFile = "_RESULT"
+  val finalOutFile = "_RESULT_FINAL"
+  val errorFile = "_ERROR"
+  val infoFile = "_INFO"
+  val finishFile = "_FINISHED"
+  val missingRecFile = "_missingRec"
+  val delayFile = "_delayResult"
+  val nullResponseFile = "_nullResponseResult"
+  val timeFile = "_time"
+
+  val updateResultFile = "_RESULT_UPDATE"
+  val updateMissingRecFile = "_missingRec_UPDATE"
+  val updateTimeFile = "_time_UPDATE"
+
+  val maxCountOfStrings = 5000
+
+  private def timeHead(rt: Long): String = {
+    val dt = new Date(rt)
+    s"--- ${dt} ---\n"
+  }
+
+  def start(): Unit = {
+    val startFilePath = getFilePath(recPath, s"${recMetricName}/${getTime}/${startFile}")
+    HdfsUtil.createEmptyFile(startFilePath)
+  }
+
+  def finish(): Unit = {
+    HdfsUtil.createEmptyFile(getFilePath(recPath, s"${recMetricName}/${getTime}/${finishFile}"))
+  }
+
+  def error(rt: Long, msg: String): Unit = {
+    val outStr = timeHead(rt) + s"${msg}\n"
+    HdfsUtil.appendContent(getFilePath(recPath, s"${recMetricName}/${getTime}/${errorFile}"), outStr)
+  }
+
+  def info(rt: Long, msg: String): Unit = {
+    val outStr = timeHead(rt) + s"${msg}\n"
+    HdfsUtil.appendContent(getFilePath(recPath, s"${recMetricName}/${getTime}/${infoFile}"), outStr)
+  }
+
+  def accuracyResult(rt: Long, res: AccuResult): Unit = {
+    val matchPercentage: Double = if (res.totalCount <= 0) 0 else (1 - res.missCount.toDouble / res.totalCount) * 100
+
+    val outStr = timeHead(rt) + s"match percentage: ${matchPercentage}\nmiss count: ${res.missCount}\ntotal count: ${res.totalCount}\n"
+
+    // output
+    HdfsUtil.appendContent(getFilePath(recPath, s"${recMetricName}/${getTime}/${outFile}"), outStr)
+    HdfsUtil.writeContent(getFilePath(recPath, s"${recMetricName}/${getTime}/${finalOutFile}"), outStr)
+  }
+
+  def accuracyMissingRecords(records: Iterable[AnyRef]): Unit = {
+    val filePath = getFilePath(recPath, s"${recMetricName}/${getTime}/${missingRecFile}")
+    val groupCount = (records.size - 1) / maxCountOfStrings + 1
+    val groupedRecords = records.grouped(groupCount).zipWithIndex
+    groupedRecords.foreach { pair =>
+      val (rcs, idx) = pair
+      val recFunc = if (idx == 0) HdfsUtil.writeContent _ else HdfsUtil.appendContent _
+      recFunc(filePath, rcs.mkString("\n"))
+    }
+  }
+
+  def recordTime(rt: Long, calcTime: Long): Unit = {
+    val outStr = timeHead(rt) + s"${calcTime}\n"
+    HdfsUtil.appendContent(getFilePath(recPath, s"${recMetricName}/${getTime}/${timeFile}"), outStr)
+  }
+
+//  override def recordUpdateTime(calcTime: Long): Unit = {
+//    val timeFilePath = getFilePath(recPath, s"${recMetricName}/${getTime}/${updateTimeFile}")
+//    HdfsUtil.appendContent(timeFilePath, s"${recTime}\n${calcTime}\n\n")
+//  }
+
+  protected def getFilePath(parentPath: String, fileName: String): String = {
+    val sep = "/"
+    if (parentPath.endsWith(sep)) parentPath + fileName else parentPath + sep + fileName
+  }
+
+}
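
A hedged sketch of the directory layout this recorder produces: every call writes under ${recPath}/${recMetricName}/${recTime}/, one marker or result file per method (the HDFS path and metric name below are illustrative):

import org.apache.griffin.record.HdfsRecorder
import org.apache.griffin.record.result.AccuResult

object HdfsRecorderSketch {
  def main(args: Array[String]): Unit = {
    val t = System.currentTimeMillis
    // files land under hdfs:///griffin/records/accu_metric/<t>/
    val recorder = HdfsRecorder("hdfs:///griffin/records", "accu_metric", t)

    recorder.start()                                       // _START marker
    recorder.accuracyResult(t, AccuResult(25L, 1000L))     // _RESULT + _RESULT_FINAL, 97.5% match
    recorder.accuracyMissingRecords(Seq("rec-1", "rec-2")) // _missingRec
    recorder.finish()                                      // _FINISHED marker
  }
}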

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/record/NullRecorder.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/record/NullRecorder.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/NullRecorder.scala
new file mode 100644
index 0000000..b99de27
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/NullRecorder.scala
@@ -0,0 +1,20 @@
+package org.apache.griffin.record
+
+import org.apache.griffin.record.result.AccuResult
+
+case class NullRecorder() extends Recorder(0L) {
+
+  def start(): Unit = {}
+  def finish(): Unit = {}
+  def error(rt: Long, msg: String): Unit = {}
+  def info(rt: Long, msg: String): Unit = {}
+
+  def accuracyResult(rt: Long, res: AccuResult): Unit = {}
+  def accuracyMissingRecords(records: Iterable[AnyRef]): Unit = {}
+
+//  def delayResult(res: (Long, Iterable[AnyRef])): Unit = {}
+//  def nullResponseResult(res: (Long, Iterable[AnyRef])): Unit = {}
+
+  def recordTime(rt: Long, calcTime: Long): Unit = {}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/record/PostRecorder.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/record/PostRecorder.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/PostRecorder.scala
new file mode 100644
index 0000000..9489349
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/PostRecorder.scala
@@ -0,0 +1,46 @@
+package org.apache.griffin.record
+
+import org.apache.griffin.utils.{RestfulUtil, JsonUtil}
+
+import org.apache.griffin.record.result.AccuResult
+
+//case class PostRecorder(url: String, path: String, metricName: String, t: Long) extends HdfsRecorder(path, metricName, t) {
+case class PostRecorder(url: String, metricName: String, recTime: Long) extends Recorder(recTime) {
+
+  val api = "api/v1/metrics"
+  val urlPath = getUrlPath(url, api)
+
+  def accuracyResult(rt: Long, res: AccuResult): Unit = {
+    val matchPercentage: Double = if (res.totalCount <= 0) 0 else (1 - res.missCount.toDouble / res.totalCount) * 100
+
+    def getDataStr(): String = {
+      val dataMap: Map[String, Any] = Map[String, Any](("metricName" -> metricName), ("timestamp" -> getTime), ("value" -> matchPercentage))
+      JsonUtil.toJson(dataMap)
+    }
+
+    // post
+    val params = Map[String, Object]()
+    val header = Map[String, Object](("content-type" -> "application/json"))
+    val data = getDataStr
+    val status = RestfulUtil.postData(urlPath, params, header, data)
+    println(s"post response status: ${status}")
+  }
+
+  def accuracyMissingRecords(records: Iterable[AnyRef]): Unit = {
+    ;
+  }
+
+  private def getUrlPath(urlPath: String, apiPath: String): String = {
+    val sep = "/"
+    if (urlPath.endsWith(sep)) urlPath + apiPath else urlPath + sep + apiPath
+  }
+
+  def start(): Unit = {}
+  def finish(): Unit = {}
+  def error(rt: Long, msg: String): Unit = {}
+  def info(rt: Long, msg: String): Unit = {}
+//  def delayResult(res: (Long, Iterable[AnyRef])): Unit = {}
+//  def nullResponseResult(res: (Long, Iterable[AnyRef])): Unit = {}
+  def recordTime(rt: Long, calcTime: Long): Unit = {}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/record/Recorder.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/record/Recorder.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/Recorder.scala
new file mode 100644
index 0000000..657e958
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/Recorder.scala
@@ -0,0 +1,19 @@
+package org.apache.griffin.record
+
+import org.apache.griffin.record.result.AccuResult
+
+
+abstract class Recorder(time: Long) extends Serializable {
+
+  def getTime(): Long = time
+
+  def start(): Unit
+  def finish(): Unit
+  def error(rt: Long, msg: String): Unit
+  def info(rt: Long, msg: String): Unit
+
+  def accuracyResult(rt: Long, res: AccuResult): Unit
+  def accuracyMissingRecords(records: Iterable[AnyRef]): Unit
+
+  def recordTime(rt: Long, calcTime: Long): Unit
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/record/RecorderFactory.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/record/RecorderFactory.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/RecorderFactory.scala
new file mode 100644
index 0000000..4a47998
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/RecorderFactory.scala
@@ -0,0 +1,48 @@
+package org.apache.griffin.record
+
+import org.apache.griffin.config.params.RecorderParam
+
+
+case class RecorderFactory(recorderParam: RecorderParam) extends Serializable {
+
+  val HDFS_REGEX = """^(?i)hdfs$""".r
+  val POST_REGEX = """^(?i)post$""".r
+
+  val metricName = recorderParam.metricName
+  val configMap = recorderParam.config
+
+  val HDFS_DIR = "hdfs.dir"
+  val POST_URL = "post.url"
+  val POST_METRIC_NAME = "post.metric.name"
+
+  def getRecorders(time: Long): Iterable[Recorder] = {
+    val recorderTypes = recorderParam.types
+    recorderTypes.map(tp => getRecorder(time, tp))
+  }
+
+  private def getRecorder(time: Long, recorderType: String): Recorder = {
+    val recorder = recorderType match {
+      case HDFS_REGEX() => {
+        configMap.get(HDFS_DIR) match {
+          case Some(path) => HdfsRecorder(path, metricName, time)
+          case _ => NullRecorder()
+        }
+      }
+      case POST_REGEX() => {
+        configMap.get(POST_URL) match {
+          case Some(url) => {
+            val mtrName = configMap.get(POST_METRIC_NAME) match {
+              case Some(s) => s
+              case _ => metricName
+            }
+            PostRecorder(url, mtrName, time)
+          }
+          case _ => NullRecorder()
+        }
+      }
+      case _ => NullRecorder()
+    }
+    recorder
+  }
+
+}
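
RecorderParam is defined elsewhere in this patch; assuming it carries the recorder types, the metric name and a config map roughly as below (the shape is inferred from the factory's usage, not confirmed by this diff), one factory call yields all configured recorders for a calculation timestamp:

import org.apache.griffin.config.params.RecorderParam
import org.apache.griffin.record.{Recorder, RecorderFactory}

object RecorderSketch {
  def main(args: Array[String]): Unit = {
    // hypothetical RecorderParam shape: types, metricName, config
    val param = RecorderParam(
      List("hdfs", "post"),
      "accu_metric",
      Map("hdfs.dir" -> "hdfs:///griffin/records",
          "post.url" -> "http://metrics-host/griffin")
    )
    // one HdfsRecorder and one PostRecorder; unknown types become NullRecorder
    val recorders: Iterable[Recorder] = RecorderFactory(param).getRecorders(System.currentTimeMillis)
    recorders.foreach(_.start())
  }
}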

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/record/cleaner/CleanerFactory.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/record/cleaner/CleanerFactory.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/cleaner/CleanerFactory.scala
new file mode 100644
index 0000000..6051fdc
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/cleaner/CleanerFactory.scala
@@ -0,0 +1,31 @@
+//package org.apache.griffin.record.cleaner
+//
+//import com.ebay.griffin.config.params.{AppParam, DataParam}
+//
+//class CleanerFactory(appParam: AppParam, dataParam: DataParam) {
+//
+//  var cleaner: HdfsCleaner = null
+//
+//  def cleanerStart(): Unit = {
+//    val recorderType = appParam.getParam(appParam.RecorderType).toString
+//    if (cleaner == null) {
+//      cleaner = recorderType match {
+//        case appParam.RecorderHdfs => {
+//          new HdfsCleaner(appParam, dataParam)
+//        }
+//        case appParam.RecorderPost => {
+//          new HdfsCleaner(appParam, dataParam)
+//        }
+//        case _ => null
+//      }
+//      cleaner.start
+//    }
+//  }
+//
+//  def cleanerEnd(): Unit = {
+//    if (cleaner != null) {
+//      cleaner.end
+//    }
+//  }
+//
+//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/record/cleaner/HdfsCleaner.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/record/cleaner/HdfsCleaner.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/cleaner/HdfsCleaner.scala
new file mode 100644
index 0000000..7407dfe
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/cleaner/HdfsCleaner.scala
@@ -0,0 +1,78 @@
+//package org.apache.griffin.record.cleaner
+//
+//import java.io.{BufferedReader, InputStreamReader}
+//import java.util.{Date, Timer, TimerTask}
+//
+//import com.ebay.griffin.config.params.{AppParam, DataParam}
+//
+//
+//class HdfsCleaner(appParam: AppParam, dataParam: DataParam) {
+//
+//  val expireDefSec: Long = 604800
+//  val periodDefSec: Long = 7200
+//
+//  val timer = new Timer
+//
+//  def start(): Unit = {
+//    val path = appParam.getParam(appParam.RecorderHdfsDir).toString
+//    val metricName = dataParam.getParam(dataParam.MetricName).toString
+//    val dirPath = getFilePath(path, metricName)
+//
+//    val expireSec = if (appParam.containsParam(appParam.RecorderCleanExpireSeconds)) {
+//      appParam.getParam(appParam.RecorderCleanExpireSeconds).toString.toLong
+//    } else expireDefSec
+//    val periodSec = if (appParam.containsParam(appParam.RecorderCleanPeriodSeconds)) {
+//      appParam.getParam(appParam.RecorderCleanPeriodSeconds).toString.toLong
+//    } else periodDefSec
+//
+//    val expireMs = expireSec * 1000
+//    val periodMs = periodSec * 1000
+//
+//    timer.scheduleAtFixedRate(new TimerTask() {
+//      override def run(): Unit = {
+//        val now = new Date().getTime
+//        clean(dirPath, now - expireMs)
+//      }
+//    }, periodMs, periodMs)
+//  }
+//
+//  def end(): Unit = {
+//    timer.cancel
+//  }
+//
+//  private def clean(dirPath: String, expireLineMs: Long): Unit = {
+//    val process = Runtime.getRuntime.exec("hadoop fs -ls " + dirPath)
+//    val result = process.waitFor()
+//    if (result == 0) {
+//      val reader = new BufferedReader(new InputStreamReader(process.getInputStream))
+//      var line = reader.readLine
+//      while (line != null) {
+//        val index = line.indexOf("/")
+//        if (index >= 0) {
+//          val dir = line.substring(index)
+//          val tail = dir.split("/").last
+//          val t = try {
+//            tail.toLong
+//          } catch {
+//            case _ => Long.MaxValue
+//          }
+//
+//          if (t < expireLineMs) {  // expire
+//            val remove = Runtime.getRuntime().exec("hadoop fs -rm -R " + dir)
+//            val removeResult = remove.waitFor()
+//            if (removeResult == 0) {
+//              println(s"===== hadoop fs -rm -R ${dir} =====")
+//            }
+//          }
+//        }
+//        line = reader.readLine
+//      }
+//    }
+//  }
+//
+//  protected def getFilePath(parentPath: String, fileName: String): String = {
+//    val sep = "/"
+//    if (parentPath.endsWith(sep)) parentPath + fileName else parentPath + sep + fileName
+//  }
+//
+//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/record/result/AccuResult.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/record/result/AccuResult.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/result/AccuResult.scala
new file mode 100644
index 0000000..25f4d9e
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/result/AccuResult.scala
@@ -0,0 +1,18 @@
+package org.apache.griffin.record.result
+
+case class AccuResult(missCount: Long, totalCount: Long) extends Result {
+
+  type T = AccuResult
+
+  def updateResult(delta: T): T = {
+    AccuResult(delta.missCount, totalCount)
+  }
+
+  def needCalc(): Boolean = {
+    this.missCount > 0
+  }
+
+  def differsFrom(other: T): Boolean = {
+    (this.missCount != other.missCount) || (this.totalCount != other.totalCount)
+  }
+}
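
A short sketch of the result semantics above: updateResult keeps this result's total count but adopts the delta's miss count, and needCalc signals that unmatched records remain:

import org.apache.griffin.record.result.AccuResult

object AccuResultSketch {
  def main(args: Array[String]): Unit = {
    val current = AccuResult(missCount = 25L, totalCount = 1000L)
    val updated = current.updateResult(AccuResult(10L, 0L))
    println(updated)                       // AccuResult(10,1000)
    println(updated.needCalc())            // true: 10 records still unmatched
    println(current.differsFrom(updated))  // true: miss counts differ
  }
}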

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/record/result/Result.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/record/result/Result.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/result/Result.scala
new file mode 100644
index 0000000..7d73677
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/result/Result.scala
@@ -0,0 +1,12 @@
+package org.apache.griffin.record.result
+
+trait Result extends Serializable {
+
+  type T <: Result
+
+  def updateResult(delta: T): T
+
+  def needCalc(): Boolean
+
+  def differsFrom(other: T): Boolean
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/DataTypeUtil.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/DataTypeUtil.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/DataTypeUtil.scala
new file mode 100644
index 0000000..9ad128d
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/DataTypeUtil.scala
@@ -0,0 +1,179 @@
+package org.apache.griffin.utils
+
+import java.net.URLDecoder
+
+import org.apache.avro.util.Utf8
+
+object DataTypeUtil {
+
+  object DataTypeEnum {
+    val NULL_TYPE = "null"
+    val INT_TYPE = "int"
+    val SHORT_TYPE = "short"
+    val LONG_TYPE = "long"
+    val BYTE_TYPE = "byte"
+    val FLOAT_TYPE = "float"
+    val DOUBLE_TYPE = "double"
+    val STRING_TYPE = "string"
+    val BOOLEAN_TYPE = "boolean"
+    val UTF8_TYPE = "utf8"
+    val ISO88591_TYPE = "iso-8859-1"
+
+    val NULL_REGEX = """^[Nn]ull(?:Type)?$""".r
+    val INT_REGEX = """^[Ii]nt(?:eger)?(?:Type)?$""".r
+    val SHORT_REGEX = """^[Ss]hort(?:Type)?|[Ss]mall(?:[Ii]nt)?$""".r
+    val LONG_REGEX = """^[Ll]ong(?:Type)?|[Bb]ig(?:[Ii]nt)?$""".r
+    val BYTE_REGEX = """^[Bb]yte(?:Type)?|[Tt]iny(?:[Ii]nt)?$""".r
+    val FLOAT_REGEX = """^[Ff]loat(?:Type)?$""".r
+    val DOUBLE_REGEX = """^[Dd]ouble(?:Type)?$""".r
+    val STRING_REGEX = """^[Ss]tr(?:ing)?(?:Type)?|[Vv]ar(?:[Cc]har)?|[Cc]har$""".r
+    val BOOLEAN_REGEX = """^[Bb]ool(?:ean)?(?:Type)?|[Bb]inary$""".r
+    val UTF8_REGEX = """^[Uu][Tt][Ff](?:-)?8(?:Type)?$""".r
+    val ISO88591_REGEX = """^[Ii][Ss][Oo](?:-)?8859(?:-)?1(?:Type)?$""".r
+  }
+  import DataTypeEnum._
+
+  object DataTypeTuple {
+    // (type name, type name pattern, type convert function, string value convert function)
+    final val nullType = (NULL_TYPE, NULL_REGEX, convertNull _, StringConverter.str2Null _)
+    final val intType = (INT_TYPE, INT_REGEX, convertType[Int] _, StringConverter.str2Int _)
+    final val shortType = (SHORT_TYPE, SHORT_REGEX, convertType[Short] _, StringConverter.str2Short _)
+    final val longType = (LONG_TYPE, LONG_REGEX, convertType[Long] _, StringConverter.str2Long _)
+    final val byteType = (BYTE_TYPE, BYTE_REGEX, convertType[Byte] _, StringConverter.str2Byte _)
+    final val floatType = (FLOAT_TYPE, FLOAT_REGEX, convertType[Float] _, StringConverter.str2Float _)
+    final val doubleType = (DOUBLE_TYPE, DOUBLE_REGEX, convertType[Double] _, StringConverter.str2Double _)
+    final val stringType = (STRING_TYPE, STRING_REGEX, convertType[String] _, StringConverter.str2String _)
+    final val booleanType = (BOOLEAN_TYPE, BOOLEAN_REGEX, convertType[Boolean] _, StringConverter.str2Boolean _)
+    final val utf8Type = (UTF8_TYPE, UTF8_REGEX, convertUtf8 _, StringConverter.str2Utf8 _)
+    final val iso88591Type = (ISO88591_TYPE, ISO88591_REGEX, convertIso88591 _, StringConverter.str2Iso88591 _)
+  }
+  import DataTypeTuple._
+
+  private def convertType[T](x: Any): T = { x.asInstanceOf[T] }
+  private def convertUtf8(x: Any): String = { x.asInstanceOf[Utf8].toString }
+  private def convertIso88591(x: Any): String = {
+    val str = new String(x.toString.getBytes("ISO-8859-1"), "UTF-8")
+    val str1 = x.asInstanceOf[Utf8].toString
+    println(s"=== ${x.getClass.getSimpleName} === ${str} === ${str1} ===")
+    str
+  }
+  private def convertNull(x: Any): Null = { null }
+  private def convertDecodeString(x: Any): String = {
+    val str = decodePrepare(convertType[String](x))
+    URLDecoder.decode(str, "UTF-8")
+  }
+  private def convertDecodeUtf8(x: Any): String = {
+    val str = decodePrepare(convertUtf8(x))
+    URLDecoder.decode(str, "UTF-8")
+  }
+  private def convertDecodeIso88591(x: Any): String = {
+    val str = decodePrepare(convertIso88591(x))
+    URLDecoder.decode(str, "UTF-8")
+  }
+
+  def str2ConvertFunc(tp: String, decode: Boolean = false): (Any) => _ = {
+    tp match {
+      case nullType._2() => nullType._3
+      case intType._2() => intType._3
+      case shortType._2() => shortType._3
+      case longType._2() => longType._3
+      case byteType._2() => byteType._3
+      case floatType._2() => floatType._3
+      case doubleType._2() => doubleType._3
+      case stringType._2() => if (decode) convertDecodeString _ else stringType._3
+      case booleanType._2() => booleanType._3
+      case utf8Type._2() => if (decode) convertDecodeUtf8 _ else utf8Type._3
+      case iso88591Type._2() => if (decode) convertDecodeIso88591 _ else iso88591Type._3
+      case _ => stringType._3
+    }
+  }
+
+  def str2StrValConvertFunc(tp: String, decode: Boolean = false): (String) => _ = {
+    tp match {
+      case nullType._2() => nullType._4
+      case intType._2() => intType._4
+      case shortType._2() => shortType._4
+      case longType._2() => longType._4
+      case byteType._2() => byteType._4
+      case floatType._2() => floatType._4
+      case doubleType._2() => doubleType._4
+      case stringType._2() => if (decode) StringConverter.str2DecodeString _ else stringType._4
+      case booleanType._2() => booleanType._4
+      case utf8Type._2() => if (decode) StringConverter.str2DecodeUtf8 _ else utf8Type._4
+      case iso88591Type._2() => if (decode) StringConverter.str2DecodeIso88591 _ else iso88591Type._4
+      case _ => stringType._4
+    }
+  }
+
+  object StringConverter {
+    def str2Null(s: String): Null = null
+    def str2Int(s: String): Int = s.toInt
+    def str2Short(s: String): Short = s.toShort
+    def str2Long(s: String): Long = s.toLong
+    def str2Byte(s: String): Byte = s.toByte
+    def str2Float(s: String): Float = s.toFloat
+    def str2Double(s: String): Double = s.toDouble
+    def str2String(s: String): String = s
+    def str2DecodeString(s: String): String = {
+      val str = decodePrepare(str2String(s))
+      URLDecoder.decode(str, "UTF-8")
+    }
+    def str2Boolean(s: String): Boolean = s.toBoolean
+    def str2Utf8(s: String): String = { new Utf8(s).toString }
+    def str2DecodeUtf8(s: String): String = {
+      val str = decodePrepare(str2Utf8(s))
+      URLDecoder.decode(str, "UTF-8")
+    }
+    def str2Iso88591(s: String): String = { new String(s.getBytes("ISO-8859-1"), "UTF-8") }
+    def str2DecodeIso88591(s: String): String = {
+      val str = decodePrepare(str2Iso88591(s))
+      URLDecoder.decode(str, "UTF-8")
+    }
+  }
+
+  private def decodePrepare(s: String): String = {
+    s.replaceAll("%(?![0-9a-fA-F]{2})", "%25")
+  }
+
+//  object ParamTypeTuple {
+//
+//    final val intParamType = (INT_TYPE, INT_REGEX, getInt _, getIntList _)
+//    final val shortParamType = (SHORT_TYPE, SHORT_REGEX, getInt _, getIntList _)
+//    final val longParamType = (LONG_TYPE, LONG_REGEX, getLong _, getLongList _)
+//    final val byteParamType = (BYTE_TYPE, BYTE_REGEX, getInt _, getIntList _)
+//    final val floatParamType = (FLOAT_TYPE, FLOAT_REGEX, getDouble _, getDoubleList _)
+//    final val doubleParamType = (DOUBLE_TYPE, DOUBLE_REGEX, getDouble _, getDoubleList _)
+//    final val stringParamType = (STRING_TYPE, STRING_REGEX, getString _, getStringList _)
+//    final val booleanParamType = (BOOLEAN_TYPE, BOOLEAN_REGEX, getBoolean _, getBooleanList _)
+//  }
+
+//  import com.typesafe.config.Config
+//
+//  def str2GetValueFunc(tp: String): (Config => String => _) = {
+//    tp match {
+//      case intParamType._2() => intParamType._3
+//      case shortParamType._2() => shortParamType._3
+//      case longParamType._2() => longParamType._3
+//      case byteParamType._2() => byteParamType._3
+//      case floatParamType._2() => floatParamType._3
+//      case doubleParamType._2() => doubleParamType._3
+//      case stringParamType._2() => stringParamType._3
+//      case booleanParamType._2() => booleanParamType._3
+//      case _ => stringType._3
+//    }
+//  }
+//
+//  def str2GetValueListFunc(tp: String): (Config => String => List[_]) = {
+//    tp match {
+//      case intParamType._2() => intParamType._4
+//      case shortParamType._2() => shortParamType._4
+//      case longParamType._2() => longParamType._4
+//      case byteParamType._2() => byteParamType._4
+//      case floatParamType._2() => floatParamType._4
+//      case doubleParamType._2() => doubleParamType._4
+//      case stringParamType._2() => stringParamType._4
+//      case booleanParamType._2() => booleanParamType._4
+//      case _ => stringType._4
+//    }
+//  }
+}
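
The type-name regexes accept several aliases per type ("bigint" for long, "varchar" for string, and so on); a quick sketch of both lookup paths:

import org.apache.griffin.utils.DataTypeUtil

object ConvertSketch {
  def main(args: Array[String]): Unit = {
    // convert an already-typed value; "bigint" matches LONG_REGEX
    val toLong = DataTypeUtil.str2ConvertFunc("bigint")
    println(toLong(42L))                                      // 42

    // convert a string value, e.g. a schema field's default.value
    val strToInt = DataTypeUtil.str2StrValConvertFunc("int")
    println(strToInt("7"))                                    // 7

    // unknown type names fall back to the string converter
    println(DataTypeUtil.str2ConvertFunc("mystery")("as-is")) // as-is
  }
}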

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/ExtractJsonUtil.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/ExtractJsonUtil.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/ExtractJsonUtil.scala
new file mode 100644
index 0000000..bd95d40
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/ExtractJsonUtil.scala
@@ -0,0 +1,205 @@
+package org.apache.griffin.utils
+
+import scala.util.{Failure, Success, Try}
+
+
+object ExtractJsonUtil {
+
+  val JSON_REGEX = """^(?i)json$""".r
+  val COUNT_REGEX = """^(?i)count$""".r
+  val MAP_REGEX = """^\.(.+)$""".r
+  val LIST_REGEX = """^\[(\d+)\]$""".r
+  val ALL_LIST_REGEX = """^\[\*\]$""".r
+  val FILTER_LIST_REGEX = """^\[\.(.+)=(.+)\]$""".r
+
+  def extract(data: Option[Any], steps: List[String]): Option[Any] = {
+    Try {
+      steps match {
+        case Nil => data
+        case step :: tailSteps => {
+          step match {
+            case JSON_REGEX() => {
+              data match {
+//                case Some(line: String) => extract(Some(JsonUtil.toMap[Any](line)), tailSteps)
+                case Some(line: String) => extract(Some(JsonUtil.toAnyMap(line)), tailSteps)
+                case _ => {
+                  throw new Exception(s"error: ${step} should be for string")
+                }
+              }
+            }
+            case MAP_REGEX(key) => {
+              data match {
+                case Some(map: Map[String, Any]) => extract(map.get(key), tailSteps)
+                case _ => {
+                  throw new Exception(s"error: ${step} should be for map")
+                }
+              }
+            }
+            case LIST_REGEX(index) => {
+              data match {
+                case Some(list: List[Any]) => extract(list.lift(index.toInt), tailSteps)
+                case _ => {
+                  throw new Exception(s"error: ${step} should be for list")
+                }
+              }
+            }
+            case COUNT_REGEX() => {
+              data match {
+                case Some(list: List[Any]) => Some(list.size)
+                case Some(map: Map[String, Any]) => Some(map.size)
+                case _ => {
+                  throw new Exception(s"error: ${step} should be for list or map")
+                }
+              }
+            }
+            case _ => {
+              throw new Exception(s"error: ${step} undefined")
+            }
+          }
+        }
+      }
+    } match {
+      case Success(res) => res
+      case Failure(ex) => {
+        println(s"extract failure: ${ex.getMessage}")
+        None
+      }
+    }
+
+  }
+
+  def stepHead(path: List[String], step: String): List[String] = {
+    path :+ step
+  }
+
+  def extractList(pathDatas: List[(List[String], Option[_])], steps: List[String]): List[(List[String], Option[_])] = {
+    Try {
+      steps match {
+        case Nil => pathDatas
+        case step :: tailSteps => {
+          pathDatas.flatMap { pathData =>
+            val (path, data) = pathData
+            step match {
+              case JSON_REGEX() => {
+                data match {
+//                  case Some(line: String) => extractList((stepHead(path, step), Some(JsonUtil.toMap[Any](line))) :: Nil, tailSteps)
+                  case Some(line: String) => extractList((stepHead(path, step), Some(JsonUtil.toAnyMap(line))) :: Nil, tailSteps)
+                  case _ => {
+//                    extractList((stepHead(path, step), None) :: Nil, tailSteps)
+                    throw new Exception(s"error: ${step} should be for string")
+                  }
+                }
+              }
+              case MAP_REGEX(key) => {
+                data match {
+                  case Some(map: Map[String, Any]) => extractList((stepHead(path, step), map.get(key)) :: Nil, tailSteps)
+                  case _ => {
+//                    extractList((stepHead(path, step), None) :: Nil, tailSteps)
+                    throw new Exception(s"error: ${step} should be for map")
+                  }
+                }
+              }
+              case LIST_REGEX(index) => {
+                data match {
+                  case Some(list: List[Any]) => extractList((stepHead(path, step), list.lift(index.toInt)) :: Nil, tailSteps)
+                  case _ => {
+//                    extractList((stepHead(path, step), None) :: Nil, tailSteps)
+                    throw new Exception(s"error: ${step} should be for list")
+                  }
+                }
+              }
+              case ALL_LIST_REGEX() => {
+                data match {
+                  case Some(list: List[Any]) => {
+                    val idxes = (0 to (list.size - 1)).toList
+                    val newList = idxes.map { idx =>
+                      val newStep = s"${step}_${idx}"
+                      (stepHead(path, newStep), list.lift(idx))
+                    }
+                    extractList(newList, tailSteps)
+                  }
+                  case _ => {
+//                    extractList((stepHead(path, step), None) :: Nil, tailSteps)
+                    throw new Exception(s"error: ${step} should be for list")
+                  }
+                }
+              }
+              case FILTER_LIST_REGEX(key, value) => {
+                data match {
+                  case Some(list: List[Any]) => {
+                    val filteredList = list.filter { item =>
+                      item match {
+                        case mp: Map[String, Any] => {
+                          mp.get(key) match {
+                            case Some(v) => v == value
+                            case _ => false
+                          }
+                        }
+                        case _ => false
+                      }
+                    }
+                    val idxes = (0 to (filteredList.size - 1)).toList
+                    val newList = idxes.map { idx =>
+                      val newStep = s"${step}_${idx}"
+                      (stepHead(path, newStep), filteredList.lift(idx))
+                    }
+                    extractList(newList, tailSteps)
+                  }
+                  case _ => {
+//                    extractList((stepHead(path, step), None) :: Nil, tailSteps)
+                    throw new Exception(s"error: ${step} should be for list")
+                  }
+                }
+              }
+              case COUNT_REGEX() => {
+                data match {
+                  case Some(list: List[Any]) => (stepHead(path, step), Some(list.size)) :: Nil
+                  case Some(map: Map[String, Any]) => (stepHead(path, step), Some(map.size)) :: Nil
+                  case _ => {
+//                    extractList((stepHead(path, step), None) :: Nil, tailSteps)
+                    throw new Exception(s"error: ${step} should be for list or map")
+                  }
+                }
+              }
+              case _ => {
+//                extractList((stepHead(path, step), None) :: Nil, tailSteps)
+                throw new Exception(s"error: ${step} undefined")
+              }
+            }
+          }
+        }
+      }
+    } match {
+      case Success(res) => res
+      case Failure(ex) => {
+        println(s"extract failure: ${ex.getMessage}")
+        Nil
+      }
+    }
+
+  }
+
+  def extractPathDataListWithSchemaMap(pathDatas: List[(List[String], Option[_])], stepsMap: Map[String, List[String]]): List[Map[String, Option[_]]] = {
+    val schemaValues: Map[String, List[(List[String], Option[_])]] = stepsMap.mapValues { steps =>
+      extractList(pathDatas, steps)
+    }
+
+    ValueListCombineUtil.cartesian(schemaValues)
+  }
+
+  def extractDataListWithSchemaMap(datas: List[Option[_]], stepsMap: Map[String, List[String]]): List[Map[String, Option[_]]] = {
+    val pathDatas = datas.map((Nil, _))
+    extractPathDataListWithSchemaMap(pathDatas, stepsMap)
+  }
+
+//  def extractDataListWithSchemaMapTest(datas: List[Option[_]], stepsMap: Map[String, List[String]]): Unit = {
+//    val pathDatas = datas.map((Nil, _))
+//
+//    val schemaValues: Map[String, List[(List[String], Option[_])]] = stepsMap.mapValues { steps =>
+//      extractList(pathDatas, steps)
+//    }
+//
+//    ValueListCombineUtil.cartesianTest(schemaValues)
+//  }
+
+}
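
The step strings form a small extraction DSL: "json" parses a string, ".key" descends into a map, "[n]" and "[*]" index lists, and "count" sizes a list or map. A sketch of both entry points:

import org.apache.griffin.utils.ExtractJsonUtil

object ExtractSketch {
  def main(args: Array[String]): Unit = {
    val line = """{"user": {"id": 7}, "tags": ["a", "b", "c"]}"""

    // single-value extraction: parse json, descend into .user, then .id
    println(ExtractJsonUtil.extract(Some(line), List("json", ".user", ".id")))   // Some(7)
    println(ExtractJsonUtil.extract(Some(line), List("json", ".tags", "count"))) // Some(3)

    // schema-map extraction: one steps list per output field;
    // "[*]" fans out, so each tag yields one combined output row
    val stepsMap = Map(
      "id"  -> List("json", ".user", ".id"),
      "tag" -> List("json", ".tags", "[*]")
    )
    ExtractJsonUtil.extractDataListWithSchemaMap(List(Some(line)), stepsMap).foreach(println)
  }
}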

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/HdfsUtil.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/HdfsUtil.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/HdfsUtil.scala
new file mode 100644
index 0000000..70a8004
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/HdfsUtil.scala
@@ -0,0 +1,57 @@
+package org.apache.griffin.utils
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path}
+
+object HdfsUtil {
+
+  private val seprator = "/"
+
+  private val conf = new Configuration()
+  conf.set("dfs.support.append", "true")
+
+  private val dfs = FileSystem.get(conf)
+
+  def createFile(filePath: String): FSDataOutputStream = {
+    val path = new Path(filePath)
+    if (dfs.exists(path)) dfs.delete(path, true)
+    return dfs.create(path)
+  }
+
+  def appendOrCreateFile(filePath: String): FSDataOutputStream = {
+    val path = new Path(filePath)
+    if (dfs.exists(path)) dfs.append(path) else createFile(filePath)
+  }
+
+  def openFile(filePath: String): FSDataInputStream = {
+    val path = new Path(filePath)
+    dfs.open(path)
+  }
+
+  def writeContent(filePath: String, message: String): Unit = {
+    val out = createFile(filePath)
+    out.write(message.getBytes("utf-8"))
+    out.close
+  }
+
+  def appendContent(filePath: String, message: String): Unit = {
+    val out = appendOrCreateFile(filePath)
+    out.write(message.getBytes("utf-8"))
+    out.close
+  }
+
+  def createEmptyFile(filePath: String): Unit = {
+    val out = createFile(filePath)
+    out.close
+  }
+
+
+  def getHdfsFilePath(parentPath: String, fileName: String): String = {
+    if (parentPath.endsWith(seprator)) parentPath + fileName else parentPath + seprator + fileName
+  }
+
+  def deleteHdfsPath(dirPath: String): Unit = {
+    val path = new Path(dirPath)
+    if (dfs.exists(path)) dfs.delete(path, true)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/JsonUtil.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/JsonUtil.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/JsonUtil.scala
new file mode 100644
index 0000000..d56b467
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/JsonUtil.scala
@@ -0,0 +1,33 @@
+package org.apache.griffin.utils
+
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.apache.griffin.config.params.AllParam
+
+object JsonUtil {
+  val mapper = new ObjectMapper()
+  mapper.registerModule(DefaultScalaModule)
+  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+
+  def toJson(value: Map[Symbol, Any]): String = {
+    toJson(value map { case (k,v) => k.name -> v})
+  }
+
+  def toJson(value: Any): String = {
+    mapper.writeValueAsString(value)
+  }
+
+//  def toMap[V](json:String)(implicit m: Manifest[V]) = fromJson[Map[String,V]](json)
+//
+//  def fromJson[T](json: String)(implicit m : Manifest[T]): T = {
+//    mapper.readValue(json, classOf[T])
+//  }
+
+  def toAnyMap(json: String) = {
+    mapper.readValue(json, classOf[Map[String, Any]])
+  }
+
+  def fromJson2AllParam(json: String) = {
+    mapper.readValue(json, classOf[AllParam])
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/RestfulUtil.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/RestfulUtil.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/RestfulUtil.scala
new file mode 100644
index 0000000..a233aad
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/RestfulUtil.scala
@@ -0,0 +1,16 @@
+package org.apache.griffin.utils
+
+import scalaj.http._
+
+object RestfulUtil {
+
+  def postData(url: String, params: Map[String, Object], headers: Map[String, Object], data: String): String = {
+    val response = Http(url).params(convertObjMap2StrMap(params)).headers(convertObjMap2StrMap(headers)).postData(data).asString
+    response.code.toString
+  }
+
+  private def convertObjMap2StrMap(map: Map[String, Object]): Map[String, String] = {
+    map.map(pair => pair._1 -> pair._2.asInstanceOf[String])
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/ValueListCombineUtil.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/ValueListCombineUtil.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/ValueListCombineUtil.scala
new file mode 100644
index 0000000..3a428cf
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/ValueListCombineUtil.scala
@@ -0,0 +1,198 @@
+package org.apache.griffin.utils
+
+import scala.collection.mutable.MutableList
+
+object ValueListCombineUtil {
+
+  // Map[String, List[(List[String], T)]]: Map[key, List[(path, value)]]
+  def cartesian[T](valuesMap: Map[String, List[(List[String], T)]]): List[Map[String, T]] = {
+    val fieldsList: List[(String, List[(List[String], T)])] = valuesMap.toList
+
+    // wrong algorithm: assume the lists have same size
+//    val minSize = fieldsList.map(_._2.size).min
+//    val idxes = (0 to (minSize - 1)).toList
+//    idxes.map { idx =>
+//      fieldsList.foldLeft(Map[String, T]()) { (map, pair) =>
+//        val (key, value) = pair
+//        map + (key -> value(idx)._2)
+//      }
+//    }
+
+    // following is correct algorithm
+
+    // List[key, List[(path, value)]] to List[(path, (key, value))]
+    val valueList: List[(List[String], (String, T))] = fieldsList.flatMap { fields =>
+      val (key, list) = fields
+      list.map { pv =>
+        val (path, value) = pv
+        (path, (key, value))
+      }
+    }
+
+    // 1. generate tree from value list, and return root node
+    val root = TreeUtil.genRootTree(valueList)
+
+    // 2. deep first visit tree from root, merge datas into value map list
+    val valueMapList: List[Map[String, _]] = TreeUtil.mergeDatasIntoMap(root, Nil)
+
+    // 3. simple change
+    valueMapList.map { mp =>
+      mp.map { kv =>
+        val (k, v) = kv
+        (k, v.asInstanceOf[T])
+      }
+    }
+
+  }
+
+//  def cartesianTest[T](valuesMap: Map[String, List[(List[String], T)]]): Unit = {
+//    val fieldsList: List[(String, List[(List[String], T)])] = valuesMap.toList
+//
+//    // List[key, List[(path, value)]] to List[(path, (key, value))]
+//    val valueList: List[(List[String], (String, T))] = fieldsList.flatMap { fields =>
+//      val (key, list) = fields
+//      list.map { pv =>
+//        val (path, value) = pv
+//        (path, (key, value))
+//      }
+//    }
+//
+//    // 1. generate tree from value list, and return root node
+//    val root = TreeUtil.genRootTree(valueList)
+//
+//    // 2. deep first visit tree from root, merge datas into value map list
+//    val valueMapList: List[Map[String, _]] = TreeUtil.mergeDatasIntoMap(root, Nil)
+//
+//    valueMapList.foreach(println)
+//
+//  }
+
+
+  case class TreeNode(key: String, var datas: List[(String, _)]) {
+    var children = List[TreeNode]()
+    def addChild(node: TreeNode): Unit = children = children :+ node
+    def mergeSelf(node: TreeNode): Unit = datas = datas ::: node.datas
+  }
+
+  object TreeUtil {
+    private def genTree(path: List[String], datas: List[(String, _)]): Option[TreeNode] = {
+      path match {
+        case Nil => None
+//        case head :: Nil => Some(TreeNode(datas))
+        case head :: tail => {
+          genTree(tail, datas) match {
+            case Some(child) => {
+              val curNode = TreeNode(head, Nil)
+              curNode.addChild(child)
+              Some(curNode)
+            }
+            case _ => Some(TreeNode(head, datas))
+          }
+        }
+      }
+    }
+
+    private def mergeTrees(trees: List[TreeNode], newTreeOpt: Option[TreeNode]): List[TreeNode] = {
+      newTreeOpt match {
+        case Some(newTree) => {
+          trees.find(tree => tree.key == newTree.key) match {
+            case Some(tree) => {
+              // children merge
+              for (child <- newTree.children) {
+                tree.children = mergeTrees(tree.children, Some(child))
+              }
+              // self data merge
+              tree.mergeSelf(newTree)
+              trees
+            }
+            case _ => trees :+ newTree
+          }
+        }
+        case _ => trees
+      }
+    }
+
+    private def root(): TreeNode = TreeNode("", Nil)
+
+    def genRootTree(values: List[(List[String], (String, _))]): TreeNode = {
+      val rootNode = root()
+      val nodeOpts = values.map(value => genTree(value._1, value._2 :: Nil))
+      rootNode.children = nodeOpts.foldLeft(List[TreeNode]()) { (trees, treeOpt) =>
+        mergeTrees(trees, treeOpt)
+      }
+      rootNode
+    }
+
+    private def add(mapList1: List[Map[String, _]], mapList2: List[Map[String, _]]): List[Map[String, _]] = {
+      mapList1 ::: mapList2
+    }
+    private def multiply(mapList1: List[Map[String, _]], mapList2: List[Map[String, _]]): List[Map[String, _]] = {
+      mapList1.flatMap { map1 =>
+        mapList2.map { map2 =>
+          map1 ++ map2
+        }
+      }
+    }
+
+    private def keysList(mapList: List[Map[String, _]]): List[String] = {
+      val keySet = mapList match {
+        case Nil => Set[String]()
+        case head :: _ => head.keySet
+      }
+      keySet.toList
+    }
+
+    def mergeDatasIntoMap(root: TreeNode, mapDatas: List[Map[String, _]]): List[Map[String, _]] = {
+      val childrenKeysMapDatas = root.children.foldLeft(Map[List[String], List[Map[String, _]]]()) { (keysMap, child) =>
+        val childMdts = mergeDatasIntoMap(child, List[Map[String, _]]())
+        val keys = keysList(childMdts)
+        val afterList = keysMap.get(keys) match {
+          case Some(list) => add(list, childMdts)
+          case _ => childMdts
+        }
+        keysMap + (keys -> afterList)
+      }
+      val childrenMergeMaps = childrenKeysMapDatas.values.foldLeft(List[Map[String, _]]()) { (originList, list) =>
+        originList match {
+          case Nil => list
+          case _ => multiply(originList, list)
+        }
+      }
+      mergeNodeChildrenDatasIntoMap(root, childrenMergeMaps)
+    }
+
+    private def mergeNodeChildrenDatasIntoMap(node: TreeNode, mapDatas: List[Map[String, _]]): List[Map[String, _]] = {
+      val datas: List[(String, (String, Any))] = node.children.flatMap { child =>
+        child.datas.map(dt => (dt._1, (child.key, dt._2)))
+      }
+      val childrenDataKeys: Set[String] = datas.map(_._1).toSet
+      val childrenDataLists: Map[String, List[(String, _)]] = datas.foldLeft(childrenDataKeys.map(k => (k, List[(String, _)]())).toMap) { (maps, data) =>
+        maps.get(data._1) match {
+          case Some(list) => maps + (data._1 -> (list :+ data._2))
+          case _ => maps
+        }
+      }
+
+      // multiply different key datas
+      childrenDataLists.foldLeft(mapDatas) { (mdts, klPair) =>
+        val (key, list) = klPair
+        mdts match {
+          case Nil => list.map(pr => Map[String, Any]((key -> pr._2)))
+          case _ => {
+            list.flatMap { kvPair =>
+              val (path, value) = kvPair
+              mdts.map { mp =>
+                mp + (key -> value)
+              }
+            }
+          }
+        }
+      }
+
+    }
+  }
+
+
+
+
+}
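
The extraction paths drive the combination: values for different keys are multiplied together unless they were produced along the same path, which is what the intermediate tree encodes. A small sketch, with paths as the "[*]_n" fan-out steps would produce them:

import org.apache.griffin.utils.ValueListCombineUtil

object CartesianSketch {
  def main(args: Array[String]): Unit = {
    val valuesMap = Map(
      "id"  -> List((List("json", ".user", ".id"), 7)),
      "tag" -> List(
        (List("json", ".tags", "[*]_0"), 100),
        (List("json", ".tags", "[*]_1"), 200)
      )
    )
    // the single id pairs with each tag value, in some order:
    // Map(id -> 7, tag -> 100) and Map(id -> 7, tag -> 200)
    ValueListCombineUtil.cartesian(valuesMap).foreach(println)
  }
}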

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/test/resources/config.json
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/test/resources/config.json b/griffin-streaming-model/src/test/resources/config.json
new file mode 100644
index 0000000..808b0f7
--- /dev/null
+++ b/griffin-streaming-model/src/test/resources/config.json
@@ -0,0 +1,11 @@
+{
+  "spark": {
+    "app.name": "GriffinAccuStreamingApp",
+    "checkpoint.dir": "hdfs:///test/kafka/cp",
+    "config": {
+      "spark.task.maxFailures": 5,
+      "spark.streaming.kafka.maxRatePerPartition": 1000,
+      "spark.streaming.concurrentJobs": 4
+    }
+  }
+}
\ No newline at end of file
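
A hedged sketch of binding this test config through the shared Jackson mapper; AllParam (defined elsewhere in this patch) presumably aggregates the "spark" section into a SparkParam, and the relative file path below is illustrative:

import scala.io.Source
import org.apache.griffin.utils.JsonUtil

object ConfigSketch {
  def main(args: Array[String]): Unit = {
    val json = Source.fromFile("src/test/resources/config.json").mkString
    println(JsonUtil.fromJson2AllParam(json))
  }
}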
