http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala deleted file mode 100644 index 5eae2a0..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSource.scala +++ /dev/null @@ -1,140 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.data.source - -import org.apache.griffin.measure.cache.tmst._ -import org.apache.griffin.measure.data.connector._ -import org.apache.griffin.measure.data.connector.batch._ -import org.apache.griffin.measure.data.connector.streaming._ -import org.apache.griffin.measure.data.source.cache._ -import org.apache.griffin.measure.log.Loggable -import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters, TimeRange} -import org.apache.griffin.measure.rule.plan.TimeInfo -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{DataFrame, Row, SQLContext} -import org.apache.griffin.measure.utils.DataFrameUtil._ - -case class DataSource(sqlContext: SQLContext, - name: String, - baseline: Boolean, - dataConnectors: Seq[DataConnector], - dataSourceCacheOpt: Option[DataSourceCache] - ) extends Loggable with Serializable { - - val batchDataConnectors = DataConnectorFactory.filterBatchDataConnectors(dataConnectors) - val streamingDataConnectors = DataConnectorFactory.filterStreamingDataConnectors(dataConnectors) - streamingDataConnectors.foreach(_.dataSourceCacheOpt = dataSourceCacheOpt) - - val tmstCache: TmstCache = TmstCache() - - def init(): Unit = { - dataSourceCacheOpt.foreach(_.init) - dataConnectors.foreach(_.init) - - dataSourceCacheOpt.foreach(_.tmstCache = tmstCache) - dataConnectors.foreach(_.tmstCache = tmstCache) - } - - def loadData(timeInfo: TimeInfo): TimeRange = { - val calcTime = timeInfo.calcTime - println(s"load data [${name}]") - val (dfOpt, tmsts) = data(calcTime) - dfOpt match { - case Some(df) => { -// DataFrameCaches.cacheDataFrame(timeInfo.key, name, df) - TableRegisters.registerRunTempTable(df, timeInfo.key, name) - } - case None => { - warn(s"load data source [${name}] fails") - } - } - tmsts - } - - private def data(ms: Long): (Option[DataFrame], TimeRange) = { - val batches = batchDataConnectors.flatMap { dc => - val (dfOpt, timeRange) = dc.data(ms) - dfOpt match { - case Some(df) => 
Some((dfOpt, timeRange)) - case _ => None - } - } - val caches = dataSourceCacheOpt match { - case Some(dsc) => dsc.readData() :: Nil - case _ => Nil - } - val pairs = batches ++ caches - - if (pairs.nonEmpty) { - pairs.reduce { (a, b) => - (unionDfOpts(a._1, b._1), a._2.merge(b._2)) - } - } else { - (None, TimeRange.emptyTimeRange) - } - } - -// private def unionDfOpts(dfOpt1: Option[DataFrame], dfOpt2: Option[DataFrame] -// ): Option[DataFrame] = { -// (dfOpt1, dfOpt2) match { -// case (Some(df1), Some(df2)) => Some(unionDataFrames(df1, df2)) -// case (Some(df1), _) => dfOpt1 -// case (_, Some(df2)) => dfOpt2 -// case _ => None -// } -// } -// -// private def unionDataFrames(df1: DataFrame, df2: DataFrame): DataFrame = { -// try { -// val cols = df1.columns -// val rdd2 = df2.map{ row => -// val values = cols.map { col => -// row.getAs[Any](col) -// } -// Row(values: _*) -// } -// val ndf2 = sqlContext.createDataFrame(rdd2, df1.schema) -// df1 unionAll ndf2 -// } catch { -// case e: Throwable => df1 -// } -// } - - def updateData(df: DataFrame): Unit = { - dataSourceCacheOpt.foreach(_.updateData(Some(df))) - } - - def updateData(df: DataFrame, ms: Long): Unit = { -// dataSourceCacheOpt.foreach(_.updateData(df, ms)) - } - - def updateDataMap(dfMap: Map[Long, DataFrame]): Unit = { -// dataSourceCacheOpt.foreach(_.updateDataMap(dfMap)) - } - - def cleanOldData(): Unit = { -// dataSourceCacheOpt.foreach(_.cleanOldData) - dataSourceCacheOpt.foreach(_.cleanOutTimeData) - } - - def processFinish(): Unit = { - dataSourceCacheOpt.foreach(_.processFinish) - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala deleted file mode 100644 index 831e990..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/data/source/DataSourceFactory.scala +++ /dev/null @@ -1,80 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.data.source - -import org.apache.griffin.measure.config.params.user._ -import org.apache.griffin.measure.data.connector.DataConnectorFactory -import org.apache.griffin.measure.data.source.cache._ -import org.apache.griffin.measure.log.Loggable -import org.apache.griffin.measure.process.engine._ -import org.apache.spark.sql.SQLContext -import org.apache.spark.streaming.StreamingContext - -import scala.util.{Success, Try} - -object DataSourceFactory extends Loggable { - - def genDataSources(sqlContext: SQLContext, ssc: StreamingContext, dqEngines: DqEngines, - dataSourceParams: Seq[DataSourceParam]) = { - val filteredDsParams = trimDataSourceParams(dataSourceParams) - filteredDsParams.zipWithIndex.flatMap { pair => - val (param, index) = pair - genDataSource(sqlContext, ssc, dqEngines, param, index) - } - } - - private def genDataSource(sqlContext: SQLContext, ssc: StreamingContext, - dqEngines: DqEngines, - dataSourceParam: DataSourceParam, - index: Int - ): Option[DataSource] = { - val name = dataSourceParam.name - val baseline = dataSourceParam.isBaseLine - val connectorParams = dataSourceParam.getConnectors - val cacheParam = dataSourceParam.cache - val dataConnectors = connectorParams.flatMap { connectorParam => - DataConnectorFactory.getDataConnector(sqlContext, ssc, dqEngines, connectorParam) match { - case Success(connector) => Some(connector) - case _ => None - } - } - val dataSourceCacheOpt = DataSourceCacheFactory.genDataSourceCache(sqlContext, cacheParam, name, index) - - Some(DataSource(sqlContext, name, baseline, dataConnectors, dataSourceCacheOpt)) - } - - private def trimDataSourceParams(dataSourceParams: Seq[DataSourceParam]): Seq[DataSourceParam] = { - val (validDsParams, _) = - dataSourceParams.foldLeft((Nil: Seq[DataSourceParam], Set[String]())) { (ret, dsParam) => - val (seq, names) = ret - if (dsParam.hasName && !names.contains(dsParam.name)) { - (seq :+ dsParam, names + dsParam.name) - } else ret - } - if 
(validDsParams.nonEmpty) { - val baselineDsParam = validDsParams.find(_.isBaseLine).getOrElse(validDsParams.head) - validDsParams.map { dsParam => - if (dsParam.name != baselineDsParam.name && dsParam.isBaseLine) { - dsParam.falseBaselineClone - } else dsParam - } - } else validDsParams - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataCacheable.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataCacheable.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataCacheable.scala deleted file mode 100644 index 36c556b..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataCacheable.scala +++ /dev/null @@ -1,84 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.data.source.cache - -import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache} - -trait DataCacheable { - - val cacheInfoPath: String - val readyTimeInterval: Long - val readyTimeDelay: Long - - def selfCacheInfoPath = s"${TimeInfoCache.infoPath}/${cacheInfoPath}" - - def selfCacheTime = TimeInfoCache.cacheTime(selfCacheInfoPath) - def selfLastProcTime = TimeInfoCache.lastProcTime(selfCacheInfoPath) - def selfReadyTime = TimeInfoCache.readyTime(selfCacheInfoPath) - def selfCleanTime = TimeInfoCache.cleanTime(selfCacheInfoPath) - def selfOldCacheIndex = TimeInfoCache.oldCacheIndex(selfCacheInfoPath) - - protected def submitCacheTime(ms: Long): Unit = { - val map = Map[String, String]((selfCacheTime -> ms.toString)) - InfoCacheInstance.cacheInfo(map) - } - - protected def submitReadyTime(ms: Long): Unit = { - val curReadyTime = ms - readyTimeDelay - if (curReadyTime % readyTimeInterval == 0) { - val map = Map[String, String]((selfReadyTime -> curReadyTime.toString)) - InfoCacheInstance.cacheInfo(map) - } - } - - protected def submitLastProcTime(ms: Long): Unit = { - val map = Map[String, String]((selfLastProcTime -> ms.toString)) - InfoCacheInstance.cacheInfo(map) - } - - protected def readLastProcTime(): Option[Long] = readSelfInfo(selfLastProcTime) - - protected def submitCleanTime(ms: Long): Unit = { - val cleanTime = genCleanTime(ms) - val map = Map[String, String]((selfCleanTime -> cleanTime.toString)) - InfoCacheInstance.cacheInfo(map) - } - - protected def genCleanTime(ms: Long): Long = ms - - protected def readCleanTime(): Option[Long] = readSelfInfo(selfCleanTime) - - protected def submitOldCacheIndex(index: Long): Unit = { - val map = Map[String, String]((selfOldCacheIndex -> index.toString)) - InfoCacheInstance.cacheInfo(map) - } - - protected def readOldCacheIndex(): Option[Long] = readSelfInfo(selfOldCacheIndex) - - private def readSelfInfo(key: String): Option[Long] = { - 
InfoCacheInstance.readInfo(key :: Nil).get(key).flatMap { v => - try { - Some(v.toLong) - } catch { - case _ => None - } - } - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala deleted file mode 100644 index f70bd11..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala +++ /dev/null @@ -1,380 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.data.source.cache - -import java.util.concurrent.TimeUnit - -import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache} -import org.apache.griffin.measure.cache.tmst.TmstCache -import org.apache.griffin.measure.log.Loggable -import org.apache.griffin.measure.process.temp.TimeRange -import org.apache.griffin.measure.rule.adaptor.InternalColumns -import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil} -import org.apache.griffin.measure.utils.ParamUtil._ -import org.apache.spark.sql._ -import org.apache.spark.sql.functions.col -import org.apache.griffin.measure.utils.DataFrameUtil._ - -import scala.util.Random - -// data source cache process steps -// dump phase: save -// process phase: read -> process -> update -> finish -> clean old data -trait DataSourceCache extends DataCacheable with WithFanIn[Long] with Loggable with Serializable { - - val sqlContext: SQLContext - val param: Map[String, Any] - val dsName: String - val index: Int - - var tmstCache: TmstCache = _ - protected def fromUntilRangeTmsts(from: Long, until: Long) = tmstCache.range(from, until) - protected def clearTmst(t: Long) = tmstCache.remove(t) - protected def clearTmstsUntil(until: Long) = { - val outDateTmsts = tmstCache.until(until) - tmstCache.remove(outDateTmsts) - } - protected def afterTilRangeTmsts(after: Long, til: Long) = fromUntilRangeTmsts(after + 1, til + 1) - protected def clearTmstsTil(til: Long) = clearTmstsUntil(til + 1) - - val _FilePath = "file.path" - val _InfoPath = "info.path" - val _ReadyTimeInterval = "ready.time.interval" - val _ReadyTimeDelay = "ready.time.delay" - val _TimeRange = "time.range" - - val rdmStr = Random.alphanumeric.take(10).mkString - val defFilePath = s"hdfs:///griffin/cache/${dsName}_${rdmStr}" - val defInfoPath = s"${index}" - - val filePath: String = param.getString(_FilePath, defFilePath) - val cacheInfoPath: String = param.getString(_InfoPath, defInfoPath) - val readyTimeInterval: Long = 
TimeUtil.milliseconds(param.getString(_ReadyTimeInterval, "1m")).getOrElse(60000L) - val readyTimeDelay: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeDelay, "1m")).getOrElse(60000L) - val deltaTimeRange: (Long, Long) = { - def negative(n: Long): Long = if (n <= 0) n else 0 - param.get(_TimeRange) match { - case Some(seq: Seq[String]) => { - val nseq = seq.flatMap(TimeUtil.milliseconds(_)) - val ns = negative(nseq.headOption.getOrElse(0)) - val ne = negative(nseq.tail.headOption.getOrElse(0)) - (ns, ne) - } - case _ => (0, 0) - } - } - - val _ReadOnly = "read.only" - val readOnly = param.getBoolean(_ReadOnly, false) - - val _Updatable = "updatable" - val updatable = param.getBoolean(_Updatable, false) - -// val rowSepLiteral = "\n" -// val partitionUnits: List[String] = List("hour", "min", "sec") -// val minUnitTime: Long = TimeUtil.timeFromUnit(1, partitionUnits.last) - - val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new") - val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old") - - val newFilePath = s"${filePath}/new" - val oldFilePath = s"${filePath}/old" - - val defOldCacheIndex = 0L - - protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit - protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame - - def init(): Unit = {} - - // save new cache data only, need index for multiple streaming data connectors - def saveData(dfOpt: Option[DataFrame], ms: Long): Unit = { - if (!readOnly) { - dfOpt match { - case Some(df) => { - df.cache - - // cache df - val cnt = df.count - println(s"save ${dsName} data count: ${cnt}") - - // lock makes it safer when writing new cache data - val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS) - if (newCacheLocked) { - try { - val dfw = df.write.mode(SaveMode.Append).partitionBy(InternalColumns.tmst) - writeDataFrame(dfw, newFilePath) - } catch { - case e: Throwable => error(s"save data error: ${e.getMessage}") - } finally { - 
newCacheLock.unlock() - } - } - - // uncache - df.unpersist - } - case _ => { - info(s"no data frame to save") - } - } - - // submit cache time and ready time - if (fanIncrement(ms)) { - println(s"save data [${ms}] finish") - submitCacheTime(ms) - submitReadyTime(ms) - } - - } - } - - // read new cache data and old cache data - def readData(): (Option[DataFrame], TimeRange) = { - // time range: (a, b] - val timeRange = TimeInfoCache.getTimeRange - val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2) - - // read partition info - val filterStr = if (reviseTimeRange._1 == reviseTimeRange._2) { - println(s"read time range: [${reviseTimeRange._1}]") - s"`${InternalColumns.tmst}` = ${reviseTimeRange._1}" - } else { - println(s"read time range: (${reviseTimeRange._1}, ${reviseTimeRange._2}]") - s"`${InternalColumns.tmst}` > ${reviseTimeRange._1} AND `${InternalColumns.tmst}` <= ${reviseTimeRange._2}" - } - - // new cache data - val newDfOpt = try { - val dfr = sqlContext.read - Some(readDataFrame(dfr, newFilePath).filter(filterStr)) - } catch { - case e: Throwable => { - warn(s"read data source cache warn: ${e.getMessage}") - None - } - } - - // old cache data - val oldCacheIndexOpt = if (updatable) readOldCacheIndex else None - val oldDfOpt = oldCacheIndexOpt.flatMap { idx => - val oldDfPath = s"${oldFilePath}/${idx}" - try { - val dfr = sqlContext.read - Some(readDataFrame(dfr, oldDfPath).filter(filterStr)) - } catch { - case e: Throwable => { - warn(s"read old data source cache warn: ${e.getMessage}") - None - } - } - } - - // whole cache data - val cacheDfOpt = unionDfOpts(newDfOpt, oldDfOpt) - - // from until tmst range - val (from, until) = (reviseTimeRange._1, reviseTimeRange._2) - val tmstSet = afterTilRangeTmsts(from, until) - - val retTimeRange = TimeRange(reviseTimeRange, tmstSet) - (cacheDfOpt, retTimeRange) - } - -// private def unionDfOpts(dfOpt1: Option[DataFrame], dfOpt2: Option[DataFrame] -// ): Option[DataFrame] = { 
-// (dfOpt1, dfOpt2) match { -// case (Some(df1), Some(df2)) => Some(unionByName(df1, df2)) -// case (Some(df1), _) => dfOpt1 -// case (_, Some(df2)) => dfOpt2 -// case _ => None -// } -// } -// -// private def unionByName(a: DataFrame, b: DataFrame): DataFrame = { -// val columns = a.columns.toSet.intersect(b.columns.toSet).map(col).toSeq -// a.select(columns: _*).unionAll(b.select(columns: _*)) -// } - - private def cleanOutTimePartitions(path: String, outTime: Long, partitionOpt: Option[String], - func: (Long, Long) => Boolean - ): Unit = { - val earlierOrEqPaths = listPartitionsByFunc(path: String, outTime, partitionOpt, func) - // delete out time data path - earlierOrEqPaths.foreach { path => - println(s"delete hdfs path: ${path}") - HdfsUtil.deleteHdfsPath(path) - } - } - private def listPartitionsByFunc(path: String, bound: Long, partitionOpt: Option[String], - func: (Long, Long) => Boolean - ): Iterable[String] = { - val names = HdfsUtil.listSubPathsByType(path, "dir") - val regex = partitionOpt match { - case Some(partition) => s"^${partition}=(\\d+)$$".r - case _ => "^(\\d+)$".r - } - names.filter { name => - name match { - case regex(value) => { - str2Long(value) match { - case Some(t) => func(t, bound) - case _ => false - } - } - case _ => false - } - }.map(name => s"${path}/${name}") - } - private def str2Long(str: String): Option[Long] = { - try { - Some(str.toLong) - } catch { - case e: Throwable => None - } - } - - // clean out time from new cache data and old cache data - def cleanOutTimeData(): Unit = { - // clean tmst - val cleanTime = readCleanTime - cleanTime.foreach(clearTmstsTil(_)) - - if (!readOnly) { - // new cache data - val newCacheCleanTime = if (updatable) readLastProcTime else readCleanTime - newCacheCleanTime match { - case Some(nct) => { - // clean calculated new cache data - val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS) - if (newCacheLocked) { - try { - cleanOutTimePartitions(newFilePath, nct, 
Some(InternalColumns.tmst), - (a: Long, b: Long) => (a <= b)) - } catch { - case e: Throwable => error(s"clean new cache data error: ${e.getMessage}") - } finally { - newCacheLock.unlock() - } - } - } - case _ => { - // do nothing - } - } - - // old cache data - val oldCacheCleanTime = if (updatable) readCleanTime else None - oldCacheCleanTime match { - case Some(oct) => { - val oldCacheIndexOpt = readOldCacheIndex - oldCacheIndexOpt.foreach { idx => - val oldDfPath = s"${oldFilePath}/${idx}" - val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS) - if (oldCacheLocked) { - try { - // clean calculated old cache data - cleanOutTimePartitions(oldFilePath, idx, None, (a: Long, b: Long) => (a < b)) - // clean out time old cache data not calculated -// cleanOutTimePartitions(oldDfPath, oct, Some(InternalColumns.tmst)) - } catch { - case e: Throwable => error(s"clean old cache data error: ${e.getMessage}") - } finally { - oldCacheLock.unlock() - } - } - } - } - case _ => { - // do nothing - } - } - } - } - - // update old cache data - def updateData(dfOpt: Option[DataFrame]): Unit = { - if (!readOnly && updatable) { - dfOpt match { - case Some(df) => { - // old cache lock - val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS) - if (oldCacheLocked) { - try { - val oldCacheIndexOpt = readOldCacheIndex - val nextOldCacheIndex = oldCacheIndexOpt.getOrElse(defOldCacheIndex) + 1 - - val oldDfPath = s"${oldFilePath}/${nextOldCacheIndex}" -// val cleanTime = readCleanTime -// val updateDf = cleanTime match { -// case Some(ct) => { -// val filterStr = s"`${InternalColumns.tmst}` > ${ct}" -// df.filter(filterStr) -// } -// case _ => df -// } - val cleanTime = getNextCleanTime - val filterStr = s"`${InternalColumns.tmst}` > ${cleanTime}" - val updateDf = df.filter(filterStr) - - val prlCount = sqlContext.sparkContext.defaultParallelism - // coalesce -// val ptnCount = updateDf.rdd.getNumPartitions -// val repartitionedDf = if (prlCount < ptnCount) { -// 
updateDf.coalesce(prlCount) -// } else updateDf - // repartition - val repartitionedDf = updateDf.repartition(prlCount) - val dfw = repartitionedDf.write.mode(SaveMode.Overwrite) - writeDataFrame(dfw, oldDfPath) - - submitOldCacheIndex(nextOldCacheIndex) - } catch { - case e: Throwable => error(s"update data error: ${e.getMessage}") - } finally { - oldCacheLock.unlock() - } - } - } - case _ => { - info(s"no data frame to update") - } - } - } - } - - // process finish - def processFinish(): Unit = { - // next last proc time - val timeRange = TimeInfoCache.getTimeRange - submitLastProcTime(timeRange._2) - - // next clean time - val nextCleanTime = timeRange._2 + deltaTimeRange._1 - submitCleanTime(nextCleanTime) - } - - // read next clean time - private def getNextCleanTime(): Long = { - val timeRange = TimeInfoCache.getTimeRange - val nextCleanTime = timeRange._2 + deltaTimeRange._1 - nextCleanTime - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCacheFactory.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCacheFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCacheFactory.scala deleted file mode 100644 index d03c181..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCacheFactory.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.data.source.cache - -import org.apache.griffin.measure.log.Loggable -import org.apache.spark.sql.SQLContext -import org.apache.griffin.measure.utils.ParamUtil._ - -object DataSourceCacheFactory extends Loggable { - - private object DataSourceCacheType { - val parquet = "^(?i)parq(uet)?$".r - val json = "^(?i)json$".r - val orc = "^(?i)orc$".r - } - import DataSourceCacheType._ - - val _type = "type" - - def genDataSourceCache(sqlContext: SQLContext, param: Map[String, Any], - name: String, index: Int - ) = { - if (param != null) { - try { - val tp = param.getString(_type, "") - val dsCache = tp match { - case parquet() => ParquetDataSourceCache(sqlContext, param, name, index) - case json() => JsonDataSourceCache(sqlContext, param, name, index) - case orc() => OrcDataSourceCache(sqlContext, param, name, index) - case _ => ParquetDataSourceCache(sqlContext, param, name, index) - } - Some(dsCache) - } catch { - case e: Throwable => { - error(s"generate data source cache fails") - None - } - } - } else None - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/JsonDataSourceCache.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/JsonDataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/JsonDataSourceCache.scala deleted file mode 100644 index 2fa5316..0000000 --- 
a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/JsonDataSourceCache.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.data.source.cache - -import org.apache.spark.sql._ - -case class JsonDataSourceCache(sqlContext: SQLContext, param: Map[String, Any], - dsName: String, index: Int - ) extends DataSourceCache { - - override def init(): Unit = { -// sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false"); - } - - def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = { - println(s"write path: ${path}") - dfw.json(path) - } - - def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = { - dfr.json(path) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/OrcDataSourceCache.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/OrcDataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/OrcDataSourceCache.scala deleted file mode 100644 index 5bf2500..0000000 --- 
a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/OrcDataSourceCache.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.data.source.cache - -import org.apache.spark.sql._ - -case class OrcDataSourceCache(sqlContext: SQLContext, param: Map[String, Any], - dsName: String, index: Int - ) extends DataSourceCache { - - override def init(): Unit = { -// sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false"); - } - - def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = { - println(s"write path: ${path}") - dfw.orc(path) - } - - def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = { - dfr.orc(path) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala deleted file mode 100644 index f39d832..0000000 --- 
a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.data.source.cache - -import org.apache.spark.sql._ - -case class ParquetDataSourceCache(sqlContext: SQLContext, param: Map[String, Any], - dsName: String, index: Int - ) extends DataSourceCache { - - override def init(): Unit = { - sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false") - } - - def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = { - println(s"write path: ${path}") - dfw.parquet(path) - } - - def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = { - dfr.parquet(path) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/WithFanIn.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/WithFanIn.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/WithFanIn.scala deleted file mode 100644 index aa5e04d..0000000 --- 
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.concurrent.{TrieMap, Map => ConcMap}

/**
 * Barrier-like fan-in counter keyed by `T`.
 *
 * Participants call [[registerFanIn]] once each; afterwards each arrival for a
 * given key is recorded with [[fanIncrement]], which reports `true` when the
 * number of arrivals for that key reaches the registered total.
 */
trait WithFanIn[T] {

  // total number of registered fan-in participants
  val totalNum: AtomicInteger = new AtomicInteger(0)
  // per-key arrival counts; lock-free concurrent map
  val fanInCountMap: ConcMap[T, Int] = TrieMap[T, Int]()

  /**
   * Register one more participant.
   * @return the new participant total
   */
  def registerFanIn(): Int = {
    totalNum.incrementAndGet()
  }

  /**
   * Record one arrival for `key`.
   *
   * NOTE(review): the increment and the subsequent read are not one atomic
   * step, so under contention completion may be observed by a different
   * caller than the one that performed the final increment — confirm callers
   * only rely on "exactly one caller sees true per round".
   *
   * @return true when arrivals for `key` reach the registered total
   *         (the key's counter is then removed), false otherwise
   */
  def fanIncrement(key: T): Boolean = {
    fanInc(key)
    fanInCountMap.get(key) match {
      case Some(n) if (n >= totalNum.get) => {
        fanInCountMap.remove(key)
        true
      }
      case _ => false
    }
  }

  // CAS retry loop incrementing the key's counter atomically.
  // @tailrec guarantees the retry compiles to a loop (constant stack usage
  // even under heavy contention).
  @scala.annotation.tailrec
  private def fanInc(key: T): Unit = {
    fanInCountMap.get(key) match {
      case Some(n) =>
        // replace fails if another thread changed the value; retry
        if (!fanInCountMap.replace(key, n, n + 1)) fanInc(key)
      case _ =>
        // first arrival for this key; retry if another thread won the race
        if (fanInCountMap.putIfAbsent(key, 1).nonEmpty) fanInc(key)
    }
  }

}
a/measure/src/main/scala/org/apache/griffin/measure/job/DQJob.scala b/measure/src/main/scala/org/apache/griffin/measure/job/DQJob.scala new file mode 100644 index 0000000..c943db9 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/job/DQJob.scala @@ -0,0 +1,32 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
package org.apache.griffin.measure.job

import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep

/**
 * A data-quality job: an ordered sequence of dq steps.
 *
 * @param dqSteps steps to execute, in order
 */
case class DQJob(dqSteps: Seq[DQStep]) extends Serializable {

  /**
   * Execute the steps in order against `context`.
   * Execution stops after the first step that returns false (the same
   * short-circuit the original `foldLeft` with `&&` performed).
   *
   * @return true iff every executed step succeeded
   */
  def execute(context: DQContext): Boolean = {
    dqSteps.forall(_.execute(context))
  }

}
package org.apache.griffin.measure.job.builder

import org.apache.griffin.measure.configuration.enums.DslType
import org.apache.griffin.measure.configuration.params._
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.job._
import org.apache.griffin.measure.step.DQStep
import org.apache.griffin.measure.step.builder.DQStepBuilder
import org.apache.griffin.measure.step.write.MetricFlushStep

/**
 * Builds a [[DQJob]] from configuration: data-source pre-proc steps first,
 * then rule steps, then a final metric flush step.
 */
object DQJobBuilder {

  /**
   * Build a dq job from an evaluate-rule param.
   *
   * @param context           dq context
   * @param evaluateRuleParam evaluate rule param carrying rules and default dsl type
   * @return dq job
   */
  def buildDQJob(context: DQContext, evaluateRuleParam: EvaluateRuleParam): DQJob = {
    val defaultDslType = evaluateRuleParam.getDslType
    val ruleParams = evaluateRuleParam.getRules
    buildDQJob(context, ruleParams, defaultDslType)
  }

  /**
   * Build a dq job from explicit rule params.
   *
   * @param context        dq context
   * @param ruleParams     rule params
   * @param defaultDslType dsl type used when a rule does not specify one
   * @return dq job
   */
  def buildDQJob(context: DQContext, ruleParams: Seq[RuleParam], defaultDslType: DslType): DQJob = {
    // steps derived from each data source's configuration (e.g. pre-proc)
    val dataSourceSteps = context.dataSources.flatMap { dataSource =>
      DQStepBuilder.buildStepOptByDataSourceParam(context, dataSource.dsParam)
    }
    // steps derived from the configured rules
    val ruleSteps = ruleParams.flatMap { ruleParam =>
      DQStepBuilder.buildStepOptByRuleParam(context, ruleParam, defaultDslType)
    }
    // the metric flush step always runs last
    DQJob((dataSourceSteps ++ ruleSteps) :+ MetricFlushStep())
  }

}
package org.apache.griffin.measure.launch

import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.params.{DQParam, EnvParam}

import scala.util.Try

/**
 * Contract for a data-quality application process (batch or streaming).
 */
trait DQApp extends Loggable with Serializable {

  val envParam: EnvParam
  val dqParam: DQParam

  /** Set up the execution environment. */
  def init: Try[_]

  /** Run the dq job. */
  def run: Try[_]

  /** Release resources. */
  def close: Try[_]

  /**
   * The application exits if it fails in the run phase.
   * When retryable is true, the exception is thrown back to the spark
   * environment, enabling spark's application retry strategy.
   */
  def retryable: Boolean

  /**
   * Application timestamp: the configured dq timestamp when positive,
   * otherwise the current system time.
   */
  protected def getAppTime: Long = {
    val configured = dqParam.timestamp
    // NOTE(review): the null comparison suggests timestamp is a boxed Long — confirm
    if (configured != null && configured > 0) configured
    else System.currentTimeMillis
  }

}
package org.apache.griffin.measure.launch.batch

import java.util.Date

import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.configuration.params._
import org.apache.griffin.measure.context._
import org.apache.griffin.measure.context.datasource.DataSourceFactory
import org.apache.griffin.measure.job.builder.DQJobBuilder
import org.apache.griffin.measure.launch.DQApp
import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SQLContext, SparkSession}

import scala.util.Try

/**
 * Batch dq application: builds a spark session, generates data sources,
 * builds one dq job from the configured rules and executes it once.
 */
case class BatchDQApp(allParam: AllParam) extends DQApp {

  val envParam: EnvParam = allParam.envParam
  val dqParam: DQParam = allParam.dqParam

  val sparkParam = envParam.sparkParam
  val metricName = dqParam.name
  val dataSourceParams = dqParam.dataSources
  val dataSourceNames = dataSourceParams.map(_.name)
  val persistParams = envParam.persistParams

  // assigned in init; kept as vars because the spark session is created lazily
  var sqlContext: SQLContext = _

  implicit var sparkSession: SparkSession = _

  // batch mode does not rely on spark's application retry strategy
  def retryable: Boolean = false

  def init: Try[_] = Try {
    // build spark 2.0+ application context
    val conf = new SparkConf().setAppName(metricName)
    conf.setAll(sparkParam.config)
    conf.set("spark.sql.crossJoin.enabled", "true")
    sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    sparkSession.sparkContext.setLogLevel(sparkParam.logLevel)
    sqlContext = sparkSession.sqlContext

    // register udf
    GriffinUDFAgent.register(sqlContext)
  }

  def run: Try[_] = Try {
    // start time
    val startTime = new Date().getTime

    val appTime = getAppTime
    val contextId = ContextId(appTime)

    // generate data sources
    // NOTE(review): null stands in for the streaming context, which batch
    // mode does not have — confirm DataSourceFactory tolerates null here
    val dataSources = DataSourceFactory.getDataSources(sparkSession, null, dqParam.dataSources)
    dataSources.foreach(_.init)

    // create dq context
    val dqContext: DQContext = DQContext(
      contextId, metricName, dataSources, persistParams, BatchProcessType
    )(sparkSession)

    // start id
    val applicationId = sparkSession.sparkContext.applicationId
    dqContext.getPersist().start(applicationId)

    // build job
    val dqJob = DQJobBuilder.buildDQJob(dqContext, dqParam.evaluateRule)

    // dq job execute
    dqJob.execute(dqContext)

    // end time
    val endTime = new Date().getTime
    dqContext.getPersist().log(endTime, s"process using time: ${endTime - startTime} ms")

    // clean context
    dqContext.clean()

    // finish
    dqContext.getPersist().finish()
  }

  def close: Try[_] = Try {
    // NOTE(review): SparkSession.close() already stops the session, so the
    // following stop() is redundant (harmless) — consider dropping one
    sparkSession.close()
    sparkSession.stop()
  }

}
package org.apache.griffin.measure.launch.streaming

import java.util.{Timer, TimerTask}
import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}

import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.configuration.params._
import org.apache.griffin.measure.context._
import org.apache.griffin.measure.context.datasource.DataSourceFactory
import org.apache.griffin.measure.context.streaming.info.InfoCacheInstance
import org.apache.griffin.measure.launch.DQApp
import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent
import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

import scala.util.Try

/**
 * Streaming dq application: builds a spark streaming context (recovered from
 * the checkpoint directory when present) and schedules a [[StreamingDQApp2]]
 * calculation thread at a fixed interval.
 */
case class StreamingDQApp(allParam: AllParam) extends DQApp {

  val envParam: EnvParam = allParam.envParam
  val dqParam: DQParam = allParam.dqParam

  val sparkParam = envParam.sparkParam
  val metricName = dqParam.name
  val dataSourceParams = dqParam.dataSources
  val dataSourceNames = dataSourceParams.map(_.name)
  val persistParams = envParam.persistParams

  // assigned in init; the session must exist before run is called
  var sqlContext: SQLContext = _

  implicit var sparkSession: SparkSession = _

  // streaming mode relies on spark's application retry strategy on failure
  def retryable: Boolean = true

  def init: Try[_] = Try {
    // build spark 2.0+ application context
    val conf = new SparkConf().setAppName(metricName)
    conf.setAll(sparkParam.config)
    conf.set("spark.sql.crossJoin.enabled", "true")
    sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    sparkSession.sparkContext.setLogLevel(sparkParam.logLevel)
    sqlContext = sparkSession.sqlContext

    // clear checkpoint directory (only when configured to start clean)
    clearCpDir

    // init info cache instance
    InfoCacheInstance.initInstance(envParam.infoCacheParams, metricName)
    InfoCacheInstance.init

    // register udf
    GriffinUDFAgent.register(sqlContext)
  }

  def run: Try[_] = Try {

    // streaming context: recovered from the checkpoint dir if one exists,
    // otherwise freshly created by createStreamingContext
    val ssc = StreamingContext.getOrCreate(sparkParam.cpDir, () => {
      try {
        createStreamingContext
      } catch {
        case e: Throwable => {
          error(s"create streaming context error: ${e.getMessage}")
          throw e
        }
      }
    })

    // start time
    val appTime = getAppTime
    val contextId = ContextId(appTime)

    // generate data sources
    val dataSources = DataSourceFactory.getDataSources(sparkSession, ssc, dqParam.dataSources)
    dataSources.foreach(_.init)

    // create dq context shared across all calculation rounds
    val globalContext: DQContext = DQContext(
      contextId, metricName, dataSources, persistParams, StreamingProcessType
    )(sparkSession)

    // start id
    val applicationId = sparkSession.sparkContext.applicationId
    globalContext.getPersist().start(applicationId)

    // process thread executed once per interval
    val dqThread = StreamingDQApp2(globalContext, dqParam.evaluateRule)

    val processInterval = TimeUtil.milliseconds(sparkParam.processInterval) match {
      case Some(interval) => interval
      case _ => throw new Exception("invalid batch interval")
    }
    val process = TimingProcess(processInterval, dqThread)
    process.startup()

    // blocks until the streaming context terminates
    ssc.start()
    ssc.awaitTermination()
    ssc.stop(stopSparkContext=true, stopGracefully=true)

    // clean context
    globalContext.clean()

    // finish
    globalContext.getPersist().finish()

  }

  def close: Try[_] = Try {
    // NOTE(review): SparkSession.close() already stops the session, so the
    // following stop() is redundant (harmless) — consider dropping one
    sparkSession.close()
    sparkSession.stop()
  }


  /** Create a new streaming context checkpointed at the configured dir. */
  def createStreamingContext: StreamingContext = {
    val batchInterval = TimeUtil.milliseconds(sparkParam.batchInterval) match {
      case Some(interval) => Milliseconds(interval)
      case _ => throw new Exception("invalid batch interval")
    }
    val ssc = new StreamingContext(sparkSession.sparkContext, batchInterval)
    ssc.checkpoint(sparkParam.cpDir)

    ssc
  }

  // delete the checkpoint dir when the config asks for a clean start
  private def clearCpDir: Unit = {
    if (sparkParam.needInitClear) {
      val cpDir = sparkParam.cpDir
      info(s"clear checkpoint directory ${cpDir}")
      HdfsUtil.deleteHdfsPath(cpDir)
    }
  }

  /**
   * Fixed-rate scheduler: a daemon timer submits `runnable` to a thread pool
   * every `interval` ms (pool decouples timer tick from task duration).
   */
  case class TimingProcess(interval: Long, runnable: Runnable) {

    val pool: ThreadPoolExecutor = Executors.newFixedThreadPool(5).asInstanceOf[ThreadPoolExecutor]

    // daemon timer: does not keep the JVM alive on its own
    val timer = new Timer("process", true)

    val timerTask = new TimerTask() {
      override def run(): Unit = {
        pool.submit(runnable)
      }
    }

    def startup(): Unit = {
      timer.schedule(timerTask, interval, interval)
    }

    def shutdown(): Unit = {
      timer.cancel()
      pool.shutdown()
      // wait up to 10s for in-flight calculation rounds to finish
      pool.awaitTermination(10, TimeUnit.SECONDS)
    }

  }

}
package org.apache.griffin.measure.launch.streaming

import java.util.Date
import java.util.concurrent.TimeUnit

import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.params._
import org.apache.griffin.measure.context.streaming.info.{InfoCacheInstance, TimeInfoCache}
import org.apache.griffin.measure.context.streaming.metric.CacheResults
import org.apache.griffin.measure.context.{ContextId, DQContext}
import org.apache.griffin.measure.job.builder.DQJobBuilder

/**
 * One streaming calculation round, run periodically by the scheduler in
 * StreamingDQApp. Guarded by a distributed lock so at most one round runs
 * at a time; a round that cannot acquire the lock within 5s is skipped.
 */
case class StreamingDQApp2(globalContext: DQContext,
                           evaluateRuleParam: EvaluateRuleParam
                          ) extends Runnable with Loggable {

  // cross-process lock named "process" — presumably zookeeper-backed; see InfoCacheInstance
  val lock = InfoCacheInstance.genLock("process")
  val appPersist = globalContext.getPersist()

  def run(): Unit = {
    val updateTimeDate = new Date()
    val updateTime = updateTimeDate.getTime
    println(s"===== [${updateTimeDate}] process begins =====")
    val locked = lock.lock(5, TimeUnit.SECONDS)
    if (locked) {
      try {

        // mark the start of this round in the time-info cache
        // (endTimeInfoCache below closes the window; order matters)
        TimeInfoCache.startTimeInfoCache

        val startTime = new Date().getTime
        appPersist.log(startTime, s"starting process ...")
        val contextId = ContextId(startTime)

        // create a per-round dq context derived from the global one
        val dqContext: DQContext = globalContext.cloneDQContext(contextId)

        // build job from the configured rules
        val dqJob = DQJobBuilder.buildDQJob(dqContext, evaluateRuleParam)

        // dq job execute
        dqJob.execute(dqContext)

        // finish calculation for this round
        finishCalculation(dqContext)

        // end time
        val endTime = new Date().getTime
        appPersist.log(endTime, s"process using time: ${endTime - startTime} ms")

        TimeInfoCache.endTimeInfoCache

        // clean old data
        cleanData(dqContext)

      } catch {
        // NOTE(review): catches Throwable and only logs — errors never abort
        // the streaming app; confirm this best-effort behavior is intended
        case e: Throwable => error(s"process error: ${e.getMessage}")
      } finally {
        // always release the lock, even on failure
        lock.unlock()
      }
    } else {
      println(s"===== [${updateTimeDate}] process ignores =====")
    }
    val endTime = new Date().getTime
    println(s"===== [${updateTimeDate}] process ends, using ${endTime - updateTime} ms =====")
  }

  // finish calculation for this round: notify each data source
  private def finishCalculation(context: DQContext): Unit = {
    context.dataSources.foreach(_.processFinish)
  }

  // clean old data and old result cache; failures are logged, not rethrown
  private def cleanData(context: DQContext): Unit = {
    try {
      context.dataSources.foreach(_.cleanOldData)

      context.clean()

      val cleanTime = TimeInfoCache.getCleanTime
      CacheResults.refresh(cleanTime)
    } catch {
      case e: Throwable => error(s"clean data error: ${e.getMessage}")
    }
  }

}
package org.apache.griffin.measure.log

import org.slf4j.LoggerFactory

/**
 * Mixin providing slf4j-backed logging helpers at the concrete class's name.
 */
trait Loggable {

  // lazy: created on first use; transient: never serialized with the mixer
  @transient private lazy val logger = LoggerFactory.getLogger(getClass)

  protected def info(msg: String): Unit = logger.info(msg)

  protected def debug(msg: String): Unit = logger.debug(msg)

  protected def warn(msg: String): Unit = logger.warn(msg)

  protected def error(msg: String): Unit = logger.error(msg)

}
package org.apache.griffin.measure.persist

import java.util.Date

import org.apache.griffin.measure.result._
import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

import scala.util.Try
import org.apache.griffin.measure.utils.ParamUtil._
import org.apache.spark.TaskContext

// persist result and data to hdfs
// Layout: <path>/<metricName>/<timeStamp>/{_START,_FINISH,_METRICS,_LOG,<records>}
case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist {

  // config keys
  val Path = "path"
  val MaxPersistLines = "max.persist.lines"
  val MaxLinesPerFile = "max.lines.per.file"

  val path = config.getOrElse(Path, "").toString
  // total line cap across all files; negative means unlimited
  val maxPersistLines = config.getInt(MaxPersistLines, -1)
  // per-file line cap, hard-limited to 1,000,000
  val maxLinesPerFile = math.min(config.getInt(MaxLinesPerFile, 10000), 1000000)

  val separator = "/"

  val StartFile = filePath("_START")
  val FinishFile = filePath("_FINISH")
  val MetricsFile = filePath("_METRICS")

  val LogFile = filePath("_LOG")

  // keys of the metrics json envelope
  val _MetricName = "metricName"
  val _Timestamp = "timestamp"
  val _Value = "value"

  // one-shot flag so the log header is written only on the first log() call
  // NOTE(review): not thread-safe — confirm log() is only called from one thread
  var _init = true
  private def isInit = {
    val i = _init
    _init = false
    i
  }

  // persisting is only possible when a base path is configured
  def available(): Boolean = {
    path.nonEmpty
  }

  // banner written once at the top of the log file
  private def persistHead: String = {
    val dt = new Date(timeStamp)
    s"================ log of ${dt} ================\n"
  }

  // per-entry timestamp line for the log file
  private def timeHead(rt: Long): String = {
    val dt = new Date(rt)
    s"--- ${dt} ---\n"
  }

  // full hdfs path for a file under this metric/timestamp directory
  protected def filePath(file: String): String = {
    HdfsUtil.getHdfsFilePath(path, s"${metricName}/${timeStamp}/${file}")
  }

  protected def withSuffix(path: String, suffix: String): String = {
    s"${path}.${suffix}"
  }

  // write the _START marker file containing msg; errors are logged, not rethrown
  def start(msg: String): Unit = {
    try {
      HdfsUtil.writeContent(StartFile, msg)
    } catch {
      case e: Throwable => error(e.getMessage)
    }
  }
  // write the empty _FINISH marker file; errors are logged, not rethrown
  def finish(): Unit = {
    try {
      HdfsUtil.createEmptyFile(FinishFile)
    } catch {
      case e: Throwable => error(e.getMessage)
    }
  }

  // (removed: large commented-out legacy implementations of result(),
  //  rddRecords(), iterableRecords(), records() and an RDD-based
  //  persistRecords2Hdfs — superseded by the methods below)

  // write the given records to one hdfs file, newline-separated
  private def persistRecords2Hdfs(hdfsPath: String, records: Iterable[String]): Unit = {
    try {
      val recStr = records.mkString("\n")
      HdfsUtil.writeContent(hdfsPath, recStr)
    } catch {
      case e: Throwable => error(e.getMessage)
    }
  }

  // append a timestamped entry to the _LOG file (header only on first call)
  def log(rt: Long, msg: String): Unit = {
    try {
      val logStr = (if (isInit) persistHead else "") + timeHead(rt) + s"${msg}\n\n"
      HdfsUtil.appendContent(LogFile, logStr)
    } catch {
      case e: Throwable => error(e.getMessage)
    }
  }

  // file path for a group of records (iterable flavor)
  private def getHdfsPath(path: String, groupId: Int): String = {
    HdfsUtil.getHdfsFilePath(path, s"${groupId}")
  }
  // file path for a group of records within a partition (dataframe flavor)
  private def getHdfsPath(path: String, ptnId: Int, groupId: Int): String = {
    HdfsUtil.getHdfsFilePath(path, s"${ptnId}.${groupId}")
  }

  private def clearOldRecords(path: String): Unit = {
    HdfsUtil.deleteHdfsPath(path)
  }

  // persist a dataframe as json lines, capped by maxPersistLines, split into
  // files of at most maxLinesPerFile per spark partition
  def persistRecords(df: DataFrame, name: String): Unit = {
    val path = filePath(name)
    clearOldRecords(path)
    try {
      val recordCount = df.count
      val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
      val maxCount = count.toInt
      if (maxCount > 0) {
        val recDf = df.limit(maxCount)
        // runs on executors; each partition writes its own file groups
        recDf.toJSON.foreachPartition { ptn =>
          val ptnid = TaskContext.getPartitionId()
          val groupedRecords = ptn.grouped(maxLinesPerFile).zipWithIndex
          groupedRecords.foreach { group =>
            val (recs, gid) = group
            val hdfsPath = getHdfsPath(path, ptnid, gid)
            persistRecords2Hdfs(hdfsPath, recs)
          }
        }
      }
    } catch {
      case e: Throwable => error(e.getMessage)
    }
  }

  // persist an RDD of strings, capped by maxPersistLines; small result sets
  // are collected into one file, larger ones grouped by maxLinesPerFile
  def persistRecords(records: RDD[String], name: String): Unit = {
    val path = filePath(name)
    clearOldRecords(path)
    try {
      val recordCount = records.count
      val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
      if (count > 0) {
        val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt
        if (groupCount <= 1) {
          val recs = records.take(count.toInt)
          persistRecords2Hdfs(path, recs)
        } else {
          val groupedRecords: RDD[(Long, Iterable[String])] =
            records.zipWithIndex.flatMap { r =>
              val gid = r._2 / maxLinesPerFile
              if (gid < groupCount) Some((gid, r._1)) else None
            }.groupByKey()
          // NOTE(review): foreach on an RDD runs on executors — confirm
          // HdfsUtil is usable from executor-side code
          groupedRecords.foreach { group =>
            val (gid, recs) = group
            val hdfsPath = if (gid == 0) path else withSuffix(path, gid.toString)
            persistRecords2Hdfs(hdfsPath, recs)
          }
        }
      }
    } catch {
      case e: Throwable => error(e.getMessage)
    }
  }

  // persist an in-memory collection of strings, capped and grouped as above
  def persistRecords(records: Iterable[String], name: String): Unit = {
    val path = filePath(name)
    clearOldRecords(path)
    try {
      val recordCount = records.size
      val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
      if (count > 0) {
        val groupCount = (count - 1) / maxLinesPerFile + 1
        if (groupCount <= 1) {
          val recs = records.take(count.toInt)
          persistRecords2Hdfs(path, recs)
        } else {
          val groupedRecords = records.grouped(maxLinesPerFile).zipWithIndex
          groupedRecords.take(groupCount).foreach { group =>
            val (recs, gid) = group
            val hdfsPath = getHdfsPath(path, gid)
            persistRecords2Hdfs(hdfsPath, recs)
          }
        }
      }
    } catch {
      case e: Throwable => error(e.getMessage)
    }
  }

  // (removed: commented-out legacy persistMetrics(Seq[String], String))

  // persist metrics as one json object: {metricName, timestamp, value}
  def persistMetrics(metrics: Map[String, Any]): Unit = {
    val head = Map[String, Any]((_MetricName -> metricName), (_Timestamp -> timeStamp))
    val result = head + (_Value -> metrics)
    try {
      val json = JsonUtil.toJson(result)
      persistRecords2Hdfs(MetricsFile, json :: Nil)
    } catch {
      case e: Throwable => error(e.getMessage)
    }
  }

}
a/measure/src/main/scala/org/apache/griffin/measure/persist/HttpPersist.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/HttpPersist.scala deleted file mode 100644 index e41ae55..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/persist/HttpPersist.scala +++ /dev/null @@ -1,115 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.persist - -import org.apache.griffin.measure.result._ -import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil} -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.DataFrame - -import scala.util.Try -import org.apache.griffin.measure.utils.ParamUtil._ - -// persist result by http way -case class HttpPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist { - - val Api = "api" - val Method = "method" - - val api = config.getString(Api, "") - val method = config.getString(Method, "post") - - val _Value = "value" - - def available(): Boolean = { - api.nonEmpty - } - - def start(msg: String): Unit = {} - def finish(): Unit = {} - -// def result(rt: Long, result: Result): Unit = { -// result match { -// case ar: AccuracyResult => { -// val dataMap = Map[String, Any](("name" -> metricName), ("tmst" -> timeStamp), ("total" -> ar.getTotal), ("matched" -> ar.getMatch)) -// httpResult(dataMap) -// } -// case pr: ProfileResult => { -// val dataMap = Map[String, Any](("name" -> metricName), ("tmst" -> timeStamp), ("total" -> pr.getTotal), ("matched" -> pr.getMatch)) -// httpResult(dataMap) -// } -// case _ => { -// info(s"result: ${result}") -// } -// } -// } - - private def httpResult(dataMap: Map[String, Any]) = { - try { - val data = JsonUtil.toJson(dataMap) - // post - val params = Map[String, Object]() - val header = Map[String, Object](("Content-Type","application/json")) - - def func(): Boolean = { - HttpUtil.httpRequest(api, method, params, header, data) - } - - PersistThreadPool.addTask(func _, 10) - -// val status = HttpUtil.httpRequest(api, method, params, header, data) -// info(s"${method} to ${api} response status: ${status}") - } catch { - case e: Throwable => error(e.getMessage) - } - - } - -// def records(recs: RDD[String], tp: String): Unit = {} -// def records(recs: Iterable[String], tp: String): Unit = {} - -// def missRecords(records: RDD[String]): Unit = {} -// def 
matchRecords(records: RDD[String]): Unit = {} - - def log(rt: Long, msg: String): Unit = {} - - def persistRecords(df: DataFrame, name: String): Unit = {} - def persistRecords(records: RDD[String], name: String): Unit = {} - def persistRecords(records: Iterable[String], name: String): Unit = {} - -// def persistMetrics(metrics: Seq[String], name: String): Unit = { -// val maps = metrics.flatMap { m => -// try { -// Some(JsonUtil.toAnyMap(m) ++ Map[String, Any](("name" -> metricName), ("tmst" -> timeStamp))) -// } catch { -// case e: Throwable => None -// } -// } -// maps.foreach { map => -// httpResult(map) -// } -// } - - def persistMetrics(metrics: Map[String, Any]): Unit = { - val head = Map[String, Any](("name" -> metricName), ("tmst" -> timeStamp)) - val result = head + (_Value -> metrics) - httpResult(result) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/persist/LoggerPersist.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/LoggerPersist.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/LoggerPersist.scala deleted file mode 100644 index d9a601a..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/persist/LoggerPersist.scala +++ /dev/null @@ -1,184 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.persist - -import java.util.Date - -import org.apache.griffin.measure.result._ -import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil} -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.DataFrame -import org.apache.griffin.measure.utils.ParamUtil._ - -// persist result and data to hdfs -case class LoggerPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist { - - val MaxLogLines = "max.log.lines" - - val maxLogLines = config.getInt(MaxLogLines, 100) - - def available(): Boolean = true - - def start(msg: String): Unit = { - println(s"[${timeStamp}] ${metricName} start: ${msg}") - } - def finish(): Unit = { - println(s"[${timeStamp}] ${metricName} finish") - } - -// def result(rt: Long, result: Result): Unit = { -// try { -// val resStr = result match { -// case ar: AccuracyResult => { -// s"match percentage: ${ar.matchPercentage}\ntotal count: ${ar.getTotal}\nmiss count: ${ar.getMiss}, match count: ${ar.getMatch}" -// } -// case pr: ProfileResult => { -// s"match percentage: ${pr.matchPercentage}\ntotal count: ${pr.getTotal}\nmiss count: ${pr.getMiss}, match count: ${pr.getMatch}" -// } -// case _ => { -// s"result: ${result}" -// } -// } -// println(s"[${timeStamp}] ${metricName} result: \n${resStr}") -// } catch { -// case e: Throwable => error(e.getMessage) -// } -// } -// -// // need to avoid string too long -// private def rddRecords(records: RDD[String]): Unit = { -// try { -// val recordCount = records.count.toInt -// val count = if (maxLogLines < 0) recordCount else 
scala.math.min(maxLogLines, recordCount) -// if (count > 0) { -// val recordsArray = records.take(count) -//// recordsArray.foreach(println) -// } -// } catch { -// case e: Throwable => error(e.getMessage) -// } -// } - -// private def iterableRecords(records: Iterable[String]): Unit = { -// try { -// val recordCount = records.size -// val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount) -// if (count > 0) { -// val recordsArray = records.take(count) -//// recordsArray.foreach(println) -// } -// } catch { -// case e: Throwable => error(e.getMessage) -// } -// } - -// def records(recs: RDD[String], tp: String): Unit = { -// tp match { -// case PersistDataType.MISS => rddRecords(recs) -// case PersistDataType.MATCH => rddRecords(recs) -// case _ => {} -// } -// } -// -// def records(recs: Iterable[String], tp: String): Unit = { -// tp match { -// case PersistDataType.MISS => iterableRecords(recs) -// case PersistDataType.MATCH => iterableRecords(recs) -// case _ => {} -// } -// } - -// def missRecords(records: RDD[String]): Unit = { -// warn(s"[${timeStamp}] ${metricName} miss records: ") -// rddRecords(records) -// } -// def matchRecords(records: RDD[String]): Unit = { -// warn(s"[${timeStamp}] ${metricName} match records: ") -// rddRecords(records) -// } - - def log(rt: Long, msg: String): Unit = { - println(s"[${timeStamp}] ${rt}: ${msg}") - } - - def persistRecords(df: DataFrame, name: String): Unit = { -// println(s"${metricName} [${timeStamp}] records: ") -// try { -// val recordCount = df.count -// val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount) -// val maxCount = count.toInt -// if (maxCount > 0) { -// val recDf = df.limit(maxCount) -// val recordsArray = recDf.toJSON.collect() -// recordsArray.foreach(println) -// } -// } catch { -// case e: Throwable => error(e.getMessage) -// } - } - - def persistRecords(records: RDD[String], name: String): Unit = { -// println(s"${metricName} 
[${timeStamp}] records: ") -// try { -// val recordCount = records.count -// val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount) -// val maxCount = count.toInt -// if (maxCount > 0) { -// val recordsArray = records.take(maxCount) -// recordsArray.foreach(println) -// } -// } catch { -// case e: Throwable => error(e.getMessage) -// } - } - - def persistRecords(records: Iterable[String], name: String): Unit = { -// println(s"${metricName} [${timeStamp}] records: ") -// try { -// val recordCount = records.size -// val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount) -// if (count > 0) { -// records.foreach(println) -// } -// } catch { -// case e: Throwable => error(e.getMessage) -// } - } - -// def persistMetrics(metrics: Seq[String], name: String): Unit = { -// try { -// val recordCount = metrics.size -// val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount) -// if (count > 0) { -// val recordsArray = metrics.take(count) -// recordsArray.foreach(println) -// } -// } catch { -// case e: Throwable => error(e.getMessage) -// } -// } - - def persistMetrics(metrics: Map[String, Any]): Unit = { - println(s"${metricName} [${timeStamp}] metrics: ") - val json = JsonUtil.toJson(metrics) - println(json) - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/persist/MongoPersist.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/MongoPersist.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/MongoPersist.scala deleted file mode 100644 index b5923ce..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/persist/MongoPersist.scala +++ /dev/null @@ -1,119 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. 
See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.persist - -import org.mongodb.scala._ -import org.apache.griffin.measure.utils.ParamUtil._ -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.DataFrame -import org.mongodb.scala.model.{Filters, UpdateOptions, Updates} -import org.mongodb.scala.result.UpdateResult - -import scala.concurrent.Future -import scala.util.{Failure, Success} - - -case class MongoPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist { - - MongoConnection.init(config) - - val _MetricName = "metricName" - val _Timestamp = "timestamp" - val _Value = "value" - - def available(): Boolean = MongoConnection.dataConf.available - - def start(msg: String): Unit = {} - def finish(): Unit = {} - - def log(rt: Long, msg: String): Unit = {} - - def persistRecords(df: DataFrame, name: String): Unit = {} - def persistRecords(records: RDD[String], name: String): Unit = {} - def persistRecords(records: Iterable[String], name: String): Unit = {} - - def persistMetrics(metrics: Map[String, Any]): Unit = { - mongoInsert(metrics) - } - - private val filter = Filters.and( - Filters.eq(_MetricName, metricName), - Filters.eq(_Timestamp, timeStamp) - ) - - private def mongoInsert(dataMap: Map[String, Any]): Unit = { - try { - val update = Updates.set(_Value, dataMap) - def func(): 
(Long, Future[UpdateResult]) = { - (timeStamp, MongoConnection.getDataCollection.updateOne( - filter, update, UpdateOptions().upsert(true)).toFuture) - } - MongoThreadPool.addTask(func _, 10) - } catch { - case e: Throwable => error(e.getMessage) - } - } - -} - -case class MongoConf(url: String, database: String, collection: String) { - def available: Boolean = url.nonEmpty && database.nonEmpty && collection.nonEmpty -} - -object MongoConnection { - - val _MongoHead = "mongodb://" - - val Url = "url" - val Database = "database" - val Collection = "collection" - - private var initialed = false - - var dataConf: MongoConf = _ - private var dataCollection: MongoCollection[Document] = _ - - def getDataCollection = dataCollection - - def init(config: Map[String, Any]): Unit = { - if (!initialed) { - dataConf = mongoConf(config) - dataCollection = mongoCollection(dataConf) - initialed = true - } - } - - private def mongoConf(cfg: Map[String, Any]): MongoConf = { - val url = cfg.getString(Url, "").trim - val mongoUrl = if (url.startsWith(_MongoHead)) url else { - _MongoHead + url - } - MongoConf( - mongoUrl, - cfg.getString(Database, ""), - cfg.getString(Collection, "") - ) - } - private def mongoCollection(mongoConf: MongoConf): MongoCollection[Document] = { - val mongoClient: MongoClient = MongoClient(mongoConf.url) - val database: MongoDatabase = mongoClient.getDatabase(mongoConf.database) - database.getCollection(mongoConf.collection) - } - -} \ No newline at end of file
