Repository: incubator-griffin
Updated Branches:
  refs/heads/master 530c2dafc -> b83c58706


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/trans/ProfilingRulePlanTrans.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/ProfilingRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/ProfilingRulePlanTrans.scala
new file mode 100644
index 0000000..d9d2d4e
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/ProfilingRulePlanTrans.scala
@@ -0,0 +1,98 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.trans
+
+import org.apache.griffin.measure.process.temp.TableRegisters
+import org.apache.griffin.measure.process.{BatchProcessType, ExportMode, ProcessType, StreamingProcessType}
+import org.apache.griffin.measure.rule.adaptor.RuleParamKeys._
+import org.apache.griffin.measure.rule.adaptor._
+import org.apache.griffin.measure.rule.dsl.analyzer.ProfilingAnalyzer
+import org.apache.griffin.measure.rule.dsl.expr._
+import org.apache.griffin.measure.rule.plan._
+import org.apache.griffin.measure.rule.trans.RuleExportFactory._
+import org.apache.griffin.measure.utils.ParamUtil._
+
+case class ProfilingRulePlanTrans(dataSourceNames: Seq[String],
+                                  timeInfo: TimeInfo, name: String, expr: Expr,
+                                  param: Map[String, Any], procType: ProcessType
+                                 ) extends RulePlanTrans {
+
+  private object ProfilingKeys {
+    val _source = "source"
+  }
+  import ProfilingKeys._
+
+  def trans(): RulePlan = {
+    val details = getDetails(param)
+    val profilingClause = expr.asInstanceOf[ProfilingClause]
+    val sourceName = profilingClause.fromClauseOpt match {
+      case Some(fc) => fc.dataSource
+      case _ => details.getString(_source, dataSourceNames.head)
+    }
+    val fromClause = profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc
+
+    val mode = ExportMode.defaultMode(procType)
+
+    val ct = timeInfo.calcTime
+
+    if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
+      emptyRulePlan
+    } else {
+      val analyzer = ProfilingAnalyzer(profilingClause, sourceName)
+      val selExprDescs = analyzer.selectionExprs.map { sel =>
+        val alias = sel match {
+          case s: AliasableExpr if (s.alias.nonEmpty) => s" AS `${s.alias.get}`"
+          case _ => ""
+        }
+        s"${sel.desc}${alias}"
+      }
+      val selCondition = profilingClause.selectClause.extraConditionOpt.map(_.desc).mkString
+      val selClause = procType match {
+        case BatchProcessType => selExprDescs.mkString(", ")
+        case StreamingProcessType => (s"`${InternalColumns.tmst}`" +: selExprDescs).mkString(", ")
+      }
+      val groupByClauseOpt = analyzer.groupbyExprOpt
+      val groupbyClause = procType match {
+        case BatchProcessType => groupByClauseOpt.map(_.desc).getOrElse("")
+        case StreamingProcessType => {
+          val tmstGroupbyClause = GroupbyClause(LiteralStringExpr(s"`${InternalColumns.tmst}`") :: Nil, None)
+          val mergedGroupbyClause = tmstGroupbyClause.merge(groupByClauseOpt match {
+            case Some(gbc) => gbc
+            case _ => GroupbyClause(Nil, None)
+          })
+          mergedGroupbyClause.desc
+        }
+      }
+      val preGroupbyClause = analyzer.preGroupbyExprs.map(_.desc).mkString(" ")
+      val postGroupbyClause = analyzer.postGroupbyExprs.map(_.desc).mkString(" ")
+
+      // 1. select statement
+      val profilingSql = {
+        s"SELECT ${selCondition} ${selClause} ${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}"
+      }
+      val profilingName = name
+      val profilingStep = SparkSqlStep(profilingName, profilingSql, details)
+      val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
+      val profilingExports = genMetricExport(metricParam, name, profilingName, ct, mode) :: Nil
+
+      RulePlan(profilingStep :: Nil, profilingExports)
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RuleExportFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RuleExportFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RuleExportFactory.scala
new file mode 100644
index 0000000..915e654
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RuleExportFactory.scala
@@ -0,0 +1,65 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.trans
+
+import org.apache.griffin.measure.process.ExportMode
+import org.apache.griffin.measure.rule.dsl.CollectType
+import org.apache.griffin.measure.rule.plan._
+
+import org.apache.griffin.measure.utils.ParamUtil._
+
+object RuleExportFactory {
+
+  def genMetricExport(param: Map[String, Any], name: String, stepName: String,
+                      defTimestamp: Long, mode: ExportMode
+                     ): MetricExport = {
+    MetricExport(
+      ExportParamKeys.getName(param, name),
+      stepName,
+      ExportParamKeys.getCollectType(param),
+      defTimestamp,
+      mode
+    )
+  }
+  def genRecordExport(param: Map[String, Any], name: String, stepName: String,
+                      defTimestamp: Long, mode: ExportMode
+                     ): RecordExport = {
+    RecordExport(
+      ExportParamKeys.getName(param, name),
+      stepName,
+      ExportParamKeys.getDataSourceCacheOpt(param),
+      ExportParamKeys.getOriginDFOpt(param),
+      defTimestamp,
+      mode
+    )
+  }
+
+}
+
+object ExportParamKeys {
+  val _name = "name"
+  val _collectType = "collect.type"
+  val _dataSourceCache = "data.source.cache"
+  val _originDF = "origin.DF"
+
+  def getName(param: Map[String, Any], defName: String): String = param.getString(_name, defName)
+  def getCollectType(param: Map[String, Any]): CollectType = CollectType(param.getString(_collectType, ""))
+  def getDataSourceCacheOpt(param: Map[String, Any]): Option[String] = param.get(_dataSourceCache).map(_.toString)
+  def getOriginDFOpt(param: Map[String, Any]): Option[String] = param.get(_originDF).map(_.toString)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala
new file mode 100644
index 0000000..b7226ba
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/RulePlanTrans.scala
@@ -0,0 +1,57 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.trans
+
+import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.process.ProcessType
+import org.apache.griffin.measure.process.temp.TimeRange
+import org.apache.griffin.measure.rule.dsl._
+import org.apache.griffin.measure.rule.dsl.expr.Expr
+import org.apache.griffin.measure.rule.plan._
+
+trait RulePlanTrans extends Loggable with Serializable {
+
+  protected val emptyRulePlan = RulePlan(Nil, Nil)
+  protected val emptyMap = Map[String, Any]()
+
+  def trans(): RulePlan
+
+}
+
+object RulePlanTrans {
+  private val emptyRulePlanTrans = new RulePlanTrans {
+    def trans(): RulePlan = emptyRulePlan
+  }
+
+  def apply(dqType: DqType,
+            dsNames: Seq[String],
+            ti: TimeInfo, name: String, expr: Expr,
+            param: Map[String, Any], procType: ProcessType,
+            dsTimeRanges: Map[String, TimeRange]
+           ): RulePlanTrans = {
+    dqType match {
+      case AccuracyType => AccuracyRulePlanTrans(dsNames, ti, name, expr, param, procType)
+      case ProfilingType => ProfilingRulePlanTrans(dsNames, ti, name, expr, param, procType)
+      case UniquenessType => UniquenessRulePlanTrans(dsNames, ti, name, expr, param, procType)
+      case DistinctnessType => DistinctnessRulePlanTrans(dsNames, ti, name, expr, param, procType, dsTimeRanges)
+      case TimelinessType => TimelinessRulePlanTrans(dsNames, ti, name, expr, param, procType)
+      case _ => emptyRulePlanTrans
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala
new file mode 100644
index 0000000..9a01553
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/TimelinessRulePlanTrans.scala
@@ -0,0 +1,279 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.trans
+
+import org.apache.griffin.measure.process.temp.TableRegisters
+import org.apache.griffin.measure.process.{BatchProcessType, ExportMode, ProcessType, StreamingProcessType}
+import org.apache.griffin.measure.rule.adaptor.RuleParamKeys._
+import org.apache.griffin.measure.rule.adaptor._
+import org.apache.griffin.measure.rule.dsl.ArrayCollectType
+import org.apache.griffin.measure.rule.dsl.analyzer.TimelinessAnalyzer
+import org.apache.griffin.measure.rule.dsl.expr._
+import org.apache.griffin.measure.rule.plan._
+import org.apache.griffin.measure.rule.trans.RuleExportFactory._
+import org.apache.griffin.measure.utils.ParamUtil._
+import org.apache.griffin.measure.utils.TimeUtil
+
+case class TimelinessRulePlanTrans(dataSourceNames: Seq[String],
+                                   timeInfo: TimeInfo, name: String, expr: Expr,
+                                   param: Map[String, Any], procType: ProcessType
+                                  ) extends RulePlanTrans {
+
+  private object TimelinessKeys {
+    val _source = "source"
+    val _latency = "latency"
+    val _total = "total"
+    val _avg = "avg"
+    val _threshold = "threshold"
+    val _step = "step"
+    val _count = "count"
+    val _stepSize = "step.size"
+    val _percentileColPrefix = "percentile"
+    val _percentileValues = "percentile.values"
+  }
+  import TimelinessKeys._
+
+  def trans(): RulePlan = {
+    val details = getDetails(param)
+    val timelinessClause = expr.asInstanceOf[TimelinessClause]
+    val sourceName = details.getString(_source, dataSourceNames.head)
+
+    val mode = ExportMode.defaultMode(procType)
+
+    val ct = timeInfo.calcTime
+
+    if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
+      emptyRulePlan
+    } else {
+      val analyzer = TimelinessAnalyzer(timelinessClause, sourceName)
+      val btsSel = analyzer.btsExpr
+      val etsSelOpt = analyzer.etsExprOpt
+
+      // 1. in time
+      val inTimeTableName = "__inTime"
+      val inTimeSql = etsSelOpt match {
+        case Some(etsSel) => {
+          s"""
+             |SELECT *, (${btsSel}) AS `${InternalColumns.beginTs}`,
+             |(${etsSel}) AS `${InternalColumns.endTs}`
+             |FROM ${sourceName} WHERE (${btsSel}) IS NOT NULL AND (${etsSel}) IS NOT NULL
+           """.stripMargin
+        }
+        case _ => {
+          s"""
+             |SELECT *, (${btsSel}) AS `${InternalColumns.beginTs}`
+             |FROM ${sourceName} WHERE (${btsSel}) IS NOT NULL
+           """.stripMargin
+        }
+      }
+      val inTimeStep = SparkSqlStep(inTimeTableName, inTimeSql, emptyMap)
+
+      // 2. latency
+      val latencyTableName = "__lat"
+      val latencyColName = details.getStringOrKey(_latency)
+      val etsColName = etsSelOpt match {
+        case Some(_) => InternalColumns.endTs
+        case _ => InternalColumns.tmst
+      }
+      val latencySql = {
+        s"SELECT *, (`${etsColName}` - `${InternalColumns.beginTs}`) AS `${latencyColName}` FROM `${inTimeTableName}`"
+      }
+      val latencyStep = SparkSqlStep(latencyTableName, latencySql, emptyMap, true)
+
+      // 3. timeliness metric
+      val metricTableName = name
+      val totalColName = details.getStringOrKey(_total)
+      val avgColName = details.getStringOrKey(_avg)
+      val metricSql = procType match {
+        case BatchProcessType => {
+          s"""
+             |SELECT COUNT(*) AS `${totalColName}`,
+             |CAST(AVG(`${latencyColName}`) AS BIGINT) AS `${avgColName}`
+             |FROM `${latencyTableName}`
+           """.stripMargin
+        }
+        case StreamingProcessType => {
+          s"""
+             |SELECT `${InternalColumns.tmst}`,
+             |COUNT(*) AS `${totalColName}`,
+             |CAST(AVG(`${latencyColName}`) AS BIGINT) AS `${avgColName}`
+             |FROM `${latencyTableName}`
+             |GROUP BY `${InternalColumns.tmst}`
+           """.stripMargin
+        }
+      }
+      val metricStep = SparkSqlStep(metricTableName, metricSql, emptyMap)
+      val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap)
+      val metricExports = genMetricExport(metricParam, name, metricTableName, ct, mode) :: Nil
+
+      // current timeliness plan
+      val timeSteps = inTimeStep :: latencyStep :: metricStep :: Nil
+      val timeExports = metricExports
+      val timePlan = RulePlan(timeSteps, timeExports)
+
+      // 4. timeliness record
+      val recordPlan = TimeUtil.milliseconds(details.getString(_threshold, "")) match {
+        case Some(tsh) => {
+          val recordTableName = "__lateRecords"
+          val recordSql = {
+            s"SELECT * FROM `${latencyTableName}` WHERE `${latencyColName}` > ${tsh}"
+          }
+          val recordStep = SparkSqlStep(recordTableName, recordSql, emptyMap)
+          val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
+          val recordExports = genRecordExport(recordParam, recordTableName, recordTableName, ct, mode) :: Nil
+          RulePlan(recordStep :: Nil, recordExports)
+        }
+        case _ => emptyRulePlan
+      }
+
+// 5. ranges
+//      val rangePlan = details.get(_rangeSplit) match {
+//        case Some(arr: Seq[String]) => {
+//          val ranges = splitTimeRanges(arr)
+//          if (ranges.size > 0) {
+//            try {
+//              // 5.1. range
+//              val rangeTableName = "__range"
+//              val rangeColName = details.getStringOrKey(_range)
+//              val caseClause = {
+//                val whenClause = ranges.map { range =>
+//                  s"WHEN `${latencyColName}` < ${range._1} THEN '<${range._2}'"
+//                }.mkString("\n")
+//                s"CASE ${whenClause} ELSE '>=${ranges.last._2}' END AS `${rangeColName}`"
+//              }
+//              val rangeSql = {
+//                s"SELECT *, ${caseClause} FROM `${latencyTableName}`"
+//              }
+//              val rangeStep = SparkSqlStep(rangeTableName, rangeSql, emptyMap)
+//
+//              // 5.2. range metric
+//              val rangeMetricTableName = "__rangeMetric"
+//              val countColName = details.getStringOrKey(_count)
+//              val rangeMetricSql = procType match {
+//                case BatchProcessType => {
+//                  s"""
+//                     |SELECT `${rangeColName}`, COUNT(*) AS `${countColName}`
+//                     |FROM `${rangeTableName}` GROUP BY `${rangeColName}`
+//                  """.stripMargin
+//                }
+//                case StreamingProcessType => {
+//                  s"""
+//                     |SELECT `${InternalColumns.tmst}`, `${rangeColName}`, COUNT(*) AS `${countColName}`
+//                     |FROM `${rangeTableName}` GROUP BY `${InternalColumns.tmst}`, `${rangeColName}`
+//                  """.stripMargin
+//                }
+//              }
+//              val rangeMetricStep = SparkSqlStep(rangeMetricTableName, rangeMetricSql, emptyMap)
+//              val rangeMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
+//              val rangeMetricExports = genMetricExport(rangeMetricParam, rangeColName, rangeMetricTableName, ct, mode) :: Nil
+//
+//              RulePlan(rangeStep :: rangeMetricStep :: Nil, rangeMetricExports)
+//            } catch {
+//              case _: Throwable => emptyRulePlan
+//            }
+//          } else emptyRulePlan
+//        }
+//        case _ => emptyRulePlan
+//      }
+
+// return timeliness plan
+
+      // 5. ranges
+      val rangePlan = TimeUtil.milliseconds(details.getString(_stepSize, "")) match {
+        case Some(stepSize) => {
+          // 5.1 range
+          val rangeTableName = "__range"
+          val stepColName = details.getStringOrKey(_step)
+          val rangeSql = {
+            s"""
+               |SELECT *, CAST((`${latencyColName}` / ${stepSize}) AS BIGINT) AS `${stepColName}`
+               |FROM `${latencyTableName}`
+             """.stripMargin
+          }
+          val rangeStep = SparkSqlStep(rangeTableName, rangeSql, emptyMap)
+
+          // 5.2 range metric
+          val rangeMetricTableName = "__rangeMetric"
+          val countColName = details.getStringOrKey(_count)
+          val rangeMetricSql = procType match {
+            case BatchProcessType => {
+              s"""
+                 |SELECT `${stepColName}`, COUNT(*) AS `${countColName}`
+                 |FROM `${rangeTableName}` GROUP BY `${stepColName}`
+                """.stripMargin
+            }
+            case StreamingProcessType => {
+              s"""
+                 |SELECT `${InternalColumns.tmst}`, `${stepColName}`, COUNT(*) AS `${countColName}`
+                 |FROM `${rangeTableName}` GROUP BY `${InternalColumns.tmst}`, `${stepColName}`
+                """.stripMargin
+            }
+          }
+          val rangeMetricStep = SparkSqlStep(rangeMetricTableName, rangeMetricSql, emptyMap)
+          val rangeMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
+          val rangeMetricExports = genMetricExport(rangeMetricParam, stepColName, rangeMetricTableName, ct, mode) :: Nil
+
+          RulePlan(rangeStep :: rangeMetricStep :: Nil, rangeMetricExports)
+        }
+        case _ => emptyRulePlan
+      }
+
+      // 6. percentiles
+      val percentiles = getPercentiles(details)
+      val percentilePlan = if (percentiles.size > 0) {
+        val percentileTableName = "__percentile"
+        val percentileColName = details.getStringOrKey(_percentileColPrefix)
+        val percentileCols = percentiles.map { pct =>
+          val pctName = (pct * 100).toInt.toString
+          s"floor(percentile_approx(${latencyColName}, ${pct})) AS `${percentileColName}_${pctName}`"
+        }.mkString(", ")
+        val percentileSql = procType match {
+          case BatchProcessType => {
+            s"""
+               |SELECT ${percentileCols}
+               |FROM `${latencyTableName}`
+              """.stripMargin
+          }
+          case StreamingProcessType => {
+            s"""
+               |SELECT `${InternalColumns.tmst}`, ${percentileCols}
+               |FROM `${latencyTableName}` GROUP BY `${InternalColumns.tmst}`
+              """.stripMargin
+          }
+        }
+        val percentileStep = SparkSqlStep(percentileTableName, percentileSql, emptyMap)
+        val percentileParam = emptyMap
+        val percentileExports = genMetricExport(percentileParam, percentileTableName, percentileTableName, ct, mode) :: Nil
+
+        RulePlan(percentileStep :: Nil, percentileExports)
+      } else emptyRulePlan
+
+      timePlan.merge(recordPlan).merge(rangePlan).merge(percentilePlan)
+    }
+  }
+
+  private def getPercentiles(details: Map[String, Any]): Seq[Double] = {
+//    details.get(_percentiles) match {
+//      case Some(seq: Seq[Double]) => seq
+//      case _ => Nil
+//    }
+    details.getArr[Double](_percentileValues).filter(d => (d >= 0 && d <= 1))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/trans/UniquenessRulePlanTrans.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/UniquenessRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/UniquenessRulePlanTrans.scala
new file mode 100644
index 0000000..326d80b
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/UniquenessRulePlanTrans.scala
@@ -0,0 +1,198 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.trans
+
+import org.apache.griffin.measure.process._
+import org.apache.griffin.measure.process.temp._
+import org.apache.griffin.measure.rule.adaptor.RuleParamKeys._
+import org.apache.griffin.measure.rule.adaptor._
+import org.apache.griffin.measure.rule.dsl.analyzer.UniquenessAnalyzer
+import org.apache.griffin.measure.rule.dsl.expr._
+import org.apache.griffin.measure.rule.dsl._
+import org.apache.griffin.measure.rule.plan._
+import org.apache.griffin.measure.rule.trans.RuleExportFactory._
+import org.apache.griffin.measure.utils.ParamUtil._
+
+case class UniquenessRulePlanTrans(dataSourceNames: Seq[String],
+                                   timeInfo: TimeInfo, name: String, expr: Expr,
+                                   param: Map[String, Any], procType: ProcessType
+                                  ) extends RulePlanTrans {
+
+  private object UniquenessKeys {
+    val _source = "source"
+    val _target = "target"
+    val _unique = "unique"
+    val _total = "total"
+    val _dup = "dup"
+    val _num = "num"
+
+    val _duplicationArray = "duplication.array"
+  }
+  import UniquenessKeys._
+
+  def trans(): RulePlan = {
+    val details = getDetails(param)
+    val sourceName = details.getString(_source, dataSourceNames.head)
+    val targetName = details.getString(_target, dataSourceNames.tail.head)
+    val analyzer = UniquenessAnalyzer(expr.asInstanceOf[UniquenessClause], sourceName, targetName)
+
+    val mode = ExportMode.defaultMode(procType)
+
+    val ct = timeInfo.calcTime
+
+    if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) {
+      println(s"[${ct}] data source ${sourceName} does not exist")
+      emptyRulePlan
+    } else if (!TableRegisters.existRunTempTable(timeInfo.key, targetName)) {
+      println(s"[${ct}] data source ${targetName} does not exist")
+      emptyRulePlan
+    } else {
+      val selItemsClause = analyzer.selectionPairs.map { pair =>
+        val (expr, alias) = pair
+        s"${expr.desc} AS `${alias}`"
+      }.mkString(", ")
+      val aliases = analyzer.selectionPairs.map(_._2)
+
+      val selClause = procType match {
+        case BatchProcessType => selItemsClause
+        case StreamingProcessType => s"`${InternalColumns.tmst}`, ${selItemsClause}"
+      }
+      val selAliases = procType match {
+        case BatchProcessType => aliases
+        case StreamingProcessType => InternalColumns.tmst +: aliases
+      }
+
+      // 1. source distinct mapping
+      val sourceTableName = "__source"
+      val sourceSql = s"SELECT DISTINCT ${selClause} FROM ${sourceName}"
+      val sourceStep = SparkSqlStep(sourceTableName, sourceSql, emptyMap)
+
+      // 2. target mapping
+      val targetTableName = "__target"
+      val targetSql = s"SELECT ${selClause} FROM ${targetName}"
+      val targetStep = SparkSqlStep(targetTableName, targetSql, emptyMap)
+
+      // 3. joined
+      val joinedTableName = "__joined"
+      val joinedSelClause = selAliases.map { alias =>
+        s"`${sourceTableName}`.`${alias}` AS `${alias}`"
+      }.mkString(", ")
+      val onClause = aliases.map { alias =>
+        s"coalesce(`${sourceTableName}`.`${alias}`, '') = coalesce(`${targetTableName}`.`${alias}`, '')"
+      }.mkString(" AND ")
+      val joinedSql = {
+        s"SELECT ${joinedSelClause} FROM `${targetTableName}` RIGHT JOIN `${sourceTableName}` ON ${onClause}"
+      }
+      val joinedStep = SparkSqlStep(joinedTableName, joinedSql, emptyMap)
+
+      // 4. group
+      val groupTableName = "__group"
+      val groupSelClause = selAliases.map { alias =>
+        s"`${alias}`"
+      }.mkString(", ")
+      val dupColName = details.getStringOrKey(_dup)
+      val groupSql = {
+        s"SELECT ${groupSelClause}, (COUNT(*) - 1) AS `${dupColName}` FROM `${joinedTableName}` GROUP BY ${groupSelClause}"
+      }
+      val groupStep = SparkSqlStep(groupTableName, groupSql, emptyMap, true)
+
+      // 5. total metric
+      val totalTableName = "__totalMetric"
+      val totalColName = details.getStringOrKey(_total)
+      val totalSql = procType match {
+        case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`"
+        case StreamingProcessType => {
+          s"""
+             |SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${totalColName}`
+             |FROM `${sourceName}` GROUP BY `${InternalColumns.tmst}`
+           """.stripMargin
+        }
+      }
+      val totalStep = SparkSqlStep(totalTableName, totalSql, emptyMap)
+      val totalMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
+      val totalMetricExport = genMetricExport(totalMetricParam, totalColName, totalTableName, ct, mode)
+
+      // 6. unique record
+      val uniqueRecordTableName = "__uniqueRecord"
+      val uniqueRecordSql = {
+        s"SELECT * FROM `${groupTableName}` WHERE `${dupColName}` = 0"
+      }
+      val uniqueRecordStep = SparkSqlStep(uniqueRecordTableName, uniqueRecordSql, emptyMap)
+
+      // 7. unique metric
+      val uniqueTableName = "__uniqueMetric"
+      val uniqueColName = details.getStringOrKey(_unique)
+      val uniqueSql = procType match {
+        case BatchProcessType => s"SELECT COUNT(*) AS `${uniqueColName}` FROM `${uniqueRecordTableName}`"
+        case StreamingProcessType => {
+          s"""
+             |SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${uniqueColName}`
+             |FROM `${uniqueRecordTableName}` GROUP BY `${InternalColumns.tmst}`
+           """.stripMargin
+        }
+      }
+      val uniqueStep = SparkSqlStep(uniqueTableName, uniqueSql, emptyMap)
+      val uniqueMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, EntriesCollectType.desc)
+      val uniqueMetricExport = genMetricExport(uniqueMetricParam, uniqueColName, uniqueTableName, ct, mode)
+
+      val uniqueSteps = sourceStep :: targetStep :: joinedStep :: groupStep ::
+        totalStep :: uniqueRecordStep :: uniqueStep :: Nil
+      val uniqueExports = totalMetricExport :: uniqueMetricExport :: Nil
+      val uniqueRulePlan = RulePlan(uniqueSteps, uniqueExports)
+
+      val duplicationArrayName = details.getString(_duplicationArray, "")
+      val dupRulePlan = if (duplicationArrayName.nonEmpty) {
+        // 8. duplicate record
+        val dupRecordTableName = "__dupRecords"
+        val dupRecordSql = {
+          s"SELECT * FROM `${groupTableName}` WHERE `${dupColName}` > 0"
+        }
+        val dupRecordStep = SparkSqlStep(dupRecordTableName, dupRecordSql, emptyMap, true)
+        val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
+        val dupRecordExport = genRecordExport(recordParam, dupRecordTableName, dupRecordTableName, ct, mode)
+
+        // 9. duplicate metric
+        val dupMetricTableName = "__dupMetric"
+        val numColName = details.getStringOrKey(_num)
+        val dupMetricSelClause = procType match {
+          case BatchProcessType => s"`${dupColName}`, COUNT(*) AS `${numColName}`"
+          case StreamingProcessType => s"`${InternalColumns.tmst}`, `${dupColName}`, COUNT(*) AS `${numColName}`"
+        }
+        val dupMetricGroupbyClause = procType match {
+          case BatchProcessType => s"`${dupColName}`"
+          case StreamingProcessType => s"`${InternalColumns.tmst}`, `${dupColName}`"
+        }
+        val dupMetricSql = {
+          s"""
+             |SELECT ${dupMetricSelClause} FROM `${dupRecordTableName}`
+             |GROUP BY ${dupMetricGroupbyClause}
+          """.stripMargin
+        }
+        val dupMetricStep = SparkSqlStep(dupMetricTableName, dupMetricSql, emptyMap)
+        val dupMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
+        val dupMetricExport = genMetricExport(dupMetricParam, duplicationArrayName, dupMetricTableName, ct, mode)
+
+        RulePlan(dupRecordStep :: dupMetricStep :: Nil, dupRecordExport :: dupMetricExport :: Nil)
+      } else emptyRulePlan
+
+      uniqueRulePlan.merge(dupRulePlan)
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdafs.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdafs.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdafs.scala
new file mode 100644
index 0000000..cb00641
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/udf/GriffinUdafs.scala
@@ -0,0 +1,29 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.udf
+
+import org.apache.spark.sql.SQLContext
+
+object GriffinUdafs {
+
+  def register(sqlContext: SQLContext): Unit = {
+//    sqlContext.udf.register("my_mean", new MeanUdaf)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/rule/udf/MeanUdaf.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/udf/MeanUdaf.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/udf/MeanUdaf.scala
new file mode 100644
index 0000000..80b3a02
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/udf/MeanUdaf.scala
@@ -0,0 +1,58 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.udf
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
+import org.apache.spark.sql.types._
+
+class MeanUdaf extends UserDefinedAggregateFunction {
+  def inputSchema: StructType = StructType(Array(StructField("item", LongType)))
+
+  def bufferSchema = StructType(Array(
+    StructField("sum", DoubleType),
+    StructField("cnt", LongType)
+  ))
+
+  def dataType: DataType = DoubleType
+
+  def deterministic: Boolean = true
+
+  def initialize(buffer: MutableAggregationBuffer): Unit = {
+    buffer.update(0, 0.toDouble)
+    buffer.update(1, 0L)
+  }
+
+  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    val sum = buffer.getDouble(0)
+    val cnt = buffer.getLong(1)
+    val value = input.getLong(0)
+    buffer.update(0, sum + value)
+    buffer.update(1, cnt + 1)
+  }
+
+  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+    buffer1.update(0, buffer1.getDouble(0) + buffer2.getDouble(0))
+    buffer1.update(1, buffer1.getLong(1) + buffer2.getLong(1))
+  }
+
+  def evaluate(buffer: Row): Any = {
+    buffer.getDouble(0) / buffer.getLong(1).toDouble
+  }
+}
\ No newline at end of file
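
For reference, a minimal sketch of how this UDAF could be registered and invoked once the commented-out call in GriffinUdafs is enabled; the table and column names ("lat", "latency") are illustrative, not part of this change.

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext
    import org.apache.griffin.measure.rule.udf.MeanUdaf

    object MeanUdafExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext("local[2]", "mean-udaf-example")
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._

        // register the UDAF under a SQL-callable name
        sqlContext.udf.register("my_mean", new MeanUdaf)

        // hypothetical latency table: a single Long column, matching inputSchema
        sc.parallelize(Seq(10L, 20L, 30L)).toDF("latency").registerTempTable("lat")

        // yields 20.0: update() accumulates sum and count, evaluate() divides them
        sqlContext.sql("SELECT my_mean(latency) FROM lat").show()
      }
    }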

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala
index 1ca32b3..d125d87 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/ParamUtil.scala
@@ -185,6 +185,17 @@ object ParamUtil {
       }
     }
 
+    def getArr[T](key: String): Seq[T] = {
+      try {
+        params.get(key) match {
+          case Some(seq: Seq[T]) => seq
+          case _ => Nil
+        }
+      } catch {
+        case _: Throwable => Nil
+      }
+    }
+
     def addIfNotExist(key: String, value: Any): Map[String, Any] = {
       params.get(key) match {
         case Some(v) => params
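
A minimal sketch of how the new getArr helper behaves, assuming the implicit Map[String, Any] extension that ParamUtil already provides (the same pattern TimelinessRulePlanTrans uses to read "percentile.values"); the details map below is illustrative.

    import org.apache.griffin.measure.utils.ParamUtil._

    object GetArrExample {
      def main(args: Array[String]): Unit = {
        val details: Map[String, Any] = Map(
          "percentile" -> "percentile",
          "percentile.values" -> Seq(0.95)
        )

        // key present with a Seq value: returned as-is
        println(details.getArr[Double]("percentile.values"))  // List(0.95)

        // key missing (or not a Seq): falls back to Nil
        println(details.getArr[Double]("missing"))            // List()
      }
    }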

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/test/resources/_accuracy-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_accuracy-streaming-griffindsl.json b/measure/src/test/resources/_accuracy-streaming-griffindsl.json
index da010d7..a0e2e7d 100644
--- a/measure/src/test/resources/_accuracy-streaming-griffindsl.json
+++ b/measure/src/test/resources/_accuracy-streaming-griffindsl.json
@@ -40,11 +40,13 @@
         }
       ],
       "cache": {
+        "type": "parquet",
         "file.path": "hdfs://localhost/griffin/streaming/dump/source",
         "info.path": "source",
         "ready.time.interval": "10s",
         "ready.time.delay": "0",
-        "time.range": ["-2m", "0"]
+        "time.range": ["-2m", "0"],
+        "updatable": true
       }
     }, {
       "name": "target",
@@ -81,6 +83,7 @@
         }
       ],
       "cache": {
+        "type": "parquet",
         "file.path": "hdfs://localhost/griffin/streaming/dump/target",
         "info.path": "target",
         "ready.time.interval": "10s",
@@ -108,8 +111,7 @@
           "name": "accu"
         },
         "record": {
-          "name": "missRecords",
-          "data.source.cache": "source"
+          "name": "missRecords"
         }
       }
     ]

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/test/resources/_timeliness-batch-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_timeliness-batch-griffindsl.json b/measure/src/test/resources/_timeliness-batch-griffindsl.json
index bd48401..90439df 100644
--- a/measure/src/test/resources/_timeliness-batch-griffindsl.json
+++ b/measure/src/test/resources/_timeliness-batch-griffindsl.json
@@ -28,10 +28,14 @@
         "details": {
           "source": "source",
           "latency": "latency",
+          "total": "total",
+          "avg": "avg",
           "threshold": "3m",
           "step": "step",
           "count": "cnt",
-          "step.size": "2m"
+          "step.size": "2m",
+          "percentile": "percentile",
+          "percentile.values": [0.95]
         },
         "metric": {
           "name": "timeliness"

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/test/resources/_timeliness-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_timeliness-streaming-griffindsl.json b/measure/src/test/resources/_timeliness-streaming-griffindsl.json
index fbaf8d4..5916e5c 100644
--- a/measure/src/test/resources/_timeliness-streaming-griffindsl.json
+++ b/measure/src/test/resources/_timeliness-streaming-griffindsl.json
@@ -63,7 +63,9 @@
           "threshold": "1h",
           "step": "step",
           "count": "cnt",
-          "step.size": "5m"
+          "step.size": "5m",
+          "percentile": "percentile",
+          "percentile.values": [0.2, 0.5, 0.8]
         },
         "metric": {
           "name": "timeliness"

