http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
new file mode 100644
index 0000000..c29c072
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
@@ -0,0 +1,115 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.write
+
+import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.context.DQContext
+import org.apache.griffin.measure.step.builder.ConstantColumns
+import org.apache.griffin.measure.utils.JsonUtil
+import org.apache.griffin.measure.utils.ParamUtil._
+
/**
  * Write metrics into the context's metric wrapper.
  *
  * @param name              metric name used as the key when normalizing
  * @param inputName         name of the temp table holding the metric data frame
  * @param collectType       how metric maps are normalized (entries / array / map / default)
  * @param writeTimestampOpt explicit write timestamp; when present, forces simple (batch) mode
  */
case class MetricWriteStep(name: String,
                           inputName: String,
                           collectType: NormalizeType,
                           writeTimestampOpt: Option[Long] = None
                          ) extends WriteStep {

  import scala.util.control.NonFatal

  val emptyMetricMap = Map[Long, Map[String, Any]]()
  val emptyMap = Map[String, Any]()

  def execute(context: DQContext): Boolean = {
    val timestamp = writeTimestampOpt.getOrElse(context.contextId.timestamp)

    // get metric list from data frame
    val metricMaps: Seq[Map[String, Any]] = getMetricMaps(context)

    // an explicit write timestamp forces simple (single-timestamp) mode
    val writeMode = writeTimestampOpt.map(_ => SimpleMode).getOrElse(context.writeMode)
    val timestampMetricMap: Map[Long, Map[String, Any]] = writeMode match {
      case SimpleMode => {
        val metrics: Map[String, Any] = normalizeMetric(metricMaps, name, collectType)
        emptyMetricMap + (timestamp -> metrics)
      }
      case TimestampMode => {
        // group metrics by their own tmst column, falling back to the context timestamp
        val tmstMetrics = metricMaps.map { metric =>
          val tmst = metric.getLong(ConstantColumns.tmst, timestamp)
          val pureMetric = metric.removeKeys(ConstantColumns.columns)
          (tmst, pureMetric)
        }
        tmstMetrics.groupBy(_._1).map { pair =>
          val (tmst, metrics) = pair
          (tmst, normalizeMetric(metrics.map(_._2), name, collectType))
        }
      }
    }

    // write normalized metrics into the metric wrapper, one entry per timestamp
    timestampMetricMap.foreach { pair =>
      val (tmst, metrics) = pair
      context.metricWrapper.insertMetric(tmst, metrics)
    }

    true
  }

  /**
    * Read the metric data frame and turn each row into a plain map.
    * Rows that fail json parsing are dropped; any table-level failure yields Nil.
    */
  private def getMetricMaps(context: DQContext): Seq[Map[String, Any]] = {
    try {
      val pdf = context.sqlContext.table(s"`${inputName}`")
      val records = pdf.toJSON.collect()
      if (records.length > 0) {
        records.flatMap { rec =>
          try {
            Some(JsonUtil.toAnyMap(rec))
          } catch {
            case NonFatal(_) => None
          }
        }.toSeq
      } else Nil
    } catch {
      // NonFatal: let fatal errors (OOM, interrupts) propagate instead of swallowing them
      case NonFatal(e) => {
        error(s"get metric ${name} fails: ${e.getMessage}")
        Nil
      }
    }
  }

  /** Normalize a list of metric maps into one metric map according to the collect type. */
  private def normalizeMetric(metrics: Seq[Map[String, Any]], name: String,
                              collectType: NormalizeType
                             ): Map[String, Any] = {
    collectType match {
      case EntriesNormalizeType => metrics.headOption.getOrElse(emptyMap)
      case ArrayNormalizeType => Map[String, Any]((name -> metrics))
      case MapNormalizeType => {
        val v = metrics.headOption.getOrElse(emptyMap)
        Map[String, Any]((name -> v))
      }
      case _ => {
        // default: a single metric stays flat, multiple metrics wrap into an array
        if (metrics.size > 1) Map[String, Any]((name -> metrics))
        else metrics.headOption.getOrElse(emptyMap)
      }
    }
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
new file mode 100644
index 0000000..13b7f80
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/write/RecordWriteStep.scala
@@ -0,0 +1,151 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.write
+
+import org.apache.griffin.measure.configuration.enums._
+import org.apache.griffin.measure.context.DQContext
+import org.apache.griffin.measure.step.builder.ConstantColumns
+import org.apache.griffin.measure.utils.JsonUtil
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+
/**
  * Write records that need to be persisted.
  *
  * @param name               record group name used when persisting
  * @param inputName          name of the temp table holding the record data frame
  * @param filterTableNameOpt optional table flagging which timestamps are empty / have records
  * @param writeTimestampOpt  explicit write timestamp; when present, forces simple (batch) mode
  */
case class RecordWriteStep(name: String,
                           inputName: String,
                           filterTableNameOpt: Option[String] = None,
                           writeTimestampOpt: Option[Long] = None
                          ) extends WriteStep {

  import scala.util.control.NonFatal

  def execute(context: DQContext): Boolean = {
    val timestamp = writeTimestampOpt.getOrElse(context.contextId.timestamp)

    // an explicit write timestamp forces simple (single-timestamp) mode
    val writeMode = writeTimestampOpt.map(_ => SimpleMode).getOrElse(context.writeMode)
    writeMode match {
      case SimpleMode => {
        // batch records: one rdd persisted under the single timestamp
        collectBatchRecords(context).foreach { records =>
          context.getPersist(timestamp).persistRecords(records, name)
        }
      }
      case TimestampMode => {
        // streaming records, grouped per timestamp
        val (recordsOpt, emptyTimestamps) = collectStreamingRecords(context)
        recordsOpt.foreach { records =>
          records.foreach { pair =>
            val (t, strs) = pair
            context.getPersist(t).persistRecords(strs, name)
          }
        }
        // timestamps flagged empty still get an (empty) persist call
        emptyTimestamps.foreach { t =>
          context.getPersist(t).persistRecords(Nil, name)
        }
      }
    }
    true
  }

  /** Read the tmst column of a row, falling back to the default timestamp when absent/invalid. */
  private def getTmst(row: Row, defTmst: Long): Long = {
    try {
      row.getAs[Long](ConstantColumns.tmst)
    } catch {
      case NonFatal(_) => defTmst
    }
  }

  /** Look up a registered table as a data frame; None (with an error log) when it is missing. */
  private def getDataFrame(context: DQContext, name: String): Option[DataFrame] = {
    try {
      Some(context.sqlContext.table(s"`${name}`"))
    } catch {
      case NonFatal(e) => {
        error(s"get data frame ${name} fails: ${e.getMessage}")
        None
      }
    }
  }

  private def getRecordDataFrame(context: DQContext): Option[DataFrame] =
    getDataFrame(context, inputName)

  private def getFilterTableDataFrame(context: DQContext): Option[DataFrame] =
    filterTableNameOpt.flatMap(getDataFrame(context, _))

  /** Batch mode: all records as a json-string rdd. */
  private def collectBatchRecords(context: DQContext): Option[RDD[String]] = {
    getRecordDataFrame(context).map(_.toJSON.rdd)
  }

  /**
    * Streaming mode: collect records grouped by timestamp.
    *
    * @return (optional rdd of timestamp -> json records,
    *          set of timestamps whose record set is known to be empty)
    */
  private def collectStreamingRecords(context: DQContext)
  : (Option[RDD[(Long, Iterable[String])]], Set[Long]) = {
    implicit val encoder = Encoders.tuple(Encoders.scalaLong, Encoders.STRING)
    val defTimestamp = context.contextId.timestamp
    getRecordDataFrame(context) match {
      case Some(df) => {
        // the filter table (when present) tells which timestamps are empty
        // and which actually carry records
        val (filterFuncOpt, emptyTimestamps) = getFilterTableDataFrame(context) match {
          case Some(filterDf) => {
            // (timestamp, empty flag) pairs; malformed rows are dropped
            val tmsts: Array[(Long, Boolean)] = filterDf.collect.flatMap { row =>
              try {
                val tmst = getTmst(row, defTimestamp)
                val empty = row.getAs[Boolean](ConstantColumns.empty)
                Some((tmst, empty))
              } catch {
                case NonFatal(_) => None
              }
            }
            val emptyTmsts = tmsts.filter(_._2).map(_._1).toSet
            val recordTmsts = tmsts.filter(!_._2).map(_._1).toSet
            // only timestamps with actual records pass the filter
            val filterFuncOpt: Option[(Long) => Boolean] =
              if (recordTmsts.size > 0) {
                Some((t: Long) => recordTmsts.contains(t))
              } else None
            (filterFuncOpt, emptyTmsts)
          }
          case _ => (Some((t: Long) => true), Set[Long]())
        }

        // serialize each filtered row to json, keyed by its timestamp
        filterFuncOpt match {
          case Some(filterFunc) => {
            val records = df.flatMap { row =>
              val tmst = getTmst(row, defTimestamp)
              if (filterFunc(tmst)) {
                try {
                  val map = SparkRowFormatter.formatRow(row)
                  Some((tmst, JsonUtil.toJson(map)))
                } catch {
                  case NonFatal(_) => None
                }
              } else None
            }
            (Some(records.rdd.groupByKey), emptyTimestamps)
          }
          case _ => (None, emptyTimestamps)
        }
      }
      case _ => (None, Set[Long]())
    }
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/write/SparkRowFormatter.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/write/SparkRowFormatter.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/write/SparkRowFormatter.scala
new file mode 100644
index 0000000..592c1d4
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/write/SparkRowFormatter.scala
@@ -0,0 +1,65 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.write
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{ArrayType, DataType, StructField, 
StructType}
+
+import scala.collection.mutable.ArrayBuffer
+
/**
  * Spark row formatter: converts a Row (including nested structs and arrays)
  * into a plain Map[String, Any] suitable for json serialization.
  */
object SparkRowFormatter {

  /** Format a row using its own schema. */
  def formatRow(row: Row): Map[String, Any] = {
    formatRowWithSchema(row, row.schema)
  }

  private def formatRowWithSchema(row: Row, schema: StructType): Map[String, Any] = {
    formatStruct(schema.fields, row)
  }

  /** Pair each schema field with its value and fold into one map. */
  private def formatStruct(schema: Seq[StructField], r: Row): Map[String, Any] = {
    val paired = schema.zip(r.toSeq)
    paired.foldLeft(Map[String, Any]())((s, p) => s ++ formatItem(p))
  }

  private def formatItem(p: (StructField, Any)): Map[String, Any] = {
    p match {
      case (sf, a) =>
        sf.dataType match {
          case ArrayType(et, _) =>
            // cast to Seq, not ArrayBuffer: spark materializes array columns as
            // WrappedArray, which is a Seq but NOT an ArrayBuffer — the narrower
            // cast would throw ClassCastException at runtime
            Map(sf.name -> (if (a == null) a else formatArray(et, a.asInstanceOf[Seq[Any]])))
          case StructType(s) =>
            Map(sf.name -> (if (a == null) a else formatStruct(s, a.asInstanceOf[Row])))
          case _ => Map(sf.name -> a)
        }
    }
  }

  /** Recursively format array elements according to their element type. */
  private def formatArray(et: DataType, arr: Seq[Any]): Seq[Any] = {
    et match {
      case StructType(s) => arr.map(e => formatStruct(s, e.asInstanceOf[Row]))
      case ArrayType(t, _) => arr.map(e => formatArray(t, e.asInstanceOf[Seq[Any]]))
      case _ => arr
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/write/WriteStep.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/write/WriteStep.scala 
b/measure/src/main/scala/org/apache/griffin/measure/step/write/WriteStep.scala
new file mode 100644
index 0000000..329975c
--- /dev/null
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/write/WriteStep.scala
@@ -0,0 +1,31 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.write
+
+import org.apache.griffin.measure.step.DQStep
+
/**
  * Base trait for DQ steps that write results out (metrics, records, ...).
  */
trait WriteStep extends DQStep {

  // name of the temp table this step reads its data from
  val inputName: String

  // explicit write timestamp; None means use the context's timestamp
  val writeTimestampOpt: Option[Long]

  // write steps expose no named outputs for downstream steps
  override def getNames(): Seq[String] = Nil

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/util/MailUtil.java
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/util/MailUtil.java 
b/measure/src/main/scala/org/apache/griffin/measure/util/MailUtil.java
deleted file mode 100644
index 6ab951b..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/util/MailUtil.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package org.apache.griffin.measure.util;
-
-
-import org.apache.griffin.measure.config.params.env.EmailParam;
-
-import javax.mail.*;
-import javax.mail.internet.InternetAddress;
-import javax.mail.internet.MimeBodyPart;
-import javax.mail.internet.MimeMessage;
-import javax.mail.internet.MimeMultipart;
-import java.util.Properties;
-
-/**
- * Created by xiaoqiu.duan on 2017/9/11.
- */
-public class MailUtil {
-
-    public static void SendMail(String name, String UserArr, String Title, 
String Info, EmailParam emailParam) throws MessagingException {
-        Properties prop = new Properties();
-        prop.setProperty("mail.transport.protocol", "smtp");
-        prop.setProperty("mail.smtp.host", emailParam.host());
-        prop.setProperty("mail.smtp.auth", "NTLM");
-        prop.setProperty("mail.debug", "true");
-        Session session = Session.getInstance(prop);
-        Message msg = new MimeMessage(session);
-        msg.setFrom(new InternetAddress(emailParam.mail()));
-        //msg.setRecipient(Message.RecipientType.TO, new 
InternetAddress(UserArr));
-        String[] arr = null;
-        if (!UserArr.contains(",")) {
-            arr = new String[]{UserArr};
-        } else {
-            arr = UserArr.split(",");
-        }
-        Address[] tos = new InternetAddress[arr.length];
-        for (int i = 0; i < arr.length; i++) {
-            tos[i] = new InternetAddress(arr[i]);
-        }
-        msg.setRecipients(Message.RecipientType.TO, tos);
-        msg.setSubject(Title);
-        Multipart mainPart = new MimeMultipart();
-        BodyPart html = new MimeBodyPart();
-        html.setContent(Info, "text/html; charset=utf-8");
-        mainPart.addBodyPart(html);
-        msg.setContent(mainPart);
-        msg.addHeader("X-Priority", "3");
-        msg.addHeader("X-MSMail-Priority", "Normal");
-        msg.addHeader("X-Mailer", "Microsoft Outlook Express 6.00.2900.2869");
-        msg.addHeader("X-MimeOLE", "Produced By Microsoft MimeOLE 
V6.00.2900.2869");
-        msg.addHeader("ReturnReceipt", "1");
-        Transport trans = session.getTransport();
-        trans.connect(emailParam.usr(), emailParam.pwd());
-        trans.sendMessage(msg, msg.getAllRecipients());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/util/Md5Util.java
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/util/Md5Util.java 
b/measure/src/main/scala/org/apache/griffin/measure/util/Md5Util.java
deleted file mode 100644
index 0782510..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/util/Md5Util.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package org.apache.griffin.measure.util;
-
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-
-public class Md5Util {
-
-    private static final char[] HEX_DIGITS = { '0', '1', '2', '3', '4', '5', 
'6', '7', '8', '9', 'a', 'b', 'c', 'd',
-            'e', 'f' };
-
-    private Md5Util() {
-    }
-
-    public static String md5(String s) {
-
-        try {
-            byte[] bytes = s.getBytes();
-
-            MessageDigest messageDigest = MessageDigest.getInstance("MD5");
-            messageDigest.update(bytes);
-            byte[] mdBytes = messageDigest.digest();
-            int j = mdBytes.length;
-            char[] str = new char[j * 2];
-            int k = 0;
-            for (int i = 0; i < j; i++) {
-                byte byte0 = mdBytes[i];
-                str[k++] = HEX_DIGITS[byte0 >>> 4 & 0xf];
-                str[k++] = HEX_DIGITS[byte0 & 0xf];
-            }
-            return new String(str);
-        } catch (NoSuchAlgorithmException ex) {
-
-            return null;
-        }
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/util/MessageUtil.java
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/util/MessageUtil.java 
b/measure/src/main/scala/org/apache/griffin/measure/util/MessageUtil.java
deleted file mode 100644
index 4f15b62..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/util/MessageUtil.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-//package org.apache.griffin.measure.util;
-//
-//import java.io.IOException;
-//
-//import org.apache.commons.lang.StringUtils;
-//import java.util.ArrayList;
-//import java.io.*;
-//import java.io.BufferedReader;
-//import java.io.InputStreamReader;
-//import java.net.*;
-//import java.util.*;
-//import java.util.HashMap;
-//import java.util.List;
-//import java.util.Map;
-//
-//
-//import org.apache.griffin.measure.config.params.env.SMSParam;
-//import org.json.JSONException;
-//import org.json.JSONObject;
-//
-///**
-// * Created by xiaoqiu.duan on 2017/9/11.
-// */
-//public class MessageUtil {
-//
-//
-//    public static String sendSMSCode(String teamPhone, String content, 
SMSParam smsParam){
-//        String url=smsParam.host();
-//        String SYS_ID=smsParam.id();
-//        String KEY=smsParam.key();
-//        String sendContext="["+smsParam.uuid()+"]: "+content ;
-//        System.out.println(" sendContext:  "+sendContext);
-//        Long timestamp=new Date().getTime()+20610;
-//        System.out.println(" timestamp:  "+timestamp);
-//        String[]  tels=teamPhone.split(",") ;
-//        String uuid = UUID.randomUUID().toString().replaceAll("-", "");
-//        Map<String,Object> param=new HashMap<String, Object>();
-//        param.put("apportionment","JR_DATA_ALARM");
-//        param.put("event",0);
-//        param.put("eventTime",0);
-//        param.put("isMasked",false);
-//        param.put("originator","ETC");
-//        param.put("reqId",uuid);
-//        param.put("schemaKey","");
-//        param.put("sysId",SYS_ID);
-//        param.put("template",sendContext);
-//        param.put("templateId","");
-//        param.put("timestamp",timestamp);
-//        param.put("token",Md5Util.md5(SYS_ID+timestamp+KEY));
-//        param.put("typeCode","JR_DATA_ALARM");
-//        System.out.println("params:  "+param);
-//        List<Map<String, Object>> bodys = new ArrayList<Map<String, 
Object>>();
-//          for (int i=0;i<tels.length;i++) {
-//            Map<String, Object> body = new HashMap<String, Object>();
-//            body.put("phoneNo", tels[i]);
-//            body.put("params", "");
-//            body.put("userId", 0);
-//            bodys.add(body);
-//        }
-//        System.out.println("bodys:  "+bodys);
-//        JSONObject jsonParam = new JSONObject();
-//        try {
-//            jsonParam.put("params",param);
-//            jsonParam.put("bodys",bodys);
-//            System.out.println("jsonParam:  "+jsonParam);
-//            System.out.println("jsonParam:  "+jsonParam.toString());
-//        } catch (JSONException e) {
-//            e.printStackTrace();
-//        }
-//        URL u=null;
-//        int smsnum=0;
-//        HttpURLConnection connection=null;
-//        try{
-//           String result= postRequestUrl(url, jsonParam.toString(), "utf-8");
-//            return "send success";
-//        }catch(Exception e){
-//            e.printStackTrace();
-//            return null;
-//        }
-//
-//    }
-//
-//
-//    public static String postRequestUrl(String url, String param,String 
encode) {
-//        OutputStreamWriter out = null;
-//        BufferedReader reader = null;
-//        String response="";
-//        try {
-//            URL httpUrl = null;
-//            httpUrl = new URL(url);
-//            HttpURLConnection conn = (HttpURLConnection) 
httpUrl.openConnection();
-//            conn.setRequestMethod("POST");
-//            conn.setRequestProperty("Content-Type", 
"application/json");//x-www-form-urlencoded
-//            conn.setRequestProperty("connection", "keep-alive");
-//            conn.setUseCaches(false);
-//            conn.setInstanceFollowRedirects(true);
-//            conn.setDoOutput(true);
-//            conn.setDoInput(true);
-//            conn.connect();
-//            //POST
-//            out = new OutputStreamWriter(
-//                    conn.getOutputStream());
-//            out.write(param);
-//
-//            out.flush();
-//
-//            System.out.println("send POST "+conn.getResponseCode());
-//            reader = new BufferedReader(new InputStreamReader(
-//                    conn.getInputStream()));
-//
-//            String lines;
-//            while ((lines = reader.readLine()) != null) {
-//                lines = new String(lines.getBytes(), "utf-8");
-//                response+=lines;
-//                System.out.println("lines:  "+lines);
-//            }
-//            reader.close();
-//            conn.disconnect();
-//
-//            //log.info(response.toString());
-//        } catch (Exception e) {
-//            System.out.println("send POST error!"+e);
-//            e.printStackTrace();
-//        }
-//        finally{
-//            try{
-//                if(out!=null){
-//                    out.close();
-//                }
-//                if(reader!=null){
-//                    reader.close();
-//                }
-//            }
-//            catch(IOException ex){
-//                ex.printStackTrace();
-//            }
-//        }
-//
-//        return response;
-//    }
-//
-//
-//
-//    private static String readBufferedContent(BufferedReader bufferedReader) 
{
-//        if (bufferedReader == null)
-//            return null;
-//        StringBuffer result = new StringBuffer();
-//        String line = null;
-//        try {
-//            while (StringUtils.isNotBlank((line = 
bufferedReader.readLine()))) {
-//                result.append(line+"\r\n");
-//            }
-//        } catch (IOException e) {
-//            e.printStackTrace();
-//            return null;
-//        }
-//        return result.toString();
-//    }
-//
-//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/utils/DataFrameUtil.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/utils/DataFrameUtil.scala 
b/measure/src/main/scala/org/apache/griffin/measure/utils/DataFrameUtil.scala
index 9390160..1405c49 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/utils/DataFrameUtil.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/utils/DataFrameUtil.scala
@@ -35,7 +35,7 @@ object DataFrameUtil {
 
   def unionByName(a: DataFrame, b: DataFrame): DataFrame = {
     val columns = a.columns.toSet.intersect(b.columns.toSet).map(col).toSeq
-    a.select(columns: _*).unionAll(b.select(columns: _*))
+    a.select(columns: _*).union(b.select(columns: _*))
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala 
b/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala
new file mode 100644
index 0000000..023a138
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/FSUtil.scala
@@ -0,0 +1,82 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.utils
+
+import java.io.File
+import java.net.URI
+
+import org.apache.griffin.measure.Loggable
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+
+import scala.collection.mutable.{Map => MutableMap}
+
/**
  * File system helper: resolves and caches hadoop FileSystem instances per uri.
  */
object FSUtil extends Loggable {

  import scala.util.control.NonFatal

  // cache key is "scheme://authority": the same scheme can point to different
  // clusters (e.g. hdfs://ns1 vs hdfs://ns2), so scheme alone would return the
  // wrong cached file system for the second cluster
  private val fsMap: MutableMap[String, FileSystem] = MutableMap()
  private val defaultFS: FileSystem = FileSystem.get(getConfiguration)

  /**
    * Get (and cache) the file system for the given path.
    * Falls back to the default file system when the path yields no usable uri.
    * Rethrows after logging if the file system cannot be created.
    */
  def getFileSystem(path: String): FileSystem = {
    getUriOpt(path) match {
      case Some(uri) => {
        // authority may be null (e.g. bare file uris) — still a stable key
        val key = s"${uri.getScheme}://${uri.getAuthority}"
        fsMap.getOrElseUpdate(key, {
          try {
            FileSystem.get(uri, getConfiguration)
          } catch {
            case NonFatal(e) => {
              error(s"get file system error: ${e.getMessage}")
              throw e
            }
          }
        })
      }
      case _ => defaultFS
    }
  }

  /** Fresh hadoop configuration with append support enabled. */
  private def getConfiguration(): Configuration = {
    val conf = new Configuration()
    conf.setBoolean("dfs.support.append", true)
//    conf.set("fs.defaultFS", "hdfs://localhost")    // debug in hdfs localhost env
    conf
  }

  /**
    * Parse a path into a uri; scheme-less paths are treated as local files.
    * None when the path cannot be parsed at all.
    */
  private def getUriOpt(path: String): Option[URI] = {
    val uriOpt = try {
      Some(new URI(path))
    } catch {
      case NonFatal(_) => None
    }
    uriOpt.flatMap { uri =>
      if (uri.getScheme == null) {
        try {
          Some(new File(path).toURI)
        } catch {
          case NonFatal(_) => None
        }
      } else Some(uri)
    }
  }

}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala
deleted file mode 100644
index 8e0d9a3..0000000
--- 
a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsFileDumpUtil.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.utils
-
-import org.apache.spark.TaskContext
-import org.apache.spark.rdd.RDD
-
-object HdfsFileDumpUtil {
-
-  val sepCount = 50000
-
-  private def suffix(i: Long): String = {
-    if (i == 0) "" else s".${i}"
-  }
-  private def samePattern(fileName: String, patternFileName: String): Boolean 
= {
-    fileName.startsWith(patternFileName)
-  }
-
-  def splitRdd[T](rdd: RDD[T])(implicit m: Manifest[T]): RDD[(Long, 
Iterable[T])] = {
-//    val indexRdd = rdd.zipWithIndex // slow process
-//    indexRdd.map(p => ((p._2 / sepCount), p._1)).groupByKey() // slow process
-    val count = rdd.count
-    val splitCount = count / sepCount + 1
-    val splitRdd = rdd.mapPartitionsWithIndex { (n, itr) =>
-      val idx = n % splitCount
-      itr.map((idx, _))
-    }
-    splitRdd.groupByKey()
-  }
-  def splitIterable[T](datas: Iterable[T])(implicit m: Manifest[T]): 
Iterator[(Int, Iterable[T])] = {
-    val groupedData = datas.grouped(sepCount).zipWithIndex
-    groupedData.map(v => (v._2, v._1))
-  }
-
-  private def directDump(path: String, list: Iterable[String], lineSep: 
String): Unit = {
-    // collect and save
-    val strRecords = list.mkString(lineSep)
-    // save into hdfs
-    HdfsUtil.writeContent(path, strRecords)
-  }
-
-  def dump(path: String, recordsRdd: RDD[String], lineSep: String): Unit = {
-    val groupedRdd = splitRdd(recordsRdd)
-    groupedRdd.foreach { pair =>
-      val (idx, list) = pair
-      val filePath = path + suffix(idx)
-      directDump(filePath, list, lineSep)
-    }
-//    groupedRdd.aggregate(true)({ (res, pair) =>
-//      val (idx, list) = pair
-//      val filePath = path + suffix(idx)
-//      directDump(filePath, list, lineSep)
-//      true
-//    }, _ && _)
-  }
-  def dump(path: String, records: Iterable[String], lineSep: String): Unit = {
-    val groupedRecords = splitIterable(records)
-    groupedRecords.foreach { pair =>
-      val (idx, list) = pair
-      val filePath = path + suffix(idx)
-      directDump(filePath, list, lineSep)
-    }
-//    groupedRecords.aggregate(true)({ (res, pair) =>
-//      val (idx, list) = pair
-//      val filePath = path + suffix(idx)
-//      directDump(filePath, list, lineSep)
-//      true
-//    }, _ && _)
-  }
-
-  def remove(path: String, filename: String, withSuffix: Boolean): Unit = {
-    if (withSuffix) {
-      val files = HdfsUtil.listSubPathsByType(path, "file")
-      val patternFiles = files.filter(samePattern(_, filename))
-      patternFiles.foreach { f =>
-        val rmPath = HdfsUtil.getHdfsFilePath(path, f)
-        HdfsUtil.deleteHdfsPath(rmPath)
-      }
-    } else {
-      val rmPath = HdfsUtil.getHdfsFilePath(path, filename)
-      HdfsUtil.deleteHdfsPath(rmPath)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala 
b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
index aa5643b..89f505a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
@@ -18,24 +18,19 @@ under the License.
 */
 package org.apache.griffin.measure.utils
 
-import org.apache.griffin.measure.log.Loggable
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, 
FileSystem, Path}
+import org.apache.griffin.measure.Loggable
+import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, Path}
 
 object HdfsUtil extends Loggable {
 
   private val seprator = "/"
 
-  private val conf = new Configuration()
-  conf.setBoolean("dfs.support.append", true)
-//  conf.set("fs.defaultFS", "hdfs://localhost")    // debug @localhost
-
-  private val dfs = FileSystem.get(conf)
+  private def getFS(implicit path: Path) = FSUtil.getFileSystem(path.toString)
 
   def existPath(filePath: String): Boolean = {
     try {
-      val path = new Path(filePath)
-      dfs.exists(path)
+      implicit val path = new Path(filePath)
+      getFS.exists(path)
     } catch {
       case e: Throwable => false
     }
@@ -47,21 +42,21 @@ object HdfsUtil extends Loggable {
   }
 
   def createFile(filePath: String): FSDataOutputStream = {
-    val path = new Path(filePath)
-    if (dfs.exists(path)) dfs.delete(path, true)
-    return dfs.create(path)
+    implicit val path = new Path(filePath)
+    if (getFS.exists(path)) getFS.delete(path, true)
+    return getFS.create(path)
   }
 
   def appendOrCreateFile(filePath: String): FSDataOutputStream = {
-    val path = new Path(filePath)
-    if (dfs.getConf.getBoolean("dfs.support.append", false) && 
dfs.exists(path)) {
-        dfs.append(path)
+    implicit val path = new Path(filePath)
+    if (getFS.getConf.getBoolean("dfs.support.append", false) && 
getFS.exists(path)) {
+      getFS.append(path)
     } else createFile(filePath)
   }
 
   def openFile(filePath: String): FSDataInputStream = {
-    val path = new Path(filePath)
-    dfs.open(path)
+    implicit val path = new Path(filePath)
+    getFS.open(path)
   }
 
   def writeContent(filePath: String, message: String): Unit = {
@@ -88,35 +83,35 @@ object HdfsUtil extends Loggable {
 
   def deleteHdfsPath(dirPath: String): Unit = {
     try {
-      val path = new Path(dirPath)
-      if (dfs.exists(path)) dfs.delete(path, true)
+      implicit val path = new Path(dirPath)
+      if (getFS.exists(path)) getFS.delete(path, true)
     } catch {
       case e: Throwable => error(s"delete path [${dirPath}] error: 
${e.getMessage}")
     }
   }
 
-//  def listPathFiles(dirPath: String): Iterable[String] = {
-//    val path = new Path(dirPath)
-//    try {
-//      val fileStatusArray = dfs.listStatus(path)
-//      fileStatusArray.flatMap { fileStatus =>
-//        if (fileStatus.isFile) {
-//          Some(fileStatus.getPath.getName)
-//        } else None
-//      }
-//    } catch {
-//      case e: Throwable => {
-//        println(s"list path files error: ${e.getMessage}")
-//        Nil
-//      }
-//    }
-//  }
+  //  def listPathFiles(dirPath: String): Iterable[String] = {
+  //    val path = new Path(dirPath)
+  //    try {
+  //      val fileStatusArray = dfs.listStatus(path)
+  //      fileStatusArray.flatMap { fileStatus =>
+  //        if (fileStatus.isFile) {
+  //          Some(fileStatus.getPath.getName)
+  //        } else None
+  //      }
+  //    } catch {
+  //      case e: Throwable => {
+  //        println(s"list path files error: ${e.getMessage}")
+  //        Nil
+  //      }
+  //    }
+  //  }
 
   def listSubPathsByType(dirPath: String, subType: String, fullPath: Boolean = 
false): Iterable[String] = {
     if (existPath(dirPath)) {
       try {
-        val path = new Path(dirPath)
-        val fileStatusArray = dfs.listStatus(path)
+        implicit val path = new Path(dirPath)
+        val fileStatusArray = getFS.listStatus(path)
         fileStatusArray.filter { fileStatus =>
           subType match {
             case "dir" => fileStatus.isDirectory

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala 
b/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala
index d125d87..3f05499 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala
@@ -47,6 +47,18 @@ object ParamUtil {
       }
     }
 
+    /**
+      * Gets the value for `key` rendered as a String, computing the default
+      * lazily — `defValue` is invoked only when the key is absent or the
+      * lookup fails.
+      */
+    def getLazyString(key: String, defValue: () => String): String = {
+      try {
+        params.get(key) match {
+          // One case suffices: `toString` on a String is identity, so the
+          // former separate `Some(v: String)` case was redundant.
+          case Some(v) => v.toString
+          case _ => defValue()
+        }
+      } catch {
+        case _: Throwable => defValue()
+      }
+    }
+
     def getStringOrKey(key: String): String = getString(key, key)
 
     def getByte(key: String, defValue: Byte): Byte = {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala 
b/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
index 9b4d58e..e96cbb1 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala
@@ -18,7 +18,7 @@ under the License.
 */
 package org.apache.griffin.measure.utils
 
-import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.Loggable
 
 import scala.util.matching.Regex
 import scala.util.{Failure, Success, Try}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_accuracy-batch-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_accuracy-batch-griffindsl.json 
b/measure/src/test/resources/_accuracy-batch-griffindsl.json
deleted file mode 100644
index 10167cd..0000000
--- a/measure/src/test/resources/_accuracy-batch-griffindsl.json
+++ /dev/null
@@ -1,56 +0,0 @@
-{
-  "name": "accu_batch",
-
-  "process.type": "batch",
-
-  "data.sources": [
-    {
-      "name": "source",
-      "baseline": true,
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/users_info_src.avro"
-          }
-        }
-      ]
-    }, {
-      "name": "target",
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/users_info_target.avro"
-          }
-        }
-      ]
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "griffin-dsl",
-        "dq.type": "accuracy",
-        "name": "accu",
-        "rule": "source.user_id = target.user_id AND upper(source.first_name) 
= upper(target.first_name) AND source.last_name = target.last_name AND 
source.address = target.address AND source.email = target.email AND 
source.phone = target.phone AND source.post_code = target.post_code",
-        "details": {
-          "source": "source",
-          "target": "target",
-          "miss": "miss_count",
-          "total": "total_count",
-          "matched": "matched_count"
-        },
-        "metric": {
-          "name": "accu"
-        },
-        "record": {
-          "name": "missRecords"
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_accuracy-batch-sparksql.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_accuracy-batch-sparksql.json 
b/measure/src/test/resources/_accuracy-batch-sparksql.json
deleted file mode 100644
index 2eef9f1..0000000
--- a/measure/src/test/resources/_accuracy-batch-sparksql.json
+++ /dev/null
@@ -1,63 +0,0 @@
-{
-  "name": "accu_batch",
-
-  "process.type": "batch",
-
-  "data.sources": [
-    {
-      "name": "source",
-      "baseline": true,
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/users_info_src.avro"
-          }
-        }
-      ]
-    }, {
-      "name": "target",
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/users_info_target.avro"
-          }
-        }
-      ]
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "spark-sql",
-        "name": "missRecords",
-        "rule": "SELECT source.* FROM source LEFT JOIN target ON 
coalesce(source.user_id, '') = coalesce(target.user_id, '') AND 
coalesce(source.first_name, '') = coalesce(target.first_name, '') AND 
coalesce(source.post_code, '') = coalesce(target.post_code, '') WHERE (NOT 
(source.user_id IS NULL AND source.first_name IS NULL AND source.post_code IS 
NULL)) AND (target.user_id IS NULL AND target.first_name IS NULL AND 
target.post_code IS NULL)",
-        "record": {
-          "name": "miss"
-        }
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "miss_count",
-        "rule": "SELECT count(*) as miss FROM `missRecords`"
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "total_count",
-        "rule": "SELECT count(*) as total FROM source"
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "accu",
-        "rule": "SELECT `total_count`.`total` AS `total`, 
coalesce(`miss_count`.`miss`, 0) AS `miss`, (`total` - `miss`) AS `matched` 
FROM `total_count` FULL JOIN `miss_count`",
-        "metric": {
-          "name": "accu"
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_accuracy-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_accuracy-streaming-griffindsl.json 
b/measure/src/test/resources/_accuracy-streaming-griffindsl.json
deleted file mode 100644
index 240d768..0000000
--- a/measure/src/test/resources/_accuracy-streaming-griffindsl.json
+++ /dev/null
@@ -1,121 +0,0 @@
-{
-  "name": "accu_streaming",
-
-  "process.type": "streaming",
-
-  "data.sources": [
-    {
-      "name": "source",
-      "baseline": true,
-      "connectors": [
-        {
-          "type": "kafka",
-          "version": "0.8",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.147.177.107:9092",
-              "group.id": "group1",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "sss",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "df-opr",
-              "name": "${s1}",
-              "rule": "from_json",
-              "details": {
-                "df.name": "${this}"
-              }
-            },
-            {
-              "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select name, age from ${s1}"
-            }
-          ]
-        }
-      ],
-      "cache": {
-        "type": "parquet",
-        "file.path": "hdfs://localhost/griffin/streaming/dump/source",
-        "info.path": "source",
-        "ready.time.interval": "10s",
-        "ready.time.delay": "0",
-        "time.range": ["-2m", "0"],
-        "init.clear": true,
-        "updatable": true
-      }
-    }, {
-      "name": "target",
-      "connectors": [
-        {
-          "type": "kafka",
-          "version": "0.8",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.147.177.107:9092",
-              "group.id": "group1",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "ttt",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "df-opr",
-              "name": "${t1}",
-              "rule": "from_json",
-              "details": {
-                "df.name": "${this}"
-              }
-            },
-            {
-              "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select name, age from ${t1}"
-            }
-          ]
-        }
-      ],
-      "cache": {
-        "type": "parquet",
-        "file.path": "hdfs://localhost/griffin/streaming/dump/target",
-        "info.path": "target",
-        "ready.time.interval": "10s",
-        "ready.time.delay": "0",
-        "time.range": ["-2m", "0"],
-        "init.clear": true
-      }
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "griffin-dsl",
-        "dq.type": "accuracy",
-        "name": "accu",
-        "rule": "source.name = target.name and source.age = target.age",
-        "details": {
-          "source": "source",
-          "target": "target",
-          "miss": "miss_count",
-          "total": "total_count",
-          "matched": "matched_count"
-        },
-        "metric": {
-          "name": "accu"
-        },
-        "record": {
-          "name": "missRecords"
-        }
-      }
-    ]
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_accuracy-streaming-sparksql.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_accuracy-streaming-sparksql.json 
b/measure/src/test/resources/_accuracy-streaming-sparksql.json
deleted file mode 100644
index 0824cb8..0000000
--- a/measure/src/test/resources/_accuracy-streaming-sparksql.json
+++ /dev/null
@@ -1,142 +0,0 @@
-{
-  "name": "accu_streaming",
-
-  "process.type": "streaming",
-
-  "data.sources": [
-    {
-      "name": "source",
-      "baseline": true,
-      "connectors": [
-        {
-          "type": "kafka",
-          "version": "0.8",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.149.247.156:9092",
-              "group.id": "group1",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "sss",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "df-opr",
-              "name": "${s1}",
-              "rule": "from_json",
-              "details": {
-                "df.name": "${this}"
-              }
-            },
-            {
-              "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select name, age from ${s1}"
-            }
-          ]
-        }
-      ],
-      "cache": {
-        "file.path": "hdfs://localhost/griffin/streaming/dump/source",
-        "info.path": "source",
-        "ready.time.interval": "10s",
-        "ready.time.delay": "0",
-        "time.range": ["-2m", "0"]
-      }
-    }, {
-      "name": "target",
-      "connectors": [
-        {
-          "type": "kafka",
-          "version": "0.8",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.149.247.156:9092",
-              "group.id": "group1",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "ttt",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "df-opr",
-              "name": "${t1}",
-              "rule": "from_json",
-              "details": {
-                "df.name": "${this}"
-              }
-            },
-            {
-              "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select name, age from ${t1}"
-            }
-          ]
-        }
-      ],
-      "cache": {
-        "file.path": "hdfs://localhost/griffin/streaming/dump/target",
-        "info.path": "target",
-        "ready.time.interval": "10s",
-        "ready.time.delay": "0",
-        "time.range": ["-2m", "0"]
-      }
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "spark-sql",
-        "name": "missRecords",
-        "cache": true,
-        "rule": "SELECT source.* FROM source LEFT JOIN target ON 
coalesce(source.name, '') = coalesce(target.name, '') AND coalesce(source.age, 
'') = coalesce(target.age, '') WHERE (NOT (source.name IS NULL AND source.age 
IS NULL)) AND (target.name IS NULL AND target.age IS NULL)"
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "miss_count",
-        "rule": "SELECT `__tmst`, count(*) as miss FROM `missRecords` GROUP BY 
`__tmst`"
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "total_count",
-        "rule": "SELECT `__tmst`, count(*) as total FROM source GROUP BY 
`__tmst`"
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "accu",
-        "rule": "SELECT `total_count`.`__tmst` AS `__tmst`, 
`total_count`.`total` AS `total`, coalesce(`miss_count`.`miss`, 0) AS `miss` 
FROM `total_count` FULL JOIN `miss_count` ON `total_count`.`__tmst` = 
`miss_count`.`__tmst`"
-      },
-      {
-        "dsl.type": "df-opr",
-        "name": "metric_accu",
-        "rule": "accuracy",
-        "details": {
-          "df.name": "accu",
-          "miss": "miss",
-          "total": "total",
-          "matched": "matched"
-        },
-        "metric": {
-          "name": "accuracy"
-        }
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "accu_miss_records",
-        "rule": "SELECT `__tmst`, `__empty` FROM `metric_accu` WHERE 
`__record`",
-        "record": {
-          "name": "missRecords",
-          "data.source.cache": "source",
-          "origin.DF": "missRecords"
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_completeness-batch-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_completeness-batch-griffindsl.json 
b/measure/src/test/resources/_completeness-batch-griffindsl.json
deleted file mode 100644
index 9c00444..0000000
--- a/measure/src/test/resources/_completeness-batch-griffindsl.json
+++ /dev/null
@@ -1,36 +0,0 @@
-{
-  "name": "comp_batch",
-
-  "process.type": "batch",
-
-  "timestamp": 123456,
-
-  "data.sources": [
-    {
-      "name": "source",
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/users_info_src.avro"
-          }
-        }
-      ]
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "griffin-dsl",
-        "dq.type": "completeness",
-        "name": "comp",
-        "rule": "email, post_code, first_name",
-        "metric": {
-          "name": "comp"
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_completeness-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_completeness-streaming-griffindsl.json 
b/measure/src/test/resources/_completeness-streaming-griffindsl.json
deleted file mode 100644
index ba8bdce..0000000
--- a/measure/src/test/resources/_completeness-streaming-griffindsl.json
+++ /dev/null
@@ -1,65 +0,0 @@
-{
-  "name": "comp_streaming",
-
-  "process.type": "streaming",
-
-  "data.sources": [
-    {
-      "name": "source",
-      "connectors": [
-        {
-          "type": "kafka",
-          "version": "0.8",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.147.177.107:9092",
-              "group.id": "source",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "test",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "df-opr",
-              "name": "${s1}",
-              "rule": "from_json",
-              "details": {
-                "df.name": "${this}"
-              }
-            },
-            {
-              "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select name, age from ${s1}"
-            }
-          ]
-        }
-      ],
-      "cache": {
-        "file.path": "hdfs://localhost/griffin/streaming/dump/source",
-        "info.path": "source",
-        "ready.time.interval": "10s",
-        "ready.time.delay": "0",
-        "time.range": ["0", "0"],
-        "init.clear": true
-      }
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "griffin-dsl",
-        "dq.type": "completeness",
-        "name": "comp",
-        "rule": "name, age",
-        "metric": {
-          "name": "comp"
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_distinctness-batch-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_distinctness-batch-griffindsl.json 
b/measure/src/test/resources/_distinctness-batch-griffindsl.json
deleted file mode 100644
index af0c91e..0000000
--- a/measure/src/test/resources/_distinctness-batch-griffindsl.json
+++ /dev/null
@@ -1,57 +0,0 @@
-{
-  "name": "dist_batch",
-
-  "process.type": "batch",
-
-  "data.sources": [
-    {
-      "name": "source",
-      "baseline": true,
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/users_info_src.avro"
-          }
-        }
-      ]
-    },
-    {
-      "name": "target",
-      "baseline": true,
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/users_info_src.avro"
-          }
-        }
-      ]
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "griffin-dsl",
-        "dq.type": "distinct",
-        "name": "dist",
-        "rule": "user_id",
-        "details": {
-          "source": "source",
-          "target": "target",
-          "total": "total",
-          "distinct": "distinct",
-          "dup": "dup",
-          "num": "num",
-          "duplication.array": "dup"
-        },
-        "metric": {
-          "name": "distinct"
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_distinctness-batch-griffindsl1.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_distinctness-batch-griffindsl1.json 
b/measure/src/test/resources/_distinctness-batch-griffindsl1.json
deleted file mode 100644
index 4d94d8e..0000000
--- a/measure/src/test/resources/_distinctness-batch-griffindsl1.json
+++ /dev/null
@@ -1,73 +0,0 @@
-{
-  "name": "dist_batch",
-
-  "process.type": "batch",
-
-  "timestamp": 123456,
-
-  "data.sources": [
-    {
-      "name": "source",
-      "baseline": true,
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/dupdata.avro"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select name, age from ${this}"
-            }
-          ]
-        }
-      ]
-    },
-    {
-      "name": "target",
-      "baseline": true,
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/dupdata.avro"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select DISTINCT name, age from ${this}"
-            }
-          ]
-        }
-      ]
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "griffin-dsl",
-        "dq.type": "distinct",
-        "name": "dist",
-        "rule": "name",
-        "details": {
-          "source": "source",
-          "target": "target",
-          "total": "total",
-          "distinct": "distinct",
-          "dup": "dup",
-          "num": "num",
-          "duplication.array": "dup"
-        },
-        "metric": {
-          "name": "distinct"
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_distinctness-batch-griffindsl2.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_distinctness-batch-griffindsl2.json 
b/measure/src/test/resources/_distinctness-batch-griffindsl2.json
deleted file mode 100644
index 6a12719..0000000
--- a/measure/src/test/resources/_distinctness-batch-griffindsl2.json
+++ /dev/null
@@ -1,74 +0,0 @@
-{
-  "name": "dist_batch",
-
-  "process.type": "batch",
-
-  "timestamp": 123456,
-
-  "data.sources": [
-    {
-      "name": "source",
-      "baseline": true,
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/dupdata.avro"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select name, age from ${this}"
-            }
-          ]
-        }
-      ]
-    },
-    {
-      "name": "target",
-      "baseline": true,
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/dupdata.avro"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select DISTINCT name, age from ${this}"
-            }
-          ]
-        }
-      ]
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "griffin-dsl",
-        "dq.type": "distinct",
-        "name": "dist",
-        "rule": "name, [age]",
-        "details": {
-          "source": "source",
-          "target": "target",
-          "total": "total",
-          "distinct": "distinct",
-          "dup": "dup",
-          "num": "num",
-          "duplication.array": "dup",
-          "record.enable": true
-        },
-        "metric": {
-          "name": "distinct"
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_distinctness-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_distinctness-streaming-griffindsl.json 
b/measure/src/test/resources/_distinctness-streaming-griffindsl.json
deleted file mode 100644
index c36e7ba..0000000
--- a/measure/src/test/resources/_distinctness-streaming-griffindsl.json
+++ /dev/null
@@ -1,85 +0,0 @@
-{
-  "name": "dist_streaming",
-
-  "process.type": "streaming",
-
-  "data.sources": [
-    {
-      "name": "new",
-      "cache": {
-        "file.path": "hdfs://localhost/griffin/streaming/dump/old",
-        "info.path": "new",
-        "ready.time.interval": "10s",
-        "ready.time.delay": "0",
-        "time.range": ["0", "0"],
-        "read.only": true
-      }
-    },
-    {
-      "name": "old",
-      "connectors": [
-        {
-          "type": "kafka",
-          "version": "0.8",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.149.247.156:9092",
-              "group.id": "old",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "ttt",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "df-opr",
-              "name": "${s1}",
-              "rule": "from_json",
-              "details": {
-                "df.name": "${this}"
-              }
-            },
-            {
-              "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select name, age from ${s1}"
-            }
-          ]
-        }
-      ],
-      "cache": {
-        "file.path": "hdfs://localhost/griffin/streaming/dump/old",
-        "info.path": "old",
-        "ready.time.interval": "10s",
-        "ready.time.delay": "0",
-        "time.range": ["-24h", "0"]
-      }
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "griffin-dsl",
-        "dq.type": "distinct",
-        "name": "dist",
-        "rule": "name, age",
-        "details": {
-          "source": "new",
-          "target": "old",
-          "total": "total",
-          "distinct": "distinct",
-          "dup": "dup",
-          "accu_dup": "accu_dup",
-          "num": "num",
-          "duplication.array": "dup"
-        },
-        "metric": {
-          "name": "distinct"
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_profiling-batch-griffindsl-hive.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_profiling-batch-griffindsl-hive.json 
b/measure/src/test/resources/_profiling-batch-griffindsl-hive.json
deleted file mode 100644
index 03b0405..0000000
--- a/measure/src/test/resources/_profiling-batch-griffindsl-hive.json
+++ /dev/null
@@ -1,48 +0,0 @@
-{
-  "name": "prof_batch",
-
-  "process.type": "batch",
-
-  "timestamp": 123456,
-
-  "data.sources": [
-    {
-      "name": "source",
-      "connectors": [
-        {
-          "type": "hive",
-          "version": "1.2",
-          "config": {
-            "database": "default",
-            "table.name": "s1"
-          }
-        }
-      ]
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "griffin-dsl",
-        "dq.type": "profiling",
-        "name": "prof",
-        "rule": "name, count(*) as cnt from source group by name",
-        "metric": {
-          "name": "name_group",
-          "collect.type": "array"
-        }
-      },
-      {
-        "dsl.type": "griffin-dsl",
-        "dq.type": "profiling",
-        "name": "grp",
-        "rule": "age, count(*) as cnt from source group by age order by cnt",
-        "metric": {
-          "name": "age_group",
-          "collect.type": "array"
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_profiling-batch-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_profiling-batch-griffindsl.json 
b/measure/src/test/resources/_profiling-batch-griffindsl.json
deleted file mode 100644
index ec082c4..0000000
--- a/measure/src/test/resources/_profiling-batch-griffindsl.json
+++ /dev/null
@@ -1,54 +0,0 @@
-{
-  "name": "prof_batch",
-
-  "process.type": "batch",
-
-  "timestamp": 123456,
-
-  "data.sources": [
-    {
-      "name": "source",
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/users_info_src.avro"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select reg_replace(email, 
'^([^@0-9]+)([0-9]+)@(dc)(?:\\\\.[^@]+)$', '$1@$3') as email, post_code from 
${this}"
-            }
-          ]
-        }
-      ]
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "griffin-dsl",
-        "dq.type": "profiling",
-        "name": "prof",
-        "rule": "email, count(*) as cnt from source group by email",
-        "metric": {
-          "name": "prof",
-          "collect.type": "array"
-        }
-      },
-      {
-        "dsl.type": "griffin-dsl",
-        "dq.type": "profiling",
-        "name": "grp",
-        "rule": "source.post_code, count(*) as cnt from source group by 
source.post_code order by cnt desc",
-        "metric": {
-          "name": "post_group",
-          "collect.type": "array"
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_profiling-batch-sparksql.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_profiling-batch-sparksql.json 
b/measure/src/test/resources/_profiling-batch-sparksql.json
deleted file mode 100644
index fdfd812..0000000
--- a/measure/src/test/resources/_profiling-batch-sparksql.json
+++ /dev/null
@@ -1,44 +0,0 @@
-{
-  "name": "prof_batch",
-
-  "process.type": "batch",
-
-  "timestamp": 123456,
-
-  "data.sources": [
-    {
-      "name": "source",
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/users_info_src.avro"
-          }
-        }
-      ]
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "spark-sql",
-        "name": "prof",
-        "rule": "select count(*) as `cnt`, count(distinct `post_code`) as 
`dis-cnt`, max(user_id) as `max` from source",
-        "metric": {
-          "name": "prof"
-        }
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "grp",
-        "rule": "select post_code as `pc`, count(*) as `cnt` from source group 
by post_code",
-        "metric": {
-          "name": "post_group",
-          "collect.type": "array"
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_profiling-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_profiling-streaming-griffindsl.json 
b/measure/src/test/resources/_profiling-streaming-griffindsl.json
deleted file mode 100644
index b6feb5a..0000000
--- a/measure/src/test/resources/_profiling-streaming-griffindsl.json
+++ /dev/null
@@ -1,75 +0,0 @@
-{
-  "name": "prof_streaming",
-
-  "process.type": "streaming",
-
-  "data.sources": [
-    {
-      "name": "source",
-      "connectors": [
-        {
-          "type": "kafka",
-          "version": "0.8",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.147.177.107:9092",
-              "group.id": "group1",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "test",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "df-opr",
-              "name": "${s1}",
-              "rule": "from_json",
-              "details": {
-                "df.name": "${this}"
-              }
-            },
-            {
-              "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select name, age from ${s1}"
-            }
-          ]
-        }
-      ],
-      "cache": {
-        "file.path": "hdfs://localhost/griffin/streaming/dump/source",
-        "info.path": "source",
-        "ready.time.interval": "10s",
-        "ready.time.delay": "0",
-        "time.range": ["0", "0"],
-        "init.clear": true
-      }
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "griffin-dsl",
-        "dq.type": "profiling",
-        "name": "prof",
-        "rule": "select count(name) as `cnt`, max(age) as `max`, min(age) as 
`min` from source",
-        "metric": {
-          "name": "prof"
-        }
-      },
-      {
-        "dsl.type": "griffin-dsl",
-        "dq.type": "profiling",
-        "name": "grp",
-        "rule": "select name, count(*) as `cnt` from source group by name",
-        "metric": {
-          "name": "name_group",
-          "collect.type": "array"
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_profiling-streaming-sparksql.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_profiling-streaming-sparksql.json 
b/measure/src/test/resources/_profiling-streaming-sparksql.json
deleted file mode 100644
index 4f0b0ee..0000000
--- a/measure/src/test/resources/_profiling-streaming-sparksql.json
+++ /dev/null
@@ -1,80 +0,0 @@
-{
-  "name": "prof_streaming",
-
-  "process.type": "streaming",
-
-  "data.sources": [
-    {
-      "name": "source",
-      "connectors": [
-        {
-          "type": "kafka",
-          "version": "0.8",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.149.247.156:9092",
-              "group.id": "group1",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "sss",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "df-opr",
-              "name": "${s1}",
-              "rule": "from_json",
-              "details": {
-                "df.name": "${this}"
-              }
-            },
-            {
-              "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select name, age from ${s1}"
-            }
-          ]
-        }
-      ],
-      "cache": {
-        "file.path": "hdfs://localhost/griffin/streaming/dump/source",
-        "info.path": "source",
-        "ready.time.interval": "10s",
-        "ready.time.delay": "0",
-        "time.range": ["0", "0"]
-      }
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "spark-sql",
-        "name": "prof",
-        "rule": "select count(name) as `cnt`, max(age) as `max`, min(age) as 
`min` from source",
-        "metric": {
-          "name": "prof"
-        }
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "grp",
-        "rule": "select name, count(*) as `cnt` from source group by name",
-        "metric": {
-          "name": "name_group",
-          "collect.type": "array"
-        }
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "tmst_grp",
-        "rule": "select `__tmst`, count(*) as `cnt` from source group by 
`__tmst`",
-        "metric": {
-          "name": "tmst_group"
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_timeliness-batch-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_timeliness-batch-griffindsl.json 
b/measure/src/test/resources/_timeliness-batch-griffindsl.json
deleted file mode 100644
index 90439df..0000000
--- a/measure/src/test/resources/_timeliness-batch-griffindsl.json
+++ /dev/null
@@ -1,49 +0,0 @@
-{
-  "name": "timeliness_batch",
-
-  "process.type": "batch",
-
-  "data.sources": [
-    {
-      "name": "source",
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/timeliness_data.avro"
-          }
-        }
-      ]
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "griffin-dsl",
-        "dq.type": "timeliness",
-        "name": "timeliness",
-        "rule": "ts, end_ts",
-        "details": {
-          "source": "source",
-          "latency": "latency",
-          "total": "total",
-          "avg": "avg",
-          "threshold": "3m",
-          "step": "step",
-          "count": "cnt",
-          "step.size": "2m",
-          "percentile": "percentile",
-          "percentile.values": [0.95]
-        },
-        "metric": {
-          "name": "timeliness"
-        },
-        "record": {
-          "name": "lateRecords"
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_timeliness-batch-sparksql.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_timeliness-batch-sparksql.json 
b/measure/src/test/resources/_timeliness-batch-sparksql.json
deleted file mode 100644
index f9cb368..0000000
--- a/measure/src/test/resources/_timeliness-batch-sparksql.json
+++ /dev/null
@@ -1,52 +0,0 @@
-{
-  "name": "timeliness_batch",
-
-  "process.type": "batch",
-
-  "data.sources": [
-    {
-      "name": "source",
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/timeliness_data.avro"
-          }
-        }
-      ]
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "spark-sql",
-        "name": "in_time",
-        "rule": "select *, (ts) as `_in_ts`, (end_ts) as `_out_ts` from source 
where (ts) IS NOT NULL"
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "lat",
-        "cache": true,
-        "rule": "select *, (`_out_ts` - `_in_ts`) as `latency` from `in_time`"
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "metric",
-        "rule": "select cast(avg(`latency`) as bigint) as `avg`, 
max(`latency`) as `max`, min(`latency`) as `min` from `lat`",
-        "metric": {
-          "name": "timeliness"
-        }
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "slows",
-        "rule": "select * from `lat` where `latency` > 60000",
-        "record": {
-          "name": "lateRecords"
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_timeliness-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_timeliness-streaming-griffindsl.json 
b/measure/src/test/resources/_timeliness-streaming-griffindsl.json
deleted file mode 100644
index 5916e5c..0000000
--- a/measure/src/test/resources/_timeliness-streaming-griffindsl.json
+++ /dev/null
@@ -1,79 +0,0 @@
-{
-  "name": "timeliness_streaming",
-
-  "process.type": "streaming",
-
-  "data.sources": [
-    {
-      "name": "source",
-      "connectors": [
-        {
-          "type": "kafka",
-          "version": "0.8",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.149.247.156:9092",
-              "group.id": "group1",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "fff",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "df-opr",
-              "name": "${s1}",
-              "rule": "from_json",
-              "details": {
-                "df.name": "${this}"
-              }
-            },
-            {
-              "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select ts, end_ts, name, age from ${s1}"
-            }
-          ]
-        }
-      ],
-      "cache": {
-        "file.path": "hdfs://localhost/griffin/streaming/dump/source",
-        "info.path": "source",
-        "ready.time.interval": "10s",
-        "ready.time.delay": "0",
-        "time.range": ["0", "0"]
-      }
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "griffin-dsl",
-        "dq.type": "timeliness",
-        "name": "timeliness",
-        "rule": "ts, end_ts",
-        "details": {
-          "source": "source",
-          "latency": "latency",
-          "total": "total",
-          "avg": "avg",
-          "threshold": "1h",
-          "step": "step",
-          "count": "cnt",
-          "step.size": "5m",
-          "percentile": "percentile",
-          "percentile.values": [0.2, 0.5, 0.8]
-        },
-        "metric": {
-          "name": "timeliness"
-        },
-        "record": {
-          "name": "lateRecords"
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_timeliness-streaming-sparksql.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_timeliness-streaming-sparksql.json 
b/measure/src/test/resources/_timeliness-streaming-sparksql.json
deleted file mode 100644
index dc736ab..0000000
--- a/measure/src/test/resources/_timeliness-streaming-sparksql.json
+++ /dev/null
@@ -1,82 +0,0 @@
-{
-  "name": "timeliness_streaming",
-
-  "process.type": "streaming",
-
-  "data.sources": [
-    {
-      "name": "source",
-      "connectors": [
-        {
-          "type": "kafka",
-          "version": "0.8",
-          "config": {
-            "kafka.config": {
-              "bootstrap.servers": "10.149.247.156:9092",
-              "group.id": "group1",
-              "auto.offset.reset": "smallest",
-              "auto.commit.enable": "false"
-            },
-            "topics": "fff",
-            "key.type": "java.lang.String",
-            "value.type": "java.lang.String"
-          },
-          "pre.proc": [
-            {
-              "dsl.type": "df-opr",
-              "name": "${s1}",
-              "rule": "from_json",
-              "details": {
-                "df.name": "${this}"
-              }
-            },
-            {
-              "dsl.type": "spark-sql",
-              "name": "${this}",
-              "rule": "select ts, name, age from ${s1}"
-            }
-          ]
-        }
-      ],
-      "cache": {
-        "file.path": "hdfs://localhost/griffin/streaming/dump/source",
-        "info.path": "source",
-        "ready.time.interval": "10s",
-        "ready.time.delay": "0",
-        "time.range": ["0", "0"]
-      }
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "spark-sql",
-        "name": "in_time",
-        "rule": "select *, (ts) as `_in_ts` from source where (ts) IS NOT NULL"
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "lat",
-        "cache": true,
-        "rule": "select *, (`__tmst` - `_in_ts`) as `latency` from `in_time`"
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "metric",
-        "rule": "select `__tmst`, cast(avg(`latency`) as bigint) as `avg`, 
max(`latency`) as `max`, min(`latency`) as `min` from `lat`",
-        "metric": {
-          "name": "timeliness"
-        }
-      },
-      {
-        "dsl.type": "spark-sql",
-        "name": "slows",
-        "rule": "select * from `lat` where `latency` > 60000",
-        "record": {
-          "name": "lateRecords"
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/test/resources/_uniqueness-batch-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_uniqueness-batch-griffindsl.json 
b/measure/src/test/resources/_uniqueness-batch-griffindsl.json
deleted file mode 100644
index 28009e8..0000000
--- a/measure/src/test/resources/_uniqueness-batch-griffindsl.json
+++ /dev/null
@@ -1,58 +0,0 @@
-{
-  "name": "unique_batch",
-
-  "process.type": "batch",
-
-  "data.sources": [
-    {
-      "name": "source",
-      "baseline": true,
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/users_info_src.avro"
-          }
-        }
-      ]
-    },
-    {
-      "name": "target",
-      "connectors": [
-        {
-          "type": "avro",
-          "version": "1.7",
-          "config": {
-            "file.name": "src/test/resources/users_info_src.avro"
-          }
-        }
-      ]
-    }
-  ],
-
-  "evaluate.rule": {
-    "rules": [
-      {
-        "dsl.type": "griffin-dsl",
-        "dq.type": "uniqueness",
-        "name": "dup",
-        "rule": "user_id",
-        "details": {
-          "source": "source",
-          "target": "target",
-          "total": "total",
-          "unique": "unique",
-          "dup": "dup",
-          "num": "num"
-        },
-        "metric": {
-          "name": "unique"
-        },
-        "record": {
-          "name": "dupRecords"
-        }
-      }
-    ]
-  }
-}
\ No newline at end of file

Reply via email to