[GRIFFIN-95] Enhance dump performance in streaming mode

1. in streaming mode, dump cache data as parquet files
2. repartition data to spark.default.parallelism when dumping
3. add a percentile feature to the timeliness measure
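The first two items come down to two small moves in the dump path, both visible in the diff below: the Kafka connector coalesces each micro-batch RDD down to spark.default.parallelism before transforming it, and the new cache writes the resulting DataFrame as parquet, appended and partitioned by the internal tmst column. The following standalone sketch condenses that idea; DumpSketch, cachePath and tmstColumn are illustrative placeholders, not Griffin's actual identifiers.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SaveMode}

    object DumpSketch {

      // avoid many small output files: never use more partitions than the default parallelism
      def coalesceToDefaultParallelism(rdd: RDD[String]): RDD[String] = {
        val prlCount = rdd.sparkContext.defaultParallelism
        val ptnCount = rdd.getNumPartitions
        if (prlCount < ptnCount) rdd.coalesce(prlCount) else rdd
      }

      // append one micro-batch of cache data as parquet, partitioned by its timestamp column
      def dumpAsParquet(df: DataFrame, cachePath: String, tmstColumn: String): Unit = {
        df.write
          .mode(SaveMode.Append)
          .partitionBy(tmstColumn)
          .parquet(cachePath)
      }
    }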
Author: Lionel Liu <[email protected]> Closes #205 from bhlx3lyx7/tmst. Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/b83c5870 Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/b83c5870 Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/b83c5870 Branch: refs/heads/master Commit: b83c58706e1298cc6ad837137552da7dcd8f9e6e Parents: 530c2da Author: Lionel Liu <[email protected]> Authored: Thu Feb 1 16:09:57 2018 +0800 Committer: Lionel Liu <[email protected]> Committed: Thu Feb 1 16:09:57 2018 +0800 ---------------------------------------------------------------------- .../measure/cache/info/TimeInfoCache.scala | 2 + .../streaming/KafkaStreamingDataConnector.scala | 9 +- .../streaming/StreamingDataConnector.scala | 2 +- .../measure/data/source/DataCacheable.scala | 76 -- .../measure/data/source/DataSource.scala | 16 +- .../measure/data/source/DataSourceCache.scala | 403 ---------- .../measure/data/source/DataSourceFactory.scala | 25 +- .../data/source/cache/DataCacheable.scala | 84 ++ .../data/source/cache/DataSourceCache.scala | 333 ++++++++ .../source/cache/DataSourceCacheFactory.scala | 58 ++ .../data/source/cache/JsonDataSourceCache.scala | 40 + .../data/source/cache/OrcDataSourceCache.scala | 40 + .../source/cache/ParquetDataSourceCache.scala | 40 + .../griffin/measure/persist/MultiPersists.scala | 8 - .../measure/process/BatchDqProcess.scala | 47 +- .../measure/process/StreamingDqProcess.scala | 6 +- .../measure/process/StreamingDqThread.scala | 95 +-- .../measure/process/engine/DqEngine.scala | 2 + .../measure/process/engine/DqEngines.scala | 307 ++------ .../measure/process/engine/SparkDqEngine.scala | 229 +----- .../measure/process/temp/TimeRange.scala | 2 +- .../rule/adaptor/DataFrameOprAdaptor.scala | 6 +- .../measure/rule/adaptor/GlobalKeys.scala | 48 -- .../rule/adaptor/GriffinDslAdaptor.scala | 759 +------------------ .../measure/rule/adaptor/RuleAdaptor.scala | 128 +--- .../measure/rule/adaptor/SparkSqlAdaptor.scala | 6 +- .../griffin/measure/rule/plan/DsUpdate.scala | 24 + .../measure/rule/plan/MetricExport.scala | 3 - .../measure/rule/plan/RecordExport.scala | 3 - .../griffin/measure/rule/plan/RuleExport.scala | 2 - .../griffin/measure/rule/plan/RulePlan.scala | 9 +- .../rule/trans/AccuracyRulePlanTrans.scala | 198 +++++ .../rule/trans/DistinctnessRulePlanTrans.scala | 234 ++++++ .../measure/rule/trans/DsUpdateFactory.scala | 37 + .../rule/trans/ProfilingRulePlanTrans.scala | 98 +++ .../measure/rule/trans/RuleExportFactory.scala | 65 ++ .../measure/rule/trans/RulePlanTrans.scala | 57 ++ .../rule/trans/TimelinessRulePlanTrans.scala | 279 +++++++ .../rule/trans/UniquenessRulePlanTrans.scala | 198 +++++ .../griffin/measure/rule/udf/GriffinUdafs.scala | 29 + .../griffin/measure/rule/udf/MeanUdaf.scala | 58 ++ .../griffin/measure/utils/ParamUtil.scala | 11 + .../_accuracy-streaming-griffindsl.json | 8 +- .../resources/_timeliness-batch-griffindsl.json | 6 +- .../_timeliness-streaming-griffindsl.json | 4 +- 45 files changed, 2073 insertions(+), 2021 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala 
b/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala index aefd390..efd12b9 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/cache/info/TimeInfoCache.scala @@ -26,11 +26,13 @@ object TimeInfoCache extends Loggable with Serializable { private val LastProcTime = "last.proc.time" private val ReadyTime = "ready.time" private val CleanTime = "clean.time" + private val OldCacheIndex = "old.cache.index" def cacheTime(path: String): String = s"${path}/${CacheTime}" def lastProcTime(path: String): String = s"${path}/${LastProcTime}" def readyTime(path: String): String = s"${path}/${ReadyTime}" def cleanTime(path: String): String = s"${path}/${CleanTime}" + def oldCacheIndex(path: String): String = s"${path}/${OldCacheIndex}" val infoPath = "info" http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala index 41de217..f973f3f 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/KafkaStreamingDataConnector.scala @@ -49,7 +49,14 @@ trait KafkaStreamingDataConnector extends StreamingDataConnector { ds.foreachRDD((rdd, time) => { val ms = time.milliseconds - val dfOpt = transform(rdd) + // coalesce partition number + val prlCount = rdd.sparkContext.defaultParallelism + val ptnCount = rdd.getNumPartitions + val repartitionedRdd = if (prlCount < ptnCount) { + rdd.coalesce(prlCount) + } else rdd + + val dfOpt = transform(repartitionedRdd) val preDfOpt = preProcess(dfOpt, ms) http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala index f65b0d2..39f4995 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/streaming/StreamingDataConnector.scala @@ -19,7 +19,7 @@ under the License. 
package org.apache.griffin.measure.data.connector.streaming import org.apache.griffin.measure.data.connector._ -import org.apache.griffin.measure.data.source.DataSourceCache +import org.apache.griffin.measure.data.source.cache._ import org.apache.griffin.measure.process.temp.TimeRange import org.apache.spark.rdd.RDD import org.apache.spark.sql.DataFrame http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/data/source/DataCacheable.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataCacheable.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataCacheable.scala deleted file mode 100644 index 3c9106a..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataCacheable.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.data.source - -import java.util.concurrent.atomic.AtomicLong - -import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache} - -trait DataCacheable { - - val cacheInfoPath: String - val readyTimeInterval: Long - val readyTimeDelay: Long - - def selfCacheInfoPath = s"${TimeInfoCache.infoPath}/${cacheInfoPath}" - - def selfCacheTime = TimeInfoCache.cacheTime(selfCacheInfoPath) - def selfLastProcTime = TimeInfoCache.lastProcTime(selfCacheInfoPath) - def selfReadyTime = TimeInfoCache.readyTime(selfCacheInfoPath) - def selfCleanTime = TimeInfoCache.cleanTime(selfCacheInfoPath) - - protected def submitCacheTime(ms: Long): Unit = { - val map = Map[String, String]((selfCacheTime -> ms.toString)) - InfoCacheInstance.cacheInfo(map) - } - - protected def submitReadyTime(ms: Long): Unit = { - val curReadyTime = ms - readyTimeDelay - if (curReadyTime % readyTimeInterval == 0) { - val map = Map[String, String]((selfReadyTime -> curReadyTime.toString)) - InfoCacheInstance.cacheInfo(map) - } - } - - protected def submitLastProcTime(ms: Long): Unit = { - val map = Map[String, String]((selfLastProcTime -> ms.toString)) - InfoCacheInstance.cacheInfo(map) - } - - protected def submitCleanTime(ms: Long): Unit = { - val cleanTime = genCleanTime(ms) - val map = Map[String, String]((selfCleanTime -> cleanTime.toString)) - InfoCacheInstance.cacheInfo(map) - } - - protected def genCleanTime(ms: Long): Long = ms - - protected def readCleanTime(): Option[Long] = { - val key = selfCleanTime - val keys = key :: Nil - InfoCacheInstance.readInfo(keys).get(key).flatMap { v => - try { - Some(v.toLong) - } catch { - case _ => None - } - } - } - -} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala index fc8c646..9a4b640 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala @@ -22,6 +22,7 @@ import org.apache.griffin.measure.cache.tmst._ import org.apache.griffin.measure.data.connector._ import org.apache.griffin.measure.data.connector.batch._ import org.apache.griffin.measure.data.connector.streaming._ +import org.apache.griffin.measure.data.source.cache._ import org.apache.griffin.measure.log.Loggable import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters, TimeRange} import org.apache.griffin.measure.rule.plan.TimeInfo @@ -114,16 +115,25 @@ case class DataSource(sqlContext: SQLContext, } } + def updateData(df: DataFrame): Unit = { + dataSourceCacheOpt.foreach(_.updateData(Some(df))) + } + def updateData(df: DataFrame, ms: Long): Unit = { - dataSourceCacheOpt.foreach(_.updateData(df, ms)) +// dataSourceCacheOpt.foreach(_.updateData(df, ms)) } def updateDataMap(dfMap: Map[Long, DataFrame]): Unit = { - dataSourceCacheOpt.foreach(_.updateDataMap(dfMap)) +// dataSourceCacheOpt.foreach(_.updateDataMap(dfMap)) } def cleanOldData(): Unit = { - dataSourceCacheOpt.foreach(_.cleanOldData) +// dataSourceCacheOpt.foreach(_.cleanOldData) + dataSourceCacheOpt.foreach(_.cleanOutTimeData) + } + + def processFinish(): Unit = { + dataSourceCacheOpt.foreach(_.processFinish) } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceCache.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceCache.scala deleted file mode 100644 index fff186f..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceCache.scala +++ /dev/null @@ -1,403 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.data.source - -import java.util.concurrent.TimeUnit - -import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache} -import org.apache.griffin.measure.cache.tmst.TmstCache -import org.apache.griffin.measure.data.connector.streaming.StreamingDataConnector -import org.apache.griffin.measure.data.connector._ -import org.apache.griffin.measure.log.Loggable -import org.apache.griffin.measure.process.temp.TimeRange -import org.apache.griffin.measure.utils.{HdfsFileDumpUtil, HdfsUtil, TimeUtil} -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{DataFrame, SQLContext} - -import scala.util.{Failure, Success} -import org.apache.griffin.measure.utils.ParamUtil._ - -case class DataSourceCache(sqlContext: SQLContext, param: Map[String, Any], - dsName: String, index: Int - ) extends DataCacheable with Loggable with Serializable { - - var tmstCache: TmstCache = _ - protected def rangeTmsts(from: Long, until: Long) = tmstCache.range(from, until) - protected def clearTmst(t: Long) = tmstCache.remove(t) - protected def clearTmstsUntil(until: Long) = { - val outDateTmsts = tmstCache.until(until) - tmstCache.remove(outDateTmsts) - } - - val _FilePath = "file.path" - val _InfoPath = "info.path" - val _ReadyTimeInterval = "ready.time.interval" - val _ReadyTimeDelay = "ready.time.delay" - val _TimeRange = "time.range" - - val defFilePath = s"hdfs:///griffin/cache/${dsName}/${index}" - val defInfoPath = s"${index}" - - val filePath: String = param.getString(_FilePath, defFilePath) - val cacheInfoPath: String = param.getString(_InfoPath, defInfoPath) - val readyTimeInterval: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeInterval, "1m")).getOrElse(60000L) - val readyTimeDelay: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeDelay, "1m")).getOrElse(60000L) - val deltaTimeRange: (Long, Long) = { - def negative(n: Long): Long = if (n <= 0) n else 0 - param.get(_TimeRange) match { - case Some(seq: Seq[String]) => { - val nseq = seq.flatMap(TimeUtil.milliseconds(_)) - val ns = negative(nseq.headOption.getOrElse(0)) - val ne = negative(nseq.tail.headOption.getOrElse(0)) - (ns, ne) - } - case _ => (0, 0) - } - } - -// val _WriteInfoPath = "write.info.path" -// val _ReadInfoPath = "read.info.path" -// val writeCacheInfoPath = param.getString(_WriteInfoPath, defInfoPath) -// val readCacheInfoPath = param.getString(_ReadInfoPath, defInfoPath) - - val _ReadOnly = "read.only" - val readOnly = param.getBoolean(_ReadOnly, false) - - val rowSepLiteral = "\n" - val partitionUnits: List[String] = List("hour", "min", "sec") - val minUnitTime: Long = TimeUtil.timeFromUnit(1, partitionUnits.last) - - val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new") - val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old") - - def init(): Unit = { - ; - } - - def saveData(dfOpt: Option[DataFrame], ms: Long): Unit = { - if (!readOnly) { - dfOpt match { - case Some(df) => { - val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS) - if (newCacheLocked) { - try { - val ptns = getPartition(ms) - val ptnsPath = genPartitionHdfsPath(ptns) - val dirPath = s"${filePath}/${ptnsPath}" - val dataFileName = s"${ms}" - val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName) - - // transform data - val dataRdd: RDD[String] = df.toJSON - - // save data - // val dumped = if (!dataRdd.isEmpty) { - // HdfsFileDumpUtil.dump(dataFilePath, dataRdd, rowSepLiteral) - // } else false - - if (!dataRdd.isEmpty) { - 
HdfsFileDumpUtil.dump(dataFilePath, dataRdd, rowSepLiteral) - } - - } catch { - case e: Throwable => error(s"save data error: ${e.getMessage}") - } finally { - newCacheLock.unlock() - } - } - } - case _ => { - info(s"no data frame to save") - } - } - - // submit cache time and ready time - submitCacheTime(ms) - submitReadyTime(ms) - } - } - - // return: (data frame option, time range) - def readData(): (Option[DataFrame], TimeRange) = { - val tr = TimeInfoCache.getTimeRange - val timeRange = (tr._1 + minUnitTime, tr._2) - submitLastProcTime(timeRange._2) - - val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2) - submitCleanTime(reviseTimeRange._1) - - // read directly through partition info - val partitionRanges = getPartitionRange(reviseTimeRange._1, reviseTimeRange._2) - println(s"read time ranges: ${reviseTimeRange}") - println(s"read partition ranges: ${partitionRanges}") - - // list partition paths - val partitionPaths = listPathsBetweenRanges(filePath :: Nil, partitionRanges) -// println(partitionPaths) - - val dfOpt = if (partitionPaths.isEmpty) { - None - } else { - try { - Some(sqlContext.read.json(partitionPaths: _*)) - } catch { - case e: Throwable => { - warn(s"read data source cache warn: ${e.getMessage}") - None - } - } - } - - // from until tmst range - val (from, until) = (reviseTimeRange._1, reviseTimeRange._2 + 1) - val tmstSet = rangeTmsts(from, until) - - val retTimeRange = TimeRange(reviseTimeRange, tmstSet) - (dfOpt, retTimeRange) - } - - def updateData(df: DataFrame, ms: Long): Unit = { - if (!readOnly) { - val ptns = getPartition(ms) - val ptnsPath = genPartitionHdfsPath(ptns) - val dirPath = s"${filePath}/${ptnsPath}" - val dataFileName = s"${ms}" - val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName) - - try { - val records = df.toJSON - val arr = records.collect - val needSave = !arr.isEmpty - - // remove out time old data - HdfsFileDumpUtil.remove(dirPath, dataFileName, true) - println(s"remove file path: ${dirPath}/${dataFileName}") - - // save updated data - if (needSave) { - HdfsFileDumpUtil.dump(dataFilePath, arr, rowSepLiteral) - println(s"update file path: ${dataFilePath}") - } else { - clearTmst(ms) - println(s"data source [${dsName}] timestamp [${ms}] cleared") - } - } catch { - case e: Throwable => error(s"update data error: ${e.getMessage}") - } - } - } - - def updateData(rdd: RDD[String], ms: Long, cnt: Long): Unit = { - if (!readOnly) { - val ptns = getPartition(ms) - val ptnsPath = genPartitionHdfsPath(ptns) - val dirPath = s"${filePath}/${ptnsPath}" - val dataFileName = s"${ms}" - val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName) - - try { - // val needSave = !rdd.isEmpty - - // remove out time old data - HdfsFileDumpUtil.remove(dirPath, dataFileName, true) - println(s"remove file path: ${dirPath}/${dataFileName}") - - // save updated data - if (cnt > 0) { - HdfsFileDumpUtil.dump(dataFilePath, rdd, rowSepLiteral) - println(s"update file path: ${dataFilePath}") - } else { - clearTmst(ms) - println(s"data source [${dsName}] timestamp [${ms}] cleared") - } - } catch { - case e: Throwable => error(s"update data error: ${e.getMessage}") - } finally { - rdd.unpersist() - } - } - } - - def updateData(arr: Iterable[String], ms: Long): Unit = { - if (!readOnly) { - val ptns = getPartition(ms) - val ptnsPath = genPartitionHdfsPath(ptns) - val dirPath = s"${filePath}/${ptnsPath}" - val dataFileName = s"${ms}" - val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName) - - try { - 
val needSave = !arr.isEmpty - - // remove out time old data - HdfsFileDumpUtil.remove(dirPath, dataFileName, true) - println(s"remove file path: ${dirPath}/${dataFileName}") - - // save updated data - if (needSave) { - HdfsFileDumpUtil.dump(dataFilePath, arr, rowSepLiteral) - println(s"update file path: ${dataFilePath}") - } else { - clearTmst(ms) - println(s"data source [${dsName}] timestamp [${ms}] cleared") - } - } catch { - case e: Throwable => error(s"update data error: ${e.getMessage}") - } - } - } - - def updateDataMap(dfMap: Map[Long, DataFrame]): Unit = { - if (!readOnly) { - val dataMap = dfMap.map { pair => - val (t, recs) = pair - val rdd = recs.toJSON - // rdd.cache - (t, rdd, rdd.count) - } - - dataMap.foreach { pair => - val (t, arr, cnt) = pair - updateData(arr, t, cnt) - } - } - } - - def cleanOldData(): Unit = { - if (!readOnly) { - val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS) - if (oldCacheLocked) { - try { - val cleanTime = readCleanTime() - cleanTime match { - case Some(ct) => { - println(s"data source [${dsName}] old timestamps clear until [${ct}]") - - // clear out date tmsts - clearTmstsUntil(ct) - - // drop partitions - val bounds = getPartition(ct) - - // list partition paths - val earlierPaths = listPathsEarlierThanBounds(filePath :: Nil, bounds) - - // delete out time data path - earlierPaths.foreach { path => - println(s"delete hdfs path: ${path}") - HdfsUtil.deleteHdfsPath(path) - } - } - case _ => { - // do nothing - } - } - } catch { - case e: Throwable => error(s"clean old data error: ${e.getMessage}") - } finally { - oldCacheLock.unlock() - } - } - } - } - - override protected def genCleanTime(ms: Long): Long = { - val minPartitionUnit = partitionUnits.last - val t1 = TimeUtil.timeToUnit(ms, minPartitionUnit) - val t2 = TimeUtil.timeFromUnit(t1, minPartitionUnit) - t2 - } - - private def getPartition(ms: Long): List[Long] = { - partitionUnits.map { unit => - TimeUtil.timeToUnit(ms, unit) - } - } - private def getPartitionRange(ms1: Long, ms2: Long): List[(Long, Long)] = { - partitionUnits.map { unit => - val t1 = TimeUtil.timeToUnit(ms1, unit) - val t2 = TimeUtil.timeToUnit(ms2, unit) - (t1, t2) - } - } - private def genPartitionHdfsPath(partition: List[Long]): String = { - partition.map(prtn => s"${prtn}").mkString("/") - } - private def str2Long(str: String): Option[Long] = { - try { - Some(str.toLong) - } catch { - case e: Throwable => None - } - } - - - // here the range means [min, max] - private def listPathsBetweenRanges(paths: List[String], - partitionRanges: List[(Long, Long)] - ): List[String] = { - partitionRanges match { - case Nil => paths - case head :: tail => { - val (lb, ub) = head - val curPaths = paths.flatMap { path => - val names = HdfsUtil.listSubPathsByType(path, "dir").toList - names.filter { name => - str2Long(name) match { - case Some(t) => (t >= lb) && (t <= ub) - case _ => false - } - }.map(HdfsUtil.getHdfsFilePath(path, _)) - } - listPathsBetweenRanges(curPaths, tail) - } - } - } - private def listPathsEarlierThanBounds(paths: List[String], bounds: List[Long] - ): List[String] = { - bounds match { - case Nil => paths - case head :: tail => { - val earlierPaths = paths.flatMap { path => - val names = HdfsUtil.listSubPathsByType(path, "dir").toList - names.filter { name => - str2Long(name) match { - case Some(t) => (t < head) - case _ => false - } - }.map(HdfsUtil.getHdfsFilePath(path, _)) - } - val equalPaths = paths.flatMap { path => - val names = HdfsUtil.listSubPathsByType(path, "dir").toList - names.filter { 
name => - str2Long(name) match { - case Some(t) => (t == head) - case _ => false - } - }.map(HdfsUtil.getHdfsFilePath(path, _)) - } - - tail match { - case Nil => earlierPaths - case _ => earlierPaths ::: listPathsEarlierThanBounds(equalPaths, tail) - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala index b83e2fb..e18c852 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala @@ -19,11 +19,10 @@ under the License. package org.apache.griffin.measure.data.source import org.apache.griffin.measure.config.params.user._ -import org.apache.griffin.measure.data.connector.batch.BatchDataConnector -import org.apache.griffin.measure.data.connector.streaming.StreamingDataConnector -import org.apache.griffin.measure.data.connector.{DataConnector, DataConnectorFactory} +import org.apache.griffin.measure.data.connector.DataConnectorFactory +import org.apache.griffin.measure.data.source.cache._ import org.apache.griffin.measure.log.Loggable -import org.apache.griffin.measure.process.engine.{DqEngine, DqEngines} +import org.apache.griffin.measure.process.engine._ import org.apache.spark.sql.SQLContext import org.apache.spark.streaming.StreamingContext @@ -55,27 +54,11 @@ object DataSourceFactory extends Loggable { case _ => None } } - val dataSourceCacheOpt = genDataSourceCache(sqlContext, cacheParam, name, index) + val dataSourceCacheOpt = DataSourceCacheFactory.genDataSourceCache(sqlContext, cacheParam, name, index) Some(DataSource(sqlContext, name, baseline, dataConnectors, dataSourceCacheOpt)) } - private def genDataSourceCache(sqlContext: SQLContext, param: Map[String, Any], - name: String, index: Int - ) = { - if (param != null) { - try { - Some(DataSourceCache(sqlContext, param, name, index)) - } catch { - case e: Throwable => { - error(s"generate data source cache fails") - None - } - } - } else None - } - - private def trimDataSourceParams(dataSourceParams: Seq[DataSourceParam]): Seq[DataSourceParam] = { val (validDsParams, _) = dataSourceParams.foldLeft((Nil: Seq[DataSourceParam], Set[String]())) { (ret, dsParam) => http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataCacheable.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataCacheable.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataCacheable.scala new file mode 100644 index 0000000..36c556b --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataCacheable.scala @@ -0,0 +1,84 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.data.source.cache + +import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache} + +trait DataCacheable { + + val cacheInfoPath: String + val readyTimeInterval: Long + val readyTimeDelay: Long + + def selfCacheInfoPath = s"${TimeInfoCache.infoPath}/${cacheInfoPath}" + + def selfCacheTime = TimeInfoCache.cacheTime(selfCacheInfoPath) + def selfLastProcTime = TimeInfoCache.lastProcTime(selfCacheInfoPath) + def selfReadyTime = TimeInfoCache.readyTime(selfCacheInfoPath) + def selfCleanTime = TimeInfoCache.cleanTime(selfCacheInfoPath) + def selfOldCacheIndex = TimeInfoCache.oldCacheIndex(selfCacheInfoPath) + + protected def submitCacheTime(ms: Long): Unit = { + val map = Map[String, String]((selfCacheTime -> ms.toString)) + InfoCacheInstance.cacheInfo(map) + } + + protected def submitReadyTime(ms: Long): Unit = { + val curReadyTime = ms - readyTimeDelay + if (curReadyTime % readyTimeInterval == 0) { + val map = Map[String, String]((selfReadyTime -> curReadyTime.toString)) + InfoCacheInstance.cacheInfo(map) + } + } + + protected def submitLastProcTime(ms: Long): Unit = { + val map = Map[String, String]((selfLastProcTime -> ms.toString)) + InfoCacheInstance.cacheInfo(map) + } + + protected def readLastProcTime(): Option[Long] = readSelfInfo(selfLastProcTime) + + protected def submitCleanTime(ms: Long): Unit = { + val cleanTime = genCleanTime(ms) + val map = Map[String, String]((selfCleanTime -> cleanTime.toString)) + InfoCacheInstance.cacheInfo(map) + } + + protected def genCleanTime(ms: Long): Long = ms + + protected def readCleanTime(): Option[Long] = readSelfInfo(selfCleanTime) + + protected def submitOldCacheIndex(index: Long): Unit = { + val map = Map[String, String]((selfOldCacheIndex -> index.toString)) + InfoCacheInstance.cacheInfo(map) + } + + protected def readOldCacheIndex(): Option[Long] = readSelfInfo(selfOldCacheIndex) + + private def readSelfInfo(key: String): Option[Long] = { + InfoCacheInstance.readInfo(key :: Nil).get(key).flatMap { v => + try { + Some(v.toLong) + } catch { + case _ => None + } + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala new file mode 100644 index 0000000..1a0366d --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala @@ -0,0 +1,333 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.data.source.cache + +import java.util.concurrent.TimeUnit + +import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache} +import org.apache.griffin.measure.cache.tmst.TmstCache +import org.apache.griffin.measure.log.Loggable +import org.apache.griffin.measure.process.temp.TimeRange +import org.apache.griffin.measure.rule.adaptor.InternalColumns +import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil} +import org.apache.griffin.measure.utils.ParamUtil._ +import org.apache.spark.sql._ + +// data source cache process steps +// dump phase: save +// process phase: read -> process -> update -> finish -> clean old data +trait DataSourceCache extends DataCacheable with Loggable with Serializable { + + val sqlContext: SQLContext + val param: Map[String, Any] + val dsName: String + val index: Int + + var tmstCache: TmstCache = _ + protected def rangeTmsts(from: Long, until: Long) = tmstCache.range(from, until) + protected def clearTmst(t: Long) = tmstCache.remove(t) + protected def clearTmstsUntil(until: Long) = { + val outDateTmsts = tmstCache.until(until) + tmstCache.remove(outDateTmsts) + } + + val _FilePath = "file.path" + val _InfoPath = "info.path" + val _ReadyTimeInterval = "ready.time.interval" + val _ReadyTimeDelay = "ready.time.delay" + val _TimeRange = "time.range" + + val defFilePath = s"hdfs:///griffin/cache/${dsName}/${index}" + val defInfoPath = s"${index}" + + val filePath: String = param.getString(_FilePath, defFilePath) + val cacheInfoPath: String = param.getString(_InfoPath, defInfoPath) + val readyTimeInterval: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeInterval, "1m")).getOrElse(60000L) + val readyTimeDelay: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeDelay, "1m")).getOrElse(60000L) + val deltaTimeRange: (Long, Long) = { + def negative(n: Long): Long = if (n <= 0) n else 0 + param.get(_TimeRange) match { + case Some(seq: Seq[String]) => { + val nseq = seq.flatMap(TimeUtil.milliseconds(_)) + val ns = negative(nseq.headOption.getOrElse(0)) + val ne = negative(nseq.tail.headOption.getOrElse(0)) + (ns, ne) + } + case _ => (0, 0) + } + } + + val _ReadOnly = "read.only" + val readOnly = param.getBoolean(_ReadOnly, false) + + val _Updatable = "updatable" + val updatable = param.getBoolean(_Updatable, false) + +// val rowSepLiteral = "\n" +// val partitionUnits: List[String] = List("hour", "min", "sec") +// val minUnitTime: Long = TimeUtil.timeFromUnit(1, partitionUnits.last) + + val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new") + val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old") + + val newFilePath = s"${filePath}/new" + val oldFilePath = s"${filePath}/old" + + val defOldCacheIndex = 0L + + protected def writeDataFrame(dfw: DataFrameWriter, path: String): Unit + protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame + + def init(): Unit = {} + + // save new cache data only + def saveData(dfOpt: Option[DataFrame], ms: Long): Unit = { + if (!readOnly) { + dfOpt match { + case Some(df) => { + // lock makes it safer when writing new cache data + val 
newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS) + if (newCacheLocked) { + try { + val dfw = df.write.mode(SaveMode.Append).partitionBy(InternalColumns.tmst) + writeDataFrame(dfw, newFilePath) + } catch { + case e: Throwable => error(s"save data error: ${e.getMessage}") + } finally { + newCacheLock.unlock() + } + } + } + case _ => { + info(s"no data frame to save") + } + } + + // submit cache time and ready time + submitCacheTime(ms) + submitReadyTime(ms) + } + } + + // read new cache data and old cache data + def readData(): (Option[DataFrame], TimeRange) = { + // time range: [a, b) + val timeRange = TimeInfoCache.getTimeRange + val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2) + + // read partition info + val filterStr = s"`${InternalColumns.tmst}` >= ${reviseTimeRange._1} AND `${InternalColumns.tmst}` < ${reviseTimeRange._2}" + println(s"read time range: [${reviseTimeRange._1}, ${reviseTimeRange._2})") + + // new cache data + val newDfOpt = try { + val dfr = sqlContext.read + Some(readDataFrame(dfr, newFilePath).filter(filterStr)) + } catch { + case e: Throwable => { + warn(s"read data source cache warn: ${e.getMessage}") + None + } + } + + // old cache data + val oldCacheIndexOpt = if (updatable) readOldCacheIndex else None + val oldDfOpt = oldCacheIndexOpt.flatMap { idx => + val oldDfPath = s"${oldFilePath}/${idx}" + try { + val dfr = sqlContext.read +// Some(readDataFrame(dfr, oldDfPath).filter(filterStr)) + Some(readDataFrame(dfr, oldDfPath)) // not need to filter, has filtered in update phase + } catch { + case e: Throwable => { + warn(s"read old data source cache warn: ${e.getMessage}") + None + } + } + } + + // whole cache data + val cacheDfOpt = unionDfOpts(newDfOpt, oldDfOpt) + + // from until tmst range + val (from, until) = (reviseTimeRange._1, reviseTimeRange._2) + val tmstSet = rangeTmsts(from, until) + + val retTimeRange = TimeRange(reviseTimeRange, tmstSet) + (cacheDfOpt, retTimeRange) + } + + private def unionDfOpts(dfOpt1: Option[DataFrame], dfOpt2: Option[DataFrame] + ): Option[DataFrame] = { + (dfOpt1, dfOpt2) match { + case (Some(df1), Some(df2)) => Some(df1 unionAll df2) + case (Some(df1), _) => dfOpt1 + case (_, Some(df2)) => dfOpt2 + case _ => None + } + } + + private def cleanOutTimePartitions(path: String, outTime: Long, partitionOpt: Option[String]): Unit = { + val earlierPaths = listEarlierPartitions(path: String, outTime, partitionOpt) + // delete out time data path + earlierPaths.foreach { path => + println(s"delete hdfs path: ${path}") + HdfsUtil.deleteHdfsPath(path) + } + } + private def listEarlierPartitions(path: String, bound: Long, partitionOpt: Option[String]): Iterable[String] = { + val names = HdfsUtil.listSubPathsByType(path, "dir") + val regex = partitionOpt match { + case Some(partition) => s"^${partition}=(\\d+)$$".r + case _ => "^(\\d+)$".r + } + names.filter { name => + name match { + case regex(value) => { + str2Long(value) match { + case Some(t) => (t < bound) + case _ => false + } + } + case _ => false + } + }.map(name => s"${path}/${name}") + } + private def str2Long(str: String): Option[Long] = { + try { + Some(str.toLong) + } catch { + case e: Throwable => None + } + } + + // clean out time from new cache data and old cache data + def cleanOutTimeData(): Unit = { + if (!readOnly) { + // new cache data + val newCacheCleanTime = if (updatable) readLastProcTime else readCleanTime + newCacheCleanTime match { + case Some(nct) => { + // clean calculated new cache data + val newCacheLocked = 
newCacheLock.lock(-1, TimeUnit.SECONDS) + if (newCacheLocked) { + try { + cleanOutTimePartitions(newFilePath, nct, Some(InternalColumns.tmst)) + } catch { + case e: Throwable => error(s"clean new cache data error: ${e.getMessage}") + } finally { + newCacheLock.unlock() + } + } + } + case _ => { + // do nothing + } + } + + // old cache data + val oldCacheCleanTime = if (updatable) readCleanTime else None + oldCacheCleanTime match { + case Some(oct) => { + val oldCacheIndexOpt = readOldCacheIndex + oldCacheIndexOpt.foreach { idx => + val oldDfPath = s"${oldFilePath}/${idx}" + val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS) + if (oldCacheLocked) { + try { + // clean calculated old cache data + cleanOutTimePartitions(oldFilePath, idx, None) + // clean out time old cache data not calculated +// cleanOutTimePartitions(oldDfPath, oct, Some(InternalColumns.tmst)) + } catch { + case e: Throwable => error(s"clean old cache data error: ${e.getMessage}") + } finally { + oldCacheLock.unlock() + } + } + } + } + case _ => { + // do nothing + } + } + } + } + + // update old cache data + def updateData(dfOpt: Option[DataFrame]): Unit = { + if (!readOnly && updatable) { + dfOpt match { + case Some(df) => { + // old cache lock + val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS) + if (oldCacheLocked) { + try { + val oldCacheIndexOpt = readOldCacheIndex + val nextOldCacheIndex = oldCacheIndexOpt.getOrElse(defOldCacheIndex) + 1 + + val oldDfPath = s"${oldFilePath}/${nextOldCacheIndex}" +// val dfw = df.write.mode(SaveMode.Overwrite).partitionBy(InternalColumns.tmst) + val cleanTime = readCleanTime + val updateDf = cleanTime match { + case Some(ct) => { + val filterStr = s"`${InternalColumns.tmst}` >= ${ct}" + df.filter(filterStr) + } + case _ => df + } + + val prlCount = sqlContext.sparkContext.defaultParallelism + // coalesce +// val ptnCount = updateDf.rdd.getNumPartitions +// val repartitionedDf = if (prlCount < ptnCount) { +// updateDf.coalesce(prlCount) +// } else updateDf + // repartition + val repartitionedDf = updateDf.repartition(prlCount) + val dfw = repartitionedDf.write.mode(SaveMode.Overwrite) + writeDataFrame(dfw, oldDfPath) + + submitOldCacheIndex(nextOldCacheIndex) + } catch { + case e: Throwable => error(s"update data error: ${e.getMessage}") + } finally { + oldCacheLock.unlock() + } + } + } + case _ => { + info(s"no data frame to update") + } + } + } + } + + // process finish + def processFinish(): Unit = { + // next last proc time + val timeRange = TimeInfoCache.getTimeRange + submitLastProcTime(timeRange._2) + + // next clean time + val nextCleanTime = timeRange._2 + deltaTimeRange._1 + submitCleanTime(nextCleanTime) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCacheFactory.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCacheFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCacheFactory.scala new file mode 100644 index 0000000..d03c181 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCacheFactory.scala @@ -0,0 +1,58 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.data.source.cache + +import org.apache.griffin.measure.log.Loggable +import org.apache.spark.sql.SQLContext +import org.apache.griffin.measure.utils.ParamUtil._ + +object DataSourceCacheFactory extends Loggable { + + private object DataSourceCacheType { + val parquet = "^(?i)parq(uet)?$".r + val json = "^(?i)json$".r + val orc = "^(?i)orc$".r + } + import DataSourceCacheType._ + + val _type = "type" + + def genDataSourceCache(sqlContext: SQLContext, param: Map[String, Any], + name: String, index: Int + ) = { + if (param != null) { + try { + val tp = param.getString(_type, "") + val dsCache = tp match { + case parquet() => ParquetDataSourceCache(sqlContext, param, name, index) + case json() => JsonDataSourceCache(sqlContext, param, name, index) + case orc() => OrcDataSourceCache(sqlContext, param, name, index) + case _ => ParquetDataSourceCache(sqlContext, param, name, index) + } + Some(dsCache) + } catch { + case e: Throwable => { + error(s"generate data source cache fails") + None + } + } + } else None + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/JsonDataSourceCache.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/JsonDataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/JsonDataSourceCache.scala new file mode 100644 index 0000000..e284d47 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/JsonDataSourceCache.scala @@ -0,0 +1,40 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.data.source.cache + +import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, SQLContext} + +case class JsonDataSourceCache(sqlContext: SQLContext, param: Map[String, Any], + dsName: String, index: Int + ) extends DataSourceCache { + + override def init(): Unit = { +// sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false"); + } + + def writeDataFrame(dfw: DataFrameWriter, path: String): Unit = { + println(s"write path: ${path}") + dfw.json(path) + } + + def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = { + dfr.json(path) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/OrcDataSourceCache.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/OrcDataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/OrcDataSourceCache.scala new file mode 100644 index 0000000..7b92bef --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/OrcDataSourceCache.scala @@ -0,0 +1,40 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.data.source.cache + +import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, SQLContext} + +case class OrcDataSourceCache(sqlContext: SQLContext, param: Map[String, Any], + dsName: String, index: Int + ) extends DataSourceCache { + + override def init(): Unit = { +// sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false"); + } + + def writeDataFrame(dfw: DataFrameWriter, path: String): Unit = { + println(s"write path: ${path}") + dfw.orc(path) + } + + def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = { + dfr.orc(path) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala new file mode 100644 index 0000000..1761f56 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala @@ -0,0 +1,40 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.data.source.cache + +import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, SQLContext} + +case class ParquetDataSourceCache(sqlContext: SQLContext, param: Map[String, Any], + dsName: String, index: Int + ) extends DataSourceCache { + + override def init(): Unit = { + sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false"); + } + + def writeDataFrame(dfw: DataFrameWriter, path: String): Unit = { + println(s"write path: ${path}") + dfw.parquet(path) + } + + def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = { + dfr.parquet(path) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala index aa97afa..bed28fd 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala @@ -40,14 +40,6 @@ case class MultiPersists(persists: Iterable[Persist]) extends Persist { def start(msg: String): Unit = { persists.foreach(_.start(msg)) } def finish(): Unit = { persists.foreach(_.finish()) } -// def result(rt: Long, result: Result): Unit = { persists.foreach(_.result(rt, result)) } -// -// def records(recs: RDD[String], tp: String): Unit = { persists.foreach(_.records(recs, tp)) } -// def records(recs: Iterable[String], tp: String): Unit = { persists.foreach(_.records(recs, tp)) } - -// def missRecords(records: RDD[String]): Unit = { persists.foreach(_.missRecords(records)) } -// def matchRecords(records: RDD[String]): Unit = { persists.foreach(_.matchRecords(records)) } - def log(rt: Long, msg: String): Unit = { persists.foreach { persist => try { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala index 44cca9a..8c95a39 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala @@ -20,24 +20,20 @@ package org.apache.griffin.measure.process import java.util.Date -import org.apache.griffin.measure.cache.info.TimeInfoCache -import org.apache.griffin.measure.cache.result.CacheResultProcesser import org.apache.griffin.measure.config.params._ import org.apache.griffin.measure.config.params.env._ import org.apache.griffin.measure.config.params.user._ import 
org.apache.griffin.measure.data.source.DataSourceFactory import org.apache.griffin.measure.persist.{Persist, PersistFactory} -import org.apache.griffin.measure.process.engine.{DqEngineFactory, SparkSqlEngine} -import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters} -import org.apache.griffin.measure.rule.adaptor.{RuleAdaptorGroup, RunPhase} +import org.apache.griffin.measure.process.engine._ +import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters, TimeRange} +import org.apache.griffin.measure.rule.adaptor._ import org.apache.griffin.measure.rule.plan._ -import org.apache.griffin.measure.rule.udf.GriffinUdfs -import org.apache.griffin.measure.utils.JsonUtil +import org.apache.griffin.measure.rule.udf._ import org.apache.spark.sql.SQLContext import org.apache.spark.sql.hive.HiveContext import org.apache.spark.{SparkConf, SparkContext} -import scala.concurrent.Await import scala.util.Try case class BatchDqProcess(allParam: AllParam) extends DqProcess { @@ -64,6 +60,7 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess { // register udf GriffinUdfs.register(sqlContext) + GriffinUdafs.register(sqlContext) // init adaptors RuleAdaptorGroup.init(sqlContext, dataSourceNames, baselineDsName) @@ -93,25 +90,11 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess { // init data sources val dsTimeRanges = dqEngines.loadData(dataSources, calcTimeInfo) - - println(s"data source timeRanges: ${dsTimeRanges}") - - // generate rule steps -// val ruleSteps = RuleAdaptorGroup.genConcreteRuleSteps( -// TimeInfo(appTime, appTime), userParam.evaluateRuleParam, dsTmsts, BatchProcessType, RunPhase) -// val ruleSteps = RuleAdaptorGroup.genRuleSteps( -// CalcTimeInfo(appTime), userParam.evaluateRuleParam, dsTmsts) + printTimeRanges(dsTimeRanges) val rulePlan = RuleAdaptorGroup.genRulePlan( calcTimeInfo, userParam.evaluateRuleParam, BatchProcessType, dsTimeRanges) -// rulePlan.ruleSteps.foreach(println) -// println("====") -// rulePlan.metricExports.foreach(println) -// println("====") -// rulePlan.recordExports.foreach(println) -// println("====") - // run rules dqEngines.runRuleSteps(calcTimeInfo, rulePlan.ruleSteps) @@ -119,11 +102,6 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess { dqEngines.persistAllMetrics(rulePlan.metricExports, persistFactory) dqEngines.persistAllRecords(rulePlan.recordExports, persistFactory, dataSources) -// dfs.foreach(_._2.cache()) -// -// dqEngines.persistAllRecords(dfs, persistFactory) - -// dfs.foreach(_._2.unpersist()) // end time val endTime = new Date().getTime @@ -132,11 +110,6 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess { // finish persist.finish() -// sqlContext.tables().show(50) -// println(sqlContext.tableNames().size) - -// sqlContext.tables().show(50) - // clean data cleanData(calcTimeInfo) @@ -190,4 +163,12 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess { // } // } + private def printTimeRanges(timeRanges: Map[String, TimeRange]): Unit = { + val timeRangesStr = timeRanges.map { pair => + val (name, timeRange) = pair + s"${name} -> [${timeRange.begin}, ${timeRange.end})" + }.mkString(", ") + println(s"data source timeRanges: ${timeRangesStr}") + } + } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala ---------------------------------------------------------------------- diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
index 1cc2ab7..3c2376a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
@@ -18,8 +18,6 @@ under the License.
 */
 package org.apache.griffin.measure.process

-import java.util.Date
-
 import org.apache.griffin.measure.cache.info.InfoCacheInstance
 import org.apache.griffin.measure.config.params._
 import org.apache.griffin.measure.config.params.env._
@@ -29,8 +27,7 @@ import org.apache.griffin.measure.persist.{Persist, PersistFactory}
 import org.apache.griffin.measure.process.engine.DqEngineFactory
 import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters}
 import org.apache.griffin.measure.rule.adaptor.RuleAdaptorGroup
-import org.apache.griffin.measure.rule.plan.TimeInfo
-import org.apache.griffin.measure.rule.udf.GriffinUdfs
+import org.apache.griffin.measure.rule.udf._
 import org.apache.griffin.measure.utils.TimeUtil
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.hive.HiveContext
@@ -67,6 +64,7 @@ case class StreamingDqProcess(allParam: AllParam) extends DqProcess {

     // register udf
     GriffinUdfs.register(sqlContext)
+    GriffinUdafs.register(sqlContext)

     // init adaptors
     val dataSourceNames = userParam.dataSources.map(_.name)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
index c3c4f09..dc49df0 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
@@ -28,7 +28,7 @@ import org.apache.griffin.measure.data.source.DataSource
 import org.apache.griffin.measure.log.Loggable
 import org.apache.griffin.measure.persist.{Persist, PersistFactory}
 import org.apache.griffin.measure.process.engine.DqEngines
-import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters}
+import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters, TimeRange}
 import org.apache.griffin.measure.rule.adaptor.{ProcessDetailsKeys, RuleAdaptorGroup, RunPhase}
 import org.apache.griffin.measure.rule.plan._
 import org.apache.spark.sql.SQLContext
@@ -59,80 +59,41 @@ case class StreamingDqThread(sqlContext: SQLContext,

         // init data sources
         val dsTimeRanges = dqEngines.loadData(dataSources, calcTimeInfo)
-
-        println(s"data source timeRanges: ${dsTimeRanges}")
+        printTimeRanges(dsTimeRanges)

         // generate rule steps
-//        val ruleSteps = RuleAdaptorGroup.genRuleSteps(
-//          CalcTimeInfo(st), evaluateRuleParam, dsTmsts)
         val rulePlan = RuleAdaptorGroup.genRulePlan(
           calcTimeInfo, evaluateRuleParam, StreamingProcessType, dsTimeRanges)

-        // optimize rule plan
-//        val optRulePlan = optimizeRulePlan(rulePlan, dsTmsts)
-        val optRulePlan = rulePlan
-
-//        ruleSteps.foreach(println)
-
         // run rules
-//        dqEngines.runRuleSteps(ruleSteps)
-        dqEngines.runRuleSteps(calcTimeInfo, optRulePlan.ruleSteps)
+        dqEngines.runRuleSteps(calcTimeInfo, rulePlan.ruleSteps)

         val ct = new Date().getTime
         val calculationTimeStr = s"calculation using time: ${ct - st} ms"
-//        println(calculationTimeStr)
         appPersist.log(ct, calculationTimeStr)

         // persist results
-//        val timeGroups = dqEngines.persistAllMetrics(ruleSteps, persistFactory)
-        dqEngines.persistAllMetrics(optRulePlan.metricExports, persistFactory)
-//        println(s"--- timeGroups: ${timeGroups}")
+        dqEngines.persistAllMetrics(rulePlan.metricExports, persistFactory)

         val rt = new Date().getTime
         val persistResultTimeStr = s"persist result using time: ${rt - ct} ms"
         appPersist.log(rt, persistResultTimeStr)

         // persist records
-        dqEngines.persistAllRecords(optRulePlan.recordExports, persistFactory, dataSources)
+        dqEngines.persistAllRecords(rulePlan.recordExports, persistFactory, dataSources)
+
+        // update data sources
+        dqEngines.updateDataSources(rulePlan.dsUpdates, dataSources)
+
+        // finish calculation
+        finishCalculation()

         val et = new Date().getTime
         val persistTimeStr = s"persist records using time: ${et - rt} ms"
         appPersist.log(et, persistTimeStr)

-//        val dfs = dqEngines.collectUpdateRDDs(ruleSteps, timeGroups.toSet)
-//        dfs.foreach(_._2.cache())
-//
-//        dfs.foreach { pr =>
-//          val (step, df) = pr
-//          val cnt = df.count
-//          println(s"step [${step.name}] group count: ${cnt}")
-//        }
-//
-//        val lt = new Date().getTime
-//        val collectRddTimeStr = s"collect records using time: ${lt - rt} ms"
-////        println(collectRddTimeStr)
-//        appPersist.log(lt, collectRddTimeStr)
-//
-//        // persist records
-//        dqEngines.persistAllRecords(dfs, persistFactory)
-////        dqEngines.persistAllRecords(ruleSteps, persistFactory, timeGroups)
-//
-//        // update data source
-//        dqEngines.updateDataSources(dfs, dataSources)
-////        dqEngines.updateDataSources(ruleSteps, dataSources, timeGroups)
-//
-//        dfs.foreach(_._2.unpersist())
-
         TimeInfoCache.endTimeInfoCache
-//        sqlContext.tables().show(20)
-
-        // cache global data
-//        val globalTables = TableRegisters.getRunGlobalTables
-//        globalTables.foreach { gt =>
-//          val df = sqlContext.table(gt)
-//          df.cache
-//        }

         // clean old data
         cleanData(calcTimeInfo)
@@ -150,6 +111,11 @@ case class StreamingDqThread(sqlContext: SQLContext,
     println(s"===== [${updateTimeDate}] process ends, using ${endTime - updateTime} ms =====")
   }

+  // finish calculation for this round
+  private def finishCalculation(): Unit = {
+    dataSources.foreach(_.processFinish)
+  }
+
   // clean old data and old result cache
   private def cleanData(timeInfo: TimeInfo): Unit = {
     try {
@@ -169,29 +135,12 @@ case class StreamingDqThread(sqlContext: SQLContext,
     }
   }

-  private def optimizeRulePlan(rulePlan: RulePlan, dsTmsts: Map[String, Set[Long]]): RulePlan = {
-    val steps = rulePlan.ruleSteps
-    val optExports = rulePlan.ruleExports.flatMap { export =>
-      findRuleStepByName(steps, export.stepName).map { rs =>
-        rs.details.get(ProcessDetailsKeys._baselineDataSource) match {
-          case Some(dsname: String) => {
-            val defTmstOpt = (dsTmsts.get(dsname)).flatMap { set =>
-              try { Some(set.max) } catch { case _: Throwable => None }
-            }
-            defTmstOpt match {
-              case Some(t) => export.setDefTimestamp(t)
-              case _ => export
-            }
-          }
-          case _ => export
-        }
-      }
-    }
-    RulePlan(steps, optExports)
-  }
-
-  private def findRuleStepByName(steps: Seq[RuleStep], name: String): Option[RuleStep] = {
-    steps.filter(_.name == name).headOption
+  private def printTimeRanges(timeRanges: Map[String, TimeRange]): Unit = {
+    val timeRangesStr = timeRanges.map { pair =>
+      val (name, timeRange) = pair
+      s"${name} -> [${timeRange.begin}, ${timeRange.end})"
+    }.mkString(", ")
+    println(s"data source timeRanges: ${timeRangesStr}")
   }

 }
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
index ee3a65e..3d77458 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
@@ -46,4 +46,6 @@ trait DqEngine extends Loggable with Serializable {
   def collectBatchRecords(recordExport: RecordExport): Option[RDD[String]]

   def collectStreamingRecords(recordExport: RecordExport): (Option[RDD[(Long, Iterable[String])]], Set[Long])
+
+  def collectUpdateDf(dsUpdate: DsUpdate): Option[DataFrame]
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/b83c5870/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
index 8f17764..6b9a215 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
@@ -23,20 +23,20 @@ import java.util.concurrent.atomic.AtomicInteger
 import org.apache.griffin.measure.config.params.user.DataSourceParam
 import org.apache.griffin.measure.data.source._
 import org.apache.griffin.measure.log.Loggable
-import org.apache.griffin.measure.persist.{Persist, PersistFactory}
+import org.apache.griffin.measure.persist._
 import org.apache.griffin.measure.process.temp.TimeRange
 import org.apache.griffin.measure.process._
 import org.apache.griffin.measure.rule.adaptor.InternalColumns
 import org.apache.griffin.measure.rule.dsl._
-import org.apache.griffin.measure.rule.plan._
+import org.apache.griffin.measure.rule.plan.{DsUpdate, _}
 import org.apache.griffin.measure.utils.JsonUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Row}

-import scala.concurrent._
-import scala.concurrent.duration.Duration
-import scala.util.{Failure, Success, Try}
-import ExecutionContext.Implicits.global
+//import scala.concurrent._
+//import scala.concurrent.duration.Duration
+//import scala.util.{Failure, Success, Try}
+//import ExecutionContext.Implicits.global

 case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {

@@ -76,41 +76,40 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
     }
   }

-  private def persistCollectedRecords(recordExport: RecordExport, records: Map[Long, DataFrame],
-                                      persistFactory: PersistFactory, dataSources: Seq[DataSource]): Unit = {
-    val pc = ParallelCounter(records.size)
-    val pro = promise[Boolean]
-    if (records.size > 0) {
-      records.foreach { pair =>
-        val (tmst, df) = pair
-        val persist = persistFactory.getPersists(tmst)
-        val updateDsCaches = recordExport.dataSourceCacheOpt match {
-          case Some(dsName) => dataSources.filter(_.name == dsName).flatMap(_.dataSourceCacheOpt)
-          case _ => Nil
-        }
-        val future = Future {
-//          df.cache
-          persist.persistRecords(df, recordExport.name)
-          updateDsCaches.foreach(_.updateData(df, tmst))
-//          df.unpersist
-          true
-        }
-        future.onComplete {
-          case Success(v) => {
-            pc.finishOne(v)
-            if (pc.checkDone) pro.trySuccess(pc.checkResult)
-          }
-          case Failure(ex) => {
-            println(s"plan step failure: ${ex.getMessage}")
-            pc.finishOne(false)
-            if (pc.checkDone) pro.trySuccess(pc.checkResult)
-          }
-        }
-      }
-    } else pro.trySuccess(true)
-
-    Await.result(pro.future, Duration.Inf)
-  }
+//  private def persistCollectedRecords(recordExport: RecordExport, records: Map[Long, DataFrame],
+//                                      persistFactory: PersistFactory, dataSources: Seq[DataSource]): Unit = {
+//    val pc = ParallelCounter(records.size)
+//    val pro = promise[Boolean]
+//    if (records.size > 0) {
+//      records.foreach { pair =>
+//        val (tmst, df) = pair
+//        val persist = persistFactory.getPersists(tmst)
+//        val updateDsCaches = recordExport.dataSourceCacheOpt match {
+//          case Some(dsName) => dataSources.filter(_.name == dsName).flatMap(_.dataSourceCacheOpt)
+//          case _ => Nil
+//        }
+//        val future = Future {
+//          persist.persistRecords(df, recordExport.name)
+////          updateDsCaches.foreach(_.updateData(df, tmst))
+//          updateDsCaches.foreach(_.updateData(Some(df)))
+//          true
+//        }
+//        future.onComplete {
+//          case Success(v) => {
+//            pc.finishOne(v)
+//            if (pc.checkDone) pro.trySuccess(pc.checkResult)
+//          }
+//          case Failure(ex) => {
+//            println(s"plan step failure: ${ex.getMessage}")
+//            pc.finishOne(false)
+//            if (pc.checkDone) pro.trySuccess(pc.checkResult)
+//          }
+//        }
+//      }
+//    } else pro.trySuccess(true)
+//
+//    Await.result(pro.future, Duration.Inf)
+//  }

   def persistAllRecords(recordExports: Seq[RecordExport], persistFactory: PersistFactory,
                         dataSources: Seq[DataSource]
@@ -123,7 +122,6 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {

     // method 2: multi thread persist multi iterable
     recordExports.foreach { recordExport =>
-//      val records = collectRecords(timeInfo, recordExport, procType)
       recordExport.mode match {
         case SimpleMode => {
           collectBatchRecords(recordExport).foreach { rdd =>
@@ -133,9 +131,6 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
         case TimestampMode => {
           val (rddOpt, emptySet) = collectStreamingRecords(recordExport)
           persistCollectedStreamingRecords(recordExport, rddOpt, emptySet, persistFactory, dataSources)
-//          collectStreamingRecords(recordExport).foreach { rddPair =>
-//            persistCollectedStreamingRecords(recordExport, rddPair._1, rddPair._2, persistFactory, dataSources)
-//          }
         }
       }
     }
@@ -165,10 +160,10 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
                                                 emtpyRecordKeys: Set[Long], persistFactory: PersistFactory,
                                                 dataSources: Seq[DataSource]
                                                ): Unit = {
-    val updateDsCaches = recordExport.dataSourceCacheOpt match {
-      case Some(dsName) => dataSources.filter(_.name == dsName).flatMap(_.dataSourceCacheOpt)
-      case _ => Nil
-    }
+//    val updateDsCaches = recordExport.dataSourceCacheOpt match {
+//      case Some(dsName) => dataSources.filter(_.name == dsName).flatMap(_.dataSourceCacheOpt)
+//      case _ => Nil
+//    }

     recordsOpt.foreach { records =>
       records.foreach { pair =>
@@ -176,90 +171,17 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
         val persist = persistFactory.getPersists(tmst)

         persist.persistRecords(strs, recordExport.name)
-        updateDsCaches.foreach(_.updateData(strs, tmst))
+//        updateDsCaches.foreach(_.updateData(strs, tmst))
       }
     }

     emtpyRecordKeys.foreach { t =>
       val persist = persistFactory.getPersists(t)
       persist.persistRecords(Nil, recordExport.name)
-      updateDsCaches.foreach(_.updateData(Nil, t))
+//      updateDsCaches.foreach(_.updateData(Nil, t))
     }
   }

-//  private def persistCollectedStreamingRecords(recordExport: RecordExport, records: RDD[(Long, Iterable[String])],
-//                                               emtpyRecordKeys: Set[Long], persistFactory: PersistFactory,
-//                                               dataSources: Seq[DataSource]
-//                                              ): Unit = {
-//    val updateDsCaches = recordExport.dataSourceCacheOpt match {
-//      case Some(dsName) => dataSources.filter(_.name == dsName).flatMap(_.dataSourceCacheOpt)
-//      case _ => Nil
-//    }
-//
-//    records.foreach { pair =>
-//      val (tmst, strs) = pair
-//      val persist = persistFactory.getPersists(tmst)
-//
-//      persist.persistRecords(strs, recordExport.name)
-//      updateDsCaches.foreach(_.updateData(strs, tmst))
-//    }
-//
-//    emtpyRecordKeys.foreach { t =>
-//      val persist = persistFactory.getPersists(t)
-//      persist.persistRecords(Nil, recordExport.name)
-//      updateDsCaches.foreach(_.updateData(Nil, t))
-//    }
-//  }
-
-//  def persistAllRecords(ruleSteps: Seq[ConcreteRuleStep], persistFactory: PersistFactory,
-//                        timeGroups: Iterable[Long]): Unit = {
-//    val recordSteps = ruleSteps.filter(_.persistType == RecordPersistType)
-//    recordSteps.foreach { step =>
-//      collectRecords(step, timeGroups) match {
-//        case Some(rdd) => {
-//          val name = step.name
-//          rdd.foreach { pair =>
-//            val (t, items) = pair
-//            val persist = persistFactory.getPersists(t)
-//            persist.persistRecords(items, name)
-//          }
-//        }
-//        case _ => {
-//          println(s"empty records to persist")
-//        }
-//      }
-//    }
-//  }
-//
-//  def updateDataSources(ruleSteps: Seq[ConcreteRuleStep], dataSources: Seq[DataSource],
-//                        timeGroups: Iterable[Long]): Unit = {
-//    val updateSteps = ruleSteps.filter(_.updateDataSource.nonEmpty)
-//    updateSteps.foreach { step =>
-//      collectUpdateCacheDatas(step, timeGroups) match {
-//        case Some(rdd) => {
-//          val udpateDataSources = dataSources.filter { ds =>
-//            step.updateDataSource match {
-//              case Some(dsName) if (dsName == ds.name) => true
-//              case _ => false
-//            }
-//          }
-//          if (udpateDataSources.size > 0) {
-//            val name = step.name
-//            rdd.foreach { pair =>
-//              val (t, items) = pair
-//              udpateDataSources.foreach { ds =>
-//                ds.dataSourceCacheOpt.foreach(_.updateData(items, t))
-//              }
-//            }
-//          }
-//        }
-//        case _ => {
-//          println(s"empty data source to update")
-//        }
-//      }
-//    }
-//  }
-
   ///////////////////////////

   def runRuleStep(timeInfo: TimeInfo, ruleStep: RuleStep): Boolean = {
@@ -272,16 +194,6 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {

   ///////////////////////////

-//  def collectRecords(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]] = {
-//    engines.flatMap { engine =>
-//      engine.collectRecords(ruleStep, timeGroups)
-//    }.headOption
-//  }
-//  def collectUpdateCacheDatas(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]] = {
-//    engines.flatMap { engine =>
-//      engine.collectUpdateCacheDatas(ruleStep, timeGroups)
-//    }.headOption
-//  }
   def collectMetrics(metricExport: MetricExport
                     ): Map[Long, Map[String, Any]] = {
     val ret = engines.foldLeft(Map[Long, Map[String, Any]]()) { (ret, engine) =>
@@ -290,121 +202,28 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
     ret
   }

-//  def collectRecords(timeInfo: TimeInfo, recordExport: RecordExport): Map[Long, DataFrame] = {
-//    val ret = engines.foldLeft(Map[Long, DataFrame]()) { (ret, engine) =>
-//      if (ret.nonEmpty) ret else engine.collectRecords(timeInfo, recordExport)
-//    }
-//    ret
-//  }
-
-  def collectUpdateRDD(ruleStep: RuleStep): Option[DataFrame] = {
-//    engines.flatMap { engine =>
-//      engine.collectUpdateRDD(ruleStep)
-//    }.headOption
-    None
-  }
-
-//  def collectUpdateRDD(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]
-//                      ): Option[RDD[(Long, Iterable[String])]] = {
-//    engines.flatMap { engine =>
-//      engine.collectUpdateRDD(ruleStep, timeGroups)
-//    }.headOption
-//  }
-
-  ////////////////////////////
-
-  def collectUpdateRDDs(ruleSteps: Seq[RuleStep], timeGroups: Set[Long]
-                       ): Seq[(RuleStep, DataFrame)] = {
-//    ruleSteps.flatMap { rs =>
-//      val t = rs.timeInfo.tmst
-//      if (timeGroups.contains(t)) {
-//        collectUpdateRDD(rs).map((rs, _))
-//      } else None
-//    }
-    Nil
-  }
-
-//  def collectUpdateRDDs(ruleSteps: Seq[ConcreteRuleStep], timeGroups: Iterable[Long]
-//                       ): Seq[(ConcreteRuleStep, RDD[(Long, Iterable[String])])] = {
-//    ruleSteps.flatMap { rs =>
-//      collectUpdateRDD(rs, timeGroups) match {
-//        case Some(rdd) => Some((rs, rdd))
-//        case _ => None
-//      }
-//    }
-//  }
-
-  def persistAllRecords(stepRdds: Seq[(RuleStep, DataFrame)],
-                        persistFactory: PersistFactory): Unit = {
-//    stepRdds.foreach { stepRdd =>
-//      val (step, df) = stepRdd
-//      if (step.ruleInfo.persistType == RecordPersistType) {
-//        val name = step.ruleInfo.name
-//        val t = step.timeInfo.tmst
-//        val persist = persistFactory.getPersists(t)
-//        persist.persistRecords(df, name)
-//      }
-//    }
+  def collectUpdateDf(dsUpdate: DsUpdate): Option[DataFrame] = {
+    val ret = engines.foldLeft(None: Option[DataFrame]) { (ret, engine) =>
+      if (ret.nonEmpty) ret else engine.collectUpdateDf(dsUpdate)
+    }
+    ret
   }

-//  def persistAllRecords(stepRdds: Seq[(ConcreteRuleStep, RDD[(Long, Iterable[String])])],
-//                        persistFactory: PersistFactory): Unit = {
-//    stepRdds.foreach { stepRdd =>
-//      val (step, rdd) = stepRdd
-//      if (step.ruleInfo.persistType == RecordPersistType) {
-//        val name = step.name
-//        rdd.foreach { pair =>
-//          val (t, items) = pair
-//          val persist = persistFactory.getPersists(t)
-//          persist.persistRecords(items, name)
-//        }
-//      }
-//    }
-//  }
-
-  def updateDataSources(stepRdds: Seq[(RuleStep, DataFrame)],
+  def updateDataSources(dsUpdates: Seq[DsUpdate],
                         dataSources: Seq[DataSource]): Unit = {
-//    stepRdds.foreach { stepRdd =>
-//      val (step, df) = stepRdd
-//      if (step.ruleInfo.cacheDataSourceOpt.nonEmpty) {
-//        val udpateDsCaches = dataSources.filter { ds =>
-//          step.ruleInfo.cacheDataSourceOpt match {
-//            case Some(dsName) if (dsName == ds.name) => true
-//            case _ => false
-//          }
-//        }.flatMap(_.dataSourceCacheOpt)
-//        if (udpateDsCaches.size > 0) {
-//          val t = step.timeInfo.tmst
-//          udpateDsCaches.foreach(_.updateData(df, t))
-//        }
-//      }
-//    }
+    dsUpdates.foreach { dsUpdate =>
+      val dsName = dsUpdate.dsName
+      collectUpdateDf(dsUpdate) match {
+        case Some(df) => {
+          dataSources.filter(_.name == dsName).headOption.foreach(_.updateData(df))
+        }
+        case _ => {
+          // do nothing
+        }
+      }
+    }
   }

-//  def updateDataSources(stepRdds: Seq[(ConcreteRuleStep, RDD[(Long, Iterable[String])])],
-//                        dataSources: Seq[DataSource]): Unit = {
-//    stepRdds.foreach { stepRdd =>
-//      val (step, rdd) = stepRdd
-//      if (step.ruleInfo.cacheDataSourceOpt.nonEmpty) {
-//        val udpateDataSources = dataSources.filter { ds =>
-//          step.ruleInfo.cacheDataSourceOpt match {
-//            case Some(dsName) if (dsName == ds.name) => true
-//            case _ => false
-//          }
-//        }
-//        if (udpateDataSources.size > 0) {
-//          val name = step.name
-//          rdd.foreach { pair =>
-//            val (t, items) = pair
-//            udpateDataSources.foreach { ds =>
-//              ds.dataSourceCacheOpt.foreach(_.updateData(items, t))
-//            }
-//          }
-//        }
-//      }
-//    }
-//  }
-
 }

 case class ParallelCounter(total: Int) extends Serializable {
