http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/persist/MongoThreadPool.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/MongoThreadPool.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/MongoThreadPool.scala
deleted file mode 100644
index 2f43edb..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/persist/MongoThreadPool.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.persist
-
-import java.util.Date
-import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
-
-import scala.concurrent.Future
-import scala.util.{Failure, Success}
-
-object MongoThreadPool {
-
-  import scala.concurrent.ExecutionContext.Implicits.global
-
-  private val pool: ThreadPoolExecutor = Executors.newFixedThreadPool(5).asInstanceOf[ThreadPoolExecutor]
-  val MAX_RETRY = 100
-
-  def shutdown(): Unit = {
-    pool.shutdown()
-    pool.awaitTermination(10, TimeUnit.SECONDS)
-  }
-
-  def addTask(func: () => (Long, Future[_]), retry: Int): Unit = {
-    val r = if (retry < 0) MAX_RETRY else retry
-    println(s"add task, current task num: ${pool.getQueue.size}")
-    pool.submit(Task(func, r))
-  }
-
-  case class Task(func: () => (Long, Future[_]), retry: Int) extends Runnable {
-
-    override def run(): Unit = {
-      val st = new Date().getTime
-      val (t, res) = func()
-      res.onComplete {
-        case Success(value) => {
-          val et = new Date().getTime
-          println(s"task ${t} success [ using time ${et - st} ms ]")
-        }
-        case Failure(e) => {
-          val et = new Date().getTime
-          println(s"task ${t} fails [ using time ${et - st} ms ] : ${e.getMessage}")
-          if (retry > 0) {
-            println(s"task ${t} retry [ rest retry count: ${retry - 1} ]")
-            pool.submit(Task(func, retry - 1))
-          } else {
-            println(s"task ${t} retry ends but fails")
-          }
-        }
-      }
-    }
-
-    def fail(): Unit = {
-      println("task fails")
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
deleted file mode 100644
index 4b30cba..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.persist
-
-import org.apache.griffin.measure.result._
-import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Dataset}
-
-import scala.util.Try
-
-// persist result and data by multiple persists
-case class MultiPersists(persists: Iterable[Persist]) extends Persist {
-
-  val timeStamp: Long = persists match {
-    case Nil => 0
-    case _ => persists.head.timeStamp
-  }
-
-  val config: Map[String, Any] = Map[String, Any]()
-
-  def available(): Boolean = { persists.exists(_.available()) }
-
-  def start(msg: String): Unit = { persists.foreach(_.start(msg)) }
-  def finish(): Unit = { persists.foreach(_.finish()) }
-
-  def log(rt: Long, msg: String): Unit = {
-    persists.foreach { persist =>
-      try {
-        persist.log(rt, msg)
-      } catch {
-        case e: Throwable => error(s"log error: ${e.getMessage}")
-      }
-    }
-  }
-
-  def persistRecords(df: DataFrame, name: String): Unit = {
-    persists.foreach { persist =>
-      try {
-        persist.persistRecords(df, name)
-      } catch {
-        case e: Throwable => error(s"persist df error: ${e.getMessage}")
-      }
-    }
-  }
-  def persistRecords(records: RDD[String], name: String): Unit = {
-    persists.foreach { persist =>
-      try {
-        persist.persistRecords(records, name)
-      } catch {
-        case e: Throwable => error(s"persist records error: ${e.getMessage}")
-      }
-    }
-  }
-  def persistRecords(records: Iterable[String], name: String): Unit = {
-    persists.foreach { persist =>
-      try {
-        persist.persistRecords(records, name)
-      } catch {
-        case e: Throwable => error(s"persist records error: ${e.getMessage}")
-      }
-    }
-  }
-//  def persistMetrics(metrics: Seq[String], name: String): Unit = { persists.foreach(_.persistMetrics(metrics, name)) }
-  def persistMetrics(metrics: Map[String, Any]): Unit = {
-    persists.foreach { persist =>
-      try {
-        persist.persistMetrics(metrics)
-      } catch {
-        case e: Throwable => error(s"persist metrics error: ${e.getMessage}")
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala
deleted file mode 100644
index 300f2c5..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.persist
-
-import org.apache.griffin.measure.log.Loggable
-import org.apache.griffin.measure.result._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Dataset}
-
-import scala.util.Try
-
-
-trait Persist extends Loggable with Serializable {
-  val timeStamp: Long
-
-  val config: Map[String, Any]
-
-  def available(): Boolean
-
-  def start(msg: String): Unit
-  def finish(): Unit
-
-  def log(rt: Long, msg: String): Unit
-
-//  def result(rt: Long, result: Result): Unit
-//
-//  def records(recs: RDD[String], tp: String): Unit
-//  def records(recs: Iterable[String], tp: String): Unit
-
-  def persistRecords(df: DataFrame, name: String): Unit
-  def persistRecords(records: RDD[String], name: String): Unit
-  def persistRecords(records: Iterable[String], name: String): Unit
-//  def persistMetrics(metrics: Seq[String], name: String): Unit
-  def persistMetrics(metrics: Map[String, Any]): Unit
-
-}
-
-//object PersistDataType {
-//  final val MISS = "miss"
-//  final val MATCH = "match"
-//}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/persist/PersistFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/PersistFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/PersistFactory.scala
deleted file mode 100644
index b2e34a9..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/persist/PersistFactory.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.persist
-
-import org.apache.griffin.measure.config.params.env._
-
-import scala.util.{Success, Try}
-
-
-case class PersistFactory(persistParams: Iterable[PersistParam], metricName: String) extends Serializable {
-
-  val HDFS_REGEX = """^(?i)hdfs$""".r
-  val HTTP_REGEX = """^(?i)http$""".r
-//  val OLDHTTP_REGEX = """^(?i)oldhttp$""".r
-  val LOG_REGEX = """^(?i)log$""".r
-  val MONGO_REGEX = """^(?i)mongo$""".r
-
-  def getPersists(timeStamp: Long): MultiPersists = {
-    MultiPersists(persistParams.flatMap(param => getPersist(timeStamp, param)))
-  }
-
-  // get the persists configured
-  private def getPersist(timeStamp: Long, persistParam: PersistParam): Option[Persist] = {
-    val config = persistParam.config
-    val persistTry = persistParam.persistType match {
-      case HDFS_REGEX() => Try(HdfsPersist(config, metricName, timeStamp))
-      case HTTP_REGEX() => Try(HttpPersist(config, metricName, timeStamp))
-//      case OLDHTTP_REGEX() => Try(OldHttpPersist(config, metricName, timeStamp))
-      case LOG_REGEX() => Try(LoggerPersist(config, metricName, timeStamp))
-      case MONGO_REGEX() => Try(MongoPersist(config, metricName, timeStamp))
-      case _ => throw new Exception("not supported persist type")
-    }
-    persistTry match {
-      case Success(persist) if (persist.available) => Some(persist)
-      case _ => None
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/persist/PersistThreadPool.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/PersistThreadPool.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/PersistThreadPool.scala
deleted file mode 100644
index 0a647b4..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/persist/PersistThreadPool.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.persist
-
-import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
-
-object PersistThreadPool {
-
-  private val pool: ThreadPoolExecutor = Executors.newFixedThreadPool(5).asInstanceOf[ThreadPoolExecutor]
-  val MAX_RETRY = 100
-
-  def shutdown(): Unit = {
-    pool.shutdown()
-    pool.awaitTermination(10, TimeUnit.SECONDS)
-  }
-
-  def addTask(func: () => Boolean, retry: Int): Unit = {
-    val r = if (retry < 0) MAX_RETRY else retry
-    println(s"add task, current task num: ${pool.getQueue.size}")
-    pool.submit(Task(func, r))
-  }
-
-  case class Task(func: () => Boolean, retry: Int) extends Runnable {
-
-    override def run(): Unit = {
-      try {
-        var i = retry
-        var suc = false
-        while (!suc && i > 0) {
-          if (func()) {
-            println("task success")
-            suc = true
-          } else i = i - 1
-        }
-        if (!suc) fail(s"retried for ${retry} times")
-      } catch {
-        case e: Throwable => fail(s"${e.getMessage}")
-      }
-    }
-
-    def fail(msg: String): Unit = {
-      println(s"task fails: ${msg}")
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
deleted file mode 100644
index deccf5f..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.process
-
-import java.util.Date
-
-import org.apache.griffin.measure.config.params._
-import org.apache.griffin.measure.config.params.env._
-import org.apache.griffin.measure.config.params.user._
-import org.apache.griffin.measure.data.source.DataSourceFactory
-import org.apache.griffin.measure.persist.{Persist, PersistFactory}
-import org.apache.griffin.measure.process.engine._
-import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters, TimeRange}
-import org.apache.griffin.measure.rule.adaptor._
-import org.apache.griffin.measure.rule.plan._
-import org.apache.griffin.measure.rule.udf._
-import org.apache.spark.sql.{SQLContext, SparkSession}
-import org.apache.spark.SparkConf
-
-import scala.util.Try
-
-case class BatchDqProcess(allParam: AllParam) extends DqProcess {
-
-  val envParam: EnvParam = allParam.envParam
-  val userParam: UserParam = allParam.userParam
-
-  val sparkParam = envParam.sparkParam
-  val metricName = userParam.name
-  val dataSourceNames = userParam.dataSources.map(_.name)
-  val baselineDsName = userParam.baselineDsName
-
-//  var sparkContext: SparkContext = _
-  var sqlContext: SQLContext = _
-
-  var sparkSession: SparkSession = _
-
-  def retriable: Boolean = false
-
-  def init: Try[_] = Try {
-    val conf = new SparkConf().setAppName(metricName)
-    conf.setAll(sparkParam.config)
-    conf.set("spark.sql.crossJoin.enabled", "true")
-    sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
-    sparkSession.sparkContext.setLogLevel(sparkParam.logLevel)
-    sqlContext = sparkSession.sqlContext
-
-    // register udf
-    GriffinUdfs.register(sqlContext)
-    GriffinUdafs.register(sqlContext)
-
-    // init adaptors
-    RuleAdaptorGroup.init(sparkSession, dataSourceNames, baselineDsName)
-  }
-
-  def run: Try[_] = Try {
-    // start time
-    val startTime = new Date().getTime
-
-    val appTime = getAppTime
-    val calcTimeInfo = CalcTimeInfo(appTime)
-
-    // get persists to persist measure result
-    val persistFactory = PersistFactory(envParam.persistParams, metricName)
-    val persist: Persist = persistFactory.getPersists(appTime)
-
-    // persist start id
-    val applicationId = sparkSession.sparkContext.applicationId
-    persist.start(applicationId)
-
-    // get dq engines
-    val dqEngines = DqEngineFactory.genDqEngines(sqlContext)
-
-    // generate data sources
-    val dataSources = DataSourceFactory.genDataSources(sqlContext, null, dqEngines, userParam.dataSources)
-    dataSources.foreach(_.init)
-
-    // init data sources
-    val dsTimeRanges = dqEngines.loadData(dataSources, calcTimeInfo)
-    printTimeRanges(dsTimeRanges)
-
-    val rulePlan = RuleAdaptorGroup.genRulePlan(
-      calcTimeInfo, userParam.evaluateRuleParam, BatchProcessType, dsTimeRanges)
-
-    // run rules
-    dqEngines.runRuleSteps(calcTimeInfo, rulePlan.ruleSteps)
-
-    // persist results
-    dqEngines.persistAllMetrics(rulePlan.metricExports, persistFactory)
-
-    dqEngines.persistAllRecords(rulePlan.recordExports, persistFactory, dataSources)
-
-    // end time
-    val endTime = new Date().getTime
-    persist.log(endTime, s"process using time: ${endTime - startTime} ms")
-
-    // finish
-    persist.finish()
-
-    // clean data
-    cleanData(calcTimeInfo)
-
-//    sqlContext.tables().show(50)
-//    println(sqlContext.tableNames().size)
-
-    // clear temp table
-//    ruleSteps.foreach { rs =>
-//      println(rs)
-//      //      sqlContext.dropTempTable(rs.ruleInfo.name)
-//      rs.ruleInfo.tmstNameOpt match {
-//        case Some(n) => sqlContext.dropTempTable(s"`${n}`")
-//        case _ => {}
-//      }
-//    }
-//
-//    // -- test --
-//    sqlContext.tables().show(50)
-  }
-
-  private def cleanData(timeInfo: TimeInfo): Unit = {
-    TableRegisters.unregisterRunTempTables(sqlContext, timeInfo.key)
-    TableRegisters.unregisterCompileTempTables(timeInfo.key)
-
-    DataFrameCaches.uncacheDataFrames(timeInfo.key)
-    DataFrameCaches.clearTrashDataFrames(timeInfo.key)
-    DataFrameCaches.clearGlobalTrashDataFrames()
-  }
-
-  def end: Try[_] = Try {
-    TableRegisters.unregisterRunGlobalTables(sqlContext)
-    TableRegisters.unregisterCompileGlobalTables
-
-    DataFrameCaches.uncacheGlobalDataFrames()
-    DataFrameCaches.clearGlobalTrashDataFrames()
-
-    sparkSession.close()
-    sparkSession.stop()
-  }
-
-//  private def cleanData(t: Long): Unit = {
-//    try {
-////      dataSources.foreach(_.cleanOldData)
-////      dataSources.foreach(_.dropTable(t))
-//
-////      val cleanTime = TimeInfoCache.getCleanTime
-////      CacheResultProcesser.refresh(cleanTime)
-//
-//      sqlContext.dropTempTable()
-//    } catch {
-//      case e: Throwable => error(s"clean data error: ${e.getMessage}")
-//    }
-//  }
-
-  private def printTimeRanges(timeRanges: Map[String, TimeRange]): Unit = {
-    val timeRangesStr = timeRanges.map { pair =>
-      val (name, timeRange) = pair
-      s"${name} -> (${timeRange.begin}, ${timeRange.end}]"
-    }.mkString(", ")
-    println(s"data source timeRanges: ${timeRangesStr}")
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/process/DqProcess.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/DqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/DqProcess.scala
deleted file mode 100644
index ac8f3d6..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/process/DqProcess.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.process
-
-import org.apache.griffin.measure.config.params.env._
-import org.apache.griffin.measure.config.params.user._
-import org.apache.griffin.measure.log.Loggable
-
-import scala.util.Try
-
-trait DqProcess extends Loggable with Serializable {
-
-  val envParam: EnvParam
-  val userParam: UserParam
-
-  def init: Try[_]
-
-  def run: Try[_]
-
-  def end: Try[_]
-
-  def retriable: Boolean
-
-  protected def getAppTime: Long = {
-    if (userParam.timestamp != null && userParam.timestamp > 0) { userParam.timestamp }
-    else { System.currentTimeMillis }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/process/ExportMode.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/ExportMode.scala b/measure/src/main/scala/org/apache/griffin/measure/process/ExportMode.scala
deleted file mode 100644
index 42aa92b..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/process/ExportMode.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.process
-
-sealed trait ExportMode {}
-
-object ExportMode {
-  def defaultMode(procType: ProcessType): ExportMode = {
-    procType match {
-      case BatchProcessType => SimpleMode
-      case StreamingProcessType => TimestampMode
-    }
-  }
-}
-
-final case object SimpleMode extends ExportMode {}
-
-final case object TimestampMode extends ExportMode {}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/process/ProcessType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/ProcessType.scala b/measure/src/main/scala/org/apache/griffin/measure/process/ProcessType.scala
deleted file mode 100644
index 9f9e424..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/process/ProcessType.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.process
-
-import scala.util.matching.Regex
-
-sealed trait ProcessType {
-  val regex: Regex
-  val desc: String
-}
-
-object ProcessType {
-  private val procTypes: List[ProcessType] = List(BatchProcessType, StreamingProcessType)
-  def apply(ptn: String): ProcessType = {
-    procTypes.find(tp => ptn match {
-      case tp.regex() => true
-      case _ => false
-    }).getOrElse(BatchProcessType)
-  }
-  def unapply(pt: ProcessType): Option[String] = Some(pt.desc)
-}
-
-final case object BatchProcessType extends ProcessType {
-  val regex = """^(?i)batch$""".r
-  val desc = "batch"
-}
-
-final case object StreamingProcessType extends ProcessType {
-  val regex = """^(?i)streaming$""".r
-  val desc = "streaming"
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
deleted file mode 100644
index 19e553e..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.process
-
-import org.apache.griffin.measure.cache.info.InfoCacheInstance
-import org.apache.griffin.measure.config.params._
-import org.apache.griffin.measure.config.params.env._
-import org.apache.griffin.measure.config.params.user._
-import org.apache.griffin.measure.data.source.DataSourceFactory
-import org.apache.griffin.measure.persist.{Persist, PersistFactory}
-import org.apache.griffin.measure.process.engine.DqEngineFactory
-import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters}
-import org.apache.griffin.measure.rule.adaptor.RuleAdaptorGroup
-import org.apache.griffin.measure.rule.udf._
-import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
-import org.apache.spark.sql.{SQLContext, SparkSession}
-import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.streaming.{Milliseconds, StreamingContext}
-import org.apache.spark.{SparkConf, SparkContext}
-
-import scala.util.Try
-
-case class StreamingDqProcess(allParam: AllParam) extends DqProcess {
-
-  val envParam: EnvParam = allParam.envParam
-  val userParam: UserParam = allParam.userParam
-
-  val sparkParam = envParam.sparkParam
-  val metricName = userParam.name
-  val dataSourceNames = userParam.dataSources.map(_.name)
-  val baselineDsName = userParam.baselineDsName
-
-//  var sparkContext: SparkContext = _
-  var sqlContext: SQLContext = _
-
-  var sparkSession: SparkSession = _
-
-  def retriable: Boolean = true
-
-  def init: Try[_] = Try {
-    val conf = new SparkConf().setAppName(metricName)
-    conf.setAll(sparkParam.config)
-    conf.set("spark.sql.crossJoin.enabled", "true")
-    sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
-    sparkSession.sparkContext.setLogLevel(sparkParam.logLevel)
-    sqlContext = sparkSession.sqlContext
-
-    // clear checkpoint directory
-    clearCpDir
-
-    // init info cache instance
-    InfoCacheInstance.initInstance(envParam.infoCacheParams, metricName)
-    InfoCacheInstance.init
-
-    // register udf
-    GriffinUdfs.register(sqlContext)
-    GriffinUdafs.register(sqlContext)
-
-    // init adaptors
-    val dataSourceNames = userParam.dataSources.map(_.name)
-    RuleAdaptorGroup.init(sparkSession, dataSourceNames, baselineDsName)
-  }
-
-  def run: Try[_] = Try {
-    val ssc = StreamingContext.getOrCreate(sparkParam.cpDir, () => {
-      try {
-        createStreamingContext
-      } catch {
-        case e: Throwable => {
-          error(s"create streaming context error: ${e.getMessage}")
-          throw e
-        }
-      }
-    })
-
-    // start time
-    val appTime = getAppTime
-
-    // get persists to persist measure result
-    val persistFactory = PersistFactory(envParam.persistParams, metricName)
-    val persist: Persist = persistFactory.getPersists(appTime)
-
-    // persist start id
-    val applicationId = sparkSession.sparkContext.applicationId
-    persist.start(applicationId)
-
-    // get dq engines
-    val dqEngines = DqEngineFactory.genDqEngines(sqlContext)
-
-    // generate data sources
-    val dataSources = DataSourceFactory.genDataSources(sqlContext, ssc, dqEngines, userParam.dataSources)
-    dataSources.foreach(_.init)
-
-    // process thread
-    val dqThread = StreamingDqThread(sqlContext, dqEngines, dataSources,
-      userParam.evaluateRuleParam, persistFactory, persist)
-
-    // init data sources
-//    val dsTmsts = dqEngines.loadData(dataSources, appTime)
-
-    // generate rule steps
-//    val ruleSteps = RuleAdaptorGroup.genRuleSteps(
-//      TimeInfo(appTime, appTime), userParam.evaluateRuleParam, dsTmsts)
-//
-//    // run rules
-//    dqEngines.runRuleSteps(ruleSteps)
-//
-//    // persist results
-//    dqEngines.persistAllResults(ruleSteps, persist)
-
-    // end time
-//    val endTime = new Date().getTime
-//    persist.log(endTime, s"process using time: ${endTime - startTime} ms")
-
-    val processInterval = TimeUtil.milliseconds(sparkParam.processInterval) match {
-      case Some(interval) => interval
-      case _ => throw new Exception("invalid batch interval")
-    }
-    val process = TimingProcess(processInterval, dqThread)
-    process.startup()
-
-    ssc.start()
-    ssc.awaitTermination()
-    ssc.stop(stopSparkContext=true, stopGracefully=true)
-
-    // finish
-    persist.finish()
-
-//    process.shutdown()
-  }
-
-  def end: Try[_] = Try {
-    TableRegisters.unregisterCompileGlobalTables()
-    TableRegisters.unregisterRunGlobalTables(sqlContext)
-
-    DataFrameCaches.uncacheGlobalDataFrames()
-    DataFrameCaches.clearGlobalTrashDataFrames()
-
-    sparkSession.close()
-    sparkSession.stop()
-
-    InfoCacheInstance.close
-  }
-
-  def createStreamingContext: StreamingContext = {
-    val batchInterval = TimeUtil.milliseconds(sparkParam.batchInterval) match {
-      case Some(interval) => Milliseconds(interval)
-      case _ => throw new Exception("invalid batch interval")
-    }
-    val ssc = new StreamingContext(sparkSession.sparkContext, batchInterval)
-    ssc.checkpoint(sparkParam.cpDir)
-
-    ssc
-  }
-
-  private def clearCpDir: Unit = {
-    if (sparkParam.needInitClear) {
-      val cpDir = sparkParam.cpDir
-      println(s"clear checkpoint directory ${cpDir}")
-      HdfsUtil.deleteHdfsPath(cpDir)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
deleted file mode 100644
index f67724f..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.process
-
-import java.util.Date
-import java.util.concurrent.TimeUnit
-
-import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
-import org.apache.griffin.measure.cache.result.CacheResultProcesser
-import org.apache.griffin.measure.config.params.user.EvaluateRuleParam
-import org.apache.griffin.measure.data.source.DataSource
-import org.apache.griffin.measure.log.Loggable
-import org.apache.griffin.measure.persist.{Persist, PersistFactory}
-import org.apache.griffin.measure.process.engine.DqEngines
-import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters, TimeRange}
-import org.apache.griffin.measure.rule.adaptor.{ProcessDetailsKeys, RuleAdaptorGroup, RunPhase}
-import org.apache.griffin.measure.rule.plan._
-import org.apache.spark.sql.SQLContext
-
-case class StreamingDqThread(sqlContext: SQLContext,
-                             dqEngines: DqEngines,
-                             dataSources: Seq[DataSource],
-                             evaluateRuleParam: EvaluateRuleParam,
-                             persistFactory: PersistFactory,
-                             appPersist: Persist
-                            ) extends Runnable with Loggable {
-
-  val lock = InfoCacheInstance.genLock("process")
-
-  def run(): Unit = {
-    val updateTimeDate = new Date()
-    val updateTime = updateTimeDate.getTime
-    println(s"===== [${updateTimeDate}] process begins =====")
-    val locked = lock.lock(5, TimeUnit.SECONDS)
-    if (locked) {
-      try {
-
-        val st = new Date().getTime
-        appPersist.log(st, s"starting process ...")
-        val calcTimeInfo = CalcTimeInfo(st)
-
-        TimeInfoCache.startTimeInfoCache
-
-        // init data sources
-        val dsTimeRanges = dqEngines.loadData(dataSources, calcTimeInfo)
-        printTimeRanges(dsTimeRanges)
-
-        // generate rule steps
-        val rulePlan = RuleAdaptorGroup.genRulePlan(
-          calcTimeInfo, evaluateRuleParam, StreamingProcessType, dsTimeRanges)
-
-        // run rules
-        dqEngines.runRuleSteps(calcTimeInfo, rulePlan.ruleSteps)
-
-        val ct = new Date().getTime
-        val calculationTimeStr = s"calculation using time: ${ct - st} ms"
-        appPersist.log(ct, calculationTimeStr)
-
-        // persist results
-        dqEngines.persistAllMetrics(rulePlan.metricExports, persistFactory)
-
-        val rt = new Date().getTime
-        val persistResultTimeStr = s"persist result using time: ${rt - ct} ms"
-        appPersist.log(rt, persistResultTimeStr)
-
-        // persist records
-        dqEngines.persistAllRecords(rulePlan.recordExports, persistFactory, dataSources)
-
-        // update data sources
-        dqEngines.updateDataSources(rulePlan.dsUpdates, dataSources)
-
-        // finish calculation
-        finishCalculation()
-
-        val et = new Date().getTime
-        val persistTimeStr = s"persist records using time: ${et - rt} ms"
-        appPersist.log(et, persistTimeStr)
-
-        TimeInfoCache.endTimeInfoCache
-
-        // clean old data
-        cleanData(calcTimeInfo)
-
-//        sqlContext.tables().show(20)
-
-      } catch {
-        case e: Throwable => error(s"process error: ${e.getMessage}")
-      } finally {
-        lock.unlock()
-      }
-    } else {
-      println(s"===== [${updateTimeDate}] process ignores =====")
-    }
-    val endTime = new Date().getTime
-    println(s"===== [${updateTimeDate}] process ends, using ${endTime - updateTime} ms =====")
-  }
-
-  // finish calculation for this round
-  private def finishCalculation(): Unit = {
-    dataSources.foreach(_.processFinish)
-  }
-
-  // clean old data and old result cache
-  private def cleanData(timeInfo: TimeInfo): Unit = {
-    try {
-      dataSources.foreach(_.cleanOldData)
-
-      TableRegisters.unregisterRunTempTables(sqlContext, timeInfo.key)
-      TableRegisters.unregisterCompileTempTables(timeInfo.key)
-
-      DataFrameCaches.uncacheDataFrames(timeInfo.key)
-      DataFrameCaches.clearTrashDataFrames(timeInfo.key)
-      DataFrameCaches.clearGlobalTrashDataFrames()
-
-      val cleanTime = TimeInfoCache.getCleanTime
-      CacheResultProcesser.refresh(cleanTime)
-    } catch {
-      case e: Throwable => error(s"clean data error: ${e.getMessage}")
-    }
-  }
-
-  private def printTimeRanges(timeRanges: Map[String, TimeRange]): Unit = {
-    val timeRangesStr = timeRanges.map { pair =>
-      val (name, timeRange) = pair
-      s"${name} -> (${timeRange.begin}, ${timeRange.end}]"
-    }.mkString(", ")
-    println(s"data source timeRanges: ${timeRangesStr}")
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/process/TimingProcess.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/TimingProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/TimingProcess.scala
deleted file mode 100644
index 8d9bcb2..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/process/TimingProcess.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.process
-
-import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
-import java.util.{Timer, TimerTask}
-
-case class TimingProcess(interval: Long, runnable: Runnable) {
-
-  val pool: ThreadPoolExecutor = Executors.newFixedThreadPool(5).asInstanceOf[ThreadPoolExecutor]
-
-  val timer = new Timer("process", true)
-
-  val timerTask = new TimerTask() {
-    override def run(): Unit = {
-      pool.submit(runnable)
-    }
-  }
-
-  def startup(): Unit = {
-    timer.schedule(timerTask, interval, interval)
-  }
-
-  def shutdown(): Unit = {
-    timer.cancel()
-    pool.shutdown()
-    pool.awaitTermination(10, TimeUnit.SECONDS)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
deleted file mode 100644
index c06406c..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.process.engine
-
-import java.util.Date
-
-import org.apache.griffin.measure.cache.result.CacheResultProcesser
-import org.apache.griffin.measure.config.params.user.DataSourceParam
-import org.apache.griffin.measure.data.source.{DataSource, DataSourceFactory}
-import org.apache.griffin.measure.persist.{Persist, PersistFactory}
-import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters}
-import org.apache.griffin.measure.result.AccuracyResult
-import org.apache.griffin.measure.rule.adaptor.InternalColumns
-import org.apache.griffin.measure.rule.dsl._
-import org.apache.griffin.measure.rule.plan._
-import org.apache.griffin.measure.utils.JsonUtil
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types._
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
-import org.apache.spark.streaming.StreamingContext
-import org.apache.griffin.measure.utils.ParamUtil._
-
-import scala.util.Try
-
-import org.apache.spark.sql._
-
-case class DataFrameOprEngine(sqlContext: SQLContext) extends SparkDqEngine {
-
-  def runRuleStep(timeInfo: TimeInfo, ruleStep: RuleStep): Boolean = {
-    ruleStep match {
-      case rs @ DfOprStep(name, rule, details, _, _) => {
-        try {
-          val df = rule match {
-            case DataFrameOprs._fromJson => DataFrameOprs.fromJson(sqlContext, details)
-            case DataFrameOprs._accuracy => DataFrameOprs.accuracy(sqlContext, timeInfo, details)
-            case DataFrameOprs._clear => DataFrameOprs.clear(sqlContext, details)
-            case _ => throw new Exception(s"df opr [ ${rule} ] not supported")
-          }
-          if (rs.needCache) DataFrameCaches.cacheDataFrame(timeInfo.key, name, df)
-          TableRegisters.registerRunTempTable(df, timeInfo.key, name)
-          true
-        } catch {
-          case e: Throwable => {
-            error(s"run df opr [ ${rule} ] error: ${e.getMessage}")
-            false
-          }
-        }
-      }
-      case _ => false
-    }
-  }
-
-}
-
-object DataFrameOprs {
-
-  final val _fromJson = "from_json"
-  final val _accuracy = "accuracy"
-  final val _clear = "clear"
-
-  object AccuracyOprKeys {
-    val _dfName = "df.name"
-    val _miss = "miss"
-    val _total = "total"
-    val _matched = "matched"
-  }
-
-  def fromJson(sqlContext: SQLContext, details: Map[String, Any]): DataFrame = {
-    val _dfName = "df.name"
-    val _colName = "col.name"
-    val dfName = details.getOrElse(_dfName, "").toString
-    val colNameOpt = details.get(_colName).map(_.toString)
-
-    implicit val encoder = Encoders.STRING
-
-    val df: DataFrame = sqlContext.table(s"`${dfName}`")
-    val rdd = colNameOpt match {
-      case Some(colName: String) => df.map(r => r.getAs[String](colName))
-      case _ => df.map(_.getAs[String](0))
-    }
-    sqlContext.read.json(rdd) // slow process
-  }
-
-  def accuracy(sqlContext: SQLContext, timeInfo: TimeInfo, details: Map[String, Any]): DataFrame = {
-    import AccuracyOprKeys._
-
-    val dfName = details.getStringOrKey(_dfName)
-    val miss = details.getStringOrKey(_miss)
-    val total = details.getStringOrKey(_total)
-    val matched = details.getStringOrKey(_matched)
-
-//    val _enableIgnoreCache = "enable.ignore.cache"
-//    val enableIgnoreCache = details.getBoolean(_enableIgnoreCache, false)
-
-//    val tmst = InternalColumns.tmst
-
-    val updateTime = new Date().getTime
-
-    def getLong(r: Row, k: String): Option[Long] = {
-      try {
-        Some(r.getAs[Long](k))
-      } catch {
-        case e: Throwable => None
-      }
-    }
-
-    val df = sqlContext.table(s"`${dfName}`")
-
-    val results = df.rdd.flatMap { row =>
-      try {
-        val tmst = getLong(row, InternalColumns.tmst).getOrElse(timeInfo.calcTime)
-        val missCount = getLong(row, miss).getOrElse(0L)
-        val totalCount = getLong(row, total).getOrElse(0L)
-        val ar = AccuracyResult(missCount, totalCount)
-        if (ar.isLegal) Some((tmst, ar)) else None
-      } catch {
-        case e: Throwable => None
-      }
-    }.collect
-
-    val updateResults = results.flatMap { pair =>
-      val (t, result) = pair
-      val updatedCacheResultOpt = CacheResultProcesser.genUpdateCacheResult(t, updateTime, result)
-      updatedCacheResultOpt
-    }
-
-    // update results
-    updateResults.foreach { r =>
-      CacheResultProcesser.update(r)
-    }
-
-    // generate metrics
-    val schema = StructType(Array(
-      StructField(InternalColumns.tmst, LongType),
-      StructField(miss, LongType),
-      StructField(total, LongType),
-      StructField(matched, LongType),
-      StructField(InternalColumns.record, BooleanType),
-      StructField(InternalColumns.empty, BooleanType)
-    ))
-    val rows = updateResults.map { r =>
-      val ar = r.result.asInstanceOf[AccuracyResult]
-      Row(r.timeGroup, ar.miss, ar.total, ar.getMatch, !ar.initial, ar.eventual)
-    }
-    val rowRdd = sqlContext.sparkContext.parallelize(rows)
-    val retDf = sqlContext.createDataFrame(rowRdd, schema)
-
-    retDf
-  }
-
-  def clear(sqlContext: SQLContext, details: Map[String, Any]): DataFrame = {
-    val _dfName = "df.name"
-    val dfName = details.getOrElse(_dfName, "").toString
-
-    val df = sqlContext.table(s"`${dfName}`")
-    val emptyRdd = sqlContext.sparkContext.emptyRDD[Row]
-    sqlContext.createDataFrame(emptyRdd, df.schema)
-  }
-
-}
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
deleted file mode 100644
index a2de69d..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.process.engine
-
-import org.apache.griffin.measure.config.params.user.DataSourceParam
-import org.apache.griffin.measure.data.source.DataSource
-import org.apache.griffin.measure.log.Loggable
-import org.apache.griffin.measure.persist.{Persist, PersistFactory}
-import org.apache.griffin.measure.process.ProcessType
-import org.apache.griffin.measure.rule.dsl._
-import org.apache.griffin.measure.rule.plan.{TimeInfo, _}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Dataset, Row}
-
-trait DqEngine extends Loggable with Serializable {
-
-  def runRuleStep(timeInfo: TimeInfo, ruleStep: RuleStep): Boolean
-
-  protected def collectable(): Boolean = false
-
-  def collectMetrics(metricExport: MetricExport): Map[Long, Map[String, Any]]
-
-  //  def collectRecords(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]]
-  //
-  //  def collectUpdateCacheDatas(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]]
-
-//  def collectUpdateRDD(ruleStep: RuleStep): Option[DataFrame]
-//  def collectRecords(timeInfo: TimeInfo, recordExport: RecordExport): Map[Long, DataFrame]
-
-
-  def collectBatchRecords(recordExport: RecordExport): Option[RDD[String]]
-  def collectStreamingRecords(recordExport: RecordExport): (Option[RDD[(Long, Iterable[String])]], Set[Long])
-
-  def collectUpdateDf(dsUpdate: DsUpdate): Option[DataFrame]
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngineFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngineFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngineFactory.scala
deleted file mode 100644
index e075584..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngineFactory.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.process.engine
-
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.streaming.StreamingContext
-
-
-object DqEngineFactory {
-
-  private val engineTypes = List("spark-sql", "df-opr")
-
-  private final val SparkSqlEngineType = "spark-sql"
-  private final val DataFrameOprEngineType = "df-opr"
-
-  def genDqEngines(sqlContext: SQLContext): DqEngines = {
-    val engines = engineTypes.flatMap { et =>
-      genDqEngine(et, sqlContext)
-    }
-    DqEngines(engines)
-  }
-
-  private def genDqEngine(engineType: String, sqlContext: SQLContext): Option[DqEngine] = {
-    engineType match {
-      case SparkSqlEngineType => Some(SparkSqlEngine(sqlContext))
-      case DataFrameOprEngineType => Some(DataFrameOprEngine(sqlContext))
-      case _ => None
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
deleted file mode 100644
index eab0b21..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.process.engine
-
-import java.util.concurrent.atomic.AtomicInteger
-
-import org.apache.griffin.measure.config.params.user.DataSourceParam
-import org.apache.griffin.measure.data.source._
-import org.apache.griffin.measure.log.Loggable
-import org.apache.griffin.measure.persist._
-import org.apache.griffin.measure.process.temp.TimeRange
-import org.apache.griffin.measure.process._
-import org.apache.griffin.measure.rule.adaptor.InternalColumns
-import org.apache.griffin.measure.rule.dsl._
-import org.apache.griffin.measure.rule.plan.{DsUpdate, _}
-import org.apache.griffin.measure.utils.JsonUtil
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Dataset, Row}
-
-//import scala.concurrent._
-//import scala.concurrent.duration.Duration
-//import scala.util.{Failure, Success, Try}
-//import ExecutionContext.Implicits.global
-
-case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
-
-  val persistOrder: List[PersistType] = List(MetricPersistType, RecordPersistType)
-
-  def loadData(dataSources: Seq[DataSource], timeInfo: TimeInfo): Map[String, TimeRange] = {
-    dataSources.map { ds =>
-      (ds.name, ds.loadData(timeInfo))
-    }.toMap
-  }
-
-  def runRuleSteps(timeInfo: TimeInfo, ruleSteps: Seq[RuleStep]): Unit = {
-    ruleSteps.foreach { ruleStep =>
-      runRuleStep(timeInfo, ruleStep)
-    }
-  }
-
-  def persistAllMetrics(metricExports: Seq[MetricExport], persistFactory: PersistFactory
-                       ): Unit = {
-    val allMetrics: Map[Long, Map[String, Any]] = {
-      metricExports.foldLeft(Map[Long, Map[String, Any]]()) { (ret, metricExport) =>
-        val metrics = collectMetrics(metricExport)
-        metrics.foldLeft(ret) { (total, pair) =>
-          val (k, v) = pair
-          total.get(k) match {
-            case Some(map) => total + (k -> (map ++ v))
-            case _ => total + pair
-          }
-        }
-      }
-    }
-
-    allMetrics.foreach { pair =>
-      val (t, metric) = pair
-      val persist = persistFactory.getPersists(t)
-      persist.persistMetrics(metric)
-    }
-  }
-
-//  private def persistCollectedRecords(recordExport: RecordExport, records: Map[Long, DataFrame],
-//                                      persistFactory: PersistFactory, dataSources: Seq[DataSource]): Unit = {
-//    val pc = ParallelCounter(records.size)
-//    val pro = promise[Boolean]
-//    if (records.size > 0) {
-//      records.foreach { pair =>
-//        val (tmst, df) = pair
-//        val persist = persistFactory.getPersists(tmst)
-//        val updateDsCaches = recordExport.dataSourceCacheOpt match {
-//          case Some(dsName) => dataSources.filter(_.name == dsName).flatMap(_.dataSourceCacheOpt)
-//          case _ => Nil
-//        }
-//        val future = Future {
-//          persist.persistRecords(df, recordExport.name)
-////          updateDsCaches.foreach(_.updateData(df, tmst))
-//          updateDsCaches.foreach(_.updateData(Some(df)))
-//          true
-//        }
-//        future.onComplete {
-//          case Success(v) => {
-//            pc.finishOne(v)
-//            if (pc.checkDone) pro.trySuccess(pc.checkResult)
-//          }
-//          case Failure(ex) => {
-//            println(s"plan step failure: ${ex.getMessage}")
-//            pc.finishOne(false)
-//            if (pc.checkDone) pro.trySuccess(pc.checkResult)
-//          }
-//        }
-//      }
-//    } else pro.trySuccess(true)
-//
-//    Await.result(pro.future, Duration.Inf)
-//  }
-
-  def persistAllRecords(recordExports: Seq[RecordExport],
-                        persistFactory: PersistFactory, dataSources: Seq[DataSource]
-                       ): Unit = {
-    // method 1: multi thread persist multi data frame
-//    recordExports.foreach { recordExport =>
-//      val records = collectRecords(timeInfo, recordExport, procType)
-//      persistCollectedRecords(recordExport, records, persistFactory, dataSources)
-//    }
-
-    // method 2: multi thread persist multi iterable
-    recordExports.foreach { recordExport =>
-      recordExport.mode match {
-        case SimpleMode => {
-          collectBatchRecords(recordExport).foreach { rdd =>
-            persistCollectedBatchRecords(recordExport, rdd, persistFactory)
-          }
-        }
-        case TimestampMode => {
-          val (rddOpt, emptySet) = collectStreamingRecords(recordExport)
-          persistCollectedStreamingRecords(recordExport, rddOpt, emptySet, persistFactory, dataSources)
-        }
-      }
-    }
-  }
-
-  def collectBatchRecords(recordExport: RecordExport): Option[RDD[String]] = {
-    val ret = engines.foldLeft(None: Option[RDD[String]]) { (ret, engine) =>
-      if (ret.nonEmpty) ret else engine.collectBatchRecords(recordExport)
-    }
-    ret
-  }
-  def collectStreamingRecords(recordExport: RecordExport): (Option[RDD[(Long, Iterable[String])]], Set[Long]) = {
-    val ret = engines.foldLeft((None: Option[RDD[(Long, Iterable[String])]], Set[Long]())) { (ret, engine) =>
-      if (ret._1.nonEmpty || ret._2.nonEmpty) ret else engine.collectStreamingRecords(recordExport)
-    }
-    ret
-  }
-
-  private def persistCollectedBatchRecords(recordExport: RecordExport,
-                                           records: RDD[String], persistFactory: PersistFactory
-                                          ): Unit = {
-    val persist = persistFactory.getPersists(recordExport.defTimestamp)
-    persist.persistRecords(records, recordExport.name)
-  }
-
-  private def persistCollectedStreamingRecords(recordExport: RecordExport, recordsOpt: Option[RDD[(Long, Iterable[String])]],
-                                               emtpyRecordKeys: Set[Long], persistFactory: PersistFactory,
-                                               dataSources: Seq[DataSource]
-                                              ): Unit = {
-//    val updateDsCaches = recordExport.dataSourceCacheOpt match {
-//      case Some(dsName) => dataSources.filter(_.name == dsName).flatMap(_.dataSourceCacheOpt)
-//      case _ => Nil
-//    }
-
-    recordsOpt.foreach { records =>
-      records.foreach { pair =>
-        val (tmst, strs) = pair
-        val persist = persistFactory.getPersists(tmst)
-
-        persist.persistRecords(strs, recordExport.name)
-//        updateDsCaches.foreach(_.updateData(strs, tmst))
-      }
-    }
-
-    emtpyRecordKeys.foreach { t =>
-      val persist = persistFactory.getPersists(t)
-      persist.persistRecords(Nil, recordExport.name)
-//      updateDsCaches.foreach(_.updateData(Nil, t))
-    }
-  }
-
-  ///////////////////////////
-
-  def runRuleStep(timeInfo: TimeInfo, ruleStep: RuleStep): Boolean = {
-    val ret = engines.foldLeft(false) { (done, engine) =>
-      done || engine.runRuleStep(timeInfo, ruleStep)
-    }
-    if (!ret) warn(s"run rule step warn: no dq engine support ${ruleStep}")
-    ret
-  }
-
-  ///////////////////////////
-
-  def collectMetrics(metricExport: MetricExport
-                    ): Map[Long, Map[String, Any]] = {
-    val ret = engines.foldLeft(Map[Long, Map[String, Any]]()) { (ret, engine) =>
-      if (ret.nonEmpty) ret else engine.collectMetrics(metricExport)
-    }
-    ret
-  }
-
-  def collectUpdateDf(dsUpdate: DsUpdate): Option[DataFrame] = {
-    val ret = engines.foldLeft(None: Option[DataFrame]) { (ret, engine) =>
-      if (ret.nonEmpty) ret else engine.collectUpdateDf(dsUpdate)
-    }
-    ret
-  }
-
-  def updateDataSources(dsUpdates: Seq[DsUpdate],
-                        dataSources: Seq[DataSource]): Unit = {
-    dsUpdates.foreach { dsUpdate =>
-      val dsName = dsUpdate.dsName
-      collectUpdateDf(dsUpdate) match {
-        case Some(df) => {
-          dataSources.find(_.name == dsName).foreach(_.updateData(df))
-        }
-        case _ => {
-          // do nothing
-        }
-      }
-    }
-  }
-
-}
-
-case class ParallelCounter(total: Int) extends Serializable {
-  private val done: AtomicInteger = new AtomicInteger(0)
-  private val result: AtomicInteger = new AtomicInteger(0)
-  def finishOne(suc: Boolean): Unit = {
-    if (suc) result.incrementAndGet
-    done.incrementAndGet
-  }
-  def checkDone: Boolean = {
-    done.get() >= total
-  }
-  def checkResult: Boolean = {
-    if (total > 0) result.get() > 0 else true
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
deleted file mode 100644
index 4ba185f..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.process.engine
-
-import org.apache.griffin.measure.process._
-import org.apache.griffin.measure.rule.adaptor.InternalColumns
-import org.apache.griffin.measure.rule.dsl._
-import org.apache.griffin.measure.rule.plan._
-import org.apache.griffin.measure.utils.JsonUtil
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql._
-import org.apache.griffin.measure.utils.ParamUtil._
-
-trait SparkDqEngine extends DqEngine {
-
-  val sqlContext: SQLContext
-
-  val emptyMetricMap = Map[Long, Map[String, Any]]()
-  val emptyMap = Map[String, Any]()
-  val emptyRecordMap = Map[Long, DataFrame]()
-
-  private def getMetricMaps(dfName: String): Seq[Map[String, Any]] = {
-    val pdf = sqlContext.table(s"`${dfName}`")
-    val records = pdf.toJSON.collect()
-    if (records.size > 0) {
-      records.flatMap { rec =>
-        try {
-          val value = JsonUtil.toAnyMap(rec)
-          Some(value)
-        } catch {
-          case e: Throwable => None
-        }
-      }.toSeq
-    } else Nil
-  }
-
-  private def normalizeMetric(metrics: Seq[Map[String, Any]], name: String, collectType: CollectType
-                             ): Map[String, Any] = {
-    collectType match {
-      case EntriesCollectType => metrics.headOption.getOrElse(emptyMap)
-      case ArrayCollectType => Map[String, Any]((name -> metrics))
-      case MapCollectType => {
-        val v = metrics.headOption.getOrElse(emptyMap)
-        Map[String, Any]((name -> v))
-      }
-      case _ => {
-        if (metrics.size > 1) Map[String, Any]((name -> metrics))
-        else metrics.headOption.getOrElse(emptyMap)
-      }
-    }
-  }
-
-  def collectMetrics(metricExport: MetricExport): Map[Long, Map[String, Any]] = {
-    if (collectable) {
-      val MetricExport(name, stepName, collectType, defTmst, mode) = metricExport
-      try {
-        val metricMaps: Seq[Map[String, Any]] = getMetricMaps(stepName)
-        mode match {
-          case SimpleMode => {
-            val metrics: Map[String, Any] = normalizeMetric(metricMaps, name, collectType)
-            emptyMetricMap + (defTmst -> metrics)
-          }
-          case TimestampMode => {
-            val tmstMetrics = metricMaps.map { metric =>
-              val tmst = metric.getLong(InternalColumns.tmst, defTmst)
-              val pureMetric = metric.removeKeys(InternalColumns.columns)
-              (tmst, pureMetric)
-            }
-            tmstMetrics.groupBy(_._1).map { pair =>
-              val (k, v) = pair
-              val maps = v.map(_._2)
-              val mtc = normalizeMetric(maps, name, collectType)
-              (k, mtc)
-            }
-          }
-        }
-      } catch {
-        case e: Throwable => {
-          error(s"collect metrics ${name} error: ${e.getMessage}")
-          emptyMetricMap
-        }
-      }
-    } else emptyMetricMap
-  }
-
-  private def getTmst(row: Row, defTmst: Long): Long = {
-    try {
-      row.getAs[Long](InternalColumns.tmst)
-    } catch {
-      case _: Throwable => defTmst
-    }
-  }
-
-  private def getRecordDataFrame(recordExport: RecordExport): Option[DataFrame] = {
-    if (collectable) {
-      val stepDf = sqlContext.table(s"`${recordExport.stepName}`")
-      Some(stepDf)
-    } else None
-  }
-
-  def collectBatchRecords(recordExport: RecordExport): Option[RDD[String]] = {
-    getRecordDataFrame(recordExport).map(_.toJSON.rdd)
-  }
-
-  def collectStreamingRecords(recordExport: RecordExport): (Option[RDD[(Long, Iterable[String])]], Set[Long]) = {
-    implicit val encoder = Encoders.tuple(Encoders.scalaLong, Encoders.STRING)
-    val RecordExport(_, _, _, originDFOpt, defTmst, procType) = recordExport
-    getRecordDataFrame(recordExport) match {
-      case Some(stepDf) => {
-        originDFOpt match {
-          case Some(originName) => {
-            val tmsts = (stepDf.collect.flatMap { row =>
-              try {
-                val tmst = getTmst(row, defTmst)
-                val empty = row.getAs[Boolean](InternalColumns.empty)
-                Some((tmst, empty))
-              } catch {
-                case _: Throwable => None
-              }
-            })
-            val emptyTmsts = tmsts.filter(_._2).map(_._1).toSet
-            val recordTmsts = tmsts.filter(!_._2).map(_._1).toSet
-            if (recordTmsts.size > 0) {
-              val recordsDf = sqlContext.table(s"`${originName}`")
-              val records = recordsDf.flatMap { row =>
-                val tmst = getTmst(row, defTmst)
-                if (recordTmsts.contains(tmst)) {
-                  try {
-                    val map = SparkRowFormatter.formatRow(row)
-                    val str = JsonUtil.toJson(map)
-                    Some((tmst, str))
-                  } catch {
-                    case e: Throwable => None
-                  }
-                } else None
-              }
-              (Some(records.rdd.groupByKey), emptyTmsts)
-            } else (None, emptyTmsts)
-          }
-          case _ => {
-            val records = stepDf.flatMap { row =>
-              val tmst = getTmst(row, defTmst)
-              try {
-                val map = SparkRowFormatter.formatRow(row)
-                val str = JsonUtil.toJson(map)
-                Some((tmst, str))
-              } catch {
-                case e: Throwable => None
-              }
-            }
-            (Some(records.rdd.groupByKey), Set[Long]())
-          }
-        }
-      }
-      case _ => (None, Set[Long]())
-    }
-  }
-
-  def collectUpdateDf(dsUpdate: DsUpdate): Option[DataFrame] = {
-    if (collectable) {
-      val DsUpdate(_, stepName) = dsUpdate
-      val stepDf = sqlContext.table(s"`${stepName}`")
-      Some(stepDf)
-    } else None
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkRowFormatter.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkRowFormatter.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkRowFormatter.scala
deleted file mode 100644
index 6ed0559..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkRowFormatter.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.process.engine
-
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.types.{ArrayType, DataType, StructField, StructType}
-
-import scala.collection.mutable.ArrayBuffer
-
-object SparkRowFormatter {
-
-  def formatRow(row: Row): Map[String, Any] = {
-    formatRowWithSchema(row, row.schema)
-  }
-
-  private def formatRowWithSchema(row: Row, schema: StructType): Map[String, Any] = {
-    formatStruct(schema.fields, row)
-  }
-
-  private def formatStruct(schema: Seq[StructField], r: Row) = {
-    val paired = schema.zip(r.toSeq)
-    paired.foldLeft(Map[String, Any]())((s, p) => s ++ formatItem(p))
-  }
-
-  private def formatItem(p: Pair[StructField, Any]): Map[String, Any] = {
-    p match {
-      case (sf, a) =>
-        sf.dataType match {
-          case ArrayType(et, _) =>
-            Map(sf.name -> (if (a == null) a else formatArray(et, a.asInstanceOf[ArrayBuffer[Any]])))
-          case StructType(s) =>
-            Map(sf.name -> (if (a == null) a else formatStruct(s, a.asInstanceOf[Row])))
-          case _ => Map(sf.name -> a)
-        }
-    }
-  }
-
-  private def formatArray(et: DataType, arr: ArrayBuffer[Any]): Seq[Any] = {
-    et match {
-      case StructType(s) => arr.map(e => formatStruct(s, e.asInstanceOf[Row]))
-      case ArrayType(t, _) =>
-        arr.map(e => formatArray(t, e.asInstanceOf[ArrayBuffer[Any]]))
-      case _ => arr
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
deleted file mode 100644
index e4ecb49..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.process.engine
-
-import java.util.Date
-
-import org.apache.griffin.measure.config.params.user.DataSourceParam
-import org.apache.griffin.measure.data.source._
-import org.apache.griffin.measure.persist.{Persist, PersistFactory}
-import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters}
-import org.apache.griffin.measure.rule.adaptor.{GlobalKeys, InternalColumns}
-import org.apache.griffin.measure.rule.dsl._
-import org.apache.griffin.measure.rule.plan._
-import org.apache.griffin.measure.utils.JsonUtil
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.streaming.StreamingContext
-
-case class SparkSqlEngine(sqlContext: SQLContext) extends SparkDqEngine {
-
-  override protected def collectable(): Boolean = true
-
-  def runRuleStep(timeInfo: TimeInfo, ruleStep: RuleStep): Boolean = {
-    ruleStep match {
-      case rs @ SparkSqlStep(name, rule, details, _, _) => {
-        try {
-          val rdf = if (rs.isGlobal && !TableRegisters.existRunGlobalTable(name)) {
-            details.get(GlobalKeys._initRule) match {
-              case Some(initRule: String) => sqlContext.sql(initRule)
-              case _ => sqlContext.emptyDataFrame
-            }
-          } else sqlContext.sql(rule)
-
-//          println(name)
-//          rdf.show(3)
-
-          if (rs.isGlobal) {
-            if (rs.needCache) DataFrameCaches.cacheGlobalDataFrame(name, rdf)
-            TableRegisters.registerRunGlobalTable(rdf, name)
-          } else {
-            if (rs.needCache) DataFrameCaches.cacheDataFrame(timeInfo.key, name, rdf)
-            TableRegisters.registerRunTempTable(rdf, timeInfo.key, name)
-          }
-          true
-        } catch {
-          case e: Throwable => {
-            error(s"run spark sql [ ${rule} ] error: ${e.getMessage}")
-            false
-          }
-        }
-      }
-      case _ => false
-    }
-  }
-
-}
-
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/process/temp/DataFrameCaches.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/temp/DataFrameCaches.scala b/measure/src/main/scala/org/apache/griffin/measure/process/temp/DataFrameCaches.scala
deleted file mode 100644
index 58e8a13..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/process/temp/DataFrameCaches.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.process.temp
-
-import org.apache.griffin.measure.log.Loggable
-import org.apache.spark.sql.DataFrame
-
-import scala.collection.concurrent.{TrieMap, Map => ConcMap}
-
-object DataFrameCaches extends Loggable {
-
-  final val _global = "_global"
-
-  private val caches: ConcMap[String, Map[String, DataFrame]] = TrieMap[String, Map[String, DataFrame]]()
-  private val trashCaches: ConcMap[String, Seq[DataFrame]] = TrieMap[String, Seq[DataFrame]]()
-
-  private def trashDataFrame(key: String, df: DataFrame): Unit = {
-    trashCaches.get(key) match {
-      case Some(seq) => {
-        val suc = trashCaches.replace(key, seq, seq :+ df)
-        if (!suc) trashDataFrame(key, df)
-      }
-      case _ => {
-        val oldOpt = trashCaches.putIfAbsent(key, Seq[DataFrame](df))
-        if (oldOpt.nonEmpty) trashDataFrame(key, df)
-      }
-    }
-  }
-  private def trashDataFrames(key: String, dfs: Seq[DataFrame]): Unit = {
-    trashCaches.get(key) match {
-      case Some(seq) => {
-        val suc = trashCaches.replace(key, seq, seq ++ dfs)
-        if (!suc) trashDataFrames(key, dfs)
-      }
-      case _ => {
-        val oldOpt = trashCaches.putIfAbsent(key, dfs)
-        if (oldOpt.nonEmpty) trashDataFrames(key, dfs)
-      }
-    }
-  }
-
-  def cacheDataFrame(key: String, name: String, df: DataFrame): Unit = {
-    println(s"try to cache df ${name}")
-    caches.get(key) match {
-      case Some(mp) => {
-        mp.get(name) match {
-          case Some(odf) => {
-            val suc = caches.replace(key, mp, mp + (name -> df))
-            if (suc) {
-              println(s"cache after replace old df")
-              df.cache
-              trashDataFrame(key, odf)
-            } else {
-              cacheDataFrame(key, name, df)
-            }
-          }
-          case _ => {
-            val suc = caches.replace(key, mp, mp + (name -> df))
-            if (suc) {
-              println(s"cache after replace no old df")
-              df.cache
-            } else {
-              cacheDataFrame(key, name, df)
-            }
-          }
-        }
-      }
-      case _ => {
-        val oldOpt = caches.putIfAbsent(key, Map[String, DataFrame]((name -> df)))
-        if (oldOpt.isEmpty) {
-          println(s"cache after put absent")
-          df.cache
-        } else {
-          cacheDataFrame(key, name, df)
-        }
-      }
-    }
-  }
-  def cacheGlobalDataFrame(name: String, df: DataFrame): Unit = {
-    cacheDataFrame(_global, name, df)
-  }
-
-  def uncacheDataFrames(key: String): Unit = {
-    caches.remove(key) match {
-      case Some(mp) => {
-        trashDataFrames(key, mp.values.toSeq)
-      }
-      case _ => {}
-    }
-  }
-  def uncacheGlobalDataFrames(): Unit = {
-    uncacheDataFrames(_global)
-  }
-
-  def clearTrashDataFrames(key: String): Unit = {
-    trashCaches.remove(key) match {
-      case Some(seq) => seq.foreach(_.unpersist)
-      case _ => {}
-    }
-  }
-  def clearGlobalTrashDataFrames(): Unit = {
-    clearTrashDataFrames(_global)
-  }
-
-  def getDataFrames(key: String): Map[String, DataFrame] = {
-    caches.get(key) match {
-      case Some(mp) => mp
-      case _ => Map[String, DataFrame]()
-    }
-  }
-  def getGlobalDataFrames(): Map[String, DataFrame] = {
-    getDataFrames(_global)
-  }
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/process/temp/TableRegisters.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/temp/TableRegisters.scala b/measure/src/main/scala/org/apache/griffin/measure/process/temp/TableRegisters.scala
deleted file mode 100644
index 91a7541..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/process/temp/TableRegisters.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.process.temp
-
-import org.apache.griffin.measure.log.Loggable
-import org.apache.spark.sql.{DataFrame, SQLContext}
-
-import scala.collection.concurrent.{TrieMap, Map => ConcMap}
-
-object TableRegisters extends Loggable {
-
-  final val _global = "_global"
-//
-//  val tables: ConcMap[String, Set[String]] = TrieMap[String, Set[String]]()
-
-  val compileTableRegs = TableRegs()
-  val runTableRegs = TableRegs()
-
-//  private def registerTable(key: String, table: String): Unit = {
-//    tables.get(key) match {
-//      case Some(set) => {
-//        val suc = tables.replace(key, set, set + table)
-//        if (!suc) registerTable(key, table)
-//      }
-//      case _ => {
-//        val oldOpt = tables.putIfAbsent(key, Set[String](table))
-//        if (oldOpt.nonEmpty) registerTable(key, table)
-//      }
-//    }
-//  }
-//
-//  private def unregisterTable(key: String, table: String): Option[String] = {
-//    tables.get(key) match {
-//      case Some(set) => {
-//        val ftb = set.find(_ == table)
-//        ftb match {
-//          case Some(tb) => {
-//            val nset = set - tb
-//            val suc = tables.replace(key, set, nset)
-//            if (suc) Some(tb)
-//            else unregisterTable(key, table)
-//          }
-//          case _ => None
-//        }
-//      }
-//      case _ => None
-//    }
-//  }
-//
-//  private def unregisterTables(key: String): Set[String] = {
-//    tables.remove(key) match {
-//      case Some(set) => set
-//      case _ => Set[String]()
-//    }
-//  }
-
-  private def dropTempTable(sqlContext: SQLContext, table: String): Unit = {
-    try {
-      sqlContext.dropTempTable(table)
-    } catch {
-      case e: Throwable => warn(s"drop temp table ${table} fails")
-    }
-  }
-
-  // -----
-
-  def registerRunGlobalTable(df: DataFrame, table: String): Unit = {
-    registerRunTempTable(df, _global, table)
-  }
-
-  def registerRunTempTable(df: DataFrame, key: String, table: String): Unit = {
-    runTableRegs.registerTable(key, table)
-    df.registerTempTable(table)
-  }
-
-  def registerCompileGlobalTable(table: String): Unit = {
-    registerCompileTempTable(_global, table)
-  }
-
-  def registerCompileTempTable(key: String, table: String): Unit = {
-    compileTableRegs.registerTable(key, table)
-  }
-
-  def unregisterRunTempTable(sqlContext: SQLContext, key: String, table: String): Unit = {
-    runTableRegs.unregisterTable(key, table).foreach(dropTempTable(sqlContext, _))
-  }
-
-  def unregisterCompileTempTable(key: String, table: String): Unit = {
-    compileTableRegs.unregisterTable(key, table)
-  }
-
-  def unregisterRunGlobalTables(sqlContext: SQLContext): Unit = {
-    unregisterRunTempTables(sqlContext, _global)
-  }
-
-  def unregisterCompileGlobalTables(): Unit = {
-    unregisterCompileTempTables(_global)
-  }
-
-  def unregisterRunTempTables(sqlContext: SQLContext, key: String): Unit = {
-    runTableRegs.unregisterTables(key).foreach(dropTempTable(sqlContext, _))
-  }
-
-  def unregisterCompileTempTables(key: String): Unit = {
-    compileTableRegs.unregisterTables(key)
-  }
-
-  def existRunGlobalTable(table: String): Boolean = {
-    existRunTempTable(_global, table)
-  }
-
-  def existCompileGlobalTable(table: String): Boolean = {
-    existCompileTempTable(_global, table)
-  }
-
-  def existRunTempTable(key: String, table: String): Boolean = {
-    runTableRegs.existTable(key, table)
-  }
-
-  def existCompileTempTable(key: String, table: String): Boolean = {
-    compileTableRegs.existTable(key, table)
-  }
-
-  def getRunGlobalTables(): Set[String] = {
-    getRunTempTables(_global)
-  }
-
-  def getRunTempTables(key: String): Set[String] = {
-    runTableRegs.getTables(key)
-  }
-
-}
-
-//object TempKeys {
-//  def key(t: Long): String = s"${t}"
-//  def key(head: String, t: Long): String = s"${head}_${t}"
-//}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/process/temp/TableRegs.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/temp/TableRegs.scala b/measure/src/main/scala/org/apache/griffin/measure/process/temp/TableRegs.scala
deleted file mode 100644
index 8f8e0f1..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/process/temp/TableRegs.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.process.temp
-
-import org.apache.spark.sql.SQLContext
-
-import scala.collection.concurrent.{TrieMap, Map => ConcMap}
-
-case class TableRegs() {
-
-  private val tables: ConcMap[String, Set[String]] = TrieMap[String, Set[String]]()
-
-  def registerTable(key: String, table: String): Unit = {
-    tables.get(key) match {
-      case Some(set) => {
-        val suc = tables.replace(key, set, set + table)
-        if (!suc) registerTable(key, table)
-      }
-      case _ => {
-        val oldOpt = tables.putIfAbsent(key, Set[String](table))
-        if (oldOpt.nonEmpty) registerTable(key, table)
-      }
-    }
-  }
-
-  def unregisterTable(key: String, table: String): Option[String] = {
-    tables.get(key) match {
-      case Some(set) => {
-        val ftb = set.find(_ == table)
-        ftb match {
-          case Some(tb) => {
-            val nset = set - tb
-            val suc = tables.replace(key, set, nset)
-            if (suc) Some(tb)
-            else unregisterTable(key, table)
-          }
-          case _ => None
-        }
-      }
-      case _ => None
-    }
-  }
-
-  def unregisterTables(key: String): Set[String] = {
-    tables.remove(key) match {
-      case Some(set) => set
-      case _ => Set[String]()
-    }
-  }
-
-  def existTable(key: String, table: String): Boolean = {
-    tables.get(key) match {
-      case Some(set) => set.contains(table)
-      case _ => false
-    }
-  }
-
-  def getTables(key: String): Set[String] = {
-    tables.get(key) match {
-      case Some(set) => set
-      case _ => Set[String]()
-    }
-  }
-
-}


Reply via email to