http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
index e5f72d1..3c20a0e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
@@ -18,11 +18,13 @@ under the License.
 */
 package org.apache.griffin.measure.sink
 
-import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil, TimeUtil}
+import scala.concurrent.Future
+
 import org.apache.spark.rdd.RDD
 
-import scala.concurrent.Future
+import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil, TimeUtil}
+import org.apache.griffin.measure.utils.ParamUtil._
+
 
 /**
   * sink metric and record through http request
@@ -38,7 +40,10 @@ case class ElasticSearchSink(config: Map[String, Any], metricName: String,
 
   val api = config.getString(Api, "")
   val method = config.getString(Method, "post")
-  val connectionTimeout = TimeUtil.milliseconds(config.getString(ConnectionTimeout, "")).getOrElse(-1L)
+
+  val connectionTimeout =
+    TimeUtil.milliseconds(config.getString(ConnectionTimeout, "")).getOrElse(-1L)
+
   val retry = config.getInt(Retry, 10)
 
   val _Value = "value"
@@ -55,7 +60,7 @@ case class ElasticSearchSink(config: Map[String, Any], metricName: String,
       val data = JsonUtil.toJson(dataMap)
       // http request
       val params = Map[String, Object]()
-      val header = Map[String, Object](("Content-Type","application/json"))
+      val header = Map[String, Object](("Content-Type", "application/json"))
 
       def func(): (Long, Future[Boolean]) = {
         import scala.concurrent.ExecutionContext.Implicits.global

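Across this commit the import blocks are regrouped in the same way: scala.* imports first, then third-party packages (org.apache.spark, org.mongodb.scala), then project-local org.apache.griffin packages, with blank lines between groups, matching the usual scalastyle import-grouping convention. A minimal sketch of the resulting file header, reassembled from the ElasticSearchSink hunks above, not new code:

    package org.apache.griffin.measure.sink

    import scala.concurrent.Future

    import org.apache.spark.rdd.RDD

    import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil, TimeUtil}
    import org.apache.griffin.measure.utils.ParamUtil._
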
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala
index 718f1c1..588fabf 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala
@@ -20,10 +20,11 @@ package org.apache.griffin.measure.sink
 
 import java.util.Date
 
-import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil}
 import org.apache.spark.rdd.RDD
 
+import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil}
+import org.apache.griffin.measure.utils.ParamUtil._
+
 /**
   * sink metric and record to hdfs
   */
@@ -83,6 +84,7 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
       case e: Throwable => error(e.getMessage)
     }
   }
+
   def finish(): Unit = {
     try {
       HdfsUtil.createEmptyFile(FinishFile)
@@ -103,6 +105,7 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
   private def getHdfsPath(path: String, groupId: Int): String = {
     HdfsUtil.getHdfsFilePath(path, s"${groupId}")
   }
+
   private def getHdfsPath(path: String, ptnId: Int, groupId: Int): String = {
     HdfsUtil.getHdfsFilePath(path, s"${ptnId}.${groupId}")
   }
@@ -116,7 +119,10 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
     clearOldRecords(path)
     try {
       val recordCount = records.count
-      val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
+
+      val count =
+        if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
+
       if (count > 0) {
         val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt
         if (groupCount <= 1) {
@@ -145,7 +151,10 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
     clearOldRecords(path)
     try {
       val recordCount = records.size
-      val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
+
+      val count =
+        if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
+
       if (count > 0) {
         val groupCount = (count - 1) / maxLinesPerFile + 1
         if (groupCount <= 1) {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala
index 206f187..ab59e59 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala
@@ -18,14 +18,16 @@ under the License.
 */
 package org.apache.griffin.measure.sink
 
-import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.griffin.measure.utils.TimeUtil
+import scala.concurrent.Future
+
 import org.apache.spark.rdd.RDD
 import org.mongodb.scala._
 import org.mongodb.scala.model.{Filters, UpdateOptions, Updates}
 import org.mongodb.scala.result.UpdateResult
 
-import scala.concurrent.Future
+import org.apache.griffin.measure.utils.ParamUtil._
+import org.apache.griffin.measure.utils.TimeUtil
+
 
 /**
   * sink metric and record to mongo
@@ -98,7 +100,7 @@ object MongoConnection {
   var dataConf: MongoConf = _
   private var dataCollection: MongoCollection[Document] = _
 
-  def getDataCollection = dataCollection
+  def getDataCollection : MongoCollection[Document] = dataCollection
 
   def init(config: Map[String, Any]): Unit = {
     if (!initialed) {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala
index 0c03bd8..9d598fb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala
@@ -18,9 +18,10 @@ under the License.
 */
 package org.apache.griffin.measure.sink
 
-import org.apache.griffin.measure.Loggable
 import org.apache.spark.rdd.RDD
 
+import org.apache.griffin.measure.Loggable
+
 /**
   * sink metric and record
   */

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
index 26b0178..49818f2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
@@ -18,10 +18,11 @@ under the License.
 */
 package org.apache.griffin.measure.sink
 
+import scala.util.{Success, Try}
+
 import org.apache.griffin.measure.configuration.dqdefinition.SinkParam
 import org.apache.griffin.measure.configuration.enums._
 
-import scala.util.{Success, Try}
 
 
 case class SinkFactory(sinkParams: Iterable[SinkParam], metricName: String) extends Serializable {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala
index 1cc3f3e..ca38629 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala
@@ -21,12 +21,13 @@ package org.apache.griffin.measure.sink
 import java.util.Date
 import java.util.concurrent.TimeUnit
 
-import org.apache.griffin.measure.Loggable
-
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.util.{Failure, Success}
 
+import org.apache.griffin.measure.Loggable
+
+
 /**
   * sink task runner, to sink metrics in block or non-block mode
   */
@@ -52,11 +53,11 @@ object SinkTaskRunner extends Loggable {
     val st = new Date().getTime
     val (t, res) = func()
     res.onComplete {
-      case Success(value) => {
+      case Success(value) =>
         val et = new Date().getTime
         info(s"task ${t} success with (${value}) [ using time ${et - st} ms ]")
-      }
-      case Failure(e) => {
+
+      case Failure(e) =>
         val et = new Date().getTime
         warn(s"task ${t} fails [ using time ${et - st} ms ] : ${e.getMessage}")
         if (nextRetry >= 0) {
@@ -65,11 +66,11 @@ object SinkTaskRunner extends Loggable {
         } else {
           error(s"task fails: task ${t} retry ends but fails")
         }
-      }
     }
   }
 
-  private def blockExecute(func: () => (Long, Future[_]), retry: Int, waitDuration: Duration): Unit = {
+  private def blockExecute(func: () => (Long, Future[_]),
+                           retry: Int, waitDuration: Duration): Unit = {
     val nextRetry = nextRetryCount(retry)
     val st = new Date().getTime
     val (t, res) = func()
@@ -78,7 +79,7 @@ object SinkTaskRunner extends Loggable {
       val et = new Date().getTime
       info(s"task ${t} success with (${value}) [ using time ${et - st} ms ]")
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         val et = new Date().getTime
         warn(s"task ${t} fails [ using time ${et - st} ms ] : ${e.getMessage}")
         if (nextRetry >= 0) {
@@ -87,7 +88,6 @@ object SinkTaskRunner extends Loggable {
         } else {
           error(s"task fails: task ${t} retry ends but fails")
         }
-      }
     }
   }
 

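Besides the import moves, the SinkTaskRunner hunks rewrite braced case bodies (case Success(value) => { ... }) into the brace-less style, where the body is delimited by indentation alone. A small self-contained sketch of that pattern; the object and values are illustrative, not from the commit:

    object CaseStyleSketch extends App {
      import scala.concurrent.ExecutionContext.Implicits.global
      import scala.concurrent.duration._
      import scala.concurrent.{Await, Future}
      import scala.util.{Failure, Success}

      val res = Future(42)
      res.onComplete {
        case Success(value) =>
          // multi-line body with no surrounding braces
          println(s"task success with (${value})")
        case Failure(e) =>
          println(s"task fails: ${e.getMessage}")
      }
      Await.ready(res, 1.second) // block until the future completes
    }
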
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
index c37b5a3..5ce2b14 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
@@ -19,9 +19,10 @@ under the License.
 package org.apache.griffin.measure.step.builder
 
 import org.apache.commons.lang.StringUtils
+
 import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.configuration.dqdefinition.{DataSourceParam, Param, RuleParam}
+import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step._
 
@@ -49,7 +50,8 @@ object DQStepBuilder {
       .flatMap(_.buildDQStep(context, dsParam))
   }
 
-  private def getDataSourceParamStepBuilder(procType: ProcessType): Option[DataSourceParamStepBuilder] = {
+  private def getDataSourceParamStepBuilder(procType: ProcessType)
+  : Option[DataSourceParamStepBuilder] = {
     procType match {
       case BatchProcessType => Some(BatchDataSourceStepBuilder())
       case StreamingProcessType => Some(StreamingDataSourceStepBuilder())
@@ -64,7 +66,9 @@ object DQStepBuilder {
     val funcNames = context.functionNames
     val dqStepOpt = getRuleParamStepBuilder(dslType, dsNames, funcNames)
       .flatMap(_.buildDQStep(context, ruleParam))
-    dqStepOpt.toSeq.flatMap(_.getNames).foreach(name => context.compileTableRegister.registerTable(name))
+    dqStepOpt.toSeq.flatMap(_.getNames).foreach(name =>
+      context.compileTableRegister.registerTable(name)
+    )
     dqStepOpt
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala
index d3c0e41..aa43cf6 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala
@@ -24,7 +24,6 @@ import org.apache.griffin.measure.step.DQStep
 import org.apache.griffin.measure.step.builder.dsl.parser.GriffinDslParser
 import org.apache.griffin.measure.step.builder.dsl.transform.Expr2DQSteps
 
-import scala.util.{Failure, Success}
 
 case class GriffinDslDQStepBuilder(dataSourceNames: Seq[String],
                                    functionNames: Seq[String]
@@ -50,10 +49,9 @@ case class GriffinDslDQStepBuilder(dataSourceNames: Seq[String],
         Nil
       }
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"generate rule plan ${name} fails: ${e.getMessage}")
         Nil
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
index 5a04c11..4af3ceb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
@@ -18,11 +18,11 @@ under the License.
 */
 package org.apache.griffin.measure.step.builder
 
-import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
+import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.context.DQContext
-import org.apache.griffin.measure.step.write.{DataSourceUpdateWriteStep, MetricWriteStep, RecordWriteStep}
 import org.apache.griffin.measure.step.{DQStep, SeqDQStep}
+import org.apache.griffin.measure.step.write.{DataSourceUpdateWriteStep, MetricWriteStep, RecordWriteStep}
 
 /**
   * build dq step by rule param

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ClauseExpression.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ClauseExpression.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ClauseExpression.scala
index 17e678e..1c04e75 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ClauseExpression.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ClauseExpression.scala
@@ -35,7 +35,8 @@ case class SelectClause(exprs: Seq[Expr], extraConditionOpt: Option[ExtraConditi
   def coalesceDesc: String = desc
 
   override def map(func: (Expr) => Expr): SelectClause = {
-    SelectClause(exprs.map(func(_)), extraConditionOpt.map(func(_).asInstanceOf[ExtraConditionExpr]))
+    SelectClause(exprs.map(func(_)),
+      extraConditionOpt.map(func(_).asInstanceOf[ExtraConditionExpr]))
   }
 
 }
@@ -81,11 +82,10 @@ case class GroupbyClause(exprs: Seq[Expr], havingClauseOpt: Option[Expr]) extend
 
   def merge(other: GroupbyClause): GroupbyClause = {
     val newHavingClauseOpt = (havingClauseOpt, other.havingClauseOpt) match {
-      case (Some(hc), Some(ohc)) => {
+      case (Some(hc), Some(ohc)) =>
         val logical1 = LogicalFactorExpr(hc, false, None)
         val logical2 = LogicalFactorExpr(ohc, false, None)
         Some(BinaryLogicalExpr(logical1, ("AND", logical2) :: Nil))
-      }
       case (a @ Some(_), _) => a
       case (_, b @ Some(_)) => b
       case (_, _) => None
@@ -250,7 +250,8 @@ case class DistinctnessClause(exprs: Seq[Expr]) extends ClauseExpression {
 
   def desc: String = exprs.map(_.desc).mkString(", ")
   def coalesceDesc: String = exprs.map(_.coalesceDesc).mkString(", ")
-  override def map(func: (Expr) => Expr): DistinctnessClause = DistinctnessClause(exprs.map(func(_)))
+  override def map(func: (Expr) => Expr) : DistinctnessClause =
+    DistinctnessClause(exprs.map(func(_)))
 }
 
 case class TimelinessClause(exprs: Seq[Expr]) extends ClauseExpression {
@@ -266,5 +267,6 @@ case class CompletenessClause(exprs: Seq[Expr]) extends ClauseExpression {
 
   def desc: String = exprs.map(_.desc).mkString(", ")
   def coalesceDesc: String = exprs.map(_.coalesceDesc).mkString(", ")
-  override def map(func: (Expr) => Expr): CompletenessClause = CompletenessClause(exprs.map(func(_)))
-}
\ No newline at end of file
+  override def map(func: (Expr) => Expr): CompletenessClause =
+    CompletenessClause(exprs.map(func(_)))
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LogicalExpr.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LogicalExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LogicalExpr.scala
index 13317bb..426dbbb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LogicalExpr.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LogicalExpr.scala
@@ -185,7 +185,8 @@ case class UnaryLogicalExpr(oprs: Seq[String], factor: LogicalExpr) extends Logi
   }
 }
 
-case class BinaryLogicalExpr(factor: LogicalExpr, tails: Seq[(String, LogicalExpr)]) extends LogicalExpr {
+case class BinaryLogicalExpr(factor: LogicalExpr, tails: Seq[(String, LogicalExpr)])
+  extends LogicalExpr {
 
   addChildren(factor +: tails.map(_._2))
 
@@ -220,4 +221,4 @@ case class BinaryLogicalExpr(factor: LogicalExpr, tails: Seq[(String, LogicalExp
       (pair._1, func(pair._2).asInstanceOf[LogicalExpr])
     })
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/SelectExpr.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/SelectExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/SelectExpr.scala
index b64dae2..c4f80d1 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/SelectExpr.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/SelectExpr.scala
@@ -96,17 +96,17 @@ case class FunctionSelectExpr(functionName: String, args: Seq[Expr]) extends Sel
 
 // -------------
 
-case class SelectionExpr(head: HeadExpr, selectors: Seq[SelectExpr], aliasOpt: Option[String]) extends SelectExpr {
+case class SelectionExpr(head: HeadExpr, selectors: Seq[SelectExpr], aliasOpt: Option[String])
+  extends SelectExpr {
 
   addChildren(head +: selectors)
 
   def desc: String = {
     selectors.foldLeft(head.desc) { (hd, sel) =>
       sel match {
-        case FunctionSelectExpr(funcName, args) => {
+        case FunctionSelectExpr(funcName, args) =>
           val nargs = hd +: args.map(_.desc)
           s"${funcName}(${nargs.mkString(", ")})"
-        }
         case _ => s"${hd}${sel.desc}"
       }
     }
@@ -129,4 +129,4 @@ case class SelectionExpr(head: HeadExpr, selectors: Seq[SelectExpr], aliasOpt: O
     SelectionExpr(func(head).asInstanceOf[HeadExpr],
       selectors.map(func(_).asInstanceOf[SelectExpr]), aliasOpt)
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala
index 4ce0fe6..b10fd57 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala
@@ -24,13 +24,15 @@ trait TreeNode extends Serializable {
 
   var children = Seq[TreeNode]()
 
-  def addChild(expr: TreeNode) = { children :+= expr }
-  def addChildren(exprs: Seq[TreeNode]) = { children ++= exprs }
+  def addChild(expr: TreeNode) : Unit = { children :+= expr }
+  def addChildren(exprs: Seq[TreeNode]) : Unit = { children ++= exprs }
 
-  def preOrderTraverseDepthFirst[T, A <: TreeNode](z: T)(seqOp: (A, T) => T, combOp: (T, T) => T)(implicit tag: ClassTag[A]): T = {
+  def preOrderTraverseDepthFirst[T, A <: TreeNode](z: T)
+                                                  (seqOp: (A, T) => T, combOp: (T, T) => T)
+                                                  (implicit tag: ClassTag[A]): T = {
 
     val clazz = tag.runtimeClass
-    if(clazz.isAssignableFrom(this.getClass)){
+    if(clazz.isAssignableFrom(this.getClass)) {
       val tv = seqOp(this.asInstanceOf[A], z)
       children.foldLeft(combOp(z, tv)) { (ov, tn) =>
         combOp(ov, tn.preOrderTraverseDepthFirst(z)(seqOp, combOp))
@@ -41,16 +43,18 @@ trait TreeNode extends Serializable {
     }
 
   }
-  def postOrderTraverseDepthFirst[T, A <: TreeNode](z: T)(seqOp: (A, T) => T, combOp: (T, T) => T)(implicit tag: ClassTag[A]): T = {
+  def postOrderTraverseDepthFirst[T, A <: TreeNode](z: T)
+                                                   (seqOp: (A, T) => T, combOp: (T, T) => T)
+                                                   (implicit tag: ClassTag[A]): T = {
 
     val clazz = tag.runtimeClass
-    if(clazz.isAssignableFrom(this.getClass)){
+    if(clazz.isAssignableFrom(this.getClass)) {
       val cv = children.foldLeft(z) { (ov, tn) =>
         combOp(ov, tn.postOrderTraverseDepthFirst(z)(seqOp, combOp))
       }
       combOp(z, seqOp(this.asInstanceOf[A], cv))
     }
-    else{
+    else {
       z
     }
   }

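The TreeNode hunks, like the MongoConnection getter earlier, add explicit result types to public members rather than relying on type inference. A before/after sketch on a hypothetical Node trait, not taken from the commit:

    trait Node extends Serializable {
      var children: Seq[Node] = Seq.empty

      // before: def addChild(node: Node) = { children :+= node }  (result type inferred)
      def addChild(node: Node): Unit = { children :+= node }
      def addChildren(nodes: Seq[Node]): Unit = { children ++= nodes }
    }
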
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/BasicParser.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/BasicParser.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/BasicParser.scala
index 18f7754..fe6678d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/BasicParser.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/BasicParser.scala
@@ -18,9 +18,10 @@ under the License.
 */
 package org.apache.griffin.measure.step.builder.dsl.parser
 
+import scala.util.parsing.combinator.JavaTokenParsers
+
 import org.apache.griffin.measure.step.builder.dsl.expr._
 
-import scala.util.parsing.combinator.JavaTokenParsers
 
 /**
   * basic parser for sql like syntax
@@ -388,10 +389,9 @@ trait BasicParser extends JavaTokenParsers with Serializable {
 
  def combinedClause: Parser[CombinedClause] = selectClause ~ opt(fromClause) ~ opt(whereClause) ~
     opt(groupbyClause) ~ opt(orderbyClause) ~ opt(limitClause) ^^ {
-    case sel ~ fromOpt ~ whereOpt ~ groupbyOpt ~ orderbyOpt ~ limitOpt => {
+    case sel ~ fromOpt ~ whereOpt ~ groupbyOpt ~ orderbyOpt ~ limitOpt =>
      val tails = Seq(whereOpt, groupbyOpt, orderbyOpt, limitOpt).flatMap(opt => opt)
       CombinedClause(sel, fromOpt, tails)
-    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala
index 2cd638c..77ed987 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala
@@ -37,11 +37,10 @@ case class GriffinDslParser(dataSourceNames: Seq[String], functionNames: Seq[Str
 
  def profilingClause: Parser[ProfilingClause] = selectClause ~ opt(fromClause) ~ opt(whereClause) ~
     opt(groupbyClause) ~ opt(orderbyClause) ~ opt(limitClause) ^^ {
-    case sel ~ fromOpt ~ whereOpt ~ groupbyOpt ~ orderbyOpt ~ limitOpt => {
+    case sel ~ fromOpt ~ whereOpt ~ groupbyOpt ~ orderbyOpt ~ limitOpt =>
       val preClauses = Seq(whereOpt).flatMap(opt => opt)
       val postClauses = Seq(orderbyOpt, limitOpt).flatMap(opt => opt)
       ProfilingClause(sel, fromOpt, groupbyOpt, preClauses, postClauses)
-    }
   }
 
   /**
@@ -59,7 +58,7 @@ case class GriffinDslParser(dataSourceNames: Seq[String], functionNames: Seq[Str
     * <distinctness-clauses> = <distExpr> [, <distExpr>]+
     */
   def sqbrExpr: Parser[Expr] = LSQBR ~> expression <~ RSQBR ^^ {
-    case expr => { expr.tag = "[]"; expr}
+    case expr => expr.tag = "[]"; expr
   }
   def distExpr: Parser[Expr] = expression | sqbrExpr
  def distinctnessClause: Parser[DistinctnessClause] = rep1sep(distExpr, Operator.COMMA) ^^ {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
index b97a1a0..3bf7d04 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
@@ -18,15 +18,15 @@ under the License.
 */
 package org.apache.griffin.measure.step.builder.dsl.transform
 
-import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
+import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.DQStep
 import org.apache.griffin.measure.step.builder.ConstantColumns
 import org.apache.griffin.measure.step.builder.dsl.expr._
 import org.apache.griffin.measure.step.builder.dsl.transform.analyzer.AccuracyAnalyzer
-import org.apache.griffin.measure.step.transform.DataFrameOps.AccuracyOprKeys
 import org.apache.griffin.measure.step.transform.{DataFrameOps, DataFrameOpsTransformStep, SparkSqlTransformStep}
+import org.apache.griffin.measure.step.transform.DataFrameOps.AccuracyOprKeys
 import org.apache.griffin.measure.step.write.{DataSourceUpdateWriteStep, MetricWriteStep, RecordWriteStep}
 import org.apache.griffin.measure.utils.ParamUtil._
 
@@ -77,30 +77,37 @@ case class AccuracyExpr2DQSteps(context: DQContext,
           s"${sel.desc} IS NULL"
         }.mkString(" AND ")
         val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})"
-        s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` 
ON ${onClause} WHERE ${whereClause}"
+        s"SELECT ${selClause} FROM `${sourceName}` " +
+          s"LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}"
       }
-      val missRecordsTransStep = SparkSqlTransformStep(missRecordsTableName, missRecordsSql, emptyMap, true)
+      val missRecordsTransStep =
+        SparkSqlTransformStep(missRecordsTableName, missRecordsSql, emptyMap, true)
+
       val missRecordsWriteSteps = procType match {
-        case BatchProcessType => {
-          val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(missRecordsTableName)
+        case BatchProcessType =>
+          val rwName =
+            ruleParam.getOutputOpt(RecordOutputType).
+              flatMap(_.getNameOpt).getOrElse(missRecordsTableName)
           RecordWriteStep(rwName, missRecordsTableName) :: Nil
-        }
         case StreamingProcessType => Nil
       }
       val missRecordsUpdateWriteSteps = procType match {
         case BatchProcessType => Nil
-        case StreamingProcessType => {
-          val dsName = ruleParam.getOutputOpt(DscUpdateOutputType).flatMap(_.getNameOpt).getOrElse(sourceName)
+        case StreamingProcessType =>
+          val dsName =
+            ruleParam.getOutputOpt(DscUpdateOutputType).flatMap(_.getNameOpt).getOrElse(sourceName)
           DataSourceUpdateWriteStep(dsName, missRecordsTableName) :: Nil
-        }
       }
 
       // 2. miss count
       val missCountTableName = "__missCount"
       val missColName = details.getStringOrKey(_miss)
       val missCountSql = procType match {
-        case BatchProcessType => s"SELECT COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}`"
-        case StreamingProcessType => s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}` GROUP BY `${ConstantColumns.tmst}`"
+        case BatchProcessType =>
+          s"SELECT COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}`"
+        case StreamingProcessType =>
+          s"SELECT `${ConstantColumns.tmst}`,COUNT(*) AS `${missColName}` " +
+            s"FROM `${missRecordsTableName}` GROUP BY 
`${ConstantColumns.tmst}`"
       }
       val missCountTransStep = SparkSqlTransformStep(missCountTableName, missCountSql, emptyMap)
 
@@ -109,7 +116,9 @@ case class AccuracyExpr2DQSteps(context: DQContext,
       val totalColName = details.getStringOrKey(_total)
       val totalCountSql = procType match {
         case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`"
-        case StreamingProcessType => s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceName}` GROUP BY `${ConstantColumns.tmst}`"
+        case StreamingProcessType =>
+          s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${totalColName}` " +
+            s"FROM `${sourceName}` GROUP BY `${ConstantColumns.tmst}`"
       }
       val totalCountTransStep = SparkSqlTransformStep(totalCountTableName, totalCountSql, emptyMap)
 
@@ -117,15 +126,14 @@ case class AccuracyExpr2DQSteps(context: DQContext,
       val accuracyTableName = ruleParam.getOutDfName()
       val matchedColName = details.getStringOrKey(_matched)
       val accuracyMetricSql = procType match {
-        case BatchProcessType => {
+        case BatchProcessType =>
           s"""
             |SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
             |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`,
             |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${missCountTableName}`.`${missColName}`, 0)) AS `${matchedColName}`
              |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}`
          """.stripMargin
-        }
-        case StreamingProcessType => {
+        case StreamingProcessType =>
           s"""
             |SELECT `${totalCountTableName}`.`${ConstantColumns.tmst}` AS `${ConstantColumns.tmst}`,
              |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
@@ -134,27 +142,26 @@ case class AccuracyExpr2DQSteps(context: DQContext,
              |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}`
             |ON `${totalCountTableName}`.`${ConstantColumns.tmst}` = `${missCountTableName}`.`${ConstantColumns.tmst}`
          """.stripMargin
-        }
       }
       val accuracyTransStep = SparkSqlTransformStep(accuracyTableName, accuracyMetricSql, emptyMap)
       val accuracyMetricWriteSteps = procType match {
-        case BatchProcessType => {
+        case BatchProcessType =>
           val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
           val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName())
           val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
           MetricWriteStep(mwName, accuracyTableName, flattenType) :: Nil
-        }
         case StreamingProcessType => Nil
       }
 
       // accuracy current steps
       val transSteps1 = missRecordsTransStep :: missCountTransStep :: totalCountTransStep :: accuracyTransStep :: Nil
-      val writeSteps1 = accuracyMetricWriteSteps ++ missRecordsWriteSteps ++ missRecordsUpdateWriteSteps
+      val writeSteps1 =
+        accuracyMetricWriteSteps ++ missRecordsWriteSteps ++ missRecordsUpdateWriteSteps
 
       // streaming extra steps
       val (transSteps2, writeSteps2) = procType match {
         case BatchProcessType => (Nil, Nil)
-        case StreamingProcessType => {
+        case StreamingProcessType =>
           // 5. accuracy metric merge
           val accuracyMetricTableName = "__accuracy"
           val accuracyMetricRule = DataFrameOps._accuracy
@@ -183,14 +190,16 @@ case class AccuracyExpr2DQSteps(context: DQContext,
           val accuracyRecordTransStep = SparkSqlTransformStep(
             accuracyRecordTableName, accuracyRecordSql, emptyMap)
           val accuracyRecordWriteStep = {
-            val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(missRecordsTableName)
+            val rwName =
+              ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
+                .getOrElse(missRecordsTableName)
+
             RecordWriteStep(rwName, missRecordsTableName, Some(accuracyRecordTableName))
           }
 
           // extra steps
           (accuracyMetricTransStep :: accuracyRecordTransStep :: Nil,
             accuracyMetricWriteStep :: accuracyRecordWriteStep :: Nil)
-        }
       }
 
       // full steps

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
index 4852a5b..87cfa86 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
@@ -18,8 +18,8 @@ under the License.
 */
 package org.apache.griffin.measure.step.builder.dsl.transform
 
-import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
+import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.DQStep
 import org.apache.griffin.measure.step.builder.ConstantColumns
@@ -80,16 +80,23 @@ case class CompletenessExpr2DQSteps(context: DQContext,
       val sourceAliasSql = {
         s"SELECT ${selClause} FROM `${sourceName}`"
       }
-      val sourceAliasTransStep = SparkSqlTransformStep(sourceAliasTableName, sourceAliasSql, emptyMap, true)
+      val sourceAliasTransStep =
+        SparkSqlTransformStep(sourceAliasTableName, sourceAliasSql, emptyMap, true)
 
       // 2. incomplete record
       val incompleteRecordsTableName = "__incompleteRecords"
       val completeWhereClause = aliases.map(a => s"`${a}` IS NOT NULL").mkString(" AND ")
       val incompleteWhereClause = s"NOT (${completeWhereClause})"
-      val incompleteRecordsSql = s"SELECT * FROM `${sourceAliasTableName}` WHERE ${incompleteWhereClause}"
-      val incompleteRecordTransStep = SparkSqlTransformStep(incompleteRecordsTableName, incompleteRecordsSql, emptyMap, true)
+
+      val incompleteRecordsSql =
+        s"SELECT * FROM `${sourceAliasTableName}` WHERE 
${incompleteWhereClause}"
+
+      val incompleteRecordTransStep =
+        SparkSqlTransformStep(incompleteRecordsTableName, incompleteRecordsSql, emptyMap, true)
       val incompleteRecordWriteStep = {
-        val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(incompleteRecordsTableName)
+        val rwName =
+          ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
+            .getOrElse(incompleteRecordsTableName)
         RecordWriteStep(rwName, incompleteRecordsTableName)
       }
 
@@ -97,17 +104,24 @@ case class CompletenessExpr2DQSteps(context: DQContext,
       val incompleteCountTableName = "__incompleteCount"
       val incompleteColName = details.getStringOrKey(_incomplete)
       val incompleteCountSql = procType match {
-        case BatchProcessType => s"SELECT COUNT(*) AS `${incompleteColName}` FROM `${incompleteRecordsTableName}`"
-        case StreamingProcessType => s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${incompleteColName}` FROM `${incompleteRecordsTableName}` GROUP BY `${ConstantColumns.tmst}`"
+        case BatchProcessType =>
+          s"SELECT COUNT(*) AS `${incompleteColName}` FROM 
`${incompleteRecordsTableName}`"
+        case StreamingProcessType =>
+          s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS 
`${incompleteColName}` " +
+            s"FROM `${incompleteRecordsTableName}` GROUP BY 
`${ConstantColumns.tmst}`"
       }
-      val incompleteCountTransStep = SparkSqlTransformStep(incompleteCountTableName, incompleteCountSql, emptyMap)
+      val incompleteCountTransStep =
+        SparkSqlTransformStep(incompleteCountTableName, incompleteCountSql, emptyMap)
 
       // 4. total count
       val totalCountTableName = "__totalCount"
       val totalColName = details.getStringOrKey(_total)
       val totalCountSql = procType match {
-        case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}`"
-        case StreamingProcessType => s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}` GROUP BY `${ConstantColumns.tmst}`"
+        case BatchProcessType =>
+          s"SELECT COUNT(*) AS `${totalColName}` FROM 
`${sourceAliasTableName}`"
+        case StreamingProcessType =>
+          s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${totalColName}` " +
+            s"FROM `${sourceAliasTableName}` GROUP BY 
`${ConstantColumns.tmst}`"
       }
       val totalCountTransStep = SparkSqlTransformStep(totalCountTableName, totalCountSql, emptyMap)
 
@@ -115,15 +129,14 @@ case class CompletenessExpr2DQSteps(context: DQContext,
       val completeTableName = ruleParam.getOutDfName()
       val completeColName = details.getStringOrKey(_complete)
       val completeMetricSql = procType match {
-        case BatchProcessType => {
+        case BatchProcessType =>
           s"""
             |SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
             |coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0) AS `${incompleteColName}`,
             |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0)) AS `${completeColName}`
             |FROM `${totalCountTableName}` LEFT JOIN `${incompleteCountTableName}`
          """.stripMargin
-        }
-        case StreamingProcessType => {
+        case StreamingProcessType =>
           s"""
             |SELECT `${totalCountTableName}`.`${ConstantColumns.tmst}` AS `${ConstantColumns.tmst}`,
              |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
@@ -132,7 +145,6 @@ case class CompletenessExpr2DQSteps(context: DQContext,
             |FROM `${totalCountTableName}` LEFT JOIN `${incompleteCountTableName}`
             |ON `${totalCountTableName}`.`${ConstantColumns.tmst}` = `${incompleteCountTableName}`.`${ConstantColumns.tmst}`
          """.stripMargin
-        }
       }
       val completeTransStep = SparkSqlTransformStep(completeTableName, completeMetricSql, emptyMap)
       val completeWriteStep = {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
index 6c56a77..70fee6c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
@@ -18,8 +18,8 @@ under the License.
 */
 package org.apache.griffin.measure.step.builder.dsl.transform
 
-import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
+import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.DQStep
 import org.apache.griffin.measure.step.builder.ConstantColumns
@@ -101,7 +101,8 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
       val sourceAliasSql = {
         s"SELECT ${selClause} FROM `${sourceName}`"
       }
-      val sourceAliasTransStep = SparkSqlTransformStep(sourceAliasTableName, sourceAliasSql, emptyMap, true)
+      val sourceAliasTransStep =
+        SparkSqlTransformStep(sourceAliasTableName, sourceAliasSql, emptyMap, true)
 
       // 2. total metric
       val totalTableName = "__totalMetric"
@@ -125,13 +126,14 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
            |FROM `${sourceAliasTableName}` GROUP BY ${distAliasesClause}
           """.stripMargin
       }
-      val selfGroupTransStep = SparkSqlTransformStep(selfGroupTableName, selfGroupSql, emptyMap, true)
+      val selfGroupTransStep =
+        SparkSqlTransformStep(selfGroupTableName, selfGroupSql, emptyMap, true)
 
       val transSteps1 = sourceAliasTransStep :: totalTransStep :: selfGroupTransStep :: Nil
       val writeSteps1 = totalMetricWriteStep :: Nil
 
       val ((transSteps2, writeSteps2), dupCountTableName) = procType match {
-        case StreamingProcessType if (withOlderTable) => {
+        case StreamingProcessType if (withOlderTable) =>
           // 4.0 update old data
           val targetDsUpdateWriteStep = DataSourceUpdateWriteStep(targetName, targetName)
 
@@ -200,14 +202,14 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
                |FROM `${groupTableName}`
              """.stripMargin
           }
-          val finalDupCountTransStep = SparkSqlTransformStep(finalDupCountTableName, finalDupCountSql, emptyMap, true)
+          val finalDupCountTransStep =
+            SparkSqlTransformStep(finalDupCountTableName, finalDupCountSql, emptyMap, true)
 
-          ((olderAliasTransStep :: joinedTransStep :: groupTransStep :: finalDupCountTransStep :: Nil,
+          ((olderAliasTransStep :: joinedTransStep
+            :: groupTransStep :: finalDupCountTransStep :: Nil,
             targetDsUpdateWriteStep :: Nil), finalDupCountTableName)
-        }
-        case _ => {
+        case _ =>
           ((Nil, Nil), selfGroupTableName)
-        }
       }
 
       // 8. distinct metric
@@ -285,9 +287,13 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
                |FROM `${dupItemsTableName}` GROUP BY ${groupSelClause}, `${dupColName}`
              """.stripMargin
           }
-          val groupDupMetricTransStep = SparkSqlTransformStep(groupDupMetricTableName, groupDupMetricSql, emptyMap)
+          val groupDupMetricTransStep =
+            SparkSqlTransformStep(groupDupMetricTableName, groupDupMetricSql, emptyMap)
           val groupDupMetricWriteStep = {
-            MetricWriteStep(duplicationArrayName, groupDupMetricTableName, ArrayFlattenType, writeTimestampOpt)
+            MetricWriteStep(duplicationArrayName,
+              groupDupMetricTableName,
+              ArrayFlattenType,
+              writeTimestampOpt)
           }
 
           val msteps = {
@@ -306,7 +312,9 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
           // 9. duplicate record
           val dupRecordTableName = "__dupRecords"
           val dupRecordSelClause = procType match {
-            case StreamingProcessType if (withOlderTable) => s"${distAliasesClause}, `${dupColName}`, `${accuDupColName}`"
+            case StreamingProcessType if (withOlderTable) =>
+              s"${distAliasesClause}, `${dupColName}`, `${accuDupColName}`"
+
             case _ => s"${distAliasesClause}, `${dupColName}`"
           }
           val dupRecordSql = {
@@ -315,9 +323,14 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
                |FROM `${dupCountTableName}` WHERE `${dupColName}` > 0
               """.stripMargin
           }
-          val dupRecordTransStep = SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true)
+          val dupRecordTransStep =
+            SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true)
+
           val dupRecordWriteStep = {
-            val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(dupRecordTableName)
+            val rwName =
+              ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
+                .getOrElse(dupRecordTableName)
+
             RecordWriteStep(rwName, dupRecordTableName, None, writeTimestampOpt)
           }
 
@@ -332,7 +345,12 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
           }
           val dupMetricTransStep = SparkSqlTransformStep(dupMetricTableName, dupMetricSql, emptyMap)
           val dupMetricWriteStep = {
-            MetricWriteStep(duplicationArrayName, dupMetricTableName, ArrayFlattenType, writeTimestampOpt)
+            MetricWriteStep(
+              duplicationArrayName,
+              dupMetricTableName,
+              ArrayFlattenType,
+              writeTimestampOpt
+            )
           }
 
           val msteps = {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala
index d986b56..492f4fd 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala
@@ -19,8 +19,8 @@ under the License.
 package org.apache.griffin.measure.step.builder.dsl.transform
 
 import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
+import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.context.{ContextId, DQContext, TimeRange}
 import org.apache.griffin.measure.step.DQStep
 import org.apache.griffin.measure.step.builder.dsl.expr.Expr

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
index ecc115c..af493af 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
@@ -19,8 +19,9 @@ under the License.
 package org.apache.griffin.measure.step.builder.dsl.transform
 
 import org.apache.commons.lang.StringUtils
-import org.apache.griffin.measure.configuration.enums.{BatchProcessType, FlattenType, MetricOutputType, StreamingProcessType}
+
 import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
+import org.apache.griffin.measure.configuration.enums.{BatchProcessType, FlattenType, MetricOutputType, StreamingProcessType}
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.DQStep
 import org.apache.griffin.measure.step.builder.ConstantColumns
@@ -63,7 +64,9 @@ case class ProfilingExpr2DQSteps(context: DQContext,
       val analyzer = ProfilingAnalyzer(profilingExpr, sourceName)
       val selExprDescs = analyzer.selectionExprs.map { sel =>
         val alias = sel match {
-          case s: AliasableExpr => s.alias.filter(StringUtils.isNotEmpty).map(a => s" AS `${a}`").getOrElse("")
+          case s: AliasableExpr =>
+            s.alias.filter(StringUtils.isNotEmpty).map(a => s" AS `${a}`").getOrElse("")
+
           case _ => ""
         }
         s"${sel.desc}${alias}"
@@ -76,21 +79,22 @@ case class ProfilingExpr2DQSteps(context: DQContext,
       val groupByClauseOpt = analyzer.groupbyExprOpt
       val groupbyClause = procType match {
         case BatchProcessType => groupByClauseOpt.map(_.desc).getOrElse("")
-        case StreamingProcessType => {
-          val tmstGroupbyClause = GroupbyClause(LiteralStringExpr(s"`${ConstantColumns.tmst}`") :: Nil, None)
+        case StreamingProcessType =>
+          val tmstGroupbyClause =
+            GroupbyClause(LiteralStringExpr(s"`${ConstantColumns.tmst}`") :: Nil, None)
           val mergedGroubbyClause = tmstGroupbyClause.merge(groupByClauseOpt match {
             case Some(gbc) => gbc
             case _ => GroupbyClause(Nil, None)
           })
           mergedGroubbyClause.desc
-        }
       }
       val preGroupbyClause = analyzer.preGroupbyExprs.map(_.desc).mkString(" ")
       val postGroupbyClause = analyzer.postGroupbyExprs.map(_.desc).mkString(" ")
 
       // 1. select statement
       val profilingSql = {
-        s"SELECT ${selCondition} ${selClause} ${fromClause} 
${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}"
+        s"SELECT ${selCondition} ${selClause} " +
+          s"${fromClause} ${preGroupbyClause} ${groupbyClause} 
${postGroupbyClause}"
       }
       val profilingName = ruleParam.getOutDfName()
       val profilingTransStep = SparkSqlTransformStep(profilingName, profilingSql, details)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
index 8a3924b..3731da9 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
@@ -18,8 +18,8 @@ under the License.
 */
 package org.apache.griffin.measure.step.builder.dsl.transform
 
-import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
+import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.DQStep
 import org.apache.griffin.measure.step.builder.ConstantColumns
@@ -79,19 +79,17 @@ case class TimelinessExpr2DQSteps(context: DQContext,
       // 1. in time
       val inTimeTableName = "__inTime"
       val inTimeSql = etsSelOpt match {
-        case Some(etsSel) => {
+        case Some(etsSel) =>
           s"""
              |SELECT *, (${btsSel}) AS `${ConstantColumns.beginTs}`,
              |(${etsSel}) AS `${ConstantColumns.endTs}`
             |FROM ${sourceName} WHERE (${btsSel}) IS NOT NULL AND (${etsSel}) IS NOT NULL
            """.stripMargin
-        }
-        case _ => {
+        case _ =>
           s"""
              |SELECT *, (${btsSel}) AS `${ConstantColumns.beginTs}`
              |FROM ${sourceName} WHERE (${btsSel}) IS NOT NULL
            """.stripMargin
-        }
       }
       val inTimeTransStep = SparkSqlTransformStep(inTimeTableName, inTimeSql, emptyMap)
 
@@ -103,7 +101,8 @@ case class TimelinessExpr2DQSteps(context: DQContext,
         case _ => ConstantColumns.tmst
       }
       val latencySql = {
-        s"SELECT *, (`${etsColName}` - `${ConstantColumns.beginTs}`) AS 
`${latencyColName}` FROM `${inTimeTableName}`"
+        s"SELECT *, (`${etsColName}` - `${ConstantColumns.beginTs}`) AS 
`${latencyColName}` " +
+          s"FROM `${inTimeTableName}`"
       }
       val latencyTransStep = SparkSqlTransformStep(latencyTableName, latencySql, emptyMap, true)
 
@@ -112,14 +111,15 @@ case class TimelinessExpr2DQSteps(context: DQContext,
       val totalColName = details.getStringOrKey(_total)
       val avgColName = details.getStringOrKey(_avg)
       val metricSql = procType match {
-        case BatchProcessType => {
+
+        case BatchProcessType =>
           s"""
              |SELECT COUNT(*) AS `${totalColName}`,
              |CAST(AVG(`${latencyColName}`) AS BIGINT) AS `${avgColName}`
              |FROM `${latencyTableName}`
            """.stripMargin
-        }
-        case StreamingProcessType => {
+
+        case StreamingProcessType =>
           s"""
              |SELECT `${ConstantColumns.tmst}`,
              |COUNT(*) AS `${totalColName}`,
@@ -127,7 +127,6 @@ case class TimelinessExpr2DQSteps(context: DQContext,
              |FROM `${latencyTableName}`
              |GROUP BY `${ConstantColumns.tmst}`
            """.stripMargin
-        }
       }
       val metricTransStep = SparkSqlTransformStep(metricTableName, metricSql, emptyMap)
       val metricWriteStep = {
@@ -143,24 +142,26 @@ case class TimelinessExpr2DQSteps(context: DQContext,
 
       // 4. timeliness record
       val (transSteps2, writeSteps2) = TimeUtil.milliseconds(details.getString(_threshold, "")) match {
-        case Some(tsh) => {
+        case Some(tsh) =>
           val recordTableName = "__lateRecords"
           val recordSql = {
             s"SELECT * FROM `${latencyTableName}` WHERE `${latencyColName}` > 
${tsh}"
           }
           val recordTransStep = SparkSqlTransformStep(recordTableName, recordSql, emptyMap)
           val recordWriteStep = {
-            val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(recordTableName)
+            val rwName =
+              ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
+                .getOrElse(recordTableName)
+
             RecordWriteStep(rwName, recordTableName, None)
           }
           (recordTransStep :: Nil, recordWriteStep :: Nil)
-        }
         case _ => (Nil, Nil)
       }
 
       // 5. ranges
       val (transSteps3, writeSteps3) = TimeUtil.milliseconds(details.getString(_stepSize, "")) match {
-        case Some(stepSize) => {
+        case Some(stepSize) =>
           // 5.1 range
           val rangeTableName = "__range"
           val stepColName = details.getStringOrKey(_step)
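
Both the threshold gate above and the step-size gate here hinge on TimeUtil.milliseconds returning Some. A minimal standalone mimic of that parsing (a sketch, not Griffin's TimeUtil, which may accept more unit forms):

    // Sketch: duration string -> Some(ms); unparsable -> None, which makes
    // the surrounding match fall through to (Nil, Nil) and skips the
    // optional trans/write steps entirely.
    def milliseconds(t: String): Option[Long] = {
      val Pattern = """(\d+)(ms|s|m|h|d)""".r
      val factor = Map("ms" -> 1L, "s" -> 1000L, "m" -> 60000L,
        "h" -> 3600000L, "d" -> 86400000L)
      t.trim match {
        case Pattern(n, unit) => Some(n.toLong * factor(unit))
        case _ => None
      }
    }
    // milliseconds("1h") => Some(3600000); milliseconds("") => None
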
@@ -176,26 +177,24 @@ case class TimelinessExpr2DQSteps(context: DQContext,
           val rangeMetricTableName = "__rangeMetric"
           val countColName = details.getStringOrKey(_count)
           val rangeMetricSql = procType match {
-            case BatchProcessType => {
+            case BatchProcessType =>
               s"""
                  |SELECT `${stepColName}`, COUNT(*) AS `${countColName}`
                  |FROM `${rangeTableName}` GROUP BY `${stepColName}`
                 """.stripMargin
-            }
-            case StreamingProcessType => {
+            case StreamingProcessType =>
               s"""
                 |SELECT `${ConstantColumns.tmst}`, `${stepColName}`, COUNT(*) AS `${countColName}`
                 |FROM `${rangeTableName}` GROUP BY `${ConstantColumns.tmst}`, `${stepColName}`
                 """.stripMargin
-            }
           }
-          val rangeMetricTransStep = SparkSqlTransformStep(rangeMetricTableName, rangeMetricSql, emptyMap)
+          val rangeMetricTransStep =
+            SparkSqlTransformStep(rangeMetricTableName, rangeMetricSql, emptyMap)
           val rangeMetricWriteStep = {
             MetricWriteStep(stepColName, rangeMetricTableName, ArrayFlattenType)
           }
 
           (rangeTransStep :: rangeMetricTransStep :: Nil, rangeMetricWriteStep :: Nil)
-        }
         case _ => (Nil, Nil)
       }
 
@@ -206,7 +205,8 @@ case class TimelinessExpr2DQSteps(context: DQContext,
         val percentileColName = details.getStringOrKey(_percentileColPrefix)
         val percentileCols = percentiles.map { pct =>
           val pctName = (pct * 100).toInt.toString
-          s"floor(percentile_approx(${latencyColName}, ${pct})) AS 
`${percentileColName}_${pctName}`"
+          s"floor(percentile_approx(${latencyColName}, ${pct})) " +
+            s"AS `${percentileColName}_${pctName}`"
         }.mkString(", ")
         val percentileSql = {
           s"""
@@ -214,7 +214,9 @@ case class TimelinessExpr2DQSteps(context: DQContext,
              |FROM `${latencyTableName}`
             """.stripMargin
         }
-        val percentileTransStep = SparkSqlTransformStep(percentileTableName, percentileSql, emptyMap)
+        val percentileTransStep =
+          SparkSqlTransformStep(percentileTableName, percentileSql, emptyMap)
+
         val percentileWriteStep = {
           MetricWriteStep(percentileTableName, percentileTableName, DefaultFlattenType)
         }
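
percentile_approx is a built-in Spark SQL aggregate; the map/mkString above produces a clause like the following (column names hypothetical):

    val (latencyColName, percentileColName) = ("latency", "percentile")
    val percentileCols = Seq(0.95, 0.99).map { pct =>
      val pctName = (pct * 100).toInt.toString
      s"floor(percentile_approx(${latencyColName}, ${pct})) " +
        s"AS `${percentileColName}_${pctName}`"
    }.mkString(", ")
    // => floor(percentile_approx(latency, 0.95)) AS `percentile_95`,
    //    floor(percentile_approx(latency, 0.99)) AS `percentile_99`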

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
index 443239c..28e9d48 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
@@ -18,8 +18,8 @@ under the License.
 */
 package org.apache.griffin.measure.step.builder.dsl.transform
 
-import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
+import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.DQStep
 import org.apache.griffin.measure.step.builder.ConstantColumns
@@ -112,7 +112,8 @@ case class UniquenessExpr2DQSteps(context: DQContext,
       }.mkString(", ")
       val dupColName = details.getStringOrKey(_dup)
       val groupSql = {
-        s"SELECT ${groupSelClause}, (COUNT(*) - 1) AS `${dupColName}` FROM 
`${joinedTableName}` GROUP BY ${groupSelClause}"
+        s"SELECT ${groupSelClause}, (COUNT(*) - 1) AS `${dupColName}` " +
+          s"FROM `${joinedTableName}` GROUP BY ${groupSelClause}"
       }
       val groupTransStep = SparkSqlTransformStep(groupTableName, groupSql, emptyMap, true)
 
@@ -121,12 +122,11 @@ case class UniquenessExpr2DQSteps(context: DQContext,
       val totalColName = details.getStringOrKey(_total)
       val totalSql = procType match {
         case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`"
-        case StreamingProcessType => {
+        case StreamingProcessType =>
           s"""
              |SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${totalColName}`
              |FROM `${sourceName}` GROUP BY `${ConstantColumns.tmst}`
            """.stripMargin
-        }
       }
       val totalTransStep = SparkSqlTransformStep(totalTableName, totalSql, emptyMap)
       val totalMetricWriteStep = MetricWriteStep(totalColName, totalTableName, EntriesFlattenType)
@@ -136,22 +136,25 @@ case class UniquenessExpr2DQSteps(context: DQContext,
       val uniqueRecordSql = {
         s"SELECT * FROM `${groupTableName}` WHERE `${dupColName}` = 0"
       }
-      val uniqueRecordTransStep = SparkSqlTransformStep(uniqueRecordTableName, uniqueRecordSql, emptyMap)
+      val uniqueRecordTransStep =
+        SparkSqlTransformStep(uniqueRecordTableName, uniqueRecordSql, emptyMap)
 
       // 7. unique metric
       val uniqueTableName = "__uniqueMetric"
       val uniqueColName = details.getStringOrKey(_unique)
       val uniqueSql = procType match {
-        case BatchProcessType => s"SELECT COUNT(*) AS `${uniqueColName}` FROM `${uniqueRecordTableName}`"
-        case StreamingProcessType => {
+        case BatchProcessType =>
+          s"SELECT COUNT(*) AS `${uniqueColName}` FROM 
`${uniqueRecordTableName}`"
+        case StreamingProcessType =>
           s"""
              |SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${uniqueColName}`
             |FROM `${uniqueRecordTableName}` GROUP BY `${ConstantColumns.tmst}`
            """.stripMargin
-        }
       }
       val uniqueTransStep = SparkSqlTransformStep(uniqueTableName, uniqueSql, emptyMap)
-      val uniqueMetricWriteStep = MetricWriteStep(uniqueColName, uniqueTableName, EntriesFlattenType)
+
+      val uniqueMetricWriteStep =
+        MetricWriteStep(uniqueColName, uniqueTableName, EntriesFlattenType)
 
       val transSteps1 = sourceTransStep :: targetTransStep :: joinedTransStep :: groupTransStep ::
         totalTransStep :: uniqueRecordTransStep :: uniqueTransStep :: Nil
@@ -164,9 +167,14 @@ case class UniquenessExpr2DQSteps(context: DQContext,
         val dupRecordSql = {
           s"SELECT * FROM `${groupTableName}` WHERE `${dupColName}` > 0"
         }
-        val dupRecordTransStep = SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true)
+        val dupRecordTransStep =
+          SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true)
+
         val dupRecordWriteStep = {
-          val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(dupRecordTableName)
+          val rwName =
+            ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt)
+              .getOrElse(dupRecordTableName)
+
           RecordWriteStep(rwName, dupRecordTableName)
         }
 
@@ -175,7 +183,9 @@ case class UniquenessExpr2DQSteps(context: DQContext,
         val numColName = details.getStringOrKey(_num)
         val dupMetricSelClause = procType match {
          case BatchProcessType => s"`${dupColName}`, COUNT(*) AS `${numColName}`"
-          case StreamingProcessType => s"`${ConstantColumns.tmst}`, `${dupColName}`, COUNT(*) AS `${numColName}`"
+
+          case StreamingProcessType =>
+            s"`${ConstantColumns.tmst}`, `${dupColName}`, COUNT(*) AS 
`${numColName}`"
         }
         val dupMetricGroupbyClause = procType match {
           case BatchProcessType => s"`${dupColName}`"
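
The two select clauses differ only in the extra timestamp key for streaming; with hypothetical names (and assuming ConstantColumns.tmst renders as __tmst) they come out as:

    val (dupColName, numColName, tmst) = ("dup", "num", "__tmst")
    val batchSel = s"`${dupColName}`, COUNT(*) AS `${numColName}`"
    val streamingSel = s"`${tmst}`, `${dupColName}`, COUNT(*) AS `${numColName}`"
    // batchSel     => `dup`, COUNT(*) AS `num`
    // streamingSel => `__tmst`, `dup`, COUNT(*) AS `num`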

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/AccuracyAnalyzer.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/AccuracyAnalyzer.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/AccuracyAnalyzer.scala
index b2a95ce..d57bf23 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/AccuracyAnalyzer.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/AccuracyAnalyzer.scala
@@ -21,9 +21,11 @@ package org.apache.griffin.measure.step.builder.dsl.transform.analyzer
 import org.apache.griffin.measure.step.builder.dsl.expr._
 
 
-case class AccuracyAnalyzer(expr: LogicalExpr, sourceName: String, targetName: String) extends BasicAnalyzer {
+case class AccuracyAnalyzer(expr: LogicalExpr, sourceName: String, targetName: String)
+  extends BasicAnalyzer {
 
-  val dataSourceNames = expr.preOrderTraverseDepthFirst(Set[String]())(seqDataSourceNames, combDataSourceNames)
+  val dataSourceNames =
+    expr.preOrderTraverseDepthFirst(Set[String]())(seqDataSourceNames, combDataSourceNames)
 
   val sourceSelectionExprs = {
     val seq = seqSelectionExprs(sourceName)
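
A self-contained mimic of what preOrderTraverseDepthFirst does for dataSourceNames (the Expr types below are illustrative, not Griffin's DSL classes): walk the expression tree top-down and fold every data-source head into a Set:

    sealed trait Expr { def children: Seq[Expr] }
    case class DataSourceHead(name: String, children: Seq[Expr] = Nil) extends Expr
    case class Op(children: Seq[Expr]) extends Expr

    def collectSources(e: Expr, acc: Set[String]): Set[String] = {
      val acc1 = e match {
        case DataSourceHead(n, _) => acc + n  // the "seq" step: fold this node in
        case _ => acc
      }
      e.children.foldLeft(acc1)((a, c) => collectSources(c, a)) // the "comb" step
    }
    // collectSources(Op(Seq(DataSourceHead("source"), DataSourceHead("target"))), Set())
    //   => Set("source", "target")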

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/CompletenessAnalyzer.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/CompletenessAnalyzer.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/CompletenessAnalyzer.scala
index f8b872a..5a73bce 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/CompletenessAnalyzer.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/CompletenessAnalyzer.scala
@@ -21,7 +21,8 @@ package org.apache.griffin.measure.step.builder.dsl.transform.analyzer
 import org.apache.griffin.measure.step.builder.dsl.expr._
 
 
-case class CompletenessAnalyzer(expr: CompletenessClause, sourceName: String) extends BasicAnalyzer {
+case class CompletenessAnalyzer(expr: CompletenessClause, sourceName: String)
+  extends BasicAnalyzer {
 
   val seqAlias = (expr: Expr, v: Seq[String]) => {
     expr match {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/DistinctnessAnalyzer.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/DistinctnessAnalyzer.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/DistinctnessAnalyzer.scala
index 9cee8ab..b1f4229 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/DistinctnessAnalyzer.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/DistinctnessAnalyzer.scala
@@ -21,8 +21,8 @@ package org.apache.griffin.measure.step.builder.dsl.transform.analyzer
 import org.apache.griffin.measure.step.builder.dsl.expr._
 
 
-//case class DistinctnessAnalyzer(expr: DistinctnessClause, sourceName: String, targetName: String) extends BasicAnalyzer {
-case class DistinctnessAnalyzer(expr: DistinctnessClause, sourceName: String) extends BasicAnalyzer {
+case class DistinctnessAnalyzer(expr: DistinctnessClause, sourceName: String)
+  extends BasicAnalyzer {
 
   val seqAlias = (expr: Expr, v: Seq[String]) => {
     expr match {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/ProfilingAnalyzer.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/ProfilingAnalyzer.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/ProfilingAnalyzer.scala
index 049e6fd..f66d482 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/ProfilingAnalyzer.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/ProfilingAnalyzer.scala
@@ -23,7 +23,8 @@ import org.apache.griffin.measure.step.builder.dsl.expr._
 
 case class ProfilingAnalyzer(expr: ProfilingClause, sourceName: String) extends BasicAnalyzer {
 
-  val dataSourceNames = expr.preOrderTraverseDepthFirst(Set[String]())(seqDataSourceNames, combDataSourceNames)
+  val dataSourceNames =
+    expr.preOrderTraverseDepthFirst(Set[String]())(seqDataSourceNames, combDataSourceNames)
 
   val selectionExprs: Seq[Expr] = {
     expr.selectClause.exprs.map(_.extractSelf).flatMap { expr =>

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/UniquenessAnalyzer.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/UniquenessAnalyzer.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/UniquenessAnalyzer.scala
index 0976d6c..8a05d17 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/UniquenessAnalyzer.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/UniquenessAnalyzer.scala
@@ -21,7 +21,8 @@ package org.apache.griffin.measure.step.builder.dsl.transform.analyzer
 import org.apache.griffin.measure.step.builder.dsl.expr.{AliasableExpr, _}
 
 
-case class UniquenessAnalyzer(expr: UniquenessClause, sourceName: String, targetName: String) extends BasicAnalyzer {
+case class UniquenessAnalyzer(expr: UniquenessClause, sourceName: String, targetName: String)
+  extends BasicAnalyzer {
 
   val seqAlias = (expr: Expr, v: Seq[String]) => {
     expr match {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala
index eac3b2b..713dc55 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala
@@ -26,9 +26,10 @@ import org.apache.griffin.measure.configuration.enums._
   */
 object PreProcParamMaker {
 
-  case class StringAnyMap(values:Map[String,Any])
+  case class StringAnyMap(values: Map[String, Any])
 
-  def makePreProcRules(rules: Seq[RuleParam], suffix: String, dfName: String): (Seq[RuleParam], String) = {
+  def makePreProcRules(rules: Seq[RuleParam],
+                       suffix: String, dfName: String): (Seq[RuleParam], String) = {
     val len = rules.size
     val (newRules, _) = rules.zipWithIndex.foldLeft((Nil: Seq[RuleParam], dfName)) { (ret, pair) =>
       val (rls, prevOutDfName) = ret
@@ -47,10 +48,9 @@ object PreProcParamMaker {
     val rpRule = rule.replaceInOutDfName(newInDfName, newOutDfName)
     rule.getDslType match {
       case DataFrameOpsType => rpRule
-      case _ => {
+      case _ =>
         val newRule = replaceDfNameSuffix(rule.getRule, rule.getInDfName(), suffix)
         rpRule.replaceRule(newRule)
-      }
     }
   }
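
The suffixing these pre-proc rules rely on follows a name -> name_suffix convention; a hedged sketch of that idea (helper bodies illustrative, not Griffin's exact implementation):

    def withSuffix(name: String, suffix: String): String = s"${name}_${suffix}"
    def replaceDfNameSuffix(rule: String, dfName: String, suffix: String): String =
      rule.replaceAll(s"\\b${dfName}\\b", withSuffix(dfName, suffix))
    // replaceDfNameSuffix("select * from src", "src", "0")
    //   => "select * from src_0"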
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala 
b/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala
index 8b1df82..9194da1 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala
@@ -18,9 +18,10 @@ under the License.
 */
 package org.apache.griffin.measure.step.read
 
+import org.apache.spark.sql._
+
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.DQStep
-import org.apache.spark.sql._
 
 trait ReadStep extends DQStep {
 
@@ -31,15 +32,13 @@ trait ReadStep extends DQStep {
   def execute(context: DQContext): Boolean = {
     info(s"read data source [${name}]")
     read(context) match {
-      case Some(df) => {
+      case Some(df) =>
 //        if (needCache) context.dataFrameCache.cacheDataFrame(name, df)
         context.runTimeTableRegister.registerTable(name, df)
         true
-      }
-      case _ => {
+      case _ =>
         warn(s"read data source [${name}] fails")
         false
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/read/UnionReadStep.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/read/UnionReadStep.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/read/UnionReadStep.scala
index 6dae1cb..88a39cf 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/step/read/UnionReadStep.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/read/UnionReadStep.scala
@@ -18,8 +18,9 @@ under the License.
 */
 package org.apache.griffin.measure.step.read
 
-import org.apache.griffin.measure.context.DQContext
 import org.apache.spark.sql._
+
+import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.utils.DataFrameUtil._
 
 case class UnionReadStep(name: String,

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
index 86b367c..088f328 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
@@ -20,13 +20,14 @@ package org.apache.griffin.measure.step.transform
 
 import java.util.Date
 
+import org.apache.spark.sql.{Encoders, Row, SQLContext, _}
+import org.apache.spark.sql.types.{BooleanType, LongType, StructField, StructType}
+
 import org.apache.griffin.measure.context.ContextId
-import org.apache.griffin.measure.context.streaming.metric.CacheResults.CacheResult
 import org.apache.griffin.measure.context.streaming.metric._
+import org.apache.griffin.measure.context.streaming.metric.CacheResults.CacheResult
 import org.apache.griffin.measure.step.builder.ConstantColumns
 import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.spark.sql.types.{BooleanType, LongType, StructField, StructType}
-import org.apache.spark.sql.{Encoders, Row, SQLContext, _}
 
 /**
   * pre-defined data frame operations
@@ -44,7 +45,9 @@ object DataFrameOps {
     val _matched = "matched"
   }
 
-  def fromJson(sqlContext: SQLContext, inputDfName: String, details: Map[String, Any]): DataFrame = {
+  def fromJson(sqlContext: SQLContext,
+               inputDfName: String,
+               details: Map[String, Any]): DataFrame = {
     val _colName = "col.name"
     val colNameOpt = details.get(_colName).map(_.toString)
 
@@ -58,7 +61,10 @@ object DataFrameOps {
     sqlContext.read.json(rdd) // slow process
   }
 
-  def accuracy(sqlContext: SQLContext, inputDfName: String, contextId: ContextId, details: Map[String, Any]): DataFrame = {
+  def accuracy(sqlContext: SQLContext,
+               inputDfName: String,
+               contextId: ContextId,
+               details: Map[String, Any]): DataFrame = {
     import AccuracyOprKeys._
 
     val miss = details.getStringOrKey(_miss)
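
The fromJson op reformatted above keeps the same flow: pull a JSON string column out as an RDD[String] and let Spark infer a schema from it. A self-contained sketch of that idea (names hypothetical; not the Griffin method itself):

    import org.apache.spark.sql.{DataFrame, SQLContext}

    def jsonColumnToDf(sqlContext: SQLContext, df: DataFrame, col: String): DataFrame = {
      val rdd = df.select(col).rdd.map(_.getString(0))
      sqlContext.read.json(rdd) // schema inference scans the data, hence "slow process"
    }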

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
index 5f99ed2..a2bf46e 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
@@ -35,7 +35,9 @@ case class DataFrameOpsTransformStep(name: String,
     try {
       val df = rule match {
         case DataFrameOps._fromJson => DataFrameOps.fromJson(sqlContext, inputDfName, details)
-        case DataFrameOps._accuracy => DataFrameOps.accuracy(sqlContext, inputDfName, context.contextId, details)
+        case DataFrameOps._accuracy =>
+          DataFrameOps.accuracy(sqlContext, inputDfName, context.contextId, details)
+
         case DataFrameOps._clear => DataFrameOps.clear(sqlContext, inputDfName, details)
         case _ => throw new Exception(s"df opr [ ${rule} ] not supported")
       }
@@ -43,10 +45,9 @@ case class DataFrameOpsTransformStep(name: String,
       context.runTimeTableRegister.registerTable(name, df)
       true
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"run data frame ops [ ${rule} ] error: ${e.getMessage}")
         false
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
index ead7344..ca03f79 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
@@ -37,10 +37,9 @@ case class SparkSqlTransformStep(name: String,
       context.runTimeTableRegister.registerTable(name, df)
       true
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"run spark sql [ ${rule} ] error: ${e.getMessage}")
         false
-      }
     }
   }
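
The try/catch shape above is the step's whole contract: run the SQL, register the result under the step name, and turn any failure into a logged false. A standalone sketch of the same pattern (assuming registerTable amounts to registering a temp view):

    import org.apache.spark.sql.SparkSession

    def runSql(spark: SparkSession, name: String, rule: String): Boolean =
      try {
        val df = spark.sql(rule)
        df.createOrReplaceTempView(name) // stands in for runTimeTableRegister.registerTable
        true
      } catch {
        case e: Throwable =>
          println(s"run spark sql [ ${rule} ] error: ${e.getMessage}")
          false
      }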
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
index 9415998..f34e003 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
@@ -19,9 +19,10 @@ under the License.
 package org.apache.griffin.measure.step.write
 
 import org.apache.commons.lang.StringUtils
-import org.apache.griffin.measure.context.DQContext
 import org.apache.spark.sql.DataFrame
 
+import org.apache.griffin.measure.context.DQContext
+
 /**
   * update data source streaming cache
   */
@@ -34,12 +35,12 @@ case class DataSourceUpdateWriteStep(dsName: String,
 
   def execute(context: DQContext): Boolean = {
     getDataSourceCacheUpdateDf(context) match {
-      case Some(df) => {
-        context.dataSources.find(ds => StringUtils.equals(ds.name, dsName)).foreach(_.updateData(df))
-      }
-      case _ => {
+      case Some(df) =>
+        context.dataSources
+          .find(ds => StringUtils.equals(ds.name, dsName))
+          .foreach(_.updateData(df))
+      case _ =>
         warn(s"update ${dsName} from ${inputName} fails")
-      }
     }
     true
   }
@@ -49,13 +50,13 @@ case class DataSourceUpdateWriteStep(dsName: String,
       val df = context.sqlContext.table(s"`${name}`")
       Some(df)
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"get data frame ${name} fails")
         None
-      }
     }
   }
 
-  private def getDataSourceCacheUpdateDf(context: DQContext): Option[DataFrame] = getDataFrame(context, inputName)
+  private def getDataSourceCacheUpdateDf(context: DQContext): Option[DataFrame]
+    = getDataFrame(context, inputName)
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
index 8f7d01c..e787d96 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
@@ -36,10 +36,9 @@ case class MetricFlushStep() extends WriteStep {
         context.getSink(t).sinkMetrics(metric)
         true
       } catch {
-        case e: Throwable => {
+        case e: Throwable =>
           error(s"flush metrics error: ${e.getMessage}")
           false
-        }
       }
       ret && pr
     }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
index 4771891..bc721f2 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
@@ -45,11 +45,12 @@ case class MetricWriteStep(name: String,
     // get timestamp and normalize metric
     val writeMode = writeTimestampOpt.map(_ => SimpleMode).getOrElse(context.writeMode)
     val timestampMetricMap: Map[Long, Map[String, Any]] = writeMode match {
-      case SimpleMode => {
+
+      case SimpleMode =>
         val metrics: Map[String, Any] = flattenMetric(metricMaps, name, flattenType)
         emptyMetricMap + (timestamp -> metrics)
-      }
-      case TimestampMode => {
+
+      case TimestampMode =>
         val tmstMetrics = metricMaps.map { metric =>
           val tmst = metric.getLong(ConstantColumns.tmst, timestamp)
           val pureMetric = metric.removeKeys(ConstantColumns.columns)
@@ -61,7 +62,6 @@ case class MetricWriteStep(name: String,
           val mtc = flattenMetric(maps, name, flattenType)
           (k, mtc)
         }
-      }
     }
 
     // write to metric wrapper
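
A toy illustration of the two write modes above (values hypothetical): SimpleMode collapses every metric under the job timestamp, while TimestampMode groups each metric under its own tmst before flattening:

    val timestamp = 100L
    val tmstMetrics = Seq((1L, Map("total" -> 10)), (1L, Map("miss" -> 2)), (2L, Map("total" -> 7)))
    val simple = Map(timestamp -> tmstMetrics.map(_._2))          // SimpleMode shape
    val byTmst = tmstMetrics.groupBy(_._1).mapValues(_.map(_._2)) // TimestampMode shape
    // simple => Map(100 -> List(Map(total -> 10), Map(miss -> 2), Map(total -> 7)))
    // byTmst => Map(1 -> List(Map(total -> 10), Map(miss -> 2)), 2 -> List(Map(total -> 7)))
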
@@ -88,10 +88,9 @@ case class MetricWriteStep(name: String,
         }.toSeq
       } else Nil
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"get metric ${name} fails")
         Nil
-      }
     }
   }
 
@@ -100,14 +99,12 @@ case class MetricWriteStep(name: String,
     flattenType match {
       case EntriesFlattenType => metrics.headOption.getOrElse(emptyMap)
       case ArrayFlattenType => Map[String, Any]((name -> metrics))
-      case MapFlattenType => {
+      case MapFlattenType =>
         val v = metrics.headOption.getOrElse(emptyMap)
         Map[String, Any]((name -> v))
-      }
-      case _ => {
+      case _ =>
         if (metrics.size > 1) Map[String, Any]((name -> metrics))
         else metrics.headOption.getOrElse(emptyMap)
-      }
     }
   }
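
For reference, a self-contained mimic of the four flatten behaviours (a sketch, not Griffin's code; the string modes stand in for the FlattenType enum values):

    def flatten(metrics: Seq[Map[String, Any]], name: String, mode: String): Map[String, Any] =
      mode match {
        case "entries" => metrics.headOption.getOrElse(Map.empty) // first metric map as-is
        case "array"   => Map(name -> metrics)                    // whole seq under the name
        case "map"     => Map(name -> metrics.headOption.getOrElse(Map.empty))
        case _ => // default: wrap as array only when there are multiple entries
          if (metrics.size > 1) Map(name -> metrics)
          else metrics.headOption.getOrElse(Map.empty)
      }
    // flatten(Seq(Map("total" -> 10)), "m", "array") => Map(m -> List(Map(total -> 10)))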
 
