http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/SchemaFieldParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/SchemaFieldParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/SchemaFieldParam.scala
new file mode 100644
index 0000000..56bdc40
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/SchemaFieldParam.scala
@@ -0,0 +1,20 @@
+package org.apache.griffin.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+@JsonInclude(Include.NON_NULL)
+case class SchemaFieldParam( @JsonProperty("name") name: String,
+                             @JsonProperty("type") fieldType: String,
+                             @JsonProperty("default.value") defaultValue: String,
+                             @JsonProperty("extract.steps") extractSteps: List[String]
+                           ) extends ConfigParam {
+
+  override def equals(x: Any): Boolean = {
+    if (x.isInstanceOf[SchemaFieldParam]) {
+      val s = x.asInstanceOf[SchemaFieldParam]
+      s.name == this.name
+    } else false
+  }
+
+}
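
Note: SchemaFieldParam overrides equals to compare by field name alone, so list operations such as distinct collapse same-named fields; hashCode stays the case-class default over all fields, so the usual equals/hashCode contract does not hold for hash-based collections. A quick sketch of the name-only equality (hypothetical values, assuming the surrounding params module compiles):

    val a = SchemaFieldParam("age", "int", "0", List(".age"))
    val b = SchemaFieldParam("age", "string", "-1", List(".age"))
    a == b                    // true: only `name` is compared
    List(a, b).distinct.size  // 1: same-named fields collapse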
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/SparkParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/SparkParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/SparkParam.scala
new file mode 100644
index 0000000..e460fb6
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/SparkParam.scala
@@ -0,0 +1,15 @@
+package org.apache.griffin.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+@JsonInclude(Include.NON_NULL)
+case class SparkParam( @JsonProperty("app.name") appName: String,
+                       @JsonProperty("log.level") logLevel: String,
+                       @JsonProperty("config") config: Map[String, String],
+                       @JsonProperty("streaming.checkpoint.dir") cpDir: String,
+                       @JsonProperty("streaming.batch.interval.seconds") batchInterval: Int,
+                       @JsonProperty("streaming.sample.interval.seconds") sampleInterval: Int
+                     ) extends ConfigParam {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/TimeRangeParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/TimeRangeParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/TimeRangeParam.scala
new file mode 100644
index 0000000..6dfcd22
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/TimeRangeParam.scala
@@ -0,0 +1,12 @@
+package org.apache.griffin.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+@JsonInclude(Include.NON_NULL)
+case class TimeRangeParam( @JsonProperty("begin") begin: Int,
+                           @JsonProperty("end") end: Int,
+                           @JsonProperty("unit") unit: String
+                         ) extends ConfigParam {
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/TypeConfigParam.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/TypeConfigParam.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/TypeConfigParam.scala
new file mode 100644
index 0000000..a78163b
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/config/params/TypeConfigParam.scala
@@ -0,0 +1,11 @@
+package org.apache.griffin.config.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+@JsonInclude(Include.NON_NULL)
+case class TypeConfigParam( @JsonProperty("dq") dqType: String,
+                            @JsonProperty("dataAsset") dataAssetTypeMap: Map[String, String]
+                          ) extends ConfigParam {
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/dump/DumpWrapper.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/dump/DumpWrapper.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/dump/DumpWrapper.scala
new file mode 100644
index 0000000..54ebef3
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/dump/DumpWrapper.scala
@@ -0,0 +1,21 @@
+package org.apache.griffin.dump
+
+case class DumpWrapper(infos: Iterable[DumpWrapperInfo]) extends Serializable {
+
+  def wrapSchema(): List[(String, String)] = {
+    infos.map { info =>
+      (info.key, info.tp)
+    }.toList
+  }
+
+  def wrapValues[T](data: Map[String, T]): List[Option[T]] = {
+    infos.map { i =>
+      data.get(i.key)
+    }.toList
+  }
+
+  def wrapKeys(): List[String] = {
+    infos.map(_.key).toList
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/dump/DumpWrapperInfo.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/dump/DumpWrapperInfo.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/dump/DumpWrapperInfo.scala
new file mode 100644
index 0000000..a974d54
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/dump/DumpWrapperInfo.scala
@@ -0,0 +1,32 @@
+package org.apache.griffin.dump
+
+sealed trait DumpWrapperInfo {
+  type T
+  val key: String
+  val tp: String
+  def wrap(value: T) = (key -> value)
+}
+
+final case object TimeGroupInfo extends DumpWrapperInfo {
+  type T = Long
+  val key = "__time__"
+  val tp = "bigint"
+}
+
+final case object NextFireTimeInfo extends DumpWrapperInfo {
+  type T = Long
+  val key = "__next_fire_time__"
+  val tp = "bigint"
+}
+
+final case object MismatchInfo extends DumpWrapperInfo {
+  type T = String
+  val key = "__mismatch__"
+  val tp = "string"
+}
+
+final case object ErrorInfo extends DumpWrapperInfo {
+  type T = String
+  val key = "__error__"
+  val tp = "string"
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/dump/HdfsFileDumpUtil.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/dump/HdfsFileDumpUtil.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/dump/HdfsFileDumpUtil.scala
new file mode 100644
index 0000000..c76e621
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/dump/HdfsFileDumpUtil.scala
@@ -0,0 +1,37 @@
+package org.apache.griffin.dump
+
+import org.apache.griffin.utils.HdfsUtil
+import org.apache.spark.rdd.RDD
+
+object HdfsFileDumpUtil {
+
+  val sepCount = 5000
+
+  private def suffix(i: Long): String = {
+    if (i == 0) "" else s".${i}"
+  }
+
+  def splitRdd[T](rdd: RDD[T])(implicit m: Manifest[T]): RDD[(Long, Iterable[T])] = {
+    val indexRdd = rdd.zipWithIndex
+    indexRdd.map(p => ((p._2 / sepCount), p._1)).groupByKey()
+  }
+
+  private def directDump(path: String, list: Iterable[String], lineSep: String): Unit = {
+    // collect and save
+    val strRecords = list.mkString(lineSep)
+    // save into hdfs
+    HdfsUtil.writeContent(path, strRecords)
+  }
+
+  def dump(path: String, recordsRdd: RDD[String], lineSep: String): Unit = {
+    val groupedRdd = splitRdd(recordsRdd)
+
+    // dump
+    groupedRdd.foreach { pair =>
+      val (idx, list) = pair
+      val filePath = path + suffix(idx)
+      directDump(filePath, list, lineSep)
+    }
+  }
+
+}
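
Note: in dump(), record i of the RDD lands in bucket i / sepCount, and suffix() leaves bucket 0 unsuffixed, so a 12000-record dump yields path, path.1 and path.2. A Spark-free sketch of that bucketing (illustrative only, not in the commit):

    val sepCount = 5000
    def suffix(i: Long): String = if (i == 0) "" else s".${i}"
    val buckets = (0L until 12000L).groupBy(_ / sepCount)   // sizes: 5000, 5000, 2000
    buckets.keys.toList.sorted.map(i => "/out/_missingRec" + suffix(i))
    // List(/out/_missingRec, /out/_missingRec.1, /out/_missingRec.2)

Since groupedRdd.foreach runs on the executors, each part file is written by the task holding that bucket, not by the driver.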
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/prep/parse/DataParser.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/prep/parse/DataParser.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/prep/parse/DataParser.scala
new file mode 100644
index 0000000..da2b239
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/prep/parse/DataParser.scala
@@ -0,0 +1,10 @@
+package org.apache.griffin.prep.parse
+
+trait DataParser extends Serializable {
+
+  type In
+  type Out
+
+  def parse(data: In): Seq[Out]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/prep/parse/Json2MapParser.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/prep/parse/Json2MapParser.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/prep/parse/Json2MapParser.scala
new file mode 100644
index 0000000..643d0f0
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/prep/parse/Json2MapParser.scala
@@ -0,0 +1,59 @@
+package org.apache.griffin.prep.parse
+
+import org.apache.griffin.config.params._
+import org.apache.griffin.utils.{DataTypeUtil, ExtractJsonUtil}
+
+import scala.collection.mutable.MutableList
+import scala.util.{Failure, Success, Try}
+
+
+case class Json2MapParser(param: ParseConfigParam) extends DataParser {
+
+  type In = String
+  type Out = Map[String, Any]
+
+  def parse(data: In): Seq[Out] = {
+    val datas = Some(data) :: Nil
+
+    val schema = param.schema
+    val stepsMap = schema.map { field =>
+      (field.name, field.extractSteps)
+    }.toMap
+
+    val resultList: List[Map[String, Option[_]]] = ExtractJsonUtil.extractDataListWithSchemaMap(datas, stepsMap)
+
+    resultList.map { mp =>
+      mp.map { pair =>
+        val (name, opt) = pair
+        val field = findFieldByName(name, schema)
+        val value = field match {
+          case Some(f) => parseValue(opt, f)
+          case _ => opt.getOrElse(null)
+        }
+        (name, value)
+      }
+    }
+  }
+
+  private def findFieldByName(name: String, fields: List[SchemaFieldParam]): Option[SchemaFieldParam] = {
+    fields.filter(_.name == name) match {
+      case Nil => None
+      case head :: _ => Some(head)
+    }
+  }
+
+  private def parseValue(valueOpt: Option[_], fieldSchema: SchemaFieldParam): Any = {
+    valueOpt match {
+      case Some(v) => {
+        val convertFunc = DataTypeUtil.str2ConvertFunc(fieldSchema.fieldType)
+        convertFunc(v)
+      }
+      case _ => {
+        val defValue = fieldSchema.defaultValue
+        val convertFunc = DataTypeUtil.str2StrValConvertFunc(fieldSchema.fieldType)
+        convertFunc(defValue)
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/record/HdfsRecorder.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/record/HdfsRecorder.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/HdfsRecorder.scala
new file mode 100644
index 0000000..210d5b4
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/HdfsRecorder.scala
@@ -0,0 +1,87 @@
+package org.apache.griffin.record
+
+import java.util.Date
+
+import org.apache.griffin.record.result.AccuResult
+import org.apache.griffin.utils.HdfsUtil
+
+case class HdfsRecorder(recPath: String, recMetricName: String, recTime: Long) extends Recorder(recTime) {
+
+  val startFile = "_START"
+  val outFile = "_RESULT"
+  val finalOutFile = "_RESULT_FINAL"
+  val errorFile = "_ERROR"
+  val infoFile = "_INFO"
+  val finishFile = "_FINISHED"
+  val missingRecFile = "_missingRec"
+  val delayFile = "_delayResult"
+  val nullResponseFile = "_nullResponseResult"
+  val timeFile = "_time"
+
+  val updateResultFile = "_RESULT_UPDATE"
+  val updateMissingRecFile = "_missingRec_UPDATE"
+  val updateTimeFile = "_time_UPDATE"
+
+  val maxCountOfStrings = 5000
+
+  private def timeHead(rt: Long): String = {
+    val dt = new Date(rt)
+    s"--- ${dt} ---\n"
+  }
+
+  def start(): Unit = {
+    val startFilePath = getFilePath(recPath, s"${recMetricName}/${getTime}/${startFile}")
+    HdfsUtil.createEmptyFile(startFilePath)
+  }
+
+  def finish(): Unit = {
+    HdfsUtil.createEmptyFile(getFilePath(recPath, s"${recMetricName}/${getTime}/${finishFile}"))
+  }
+
+  def error(rt: Long, msg: String): Unit = {
+    val outStr = timeHead(rt) + s"${msg}\n"
+    HdfsUtil.appendContent(getFilePath(recPath, s"${recMetricName}/${getTime}/${errorFile}"), outStr)
+  }
+
+  def info(rt: Long, msg: String): Unit = {
+    val outStr = timeHead(rt) + s"${msg}\n"
+    HdfsUtil.appendContent(getFilePath(recPath, s"${recMetricName}/${getTime}/${infoFile}"), outStr)
+  }
+
+  def accuracyResult(rt: Long, res: AccuResult): Unit = {
+    val matchPercentage: Double = if (res.totalCount <= 0) 0 else (1 - res.missCount.toDouble / res.totalCount) * 100
+
+    val outStr = timeHead(rt) + s"match percentage: ${matchPercentage}\nmiss count: ${res.missCount}\ntotal count: ${res.totalCount}\n"
+
+    // output
+    HdfsUtil.appendContent(getFilePath(recPath, s"${recMetricName}/${getTime}/${outFile}"), outStr)
+    HdfsUtil.writeContent(getFilePath(recPath, s"${recMetricName}/${getTime}/${finalOutFile}"), outStr)
+  }
+
+  def accuracyMissingRecords(records: Iterable[AnyRef]): Unit = {
+    val filePath = getFilePath(recPath, s"${recMetricName}/${getTime}/${missingRecFile}")
+    val groupCount = (records.size - 1) / maxCountOfStrings + 1
+    val groupedRecords = records.grouped(groupCount).zipWithIndex
+    groupedRecords.foreach { pair =>
+      val (rcs, idx) = pair
+      val recFunc = if (idx == 0) HdfsUtil.writeContent _ else HdfsUtil.appendContent _
+      recFunc(filePath, rcs.mkString("\n"))
+    }
+  }
+
+  def recordTime(rt: Long, calcTime: Long): Unit = {
+    val outStr = timeHead(rt) + s"${calcTime}\n"
+    HdfsUtil.appendContent(getFilePath(recPath, s"${recMetricName}/${getTime}/${timeFile}"), outStr)
+  }
+
+//  override def recordUpdateTime(calcTime: Long): Unit = {
+//    val timeFilePath = getFilePath(recPath, s"${recMetricName}/${getTime}/${updateTimeFile}")
+//    HdfsUtil.appendContent(timeFilePath, s"${recTime}\n${calcTime}\n\n")
+//  }
+
+  protected def getFilePath(parentPath: String, fileName: String): String = {
+    val sep = "/"
+    if (parentPath.endsWith(sep)) parentPath + fileName else parentPath + sep + fileName
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/record/NullRecorder.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/record/NullRecorder.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/NullRecorder.scala
new file mode 100644
index 0000000..b99de27
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/NullRecorder.scala
@@ -0,0 +1,20 @@
+package org.apache.griffin.record
+
+import org.apache.griffin.record.result.AccuResult
+
+case class NullRecorder() extends Recorder(0L) {
+
+  def start(): Unit = {}
+  def finish(): Unit = {}
+  def error(rt: Long, msg: String): Unit = {}
+  def info(rt: Long, msg: String): Unit = {}
+
+  def accuracyResult(rt: Long, res: AccuResult): Unit = {}
+  def accuracyMissingRecords(records: Iterable[AnyRef]): Unit = {}
+
+//  def delayResult(res: (Long, Iterable[AnyRef])): Unit = {}
+//  def nullResponseResult(res: (Long, Iterable[AnyRef])): Unit = {}
+
+  def recordTime(rt: Long, calcTime: Long): Unit = {}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/record/PostRecorder.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/record/PostRecorder.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/PostRecorder.scala
new file mode 100644
index 0000000..9489349
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/PostRecorder.scala
@@ -0,0 +1,46 @@
+package org.apache.griffin.record
+
+import org.apache.griffin.utils.{RestfulUtil, JsonUtil}
+
+import org.apache.griffin.record.result.AccuResult
+
+//case class PostRecorder(url: String, path: String, metricName: String, t: Long) extends HdfsRecorder(path, metricName, t) {
+case class PostRecorder(url: String, metricName: String, recTime: Long) extends Recorder(recTime) {
+
+  val api = "api/v1/metrics"
+  val urlPath = getUrlPath(url, api)
+
+  def accuracyResult(rt: Long, res: AccuResult): Unit = {
+    val matchPercentage: Double = if (res.totalCount <= 0) 0 else (1 - res.missCount.toDouble / res.totalCount) * 100
+
+    def getDataStr(): String = {
+      val dataMap: Map[String, Any] = Map[String, Any](("metricName" -> metricName), ("timestamp" -> getTime), ("value" -> matchPercentage))
+      JsonUtil.toJson(dataMap)
+    }
+
+    // post
+    val params = Map[String, Object]()
+    val header = Map[String, Object](("content-type" -> "application/json"))
+    val data = getDataStr
+    val status = RestfulUtil.postData(urlPath, params, header, data)
+    println(s"post response status: ${status}")
+  }
+
+  def accuracyMissingRecords(records: Iterable[AnyRef]): Unit = {
+    ;
+  }
+
+  private def getUrlPath(urlPath: String, apiPath: String): String = {
+    val sep = "/"
+    if (urlPath.endsWith(sep)) urlPath + apiPath else urlPath + sep + apiPath
+  }
+
+  def start(): Unit = {}
+  def finish(): Unit = {}
+  def error(rt: Long, msg: String): Unit = {}
+  def info(rt: Long, msg: String): Unit = {}
+//  def delayResult(res: (Long, Iterable[AnyRef])): Unit = {}
+//  def nullResponseResult(res: (Long, Iterable[AnyRef])): Unit = {}
+  def recordTime(rt: Long, calcTime: Long): Unit = {}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/record/Recorder.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/record/Recorder.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/Recorder.scala
new file mode 100644
index 0000000..657e958
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/Recorder.scala
@@ -0,0 +1,19 @@
+package org.apache.griffin.record
+
+import org.apache.griffin.record.result.AccuResult
+
+
+abstract class Recorder(time: Long) extends Serializable {
+
+  def getTime(): Long = time
+
+  def start(): Unit
+  def finish(): Unit
+  def error(rt: Long, msg: String): Unit
+  def info(rt: Long, msg: String): Unit
+
+  def accuracyResult(rt: Long, res: AccuResult): Unit
+  def accuracyMissingRecords(records: Iterable[AnyRef]): Unit
+
+  def recordTime(rt: Long, calcTime: Long): Unit
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/record/RecorderFactory.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/record/RecorderFactory.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/RecorderFactory.scala
new file mode 100644
index 0000000..4a47998
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/RecorderFactory.scala
@@ -0,0 +1,48 @@
+package org.apache.griffin.record
+
+import org.apache.griffin.config.params.RecorderParam
+
+
+case class RecorderFactory(recorderParam: RecorderParam) extends Serializable {
+
+  val HDFS_REGEX = """^(?i)hdfs$""".r
+  val POST_REGEX = """^(?i)post$""".r
+
+  val metricName = recorderParam.metricName
+  val configMap = recorderParam.config
+
+  val HDFS_DIR = "hdfs.dir"
+  val POST_URL = "post.url"
+  val POST_METRIC_NAME = "post.metric.name"
+
+  def getRecorders(time: Long): Iterable[Recorder] = {
+    val recorderTypes = recorderParam.types
+    recorderTypes.map(tp => getRecorder(time, tp))
+  }
+
+  private def getRecorder(time: Long, recorderType: String): Recorder = {
+    val recorder = recorderType match {
+      case HDFS_REGEX() => {
+        configMap.get(HDFS_DIR) match {
+          case Some(path) => HdfsRecorder(path, metricName, time)
+          case _ => NullRecorder()
+        }
+      }
+      case POST_REGEX() => {
+        configMap.get(POST_URL) match {
+          case Some(url) => {
+            val mtrName = configMap.get(POST_METRIC_NAME) match {
+              case Some(s) => s
+              case _ => metricName
+            }
+            PostRecorder(url, mtrName, time)
+          }
+          case _ => NullRecorder()
+        }
+      }
+      case _ => NullRecorder()
+    }
+    recorder
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/record/cleaner/CleanerFactory.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/record/cleaner/CleanerFactory.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/cleaner/CleanerFactory.scala
new file mode 100644
index 0000000..6051fdc
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/cleaner/CleanerFactory.scala
@@ -0,0 +1,31 @@
+//package org.apache.griffin.record.cleaner
+//
+//import com.ebay.griffin.config.params.{AppParam, DataParam}
+//
+//class CleanerFactory(appParam: AppParam, dataParam: DataParam) {
+//
+//  var cleaner: HdfsCleaner = null
+//
+//  def cleanerStart(): Unit = {
+//    val recorderType = appParam.getParam(appParam.RecorderType).toString
+//    if (cleaner == null) {
+//      cleaner = recorderType match {
+//        case appParam.RecorderHdfs => {
+//          new HdfsCleaner(appParam, dataParam)
+//        }
+//        case appParam.RecorderPost => {
+//          new HdfsCleaner(appParam, dataParam)
+//        }
+//        case _ => null
+//      }
+//      cleaner.start
+//    }
+//  }
+//
+//  def cleanerEnd(): Unit = {
+//    if (cleaner != null) {
+//      cleaner.end
+//    }
+//  }
+//
+//}
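
Note: a typical wiring of the factory might look like the sketch below. RecorderParam itself is not in this change set; its constructor shape is assumed here from the fields the factory reads (metricName, types, config):

    val param = RecorderParam("accu_metric", List("hdfs", "POST"),
      Map("hdfs.dir" -> "hdfs:///griffin/rec", "post.url" -> "http://localhost:39200"))
    val recorders = RecorderFactory(param).getRecorders(System.currentTimeMillis)
    recorders.foreach(_.start())

Type matching is case-insensitive ("POST" resolves like "post"), and an unknown type or a missing config key falls back to NullRecorder(), so a bad entry degrades silently instead of failing the job.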
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/record/cleaner/HdfsCleaner.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/record/cleaner/HdfsCleaner.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/cleaner/HdfsCleaner.scala
new file mode 100644
index 0000000..7407dfe
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/cleaner/HdfsCleaner.scala
@@ -0,0 +1,78 @@
+//package org.apache.griffin.record.cleaner
+//
+//import java.io.{BufferedReader, InputStreamReader}
+//import java.util.{Date, Timer, TimerTask}
+//
+//import com.ebay.griffin.config.params.{AppParam, DataParam}
+//
+//
+//class HdfsCleaner(appParam: AppParam, dataParam: DataParam) {
+//
+//  val expireDefSec: Long = 604800
+//  val periodDefSec: Long = 7200
+//
+//  val timer = new Timer
+//
+//  def start(): Unit = {
+//    val path = appParam.getParam(appParam.RecorderHdfsDir).toString
+//    val metricName = dataParam.getParam(dataParam.MetricName).toString
+//    val dirPath = getFilePath(path, metricName)
+//
+//    val expireSec = if (appParam.containsParam(appParam.RecorderCleanExpireSeconds)) {
+//      appParam.getParam(appParam.RecorderCleanExpireSeconds).toString.toLong
+//    } else expireDefSec
+//    val periodSec = if (appParam.containsParam(appParam.RecorderCleanPeriodSeconds)) {
+//      appParam.getParam(appParam.RecorderCleanPeriodSeconds).toString.toLong
+//    } else periodDefSec
+//
+//    val expireMs = expireSec * 1000
+//    val periodMs = periodSec * 1000
+//
+//    timer.scheduleAtFixedRate(new TimerTask() {
+//      override def run(): Unit = {
+//        val now = new Date().getTime
+//        clean(dirPath, now - expireMs)
+//      }
+//    }, periodMs, periodMs)
+//  }
+//
+//  def end(): Unit = {
+//    timer.cancel
+//  }
+//
+//  private def clean(dirPath: String, expireLineMs: Long): Unit = {
+//    val process = Runtime.getRuntime.exec("hadoop fs -ls " + dirPath)
+//    val result = process.waitFor()
+//    if (result == 0) {
+//      val reader = new BufferedReader(new InputStreamReader(process.getInputStream))
+//      var line = reader.readLine
+//      while (line != null) {
+//        val index = line.indexOf("/")
+//        if (index >= 0) {
+//          val dir = line.substring(index)
+//          val tail = dir.split("/").last
+//          val t = try {
+//            tail.toLong
+//          } catch {
+//            case _ => Long.MaxValue
+//          }
+//
+//          if (t < expireLineMs) { // expire
+//            val remove = Runtime.getRuntime().exec("hadoop fs -rm -R " + dir)
+//            val removeResult = remove.waitFor()
+//            if (removeResult == 0) {
+//              println(s"===== hadoop fs -rm -R ${dir} =====")
+//            }
+//          }
+//        }
+//        line = reader.readLine
+//      }
+//    }
+//  }
+//
+//  protected def getFilePath(parentPath: String, fileName: String): String = {
+//    val sep = "/"
+//    if (parentPath.endsWith(sep)) parentPath + fileName else parentPath + sep + fileName
+//  }
+//
+//}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/record/result/AccuResult.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/record/result/AccuResult.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/result/AccuResult.scala
new file mode 100644
index 0000000..25f4d9e
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/result/AccuResult.scala
@@ -0,0 +1,18 @@
+package org.apache.griffin.record.result
+
+case class AccuResult(missCount: Long, totalCount: Long) extends Result {
+
+  type T = AccuResult
+
+  def updateResult(delta: T): T = {
+    AccuResult(delta.missCount, totalCount)
+  }
+
+  def needCalc(): Boolean = {
+    this.missCount > 0
+  }
+
+  def differsFrom(other: T): Boolean = {
+    (this.missCount != other.missCount) || (this.totalCount != other.totalCount)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/record/result/Result.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/record/result/Result.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/result/Result.scala
new file mode 100644
index 0000000..7d73677
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/record/result/Result.scala
@@ -0,0 +1,12 @@
+package org.apache.griffin.record.result
+
+trait Result extends Serializable {
+
+  type T <: Result
+
+  def updateResult(delta: T): T
+
+  def needCalc(): Boolean
+
+  def differsFrom(other: T): Boolean
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/DataTypeUtil.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/DataTypeUtil.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/DataTypeUtil.scala
new file mode 100644
index 0000000..9ad128d
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/DataTypeUtil.scala
@@ -0,0 +1,179 @@
+package org.apache.griffin.utils
+
+import java.net.URLDecoder
+
+import org.apache.avro.util.Utf8
+
+object DataTypeUtil {
+
+  object DataTypeEnum {
+    val NULL_TYPE = "null"
+    val INT_TYPE = "int"
+    val SHORT_TYPE = "short"
+    val LONG_TYPE = "long"
+    val BYTE_TYPE = "byte"
+    val FLOAT_TYPE = "float"
+    val DOUBLE_TYPE = "double"
+    val STRING_TYPE = "string"
+    val BOOLEAN_TYPE = "boolean"
+    val UTF8_TYPE = "utf8"
+    val ISO88591_TYPE = "iso-8859-1"
+
+    val NULL_REGEX = """^[Nn]ull(?:Type)?$""".r
+    val INT_REGEX = """^[Ii]nt(?:eger)?(?:Type)?$""".r
+    val SHORT_REGEX = """^[Ss]hort(?:Type)?|[Ss]mall(?:[Ii]nt)?$""".r
+    val LONG_REGEX = """^[Ll]ong(?:Type)?|[Bb]ig(?:[Ii]nt)?$""".r
+    val BYTE_REGEX = """^[Bb]yte(?:Type)?|[Tt]iny(?:[Ii]nt)?$""".r
+    val FLOAT_REGEX = """^[Ff]loat(?:Type)?$""".r
+    val DOUBLE_REGEX = """^[Dd]ouble(?:Type)?$""".r
+    val STRING_REGEX = """^[Ss]tr(?:ing)?(?:Type)?|[Vv]ar(?:[Cc]har)?|[Cc]har$""".r
+    val BOOLEAN_REGEX = """^[Bb]ool(?:ean)?(?:Type)?|[Bb]inary$""".r
+    val UTF8_REGEX = """^[Uu][Tt][Ff](?:-)?8(?:Type)?$""".r
+    val ISO88591_REGEX = """^[Ii][Ss][Oo](?:-)?8859(?:-)?1(?:Type)?$""".r
+  }
+  import DataTypeEnum._
+
+  object DataTypeTuple {
+    // (type name, type name pattern, type convert function, string value convert function)
+    final val nullType = (NULL_TYPE, NULL_REGEX, convertNull _, StringConverter.str2Null _)
+    final val intType = (INT_TYPE, INT_REGEX, convertType[Int] _, StringConverter.str2Int _)
+    final val shortType = (SHORT_TYPE, SHORT_REGEX, convertType[Short] _, StringConverter.str2Short _)
+    final val longType = (LONG_TYPE, LONG_REGEX, convertType[Long] _, StringConverter.str2Long _)
+    final val byteType = (BYTE_TYPE, BYTE_REGEX, convertType[Byte] _, StringConverter.str2Byte _)
+    final val floatType = (FLOAT_TYPE, FLOAT_REGEX, convertType[Float] _, StringConverter.str2Float _)
+    final val doubleType = (DOUBLE_TYPE, DOUBLE_REGEX, convertType[Double] _, StringConverter.str2Double _)
+    final val stringType = (STRING_TYPE, STRING_REGEX, convertType[String] _, StringConverter.str2String _)
+    final val booleanType = (BOOLEAN_TYPE, BOOLEAN_REGEX, convertType[Boolean] _, StringConverter.str2Boolean _)
+    final val utf8Type = (UTF8_TYPE, UTF8_REGEX, convertUtf8 _, StringConverter.str2Utf8 _)
+    final val iso88591Type = (ISO88591_TYPE, ISO88591_REGEX, convertIso88591 _, StringConverter.str2Iso88591 _)
+  }
+  import DataTypeTuple._
+
+  private def convertType[T](x: Any): T = { x.asInstanceOf[T] }
+  private def convertUtf8(x: Any): String = { x.asInstanceOf[Utf8].toString }
+  private def convertIso88591(x: Any): String = {
+    val str = new String(x.toString.getBytes("ISO-8859-1"), "UTF-8")
+    val str1 = x.asInstanceOf[Utf8].toString
+    println(s"=== ${x.getClass.getSimpleName} === ${str} === ${str1} ===")
+    str
+  }
+  private def convertNull(x: Any): Null = { null }
+  private def convertDecodeString(x: Any): String = {
+    val str = decodePrepare(convertType[String](x))
+    URLDecoder.decode(str, "UTF-8")
+  }
+  private def convertDecodeUtf8(x: Any): String = {
+    val str = decodePrepare(convertUtf8(x))
+    URLDecoder.decode(str, "UTF-8")
+  }
+  private def convertDecodeIso88591(x: Any): String = {
+    val str = decodePrepare(convertIso88591(x))
+    URLDecoder.decode(str, "UTF-8")
+  }
+
+  def str2ConvertFunc(tp: String, decode: Boolean = false): (Any) => _ = {
+    tp match {
+      case nullType._2() => nullType._3
+      case intType._2() => intType._3
+      case shortType._2() => shortType._3
+      case longType._2() => longType._3
+      case byteType._2() => byteType._3
+      case floatType._2() => floatType._3
+      case doubleType._2() => doubleType._3
+      case stringType._2() => if (decode) convertDecodeString _ else stringType._3
+      case booleanType._2() => booleanType._3
+      case utf8Type._2() => if (decode) convertDecodeUtf8 _ else utf8Type._3
+      case iso88591Type._2() => if (decode) convertDecodeIso88591 _ else iso88591Type._3
+      case _ => stringType._3
+    }
+  }
+
+  def str2StrValConvertFunc(tp: String, decode: Boolean = false): (String) => _ = {
+    tp match {
+      case nullType._2() => nullType._4
+      case intType._2() => intType._4
+      case shortType._2() => shortType._4
+      case longType._2() => longType._4
+      case byteType._2() => byteType._4
+      case floatType._2() => floatType._4
+      case doubleType._2() => doubleType._4
+      case stringType._2() => if (decode) StringConverter.str2DecodeString _ else stringType._4
+      case booleanType._2() => booleanType._4
+      case utf8Type._2() => if (decode) StringConverter.str2DecodeUtf8 _ else utf8Type._4
+      case iso88591Type._2() => if (decode) StringConverter.str2DecodeIso88591 _ else iso88591Type._4
+      case _ => stringType._4
+    }
+  }
+
+  object StringConverter {
+    def str2Null(s: String): Null = null
+    def str2Int(s: String): Int = s.toInt
+    def str2Short(s: String): Short = s.toShort
+    def str2Long(s: String): Long = s.toLong
+    def str2Byte(s: String): Byte = s.toByte
+    def str2Float(s: String): Float = s.toFloat
+    def str2Double(s: String): Double = s.toDouble
+    def str2String(s: String): String = s
+    def str2DecodeString(s: String): String = {
+      val str = decodePrepare(str2String(s))
+      URLDecoder.decode(str, "UTF-8")
+    }
+    def str2Boolean(s: String): Boolean = s.toBoolean
+    def str2Utf8(s: String): String = { new Utf8(s).toString }
+    def str2DecodeUtf8(s: String): String = {
+      val str = decodePrepare(str2Utf8(s))
+      URLDecoder.decode(str, "UTF-8")
+    }
+    def str2Iso88591(s: String): String = { new String(s.getBytes("ISO-8859-1"), "UTF-8") }
+    def str2DecodeIso88591(s: String): String = {
+      val str = decodePrepare(str2Iso88591(s))
+      URLDecoder.decode(str, "UTF-8")
+    }
+  }
+
+  private def decodePrepare(s: String): String = {
+    s.replaceAll("%(?![0-9a-fA-F]{2})", "%25")
+  }
+
+//  object ParamTypeTuple {
+//
+//    final val intParamType = (INT_TYPE, INT_REGEX, getInt _, getIntList _)
+//    final val shortParamType = (SHORT_TYPE, SHORT_REGEX, getInt _, getIntList _)
+//    final val longParamType = (LONG_TYPE, LONG_REGEX, getLong _, getLongList _)
+//    final val byteParamType = (BYTE_TYPE, BYTE_REGEX, getInt _, getIntList _)
+//    final val floatParamType = (FLOAT_TYPE, FLOAT_REGEX, getDouble _, getDoubleList _)
+//    final val doubleParamType = (DOUBLE_TYPE, DOUBLE_REGEX, getDouble _, getDoubleList _)
+//    final val stringParamType = (STRING_TYPE, STRING_REGEX, getString _, getStringList _)
+//    final val booleanParamType = (BOOLEAN_TYPE, BOOLEAN_REGEX, getBoolean _, getBooleanList _)
+//  }
+
+//  import com.typesafe.config.Config
+//
+//  def str2GetValueFunc(tp: String): (Config => String => _) = {
+//    tp match {
+//      case intParamType._2() => intParamType._3
+//      case shortParamType._2() => shortParamType._3
+//      case longParamType._2() => longParamType._3
+//      case byteParamType._2() => byteParamType._3
+//      case floatParamType._2() => floatParamType._3
+//      case doubleParamType._2() => doubleParamType._3
+//      case stringParamType._2() => stringParamType._3
+//      case booleanParamType._2() => booleanParamType._3
+//      case _ => stringType._3
+//    }
+//  }
+//
+//  def str2GetValueListFunc(tp: String): (Config => String => List[_]) = {
+//    tp match {
+//      case intParamType._2() => intParamType._4
+//      case shortParamType._2() => shortParamType._4
+//      case longParamType._2() => longParamType._4
+//      case byteParamType._2() => byteParamType._4
+//      case floatParamType._2() => floatParamType._4
+//      case doubleParamType._2() => doubleParamType._4
+//      case stringParamType._2() => stringParamType._4
+//      case booleanParamType._2() => booleanParamType._4
+//      case _ => stringType._4
+//    }
+//  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/ExtractJsonUtil.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/ExtractJsonUtil.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/ExtractJsonUtil.scala
new file mode 100644
index 0000000..bd95d40
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/ExtractJsonUtil.scala
@@ -0,0 +1,205 @@
+package org.apache.griffin.utils
+
+import scala.util.{Failure, Success, Try}
+
+
+object ExtractJsonUtil {
+
+  val JSON_REGEX = """^(?i)json$""".r
+  val COUNT_REGEX = """^(?i)count$""".r
+  val MAP_REGEX = """^\.(.+)$""".r
+  val LIST_REGEX = """^\[(\d+)\]$""".r
+  val ALL_LIST_REGEX = """^\[\*\]$""".r
+  val FILTER_LIST_REGEX = """^\[\.(.+)=(.+)\]$""".r
+
+  def extract(data: Option[Any], steps: List[String]): Option[Any] = {
+    Try {
+      steps match {
+        case Nil => data
+        case step :: tailSteps => {
+          step match {
+            case JSON_REGEX() => {
+              data match {
+//                case Some(line: String) => extract(Some(JsonUtil.toMap[Any](line)), tailSteps)
+                case Some(line: String) => extract(Some(JsonUtil.toAnyMap(line)), tailSteps)
+                case _ => {
+                  throw new Exception(s"error: ${step} should be for string")
+                }
+              }
+            }
+            case MAP_REGEX(key) => {
+              data match {
+                case Some(map: Map[String, Any]) => extract(map.get(key), tailSteps)
+                case _ => {
+                  throw new Exception(s"error: ${step} should be for map")
+                }
+              }
+            }
+            case LIST_REGEX(index) => {
+              data match {
+                case Some(list: List[Any]) => extract(list.lift(index.toInt), tailSteps)
+                case _ => {
+                  throw new Exception(s"error: ${step} should be for list")
+                }
+              }
+            }
+            case COUNT_REGEX() => {
+              data match {
+                case Some(list: List[Any]) => Some(list.size)
+                case Some(map: Map[String, Any]) => Some(map.size)
+                case _ => {
+                  throw new Exception(s"error: ${step} should be for list or map")
+                }
+              }
+            }
+            case _ => {
+              throw new Exception(s"error: ${step} undefined")
+            }
+          }
+        }
+      }
+    } match {
+      case Success(res) => res
+      case Failure(ex) => {
+        println(s"extract failure: ${ex.getMessage}")
+        None
+      }
+    }
+
+  }
+
+  def stepHead(path: List[String], step: String): List[String] = {
+    path :+ step
+  }
+
+  def extractList(pathDatas: List[(List[String], Option[_])], steps: List[String]): List[(List[String], Option[_])] = {
+    Try {
+      steps match {
+        case Nil => pathDatas
+        case step :: tailSteps => {
+          pathDatas.flatMap { pathData =>
+            val (path, data) = pathData
+            step match {
+              case JSON_REGEX() => {
+                data match {
+//                  case Some(line: String) => extractList((stepHead(path, step), Some(JsonUtil.toMap[Any](line))) :: Nil, tailSteps)
+                  case Some(line: String) => extractList((stepHead(path, step), Some(JsonUtil.toAnyMap(line))) :: Nil, tailSteps)
+                  case _ => {
+//                    extractList((stepHead(path, step), None) :: Nil, tailSteps)
+                    throw new Exception(s"error: ${step} should be for string")
+                  }
+                }
+              }
+              case MAP_REGEX(key) => {
+                data match {
+                  case Some(map: Map[String, Any]) => extractList((stepHead(path, step), map.get(key)) :: Nil, tailSteps)
+                  case _ => {
+//                    extractList((stepHead(path, step), None) :: Nil, tailSteps)
+                    throw new Exception(s"error: ${step} should be for map")
+                  }
+                }
+              }
+              case LIST_REGEX(index) => {
+                data match {
+                  case Some(list: List[Any]) => extractList((stepHead(path, step), list.lift(index.toInt)) :: Nil, tailSteps)
+                  case _ => {
+//                    extractList((stepHead(path, step), None) :: Nil, tailSteps)
+                    throw new Exception(s"error: ${step} should be for list")
+                  }
+                }
+              }
+              case ALL_LIST_REGEX() => {
+                data match {
+                  case Some(list: List[Any]) => {
+                    val idxes = (0 to (list.size - 1)).toList
+                    val newList = idxes.map { idx =>
+                      val newStep = s"${step}_${idx}"
+                      (stepHead(path, newStep), list.lift(idx))
+                    }
+                    extractList(newList, tailSteps)
+                  }
+                  case _ => {
+//                    extractList((stepHead(path, step), None) :: Nil, tailSteps)
+                    throw new Exception(s"error: ${step} should be for list")
+                  }
+                }
+              }
+              case FILTER_LIST_REGEX(key, value) => {
+                data match {
+                  case Some(list: List[Any]) => {
+                    val filteredList = list.filter { item =>
+                      item match {
+                        case mp: Map[String, Any] => {
+                          mp.get(key) match {
+                            case Some(v) => v == value
+                            case _ => false
+                          }
+                        }
+                        case _ => false
+                      }
+                    }
+                    val idxes = (0 to (filteredList.size - 1)).toList
+                    val newList = idxes.map { idx =>
+                      val newStep = s"${step}_${idx}"
+                      (stepHead(path, newStep), filteredList.lift(idx))
+                    }
+                    extractList(newList, tailSteps)
+                  }
+                  case _ => {
+//                    extractList((stepHead(path, step), None) :: Nil, tailSteps)
+                    throw new Exception(s"error: ${step} should be for list")
+                  }
+                }
+              }
+              case COUNT_REGEX() => {
+                data match {
+                  case Some(list: List[Any]) => (stepHead(path, step), Some(list.size)) :: Nil
+                  case Some(map: Map[String, Any]) => (stepHead(path, step), Some(map.size)) :: Nil
+                  case _ => {
+//                    extractList((stepHead(path, step), None) :: Nil, tailSteps)
+                    throw new Exception(s"error: ${step} should be for list or map")
+                  }
+                }
+              }
+              case _ => {
+//                extractList((stepHead(path, step), None) :: Nil, tailSteps)
+                throw new Exception(s"error: ${step} undefined")
+              }
+            }
+          }
+        }
+      }
+    } match {
+      case Success(res) => res
+      case Failure(ex) => {
+        println(s"extract failure: ${ex.getMessage}")
+        Nil
+      }
+    }
+
+  }
+
+  def extractPathDataListWithSchemaMap(pathDatas: List[(List[String], Option[_])], stepsMap: Map[String, List[String]]): List[Map[String, Option[_]]] = {
+    val schemaValues: Map[String, List[(List[String], Option[_])]] = stepsMap.mapValues { steps =>
+      extractList(pathDatas, steps)
+    }
+
+    ValueListCombineUtil.cartesian(schemaValues)
+  }
+
+  def extractDataListWithSchemaMap(datas: List[Option[_]], stepsMap: Map[String, List[String]]): List[Map[String, Option[_]]] = {
+    val pathDatas = datas.map((Nil, _))
+    extractPathDataListWithSchemaMap(pathDatas, stepsMap)
+  }
+
+//  def extractDataListWithSchemaMapTest(datas: List[Option[_]], stepsMap: Map[String, List[String]]): Unit = {
+//    val pathDatas = datas.map((Nil, _))
+//
+//    val schemaValues: Map[String, List[(List[String], Option[_])]] = stepsMap.mapValues { steps =>
+//      extractList(pathDatas, steps)
+//    }
+//
+//    ValueListCombineUtil.cartesianTest(schemaValues)
+//  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/HdfsUtil.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/HdfsUtil.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/HdfsUtil.scala
new file mode 100644
index 0000000..70a8004
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/HdfsUtil.scala
@@ -0,0 +1,57 @@
+package org.apache.griffin.utils
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path}
+
+object HdfsUtil {
+
+  private val separator = "/"
+
+  private val conf = new Configuration()
+  conf.set("dfs.support.append", "true")
+
+  private val dfs = FileSystem.get(conf)
+
+  def createFile(filePath: String): FSDataOutputStream = {
+    val path = new Path(filePath)
+    if (dfs.exists(path)) dfs.delete(path, true)
+    return dfs.create(path)
+  }
+
+  def appendOrCreateFile(filePath: String): FSDataOutputStream = {
+    val path = new Path(filePath)
+    if (dfs.exists(path)) dfs.append(path) else createFile(filePath)
+  }
+
+  def openFile(filePath: String): FSDataInputStream = {
+    val path = new Path(filePath)
+    dfs.open(path)
+  }
+
+  def writeContent(filePath: String, message: String): Unit = {
+    val out = createFile(filePath)
+    out.write(message.getBytes("utf-8"))
+    out.close
+  }
+
+  def appendContent(filePath: String, message: String): Unit = {
+    val out = appendOrCreateFile(filePath)
+    out.write(message.getBytes("utf-8"))
+    out.close
+  }
+
+  def createEmptyFile(filePath: String): Unit = {
+    val out = createFile(filePath)
+    out.close
+  }
+
+
+  def getHdfsFilePath(parentPath: String, fileName: String): String = {
+    if (parentPath.endsWith(separator)) parentPath + fileName else parentPath + separator + fileName
+  }
+
+  def deleteHdfsPath(dirPath: String): Unit = {
+    val path = new Path(dirPath)
+    if (dfs.exists(path)) dfs.delete(path, true)
+  }
+}
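
Note: the extract steps form a small path DSL: "json" parses a string, ".key" descends into a map, "[i]" indexes a list, "[*]" fans out over a list, "[.k=v]" filters a list of maps, and "count" ends the walk with a size. Expected behavior on sample data (illustrative values, not from the commit):

    val line = """{"user": {"id": 7, "tags": ["a", "b"]}}"""
    ExtractJsonUtil.extract(Some(line), List("json", ".user", ".id"))            // Some(7)
    ExtractJsonUtil.extract(Some(line), List("json", ".user", ".tags", "[1]"))   // Some("b")
    ExtractJsonUtil.extract(Some(line), List("json", ".user", ".tags", "count")) // Some(2)
    ExtractJsonUtil.extract(Some(line), List("json", ".missing", ".id"))         // None, failure logged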
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/JsonUtil.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/JsonUtil.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/JsonUtil.scala
new file mode 100644
index 0000000..d56b467
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/JsonUtil.scala
@@ -0,0 +1,33 @@
+package org.apache.griffin.utils
+
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.apache.griffin.config.params.AllParam
+
+object JsonUtil {
+  val mapper = new ObjectMapper()
+  mapper.registerModule(DefaultScalaModule)
+  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+
+  def toJson(value: Map[Symbol, Any]): String = {
+    toJson(value map { case (k, v) => k.name -> v })
+  }
+
+  def toJson(value: Any): String = {
+    mapper.writeValueAsString(value)
+  }
+
+//  def toMap[V](json: String)(implicit m: Manifest[V]) = fromJson[Map[String, V]](json)
+//
+//  def fromJson[T](json: String)(implicit m: Manifest[T]): T = {
+//    mapper.readValue(json, classOf[T])
+//  }
+
+  def toAnyMap(json: String) = {
+    mapper.readValue(json, classOf[Map[String, Any]])
+  }
+
+  def fromJson2AllParam(json: String) = {
+    mapper.readValue(json, classOf[AllParam])
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/RestfulUtil.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/RestfulUtil.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/RestfulUtil.scala
new file mode 100644
index 0000000..a233aad
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/RestfulUtil.scala
@@ -0,0 +1,16 @@
+package org.apache.griffin.utils
+
+import scalaj.http._
+
+object RestfulUtil {
+
+  def postData(url: String, params: Map[String, Object], headers: Map[String, Object], data: String): String = {
+    val response = Http(url).params(convertObjMap2StrMap(params)).headers(convertObjMap2StrMap(headers)).postData(data).asString
+    response.code.toString
+  }
+
+  private def convertObjMap2StrMap(map: Map[String, Object]): Map[String, String] = {
+    map.map(pair => pair._1 -> pair._2.asInstanceOf[String])
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/ValueListCombineUtil.scala
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/ValueListCombineUtil.scala b/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/ValueListCombineUtil.scala
new file mode 100644
index 0000000..3a428cf
--- /dev/null
+++ b/griffin-streaming-model/src/main/scala/org/apache/griffin/utils/ValueListCombineUtil.scala
@@ -0,0 +1,198 @@
+package org.apache.griffin.utils
+
+import scala.collection.mutable.MutableList
+
+object ValueListCombineUtil {
+
+  // Map[String, List[(List[String], T)]]: Map[key, List[(path, value)]]
+  def cartesian[T](valuesMap: Map[String, List[(List[String], T)]]): List[Map[String, T]] = {
+    val fieldsList: List[(String, List[(List[String], T)])] = valuesMap.toList
+
+    // wrong algorithm: assumes the lists all have the same size
+//    val minSize = fieldsList.map(_._2.size).min
+//    val idxes = (0 to (minSize - 1)).toList
+//    idxes.map { idx =>
+//      fieldsList.foldLeft(Map[String, T]()) { (map, pair) =>
+//        val (key, value) = pair
+//        map + (key -> value(idx)._2)
+//      }
+//    }
+
+    // the following is the correct algorithm
+
+    // List[key, List[(path, value)]] to List[(path, (key, value))]
+    val valueList: List[(List[String], (String, T))] = fieldsList.flatMap { fields =>
+      val (key, list) = fields
+      list.map { pv =>
+        val (path, value) = pv
+        (path, (key, value))
+      }
+    }
+
+    // 1. generate tree from value list, and return root node
+    val root = TreeUtil.genRootTree(valueList)
+
+    // 2. depth-first visit tree from root, merge datas into value map list
+    val valueMapList: List[Map[String, _]] = TreeUtil.mergeDatasIntoMap(root, Nil)
+
+    // 3. simple change
+    valueMapList.map { mp =>
+      mp.map { kv =>
+        val (k, v) = kv
+        (k, v.asInstanceOf[T])
+      }
+    }
+
+  }
+
+//  def cartesianTest[T](valuesMap: Map[String, List[(List[String], T)]]): Unit = {
+//    val fieldsList: List[(String, List[(List[String], T)])] = valuesMap.toList
+//
+//    // List[key, List[(path, value)]] to List[(path, (key, value))]
+//    val valueList: List[(List[String], (String, T))] = fieldsList.flatMap { fields =>
+//      val (key, list) = fields
+//      list.map { pv =>
+//        val (path, value) = pv
+//        (path, (key, value))
+//      }
+//    }
+//
+//    // 1. generate tree from value list, and return root node
+//    val root = TreeUtil.genRootTree(valueList)
+//
+//    // 2. depth-first visit tree from root, merge datas into value map list
+//    val valueMapList: List[Map[String, _]] = TreeUtil.mergeDatasIntoMap(root, Nil)
+//
+//    valueMapList.foreach(println)
+//
+//  }
+
+
+  case class TreeNode(key: String, var datas: List[(String, _)]) {
+    var children = List[TreeNode]()
+    def addChild(node: TreeNode): Unit = children = children :+ node
+    def mergeSelf(node: TreeNode): Unit = datas = datas ::: node.datas
+  }
+
+  object TreeUtil {
+    private def genTree(path: List[String], datas: List[(String, _)]): Option[TreeNode] = {
+      path match {
+        case Nil => None
+//        case head :: Nil => Some(TreeNode(datas))
+        case head :: tail => {
+          genTree(tail, datas) match {
+            case Some(child) => {
+              val curNode = TreeNode(head, Nil)
+              curNode.addChild(child)
+              Some(curNode)
+            }
+            case _ => Some(TreeNode(head, datas))
+          }
+        }
+      }
+    }
+
+    private def mergeTrees(trees: List[TreeNode], newTreeOpt: Option[TreeNode]): List[TreeNode] = {
+      newTreeOpt match {
+        case Some(newTree) => {
+          trees.find(tree => tree.key == newTree.key) match {
+            case Some(tree) => {
+              // children merge
+              for (child <- newTree.children) {
+                tree.children = mergeTrees(tree.children, Some(child))
+              }
+              // self data merge
+              tree.mergeSelf(newTree)
+              trees
+            }
+            case _ => trees :+ newTree
+          }
+        }
+        case _ => trees
+      }
+    }
+
+    private def root(): TreeNode = TreeNode("", Nil)
+
+    def genRootTree(values: List[(List[String], (String, _))]): TreeNode = {
+      val rootNode = root()
+      val nodeOpts = values.map(value => genTree(value._1, value._2 :: Nil))
+      rootNode.children = nodeOpts.foldLeft(List[TreeNode]()) { (trees, treeOpt) =>
+        mergeTrees(trees, treeOpt)
+      }
+      rootNode
+    }
+
+    private def add(mapList1: List[Map[String, _]], mapList2: List[Map[String, _]]): List[Map[String, _]] = {
+      mapList1 ::: mapList2
+    }
+    private def multiply(mapList1: List[Map[String, _]], mapList2: List[Map[String, _]]): List[Map[String, _]] = {
+      mapList1.flatMap { map1 =>
+        mapList2.map { map2 =>
+          map1 ++ map2
+        }
+      }
+    }
+
+    private def keysList(mapList: List[Map[String, _]]): List[String] = {
+      val keySet = mapList match {
+        case Nil => Set[String]()
+        case head :: _ => head.keySet
+      }
+      keySet.toList
+    }
+
+    def mergeDatasIntoMap(root: TreeNode, mapDatas: List[Map[String, _]]): List[Map[String, _]] = {
+      val childrenKeysMapDatas = root.children.foldLeft(Map[List[String], List[Map[String, _]]]()) { (keysMap, child) =>
+        val childMdts = mergeDatasIntoMap(child, List[Map[String, _]]())
+        val keys = keysList(childMdts)
+        val afterList = keysMap.get(keys) match {
+          case Some(list) => add(list, childMdts)
+          case _ => childMdts
+        }
+        keysMap + (keys -> afterList)
+      }
+      val childrenMergeMaps = childrenKeysMapDatas.values.foldLeft(List[Map[String, _]]()) { (originList, list) =>
+        originList match {
+          case Nil => list
+          case _ => multiply(originList, list)
+        }
+      }
+      mergeNodeChildrenDatasIntoMap(root, childrenMergeMaps)
+    }
+
+    private def mergeNodeChildrenDatasIntoMap(node: TreeNode, mapDatas: List[Map[String, _]]): List[Map[String, _]] = {
+      val datas: List[(String, (String, Any))] = node.children.flatMap { child =>
+        child.datas.map(dt => (dt._1, (child.key, dt._2)))
+      }
+      val childrenDataKeys: Set[String] = datas.map(_._1).toSet
+      val childrenDataLists: Map[String, List[(String, _)]] = datas.foldLeft(childrenDataKeys.map(k => (k, List[(String, _)]())).toMap) { (maps, data) =>
+        maps.get(data._1) match {
+          case Some(list) => maps + (data._1 -> (list :+ data._2))
+          case _ => maps
+        }
+      }
+
+      // multiply different key datas
+      childrenDataLists.foldLeft(mapDatas) { (mdts, klPair) =>
+        val (key, list) = klPair
+        mdts match {
+          case Nil => list.map(pr => Map[String, Any]((key -> pr._2)))
+          case _ => {
+            list.flatMap { kvPair =>
+              val (path, value) = kvPair
+              mdts.map { mp =>
+                mp + (key -> value)
+              }
+            }
+          }
+        }
+      }
+
+    }
+  }
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f783cb78/griffin-streaming-model/src/test/resources/config.json
----------------------------------------------------------------------
diff --git a/griffin-streaming-model/src/test/resources/config.json b/griffin-streaming-model/src/test/resources/config.json
new file mode 100644
index 0000000..808b0f7
--- /dev/null
+++ b/griffin-streaming-model/src/test/resources/config.json
@@ -0,0 +1,11 @@
+{
+  "spark": {
+    "app.name": "GriffinAccuStreamingApp",
+    "checkpoint.dir": "hdfs:///test/kafka/cp",
+    "config": {
+      "spark.task.maxFailures": 5,
+      "spark.streaming.kafka.maxRatePerPartition": 1000,
+      "spark.streaming.concurrentJobs": 4
+    }
+  }
+}
\ No newline at end of file
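
Note: two details of this test fixture are worth flagging against SparkParam above: the file uses "checkpoint.dir" where SparkParam binds "streaming.checkpoint.dir" (with FAIL_ON_UNKNOWN_PROPERTIES disabled the extra key is silently dropped, leaving cpDir null), and the values under "config" are JSON numbers although the field is declared Map[String, String]. A minimal check (file path assumed):

    val json = scala.io.Source.fromFile("src/test/resources/config.json").mkString
    val spark = JsonUtil.toAnyMap(json)("spark").asInstanceOf[Map[String, Any]]
    spark.contains("streaming.checkpoint.dir")   // false: the fixture says "checkpoint.dir"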
