http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
index c90e572..39444cd 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
@@ -28,9 +28,13 @@ import org.apache.griffin.measure.data.source.DataSource
 import org.apache.griffin.measure.log.Loggable
 import org.apache.griffin.measure.persist.{Persist, PersistFactory}
 import org.apache.griffin.measure.process.engine.DqEngines
+import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters}
 import org.apache.griffin.measure.rule.adaptor.{RuleAdaptorGroup, RunPhase}
+import org.apache.griffin.measure.rule.plan._
+import org.apache.spark.sql.SQLContext
 
-case class StreamingDqThread(dqEngines: DqEngines,
+case class StreamingDqThread(sqlContext: SQLContext,
+                             dqEngines: DqEngines,
                              dataSources: Seq[DataSource],
                              evaluateRuleParam: EvaluateRuleParam,
                              persistFactory: PersistFactory,
@@ -49,63 +53,88 @@ case class StreamingDqThread(dqEngines: DqEngines,
 
         val st = new Date().getTime
         appPersist.log(st, s"starting process ...")
+        val calcTimeInfo = CalcTimeInfo(st)
 
         TimeInfoCache.startTimeInfoCache
 
         // init data sources
-        dqEngines.loadData(dataSources, st)
+        val dsTmsts = dqEngines.loadData(dataSources, calcTimeInfo)
+
+        println(s"data sources timestamps: ${dsTmsts}")
 
         // generate rule steps
-        val ruleSteps = RuleAdaptorGroup.genConcreteRuleSteps(evaluateRuleParam, RunPhase)
+//        val ruleSteps = RuleAdaptorGroup.genRuleSteps(
+//          CalcTimeInfo(st), evaluateRuleParam, dsTmsts)
+        val rulePlan = RuleAdaptorGroup.genRulePlan(
+          calcTimeInfo, evaluateRuleParam, StreamingProcessType)
+
+//        ruleSteps.foreach(println)
 
         // run rules
-        dqEngines.runRuleSteps(ruleSteps)
+//        dqEngines.runRuleSteps(ruleSteps)
+        dqEngines.runRuleSteps(calcTimeInfo, rulePlan.ruleSteps)
 
         val ct = new Date().getTime
         val calculationTimeStr = s"calculation using time: ${ct - st} ms"
-        println(calculationTimeStr)
+//        println(calculationTimeStr)
         appPersist.log(ct, calculationTimeStr)
 
         // persist results
-        val timeGroups = dqEngines.persistAllMetrics(ruleSteps, persistFactory)
+//        val timeGroups = dqEngines.persistAllMetrics(ruleSteps, persistFactory)
+        dqEngines.persistAllMetrics(calcTimeInfo, rulePlan.metricExports,
+          StreamingProcessType, persistFactory)
+//        println(s"--- timeGroups: ${timeGroups}")
 
         val rt = new Date().getTime
         val persistResultTimeStr = s"persist result using time: ${rt - ct} ms"
-        println(persistResultTimeStr)
         appPersist.log(rt, persistResultTimeStr)
 
-        val rdds = dqEngines.collectUpdateRDDs(ruleSteps, timeGroups)
-        rdds.foreach(_._2.cache())
-        rdds.foreach { pr =>
-          val (step, rdd) = pr
-          val cnt = rdd.count
-          println(s"step [${step.name}] group count: ${cnt}")
-        }
-
-        val lt = new Date().getTime
-        val collectRddTimeStr = s"collect records using time: ${lt - rt} ms"
-        println(collectRddTimeStr)
-        appPersist.log(lt, collectRddTimeStr)
-
         // persist records
-        dqEngines.persistAllRecords(rdds, persistFactory)
-//        dqEngines.persistAllRecords(ruleSteps, persistFactory, timeGroups)
+        dqEngines.persistAllRecords(calcTimeInfo, rulePlan.recordExports,
+          StreamingProcessType, persistFactory, dataSources)
 
-        // update data source
-        dqEngines.updateDataSources(rdds, dataSources)
-//        dqEngines.updateDataSources(ruleSteps, dataSources, timeGroups)
+        val et = new Date().getTime
+        val persistTimeStr = s"persist records using time: ${et - rt} ms"
+        appPersist.log(et, persistTimeStr)
 
-        rdds.foreach(_._2.unpersist())
+//        val dfs = dqEngines.collectUpdateRDDs(ruleSteps, timeGroups.toSet)
+//        dfs.foreach(_._2.cache())
+//        dfs.foreach { pr =>
+//          val (step, df) = pr
+//          val cnt = df.count
+//          println(s"step [${step.name}] group count: ${cnt}")
+//        }
+//
+//        val lt = new Date().getTime
+//        val collectRddTimeStr = s"collect records using time: ${lt - rt} ms"
+////        println(collectRddTimeStr)
+//        appPersist.log(lt, collectRddTimeStr)
+//
+//        // persist records
+//        dqEngines.persistAllRecords(dfs, persistFactory)
+////        dqEngines.persistAllRecords(ruleSteps, persistFactory, timeGroups)
+//
+//        // update data source
+//        dqEngines.updateDataSources(dfs, dataSources)
+////        dqEngines.updateDataSources(ruleSteps, dataSources, timeGroups)
+//
+//        dfs.foreach(_._2.unpersist())
 
         TimeInfoCache.endTimeInfoCache
 
+//        sqlContext.tables().show(20)
+
+        // cache global data
+//        val globalTables = TableRegisters.getRunGlobalTables
+//        globalTables.foreach { gt =>
+//          val df = sqlContext.table(gt)
+//          df.cache
+//        }
+
         // clean old data
-        cleanData
+        cleanData(calcTimeInfo)
 
-        val et = new Date().getTime
-        val persistTimeStr = s"persist records using time: ${et - lt} ms"
-        println(persistTimeStr)
-        appPersist.log(et, persistTimeStr)
+//        sqlContext.tables().show(20)
 
       } catch {
         case e: Throwable => error(s"process error: ${e.getMessage}")
@@ -120,10 +149,16 @@ case class StreamingDqThread(dqEngines: DqEngines,
   }
 
   // clean old data and old result cache
-  private def cleanData(): Unit = {
+  private def cleanData(timeInfo: TimeInfo): Unit = {
     try {
       dataSources.foreach(_.cleanOldData)
-      dataSources.foreach(_.dropTable)
+
+      TableRegisters.unregisterRunTempTables(sqlContext, timeInfo.key)
+      TableRegisters.unregisterCompileTempTables(timeInfo.key)
+
+      DataFrameCaches.uncacheDataFrames(timeInfo.key)
+      DataFrameCaches.clearTrashDataFrames(timeInfo.key)
+      DataFrameCaches.clearGlobalTrashDataFrames()
 
       val cleanTime = TimeInfoCache.getCleanTime
       CacheResultProcesser.refresh(cleanTime)
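
The hunks above rewire one streaming batch end to end. Condensed into one place (types and calls exactly as they appear in this diff, so a flow outline rather than a standalone program), the per-batch flow is now:

    val st = new Date().getTime
    val calcTimeInfo = CalcTimeInfo(st)

    // load each data source, keeping the timestamps it produced
    val dsTmsts = dqEngines.loadData(dataSources, calcTimeInfo)

    // plan and run the rule steps for this calculation time
    val rulePlan = RuleAdaptorGroup.genRulePlan(calcTimeInfo, evaluateRuleParam, StreamingProcessType)
    dqEngines.runRuleSteps(calcTimeInfo, rulePlan.ruleSteps)

    // persist metric exports and record exports separately
    dqEngines.persistAllMetrics(calcTimeInfo, rulePlan.metricExports, StreamingProcessType, persistFactory)
    dqEngines.persistAllRecords(calcTimeInfo, rulePlan.recordExports, StreamingProcessType, persistFactory, dataSources)

    // per-batch cleanup: unregister this batch's temp tables, uncache its DataFrames
    cleanData(calcTimeInfo)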

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/process/check/DataChecker.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/check/DataChecker.scala b/measure/src/main/scala/org/apache/griffin/measure/process/check/DataChecker.scala
deleted file mode 100644
index 91855c2..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/process/check/DataChecker.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.process.check
-
-import org.apache.spark.sql.SQLContext
-
-case class DataChecker(sqlContext: SQLContext) {
-
-  def existDataSourceName(name: String): Boolean = {
-    sqlContext.tableNames.exists(_ == name)
-  }
-
-}
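
DataChecker's only job was a table-name existence check; with this commit, registered tables are tracked by the new TableRegisters object instead (see the StreamingDqThread and SparkSqlEngine hunks). For reference, the removed helper boils down to a one-liner; a sketch of the same check as a free function, with contains in place of exists(_ == name):

    import org.apache.spark.sql.SQLContext

    // equivalent of the deleted DataChecker.existDataSourceName
    def existDataSourceName(sqlContext: SQLContext, name: String): Boolean =
      sqlContext.tableNames.contains(name)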

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
index c3205b5..59b765e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
@@ -22,41 +22,36 @@ import java.util.Date
 
 import org.apache.griffin.measure.cache.result.CacheResultProcesser
 import org.apache.griffin.measure.config.params.user.DataSourceParam
-import org.apache.griffin.measure.data.connector.GroupByColumn
 import org.apache.griffin.measure.data.source.{DataSource, DataSourceFactory}
 import org.apache.griffin.measure.persist.{Persist, PersistFactory}
+import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters}
 import org.apache.griffin.measure.result.AccuracyResult
+import org.apache.griffin.measure.rule.adaptor.InternalColumns
 import org.apache.griffin.measure.rule.dsl._
-import org.apache.griffin.measure.rule.step._
+import org.apache.griffin.measure.rule.plan._
 import org.apache.griffin.measure.utils.JsonUtil
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 import org.apache.spark.streaming.StreamingContext
+import org.apache.griffin.measure.utils.ParamUtil._
+
+import scala.util.Try
 
 case class DataFrameOprEngine(sqlContext: SQLContext) extends SparkDqEngine {
 
-  def runRuleStep(ruleStep: ConcreteRuleStep): Boolean = {
+  def runRuleStep(timeInfo: TimeInfo, ruleStep: RuleStep): Boolean = {
     ruleStep match {
-      case DfOprStep(name, rule, details, _, _) => {
+      case rs @ DfOprStep(name, rule, details, _, _) => {
         try {
-          rule match {
-            case DataFrameOprs._fromJson => {
-              val df = DataFrameOprs.fromJson(sqlContext, details)
-              df.registerTempTable(name)
-            }
-            case DataFrameOprs._accuracy => {
-              val df = DataFrameOprs.accuracy(sqlContext, details)
-              df.registerTempTable(name)
-            }
-            case DataFrameOprs._clear => {
-              val df = DataFrameOprs.clear(sqlContext, details)
-              df.registerTempTable(name)
-            }
-            case _ => {
-              throw new Exception(s"df opr [ ${rule} ] not supported")
-            }
+          val df = rule match {
+            case DataFrameOprs._fromJson => DataFrameOprs.fromJson(sqlContext, details)
+            case DataFrameOprs._accuracy => DataFrameOprs.accuracy(sqlContext, timeInfo, details)
+            case DataFrameOprs._clear => DataFrameOprs.clear(sqlContext, details)
+            case _ => throw new Exception(s"df opr [ ${rule} ] not supported")
           }
+          if (rs.needCache) DataFrameCaches.cacheDataFrame(timeInfo.key, name, df)
+          TableRegisters.registerRunTempTable(df, timeInfo.key, name)
           true
         } catch {
           case e: Throwable => {
@@ -77,6 +72,13 @@ object DataFrameOprs {
   final val _accuracy = "accuracy"
   final val _clear = "clear"
 
+  object AccuracyOprKeys {
+    val _dfName = "df.name"
+    val _miss = "miss"
+    val _total = "total"
+    val _matched = "matched"
+  }
+
   def fromJson(sqlContext: SQLContext, details: Map[String, Any]): DataFrame = {
     val _dfName = "df.name"
     val _colName = "col.name"
@@ -88,41 +90,44 @@ object DataFrameOprs {
       case Some(colName: String) => df.map(_.getAs[String](colName))
       case _ => df.map(_.getAs[String](0))
     }
-    sqlContext.read.json(rdd)
+    sqlContext.read.json(rdd) // slow process
   }
 
-  def accuracy(sqlContext: SQLContext, details: Map[String, Any]): DataFrame = {
-    val _dfName = "df.name"
-    val _miss = "miss"
-    val _total = "total"
-    val _matched = "matched"
-//    val _tmst = "tmst"
-    val dfName = details.getOrElse(_dfName, _dfName).toString
-    val miss = details.getOrElse(_miss, _miss).toString
-    val total = details.getOrElse(_total, _total).toString
-    val matched = details.getOrElse(_matched, _matched).toString
-//    val tmst = details.getOrElse(_tmst, _tmst).toString
-    val tmst = GroupByColumn.tmst
+  def accuracy(sqlContext: SQLContext, timeInfo: TimeInfo, details: Map[String, Any]): DataFrame = {
+    import AccuracyOprKeys._
+
+    val dfName = details.getStringOrKey(_dfName)
+    val miss = details.getStringOrKey(_miss)
+    val total = details.getStringOrKey(_total)
+    val matched = details.getStringOrKey(_matched)
+
+//    val _enableIgnoreCache = "enable.ignore.cache"
+//    val enableIgnoreCache = details.getBoolean(_enableIgnoreCache, false)
+
+//    val tmst = InternalColumns.tmst
 
     val updateTime = new Date().getTime
 
-    def getLong(r: Row, k: String): Long = {
+    def getLong(r: Row, k: String): Option[Long] = {
       try {
-        r.getAs[Long](k)
+        Some(r.getAs[Long](k))
       } catch {
-        case e: Throwable => 0L
+        case e: Throwable => None
       }
     }
 
     val df = sqlContext.table(s"`${dfName}`")
+
     val results = df.flatMap { row =>
-      val t = getLong(row, tmst)
-      if (t > 0) {
-        val missCount = getLong(row, miss)
-        val totalCount = getLong(row, total)
+      try {
+        val tmst = getLong(row, InternalColumns.tmst).getOrElse(timeInfo.calcTime)
+        val missCount = getLong(row, miss).getOrElse(0L)
+        val totalCount = getLong(row, total).getOrElse(0L)
         val ar = AccuracyResult(missCount, totalCount)
-        Some((t, ar))
-      } else None
+        if (ar.isLegal) Some((tmst, ar)) else None
+      } catch {
+        case e: Throwable => None
+      }
     }.collect
 
     val updateResults = results.flatMap { pair =>
@@ -131,24 +136,28 @@ object DataFrameOprs {
       updatedCacheResultOpt
     }
 
-    // update
+    // update results
     updateResults.foreach { r =>
       CacheResultProcesser.update(r)
     }
 
+    // generate metrics
     val schema = StructType(Array(
-      StructField(tmst, LongType),
+      StructField(InternalColumns.tmst, LongType),
       StructField(miss, LongType),
       StructField(total, LongType),
-      StructField(matched, LongType)
+      StructField(matched, LongType),
+      StructField(InternalColumns.record, BooleanType),
+      StructField(InternalColumns.empty, BooleanType)
     ))
     val rows = updateResults.map { r =>
       val ar = r.result.asInstanceOf[AccuracyResult]
-      Row(r.timeGroup, ar.miss, ar.total, ar.getMatch)
+      Row(r.timeGroup, ar.miss, ar.total, ar.getMatch, !ar.initial, ar.eventual)
     }
     val rowRdd = sqlContext.sparkContext.parallelize(rows)
-    sqlContext.createDataFrame(rowRdd, schema)
+    val retDf = sqlContext.createDataFrame(rowRdd, schema)
 
+    retDf
   }
 
   def clear(sqlContext: SQLContext, details: Map[String, Any]): DataFrame = {
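
The getStringOrKey calls above come from the newly imported ParamUtil. Judging by the getOrElse(_miss, _miss).toString pattern they replace, the helper returns the detail value as a string, falling back to the key itself. A rough re-implementation under that assumption (ParamUtil's actual source is not part of this diff):

    object ParamUtilSketch {
      implicit class DetailOps(details: Map[String, Any]) {
        // value for `key` as a string, defaulting to the key itself
        def getStringOrKey(key: String): String =
          details.getOrElse(key, key).toString
      }
    }

    // usage, mirroring the accuracy opr above:
    //   details.getStringOrKey("miss")   // "miss" unless overridden in details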

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
index e28dfa4..a48c4d1 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
@@ -22,22 +22,30 @@ import org.apache.griffin.measure.config.params.user.DataSourceParam
 import org.apache.griffin.measure.data.source.DataSource
 import org.apache.griffin.measure.log.Loggable
 import org.apache.griffin.measure.persist.{Persist, PersistFactory}
+import org.apache.griffin.measure.process.ProcessType
 import org.apache.griffin.measure.rule.dsl._
-import org.apache.griffin.measure.rule.step._
+import org.apache.griffin.measure.rule.plan.{TimeInfo, _}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{DataFrame, Row}
 
 trait DqEngine extends Loggable with Serializable {
 
-  def runRuleStep(ruleStep: ConcreteRuleStep): Boolean
+  def runRuleStep(timeInfo: TimeInfo, ruleStep: RuleStep): Boolean
 
   protected def collectable(): Boolean = false
 
-  def collectMetrics(ruleStep: ConcreteRuleStep): Map[Long, Map[String, Any]]
+  def collectMetrics(timeInfo: TimeInfo, metricExport: MetricExport, procType: ProcessType
+                    ): Map[Long, Map[String, Any]]
 
-//  def collectRecords(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]]
-//
-//  def collectUpdateCacheDatas(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]]
+  //  def collectRecords(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]]
+  //
+  //  def collectUpdateCacheDatas(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]]
 
-  def collectUpdateRDD(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]]
-}
+//  def collectUpdateRDD(ruleStep: RuleStep): Option[DataFrame]
+  def collectRecords(timeInfo: TimeInfo, recordExport: RecordExport, procType: ProcessType
+                    ): Map[Long, DataFrame]
+
+
+  def collectBatchRecords(recordExport: RecordExport): Option[RDD[String]]
+  def collectStreamingRecords(recordExport: RecordExport): (Option[RDD[(Long, Iterable[String])]], Set[Long])
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
index 1af2ae3..03ee208 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
@@ -18,38 +18,47 @@ under the License.
 */
 package org.apache.griffin.measure.process.engine
 
+import java.util.concurrent.atomic.AtomicInteger
+
 import org.apache.griffin.measure.config.params.user.DataSourceParam
-import org.apache.griffin.measure.data.connector.GroupByColumn
 import org.apache.griffin.measure.data.source._
 import org.apache.griffin.measure.log.Loggable
 import org.apache.griffin.measure.persist.{Persist, PersistFactory}
+import org.apache.griffin.measure.process.{BatchProcessType, ProcessType, StreamingProcessType}
+import org.apache.griffin.measure.rule.adaptor.InternalColumns
 import org.apache.griffin.measure.rule.dsl._
-import org.apache.griffin.measure.rule.step._
+import org.apache.griffin.measure.rule.plan._
+import org.apache.griffin.measure.utils.JsonUtil
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{DataFrame, Row}
+
+import scala.concurrent._
+import scala.concurrent.duration.Duration
+import scala.util.{Failure, Success}
+import ExecutionContext.Implicits.global
 
 case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
 
   val persistOrder: List[PersistType] = List(MetricPersistType, RecordPersistType)
 
-  def loadData(dataSources: Seq[DataSource], ms: Long): Unit = {
-    dataSources.foreach { ds =>
-      ds.loadData(ms)
-    }
+  def loadData(dataSources: Seq[DataSource], timeInfo: TimeInfo): Map[String, Set[Long]] = {
+    dataSources.map { ds =>
+      (ds.name, ds.loadData(timeInfo))
+    }.toMap
   }
 
-  def runRuleSteps(ruleSteps: Seq[ConcreteRuleStep]): Unit = {
+  def runRuleSteps(timeInfo: TimeInfo, ruleSteps: Seq[RuleStep]): Unit = {
     ruleSteps.foreach { ruleStep =>
-      runRuleStep(ruleStep)
+      runRuleStep(timeInfo, ruleStep)
     }
   }
 
-  def persistAllMetrics(ruleSteps: Seq[ConcreteRuleStep], persistFactory: PersistFactory
-                       ): Iterable[Long] = {
-    val metricSteps = ruleSteps.filter(_.persistType == MetricPersistType)
+  def persistAllMetrics(timeInfo: TimeInfo, metricExports: Seq[MetricExport],
+                        procType: ProcessType, persistFactory: PersistFactory
+                       ): Unit = {
     val allMetrics: Map[Long, Map[String, Any]] = {
-      metricSteps.foldLeft(Map[Long, Map[String, Any]]()) { (ret, step) =>
-        val metrics = collectMetrics(step)
+      metricExports.foldLeft(Map[Long, Map[String, Any]]()) { (ret, step) =>
+        val metrics = collectMetrics(timeInfo, step, procType)
         metrics.foldLeft(ret) { (total, pair) =>
           val (k, v) = pair
           total.get(k) match {
@@ -59,15 +68,149 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
         }
       }
     }
-    val updateTimeGroups = allMetrics.keys
+
     allMetrics.foreach { pair =>
       val (t, metric) = pair
       val persist = persistFactory.getPersists(t)
       persist.persistMetrics(metric)
     }
-    updateTimeGroups
   }
 
+  private def persistCollectedRecords(recordExport: RecordExport, records: Map[Long, DataFrame],
+                                      persistFactory: PersistFactory, dataSources: Seq[DataSource]): Unit = {
+    val pc = ParallelCounter(records.size)
+    val pro = promise[Boolean]
+    if (records.size > 0) {
+      records.foreach { pair =>
+        val (tmst, df) = pair
+        val persist = persistFactory.getPersists(tmst)
+        val updateDsCaches = recordExport.dataSourceCacheOpt match {
+          case Some(dsName) => dataSources.filter(_.name == dsName).flatMap(_.dataSourceCacheOpt)
+          case _ => Nil
+        }
+        val future = Future {
+//        df.cache
+          persist.persistRecords(df, recordExport.name)
+          updateDsCaches.foreach(_.updateData(df, tmst))
+//        df.unpersist
+          true
+        }
+        future.onComplete {
+          case Success(v) => {
+            pc.finishOne(v)
+            if (pc.checkDone) pro.trySuccess(pc.checkResult)
+          }
+          case Failure(ex) => {
+            println(s"plan step failure: ${ex.getMessage}")
+            pc.finishOne(false)
+            if (pc.checkDone) pro.trySuccess(pc.checkResult)
+          }
+        }
+      }
+    } else pro.trySuccess(true)
+
+    Await.result(pro.future, Duration.Inf)
+  }
+
+  def persistAllRecords(timeInfo: TimeInfo, recordExports: Seq[RecordExport], procType: ProcessType,
+                        persistFactory: PersistFactory, dataSources: Seq[DataSource]
+                       ): Unit = {
+    // method 1: multi thread persist multi data frame
+//    recordExports.foreach { recordExport =>
+//      val records = collectRecords(timeInfo, recordExport, procType)
+//      persistCollectedRecords(recordExport, records, persistFactory, dataSources)
+//    }
+
+    // method 2: multi thread persist multi iterable
+    recordExports.foreach { recordExport =>
+//      val records = collectRecords(timeInfo, recordExport, procType)
+      procType match {
+        case BatchProcessType => {
+          collectBatchRecords(recordExport).foreach { rdd =>
+            persistCollectedBatchRecords(timeInfo, recordExport, rdd, persistFactory)
+          }
+        }
+        case StreamingProcessType => {
+          val (rddOpt, emptySet) = collectStreamingRecords(recordExport)
+          persistCollectedStreamingRecords(recordExport, rddOpt, emptySet, persistFactory, dataSources)
+//          collectStreamingRecords(recordExport).foreach { rddPair =>
+//            persistCollectedStreamingRecords(recordExport, rddPair._1, rddPair._2, persistFactory, dataSources)
+//          }
+        }
+      }
+    }
+  }
+
+  def collectBatchRecords(recordExport: RecordExport): Option[RDD[String]] = {
+    val ret = engines.foldLeft(None: Option[RDD[String]]) { (ret, engine) =>
+      if (ret.nonEmpty) ret else engine.collectBatchRecords(recordExport)
+    }
+    ret
+  }
+  def collectStreamingRecords(recordExport: RecordExport): (Option[RDD[(Long, Iterable[String])]], Set[Long]) = {
+    val ret = engines.foldLeft((None: Option[RDD[(Long, Iterable[String])]], Set[Long]())) { (ret, engine) =>
+      if (ret._1.nonEmpty || ret._2.nonEmpty) ret else engine.collectStreamingRecords(recordExport)
+    }
+    ret
+  }
+
+  private def persistCollectedBatchRecords(timeInfo: TimeInfo, recordExport: RecordExport,
+                                           records: RDD[String], persistFactory: PersistFactory
+                                          ): Unit = {
+    val persist = persistFactory.getPersists(timeInfo.calcTime)
+    persist.persistRecords(records, recordExport.name)
+  }
+
+  private def persistCollectedStreamingRecords(recordExport: RecordExport, recordsOpt: Option[RDD[(Long, Iterable[String])]],
+                                               emtpyRecordKeys: Set[Long], persistFactory: PersistFactory,
+                                               dataSources: Seq[DataSource]
+                                              ): Unit = {
+    val updateDsCaches = recordExport.dataSourceCacheOpt match {
+      case Some(dsName) => dataSources.filter(_.name == dsName).flatMap(_.dataSourceCacheOpt)
+      case _ => Nil
+    }
+
+    recordsOpt.foreach { records =>
+      records.foreach { pair =>
+        val (tmst, strs) = pair
+        val persist = persistFactory.getPersists(tmst)
+
+        persist.persistRecords(strs, recordExport.name)
+        updateDsCaches.foreach(_.updateData(strs, tmst))
+      }
+    }
+
+    emtpyRecordKeys.foreach { t =>
+      val persist = persistFactory.getPersists(t)
+      persist.persistRecords(Nil, recordExport.name)
+      updateDsCaches.foreach(_.updateData(Nil, t))
+    }
+  }
+
+//  private def persistCollectedStreamingRecords(recordExport: RecordExport, records: RDD[(Long, Iterable[String])],
+//                                               emtpyRecordKeys: Set[Long], persistFactory: PersistFactory,
+//                                               dataSources: Seq[DataSource]
+//                                              ): Unit = {
+//    val updateDsCaches = recordExport.dataSourceCacheOpt match {
+//      case Some(dsName) => dataSources.filter(_.name == dsName).flatMap(_.dataSourceCacheOpt)
+//      case _ => Nil
+//    }
+//
+//    records.foreach { pair =>
+//      val (tmst, strs) = pair
+//      val persist = persistFactory.getPersists(tmst)
+//
+//      persist.persistRecords(strs, recordExport.name)
+//      updateDsCaches.foreach(_.updateData(strs, tmst))
+//    }
+//
+//    emtpyRecordKeys.foreach { t =>
+//      val persist = persistFactory.getPersists(t)
+//      persist.persistRecords(Nil, recordExport.name)
+//      updateDsCaches.foreach(_.updateData(Nil, t))
+//    }
+//  }
+
 //  def persistAllRecords(ruleSteps: Seq[ConcreteRuleStep], persistFactory: PersistFactory,
 //                        timeGroups: Iterable[Long]): Unit = {
 //    val recordSteps = ruleSteps.filter(_.persistType == RecordPersistType)
@@ -119,9 +262,9 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
 
   ///////////////////////////
 
-  def runRuleStep(ruleStep: ConcreteRuleStep): Boolean = {
+  def runRuleStep(timeInfo: TimeInfo, ruleStep: RuleStep): Boolean = {
     val ret = engines.foldLeft(false) { (done, engine) =>
-      done || engine.runRuleStep(ruleStep)
+      done || engine.runRuleStep(timeInfo, ruleStep)
     }
     if (!ret) warn(s"run rule step warn: no dq engine support ${ruleStep}")
     ret
@@ -139,69 +282,143 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
 //      engine.collectUpdateCacheDatas(ruleStep, timeGroups)
 //    }.headOption
 //  }
-  def collectMetrics(ruleStep: ConcreteRuleStep): Map[Long, Map[String, Any]] = {
+  def collectMetrics(timeInfo: TimeInfo, metricExport: MetricExport, procType: ProcessType
+                    ): Map[Long, Map[String, Any]] = {
     val ret = engines.foldLeft(Map[Long, Map[String, Any]]()) { (ret, engine) =>
-      ret ++ engine.collectMetrics(ruleStep)
+      if (ret.nonEmpty) ret else engine.collectMetrics(timeInfo, metricExport, procType)
     }
     ret
   }
 
-  def collectUpdateRDD(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]
-                      ): Option[RDD[(Long, Iterable[String])]] = {
-    engines.flatMap { engine =>
-      engine.collectUpdateRDD(ruleStep, timeGroups)
-    }.headOption
+  def collectRecords(timeInfo: TimeInfo, recordExport: RecordExport, procType: ProcessType
+                    ): Map[Long, DataFrame] = {
+    val ret = engines.foldLeft(Map[Long, DataFrame]()) { (ret, engine) =>
+      if (ret.nonEmpty) ret else engine.collectRecords(timeInfo, recordExport, procType)
+    }
+    ret
   }
 
+  def collectUpdateRDD(ruleStep: RuleStep): Option[DataFrame] = {
+//    engines.flatMap { engine =>
+//      engine.collectUpdateRDD(ruleStep)
+//    }.headOption
+    None
+  }
+
+//  def collectUpdateRDD(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]
+//                      ): Option[RDD[(Long, Iterable[String])]] = {
+//    engines.flatMap { engine =>
+//      engine.collectUpdateRDD(ruleStep, timeGroups)
+//    }.headOption
+//  }
+
   ////////////////////////////
 
-  def collectUpdateRDDs(ruleSteps: Seq[ConcreteRuleStep], timeGroups: Iterable[Long]
-                       ): Seq[(ConcreteRuleStep, RDD[(Long, Iterable[String])])] = {
-    ruleSteps.flatMap { rs =>
-      collectUpdateRDD(rs, timeGroups) match {
-        case Some(rdd) => Some((rs, rdd))
-        case _ => None
-      }
-    }
+  def collectUpdateRDDs(ruleSteps: Seq[RuleStep], timeGroups: Set[Long]
+                       ): Seq[(RuleStep, DataFrame)] = {
+//    ruleSteps.flatMap { rs =>
+//      val t = rs.timeInfo.tmst
+//      if (timeGroups.contains(t)) {
+//        collectUpdateRDD(rs).map((rs, _))
+//      } else None
+//    }
+    Nil
   }
 
-  def persistAllRecords(stepRdds: Seq[(ConcreteRuleStep, RDD[(Long, Iterable[String])])],
+//  def collectUpdateRDDs(ruleSteps: Seq[ConcreteRuleStep], timeGroups: Iterable[Long]
+//                       ): Seq[(ConcreteRuleStep, RDD[(Long, Iterable[String])])] = {
+//    ruleSteps.flatMap { rs =>
+//      collectUpdateRDD(rs, timeGroups) match {
+//        case Some(rdd) => Some((rs, rdd))
+//        case _ => None
+//      }
+//    }
+//  }
+
+  def persistAllRecords(stepRdds: Seq[(RuleStep, DataFrame)],
                         persistFactory: PersistFactory): Unit = {
-    stepRdds.foreach { stepRdd =>
-      val (step, rdd) = stepRdd
-      if (step.persistType == RecordPersistType) {
-        val name = step.name
-        rdd.foreach { pair =>
-          val (t, items) = pair
-          val persist = persistFactory.getPersists(t)
-          persist.persistRecords(items, name)
-        }
-      }
-    }
+//    stepRdds.foreach { stepRdd =>
+//      val (step, df) = stepRdd
+//      if (step.ruleInfo.persistType == RecordPersistType) {
+//        val name = step.ruleInfo.name
+//        val t = step.timeInfo.tmst
+//        val persist = persistFactory.getPersists(t)
+//        persist.persistRecords(df, name)
+//      }
+//    }
   }
 
-  def updateDataSources(stepRdds: Seq[(ConcreteRuleStep, RDD[(Long, Iterable[String])])],
+//  def persistAllRecords(stepRdds: Seq[(ConcreteRuleStep, RDD[(Long, Iterable[String])])],
+//                        persistFactory: PersistFactory): Unit = {
+//    stepRdds.foreach { stepRdd =>
+//      val (step, rdd) = stepRdd
+//      if (step.ruleInfo.persistType == RecordPersistType) {
+//        val name = step.name
+//        rdd.foreach { pair =>
+//          val (t, items) = pair
+//          val persist = persistFactory.getPersists(t)
+//          persist.persistRecords(items, name)
+//        }
+//      }
+//    }
+//  }
+
+  def updateDataSources(stepRdds: Seq[(RuleStep, DataFrame)],
                         dataSources: Seq[DataSource]): Unit = {
-    stepRdds.foreach { stepRdd =>
-      val (step, rdd) = stepRdd
-      if (step.updateDataSource.nonEmpty) {
-        val udpateDataSources = dataSources.filter { ds =>
-          step.updateDataSource match {
-            case Some(dsName) if (dsName == ds.name) => true
-            case _ => false
-          }
-        }
-        if (udpateDataSources.size > 0) {
-          val name = step.name
-          rdd.foreach { pair =>
-            val (t, items) = pair
-            udpateDataSources.foreach { ds =>
-              ds.dataSourceCacheOpt.foreach(_.updateData(items, t))
-            }
-          }
-        }
-      }
-    }
+//    stepRdds.foreach { stepRdd =>
+//      val (step, df) = stepRdd
+//      if (step.ruleInfo.cacheDataSourceOpt.nonEmpty) {
+//        val udpateDsCaches = dataSources.filter { ds =>
+//          step.ruleInfo.cacheDataSourceOpt match {
+//            case Some(dsName) if (dsName == ds.name) => true
+//            case _ => false
+//          }
+//        }.flatMap(_.dataSourceCacheOpt)
+//        if (udpateDsCaches.size > 0) {
+//          val t = step.timeInfo.tmst
+//          udpateDsCaches.foreach(_.updateData(df, t))
+//        }
+//      }
+//    }
   }
 
+//  def updateDataSources(stepRdds: Seq[(ConcreteRuleStep, RDD[(Long, Iterable[String])])],
+//                        dataSources: Seq[DataSource]): Unit = {
+//    stepRdds.foreach { stepRdd =>
+//      val (step, rdd) = stepRdd
+//      if (step.ruleInfo.cacheDataSourceOpt.nonEmpty) {
+//        val udpateDataSources = dataSources.filter { ds =>
+//          step.ruleInfo.cacheDataSourceOpt match {
+//            case Some(dsName) if (dsName == ds.name) => true
+//            case _ => false
+//          }
+//        }
+//        if (udpateDataSources.size > 0) {
+//          val name = step.name
+//          rdd.foreach { pair =>
+//            val (t, items) = pair
+//            udpateDataSources.foreach { ds =>
+//              ds.dataSourceCacheOpt.foreach(_.updateData(items, t))
+//            }
+//          }
+//        }
+//      }
+//    }
+//  }
+
 }
+
+case class ParallelCounter(total: Int) extends Serializable {
+  private val done: AtomicInteger = new AtomicInteger(0)
+  private val result: AtomicInteger = new AtomicInteger(0)
+  def finishOne(suc: Boolean): Unit = {
+    if (suc) result.incrementAndGet
+    done.incrementAndGet
+  }
+  def checkDone: Boolean = {
+    done.get() >= total
+  }
+  def checkResult: Boolean = {
+    if (total > 0) result.get() > 0 else true
+  }
+}
\ No newline at end of file
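
ParallelCounter plus the promise in persistCollectedRecords form a simple completion barrier: every persist Future reports in, and the promise fires once all of them have finished (or immediately when there is nothing to persist). A self-contained sketch of the same pattern:

    import java.util.concurrent.atomic.AtomicInteger
    import scala.concurrent.{Await, Future, Promise}
    import scala.concurrent.duration.Duration
    import scala.concurrent.ExecutionContext.Implicits.global

    object BarrierSketch extends App {
      val tasks = Seq(1, 2, 3)
      val done = new AtomicInteger(0)
      val pro = Promise[Boolean]()

      if (tasks.isEmpty) pro.trySuccess(true)   // mirrors the records.size == 0 branch
      tasks.foreach { t =>
        Future { t * 2 }.onComplete { _ =>
          // the last task to finish completes the promise, releasing the waiter below
          if (done.incrementAndGet() >= tasks.size) pro.trySuccess(true)
        }
      }
      Await.result(pro.future, Duration.Inf)
      println("all persists finished")
    }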

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
index e8a7b16..f1e12d2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
@@ -18,100 +18,316 @@ under the License.
 */
 package org.apache.griffin.measure.process.engine
 
-import org.apache.griffin.measure.data.connector.GroupByColumn
+import org.apache.griffin.measure.cache.tmst.{TempName, TmstCache}
 import org.apache.griffin.measure.log.Loggable
-import org.apache.griffin.measure.rule.dsl.{MetricPersistType, RecordPersistType}
-import org.apache.griffin.measure.rule.step._
+import org.apache.griffin.measure.process.{BatchProcessType, ProcessType, StreamingProcessType}
+import org.apache.griffin.measure.rule.adaptor.InternalColumns
+import org.apache.griffin.measure.rule.dsl._
+import org.apache.griffin.measure.rule.plan._
 import org.apache.griffin.measure.utils.JsonUtil
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.griffin.measure.utils.ParamUtil._
 
 trait SparkDqEngine extends DqEngine {
 
   val sqlContext: SQLContext
 
-  def collectMetrics(ruleStep: ConcreteRuleStep): Map[Long, Map[String, Any]] = {
+  val emptyMetricMap = Map[Long, Map[String, Any]]()
+  val emptyMap = Map[String, Any]()
+  val emptyRecordMap = Map[Long, DataFrame]()
+
+  private def getMetricMaps(dfName: String): Seq[Map[String, Any]] = {
+    val pdf = sqlContext.table(s"`${dfName}`")
+    val records = pdf.toJSON.collect()
+    if (records.size > 0) {
+      records.flatMap { rec =>
+        try {
+          val value = JsonUtil.toAnyMap(rec)
+          Some(value)
+        } catch {
+          case e: Throwable => None
+        }
+      }.toSeq
+    } else Nil
+  }
+
+  private def normalizeMetric(metrics: Seq[Map[String, Any]], name: String, collectType: CollectType
+                             ): Map[String, Any] = {
+    collectType match {
+      case EntriesCollectType => metrics.headOption.getOrElse(emptyMap)
+      case ArrayCollectType => Map[String, Any]((name -> metrics))
+      case MapCollectType => {
+        val v = metrics.headOption.getOrElse(emptyMap)
+        Map[String, Any]((name -> v))
+      }
+      case _ => {
+        if (metrics.size > 1) Map[String, Any]((name -> metrics))
+        else metrics.headOption.getOrElse(emptyMap)
+      }
+    }
+  }
+
+  def collectMetrics(timeInfo: TimeInfo, metricExport: MetricExport, procType: ProcessType
+                    ): Map[Long, Map[String, Any]] = {
     if (collectable) {
-      val emptyMap = Map[String, Any]()
-      ruleStep match {
-        case step: ConcreteRuleStep if (step.persistType == MetricPersistType) => {
-          val name = step.name
-          try {
-            val pdf = sqlContext.table(s"`${name}`")
-            val records = pdf.toJSON.collect()
-
-            val pairs = records.flatMap { rec =>
-              try {
-                val value = JsonUtil.toAnyMap(rec)
-                value.get(GroupByColumn.tmst) match {
-                  case Some(t) => {
-                    val key = t.toString.toLong
-                    Some((key, value))
-                  }
-                  case _ => None
-                }
-              } catch {
-                case e: Throwable => None
-              }
+      val MetricExport(name, stepName, collectType) = metricExport
+      try {
+        val metricMaps = getMetricMaps(stepName)
+        procType match {
+          case BatchProcessType => {
+            val metrics: Map[String, Any] = normalizeMetric(metricMaps, name, collectType)
+            emptyMetricMap + (timeInfo.calcTime -> metrics)
+          }
+          case StreamingProcessType => {
+            val tmstMetrics = metricMaps.map { metric =>
+              val tmst = metric.getLong(InternalColumns.tmst, timeInfo.calcTime)
+              val pureMetric = metric.removeKeys(InternalColumns.columns)
+              (tmst, pureMetric)
             }
-            val groupedPairs = pairs.foldLeft(Map[Long, Seq[Map[String, Any]]]()) { (ret, pair) =>
+            tmstMetrics.groupBy(_._1).map { pair =>
               val (k, v) = pair
-              ret.get(k) match {
-                case Some(seq) => ret + (k -> (seq :+ v))
-                case _ => ret + (k -> (v :: Nil))
-              }
+              val maps = v.map(_._2)
+              val mtc = normalizeMetric(maps, name, collectType)
+              (k, mtc)
             }
-            groupedPairs.mapValues { vs =>
-              if (vs.size > 1) {
-                Map[String, Any]((name -> vs))
-              } else {
-                vs.headOption.getOrElse(emptyMap)
-              }
+          }
+        }
+      } catch {
+        case e: Throwable => {
+          error(s"collect metrics ${name} error: ${e.getMessage}")
+          emptyMetricMap
+        }
+      }
+    } else emptyMetricMap
+  }
+
+
+  def collectRecords(timeInfo: TimeInfo, recordExport: RecordExport, procType: ProcessType
+                    ): Map[Long, DataFrame] = {
+    if (collectable) {
+      val RecordExport(_, stepName, _, originDFOpt) = recordExport
+      val stepDf = sqlContext.table(s"`${stepName}`")
+      val recordsDf = originDFOpt match {
+        case Some(originName) => sqlContext.table(s"`${originName}`")
+        case _ => stepDf
+      }
+
+      procType match {
+        case BatchProcessType => {
+          val recordsDf = sqlContext.table(s"`${stepName}`")
+          emptyRecordMap + (timeInfo.calcTime -> recordsDf)
+        }
+        case StreamingProcessType => {
+          originDFOpt match {
+            case Some(originName) => {
+              val recordsDf = sqlContext.table(s"`${originName}`")
+              stepDf.collect.map { row =>
+                val tmst = row.getAs[Long](InternalColumns.tmst)
+                val trdf = recordsDf.filter(s"`${InternalColumns.tmst}` = ${tmst}")
+                (tmst, trdf)
+              }.toMap
             }
-          } catch {
-            case e: Throwable => {
-              error(s"collect metrics ${name} error: ${e.getMessage}")
-              Map[Long, Map[String, Any]]()
+            case _ => {
+              val recordsDf = sqlContext.table(s"`${stepName}`")
+              emptyRecordMap + (timeInfo.calcTime -> recordsDf)
             }
           }
         }
-        case _ => Map[Long, Map[String, Any]]()
       }
-    } else Map[Long, Map[String, Any]]()
+    } else emptyRecordMap
   }
 
-  def collectUpdateRDD(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]
-                      ): Option[RDD[(Long, Iterable[String])]] = {
+  private def getRecordDataFrame(recordExport: RecordExport): Option[DataFrame] = {
     if (collectable) {
-      ruleStep match {
-        case step: ConcreteRuleStep if ((step.persistType == RecordPersistType)
-          || (step.updateDataSource.nonEmpty)) => {
-          val name = step.name
-          try {
-            val pdf = sqlContext.table(s"`${name}`")
-            val cols = pdf.columns
-            val rdd = pdf.flatMap { row =>
-              val values = cols.flatMap { col =>
-                Some((col, row.getAs[Any](col)))
-              }.toMap
-              values.get(GroupByColumn.tmst) match {
-                case Some(t: Long) if (timeGroups.exists(_ == t)) => Some((t, JsonUtil.toJson(values)))
-                case _ => None
+      val RecordExport(_, stepName, _, _) = recordExport
+      val stepDf = sqlContext.table(s"`${stepName}`")
+      Some(stepDf)
+    } else None
+  }
+
+  def collectBatchRecords(recordExport: RecordExport): Option[RDD[String]] = {
+    getRecordDataFrame(recordExport).map(_.toJSON)
+  }
+
+  def collectStreamingRecords(recordExport: RecordExport): (Option[RDD[(Long, Iterable[String])]], Set[Long]) = {
+    val RecordExport(_, _, _, originDFOpt) = recordExport
+    getRecordDataFrame(recordExport) match {
+      case Some(stepDf) => {
+        originDFOpt match {
+          case Some(originName) => {
+            val tmsts = (stepDf.collect.flatMap { row =>
+              try {
+                val tmst = row.getAs[Long](InternalColumns.tmst)
+                val empty = row.getAs[Boolean](InternalColumns.empty)
+                Some((tmst, empty))
+              } catch {
+                case _: Throwable => None
+              }
+            })
+            val emptyTmsts = tmsts.filter(_._2).map(_._1).toSet
+            val recordTmsts = tmsts.filter(!_._2).map(_._1).toSet
+            if (recordTmsts.size > 0) {
+              val recordsDf = sqlContext.table(s"`${originName}`")
+              val records = recordsDf.flatMap { row =>
+                val tmst = row.getAs[Long](InternalColumns.tmst)
+                if (recordTmsts.contains(tmst)) {
+                  try {
+                    val map = SparkRowFormatter.formatRow(row)
+                    val str = JsonUtil.toJson(map)
+                    Some((tmst, str))
+                  } catch {
+                    case e: Throwable => None
+                  }
+                } else None
+              }
+              (Some(records.groupByKey), emptyTmsts)
+            } else (None, emptyTmsts)
+          }
+          case _ => {
+            val records = stepDf.flatMap { row =>
+              val tmst = row.getAs[Long](InternalColumns.tmst)
+              try {
+                val map = SparkRowFormatter.formatRow(row)
+                val str = JsonUtil.toJson(map)
+                Some((tmst, str))
+              } catch {
+                case e: Throwable => None
               }
-            }.groupByKey()
-            Some(rdd)
-          } catch {
-            case e: Throwable => {
-              error(s"collect records ${name} error: ${e.getMessage}")
-              None
             }
+            (Some(records.groupByKey), Set[Long]())
           }
         }
-        case _ => None
       }
-    } else None
+      case _ => (None, Set[Long]())
+    }
+//    val recordsOpt = getRecordDataFrame(recordExport).flatMap { stepDf =>
+//      originDFOpt match {
+//        case Some(originName) => {
+//          val tmsts = (stepDf.collect.flatMap { row =>
+//            try {
+//              val tmst = row.getAs[Long](InternalColumns.tmst)
+//              val empty = row.getAs[Boolean](InternalColumns.empty)
+//              Some((tmst, empty))
+//            } catch {
+//              case _: Throwable => None
+//            }
+//          })
+//          val emptyTmsts = tmsts.filter(_._2).map(_._1).toSet
+//          val recordTmsts = tmsts.filter(!_._2).map(_._1).toSet
+//          if (recordTmsts.size > 0) {
+//            val recordsDf = sqlContext.table(s"`${originName}`")
+//            val records = recordsDf.flatMap { row =>
+//              val tmst = row.getAs[Long](InternalColumns.tmst)
+//              if (recordTmsts.contains(tmst)) {
+//                try {
+//                  val map = SparkRowFormatter.formatRow(row)
+//                  val str = JsonUtil.toJson(map)
+//                  Some((tmst, str))
+//                } catch {
+//                  case e: Throwable => None
+//                }
+//              } else None
+//            }
+//            Some((Some(records.groupByKey), emptyTmsts))
+//          } else Some((None, emptyTmsts))
+//        }
+//        case _ => {
+//          val records = stepDf.flatMap { row =>
+//            val tmst = row.getAs[Long](InternalColumns.tmst)
+//            try {
+//              val map = SparkRowFormatter.formatRow(row)
+//              val str = JsonUtil.toJson(map)
+//              Some((tmst, str))
+//            } catch {
+//              case e: Throwable => None
+//            }
+//          }
+//          Some(records.groupByKey)
+//        }
+//      }
+//    }
   }
 
+//
+//  def collectUpdateRDD(ruleStep: ConcreteRuleStep): Option[DataFrame] = {
+//    if (collectable) {
+//      ruleStep match {
+//        case step: ConcreteRuleStep if ((step.ruleInfo.persistType == RecordPersistType)
+//          || (step.ruleInfo.cacheDataSourceOpt.nonEmpty)) => {
+//          val tmst = step.timeInfo.tmst
+////          val metricName = step.ruleInfo.name
+//
+//          step.ruleInfo.tmstNameOpt match {
+//            case Some(metricTmstName) => {
+//              try {
+//                val pdf = sqlContext.table(s"`${metricTmstName}`")
+//                Some(pdf)
+//              } catch {
+//                case e: Throwable => {
+//                  error(s"collect records ${metricTmstName} error: 
${e.getMessage}")
+//                  None
+//                }
+//              }
+//            }
+//            case _ => None
+//          }
+//        }
+//        case _ => None
+//      }
+//    } else None
+//  }
+
+
+
+
+
+//  def collectUpdateRDD(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]
+//                      ): Option[RDD[(Long, Iterable[String])]] = {
+//    if (collectable) {
+//      ruleStep match {
+//        case step: ConcreteRuleStep if ((step.ruleInfo.persistType == RecordPersistType)
+//          || (step.ruleInfo.cacheDataSourceOpt.nonEmpty)) => {
+//          val tmst = step.timeInfo.tmst
+//          val metricName = step.ruleInfo.name
+//
+//          step.ruleInfo.tmstNameOpt match {
+//            case Some(metricTmstName) => {
+//              try {
+//                val pdf = sqlContext.table(s"`${metricTmstName}`")
+//                val cols = pdf.columns
+//                val rdd = pdf.flatMap { row =>
+//                  val values = cols.flatMap { col =>
+//                    Some((col, row.getAs[Any](col)))
+//                  }.toMap
+//                  values.get(GroupByColumn.tmst) match {
+//                    case Some(t: Long) if (timeGroups.exists(_ == t)) => Some((t, JsonUtil.toJson(values)))
+//                    case _ => None
+//                  }
+//                }.groupByKey()
+//
+//                // find other keys in time groups, create empty records for those timestamps
+//                val existKeys = rdd.keys.collect
+//                val otherKeys = timeGroups.filter(t => !existKeys.exists(_ == t))
+//                val otherPairs = otherKeys.map((_, Iterable[String]())).toSeq
+//                val otherPairRdd = sqlContext.sparkContext.parallelize(otherPairs)
+//
+//                Some(rdd union otherPairRdd)
+//              } catch {
+//                case e: Throwable => {
+//                  error(s"collect records ${metricTmstName} error: 
${e.getMessage}")
+//                  None
+//                }
+//              }
+//            }
+//            case _ => None
+//          }
+//        }
+//        case _ => None
+//      }
+//    } else None
+//  }
+
 //  def collectRecords(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]] = {
 //    ruleStep match {
 //      case step: ConcreteRuleStep if (step.persistType == RecordPersistType) => {
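
normalizeMetric above decides how a step's metric rows are packed into the persisted metric map, keyed by the export name. An illustration of the four collect types (row values invented for the example):

    // hypothetical rows collected from a step exported as "accu"
    val metricMaps = Seq(Map[String, Any]("total" -> 100, "miss" -> 4))

    // EntriesCollectType -> Map("total" -> 100, "miss" -> 4)        (entries at top level)
    // ArrayCollectType   -> Map("accu" -> Seq(Map("total" -> 100, "miss" -> 4)))
    // MapCollectType     -> Map("accu" -> Map("total" -> 100, "miss" -> 4))
    // default            -> the single row itself, or the array form when several rows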

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
index 9c47d77..9de7955 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
@@ -21,11 +21,12 @@ package org.apache.griffin.measure.process.engine
 import java.util.Date
 
 import org.apache.griffin.measure.config.params.user.DataSourceParam
-import org.apache.griffin.measure.data.connector.GroupByColumn
 import org.apache.griffin.measure.data.source._
 import org.apache.griffin.measure.persist.{Persist, PersistFactory}
+import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters}
+import org.apache.griffin.measure.rule.adaptor.{GlobalKeys, InternalColumns}
 import org.apache.griffin.measure.rule.dsl._
-import org.apache.griffin.measure.rule.step._
+import org.apache.griffin.measure.rule.plan._
 import org.apache.griffin.measure.utils.JsonUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, GroupedData, SQLContext}
@@ -35,12 +36,24 @@ case class SparkSqlEngine(sqlContext: SQLContext) extends SparkDqEngine {
 
   override protected def collectable(): Boolean = true
 
-  def runRuleStep(ruleStep: ConcreteRuleStep): Boolean = {
+  def runRuleStep(timeInfo: TimeInfo, ruleStep: RuleStep): Boolean = {
     ruleStep match {
-      case SparkSqlStep(name, rule, _, _, _) => {
+      case rs @ SparkSqlStep(name, rule, details, _, _) => {
         try {
-          val rdf = sqlContext.sql(rule)
-          rdf.registerTempTable(name)
+          val rdf = if (rs.isGlobal && 
!TableRegisters.existRunGlobalTable(name)) {
+            details.get(GlobalKeys._initRule) match {
+              case Some(initRule: String) => sqlContext.sql(initRule)
+              case _ => sqlContext.emptyDataFrame
+            }
+          } else sqlContext.sql(rule)
+
+          if (rs.isGlobal) {
+            if (rs.needCache) DataFrameCaches.cacheGlobalDataFrame(name, rdf)
+            TableRegisters.registerRunGlobalTable(rdf, name)
+          } else {
+            if (rs.needCache) DataFrameCaches.cacheDataFrame(timeInfo.key, name, rdf)
+            TableRegisters.registerRunTempTable(rdf, timeInfo.key, name)
+          }
           true
         } catch {
           case e: Throwable => {
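
The branch above gives global steps a different lifetime from per-batch steps: a global rule runs its GlobalKeys._initRule once (or starts from an empty DataFrame) and is registered under a run-global name that later batches can see, while a normal step's result is keyed by timeInfo.key and dropped in StreamingDqThread.cleanData. In outline:

    // global step: bootstrapped once, survives across batches
    TableRegisters.registerRunGlobalTable(rdf, name)
    // per-batch step: keyed by the calculation time, cleaned up after the batch
    TableRegisters.registerRunTempTable(rdf, timeInfo.key, name)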

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/process/temp/DataFrameCaches.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/temp/DataFrameCaches.scala b/measure/src/main/scala/org/apache/griffin/measure/process/temp/DataFrameCaches.scala
new file mode 100644
index 0000000..fc5fea3
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/temp/DataFrameCaches.scala
@@ -0,0 +1,115 @@
+package org.apache.griffin.measure.process.temp
+
+import org.apache.griffin.measure.log.Loggable
+import org.apache.spark.sql.DataFrame
+
+import scala.collection.concurrent.{TrieMap, Map => ConcMap}
+
+object DataFrameCaches extends Loggable {
+
+  final val _global = "_global"
+
+  private val caches: ConcMap[String, Map[String, DataFrame]] = TrieMap[String, Map[String, DataFrame]]()
+  private val trashCaches: ConcMap[String, Seq[DataFrame]] = TrieMap[String, Seq[DataFrame]]()
+
+  private def trashDataFrame(key: String, df: DataFrame): Unit = {
+    trashCaches.get(key) match {
+      case Some(seq) => {
+        val suc = trashCaches.replace(key, seq, seq :+ df)
+        if (!suc) trashDataFrame(key, df)
+      }
+      case _ => {
+        val oldOpt = trashCaches.putIfAbsent(key, Seq[DataFrame](df))
+        if (oldOpt.nonEmpty) trashDataFrame(key, df)
+      }
+    }
+  }
+  private def trashDataFrames(key: String, dfs: Seq[DataFrame]): Unit = {
+    trashCaches.get(key) match {
+      case Some(seq) => {
+        val suc = trashCaches.replace(key, seq, seq ++ dfs)
+        if (!suc) trashDataFrames(key, dfs)
+      }
+      case _ => {
+        val oldOpt = trashCaches.putIfAbsent(key, dfs)
+        if (oldOpt.nonEmpty) trashDataFrames(key, dfs)
+      }
+    }
+  }
+
+  def cacheDataFrame(key: String, name: String, df: DataFrame): Unit = {
+    println(s"try to cache df ${name}")
+    caches.get(key) match {
+      case Some(mp) => {
+        mp.get(name) match {
+          case Some(odf) => {
+            val suc = caches.replace(key, mp, mp + (name -> df))
+            if (suc) {
+              println(s"cache after replace old df")
+              df.cache
+              trashDataFrame(key, odf)
+            } else {
+              cacheDataFrame(key, name, df)
+            }
+          }
+          case _ => {
+            val suc = caches.replace(key, mp, mp + (name -> df))
+            if (suc) {
+              println(s"cache after replace no old df")
+              df.cache
+            } else {
+              cacheDataFrame(key, name, df)
+            }
+          }
+        }
+      }
+      case _ => {
+        val oldOpt = caches.putIfAbsent(key, Map[String, DataFrame]((name -> df)))
+        if (oldOpt.isEmpty) {
+          println(s"cache after put absent")
+          df.cache
+        } else {
+          cacheDataFrame(key, name, df)
+        }
+      }
+    }
+  }
+  def cacheGlobalDataFrame(name: String, df: DataFrame): Unit = {
+    cacheDataFrame(_global, name, df)
+  }
+
+  def uncacheDataFrames(key: String): Unit = {
+    caches.remove(key) match {
+      case Some(mp) => {
+        trashDataFrames(key, mp.values.toSeq)
+      }
+      case _ => {}
+    }
+  }
+  def uncacheGlobalDataFrames(): Unit = {
+    uncacheDataFrames(_global)
+  }
+
+  def clearTrashDataFrames(key: String): Unit = {
+    trashCaches.remove(key) match {
+      case Some(seq) => seq.foreach(_.unpersist)
+      case _ => {}
+    }
+  }
+  def clearGlobalTrashDataFrames(): Unit = {
+    clearTrashDataFrames(_global)
+  }
+
+  def getDataFrames(key: String): Map[String, DataFrame] = {
+    caches.get(key) match {
+      case Some(mp) => mp
+      case _ => Map[String, DataFrame]()
+    }
+  }
+  def getGlobalDataFrames(): Map[String, DataFrame] = {
+    getDataFrames(_global)
+  }
+
+
+
+}
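
A minimal sketch of how a caller might drive this cache lifecycle (the
local SparkContext setup and the "batch_1" key are illustrative
assumptions, not part of this patch):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import org.apache.griffin.measure.process.temp.DataFrameCaches

    object DataFrameCachesSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("caches-sketch").setMaster("local[2]"))
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._

        val key = "batch_1"  // in StreamingDqThread this would be timeInfo.key
        val df = sc.parallelize(Seq(1, 2, 3)).toDF("id")

        // cache under (key, name); a replaced frame goes to the trash
        // instead of being unpersisted immediately
        DataFrameCaches.cacheDataFrame(key, "source", df)

        // ... run rule steps against the cached frame ...

        // move all frames for the key to the trash, then unpersist them
        // once nothing reads them any more
        DataFrameCaches.uncacheDataFrames(key)
        DataFrameCaches.clearTrashDataFrames(key)

        sc.stop()
      }
    }

The trash indirection appears to be the point of the design: a frame
that is replaced or uncached mid-computation is only unpersisted later,
via clearTrashDataFrames, so a running job that still reads it is not
broken.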

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/process/temp/TableRegisters.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/temp/TableRegisters.scala b/measure/src/main/scala/org/apache/griffin/measure/process/temp/TableRegisters.scala
new file mode 100644
index 0000000..91a7541
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/temp/TableRegisters.scala
@@ -0,0 +1,153 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.process.temp
+
+import org.apache.griffin.measure.log.Loggable
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+import scala.collection.concurrent.{TrieMap, Map => ConcMap}
+
+object TableRegisters extends Loggable {
+
+  final val _global = "_global"
+//
+//  val tables: ConcMap[String, Set[String]] = TrieMap[String, Set[String]]()
+
+  val compileTableRegs = TableRegs()
+  val runTableRegs = TableRegs()
+
+//  private def registerTable(key: String, table: String): Unit = {
+//    tables.get(key) match {
+//      case Some(set) => {
+//        val suc = tables.replace(key, set, set + table)
+//        if (!suc) registerTable(key, table)
+//      }
+//      case _ => {
+//        val oldOpt = tables.putIfAbsent(key, Set[String](table))
+//        if (oldOpt.nonEmpty) registerTable(key, table)
+//      }
+//    }
+//  }
+//
+//  private def unregisterTable(key: String, table: String): Option[String] = {
+//    tables.get(key) match {
+//      case Some(set) => {
+//        val ftb = set.find(_ == table)
+//        ftb match {
+//          case Some(tb) => {
+//            val nset = set - tb
+//            val suc = tables.replace(key, set, nset)
+//            if (suc) Some(tb)
+//            else unregisterTable(key, table)
+//          }
+//          case _ => None
+//        }
+//      }
+//      case _ => None
+//    }
+//  }
+//
+//  private def unregisterTables(key: String): Set[String] = {
+//    tables.remove(key) match {
+//      case Some(set) => set
+//      case _ => Set[String]()
+//    }
+//  }
+
+  private def dropTempTable(sqlContext: SQLContext, table: String): Unit = {
+    try {
+      sqlContext.dropTempTable(table)
+    } catch {
+      case e: Throwable => warn(s"drop temp table ${table} fails")
+    }
+  }
+
+  // -----
+
+  def registerRunGlobalTable(df: DataFrame, table: String): Unit = {
+    registerRunTempTable(df, _global, table)
+  }
+
+  def registerRunTempTable(df: DataFrame, key: String, table: String): Unit = {
+    runTableRegs.registerTable(key, table)
+    df.registerTempTable(table)
+  }
+
+  def registerCompileGlobalTable(table: String): Unit = {
+    registerCompileTempTable(_global, table)
+  }
+
+  def registerCompileTempTable(key: String, table: String): Unit = {
+    compileTableRegs.registerTable(key, table)
+  }
+
+  def unregisterRunTempTable(sqlContext: SQLContext, key: String, table: String): Unit = {
+    runTableRegs.unregisterTable(key, table).foreach(dropTempTable(sqlContext, _))
+  }
+
+  def unregisterCompileTempTable(key: String, table: String): Unit = {
+    compileTableRegs.unregisterTable(key, table)
+  }
+
+  def unregisterRunGlobalTables(sqlContext: SQLContext): Unit = {
+    unregisterRunTempTables(sqlContext, _global)
+  }
+
+  def unregisterCompileGlobalTables(): Unit = {
+    unregisterCompileTempTables(_global)
+  }
+
+  def unregisterRunTempTables(sqlContext: SQLContext, key: String): Unit = {
+    runTableRegs.unregisterTables(key).foreach(dropTempTable(sqlContext, _))
+  }
+
+  def unregisterCompileTempTables(key: String): Unit = {
+    compileTableRegs.unregisterTables(key)
+  }
+
+  def existRunGlobalTable(table: String): Boolean = {
+    existRunTempTable(_global, table)
+  }
+
+  def existCompileGlobalTable(table: String): Boolean = {
+    existCompileTempTable(_global, table)
+  }
+
+  def existRunTempTable(key: String, table: String): Boolean = {
+    runTableRegs.existTable(key, table)
+  }
+
+  def existCompileTempTable(key: String, table: String): Boolean = {
+    compileTableRegs.existTable(key, table)
+  }
+
+  def getRunGlobalTables(): Set[String] = {
+    getRunTempTables(_global)
+  }
+
+  def getRunTempTables(key: String): Set[String] = {
+    runTableRegs.getTables(key)
+  }
+
+}
+
+//object TempKeys {
+//  def key(t: Long): String = s"${t}"
+//  def key(head: String, t: Long): String = s"${head}_${t}"
+//}
\ No newline at end of file
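
As a rough usage sketch (sqlContext, df and the key value here are
assumed inputs, not part of this patch), the run-time registry pairs
Spark's temp-table catalog with its own bookkeeping so that
unregistering can drop exactly the tables it created:

    import org.apache.spark.sql.{DataFrame, SQLContext}
    import org.apache.griffin.measure.process.temp.TableRegisters

    object TableRegistersSketch {
      def run(sqlContext: SQLContext, df: DataFrame): Unit = {
        val key = "1234567890"  // typically a TimeInfo key

        // registers the table in Spark's catalog and records
        // (key -> table) internally
        TableRegisters.registerRunTempTable(df, key, "my_source")

        assert(TableRegisters.existRunTempTable(key, "my_source"))
        sqlContext.sql("SELECT COUNT(*) FROM my_source").show()

        // drops every temp table recorded under the key, then forgets them
        TableRegisters.unregisterRunTempTables(sqlContext, key)
      }
    }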

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/process/temp/TableRegs.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/temp/TableRegs.scala b/measure/src/main/scala/org/apache/griffin/measure/process/temp/TableRegs.scala
new file mode 100644
index 0000000..d205099
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/temp/TableRegs.scala
@@ -0,0 +1,81 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.process.temp
+
+import org.apache.spark.sql.SQLContext
+
+import scala.collection.concurrent.{TrieMap, Map => ConcMap}
+
+case class TableRegs() {
+
+  private val tables: ConcMap[String, Set[String]] = TrieMap[String, Set[String]]()
+
+  def registerTable(key: String, table: String): Unit = {
+    tables.get(key) match {
+      case Some(set) => {
+        val suc = tables.replace(key, set, set + table)
+        if (!suc) registerTable(key, table)
+      }
+      case _ => {
+        val oldOpt = tables.putIfAbsent(key, Set[String](table))
+        if (oldOpt.nonEmpty) registerTable(key, table)
+      }
+    }
+  }
+
+  def unregisterTable(key: String, table: String): Option[String] = {
+    tables.get(key) match {
+      case Some(set) => {
+        val ftb = set.find(_ == table)
+        ftb match {
+          case Some(tb) => {
+            val nset = set - tb
+            val suc = tables.replace(key, set, nset)
+            if (suc) Some(tb)
+            else unregisterTable(key, table)
+          }
+          case _ => None
+        }
+      }
+      case _ => None
+    }
+  }
+
+  def unregisterTables(key: String): Set[String] = {
+    tables.remove(key) match {
+      case Some(set) => set
+      case _ => Set[String]()
+    }
+  }
+
+  def existTable(key: String, table: String): Boolean = {
+    tables.get(key) match {
+      case Some(set) => set.exists(_ == table)
+      case _ => false
+    }
+  }
+
+  def getTables(key: String): Set[String] = {
+    tables.get(key) match {
+      case Some(set) => set
+      case _ => Set[String]()
+    }
+  }
+
+}
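
Every mutator above follows the same lock-free pattern on TrieMap:
read the current value, build the new one, and retry when replace or
putIfAbsent reports that a concurrent writer got there first. A
standalone sketch of that pattern (names are illustrative):

    import scala.collection.concurrent.TrieMap

    object CasRetrySketch {
      private val tables = TrieMap[String, Set[String]]()

      @annotation.tailrec
      def register(key: String, table: String): Unit = {
        tables.get(key) match {
          case Some(old) =>
            // replace is atomic: it fails if another thread
            // changed the set for this key in the meantime
            if (!tables.replace(key, old, old + table)) register(key, table)
          case None =>
            // putIfAbsent returns Some(existing) when another
            // thread won the race to insert the first set
            if (tables.putIfAbsent(key, Set(table)).nonEmpty) register(key, table)
        }
      }
    }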

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/result/AccuracyResult.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/result/AccuracyResult.scala b/measure/src/main/scala/org/apache/griffin/measure/result/AccuracyResult.scala
index 16bb772..7b75043 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/result/AccuracyResult.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/result/AccuracyResult.scala
@@ -23,10 +23,16 @@ case class AccuracyResult(miss: Long, total: Long) extends Result {
 
   type T = AccuracyResult
 
+  override def isLegal(): Boolean = getTotal > 0
+
   def update(delta: T): T = {
     AccuracyResult(delta.miss, total)
   }
 
+  def initial(): Boolean = {
+    getMatch <= 0
+  }
+
   def eventual(): Boolean = {
     this.miss <= 0
   }
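
A small worked example of the three predicates (assuming, as the
accessors suggest, getTotal = total and getMatch = total - miss):

    val r0 = AccuracyResult(miss = 5, total = 5)
    r0.isLegal()    // true:  total > 0
    r0.initial()    // true:  getMatch = 5 - 5 = 0, nothing matched yet
    r0.eventual()   // false: 5 records are still missing

    val r1 = r0.update(AccuracyResult(miss = 0, total = 5))
    r1.eventual()   // true:  miss <= 0, every record has been matched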

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/result/ProfileResult.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/result/ProfileResult.scala b/measure/src/main/scala/org/apache/griffin/measure/result/ProfileResult.scala
index 803416e..c90e095 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/result/ProfileResult.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/result/ProfileResult.scala
@@ -27,6 +27,10 @@ case class ProfileResult(matchCount: Long, totalCount: Long) extends Result {
     ProfileResult(matchCount + delta.matchCount, totalCount)
   }
 
+  def initial(): Boolean = {
+    this.matchCount <= 0
+  }
+
   def eventual(): Boolean = {
     this.matchCount >= totalCount
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/result/Result.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/result/Result.scala b/measure/src/main/scala/org/apache/griffin/measure/result/Result.scala
index 6dcd9a1..caf6d96 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/result/Result.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/result/Result.scala
@@ -23,8 +23,12 @@ trait Result extends Serializable {
 
   type T <: Result
 
+  def isLegal(): Boolean = true
+
   def update(delta: T): T
 
+  def initial(): Boolean
+
   def eventual(): Boolean
 
   def differsFrom(other: T): Boolean
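
For context, the extended contract reads like this in a toy
implementation (CountResult is hypothetical, modeled on AccuracyResult
and ProfileResult, and assumed to live in the same package as Result):

    case class CountResult(hit: Long, total: Long) extends Result {
      type T = CountResult

      override def isLegal(): Boolean = total > 0  // only meaningful once data arrived
      def update(delta: T): T = CountResult(hit + delta.hit, total)
      def initial(): Boolean = hit <= 0            // no records matched yet
      def eventual(): Boolean = hit >= total       // the result can no longer improve
      def differsFrom(other: T): Boolean =
        (hit != other.hit) || (total != other.total)
    }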

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala
index eb57838..5447ccc 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala
@@ -19,26 +19,37 @@ under the License.
 package org.apache.griffin.measure.rule.adaptor
 
 import org.apache.griffin.measure.process.ProcessType
-import org.apache.griffin.measure.rule.step._
+import org.apache.griffin.measure.rule.plan.{TimeInfo, _}
+import org.apache.griffin.measure.utils.ParamUtil._
 
-case class DataFrameOprAdaptor(adaptPhase: AdaptPhase) extends RuleAdaptor {
+case class DataFrameOprAdaptor() extends RuleAdaptor {
 
-  def genRuleStep(param: Map[String, Any]): Seq[RuleStep] = {
-    DfOprStep(getName(param), getRule(param), getDetails(param),
-      getPersistType(param), getUpdateDataSource(param)) :: Nil
-  }
-  def adaptConcreteRuleStep(ruleStep: RuleStep): Seq[ConcreteRuleStep] = {
-    ruleStep match {
-      case rs @ DfOprStep(_, _, _, _, _) => rs :: Nil
-      case _ => Nil
-    }
-  }
+//  def genRuleStep(timeInfo: TimeInfo, param: Map[String, Any]): Seq[RuleStep] = {
+//    val ruleInfo = RuleInfoGen(param, timeInfo)
+//    DfOprStep(timeInfo, ruleInfo) :: Nil
+////    DfOprStep(getName(param), getRule(param), getDetails(param),
+////      getPersistType(param), getUpdateDataSource(param)) :: Nil
+//  }
+//  def adaptConcreteRuleStep(ruleStep: RuleStep): Seq[ConcreteRuleStep] = {
+//    ruleStep match {
+//      case rs @ DfOprStep(_, _) => rs :: Nil
+//      case _ => Nil
+//    }
+//  }
+
+//  def getTempSourceNames(param: Map[String, Any]): Seq[String] = {
+//    param.get(_name) match {
+//      case Some(name) => name.toString :: Nil
+//      case _ => Nil
+//    }
+//  }
+
+  import RuleParamKeys._
 
-  def getTempSourceNames(param: Map[String, Any]): Seq[String] = {
-    param.get(_name) match {
-      case Some(name) => name.toString :: Nil
-      case _ => Nil
-    }
+  def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any], procType: ProcessType): RulePlan = {
+    val name = getRuleName(param)
+    val step = DfOprStep(name, getRule(param), getDetails(param), getCache(param), getGlobal(param))
+    RulePlan(step :: Nil, genRuleExports(param, name, name))
   }
 
 }
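
A hedged sketch of feeding a rule parameter map through the adaptor
(the param key names and import paths below are assumptions inferred
from the RuleParamKeys accessors used above, not confirmed by this
patch):

    import org.apache.griffin.measure.process.StreamingProcessType
    import org.apache.griffin.measure.rule.adaptor.DataFrameOprAdaptor
    import org.apache.griffin.measure.rule.plan.CalcTimeInfo

    val adaptor = DataFrameOprAdaptor()
    val param: Map[String, Any] = Map(
      "name" -> "dirty_source",  // assumed key behind getRuleName
      "rule" -> "from_json"      // assumed key behind getRule
    )
    val plan = adaptor.genRulePlan(
      CalcTimeInfo(System.currentTimeMillis), param, StreamingProcessType)
    plan.ruleSteps.foreach(println)  // one DfOprStep; the exports sit in the plan as well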
