http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/TimeRange.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/TimeRange.scala b/measure/src/main/scala/org/apache/griffin/measure/context/TimeRange.scala
new file mode 100644
index 0000000..4df700b
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/TimeRange.scala
@@ -0,0 +1,48 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context
+
+import scala.math.{max, min}
+
+case class TimeRange(begin: Long, end: Long, tmsts: Set[Long]) extends Serializable {
+  def merge(tr: TimeRange): TimeRange = {
+    TimeRange(min(begin, tr.begin), max(end, tr.end), tmsts ++ tr.tmsts)
+  }
+  def minTmstOpt: Option[Long] = {
+    try {
+      if (tmsts.nonEmpty) Some(tmsts.min) else None
+    } catch {
+      case _: Throwable => None
+    }
+  }
+}
+
+object TimeRange {
+  val emptyTimeRange = TimeRange(0, 0, Set[Long]())
+  def apply(range: (Long, Long), tmsts: Set[Long]): TimeRange = TimeRange(range._1, range._2, tmsts)
+  def apply(ts: Long, tmsts: Set[Long]): TimeRange = TimeRange(ts, ts, tmsts)
+  def apply(ts: Long): TimeRange = TimeRange(ts, ts, Set[Long](ts))
+  def apply(tmsts: Set[Long]): TimeRange = {
+    try {
+      TimeRange(tmsts.min, tmsts.max, tmsts)
+    } catch {
+      case _: Throwable => emptyTimeRange
+    }
+  }
+}
\ No newline at end of file
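TimeRange carries the begin/end bounds of a load together with the concrete timestamps inside it; merge widens the bounds and unions the timestamps. A minimal sketch of the intended semantics (standalone Scala, using only the class above):

    val a = TimeRange(100L)                               // begin = end = 100, tmsts = Set(100)
    val b = TimeRange((90L, 120L), Set(90L, 110L, 120L))
    val merged = a.merge(b)
    // merged == TimeRange(90, 120, Set(90, 100, 110, 120))
    assert(merged.minTmstOpt == Some(90L))                // earliest timestamp, None when tmsts is empty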
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSource.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSource.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSource.scala
new file mode 100644
index 0000000..09ab9ea
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSource.scala
@@ -0,0 +1,97 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.configuration.params.DataSourceParam
+import org.apache.griffin.measure.context.datasource.cache.DataSourceCache
+import org.apache.griffin.measure.context.{ContextId, DQContext, TimeRange}
+import org.apache.griffin.measure.context.datasource.connector.DataConnector
+import org.apache.griffin.measure.context.datasource.info.TmstCache
+import org.apache.griffin.measure.utils.DataFrameUtil._
+import org.apache.spark.sql._
+
+/**
+ * data source
+ * @param name                 name of data source
+ * @param dsParam              param of this data source
+ * @param dataConnectors       list of data connectors
+ * @param dataSourceCacheOpt   data source cache option in streaming mode
+ */
+case class DataSource(name: String,
+                      dsParam: DataSourceParam,
+                      dataConnectors: Seq[DataConnector],
+                      dataSourceCacheOpt: Option[DataSourceCache]
+                     ) extends Loggable with Serializable {
+
+  def init(): Unit = {
+    dataConnectors.foreach(_.init)
+  }
+
+  def loadData(context: DQContext): TimeRange = {
+    info(s"load data [${name}]")
+    val timestamp = context.contextId.timestamp
+    val (dfOpt, timeRange) = data(timestamp)
+    dfOpt match {
+      case Some(df) => {
+        context.runTimeTableRegister.registerTable(name, df)
+      }
+      case None => {
+        warn(s"load data source [${name}] fails")
+      }
+    }
+    timeRange
+  }
+
+  private def data(timestamp: Long): (Option[DataFrame], TimeRange) = {
+    val batches = dataConnectors.flatMap { dc =>
+      val (dfOpt, timeRange) = dc.data(timestamp)
+      dfOpt match {
+        case Some(df) => Some((dfOpt, timeRange))
+        case _ => None
+      }
+    }
+    val caches = dataSourceCacheOpt match {
+      case Some(dsc) => dsc.readData() :: Nil
+      case _ => Nil
+    }
+    val pairs = batches ++ caches
+
+    if (pairs.size > 0) {
+      pairs.reduce { (a, b) =>
+        (unionDfOpts(a._1, b._1), a._2.merge(b._2))
+      }
+    } else {
+      (None, TimeRange.emptyTimeRange)
+    }
+  }
+
+  def updateData(df: DataFrame): Unit = {
+    dataSourceCacheOpt.foreach(_.updateData(Some(df)))
+  }
+
+  def cleanOldData(): Unit = {
+    dataSourceCacheOpt.foreach(_.cleanOutTimeData)
+  }
+
+  def processFinish(): Unit = {
+    dataSourceCacheOpt.foreach(_.processFinish)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala
new file mode 100644
index 0000000..a22b856
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala
@@ -0,0 +1,66 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.configuration.params.DataSourceParam
+import org.apache.griffin.measure.context.datasource.cache.DataSourceCacheFactory
+import org.apache.griffin.measure.context.datasource.connector.{DataConnector, DataConnectorFactory}
+import org.apache.griffin.measure.context.datasource.info.TmstCache
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.StreamingContext
+
+import scala.util.Success
+
+object DataSourceFactory extends Loggable {
+
+  def getDataSources(sparkSession: SparkSession,
+                     ssc: StreamingContext,
+                     dataSourceParams: Seq[DataSourceParam]
+                    ): Seq[DataSource] = {
+    dataSourceParams.zipWithIndex.flatMap { pair =>
+      val (param, index) = pair
+      getDataSource(sparkSession, ssc, param, index)
+    }
+  }
+
+  private def getDataSource(sparkSession: SparkSession,
+                            ssc: StreamingContext,
+                            dataSourceParam: DataSourceParam,
+                            index: Int
+                           ): Option[DataSource] = {
+    val name = dataSourceParam.name
+    val connectorParams = dataSourceParam.getConnectors
+    val tmstCache = TmstCache()
+
+    val dataSourceCacheOpt = DataSourceCacheFactory.getDataSourceCacheOpt(
+      sparkSession.sqlContext, dataSourceParam.cache, name, index, tmstCache)
+
+    val dataConnectors: Seq[DataConnector] = connectorParams.flatMap { connectorParam =>
+      DataConnectorFactory.getDataConnector(sparkSession, ssc, connectorParam,
+        tmstCache, dataSourceCacheOpt) match {
+          case Success(connector) => Some(connector)
+          case _ => None
+        }
+    }
+
+    Some(DataSource(name, dataSourceParam, dataConnectors, dataSourceCacheOpt))
+  }
+
+}
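DataSource.data folds the frames coming from all batch connectors plus the optional streaming cache into one pair, unioning the data frames and merging the time ranges. unionDfOpts is imported from DataFrameUtil and is not part of this patch; a plausible reading of its contract, sketched for reference:

    // assumed behavior of DataFrameUtil.unionDfOpts (utility not shown in this patch):
    // union both frames when both are defined, otherwise pass the defined side through
    def unionDfOpts(a: Option[DataFrame], b: Option[DataFrame]): Option[DataFrame] =
      (a, b) match {
        case (Some(x), Some(y)) => Some(x union y)
        case (x, None)          => x
        case (None, y)          => y
      }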
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCache.scala
new file mode 100644
index 0000000..c70fd20
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCache.scala
@@ -0,0 +1,366 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.cache
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.context.datasource.info.{DataSourceCacheable, TmstCache}
+import org.apache.griffin.measure.context.streaming.info.{InfoCacheInstance, TimeInfoCache}
+import org.apache.griffin.measure.step.builder.ConstantColumns
+import org.apache.griffin.measure.utils.DataFrameUtil._
+import org.apache.griffin.measure.utils.ParamUtil._
+import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
+import org.apache.spark.sql._
+
+import scala.util.Random
+
+/**
+ * data source cache in streaming mode
+ * saves the data frame into hdfs in the dump phase,
+ * reads the data frame from hdfs in the calculate phase,
+ * with update and clean actions for the cached data
+ */
+trait DataSourceCache extends DataSourceCacheable with WithFanIn[Long] with Loggable with Serializable {
+
+  val sqlContext: SQLContext
+  val param: Map[String, Any]
+  val dsName: String
+  val index: Int
+
+  val tmstCache: TmstCache
+  protected def fromUntilRangeTmsts(from: Long, until: Long) = tmstCache.fromUntil(from, until)
+  protected def clearTmst(t: Long) = tmstCache.remove(t)
+  protected def clearTmstsUntil(until: Long) = {
+    val outDateTmsts = tmstCache.until(until)
+    tmstCache.remove(outDateTmsts)
+  }
+  protected def afterTilRangeTmsts(after: Long, til: Long) = fromUntilRangeTmsts(after + 1, til + 1)
+  protected def clearTmstsTil(til: Long) = clearTmstsUntil(til + 1)
+
+  val _FilePath = "file.path"
+  val _InfoPath = "info.path"
+  val _ReadyTimeInterval = "ready.time.interval"
+  val _ReadyTimeDelay = "ready.time.delay"
+  val _TimeRange = "time.range"
+
+  val rdmStr = Random.alphanumeric.take(10).mkString
+  val defFilePath = s"hdfs:///griffin/cache/${dsName}_${rdmStr}"
+  val defInfoPath = s"${index}"
+
+  val filePath: String = param.getString(_FilePath, defFilePath)
+  val cacheInfoPath: String = param.getString(_InfoPath, defInfoPath)
+  val readyTimeInterval: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeInterval, "1m")).getOrElse(60000L)
+  val readyTimeDelay: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeDelay, "1m")).getOrElse(60000L)
+  val deltaTimeRange: (Long, Long) = {
+    def negative(n: Long): Long = if (n <= 0) n else 0
+    param.get(_TimeRange) match {
+      case Some(seq: Seq[String]) => {
+        val nseq = seq.flatMap(TimeUtil.milliseconds(_))
+        val ns = negative(nseq.headOption.getOrElse(0))
+        val ne = negative(nseq.tail.headOption.getOrElse(0))
+        (ns, ne)
+      }
+      case _ => (0, 0)
+    }
+  }
+
+  val _ReadOnly = "read.only"
+  val readOnly = param.getBoolean(_ReadOnly, false)
+
+  val _Updatable = "updatable"
+  val updatable = param.getBoolean(_Updatable, false)
+
+  val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new")
+  val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old")
+
+  val newFilePath = s"${filePath}/new"
+  val oldFilePath = s"${filePath}/old"
+
+  val defOldCacheIndex = 0L
+
+  protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit
+  protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame
+
+  /**
+   * save data frame in dump phase
+   * @param dfOpt    data frame to be saved
+   * @param ms       timestamp of this data frame
+   */
+  def saveData(dfOpt: Option[DataFrame], ms: Long): Unit = {
+    if (!readOnly) {
+      dfOpt match {
+        case Some(df) => {
+          // cache df
+          df.cache
+
+          // count df
+          val cnt = df.count
+          info(s"save ${dsName} data count: ${cnt}")
+
+          if (cnt > 0) {
+            // lock makes it safer when writing new cache data
+            val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
+            if (newCacheLocked) {
+              try {
+                val dfw = df.write.mode(SaveMode.Append).partitionBy(ConstantColumns.tmst)
+                writeDataFrame(dfw, newFilePath)
+              } catch {
+                case e: Throwable => error(s"save data error: ${e.getMessage}")
+              } finally {
+                newCacheLock.unlock()
+              }
+            }
+          }
+
+          // uncache df
+          df.unpersist
+        }
+        case _ => {
+          info(s"no data frame to save")
+        }
+      }
+
+      // submit cache time and ready time
+      if (fanIncrement(ms)) {
+        info(s"save data [${ms}] finish")
+        submitCacheTime(ms)
+        submitReadyTime(ms)
+      }
+
+    }
+  }
+
+  /**
+   * read data frame in calculate phase
+   * @return   data frame to calculate, with the time range of data
+   */
+  def readData(): (Option[DataFrame], TimeRange) = {
+    // time range: (a, b]
+    val timeRange = TimeInfoCache.getTimeRange
+    val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2)
+
+    // read partition info
+    val filterStr = if (reviseTimeRange._1 == reviseTimeRange._2) {
+      info(s"read time range: [${reviseTimeRange._1}]")
+      s"`${ConstantColumns.tmst}` = ${reviseTimeRange._1}"
+    } else {
+      info(s"read time range: (${reviseTimeRange._1}, ${reviseTimeRange._2}]")
+      s"`${ConstantColumns.tmst}` > ${reviseTimeRange._1} AND `${ConstantColumns.tmst}` <= ${reviseTimeRange._2}"
+    }
+
+    // new cache data
+    val newDfOpt = try {
+      val dfr = sqlContext.read
+      Some(readDataFrame(dfr, newFilePath).filter(filterStr))
+    } catch {
+      case e: Throwable => {
+        warn(s"read data source cache warn: ${e.getMessage}")
+        None
+      }
+    }
+
+    // old cache data
+    val oldCacheIndexOpt = if (updatable) readOldCacheIndex else None
+    val oldDfOpt = oldCacheIndexOpt.flatMap { idx =>
+      val oldDfPath = s"${oldFilePath}/${idx}"
+      try {
+        val dfr = sqlContext.read
+        Some(readDataFrame(dfr, oldDfPath).filter(filterStr))
+      } catch {
+        case e: Throwable => {
+          warn(s"read old data source cache warn: ${e.getMessage}")
+          None
+        }
+      }
+    }
+
+    // whole cache data
+    val cacheDfOpt = unionDfOpts(newDfOpt, oldDfOpt)
+
+    // from until tmst range
+    val (from, until) = (reviseTimeRange._1, reviseTimeRange._2)
+    val tmstSet = afterTilRangeTmsts(from, until)
+
+    val retTimeRange = TimeRange(reviseTimeRange, tmstSet)
+    (cacheDfOpt, retTimeRange)
+  }
+
+  private def cleanOutTimePartitions(path: String, outTime: Long, partitionOpt: Option[String],
+                                     func: (Long, Long) => Boolean
+                                    ): Unit = {
+    val earlierOrEqPaths = listPartitionsByFunc(path, outTime, partitionOpt, func)
+    // delete out time data path
+    earlierOrEqPaths.foreach { path =>
+      info(s"delete hdfs path: ${path}")
+      HdfsUtil.deleteHdfsPath(path)
+    }
+  }
+  private def listPartitionsByFunc(path: String, bound: Long, partitionOpt: Option[String],
+                                   func: (Long, Long) => Boolean
+                                  ): Iterable[String] = {
+    val names = HdfsUtil.listSubPathsByType(path, "dir")
+    val regex = partitionOpt match {
+      case Some(partition) => s"^${partition}=(\\d+)$$".r
+      case _ => "^(\\d+)$".r
+    }
+    names.filter { name =>
+      name match {
+        case regex(value) => {
+          str2Long(value) match {
+            case Some(t) => func(t, bound)
+            case _ => false
+          }
+        }
+        case _ => false
+      }
+    }.map(name => s"${path}/${name}")
+  }
+  private def str2Long(str: String): Option[Long] = {
+    try {
+      Some(str.toLong)
+    } catch {
+      case e: Throwable => None
+    }
+  }
+
+  /**
+   * clean out-time cached data on hdfs
+   */
+  def cleanOutTimeData(): Unit = {
+    // clean tmst
+    val cleanTime = readCleanTime
+    cleanTime.foreach(clearTmstsTil(_))
+
+    if (!readOnly) {
+      // new cache data
+      val newCacheCleanTime = if (updatable) readLastProcTime else readCleanTime
+      newCacheCleanTime match {
+        case Some(nct) => {
+          // clean calculated new cache data
+          val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
+          if (newCacheLocked) {
+            try {
+              cleanOutTimePartitions(newFilePath, nct, Some(ConstantColumns.tmst),
+                (a: Long, b: Long) => (a <= b))
+            } catch {
+              case e: Throwable => error(s"clean new cache data error: ${e.getMessage}")
+            } finally {
+              newCacheLock.unlock()
+            }
+          }
+        }
+        case _ => {
+          // do nothing
+        }
+      }
+
+      // old cache data
+      val oldCacheCleanTime = if (updatable) readCleanTime else None
+      oldCacheCleanTime match {
+        case Some(oct) => {
+          val oldCacheIndexOpt = readOldCacheIndex
+          oldCacheIndexOpt.foreach { idx =>
+            val oldDfPath = s"${oldFilePath}/${idx}"
+            val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
+            if (oldCacheLocked) {
+              try {
+                // clean calculated old cache data
+                cleanOutTimePartitions(oldFilePath, idx, None, (a: Long, b: Long) => (a < b))
+                // clean out time old cache data not calculated
+//                cleanOutTimePartitions(oldDfPath, oct, Some(InternalColumns.tmst))
+              } catch {
+                case e: Throwable => error(s"clean old cache data error: ${e.getMessage}")
+              } finally {
+                oldCacheLock.unlock()
+              }
+            }
+          }
+        }
+        case _ => {
+          // do nothing
+        }
+      }
+    }
+  }
+
+  /**
+   * update old cached data by new data frame
+   * @param dfOpt    data frame to update old cached data
+   */
+  def updateData(dfOpt: Option[DataFrame]): Unit = {
+    if (!readOnly && updatable) {
+      dfOpt match {
+        case Some(df) => {
+          // old cache lock
+          val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
+          if (oldCacheLocked) {
+            try {
+              val oldCacheIndexOpt = readOldCacheIndex
+              val nextOldCacheIndex = oldCacheIndexOpt.getOrElse(defOldCacheIndex) + 1
+
+              val oldDfPath = s"${oldFilePath}/${nextOldCacheIndex}"
+              val cleanTime = getNextCleanTime
+              val filterStr = s"`${ConstantColumns.tmst}` > ${cleanTime}"
+              val updateDf = df.filter(filterStr)
+
+              val prlCount = sqlContext.sparkContext.defaultParallelism
+              // repartition
+              val repartitionedDf = updateDf.repartition(prlCount)
+              val dfw = repartitionedDf.write.mode(SaveMode.Overwrite)
+              writeDataFrame(dfw, oldDfPath)
+
+              submitOldCacheIndex(nextOldCacheIndex)
+            } catch {
+              case e: Throwable => error(s"update data error: ${e.getMessage}")
+            } finally {
+              oldCacheLock.unlock()
+            }
+          }
+        }
+        case _ => {
+          info(s"no data frame to update")
+        }
+      }
+    }
+  }
+
+  /**
+   * each time the calculation phase finishes,
+   * the data source cache needs to submit some cache information
+   */
+  def processFinish(): Unit = {
+    // next last proc time
+    val timeRange = TimeInfoCache.getTimeRange
+    submitLastProcTime(timeRange._2)
+
+    // next clean time
+    val nextCleanTime = timeRange._2 + deltaTimeRange._1
+    submitCleanTime(nextCleanTime)
+  }
+
+  // read next clean time
+  private def getNextCleanTime(): Long = {
+    val timeRange = TimeInfoCache.getTimeRange
+    val nextCleanTime = timeRange._2 + deltaTimeRange._1
+    nextCleanTime
+  }
+
+}
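All cache behavior is driven by the param map. The keys below are the ones the trait reads; the values are illustrative only, with the code's defaults noted in comments:

    // hypothetical cache config for a data source named "src1" at index 0
    val cacheParam: Map[String, Any] = Map(
      "file.path" -> "hdfs:///griffin/cache/src1",  // default: hdfs:///griffin/cache/<dsName>_<random>
      "info.path" -> "src1",                        // default: the data source index
      "ready.time.interval" -> "1m",
      "ready.time.delay" -> "1m",
      "time.range" -> Seq("-2m", "0"),              // offsets are clamped to non-positive values
      "read.only" -> false,                         // skips saveData/updateData when true
      "updatable" -> true                           // enables the old-cache read/update path
    )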
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCacheFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCacheFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCacheFactory.scala
new file mode 100644
index 0000000..ca882e0
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCacheFactory.scala
@@ -0,0 +1,68 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.cache
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.context.datasource.info.TmstCache
+import org.apache.griffin.measure.utils.ParamUtil._
+import org.apache.spark.sql.SQLContext
+
+object DataSourceCacheFactory extends Loggable {
+
+  private object DataSourceCacheType {
+    val ParquetRegex = "^(?i)parq(uet)?$".r
+    val JsonRegex = "^(?i)json$".r
+    val OrcRegex = "^(?i)orc$".r
+  }
+  import DataSourceCacheType._
+
+  val _type = "type"
+
+  /**
+   * create data source cache
+   * @param sqlContext   sqlContext in spark environment
+   * @param param        data source cache config
+   * @param name         data source name
+   * @param index        data source index
+   * @param tmstCache    the same tmstCache instance inside a data source
+   * @return             data source cache option
+   */
+  def getDataSourceCacheOpt(sqlContext: SQLContext, param: Map[String, Any],
+                            name: String, index: Int, tmstCache: TmstCache
+                           ): Option[DataSourceCache] = {
+    if (param != null) {
+      try {
+        val tp = param.getString(_type, "")
+        val dsCache = tp match {
+          case ParquetRegex() => ParquetDataSourceCache(sqlContext, param, name, index, tmstCache)
+          case JsonRegex() => JsonDataSourceCache(sqlContext, param, name, index, tmstCache)
+          case OrcRegex() => OrcDataSourceCache(sqlContext, param, name, index, tmstCache)
+          case _ => ParquetDataSourceCache(sqlContext, param, name, index, tmstCache)
+        }
+        Some(dsCache)
+      } catch {
+        case e: Throwable => {
+          error(s"generate data source cache fails: ${e.getMessage}")
+          None
+        }
+      }
+    } else None
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/JsonDataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/JsonDataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/JsonDataSourceCache.scala
new file mode 100644
index 0000000..cb01274
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/JsonDataSourceCache.scala
@@ -0,0 +1,40 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.cache
+
+import org.apache.griffin.measure.context.datasource.info.TmstCache
+import org.apache.spark.sql._
+
+/**
+ * data source cache in json format
+ */
+case class JsonDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
+                               dsName: String, index: Int, tmstCache: TmstCache
+                              ) extends DataSourceCache {
+
+  protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
+    info(s"write path: ${path}")
+    dfw.json(path)
+  }
+
+  protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = {
+    dfr.json(path)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/OrcDataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/OrcDataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/OrcDataSourceCache.scala
new file mode 100644
index 0000000..daba15f
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/OrcDataSourceCache.scala
@@ -0,0 +1,40 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.cache
+
+import org.apache.griffin.measure.context.datasource.info.TmstCache
+import org.apache.spark.sql._
+
+/**
+ * data source cache in orc format
+ */
+case class OrcDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
+                              dsName: String, index: Int, tmstCache: TmstCache
+                             ) extends DataSourceCache {
+
+  protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
+    info(s"write path: ${path}")
+    dfw.orc(path)
+  }
+
+  protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = {
+    dfr.orc(path)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/ParquetDataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/ParquetDataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/ParquetDataSourceCache.scala
new file mode 100644
index 0000000..f00c6a3
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/ParquetDataSourceCache.scala
@@ -0,0 +1,42 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.cache
+
+import org.apache.griffin.measure.context.datasource.info.TmstCache
+import org.apache.spark.sql._
+
+/**
+ * data source cache in parquet format
+ */
+case class ParquetDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
+                                  dsName: String, index: Int, tmstCache: TmstCache
+                                 ) extends DataSourceCache {
+
+  sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
+
+  protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
+    info(s"write path: ${path}")
+    dfw.parquet(path)
+  }
+
+  protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = {
+    dfr.parquet(path)
+  }
+
+}
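The three concrete caches differ only in the DataFrameWriter/DataFrameReader calls they make; the factory selects one by matching the case-insensitive type string and falls back to parquet for anything unrecognized. For instance (illustrative arguments):

    // "parq", "PARQUET", "json" and "ORC" all resolve; "csv" would fall back to parquet
    val cacheOpt = DataSourceCacheFactory.getDataSourceCacheOpt(
      sqlContext, Map("type" -> "orc"), "src1", 0, TmstCache())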
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/WithFanIn.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/WithFanIn.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/WithFanIn.scala
new file mode 100644
index 0000000..ebd2e55
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/WithFanIn.scala
@@ -0,0 +1,69 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.cache
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.concurrent.{TrieMap, Map => ConcMap}
+
+/**
+ * fan-in trait, for multiple inputs and one output,
+ * to support multiple parallel data connectors in one data source
+ */
+trait WithFanIn[T] {
+
+  // total input number
+  val totalNum: AtomicInteger = new AtomicInteger(0)
+  // concurrent map of fan in count for each key
+  val fanInCountMap: ConcMap[T, Int] = TrieMap[T, Int]()
+
+  def registerFanIn(): Int = {
+    totalNum.incrementAndGet()
+  }
+
+  /**
+   * increment the count for a key, to test if all parallel inputs have finished
+   * @param key    fan-in key
+   * @return       whether fan-in of this key has completed
+   */
+  def fanIncrement(key: T): Boolean = {
+    fanInc(key)
+    fanInCountMap.get(key) match {
+      case Some(n) if (n >= totalNum.get) => {
+        fanInCountMap.remove(key)
+        true
+      }
+      case _ => false
+    }
+  }
+
+  private def fanInc(key: T): Unit = {
+    fanInCountMap.get(key) match {
+      case Some(n) => {
+        val suc = fanInCountMap.replace(key, n, n + 1)
+        if (!suc) fanInc(key)
+      }
+      case _ => {
+        val oldOpt = fanInCountMap.putIfAbsent(key, 1)
+        if (oldOpt.nonEmpty) fanInc(key)
+      }
+    }
+  }
+
+}
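fanIncrement returns true only for the arrival that completes the fan-in for a key, so exactly one caller per timestamp proceeds to submit the cache/ready times. A small trace, assuming two registered inputs:

    val sink = new WithFanIn[Long] {}
    sink.registerFanIn()               // totalNum = 1
    sink.registerFanIn()               // totalNum = 2
    assert(!sink.fanIncrement(100L))   // first arrival for key 100: count 1 of 2
    assert(sink.fanIncrement(100L))    // second arrival completes the fan-in, key is removed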
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnector.scala
new file mode 100644
index 0000000..6dc17d1
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnector.scala
@@ -0,0 +1,112 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.connector
+
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.configuration.enums.{BatchProcessType, DslType, SparkSqlType}
+import org.apache.griffin.measure.configuration.params.DataConnectorParam
+import org.apache.griffin.measure.context.datasource.info.TmstCache
+import org.apache.griffin.measure.context.{ContextId, DQContext, TimeRange}
+import org.apache.griffin.measure.job.builder.DQJobBuilder
+import org.apache.griffin.measure.step.builder.ConstantColumns
+import org.apache.griffin.measure.step.builder.preproc.PreProcRuleParamGenerator
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.functions._
+
+trait DataConnector extends Loggable with Serializable {
+
+  @transient val sparkSession: SparkSession
+
+  val dcParam: DataConnectorParam
+
+  val id: String = DataConnectorIdGenerator.genId
+  protected def thisName(suffix: String): String = s"this_${suffix}"
+
+  val tmstCache: TmstCache
+  protected def saveTmst(t: Long) = tmstCache.insert(t)
+  protected def readTmst(t: Long) = tmstCache.fromUntil(t, t + 1)
+
+  def init(): Unit
+
+  // get data frame in batch mode
+  def data(ms: Long): (Option[DataFrame], TimeRange)
+
+  private def createContext(t: Long): DQContext = {
+    DQContext(ContextId(t, id), id, Nil, Nil, BatchProcessType)(sparkSession)
+  }
+
+  def preProcess(dfOpt: Option[DataFrame], ms: Long): Option[DataFrame] = {
+    // new context
+    val context = createContext(ms)
+
+    val timestamp = context.contextId.timestamp
+    val suffix = context.contextId.id
+    val thisTable = thisName(suffix)
+
+    try {
+      saveTmst(timestamp)    // save timestamp
+
+      dfOpt.flatMap { df =>
+        val preProcRules = PreProcRuleParamGenerator.getNewPreProcRules(dcParam.preProc, suffix)
+
+        // init data
+        context.compileTableRegister.registerTable(thisTable)
+        context.runTimeTableRegister.registerTable(thisTable, df)
+
+        // build job
+        val preprocJob = DQJobBuilder.buildDQJob(context, preProcRules, SparkSqlType)
+
+        // job execute
+        preprocJob.execute(context)
+
+        // out data
+        val outDf = context.sparkSession.table(s"`${thisTable}`")
+
+        // add tmst column
+        val withTmstDf = outDf.withColumn(ConstantColumns.tmst, lit(timestamp))
+
+        // clean context
+        context.clean()
+
+        Some(withTmstDf)
+      }
+
+    } catch {
+      case e: Throwable => {
+        error(s"pre-process of data connector [${id}] error: ${e.getMessage}")
+        None
+      }
+    }
+  }
+}
+
+object DataConnectorIdGenerator {
+  private val counter: AtomicLong = new AtomicLong(0L)
+  private val head: String = "dc"
+
+  def genId: String = {
+    s"${head}${increment}"
+  }
+
+  private def increment: Long = {
+    counter.incrementAndGet()
+  }
+}
\ No newline at end of file
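preProcess runs the connector's pre-proc rules as a SparkSql job against a temporary "this_<id>" table, then stamps every row with the batch timestamp. The stamping step in plain Spark terms (the column name here is illustrative; the real one comes from ConstantColumns.tmst):

    import org.apache.spark.sql.functions.lit
    // tag each row so the cache can partition and filter on the batch timestamp
    val withTmstDf = outDf.withColumn("__tmst", lit(timestamp))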
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala
new file mode 100644
index 0000000..ea22309
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala
@@ -0,0 +1,125 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.connector
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.configuration.params.DataConnectorParam
+import org.apache.griffin.measure.context.datasource.cache.DataSourceCache
+import org.apache.griffin.measure.context.datasource.connector.batch._
+import org.apache.griffin.measure.context.datasource.connector.streaming._
+import org.apache.griffin.measure.context.datasource.info.TmstCache
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.StreamingContext
+
+import scala.reflect.ClassTag
+import scala.util.Try
+
+object DataConnectorFactory extends Loggable {
+
+  val HiveRegex = """^(?i)hive$""".r
+  val AvroRegex = """^(?i)avro$""".r
+  val TextDirRegex = """^(?i)text-dir$""".r
+
+  val KafkaRegex = """^(?i)kafka$""".r
+
+  /**
+   * create data connector
+   * @param sparkSession         spark env
+   * @param ssc                  spark streaming env
+   * @param dcParam              data connector param
+   * @param tmstCache            the same tmst cache within one data source
+   * @param dataSourceCacheOpt   data source cache, used by streaming data connectors
+   * @return                     data connector
+   */
+  def getDataConnector(sparkSession: SparkSession,
+                       ssc: StreamingContext,
+                       dcParam: DataConnectorParam,
+                       tmstCache: TmstCache,
+                       dataSourceCacheOpt: Option[DataSourceCache]
+                      ): Try[DataConnector] = {
+    val conType = dcParam.conType
+    val version = dcParam.version
+    Try {
+      conType match {
+        case HiveRegex() => HiveBatchDataConnector(sparkSession, dcParam, tmstCache)
+        case AvroRegex() => AvroBatchDataConnector(sparkSession, dcParam, tmstCache)
+        case TextDirRegex() => TextDirBatchDataConnector(sparkSession, dcParam, tmstCache)
+        case KafkaRegex() => {
+          getStreamingDataConnector(sparkSession, ssc, dcParam, tmstCache, dataSourceCacheOpt)
+        }
+        case _ => throw new Exception("connector creation error!")
+      }
+    }
+  }
+
+  private def getStreamingDataConnector(sparkSession: SparkSession,
+                                        ssc: StreamingContext,
+                                        dcParam: DataConnectorParam,
+                                        tmstCache: TmstCache,
+                                        dataSourceCacheOpt: Option[DataSourceCache]
+                                       ): StreamingDataConnector = {
+    if (ssc == null) throw new Exception("streaming context is null!")
+    val conType = dcParam.conType
+    val version = dcParam.version
+    conType match {
+      case KafkaRegex() => getKafkaDataConnector(sparkSession, ssc, dcParam, tmstCache, dataSourceCacheOpt)
+      case _ => throw new Exception("streaming connector creation error!")
+    }
+  }
+
+  private def getKafkaDataConnector(sparkSession: SparkSession,
+                                    ssc: StreamingContext,
+                                    dcParam: DataConnectorParam,
+                                    tmstCache: TmstCache,
+                                    dataSourceCacheOpt: Option[DataSourceCache]
+                                   ): KafkaStreamingDataConnector = {
+    val KeyType = "key.type"
+    val ValueType = "value.type"
+    val config = dcParam.config
+    val keyType = config.getOrElse(KeyType, "java.lang.String").toString
+    val valueType = config.getOrElse(ValueType, "java.lang.String").toString
+
+    (getClassTag(keyType), getClassTag(valueType)) match {
+      case (ClassTag(k: Class[String]), ClassTag(v: Class[String])) => {
+        KafkaStreamingStringDataConnector(sparkSession, ssc, dcParam, tmstCache, dataSourceCacheOpt)
+      }
+      case _ => {
+        throw new Exception("not supported type kafka data connector")
+      }
+    }
+  }
+
+  private def getClassTag(tp: String): ClassTag[_] = {
+    try {
+      val clazz = Class.forName(tp)
+      ClassTag(clazz)
+    } catch {
+      case e: Throwable => throw e
+    }
+  }
+
+//  def filterDataConnectors[T <: DataConnector : ClassTag](connectors: Seq[DataConnector]): Seq[T] = {
+//    connectors.flatMap { dc =>
+//      dc match {
+//        case mdc: T => Some(mdc)
+//        case _ => None
+//      }
+//    }
+//  }
+
+}
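getDataConnector wraps creation in a Try, so callers pattern-match on the result instead of catching, just as DataSourceFactory does above. A minimal caller sketch:

    import scala.util.{Failure, Success}
    DataConnectorFactory.getDataConnector(sparkSession, ssc, dcParam, tmstCache, None) match {
      case Success(connector) => connector.init()
      case Failure(ex)        => // unknown connector type, or null streaming context for kafka
    }

Note that the ClassTag match above inspects the runtime Class values of key.type and value.type, so only string key/value types are accepted; anything else throws inside the Try.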
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/AvroBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/AvroBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/AvroBatchDataConnector.scala
new file mode 100644
index 0000000..1ee6e78
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/AvroBatchDataConnector.scala
@@ -0,0 +1,71 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.connector.batch
+
+import org.apache.griffin.measure.configuration.params.DataConnectorParam
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.context.datasource.info.TmstCache
+import org.apache.griffin.measure.utils.HdfsUtil
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.griffin.measure.utils.ParamUtil._
+
+/**
+ * batch data connector for avro file
+ */
+case class AvroBatchDataConnector(@transient sparkSession: SparkSession,
+                                  dcParam: DataConnectorParam,
+                                  tmstCache: TmstCache
+                                 ) extends BatchDataConnector {
+
+  val config = dcParam.config
+
+  val FilePath = "file.path"
+  val FileName = "file.name"
+
+  val filePath = config.getString(FilePath, "")
+  val fileName = config.getString(FileName, "")
+
+  val concreteFileFullPath = if (pathPrefix) s"${filePath}${fileName}" else fileName
+
+  private def pathPrefix(): Boolean = {
+    filePath.nonEmpty
+  }
+
+  private def fileExist(): Boolean = {
+    HdfsUtil.existPath(concreteFileFullPath)
+  }
+
+  def data(ms: Long): (Option[DataFrame], TimeRange) = {
+    val dfOpt = try {
+      val df = sparkSession.read.format("com.databricks.spark.avro").load(concreteFileFullPath)
+      val dfOpt = Some(df)
+      val preDfOpt = preProcess(dfOpt, ms)
+      preDfOpt
+    } catch {
+      case e: Throwable => {
+        error(s"load avro file ${concreteFileFullPath} fails: ${e.getMessage}")
+        None
+      }
+    }
+    val tmsts = readTmst(ms)
+    (dfOpt, TimeRange(ms, tmsts))
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/BatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/BatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/BatchDataConnector.scala
new file mode 100644
index 0000000..8f32687
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/BatchDataConnector.scala
@@ -0,0 +1,27 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.connector.batch
+
+import org.apache.griffin.measure.context.datasource.connector.DataConnector
+
+trait BatchDataConnector extends DataConnector {
+
+  def init(): Unit = {}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/HiveBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/HiveBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/HiveBatchDataConnector.scala
new file mode 100644
index 0000000..85cd120
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/HiveBatchDataConnector.scala
@@ -0,0 +1,86 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.connector.batch
+
+import org.apache.griffin.measure.configuration.params.DataConnectorParam
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.context.datasource.info.TmstCache
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.griffin.measure.utils.ParamUtil._
+
+/**
+ * batch data connector for hive table
+ */
+case class HiveBatchDataConnector(@transient sparkSession: SparkSession,
+                                  dcParam: DataConnectorParam,
+                                  tmstCache: TmstCache
+                                 ) extends BatchDataConnector {
+
+  val config = dcParam.config
+
+  val Database = "database"
+  val TableName = "table.name"
+  val Where = "where"
+
+  val database = config.getString(Database, "default")
+  val tableName = config.getString(TableName, "")
+  val whereString = config.getString(Where, "")
+
+  val concreteTableName = s"${database}.${tableName}"
+  val wheres = whereString.split(",").map(_.trim).filter(_.nonEmpty)
+
+  def data(ms: Long): (Option[DataFrame], TimeRange) = {
+    val dfOpt = try {
+      val dtSql = dataSql
+      info(dtSql)
+      val df = sparkSession.sql(dtSql)
+      val dfOpt = Some(df)
+      val preDfOpt = preProcess(dfOpt, ms)
+      preDfOpt
+    } catch {
+      case e: Throwable => {
+        error(s"load hive table ${concreteTableName} fails: ${e.getMessage}")
+        None
+      }
+    }
+    val tmsts = readTmst(ms)
+    (dfOpt, TimeRange(ms, tmsts))
+  }
+
+  private def tableExistsSql(): String = {
+//    s"SHOW TABLES LIKE '${concreteTableName}'"    // this is hive sql, but does not work in spark sql
+    s"tableName LIKE '${tableName}'"
+  }
+
+  private def metaDataSql(): String = {
+    s"DESCRIBE ${concreteTableName}"
+  }
+
+  private def dataSql(): String = {
+    val tableClause = s"SELECT * FROM ${concreteTableName}"
+    if (wheres.length > 0) {
+      val clauses = wheres.map { w =>
+        s"${tableClause} WHERE ${w}"
+      }
+      clauses.mkString(" UNION ALL ")
+    } else tableClause
+  }
+
+}
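dataSql splits the where config on commas and unions one SELECT per clause. For example, with database = "default", table.name = "tbl" and where = "year=2018 AND hour=10, year=2018 AND hour=11":

    // dataSql() returns:
    // SELECT * FROM default.tbl WHERE year=2018 AND hour=10 UNION ALL SELECT * FROM default.tbl WHERE year=2018 AND hour=11
    // with no where clauses it degenerates to: SELECT * FROM default.tbl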
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/TextDirBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/TextDirBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/TextDirBatchDataConnector.scala
new file mode 100644
index 0000000..ca5b7b5
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/TextDirBatchDataConnector.scala
@@ -0,0 +1,106 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.connector.batch
+
+import org.apache.griffin.measure.configuration.params.DataConnectorParam
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.context.datasource.info.TmstCache
+import org.apache.griffin.measure.utils.HdfsUtil
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.griffin.measure.utils.ParamUtil._
+
+/**
+ * batch data connector for directories with text format data in the nth-depth sub-directories
+ */
+case class TextDirBatchDataConnector(@transient sparkSession: SparkSession,
+                                     dcParam: DataConnectorParam,
+                                     tmstCache: TmstCache
+                                    ) extends BatchDataConnector {
+
+  val config = dcParam.config
+
+  val DirPath = "dir.path"
+  val DataDirDepth = "data.dir.depth"
+  val SuccessFile = "success.file"
+  val DoneFile = "done.file"
+
+  val dirPath = config.getString(DirPath, "")
+  val dataDirDepth = config.getInt(DataDirDepth, 0)
+  val successFile = config.getString(SuccessFile, "_SUCCESS")
+  val doneFile = config.getString(DoneFile, "_DONE")
+
+  val ignoreFilePrefix = "_"
+
+  private def dirExist(): Boolean = {
+    HdfsUtil.existPath(dirPath)
+  }
+
+  def data(ms: Long): (Option[DataFrame], TimeRange) = {
+    val dfOpt = try {
+      val dataDirs = listSubDirs(dirPath :: Nil, dataDirDepth, readable)
+      // touch done file for read dirs
+      dataDirs.foreach(dir => touchDone(dir))
+
+      val validDataDirs = dataDirs.filter(dir => !emptyDir(dir))
+
+      if (validDataDirs.nonEmpty) {
+        val df = sparkSession.read.text(validDataDirs: _*)
+        val dfOpt = Some(df)
+        val preDfOpt = preProcess(dfOpt, ms)
+        preDfOpt
+      } else {
+        None
+      }
+    } catch {
+      case e: Throwable => {
+        error(s"load text dir ${dirPath} fails: ${e.getMessage}")
+        None
+      }
+    }
+    val tmsts = readTmst(ms)
+    (dfOpt, TimeRange(ms, tmsts))
+  }
+
+  private def listSubDirs(paths: Seq[String], depth: Int, filterFunc: (String) => Boolean): Seq[String] = {
+    val subDirs = paths.flatMap { path => HdfsUtil.listSubPathsByType(path, "dir", true) }
+    if (depth <= 0) {
+      subDirs.filter(filterFunc)
+    } else {
+      listSubDirs(subDirs, depth - 1, filterFunc)
+    }
+  }
+
+  private def readable(dir: String): Boolean = isSuccess(dir) && !isDone(dir)
+  private def isDone(dir: String): Boolean = HdfsUtil.existFileInDir(dir, doneFile)
+  private def isSuccess(dir: String): Boolean = HdfsUtil.existFileInDir(dir, successFile)
+
+  private def touchDone(dir: String): Unit = HdfsUtil.createEmptyFile(HdfsUtil.getHdfsFilePath(dir, doneFile))
+
+  private def emptyDir(dir: String): Boolean = {
+    HdfsUtil.listSubPathsByType(dir, "file").filter(!_.startsWith(ignoreFilePrefix)).size == 0
+  }
+
+//  def metaData(): Try[Iterable[(String, String)]] = {
+//    Try {
+//      val st = sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath).schema
+//      st.fields.map(f => (f.name, f.dataType.typeName))
+//    }
+//  }
+
+}
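listSubDirs recurses one directory level per unit of data.dir.depth, so depth 0 reads the immediate children of dir.path and depth 1 reads their children, keeping only directories that contain the success file and no done file; read directories are then marked with the done file so they are not picked up again. A hypothetical layout with dir.path = "/data" and data.dir.depth = 1:

    // /data/2018/01  _SUCCESS present, no _DONE  -> read, then _DONE is touched
    // /data/2018/02  no _SUCCESS                 -> skipped
    // /data/2018/03  _SUCCESS and _DONE          -> skipped (already consumed)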
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala
new file mode 100644
index 0000000..9fe4876
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala
@@ -0,0 +1,85 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.connector.streaming
+
+import kafka.serializer.Decoder
+import org.apache.spark.streaming.dstream.InputDStream
+
+import scala.util.{Failure, Success, Try}
+import org.apache.griffin.measure.utils.ParamUtil._
+
+/**
+ * streaming data connector for kafka
+ */
+trait KafkaStreamingDataConnector extends StreamingDataConnector {
+
+  type KD <: Decoder[K]
+  type VD <: Decoder[V]
+  type OUT = (K, V)
+
+  val config = dcParam.config
+
+  val KafkaConfig = "kafka.config"
+  val Topics = "topics"
+
+  val kafkaConfig = config.getAnyRef(KafkaConfig, Map[String, String]())
+  val topics = config.getString(Topics, "")
+
+  def init(): Unit = {
+    // register fan in
+    dataSourceCacheOpt.foreach(_.registerFanIn)
+
+    val ds = stream match {
+      case Success(dstream) => dstream
+      case Failure(ex) => throw ex
+    }
+    ds.foreachRDD((rdd, time) => {
+      val ms = time.milliseconds
+      val saveDfOpt = try {
+        // coalesce partition number
+        val prlCount = rdd.sparkContext.defaultParallelism
+        val ptnCount = rdd.getNumPartitions
+        val repartitionedRdd = if (prlCount < ptnCount) {
+          rdd.coalesce(prlCount)
+        } else rdd
+
+        val dfOpt = transform(repartitionedRdd)
+
+        // pre-process
+        preProcess(dfOpt, ms)
+      } catch {
+        case e: Throwable => {
+          error(s"streaming data connector error: ${e.getMessage}")
+          None
+        }
+      }
+
+      // save data frame
+      dataSourceCacheOpt.foreach(_.saveData(saveDfOpt, ms))
+    })
+  }
+
+  def stream(): Try[InputDStream[OUT]] = Try {
+    val topicSet = topics.split(",").toSet
+    createDStream(topicSet)
+  }
+
+  protected def createDStream(topicSet: Set[String]): InputDStream[OUT]
+
+}
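init wires each micro-batch through coalesce -> transform -> preProcess -> saveData, so the streaming connector never serves data() directly; frames land in the data source cache instead. An illustrative connector config (the two keys are from this patch; the kafka property values are assumptions for the 0.8 direct-stream API):

    val config: Map[String, Any] = Map(
      "kafka.config" -> Map(
        "metadata.broker.list" -> "broker:9092",   // assumed kafka 0.8 direct stream property
        "group.id" -> "griffin"
      ),
      "topics" -> "events,metrics"                 // comma-separated, split into the topic set
    )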
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
new file mode 100644
index 0000000..b483933
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
@@ -0,0 +1,71 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.connector.streaming
+
+import kafka.serializer.StringDecoder
+import org.apache.griffin.measure.configuration.params.DataConnectorParam
+import org.apache.griffin.measure.context.datasource.cache.DataSourceCache
+import org.apache.griffin.measure.context.datasource.info.TmstCache
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql._
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.InputDStream
+import org.apache.spark.streaming.kafka.KafkaUtils
+
+/**
+ * streaming data connector for kafka with string format key and value
+ */
+case class KafkaStreamingStringDataConnector(@transient sparkSession: SparkSession,
+                                             @transient ssc: StreamingContext,
+                                             dcParam: DataConnectorParam,
+                                             tmstCache: TmstCache,
+                                             dataSourceCacheOpt: Option[DataSourceCache]
+                                            ) extends KafkaStreamingDataConnector {
+
+  type K = String
+  type KD = StringDecoder
+  type V = String
+  type VD = StringDecoder
+
+  val valueColName = "value"
+  val schema = StructType(Array(
+    StructField(valueColName, StringType)
+  ))
+
+  def createDStream(topicSet: Set[String]): InputDStream[OUT] = {
+    KafkaUtils.createDirectStream[K, V, KD, VD](ssc, kafkaConfig, topicSet)
+  }
+
+  def transform(rdd: RDD[OUT]): Option[DataFrame] = {
+    if (rdd.isEmpty) None else {
+      try {
+        // keep only the message value as a single string column
+        val rowRdd = rdd.map(d => Row(d._2))
+        val df = sparkSession.createDataFrame(rowRdd, schema)
+        Some(df)
+      } catch {
+        case e: Throwable => {
+          error(s"streaming data transform fails: ${e.getMessage}")
+          None
+        }
+      }
+    }
+  }
+
+}
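
The transform above drops the Kafka key and turns each (key, value) pair into a one-column Row of the value string. A minimal sketch of the resulting frame shape, assuming a local SparkSession (not part of this commit; the sample payloads are illustrative):

import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object TransformSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
    val schema = StructType(Array(StructField("value", StringType)))
    val pairs = Seq(("k1", """{"id":1}"""), ("k2", """{"id":2}"""))
    // same shape as the connector's transform: Row(d._2) keeps only the value
    val rowRdd = spark.sparkContext.parallelize(pairs).map(d => Row(d._2))
    val df = spark.createDataFrame(rowRdd, schema)
    df.show() // one string column named "value", one row per Kafka message
    spark.stop()
  }
}
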
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/StreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/StreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/StreamingDataConnector.scala
new file mode 100644
index 0000000..3b2c355
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/StreamingDataConnector.scala
@@ -0,0 +1,46 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.connector.streaming
+
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.context.datasource.cache.DataSourceCache
+import org.apache.griffin.measure.context.datasource.connector.DataConnector
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+import org.apache.spark.streaming.dstream.InputDStream
+
+import scala.util.Try
+
+trait StreamingDataConnector extends DataConnector {
+
+  type K
+  type V
+  type OUT
+
+  protected def stream(): Try[InputDStream[OUT]]
+
+  // transform rdd to dataframe
+  def transform(rdd: RDD[OUT]): Option[DataFrame]
+
+  // a streaming data connector does not read data frames directly;
+  // data arrives through the data source cache instead
+  def data(ms: Long): (Option[DataFrame], TimeRange) = (None, TimeRange.emptyTimeRange)
+
+  val dataSourceCacheOpt: Option[DataSourceCache]
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/info/DataSourceCacheable.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/info/DataSourceCacheable.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/info/DataSourceCacheable.scala
new file mode 100644
index 0000000..f721a1e
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/info/DataSourceCacheable.scala
@@ -0,0 +1,88 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.info
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.context.streaming.info.{InfoCacheInstance, TimeInfoCache}
+
+/**
+ * timestamp info of data source cache
+ */
+trait DataSourceCacheable extends Loggable with Serializable {
+
+  val cacheInfoPath: String
+  val readyTimeInterval: Long
+  val readyTimeDelay: Long
+
+  def selfCacheInfoPath = s"${TimeInfoCache.infoPath}/${cacheInfoPath}"
+
+  def selfCacheTime = TimeInfoCache.cacheTime(selfCacheInfoPath)
+  def selfLastProcTime = TimeInfoCache.lastProcTime(selfCacheInfoPath)
+  def selfReadyTime = TimeInfoCache.readyTime(selfCacheInfoPath)
+  def selfCleanTime = TimeInfoCache.cleanTime(selfCacheInfoPath)
+  def selfOldCacheIndex = TimeInfoCache.oldCacheIndex(selfCacheInfoPath)
+
+  protected def submitCacheTime(ms: Long): Unit = {
+    val map = Map[String, String]((selfCacheTime -> ms.toString))
+    InfoCacheInstance.cacheInfo(map)
+  }
+
+  protected def submitReadyTime(ms: Long): Unit = {
+    val curReadyTime = ms - readyTimeDelay
+    if (curReadyTime % readyTimeInterval == 0) {
+      val map = Map[String, String]((selfReadyTime -> curReadyTime.toString))
+      InfoCacheInstance.cacheInfo(map)
+    }
+  }
+
+  protected def submitLastProcTime(ms: Long): Unit = {
+    val map = Map[String, String]((selfLastProcTime -> ms.toString))
+    InfoCacheInstance.cacheInfo(map)
+  }
+
+  protected def readLastProcTime(): Option[Long] = readSelfInfo(selfLastProcTime)
+
+  protected def submitCleanTime(ms: Long): Unit = {
+    val cleanTime = genCleanTime(ms)
+    val map = Map[String, String]((selfCleanTime -> cleanTime.toString))
+    InfoCacheInstance.cacheInfo(map)
+  }
+
+  protected def genCleanTime(ms: Long): Long = ms
+
+  protected def readCleanTime(): Option[Long] = readSelfInfo(selfCleanTime)
+
+  protected def submitOldCacheIndex(index: Long): Unit = {
+    val map = Map[String, String]((selfOldCacheIndex -> index.toString))
+    InfoCacheInstance.cacheInfo(map)
+  }
+
+  def readOldCacheIndex(): Option[Long] = readSelfInfo(selfOldCacheIndex)
+
+  private def readSelfInfo(key: String): Option[Long] = {
+    InfoCacheInstance.readInfo(key :: Nil).get(key).flatMap { v =>
+      try {
+        Some(v.toLong)
+      } catch {
+        case _: Throwable => None
+      }
+    }
+  }
+
+}
\ No newline at end of file
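
Note that submitReadyTime only publishes when the delayed timestamp lands exactly on an interval boundary, so at most one ready time is cached per interval. A worked example of that arithmetic (interval and delay values are illustrative, not from this commit):

object ReadyTimeSketch {
  val readyTimeInterval = 60000L // assumed 60s interval
  val readyTimeDelay = 10000L    // assumed 10s delay

  // mirrors the gate in submitReadyTime: publish only on interval boundaries
  def shouldSubmit(ms: Long): Option[Long] = {
    val curReadyTime = ms - readyTimeDelay
    if (curReadyTime % readyTimeInterval == 0) Some(curReadyTime) else None
  }

  def main(args: Array[String]): Unit = {
    println(shouldSubmit(130000L)) // Some(120000): 120000 is a multiple of 60000
    println(shouldSubmit(130020L)) // None: 120020 is not on a boundary
  }
}
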
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/datasource/info/TmstCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/info/TmstCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/info/TmstCache.scala
new file mode 100644
index 0000000..4b1c410
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/info/TmstCache.scala
@@ -0,0 +1,47 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.info
+
+import org.apache.griffin.measure.Loggable
+
+import scala.collection.mutable.{SortedSet => MutableSortedSet}
+
+/**
+ * tmst cache, CRUD of timestamps
+ */
+case class TmstCache() extends Loggable {
+
+  private val tmstGroup: MutableSortedSet[Long] = MutableSortedSet.empty[Long]
+
+  //-- insert tmst into tmst group --
+  def insert(tmst: Long) = tmstGroup += tmst
+  def insert(tmsts: Iterable[Long]) = tmstGroup ++= tmsts
+
+  //-- remove tmst from tmst group --
+  def remove(tmst: Long) = tmstGroup -= tmst
+  def remove(tmsts: Iterable[Long]) = tmstGroup --= tmsts
+
+  //-- get subset of tmst group --
+  def fromUntil(from: Long, until: Long) = tmstGroup.range(from, until).toSet
+  def afterTil(after: Long, til: Long) = tmstGroup.range(after + 1, til + 1).toSet
+  def until(until: Long) = tmstGroup.until(until).toSet
+  def from(from: Long) = tmstGroup.from(from).toSet
+  def all = tmstGroup.toSet
+
+}
\ No newline at end of file
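
The two range helpers differ only in boundary handling: fromUntil is inclusive-exclusive, while afterTil shifts both bounds by one to get exclusive-inclusive. A quick check with illustrative timestamps:

import scala.collection.mutable.{SortedSet => MutableSortedSet}

object TmstRangeSketch {
  def main(args: Array[String]): Unit = {
    val tmsts = MutableSortedSet(100L, 200L, 300L, 400L)
    // fromUntil(200, 400): includes 200, excludes 400
    println(tmsts.range(200L, 400L).toSet)         // Set(200, 300)
    // afterTil(200, 400): excludes 200, includes 400
    println(tmsts.range(200L + 1, 400L + 1).toSet) // Set(300, 400)
  }
}
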
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCache.scala
new file mode 100644
index 0000000..e1d498b
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCache.scala
@@ -0,0 +1,39 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.info
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.context.streaming.lock.CacheLock
+
+trait InfoCache extends Loggable with Serializable {
+
+  def init(): Unit
+  def available(): Boolean
+  def close(): Unit
+
+  def cacheInfo(info: Map[String, String]): Boolean
+  def readInfo(keys: Iterable[String]): Map[String, String]
+  def deleteInfo(keys: Iterable[String]): Unit
+  def clearInfo(): Unit
+
+  def listKeys(path: String): List[String]
+
+  def genLock(s: String): CacheLock
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheFactory.scala
new file mode 100644
index 0000000..85106b4
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheFactory.scala
@@ -0,0 +1,41 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.info
+
+import org.apache.griffin.measure.configuration.params.InfoCacheParam
+
+import scala.util.{Failure, Success, Try}
+
+case class InfoCacheFactory(infoCacheParams: Iterable[InfoCacheParam], metricName: String) extends Serializable {
+
+  val ZK_REGEX = """^(?i)zk|zookeeper$""".r
+
+  def getInfoCache(infoCacheParam: InfoCacheParam): Option[InfoCache] = {
+    val config = infoCacheParam.config
+    val infoCacheTry = infoCacheParam.cacheType match {
+      case ZK_REGEX() => Try(ZKInfoCache(config, metricName))
+      // return a Failure instead of throwing, so the error stays inside the Try
+      case _ => Failure(new Exception("not supported info cache type"))
+    }
+    infoCacheTry match {
+      case Success(infoCache) => Some(infoCache)
+      case _ => None
+    }
+  }
+
+}
\ No newline at end of file
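
To make the InfoCache contract concrete, here is a self-contained in-memory rendition of the same key-value interface (not part of this commit; genLock is omitted because CacheLock's interface lies outside this diff, and the trait below is a simplified stand-in, not the real one):

import scala.collection.concurrent.TrieMap

trait SimpleInfoCache {
  def cacheInfo(info: Map[String, String]): Boolean
  def readInfo(keys: Iterable[String]): Map[String, String]
  def deleteInfo(keys: Iterable[String]): Unit
  def clearInfo(): Unit
}

class MemInfoCache extends SimpleInfoCache {
  private val store = TrieMap[String, String]() // in-memory backing store
  def cacheInfo(info: Map[String, String]): Boolean = { store ++= info; true }
  def readInfo(keys: Iterable[String]): Map[String, String] =
    keys.flatMap(k => store.get(k).map(k -> _)).toMap
  def deleteInfo(keys: Iterable[String]): Unit = keys.foreach(store -= _)
  def clearInfo(): Unit = store.clear()
}

object MemInfoCacheDemo {
  def main(args: Array[String]): Unit = {
    val c = new MemInfoCache
    c.cacheInfo(Map("info/ds1/ready.time" -> "120000")) // key path is illustrative
    println(c.readInfo(List("info/ds1/ready.time")))    // Map(info/ds1/ready.time -> 120000)
  }
}
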
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheInstance.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheInstance.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheInstance.scala
new file mode 100644
index 0000000..9e8a9f6
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheInstance.scala
@@ -0,0 +1,53 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.info
+
+import org.apache.griffin.measure.configuration.params.InfoCacheParam
+import org.apache.griffin.measure.context.streaming.lock.{CacheLock, MultiCacheLock}
+
+object InfoCacheInstance extends InfoCache {
+  var infoCaches: List[InfoCache] = Nil
+
+  def initInstance(infoCacheParams: Iterable[InfoCacheParam], metricName: String) = {
+    val fac = InfoCacheFactory(infoCacheParams, metricName)
+    infoCaches = infoCacheParams.flatMap(param => fac.getInfoCache(param)).toList
+  }
+
+  def init(): Unit = infoCaches.foreach(_.init)
+  def available(): Boolean = infoCaches.foldLeft(false)(_ || _.available)
+  def close(): Unit = infoCaches.foreach(_.close)
+
+  def cacheInfo(info: Map[String, String]): Boolean = {
+    infoCaches.foldLeft(false) { (res, infoCache) => res || infoCache.cacheInfo(info) }
+  }
+  def readInfo(keys: Iterable[String]): Map[String, String] = {
+    // reverse before folding so the first configured cache wins on key conflicts
+    val maps = infoCaches.map(_.readInfo(keys)).reverse
+    maps.fold(Map[String, String]())(_ ++ _)
+  }
+  def deleteInfo(keys: Iterable[String]): Unit = infoCaches.foreach(_.deleteInfo(keys))
+  def clearInfo(): Unit = infoCaches.foreach(_.clearInfo)
+
+  def listKeys(path: String): List[String] = {
+    infoCaches.foldLeft(Nil: List[String]) { (res, infoCache) =>
+      if (res.nonEmpty) res else infoCache.listKeys(path)
+    }
+  }
+
+  def genLock(s: String): CacheLock = MultiCacheLock(infoCaches.map(_.genLock(s)))
+}
\ No newline at end of file
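
The facade's combinators encode simple precedence rules: cacheInfo succeeds if any backend succeeds, listKeys returns the first non-empty listing, and readInfo lets earlier backends override later ones. A toy check of the read precedence (values illustrative):

object FoldPrecedenceSketch {
  def main(args: Array[String]): Unit = {
    val fromCacheA = Map("ready.time" -> "100") // first configured cache
    val fromCacheB = Map("ready.time" -> "200") // second configured cache
    // reversing before the fold makes the FIRST map win on duplicate keys,
    // mirroring InfoCacheInstance.readInfo
    val merged = List(fromCacheA, fromCacheB).reverse
      .fold(Map[String, String]())(_ ++ _)
    println(merged("ready.time")) // 100 -- cache A takes precedence
  }
}
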
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/TimeInfoCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/TimeInfoCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/TimeInfoCache.scala
new file mode 100644
index 0000000..b1e6764
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/TimeInfoCache.scala
@@ -0,0 +1,127 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.info
+
+import org.apache.griffin.measure.Loggable
+
+object TimeInfoCache extends Loggable with Serializable {
+
+  private val CacheTime = "cache.time"
+  private val LastProcTime = "last.proc.time"
+  private val ReadyTime = "ready.time"
+  private val CleanTime = "clean.time"
+  private val OldCacheIndex = "old.cache.index"
+
+  def cacheTime(path: String): String = s"${path}/${CacheTime}"
+  def lastProcTime(path: String): String = s"${path}/${LastProcTime}"
+  def readyTime(path: String): String = s"${path}/${ReadyTime}"
+  def cleanTime(path: String): String = s"${path}/${CleanTime}"
+  def oldCacheIndex(path: String): String = s"${path}/${OldCacheIndex}"
+
+  val infoPath = "info"
+
+  val finalCacheInfoPath = "info.final"
+  val finalReadyTime = s"${finalCacheInfoPath}/${ReadyTime}"
+  val finalLastProcTime = s"${finalCacheInfoPath}/${LastProcTime}"
+  val finalCleanTime = s"${finalCacheInfoPath}/${CleanTime}"
+
+  def startTimeInfoCache(): Unit = {
+    genFinalReadyTime
+  }
+
+  def getTimeRange(): (Long, Long) = {
+    readTimeRange
+  }
+
+  def getCleanTime(): Long = {
+    readCleanTime
+  }
+
+  def endTimeInfoCache(): Unit = {
+    genFinalLastProcTime
+    genFinalCleanTime
+  }
+
+  // collect the per-source time under the given key and publish
+  // the minimum as the final value
+  private def genFinalTime(timeKey: String, finalKey: String): Unit = {
+    val subPath = InfoCacheInstance.listKeys(infoPath)
+    val keys = subPath.map { p => s"${infoPath}/${p}/${timeKey}" }
+    val result = InfoCacheInstance.readInfo(keys)
+    val times = keys.flatMap { k => getLongOpt(result, k) }
+    if (times.nonEmpty) {
+      val map = Map[String, String]((finalKey -> times.min.toString))
+      InfoCacheInstance.cacheInfo(map)
+    }
+  }
+
+  private def genFinalReadyTime(): Unit = genFinalTime(ReadyTime, finalReadyTime)
+  private def genFinalLastProcTime(): Unit = genFinalTime(LastProcTime, finalLastProcTime)
+  private def genFinalCleanTime(): Unit = genFinalTime(CleanTime, finalCleanTime)
+
+  private def readTimeRange(): (Long, Long) = {
+    val map = InfoCacheInstance.readInfo(List(finalLastProcTime, finalReadyTime))
+    val lastProcTime = getLong(map, finalLastProcTime)
+    val curReadyTime = getLong(map, finalReadyTime)
+    (lastProcTime, curReadyTime)
+  }
+
+  private def readCleanTime(): Long = {
+    val map = InfoCacheInstance.readInfo(List(finalCleanTime))
+    getLong(map, finalCleanTime)
+  }
+
+  private def getLongOpt(map: Map[String, String], key: String): Option[Long] = {
+    try {
+      map.get(key).map(_.toLong)
+    } catch {
+      case _: Throwable => None
+    }
+  }
+
+  private def getLong(map: Map[String, String], key: String) = {
+    getLongOpt(map, key).getOrElse(-1L)
+  }
+
+}
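
The "final" times are always the minimum across data sources, presumably so no batch window runs past data that the slowest source has not yet produced. A self-contained sketch of that rule (key paths and values are illustrative):

object FinalReadyTimeSketch {
  def main(args: Array[String]): Unit = {
    val perSourceReadyTimes = Map( // illustrative per-source ready times
      "info/src1/ready.time" -> 120000L,
      "info/src2/ready.time" -> 110000L
    )
    // same rule as genFinalTime: publish the minimum across sources
    val finalReadyTime = perSourceReadyTimes.values.min
    println(finalReadyTime) // 110000 -- gated by the slowest source
  }
}
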
