Repository: incubator-griffin Updated Branches: refs/heads/master 71fcf93b9 -> a0b130ae0
[GRIFFIN-96] Add timeliness as a new feature Timeliness is supported to measure the latency of streaming data; it only supports data that itself contains start_time and end_time. Author: Lionel Liu <[email protected]> Closes #196 from bhlx3lyx7/tmst. Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/a0b130ae Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/a0b130ae Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/a0b130ae Branch: refs/heads/master Commit: a0b130ae01199c1c95a8ed56a00dcd700adb6651 Parents: 71fcf93 Author: Lionel Liu <[email protected]> Authored: Wed Jan 24 10:54:44 2018 +0800 Committer: Lionel Liu <[email protected]> Committed: Wed Jan 24 10:54:44 2018 +0800 ---------------------------------------------------------------------- .../measure/cache/info/ZKInfoCache.scala | 2 + .../measure/rule/adaptor/GlobalKeys.scala | 5 + .../rule/adaptor/GriffinDslAdaptor.scala | 110 +++++++++++++++++-- .../apache/griffin/measure/utils/TimeUtil.scala | 67 +++++++---- .../resources/_timeliness-batch-griffindsl.json | 5 +- .../_timeliness-streaming-griffindsl.json | 11 +- .../griffin/measure/utils/TimeUtilTest.scala | 38 +++++++ 7 files changed, 208 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0b130ae/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala b/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala index ee99099..3789a05 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/cache/info/ZKInfoCache.scala @@ 
-117,6 +117,8 @@ case class ZKInfoCache(config: Map[String, Any], metricName: String) extends Inf def clearInfo(): Unit = { // delete("/") + deleteInfo(TimeInfoCache.finalCacheInfoPath :: Nil) + deleteInfo(TimeInfoCache.infoPath :: Nil) println("clear info") } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0b130ae/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala index f592709..bd27b19 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GlobalKeys.scala @@ -58,7 +58,12 @@ object DistinctnessKeys { object TimelinessKeys { val _source = "source" val _latency = "latency" + val _total = "total" + val _avg = "avg" val _threshold = "threshold" + val _step = "step" + val _count = "count" + val _stepSize = "step.size" } object GlobalKeys { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0b130ae/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala index ad4a195..5655a13 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala @@ -666,21 +666,21 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], // 3. 
timeliness metric val metricTableName = name + val totalColName = details.getStringOrKey(TimelinessKeys._total) + val avgColName = details.getStringOrKey(TimelinessKeys._avg) val metricSql = procType match { case BatchProcessType => { s""" - |SELECT CAST(AVG(`${latencyColName}`) AS BIGINT) AS `avg`, - |MAX(`${latencyColName}`) AS `max`, - |MIN(`${latencyColName}`) AS `min` + |SELECT COUNT(*) AS `${totalColName}`, + |CAST(AVG(`${latencyColName}`) AS BIGINT) AS `${avgColName}` |FROM `${latencyTableName}` """.stripMargin } case StreamingProcessType => { s""" |SELECT `${InternalColumns.tmst}`, - |CAST(AVG(`${latencyColName}`) AS BIGINT) AS `avg`, - |MAX(`${latencyColName}`) AS `max`, - |MIN(`${latencyColName}`) AS `min` + |COUNT(*) AS `${totalColName}`, + |CAST(AVG(`${latencyColName}`) AS BIGINT) AS `${avgColName}` |FROM `${latencyTableName}` |GROUP BY `${InternalColumns.tmst}` """.stripMargin @@ -710,9 +710,105 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], case _ => emptyRulePlan } + // 5. ranges +// val rangePlan = details.get(TimelinessKeys._rangeSplit) match { +// case Some(arr: Seq[String]) => { +// val ranges = splitTimeRanges(arr) +// if (ranges.size > 0) { +// try { +// // 5.1. range +// val rangeTableName = "__range" +// val rangeColName = details.getStringOrKey(TimelinessKeys._range) +// val caseClause = { +// val whenClause = ranges.map { range => +// s"WHEN `${latencyColName}` < ${range._1} THEN '<${range._2}'" +// }.mkString("\n") +// s"CASE ${whenClause} ELSE '>=${ranges.last._2}' END AS `${rangeColName}`" +// } +// val rangeSql = { +// s"SELECT *, ${caseClause} FROM `${latencyTableName}`" +// } +// val rangeStep = SparkSqlStep(rangeTableName, rangeSql, emptyMap) +// +// // 5.2. 
range metric +// val rangeMetricTableName = "__rangeMetric" +// val countColName = details.getStringOrKey(TimelinessKeys._count) +// val rangeMetricSql = procType match { +// case BatchProcessType => { +// s""" +// |SELECT `${rangeColName}`, COUNT(*) AS `${countColName}` +// |FROM `${rangeTableName}` GROUP BY `${rangeColName}` +// """.stripMargin +// } +// case StreamingProcessType => { +// s""" +// |SELECT `${InternalColumns.tmst}`, `${rangeColName}`, COUNT(*) AS `${countColName}` +// |FROM `${rangeTableName}` GROUP BY `${InternalColumns.tmst}`, `${rangeColName}` +// """.stripMargin +// } +// } +// val rangeMetricStep = SparkSqlStep(rangeMetricTableName, rangeMetricSql, emptyMap) +// val rangeMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc) +// val rangeMetricExports = genMetricExport(rangeMetricParam, rangeColName, rangeMetricTableName, ct, mode) :: Nil +// +// RulePlan(rangeStep :: rangeMetricStep :: Nil, rangeMetricExports) +// } catch { +// case _: Throwable => emptyRulePlan +// } +// } else emptyRulePlan +// } +// case _ => emptyRulePlan +// } + // return timeliness plan - timePlan.merge(recordPlan) + + // 5. 
ranges + val rangePlan = TimeUtil.milliseconds(details.getString(TimelinessKeys._stepSize, "")) match { + case Some(stepSize) => { + // 5.1 range + val rangeTableName = "__range" + val stepColName = details.getStringOrKey(TimelinessKeys._step) + val rangeSql = { + s""" + |SELECT *, CAST((`${latencyColName}` / ${stepSize}) AS BIGINT) AS `${stepColName}` + |FROM `${latencyTableName}` + """.stripMargin + } + val rangeStep = SparkSqlStep(rangeTableName, rangeSql, emptyMap) + + // 5.2 range metric + val rangeMetricTableName = "__rangeMetric" + val countColName = details.getStringOrKey(TimelinessKeys._count) + val rangeMetricSql = procType match { + case BatchProcessType => { + s""" + |SELECT `${stepColName}`, COUNT(*) AS `${countColName}` + |FROM `${rangeTableName}` GROUP BY `${stepColName}` + """.stripMargin + } + case StreamingProcessType => { + s""" + |SELECT `${InternalColumns.tmst}`, `${stepColName}`, COUNT(*) AS `${countColName}` + |FROM `${rangeTableName}` GROUP BY `${InternalColumns.tmst}`, `${stepColName}` + """.stripMargin + } + } + val rangeMetricStep = SparkSqlStep(rangeMetricTableName, rangeMetricSql, emptyMap) + val rangeMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc) + val rangeMetricExports = genMetricExport(rangeMetricParam, stepColName, rangeMetricTableName, ct, mode) :: Nil + + RulePlan(rangeStep :: rangeMetricStep :: Nil, rangeMetricExports) + } + case _ => emptyRulePlan + } + + timePlan.merge(recordPlan).merge(rangePlan) } } + private def splitTimeRanges(tstrs: Seq[String]): List[(Long, String)] = { + val ts = tstrs.flatMap(TimeUtil.milliseconds(_)).sorted.toList + ts.map { t => (t, TimeUtil.time2String(t)) } + } + } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0b130ae/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala 
b/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala index 42a140f..9b4d58e 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/utils/TimeUtil.scala @@ -20,11 +20,30 @@ package org.apache.griffin.measure.utils import org.apache.griffin.measure.log.Loggable +import scala.util.matching.Regex import scala.util.{Failure, Success, Try} object TimeUtil extends Loggable { - final val TimeRegex = """^([+\-]?\d+)(ms|s|m|h|d)$""".r + private object Units { + case class TimeUnit(name: String, shortName: String, ut: Long, regex: Regex) { + def toMs(t: Long) = t * ut + def fromMs(ms: Long) = ms / ut + def fitUnit(ms: Long) = (ms % ut == 0) + } + + val dayUnit = TimeUnit("day", "d", 24 * 60 * 60 * 1000, """^(?i)d(?:ay)?$""".r) + val hourUnit = TimeUnit("hour", "h", 60 * 60 * 1000, """^(?i)h(?:our|r)?$""".r) + val minUnit = TimeUnit("minute", "m", 60 * 1000, """^(?i)m(?:in(?:ute)?)?$""".r) + val secUnit = TimeUnit("second", "s", 1000, """^(?i)s(?:ec(?:ond)?)?$""".r) + val msUnit = TimeUnit("millisecond", "ms", 1, """^(?i)m(?:illi)?s(?:ec(?:ond)?)?$""".r) + + val timeUnits = dayUnit :: hourUnit :: minUnit :: secUnit :: msUnit :: Nil + } + import Units._ + +// final val TimeRegex = """^([+\-]?\d+)(ms|s|m|h|d)$""".r + final val TimeRegex = """^([+\-]?\d+)([a-zA-Z]+)$""".r final val PureTimeRegex = """^([+\-]?\d+)$""".r def milliseconds(timeString: String): Option[Long] = { @@ -34,17 +53,17 @@ object TimeUtil extends Loggable { case TimeRegex(time, unit) => { val t = time.toLong unit match { - case "d" => t * 24 * 60 * 60 * 1000 - case "h" => t * 60 * 60 * 1000 - case "m" => t * 60 * 1000 - case "s" => t * 1000 - case "ms" => t + case dayUnit.regex() => dayUnit.toMs(t) + case hourUnit.regex() => hourUnit.toMs(t) + case minUnit.regex() => minUnit.toMs(t) + case secUnit.regex() => secUnit.toMs(t) + case msUnit.regex() => msUnit.toMs(t) case _ => throw new 
Exception(s"${timeString} is invalid time format") } } case PureTimeRegex(time) => { val t = time.toLong - t + msUnit.toMs(t) } case _ => throw new Exception(s"${timeString} is invalid time format") } @@ -58,24 +77,34 @@ object TimeUtil extends Loggable { def timeToUnit(ms: Long, unit: String): Long = { unit match { - case "ms" => ms - case "sec" => ms / 1000 - case "min" => ms / (60 * 1000) - case "hour" => ms / (60 * 60 * 1000) - case "day" => ms / (24 * 60 * 60 * 1000) - case _ => ms / (60 * 1000) + case dayUnit.regex() => dayUnit.fromMs(ms) + case hourUnit.regex() => hourUnit.fromMs(ms) + case minUnit.regex() => minUnit.fromMs(ms) + case secUnit.regex() => secUnit.fromMs(ms) + case msUnit.regex() => msUnit.fromMs(ms) + case _ => ms } } def timeFromUnit(t: Long, unit: String): Long = { unit match { - case "ms" => t - case "sec" => t * 1000 - case "min" => t * 60 * 1000 - case "hour" => t * 60 * 60 * 1000 - case "day" => t * 24 * 60 * 60 * 1000 - case _ => t * 60 * 1000 + case dayUnit.regex() => dayUnit.toMs(t) + case hourUnit.regex() => hourUnit.toMs(t) + case minUnit.regex() => minUnit.toMs(t) + case secUnit.regex() => secUnit.toMs(t) + case msUnit.regex() => msUnit.toMs(t) + case _ => t + } + } + + def time2String(t: Long): String = { + val matchedUnitOpt = timeUnits.foldLeft(None: Option[TimeUnit]) { (retOpt, unit) => + if (retOpt.isEmpty && unit.fitUnit(t)) Some(unit) else retOpt } + val unit = matchedUnitOpt.getOrElse(msUnit) + val unitTime = unit.fromMs(t) + val unitStr = unit.shortName + s"${unitTime}${unitStr}" } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0b130ae/measure/src/test/resources/_timeliness-batch-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_timeliness-batch-griffindsl.json b/measure/src/test/resources/_timeliness-batch-griffindsl.json index 2af98f1..bd48401 100644 --- a/measure/src/test/resources/_timeliness-batch-griffindsl.json +++ 
b/measure/src/test/resources/_timeliness-batch-griffindsl.json @@ -28,7 +28,10 @@ "details": { "source": "source", "latency": "latency", - "threshold": "3m" + "threshold": "3m", + "step": "step", + "count": "cnt", + "step.size": "2m" }, "metric": { "name": "timeliness" http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0b130ae/measure/src/test/resources/_timeliness-streaming-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_timeliness-streaming-griffindsl.json b/measure/src/test/resources/_timeliness-streaming-griffindsl.json index 776c3b5..fbaf8d4 100644 --- a/measure/src/test/resources/_timeliness-streaming-griffindsl.json +++ b/measure/src/test/resources/_timeliness-streaming-griffindsl.json @@ -33,7 +33,7 @@ { "dsl.type": "spark-sql", "name": "${this}", - "rule": "select ts, name, age from ${s1}" + "rule": "select ts, end_ts, name, age from ${s1}" } ] } @@ -54,11 +54,16 @@ "dsl.type": "griffin-dsl", "dq.type": "timeliness", "name": "timeliness", - "rule": "ts", + "rule": "ts, end_ts", "details": { "source": "source", "latency": "latency", - "threshold": "1h" + "total": "total", + "avg": "avg", + "threshold": "1h", + "step": "step", + "count": "cnt", + "step.size": "5m" }, "metric": { "name": "timeliness" http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0b130ae/measure/src/test/scala/org/apache/griffin/measure/utils/TimeUtilTest.scala ---------------------------------------------------------------------- diff --git a/measure/src/test/scala/org/apache/griffin/measure/utils/TimeUtilTest.scala b/measure/src/test/scala/org/apache/griffin/measure/utils/TimeUtilTest.scala new file mode 100644 index 0000000..673eca0 --- /dev/null +++ b/measure/src/test/scala/org/apache/griffin/measure/utils/TimeUtilTest.scala @@ -0,0 +1,38 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.utils + +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} + +@RunWith(classOf[JUnitRunner]) +class TimeUtilTest extends FunSuite with Matchers with BeforeAndAfter { + + test ("milliseconds") { + val ts = "1h" + val res = TimeUtil.milliseconds(ts) + println(res) + + val t = 1200000 + val s = TimeUtil.time2String(t) + println(s) + } + +}
