http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/persist/MongoThreadPool.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/MongoThreadPool.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/MongoThreadPool.scala deleted file mode 100644 index 2f43edb..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/persist/MongoThreadPool.scala +++ /dev/null @@ -1,73 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.persist - -import java.util.Date -import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit} - -import scala.concurrent.Future -import scala.util.{Failure, Success} - -object MongoThreadPool { - - import scala.concurrent.ExecutionContext.Implicits.global - - private val pool: ThreadPoolExecutor = Executors.newFixedThreadPool(5).asInstanceOf[ThreadPoolExecutor] - val MAX_RETRY = 100 - - def shutdown(): Unit = { - pool.shutdown() - pool.awaitTermination(10, TimeUnit.SECONDS) - } - - def addTask(func: () => (Long, Future[_]), retry: Int): Unit = { - val r = if (retry < 0) MAX_RETRY else retry - println(s"add task, current task num: ${pool.getQueue.size}") - pool.submit(Task(func, r)) - } - - case class Task(func: () => (Long, Future[_]), retry: Int) extends Runnable { - - override def run(): Unit = { - val st = new Date().getTime - val (t, res) = func() - res.onComplete { - case Success(value) => { - val et = new Date().getTime - println(s"task ${t} success [ using time ${et - st} ms ]") - } - case Failure(e) => { - val et = new Date().getTime - println(s"task ${t} fails [ using time ${et - st} ms ] : ${e.getMessage}") - if (retry > 0) { - println(s"task ${t} retry [ rest retry count: ${retry - 1} ]") - pool.submit(Task(func, retry - 1)) - } else { - println(s"task ${t} retry ends but fails") - } - } - } - } - - def fail(): Unit = { - println("task fails") - } - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala deleted file mode 100644 index 4b30cba..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala +++ /dev/null @@ -1,91 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
*/
package org.apache.griffin.measure.persist

import org.apache.griffin.measure.result._
import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset}

import scala.util.Try

/**
  * Persist that fans every operation out to a collection of underlying
  * persists; a failure in one delegate is logged and does not stop the others.
  */
case class MultiPersists(persists: Iterable[Persist]) extends Persist {

  // fix: the original matched `persists` against Nil, which only detects
  // emptiness for List-backed Iterables; an empty Set/Vector fell through
  // to `persists.head` and threw. headOption handles every Iterable.
  val timeStamp: Long = persists.headOption.map(_.timeStamp).getOrElse(0L)

  val config: Map[String, Any] = Map[String, Any]()

  /** available iff at least one underlying persist is available */
  def available(): Boolean = { persists.exists(_.available()) }

  def start(msg: String): Unit = { persists.foreach(_.start(msg)) }
  def finish(): Unit = { persists.foreach(_.finish()) }

  // run `f` on each delegate, isolating failures so that one broken
  // persist cannot prevent the rest from receiving the data
  private def eachPersist(errPrefix: String)(f: Persist => Unit): Unit = {
    persists.foreach { persist =>
      try {
        f(persist)
      } catch {
        // deliberately swallows everything: persisting is best-effort
        case e: Throwable => error(s"${errPrefix}: ${e.getMessage}")
      }
    }
  }

  def log(rt: Long, msg: String): Unit =
    eachPersist("log error")(_.log(rt, msg))

  def persistRecords(df: DataFrame, name: String): Unit =
    eachPersist("persist df error")(_.persistRecords(df, name))

  def persistRecords(records: RDD[String], name: String): Unit =
    eachPersist("persist records error")(_.persistRecords(records, name))

  def persistRecords(records: Iterable[String], name: String): Unit =
    eachPersist("persist records error")(_.persistRecords(records, name))

  def persistMetrics(metrics: Map[String, Any]): Unit =
    eachPersist("persist metrics error")(_.persistMetrics(metrics))

}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala deleted file mode 100644 index 300f2c5..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala +++ /dev/null @@ -1,57 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.persist - -import org.apache.griffin.measure.log.Loggable -import org.apache.griffin.measure.result._ -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{DataFrame, Dataset} - -import scala.util.Try - - -trait Persist extends Loggable with Serializable { - val timeStamp: Long - - val config: Map[String, Any] - - def available(): Boolean - - def start(msg: String): Unit - def finish(): Unit - - def log(rt: Long, msg: String): Unit - -// def result(rt: Long, result: Result): Unit -// -// def records(recs: RDD[String], tp: String): Unit -// def records(recs: Iterable[String], tp: String): Unit - - def persistRecords(df: DataFrame, name: String): Unit - def persistRecords(records: RDD[String], name: String): Unit - def persistRecords(records: Iterable[String], name: String): Unit -// def persistMetrics(metrics: Seq[String], name: String): Unit - def persistMetrics(metrics: Map[String, Any]): Unit - -} - -//object PersistDataType { -// final val MISS = "miss" -// final val MATCH = "match" -//} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/persist/PersistFactory.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/PersistFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/PersistFactory.scala deleted file mode 100644 index b2e34a9..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/persist/PersistFactory.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. 
The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.persist

import org.apache.griffin.measure.config.params.env._

import scala.util.{Success, Try}


/**
  * Builds the persist instances configured for a measure, wrapped
  * in a single MultiPersists facade.
  */
case class PersistFactory(persistParams: Iterable[PersistParam], metricName: String) extends Serializable {

  val HDFS_REGEX = """^(?i)hdfs$""".r
  val HTTP_REGEX = """^(?i)http$""".r
  val LOG_REGEX = """^(?i)log$""".r
  val MONGO_REGEX = """^(?i)mongo$""".r

  /** one MultiPersists fanning out to every available configured persist */
  def getPersists(timeStamp: Long): MultiPersists =
    MultiPersists(persistParams.flatMap(getPersist(timeStamp, _)))

  // build a single persist from its param; None when construction fails
  // or the resulting persist reports itself unavailable
  private def getPersist(timeStamp: Long, persistParam: PersistParam): Option[Persist] = {
    val conf = persistParam.config
    val attempt: Try[Persist] = persistParam.persistType match {
      case HDFS_REGEX() => Try(HdfsPersist(conf, metricName, timeStamp))
      case HTTP_REGEX() => Try(HttpPersist(conf, metricName, timeStamp))
      case LOG_REGEX() => Try(LoggerPersist(conf, metricName, timeStamp))
      case MONGO_REGEX() => Try(MongoPersist(conf, metricName, timeStamp))
      case _ => throw new Exception("not supported persist type")
    }
    attempt.toOption.filter(_.available())
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/persist/PersistThreadPool.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/PersistThreadPool.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/PersistThreadPool.scala deleted file mode 100644 index 0a647b4..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/persist/PersistThreadPool.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.persist - -import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit} - -object PersistThreadPool { - - private val pool: ThreadPoolExecutor = Executors.newFixedThreadPool(5).asInstanceOf[ThreadPoolExecutor] - val MAX_RETRY = 100 - - def shutdown(): Unit = { - pool.shutdown() - pool.awaitTermination(10, TimeUnit.SECONDS) - } - - def addTask(func: () => Boolean, retry: Int): Unit = { - val r = if (retry < 0) MAX_RETRY else retry - println(s"add task, current task num: ${pool.getQueue.size}") - pool.submit(Task(func, r)) - } - - case class Task(func: () => Boolean, retry: Int) extends Runnable { - - override def run(): Unit = { - try { - var i = retry - var suc = false - while (!suc && i > 0) { - if (func()) { - println("task success") - suc = true - } else i = i - 1 - } - if (!suc) fail(s"retried for ${retry} times") - } catch { - case e: Throwable => fail(s"${e.getMessage}") - } - } - - def fail(msg: String): Unit = { - println(s"task fails: ${msg}") - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala deleted file mode 100644 index deccf5f..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala +++ /dev/null @@ -1,177 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.process - -import java.util.Date - -import org.apache.griffin.measure.config.params._ -import org.apache.griffin.measure.config.params.env._ -import org.apache.griffin.measure.config.params.user._ -import org.apache.griffin.measure.data.source.DataSourceFactory -import org.apache.griffin.measure.persist.{Persist, PersistFactory} -import org.apache.griffin.measure.process.engine._ -import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters, TimeRange} -import org.apache.griffin.measure.rule.adaptor._ -import org.apache.griffin.measure.rule.plan._ -import org.apache.griffin.measure.rule.udf._ -import org.apache.spark.sql.{SQLContext, SparkSession} -import org.apache.spark.SparkConf - -import scala.util.Try - -case class BatchDqProcess(allParam: AllParam) extends DqProcess { - - val envParam: EnvParam = allParam.envParam - val userParam: UserParam = allParam.userParam - - val sparkParam = envParam.sparkParam - val metricName = userParam.name - val dataSourceNames = userParam.dataSources.map(_.name) - val baselineDsName = userParam.baselineDsName - -// var sparkContext: SparkContext = _ - var sqlContext: SQLContext = _ - - var sparkSession: SparkSession = _ - - def retriable: Boolean = false - - def init: Try[_] = Try { - val conf = new SparkConf().setAppName(metricName) - conf.setAll(sparkParam.config) - conf.set("spark.sql.crossJoin.enabled", "true") - sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate() - sparkSession.sparkContext.setLogLevel(sparkParam.logLevel) - sqlContext = 
sparkSession.sqlContext - - // register udf - GriffinUdfs.register(sqlContext) - GriffinUdafs.register(sqlContext) - - // init adaptors - RuleAdaptorGroup.init(sparkSession, dataSourceNames, baselineDsName) - } - - def run: Try[_] = Try { - // start time - val startTime = new Date().getTime - - val appTime = getAppTime - val calcTimeInfo = CalcTimeInfo(appTime) - - // get persists to persist measure result - val persistFactory = PersistFactory(envParam.persistParams, metricName) - val persist: Persist = persistFactory.getPersists(appTime) - - // persist start id - val applicationId = sparkSession.sparkContext.applicationId - persist.start(applicationId) - - // get dq engines - val dqEngines = DqEngineFactory.genDqEngines(sqlContext) - - // generate data sources - val dataSources = DataSourceFactory.genDataSources(sqlContext, null, dqEngines, userParam.dataSources) - dataSources.foreach(_.init) - - // init data sources - val dsTimeRanges = dqEngines.loadData(dataSources, calcTimeInfo) - printTimeRanges(dsTimeRanges) - - val rulePlan = RuleAdaptorGroup.genRulePlan( - calcTimeInfo, userParam.evaluateRuleParam, BatchProcessType, dsTimeRanges) - - // run rules - dqEngines.runRuleSteps(calcTimeInfo, rulePlan.ruleSteps) - - // persist results - dqEngines.persistAllMetrics(rulePlan.metricExports, persistFactory) - - dqEngines.persistAllRecords(rulePlan.recordExports, persistFactory, dataSources) - - // end time - val endTime = new Date().getTime - persist.log(endTime, s"process using time: ${endTime - startTime} ms") - - // finish - persist.finish() - - // clean data - cleanData(calcTimeInfo) - -// sqlContext.tables().show(50) -// println(sqlContext.tableNames().size) - - // clear temp table -// ruleSteps.foreach { rs => -// println(rs) -// // sqlContext.dropTempTable(rs.ruleInfo.name) -// rs.ruleInfo.tmstNameOpt match { -// case Some(n) => sqlContext.dropTempTable(s"`${n}`") -// case _ => {} -// } -// } -// -// // -- test -- -// sqlContext.tables().show(50) - } - - 
private def cleanData(timeInfo: TimeInfo): Unit = { - TableRegisters.unregisterRunTempTables(sqlContext, timeInfo.key) - TableRegisters.unregisterCompileTempTables(timeInfo.key) - - DataFrameCaches.uncacheDataFrames(timeInfo.key) - DataFrameCaches.clearTrashDataFrames(timeInfo.key) - DataFrameCaches.clearGlobalTrashDataFrames() - } - - def end: Try[_] = Try { - TableRegisters.unregisterRunGlobalTables(sqlContext) - TableRegisters.unregisterCompileGlobalTables - - DataFrameCaches.uncacheGlobalDataFrames() - DataFrameCaches.clearGlobalTrashDataFrames() - - sparkSession.close() - sparkSession.stop() - } - -// private def cleanData(t: Long): Unit = { -// try { -//// dataSources.foreach(_.cleanOldData) -//// dataSources.foreach(_.dropTable(t)) -// -//// val cleanTime = TimeInfoCache.getCleanTime -//// CacheResultProcesser.refresh(cleanTime) -// -// sqlContext.dropTempTable() -// } catch { -// case e: Throwable => error(s"clean data error: ${e.getMessage}") -// } -// } - - private def printTimeRanges(timeRanges: Map[String, TimeRange]): Unit = { - val timeRangesStr = timeRanges.map { pair => - val (name, timeRange) = pair - s"${name} -> (${timeRange.begin}, ${timeRange.end}]" - }.mkString(", ") - println(s"data source timeRanges: ${timeRangesStr}") - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/process/DqProcess.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/DqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/DqProcess.scala deleted file mode 100644 index ac8f3d6..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/process/DqProcess.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. 
See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.process - -import org.apache.griffin.measure.config.params.env._ -import org.apache.griffin.measure.config.params.user._ -import org.apache.griffin.measure.log.Loggable - -import scala.util.Try - -trait DqProcess extends Loggable with Serializable { - - val envParam: EnvParam - val userParam: UserParam - - def init: Try[_] - - def run: Try[_] - - def end: Try[_] - - def retriable: Boolean - - protected def getAppTime: Long = { - if (userParam.timestamp != null && userParam.timestamp > 0) { userParam.timestamp } - else { System.currentTimeMillis } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/process/ExportMode.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/ExportMode.scala b/measure/src/main/scala/org/apache/griffin/measure/process/ExportMode.scala deleted file mode 100644 index 42aa92b..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/process/ExportMode.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. 
See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.process - -sealed trait ExportMode {} - -object ExportMode { - def defaultMode(procType: ProcessType): ExportMode = { - procType match { - case BatchProcessType => SimpleMode - case StreamingProcessType => TimestampMode - } - } -} - -final case object SimpleMode extends ExportMode {} - -final case object TimestampMode extends ExportMode {} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/process/ProcessType.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/ProcessType.scala b/measure/src/main/scala/org/apache/griffin/measure/process/ProcessType.scala deleted file mode 100644 index 9f9e424..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/process/ProcessType.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.process - -import scala.util.matching.Regex - -sealed trait ProcessType { - val regex: Regex - val desc: String -} - -object ProcessType { - private val procTypes: List[ProcessType] = List(BatchProcessType, StreamingProcessType) - def apply(ptn: String): ProcessType = { - procTypes.find(tp => ptn match { - case tp.regex() => true - case _ => false - }).getOrElse(BatchProcessType) - } - def unapply(pt: ProcessType): Option[String] = Some(pt.desc) -} - -final case object BatchProcessType extends ProcessType { - val regex = """^(?i)batch$""".r - val desc = "batch" -} - -final case object StreamingProcessType extends ProcessType { - val regex = """^(?i)streaming$""".r - val desc = "streaming" -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala deleted file mode 100644 index 19e553e..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala +++ /dev/null @@ -1,180 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. 
The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.process - -import org.apache.griffin.measure.cache.info.InfoCacheInstance -import org.apache.griffin.measure.config.params._ -import org.apache.griffin.measure.config.params.env._ -import org.apache.griffin.measure.config.params.user._ -import org.apache.griffin.measure.data.source.DataSourceFactory -import org.apache.griffin.measure.persist.{Persist, PersistFactory} -import org.apache.griffin.measure.process.engine.DqEngineFactory -import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters} -import org.apache.griffin.measure.rule.adaptor.RuleAdaptorGroup -import org.apache.griffin.measure.rule.udf._ -import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil} -import org.apache.spark.sql.{SQLContext, SparkSession} -import org.apache.spark.sql.hive.HiveContext -import org.apache.spark.streaming.{Milliseconds, StreamingContext} -import org.apache.spark.{SparkConf, SparkContext} - -import scala.util.Try - -case class StreamingDqProcess(allParam: AllParam) extends DqProcess { - - val envParam: EnvParam = allParam.envParam - val userParam: UserParam = allParam.userParam - - val sparkParam = envParam.sparkParam - val metricName = userParam.name - val dataSourceNames = userParam.dataSources.map(_.name) - val baselineDsName = userParam.baselineDsName - -// var sparkContext: SparkContext = _ - var sqlContext: SQLContext = _ - - var sparkSession: SparkSession = _ - 
- def retriable: Boolean = true - - def init: Try[_] = Try { - val conf = new SparkConf().setAppName(metricName) - conf.setAll(sparkParam.config) - conf.set("spark.sql.crossJoin.enabled", "true") - sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate() - sparkSession.sparkContext.setLogLevel(sparkParam.logLevel) - sqlContext = sparkSession.sqlContext - - // clear checkpoint directory - clearCpDir - - // init info cache instance - InfoCacheInstance.initInstance(envParam.infoCacheParams, metricName) - InfoCacheInstance.init - - // register udf - GriffinUdfs.register(sqlContext) - GriffinUdafs.register(sqlContext) - - // init adaptors - val dataSourceNames = userParam.dataSources.map(_.name) - RuleAdaptorGroup.init(sparkSession, dataSourceNames, baselineDsName) - } - - def run: Try[_] = Try { - val ssc = StreamingContext.getOrCreate(sparkParam.cpDir, () => { - try { - createStreamingContext - } catch { - case e: Throwable => { - error(s"create streaming context error: ${e.getMessage}") - throw e - } - } - }) - - // start time - val appTime = getAppTime - - // get persists to persist measure result - val persistFactory = PersistFactory(envParam.persistParams, metricName) - val persist: Persist = persistFactory.getPersists(appTime) - - // persist start id - val applicationId = sparkSession.sparkContext.applicationId - persist.start(applicationId) - - // get dq engines - val dqEngines = DqEngineFactory.genDqEngines(sqlContext) - - // generate data sources - val dataSources = DataSourceFactory.genDataSources(sqlContext, ssc, dqEngines, userParam.dataSources) - dataSources.foreach(_.init) - - // process thread - val dqThread = StreamingDqThread(sqlContext, dqEngines, dataSources, - userParam.evaluateRuleParam, persistFactory, persist) - - // init data sources -// val dsTmsts = dqEngines.loadData(dataSources, appTime) - - // generate rule steps -// val ruleSteps = RuleAdaptorGroup.genRuleSteps( -// TimeInfo(appTime, appTime), 
userParam.evaluateRuleParam, dsTmsts) -// -// // run rules -// dqEngines.runRuleSteps(ruleSteps) -// -// // persist results -// dqEngines.persistAllResults(ruleSteps, persist) - - // end time -// val endTime = new Date().getTime -// persist.log(endTime, s"process using time: ${endTime - startTime} ms") - - val processInterval = TimeUtil.milliseconds(sparkParam.processInterval) match { - case Some(interval) => interval - case _ => throw new Exception("invalid batch interval") - } - val process = TimingProcess(processInterval, dqThread) - process.startup() - - ssc.start() - ssc.awaitTermination() - ssc.stop(stopSparkContext=true, stopGracefully=true) - - // finish - persist.finish() - -// process.shutdown() - } - - def end: Try[_] = Try { - TableRegisters.unregisterCompileGlobalTables() - TableRegisters.unregisterRunGlobalTables(sqlContext) - - DataFrameCaches.uncacheGlobalDataFrames() - DataFrameCaches.clearGlobalTrashDataFrames() - - sparkSession.close() - sparkSession.stop() - - InfoCacheInstance.close - } - - def createStreamingContext: StreamingContext = { - val batchInterval = TimeUtil.milliseconds(sparkParam.batchInterval) match { - case Some(interval) => Milliseconds(interval) - case _ => throw new Exception("invalid batch interval") - } - val ssc = new StreamingContext(sparkSession.sparkContext, batchInterval) - ssc.checkpoint(sparkParam.cpDir) - - ssc - } - - private def clearCpDir: Unit = { - if (sparkParam.needInitClear) { - val cpDir = sparkParam.cpDir - println(s"clear checkpoint directory ${cpDir}") - HdfsUtil.deleteHdfsPath(cpDir) - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala deleted file 
mode 100644 index f67724f..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala +++ /dev/null @@ -1,146 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.process - -import java.util.Date -import java.util.concurrent.TimeUnit - -import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache} -import org.apache.griffin.measure.cache.result.CacheResultProcesser -import org.apache.griffin.measure.config.params.user.EvaluateRuleParam -import org.apache.griffin.measure.data.source.DataSource -import org.apache.griffin.measure.log.Loggable -import org.apache.griffin.measure.persist.{Persist, PersistFactory} -import org.apache.griffin.measure.process.engine.DqEngines -import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters, TimeRange} -import org.apache.griffin.measure.rule.adaptor.{ProcessDetailsKeys, RuleAdaptorGroup, RunPhase} -import org.apache.griffin.measure.rule.plan._ -import org.apache.spark.sql.SQLContext - -case class StreamingDqThread(sqlContext: SQLContext, - dqEngines: DqEngines, - dataSources: Seq[DataSource], - evaluateRuleParam: EvaluateRuleParam, - persistFactory: PersistFactory, - appPersist: Persist - ) extends 
Runnable with Loggable { - - val lock = InfoCacheInstance.genLock("process") - - def run(): Unit = { - val updateTimeDate = new Date() - val updateTime = updateTimeDate.getTime - println(s"===== [${updateTimeDate}] process begins =====") - val locked = lock.lock(5, TimeUnit.SECONDS) - if (locked) { - try { - - val st = new Date().getTime - appPersist.log(st, s"starting process ...") - val calcTimeInfo = CalcTimeInfo(st) - - TimeInfoCache.startTimeInfoCache - - // init data sources - val dsTimeRanges = dqEngines.loadData(dataSources, calcTimeInfo) - printTimeRanges(dsTimeRanges) - - // generate rule steps - val rulePlan = RuleAdaptorGroup.genRulePlan( - calcTimeInfo, evaluateRuleParam, StreamingProcessType, dsTimeRanges) - - // run rules - dqEngines.runRuleSteps(calcTimeInfo, rulePlan.ruleSteps) - - val ct = new Date().getTime - val calculationTimeStr = s"calculation using time: ${ct - st} ms" - appPersist.log(ct, calculationTimeStr) - - // persist results - dqEngines.persistAllMetrics(rulePlan.metricExports, persistFactory) - - val rt = new Date().getTime - val persistResultTimeStr = s"persist result using time: ${rt - ct} ms" - appPersist.log(rt, persistResultTimeStr) - - // persist records - dqEngines.persistAllRecords(rulePlan.recordExports, persistFactory, dataSources) - - // update data sources - dqEngines.updateDataSources(rulePlan.dsUpdates, dataSources) - - // finish calculation - finishCalculation() - - val et = new Date().getTime - val persistTimeStr = s"persist records using time: ${et - rt} ms" - appPersist.log(et, persistTimeStr) - - TimeInfoCache.endTimeInfoCache - - // clean old data - cleanData(calcTimeInfo) - -// sqlContext.tables().show(20) - - } catch { - case e: Throwable => error(s"process error: ${e.getMessage}") - } finally { - lock.unlock() - } - } else { - println(s"===== [${updateTimeDate}] process ignores =====") - } - val endTime = new Date().getTime - println(s"===== [${updateTimeDate}] process ends, using ${endTime - updateTime} ms 
=====") - } - - // finish calculation for this round - private def finishCalculation(): Unit = { - dataSources.foreach(_.processFinish) - } - - // clean old data and old result cache - private def cleanData(timeInfo: TimeInfo): Unit = { - try { - dataSources.foreach(_.cleanOldData) - - TableRegisters.unregisterRunTempTables(sqlContext, timeInfo.key) - TableRegisters.unregisterCompileTempTables(timeInfo.key) - - DataFrameCaches.uncacheDataFrames(timeInfo.key) - DataFrameCaches.clearTrashDataFrames(timeInfo.key) - DataFrameCaches.clearGlobalTrashDataFrames() - - val cleanTime = TimeInfoCache.getCleanTime - CacheResultProcesser.refresh(cleanTime) - } catch { - case e: Throwable => error(s"clean data error: ${e.getMessage}") - } - } - - private def printTimeRanges(timeRanges: Map[String, TimeRange]): Unit = { - val timeRangesStr = timeRanges.map { pair => - val (name, timeRange) = pair - s"${name} -> (${timeRange.begin}, ${timeRange.end}]" - }.mkString(", ") - println(s"data source timeRanges: ${timeRangesStr}") - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/process/TimingProcess.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/TimingProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/TimingProcess.scala deleted file mode 100644 index 8d9bcb2..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/process/TimingProcess.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.process - -import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit} -import java.util.{Timer, TimerTask} - -case class TimingProcess(interval: Long, runnable: Runnable) { - - val pool: ThreadPoolExecutor = Executors.newFixedThreadPool(5).asInstanceOf[ThreadPoolExecutor] - - val timer = new Timer("process", true) - - val timerTask = new TimerTask() { - override def run(): Unit = { - pool.submit(runnable) - } - } - - def startup(): Unit = { - timer.schedule(timerTask, interval, interval) - } - - def shutdown(): Unit = { - timer.cancel() - pool.shutdown() - pool.awaitTermination(10, TimeUnit.SECONDS) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala deleted file mode 100644 index c06406c..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala +++ /dev/null @@ -1,179 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. 
The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.process.engine - -import java.util.Date - -import org.apache.griffin.measure.cache.result.CacheResultProcesser -import org.apache.griffin.measure.config.params.user.DataSourceParam -import org.apache.griffin.measure.data.source.{DataSource, DataSourceFactory} -import org.apache.griffin.measure.persist.{Persist, PersistFactory} -import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters} -import org.apache.griffin.measure.result.AccuracyResult -import org.apache.griffin.measure.rule.adaptor.InternalColumns -import org.apache.griffin.measure.rule.dsl._ -import org.apache.griffin.measure.rule.plan._ -import org.apache.griffin.measure.utils.JsonUtil -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.types._ -import org.apache.spark.sql.{DataFrame, Row, SQLContext} -import org.apache.spark.streaming.StreamingContext -import org.apache.griffin.measure.utils.ParamUtil._ - -import scala.util.Try - -import org.apache.spark.sql._ - -case class DataFrameOprEngine(sqlContext: SQLContext) extends SparkDqEngine { - - def runRuleStep(timeInfo: TimeInfo, ruleStep: RuleStep): Boolean = { - ruleStep match { - case rs @ DfOprStep(name, rule, details, _, _) => { - try { - val df = rule match { - case DataFrameOprs._fromJson => DataFrameOprs.fromJson(sqlContext, details) - case DataFrameOprs._accuracy => DataFrameOprs.accuracy(sqlContext, timeInfo, details) - case 
DataFrameOprs._clear => DataFrameOprs.clear(sqlContext, details) - case _ => throw new Exception(s"df opr [ ${rule} ] not supported") - } - if (rs.needCache) DataFrameCaches.cacheDataFrame(timeInfo.key, name, df) - TableRegisters.registerRunTempTable(df, timeInfo.key, name) - true - } catch { - case e: Throwable => { - error(s"run df opr [ ${rule} ] error: ${e.getMessage}") - false - } - } - } - case _ => false - } - } - -} - -object DataFrameOprs { - - final val _fromJson = "from_json" - final val _accuracy = "accuracy" - final val _clear = "clear" - - object AccuracyOprKeys { - val _dfName = "df.name" - val _miss = "miss" - val _total = "total" - val _matched = "matched" - } - - def fromJson(sqlContext: SQLContext, details: Map[String, Any]): DataFrame = { - val _dfName = "df.name" - val _colName = "col.name" - val dfName = details.getOrElse(_dfName, "").toString - val colNameOpt = details.get(_colName).map(_.toString) - - implicit val encoder = Encoders.STRING - - val df: DataFrame = sqlContext.table(s"`${dfName}`") - val rdd = colNameOpt match { - case Some(colName: String) => df.map(r => r.getAs[String](colName)) - case _ => df.map(_.getAs[String](0)) - } - sqlContext.read.json(rdd) // slow process - } - - def accuracy(sqlContext: SQLContext, timeInfo: TimeInfo, details: Map[String, Any]): DataFrame = { - import AccuracyOprKeys._ - - val dfName = details.getStringOrKey(_dfName) - val miss = details.getStringOrKey(_miss) - val total = details.getStringOrKey(_total) - val matched = details.getStringOrKey(_matched) - -// val _enableIgnoreCache = "enable.ignore.cache" -// val enableIgnoreCache = details.getBoolean(_enableIgnoreCache, false) - -// val tmst = InternalColumns.tmst - - val updateTime = new Date().getTime - - def getLong(r: Row, k: String): Option[Long] = { - try { - Some(r.getAs[Long](k)) - } catch { - case e: Throwable => None - } - } - - val df = sqlContext.table(s"`${dfName}`") - - val results = df.rdd.flatMap { row => - try { - val tmst = 
getLong(row, InternalColumns.tmst).getOrElse(timeInfo.calcTime) - val missCount = getLong(row, miss).getOrElse(0L) - val totalCount = getLong(row, total).getOrElse(0L) - val ar = AccuracyResult(missCount, totalCount) - if (ar.isLegal) Some((tmst, ar)) else None - } catch { - case e: Throwable => None - } - }.collect - - val updateResults = results.flatMap { pair => - val (t, result) = pair - val updatedCacheResultOpt = CacheResultProcesser.genUpdateCacheResult(t, updateTime, result) - updatedCacheResultOpt - } - - // update results - updateResults.foreach { r => - CacheResultProcesser.update(r) - } - - // generate metrics - val schema = StructType(Array( - StructField(InternalColumns.tmst, LongType), - StructField(miss, LongType), - StructField(total, LongType), - StructField(matched, LongType), - StructField(InternalColumns.record, BooleanType), - StructField(InternalColumns.empty, BooleanType) - )) - val rows = updateResults.map { r => - val ar = r.result.asInstanceOf[AccuracyResult] - Row(r.timeGroup, ar.miss, ar.total, ar.getMatch, !ar.initial, ar.eventual) - } - val rowRdd = sqlContext.sparkContext.parallelize(rows) - val retDf = sqlContext.createDataFrame(rowRdd, schema) - - retDf - } - - def clear(sqlContext: SQLContext, details: Map[String, Any]): DataFrame = { - val _dfName = "df.name" - val dfName = details.getOrElse(_dfName, "").toString - - val df = sqlContext.table(s"`${dfName}`") - val emptyRdd = sqlContext.sparkContext.emptyRDD[Row] - sqlContext.createDataFrame(emptyRdd, df.schema) - } - -} - - - http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala deleted file mode 100644 index a2de69d..0000000 --- 
a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.process.engine - -import org.apache.griffin.measure.config.params.user.DataSourceParam -import org.apache.griffin.measure.data.source.DataSource -import org.apache.griffin.measure.log.Loggable -import org.apache.griffin.measure.persist.{Persist, PersistFactory} -import org.apache.griffin.measure.process.ProcessType -import org.apache.griffin.measure.rule.dsl._ -import org.apache.griffin.measure.rule.plan.{TimeInfo, _} -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{DataFrame, Dataset, Row} - -trait DqEngine extends Loggable with Serializable { - - def runRuleStep(timeInfo: TimeInfo, ruleStep: RuleStep): Boolean - - protected def collectable(): Boolean = false - - def collectMetrics(metricExport: MetricExport): Map[Long, Map[String, Any]] - - // def collectRecords(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]] - // - // def collectUpdateCacheDatas(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]] - -// def collectUpdateRDD(ruleStep: RuleStep): 
Option[DataFrame] -// def collectRecords(timeInfo: TimeInfo, recordExport: RecordExport): Map[Long, DataFrame] - - - def collectBatchRecords(recordExport: RecordExport): Option[RDD[String]] - def collectStreamingRecords(recordExport: RecordExport): (Option[RDD[(Long, Iterable[String])]], Set[Long]) - - def collectUpdateDf(dsUpdate: DsUpdate): Option[DataFrame] -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngineFactory.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngineFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngineFactory.scala deleted file mode 100644 index e075584..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngineFactory.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.process.engine - -import org.apache.spark.sql.SQLContext -import org.apache.spark.streaming.StreamingContext - - -object DqEngineFactory { - - private val engineTypes = List("spark-sql", "df-opr") - - private final val SparkSqlEngineType = "spark-sql" - private final val DataFrameOprEngineType = "df-opr" - - def genDqEngines(sqlContext: SQLContext): DqEngines = { - val engines = engineTypes.flatMap { et => - genDqEngine(et, sqlContext) - } - DqEngines(engines) - } - - private def genDqEngine(engineType: String, sqlContext: SQLContext): Option[DqEngine] = { - engineType match { - case SparkSqlEngineType => Some(SparkSqlEngine(sqlContext)) - case DataFrameOprEngineType => Some(DataFrameOprEngine(sqlContext)) - case _ => None - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala deleted file mode 100644 index eab0b21..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala +++ /dev/null @@ -1,242 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.process.engine - -import java.util.concurrent.atomic.AtomicInteger - -import org.apache.griffin.measure.config.params.user.DataSourceParam -import org.apache.griffin.measure.data.source._ -import org.apache.griffin.measure.log.Loggable -import org.apache.griffin.measure.persist._ -import org.apache.griffin.measure.process.temp.TimeRange -import org.apache.griffin.measure.process._ -import org.apache.griffin.measure.rule.adaptor.InternalColumns -import org.apache.griffin.measure.rule.dsl._ -import org.apache.griffin.measure.rule.plan.{DsUpdate, _} -import org.apache.griffin.measure.utils.JsonUtil -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{DataFrame, Dataset, Row} - -//import scala.concurrent._ -//import scala.concurrent.duration.Duration -//import scala.util.{Failure, Success, Try} -//import ExecutionContext.Implicits.global - -case class DqEngines(engines: Seq[DqEngine]) extends DqEngine { - - val persistOrder: List[PersistType] = List(MetricPersistType, RecordPersistType) - - def loadData(dataSources: Seq[DataSource], timeInfo: TimeInfo): Map[String, TimeRange] = { - dataSources.map { ds => - (ds.name, ds.loadData(timeInfo)) - }.toMap - } - - def runRuleSteps(timeInfo: TimeInfo, ruleSteps: Seq[RuleStep]): Unit = { - ruleSteps.foreach { ruleStep => - runRuleStep(timeInfo, ruleStep) - } - } - - def persistAllMetrics(metricExports: Seq[MetricExport], persistFactory: PersistFactory - ): Unit = { - val allMetrics: Map[Long, Map[String, Any]] = { - metricExports.foldLeft(Map[Long, Map[String, Any]]()) { (ret, 
metricExport) => - val metrics = collectMetrics(metricExport) - metrics.foldLeft(ret) { (total, pair) => - val (k, v) = pair - total.get(k) match { - case Some(map) => total + (k -> (map ++ v)) - case _ => total + pair - } - } - } - } - - allMetrics.foreach { pair => - val (t, metric) = pair - val persist = persistFactory.getPersists(t) - persist.persistMetrics(metric) - } - } - -// private def persistCollectedRecords(recordExport: RecordExport, records: Map[Long, DataFrame], -// persistFactory: PersistFactory, dataSources: Seq[DataSource]): Unit = { -// val pc = ParallelCounter(records.size) -// val pro = promise[Boolean] -// if (records.size > 0) { -// records.foreach { pair => -// val (tmst, df) = pair -// val persist = persistFactory.getPersists(tmst) -// val updateDsCaches = recordExport.dataSourceCacheOpt match { -// case Some(dsName) => dataSources.filter(_.name == dsName).flatMap(_.dataSourceCacheOpt) -// case _ => Nil -// } -// val future = Future { -// persist.persistRecords(df, recordExport.name) -//// updateDsCaches.foreach(_.updateData(df, tmst)) -// updateDsCaches.foreach(_.updateData(Some(df))) -// true -// } -// future.onComplete { -// case Success(v) => { -// pc.finishOne(v) -// if (pc.checkDone) pro.trySuccess(pc.checkResult) -// } -// case Failure(ex) => { -// println(s"plan step failure: ${ex.getMessage}") -// pc.finishOne(false) -// if (pc.checkDone) pro.trySuccess(pc.checkResult) -// } -// } -// } -// } else pro.trySuccess(true) -// -// Await.result(pro.future, Duration.Inf) -// } - - def persistAllRecords(recordExports: Seq[RecordExport], - persistFactory: PersistFactory, dataSources: Seq[DataSource] - ): Unit = { - // method 1: multi thread persist multi data frame -// recordExports.foreach { recordExport => -// val records = collectRecords(timeInfo, recordExport, procType) -// persistCollectedRecords(recordExport, records, persistFactory, dataSources) -// } - - // method 2: multi thread persist multi iterable - recordExports.foreach { 
recordExport => - recordExport.mode match { - case SimpleMode => { - collectBatchRecords(recordExport).foreach { rdd => - persistCollectedBatchRecords(recordExport, rdd, persistFactory) - } - } - case TimestampMode => { - val (rddOpt, emptySet) = collectStreamingRecords(recordExport) - persistCollectedStreamingRecords(recordExport, rddOpt, emptySet, persistFactory, dataSources) - } - } - } - } - - def collectBatchRecords(recordExport: RecordExport): Option[RDD[String]] = { - val ret = engines.foldLeft(None: Option[RDD[String]]) { (ret, engine) => - if (ret.nonEmpty) ret else engine.collectBatchRecords(recordExport) - } - ret - } - def collectStreamingRecords(recordExport: RecordExport): (Option[RDD[(Long, Iterable[String])]], Set[Long]) = { - val ret = engines.foldLeft((None: Option[RDD[(Long, Iterable[String])]], Set[Long]())) { (ret, engine) => - if (ret._1.nonEmpty || ret._2.nonEmpty) ret else engine.collectStreamingRecords(recordExport) - } - ret - } - - private def persistCollectedBatchRecords(recordExport: RecordExport, - records: RDD[String], persistFactory: PersistFactory - ): Unit = { - val persist = persistFactory.getPersists(recordExport.defTimestamp) - persist.persistRecords(records, recordExport.name) - } - - private def persistCollectedStreamingRecords(recordExport: RecordExport, recordsOpt: Option[RDD[(Long, Iterable[String])]], - emtpyRecordKeys: Set[Long], persistFactory: PersistFactory, - dataSources: Seq[DataSource] - ): Unit = { -// val updateDsCaches = recordExport.dataSourceCacheOpt match { -// case Some(dsName) => dataSources.filter(_.name == dsName).flatMap(_.dataSourceCacheOpt) -// case _ => Nil -// } - - recordsOpt.foreach { records => - records.foreach { pair => - val (tmst, strs) = pair - val persist = persistFactory.getPersists(tmst) - - persist.persistRecords(strs, recordExport.name) -// updateDsCaches.foreach(_.updateData(strs, tmst)) - } - } - - emtpyRecordKeys.foreach { t => - val persist = persistFactory.getPersists(t) - 
persist.persistRecords(Nil, recordExport.name) -// updateDsCaches.foreach(_.updateData(Nil, t)) - } - } - - /////////////////////////// - - def runRuleStep(timeInfo: TimeInfo, ruleStep: RuleStep): Boolean = { - val ret = engines.foldLeft(false) { (done, engine) => - done || engine.runRuleStep(timeInfo, ruleStep) - } - if (!ret) warn(s"run rule step warn: no dq engine support ${ruleStep}") - ret - } - - /////////////////////////// - - def collectMetrics(metricExport: MetricExport - ): Map[Long, Map[String, Any]] = { - val ret = engines.foldLeft(Map[Long, Map[String, Any]]()) { (ret, engine) => - if (ret.nonEmpty) ret else engine.collectMetrics(metricExport) - } - ret - } - - def collectUpdateDf(dsUpdate: DsUpdate): Option[DataFrame] = { - val ret = engines.foldLeft(None: Option[DataFrame]) { (ret, engine) => - if (ret.nonEmpty) ret else engine.collectUpdateDf(dsUpdate) - } - ret - } - - def updateDataSources(dsUpdates: Seq[DsUpdate], - dataSources: Seq[DataSource]): Unit = { - dsUpdates.foreach { dsUpdate => - val dsName = dsUpdate.dsName - collectUpdateDf(dsUpdate) match { - case Some(df) => { - dataSources.find(_.name == dsName).foreach(_.updateData(df)) - } - case _ => { - // do nothing - } - } - } - } - -} - -case class ParallelCounter(total: Int) extends Serializable { - private val done: AtomicInteger = new AtomicInteger(0) - private val result: AtomicInteger = new AtomicInteger(0) - def finishOne(suc: Boolean): Unit = { - if (suc) result.incrementAndGet - done.incrementAndGet - } - def checkDone: Boolean = { - done.get() >= total - } - def checkResult: Boolean = { - if (total > 0) result.get() > 0 else true - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala ---------------------------------------------------------------------- diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala deleted file mode 100644 index 4ba185f..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala +++ /dev/null @@ -1,183 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
*/
package org.apache.griffin.measure.process.engine

import org.apache.griffin.measure.process._
import org.apache.griffin.measure.rule.adaptor.InternalColumns
import org.apache.griffin.measure.rule.dsl._
import org.apache.griffin.measure.rule.plan._
import org.apache.griffin.measure.utils.JsonUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.griffin.measure.utils.ParamUtil._

/**
 * Spark-backed half of the DQ engine: collects metrics, record RDDs and
 * updated data frames out of the temp tables registered during rule
 * execution. Concrete engines supply the [[SQLContext]].
 */
trait SparkDqEngine extends DqEngine {

  val sqlContext: SQLContext

  // Shared empty defaults so failure paths return without allocating.
  val emptyMetricMap = Map[Long, Map[String, Any]]()
  val emptyMap = Map[String, Any]()
  val emptyRecordMap = Map[Long, DataFrame]()

  /**
   * Read the named temp table and parse each row's JSON into a map.
   * Rows that are not valid JSON maps are silently dropped (best-effort).
   */
  private def getMetricMaps(dfName: String): Seq[Map[String, Any]] = {
    val records = sqlContext.table(s"`${dfName}`").toJSON.collect()
    if (records.nonEmpty) {
      records.flatMap { rec =>
        try Some(JsonUtil.toAnyMap(rec))
        catch { case _: Throwable => None } // deliberate best-effort: skip unparsable rows
      }.toSeq
    } else Nil
  }

  /** Fold a sequence of metric maps into one map, honoring the requested collect type. */
  private def normalizeMetric(metrics: Seq[Map[String, Any]], name: String,
                              collectType: CollectType): Map[String, Any] = {
    collectType match {
      case EntriesCollectType => metrics.headOption.getOrElse(emptyMap)
      case ArrayCollectType => Map[String, Any]((name -> metrics))
      case MapCollectType =>
        Map[String, Any]((name -> metrics.headOption.getOrElse(emptyMap)))
      case _ =>
        // default: a single metric is inlined, multiple metrics are wrapped as an array
        if (metrics.size > 1) Map[String, Any]((name -> metrics))
        else metrics.headOption.getOrElse(emptyMap)
    }
  }

  /**
   * Collect metrics for the given export, keyed by timestamp.
   * SimpleMode attaches everything to the default timestamp; TimestampMode
   * groups rows by their own tmst column. Any error yields an empty map.
   */
  def collectMetrics(metricExport: MetricExport): Map[Long, Map[String, Any]] = {
    if (collectable) {
      val MetricExport(name, stepName, collectType, defTmst, mode) = metricExport
      try {
        val metricMaps: Seq[Map[String, Any]] = getMetricMaps(stepName)
        mode match {
          case SimpleMode =>
            emptyMetricMap + (defTmst -> normalizeMetric(metricMaps, name, collectType))
          case TimestampMode =>
            val tmstMetrics = metricMaps.map { metric =>
              val tmst = metric.getLong(InternalColumns.tmst, defTmst)
              (tmst, metric.removeKeys(InternalColumns.columns))
            }
            tmstMetrics.groupBy(_._1).map { case (tmst, pairs) =>
              (tmst, normalizeMetric(pairs.map(_._2), name, collectType))
            }
        }
      } catch {
        case e: Throwable =>
          error(s"collect metrics ${name} error: ${e.getMessage}")
          emptyMetricMap
      }
    } else emptyMetricMap
  }

  /** Read the row's timestamp column, falling back to the default on any failure. */
  private def getTmst(row: Row, defTmst: Long): Long = {
    try row.getAs[Long](InternalColumns.tmst)
    catch { case _: Throwable => defTmst }
  }

  /** The export's step temp table, when collection is enabled. */
  private def getRecordDataFrame(recordExport: RecordExport): Option[DataFrame] = {
    if (collectable) Some(sqlContext.table(s"`${recordExport.stepName}`"))
    else None
  }

  /** Batch records: the step table rendered as one JSON string per row. */
  def collectBatchRecords(recordExport: RecordExport): Option[RDD[String]] = {
    getRecordDataFrame(recordExport).map(_.toJSON.rdd)
  }

  /**
   * Render one row as (tmst, json); None when formatting fails.
   * Shared by both branches of collectStreamingRecords.
   */
  private def formatRecord(row: Row, tmst: Long): Option[(Long, String)] = {
    try Some((tmst, JsonUtil.toJson(SparkRowFormatter.formatRow(row))))
    catch { case _: Throwable => None }
  }

  /**
   * Streaming records: JSON-formatted rows grouped by timestamp.
   * When an origin table is given, the step table only carries per-timestamp
   * empty flags and the records themselves are read from the origin table.
   * @return (grouped record RDD if any, set of timestamps flagged empty)
   */
  def collectStreamingRecords(recordExport: RecordExport
                             ): (Option[RDD[(Long, Iterable[String])]], Set[Long]) = {
    implicit val encoder = Encoders.tuple(Encoders.scalaLong, Encoders.STRING)
    val RecordExport(_, _, _, originDFOpt, defTmst, procType) = recordExport
    getRecordDataFrame(recordExport) match {
      case Some(stepDf) =>
        originDFOpt match {
          case Some(originName) =>
            // step table rows carry (tmst, isEmpty) flags
            val tmsts = stepDf.collect.flatMap { row =>
              try Some((getTmst(row, defTmst), row.getAs[Boolean](InternalColumns.empty)))
              catch { case _: Throwable => None }
            }
            val emptyTmsts = tmsts.filter(_._2).map(_._1).toSet
            val recordTmsts = tmsts.filterNot(_._2).map(_._1).toSet
            if (recordTmsts.nonEmpty) {
              val recordsDf = sqlContext.table(s"`${originName}`")
              val records = recordsDf.flatMap { row =>
                val tmst = getTmst(row, defTmst)
                if (recordTmsts.contains(tmst)) formatRecord(row, tmst) else None
              }
              (Some(records.rdd.groupByKey), emptyTmsts)
            } else (None, emptyTmsts)
          case _ =>
            val records = stepDf.flatMap { row =>
              formatRecord(row, getTmst(row, defTmst))
            }
            (Some(records.rdd.groupByKey), Set[Long]())
        }
      case _ => (None, Set[Long]())
    }
  }

  /** The data-source-update step table, when collection is enabled. */
  def collectUpdateDf(dsUpdate: DsUpdate): Option[DataFrame] = {
    if (collectable) {
      val DsUpdate(_, stepName) = dsUpdate
      Some(sqlContext.table(s"`${stepName}`"))
    } else None
  }

}
// ---- next file in dump: SparkRowFormatter.scala (diff-header residue trimmed) ----
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.
See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.process.engine - -import org.apache.spark.sql.Row -import org.apache.spark.sql.types.{ArrayType, DataType, StructField, StructType} - -import scala.collection.mutable.ArrayBuffer - -object SparkRowFormatter { - - def formatRow(row: Row): Map[String, Any] = { - formatRowWithSchema(row, row.schema) - } - - private def formatRowWithSchema(row: Row, schema: StructType): Map[String, Any] = { - formatStruct(schema.fields, row) - } - - private def formatStruct(schema: Seq[StructField], r: Row) = { - val paired = schema.zip(r.toSeq) - paired.foldLeft(Map[String, Any]())((s, p) => s ++ formatItem(p)) - } - - private def formatItem(p: Pair[StructField, Any]): Map[String, Any] = { - p match { - case (sf, a) => - sf.dataType match { - case ArrayType(et, _) => - Map(sf.name -> (if (a == null) a else formatArray(et, a.asInstanceOf[ArrayBuffer[Any]]))) - case StructType(s) => - Map(sf.name -> (if (a == null) a else formatStruct(s, a.asInstanceOf[Row]))) - case _ => Map(sf.name -> a) - } - } - } - - private def formatArray(et: DataType, arr: ArrayBuffer[Any]): Seq[Any] = { - et match { - case StructType(s) => arr.map(e => formatStruct(s, e.asInstanceOf[Row])) - case ArrayType(t, _) => - arr.map(e => formatArray(t, e.asInstanceOf[ArrayBuffer[Any]])) - case _ => arr - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala deleted file mode 100644 index e4ecb49..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala +++ /dev/null @@ -1,76 
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.process.engine

import java.util.Date

import org.apache.griffin.measure.config.params.user.DataSourceParam
import org.apache.griffin.measure.data.source._
import org.apache.griffin.measure.persist.{Persist, PersistFactory}
import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters}
import org.apache.griffin.measure.rule.adaptor.{GlobalKeys, InternalColumns}
import org.apache.griffin.measure.rule.dsl._
import org.apache.griffin.measure.rule.plan._
import org.apache.griffin.measure.utils.JsonUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.StreamingContext

/** DQ engine that executes SparkSqlStep rules through a [[SQLContext]]. */
case class SparkSqlEngine(sqlContext: SQLContext) extends SparkDqEngine {

  // Results produced by spark-sql steps are always collectable.
  override protected def collectable(): Boolean = true

  /**
   * Execute a spark-sql rule step: run its SQL (or, for a global step not yet
   * registered, its init rule if one is configured), optionally cache the
   * result, and register it as a temp table under the step name.
   * @return true when the step ran and registered successfully
   */
  def runRuleStep(timeInfo: TimeInfo, ruleStep: RuleStep): Boolean = {
    ruleStep match {
      case rs @ SparkSqlStep(name, rule, details, _, _) =>
        try {
          val rdf =
            if (rs.isGlobal && !TableRegisters.existRunGlobalTable(name)) {
              // first run of a global step: execute its init rule instead
              details.get(GlobalKeys._initRule) match {
                case Some(initRule: String) => sqlContext.sql(initRule)
                case _ => sqlContext.emptyDataFrame
              }
            } else sqlContext.sql(rule)

          if (rs.isGlobal) {
            if (rs.needCache) DataFrameCaches.cacheGlobalDataFrame(name, rdf)
            TableRegisters.registerRunGlobalTable(rdf, name)
          } else {
            if (rs.needCache) DataFrameCaches.cacheDataFrame(timeInfo.key, name, rdf)
            TableRegisters.registerRunTempTable(rdf, timeInfo.key, name)
          }
          true
        } catch {
          case e: Throwable =>
            error(s"run spark sql [ ${rule} ] error: ${e.getMessage}")
            false
        }
      // non-spark-sql steps are not handled by this engine
      case _ => false
    }
  }

}
// ---- next file in dump: DataFrameCaches.scala (diff-header residue trimmed) ----
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.process.temp

import org.apache.griffin.measure.log.Loggable
import org.apache.spark.sql.DataFrame

import scala.collection.concurrent.{TrieMap, Map => ConcMap}

/**
 * Process-wide registry of cached DataFrames, keyed by run key and name.
 * Replaced frames go into a per-key trash list and are unpersisted later via
 * clearTrashDataFrames. All updates use lock-free compare-and-swap on a
 * TrieMap, retrying recursively when a concurrent writer wins the race.
 */
object DataFrameCaches extends Loggable {

  // reserved key for frames shared across all runs
  final val _global = "_global"

  private val caches: ConcMap[String, Map[String, DataFrame]] = TrieMap[String, Map[String, DataFrame]]()
  private val trashCaches: ConcMap[String, Seq[DataFrame]] = TrieMap[String, Seq[DataFrame]]()

  /** Append one replaced frame to the trash list of `key` (CAS retry on contention). */
  private def trashDataFrame(key: String, df: DataFrame): Unit = {
    trashCaches.get(key) match {
      case Some(seq) =>
        val suc = trashCaches.replace(key, seq, seq :+ df)
        if (!suc) trashDataFrame(key, df)
      case _ =>
        val oldOpt = trashCaches.putIfAbsent(key, Seq[DataFrame](df))
        if (oldOpt.nonEmpty) trashDataFrame(key, df)
    }
  }

  /** Append several replaced frames to the trash list of `key` (CAS retry on contention). */
  private def trashDataFrames(key: String, dfs: Seq[DataFrame]): Unit = {
    trashCaches.get(key) match {
      case Some(seq) =>
        val suc = trashCaches.replace(key, seq, seq ++ dfs)
        if (!suc) trashDataFrames(key, dfs)
      case _ =>
        val oldOpt = trashCaches.putIfAbsent(key, dfs)
        if (oldOpt.nonEmpty) trashDataFrames(key, dfs)
    }
  }

  /**
   * Cache `df` under (key, name), calling `df.cache` only once the CAS
   * succeeds. A previously cached frame with the same name is moved to the
   * trash list rather than unpersisted immediately.
   */
  def cacheDataFrame(key: String, name: String, df: DataFrame): Unit = {
    // was println; this object extends Loggable, so log through it
    info(s"try to cache df ${name}")
    caches.get(key) match {
      case Some(mp) =>
        mp.get(name) match {
          case Some(odf) =>
            val suc = caches.replace(key, mp, mp + (name -> df))
            if (suc) {
              info(s"cache after replace old df")
              df.cache
              trashDataFrame(key, odf)
            } else {
              cacheDataFrame(key, name, df)
            }
          case _ =>
            val suc = caches.replace(key, mp, mp + (name -> df))
            if (suc) {
              info(s"cache after replace no old df")
              df.cache
            } else {
              cacheDataFrame(key, name, df)
            }
        }
      case _ =>
        val oldOpt = caches.putIfAbsent(key, Map[String, DataFrame]((name -> df)))
        if (oldOpt.isEmpty) {
          info(s"cache after put absent")
          df.cache
        } else {
          cacheDataFrame(key, name, df)
        }
    }
  }

  /** Cache a frame in the shared global bucket. */
  def cacheGlobalDataFrame(name: String, df: DataFrame): Unit = {
    cacheDataFrame(_global, name, df)
  }

  /** Drop all cached frames of `key`, moving them to the trash list. */
  def uncacheDataFrames(key: String): Unit = {
    caches.remove(key) match {
      case Some(mp) => trashDataFrames(key, mp.values.toSeq)
      case _ => // nothing cached for this key
    }
  }

  def uncacheGlobalDataFrames(): Unit = {
    uncacheDataFrames(_global)
  }

  /** Unpersist and forget every trashed frame of `key`. */
  def clearTrashDataFrames(key: String): Unit = {
    trashCaches.remove(key) match {
      case Some(seq) => seq.foreach(_.unpersist)
      case _ => // nothing trashed for this key
    }
  }

  def clearGlobalTrashDataFrames(): Unit = {
    clearTrashDataFrames(_global)
  }

  /** All frames currently cached under `key` (empty map when none). */
  def getDataFrames(key: String): Map[String, DataFrame] = {
    caches.get(key) match {
      case Some(mp) => mp
      case _ => Map[String, DataFrame]()
    }
  }

  def getGlobalDataFrames(): Map[String, DataFrame] = {
    getDataFrames(_global)
  }

}
// ---- next file in dump: TableRegisters.scala (diff-header residue trimmed) ----
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.process.temp

import org.apache.griffin.measure.log.Loggable
import org.apache.spark.sql.{DataFrame, SQLContext}

import scala.collection.concurrent.{TrieMap, Map => ConcMap}

/**
 * Facade over two [[TableRegs]] registries: one for compile-time table names
 * and one for run-time tables actually registered in the SQLContext.
 * "Global" variants use the shared `_global` key.
 * (The previous in-object registry implementation, kept here as commented-out
 * code, was superseded by TableRegs and has been removed.)
 */
object TableRegisters extends Loggable {

  // reserved key for tables shared across all runs
  final val _global = "_global"

  val compileTableRegs = TableRegs()
  val runTableRegs = TableRegs()

  /** Best-effort drop of a spark temp table; failures are logged, never thrown. */
  private def dropTempTable(sqlContext: SQLContext, table: String): Unit = {
    try {
      sqlContext.dropTempTable(table)
    } catch {
      case _: Throwable => warn(s"drop temp table ${table} fails")
    }
  }

  // ----- run-time registration -----

  def registerRunGlobalTable(df: DataFrame, table: String): Unit = {
    registerRunTempTable(df, _global, table)
  }

  /** Track the name and register `df` as a spark temp table. */
  def registerRunTempTable(df: DataFrame, key: String, table: String): Unit = {
    runTableRegs.registerTable(key, table)
    df.registerTempTable(table)
  }

  // ----- compile-time registration (names only, no spark table) -----

  def registerCompileGlobalTable(table: String): Unit = {
    registerCompileTempTable(_global, table)
  }

  def registerCompileTempTable(key: String, table: String): Unit = {
    compileTableRegs.registerTable(key, table)
  }

  // ----- unregistration -----

  /** Forget one run table and drop it from the SQLContext if it was tracked. */
  def unregisterRunTempTable(sqlContext: SQLContext, key: String, table: String): Unit = {
    runTableRegs.unregisterTable(key, table).foreach(dropTempTable(sqlContext, _))
  }

  def unregisterCompileTempTable(key: String, table: String): Unit = {
    compileTableRegs.unregisterTable(key, table)
  }

  def unregisterRunGlobalTables(sqlContext: SQLContext): Unit = {
    unregisterRunTempTables(sqlContext, _global)
  }

  def unregisterCompileGlobalTables(): Unit = {
    unregisterCompileTempTables(_global)
  }

  /** Forget all run tables of `key` and drop each from the SQLContext. */
  def unregisterRunTempTables(sqlContext: SQLContext, key: String): Unit = {
    runTableRegs.unregisterTables(key).foreach(dropTempTable(sqlContext, _))
  }

  def unregisterCompileTempTables(key: String): Unit = {
    compileTableRegs.unregisterTables(key)
  }

  // ----- queries -----

  def existRunGlobalTable(table: String): Boolean = {
    existRunTempTable(_global, table)
  }

  def existCompileGlobalTable(table: String): Boolean = {
    existCompileTempTable(_global, table)
  }

  def existRunTempTable(key: String, table: String): Boolean = {
    runTableRegs.existTable(key, table)
  }

  def existCompileTempTable(key: String, table: String): Boolean = {
    compileTableRegs.existTable(key, table)
  }

  def getRunGlobalTables(): Set[String] = {
    getRunTempTables(_global)
  }

  def getRunTempTables(key: String): Set[String] = {
    runTableRegs.getTables(key)
  }

}
// ---- next file in dump: TableRegs.scala (diff-header residue trimmed) ----
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-*/ -package org.apache.griffin.measure.process.temp - -import org.apache.spark.sql.SQLContext - -import scala.collection.concurrent.{TrieMap, Map => ConcMap} - -case class TableRegs() { - - private val tables: ConcMap[String, Set[String]] = TrieMap[String, Set[String]]() - - def registerTable(key: String, table: String): Unit = { - tables.get(key) match { - case Some(set) => { - val suc = tables.replace(key, set, set + table) - if (!suc) registerTable(key, table) - } - case _ => { - val oldOpt = tables.putIfAbsent(key, Set[String](table)) - if (oldOpt.nonEmpty) registerTable(key, table) - } - } - } - - def unregisterTable(key: String, table: String): Option[String] = { - tables.get(key) match { - case Some(set) => { - val ftb = set.find(_ == table) - ftb match { - case Some(tb) => { - val nset = set - tb - val suc = tables.replace(key, set, nset) - if (suc) Some(tb) - else unregisterTable(key, table) - } - case _ => None - } - } - case _ => None - } - } - - def unregisterTables(key: String): Set[String] = { - tables.remove(key) match { - case Some(set) => set - case _ => Set[String]() - } - } - - def existTable(key: String, table: String): Boolean = { - tables.get(key) match { - case Some(set) => set.contains(table) - case _ => false - } - } - - def getTables(key: String): Set[String] = { - tables.get(key) match { - case Some(set) => set - case _ => Set[String]() - } - } - -}
