http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala index c90e572..39444cd 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala @@ -28,9 +28,13 @@ import org.apache.griffin.measure.data.source.DataSource import org.apache.griffin.measure.log.Loggable import org.apache.griffin.measure.persist.{Persist, PersistFactory} import org.apache.griffin.measure.process.engine.DqEngines +import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters} import org.apache.griffin.measure.rule.adaptor.{RuleAdaptorGroup, RunPhase} +import org.apache.griffin.measure.rule.plan._ +import org.apache.spark.sql.SQLContext -case class StreamingDqThread(dqEngines: DqEngines, +case class StreamingDqThread(sqlContext: SQLContext, + dqEngines: DqEngines, dataSources: Seq[DataSource], evaluateRuleParam: EvaluateRuleParam, persistFactory: PersistFactory, @@ -49,63 +53,88 @@ case class StreamingDqThread(dqEngines: DqEngines, val st = new Date().getTime appPersist.log(st, s"starting process ...") + val calcTimeInfo = CalcTimeInfo(st) TimeInfoCache.startTimeInfoCache // init data sources - dqEngines.loadData(dataSources, st) + val dsTmsts = dqEngines.loadData(dataSources, calcTimeInfo) + + println(s"data sources timestamps: ${dsTmsts}") // generate rule steps - val ruleSteps = RuleAdaptorGroup.genConcreteRuleSteps(evaluateRuleParam, RunPhase) +// val ruleSteps = RuleAdaptorGroup.genRuleSteps( +// CalcTimeInfo(st), evaluateRuleParam, dsTmsts) + val rulePlan = RuleAdaptorGroup.genRulePlan( + calcTimeInfo, evaluateRuleParam, StreamingProcessType) + +// ruleSteps.foreach(println) // run rules - dqEngines.runRuleSteps(ruleSteps) +// dqEngines.runRuleSteps(ruleSteps) + dqEngines.runRuleSteps(calcTimeInfo, rulePlan.ruleSteps) val ct = new Date().getTime val calculationTimeStr = s"calculation using time: ${ct - st} ms" - println(calculationTimeStr) +// println(calculationTimeStr) appPersist.log(ct, calculationTimeStr) // persist results - val timeGroups = dqEngines.persistAllMetrics(ruleSteps, persistFactory) +// val timeGroups = dqEngines.persistAllMetrics(ruleSteps, persistFactory) + dqEngines.persistAllMetrics(calcTimeInfo, rulePlan.metricExports, + StreamingProcessType, persistFactory) +// println(s"--- timeGroups: ${timeGroups}") val rt = new Date().getTime val persistResultTimeStr = s"persist result using time: ${rt - ct} ms" - println(persistResultTimeStr) appPersist.log(rt, persistResultTimeStr) - val rdds = dqEngines.collectUpdateRDDs(ruleSteps, timeGroups) - rdds.foreach(_._2.cache()) - rdds.foreach { pr => - val (step, rdd) = pr - val cnt = rdd.count - println(s"step [${step.name}] group count: ${cnt}") - } - - val lt = new Date().getTime - val collectRddTimeStr = s"collect records using time: ${lt - rt} ms" - println(collectRddTimeStr) - appPersist.log(lt, collectRddTimeStr) - // persist records - dqEngines.persistAllRecords(rdds, persistFactory) -// dqEngines.persistAllRecords(ruleSteps, persistFactory, timeGroups) + dqEngines.persistAllRecords(calcTimeInfo, rulePlan.recordExports, + StreamingProcessType, persistFactory, 
dataSources) - // update data source - dqEngines.updateDataSources(rdds, dataSources) -// dqEngines.updateDataSources(ruleSteps, dataSources, timeGroups) + val et = new Date().getTime + val persistTimeStr = s"persist records using time: ${et - rt} ms" + appPersist.log(et, persistTimeStr) - rdds.foreach(_._2.unpersist()) +// val dfs = dqEngines.collectUpdateRDDs(ruleSteps, timeGroups.toSet) +// dfs.foreach(_._2.cache()) +// dfs.foreach { pr => +// val (step, df) = pr +// val cnt = df.count +// println(s"step [${step.name}] group count: ${cnt}") +// } +// +// val lt = new Date().getTime +// val collectRddTimeStr = s"collect records using time: ${lt - rt} ms" +//// println(collectRddTimeStr) +// appPersist.log(lt, collectRddTimeStr) +// +// // persist records +// dqEngines.persistAllRecords(dfs, persistFactory) +//// dqEngines.persistAllRecords(ruleSteps, persistFactory, timeGroups) +// +// // update data source +// dqEngines.updateDataSources(dfs, dataSources) +//// dqEngines.updateDataSources(ruleSteps, dataSources, timeGroups) +// +// dfs.foreach(_._2.unpersist()) TimeInfoCache.endTimeInfoCache +// sqlContext.tables().show(20) + + // cache global data +// val globalTables = TableRegisters.getRunGlobalTables +// globalTables.foreach { gt => +// val df = sqlContext.table(gt) +// df.cache +// } + // clean old data - cleanData + cleanData(calcTimeInfo) - val et = new Date().getTime - val persistTimeStr = s"persist records using time: ${et - lt} ms" - println(persistTimeStr) - appPersist.log(et, persistTimeStr) +// sqlContext.tables().show(20) } catch { case e: Throwable => error(s"process error: ${e.getMessage}") @@ -120,10 +149,16 @@ case class StreamingDqThread(dqEngines: DqEngines, } // clean old data and old result cache - private def cleanData(): Unit = { + private def cleanData(timeInfo: TimeInfo): Unit = { try { dataSources.foreach(_.cleanOldData) - dataSources.foreach(_.dropTable) + + TableRegisters.unregisterRunTempTables(sqlContext, timeInfo.key) + TableRegisters.unregisterCompileTempTables(timeInfo.key) + + DataFrameCaches.uncacheDataFrames(timeInfo.key) + DataFrameCaches.clearTrashDataFrames(timeInfo.key) + DataFrameCaches.clearGlobalTrashDataFrames() val cleanTime = TimeInfoCache.getCleanTime CacheResultProcesser.refresh(cleanTime)
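

Note on the StreamingDqThread change above: the step-by-step RDD collection is replaced by a rule-plan driven cycle keyed by CalcTimeInfo. The following condensed sketch (not the verbatim class; logging, TimeInfoCache bracketing, error handling and the commented-out legacy path are omitted, and package paths for EvaluateRuleParam and CalcTimeInfo are assumed from the imports visible in this commit) shows the per-batch flow the diff converges on:

import org.apache.griffin.measure.config.params.user.EvaluateRuleParam
import org.apache.griffin.measure.data.source.DataSource
import org.apache.griffin.measure.persist.PersistFactory
import org.apache.griffin.measure.process.StreamingProcessType
import org.apache.griffin.measure.process.engine.DqEngines
import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters}
import org.apache.griffin.measure.rule.adaptor.RuleAdaptorGroup
import org.apache.griffin.measure.rule.plan.CalcTimeInfo
import org.apache.spark.sql.SQLContext

// one streaming calculation round, condensed from the new StreamingDqThread flow (sketch)
def runOneRound(sqlContext: SQLContext,
                dqEngines: DqEngines,
                dataSources: Seq[DataSource],
                evaluateRuleParam: EvaluateRuleParam,
                persistFactory: PersistFactory): Unit = {
  val calcTimeInfo = CalcTimeInfo(new java.util.Date().getTime)

  // load each data source and collect the timestamps it produced for this batch
  val dsTmsts: Map[String, Set[Long]] = dqEngines.loadData(dataSources, calcTimeInfo)
  println(s"data sources timestamps: ${dsTmsts}")

  // a rule plan (steps + metric/record exports) now replaces the old concrete rule steps
  val rulePlan = RuleAdaptorGroup.genRulePlan(calcTimeInfo, evaluateRuleParam, StreamingProcessType)
  dqEngines.runRuleSteps(calcTimeInfo, rulePlan.ruleSteps)

  // metrics and records are persisted through the plan's export descriptors;
  // record persistence also feeds the data source caches
  dqEngines.persistAllMetrics(calcTimeInfo, rulePlan.metricExports, StreamingProcessType, persistFactory)
  dqEngines.persistAllRecords(calcTimeInfo, rulePlan.recordExports, StreamingProcessType,
    persistFactory, dataSources)

  // cleanData(): drop everything registered or cached under this batch's key
  dataSources.foreach(_.cleanOldData)
  TableRegisters.unregisterRunTempTables(sqlContext, calcTimeInfo.key)
  TableRegisters.unregisterCompileTempTables(calcTimeInfo.key)
  DataFrameCaches.uncacheDataFrames(calcTimeInfo.key)
  DataFrameCaches.clearTrashDataFrames(calcTimeInfo.key)
  DataFrameCaches.clearGlobalTrashDataFrames()
}
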
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/process/check/DataChecker.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/check/DataChecker.scala b/measure/src/main/scala/org/apache/griffin/measure/process/check/DataChecker.scala deleted file mode 100644 index 91855c2..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/process/check/DataChecker.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.process.check - -import org.apache.spark.sql.SQLContext - -case class DataChecker(sqlContext: SQLContext) { - - def existDataSourceName(name: String): Boolean = { - sqlContext.tableNames.exists(_ == name) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala index c3205b5..59b765e 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala @@ -22,41 +22,36 @@ import java.util.Date import org.apache.griffin.measure.cache.result.CacheResultProcesser import org.apache.griffin.measure.config.params.user.DataSourceParam -import org.apache.griffin.measure.data.connector.GroupByColumn import org.apache.griffin.measure.data.source.{DataSource, DataSourceFactory} import org.apache.griffin.measure.persist.{Persist, PersistFactory} +import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters} import org.apache.griffin.measure.result.AccuracyResult +import org.apache.griffin.measure.rule.adaptor.InternalColumns import org.apache.griffin.measure.rule.dsl._ -import org.apache.griffin.measure.rule.step._ +import org.apache.griffin.measure.rule.plan._ import org.apache.griffin.measure.utils.JsonUtil import org.apache.spark.rdd.RDD -import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType} +import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Row, SQLContext} import org.apache.spark.streaming.StreamingContext +import org.apache.griffin.measure.utils.ParamUtil._ + +import scala.util.Try case class DataFrameOprEngine(sqlContext: SQLContext) extends SparkDqEngine { - def runRuleStep(ruleStep: ConcreteRuleStep): Boolean = { + def runRuleStep(timeInfo: TimeInfo, ruleStep: 
RuleStep): Boolean = { ruleStep match { - case DfOprStep(name, rule, details, _, _) => { + case rs @ DfOprStep(name, rule, details, _, _) => { try { - rule match { - case DataFrameOprs._fromJson => { - val df = DataFrameOprs.fromJson(sqlContext, details) - df.registerTempTable(name) - } - case DataFrameOprs._accuracy => { - val df = DataFrameOprs.accuracy(sqlContext, details) - df.registerTempTable(name) - } - case DataFrameOprs._clear => { - val df = DataFrameOprs.clear(sqlContext, details) - df.registerTempTable(name) - } - case _ => { - throw new Exception(s"df opr [ ${rule} ] not supported") - } + val df = rule match { + case DataFrameOprs._fromJson => DataFrameOprs.fromJson(sqlContext, details) + case DataFrameOprs._accuracy => DataFrameOprs.accuracy(sqlContext, timeInfo, details) + case DataFrameOprs._clear => DataFrameOprs.clear(sqlContext, details) + case _ => throw new Exception(s"df opr [ ${rule} ] not supported") } + if (rs.needCache) DataFrameCaches.cacheDataFrame(timeInfo.key, name, df) + TableRegisters.registerRunTempTable(df, timeInfo.key, name) true } catch { case e: Throwable => { @@ -77,6 +72,13 @@ object DataFrameOprs { final val _accuracy = "accuracy" final val _clear = "clear" + object AccuracyOprKeys { + val _dfName = "df.name" + val _miss = "miss" + val _total = "total" + val _matched = "matched" + } + def fromJson(sqlContext: SQLContext, details: Map[String, Any]): DataFrame = { val _dfName = "df.name" val _colName = "col.name" @@ -88,41 +90,44 @@ object DataFrameOprs { case Some(colName: String) => df.map(_.getAs[String](colName)) case _ => df.map(_.getAs[String](0)) } - sqlContext.read.json(rdd) + sqlContext.read.json(rdd) // slow process } - def accuracy(sqlContext: SQLContext, details: Map[String, Any]): DataFrame = { - val _dfName = "df.name" - val _miss = "miss" - val _total = "total" - val _matched = "matched" -// val _tmst = "tmst" - val dfName = details.getOrElse(_dfName, _dfName).toString - val miss = details.getOrElse(_miss, _miss).toString - val total = details.getOrElse(_total, _total).toString - val matched = details.getOrElse(_matched, _matched).toString -// val tmst = details.getOrElse(_tmst, _tmst).toString - val tmst = GroupByColumn.tmst + def accuracy(sqlContext: SQLContext, timeInfo: TimeInfo, details: Map[String, Any]): DataFrame = { + import AccuracyOprKeys._ + + val dfName = details.getStringOrKey(_dfName) + val miss = details.getStringOrKey(_miss) + val total = details.getStringOrKey(_total) + val matched = details.getStringOrKey(_matched) + +// val _enableIgnoreCache = "enable.ignore.cache" +// val enableIgnoreCache = details.getBoolean(_enableIgnoreCache, false) + +// val tmst = InternalColumns.tmst val updateTime = new Date().getTime - def getLong(r: Row, k: String): Long = { + def getLong(r: Row, k: String): Option[Long] = { try { - r.getAs[Long](k) + Some(r.getAs[Long](k)) } catch { - case e: Throwable => 0L + case e: Throwable => None } } val df = sqlContext.table(s"`${dfName}`") + val results = df.flatMap { row => - val t = getLong(row, tmst) - if (t > 0) { - val missCount = getLong(row, miss) - val totalCount = getLong(row, total) + try { + val tmst = getLong(row, InternalColumns.tmst).getOrElse(timeInfo.calcTime) + val missCount = getLong(row, miss).getOrElse(0L) + val totalCount = getLong(row, total).getOrElse(0L) val ar = AccuracyResult(missCount, totalCount) - Some((t, ar)) - } else None + if (ar.isLegal) Some((tmst, ar)) else None + } catch { + case e: Throwable => None + } }.collect val updateResults = results.flatMap { pair => 
@@ -131,24 +136,28 @@ object DataFrameOprs { updatedCacheResultOpt } - // update + // update results updateResults.foreach { r => CacheResultProcesser.update(r) } + // generate metrics val schema = StructType(Array( - StructField(tmst, LongType), + StructField(InternalColumns.tmst, LongType), StructField(miss, LongType), StructField(total, LongType), - StructField(matched, LongType) + StructField(matched, LongType), + StructField(InternalColumns.record, BooleanType), + StructField(InternalColumns.empty, BooleanType) )) val rows = updateResults.map { r => val ar = r.result.asInstanceOf[AccuracyResult] - Row(r.timeGroup, ar.miss, ar.total, ar.getMatch) + Row(r.timeGroup, ar.miss, ar.total, ar.getMatch, !ar.initial, ar.eventual) } val rowRdd = sqlContext.sparkContext.parallelize(rows) - sqlContext.createDataFrame(rowRdd, schema) + val retDf = sqlContext.createDataFrame(rowRdd, schema) + retDf } def clear(sqlContext: SQLContext, details: Map[String, Any]): DataFrame = { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala index e28dfa4..a48c4d1 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala @@ -22,22 +22,30 @@ import org.apache.griffin.measure.config.params.user.DataSourceParam import org.apache.griffin.measure.data.source.DataSource import org.apache.griffin.measure.log.Loggable import org.apache.griffin.measure.persist.{Persist, PersistFactory} +import org.apache.griffin.measure.process.ProcessType import org.apache.griffin.measure.rule.dsl._ -import org.apache.griffin.measure.rule.step._ +import org.apache.griffin.measure.rule.plan.{TimeInfo, _} import org.apache.spark.rdd.RDD -import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.{DataFrame, Row} trait DqEngine extends Loggable with Serializable { - def runRuleStep(ruleStep: ConcreteRuleStep): Boolean + def runRuleStep(timeInfo: TimeInfo, ruleStep: RuleStep): Boolean protected def collectable(): Boolean = false - def collectMetrics(ruleStep: ConcreteRuleStep): Map[Long, Map[String, Any]] + def collectMetrics(timeInfo: TimeInfo, metricExport: MetricExport, procType: ProcessType + ): Map[Long, Map[String, Any]] -// def collectRecords(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]] -// -// def collectUpdateCacheDatas(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]] + // def collectRecords(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]] + // + // def collectUpdateCacheDatas(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]] - def collectUpdateRDD(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]] -} +// def collectUpdateRDD(ruleStep: RuleStep): Option[DataFrame] + def collectRecords(timeInfo: TimeInfo, recordExport: RecordExport, procType: ProcessType + ): Map[Long, DataFrame] + + + def collectBatchRecords(recordExport: RecordExport): Option[RDD[String]] + def collectStreamingRecords(recordExport: RecordExport): (Option[RDD[(Long, 
Iterable[String])]], Set[Long]) +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala index 1af2ae3..03ee208 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala @@ -18,38 +18,47 @@ under the License. */ package org.apache.griffin.measure.process.engine +import java.util.concurrent.atomic.AtomicInteger + import org.apache.griffin.measure.config.params.user.DataSourceParam -import org.apache.griffin.measure.data.connector.GroupByColumn import org.apache.griffin.measure.data.source._ import org.apache.griffin.measure.log.Loggable import org.apache.griffin.measure.persist.{Persist, PersistFactory} +import org.apache.griffin.measure.process.{BatchProcessType, ProcessType, StreamingProcessType} +import org.apache.griffin.measure.rule.adaptor.InternalColumns import org.apache.griffin.measure.rule.dsl._ -import org.apache.griffin.measure.rule.step._ +import org.apache.griffin.measure.rule.plan._ +import org.apache.griffin.measure.utils.JsonUtil import org.apache.spark.rdd.RDD -import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.{DataFrame, Row} + +import scala.concurrent._ +import scala.concurrent.duration.Duration +import scala.util.{Failure, Success} +import ExecutionContext.Implicits.global case class DqEngines(engines: Seq[DqEngine]) extends DqEngine { val persistOrder: List[PersistType] = List(MetricPersistType, RecordPersistType) - def loadData(dataSources: Seq[DataSource], ms: Long): Unit = { - dataSources.foreach { ds => - ds.loadData(ms) - } + def loadData(dataSources: Seq[DataSource], timeInfo: TimeInfo): Map[String, Set[Long]] = { + dataSources.map { ds => + (ds.name, ds.loadData(timeInfo)) + }.toMap } - def runRuleSteps(ruleSteps: Seq[ConcreteRuleStep]): Unit = { + def runRuleSteps(timeInfo: TimeInfo, ruleSteps: Seq[RuleStep]): Unit = { ruleSteps.foreach { ruleStep => - runRuleStep(ruleStep) + runRuleStep(timeInfo, ruleStep) } } - def persistAllMetrics(ruleSteps: Seq[ConcreteRuleStep], persistFactory: PersistFactory - ): Iterable[Long] = { - val metricSteps = ruleSteps.filter(_.persistType == MetricPersistType) + def persistAllMetrics(timeInfo: TimeInfo, metricExports: Seq[MetricExport], + procType: ProcessType, persistFactory: PersistFactory + ): Unit = { val allMetrics: Map[Long, Map[String, Any]] = { - metricSteps.foldLeft(Map[Long, Map[String, Any]]()) { (ret, step) => - val metrics = collectMetrics(step) + metricExports.foldLeft(Map[Long, Map[String, Any]]()) { (ret, step) => + val metrics = collectMetrics(timeInfo, step, procType) metrics.foldLeft(ret) { (total, pair) => val (k, v) = pair total.get(k) match { @@ -59,15 +68,149 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine { } } } - val updateTimeGroups = allMetrics.keys + allMetrics.foreach { pair => val (t, metric) = pair val persist = persistFactory.getPersists(t) persist.persistMetrics(metric) } - updateTimeGroups } + private def persistCollectedRecords(recordExport: RecordExport, records: Map[Long, DataFrame], + persistFactory: PersistFactory, dataSources: Seq[DataSource]): Unit = { + val pc = 
ParallelCounter(records.size) + val pro = promise[Boolean] + if (records.size > 0) { + records.foreach { pair => + val (tmst, df) = pair + val persist = persistFactory.getPersists(tmst) + val updateDsCaches = recordExport.dataSourceCacheOpt match { + case Some(dsName) => dataSources.filter(_.name == dsName).flatMap(_.dataSourceCacheOpt) + case _ => Nil + } + val future = Future { +// df.cache + persist.persistRecords(df, recordExport.name) + updateDsCaches.foreach(_.updateData(df, tmst)) +// df.unpersist + true + } + future.onComplete { + case Success(v) => { + pc.finishOne(v) + if (pc.checkDone) pro.trySuccess(pc.checkResult) + } + case Failure(ex) => { + println(s"plan step failure: ${ex.getMessage}") + pc.finishOne(false) + if (pc.checkDone) pro.trySuccess(pc.checkResult) + } + } + } + } else pro.trySuccess(true) + + Await.result(pro.future, Duration.Inf) + } + + def persistAllRecords(timeInfo: TimeInfo, recordExports: Seq[RecordExport], procType: ProcessType, + persistFactory: PersistFactory, dataSources: Seq[DataSource] + ): Unit = { + // method 1: multi thread persist multi data frame +// recordExports.foreach { recordExport => +// val records = collectRecords(timeInfo, recordExport, procType) +// persistCollectedRecords(recordExport, records, persistFactory, dataSources) +// } + + // method 2: multi thread persist multi iterable + recordExports.foreach { recordExport => +// val records = collectRecords(timeInfo, recordExport, procType) + procType match { + case BatchProcessType => { + collectBatchRecords(recordExport).foreach { rdd => + persistCollectedBatchRecords(timeInfo, recordExport, rdd, persistFactory) + } + } + case StreamingProcessType => { + val (rddOpt, emptySet) = collectStreamingRecords(recordExport) + persistCollectedStreamingRecords(recordExport, rddOpt, emptySet, persistFactory, dataSources) +// collectStreamingRecords(recordExport).foreach { rddPair => +// persistCollectedStreamingRecords(recordExport, rddPair._1, rddPair._2, persistFactory, dataSources) +// } + } + } + } + } + + def collectBatchRecords(recordExport: RecordExport): Option[RDD[String]] = { + val ret = engines.foldLeft(None: Option[RDD[String]]) { (ret, engine) => + if (ret.nonEmpty) ret else engine.collectBatchRecords(recordExport) + } + ret + } + def collectStreamingRecords(recordExport: RecordExport): (Option[RDD[(Long, Iterable[String])]], Set[Long]) = { + val ret = engines.foldLeft((None: Option[RDD[(Long, Iterable[String])]], Set[Long]())) { (ret, engine) => + if (ret._1.nonEmpty || ret._2.nonEmpty) ret else engine.collectStreamingRecords(recordExport) + } + ret + } + + private def persistCollectedBatchRecords(timeInfo: TimeInfo, recordExport: RecordExport, + records: RDD[String], persistFactory: PersistFactory + ): Unit = { + val persist = persistFactory.getPersists(timeInfo.calcTime) + persist.persistRecords(records, recordExport.name) + } + + private def persistCollectedStreamingRecords(recordExport: RecordExport, recordsOpt: Option[RDD[(Long, Iterable[String])]], + emtpyRecordKeys: Set[Long], persistFactory: PersistFactory, + dataSources: Seq[DataSource] + ): Unit = { + val updateDsCaches = recordExport.dataSourceCacheOpt match { + case Some(dsName) => dataSources.filter(_.name == dsName).flatMap(_.dataSourceCacheOpt) + case _ => Nil + } + + recordsOpt.foreach { records => + records.foreach { pair => + val (tmst, strs) = pair + val persist = persistFactory.getPersists(tmst) + + persist.persistRecords(strs, recordExport.name) + updateDsCaches.foreach(_.updateData(strs, tmst)) + } + } + + 
emtpyRecordKeys.foreach { t => + val persist = persistFactory.getPersists(t) + persist.persistRecords(Nil, recordExport.name) + updateDsCaches.foreach(_.updateData(Nil, t)) + } + } + +// private def persistCollectedStreamingRecords(recordExport: RecordExport, records: RDD[(Long, Iterable[String])], +// emtpyRecordKeys: Set[Long], persistFactory: PersistFactory, +// dataSources: Seq[DataSource] +// ): Unit = { +// val updateDsCaches = recordExport.dataSourceCacheOpt match { +// case Some(dsName) => dataSources.filter(_.name == dsName).flatMap(_.dataSourceCacheOpt) +// case _ => Nil +// } +// +// records.foreach { pair => +// val (tmst, strs) = pair +// val persist = persistFactory.getPersists(tmst) +// +// persist.persistRecords(strs, recordExport.name) +// updateDsCaches.foreach(_.updateData(strs, tmst)) +// } +// +// emtpyRecordKeys.foreach { t => +// val persist = persistFactory.getPersists(t) +// persist.persistRecords(Nil, recordExport.name) +// updateDsCaches.foreach(_.updateData(Nil, t)) +// } +// } + // def persistAllRecords(ruleSteps: Seq[ConcreteRuleStep], persistFactory: PersistFactory, // timeGroups: Iterable[Long]): Unit = { // val recordSteps = ruleSteps.filter(_.persistType == RecordPersistType) @@ -119,9 +262,9 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine { /////////////////////////// - def runRuleStep(ruleStep: ConcreteRuleStep): Boolean = { + def runRuleStep(timeInfo: TimeInfo, ruleStep: RuleStep): Boolean = { val ret = engines.foldLeft(false) { (done, engine) => - done || engine.runRuleStep(ruleStep) + done || engine.runRuleStep(timeInfo, ruleStep) } if (!ret) warn(s"run rule step warn: no dq engine support ${ruleStep}") ret @@ -139,69 +282,143 @@ case class DqEngines(engines: Seq[DqEngine]) extends DqEngine { // engine.collectUpdateCacheDatas(ruleStep, timeGroups) // }.headOption // } - def collectMetrics(ruleStep: ConcreteRuleStep): Map[Long, Map[String, Any]] = { + def collectMetrics(timeInfo: TimeInfo, metricExport: MetricExport, procType: ProcessType + ): Map[Long, Map[String, Any]] = { val ret = engines.foldLeft(Map[Long, Map[String, Any]]()) { (ret, engine) => - ret ++ engine.collectMetrics(ruleStep) + if (ret.nonEmpty) ret else engine.collectMetrics(timeInfo, metricExport, procType) } ret } - def collectUpdateRDD(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long] - ): Option[RDD[(Long, Iterable[String])]] = { - engines.flatMap { engine => - engine.collectUpdateRDD(ruleStep, timeGroups) - }.headOption + def collectRecords(timeInfo: TimeInfo, recordExport: RecordExport, procType: ProcessType + ): Map[Long, DataFrame] = { + val ret = engines.foldLeft(Map[Long, DataFrame]()) { (ret, engine) => + if (ret.nonEmpty) ret else engine.collectRecords(timeInfo, recordExport, procType) + } + ret } + def collectUpdateRDD(ruleStep: RuleStep): Option[DataFrame] = { +// engines.flatMap { engine => +// engine.collectUpdateRDD(ruleStep) +// }.headOption + None + } + +// def collectUpdateRDD(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long] +// ): Option[RDD[(Long, Iterable[String])]] = { +// engines.flatMap { engine => +// engine.collectUpdateRDD(ruleStep, timeGroups) +// }.headOption +// } + //////////////////////////// - def collectUpdateRDDs(ruleSteps: Seq[ConcreteRuleStep], timeGroups: Iterable[Long] - ): Seq[(ConcreteRuleStep, RDD[(Long, Iterable[String])])] = { - ruleSteps.flatMap { rs => - collectUpdateRDD(rs, timeGroups) match { - case Some(rdd) => Some((rs, rdd)) - case _ => None - } - } + def collectUpdateRDDs(ruleSteps: Seq[RuleStep], 
timeGroups: Set[Long] + ): Seq[(RuleStep, DataFrame)] = { +// ruleSteps.flatMap { rs => +// val t = rs.timeInfo.tmst +// if (timeGroups.contains(t)) { +// collectUpdateRDD(rs).map((rs, _)) +// } else None +// } + Nil } - def persistAllRecords(stepRdds: Seq[(ConcreteRuleStep, RDD[(Long, Iterable[String])])], +// def collectUpdateRDDs(ruleSteps: Seq[ConcreteRuleStep], timeGroups: Iterable[Long] +// ): Seq[(ConcreteRuleStep, RDD[(Long, Iterable[String])])] = { +// ruleSteps.flatMap { rs => +// collectUpdateRDD(rs, timeGroups) match { +// case Some(rdd) => Some((rs, rdd)) +// case _ => None +// } +// } +// } + + def persistAllRecords(stepRdds: Seq[(RuleStep, DataFrame)], persistFactory: PersistFactory): Unit = { - stepRdds.foreach { stepRdd => - val (step, rdd) = stepRdd - if (step.persistType == RecordPersistType) { - val name = step.name - rdd.foreach { pair => - val (t, items) = pair - val persist = persistFactory.getPersists(t) - persist.persistRecords(items, name) - } - } - } +// stepRdds.foreach { stepRdd => +// val (step, df) = stepRdd +// if (step.ruleInfo.persistType == RecordPersistType) { +// val name = step.ruleInfo.name +// val t = step.timeInfo.tmst +// val persist = persistFactory.getPersists(t) +// persist.persistRecords(df, name) +// } +// } } - def updateDataSources(stepRdds: Seq[(ConcreteRuleStep, RDD[(Long, Iterable[String])])], +// def persistAllRecords(stepRdds: Seq[(ConcreteRuleStep, RDD[(Long, Iterable[String])])], +// persistFactory: PersistFactory): Unit = { +// stepRdds.foreach { stepRdd => +// val (step, rdd) = stepRdd +// if (step.ruleInfo.persistType == RecordPersistType) { +// val name = step.name +// rdd.foreach { pair => +// val (t, items) = pair +// val persist = persistFactory.getPersists(t) +// persist.persistRecords(items, name) +// } +// } +// } +// } + + def updateDataSources(stepRdds: Seq[(RuleStep, DataFrame)], dataSources: Seq[DataSource]): Unit = { - stepRdds.foreach { stepRdd => - val (step, rdd) = stepRdd - if (step.updateDataSource.nonEmpty) { - val udpateDataSources = dataSources.filter { ds => - step.updateDataSource match { - case Some(dsName) if (dsName == ds.name) => true - case _ => false - } - } - if (udpateDataSources.size > 0) { - val name = step.name - rdd.foreach { pair => - val (t, items) = pair - udpateDataSources.foreach { ds => - ds.dataSourceCacheOpt.foreach(_.updateData(items, t)) - } - } - } - } - } +// stepRdds.foreach { stepRdd => +// val (step, df) = stepRdd +// if (step.ruleInfo.cacheDataSourceOpt.nonEmpty) { +// val udpateDsCaches = dataSources.filter { ds => +// step.ruleInfo.cacheDataSourceOpt match { +// case Some(dsName) if (dsName == ds.name) => true +// case _ => false +// } +// }.flatMap(_.dataSourceCacheOpt) +// if (udpateDsCaches.size > 0) { +// val t = step.timeInfo.tmst +// udpateDsCaches.foreach(_.updateData(df, t)) +// } +// } +// } } +// def updateDataSources(stepRdds: Seq[(ConcreteRuleStep, RDD[(Long, Iterable[String])])], +// dataSources: Seq[DataSource]): Unit = { +// stepRdds.foreach { stepRdd => +// val (step, rdd) = stepRdd +// if (step.ruleInfo.cacheDataSourceOpt.nonEmpty) { +// val udpateDataSources = dataSources.filter { ds => +// step.ruleInfo.cacheDataSourceOpt match { +// case Some(dsName) if (dsName == ds.name) => true +// case _ => false +// } +// } +// if (udpateDataSources.size > 0) { +// val name = step.name +// rdd.foreach { pair => +// val (t, items) = pair +// udpateDataSources.foreach { ds => +// ds.dataSourceCacheOpt.foreach(_.updateData(items, t)) +// } +// } +// } +// } +// } +// } + } + 
+case class ParallelCounter(total: Int) extends Serializable { + private val done: AtomicInteger = new AtomicInteger(0) + private val result: AtomicInteger = new AtomicInteger(0) + def finishOne(suc: Boolean): Unit = { + if (suc) result.incrementAndGet + done.incrementAndGet + } + def checkDone: Boolean = { + done.get() >= total + } + def checkResult: Boolean = { + if (total > 0) result.get() > 0 else true + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala index e8a7b16..f1e12d2 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala @@ -18,100 +18,316 @@ under the License. */ package org.apache.griffin.measure.process.engine -import org.apache.griffin.measure.data.connector.GroupByColumn +import org.apache.griffin.measure.cache.tmst.{TempName, TmstCache} import org.apache.griffin.measure.log.Loggable -import org.apache.griffin.measure.rule.dsl.{MetricPersistType, RecordPersistType} -import org.apache.griffin.measure.rule.step._ +import org.apache.griffin.measure.process.{BatchProcessType, ProcessType, StreamingProcessType} +import org.apache.griffin.measure.rule.adaptor.InternalColumns +import org.apache.griffin.measure.rule.dsl._ +import org.apache.griffin.measure.rule.plan._ import org.apache.griffin.measure.utils.JsonUtil import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.sql.{DataFrame, Row, SQLContext} +import org.apache.griffin.measure.utils.ParamUtil._ trait SparkDqEngine extends DqEngine { val sqlContext: SQLContext - def collectMetrics(ruleStep: ConcreteRuleStep): Map[Long, Map[String, Any]] = { + val emptyMetricMap = Map[Long, Map[String, Any]]() + val emptyMap = Map[String, Any]() + val emptyRecordMap = Map[Long, DataFrame]() + + private def getMetricMaps(dfName: String): Seq[Map[String, Any]] = { + val pdf = sqlContext.table(s"`${dfName}`") + val records = pdf.toJSON.collect() + if (records.size > 0) { + records.flatMap { rec => + try { + val value = JsonUtil.toAnyMap(rec) + Some(value) + } catch { + case e: Throwable => None + } + }.toSeq + } else Nil + } + + private def normalizeMetric(metrics: Seq[Map[String, Any]], name: String, collectType: CollectType + ): Map[String, Any] = { + collectType match { + case EntriesCollectType => metrics.headOption.getOrElse(emptyMap) + case ArrayCollectType => Map[String, Any]((name -> metrics)) + case MapCollectType => { + val v = metrics.headOption.getOrElse(emptyMap) + Map[String, Any]((name -> v)) + } + case _ => { + if (metrics.size > 1) Map[String, Any]((name -> metrics)) + else metrics.headOption.getOrElse(emptyMap) + } + } + } + + def collectMetrics(timeInfo: TimeInfo, metricExport: MetricExport, procType: ProcessType + ): Map[Long, Map[String, Any]] = { if (collectable) { - val emptyMap = Map[String, Any]() - ruleStep match { - case step: ConcreteRuleStep if (step.persistType == MetricPersistType) => { - val name = step.name - try { - val pdf = sqlContext.table(s"`${name}`") - val records = pdf.toJSON.collect() - - val pairs = records.flatMap { rec => - 
try { - val value = JsonUtil.toAnyMap(rec) - value.get(GroupByColumn.tmst) match { - case Some(t) => { - val key = t.toString.toLong - Some((key, value)) - } - case _ => None - } - } catch { - case e: Throwable => None - } + val MetricExport(name, stepName, collectType) = metricExport + try { + val metricMaps = getMetricMaps(stepName) + procType match { + case BatchProcessType => { + val metrics: Map[String, Any] = normalizeMetric(metricMaps, name, collectType) + emptyMetricMap + (timeInfo.calcTime -> metrics) + } + case StreamingProcessType => { + val tmstMetrics = metricMaps.map { metric => + val tmst = metric.getLong(InternalColumns.tmst, timeInfo.calcTime) + val pureMetric = metric.removeKeys(InternalColumns.columns) + (tmst, pureMetric) } - val groupedPairs = pairs.foldLeft(Map[Long, Seq[Map[String, Any]]]()) { (ret, pair) => + tmstMetrics.groupBy(_._1).map { pair => val (k, v) = pair - ret.get(k) match { - case Some(seq) => ret + (k -> (seq :+ v)) - case _ => ret + (k -> (v :: Nil)) - } + val maps = v.map(_._2) + val mtc = normalizeMetric(maps, name, collectType) + (k, mtc) } - groupedPairs.mapValues { vs => - if (vs.size > 1) { - Map[String, Any]((name -> vs)) - } else { - vs.headOption.getOrElse(emptyMap) - } + } + } + } catch { + case e: Throwable => { + error(s"collect metrics ${name} error: ${e.getMessage}") + emptyMetricMap + } + } + } else emptyMetricMap + } + + + def collectRecords(timeInfo: TimeInfo, recordExport: RecordExport, procType: ProcessType + ): Map[Long, DataFrame] = { + if (collectable) { + val RecordExport(_, stepName, _, originDFOpt) = recordExport + val stepDf = sqlContext.table(s"`${stepName}`") + val recordsDf = originDFOpt match { + case Some(originName) => sqlContext.table(s"`${originName}`") + case _ => stepDf + } + + procType match { + case BatchProcessType => { + val recordsDf = sqlContext.table(s"`${stepName}`") + emptyRecordMap + (timeInfo.calcTime -> recordsDf) + } + case StreamingProcessType => { + originDFOpt match { + case Some(originName) => { + val recordsDf = sqlContext.table(s"`${originName}`") + stepDf.collect.map { row => + val tmst = row.getAs[Long](InternalColumns.tmst) + val trdf = recordsDf.filter(s"`${InternalColumns.tmst}` = ${tmst}") + (tmst, trdf) + }.toMap } - } catch { - case e: Throwable => { - error(s"collect metrics ${name} error: ${e.getMessage}") - Map[Long, Map[String, Any]]() + case _ => { + val recordsDf = sqlContext.table(s"`${stepName}`") + emptyRecordMap + (timeInfo.calcTime -> recordsDf) } } } - case _ => Map[Long, Map[String, Any]]() } - } else Map[Long, Map[String, Any]]() + } else emptyRecordMap } - def collectUpdateRDD(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long] - ): Option[RDD[(Long, Iterable[String])]] = { + private def getRecordDataFrame(recordExport: RecordExport): Option[DataFrame] = { if (collectable) { - ruleStep match { - case step: ConcreteRuleStep if ((step.persistType == RecordPersistType) - || (step.updateDataSource.nonEmpty)) => { - val name = step.name - try { - val pdf = sqlContext.table(s"`${name}`") - val cols = pdf.columns - val rdd = pdf.flatMap { row => - val values = cols.flatMap { col => - Some((col, row.getAs[Any](col))) - }.toMap - values.get(GroupByColumn.tmst) match { - case Some(t: Long) if (timeGroups.exists(_ == t)) => Some((t, JsonUtil.toJson(values))) - case _ => None + val RecordExport(_, stepName, _, _) = recordExport + val stepDf = sqlContext.table(s"`${stepName}`") + Some(stepDf) + } else None + } + + def collectBatchRecords(recordExport: RecordExport): 
Option[RDD[String]] = { + getRecordDataFrame(recordExport).map(_.toJSON) + } + + def collectStreamingRecords(recordExport: RecordExport): (Option[RDD[(Long, Iterable[String])]], Set[Long]) = { + val RecordExport(_, _, _, originDFOpt) = recordExport + getRecordDataFrame(recordExport) match { + case Some(stepDf) => { + originDFOpt match { + case Some(originName) => { + val tmsts = (stepDf.collect.flatMap { row => + try { + val tmst = row.getAs[Long](InternalColumns.tmst) + val empty = row.getAs[Boolean](InternalColumns.empty) + Some((tmst, empty)) + } catch { + case _: Throwable => None + } + }) + val emptyTmsts = tmsts.filter(_._2).map(_._1).toSet + val recordTmsts = tmsts.filter(!_._2).map(_._1).toSet + if (recordTmsts.size > 0) { + val recordsDf = sqlContext.table(s"`${originName}`") + val records = recordsDf.flatMap { row => + val tmst = row.getAs[Long](InternalColumns.tmst) + if (recordTmsts.contains(tmst)) { + try { + val map = SparkRowFormatter.formatRow(row) + val str = JsonUtil.toJson(map) + Some((tmst, str)) + } catch { + case e: Throwable => None + } + } else None + } + (Some(records.groupByKey), emptyTmsts) + } else (None, emptyTmsts) + } + case _ => { + val records = stepDf.flatMap { row => + val tmst = row.getAs[Long](InternalColumns.tmst) + try { + val map = SparkRowFormatter.formatRow(row) + val str = JsonUtil.toJson(map) + Some((tmst, str)) + } catch { + case e: Throwable => None } - }.groupByKey() - Some(rdd) - } catch { - case e: Throwable => { - error(s"collect records ${name} error: ${e.getMessage}") - None } + (Some(records.groupByKey), Set[Long]()) } } - case _ => None } - } else None + case _ => (None, Set[Long]()) + } +// val recordsOpt = getRecordDataFrame(recordExport).flatMap { stepDf => +// originDFOpt match { +// case Some(originName) => { +// val tmsts = (stepDf.collect.flatMap { row => +// try { +// val tmst = row.getAs[Long](InternalColumns.tmst) +// val empty = row.getAs[Boolean](InternalColumns.empty) +// Some((tmst, empty)) +// } catch { +// case _: Throwable => None +// } +// }) +// val emptyTmsts = tmsts.filter(_._2).map(_._1).toSet +// val recordTmsts = tmsts.filter(!_._2).map(_._1).toSet +// if (recordTmsts.size > 0) { +// val recordsDf = sqlContext.table(s"`${originName}`") +// val records = recordsDf.flatMap { row => +// val tmst = row.getAs[Long](InternalColumns.tmst) +// if (recordTmsts.contains(tmst)) { +// try { +// val map = SparkRowFormatter.formatRow(row) +// val str = JsonUtil.toJson(map) +// Some((tmst, str)) +// } catch { +// case e: Throwable => None +// } +// } else None +// } +// Some((Some(records.groupByKey), emptyTmsts)) +// } else Some((None, emptyTmsts)) +// } +// case _ => { +// val records = stepDf.flatMap { row => +// val tmst = row.getAs[Long](InternalColumns.tmst) +// try { +// val map = SparkRowFormatter.formatRow(row) +// val str = JsonUtil.toJson(map) +// Some((tmst, str)) +// } catch { +// case e: Throwable => None +// } +// } +// Some(records.groupByKey) +// } +// } +// } } +// +// def collectUpdateRDD(ruleStep: ConcreteRuleStep): Option[DataFrame] = { +// if (collectable) { +// ruleStep match { +// case step: ConcreteRuleStep if ((step.ruleInfo.persistType == RecordPersistType) +// || (step.ruleInfo.cacheDataSourceOpt.nonEmpty)) => { +// val tmst = step.timeInfo.tmst +//// val metricName = step.ruleInfo.name +// +// step.ruleInfo.tmstNameOpt match { +// case Some(metricTmstName) => { +// try { +// val pdf = sqlContext.table(s"`${metricTmstName}`") +// Some(pdf) +// } catch { +// case e: Throwable => { +// error(s"collect 
records ${metricTmstName} error: ${e.getMessage}") +// None +// } +// } +// } +// case _ => None +// } +// } +// case _ => None +// } +// } else None +// } + + + + + +// def collectUpdateRDD(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long] +// ): Option[RDD[(Long, Iterable[String])]] = { +// if (collectable) { +// ruleStep match { +// case step: ConcreteRuleStep if ((step.ruleInfo.persistType == RecordPersistType) +// || (step.ruleInfo.cacheDataSourceOpt.nonEmpty)) => { +// val tmst = step.timeInfo.tmst +// val metricName = step.ruleInfo.name +// +// step.ruleInfo.tmstNameOpt match { +// case Some(metricTmstName) => { +// try { +// val pdf = sqlContext.table(s"`${metricTmstName}`") +// val cols = pdf.columns +// val rdd = pdf.flatMap { row => +// val values = cols.flatMap { col => +// Some((col, row.getAs[Any](col))) +// }.toMap +// values.get(GroupByColumn.tmst) match { +// case Some(t: Long) if (timeGroups.exists(_ == t)) => Some((t, JsonUtil.toJson(values))) +// case _ => None +// } +// }.groupByKey() +// +// // find other keys in time groups, create empty records for those timestamps +// val existKeys = rdd.keys.collect +// val otherKeys = timeGroups.filter(t => !existKeys.exists(_ == t)) +// val otherPairs = otherKeys.map((_, Iterable[String]())).toSeq +// val otherPairRdd = sqlContext.sparkContext.parallelize(otherPairs) +// +// Some(rdd union otherPairRdd) +// } catch { +// case e: Throwable => { +// error(s"collect records ${metricTmstName} error: ${e.getMessage}") +// None +// } +// } +// } +// case _ => None +// } +// } +// case _ => None +// } +// } else None +// } + // def collectRecords(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]] = { // ruleStep match { // case step: ConcreteRuleStep if (step.persistType == RecordPersistType) => { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala index 9c47d77..9de7955 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala @@ -21,11 +21,12 @@ package org.apache.griffin.measure.process.engine import java.util.Date import org.apache.griffin.measure.config.params.user.DataSourceParam -import org.apache.griffin.measure.data.connector.GroupByColumn import org.apache.griffin.measure.data.source._ import org.apache.griffin.measure.persist.{Persist, PersistFactory} +import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters} +import org.apache.griffin.measure.rule.adaptor.{GlobalKeys, InternalColumns} import org.apache.griffin.measure.rule.dsl._ -import org.apache.griffin.measure.rule.step._ +import org.apache.griffin.measure.rule.plan._ import org.apache.griffin.measure.utils.JsonUtil import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, GroupedData, SQLContext} @@ -35,12 +36,24 @@ case class SparkSqlEngine(sqlContext: SQLContext) extends SparkDqEngine { override protected def collectable(): Boolean = true - def runRuleStep(ruleStep: ConcreteRuleStep): Boolean = { + def runRuleStep(timeInfo: TimeInfo, ruleStep: RuleStep): Boolean = { ruleStep match { - case 
SparkSqlStep(name, rule, _, _, _) => { + case rs @ SparkSqlStep(name, rule, details, _, _) => { try { - val rdf = sqlContext.sql(rule) - rdf.registerTempTable(name) + val rdf = if (rs.isGlobal && !TableRegisters.existRunGlobalTable(name)) { + details.get(GlobalKeys._initRule) match { + case Some(initRule: String) => sqlContext.sql(initRule) + case _ => sqlContext.emptyDataFrame + } + } else sqlContext.sql(rule) + + if (rs.isGlobal) { + if (rs.needCache) DataFrameCaches.cacheGlobalDataFrame(name, rdf) + TableRegisters.registerRunGlobalTable(rdf, name) + } else { + if (rs.needCache) DataFrameCaches.cacheDataFrame(timeInfo.key, name, rdf) + TableRegisters.registerRunTempTable(rdf, timeInfo.key, name) + } true } catch { case e: Throwable => { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/process/temp/DataFrameCaches.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/temp/DataFrameCaches.scala b/measure/src/main/scala/org/apache/griffin/measure/process/temp/DataFrameCaches.scala new file mode 100644 index 0000000..fc5fea3 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/process/temp/DataFrameCaches.scala @@ -0,0 +1,115 @@ +package org.apache.griffin.measure.process.temp + +import org.apache.griffin.measure.log.Loggable +import org.apache.spark.sql.DataFrame + +import scala.collection.concurrent.{TrieMap, Map => ConcMap} + +object DataFrameCaches extends Loggable { + + final val _global = "_global" + + private val caches: ConcMap[String, Map[String, DataFrame]] = TrieMap[String, Map[String, DataFrame]]() + private val trashCaches: ConcMap[String, Seq[DataFrame]] = TrieMap[String, Seq[DataFrame]]() + + private def trashDataFrame(key: String, df: DataFrame): Unit = { + trashCaches.get(key) match { + case Some(seq) => { + val suc = trashCaches.replace(key, seq, seq :+ df) + if (!suc) trashDataFrame(key, df) + } + case _ => { + val oldOpt = trashCaches.putIfAbsent(key, Seq[DataFrame](df)) + if (oldOpt.nonEmpty) trashDataFrame(key, df) + } + } + } + private def trashDataFrames(key: String, dfs: Seq[DataFrame]): Unit = { + trashCaches.get(key) match { + case Some(seq) => { + val suc = trashCaches.replace(key, seq, seq ++ dfs) + if (!suc) trashDataFrames(key, dfs) + } + case _ => { + val oldOpt = trashCaches.putIfAbsent(key, dfs) + if (oldOpt.nonEmpty) trashDataFrames(key, dfs) + } + } + } + + def cacheDataFrame(key: String, name: String, df: DataFrame): Unit = { + println(s"try to cache df ${name}") + caches.get(key) match { + case Some(mp) => { + mp.get(name) match { + case Some(odf) => { + val suc = caches.replace(key, mp, mp + (name -> df)) + if (suc) { + println(s"cache after replace old df") + df.cache + trashDataFrame(key, odf) + } else { + cacheDataFrame(key, name, df) + } + } + case _ => { + val suc = caches.replace(key, mp, mp + (name -> df)) + if (suc) { + println(s"cache after replace no old df") + df.cache + } else { + cacheDataFrame(key, name, df) + } + } + } + } + case _ => { + val oldOpt = caches.putIfAbsent(key, Map[String, DataFrame]((name -> df))) + if (oldOpt.isEmpty) { + println(s"cache after put absent") + df.cache + } else { + cacheDataFrame(key, name, df) + } + } + } + } + def cacheGlobalDataFrame(name: String, df: DataFrame): Unit = { + cacheDataFrame(_global, name, df) + } + + def uncacheDataFrames(key: String): Unit = { + caches.remove(key) match { + case Some(mp) => { + 
trashDataFrames(key, mp.values.toSeq) + } + case _ => {} + } + } + def uncacheGlobalDataFrames(): Unit = { + uncacheDataFrames(_global) + } + + def clearTrashDataFrames(key: String): Unit = { + trashCaches.remove(key) match { + case Some(seq) => seq.foreach(_.unpersist) + case _ => {} + } + } + def clearGlobalTrashDataFrames(): Unit = { + clearTrashDataFrames(_global) + } + + def getDataFrames(key: String): Map[String, DataFrame] = { + caches.get(key) match { + case Some(mp) => mp + case _ => Map[String, DataFrame]() + } + } + def getGlobalDataFrames(): Map[String, DataFrame] = { + getDataFrames(_global) + } + + + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/process/temp/TableRegisters.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/temp/TableRegisters.scala b/measure/src/main/scala/org/apache/griffin/measure/process/temp/TableRegisters.scala new file mode 100644 index 0000000..91a7541 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/process/temp/TableRegisters.scala @@ -0,0 +1,153 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.process.temp + +import org.apache.griffin.measure.log.Loggable +import org.apache.spark.sql.{DataFrame, SQLContext} + +import scala.collection.concurrent.{TrieMap, Map => ConcMap} + +object TableRegisters extends Loggable { + + final val _global = "_global" +// +// val tables: ConcMap[String, Set[String]] = TrieMap[String, Set[String]]() + + val compileTableRegs = TableRegs() + val runTableRegs = TableRegs() + +// private def registerTable(key: String, table: String): Unit = { +// tables.get(key) match { +// case Some(set) => { +// val suc = tables.replace(key, set, set + table) +// if (!suc) registerTable(key, table) +// } +// case _ => { +// val oldOpt = tables.putIfAbsent(key, Set[String](table)) +// if (oldOpt.nonEmpty) registerTable(key, table) +// } +// } +// } +// +// private def unregisterTable(key: String, table: String): Option[String] = { +// tables.get(key) match { +// case Some(set) => { +// val ftb = set.find(_ == table) +// ftb match { +// case Some(tb) => { +// val nset = set - tb +// val suc = tables.replace(key, set, nset) +// if (suc) Some(tb) +// else unregisterTable(key, table) +// } +// case _ => None +// } +// } +// case _ => None +// } +// } +// +// private def unregisterTables(key: String): Set[String] = { +// tables.remove(key) match { +// case Some(set) => set +// case _ => Set[String]() +// } +// } + + private def dropTempTable(sqlContext: SQLContext, table: String): Unit = { + try { + sqlContext.dropTempTable(table) + } catch { + case e: Throwable => warn(s"drop temp table ${table} fails") + } + } + + // ----- + + def registerRunGlobalTable(df: DataFrame, table: String): Unit = { + registerRunTempTable(df, _global, table) + } + + def registerRunTempTable(df: DataFrame, key: String, table: String): Unit = { + runTableRegs.registerTable(key, table) + df.registerTempTable(table) + } + + def registerCompileGlobalTable(table: String): Unit = { + registerCompileTempTable(_global, table) + } + + def registerCompileTempTable(key: String, table: String): Unit = { + compileTableRegs.registerTable(key, table) + } + + def unregisterRunTempTable(sqlContext: SQLContext, key: String, table: String): Unit = { + runTableRegs.unregisterTable(key, table).foreach(dropTempTable(sqlContext, _)) + } + + def unregisterCompileTempTable(key: String, table: String): Unit = { + compileTableRegs.unregisterTable(key, table) + } + + def unregisterRunGlobalTables(sqlContext: SQLContext): Unit = { + unregisterRunTempTables(sqlContext, _global) + } + + def unregisterCompileGlobalTables(): Unit = { + unregisterCompileTempTables(_global) + } + + def unregisterRunTempTables(sqlContext: SQLContext, key: String): Unit = { + runTableRegs.unregisterTables(key).foreach(dropTempTable(sqlContext, _)) + } + + def unregisterCompileTempTables(key: String): Unit = { + compileTableRegs.unregisterTables(key) + } + + def existRunGlobalTable(table: String): Boolean = { + existRunTempTable(_global, table) + } + + def existCompileGlobalTable(table: String): Boolean = { + existCompileTempTable(_global, table) + } + + def existRunTempTable(key: String, table: String): Boolean = { + runTableRegs.existTable(key, table) + } + + def existCompileTempTable(key: String, table: String): Boolean = { + compileTableRegs.existTable(key, table) + } + + def getRunGlobalTables(): Set[String] = { + getRunTempTables(_global) + } + + def getRunTempTables(key: String): Set[String] = { + runTableRegs.getTables(key) + } + +} + +//object TempKeys { +// def key(t: Long): String = s"${t}" +// def 
key(head: String, t: Long): String = s"${head}_${t}" +//} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/process/temp/TableRegs.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/temp/TableRegs.scala b/measure/src/main/scala/org/apache/griffin/measure/process/temp/TableRegs.scala new file mode 100644 index 0000000..d205099 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/process/temp/TableRegs.scala @@ -0,0 +1,81 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.process.temp + +import org.apache.spark.sql.SQLContext + +import scala.collection.concurrent.{TrieMap, Map => ConcMap} + +case class TableRegs() { + + private val tables: ConcMap[String, Set[String]] = TrieMap[String, Set[String]]() + + def registerTable(key: String, table: String): Unit = { + tables.get(key) match { + case Some(set) => { + val suc = tables.replace(key, set, set + table) + if (!suc) registerTable(key, table) + } + case _ => { + val oldOpt = tables.putIfAbsent(key, Set[String](table)) + if (oldOpt.nonEmpty) registerTable(key, table) + } + } + } + + def unregisterTable(key: String, table: String): Option[String] = { + tables.get(key) match { + case Some(set) => { + val ftb = set.find(_ == table) + ftb match { + case Some(tb) => { + val nset = set - tb + val suc = tables.replace(key, set, nset) + if (suc) Some(tb) + else unregisterTable(key, table) + } + case _ => None + } + } + case _ => None + } + } + + def unregisterTables(key: String): Set[String] = { + tables.remove(key) match { + case Some(set) => set + case _ => Set[String]() + } + } + + def existTable(key: String, table: String): Boolean = { + tables.get(key) match { + case Some(set) => set.exists(_ == table) + case _ => false + } + } + + def getTables(key: String): Set[String] = { + tables.get(key) match { + case Some(set) => set + case _ => Set[String]() + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/result/AccuracyResult.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/result/AccuracyResult.scala b/measure/src/main/scala/org/apache/griffin/measure/result/AccuracyResult.scala index 16bb772..7b75043 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/result/AccuracyResult.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/result/AccuracyResult.scala @@ -23,10 +23,16 @@ case class AccuracyResult(miss: Long, total: Long) extends Result { type T = AccuracyResult + override def isLegal(): Boolean = 
getTotal > 0 + def update(delta: T): T = { AccuracyResult(delta.miss, total) } + def initial(): Boolean = { + getMatch <= 0 + } + def eventual(): Boolean = { this.miss <= 0 } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/result/ProfileResult.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/result/ProfileResult.scala b/measure/src/main/scala/org/apache/griffin/measure/result/ProfileResult.scala index 803416e..c90e095 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/result/ProfileResult.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/result/ProfileResult.scala @@ -27,6 +27,10 @@ case class ProfileResult(matchCount: Long, totalCount: Long) extends Result { ProfileResult(matchCount + delta.matchCount, totalCount) } + def initial(): Boolean = { + this.matchCount <= 0 + } + def eventual(): Boolean = { this.matchCount >= totalCount } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/result/Result.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/result/Result.scala b/measure/src/main/scala/org/apache/griffin/measure/result/Result.scala index 6dcd9a1..caf6d96 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/result/Result.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/result/Result.scala @@ -23,8 +23,12 @@ trait Result extends Serializable { type T <: Result + def isLegal(): Boolean = true + def update(delta: T): T + def initial(): Boolean + def eventual(): Boolean def differsFrom(other: T): Boolean http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/9c586ee6/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala index eb57838..5447ccc 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/DataFrameOprAdaptor.scala @@ -19,26 +19,37 @@ under the License. 
package org.apache.griffin.measure.rule.adaptor import org.apache.griffin.measure.process.ProcessType -import org.apache.griffin.measure.rule.step._ +import org.apache.griffin.measure.rule.plan.{TimeInfo, _} +import org.apache.griffin.measure.utils.ParamUtil._ -case class DataFrameOprAdaptor(adaptPhase: AdaptPhase) extends RuleAdaptor { +case class DataFrameOprAdaptor() extends RuleAdaptor { - def genRuleStep(param: Map[String, Any]): Seq[RuleStep] = { - DfOprStep(getName(param), getRule(param), getDetails(param), - getPersistType(param), getUpdateDataSource(param)) :: Nil - } - def adaptConcreteRuleStep(ruleStep: RuleStep): Seq[ConcreteRuleStep] = { - ruleStep match { - case rs @ DfOprStep(_, _, _, _, _) => rs :: Nil - case _ => Nil - } - } +// def genRuleStep(timeInfo: TimeInfo, param: Map[String, Any]): Seq[RuleStep] = { +// val ruleInfo = RuleInfoGen(param, timeInfo) +// DfOprStep(timeInfo, ruleInfo) :: Nil +//// DfOprStep(getName(param), getRule(param), getDetails(param), +//// getPersistType(param), getUpdateDataSource(param)) :: Nil +// } +// def adaptConcreteRuleStep(ruleStep: RuleStep): Seq[ConcreteRuleStep] = { +// ruleStep match { +// case rs @ DfOprStep(_, _) => rs :: Nil +// case _ => Nil +// } +// } + +// def getTempSourceNames(param: Map[String, Any]): Seq[String] = { +// param.get(_name) match { +// case Some(name) => name.toString :: Nil +// case _ => Nil +// } +// } + + import RuleParamKeys._ - def getTempSourceNames(param: Map[String, Any]): Seq[String] = { - param.get(_name) match { - case Some(name) => name.toString :: Nil - case _ => Nil - } + def genRulePlan(timeInfo: TimeInfo, param: Map[String, Any], procType: ProcessType): RulePlan = { + val name = getRuleName(param) + val step = DfOprStep(name, getRule(param), getDetails(param), getCache(param), getGlobal(param)) + RulePlan(step :: Nil, genRuleExports(param, name, name)) } }
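

A closing note on the ParallelCounter helper appended to DqEngines.scala above: it is the piece that lets persistCollectedRecords launch one Future per timestamped record set and then block on a single Promise until every persist attempt has reported back, treating the batch as successful when at least one task succeeded (and vacuously successful when there was nothing to persist). A minimal standalone sketch of that coordination pattern, using an illustrative awaitAll helper and dummy tasks in place of persist.persistRecords / updateData, and assuming the ParallelCounter class introduced in this commit:

import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success}

// run the given tasks in parallel; return once all have finished,
// true if at least one succeeded (true for an empty task list)
def awaitAll(tasks: Seq[() => Boolean]): Boolean = {
  val pc = ParallelCounter(tasks.size)   // counter defined at the end of DqEngines.scala
  val pro = Promise[Boolean]()
  if (tasks.nonEmpty) {
    tasks.foreach { task =>
      Future(task()).onComplete {
        case Success(v) =>
          pc.finishOne(v)
          if (pc.checkDone) pro.trySuccess(pc.checkResult)
        case Failure(_) =>
          pc.finishOne(false)
          if (pc.checkDone) pro.trySuccess(pc.checkResult)
      }
    }
  } else pro.trySuccess(true)
  Await.result(pro.future, Duration.Inf)
}

// e.g. awaitAll(Seq(() => true, () => false)) waits for both tasks and yields true
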
