Repository: incubator-griffin

Updated Branches:
  refs/heads/master 07121ba9b -> a4f4f2413
support multiple streaming data connectors, and union dataframes by name

- multiple streaming data connectors
- union dataframes by name

Author: Lionel Liu <[email protected]>

Closes #235 from bhlx3lyx7/tmst.

Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/a4f4f241
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/a4f4f241
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/a4f4f241

Branch: refs/heads/master
Commit: a4f4f24139b2a59c2e7356b3e22b589e3eba5074
Parents: 07121ba
Author: Lionel Liu <[email protected]>
Authored: Wed Mar 21 16:10:32 2018 +0800
Committer: Lionel Liu <[email protected]>
Committed: Wed Mar 21 16:10:32 2018 +0800

----------------------------------------------------------------------
 .../data/connector/DataConnectorFactory.scala   |   3 -
 .../streaming/KafkaStreamingDataConnector.scala |  38 ++--
 .../KafkaStreamingStringDataConnector.scala     |   4 +-
 .../streaming/StreamingDataConnector.scala      |   5 +-
 .../measure/data/source/DataSource.scala        |  51 +++---
 .../data/source/cache/DataSourceCache.scala     |  51 ++++--
 .../measure/data/source/cache/WithFanIn.scala   |  57 ++++++
 .../measure/process/engine/SparkSqlEngine.scala |   2 +-
 .../measure/rule/adaptor/InternalColumns.scala  |   4 +-
 .../dsl/analyzer/DistinctnessAnalyzer.scala     |   2 +-
 .../rule/dsl/expr/ClauseExpression.scala        |  28 ++-
 .../griffin/measure/rule/dsl/expr/Expr.scala    |   2 +-
 .../griffin/measure/rule/dsl/expr/ExprTag.scala |  23 +++
 .../measure/rule/dsl/parser/BasicParser.scala   |  10 +-
 .../rule/dsl/parser/GriffinDslParser.scala      |  12 +-
 .../rule/trans/DistinctnessRulePlanTrans.scala  | 173 ++++++++++++++-----
 .../griffin/measure/utils/DataFrameUtil.scala   |  41 +++++
 .../_distinctness-batch-griffindsl1.json        |   2 +-
 .../_distinctness-batch-griffindsl2.json        |  74 ++++++++
 19 files changed, 467 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnectorFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnectorFactory.scala
index 9c3383f..27b390a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnectorFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnectorFactory.scala
@@ -61,9 +61,6 @@ object DataConnectorFactory {
        case AvroRegex() => AvroBatchDataConnector(sqlContext, dqEngines, dataConnectorParam)
        case TextDirRegex() => TextDirBatchDataConnector(sqlContext, dqEngines, dataConnectorParam)
        case KafkaRegex() => {
-//        val ksdcTry = getStreamingDataConnector(ssc, dataConnectorParam)
-//        val cdcTry = getCacheDataConnector(sqlContext, dataConnectorParam.cache)
-//        KafkaCacheDirectDataConnector(ksdcTry, cdcTry, dataConnectorParam)
          getStreamingDataConnector(sqlContext, ssc, dqEngines, dataConnectorParam)
        }
        case _ => throw new Exception("connector creation error!")


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala
index f973f3f..d46d6b7 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala
@@ -28,6 +28,7 @@ trait KafkaStreamingDataConnector extends StreamingDataConnector {

   type KD <: Decoder[K]
   type VD <: Decoder[V]
+  type OUT = (K, V)

   val config = dcParam.config

@@ -42,35 +43,44 @@ trait KafkaStreamingDataConnector extends StreamingDataConnector {
   }

   def init(): Unit = {
+    // register fan in
+    dataSourceCacheOpt.foreach(_.registerFanIn)
+
     val ds = stream match {
       case Success(dstream) => dstream
       case Failure(ex) => throw ex
     }
     ds.foreachRDD((rdd, time) => {
       val ms = time.milliseconds
-
-      // coalesce partition number
-      val prlCount = rdd.sparkContext.defaultParallelism
-      val ptnCount = rdd.getNumPartitions
-      val repartitionedRdd = if (prlCount < ptnCount) {
-        rdd.coalesce(prlCount)
-      } else rdd
-
-      val dfOpt = transform(repartitionedRdd)
-
-      val preDfOpt = preProcess(dfOpt, ms)
+      val saveDfOpt = try {
+        // coalesce partition number
+        val prlCount = rdd.sparkContext.defaultParallelism
+        val ptnCount = rdd.getNumPartitions
+        val repartitionedRdd = if (prlCount < ptnCount) {
+          rdd.coalesce(prlCount)
+        } else rdd
+
+        val dfOpt = transform(repartitionedRdd)
+
+        preProcess(dfOpt, ms)
+      } catch {
+        case e: Throwable => {
+          error(s"streaming data connector error: ${e.getMessage}")
+          None
+        }
+      }

       // save data frame
-      dataSourceCacheOpt.foreach(_.saveData(preDfOpt, ms))
+      dataSourceCacheOpt.foreach(_.saveData(saveDfOpt, ms))
     })
   }

-  def stream(): Try[InputDStream[(K, V)]] = Try {
+  def stream(): Try[InputDStream[OUT]] = Try {
     val topicSet = topics.split(",").toSet
     createDStream(topicSet)
   }

-  protected def createDStream(topicSet: Set[String]): InputDStream[(K, V)]
+  protected def createDStream(topicSet: Set[String]): InputDStream[OUT]

 }


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingStringDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingStringDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingStringDataConnector.scala
index 5e0413e..038cb77 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingStringDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingStringDataConnector.scala
@@ -44,11 +44,11 @@ case class KafkaStreamingStringDataConnector(sqlContext: SQLContext,
     StructField(valueColName, StringType)
   ))

-  def createDStream(topicSet: Set[String]): InputDStream[(K, V)] = {
+  def createDStream(topicSet: Set[String]): InputDStream[OUT] = {
     KafkaUtils.createDirectStream[K, V, KD, VD](ssc, kafkaConfig, topicSet)
   }

-  def transform(rdd: RDD[(K, V)]): Option[DataFrame] = {
+  def transform(rdd: RDD[OUT]): Option[DataFrame] = {
     if (rdd.isEmpty) None else {
       try {
         val rowRdd = rdd.map(d => Row(d._2))
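The OUT type member introduced above decouples a connector's DStream element type from the Kafka-style (K, V) pair. As a rough illustration only (not part of this commit; it assumes an `ssc: StreamingContext` member is available, as it is for the Kafka connectors), a line-oriented socket connector could emit plain strings:

    import scala.util.Try

    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.dstream.InputDStream

    // hypothetical connector, for illustration only
    trait SocketStreamingDataConnector extends StreamingDataConnector {
      type K = Unit               // no key/value pair in this source
      type V = Unit
      type OUT = String           // each element is one raw text line

      val ssc: StreamingContext   // assumed to be provided by the concrete class

      protected def stream(): Try[InputDStream[OUT]] = Try {
        // ReceiverInputDStream[String] is an InputDStream[String]
        ssc.socketTextStream("localhost", 9999)  // host/port are placeholders
      }
    }

A concrete implementation would still supply transform(rdd: RDD[OUT]) to turn each batch into a DataFrame, exactly as KafkaStreamingStringDataConnector does for its (K, V) pairs.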
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala
index 39f4995..e52ca1b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala
@@ -32,10 +32,11 @@ trait StreamingDataConnector extends DataConnector {

   type K
   type V
+  type OUT

-  protected def stream(): Try[InputDStream[(K, V)]]
+  protected def stream(): Try[InputDStream[OUT]]

-  def transform(rdd: RDD[(K, V)]): Option[DataFrame]
+  def transform(rdd: RDD[OUT]): Option[DataFrame]

   def data(ms: Long): (Option[DataFrame], TimeRange) = (None, TimeRange.emptyTimeRange)


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala
index 9a4b640..b4324dd 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala
@@ -28,6 +28,7 @@ import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters,
 import org.apache.griffin.measure.rule.plan.TimeInfo
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.griffin.measure.utils.DataFrameUtil._

 case class DataSource(sqlContext: SQLContext,
                       name: String,
@@ -89,31 +90,31 @@ case class DataSource(sqlContext: SQLContext,
     }
   }

-  private def unionDfOpts(dfOpt1: Option[DataFrame], dfOpt2: Option[DataFrame]
-                         ): Option[DataFrame] = {
-    (dfOpt1, dfOpt2) match {
-      case (Some(df1), Some(df2)) => Some(unionDataFrames(df1, df2))
-      case (Some(df1), _) => dfOpt1
-      case (_, Some(df2)) => dfOpt2
-      case _ => None
-    }
-  }
-
-  private def unionDataFrames(df1: DataFrame, df2: DataFrame): DataFrame = {
-    try {
-      val cols = df1.columns
-      val rdd2 = df2.map{ row =>
-        val values = cols.map { col =>
-          row.getAs[Any](col)
-        }
-        Row(values: _*)
-      }
-      val ndf2 = sqlContext.createDataFrame(rdd2, df1.schema)
-      df1 unionAll ndf2
-    } catch {
-      case e: Throwable => df1
-    }
-  }
+//  private def unionDfOpts(dfOpt1: Option[DataFrame], dfOpt2: Option[DataFrame]
+//                         ): Option[DataFrame] = {
+//    (dfOpt1, dfOpt2) match {
+//      case (Some(df1), Some(df2)) => Some(unionDataFrames(df1, df2))
+//      case (Some(df1), _) => dfOpt1
+//      case (_, Some(df2)) => dfOpt2
+//      case _ => None
+//    }
+//  }
+//
+//  private def unionDataFrames(df1: DataFrame, df2: DataFrame): DataFrame = {
+//    try {
+//      val cols = df1.columns
+//      val rdd2 = df2.map{ row =>
+//        val values = cols.map { col =>
+//          row.getAs[Any](col)
+//        }
+//        Row(values: _*)
+//      }
+//      val ndf2 = sqlContext.createDataFrame(rdd2, df1.schema)
+//      df1 unionAll ndf2
+//    } catch {
+//      case e: Throwable => df1
+//    }
+//  }

   def updateData(df: DataFrame): Unit = {
     dataSourceCacheOpt.foreach(_.updateData(Some(df)))
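DataSource now delegates the union of its frames to DataFrameUtil.unionByName (added later in this commit), which aligns columns by name over the intersection of the two schemas instead of by position. A quick sketch of the difference, with made-up sample frames and the Spark 1.x API the project uses:

    // illustration only: two frames with the same columns in different order
    val df1 = sqlContext.createDataFrame(Seq(("a", 1))).toDF("name", "age")
    val df2 = sqlContext.createDataFrame(Seq((2, "b"))).toDF("age", "name")

    // a positional unionAll would pair "name" with "age" and mis-align the data;
    // the by-name union projects both sides onto the shared column set first
    val unioned = unionByName(df1, df2)
    // rows: ("a", 1) and ("b", 2), column order following the computed intersection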
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
index ac67557..2412130 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
@@ -28,11 +28,15 @@ import org.apache.griffin.measure.rule.adaptor.InternalColumns
 import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
 import org.apache.griffin.measure.utils.ParamUtil._
 import org.apache.spark.sql._
+import org.apache.spark.sql.functions.col
+import org.apache.griffin.measure.utils.DataFrameUtil._
+
+import scala.util.Random

 // data source cache process steps
 // dump phase: save
 // process phase: read -> process -> update -> finish -> clean old data
-trait DataSourceCache extends DataCacheable with Loggable with Serializable {
+trait DataSourceCache extends DataCacheable with WithFanIn[Long] with Loggable with Serializable {

   val sqlContext: SQLContext
   val param: Map[String, Any]
@@ -55,7 +59,8 @@ trait DataSourceCache extends DataCacheable with Loggable with Serializable {
   val _ReadyTimeDelay = "ready.time.delay"
   val _TimeRange = "time.range"

-  val defFilePath = s"hdfs:///griffin/cache/${dsName}/${index}"
+  val rdmStr = Random.alphanumeric.take(10).mkString
+  val defFilePath = s"hdfs:///griffin/cache/${dsName}_${rdmStr}"
   val defInfoPath = s"${index}"

   val filePath: String = param.getString(_FilePath, defFilePath)
@@ -98,11 +103,17 @@ trait DataSourceCache extends DataCacheable with Loggable with Serializable {

   def init(): Unit = {}

-  // save new cache data only
+  // save new cache data only, need index for multiple streaming data connectors
   def saveData(dfOpt: Option[DataFrame], ms: Long): Unit = {
     if (!readOnly) {
       dfOpt match {
         case Some(df) => {
+          df.cache
+
+          // cache df
+          val cnt = df.count
+          println(s"save ${dsName} data count: ${cnt}")
+
           // lock makes it safer when writing new cache data
           val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
           if (newCacheLocked) {
@@ -115,6 +126,9 @@ trait DataSourceCache extends DataCacheable with Loggable with Serializable {
               newCacheLock.unlock()
             }
           }
+
+          // uncache
+          df.unpersist
         }
         case _ => {
           info(s"no data frame to save")
@@ -122,8 +136,12 @@ trait DataSourceCache extends DataCacheable with Loggable with Serializable {
       }

       // submit cache time and ready time
-      submitCacheTime(ms)
-      submitReadyTime(ms)
+      if (fanIncrement(ms)) {
+        println(s"save data [${ms}] finish")
+        submitCacheTime(ms)
+        submitReadyTime(ms)
+      }
+
     }
   }

@@ -179,15 +197,20 @@ trait DataSourceCache extends DataCacheable with Loggable with Serializable {
     (cacheDfOpt, retTimeRange)
   }

-  private def unionDfOpts(dfOpt1: Option[DataFrame], dfOpt2: Option[DataFrame]
-                         ): Option[DataFrame] = {
-    (dfOpt1, dfOpt2) match {
-      case (Some(df1), Some(df2)) => Some(df1 unionAll df2)
-      case (Some(df1), _) => dfOpt1
-      case (_, Some(df2)) => dfOpt2
-      case _ => None
-    }
-  }
+//  private def unionDfOpts(dfOpt1: Option[DataFrame], dfOpt2: Option[DataFrame]
+//                         ): Option[DataFrame] = {
+//    (dfOpt1, dfOpt2) match {
+//      case (Some(df1), Some(df2)) => Some(unionByName(df1, df2))
+//      case (Some(df1), _) => dfOpt1
+//      case (_, Some(df2)) => dfOpt2
+//      case _ => None
+//    }
+//  }
+//
+//  private def unionByName(a: DataFrame, b: DataFrame): DataFrame = {
+//    val columns = a.columns.toSet.intersect(b.columns.toSet).map(col).toSeq
+//    a.select(columns: _*).unionAll(b.select(columns: _*))
+//  }

   private def cleanOutTimePartitions(path: String, outTime: Long, partitionOpt: Option[String],
                                      func: (Long, Long) => Boolean


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/WithFanIn.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/WithFanIn.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/WithFanIn.scala
new file mode 100644
index 0000000..aa5e04d
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/WithFanIn.scala
@@ -0,0 +1,57 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.data.source.cache
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.concurrent.{TrieMap, Map => ConcMap}
+
+trait WithFanIn[T] {
+
+  val totalNum: AtomicInteger = new AtomicInteger(0)
+  val fanInCountMap: ConcMap[T, Int] = TrieMap[T, Int]()
+
+  def registerFanIn(): Int = {
+    totalNum.incrementAndGet()
+  }
+
+  def fanIncrement(key: T): Boolean = {
+    fanInc(key)
+    fanInCountMap.get(key) match {
+      case Some(n) if (n >= totalNum.get) => {
+        fanInCountMap.remove(key)
+        true
+      }
+      case _ => false
+    }
+  }
+
+  private def fanInc(key: T): Unit = {
+    fanInCountMap.get(key) match {
+      case Some(n) => {
+        val suc = fanInCountMap.replace(key, n, n + 1)
+        if (!suc) fanInc(key)
+      }
+      case _ => {
+        val oldOpt = fanInCountMap.putIfAbsent(key, 1)
+        if (oldOpt.nonEmpty) fanInc(key)
+      }
+    }
+  }
+
+}


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
index dcb02f6..438595b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
@@ -48,7 +48,7 @@ case class SparkSqlEngine(sqlContext: SQLContext) extends SparkDqEngine {
       } else sqlContext.sql(rule)

//      println(name)
-//      rdf.show(10)
+//      rdf.show(3)

       if (rs.isGlobal) {
         if (rs.needCache) DataFrameCaches.cacheGlobalDataFrame(name, rdf)
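WithFanIn is what lets several streaming connectors share one DataSourceCache: each connector registers once in init(), and saveData only submits the cache/ready time for a timestamp after every registered connector has delivered its batch for that timestamp. The putIfAbsent/replace retry loop in fanInc keeps the per-key count correct under concurrent foreachRDD callbacks. A small sketch of the contract (illustration only):

    // two "connectors" fanning in on the same timestamp key
    object DemoCache extends WithFanIn[Long]

    DemoCache.registerFanIn()       // connector 1 registers
    DemoCache.registerFanIn()       // connector 2 registers -> totalNum == 2

    DemoCache.fanIncrement(1000L)   // first batch for ts 1000 arrives: returns false
    DemoCache.fanIncrement(1000L)   // second arrives: returns true, key is cleared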
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala
index fc6a246..fa04288 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/InternalColumns.scala
@@ -29,5 +29,7 @@ object InternalColumns {

   val distinct = "__distinct"

-  val columns = List[String](tmst, metric, record, empty, beginTs, endTs, distinct)
+  val rowNumber = "__rn"
+
+  val columns = List[String](tmst, metric, record, empty, beginTs, endTs, distinct, rowNumber)
 }
\ No newline at end of file


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DistinctnessAnalyzer.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DistinctnessAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DistinctnessAnalyzer.scala
index 55e4f39..af59eb4 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DistinctnessAnalyzer.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/analyzer/DistinctnessAnalyzer.scala
@@ -37,7 +37,7 @@ case class DistinctnessAnalyzer(expr: DistinctnessClause, sourceName: String) ex
   val selectionPairs = exprs.zipWithIndex.map { pair =>
     val (pr, idx) = pair
     val res = pr.preOrderTraverseDepthFirst(Seq[String]())(seqAlias, combAlias)
-    (pr, res.headOption.getOrElse(genAlias(idx)))
+    (pr, res.headOption.getOrElse(genAlias(idx)), pr.tag.isEmpty)
   }

   if (selectionPairs.isEmpty) {


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
index 340c1e2..6790268 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
@@ -99,7 +99,7 @@ case class GroupbyClause(exprs: Seq[Expr], havingClauseOpt: Option[Expr]) extend

 }

-case class OrderbyItem(expr: Expr, orderOpt: Option[String]) extends Expr {
+case class OrderItem(expr: Expr, orderOpt: Option[String]) extends Expr {
   addChild(expr)

   def desc: String = {
     orderOpt match {
@@ -109,12 +109,12 @@ case class OrderbyItem(expr: Expr, orderOpt: Option[String]) extends Expr {
   }
   def coalesceDesc: String = desc

-  override def map(func: (Expr) => Expr): OrderbyItem = {
-    OrderbyItem(func(expr), orderOpt)
+  override def map(func: (Expr) => Expr): OrderItem = {
+    OrderItem(func(expr), orderOpt)
   }
 }

-case class OrderbyClause(items: Seq[OrderbyItem]) extends ClauseExpression {
+case class OrderbyClause(items: Seq[OrderItem]) extends ClauseExpression {

   addChildren(items.map(_.expr))

@@ -128,7 +128,25 @@ case class OrderbyClause(items: Seq[OrderbyItem]) extends ClauseExpression {
   }

   override def map(func: (Expr) => Expr): OrderbyClause = {
-    OrderbyClause(items.map(func(_).asInstanceOf[OrderbyItem]))
+    OrderbyClause(items.map(func(_).asInstanceOf[OrderItem]))
+  }
+}
+
+case class SortbyClause(items: Seq[OrderItem]) extends ClauseExpression {
+
+  addChildren(items.map(_.expr))
+
+  def desc: String = {
+    val obs = items.map(_.desc).mkString(", ")
+    s"SORT BY ${obs}"
+  }
+  def coalesceDesc: String = {
+    val obs = items.map(_.desc).mkString(", ")
+    s"SORT BY ${obs}"
+  }
+
+  override def map(func: (Expr) => Expr): SortbyClause = {
+    SortbyClause(items.map(func(_).asInstanceOf[OrderItem]))
   }
 }


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/Expr.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/Expr.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/Expr.scala
index c089e81..0b653b1 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/Expr.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/Expr.scala
@@ -18,7 +18,7 @@ under the License.
 */
 package org.apache.griffin.measure.rule.dsl.expr

-trait Expr extends TreeNode with Serializable {
+trait Expr extends TreeNode with ExprTag with Serializable {

   def desc: String


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ExprTag.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ExprTag.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ExprTag.scala
new file mode 100644
index 0000000..2e31bbe
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ExprTag.scala
@@ -0,0 +1,23 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.rule.dsl.expr
+
+trait ExprTag { this: Expr =>
+  var tag: String = ""
+}


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala
index 846770b..3a0d737 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala
@@ -152,6 +152,7 @@ trait BasicParser extends JavaTokenParsers with Serializable {
   val WHERE: Parser[String] = """(?i)where\s""".r
   val GROUP: Parser[String] = """(?i)group\s""".r
   val ORDER: Parser[String] = """(?i)order\s""".r
+  val SORT: Parser[String] = """(?i)sort\s""".r
   val BY: Parser[String] = """(?i)by\s""".r
   val DESC: Parser[String] = """(?i)desc""".r
   val ASC: Parser[String] = """(?i)asc""".r
@@ -360,12 +361,15 @@ trait BasicParser extends JavaTokenParsers with Serializable {
   def groupbyClause: Parser[GroupbyClause] = GROUP ~ BY ~ rep1sep(expression, COMMA) ~ opt(havingClause) ^^ {
     case _ ~ _ ~ cols ~ havingOpt => GroupbyClause(cols, havingOpt)
   }
-  def orderbyItem: Parser[OrderbyItem] = expression ~ opt(DESC | ASC) ^^ {
-    case expr ~ orderOpt => OrderbyItem(expr, orderOpt)
+  def orderItem: Parser[OrderItem] = expression ~ opt(DESC | ASC) ^^ {
+    case expr ~ orderOpt => OrderItem(expr, orderOpt)
   }
-  def orderbyClause: Parser[OrderbyClause] = ORDER ~ BY ~ rep1sep(orderbyItem, COMMA) ^^ {
+  def orderbyClause: Parser[OrderbyClause] = ORDER ~ BY ~ rep1sep(orderItem, COMMA) ^^ {
     case _ ~ _ ~ cols => OrderbyClause(cols)
   }
+  def sortbyClause: Parser[SortbyClause] = SORT ~ BY ~ rep1sep(orderItem, COMMA) ^^ {
+    case _ ~ _ ~ cols => SortbyClause(cols)
+  }
   def limitClause: Parser[LimitClause] = LIMIT ~> expression ^^ { LimitClause(_) }

   /**
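With the SORT token and sortbyClause in place, the parser accepts both ORDER BY and SORT BY lists built from the same OrderItem. Roughly, in a test (illustration only, assuming a concrete BasicParser instance and that bare identifiers parse as expressions):

    // parseAll comes from scala.util.parsing.combinator.RegexParsers
    val r = parser.parseAll(parser.sortbyClause, "sort by age desc, name")
    // on success: SortbyClause(List(OrderItem(<age>, Some("desc")), OrderItem(<name>, None)))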
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
index b129ead..d4a037b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
@@ -24,6 +24,8 @@ import org.apache.griffin.measure.rule.dsl.expr._
 case class GriffinDslParser(dataSourceNames: Seq[String], functionNames: Seq[String]
                            ) extends BasicParser {

+  import Operator._
+
   /**
     * -- profiling clauses --
     * <profiling-clauses> = <select-clause> [ <from-clause> ]+ [ <where-clause> ]+ [ <groupby-clause> ]+ [ <orderby-clause> ]+ [ <limit-clause> ]+
@@ -48,9 +50,15 @@ case class GriffinDslParser(dataSourceNames: Seq[String], functionNames: Seq[Str

   /**
     * -- distinctness clauses --
-    * <distinctness-clauses> = <expr> [, <expr>]+
+    * <sqbr-expr> = "[" <expr> "]"
+    * <dist-expr> = <sqbr-expr> | <expr>
+    * <distinctness-clauses> = <dist-expr> [, <dist-expr>]+
     */
-  def distinctnessClause: Parser[DistinctnessClause] = rep1sep(expression, Operator.COMMA) ^^ {
+  def sqbrExpr: Parser[Expr] = LSQBR ~> expression <~ RSQBR ^^ {
+    case expr => { expr.tag = "[]"; expr }
+  }
+  def distExpr: Parser[Expr] = expression | sqbrExpr
+  def distinctnessClause: Parser[DistinctnessClause] = rep1sep(distExpr, Operator.COMMA) ^^ {
     case exprs => DistinctnessClause(exprs)
   }


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala
index 1ec970b..ccdf178 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/DistinctnessRulePlanTrans.scala
@@ -49,6 +49,8 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String],

     val _duplicationArray = "duplication.array"
     val _withAccumulate = "with.accumulate"
+
+    val _recordEnable = "record.enable"
   }
   import DistinctnessKeys._

@@ -81,11 +83,15 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String],
     }

     val selClause = analyzer.selectionPairs.map { pair =>
-      val (expr, alias) = pair
+      val (expr, alias, _) = pair
       s"${expr.desc} AS `${alias}`"
     }.mkString(", ")
-    val aliases = analyzer.selectionPairs.map(_._2)
-    val aliasesClause = aliases.map( a => s"`${a}`" ).mkString(", ")
+    val distAliases = analyzer.selectionPairs.filter(_._3).map(_._2)
+    val distAliasesClause = distAliases.map( a => s"`${a}`" ).mkString(", ")
+    val allAliases = analyzer.selectionPairs.map(_._2)
+    val allAliasesClause = allAliases.map( a => s"`${a}`" ).mkString(", ")
+    val groupAliases = analyzer.selectionPairs.filter(!_._3).map(_._2)
+    val groupAliasesClause = groupAliases.map( a => s"`${a}`" ).mkString(", ")

     // 1. source alias
     val sourceAliasTableName = "__sourceAlias"
@@ -110,9 +116,9 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String],
     val accuDupColName = details.getStringOrKey(_accu_dup)
     val selfGroupSql = {
       s"""
-         |SELECT ${aliasesClause}, (COUNT(*) - 1) AS `${dupColName}`,
+         |SELECT ${distAliasesClause}, (COUNT(*) - 1) AS `${dupColName}`,
          |TRUE AS `${InternalColumns.distinct}`
-         |FROM `${sourceAliasTableName}` GROUP BY ${aliasesClause}
+         |FROM `${sourceAliasTableName}` GROUP BY ${distAliasesClause}
        """.stripMargin
     }
     val selfGroupStep = SparkSqlStep(selfGroupTableName, selfGroupSql, emptyMap, true)
@@ -141,13 +147,13 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String],

     // 5. join with older data
     val joinedTableName = "__joined"
-    val selfSelClause = (aliases :+ dupColName).map { alias =>
+    val selfSelClause = (distAliases :+ dupColName).map { alias =>
       s"`${selfGroupTableName}`.`${alias}`"
     }.mkString(", ")
-    val onClause = aliases.map { alias =>
+    val onClause = distAliases.map { alias =>
       s"coalesce(`${selfGroupTableName}`.`${alias}`, '') = coalesce(`${olderAliasTableName}`.`${alias}`, '')"
     }.mkString(" AND ")
-    val olderIsNull = aliases.map { alias =>
+    val olderIsNull = distAliases.map { alias =>
       s"`${olderAliasTableName}`.`${alias}` IS NULL"
     }.mkString(" AND ")
     val joinedSql = {
@@ -164,19 +170,30 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String],
     val moreDupColName = "_more_dup"
     val groupSql = {
       s"""
-         |SELECT ${aliasesClause}, `${dupColName}`, `${InternalColumns.distinct}`,
+         |SELECT ${distAliasesClause}, `${dupColName}`, `${InternalColumns.distinct}`,
          |COUNT(*) AS `${moreDupColName}`
          |FROM `${joinedTableName}`
-         |GROUP BY ${aliasesClause}, `${dupColName}`, `${InternalColumns.distinct}`
+         |GROUP BY ${distAliasesClause}, `${dupColName}`, `${InternalColumns.distinct}`
        """.stripMargin
     }
     val groupStep = SparkSqlStep(groupTableName, groupSql, emptyMap)

     // 7. final duplicate count
     val finalDupCountTableName = "__finalDupCount"
+    // dupColName: the duplicate count within the new data only; once a value is
+    //   known to be duplicated, even its single distinct row in the new data counts
+    // accuDupColName: the duplicate count accumulated across new data and old data
+    // e.g.: new data [A, A, B, B, C, D], old data [A, A, B, C]
+    //   selfGroupTable will be (A, 1, F), (B, 1, F), (C, 0, T), (D, 0, T)
+    //   joinedTable will be (A, 1, F), (A, 1, F), (B, 1, F), (C, 0, F), (D, 0, T)
+    //   groupTable will be (A, 1, F, 2), (B, 1, F, 1), (C, 0, F, 1), (D, 0, T, 1)
+    //   finalDupCountTable will be (A, F, 2, 3), (B, F, 2, 2), (C, F, 1, 1), (D, T, 0, 0)
+    // the distinct result for the new data alone should be (A, 2), (B, 2), (C, 1), (D, 0):
+    //   in new data [A, A, B, B, C, D], the rows [A, A, B, B, C] are all duplicated, only [D] is distinct
     val finalDupCountSql = {
       s"""
-         |SELECT ${aliasesClause}, `${InternalColumns.distinct}`,
+         |SELECT ${distAliasesClause}, `${InternalColumns.distinct}`,
          |CASE WHEN `${InternalColumns.distinct}` THEN `${dupColName}`
          |ELSE (`${dupColName}` + 1) END AS `${dupColName}`,
          |CASE WHEN `${InternalColumns.distinct}` THEN `${dupColName}`
@@ -215,36 +232,112 @@ case class DistinctnessRulePlanTrans(dataSourceNames: Seq[String],

     val duplicationArrayName = details.getString(_duplicationArray, "")
     val dupRulePlan = if (duplicationArrayName.nonEmpty) {
-      // 9. duplicate record
-      val dupRecordTableName = "__dupRecords"
-      val dupRecordSelClause = procType match {
-        case StreamingProcessType if (withOlderTable) => s"${aliasesClause}, `${dupColName}`, `${accuDupColName}`"
-        case _ => s"${aliasesClause}, `${dupColName}`"
-      }
-      val dupRecordSql = {
-        s"""
-           |SELECT ${dupRecordSelClause}
-           |FROM `${dupCountTableName}` WHERE `${dupColName}` > 0
-         """.stripMargin
-      }
-      val dupRecordStep = SparkSqlStep(dupRecordTableName, dupRecordSql, emptyMap, true)
-      val dupRecordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
-      val dupRecordExport = genRecordExport(dupRecordParam, dupRecordTableName, dupRecordTableName, endTmst, mode)
-
-      // 10. duplicate metric
-      val dupMetricTableName = "__dupMetric"
-      val numColName = details.getStringOrKey(_num)
-      val dupMetricSql = {
-        s"""
-           |SELECT `${dupColName}`, COUNT(*) AS `${numColName}`
-           |FROM `${dupRecordTableName}` GROUP BY `${dupColName}`
-         """.stripMargin
-      }
-      val dupMetricStep = SparkSqlStep(dupMetricTableName, dupMetricSql, emptyMap)
-      val dupMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
-      val dupMetricExport = genMetricExport(dupMetricParam, duplicationArrayName, dupMetricTableName, endTmst, mode)
+      val recordEnable = details.getBoolean(_recordEnable, false)
+      if (groupAliases.size > 0) {
+        // with some group by requirement
+        // 9. origin data join with distinct information
+        val informedTableName = "__informed"
+        val onClause = distAliases.map { alias =>
+          s"coalesce(`${sourceAliasTableName}`.`${alias}`, '') = coalesce(`${dupCountTableName}`.`${alias}`, '')"
+        }.mkString(" AND ")
+        val informedSql = {
+          s"""
+             |SELECT `${sourceAliasTableName}`.*,
+             |`${dupCountTableName}`.`${dupColName}` AS `${dupColName}`,
+             |`${dupCountTableName}`.`${InternalColumns.distinct}` AS `${InternalColumns.distinct}`
+             |FROM `${sourceAliasTableName}` LEFT JOIN `${dupCountTableName}`
+             |ON ${onClause}
+           """.stripMargin
+        }
+        val informedStep = SparkSqlStep(informedTableName, informedSql, emptyMap)
+
+        // 10. add row number
+        val rnTableName = "__rowNumber"
+        val rnDistClause = distAliasesClause
+        val rnSortClause = s"SORT BY `${InternalColumns.distinct}`"
+        val rnSql = {
+          s"""
+             |SELECT *,
+             |ROW_NUMBER() OVER (DISTRIBUTE BY ${rnDistClause} ${rnSortClause}) `${InternalColumns.rowNumber}`
+             |FROM `${informedTableName}`
+           """.stripMargin
+        }
+        val rnStep = SparkSqlStep(rnTableName, rnSql, emptyMap)

-      RulePlan(dupRecordStep :: dupMetricStep :: Nil, dupRecordExport :: dupMetricExport :: Nil)
+        // 11. recognize duplicate items
+        val dupItemsTableName = "__dupItems"
+        val dupItemsSql = {
+          s"""
+             |SELECT ${allAliasesClause}, `${dupColName}` FROM `${rnTableName}`
+             |WHERE NOT `${InternalColumns.distinct}` OR `${InternalColumns.rowNumber}` > 1
+           """.stripMargin
+        }
+        val dupItemsStep = SparkSqlStep(dupItemsTableName, dupItemsSql, emptyMap)
+        val dupItemsParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
+        val dupItemsExport = genRecordExport(dupItemsParam, dupItemsTableName, dupItemsTableName, endTmst, mode)
+
+        // 12. group-by dup record metric
+        val groupDupMetricTableName = "__groupDupMetric"
+        val numColName = details.getStringOrKey(_num)
+        val groupSelClause = groupAliasesClause
+        val groupDupMetricSql = {
+          s"""
+             |SELECT ${groupSelClause}, `${dupColName}`, COUNT(*) AS `${numColName}`
+             |FROM `${dupItemsTableName}` GROUP BY ${groupSelClause}, `${dupColName}`
+           """.stripMargin
+        }
+        val groupDupMetricStep = SparkSqlStep(groupDupMetricTableName, groupDupMetricSql, emptyMap)
+        val groupDupMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
+        val groupDupMetricExport = genMetricExport(groupDupMetricParam, duplicationArrayName, groupDupMetricTableName, endTmst, mode)
+
+        val exports = if (recordEnable) {
+          dupItemsExport :: groupDupMetricExport :: Nil
+        } else {
+          groupDupMetricExport :: Nil
+        }
+        RulePlan(
+          informedStep :: rnStep :: dupItemsStep :: groupDupMetricStep :: Nil,
+          exports
+        )
+
+      } else {
+        // no group by requirement
+        // 9. duplicate record
+        val dupRecordTableName = "__dupRecords"
+        val dupRecordSelClause = procType match {
+          case StreamingProcessType if (withOlderTable) => s"${distAliasesClause}, `${dupColName}`, `${accuDupColName}`"
+          case _ => s"${distAliasesClause}, `${dupColName}`"
+        }
+        val dupRecordSql = {
+          s"""
+             |SELECT ${dupRecordSelClause}
+             |FROM `${dupCountTableName}` WHERE `${dupColName}` > 0
+           """.stripMargin
+        }
+        val dupRecordStep = SparkSqlStep(dupRecordTableName, dupRecordSql, emptyMap, true)
+        val dupRecordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap)
+        val dupRecordExport = genRecordExport(dupRecordParam, dupRecordTableName, dupRecordTableName, endTmst, mode)
+
+        // 10. duplicate metric
+        val dupMetricTableName = "__dupMetric"
+        val numColName = details.getStringOrKey(_num)
+        val dupMetricSql = {
+          s"""
+             |SELECT `${dupColName}`, COUNT(*) AS `${numColName}`
+             |FROM `${dupRecordTableName}` GROUP BY `${dupColName}`
+           """.stripMargin
+        }
+        val dupMetricStep = SparkSqlStep(dupMetricTableName, dupMetricSql, emptyMap)
+        val dupMetricParam = emptyMap.addIfNotExist(ExportParamKeys._collectType, ArrayCollectType.desc)
+        val dupMetricExport = genMetricExport(dupMetricParam, duplicationArrayName, dupMetricTableName, endTmst, mode)
+
+        val exports = if (recordEnable) {
+          dupRecordExport :: dupMetricExport :: Nil
+        } else {
+          dupMetricExport :: Nil
+        }
+        RulePlan(dupRecordStep :: dupMetricStep :: Nil, exports)
+      }
     } else emptyRulePlan

     selfDistRulePlan.merge(distRulePlan).merge(distMetricRulePlan).merge(dupRulePlan)
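For the grouped path, steps 10 and 11 render to SQL of roughly the following shape when the rule declares `name` as the distinctness column and `age` as a group-by column (illustration only; table names as in the code, and the dup column assumed to be configured as "dup"):

    // rendered form of rnSql and dupItemsSql for rule "name, [age]"
    val rnSqlRendered =
      """
        |SELECT *,
        |ROW_NUMBER() OVER (DISTRIBUTE BY `name` SORT BY `__distinct`) `__rn`
        |FROM `__informed`
      """.stripMargin
    val dupItemsSqlRendered =
      """
        |SELECT `name`, `age`, `dup` FROM `__rowNumber`
        |WHERE NOT `__distinct` OR `__rn` > 1
      """.stripMargin

Every row of a duplicated value survives into __dupItems, while a value marked distinct contributes no record, since its single row receives row number 1 and is filtered out.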
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/main/scala/org/apache/griffin/measure/utils/DataFrameUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/DataFrameUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/DataFrameUtil.scala
new file mode 100644
index 0000000..9390160
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/DataFrameUtil.scala
@@ -0,0 +1,41 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.utils
+
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.functions._
+
+object DataFrameUtil {
+
+  def unionDfOpts(dfOpt1: Option[DataFrame], dfOpt2: Option[DataFrame]
+                 ): Option[DataFrame] = {
+    (dfOpt1, dfOpt2) match {
+      case (Some(df1), Some(df2)) => Some(unionByName(df1, df2))
+      case (Some(df1), _) => dfOpt1
+      case (_, Some(df2)) => dfOpt2
+      case _ => None
+    }
+  }
+
+  def unionByName(a: DataFrame, b: DataFrame): DataFrame = {
+    val columns = a.columns.toSet.intersect(b.columns.toSet).map(col).toSeq
+    a.select(columns: _*).unionAll(b.select(columns: _*))
+  }
+
+}


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/test/resources/_distinctness-batch-griffindsl1.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_distinctness-batch-griffindsl1.json b/measure/src/test/resources/_distinctness-batch-griffindsl1.json
index f8aa077..4d94d8e 100644
--- a/measure/src/test/resources/_distinctness-batch-griffindsl1.json
+++ b/measure/src/test/resources/_distinctness-batch-griffindsl1.json
@@ -20,7 +20,7 @@
           {
             "dsl.type": "spark-sql",
             "name": "${this}",
-            "rule": "select DISTINCT name, age from ${this}"
+            "rule": "select name, age from ${this}"
           }
         ]
       }


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a4f4f241/measure/src/test/resources/_distinctness-batch-griffindsl2.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_distinctness-batch-griffindsl2.json b/measure/src/test/resources/_distinctness-batch-griffindsl2.json
new file mode 100644
index 0000000..6a12719
--- /dev/null
+++ b/measure/src/test/resources/_distinctness-batch-griffindsl2.json
@@ -0,0 +1,74 @@
+{
+  "name": "dist_batch",
+
+  "process.type": "batch",
+
+  "timestamp": 123456,
+
+  "data.sources": [
+    {
+      "name": "source",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/dupdata.avro"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select name, age from ${this}"
+            }
+          ]
+        }
+      ]
+    },
+    {
+      "name": "target",
+      "baseline": true,
+      "connectors": [
+        {
+          "type": "avro",
+          "version": "1.7",
+          "config": {
+            "file.name": "src/test/resources/dupdata.avro"
+          },
+          "pre.proc": [
+            {
+              "dsl.type": "spark-sql",
+              "name": "${this}",
+              "rule": "select DISTINCT name, age from ${this}"
+            }
+          ]
+        }
+      ]
+    }
+  ],
+
+  "evaluate.rule": {
+    "rules": [
+      {
+        "dsl.type": "griffin-dsl",
+        "dq.type": "distinct",
+        "name": "dist",
+        "rule": "name, [age]",
+        "details": {
+          "source": "source",
+          "target": "target",
+          "total": "total",
+          "distinct": "distinct",
+          "dup": "dup",
+          "num": "num",
+          "duplication.array": "dup",
+          "record.enable": true
+        },
+        "metric": {
+          "name": "distinct"
+        }
+      }
+    ]
+  }
+}
\ No newline at end of file
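The new test config exercises the bracket syntax end to end: in the rule "name, [age]", `name` remains a distinctness column, while `[age]` is tagged with "[]" by sqbrExpr and therefore lands in groupAliases rather than distAliases (the third element of each selection pair is `pr.tag.isEmpty`). Schematically (illustration only):

    // selection pairs reduced to (alias, tagIsEmpty), per DistinctnessAnalyzer
    val pairs = Seq(("name", true), ("age", false))      // from rule "name, [age]"
    val distAliases  = pairs.filter(_._2).map(_._1)      // Seq("name"): grouped for dup counting
    val groupAliases = pairs.filterNot(_._2).map(_._1)   // Seq("age"):  carried into the metric group-by

Setting "record.enable": true additionally keeps the __dupItems record export alongside the duplication metric; with the default of false, only the metric is exported.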
