http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/persist/HttpPersist.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/HttpPersist.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/HttpPersist.scala index 6d5bac3..225ee41 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/persist/HttpPersist.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/persist/HttpPersist.scala @@ -21,8 +21,10 @@ package org.apache.griffin.measure.persist import org.apache.griffin.measure.result._ import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil} import org.apache.spark.rdd.RDD +import org.apache.spark.sql.DataFrame import scala.util.Try +import org.apache.griffin.measure.utils.ParamUtil._ // persist result by http way case class HttpPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist { @@ -30,8 +32,10 @@ case class HttpPersist(config: Map[String, Any], metricName: String, timeStamp: val Api = "api" val Method = "method" - val api = config.getOrElse(Api, "").toString - val method = config.getOrElse(Method, "post").toString + val api = config.getString(Api, "") + val method = config.getString(Method, "post") + + val _Value = "value" def available(): Boolean = { api.nonEmpty @@ -40,21 +44,21 @@ case class HttpPersist(config: Map[String, Any], metricName: String, timeStamp: def start(msg: String): Unit = {} def finish(): Unit = {} - def result(rt: Long, result: Result): Unit = { - result match { - case ar: AccuracyResult => { - val dataMap = Map[String, Any](("name" -> metricName), ("tmst" -> timeStamp), ("total" -> ar.getTotal), ("matched" -> ar.getMatch)) - httpResult(dataMap) - } - case pr: ProfileResult => { - val dataMap = Map[String, Any](("name" -> metricName), ("tmst" -> timeStamp), ("total" -> pr.getTotal), ("matched" -> pr.getMatch)) - 
httpResult(dataMap) - } - case _ => { - info(s"result: ${result}") - } - } - } +// def result(rt: Long, result: Result): Unit = { +// result match { +// case ar: AccuracyResult => { +// val dataMap = Map[String, Any](("name" -> metricName), ("tmst" -> timeStamp), ("total" -> ar.getTotal), ("matched" -> ar.getMatch)) +// httpResult(dataMap) +// } +// case pr: ProfileResult => { +// val dataMap = Map[String, Any](("name" -> metricName), ("tmst" -> timeStamp), ("total" -> pr.getTotal), ("matched" -> pr.getMatch)) +// httpResult(dataMap) +// } +// case _ => { +// info(s"result: ${result}") +// } +// } +// } private def httpResult(dataMap: Map[String, Any]) = { try { @@ -77,12 +81,34 @@ case class HttpPersist(config: Map[String, Any], metricName: String, timeStamp: } - def records(recs: RDD[String], tp: String): Unit = {} - def records(recs: Iterable[String], tp: String): Unit = {} +// def records(recs: RDD[String], tp: String): Unit = {} +// def records(recs: Iterable[String], tp: String): Unit = {} // def missRecords(records: RDD[String]): Unit = {} // def matchRecords(records: RDD[String]): Unit = {} def log(rt: Long, msg: String): Unit = {} +// def persistRecords(df: DataFrame, name: String): Unit = {} + def persistRecords(records: Iterable[String], name: String): Unit = {} + +// def persistMetrics(metrics: Seq[String], name: String): Unit = { +// val maps = metrics.flatMap { m => +// try { +// Some(JsonUtil.toAnyMap(m) ++ Map[String, Any](("name" -> metricName), ("tmst" -> timeStamp))) +// } catch { +// case e: Throwable => None +// } +// } +// maps.foreach { map => +// httpResult(map) +// } +// } + + def persistMetrics(metrics: Map[String, Any]): Unit = { + val head = Map[String, Any](("name" -> metricName), ("tmst" -> timeStamp)) + val result = head + (_Value -> metrics) + httpResult(result) + } + }
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/persist/LoggerPersist.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/LoggerPersist.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/LoggerPersist.scala index 00d41ea..0cd6f6b 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/persist/LoggerPersist.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/persist/LoggerPersist.scala @@ -21,98 +21,151 @@ package org.apache.griffin.measure.persist import java.util.Date import org.apache.griffin.measure.result._ -import org.apache.griffin.measure.utils.HdfsUtil +import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil} import org.apache.spark.rdd.RDD +import org.apache.spark.sql.DataFrame +import org.apache.griffin.measure.utils.ParamUtil._ // persist result and data to hdfs case class LoggerPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist { val MaxLogLines = "max.log.lines" - val maxLogLines = try { config.getOrElse(MaxLogLines, 100).toString.toInt } catch { case _ => 100 } + val maxLogLines = config.getInt(MaxLogLines, 100) def available(): Boolean = true def start(msg: String): Unit = { - println(s"[${timeStamp}] ${metricName} start") + println(s"[${timeStamp}] ${metricName} start: ${msg}") } def finish(): Unit = { println(s"[${timeStamp}] ${metricName} finish") } - def result(rt: Long, result: Result): Unit = { - try { - val resStr = result match { - case ar: AccuracyResult => { - s"match percentage: ${ar.matchPercentage}\ntotal count: ${ar.getTotal}\nmiss count: ${ar.getMiss}, match count: ${ar.getMatch}" - } - case pr: ProfileResult => { - s"match percentage: ${pr.matchPercentage}\ntotal count: ${pr.getTotal}\nmiss count: ${pr.getMiss}, match count: ${pr.getMatch}" - } - case _ => { - s"result: ${result}" - } - } - 
println(s"[${timeStamp}] ${metricName} result: \n${resStr}") - } catch { - case e: Throwable => error(e.getMessage) - } +// def result(rt: Long, result: Result): Unit = { +// try { +// val resStr = result match { +// case ar: AccuracyResult => { +// s"match percentage: ${ar.matchPercentage}\ntotal count: ${ar.getTotal}\nmiss count: ${ar.getMiss}, match count: ${ar.getMatch}" +// } +// case pr: ProfileResult => { +// s"match percentage: ${pr.matchPercentage}\ntotal count: ${pr.getTotal}\nmiss count: ${pr.getMiss}, match count: ${pr.getMatch}" +// } +// case _ => { +// s"result: ${result}" +// } +// } +// println(s"[${timeStamp}] ${metricName} result: \n${resStr}") +// } catch { +// case e: Throwable => error(e.getMessage) +// } +// } +// +// // need to avoid string too long +// private def rddRecords(records: RDD[String]): Unit = { +// try { +// val recordCount = records.count.toInt +// val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount) +// if (count > 0) { +// val recordsArray = records.take(count) +//// recordsArray.foreach(println) +// } +// } catch { +// case e: Throwable => error(e.getMessage) +// } +// } + +// private def iterableRecords(records: Iterable[String]): Unit = { +// try { +// val recordCount = records.size +// val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount) +// if (count > 0) { +// val recordsArray = records.take(count) +//// recordsArray.foreach(println) +// } +// } catch { +// case e: Throwable => error(e.getMessage) +// } +// } + +// def records(recs: RDD[String], tp: String): Unit = { +// tp match { +// case PersistDataType.MISS => rddRecords(recs) +// case PersistDataType.MATCH => rddRecords(recs) +// case _ => {} +// } +// } +// +// def records(recs: Iterable[String], tp: String): Unit = { +// tp match { +// case PersistDataType.MISS => iterableRecords(recs) +// case PersistDataType.MATCH => iterableRecords(recs) +// case _ => {} +// } +// } + +// def 
missRecords(records: RDD[String]): Unit = { +// warn(s"[${timeStamp}] ${metricName} miss records: ") +// rddRecords(records) +// } +// def matchRecords(records: RDD[String]): Unit = { +// warn(s"[${timeStamp}] ${metricName} match records: ") +// rddRecords(records) +// } + + def log(rt: Long, msg: String): Unit = { + println(s"[${timeStamp}] ${rt}: ${msg}") } - // need to avoid string too long - private def rddRecords(records: RDD[String]): Unit = { - try { - val recordCount = records.count.toInt - val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount) - if (count > 0) { - val recordsArray = records.take(count) +// def persistRecords(df: DataFrame, name: String): Unit = { +// val records = df.toJSON +// println(s"${name} [${timeStamp}] records: ") +// try { +// val recordCount = records.count.toInt +// val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount) +// if (count > 0) { +// val recordsArray = records.take(count) // recordsArray.foreach(println) - } - } catch { - case e: Throwable => error(e.getMessage) - } - } +// } +// } catch { +// case e: Throwable => error(e.getMessage) +// } +// } - private def iterableRecords(records: Iterable[String]): Unit = { + def persistRecords(records: Iterable[String], name: String): Unit = { try { val recordCount = records.size val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount) if (count > 0) { - val recordsArray = records.take(count) -// recordsArray.foreach(println) + records.foreach(println) } } catch { case e: Throwable => error(e.getMessage) } } - def records(recs: RDD[String], tp: String): Unit = { - tp match { - case PersistType.MISS => rddRecords(recs) - case PersistType.MATCH => rddRecords(recs) - case _ => {} - } - } - - def records(recs: Iterable[String], tp: String): Unit = { - tp match { - case PersistType.MISS => iterableRecords(recs) - case PersistType.MATCH => iterableRecords(recs) - case _ => {} - } - 
} - -// def missRecords(records: RDD[String]): Unit = { -// warn(s"[${timeStamp}] ${metricName} miss records: ") -// rddRecords(records) -// } -// def matchRecords(records: RDD[String]): Unit = { -// warn(s"[${timeStamp}] ${metricName} match records: ") -// rddRecords(records) +// def persistMetrics(metrics: Seq[String], name: String): Unit = { +// try { +// val recordCount = metrics.size +// val count = if (maxLogLines < 0) recordCount else scala.math.min(maxLogLines, recordCount) +// if (count > 0) { +// val recordsArray = metrics.take(count) +// recordsArray.foreach(println) +// } +// } catch { +// case e: Throwable => error(e.getMessage) +// } // } - def log(rt: Long, msg: String): Unit = { - println(s"[${timeStamp}] ${rt}: ${msg}") + def persistMetrics(metrics: Map[String, Any]): Unit = { + println(s"${metricName} [${timeStamp}] metrics: ") + val json = JsonUtil.toJson(metrics) + println(json) +// metrics.foreach { metric => +// val (key, value) = metric +// println(s"${key}: ${value}") +// } } + } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala index 25c8b0b..0b7c98c 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala @@ -21,6 +21,7 @@ package org.apache.griffin.measure.persist import org.apache.griffin.measure.result._ import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil} import org.apache.spark.rdd.RDD +import org.apache.spark.sql.DataFrame import scala.util.Try @@ -39,14 +40,19 @@ case class MultiPersists(persists: Iterable[Persist]) extends Persist { def start(msg: String): Unit = { 
persists.foreach(_.start(msg)) } def finish(): Unit = { persists.foreach(_.finish()) } - def result(rt: Long, result: Result): Unit = { persists.foreach(_.result(rt, result)) } - - def records(recs: RDD[String], tp: String): Unit = { persists.foreach(_.records(recs, tp)) } - def records(recs: Iterable[String], tp: String): Unit = { persists.foreach(_.records(recs, tp)) } +// def result(rt: Long, result: Result): Unit = { persists.foreach(_.result(rt, result)) } +// +// def records(recs: RDD[String], tp: String): Unit = { persists.foreach(_.records(recs, tp)) } +// def records(recs: Iterable[String], tp: String): Unit = { persists.foreach(_.records(recs, tp)) } // def missRecords(records: RDD[String]): Unit = { persists.foreach(_.missRecords(records)) } // def matchRecords(records: RDD[String]): Unit = { persists.foreach(_.matchRecords(records)) } def log(rt: Long, msg: String): Unit = { persists.foreach(_.log(rt, msg)) } +// def persistRecords(df: DataFrame, name: String): Unit = { persists.foreach(_.persistRecords(df, name)) } + def persistRecords(records: Iterable[String], name: String): Unit = { persists.foreach(_.persistRecords(records, name)) } +// def persistMetrics(metrics: Seq[String], name: String): Unit = { persists.foreach(_.persistMetrics(metrics, name)) } + def persistMetrics(metrics: Map[String, Any]): Unit = { persists.foreach(_.persistMetrics(metrics)) } + } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/persist/OldHttpPersist.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/OldHttpPersist.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/OldHttpPersist.scala index 357d6e1..84316b3 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/persist/OldHttpPersist.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/persist/OldHttpPersist.scala @@ -1,87 
+1,87 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.persist - -import org.apache.griffin.measure.result._ -import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil} -import org.apache.spark.rdd.RDD - -// persist result by old http way -- temporary way -case class OldHttpPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist { - - val Api = "api" - val Method = "method" - - val api = config.getOrElse(Api, "").toString - val method = config.getOrElse(Method, "post").toString - - def available(): Boolean = { - api.nonEmpty - } - - def start(msg: String): Unit = {} - def finish(): Unit = {} - - def result(rt: Long, result: Result): Unit = { - result match { - case ar: AccuracyResult => { - val matchPercentage: Double = if (ar.getTotal <= 0) 0 else (ar.getMatch * 1.0 / ar.getTotal) * 100 - val dataMap = Map[String, Any](("metricName" -> metricName), ("timestamp" -> timeStamp), ("value" -> matchPercentage), ("count" -> ar.getTotal)) - httpResult(dataMap) - } - case pr: ProfileResult => { - val dataMap = Map[String, Any](("metricName" -> metricName), ("timestamp" -> timeStamp), ("value" -> pr.getMatch), ("count" -> pr.getTotal)) - httpResult(dataMap) - } - case _ => { - info(s"result: 
${result}") - } - } - } - - private def httpResult(dataMap: Map[String, Any]) = { - try { - val data = JsonUtil.toJson(dataMap) - // post - val params = Map[String, Object]() - val header = Map[String, Object](("content-type" -> "application/json")) - - def func(): Boolean = { - HttpUtil.httpRequest(api, method, params, header, data) - } - - PersistThreadPool.addTask(func _, 10) - -// val status = HttpUtil.httpRequest(api, method, params, header, data) -// info(s"${method} to ${api} response status: ${status}") - } catch { - case e: Throwable => error(e.getMessage) - } - - } - - def records(recs: RDD[String], tp: String): Unit = {} - def records(recs: Iterable[String], tp: String): Unit = {} - -// def missRecords(records: RDD[String]): Unit = {} -// def matchRecords(records: RDD[String]): Unit = {} - - def log(rt: Long, msg: String): Unit = {} - -} +///* +//Licensed to the Apache Software Foundation (ASF) under one +//or more contributor license agreements. See the NOTICE file +//distributed with this work for additional information +//regarding copyright ownership. The ASF licenses this file +//to you under the Apache License, Version 2.0 (the +//"License"); you may not use this file except in compliance +//with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +//Unless required by applicable law or agreed to in writing, +//software distributed under the License is distributed on an +//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +//KIND, either express or implied. See the License for the +//specific language governing permissions and limitations +//under the License. 
+//*/ +//package org.apache.griffin.measure.persist +// +//import org.apache.griffin.measure.result._ +//import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil} +//import org.apache.spark.rdd.RDD +// +//// persist result by old http way -- temporary way +//case class OldHttpPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist { +// +// val Api = "api" +// val Method = "method" +// +// val api = config.getOrElse(Api, "").toString +// val method = config.getOrElse(Method, "post").toString +// +// def available(): Boolean = { +// api.nonEmpty +// } +// +// def start(msg: String): Unit = {} +// def finish(): Unit = {} +// +// def result(rt: Long, result: Result): Unit = { +// result match { +// case ar: AccuracyResult => { +// val matchPercentage: Double = if (ar.getTotal <= 0) 0 else (ar.getMatch * 1.0 / ar.getTotal) * 100 +// val dataMap = Map[String, Any](("metricName" -> metricName), ("timestamp" -> timeStamp), ("value" -> matchPercentage), ("count" -> ar.getTotal)) +// httpResult(dataMap) +// } +// case pr: ProfileResult => { +// val dataMap = Map[String, Any](("metricName" -> metricName), ("timestamp" -> timeStamp), ("value" -> pr.getMatch), ("count" -> pr.getTotal)) +// httpResult(dataMap) +// } +// case _ => { +// info(s"result: ${result}") +// } +// } +// } +// +// private def httpResult(dataMap: Map[String, Any]) = { +// try { +// val data = JsonUtil.toJson(dataMap) +// // post +// val params = Map[String, Object]() +// val header = Map[String, Object](("content-type" -> "application/json")) +// +// def func(): Boolean = { +// HttpUtil.httpRequest(api, method, params, header, data) +// } +// +// PersistThreadPool.addTask(func _, 10) +// +//// val status = HttpUtil.httpRequest(api, method, params, header, data) +//// info(s"${method} to ${api} response status: ${status}") +// } catch { +// case e: Throwable => error(e.getMessage) +// } +// +// } +// +// def records(recs: RDD[String], tp: String): Unit = {} +// def 
records(recs: Iterable[String], tp: String): Unit = {} +// +//// def missRecords(records: RDD[String]): Unit = {} +//// def matchRecords(records: RDD[String]): Unit = {} +// +// def log(rt: Long, msg: String): Unit = {} +// +//} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala index bc16599..2884fa6 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala @@ -21,6 +21,7 @@ package org.apache.griffin.measure.persist import org.apache.griffin.measure.log.Loggable import org.apache.griffin.measure.result._ import org.apache.spark.rdd.RDD +import org.apache.spark.sql.DataFrame import scala.util.Try @@ -35,18 +36,21 @@ trait Persist extends Loggable with Serializable { def start(msg: String): Unit def finish(): Unit - def result(rt: Long, result: Result): Unit + def log(rt: Long, msg: String): Unit - def records(recs: RDD[String], tp: String): Unit - def records(recs: Iterable[String], tp: String): Unit +// def result(rt: Long, result: Result): Unit +// +// def records(recs: RDD[String], tp: String): Unit +// def records(recs: Iterable[String], tp: String): Unit -// def missRecords(records: RDD[String]): Unit -// def matchRecords(records: RDD[String]): Unit +// def persistRecords(df: DataFrame, name: String): Unit + def persistRecords(records: Iterable[String], name: String): Unit +// def persistMetrics(metrics: Seq[String], name: String): Unit + def persistMetrics(metrics: Map[String, Any]): Unit - def log(rt: Long, msg: String): Unit } -object PersistType { - final val MISS = "miss" - final val MATCH = "match" -} \ No newline at end of file 
+//object PersistDataType { +// final val MISS = "miss" +// final val MATCH = "match" +//} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/persist/PersistFactory.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/PersistFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/PersistFactory.scala index 4330160..3a74343 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/persist/PersistFactory.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/persist/PersistFactory.scala @@ -27,7 +27,7 @@ case class PersistFactory(persistParams: Iterable[PersistParam], metricName: Str val HDFS_REGEX = """^(?i)hdfs$""".r val HTTP_REGEX = """^(?i)http$""".r - val OLDHTTP_REGEX = """^(?i)oldhttp$""".r +// val OLDHTTP_REGEX = """^(?i)oldhttp$""".r val LOG_REGEX = """^(?i)log$""".r def getPersists(timeStamp: Long): MultiPersists = { @@ -40,7 +40,7 @@ case class PersistFactory(persistParams: Iterable[PersistParam], metricName: Str val persistTry = persistParam.persistType match { case HDFS_REGEX() => Try(HdfsPersist(config, metricName, timeStamp)) case HTTP_REGEX() => Try(HttpPersist(config, metricName, timeStamp)) - case OLDHTTP_REGEX() => Try(OldHttpPersist(config, metricName, timeStamp)) +// case OLDHTTP_REGEX() => Try(OldHttpPersist(config, metricName, timeStamp)) case LOG_REGEX() => Try(LoggerPersist(config, metricName, timeStamp)) case _ => throw new Exception("not supported persist type") } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/process/Algo.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/Algo.scala b/measure/src/main/scala/org/apache/griffin/measure/process/Algo.scala 
new file mode 100644 index 0000000..7f1b153 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/process/Algo.scala @@ -0,0 +1,34 @@ +///* +//Licensed to the Apache Software Foundation (ASF) under one +//or more contributor license agreements. See the NOTICE file +//distributed with this work for additional information +//regarding copyright ownership. The ASF licenses this file +//to you under the Apache License, Version 2.0 (the +//"License"); you may not use this file except in compliance +//with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +//Unless required by applicable law or agreed to in writing, +//software distributed under the License is distributed on an +//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +//KIND, either express or implied. See the License for the +//specific language governing permissions and limitations +//under the License. +//*/ +//package org.apache.griffin.measure.algo +// +//import org.apache.griffin.measure.config.params.env._ +//import org.apache.griffin.measure.config.params.user._ +//import org.apache.griffin.measure.log.Loggable +// +//import scala.util.Try +// +//trait Algo extends Loggable with Serializable { +// +// val envParam: EnvParam +// val userParam: UserParam +// +// def run(): Try[_] +// +//} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala new file mode 100644 index 0000000..737a43f --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala @@ -0,0 +1,117 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license 
agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.process + +import java.util.Date + +import org.apache.griffin.measure.config.params._ +import org.apache.griffin.measure.config.params.env._ +import org.apache.griffin.measure.config.params.user._ +import org.apache.griffin.measure.data.source.DataSourceFactory +import org.apache.griffin.measure.persist.{Persist, PersistFactory} +import org.apache.griffin.measure.process.engine.{DqEngineFactory, SparkSqlEngine} +import org.apache.griffin.measure.rule.adaptor.{RuleAdaptorGroup, RunPhase} +import org.apache.griffin.measure.rule.udf.GriffinUdfs +import org.apache.griffin.measure.utils.JsonUtil +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.{SparkConf, SparkContext} + +import scala.util.Try + +case class BatchDqProcess(allParam: AllParam) extends DqProcess { + + val envParam: EnvParam = allParam.envParam + val userParam: UserParam = allParam.userParam + + val metricName = userParam.name + val sparkParam = envParam.sparkParam + + var sparkContext: SparkContext = _ + var sqlContext: SQLContext = _ + + def retriable: Boolean = false + + def init: Try[_] = Try { + val conf = new SparkConf().setAppName(metricName) + conf.setAll(sparkParam.config) + sparkContext = new SparkContext(conf) + 
sparkContext.setLogLevel(sparkParam.logLevel) + sqlContext = new HiveContext(sparkContext) + + // register udf + GriffinUdfs.register(sqlContext) + + // init adaptors + val dataSourceNames = userParam.dataSources.map(_.name) + RuleAdaptorGroup.init(sqlContext, dataSourceNames) + } + + def run: Try[_] = Try { + // start time + val startTime = new Date().getTime() + + // get persists to persist measure result + val persistFactory = PersistFactory(envParam.persistParams, metricName) + val persist: Persist = persistFactory.getPersists(startTime) + + // persist start id + val applicationId = sparkContext.applicationId + persist.start(applicationId) + + // get dq engines + val dqEngines = DqEngineFactory.genDqEngines(sqlContext) + + // generate data sources + val dataSources = DataSourceFactory.genDataSources(sqlContext, null, dqEngines, userParam.dataSources, metricName) + dataSources.foreach(_.init) + + // init data sources + dqEngines.loadData(dataSources, startTime) + + // generate rule steps + val ruleSteps = RuleAdaptorGroup.genConcreteRuleSteps(userParam.evaluateRuleParam, RunPhase) + + // run rules + dqEngines.runRuleSteps(ruleSteps) + + // persist results + val timeGroups = dqEngines.persistAllMetrics(ruleSteps, persistFactory) + + val rdds = dqEngines.collectUpdateRDDs(ruleSteps, timeGroups) + rdds.foreach(_._2.cache()) + + dqEngines.persistAllRecords(rdds, persistFactory) +// dqEngines.persistAllRecords(ruleSteps, persistFactory, timeGroups) + + rdds.foreach(_._2.unpersist()) + + // end time + val endTime = new Date().getTime + persist.log(endTime, s"process using time: ${endTime - startTime} ms") + + // finish + persist.finish() + } + + def end: Try[_] = Try { + sparkContext.stop + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/process/DqProcess.scala ---------------------------------------------------------------------- diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/process/DqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/DqProcess.scala new file mode 100644 index 0000000..50b04a8 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/process/DqProcess.scala @@ -0,0 +1,40 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.process + +import org.apache.griffin.measure.config.params.env._ +import org.apache.griffin.measure.config.params.user._ +import org.apache.griffin.measure.log.Loggable + +import scala.util.Try + +trait DqProcess extends Loggable with Serializable { + + val envParam: EnvParam + val userParam: UserParam + + def init: Try[_] + + def run: Try[_] + + def end: Try[_] + + def retriable: Boolean + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/process/ProcessType.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/ProcessType.scala b/measure/src/main/scala/org/apache/griffin/measure/process/ProcessType.scala new file mode 100644 index 0000000..36f88e1 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/process/ProcessType.scala @@ -0,0 +1,47 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/
+package org.apache.griffin.measure.process
+
+import scala.util.matching.Regex
+
+// Kind of dq process, parsed from a config string via a case-insensitive
+// regex and described by a canonical name.
+sealed trait ProcessType {
+  val regex: Regex
+  val desc: String
+}
+
+object ProcessType {
+  private val procTypes: List[ProcessType] = List(BatchProcessType, StreamingProcessType)
+  // parse the process-type string; unknown input falls back to batch
+  def apply(ptn: String): ProcessType = {
+    procTypes.filter(tp => ptn match {
+      // stable-identifier regex pattern: matches when tp.regex matches ptn
+      case tp.regex() => true
+      case _ => false
+    }).headOption.getOrElse(BatchProcessType)
+  }
+  // extractor: yields the canonical description string
+  def unapply(pt: ProcessType): Option[String] = Some(pt.desc)
+}
+
+final case object BatchProcessType extends ProcessType {
+  val regex = """^(?i)batch$""".r
+  val desc = "batch"
+}
+
+final case object StreamingProcessType extends ProcessType {
+  val regex = """^(?i)streaming$""".r
+  val desc = "streaming"
+}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
new file mode 100644
index 0000000..a567941
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
@@ -0,0 +1,157 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.
You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.process
+
+import java.util.Date
+
+import org.apache.griffin.measure.cache.info.InfoCacheInstance
+import org.apache.griffin.measure.config.params._
+import org.apache.griffin.measure.config.params.env._
+import org.apache.griffin.measure.config.params.user._
+import org.apache.griffin.measure.data.source.DataSourceFactory
+import org.apache.griffin.measure.persist.{Persist, PersistFactory}
+import org.apache.griffin.measure.process.engine.DqEngineFactory
+import org.apache.griffin.measure.rule.adaptor.RuleAdaptorGroup
+import org.apache.griffin.measure.rule.udf.GriffinUdfs
+import org.apache.griffin.measure.utils.TimeUtil
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+import org.apache.spark.{SparkConf, SparkContext}
+
+import scala.util.Try
+
+// Streaming dq process: builds Spark/Hive contexts, a checkpointed
+// StreamingContext, and drives StreamingDqThread on a timer.
+case class StreamingDqProcess(allParam: AllParam) extends DqProcess {
+
+  val envParam: EnvParam = allParam.envParam
+  val userParam: UserParam = allParam.userParam
+
+  val metricName = userParam.name
+  val sparkParam = envParam.sparkParam
+
+  // assigned in init; read by run/end — init must be called first
+  var sparkContext: SparkContext = _
+  var sqlContext: SQLContext = _
+
+  // streaming runs are retriable by the launcher
+  def retriable: Boolean = true
+
+  // build contexts, register udfs, init cache instance and rule adaptors
+  def init: Try[_] = Try {
+    val conf = new SparkConf().setAppName(metricName)
+    conf.setAll(sparkParam.config)
+    sparkContext = new SparkContext(conf)
+    sparkContext.setLogLevel(sparkParam.logLevel)
+    sqlContext = new HiveContext(sparkContext)
+
+    // init info cache instance
+    InfoCacheInstance.initInstance(envParam.infoCacheParams, metricName)
+    InfoCacheInstance.init
+
+    // register udf
+    GriffinUdfs.register(sqlContext)
+
+    // init adaptors
+    val dataSourceNames = userParam.dataSources.map(_.name)
+    RuleAdaptorGroup.init(sqlContext, dataSourceNames)
+  }
+
+  // build the streaming pipeline and block until streaming terminates
+  def run: Try[_] = Try {
+    // recover from checkpoint dir if present, otherwise create fresh
+    val ssc = StreamingContext.getOrCreate(sparkParam.cpDir, () => {
+      try {
+        createStreamingContext
+      } catch {
+        case e: Throwable => {
+          error(s"create streaming context error: ${e.getMessage}")
+          throw e
+        }
+      }
+    })
+
+    // start time
+    val startTime = new Date().getTime()
+
+    // get persists to persist measure result
+    val persistFactory = PersistFactory(envParam.persistParams, metricName)
+    val persist: Persist = persistFactory.getPersists(startTime)
+
+    // persist start id
+    val applicationId = sparkContext.applicationId
+    persist.start(applicationId)
+
+    // get dq engines
+    val dqEngines = DqEngineFactory.genDqEngines(sqlContext)
+
+    // generate data sources
+    val dataSources = DataSourceFactory.genDataSources(sqlContext, ssc, dqEngines, userParam.dataSources, metricName)
+    dataSources.foreach(_.init)
+
+    // process thread, executed periodically below
+    val dqThread = StreamingDqThread(dqEngines, dataSources, userParam.evaluateRuleParam, persistFactory, persist)
+
+    // init data sources
+//    dqEngines.loadData(dataSources)
+//
+//    // generate rule steps
+//    val ruleSteps = RuleAdaptorGroup.genConcreteRuleSteps(userParam.evaluateRuleParam)
+//
+//    // run rules
+//    dqEngines.runRuleSteps(ruleSteps)
+//
+//    // persist results
+//    dqEngines.persistAllResults(ruleSteps, persist)
+
+    // end time
+//    val endTime = new Date().getTime
+//    persist.log(endTime, s"process using time: ${endTime - startTime} ms")
+
+    val processInterval = TimeUtil.milliseconds(sparkParam.processInterval) match {
+      case Some(interval) => interval
+      case _ => throw new Exception("invalid batch interval")
+    }
+    val process = TimingProcess(processInterval, dqThread)
+    process.startup()
+
+    // blocks here until the streaming context terminates
+    ssc.start()
+    ssc.awaitTermination()
+    ssc.stop(stopSparkContext=true, stopGracefully=true)
+
+    // finish
+    persist.finish()
+
+//    process.shutdown()
+  }
+
+  // stop spark and close the shared cache instance
+  def end: Try[_] = Try {
+    sparkContext.stop
+
+    InfoCacheInstance.close
+  }
+
+  // fresh StreamingContext with configured batch interval and checkpointing
+  def createStreamingContext: StreamingContext = {
+    val batchInterval = TimeUtil.milliseconds(sparkParam.batchInterval) match {
+      case Some(interval) => Milliseconds(interval)
+      case _ => throw new Exception("invalid batch interval")
+    }
+    val ssc = new StreamingContext(sparkContext, batchInterval)
+    ssc.checkpoint(sparkParam.cpDir)
+
+    ssc
+  }
+
+} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
new file mode 100644
index 0000000..df1cc1b
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqThread.scala
@@ -0,0 +1,185 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.process
+
+import java.util.Date
+import java.util.concurrent.TimeUnit
+
+import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
+import org.apache.griffin.measure.cache.result.CacheResultProcesser
+import org.apache.griffin.measure.config.params.user.EvaluateRuleParam
+import org.apache.griffin.measure.data.source.DataSource
+import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.persist.{Persist, PersistFactory}
+import org.apache.griffin.measure.process.engine.DqEngines
+import org.apache.griffin.measure.rule.adaptor.{RuleAdaptorGroup, RunPhase}
+
+// One dq calculation round, run periodically by TimingProcess: takes a
+// cluster-wide lock, loads data, runs rule steps, persists metrics and
+// records, updates cached data sources and cleans old data.
+case class StreamingDqThread(dqEngines: DqEngines,
+                             dataSources: Seq[DataSource],
+                             evaluateRuleParam: EvaluateRuleParam,
+                             persistFactory: PersistFactory,
+                             appPersist: Persist
+                            ) extends Runnable with Loggable {
+
+  // shared lock so only one instance processes at a time
+  val lock = InfoCacheInstance.genLock("process")
+
+  def run(): Unit = {
+    val updateTimeDate = new Date()
+    val updateTime = updateTimeDate.getTime
+    println(s"===== [${updateTimeDate}] process begins =====")
+    // skip this round if the lock is not acquired within 5s
+    val locked = lock.lock(5, TimeUnit.SECONDS)
+    if (locked) {
+      try {
+
+        val st = new Date().getTime
+        appPersist.log(st, s"starting process ...")
+
+        TimeInfoCache.startTimeInfoCache
+
+        // init data sources
+        dqEngines.loadData(dataSources, st)
+
+        // generate rule steps
+        val ruleSteps = RuleAdaptorGroup.genConcreteRuleSteps(evaluateRuleParam, RunPhase)
+
+        // run rules
+        dqEngines.runRuleSteps(ruleSteps)
+
+        val ct = new Date().getTime
+        val calculationTimeStr = s"calculation using time: ${ct - st} ms"
+        println(calculationTimeStr)
+        appPersist.log(ct, calculationTimeStr)
+
+        // persist results
+        val timeGroups = dqEngines.persistAllMetrics(ruleSteps, persistFactory)
+
+        val rt = new Date().getTime
+        val persistResultTimeStr = s"persist result using time: ${rt - ct} ms"
+        println(persistResultTimeStr)
+        appPersist.log(rt, persistResultTimeStr)
+
+        // cache: each rdd is used by count, persist and update below
+        val rdds = dqEngines.collectUpdateRDDs(ruleSteps, timeGroups)
+        rdds.foreach(_._2.cache())
+        rdds.foreach { pr =>
+          val (step, rdd) = pr
+          val cnt = rdd.count
+          println(s"step [${step.name}] group count: ${cnt}")
+        }
+
+        val lt = new Date().getTime
+        // fixed local name (was collectoRddTimeStr)
+        val collectRddTimeStr = s"collect records using time: ${lt - rt} ms"
+        println(collectRddTimeStr)
+        appPersist.log(lt, collectRddTimeStr)
+
+        // persist records
+        dqEngines.persistAllRecords(rdds, persistFactory)
+//        dqEngines.persistAllRecords(ruleSteps, persistFactory, timeGroups)
+
+        // update data source
+        dqEngines.updateDataSources(rdds, dataSources)
+//        dqEngines.updateDataSources(ruleSteps, dataSources, timeGroups)
+
+        rdds.foreach(_._2.unpersist())
+
+        TimeInfoCache.endTimeInfoCache
+
+        // clean old data
+        cleanData
+
+        val et = new Date().getTime
+        val persistTimeStr = s"persist records using time: ${et - lt} ms"
+        println(persistTimeStr)
+        appPersist.log(et, persistTimeStr)
+
+      } catch {
+        // NonFatal instead of Throwable: let OOM / interrupts propagate
+        case scala.util.control.NonFatal(e) => error(s"process error: ${e.getMessage}")
+      } finally {
+        lock.unlock()
+      }
+    } else {
+      println(s"===== [${updateTimeDate}] process ignores =====")
+    }
+    val endTime = new Date().getTime
+    println(s"===== [${updateTimeDate}] process ends, using ${endTime - updateTime} ms =====")
+  }
+
+  // clean old data and old result cache; best-effort on recoverable errors
+  private def cleanData(): Unit = {
+    try {
+      dataSources.foreach(_.cleanOldData)
+      dataSources.foreach(_.dropTable)
+
+      val cleanTime = TimeInfoCache.getCleanTime
+      CacheResultProcesser.refresh(cleanTime)
+    } catch {
+      // NonFatal instead of Throwable: let fatal errors propagate
+      case scala.util.control.NonFatal(e) => error(s"clean data error: ${e.getMessage}")
+    }
+  }
+
+//  // calculate accuracy between source data and target data
+//  private def accuracy(sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))],
+//                       targetData: RDD[(Product, (Map[String, Any], Map[String, Any]))],
+//                       ruleAnalyzer: RuleAnalyzer) = {
+//    // 1. cogroup
+//    val allKvs = sourceData.cogroup(targetData)
+//
+//    // 2. accuracy calculation
+//    val (accuResult, missingRdd, matchedRdd) = AccuracyCore.accuracy(allKvs, ruleAnalyzer)
+//
+//    (accuResult, missingRdd, matchedRdd)
+//  }
+//
+//  private def reorgByTimeGroup(rdd: RDD[(Product, (Map[String, Any], Map[String, Any]))]
+//                              ): RDD[(Long, (Product, (Map[String, Any], Map[String, Any])))] = {
+//    rdd.flatMap { row =>
+//      val (key, (value, info)) = row
+//      val b: Option[(Long, (Product, (Map[String, Any], Map[String, Any])))] = info.get(TimeStampInfo.key) match {
+//        case Some(t: Long) => Some((t, row))
+//        case _ => None
+//      }
+//      b
+//    }
+//  }
+//
+//  // convert data into a string
+//  def record2String(rec: (Product, (Map[String, Any], Map[String, Any])), dataPersist: Iterable[Expr], infoPersist: Iterable[Expr]): String = {
+//    val (key, (data, info)) = rec
+//    val persistData = getPersistMap(data, dataPersist)
+//    val persistInfo = info.mapValues { value =>
+//      value match {
+//        case vd: Map[String, Any] => getPersistMap(vd, infoPersist)
+//        case v => v
+//      }
+//    }.map(identity)
+//    s"${persistData} [${persistInfo}]"
+//  }
+//
+//  // get the expr value map of the persist expressions
+//  private def getPersistMap(data: Map[String, Any], persist: Iterable[Expr]): Map[String, Any] = {
+//    val persistMap = persist.map(e => (e._id, e.desc)).toMap
+//    data.flatMap { pair =>
+//      val (k, v) = pair
+//      persistMap.get(k) match {
+//        case Some(d) => Some((d -> v))
+//        case _ => None
+//      }
+//    }
+//  }
+
+} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/process/TimingProcess.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/TimingProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/TimingProcess.scala
new file mode 100644
index 0000000..8d9bcb2
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/TimingProcess.scala
@@ -0,0
+1,46 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.process
+
+import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
+import java.util.{Timer, TimerTask}
+
+// Fires `runnable` every `interval` ms. The daemon Timer only submits;
+// the work itself runs on a fixed pool so a slow round does not block
+// the timer (rounds can overlap if slower than the interval).
+case class TimingProcess(interval: Long, runnable: Runnable) {
+
+  val pool: ThreadPoolExecutor = Executors.newFixedThreadPool(5).asInstanceOf[ThreadPoolExecutor]
+
+  // daemon timer: does not keep the JVM alive on its own
+  val timer = new Timer("process", true)
+
+  val timerTask = new TimerTask() {
+    override def run(): Unit = {
+      pool.submit(runnable)
+    }
+  }
+
+  // first run after `interval` ms, then repeatedly every `interval` ms
+  def startup(): Unit = {
+    timer.schedule(timerTask, interval, interval)
+  }
+
+  // stop scheduling, then wait up to 10s for in-flight work to finish
+  def shutdown(): Unit = {
+    timer.cancel()
+    pool.shutdown()
+    pool.awaitTermination(10, TimeUnit.SECONDS)
+  }
+
+} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/process/check/DataChecker.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/check/DataChecker.scala b/measure/src/main/scala/org/apache/griffin/measure/process/check/DataChecker.scala
new file mode 100644
index 0000000..91855c2
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/check/DataChecker.scala
@@
-0,0 +1,29 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.process.check
+
+import org.apache.spark.sql.SQLContext
+
+// Checks whether a data-source name is registered as a table
+// in the given SQLContext.
+case class DataChecker(sqlContext: SQLContext) {
+
+  // true if a table named `name` exists in the context
+  def existDataSourceName(name: String): Boolean = {
+    sqlContext.tableNames.exists(_ == name)
+  }
+
+} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
new file mode 100644
index 0000000..b409b8d
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
@@ -0,0 +1,165 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.
The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.process.engine
+
+import java.util.Date
+
+import org.apache.griffin.measure.cache.result.CacheResultProcesser
+import org.apache.griffin.measure.config.params.user.DataSourceParam
+import org.apache.griffin.measure.data.connector.GroupByColumn
+import org.apache.griffin.measure.data.source.{DataSource, DataSourceFactory}
+import org.apache.griffin.measure.persist.{Persist, PersistFactory}
+import org.apache.griffin.measure.result.AccuracyResult
+import org.apache.griffin.measure.rule.dsl._
+import org.apache.griffin.measure.rule.step._
+import org.apache.griffin.measure.utils.JsonUtil
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.streaming.StreamingContext
+
+// Engine that executes DfOprStep rules: named DataFrame operations
+// (from_json / accuracy / clear), registering the result as a temp table.
+case class DataFrameOprEngine(sqlContext: SQLContext) extends SparkDqEngine {
+
+  // returns true if this engine handled the step, false otherwise
+  def runRuleStep(ruleStep: ConcreteRuleStep): Boolean = {
+    ruleStep match {
+      case DfOprStep(name, rule, details, _, _) => {
+        try {
+          rule match {
+            case DataFrameOprs._fromJson => {
+              val df = DataFrameOprs.fromJson(sqlContext, details)
+              df.registerTempTable(name)
+            }
+            case DataFrameOprs._accuracy => {
+              val df = DataFrameOprs.accuracy(sqlContext, details)
+              df.registerTempTable(name)
+            }
+            case DataFrameOprs._clear => {
+              val df = DataFrameOprs.clear(sqlContext, details)
+              df.registerTempTable(name)
+            }
+            case _ => {
+              throw new Exception(s"df opr [ ${rule} ] not supported")
+            }
+          }
+          true
+        } catch {
+          // NOTE(review): catches Throwable — consider NonFatal here
+          case e: Throwable => {
+            error(s"run df opr [ ${rule} ] error: ${e.getMessage}")
+            false
+          }
+        }
+      }
+      case _ => false
+    }
+  }
+
+}
+
+// Implementations of the supported DataFrame operations.
+object DataFrameOprs {
+
+  final val _fromJson = "from_json"
+  final val _accuracy = "accuracy"
+  final val _clear = "clear"
+
+  // parse a string column (or column 0) of the source table as json rows
+  def fromJson(sqlContext: SQLContext, details: Map[String, Any]): DataFrame = {
+    val _dfName = "df.name"
+    val _colName = "col.name"
+    val dfName = details.getOrElse(_dfName, "").toString
+    val colNameOpt = details.get(_colName).map(_.toString)
+
+    val df = sqlContext.table(s"`${dfName}`")
+    val rdd = colNameOpt match {
+      case Some(colName: String) => df.map(_.getAs[String](colName))
+      case _ => df.map(_.getAs[String](0))
+    }
+    sqlContext.read.json(rdd)
+  }
+
+  // fold per-timestamp miss/total counts into the shared cache and emit
+  // the updated accuracy rows (tmst, miss, total, matched)
+  def accuracy(sqlContext: SQLContext, details: Map[String, Any]): DataFrame = {
+    val _dfName = "df.name"
+    val _miss = "miss"
+    val _total = "total"
+    val _matched = "matched"
+    val _tmst = "tmst"
+    // NOTE(review): defaults fall back to the key name itself ("df.name"
+    // as a table name) — presumably details always provides them; verify
+    val dfName = details.getOrElse(_dfName, _dfName).toString
+    val miss = details.getOrElse(_miss, _miss).toString
+    val total = details.getOrElse(_total, _total).toString
+    val matched = details.getOrElse(_matched, _matched).toString
+    val tmst = details.getOrElse(_tmst, _tmst).toString
+
+    val updateTime = new Date().getTime
+
+    // missing/non-long columns read as 0
+    def getLong(r: Row, k: String): Long = {
+      try {
+        r.getAs[Long](k)
+      } catch {
+        case e: Throwable => 0L
+      }
+    }
+
+    val df = sqlContext.table(s"`${dfName}`")
+    // collect (timestamp, AccuracyResult) pairs; rows without a positive
+    // timestamp are dropped
+    val results = df.flatMap { row =>
+      val t = getLong(row, tmst)
+      if (t > 0) {
+        val missCount = getLong(row, miss)
+        val totalCount = getLong(row, total)
+        val ar = AccuracyResult(missCount, totalCount)
+        Some((t, ar))
+      } else None
+    }.collect
+
+    // keep only results that actually change the cached value
+    val updateResults = results.flatMap { pair =>
+      val (t, result) = pair
+      val updatedCacheResultOpt = CacheResultProcesser.genUpdateCacheResult(t, updateTime, result)
+      updatedCacheResultOpt
+    }
+
+    // update
+    updateResults.foreach { r =>
+      CacheResultProcesser.update(r)
+    }
+
+    val schema = StructType(Array(
+      StructField(tmst, LongType),
+      StructField(miss, LongType),
+      StructField(total, LongType),
+      StructField(matched, LongType)
+    ))
+    val rows = updateResults.map { r =>
+      val ar = r.result.asInstanceOf[AccuracyResult]
+      Row(r.timeGroup, ar.miss, ar.total, ar.getMatch)
+    }
+    val rowRdd = sqlContext.sparkContext.parallelize(rows)
+    sqlContext.createDataFrame(rowRdd, schema)
+
+  }
+
+  // empty DataFrame with the same schema as the source table
+  def clear(sqlContext: SQLContext, details: Map[String, Any]): DataFrame = {
+    val _dfName = "df.name"
+    val dfName = details.getOrElse(_dfName, "").toString
+
+    val df = sqlContext.table(s"`${dfName}`")
+    val emptyRdd = sqlContext.sparkContext.emptyRDD[Row]
+    sqlContext.createDataFrame(emptyRdd, df.schema)
+  }
+
+}
+
+
+ http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
new file mode 100644
index 0000000..84d5917
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
@@ -0,0 +1,41 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.
You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.process.engine
+
+import org.apache.griffin.measure.config.params.user.DataSourceParam
+import org.apache.griffin.measure.data.source.DataSource
+import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.persist.{Persist, PersistFactory}
+import org.apache.griffin.measure.rule.dsl._
+import org.apache.griffin.measure.rule.step._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
+
+// Contract of a dq execution engine: run concrete rule steps and collect
+// their metric / record outputs.
+trait DqEngine extends Loggable with Serializable {
+
+  // execute one step; true if this engine handled it
+  def runRuleStep(ruleStep: ConcreteRuleStep): Boolean
+
+  // metrics of a step, grouped by time group
+  def collectMetrics(ruleStep: ConcreteRuleStep): Map[Long, Map[String, Any]]
+
+//  def collectRecords(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]]
+//
+//  def collectUpdateCacheDatas(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]]
+
+  // record/update rows of a step for the given time groups, if any
+  def collectUpdateRDD(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]]
+} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngineFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngineFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngineFactory.scala
new file mode 100644
index 0000000..e075584
--- /dev/null
+++
b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngineFactory.scala
@@ -0,0 +1,47 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.process.engine
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.streaming.StreamingContext
+
+
+// Builds the composite DqEngines from all supported engine types.
+object DqEngineFactory {
+
+  private final val SparkSqlEngineType = "spark-sql"
+  private final val DataFrameOprEngineType = "df-opr"
+
+  // single source of truth: derived from the constants instead of
+  // duplicating the raw strings (was List("spark-sql", "df-opr"))
+  private val engineTypes = List(SparkSqlEngineType, DataFrameOprEngineType)
+
+  // one engine per supported type, wrapped into a DqEngines composite
+  def genDqEngines(sqlContext: SQLContext): DqEngines = {
+    val engines = engineTypes.flatMap { et =>
+      genDqEngine(et, sqlContext)
+    }
+    DqEngines(engines)
+  }
+
+  // map an engine-type string to its engine; None if unknown
+  private def genDqEngine(engineType: String, sqlContext: SQLContext): Option[DqEngine] = {
+    engineType match {
+      case SparkSqlEngineType => Some(SparkSqlEngine(sqlContext))
+      case DataFrameOprEngineType => Some(DataFrameOprEngine(sqlContext))
+      case _ => None
+    }
+  }
+
+} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala new file mode 100644 index 0000000..1bafa15 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala @@ -0,0 +1,208 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/
+package org.apache.griffin.measure.process.engine
+
+import org.apache.griffin.measure.config.params.user.DataSourceParam
+import org.apache.griffin.measure.data.connector.GroupByColumn
+import org.apache.griffin.measure.data.source._
+import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.persist.{Persist, PersistFactory}
+import org.apache.griffin.measure.rule.dsl._
+import org.apache.griffin.measure.rule.step._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
+
+// Composite engine: delegates each operation to the first (or every)
+// underlying engine that supports it.
+case class DqEngines(engines: Seq[DqEngine]) extends DqEngine {
+
+  val persistOrder: List[PersistType] = List(MetricPersistType, RecordPersistType)
+
+  // load each data source for the given measure time
+  def loadData(dataSources: Seq[DataSource], ms: Long): Unit = {
+    dataSources.foreach { ds =>
+      ds.loadData(ms)
+    }
+  }
+
+  // run all steps in order
+  def runRuleSteps(ruleSteps: Seq[ConcreteRuleStep]): Unit = {
+    ruleSteps.foreach { ruleStep =>
+      runRuleStep(ruleStep)
+    }
+  }
+
+  // merge metrics of all metric-persist steps by time group, persist each
+  // group, and return the updated time groups
+  def persistAllMetrics(ruleSteps: Seq[ConcreteRuleStep], persistFactory: PersistFactory
+                       ): Iterable[Long] = {
+    val metricSteps = ruleSteps.filter(_.persistType == MetricPersistType)
+    val allMetrics: Map[Long, Map[String, Any]] = {
+      metricSteps.foldLeft(Map[Long, Map[String, Any]]()) { (ret, step) =>
+        val metrics = collectMetrics(step)
+        metrics.foldLeft(ret) { (total, pair) =>
+          val (k, v) = pair
+          total.get(k) match {
+            case Some(map) => total + (k -> (map ++ v))
+            case _ => total + pair
+          }
+        }
+      }
+    }
+    val updateTimeGroups = allMetrics.keys
+    allMetrics.foreach { pair =>
+      val (t, metric) = pair
+      val persist = persistFactory.getPersists(t)
+      persist.persistMetrics(metric)
+    }
+    updateTimeGroups
+  }
+
+//  def persistAllRecords(ruleSteps: Seq[ConcreteRuleStep], persistFactory: PersistFactory,
+//                        timeGroups: Iterable[Long]): Unit = {
+//    val recordSteps = ruleSteps.filter(_.persistType == RecordPersistType)
+//    recordSteps.foreach { step =>
+//      collectRecords(step, timeGroups) match {
+//        case Some(rdd) => {
+//          val name = step.name
+//          rdd.foreach { pair =>
+//            val (t, items) = pair
+//            val persist = persistFactory.getPersists(t)
+//            persist.persistRecords(items, name)
+//          }
+//        }
+//        case _ => {
+//          println(s"empty records to persist")
+//        }
+//      }
+//    }
+//  }
+//
+//  def updateDataSources(ruleSteps: Seq[ConcreteRuleStep], dataSources: Seq[DataSource],
+//                        timeGroups: Iterable[Long]): Unit = {
+//    val updateSteps = ruleSteps.filter(_.updateDataSource.nonEmpty)
+//    updateSteps.foreach { step =>
+//      collectUpdateCacheDatas(step, timeGroups) match {
+//        case Some(rdd) => {
+//          val udpateDataSources = dataSources.filter { ds =>
+//            step.updateDataSource match {
+//              case Some(dsName) if (dsName == ds.name) => true
+//              case _ => false
+//            }
+//          }
+//          if (udpateDataSources.size > 0) {
+//            val name = step.name
+//            rdd.foreach { pair =>
+//              val (t, items) = pair
+//              udpateDataSources.foreach { ds =>
+//                ds.dataSourceCacheOpt.foreach(_.updateData(items, t))
+//              }
+//            }
+//          }
+//        }
+//        case _ => {
+//          println(s"empty data source to update")
+//        }
+//      }
+//    }
+//  }
+
+  ///////////////////////////
+
+  // true if any underlying engine handled the step
+  def runRuleStep(ruleStep: ConcreteRuleStep): Boolean = {
+    val ret = engines.foldLeft(false) { (done, engine) =>
+      done || engine.runRuleStep(ruleStep)
+    }
+    if (!ret) warn(s"run rule step warn: no dq engine support ${ruleStep}")
+    ret
+  }
+
+  ///////////////////////////
+
+//  def collectRecords(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]] = {
+//    engines.flatMap { engine =>
+//      engine.collectRecords(ruleStep, timeGroups)
+//    }.headOption
+//  }
+//  def collectUpdateCacheDatas(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]] = {
+//    engines.flatMap { engine =>
+//      engine.collectUpdateCacheDatas(ruleStep, timeGroups)
+//    }.headOption
+//  }
+
+  // union of the metrics collected by every engine
+  def collectMetrics(ruleStep: ConcreteRuleStep): Map[Long, Map[String, Any]] = {
+    val ret = engines.foldLeft(Map[Long, Map[String, Any]]()) { (ret, engine) =>
+      ret ++ engine.collectMetrics(ruleStep)
+    }
+//    if (ret.isEmpty) warn(s"collect metrics warn: no metrics collected for ${ruleStep}")
+    ret
+  }
+
+  // first engine that produces an update rdd for the step
+  def collectUpdateRDD(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]
+                      ): Option[RDD[(Long, Iterable[String])]] = {
+    engines.flatMap { engine =>
+      engine.collectUpdateRDD(ruleStep, timeGroups)
+    }.headOption
+  }
+
+  ////////////////////////////
+
+  // pair each step with its update rdd, dropping steps without one
+  def collectUpdateRDDs(ruleSteps: Seq[ConcreteRuleStep], timeGroups: Iterable[Long]
+                       ): Seq[(ConcreteRuleStep, RDD[(Long, Iterable[String])])] = {
+    ruleSteps.flatMap { rs =>
+      collectUpdateRDD(rs, timeGroups) match {
+        case Some(rdd) => Some((rs, rdd))
+        case _ => None
+      }
+    }
+  }
+
+  // persist record output of record-persist steps, one persist per time group
+  def persistAllRecords(stepRdds: Seq[(ConcreteRuleStep, RDD[(Long, Iterable[String])])],
+                        persistFactory: PersistFactory): Unit = {
+    stepRdds.foreach { stepRdd =>
+      val (step, rdd) = stepRdd
+      if (step.persistType == RecordPersistType) {
+        val name = step.name
+        rdd.foreach { pair =>
+          val (t, items) = pair
+          val persist = persistFactory.getPersists(t)
+          persist.persistRecords(items, name)
+        }
+      }
+    }
+  }
+
+  // push each step's update rows into the cache of its target data source
+  def updateDataSources(stepRdds: Seq[(ConcreteRuleStep, RDD[(Long, Iterable[String])])],
+                        dataSources: Seq[DataSource]): Unit = {
+    stepRdds.foreach { stepRdd =>
+      val (step, rdd) = stepRdd
+      if (step.updateDataSource.nonEmpty) {
+        // fixed local name (was misspelled udpateDataSources)
+        val matchedDataSources = dataSources.filter { ds =>
+          step.updateDataSource match {
+            case Some(dsName) if (dsName == ds.name) => true
+            case _ => false
+          }
+        }
+        if (matchedDataSources.size > 0) {
+          val name = step.name
+          rdd.foreach { pair =>
+            val (t, items) = pair
+            matchedDataSources.foreach { ds =>
+              ds.dataSourceCacheOpt.foreach(_.updateData(items, t))
+            }
+          }
+        }
+      }
+    }
+  }
+
+} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
----------------------------------------------------------------------
diff --git
a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala new file mode 100644 index 0000000..ee994fd --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala @@ -0,0 +1,167 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.process.engine + +import org.apache.griffin.measure.data.connector.GroupByColumn +import org.apache.griffin.measure.log.Loggable +import org.apache.griffin.measure.rule.dsl.{MetricPersistType, RecordPersistType} +import org.apache.griffin.measure.rule.step._ +import org.apache.griffin.measure.utils.JsonUtil +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, SQLContext} + +trait SparkDqEngine extends DqEngine { + + val sqlContext: SQLContext + + def collectMetrics(ruleStep: ConcreteRuleStep): Map[Long, Map[String, Any]] = { + val emptyMap = Map[String, Any]() + ruleStep match { + case step: ConcreteRuleStep if (step.persistType == MetricPersistType) => { + val name = step.name + try { + val pdf = sqlContext.table(s"`${name}`") + val records = pdf.toJSON.collect() + + val pairs = records.flatMap { rec => + try { + val value = JsonUtil.toAnyMap(rec) + value.get(GroupByColumn.tmst) match { + case Some(t) => { + val key = t.toString.toLong + Some((key, value)) + } + case _ => None + } + } catch { + case e: Throwable => None + } + } + val groupedPairs = pairs.foldLeft(Map[Long, Seq[Map[String, Any]]]()) { (ret, pair) => + val (k, v) = pair + ret.get(k) match { + case Some(seq) => ret + (k -> (seq :+ v)) + case _ => ret + (k -> (v :: Nil)) + } + } + groupedPairs.mapValues { vs => + if (vs.size > 1) { + Map[String, Any]((name -> vs)) + } else { + vs.headOption.getOrElse(emptyMap) + } + } + } catch { + case e: Throwable => { + error(s"collect metrics ${name} error: ${e.getMessage}") + Map[Long, Map[String, Any]]() + } + } + } + case _ => Map[Long, Map[String, Any]]() + } + } + + def collectUpdateRDD(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long] + ): Option[RDD[(Long, Iterable[String])]] = { + ruleStep match { + case step: ConcreteRuleStep if ((step.persistType == RecordPersistType) + || (step.updateDataSource.nonEmpty)) => { + val name = step.name + try { + val pdf = sqlContext.table(s"`${name}`") + val 
cols = pdf.columns + val rdd = pdf.flatMap { row => + val values = cols.flatMap { col => + Some((col, row.getAs[Any](col))) + }.toMap + values.get(GroupByColumn.tmst) match { + case Some(t: Long) if (timeGroups.exists(_ == t)) => Some((t, JsonUtil.toJson(values))) + case _ => None + } + }.groupByKey() + Some(rdd) + } catch { + case e: Throwable => { + error(s"collect records ${name} error: ${e.getMessage}") + None + } + } + } + case _ => None + } + } + +// def collectRecords(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]] = { +// ruleStep match { +// case step: ConcreteRuleStep if (step.persistType == RecordPersistType) => { +// val name = step.name +// try { +// val pdf = sqlContext.table(s"`${name}`") +// val cols = pdf.columns +// val rdd = pdf.flatMap { row => +// val values = cols.flatMap { col => +// Some((col, row.getAs[Any](col))) +// }.toMap +// values.get(GroupByColumn.tmst) match { +// case Some(t: Long) if (timeGroups.exists(_ == t)) => Some((t, JsonUtil.toJson(values))) +// case _ => None +// } +// }.groupByKey() +// Some(rdd) +// } catch { +// case e: Throwable => { +// error(s"collect records ${name} error: ${e.getMessage}") +// None +// } +// } +// } +// case _ => None +// } +// } +// +// def collectUpdateCacheDatas(ruleStep: ConcreteRuleStep, timeGroups: Iterable[Long]): Option[RDD[(Long, Iterable[String])]] = { +// ruleStep match { +// case step: ConcreteRuleStep if (step.updateDataSource.nonEmpty) => { +// val name = step.name +// try { +// val pdf = sqlContext.table(s"`${name}`") +// val cols = pdf.columns +// val rdd = pdf.flatMap { row => +// val values = cols.flatMap { col => +// Some((col, row.getAs[Any](col))) +// }.toMap +// values.get(GroupByColumn.tmst) match { +// case Some(t: Long) if (timeGroups.exists(_ == t)) => Some((t, JsonUtil.toJson(values))) +// case _ => None +// } +// }.groupByKey() +// Some(rdd) +// } catch { +// case e: Throwable => { +// error(s"collect update cache datas 
${name} error: ${e.getMessage}") +// None +// } +// } +// } +// case _ => None +// } +// } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkRowFormatter.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkRowFormatter.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkRowFormatter.scala new file mode 100644 index 0000000..6ed0559 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkRowFormatter.scala @@ -0,0 +1,62 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.process.engine + +import org.apache.spark.sql.Row +import org.apache.spark.sql.types.{ArrayType, DataType, StructField, StructType} + +import scala.collection.mutable.ArrayBuffer + +object SparkRowFormatter { + + def formatRow(row: Row): Map[String, Any] = { + formatRowWithSchema(row, row.schema) + } + + private def formatRowWithSchema(row: Row, schema: StructType): Map[String, Any] = { + formatStruct(schema.fields, row) + } + + private def formatStruct(schema: Seq[StructField], r: Row) = { + val paired = schema.zip(r.toSeq) + paired.foldLeft(Map[String, Any]())((s, p) => s ++ formatItem(p)) + } + + private def formatItem(p: Pair[StructField, Any]): Map[String, Any] = { + p match { + case (sf, a) => + sf.dataType match { + case ArrayType(et, _) => + Map(sf.name -> (if (a == null) a else formatArray(et, a.asInstanceOf[ArrayBuffer[Any]]))) + case StructType(s) => + Map(sf.name -> (if (a == null) a else formatStruct(s, a.asInstanceOf[Row]))) + case _ => Map(sf.name -> a) + } + } + } + + private def formatArray(et: DataType, arr: ArrayBuffer[Any]): Seq[Any] = { + et match { + case StructType(s) => arr.map(e => formatStruct(s, e.asInstanceOf[Row])) + case ArrayType(t, _) => + arr.map(e => formatArray(t, e.asInstanceOf[ArrayBuffer[Any]])) + case _ => arr + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala new file mode 100644 index 0000000..15df3b5 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala @@ -0,0 +1,58 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license 
agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.process.engine + +import java.util.Date + +import org.apache.griffin.measure.config.params.user.DataSourceParam +import org.apache.griffin.measure.data.connector.GroupByColumn +import org.apache.griffin.measure.data.source._ +import org.apache.griffin.measure.persist.{Persist, PersistFactory} +import org.apache.griffin.measure.rule.dsl._ +import org.apache.griffin.measure.rule.step._ +import org.apache.griffin.measure.utils.JsonUtil +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, GroupedData, SQLContext} +import org.apache.spark.streaming.StreamingContext + +case class SparkSqlEngine(sqlContext: SQLContext) extends SparkDqEngine { + + def runRuleStep(ruleStep: ConcreteRuleStep): Boolean = { + ruleStep match { + case SparkSqlStep(name, rule, _, _, _) => { + try { + val rdf = sqlContext.sql(rule) + rdf.registerTempTable(name) + true + } catch { + case e: Throwable => { + error(s"run spark sql [ ${rule} ] error: ${e.getMessage}") + false + } + } + } + case _ => false + } + } + +} + + + +
