Repository: incubator-griffin Updated Branches: refs/heads/master ff3098ffd -> ed5ac8730
refactor main process of batch and streaming measure Author: Lionel Liu <[email protected]> Author: dodobel <[email protected]> Closes #294 from bhlx3lyx7/spark2. Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/ed5ac873 Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/ed5ac873 Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/ed5ac873 Branch: refs/heads/master Commit: ed5ac8730e685b54c38494c26bf07c0603e8fa67 Parents: ff3098f Author: Lionel Liu <[email protected]> Authored: Mon Jun 11 15:58:39 2018 +0800 Committer: Lionel Liu <[email protected]> Committed: Mon Jun 11 15:58:39 2018 +0800 ---------------------------------------------------------------------- .../apache/griffin/measure/Application.scala | 16 +- .../griffin/measure/context/DQContext.scala | 2 + .../measure/context/datasource/DataSource.scala | 14 +- .../context/datasource/DataSourceFactory.scala | 9 +- .../datasource/cache/DataSourceCache.scala | 366 ------------------- .../cache/DataSourceCacheFactory.scala | 68 ---- .../datasource/cache/JsonDataSourceCache.scala | 40 -- .../datasource/cache/OrcDataSourceCache.scala | 40 -- .../cache/ParquetDataSourceCache.scala | 42 --- .../datasource/cache/StreamingCacheClient.scala | 366 +++++++++++++++++++ .../cache/StreamingCacheClientFactory.scala | 68 ++++ .../cache/StreamingCacheJsonClient.scala | 40 ++ .../cache/StreamingCacheOrcClient.scala | 40 ++ .../cache/StreamingCacheParquetClient.scala | 42 +++ .../connector/DataConnectorFactory.scala | 16 +- .../streaming/KafkaStreamingDataConnector.scala | 4 +- .../KafkaStreamingStringDataConnector.scala | 4 +- .../streaming/StreamingDataConnector.scala | 4 +- .../apache/griffin/measure/launch/DQApp.scala | 5 +- .../measure/launch/batch/BatchDQApp.scala | 6 +- .../launch/streaming/StreamingDQApp.scala | 104 +++++- .../launch/streaming/StreamingDQApp2.scala | 104 ------ .../step/builder/RuleParamStepBuilder.scala | 4 +- .../dsl/transform/AccuracyExpr2DQSteps.scala | 4 +- .../transform/DistinctnessExpr2DQSteps.scala | 4 +- .../step/write/DataSourceUpdateWriteStep.scala | 61 ++++ .../step/write/DsCacheUpdateWriteStep.scala | 61 ---- measure/src/test/resources/env-batch.json | 9 - 28 files changed, 763 insertions(+), 780 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/Application.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/Application.scala b/measure/src/main/scala/org/apache/griffin/measure/Application.scala index 893ba2c..6580ffc 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/Application.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/Application.scala @@ -77,7 +77,7 @@ object Application extends Loggable { // choose process val procType = ProcessType(allParam.dqParam.procType) - val proc: DQApp = procType match { + val dqApp: DQApp = procType match { case BatchProcessType => BatchDQApp(allParam) case StreamingProcessType => StreamingDQApp(allParam) case _ => { @@ -88,8 +88,8 @@ object Application extends Loggable { startup - // process init - proc.init match { + // dq app init + dqApp.init match { case Success(_) => { info("process init success") } @@ -100,15 +100,15 @@ object Application extends Loggable { } } - // process run - 
proc.run match { + // dq app run + dqApp.run match { case Success(_) => { info("process run success") } case Failure(ex) => { error(s"process run error: ${ex.getMessage}") - if (proc.retryable) { + if (dqApp.retryable) { throw ex } else { shutdown @@ -117,8 +117,8 @@ object Application extends Loggable { } } - // process end - proc.close match { + // dq app end + dqApp.close match { case Success(_) => { info("process end success") } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala index 43b61aa..1d7dc62 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala @@ -26,6 +26,8 @@ import org.apache.spark.sql.{Encoders, SQLContext, SparkSession} /** * dq context: the context of each calculation + * unique context id in each calculation + * access the same spark session this app created */ case class DQContext(contextId: ContextId, name: String, http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSource.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSource.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSource.scala index 09ab9ea..0ca61aa 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSource.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSource.scala @@ -20,7 +20,7 @@ package org.apache.griffin.measure.context.datasource import org.apache.griffin.measure.Loggable import org.apache.griffin.measure.configuration.params.DataSourceParam -import org.apache.griffin.measure.context.datasource.cache.DataSourceCache +import org.apache.griffin.measure.context.datasource.cache.StreamingCacheClient import org.apache.griffin.measure.context.{ContextId, DQContext, TimeRange} import org.apache.griffin.measure.context.datasource.connector.DataConnector import org.apache.griffin.measure.context.datasource.info.TmstCache @@ -32,12 +32,12 @@ import org.apache.spark.sql._ * @param name name of data source * @param dsParam param of this data source * @param dataConnectors list of data connectors - * @param dataSourceCacheOpt data source cache option in streaming mode + * @param streamingCacheClientOpt streaming data cache client option */ case class DataSource(name: String, dsParam: DataSourceParam, dataConnectors: Seq[DataConnector], - dataSourceCacheOpt: Option[DataSourceCache] + streamingCacheClientOpt: Option[StreamingCacheClient] ) extends Loggable with Serializable { def init(): Unit = { @@ -67,7 +67,7 @@ case class DataSource(name: String, case _ => None } } - val caches = dataSourceCacheOpt match { + val caches = streamingCacheClientOpt match { case Some(dsc) => dsc.readData() :: Nil case _ => Nil } @@ -83,15 +83,15 @@ case class DataSource(name: String, } def updateData(df: DataFrame): Unit = { - dataSourceCacheOpt.foreach(_.updateData(Some(df))) + streamingCacheClientOpt.foreach(_.updateData(Some(df))) } def cleanOldData(): Unit = { - 
dataSourceCacheOpt.foreach(_.cleanOutTimeData) + streamingCacheClientOpt.foreach(_.cleanOutTimeData) } def processFinish(): Unit = { - dataSourceCacheOpt.foreach(_.processFinish) + streamingCacheClientOpt.foreach(_.processFinish) } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala index a22b856..95c8de7 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala @@ -20,7 +20,7 @@ package org.apache.griffin.measure.context.datasource import org.apache.griffin.measure.Loggable import org.apache.griffin.measure.configuration.params.DataSourceParam -import org.apache.griffin.measure.context.datasource.cache.DataSourceCacheFactory +import org.apache.griffin.measure.context.datasource.cache.StreamingCacheClientFactory import org.apache.griffin.measure.context.datasource.connector.{DataConnector, DataConnectorFactory} import org.apache.griffin.measure.context.datasource.info.TmstCache import org.apache.spark.sql.SparkSession @@ -49,18 +49,19 @@ object DataSourceFactory extends Loggable { val connectorParams = dataSourceParam.getConnectors val tmstCache = TmstCache() - val dataSourceCacheOpt = DataSourceCacheFactory.getDataSourceCacheOpt( + // for streaming data cache + val streamingCacheClientOpt = StreamingCacheClientFactory.getClientOpt( sparkSession.sqlContext, dataSourceParam.cache, name, index, tmstCache) val dataConnectors: Seq[DataConnector] = connectorParams.flatMap { connectorParam => DataConnectorFactory.getDataConnector(sparkSession, ssc, connectorParam, - tmstCache, dataSourceCacheOpt) match { + tmstCache, streamingCacheClientOpt) match { case Success(connector) => Some(connector) case _ => None } } - Some(DataSource(name, dataSourceParam, dataConnectors, dataSourceCacheOpt)) + Some(DataSource(name, dataSourceParam, dataConnectors, streamingCacheClientOpt)) } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCache.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCache.scala deleted file mode 100644 index c70fd20..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCache.scala +++ /dev/null @@ -1,366 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.context.datasource.cache - -import java.util.concurrent.TimeUnit - -import org.apache.griffin.measure.Loggable -import org.apache.griffin.measure.context.TimeRange -import org.apache.griffin.measure.context.datasource.info.{DataSourceCacheable, TmstCache} -import org.apache.griffin.measure.context.streaming.info.{InfoCacheInstance, TimeInfoCache} -import org.apache.griffin.measure.step.builder.ConstantColumns -import org.apache.griffin.measure.utils.DataFrameUtil._ -import org.apache.griffin.measure.utils.ParamUtil._ -import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil} -import org.apache.spark.sql._ - -import scala.util.Random - -/** - * data source cache in streaming mode - * save data frame into hdfs in dump phase - * read data frame from hdfs in calculate phase - * with update and clean actions for the cache data - */ -trait DataSourceCache extends DataSourceCacheable with WithFanIn[Long] with Loggable with Serializable { - - val sqlContext: SQLContext - val param: Map[String, Any] - val dsName: String - val index: Int - - val tmstCache: TmstCache - protected def fromUntilRangeTmsts(from: Long, until: Long) = tmstCache.fromUntil(from, until) - protected def clearTmst(t: Long) = tmstCache.remove(t) - protected def clearTmstsUntil(until: Long) = { - val outDateTmsts = tmstCache.until(until) - tmstCache.remove(outDateTmsts) - } - protected def afterTilRangeTmsts(after: Long, til: Long) = fromUntilRangeTmsts(after + 1, til + 1) - protected def clearTmstsTil(til: Long) = clearTmstsUntil(til + 1) - - val _FilePath = "file.path" - val _InfoPath = "info.path" - val _ReadyTimeInterval = "ready.time.interval" - val _ReadyTimeDelay = "ready.time.delay" - val _TimeRange = "time.range" - - val rdmStr = Random.alphanumeric.take(10).mkString - val defFilePath = s"hdfs:///griffin/cache/${dsName}_${rdmStr}" - val defInfoPath = s"${index}" - - val filePath: String = param.getString(_FilePath, defFilePath) - val cacheInfoPath: String = param.getString(_InfoPath, defInfoPath) - val readyTimeInterval: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeInterval, "1m")).getOrElse(60000L) - val readyTimeDelay: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeDelay, "1m")).getOrElse(60000L) - val deltaTimeRange: (Long, Long) = { - def negative(n: Long): Long = if (n <= 0) n else 0 - param.get(_TimeRange) match { - case Some(seq: Seq[String]) => { - val nseq = seq.flatMap(TimeUtil.milliseconds(_)) - val ns = negative(nseq.headOption.getOrElse(0)) - val ne = negative(nseq.tail.headOption.getOrElse(0)) - (ns, ne) - } - case _ => (0, 0) - } - } - - val _ReadOnly = "read.only" - val readOnly = param.getBoolean(_ReadOnly, false) - - val _Updatable = "updatable" - val updatable = param.getBoolean(_Updatable, false) - - val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new") - val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old") - - val newFilePath = s"${filePath}/new" - val oldFilePath = s"${filePath}/old" - - val defOldCacheIndex = 0L - - protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit - protected def 
readDataFrame(dfr: DataFrameReader, path: String): DataFrame - - /** - * save data frame in dump phase - * @param dfOpt data frame to be saved - * @param ms timestamp of this data frame - */ - def saveData(dfOpt: Option[DataFrame], ms: Long): Unit = { - if (!readOnly) { - dfOpt match { - case Some(df) => { - // cache df - df.cache - - // cache df - val cnt = df.count - info(s"save ${dsName} data count: ${cnt}") - - if (cnt > 0) { - // lock makes it safer when writing new cache data - val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS) - if (newCacheLocked) { - try { - val dfw = df.write.mode(SaveMode.Append).partitionBy(ConstantColumns.tmst) - writeDataFrame(dfw, newFilePath) - } catch { - case e: Throwable => error(s"save data error: ${e.getMessage}") - } finally { - newCacheLock.unlock() - } - } - } - - // uncache df - df.unpersist - } - case _ => { - info(s"no data frame to save") - } - } - - // submit cache time and ready time - if (fanIncrement(ms)) { - info(s"save data [${ms}] finish") - submitCacheTime(ms) - submitReadyTime(ms) - } - - } - } - - /** - * read data frame in calculation phase - * @return data frame to calculate, with the time range of data - */ - def readData(): (Option[DataFrame], TimeRange) = { - // time range: (a, b] - val timeRange = TimeInfoCache.getTimeRange - val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2) - - // read partition info - val filterStr = if (reviseTimeRange._1 == reviseTimeRange._2) { - info(s"read time range: [${reviseTimeRange._1}]") - s"`${ConstantColumns.tmst}` = ${reviseTimeRange._1}" - } else { - info(s"read time range: (${reviseTimeRange._1}, ${reviseTimeRange._2}]") - s"`${ConstantColumns.tmst}` > ${reviseTimeRange._1} AND `${ConstantColumns.tmst}` <= ${reviseTimeRange._2}" - } - - // new cache data - val newDfOpt = try { - val dfr = sqlContext.read - Some(readDataFrame(dfr, newFilePath).filter(filterStr)) - } catch { - case e: Throwable => { - warn(s"read data source cache warn: ${e.getMessage}") - None - } - } - - // old cache data - val oldCacheIndexOpt = if (updatable) readOldCacheIndex else None - val oldDfOpt = oldCacheIndexOpt.flatMap { idx => - val oldDfPath = s"${oldFilePath}/${idx}" - try { - val dfr = sqlContext.read - Some(readDataFrame(dfr, oldDfPath).filter(filterStr)) - } catch { - case e: Throwable => { - warn(s"read old data source cache warn: ${e.getMessage}") - None - } - } - } - - // whole cache data - val cacheDfOpt = unionDfOpts(newDfOpt, oldDfOpt) - - // from until tmst range - val (from, until) = (reviseTimeRange._1, reviseTimeRange._2) - val tmstSet = afterTilRangeTmsts(from, until) - - val retTimeRange = TimeRange(reviseTimeRange, tmstSet) - (cacheDfOpt, retTimeRange) - } - - private def cleanOutTimePartitions(path: String, outTime: Long, partitionOpt: Option[String], - func: (Long, Long) => Boolean - ): Unit = { - val earlierOrEqPaths = listPartitionsByFunc(path: String, outTime, partitionOpt, func) - // delete out time data path - earlierOrEqPaths.foreach { path => - info(s"delete hdfs path: ${path}") - HdfsUtil.deleteHdfsPath(path) - } - } - private def listPartitionsByFunc(path: String, bound: Long, partitionOpt: Option[String], - func: (Long, Long) => Boolean - ): Iterable[String] = { - val names = HdfsUtil.listSubPathsByType(path, "dir") - val regex = partitionOpt match { - case Some(partition) => s"^${partition}=(\\d+)$$".r - case _ => "^(\\d+)$".r - } - names.filter { name => - name match { - case regex(value) => { - str2Long(value) match { - case Some(t) 
=> func(t, bound) - case _ => false - } - } - case _ => false - } - }.map(name => s"${path}/${name}") - } - private def str2Long(str: String): Option[Long] = { - try { - Some(str.toLong) - } catch { - case e: Throwable => None - } - } - - /** - * clean out-time cached data on hdfs - */ - def cleanOutTimeData(): Unit = { - // clean tmst - val cleanTime = readCleanTime - cleanTime.foreach(clearTmstsTil(_)) - - if (!readOnly) { - // new cache data - val newCacheCleanTime = if (updatable) readLastProcTime else readCleanTime - newCacheCleanTime match { - case Some(nct) => { - // clean calculated new cache data - val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS) - if (newCacheLocked) { - try { - cleanOutTimePartitions(newFilePath, nct, Some(ConstantColumns.tmst), - (a: Long, b: Long) => (a <= b)) - } catch { - case e: Throwable => error(s"clean new cache data error: ${e.getMessage}") - } finally { - newCacheLock.unlock() - } - } - } - case _ => { - // do nothing - } - } - - // old cache data - val oldCacheCleanTime = if (updatable) readCleanTime else None - oldCacheCleanTime match { - case Some(oct) => { - val oldCacheIndexOpt = readOldCacheIndex - oldCacheIndexOpt.foreach { idx => - val oldDfPath = s"${oldFilePath}/${idx}" - val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS) - if (oldCacheLocked) { - try { - // clean calculated old cache data - cleanOutTimePartitions(oldFilePath, idx, None, (a: Long, b: Long) => (a < b)) - // clean out time old cache data not calculated -// cleanOutTimePartitions(oldDfPath, oct, Some(InternalColumns.tmst)) - } catch { - case e: Throwable => error(s"clean old cache data error: ${e.getMessage}") - } finally { - oldCacheLock.unlock() - } - } - } - } - case _ => { - // do nothing - } - } - } - } - - /** - * update old cached data by new data frame - * @param dfOpt data frame to update old cached data - */ - def updateData(dfOpt: Option[DataFrame]): Unit = { - if (!readOnly && updatable) { - dfOpt match { - case Some(df) => { - // old cache lock - val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS) - if (oldCacheLocked) { - try { - val oldCacheIndexOpt = readOldCacheIndex - val nextOldCacheIndex = oldCacheIndexOpt.getOrElse(defOldCacheIndex) + 1 - - val oldDfPath = s"${oldFilePath}/${nextOldCacheIndex}" - val cleanTime = getNextCleanTime - val filterStr = s"`${ConstantColumns.tmst}` > ${cleanTime}" - val updateDf = df.filter(filterStr) - - val prlCount = sqlContext.sparkContext.defaultParallelism - // repartition - val repartitionedDf = updateDf.repartition(prlCount) - val dfw = repartitionedDf.write.mode(SaveMode.Overwrite) - writeDataFrame(dfw, oldDfPath) - - submitOldCacheIndex(nextOldCacheIndex) - } catch { - case e: Throwable => error(s"update data error: ${e.getMessage}") - } finally { - oldCacheLock.unlock() - } - } - } - case _ => { - info(s"no data frame to update") - } - } - } - } - - /** - * each time calculation phase finishes, - * data source cache needs to submit some cache information - */ - def processFinish(): Unit = { - // next last proc time - val timeRange = TimeInfoCache.getTimeRange - submitLastProcTime(timeRange._2) - - // next clean time - val nextCleanTime = timeRange._2 + deltaTimeRange._1 - submitCleanTime(nextCleanTime) - } - - // read next clean time - private def getNextCleanTime(): Long = { - val timeRange = TimeInfoCache.getTimeRange - val nextCleanTime = timeRange._2 + deltaTimeRange._1 - nextCleanTime - } - -} 
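The 366-line trait deleted above reappears verbatim below as StreamingCacheClient; the rename is the whole change. Before that copy, a condensed view of its contract may help: two abstract format hooks plus the save/read logic around them. The sketch below is an illustrative reduction, not Griffin source. MiniStreamingCache and MiniParquetCache are made-up names, the literal "tmst" column stands in for ConstantColumns.tmst, and the locking, timestamp bookkeeping, and old/new cache split are omitted.

import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, Row, SQLContext, SaveMode}

// Condensed save/read contract of the cache trait (simplified sketch):
// dump phase appends each micro-batch under a cache path partitioned by tmst,
// calculate phase reads back only the (from, until] slice of partitions.
trait MiniStreamingCache {
  val sqlContext: SQLContext
  val cachePath: String                        // e.g. hdfs:///griffin/cache/<ds>/new

  // format-specific hooks, the same shape as writeDataFrame/readDataFrame above
  protected def writeDf(dfw: DataFrameWriter[Row], path: String): Unit
  protected def readDf(dfr: DataFrameReader, path: String): DataFrame

  // ms: the per-batch timestamp; its cache/ready-time bookkeeping is omitted here
  def saveData(dfOpt: Option[DataFrame], ms: Long): Unit = dfOpt.foreach { df =>
    df.cache()                                 // count and write reuse one scan
    if (df.count() > 0) {
      writeDf(df.write.mode(SaveMode.Append).partitionBy("tmst"), cachePath)
    }
    df.unpersist()
  }

  def readData(from: Long, until: Long): Option[DataFrame] =
    try {
      // the same half-open (from, until] predicate the trait builds over tmst
      Some(readDf(sqlContext.read, cachePath)
        .filter(s"`tmst` > ${from} AND `tmst` <= ${until}"))
    } catch {
      case _: Throwable => None                // cache may not be written yet
    }
}

// One concrete format client: only the two hooks differ per format.
case class MiniParquetCache(sqlContext: SQLContext, cachePath: String)
  extends MiniStreamingCache {
  protected def writeDf(dfw: DataFrameWriter[Row], path: String): Unit = dfw.parquet(path)
  protected def readDf(dfr: DataFrameReader, path: String): DataFrame = dfr.parquet(path)
}

This is also why the json/orc/parquet files in this commit are only about 40 lines each: StreamingCacheJsonClient, StreamingCacheOrcClient, and StreamingCacheParquetClient differ solely in which DataFrameReader/DataFrameWriter method the two hooks delegate to.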
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCacheFactory.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCacheFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCacheFactory.scala deleted file mode 100644 index ca882e0..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCacheFactory.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.context.datasource.cache - -import org.apache.griffin.measure.Loggable -import org.apache.griffin.measure.context.datasource.info.TmstCache -import org.apache.griffin.measure.utils.ParamUtil._ -import org.apache.spark.sql.SQLContext - -object DataSourceCacheFactory extends Loggable { - - private object DataSourceCacheType { - val ParquetRegex = "^(?i)parq(uet)?$".r - val JsonRegex = "^(?i)json$".r - val OrcRegex = "^(?i)orc$".r - } - import DataSourceCacheType._ - - val _type = "type" - - /** - * create data source cache - * @param sqlContext sqlContext in spark environment - * @param param data source cache config - * @param name data source name - * @param index data source index - * @param tmstCache the same tmstCache instance inside a data source - * @return data source option - */ - def getDataSourceCacheOpt(sqlContext: SQLContext, param: Map[String, Any], - name: String, index: Int, tmstCache: TmstCache - ): Option[DataSourceCache] = { - if (param != null) { - try { - val tp = param.getString(_type, "") - val dsCache = tp match { - case ParquetRegex() => ParquetDataSourceCache(sqlContext, param, name, index, tmstCache) - case JsonRegex() => JsonDataSourceCache(sqlContext, param, name, index, tmstCache) - case OrcRegex() => OrcDataSourceCache(sqlContext, param, name, index, tmstCache) - case _ => ParquetDataSourceCache(sqlContext, param, name, index, tmstCache) - } - Some(dsCache) - } catch { - case e: Throwable => { - error(s"generate data source cache fails") - None - } - } - } else None - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/JsonDataSourceCache.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/JsonDataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/JsonDataSourceCache.scala deleted file mode 100644 index cb01274..0000000 --- 
a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/JsonDataSourceCache.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.context.datasource.cache - -import org.apache.griffin.measure.context.datasource.info.TmstCache -import org.apache.spark.sql._ - -/** - * data source cache in json format - */ -case class JsonDataSourceCache(sqlContext: SQLContext, param: Map[String, Any], - dsName: String, index: Int, tmstCache: TmstCache - ) extends DataSourceCache { - - protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = { - info(s"write path: ${path}") - dfw.json(path) - } - - protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = { - dfr.json(path) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/OrcDataSourceCache.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/OrcDataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/OrcDataSourceCache.scala deleted file mode 100644 index daba15f..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/OrcDataSourceCache.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.context.datasource.cache - -import org.apache.griffin.measure.context.datasource.info.TmstCache -import org.apache.spark.sql._ - -/** - * data source cache in orc format - */ -case class OrcDataSourceCache(sqlContext: SQLContext, param: Map[String, Any], - dsName: String, index: Int, tmstCache: TmstCache - ) extends DataSourceCache { - - protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = { - info(s"write path: ${path}") - dfw.orc(path) - } - - protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = { - dfr.orc(path) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/ParquetDataSourceCache.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/ParquetDataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/ParquetDataSourceCache.scala deleted file mode 100644 index f00c6a3..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/ParquetDataSourceCache.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.context.datasource.cache - -import org.apache.griffin.measure.context.datasource.info.TmstCache -import org.apache.spark.sql._ - -/** - * data source cache in parquet format - */ -case class ParquetDataSourceCache(sqlContext: SQLContext, param: Map[String, Any], - dsName: String, index: Int, tmstCache: TmstCache - ) extends DataSourceCache { - - sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false") - - protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = { - info(s"write path: ${path}") - dfw.parquet(path) - } - - protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = { - dfr.parquet(path) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClient.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClient.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClient.scala new file mode 100644 index 0000000..1c3ca60 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClient.scala @@ -0,0 +1,366 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.context.datasource.cache + +import java.util.concurrent.TimeUnit + +import org.apache.griffin.measure.Loggable +import org.apache.griffin.measure.context.TimeRange +import org.apache.griffin.measure.context.datasource.info.{DataSourceCacheable, TmstCache} +import org.apache.griffin.measure.context.streaming.info.{InfoCacheInstance, TimeInfoCache} +import org.apache.griffin.measure.step.builder.ConstantColumns +import org.apache.griffin.measure.utils.DataFrameUtil._ +import org.apache.griffin.measure.utils.ParamUtil._ +import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil} +import org.apache.spark.sql._ + +import scala.util.Random + +/** + * data source cache in streaming mode + * save data frame into hdfs in dump phase + * read data frame from hdfs in calculate phase + * with update and clean actions for the cache data + */ +trait StreamingCacheClient extends DataSourceCacheable with WithFanIn[Long] with Loggable with Serializable { + + val sqlContext: SQLContext + val param: Map[String, Any] + val dsName: String + val index: Int + + val tmstCache: TmstCache + protected def fromUntilRangeTmsts(from: Long, until: Long) = tmstCache.fromUntil(from, until) + protected def clearTmst(t: Long) = tmstCache.remove(t) + protected def clearTmstsUntil(until: Long) = { + val outDateTmsts = tmstCache.until(until) + tmstCache.remove(outDateTmsts) + } + protected def afterTilRangeTmsts(after: Long, til: Long) = fromUntilRangeTmsts(after + 1, til + 1) + protected def clearTmstsTil(til: Long) = clearTmstsUntil(til + 1) + + val _FilePath = "file.path" + val _InfoPath = "info.path" + val _ReadyTimeInterval = "ready.time.interval" + val _ReadyTimeDelay = "ready.time.delay" + val _TimeRange = "time.range" + + val rdmStr = Random.alphanumeric.take(10).mkString + val defFilePath = s"hdfs:///griffin/cache/${dsName}_${rdmStr}" + val defInfoPath = s"${index}" + + val filePath: String = param.getString(_FilePath, defFilePath) + val cacheInfoPath: String = param.getString(_InfoPath, defInfoPath) + val readyTimeInterval: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeInterval, "1m")).getOrElse(60000L) + val readyTimeDelay: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeDelay, "1m")).getOrElse(60000L) + val deltaTimeRange: (Long, Long) = { + def negative(n: Long): Long = if (n <= 0) n else 0 + param.get(_TimeRange) match { + case Some(seq: Seq[String]) => { + val nseq = seq.flatMap(TimeUtil.milliseconds(_)) + val ns = negative(nseq.headOption.getOrElse(0)) + val ne = negative(nseq.tail.headOption.getOrElse(0)) + (ns, ne) + } + case _ => (0, 0) + } + } + + val _ReadOnly = "read.only" + val readOnly = param.getBoolean(_ReadOnly, false) + + val _Updatable = "updatable" + val updatable = param.getBoolean(_Updatable, false) + + val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new") + val 
oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old") + + val newFilePath = s"${filePath}/new" + val oldFilePath = s"${filePath}/old" + + val defOldCacheIndex = 0L + + protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit + protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame + + /** + * save data frame in dump phase + * @param dfOpt data frame to be saved + * @param ms timestamp of this data frame + */ + def saveData(dfOpt: Option[DataFrame], ms: Long): Unit = { + if (!readOnly) { + dfOpt match { + case Some(df) => { + // cache df + df.cache + + // cache df + val cnt = df.count + info(s"save ${dsName} data count: ${cnt}") + + if (cnt > 0) { + // lock makes it safer when writing new cache data + val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS) + if (newCacheLocked) { + try { + val dfw = df.write.mode(SaveMode.Append).partitionBy(ConstantColumns.tmst) + writeDataFrame(dfw, newFilePath) + } catch { + case e: Throwable => error(s"save data error: ${e.getMessage}") + } finally { + newCacheLock.unlock() + } + } + } + + // uncache df + df.unpersist + } + case _ => { + info(s"no data frame to save") + } + } + + // submit cache time and ready time + if (fanIncrement(ms)) { + info(s"save data [${ms}] finish") + submitCacheTime(ms) + submitReadyTime(ms) + } + + } + } + + /** + * read data frame in calculation phase + * @return data frame to calculate, with the time range of data + */ + def readData(): (Option[DataFrame], TimeRange) = { + // time range: (a, b] + val timeRange = TimeInfoCache.getTimeRange + val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2) + + // read partition info + val filterStr = if (reviseTimeRange._1 == reviseTimeRange._2) { + info(s"read time range: [${reviseTimeRange._1}]") + s"`${ConstantColumns.tmst}` = ${reviseTimeRange._1}" + } else { + info(s"read time range: (${reviseTimeRange._1}, ${reviseTimeRange._2}]") + s"`${ConstantColumns.tmst}` > ${reviseTimeRange._1} AND `${ConstantColumns.tmst}` <= ${reviseTimeRange._2}" + } + + // new cache data + val newDfOpt = try { + val dfr = sqlContext.read + Some(readDataFrame(dfr, newFilePath).filter(filterStr)) + } catch { + case e: Throwable => { + warn(s"read data source cache warn: ${e.getMessage}") + None + } + } + + // old cache data + val oldCacheIndexOpt = if (updatable) readOldCacheIndex else None + val oldDfOpt = oldCacheIndexOpt.flatMap { idx => + val oldDfPath = s"${oldFilePath}/${idx}" + try { + val dfr = sqlContext.read + Some(readDataFrame(dfr, oldDfPath).filter(filterStr)) + } catch { + case e: Throwable => { + warn(s"read old data source cache warn: ${e.getMessage}") + None + } + } + } + + // whole cache data + val cacheDfOpt = unionDfOpts(newDfOpt, oldDfOpt) + + // from until tmst range + val (from, until) = (reviseTimeRange._1, reviseTimeRange._2) + val tmstSet = afterTilRangeTmsts(from, until) + + val retTimeRange = TimeRange(reviseTimeRange, tmstSet) + (cacheDfOpt, retTimeRange) + } + + private def cleanOutTimePartitions(path: String, outTime: Long, partitionOpt: Option[String], + func: (Long, Long) => Boolean + ): Unit = { + val earlierOrEqPaths = listPartitionsByFunc(path: String, outTime, partitionOpt, func) + // delete out time data path + earlierOrEqPaths.foreach { path => + info(s"delete hdfs path: ${path}") + HdfsUtil.deleteHdfsPath(path) + } + } + private def listPartitionsByFunc(path: String, bound: Long, partitionOpt: Option[String], + func: (Long, Long) => Boolean + ): Iterable[String] = { + val 
names = HdfsUtil.listSubPathsByType(path, "dir") + val regex = partitionOpt match { + case Some(partition) => s"^${partition}=(\\d+)$$".r + case _ => "^(\\d+)$".r + } + names.filter { name => + name match { + case regex(value) => { + str2Long(value) match { + case Some(t) => func(t, bound) + case _ => false + } + } + case _ => false + } + }.map(name => s"${path}/${name}") + } + private def str2Long(str: String): Option[Long] = { + try { + Some(str.toLong) + } catch { + case e: Throwable => None + } + } + + /** + * clean out-time cached data on hdfs + */ + def cleanOutTimeData(): Unit = { + // clean tmst + val cleanTime = readCleanTime + cleanTime.foreach(clearTmstsTil(_)) + + if (!readOnly) { + // new cache data + val newCacheCleanTime = if (updatable) readLastProcTime else readCleanTime + newCacheCleanTime match { + case Some(nct) => { + // clean calculated new cache data + val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS) + if (newCacheLocked) { + try { + cleanOutTimePartitions(newFilePath, nct, Some(ConstantColumns.tmst), + (a: Long, b: Long) => (a <= b)) + } catch { + case e: Throwable => error(s"clean new cache data error: ${e.getMessage}") + } finally { + newCacheLock.unlock() + } + } + } + case _ => { + // do nothing + } + } + + // old cache data + val oldCacheCleanTime = if (updatable) readCleanTime else None + oldCacheCleanTime match { + case Some(oct) => { + val oldCacheIndexOpt = readOldCacheIndex + oldCacheIndexOpt.foreach { idx => + val oldDfPath = s"${oldFilePath}/${idx}" + val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS) + if (oldCacheLocked) { + try { + // clean calculated old cache data + cleanOutTimePartitions(oldFilePath, idx, None, (a: Long, b: Long) => (a < b)) + // clean out time old cache data not calculated +// cleanOutTimePartitions(oldDfPath, oct, Some(InternalColumns.tmst)) + } catch { + case e: Throwable => error(s"clean old cache data error: ${e.getMessage}") + } finally { + oldCacheLock.unlock() + } + } + } + } + case _ => { + // do nothing + } + } + } + } + + /** + * update old cached data by new data frame + * @param dfOpt data frame to update old cached data + */ + def updateData(dfOpt: Option[DataFrame]): Unit = { + if (!readOnly && updatable) { + dfOpt match { + case Some(df) => { + // old cache lock + val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS) + if (oldCacheLocked) { + try { + val oldCacheIndexOpt = readOldCacheIndex + val nextOldCacheIndex = oldCacheIndexOpt.getOrElse(defOldCacheIndex) + 1 + + val oldDfPath = s"${oldFilePath}/${nextOldCacheIndex}" + val cleanTime = getNextCleanTime + val filterStr = s"`${ConstantColumns.tmst}` > ${cleanTime}" + val updateDf = df.filter(filterStr) + + val prlCount = sqlContext.sparkContext.defaultParallelism + // repartition + val repartitionedDf = updateDf.repartition(prlCount) + val dfw = repartitionedDf.write.mode(SaveMode.Overwrite) + writeDataFrame(dfw, oldDfPath) + + submitOldCacheIndex(nextOldCacheIndex) + } catch { + case e: Throwable => error(s"update data error: ${e.getMessage}") + } finally { + oldCacheLock.unlock() + } + } + } + case _ => { + info(s"no data frame to update") + } + } + } + } + + /** + * each time calculation phase finishes, + * data source cache needs to submit some cache information + */ + def processFinish(): Unit = { + // next last proc time + val timeRange = TimeInfoCache.getTimeRange + submitLastProcTime(timeRange._2) + + // next clean time + val nextCleanTime = timeRange._2 + deltaTimeRange._1 + submitCleanTime(nextCleanTime) + } + + // read next 
clean time + private def getNextCleanTime(): Long = { + val timeRange = TimeInfoCache.getTimeRange + val nextCleanTime = timeRange._2 + deltaTimeRange._1 + nextCleanTime + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala new file mode 100644 index 0000000..529b07a --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala @@ -0,0 +1,68 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.context.datasource.cache + +import org.apache.griffin.measure.Loggable +import org.apache.griffin.measure.context.datasource.info.TmstCache +import org.apache.griffin.measure.utils.ParamUtil._ +import org.apache.spark.sql.SQLContext + +object StreamingCacheClientFactory extends Loggable { + + private object DataSourceCacheType { + val ParquetRegex = "^(?i)parq(uet)?$".r + val JsonRegex = "^(?i)json$".r + val OrcRegex = "^(?i)orc$".r + } + import DataSourceCacheType._ + + val _type = "type" + + /** + * create streaming cache client + * @param sqlContext sqlContext in spark environment + * @param param data source cache config + * @param name data source name + * @param index data source index + * @param tmstCache the same tmstCache instance inside a data source + * @return streaming cache client option + */ + def getClientOpt(sqlContext: SQLContext, param: Map[String, Any], + name: String, index: Int, tmstCache: TmstCache + ): Option[StreamingCacheClient] = { + if (param != null) { + try { + val tp = param.getString(_type, "") + val dsCache = tp match { + case ParquetRegex() => StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache) + case JsonRegex() => StreamingCacheJsonClient(sqlContext, param, name, index, tmstCache) + case OrcRegex() => StreamingCacheOrcClient(sqlContext, param, name, index, tmstCache) + case _ => StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache) + } + Some(dsCache) + } catch { + case e: Throwable => { + error(s"generate data source cache fails") + None + } + } + } else None + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheJsonClient.scala ---------------------------------------------------------------------- diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheJsonClient.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheJsonClient.scala new file mode 100644 index 0000000..494db3e --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheJsonClient.scala @@ -0,0 +1,40 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.context.datasource.cache + +import org.apache.griffin.measure.context.datasource.info.TmstCache +import org.apache.spark.sql._ + +/** + * data source cache in json format + */ +case class StreamingCacheJsonClient(sqlContext: SQLContext, param: Map[String, Any], + dsName: String, index: Int, tmstCache: TmstCache + ) extends StreamingCacheClient { + + protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = { + info(s"write path: ${path}") + dfw.json(path) + } + + protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = { + dfr.json(path) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheOrcClient.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheOrcClient.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheOrcClient.scala new file mode 100644 index 0000000..6e0f142 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheOrcClient.scala @@ -0,0 +1,40 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.context.datasource.cache + +import org.apache.griffin.measure.context.datasource.info.TmstCache +import org.apache.spark.sql._ + +/** + * data source cache in orc format + */ +case class StreamingCacheOrcClient(sqlContext: SQLContext, param: Map[String, Any], + dsName: String, index: Int, tmstCache: TmstCache + ) extends StreamingCacheClient { + + protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = { + info(s"write path: ${path}") + dfw.orc(path) + } + + protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = { + dfr.orc(path) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheParquetClient.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheParquetClient.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheParquetClient.scala new file mode 100644 index 0000000..d99bc58 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheParquetClient.scala @@ -0,0 +1,42 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.context.datasource.cache + +import org.apache.griffin.measure.context.datasource.info.TmstCache +import org.apache.spark.sql._ + +/** + * data source cache in parquet format + */ +case class StreamingCacheParquetClient(sqlContext: SQLContext, param: Map[String, Any], + dsName: String, index: Int, tmstCache: TmstCache + ) extends StreamingCacheClient { + + sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false") + + protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = { + info(s"write path: ${path}") + dfw.parquet(path) + } + + protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = { + dfr.parquet(path) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala index ea22309..30352a9 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala @@ -20,7 +20,7 @@ package org.apache.griffin.measure.context.datasource.connector import org.apache.griffin.measure.Loggable import org.apache.griffin.measure.configuration.params.DataConnectorParam -import org.apache.griffin.measure.context.datasource.cache.DataSourceCache +import org.apache.griffin.measure.context.datasource.cache.StreamingCacheClient import org.apache.griffin.measure.context.datasource.connector.batch._ import org.apache.griffin.measure.context.datasource.connector.streaming._ import org.apache.griffin.measure.context.datasource.info.TmstCache @@ -44,14 +44,14 @@ object DataConnectorFactory extends Loggable { * @param ssc spark streaming env * @param dcParam data connector param * @param tmstCache same tmst cache in one data source - * @param dataSourceCacheOpt for streaming data connector + * @param streamingCacheClientOpt for streaming cache * @return data connector */ def getDataConnector(sparkSession: SparkSession, ssc: StreamingContext, dcParam: DataConnectorParam, tmstCache: TmstCache, - dataSourceCacheOpt: Option[DataSourceCache] + streamingCacheClientOpt: Option[StreamingCacheClient] ): Try[DataConnector] = { val conType = dcParam.conType val version = dcParam.version @@ -61,7 +61,7 @@ object DataConnectorFactory extends Loggable { case AvroRegex() => AvroBatchDataConnector(sparkSession, dcParam, tmstCache) case TextDirRegex() => TextDirBatchDataConnector(sparkSession, dcParam, tmstCache) case KafkaRegex() => { - getStreamingDataConnector(sparkSession, ssc, dcParam, tmstCache, dataSourceCacheOpt) + getStreamingDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt) } case _ => throw new Exception("connector creation error!") } @@ -72,13 +72,13 @@ object DataConnectorFactory extends Loggable { ssc: StreamingContext, dcParam: DataConnectorParam, tmstCache: TmstCache, - dataSourceCacheOpt: Option[DataSourceCache] + streamingCacheClientOpt: Option[StreamingCacheClient] ): StreamingDataConnector = { if (ssc == null) throw new Exception("streaming context is null!") val conType = dcParam.conType val 
version = dcParam.version conType match { - case KafkaRegex() => getKafkaDataConnector(sparkSession, ssc, dcParam, tmstCache, dataSourceCacheOpt) + case KafkaRegex() => getKafkaDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt) case _ => throw new Exception("streaming connector creation error!") } } @@ -87,7 +87,7 @@ object DataConnectorFactory extends Loggable { ssc: StreamingContext, dcParam: DataConnectorParam, tmstCache: TmstCache, - dataSourceCacheOpt: Option[DataSourceCache] + streamingCacheClientOpt: Option[StreamingCacheClient] ): KafkaStreamingDataConnector = { val KeyType = "key.type" val ValueType = "value.type" @@ -96,7 +96,7 @@ object DataConnectorFactory extends Loggable { val valueType = config.getOrElse(ValueType, "java.lang.String").toString (getClassTag(keyType), getClassTag(valueType)) match { case (ClassTag(k: Class[String]), ClassTag(v: Class[String])) => { - KafkaStreamingStringDataConnector(sparkSession, ssc, dcParam, tmstCache, dataSourceCacheOpt) + KafkaStreamingStringDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt) } case _ => { throw new Exception("not supported type kafka data connector") http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala index 9fe4876..de2822b 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala @@ -43,7 +43,7 @@ trait KafkaStreamingDataConnector extends StreamingDataConnector { def init(): Unit = { // register fan in - dataSourceCacheOpt.foreach(_.registerFanIn) + streamingCacheClientOpt.foreach(_.registerFanIn) val ds = stream match { case Success(dstream) => dstream @@ -71,7 +71,7 @@ trait KafkaStreamingDataConnector extends StreamingDataConnector { } // save data frame - dataSourceCacheOpt.foreach(_.saveData(saveDfOpt, ms)) + streamingCacheClientOpt.foreach(_.saveData(saveDfOpt, ms)) }) } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala index b483933..3083ca6 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala @@ -20,7 +20,7 @@ package org.apache.griffin.measure.context.datasource.connector.streaming import kafka.serializer.StringDecoder import 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
index b483933..3083ca6 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
@@ -20,7 +20,7 @@ package org.apache.griffin.measure.context.datasource.connector.streaming import kafka.serializer.StringDecoder import org.apache.griffin.measure.configuration.params.DataConnectorParam -import org.apache.griffin.measure.context.datasource.cache.DataSourceCache +import org.apache.griffin.measure.context.datasource.cache.StreamingCacheClient import org.apache.griffin.measure.context.datasource.info.TmstCache import org.apache.spark.rdd.RDD import org.apache.spark.sql.types.{StringType, StructField, StructType}
@@ -36,7 +36,7 @@ case class KafkaStreamingStringDataConnector(@transient sparkSession: SparkSessi @transient ssc: StreamingContext, dcParam: DataConnectorParam, tmstCache: TmstCache, - dataSourceCacheOpt: Option[DataSourceCache] + streamingCacheClientOpt: Option[StreamingCacheClient] ) extends KafkaStreamingDataConnector { type K = String
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/StreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/StreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/StreamingDataConnector.scala
index 3b2c355..737bc21 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/StreamingDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/StreamingDataConnector.scala
@@ -19,7 +19,7 @@ under the License. package org.apache.griffin.measure.context.datasource.connector.streaming import org.apache.griffin.measure.context.TimeRange -import org.apache.griffin.measure.context.datasource.cache.DataSourceCache +import org.apache.griffin.measure.context.datasource.cache.StreamingCacheClient import org.apache.griffin.measure.context.datasource.connector.DataConnector import org.apache.spark.rdd.RDD import org.apache.spark.sql._
@@ -41,6 +41,6 @@ trait StreamingDataConnector extends DataConnector { // streaming data connector cannot directly read data frame def data(ms: Long): (Option[DataFrame], TimeRange) = (None, TimeRange.emptyTimeRange) - val dataSourceCacheOpt: Option[DataSourceCache] + val streamingCacheClientOpt: Option[StreamingCacheClient] }
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
index 42364a8..9ec1641 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
@@ -44,7 +44,10 @@ trait DQApp extends Loggable with Serializable { */ def retryable: Boolean - protected def getAppTime: Long = { + /** + * timestamp as a key for metrics + */ + protected def getMeasureTime: Long = { if (dqParam.timestamp != null && dqParam.timestamp > 0) { dqParam.timestamp } else { System.currentTimeMillis } }
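The renamed getMeasureTime above makes the timestamp's role explicit: it is the key under which a run's metrics are recorded, taken from configuration when a valid value is supplied and from the wall clock otherwise. A one-line sketch of that contract (timestampSketch stands in for dqParam.timestamp):

    // a configured positive timestamp wins; otherwise fall back to "now"
    def measureTimeSketch(timestampSketch: java.lang.Long): Long =
      if (timestampSketch != null && timestampSketch > 0) timestampSketch else System.currentTimeMillis

    // measureTimeSketch(null) keys metrics by the current time;
    // measureTimeSketch(1528704000000L) pins a replayed run to a fixed key.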
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
index fec27b1..1aa5039 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
@@ -66,10 +66,10 @@ case class BatchDQApp(allParam: AllParam) extends DQApp { // start time val startTime = new Date().getTime - val appTime = getAppTime - val contextId = ContextId(appTime) + val measureTime = getMeasureTime + val contextId = ContextId(measureTime) - // generate data sources + // get data sources val dataSources = DataSourceFactory.getDataSources(sparkSession, null, dqParam.dataSources) dataSources.foreach(_.init)
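The batch hunk above shows the run preamble: derive the measure time, wrap it in a ContextId, then fetch and initialize the data sources (with a null StreamingContext, since batch needs none). A simplified sketch of that sequence with stand-in types, not the project's classes:

    // stand-ins for ContextId and DataSource, just to show the ordering
    case class ContextIdSketch(timestamp: Long)
    trait DataSourceSketch { def init(): Unit }

    def startBatchRun(measureTime: Long, dataSources: Seq[DataSourceSketch]): ContextIdSketch = {
      val contextId = ContextIdSketch(measureTime) // every metric of this run shares this key
      dataSources.foreach(_.init())                // initialize sources before the job executes
      contextId
    }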
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
index e4a8108..d89b7e8 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
@@ -18,14 +18,17 @@ under the License. */ package org.apache.griffin.measure.launch.streaming -import java.util.{Timer, TimerTask} +import java.util.{Date, Timer, TimerTask} import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit} +import org.apache.griffin.measure.Loggable import org.apache.griffin.measure.configuration.enums._ import org.apache.griffin.measure.configuration.params._ import org.apache.griffin.measure.context._ import org.apache.griffin.measure.context.datasource.DataSourceFactory -import org.apache.griffin.measure.context.streaming.info.InfoCacheInstance +import org.apache.griffin.measure.context.streaming.info.{InfoCacheInstance, TimeInfoCache} +import org.apache.griffin.measure.context.streaming.metric.CacheResults +import org.apache.griffin.measure.job.builder.DQJobBuilder import org.apache.griffin.measure.launch.DQApp import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
@@ -87,8 +90,8 @@ case class StreamingDQApp(allParam: AllParam) extends DQApp { }) // start time - val appTime = getAppTime - val contextId = ContextId(appTime) + val measureTime = getMeasureTime + val contextId = ContextId(measureTime) // generate data sources val dataSources = DataSourceFactory.getDataSources(sparkSession, ssc, dqParam.dataSources)
@@ -104,13 +107,13 @@ case class StreamingDQApp(allParam: AllParam) extends DQApp { globalContext.getPersist().start(applicationId) // process thread - val dqThread = StreamingDQApp2(globalContext, dqParam.evaluateRule) + val dqCalculator = StreamingDQCalculator(globalContext, dqParam.evaluateRule) val processInterval = TimeUtil.milliseconds(sparkParam.processInterval) match { case Some(interval) => interval case _ => throw new Exception("invalid batch interval") } - val process = TimingProcess(processInterval, dqThread) + val process = Scheduler(processInterval, dqCalculator) process.startup() ssc.start()
@@ -150,7 +153,94 @@ case class StreamingDQApp(allParam: AllParam) extends DQApp {
 }
 }
- case class TimingProcess(interval: Long, runnable: Runnable) {
+
+ /**
+  *
+  * @param globalContext
+  * @param evaluateRuleParam
+  */
+ case class StreamingDQCalculator(globalContext: DQContext,
+                                  evaluateRuleParam: EvaluateRuleParam
+                                 ) extends Runnable with Loggable {
+
+   val lock = InfoCacheInstance.genLock("process")
+   val appPersist = globalContext.getPersist()
+
+   def run(): Unit = {
+     val updateTimeDate = new Date()
+     val updateTime = updateTimeDate.getTime
+     println(s"===== [${updateTimeDate}] process begins =====")
+     val locked = lock.lock(5, TimeUnit.SECONDS)
+     if (locked) {
+       try {
+
+         TimeInfoCache.startTimeInfoCache
+
+         val startTime = new Date().getTime
+         appPersist.log(startTime, s"starting process ...")
+         val contextId = ContextId(startTime)
+
+         // create dq context
+         val dqContext: DQContext = globalContext.cloneDQContext(contextId)
+
+         // build job
+         val dqJob = DQJobBuilder.buildDQJob(dqContext, evaluateRuleParam)
+
+         // dq job execute
+         dqJob.execute(dqContext)
+
+         // finish calculation
+         finishCalculation(dqContext)
+
+         // end time
+         val endTime = new Date().getTime
+         appPersist.log(endTime, s"process using time: ${endTime - startTime} ms")
+
+         TimeInfoCache.endTimeInfoCache
+
+         // clean old data
+         cleanData(dqContext)
+
+       } catch {
+         case e: Throwable => error(s"process error: ${e.getMessage}")
+       } finally {
+         lock.unlock()
+       }
+     } else {
+       println(s"===== [${updateTimeDate}] process ignores =====")
+     }
+     val endTime = new Date().getTime
+     println(s"===== [${updateTimeDate}] process ends, using ${endTime - updateTime} ms =====")
+   }
+
+   // finish calculation for this round
+   private def finishCalculation(context: DQContext): Unit = {
+     context.dataSources.foreach(_.processFinish)
+   }
+
+   // clean old data and old result cache
+   private def cleanData(context: DQContext): Unit = {
+     try {
+       context.dataSources.foreach(_.cleanOldData)
+
+       context.clean()
+
+       val cleanTime = TimeInfoCache.getCleanTime
+       CacheResults.refresh(cleanTime)
+     } catch {
+       case e: Throwable => error(s"clean data error: ${e.getMessage}")
+     }
+   }
+
+ }
+
+
+ /**
+  *
+  * @param interval
+  * @param runnable
+  */
+ case class Scheduler(interval: Long, runnable: Runnable) {
   val pool: ThreadPoolExecutor = Executors.newFixedThreadPool(5).asInstanceOf[ThreadPoolExecutor]
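StreamingDQCalculator's run method, shown above, guards each round with a cluster lock: it tries to acquire the lock for five seconds, executes a full calculate-persist-clean cycle if it wins, and skips the round otherwise so overlapping triggers never pile up. A reduced sketch of that try-lock pattern, using a local ReentrantLock as a stand-in for the lock InfoCacheInstance.genLock returns:

    import java.util.concurrent.TimeUnit
    import java.util.concurrent.locks.ReentrantLock

    // stand-in lock with the same lock/unlock surface the calculator relies on
    class LockSketch {
      private val inner = new ReentrantLock()
      def lock(timeout: Long, unit: TimeUnit): Boolean = inner.tryLock(timeout, unit)
      def unlock(): Unit = inner.unlock()
    }

    def runRound(lock: LockSketch)(round: () => Unit): Unit = {
      if (lock.lock(5, TimeUnit.SECONDS)) {
        try round()              // one calculate-persist-clean cycle
        finally lock.unlock()    // never leak the lock, even on failure
      } else {
        println("process ignores: previous round still holds the lock")
      }
    }

The unlock in a finally block mirrors the diff: a failed round logs its error but still releases the lock for the next interval.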
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp2.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp2.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp2.scala
deleted file mode 100644
index 97ce980..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp2.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.launch.streaming
-
-import java.util.Date
-import java.util.concurrent.TimeUnit
-
-import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.params._
-import org.apache.griffin.measure.context.streaming.info.{InfoCacheInstance, TimeInfoCache}
-import org.apache.griffin.measure.context.streaming.metric.CacheResults
-import org.apache.griffin.measure.context.{ContextId, DQContext}
-import org.apache.griffin.measure.job.builder.DQJobBuilder
-
-case class StreamingDQApp2(globalContext: DQContext,
-                           evaluateRuleParam: EvaluateRuleParam
-                          ) extends Runnable with Loggable {
-
-  val lock = InfoCacheInstance.genLock("process")
-  val appPersist = globalContext.getPersist()
-
-  def run(): Unit = {
-    val updateTimeDate = new Date()
-    val updateTime = updateTimeDate.getTime
-    println(s"===== [${updateTimeDate}] process begins =====")
-    val locked = lock.lock(5, TimeUnit.SECONDS)
-    if (locked) {
-      try {
-
-        TimeInfoCache.startTimeInfoCache
-
-        val startTime = new Date().getTime
-        appPersist.log(startTime, s"starting process ...")
-        val contextId = ContextId(startTime)
-
-        // create dq context
-        val dqContext: DQContext = globalContext.cloneDQContext(contextId)
-
-        // build job
-        val dqJob = DQJobBuilder.buildDQJob(dqContext, evaluateRuleParam)
-
-        // dq job execute
-        dqJob.execute(dqContext)
-
-        // finish calculation
-        finishCalculation(dqContext)
-
-        // end time
-        val endTime = new Date().getTime
-        appPersist.log(endTime, s"process using time: ${endTime - startTime} ms")
-
-        TimeInfoCache.endTimeInfoCache
-
-        // clean old data
-        cleanData(dqContext)
-
-      } catch {
-        case e: Throwable => error(s"process error: ${e.getMessage}")
-      } finally {
-        lock.unlock()
-      }
-    } else {
-      println(s"===== [${updateTimeDate}] process ignores =====")
-    }
-    val endTime = new Date().getTime
-    println(s"===== [${updateTimeDate}] process ends, using ${endTime - updateTime} ms =====")
-  }
-
-  // finish calculation for this round
-  private def finishCalculation(context: DQContext): Unit = {
-    context.dataSources.foreach(_.processFinish)
-  }
-
-  // clean old data and old result cache
-  private def cleanData(context: DQContext): Unit = {
-    try {
-      context.dataSources.foreach(_.cleanOldData)
-
-      context.clean()
-
-      val cleanTime = TimeInfoCache.getCleanTime
-      CacheResults.refresh(cleanTime)
-    } catch {
-      case e: Throwable => error(s"clean data error: ${e.getMessage}")
-    }
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
index 9a2c7b5..fa9e38b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
@@ -21,7 +21,7 @@ package org.apache.griffin.measure.step.builder import org.apache.griffin.measure.configuration.enums.NormalizeType import org.apache.griffin.measure.configuration.params.RuleParam import org.apache.griffin.measure.context.DQContext -import org.apache.griffin.measure.step.write.{DsCacheUpdateWriteStep, MetricWriteStep, RecordWriteStep} +import org.apache.griffin.measure.step.write.{DataSourceUpdateWriteStep, MetricWriteStep, RecordWriteStep} import org.apache.griffin.measure.step.{DQStep, SeqDQStep} /**
@@ -52,7 +52,7 @@ trait RuleParamStepBuilder extends DQStepBuilder { }.toSeq // update writer val dsCacheUpdateSteps = ruleParam.dsCacheUpdateOpt.map { dsCacheUpdate => - DsCacheUpdateWriteStep(dsCacheUpdate.dsName, name) + DataSourceUpdateWriteStep(dsCacheUpdate.dsName, name) }.toSeq metricSteps ++ recordSteps ++ dsCacheUpdateSteps
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
index a0e5ca3..9c14325 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
@@ -27,7 +27,7 @@ import org.apache.griffin.measure.step.builder.dsl.expr._ import org.apache.griffin.measure.step.builder.dsl.transform.analyzer.AccuracyAnalyzer import org.apache.griffin.measure.step.transform.DataFrameOps.AccuracyOprKeys import org.apache.griffin.measure.step.transform.{DataFrameOps, DataFrameOpsTransformStep, SparkSqlTransformStep} -import org.apache.griffin.measure.step.write.{DsCacheUpdateWriteStep, MetricWriteStep, RecordWriteStep} +import org.apache.griffin.measure.step.write.{DataSourceUpdateWriteStep, MetricWriteStep, RecordWriteStep} import org.apache.griffin.measure.utils.ParamUtil._ /**
@@ -91,7 +91,7 @@ case class AccuracyExpr2DQSteps(context: DQContext, case BatchProcessType => Nil case StreamingProcessType => { val dsName = ruleParam.dsCacheUpdateOpt.map(_.dsName).getOrElse(sourceName) - DsCacheUpdateWriteStep(dsName, missRecordsTableName) :: Nil + DataSourceUpdateWriteStep(dsName, missRecordsTableName) :: Nil } }
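As the two hunks above show, each rule fans out into metric, record, and (for streaming runs) data-source-update write steps, which the builder simply concatenates. A compact sketch of that composition, with stand-in step types rather than the project's WriteStep hierarchy:

    // stand-in write step variants, one per writer kind in the hunks above
    sealed trait WriteStepSketch
    case class MetricWriteSketch(name: String) extends WriteStepSketch
    case class RecordWriteSketch(name: String) extends WriteStepSketch
    case class DataSourceUpdateSketch(dsName: String, inputName: String) extends WriteStepSketch

    def buildWriteSteps(metricSteps: Seq[MetricWriteSketch],
                        recordSteps: Seq[RecordWriteSketch],
                        dsUpdateOpt: Option[DataSourceUpdateSketch]): Seq[WriteStepSketch] =
      metricSteps ++ recordSteps ++ dsUpdateOpt.toSeq // mirrors metricSteps ++ recordSteps ++ dsCacheUpdateSteps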
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
index 3482955..1cf94e0 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
@@ -26,7 +26,7 @@ import org.apache.griffin.measure.step.builder.ConstantColumns import org.apache.griffin.measure.step.builder.dsl.expr.{DistinctnessClause, _} import org.apache.griffin.measure.step.builder.dsl.transform.analyzer.DistinctnessAnalyzer import org.apache.griffin.measure.step.transform.SparkSqlTransformStep -import org.apache.griffin.measure.step.write.{DsCacheUpdateWriteStep, MetricWriteStep, RecordWriteStep} +import org.apache.griffin.measure.step.write.{DataSourceUpdateWriteStep, MetricWriteStep, RecordWriteStep} import org.apache.griffin.measure.utils.ParamUtil._ /**
@@ -133,7 +133,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext, val ((transSteps2, writeSteps2), dupCountTableName) = procType match { case StreamingProcessType if (withOlderTable) => { // 4.0 update old data - val targetDsUpdateWriteStep = DsCacheUpdateWriteStep(targetName, targetName) + val targetDsUpdateWriteStep = DataSourceUpdateWriteStep(targetName, targetName) // 4. older alias val olderAliasTableName = "__older"
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
new file mode 100644
index 0000000..0472416
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
@@ -0,0 +1,61 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.write
+
+import org.apache.commons.lang.StringUtils
+import org.apache.griffin.measure.context.DQContext
+import org.apache.spark.sql.DataFrame
+
+/**
+ * update data source streaming cache
+ */
+case class DataSourceUpdateWriteStep(dsName: String,
+                                     inputName: String
+                                    ) extends WriteStep {
+
+  val name: String = ""
+  val writeTimestampOpt: Option[Long] = None
+
+  def execute(context: DQContext): Boolean = {
+    collectDsCacheUpdateDf(context) match {
+      case Some(df) => {
+        context.dataSources.find(ds => StringUtils.equals(ds.name, dsName)).foreach(_.updateData(df))
+      }
+      case _ => {
+        warn(s"update ${dsName} from ${inputName} fails")
+      }
+    }
+    true
+  }
+
+  private def getDataFrame(context: DQContext, name: String): Option[DataFrame] = {
+    try {
+      val df = context.sqlContext.table(s"`${name}`")
+      Some(df)
+    } catch {
+      case e: Throwable => {
+        error(s"get data frame ${name} fails")
+        None
+      }
+    }
+  }
+
+  private def collectDsCacheUpdateDf(context: DQContext): Option[DataFrame] = getDataFrame(context, inputName)
+
+}
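DataSourceUpdateWriteStep, added above, closes the loop for streaming measures: it looks the input up as a registered table, finds the data source with the matching name, and pushes the frame into that source's streaming cache via updateData. A reduced sketch of execute's behavior, modeling the data sources as a name-to-update map instead of the project's DataSource type:

    import org.apache.spark.sql.{DataFrame, SQLContext}

    def updateDataSourceSketch(sqlContext: SQLContext,
                               updaters: Map[String, DataFrame => Unit], // dsName -> updateData
                               dsName: String, inputName: String): Boolean = {
      val dfOpt =
        try Some(sqlContext.table(s"`${inputName}`")) // backticks guard unusual table names
        catch { case _: Throwable => None }           // a missing table degrades to a warning
      dfOpt match {
        case Some(df) => updaters.get(dsName).foreach(update => update(df))
        case None => println(s"update ${dsName} from ${inputName} fails")
      }
      true // the step reports success either way, as in the diff
    }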
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/step/write/DsCacheUpdateWriteStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/DsCacheUpdateWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/DsCacheUpdateWriteStep.scala
deleted file mode 100644
index 27dbb3c..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/DsCacheUpdateWriteStep.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.step.write
-
-import org.apache.commons.lang.StringUtils
-import org.apache.griffin.measure.context.DQContext
-import org.apache.spark.sql.DataFrame
-
-/**
- * update streaming data source cache
- */
-case class DsCacheUpdateWriteStep(dsName: String,
-                                  inputName: String
-                                 ) extends WriteStep {
-
-  val name: String = ""
-  val writeTimestampOpt: Option[Long] = None
-
-  def execute(context: DQContext): Boolean = {
-    collectDsCacheUpdateDf(context) match {
-      case Some(df) => {
-        context.dataSources.find(ds => StringUtils.equals(ds.name, dsName)).foreach(_.updateData(df))
-      }
-      case _ => {
-        warn(s"update ${dsName} from ${inputName} fails")
-      }
-    }
-    true
-  }
-
-  private def getDataFrame(context: DQContext, name: String): Option[DataFrame] = {
-    try {
-      val df = context.sqlContext.table(s"`${name}`")
-      Some(df)
-    } catch {
-      case e: Throwable => {
-        error(s"get data frame ${name} fails")
-        None
-      }
-    }
-  }
-
-  private def collectDsCacheUpdateDf(context: DQContext): Option[DataFrame] = getDataFrame(context, inputName)
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/test/resources/env-batch.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/env-batch.json b/measure/src/test/resources/env-batch.json
index 3e8aa80..0d6ea8a 100644
--- a/measure/src/test/resources/env-batch.json
+++ b/measure/src/test/resources/env-batch.json
@@ -20,15 +20,6 @@
       "max.persist.lines": 10000,
       "max.lines.per.file": 10000
     }
-  },
-  {
-    "type": "http",
-    "config": {
-      "method": "post",
-      "api": "http://10.148.181.248:39200/griffin/accuracy",
-      "over.time": "1m",
-      "retry": 10
-    }
   }
 ],
