Repository: incubator-griffin
Updated Branches:
  refs/heads/master 07121ba9b -> a4f4f2413


support multiple streaming data connectors, and union dataframes by name

- multiple streaming data connectors
- union dataframes by name

Author: Lionel Liu <[email protected]>

Closes #235 from bhlx3lyx7/tmst.
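
For context, a minimal sketch of the union-by-name behavior this change introduces (hypothetical data; Spark 1.6-style API as used in the measure module, with an SQLContext assumed in scope; the helper itself is added in DataFrameUtil.scala below):

    import org.apache.griffin.measure.utils.DataFrameUtil

    // Two frames with the same columns in different orders, as can happen
    // when several connectors feed one data source:
    val df1 = sqlContext.createDataFrame(Seq(("a", 1))).toDF("name", "age")
    val df2 = sqlContext.createDataFrame(Seq((2, "b"))).toDF("age", "name")

    // unionAll merges by position, so df1.unionAll(df2) would put ages
    // under `name`; the new helper re-projects both sides onto their
    // common columns first, keeping values under the right column names:
    val merged = DataFrameUtil.unionByName(df1, df2) // rows: (a, 1), (b, 2)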


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/a4f4f241
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/a4f4f241
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/a4f4f241

Branch: refs/heads/master
Commit: a4f4f24139b2a59c2e7356b3e22b589e3eba5074
Parents: 07121ba
Author: Lionel Liu <[email protected]>
Authored: Wed Mar 21 16:10:32 2018 +0800
Committer: Lionel Liu <[email protected]>
Committed: Wed Mar 21 16:10:32 2018 +0800

----------------------------------------------------------------------
 .../data/connector/DataConnectorFactory.scala   |   3 -
 .../streaming/KafkaStreamingDataConnector.scala |  38 ++--
 .../KafkaStreamingStringDataConnector.scala     |   4 +-
 .../streaming/StreamingDataConnector.scala      |   5 +-
 .../measure/data/source/DataSource.scala        |  51 +++---
 .../data/source/cache/DataSourceCache.scala     |  51 ++++--
 .../measure/data/source/cache/WithFanIn.scala   |  57 ++++++
 .../measure/process/engine/SparkSqlEngine.scala |   2 +-
 .../measure/rule/adaptor/InternalColumns.scala  |   4 +-
 .../dsl/analyzer/DistinctnessAnalyzer.scala     |   2 +-
 .../rule/dsl/expr/ClauseExpression.scala        |  28 ++-
 .../griffin/measure/rule/dsl/expr/Expr.scala    |   2 +-
 .../griffin/measure/rule/dsl/expr/ExprTag.scala |  23 +++
 .../measure/rule/dsl/parser/BasicParser.scala   |  10 +-
 .../rule/dsl/parser/GriffinDslParser.scala      |  12 +-
 .../rule/trans/DistinctnessRulePlanTrans.scala  | 173 ++++++++++++++-----
 .../griffin/measure/utils/DataFrameUtil.scala   |  41 +++++
 .../_distinctness-batch-griffindsl1.json        |   2 +-
 .../_distinctness-batch-griffindsl2.json        |  74 ++++++++
 19 files changed, 467 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnectorFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnectorFactory.scala
index 9c3383f..27b390a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnectorFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnectorFactory.scala
@@ -61,9 +61,6 @@ object DataConnectorFactory {
         case AvroRegex() => AvroBatchDataConnector(sqlContext, dqEngines, dataConnectorParam)
         case TextDirRegex() => TextDirBatchDataConnector(sqlContext, dqEngines, dataConnectorParam)
         case KafkaRegex() => {
-//          val ksdcTry = getStreamingDataConnector(ssc, dataConnectorParam)
-//          val cdcTry = getCacheDataConnector(sqlContext, dataConnectorParam.cache)
-//          KafkaCacheDirectDataConnector(ksdcTry, cdcTry, dataConnectorParam)
           getStreamingDataConnector(sqlContext, ssc, dqEngines, dataConnectorParam)
         }
         case _ => throw new Exception("connector creation error!")

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala
index f973f3f..d46d6b7 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala
@@ -28,6 +28,7 @@ trait KafkaStreamingDataConnector extends StreamingDataConnector {
 
   type KD <: Decoder[K]
   type VD <: Decoder[V]
+  type OUT = (K, V)
 
   val config = dcParam.config
 
@@ -42,35 +43,44 @@ trait KafkaStreamingDataConnector extends StreamingDataConnector {
   }
 
   def init(): Unit = {
+    // register fan in
+    dataSourceCacheOpt.foreach(_.registerFanIn)
+
     val ds = stream match {
       case Success(dstream) => dstream
       case Failure(ex) => throw ex
     }
     ds.foreachRDD((rdd, time) => {
       val ms = time.milliseconds
-
-      // coalesce partition number
-      val prlCount = rdd.sparkContext.defaultParallelism
-      val ptnCount = rdd.getNumPartitions
-      val repartitionedRdd = if (prlCount < ptnCount) {
-        rdd.coalesce(prlCount)
-      } else rdd
-
-      val dfOpt = transform(repartitionedRdd)
-
-      val preDfOpt = preProcess(dfOpt, ms)
+      val saveDfOpt = try {
+        // coalesce partition number
+        val prlCount = rdd.sparkContext.defaultParallelism
+        val ptnCount = rdd.getNumPartitions
+        val repartitionedRdd = if (prlCount < ptnCount) {
+          rdd.coalesce(prlCount)
+        } else rdd
+
+        val dfOpt = transform(repartitionedRdd)
+
+        preProcess(dfOpt, ms)
+      } catch {
+        case e: Throwable => {
+          error(s"streaming data connector error: ${e.getMessage}")
+          None
+        }
+      }
 
       // save data frame
-      dataSourceCacheOpt.foreach(_.saveData(preDfOpt, ms))
+      dataSourceCacheOpt.foreach(_.saveData(saveDfOpt, ms))
     })
   }
 
-  def stream(): Try[InputDStream[(K, V)]] = Try {
+  def stream(): Try[InputDStream[OUT]] = Try {
     val topicSet = topics.split(",").toSet
     createDStream(topicSet)
   }
 
-  protected def createDStream(topicSet: Set[String]): InputDStream[(K, V)]
+  protected def createDStream(topicSet: Set[String]): InputDStream[OUT]
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingStringDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingStringDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingStringDataConnector.scala
index 5e0413e..038cb77 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingStringDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingStringDataConnector.scala
@@ -44,11 +44,11 @@ case class KafkaStreamingStringDataConnector(sqlContext: SQLContext,
     StructField(valueColName, StringType)
   ))
 
-  def createDStream(topicSet: Set[String]): InputDStream[(K, V)] = {
+  def createDStream(topicSet: Set[String]): InputDStream[OUT] = {
     KafkaUtils.createDirectStream[K, V, KD, VD](ssc, kafkaConfig, topicSet)
   }
 
-  def transform(rdd: RDD[(K, V)]): Option[DataFrame] = {
+  def transform(rdd: RDD[OUT]): Option[DataFrame] = {
     if (rdd.isEmpty) None else {
       try {
         val rowRdd = rdd.map(d => Row(d._2))

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala
index 39f4995..e52ca1b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala
@@ -32,10 +32,11 @@ trait StreamingDataConnector extends DataConnector {
 
   type K
   type V
+  type OUT
 
-  protected def stream(): Try[InputDStream[(K, V)]]
+  protected def stream(): Try[InputDStream[OUT]]
 
-  def transform(rdd: RDD[(K, V)]): Option[DataFrame]
+  def transform(rdd: RDD[OUT]): Option[DataFrame]
 
   def data(ms: Long): (Option[DataFrame], TimeRange) = (None, TimeRange.emptyTimeRange)
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala
index 9a4b640..b4324dd 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala
@@ -28,6 +28,7 @@ import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters,
 import org.apache.griffin.measure.rule.plan.TimeInfo
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.griffin.measure.utils.DataFrameUtil._
 
 case class DataSource(sqlContext: SQLContext,
                       name: String,
@@ -89,31 +90,31 @@ case class DataSource(sqlContext: SQLContext,
     }
   }
 
-  private def unionDfOpts(dfOpt1: Option[DataFrame], dfOpt2: Option[DataFrame]
-                         ): Option[DataFrame] = {
-    (dfOpt1, dfOpt2) match {
-      case (Some(df1), Some(df2)) => Some(unionDataFrames(df1, df2))
-      case (Some(df1), _) => dfOpt1
-      case (_, Some(df2)) => dfOpt2
-      case _ => None
-    }
-  }
-
-  private def unionDataFrames(df1: DataFrame, df2: DataFrame): DataFrame = {
-    try {
-      val cols = df1.columns
-      val rdd2 = df2.map{ row =>
-        val values = cols.map { col =>
-          row.getAs[Any](col)
-        }
-        Row(values: _*)
-      }
-      val ndf2 = sqlContext.createDataFrame(rdd2, df1.schema)
-      df1 unionAll ndf2
-    } catch {
-      case e: Throwable => df1
-    }
-  }
+//  private def unionDfOpts(dfOpt1: Option[DataFrame], dfOpt2: Option[DataFrame]
+//                         ): Option[DataFrame] = {
+//    (dfOpt1, dfOpt2) match {
+//      case (Some(df1), Some(df2)) => Some(unionDataFrames(df1, df2))
+//      case (Some(df1), _) => dfOpt1
+//      case (_, Some(df2)) => dfOpt2
+//      case _ => None
+//    }
+//  }
+//
+//  private def unionDataFrames(df1: DataFrame, df2: DataFrame): DataFrame = {
+//    try {
+//      val cols = df1.columns
+//      val rdd2 = df2.map{ row =>
+//        val values = cols.map { col =>
+//          row.getAs[Any](col)
+//        }
+//        Row(values: _*)
+//      }
+//      val ndf2 = sqlContext.createDataFrame(rdd2, df1.schema)
+//      df1 unionAll ndf2
+//    } catch {
+//      case e: Throwable => df1
+//    }
+//  }
 
   def updateData(df: DataFrame): Unit = {
     dataSourceCacheOpt.foreach(_.updateData(Some(df)))

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
index ac67557..2412130 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
@@ -28,11 +28,15 @@ import org.apache.griffin.measure.rule.adaptor.InternalColumns
 import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
 import org.apache.griffin.measure.utils.ParamUtil._
 import org.apache.spark.sql._
+import org.apache.spark.sql.functions.col
+import org.apache.griffin.measure.utils.DataFrameUtil._
+
+import scala.util.Random
 
 // data source cache process steps
 // dump phase: save
 // process phase: read -> process -> update -> finish -> clean old data
-trait DataSourceCache extends DataCacheable with Loggable with Serializable {
+trait DataSourceCache extends DataCacheable with WithFanIn[Long] with Loggable with Serializable {
 
   val sqlContext: SQLContext
   val param: Map[String, Any]
@@ -55,7 +59,8 @@ trait DataSourceCache extends DataCacheable with Loggable with Serializable {
   val _ReadyTimeDelay = "ready.time.delay"
   val _TimeRange = "time.range"
 
-  val defFilePath = s"hdfs:///griffin/cache/${dsName}/${index}"
+  val rdmStr = Random.alphanumeric.take(10).mkString
+  val defFilePath = s"hdfs:///griffin/cache/${dsName}_${rdmStr}"
   val defInfoPath = s"${index}"
 
   val filePath: String = param.getString(_FilePath, defFilePath)
@@ -98,11 +103,17 @@ trait DataSourceCache extends DataCacheable with Loggable with Serializable {
 
   def init(): Unit = {}
 
-  // save new cache data only
+  // save new cache data only, need index for multiple streaming data connectors
   def saveData(dfOpt: Option[DataFrame], ms: Long): Unit = {
     if (!readOnly) {
       dfOpt match {
         case Some(df) => {
+          df.cache
+
+          // cache df
+          val cnt = df.count
+          println(s"save ${dsName} data count: ${cnt}")
+
           // lock makes it safer when writing new cache data
           val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
           if (newCacheLocked) {
@@ -115,6 +126,9 @@ trait DataSourceCache extends DataCacheable with Loggable with Serializable {
               newCacheLock.unlock()
             }
           }
+
+          // uncache
+          df.unpersist
         }
         case _ => {
           info(s"no data frame to save")
@@ -122,8 +136,12 @@ trait DataSourceCache extends DataCacheable with Loggable with Serializable {
       }
 
       // submit cache time and ready time
-      submitCacheTime(ms)
-      submitReadyTime(ms)
+      if (fanIncrement(ms)) {
+        println(s"save data [${ms}] finish")
+        submitCacheTime(ms)
+        submitReadyTime(ms)
+      }
+
     }
   }
 
@@ -179,15 +197,20 @@ trait DataSourceCache extends DataCacheable with Loggable with Serializable {
     (cacheDfOpt, retTimeRange)
   }
 
-  private def unionDfOpts(dfOpt1: Option[DataFrame], dfOpt2: Option[DataFrame]
-                         ): Option[DataFrame] = {
-    (dfOpt1, dfOpt2) match {
-      case (Some(df1), Some(df2)) => Some(df1 unionAll df2)
-      case (Some(df1), _) => dfOpt1
-      case (_, Some(df2)) => dfOpt2
-      case _ => None
-    }
-  }
+//  private def unionDfOpts(dfOpt1: Option[DataFrame], dfOpt2: Option[DataFrame]
+//                         ): Option[DataFrame] = {
+//    (dfOpt1, dfOpt2) match {
+//      case (Some(df1), Some(df2)) => Some(unionByName(df1, df2))
+//      case (Some(df1), _) => dfOpt1
+//      case (_, Some(df2)) => dfOpt2
+//      case _ => None
+//    }
+//  }
+//
+//  private def unionByName(a: DataFrame, b: DataFrame): DataFrame = {
+//    val columns = a.columns.toSet.intersect(b.columns.toSet).map(col).toSeq
+//    a.select(columns: _*).unionAll(b.select(columns: _*))
+//  }
 
   private def cleanOutTimePartitions(path: String, outTime: Long, partitionOpt: Option[String],
                                      func: (Long, Long) => Boolean

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/WithFanIn.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/WithFanIn.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/WithFanIn.scala
new file mode 100644
index 0000000..aa5e04d
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/WithFanIn.scala
@@ -0,0 +1,57 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.data.source.cache
+
+import java.util.concurrent.atomic.AtomicInteger
+import scala.collection.concurrent.{TrieMap, Map => ConcMap}
+
+trait WithFanIn[T] {
+
+  val totalNum: AtomicInteger = new AtomicInteger(0)
+  val fanInCountMap: ConcMap[T, Int] = TrieMap[T, Int]()
+
+  def registerFanIn(): Int = {
+    totalNum.incrementAndGet()
+  }
+
+  def fanIncrement(key: T): Boolean = {
+    fanInc(key)
+    fanInCountMap.get(key) match {
+      case Some(n) if (n >= totalNum.get) => {
+        fanInCountMap.remove(key)
+        true
+      }
+      case _ => false
+    }
+  }
+
+  private def fanInc(key: T): Unit = {
+    fanInCountMap.get(key) match {
+      case Some(n) => {
+        val suc = fanInCountMap.replace(key, n, n + 1)
+        if (!suc) fanInc(key)
+      }
+      case _ => {
+        val oldOpt = fanInCountMap.putIfAbsent(key, 1)
+        if (oldOpt.nonEmpty) fanInc(key)
+      }
+    }
+  }
+
+}
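
A sketch of how this fan-in counter coordinates several streaming connectors that share one cache (hypothetical two-connector setup; compare registerFanIn in KafkaStreamingDataConnector.init and fanIncrement in DataSourceCache.saveData above):

    object DemoCache extends WithFanIn[Long]

    // Each streaming data connector registers itself once at init:
    DemoCache.registerFanIn()   // totalNum = 1
    DemoCache.registerFanIn()   // totalNum = 2

    // After saving the batch for a timestamp, each connector increments;
    // only the last arrival gets true, so cache/ready time is submitted once:
    val ms = 1000L
    DemoCache.fanIncrement(ms)  // false: 1 of 2 connectors done
    DemoCache.fanIncrement(ms)  // true:  2 of 2 done, key removed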

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
index dcb02f6..438595b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
@@ -48,7 +48,7 @@ case class SparkSqlEngine(sqlContext: SQLContext) extends SparkDqEngine {
           } else sqlContext.sql(rule)
 
 //          println(name)
-//          rdf.show(10)
+//          rdf.show(3)
 
           if (rs.isGlobal) {
             if (rs.needCache) DataFrameCaches.cacheGlobalDataFrame(name, rdf)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala
index fc6a246..fa04288 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala
@@ -29,5 +29,7 @@ object InternalColumns {
 
   val distinct = "__distinct"
 
-  val columns = List[String](tmst, metric, record, empty, beginTs, endTs, distinct)
+  val rowNumber = "__rn"
+
+  val columns = List[String](tmst, metric, record, empty, beginTs, endTs, distinct, rowNumber)
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DistinctnessAnalyzer.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DistinctnessAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DistinctnessAnalyzer.scala
index 55e4f39..af59eb4 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DistinctnessAnalyzer.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DistinctnessAnalyzer.scala
@@ -37,7 +37,7 @@ case class DistinctnessAnalyzer(expr: DistinctnessClause, sourceName: String) ex
   val selectionPairs = exprs.zipWithIndex.map { pair =>
     val (pr, idx) = pair
     val res = pr.preOrderTraverseDepthFirst(Seq[String]())(seqAlias, combAlias)
-    (pr, res.headOption.getOrElse(genAlias(idx)))
+    (pr, res.headOption.getOrElse(genAlias(idx)), pr.tag.isEmpty)
   }
 
   if (selectionPairs.isEmpty) {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
index 340c1e2..6790268 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
@@ -99,7 +99,7 @@ case class GroupbyClause(exprs: Seq[Expr], havingClauseOpt: Option[Expr]) extend
 
 }
 
-case class OrderbyItem(expr: Expr, orderOpt: Option[String]) extends Expr {
+case class OrderItem(expr: Expr, orderOpt: Option[String]) extends Expr {
   addChild(expr)
   def desc: String = {
     orderOpt match {
@@ -109,12 +109,12 @@ case class OrderbyItem(expr: Expr, orderOpt: Option[String]) extends Expr {
   }
   def coalesceDesc: String = desc
 
-  override def map(func: (Expr) => Expr): OrderbyItem = {
-    OrderbyItem(func(expr), orderOpt)
+  override def map(func: (Expr) => Expr): OrderItem = {
+    OrderItem(func(expr), orderOpt)
   }
 }
 
-case class OrderbyClause(items: Seq[OrderbyItem]) extends ClauseExpression {
+case class OrderbyClause(items: Seq[OrderItem]) extends ClauseExpression {
 
   addChildren(items.map(_.expr))
 
@@ -128,7 +128,25 @@ case class OrderbyClause(items: Seq[OrderbyItem]) extends ClauseExpression {
   }
 
   override def map(func: (Expr) => Expr): OrderbyClause = {
-    OrderbyClause(items.map(func(_).asInstanceOf[OrderbyItem]))
+    OrderbyClause(items.map(func(_).asInstanceOf[OrderItem]))
+  }
+}
+
+case class SortbyClause(items: Seq[OrderItem]) extends ClauseExpression {
+
+  addChildren(items.map(_.expr))
+
+  def desc: String = {
+    val obs = items.map(_.desc).mkString(", ")
+    s"SORT BY ${obs}"
+  }
+  def coalesceDesc: String = {
+    val obs = items.map(_.desc).mkString(", ")
+    s"SORT BY ${obs}"
+  }
+
+  override def map(func: (Expr) => Expr): SortbyClause = {
+    SortbyClause(items.map(func(_).asInstanceOf[OrderItem]))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/Expr.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/Expr.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/Expr.scala
index c089e81..0b653b1 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/Expr.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/Expr.scala
@@ -18,7 +18,7 @@ under the License.
 */
 package org.apache.griffin.measure.rule.dsl.expr
 
-trait Expr extends TreeNode with Serializable {
+trait Expr extends TreeNode with ExprTag with Serializable {
 
   def desc: String
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ExprTag.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ExprTag.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ExprTag.scala
new file mode 100644
index 0000000..2e31bbe
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ExprTag.scala
@@ -0,0 +1,23 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.dsl.expr
+
+trait ExprTag { this: Expr =>
+  var tag: String = ""
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala
index 846770b..3a0d737 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala
@@ -152,6 +152,7 @@ trait BasicParser extends JavaTokenParsers with Serializable {
     val WHERE: Parser[String] = """(?i)where\s""".r
     val GROUP: Parser[String] = """(?i)group\s""".r
     val ORDER: Parser[String] = """(?i)order\s""".r
+    val SORT: Parser[String] = """(?i)sort\s""".r
     val BY: Parser[String] = """(?i)by\s""".r
     val DESC: Parser[String] = """(?i)desc""".r
     val ASC: Parser[String] = """(?i)asc""".r
@@ -360,12 +361,15 @@ trait BasicParser extends JavaTokenParsers with Serializable {
   def groupbyClause: Parser[GroupbyClause] = GROUP ~ BY ~ rep1sep(expression, COMMA) ~ opt(havingClause) ^^ {
     case _ ~ _ ~ cols ~ havingOpt => GroupbyClause(cols, havingOpt)
   }
-  def orderbyItem: Parser[OrderbyItem] = expression ~ opt(DESC | ASC) ^^ {
-    case expr ~ orderOpt => OrderbyItem(expr, orderOpt)
+  def orderItem: Parser[OrderItem] = expression ~ opt(DESC | ASC) ^^ {
+    case expr ~ orderOpt => OrderItem(expr, orderOpt)
   }
-  def orderbyClause: Parser[OrderbyClause] = ORDER ~ BY ~ rep1sep(orderbyItem, COMMA) ^^ {
+  def orderbyClause: Parser[OrderbyClause] = ORDER ~ BY ~ rep1sep(orderItem, COMMA) ^^ {
     case _ ~ _ ~ cols => OrderbyClause(cols)
   }
+  def sortbyClause: Parser[SortbyClause] = SORT ~ BY ~ rep1sep(orderItem, COMMA) ^^ {
+    case _ ~ _ ~ cols => SortbyClause(cols)
+  }
   def limitClause: Parser[LimitClause] = LIMIT ~> expression ^^ { LimitClause(_) }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
index b129ead..d4a037b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
@@ -24,6 +24,8 @@ import org.apache.griffin.measure.rule.dsl.expr._
 case class GriffinDslParser(dataSourceNames: Seq[String], functionNames: Seq[String]
                            ) extends BasicParser {
 
+  import Operator._
+
   /**
     * -- profiling clauses --
     * <profiling-clauses> = <select-clause> [ <from-clause> ]+ [ <where-clause> ]+ [ <groupby-clause> ]+ [ <orderby-clause> ]+ [ <limit-clause> ]+
@@ -48,9 +50,15 @@ case class GriffinDslParser(dataSourceNames: Seq[String], functionNames: Seq[Str
 
   /**
     * -- distinctness clauses --
-    * <distinctness-clauses> = <expr> [, <expr>]+
+    * <sqbr-expr> = "[" <expr> "]"
+    * <dist-expr> = <sqbr-expr> | <expr>
+    * <distinctness-clauses> = <dist-expr> [, <dist-expr>]+
     */
-  def distinctnessClause: Parser[DistinctnessClause] = rep1sep(expression, Operator.COMMA) ^^ {
+  def sqbrExpr: Parser[Expr] = LSQBR ~> expression <~ RSQBR ^^ {
+    case expr => { expr.tag = "[]"; expr}
+  }
+  def distExpr: Parser[Expr] = expression | sqbrExpr
+  def distinctnessClause: Parser[DistinctnessClause] = rep1sep(distExpr, Operator.COMMA) ^^ {
     case exprs => DistinctnessClause(exprs)
   }
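
As a usage sketch, the extended grammar accepts rules like the one in the new test config, mixing a plain expression with a bracket-tagged one (this assumes scala-parser-combinators' parseAll, which BasicParser inherits via JavaTokenParsers):

    val parser = GriffinDslParser("source" :: "target" :: Nil, Nil)
    // `name` parses as a plain distinct column; `[age]` gets tag "[]",
    // which DistinctnessAnalyzer uses to mark it as a group-by column.
    val result = parser.parseAll(parser.distinctnessClause, "name, [age]")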
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala
index 1ec970b..ccdf178 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala
@@ -49,6 +49,8 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String],
 
     val _duplicationArray = "duplication.array"
     val _withAccumulate = "with.accumulate"
+
+    val _recordEnable = "record.enable"
   }
   import DistinctnessKeys._
 
@@ -81,11 +83,15 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String],
       }
 
       val selClause = analyzer.selectionPairs.map { pair =>
-        val (expr, alias) = pair
+        val (expr, alias, _) = pair
         s"${expr.desc} AS `${alias}`"
       }.mkString(", ")
-      val aliases = analyzer.selectionPairs.map(_._2)
-      val aliasesClause = aliases.map( a => s"`${a}`" ).mkString(", ")
+      val distAliases = analyzer.selectionPairs.filter(_._3).map(_._2)
+      val distAliasesClause = distAliases.map( a => s"`${a}`" ).mkString(", ")
+      val allAliases = analyzer.selectionPairs.map(_._2)
+      val allAliasesClause = allAliases.map( a => s"`${a}`" ).mkString(", ")
+      val groupAliases = analyzer.selectionPairs.filter(!_._3).map(_._2)
+      val groupAliasesClause = groupAliases.map( a => s"`${a}`" ).mkString(", ")
 
       // 1. source alias
       val sourceAliasTableName = "__sourceAlias"
@@ -110,9 +116,9 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String],
       val accuDupColName = details.getStringOrKey(_accu_dup)
       val selfGroupSql = {
         s"""
-           |SELECT ${aliasesClause}, (COUNT(*) - 1) AS `${dupColName}`,
+           |SELECT ${distAliasesClause}, (COUNT(*) - 1) AS `${dupColName}`,
            |TRUE AS `${InternalColumns.distinct}`
-           |FROM `${sourceAliasTableName}` GROUP BY ${aliasesClause}
+           |FROM `${sourceAliasTableName}` GROUP BY ${distAliasesClause}
           """.stripMargin
       }
       val selfGroupStep = SparkSqlStep(selfGroupTableName, selfGroupSql, emptyMap, true)
@@ -141,13 +147,13 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String],
 
           // 5. join with older data
           val joinedTableName = "__joined"
-          val selfSelClause = (aliases :+ dupColName).map { alias =>
+          val selfSelClause = (distAliases :+ dupColName).map { alias =>
             s"`${selfGroupTableName}`.`${alias}`"
           }.mkString(", ")
-          val onClause = aliases.map { alias =>
+          val onClause = distAliases.map { alias =>
             s"coalesce(`${selfGroupTableName}`.`${alias}`, '') = 
coalesce(`${olderAliasTableName}`.`${alias}`, '')"
           }.mkString(" AND ")
-          val olderIsNull = aliases.map { alias =>
+          val olderIsNull = distAliases.map { alias =>
             s"`${olderAliasTableName}`.`${alias}` IS NULL"
           }.mkString(" AND ")
           val joinedSql = {
@@ -164,19 +170,30 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String],
           val moreDupColName = "_more_dup"
           val groupSql = {
             s"""
-               |SELECT ${aliasesClause}, `${dupColName}`, `${InternalColumns.distinct}`,
+               |SELECT ${distAliasesClause}, `${dupColName}`, `${InternalColumns.distinct}`,
                |COUNT(*) AS `${moreDupColName}`
                |FROM `${joinedTableName}`
-               |GROUP BY ${aliasesClause}, `${dupColName}`, `${InternalColumns.distinct}`
+               |GROUP BY ${distAliasesClause}, `${dupColName}`, `${InternalColumns.distinct}`
              """.stripMargin
           }
           val groupStep = SparkSqlStep(groupTableName, groupSql, emptyMap)
 
           // 7. final duplicate count
           val finalDupCountTableName = "__finalDupCount"
+          // dupColName:      the duplicate count of duplicated items only occurs in new data,
+          //                  which means the distinct one in new data is also duplicate
+          // accuDupColName:  the count of duplicated items accumulated in new data and old data,
+          //                  which means the accumulated distinct count in all data
+          // e.g.:  new data [A, A, B, B, C, D], old data [A, A, B, C]
+          //        selfGroupTable will be (A, 1, F), (B, 1, F), (C, 0, T), (D, 0, T)
+          //        joinedTable will be (A, 1, F), (A, 1, F), (B, 1, F), (C, 0, F), (D, 0, T)
+          //        groupTable will be (A, 1, F, 2), (B, 1, F, 1), (C, 0, F, 1), (D, 0, T, 1)
+          //        finalDupCountTable will be (A, F, 2, 3), (B, F, 2, 2), (C, F, 1, 1), (D, T, 0, 0)
+          //        The distinct result of new data only should be: (A, 2), (B, 2), (C, 1), (D, 0),
+          //        which means in new data [A, A, B, B, C] are all duplicated, only [D] is distinct
           val finalDupCountSql = {
             s"""
-               |SELECT ${aliasesClause}, `${InternalColumns.distinct}`,
+               |SELECT ${distAliasesClause}, `${InternalColumns.distinct}`,
                |CASE WHEN `${InternalColumns.distinct}` THEN `${dupColName}`
                |ELSE (`${dupColName}` + 1) END AS `${dupColName}`,
                |CASE WHEN `${InternalColumns.distinct}` THEN `${dupColName}`
@@ -215,36 +232,112 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String],
 
       val duplicationArrayName = details.getString(_duplicationArray, "")
       val dupRulePlan = if (duplicationArrayName.nonEmpty) {
-        // 9. duplicate record
-        val dupRecordTableName = "__dupRecords"
-        val dupRecordSelClause = procType match {
-          case StreamingProcessType if (withOlderTable) => s"${aliasesClause}, `${dupColName}`, `${accuDupColName}`"
-          case _ => s"${aliasesClause}, `${dupColName}`"
-        }
-        val dupRecordSql = {
-          s"""
-             |SELECT ${dupRecordSelClause}
-             |FROM `${dupCountTableName}` WHERE `${dupColName}` > 0
-           """.stripMargin
-        }
-        val dupRecordStep = SparkSqlStep(dupRecordTableName, dupRecordSql, emptyMap, true)
-        val dupRecordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
-        val dupRecordExport = genRecordExport(dupRecordParam, dupRecordTableName, dupRecordTableName, endTmst, mode)
-
-        // 10. duplicate metric
-        val dupMetricTableName = "__dupMetric"
-        val numColName = details.getStringOrKey(_num)
-        val dupMetricSql = {
-          s"""
-             |SELECT `${dupColName}`, COUNT(*) AS `${numColName}`
-             |FROM `${dupRecordTableName}` GROUP BY `${dupColName}`
-         """.stripMargin
-        }
-        val dupMetricStep = SparkSqlStep(dupMetricTableName, dupMetricSql, emptyMap)
-        val dupMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
-        val dupMetricExport = genMetricExport(dupMetricParam, duplicationArrayName, dupMetricTableName, endTmst, mode)
+        val recordEnable = details.getBoolean(_recordEnable, false)
+        if (groupAliases.size > 0) {
+          // with some group by requirement
+          // 9. origin data join with distinct information
+          val informedTableName = "__informed"
+          val onClause = distAliases.map { alias =>
+            s"coalesce(`${sourceAliasTableName}`.`${alias}`, '') = 
coalesce(`${dupCountTableName}`.`${alias}`, '')"
+          }.mkString(" AND ")
+          val informedSql = {
+            s"""
+               |SELECT `${sourceAliasTableName}`.*,
+               |`${dupCountTableName}`.`${dupColName}` AS `${dupColName}`,
+               |`${dupCountTableName}`.`${InternalColumns.distinct}` AS `${InternalColumns.distinct}`
+               |FROM `${sourceAliasTableName}` LEFT JOIN `${dupCountTableName}`
+               |ON ${onClause}
+               """.stripMargin
+          }
+          val informedStep = SparkSqlStep(informedTableName, informedSql, emptyMap)
+
+          // 10. add row number
+          val rnTableName = "__rowNumber"
+          val rnDistClause = distAliasesClause
+          val rnSortClause = s"SORT BY `${InternalColumns.distinct}`"
+          val rnSql = {
+            s"""
+               |SELECT *,
+               |ROW_NUMBER() OVER (DISTRIBUTE BY ${rnDistClause} ${rnSortClause}) `${InternalColumns.rowNumber}`
+               |FROM `${informedTableName}`
+               """.stripMargin
+          }
+          val rnStep = SparkSqlStep(rnTableName, rnSql, emptyMap)
 
-        RulePlan(dupRecordStep :: dupMetricStep :: Nil, dupRecordExport :: dupMetricExport :: Nil)
+          // 11. recognize duplicate items
+          val dupItemsTableName = "__dupItems"
+          val dupItemsSql = {
+            s"""
+               |SELECT ${allAliasesClause}, `${dupColName}` FROM `${rnTableName}`
+               |WHERE NOT `${InternalColumns.distinct}` OR `${InternalColumns.rowNumber}` > 1
+               """.stripMargin
+          }
+          val dupItemsStep = SparkSqlStep(dupItemsTableName, dupItemsSql, emptyMap)
+          val dupItemsParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
+          val dupItemsExport = genRecordExport(dupItemsParam, dupItemsTableName, dupItemsTableName, endTmst, mode)
+
+          // 12. group by dup Record metric
+          val groupDupMetricTableName = "__groupDupMetric"
+          val numColName = details.getStringOrKey(_num)
+          val groupSelClause = groupAliasesClause
+          val groupDupMetricSql = {
+            s"""
+               |SELECT ${groupSelClause}, `${dupColName}`, COUNT(*) AS `${numColName}`
+               |FROM `${dupItemsTableName}` GROUP BY ${groupSelClause}, `${dupColName}`
+             """.stripMargin
+          }
+          val groupDupMetricStep = SparkSqlStep(groupDupMetricTableName, groupDupMetricSql, emptyMap)
+          val groupDupMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
+          val groupDupMetricExport = genMetricExport(groupDupMetricParam, duplicationArrayName, groupDupMetricTableName, endTmst, mode)
+
+          val exports = if (recordEnable) {
+            dupItemsExport :: groupDupMetricExport :: Nil
+          } else {
+            groupDupMetricExport :: Nil
+          }
+          RulePlan(
+            informedStep :: rnStep :: dupItemsStep :: groupDupMetricStep :: Nil,
+            exports
+          )
+
+        } else {
+          // no group by requirement
+          // 9. duplicate record
+          val dupRecordTableName = "__dupRecords"
+          val dupRecordSelClause = procType match {
+            case StreamingProcessType if (withOlderTable) => s"${distAliasesClause}, `${dupColName}`, `${accuDupColName}`"
+            case _ => s"${distAliasesClause}, `${dupColName}`"
+          }
+          val dupRecordSql = {
+            s"""
+               |SELECT ${dupRecordSelClause}
+               |FROM `${dupCountTableName}` WHERE `${dupColName}` > 0
+              """.stripMargin
+          }
+          val dupRecordStep = SparkSqlStep(dupRecordTableName, dupRecordSql, emptyMap, true)
+          val dupRecordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
+          val dupRecordExport = genRecordExport(dupRecordParam, dupRecordTableName, dupRecordTableName, endTmst, mode)
+
+          // 10. duplicate metric
+          val dupMetricTableName = "__dupMetric"
+          val numColName = details.getStringOrKey(_num)
+          val dupMetricSql = {
+            s"""
+               |SELECT `${dupColName}`, COUNT(*) AS `${numColName}`
+               |FROM `${dupRecordTableName}` GROUP BY `${dupColName}`
+              """.stripMargin
+          }
+          val dupMetricStep = SparkSqlStep(dupMetricTableName, dupMetricSql, emptyMap)
+          val dupMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
+          val dupMetricExport = genMetricExport(dupMetricParam, duplicationArrayName, dupMetricTableName, endTmst, mode)
+
+          val exports = if (recordEnable) {
+            dupRecordExport :: dupMetricExport :: Nil
+          } else {
+            dupMetricExport :: Nil
+          }
+          RulePlan(dupRecordStep :: dupMetricStep :: Nil, exports)
+        }
       } else emptyRulePlan
 
       selfDistRulePlan.merge(distRulePlan).merge(distMetricRulePlan).merge(dupRulePlan)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/utils/DataFrameUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/DataFrameUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/DataFrameUtil.scala
new file mode 100644
index 0000000..9390160
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/DataFrameUtil.scala
@@ -0,0 +1,41 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.utils
+
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.functions._
+
+object DataFrameUtil {
+
+  def unionDfOpts(dfOpt1: Option[DataFrame], dfOpt2: Option[DataFrame]
+                 ): Option[DataFrame] = {
+    (dfOpt1, dfOpt2) match {
+      case (Some(df1), Some(df2)) => Some(unionByName(df1, df2))
+      case (Some(df1), _) => dfOpt1
+      case (_, Some(df2)) => dfOpt2
+      case _ => None
+    }
+  }
+
+  def unionByName(a: DataFrame, b: DataFrame): DataFrame = {
+    val columns = a.columns.toSet.intersect(b.columns.toSet).map(col).toSeq
+    a.select(columns: _*).unionAll(b.select(columns: _*))
+  }
+
+}
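
A short usage sketch for the Option-level combinator (df1 and df2 are hypothetical frames; a side is None when a connector produced no batch for the interval):

    import org.apache.griffin.measure.utils.DataFrameUtil._

    unionDfOpts(Some(df1), Some(df2))  // Some(unionByName(df1, df2))
    unionDfOpts(Some(df1), None)       // Some(df1): missing side skipped
    unionDfOpts(None, None)            // None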

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/test/resources/_distinctness-batch-griffindsl1.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_distinctness-batch-griffindsl1.json b/measure/src/test/resources/_distinctness-batch-griffindsl1.json
index f8aa077..4d94d8e 100644
--- a/measure/src/test/resources/_distinctness-batch-griffindsl1.json
+++ b/measure/src/test/resources/_distinctness-batch-griffindsl1.json
@@ -20,7 +20,7 @@
             {
               "dsl.type": "spark-sql",
               "name": "${this}",
-              "rule": "select DISTINCT name, age from ${this}"
+              "rule": "select name, age from ${this}"
             }
           ]
         }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/test/resources/_distinctness-batch-griffindsl2.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_distinctness-batch-griffindsl2.json b/measure/src/test/resources/_distinctness-batch-griffindsl2.json
new file mode 100644
index 0000000..6a12719
--- /dev/null
+++ b/measure/src/test/resources/_distinctness-batch-griffindsl2.json
@@ -0,0 +1,74 @@
+{
+  "name": "dist_batch",
+
+  "process.type": "batch",
+
+  "timestamp": 123456,
+
+  "data.sources": [
+    {
+      "name": "source",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/dupdata.avro"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${this}"
+            }
+          ]
+        }
+      ]
+    },
+    {
+      "name": "target",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/dupdata.avro"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select DISTINCT name, age from ${this}"
+            }
+          ]
+        }
+      ]
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "distinct",
+        "name": "dist",
+        "rule": "name, [age]",
+        "details": {
+          "source": "source",
+          "target": "target",
+          "total": "total",
+          "distinct": "distinct",
+          "dup": "dup",
+          "num": "num",
+          "duplication.array": "dup",
+          "record.enable": true
+        },
+        "metric": {
+          "name": "distinct"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file

