Fix bug of non-blocking persist in batch mode Author: Lionel Liu <[email protected]> Author: dodobel <[email protected]>
Closes #288 from bhlx3lyx7/spark2. Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/299aa476 Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/299aa476 Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/299aa476 Branch: refs/heads/master Commit: 299aa476df8739e5fd0bdfd4a3482cb6e38f1a40 Parents: 1d7acd5 Author: Lionel Liu <[email protected]> Authored: Tue May 29 16:18:27 2018 +0800 Committer: Lionel Liu <[email protected]> Committed: Tue May 29 16:18:27 2018 +0800 ---------------------------------------------------------------------- measure/src/main/resources/config-batch.json | 56 ++++++++ .../src/main/resources/config-streaming.json | 75 ++++++++++ measure/src/main/resources/env-batch.json | 38 +++++ measure/src/main/resources/env-streaming.json | 62 ++++++++ .../apache/griffin/measure/Application.scala | 4 +- .../griffin/measure/context/DQContext.scala | 10 +- .../griffin/measure/context/MetricWrapper.scala | 10 +- .../measure/context/writer/HdfsPersist.scala | 10 +- .../measure/context/writer/HttpPersist.scala | 20 ++- .../measure/context/writer/LoggerPersist.scala | 2 + .../measure/context/writer/MongoPersist.scala | 14 +- .../measure/context/writer/MultiPersists.scala | 2 + .../measure/context/writer/Persist.scala | 2 + .../measure/context/writer/PersistFactory.scala | 21 +-- .../context/writer/PersistTaskRunner.scala | 101 +++++++++++++ .../context/writer/PersistThreadPool.scala | 81 ----------- .../measure/step/write/MetricFlushStep.scala | 5 +- .../measure/step/write/MetricWriteStep.scala | 1 - .../resources/_accuracy-batch-griffindsl.json | 56 ++++++++ .../resources/_accuracy-batch-sparksql.json | 63 ++++++++ .../_accuracy-streaming-griffindsl.json | 121 ++++++++++++++++ .../resources/_accuracy-streaming-sparksql.json | 142 +++++++++++++++++++ .../_completeness-batch-griffindsl.json | 36 +++++ 
.../_completeness-streaming-griffindsl.json | 65 +++++++++ .../_distinctness-batch-griffindsl.json | 57 ++++++++ .../_distinctness-batch-griffindsl1.json | 73 ++++++++++ .../_distinctness-batch-griffindsl2.json | 74 ++++++++++ .../_distinctness-streaming-griffindsl.json | 85 +++++++++++ .../_profiling-batch-griffindsl-hive.json | 48 +++++++ .../resources/_profiling-batch-griffindsl.json | 54 +++++++ .../resources/_profiling-batch-sparksql.json | 44 ++++++ .../_profiling-streaming-griffindsl.json | 75 ++++++++++ .../_profiling-streaming-sparksql.json | 80 +++++++++++ .../resources/_timeliness-batch-griffindsl.json | 49 +++++++ .../resources/_timeliness-batch-sparksql.json | 52 +++++++ .../_timeliness-streaming-griffindsl.json | 79 +++++++++++ .../_timeliness-streaming-sparksql.json | 82 +++++++++++ .../resources/_uniqueness-batch-griffindsl.json | 58 ++++++++ .../_uniqueness-streaming-griffindsl.json | 119 ++++++++++++++++ .../_uniqueness-streaming-sparksql.json | 130 +++++++++++++++++ .../src/test/resources/config-griffindsl.json | 56 -------- .../resources/config-streaming-accuracy.json | 121 ---------------- .../src/test/resources/config-streaming.json | 75 ---------- measure/src/test/resources/config.json | 71 ---------- measure/src/test/resources/env-batch.json | 38 +++++ .../src/test/resources/env-streaming-mongo.json | 54 +++++++ measure/src/test/resources/env.json | 39 ----- measure/src/test/resources/log4j.properties | 25 ++++ .../griffin/measure/ApplicationTest.scala | 8 +- 49 files changed, 2160 insertions(+), 483 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/resources/config-batch.json ---------------------------------------------------------------------- diff --git a/measure/src/main/resources/config-batch.json b/measure/src/main/resources/config-batch.json new file mode 100644 index 0000000..10167cd --- /dev/null +++ 
b/measure/src/main/resources/config-batch.json @@ -0,0 +1,56 @@ +{ + "name": "accu_batch", + + "process.type": "batch", + + "data.sources": [ + { + "name": "source", + "baseline": true, + "connectors": [ + { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/users_info_src.avro" + } + } + ] + }, { + "name": "target", + "connectors": [ + { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/users_info_target.avro" + } + } + ] + } + ], + + "evaluate.rule": { + "rules": [ + { + "dsl.type": "griffin-dsl", + "dq.type": "accuracy", + "name": "accu", + "rule": "source.user_id = target.user_id AND upper(source.first_name) = upper(target.first_name) AND source.last_name = target.last_name AND source.address = target.address AND source.email = target.email AND source.phone = target.phone AND source.post_code = target.post_code", + "details": { + "source": "source", + "target": "target", + "miss": "miss_count", + "total": "total_count", + "matched": "matched_count" + }, + "metric": { + "name": "accu" + }, + "record": { + "name": "missRecords" + } + } + ] + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/resources/config-streaming.json ---------------------------------------------------------------------- diff --git a/measure/src/main/resources/config-streaming.json b/measure/src/main/resources/config-streaming.json new file mode 100644 index 0000000..243a691 --- /dev/null +++ b/measure/src/main/resources/config-streaming.json @@ -0,0 +1,75 @@ +{ + "name": "prof_streaming", + + "process.type": "streaming", + + "data.sources": [ + { + "name": "source", + "connectors": [ + { + "type": "kafka", + "version": "0.8", + "config": { + "kafka.config": { + "bootstrap.servers": "10.147.177.107:9092", + "group.id": "group1", + "auto.offset.reset": "smallest", + "auto.commit.enable": "false" + }, + "topics": "sss", + "key.type": "java.lang.String", 
+ "value.type": "java.lang.String" + }, + "pre.proc": [ + { + "dsl.type": "df-opr", + "name": "${s1}", + "rule": "from_json", + "details": { + "df.name": "${this}" + } + }, + { + "dsl.type": "spark-sql", + "name": "${this}", + "rule": "select name, age from ${s1}" + } + ] + } + ], + "cache": { + "file.path": "hdfs://localhost/griffin/streaming/dump/source", + "info.path": "source", + "ready.time.interval": "10s", + "ready.time.delay": "0", + "time.range": ["0", "0"], + "init.clear": true + } + } + ], + + "evaluate.rule": { + "rules": [ + { + "dsl.type": "griffin-dsl", + "dq.type": "profiling", + "name": "prof", + "rule": "select count(name) as `cnt`, max(age) as `max`, min(age) as `min` from source", + "metric": { + "name": "prof" + } + }, + { + "dsl.type": "griffin-dsl", + "dq.type": "profiling", + "name": "grp", + "rule": "select name, count(*) as `cnt` from source group by name", + "metric": { + "name": "name_group", + "collect.type": "array" + } + } + ] + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/resources/env-batch.json ---------------------------------------------------------------------- diff --git a/measure/src/main/resources/env-batch.json b/measure/src/main/resources/env-batch.json new file mode 100644 index 0000000..3e8aa80 --- /dev/null +++ b/measure/src/main/resources/env-batch.json @@ -0,0 +1,38 @@ +{ + "spark": { + "log.level": "WARN", + "config": { + "spark.master": "local[*]" + } + }, + + "persist": [ + { + "type": "log", + "config": { + "max.log.lines": 10 + } + }, + { + "type": "hdfs", + "config": { + "path": "hdfs://localhost/griffin/batch/persist", + "max.persist.lines": 10000, + "max.lines.per.file": 10000 + } + }, + { + "type": "http", + "config": { + "method": "post", + "api": "http://10.148.181.248:39200/griffin/accuracy", + "over.time": "1m", + "retry": 10 + } + } + ], + + "info.cache": [], + + "cleaner": {} +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/resources/env-streaming.json ---------------------------------------------------------------------- diff --git a/measure/src/main/resources/env-streaming.json b/measure/src/main/resources/env-streaming.json new file mode 100644 index 0000000..6871bb9 --- /dev/null +++ b/measure/src/main/resources/env-streaming.json @@ -0,0 +1,62 @@ +{ + "spark": { + "log.level": "WARN", + "checkpoint.dir": "hdfs://localhost/test/griffin/cp", + "batch.interval": "2s", + "process.interval": "10s", + "init.clear": true, + "config": { + "spark.master": "local[*]", + "spark.task.maxFailures": 5, + "spark.streaming.kafkaMaxRatePerPartition": 1000, + "spark.streaming.concurrentJobs": 4, + "spark.yarn.maxAppAttempts": 5, + "spark.yarn.am.attemptFailuresValidityInterval": "1h", + "spark.yarn.max.executor.failures": 120, + "spark.yarn.executor.failuresValidityInterval": "1h", + "spark.hadoop.fs.hdfs.impl.disable.cache": true + } + }, + + "persist": [ + { + "type": "log", + "config": { + "max.log.lines": 100 + } + }, + { + "type": "hdfs", + "config": { + "path": "hdfs://localhost/griffin/streaming/persist", + "max.persist.lines": 10000, + "max.lines.per.file": 10000 + } + }, + { + "type": "http", + "config": { + "method": "post", + "api": "http://localhost:9200/griffin/accuracy" + } + } + ], + + "info.cache": [ + { + "type": "zk", + "config": { + "hosts": "localhost:2181", + "namespace": "griffin/infocache", + "lock.path": "lock", + "mode": "persist", + "init.clear": true, + "close.clear": false + } + } + ], + + "cleaner": { + "clean.interval": "2m" + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/scala/org/apache/griffin/measure/Application.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/Application.scala 
b/measure/src/main/scala/org/apache/griffin/measure/Application.scala index 25dc34e..893ba2c 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/Application.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/Application.scala @@ -22,7 +22,7 @@ import org.apache.griffin.measure.configuration.enums._ import org.apache.griffin.measure.configuration.json.ParamReaderFactory import org.apache.griffin.measure.configuration.params.{AllParam, DQParam, EnvParam, Param} import org.apache.griffin.measure.configuration.validator.ParamValidator -import org.apache.griffin.measure.context.writer.PersistThreadPool +import org.apache.griffin.measure.context.writer.PersistTaskRunner import org.apache.griffin.measure.launch.DQApp import org.apache.griffin.measure.launch.batch.BatchDQApp import org.apache.griffin.measure.launch.streaming.StreamingDQApp @@ -138,11 +138,9 @@ object Application extends Loggable { } private def startup(): Unit = { - PersistThreadPool.start } private def shutdown(): Unit = { - PersistThreadPool.shutdown } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala index e4b5046..43b61aa 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala @@ -62,12 +62,18 @@ case class DQContext(contextId: ContextId, } private val persistFactory = PersistFactory(persistParams, name) - private val defaultPersist: Persist = persistFactory.getPersists(contextId.timestamp) + private val defaultPersist: Persist = createPersist(contextId.timestamp) def getPersist(timestamp: Long): Persist = { if (timestamp == contextId.timestamp) 
getPersist() - else persistFactory.getPersists(timestamp) + else createPersist(timestamp) } def getPersist(): Persist = defaultPersist + private def createPersist(t: Long): Persist = { + procType match { + case BatchProcessType => persistFactory.getPersists(t, true) + case StreamingProcessType => persistFactory.getPersists(t, false) + } + } def cloneDQContext(newContextId: ContextId): DQContext = { DQContext(newContextId, name, dataSources, persistParams, procType)(sparkSession) http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala b/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala index d98952a..cec737f 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/context/MetricWrapper.scala @@ -25,6 +25,10 @@ import scala.collection.mutable.{Map => MutableMap} */ case class MetricWrapper(name: String) extends Serializable { + val _Name = "name" + val _Timestamp = "tmst" + val _Value = "value" + val metrics: MutableMap[Long, Map[String, Any]] = MutableMap() def insertMetric(timestamp: Long, value: Map[String, Any]): Unit = { @@ -39,9 +43,9 @@ case class MetricWrapper(name: String) extends Serializable { metrics.toMap.map { pair => val (timestamp, value) = pair (timestamp, Map[String, Any]( - ("name" -> name), - ("timestamp" -> timestamp), - ("value" -> value) + (_Name -> name), + (_Timestamp -> timestamp), + (_Value -> value) )) } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/scala/org/apache/griffin/measure/context/writer/HdfsPersist.scala ---------------------------------------------------------------------- diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/context/writer/HdfsPersist.scala b/measure/src/main/scala/org/apache/griffin/measure/context/writer/HdfsPersist.scala index 8f424e6..b6cabb3 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/context/writer/HdfsPersist.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/context/writer/HdfsPersist.scala @@ -29,6 +29,8 @@ import org.apache.spark.rdd.RDD */ case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist { + val block: Boolean = true + val Path = "path" val MaxPersistLines = "max.persist.lines" val MaxLinesPerFile = "max.lines.per.file" @@ -43,10 +45,6 @@ case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp: val LogFile = filePath("_LOG") - val _MetricName = "metricName" - val _Timestamp = "timestamp" - val _Value = "value" - var _init = true def available(): Boolean = { @@ -168,10 +166,8 @@ case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp: } def persistMetrics(metrics: Map[String, Any]): Unit = { - val head = Map[String, Any]((_MetricName -> metricName), (_Timestamp -> timeStamp)) - val result = head + (_Value -> metrics) try { - val json = JsonUtil.toJson(result) + val json = JsonUtil.toJson(metrics) persistRecords2Hdfs(MetricsFile, json :: Nil) } catch { case e: Throwable => error(e.getMessage) http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/scala/org/apache/griffin/measure/context/writer/HttpPersist.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/writer/HttpPersist.scala b/measure/src/main/scala/org/apache/griffin/measure/context/writer/HttpPersist.scala index 4c12652..a072fa5 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/context/writer/HttpPersist.scala +++ 
b/measure/src/main/scala/org/apache/griffin/measure/context/writer/HttpPersist.scala @@ -18,22 +18,29 @@ under the License. */ package org.apache.griffin.measure.context.writer -import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil} +import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil, TimeUtil} import org.apache.griffin.measure.utils.ParamUtil._ import org.apache.spark.rdd.RDD -import scala.concurrent.Future +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Future} /** * persist metric and record through http request */ -case class HttpPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist { +case class HttpPersist(config: Map[String, Any], metricName: String, + timeStamp: Long, block: Boolean + ) extends Persist { val Api = "api" val Method = "method" + val OverTime = "over.time" + val Retry = "retry" val api = config.getString(Api, "") val method = config.getString(Method, "post") + val overTime = TimeUtil.milliseconds(config.getString(OverTime, "")).getOrElse(-1L) + val retry = config.getInt(Retry, 10) val _Value = "value" @@ -55,7 +62,8 @@ case class HttpPersist(config: Map[String, Any], metricName: String, timeStamp: import scala.concurrent.ExecutionContext.Implicits.global (timeStamp, Future(HttpUtil.httpRequest(api, method, params, header, data))) } - PersistThreadPool.addTask(func _, 10) + if (block) PersistTaskRunner.addBlockTask(func _, retry, overTime) + else PersistTaskRunner.addNonBlockTask(func _, retry) } catch { case e: Throwable => error(e.getMessage) } @@ -68,9 +76,7 @@ case class HttpPersist(config: Map[String, Any], metricName: String, timeStamp: def persistRecords(records: Iterable[String], name: String): Unit = {} def persistMetrics(metrics: Map[String, Any]): Unit = { - val head = Map[String, Any](("name" -> metricName), ("tmst" -> timeStamp)) - val result = head + (_Value -> metrics) - httpResult(result) + httpResult(metrics) } } 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/scala/org/apache/griffin/measure/context/writer/LoggerPersist.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/writer/LoggerPersist.scala b/measure/src/main/scala/org/apache/griffin/measure/context/writer/LoggerPersist.scala index 3063faf..eff4c62 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/context/writer/LoggerPersist.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/context/writer/LoggerPersist.scala @@ -27,6 +27,8 @@ import org.apache.spark.rdd.RDD */ case class LoggerPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist { + val block: Boolean = true + val MaxLogLines = "max.log.lines" val maxLogLines = config.getInt(MaxLogLines, 100) http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/scala/org/apache/griffin/measure/context/writer/MongoPersist.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/writer/MongoPersist.scala b/measure/src/main/scala/org/apache/griffin/measure/context/writer/MongoPersist.scala index 3cfcf04..14b86e4 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/context/writer/MongoPersist.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/context/writer/MongoPersist.scala @@ -19,6 +19,7 @@ under the License. 
package org.apache.griffin.measure.context.writer import org.apache.griffin.measure.utils.ParamUtil._ +import org.apache.griffin.measure.utils.TimeUtil import org.apache.spark.rdd.RDD import org.mongodb.scala._ import org.mongodb.scala.model.{Filters, UpdateOptions, Updates} @@ -29,10 +30,18 @@ import scala.concurrent.Future /** * persist metric and record to mongo */ -case class MongoPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist { +case class MongoPersist(config: Map[String, Any], metricName: String, + timeStamp: Long, block: Boolean + ) extends Persist { MongoConnection.init(config) + val OverTime = "over.time" + val Retry = "retry" + + val overTime = TimeUtil.milliseconds(config.getString(OverTime, "")).getOrElse(-1L) + val retry = config.getInt(Retry, 10) + val _MetricName = "metricName" val _Timestamp = "timestamp" val _Value = "value" @@ -63,7 +72,8 @@ case class MongoPersist(config: Map[String, Any], metricName: String, timeStamp: (timeStamp, MongoConnection.getDataCollection.updateOne( filter, update, UpdateOptions().upsert(true)).toFuture) } - PersistThreadPool.addTask(func _, 10) + if (block) PersistTaskRunner.addBlockTask(func _, retry, overTime) + else PersistTaskRunner.addNonBlockTask(func _, retry) } catch { case e: Throwable => error(e.getMessage) } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/scala/org/apache/griffin/measure/context/writer/MultiPersists.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/writer/MultiPersists.scala b/measure/src/main/scala/org/apache/griffin/measure/context/writer/MultiPersists.scala index ea9133a..4c7d0f6 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/context/writer/MultiPersists.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/context/writer/MultiPersists.scala @@ -25,6 +25,8 @@ import org.apache.spark.rdd.RDD 
*/ case class MultiPersists(persists: Iterable[Persist]) extends Persist { + val block: Boolean = false + val metricName: String = persists match { case Nil => "" case _ => persists.head.metricName http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/scala/org/apache/griffin/measure/context/writer/Persist.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/writer/Persist.scala b/measure/src/main/scala/org/apache/griffin/measure/context/writer/Persist.scala index 28eeb64..8adbcc3 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/context/writer/Persist.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/context/writer/Persist.scala @@ -30,6 +30,8 @@ trait Persist extends Loggable with Serializable { val config: Map[String, Any] + val block: Boolean + def available(): Boolean def start(msg: String): Unit http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistFactory.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistFactory.scala index 12b5f0b..9314876 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistFactory.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistFactory.scala @@ -30,20 +30,23 @@ case class PersistFactory(persistParams: Iterable[PersistParam], metricName: Str val LOG_REGEX = """^(?i)log$""".r val MONGO_REGEX = """^(?i)mongo$""".r - def getPersists(timeStamp: Long): MultiPersists = { - MultiPersists(persistParams.flatMap(param => getPersist(timeStamp, param))) - } - /** - * create persist by param + * create persist + * @param timeStamp the timestamp of 
persist + * @param block persist write metric in block or non-block way + * @return persist */ - private def getPersist(timeStamp: Long, persistParam: PersistParam): Option[Persist] = { + def getPersists(timeStamp: Long, block: Boolean): MultiPersists = { + MultiPersists(persistParams.flatMap(param => getPersist(timeStamp, param, block))) + } + + private def getPersist(timeStamp: Long, persistParam: PersistParam, block: Boolean): Option[Persist] = { val config = persistParam.config val persistTry = persistParam.persistType match { - case HDFS_REGEX() => Try(HdfsPersist(config, metricName, timeStamp)) - case HTTP_REGEX() => Try(HttpPersist(config, metricName, timeStamp)) case LOG_REGEX() => Try(LoggerPersist(config, metricName, timeStamp)) - case MONGO_REGEX() => Try(MongoPersist(config, metricName, timeStamp)) + case HDFS_REGEX() => Try(HdfsPersist(config, metricName, timeStamp)) + case HTTP_REGEX() => Try(HttpPersist(config, metricName, timeStamp, block)) + case MONGO_REGEX() => Try(MongoPersist(config, metricName, timeStamp, block)) case _ => throw new Exception("not supported persist type") } persistTry match { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistTaskRunner.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistTaskRunner.scala b/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistTaskRunner.scala new file mode 100644 index 0000000..af4f36b --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistTaskRunner.scala @@ -0,0 +1,101 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.context.writer + +import java.util.Date +import java.util.concurrent.TimeUnit + +import org.apache.griffin.measure.Loggable + +import scala.concurrent.duration._ +import scala.concurrent._ +import scala.util.{Failure, Success} + +/** + * persist task runner, to persist metrics in block or non-block mode + */ +object PersistTaskRunner extends Loggable { + + import scala.concurrent.ExecutionContext.Implicits.global + + val MAX_RETRY = 100 + + def addNonBlockTask(func: () => (Long, Future[_]), retry: Int): Unit = { + val r = validRetryNum(retry) + nonBlockExecute(func, r) + } + + def addBlockTask(func: () => (Long, Future[_]), retry: Int, wait: Long): Unit = { + val r = validRetryNum(retry) + val duration = if (wait >= 0) Duration(wait, TimeUnit.MILLISECONDS) else Duration.Inf + blockExecute(func, r, duration) + } + + private def nonBlockExecute(func: () => (Long, Future[_]), retry: Int): Unit = { + val nextRetry = nextRetryCount(retry) + val st = new Date().getTime + val (t, res) = func() + res.onComplete { + case Success(value) => { + val et = new Date().getTime + info(s"task ${t} success with (${value}) [ using time ${et - st} ms ]") + } + case Failure(e) => { + val et = new Date().getTime + warn(s"task ${t} fails [ using time ${et - st} ms ] : ${e.getMessage}") + if (nextRetry >= 0) { + info(s"task ${t} retry [ rest retry count: ${nextRetry} ]") + nonBlockExecute(func, 
nextRetry) + } else { + error(s"task fails: task ${t} retry ends but fails") + } + } + } + } + + private def blockExecute(func: () => (Long, Future[_]), retry: Int, waitDuration: Duration): Unit = { + val nextRetry = nextRetryCount(retry) + val st = new Date().getTime + val (t, res) = func() + try { + val value = Await.result(res, waitDuration) + val et = new Date().getTime + info(s"task ${t} success with (${value}) [ using time ${et - st} ms ]") + } catch { + case e: Throwable => { + val et = new Date().getTime + warn(s"task ${t} fails [ using time ${et - st} ms ] : ${e.getMessage}") + if (nextRetry >= 0) { + info(s"task ${t} retry [ rest retry count: ${nextRetry} ]") + blockExecute(func, nextRetry, waitDuration) + } else { + error(s"task fails: task ${t} retry ends but fails") + } + } + } + } + + private def validRetryNum(retry: Int): Int = { + if (retry > MAX_RETRY) MAX_RETRY else retry + } + private def nextRetryCount(retry: Int): Int = { + if (retry >= 0) retry - 1 else -1 + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistThreadPool.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistThreadPool.scala b/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistThreadPool.scala deleted file mode 100644 index 221fcad..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistThreadPool.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.context.writer - -import java.util.Date -import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit} - -import org.apache.griffin.measure.Loggable - -import scala.concurrent.Future -import scala.util.{Failure, Success} - -/** - * persist thread pool, to persist metrics in parallel mode - */ -object PersistThreadPool extends Loggable { - - import scala.concurrent.ExecutionContext.Implicits.global - - private val pool: ThreadPoolExecutor = Executors.newFixedThreadPool(5).asInstanceOf[ThreadPoolExecutor] - val MAX_RETRY = 100 - - def start(): Unit = { - } - - def shutdown(): Unit = { - pool.shutdown() - pool.awaitTermination(10, TimeUnit.SECONDS) - } - - def addTask(func: () => (Long, Future[_]), retry: Int): Unit = { - val r = if (retry < 0) MAX_RETRY else retry - info(s"add task, current task num: ${pool.getQueue.size}") - pool.submit(Task(func, r)) - } - - case class Task(func: () => (Long, Future[_]), retry: Int) extends Runnable with Loggable { - - override def run(): Unit = { - val st = new Date().getTime - val (t, res) = func() - res.onComplete { - case Success(value) => { - val et = new Date().getTime - info(s"task ${t} success [ using time ${et - st} ms ]") - } - case Failure(e) => { - val et = new Date().getTime - warn(s"task ${t} fails [ using time ${et - st} ms ] : ${e.getMessage}") - if (retry > 0) { - info(s"task ${t} retry [ rest retry count: ${retry - 1} ]") - pool.submit(Task(func, retry - 1)) - } else { - fail(s"task ${t} retry ends but fails") - } - } - } - } - - def fail(msg: String): Unit = { - 
error(s"task fails: ${msg}") - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala index 40c9b05..6b7944d 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala @@ -36,7 +36,10 @@ case class MetricFlushStep() extends WriteStep { context.getPersist(t).persistMetrics(metric) true } catch { - case e: Throwable => false + case e: Throwable => { + error(s"flush metrics error: ${e.getMessage}") + false + } } ret && pr } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala index c29c072..2f34d63 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala @@ -46,7 +46,6 @@ case class MetricWriteStep(name: String, val writeMode = writeTimestampOpt.map(_ => SimpleMode).getOrElse(context.writeMode) val timestampMetricMap: Map[Long, Map[String, Any]] = writeMode match { case SimpleMode => { - println(metricMaps) val metrics: Map[String, Any] = normalizeMetric(metricMaps, name, collectType) emptyMetricMap + (timestamp -> metrics) } 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_accuracy-batch-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_accuracy-batch-griffindsl.json b/measure/src/test/resources/_accuracy-batch-griffindsl.json new file mode 100644 index 0000000..10167cd --- /dev/null +++ b/measure/src/test/resources/_accuracy-batch-griffindsl.json @@ -0,0 +1,56 @@ +{ + "name": "accu_batch", + + "process.type": "batch", + + "data.sources": [ + { + "name": "source", + "baseline": true, + "connectors": [ + { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/users_info_src.avro" + } + } + ] + }, { + "name": "target", + "connectors": [ + { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/users_info_target.avro" + } + } + ] + } + ], + + "evaluate.rule": { + "rules": [ + { + "dsl.type": "griffin-dsl", + "dq.type": "accuracy", + "name": "accu", + "rule": "source.user_id = target.user_id AND upper(source.first_name) = upper(target.first_name) AND source.last_name = target.last_name AND source.address = target.address AND source.email = target.email AND source.phone = target.phone AND source.post_code = target.post_code", + "details": { + "source": "source", + "target": "target", + "miss": "miss_count", + "total": "total_count", + "matched": "matched_count" + }, + "metric": { + "name": "accu" + }, + "record": { + "name": "missRecords" + } + } + ] + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_accuracy-batch-sparksql.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_accuracy-batch-sparksql.json b/measure/src/test/resources/_accuracy-batch-sparksql.json new file mode 100644 index 0000000..2eef9f1 --- /dev/null +++ 
b/measure/src/test/resources/_accuracy-batch-sparksql.json @@ -0,0 +1,63 @@ +{ + "name": "accu_batch", + + "process.type": "batch", + + "data.sources": [ + { + "name": "source", + "baseline": true, + "connectors": [ + { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/users_info_src.avro" + } + } + ] + }, { + "name": "target", + "connectors": [ + { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/users_info_target.avro" + } + } + ] + } + ], + + "evaluate.rule": { + "rules": [ + { + "dsl.type": "spark-sql", + "name": "missRecords", + "rule": "SELECT source.* FROM source LEFT JOIN target ON coalesce(source.user_id, '') = coalesce(target.user_id, '') AND coalesce(source.first_name, '') = coalesce(target.first_name, '') AND coalesce(source.post_code, '') = coalesce(target.post_code, '') WHERE (NOT (source.user_id IS NULL AND source.first_name IS NULL AND source.post_code IS NULL)) AND (target.user_id IS NULL AND target.first_name IS NULL AND target.post_code IS NULL)", + "record": { + "name": "miss" + } + }, + { + "dsl.type": "spark-sql", + "name": "miss_count", + "rule": "SELECT count(*) as miss FROM `missRecords`" + }, + { + "dsl.type": "spark-sql", + "name": "total_count", + "rule": "SELECT count(*) as total FROM source" + }, + { + "dsl.type": "spark-sql", + "name": "accu", + "rule": "SELECT `total_count`.`total` AS `total`, coalesce(`miss_count`.`miss`, 0) AS `miss`, (`total` - `miss`) AS `matched` FROM `total_count` FULL JOIN `miss_count`", + "metric": { + "name": "accu" + } + } + ] + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_accuracy-streaming-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_accuracy-streaming-griffindsl.json b/measure/src/test/resources/_accuracy-streaming-griffindsl.json new file mode 100644 index 
0000000..240d768 --- /dev/null +++ b/measure/src/test/resources/_accuracy-streaming-griffindsl.json @@ -0,0 +1,121 @@ +{ + "name": "accu_streaming", + + "process.type": "streaming", + + "data.sources": [ + { + "name": "source", + "baseline": true, + "connectors": [ + { + "type": "kafka", + "version": "0.8", + "config": { + "kafka.config": { + "bootstrap.servers": "10.147.177.107:9092", + "group.id": "group1", + "auto.offset.reset": "smallest", + "auto.commit.enable": "false" + }, + "topics": "sss", + "key.type": "java.lang.String", + "value.type": "java.lang.String" + }, + "pre.proc": [ + { + "dsl.type": "df-opr", + "name": "${s1}", + "rule": "from_json", + "details": { + "df.name": "${this}" + } + }, + { + "dsl.type": "spark-sql", + "name": "${this}", + "rule": "select name, age from ${s1}" + } + ] + } + ], + "cache": { + "type": "parquet", + "file.path": "hdfs://localhost/griffin/streaming/dump/source", + "info.path": "source", + "ready.time.interval": "10s", + "ready.time.delay": "0", + "time.range": ["-2m", "0"], + "init.clear": true, + "updatable": true + } + }, { + "name": "target", + "connectors": [ + { + "type": "kafka", + "version": "0.8", + "config": { + "kafka.config": { + "bootstrap.servers": "10.147.177.107:9092", + "group.id": "group1", + "auto.offset.reset": "smallest", + "auto.commit.enable": "false" + }, + "topics": "ttt", + "key.type": "java.lang.String", + "value.type": "java.lang.String" + }, + "pre.proc": [ + { + "dsl.type": "df-opr", + "name": "${t1}", + "rule": "from_json", + "details": { + "df.name": "${this}" + } + }, + { + "dsl.type": "spark-sql", + "name": "${this}", + "rule": "select name, age from ${t1}" + } + ] + } + ], + "cache": { + "type": "parquet", + "file.path": "hdfs://localhost/griffin/streaming/dump/target", + "info.path": "target", + "ready.time.interval": "10s", + "ready.time.delay": "0", + "time.range": ["-2m", "0"], + "init.clear": true + } + } + ], + + "evaluate.rule": { + "rules": [ + { + "dsl.type": "griffin-dsl", + 
"dq.type": "accuracy", + "name": "accu", + "rule": "source.name = target.name and source.age = target.age", + "details": { + "source": "source", + "target": "target", + "miss": "miss_count", + "total": "total_count", + "matched": "matched_count" + }, + "metric": { + "name": "accu" + }, + "record": { + "name": "missRecords" + } + } + ] + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_accuracy-streaming-sparksql.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_accuracy-streaming-sparksql.json b/measure/src/test/resources/_accuracy-streaming-sparksql.json new file mode 100644 index 0000000..0824cb8 --- /dev/null +++ b/measure/src/test/resources/_accuracy-streaming-sparksql.json @@ -0,0 +1,142 @@ +{ + "name": "accu_streaming", + + "process.type": "streaming", + + "data.sources": [ + { + "name": "source", + "baseline": true, + "connectors": [ + { + "type": "kafka", + "version": "0.8", + "config": { + "kafka.config": { + "bootstrap.servers": "10.149.247.156:9092", + "group.id": "group1", + "auto.offset.reset": "smallest", + "auto.commit.enable": "false" + }, + "topics": "sss", + "key.type": "java.lang.String", + "value.type": "java.lang.String" + }, + "pre.proc": [ + { + "dsl.type": "df-opr", + "name": "${s1}", + "rule": "from_json", + "details": { + "df.name": "${this}" + } + }, + { + "dsl.type": "spark-sql", + "name": "${this}", + "rule": "select name, age from ${s1}" + } + ] + } + ], + "cache": { + "file.path": "hdfs://localhost/griffin/streaming/dump/source", + "info.path": "source", + "ready.time.interval": "10s", + "ready.time.delay": "0", + "time.range": ["-2m", "0"] + } + }, { + "name": "target", + "connectors": [ + { + "type": "kafka", + "version": "0.8", + "config": { + "kafka.config": { + "bootstrap.servers": "10.149.247.156:9092", + "group.id": "group1", + "auto.offset.reset": "smallest", + "auto.commit.enable": "false" + }, + "topics": 
"ttt", + "key.type": "java.lang.String", + "value.type": "java.lang.String" + }, + "pre.proc": [ + { + "dsl.type": "df-opr", + "name": "${t1}", + "rule": "from_json", + "details": { + "df.name": "${this}" + } + }, + { + "dsl.type": "spark-sql", + "name": "${this}", + "rule": "select name, age from ${t1}" + } + ] + } + ], + "cache": { + "file.path": "hdfs://localhost/griffin/streaming/dump/target", + "info.path": "target", + "ready.time.interval": "10s", + "ready.time.delay": "0", + "time.range": ["-2m", "0"] + } + } + ], + + "evaluate.rule": { + "rules": [ + { + "dsl.type": "spark-sql", + "name": "missRecords", + "cache": true, + "rule": "SELECT source.* FROM source LEFT JOIN target ON coalesce(source.name, '') = coalesce(target.name, '') AND coalesce(source.age, '') = coalesce(target.age, '') WHERE (NOT (source.name IS NULL AND source.age IS NULL)) AND (target.name IS NULL AND target.age IS NULL)" + }, + { + "dsl.type": "spark-sql", + "name": "miss_count", + "rule": "SELECT `__tmst`, count(*) as miss FROM `missRecords` GROUP BY `__tmst`" + }, + { + "dsl.type": "spark-sql", + "name": "total_count", + "rule": "SELECT `__tmst`, count(*) as total FROM source GROUP BY `__tmst`" + }, + { + "dsl.type": "spark-sql", + "name": "accu", + "rule": "SELECT `total_count`.`__tmst` AS `__tmst`, `total_count`.`total` AS `total`, coalesce(`miss_count`.`miss`, 0) AS `miss` FROM `total_count` FULL JOIN `miss_count` ON `total_count`.`__tmst` = `miss_count`.`__tmst`" + }, + { + "dsl.type": "df-opr", + "name": "metric_accu", + "rule": "accuracy", + "details": { + "df.name": "accu", + "miss": "miss", + "total": "total", + "matched": "matched" + }, + "metric": { + "name": "accuracy" + } + }, + { + "dsl.type": "spark-sql", + "name": "accu_miss_records", + "rule": "SELECT `__tmst`, `__empty` FROM `metric_accu` WHERE `__record`", + "record": { + "name": "missRecords", + "data.source.cache": "source", + "origin.DF": "missRecords" + } + } + ] + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_completeness-batch-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_completeness-batch-griffindsl.json b/measure/src/test/resources/_completeness-batch-griffindsl.json new file mode 100644 index 0000000..9c00444 --- /dev/null +++ b/measure/src/test/resources/_completeness-batch-griffindsl.json @@ -0,0 +1,36 @@ +{ + "name": "comp_batch", + + "process.type": "batch", + + "timestamp": 123456, + + "data.sources": [ + { + "name": "source", + "connectors": [ + { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/users_info_src.avro" + } + } + ] + } + ], + + "evaluate.rule": { + "rules": [ + { + "dsl.type": "griffin-dsl", + "dq.type": "completeness", + "name": "comp", + "rule": "email, post_code, first_name", + "metric": { + "name": "comp" + } + } + ] + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_completeness-streaming-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_completeness-streaming-griffindsl.json b/measure/src/test/resources/_completeness-streaming-griffindsl.json new file mode 100644 index 0000000..ba8bdce --- /dev/null +++ b/measure/src/test/resources/_completeness-streaming-griffindsl.json @@ -0,0 +1,65 @@ +{ + "name": "comp_streaming", + + "process.type": "streaming", + + "data.sources": [ + { + "name": "source", + "connectors": [ + { + "type": "kafka", + "version": "0.8", + "config": { + "kafka.config": { + "bootstrap.servers": "10.147.177.107:9092", + "group.id": "source", + "auto.offset.reset": "smallest", + "auto.commit.enable": "false" + }, + "topics": "test", + "key.type": "java.lang.String", + "value.type": "java.lang.String" + }, + "pre.proc": [ + { + "dsl.type": "df-opr", + 
"name": "${s1}", + "rule": "from_json", + "details": { + "df.name": "${this}" + } + }, + { + "dsl.type": "spark-sql", + "name": "${this}", + "rule": "select name, age from ${s1}" + } + ] + } + ], + "cache": { + "file.path": "hdfs://localhost/griffin/streaming/dump/source", + "info.path": "source", + "ready.time.interval": "10s", + "ready.time.delay": "0", + "time.range": ["0", "0"], + "init.clear": true + } + } + ], + + "evaluate.rule": { + "rules": [ + { + "dsl.type": "griffin-dsl", + "dq.type": "completeness", + "name": "comp", + "rule": "name, age", + "metric": { + "name": "comp" + } + } + ] + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_distinctness-batch-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_distinctness-batch-griffindsl.json b/measure/src/test/resources/_distinctness-batch-griffindsl.json new file mode 100644 index 0000000..af0c91e --- /dev/null +++ b/measure/src/test/resources/_distinctness-batch-griffindsl.json @@ -0,0 +1,57 @@ +{ + "name": "dist_batch", + + "process.type": "batch", + + "data.sources": [ + { + "name": "source", + "baseline": true, + "connectors": [ + { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/users_info_src.avro" + } + } + ] + }, + { + "name": "target", + "baseline": true, + "connectors": [ + { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/users_info_src.avro" + } + } + ] + } + ], + + "evaluate.rule": { + "rules": [ + { + "dsl.type": "griffin-dsl", + "dq.type": "distinct", + "name": "dist", + "rule": "user_id", + "details": { + "source": "source", + "target": "target", + "total": "total", + "distinct": "distinct", + "dup": "dup", + "num": "num", + "duplication.array": "dup" + }, + "metric": { + "name": "distinct" + } + } + ] + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_distinctness-batch-griffindsl1.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_distinctness-batch-griffindsl1.json b/measure/src/test/resources/_distinctness-batch-griffindsl1.json new file mode 100644 index 0000000..4d94d8e --- /dev/null +++ b/measure/src/test/resources/_distinctness-batch-griffindsl1.json @@ -0,0 +1,73 @@ +{ + "name": "dist_batch", + + "process.type": "batch", + + "timestamp": 123456, + + "data.sources": [ + { + "name": "source", + "baseline": true, + "connectors": [ + { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/dupdata.avro" + }, + "pre.proc": [ + { + "dsl.type": "spark-sql", + "name": "${this}", + "rule": "select name, age from ${this}" + } + ] + } + ] + }, + { + "name": "target", + "baseline": true, + "connectors": [ + { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/dupdata.avro" + }, + "pre.proc": [ + { + "dsl.type": "spark-sql", + "name": "${this}", + "rule": "select DISTINCT name, age from ${this}" + } + ] + } + ] + } + ], + + "evaluate.rule": { + "rules": [ + { + "dsl.type": "griffin-dsl", + "dq.type": "distinct", + "name": "dist", + "rule": "name", + "details": { + "source": "source", + "target": "target", + "total": "total", + "distinct": "distinct", + "dup": "dup", + "num": "num", + "duplication.array": "dup" + }, + "metric": { + "name": "distinct" + } + } + ] + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_distinctness-batch-griffindsl2.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_distinctness-batch-griffindsl2.json b/measure/src/test/resources/_distinctness-batch-griffindsl2.json new file mode 100644 index 0000000..6a12719 --- 
/dev/null +++ b/measure/src/test/resources/_distinctness-batch-griffindsl2.json @@ -0,0 +1,74 @@ +{ + "name": "dist_batch", + + "process.type": "batch", + + "timestamp": 123456, + + "data.sources": [ + { + "name": "source", + "baseline": true, + "connectors": [ + { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/dupdata.avro" + }, + "pre.proc": [ + { + "dsl.type": "spark-sql", + "name": "${this}", + "rule": "select name, age from ${this}" + } + ] + } + ] + }, + { + "name": "target", + "baseline": true, + "connectors": [ + { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/dupdata.avro" + }, + "pre.proc": [ + { + "dsl.type": "spark-sql", + "name": "${this}", + "rule": "select DISTINCT name, age from ${this}" + } + ] + } + ] + } + ], + + "evaluate.rule": { + "rules": [ + { + "dsl.type": "griffin-dsl", + "dq.type": "distinct", + "name": "dist", + "rule": "name, [age]", + "details": { + "source": "source", + "target": "target", + "total": "total", + "distinct": "distinct", + "dup": "dup", + "num": "num", + "duplication.array": "dup", + "record.enable": true + }, + "metric": { + "name": "distinct" + } + } + ] + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_distinctness-streaming-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_distinctness-streaming-griffindsl.json b/measure/src/test/resources/_distinctness-streaming-griffindsl.json new file mode 100644 index 0000000..c36e7ba --- /dev/null +++ b/measure/src/test/resources/_distinctness-streaming-griffindsl.json @@ -0,0 +1,85 @@ +{ + "name": "dist_streaming", + + "process.type": "streaming", + + "data.sources": [ + { + "name": "new", + "cache": { + "file.path": "hdfs://localhost/griffin/streaming/dump/old", + "info.path": "new", + "ready.time.interval": "10s", + "ready.time.delay": 
"0", + "time.range": ["0", "0"], + "read.only": true + } + }, + { + "name": "old", + "connectors": [ + { + "type": "kafka", + "version": "0.8", + "config": { + "kafka.config": { + "bootstrap.servers": "10.149.247.156:9092", + "group.id": "old", + "auto.offset.reset": "smallest", + "auto.commit.enable": "false" + }, + "topics": "ttt", + "key.type": "java.lang.String", + "value.type": "java.lang.String" + }, + "pre.proc": [ + { + "dsl.type": "df-opr", + "name": "${s1}", + "rule": "from_json", + "details": { + "df.name": "${this}" + } + }, + { + "dsl.type": "spark-sql", + "name": "${this}", + "rule": "select name, age from ${s1}" + } + ] + } + ], + "cache": { + "file.path": "hdfs://localhost/griffin/streaming/dump/old", + "info.path": "old", + "ready.time.interval": "10s", + "ready.time.delay": "0", + "time.range": ["-24h", "0"] + } + } + ], + + "evaluate.rule": { + "rules": [ + { + "dsl.type": "griffin-dsl", + "dq.type": "distinct", + "name": "dist", + "rule": "name, age", + "details": { + "source": "new", + "target": "old", + "total": "total", + "distinct": "distinct", + "dup": "dup", + "accu_dup": "accu_dup", + "num": "num", + "duplication.array": "dup" + }, + "metric": { + "name": "distinct" + } + } + ] + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_profiling-batch-griffindsl-hive.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_profiling-batch-griffindsl-hive.json b/measure/src/test/resources/_profiling-batch-griffindsl-hive.json new file mode 100644 index 0000000..03b0405 --- /dev/null +++ b/measure/src/test/resources/_profiling-batch-griffindsl-hive.json @@ -0,0 +1,48 @@ +{ + "name": "prof_batch", + + "process.type": "batch", + + "timestamp": 123456, + + "data.sources": [ + { + "name": "source", + "connectors": [ + { + "type": "hive", + "version": "1.2", + "config": { + "database": "default", + "table.name": 
"s1" + } + } + ] + } + ], + + "evaluate.rule": { + "rules": [ + { + "dsl.type": "griffin-dsl", + "dq.type": "profiling", + "name": "prof", + "rule": "name, count(*) as cnt from source group by name", + "metric": { + "name": "name_group", + "collect.type": "array" + } + }, + { + "dsl.type": "griffin-dsl", + "dq.type": "profiling", + "name": "grp", + "rule": "age, count(*) as cnt from source group by age order by cnt", + "metric": { + "name": "age_group", + "collect.type": "array" + } + } + ] + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_profiling-batch-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_profiling-batch-griffindsl.json b/measure/src/test/resources/_profiling-batch-griffindsl.json new file mode 100644 index 0000000..ec082c4 --- /dev/null +++ b/measure/src/test/resources/_profiling-batch-griffindsl.json @@ -0,0 +1,54 @@ +{ + "name": "prof_batch", + + "process.type": "batch", + + "timestamp": 123456, + + "data.sources": [ + { + "name": "source", + "connectors": [ + { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/users_info_src.avro" + }, + "pre.proc": [ + { + "dsl.type": "spark-sql", + "name": "${this}", + "rule": "select reg_replace(email, '^([^@0-9]+)([0-9]+)@(dc)(?:\\\\.[^@]+)$', '$1@$3') as email, post_code from ${this}" + } + ] + } + ] + } + ], + + "evaluate.rule": { + "rules": [ + { + "dsl.type": "griffin-dsl", + "dq.type": "profiling", + "name": "prof", + "rule": "email, count(*) as cnt from source group by email", + "metric": { + "name": "prof", + "collect.type": "array" + } + }, + { + "dsl.type": "griffin-dsl", + "dq.type": "profiling", + "name": "grp", + "rule": "source.post_code, count(*) as cnt from source group by source.post_code order by cnt desc", + "metric": { + "name": "post_group", + "collect.type": "array" + } + } + ] + } +} \ No 
newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_profiling-batch-sparksql.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_profiling-batch-sparksql.json b/measure/src/test/resources/_profiling-batch-sparksql.json new file mode 100644 index 0000000..fdfd812 --- /dev/null +++ b/measure/src/test/resources/_profiling-batch-sparksql.json @@ -0,0 +1,44 @@ +{ + "name": "prof_batch", + + "process.type": "batch", + + "timestamp": 123456, + + "data.sources": [ + { + "name": "source", + "connectors": [ + { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/users_info_src.avro" + } + } + ] + } + ], + + "evaluate.rule": { + "rules": [ + { + "dsl.type": "spark-sql", + "name": "prof", + "rule": "select count(*) as `cnt`, count(distinct `post_code`) as `dis-cnt`, max(user_id) as `max` from source", + "metric": { + "name": "prof" + } + }, + { + "dsl.type": "spark-sql", + "name": "grp", + "rule": "select post_code as `pc`, count(*) as `cnt` from source group by post_code", + "metric": { + "name": "post_group", + "collect.type": "array" + } + } + ] + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_profiling-streaming-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_profiling-streaming-griffindsl.json b/measure/src/test/resources/_profiling-streaming-griffindsl.json new file mode 100644 index 0000000..b6feb5a --- /dev/null +++ b/measure/src/test/resources/_profiling-streaming-griffindsl.json @@ -0,0 +1,75 @@ +{ + "name": "prof_streaming", + + "process.type": "streaming", + + "data.sources": [ + { + "name": "source", + "connectors": [ + { + "type": "kafka", + "version": "0.8", + "config": { + "kafka.config": { + "bootstrap.servers": 
"10.147.177.107:9092", + "group.id": "group1", + "auto.offset.reset": "smallest", + "auto.commit.enable": "false" + }, + "topics": "test", + "key.type": "java.lang.String", + "value.type": "java.lang.String" + }, + "pre.proc": [ + { + "dsl.type": "df-opr", + "name": "${s1}", + "rule": "from_json", + "details": { + "df.name": "${this}" + } + }, + { + "dsl.type": "spark-sql", + "name": "${this}", + "rule": "select name, age from ${s1}" + } + ] + } + ], + "cache": { + "file.path": "hdfs://localhost/griffin/streaming/dump/source", + "info.path": "source", + "ready.time.interval": "10s", + "ready.time.delay": "0", + "time.range": ["0", "0"], + "init.clear": true + } + } + ], + + "evaluate.rule": { + "rules": [ + { + "dsl.type": "griffin-dsl", + "dq.type": "profiling", + "name": "prof", + "rule": "select count(name) as `cnt`, max(age) as `max`, min(age) as `min` from source", + "metric": { + "name": "prof" + } + }, + { + "dsl.type": "griffin-dsl", + "dq.type": "profiling", + "name": "grp", + "rule": "select name, count(*) as `cnt` from source group by name", + "metric": { + "name": "name_group", + "collect.type": "array" + } + } + ] + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_profiling-streaming-sparksql.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_profiling-streaming-sparksql.json b/measure/src/test/resources/_profiling-streaming-sparksql.json new file mode 100644 index 0000000..4f0b0ee --- /dev/null +++ b/measure/src/test/resources/_profiling-streaming-sparksql.json @@ -0,0 +1,80 @@ +{ + "name": "prof_streaming", + + "process.type": "streaming", + + "data.sources": [ + { + "name": "source", + "connectors": [ + { + "type": "kafka", + "version": "0.8", + "config": { + "kafka.config": { + "bootstrap.servers": "10.149.247.156:9092", + "group.id": "group1", + "auto.offset.reset": "smallest", + "auto.commit.enable": 
"false" + }, + "topics": "sss", + "key.type": "java.lang.String", + "value.type": "java.lang.String" + }, + "pre.proc": [ + { + "dsl.type": "df-opr", + "name": "${s1}", + "rule": "from_json", + "details": { + "df.name": "${this}" + } + }, + { + "dsl.type": "spark-sql", + "name": "${this}", + "rule": "select name, age from ${s1}" + } + ] + } + ], + "cache": { + "file.path": "hdfs://localhost/griffin/streaming/dump/source", + "info.path": "source", + "ready.time.interval": "10s", + "ready.time.delay": "0", + "time.range": ["0", "0"] + } + } + ], + + "evaluate.rule": { + "rules": [ + { + "dsl.type": "spark-sql", + "name": "prof", + "rule": "select count(name) as `cnt`, max(age) as `max`, min(age) as `min` from source", + "metric": { + "name": "prof" + } + }, + { + "dsl.type": "spark-sql", + "name": "grp", + "rule": "select name, count(*) as `cnt` from source group by name", + "metric": { + "name": "name_group", + "collect.type": "array" + } + }, + { + "dsl.type": "spark-sql", + "name": "tmst_grp", + "rule": "select `__tmst`, count(*) as `cnt` from source group by `__tmst`", + "metric": { + "name": "tmst_group" + } + } + ] + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_timeliness-batch-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_timeliness-batch-griffindsl.json b/measure/src/test/resources/_timeliness-batch-griffindsl.json new file mode 100644 index 0000000..90439df --- /dev/null +++ b/measure/src/test/resources/_timeliness-batch-griffindsl.json @@ -0,0 +1,49 @@ +{ + "name": "timeliness_batch", + + "process.type": "batch", + + "data.sources": [ + { + "name": "source", + "connectors": [ + { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/timeliness_data.avro" + } + } + ] + } + ], + + "evaluate.rule": { + "rules": [ + { + "dsl.type": "griffin-dsl", + "dq.type": 
"timeliness", + "name": "timeliness", + "rule": "ts, end_ts", + "details": { + "source": "source", + "latency": "latency", + "total": "total", + "avg": "avg", + "threshold": "3m", + "step": "step", + "count": "cnt", + "step.size": "2m", + "percentile": "percentile", + "percentile.values": [0.95] + }, + "metric": { + "name": "timeliness" + }, + "record": { + "name": "lateRecords" + } + } + ] + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_timeliness-batch-sparksql.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_timeliness-batch-sparksql.json b/measure/src/test/resources/_timeliness-batch-sparksql.json new file mode 100644 index 0000000..f9cb368 --- /dev/null +++ b/measure/src/test/resources/_timeliness-batch-sparksql.json @@ -0,0 +1,52 @@ +{ + "name": "timeliness_batch", + + "process.type": "batch", + + "data.sources": [ + { + "name": "source", + "connectors": [ + { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/timeliness_data.avro" + } + } + ] + } + ], + + "evaluate.rule": { + "rules": [ + { + "dsl.type": "spark-sql", + "name": "in_time", + "rule": "select *, (ts) as `_in_ts`, (end_ts) as `_out_ts` from source where (ts) IS NOT NULL" + }, + { + "dsl.type": "spark-sql", + "name": "lat", + "cache": true, + "rule": "select *, (`_out_ts` - `_in_ts`) as `latency` from `in_time`" + }, + { + "dsl.type": "spark-sql", + "name": "metric", + "rule": "select cast(avg(`latency`) as bigint) as `avg`, max(`latency`) as `max`, min(`latency`) as `min` from `lat`", + "metric": { + "name": "timeliness" + } + }, + { + "dsl.type": "spark-sql", + "name": "slows", + "rule": "select * from `lat` where `latency` > 60000", + "record": { + "name": "lateRecords" + } + } + ] + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_timeliness-streaming-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_timeliness-streaming-griffindsl.json b/measure/src/test/resources/_timeliness-streaming-griffindsl.json new file mode 100644 index 0000000..5916e5c --- /dev/null +++ b/measure/src/test/resources/_timeliness-streaming-griffindsl.json @@ -0,0 +1,79 @@ +{ + "name": "timeliness_streaming", + + "process.type": "streaming", + + "data.sources": [ + { + "name": "source", + "connectors": [ + { + "type": "kafka", + "version": "0.8", + "config": { + "kafka.config": { + "bootstrap.servers": "10.149.247.156:9092", + "group.id": "group1", + "auto.offset.reset": "smallest", + "auto.commit.enable": "false" + }, + "topics": "fff", + "key.type": "java.lang.String", + "value.type": "java.lang.String" + }, + "pre.proc": [ + { + "dsl.type": "df-opr", + "name": "${s1}", + "rule": "from_json", + "details": { + "df.name": "${this}" + } + }, + { + "dsl.type": "spark-sql", + "name": "${this}", + "rule": "select ts, end_ts, name, age from ${s1}" + } + ] + } + ], + "cache": { + "file.path": "hdfs://localhost/griffin/streaming/dump/source", + "info.path": "source", + "ready.time.interval": "10s", + "ready.time.delay": "0", + "time.range": ["0", "0"] + } + } + ], + + "evaluate.rule": { + "rules": [ + { + "dsl.type": "griffin-dsl", + "dq.type": "timeliness", + "name": "timeliness", + "rule": "ts, end_ts", + "details": { + "source": "source", + "latency": "latency", + "total": "total", + "avg": "avg", + "threshold": "1h", + "step": "step", + "count": "cnt", + "step.size": "5m", + "percentile": "percentile", + "percentile.values": [0.2, 0.5, 0.8] + }, + "metric": { + "name": "timeliness" + }, + "record": { + "name": "lateRecords" + } + } + ] + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_timeliness-streaming-sparksql.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_timeliness-streaming-sparksql.json b/measure/src/test/resources/_timeliness-streaming-sparksql.json new file mode 100644 index 0000000..dc736ab --- /dev/null +++ b/measure/src/test/resources/_timeliness-streaming-sparksql.json @@ -0,0 +1,82 @@ +{ + "name": "timeliness_streaming", + + "process.type": "streaming", + + "data.sources": [ + { + "name": "source", + "connectors": [ + { + "type": "kafka", + "version": "0.8", + "config": { + "kafka.config": { + "bootstrap.servers": "10.149.247.156:9092", + "group.id": "group1", + "auto.offset.reset": "smallest", + "auto.commit.enable": "false" + }, + "topics": "fff", + "key.type": "java.lang.String", + "value.type": "java.lang.String" + }, + "pre.proc": [ + { + "dsl.type": "df-opr", + "name": "${s1}", + "rule": "from_json", + "details": { + "df.name": "${this}" + } + }, + { + "dsl.type": "spark-sql", + "name": "${this}", + "rule": "select ts, name, age from ${s1}" + } + ] + } + ], + "cache": { + "file.path": "hdfs://localhost/griffin/streaming/dump/source", + "info.path": "source", + "ready.time.interval": "10s", + "ready.time.delay": "0", + "time.range": ["0", "0"] + } + } + ], + + "evaluate.rule": { + "rules": [ + { + "dsl.type": "spark-sql", + "name": "in_time", + "rule": "select *, (ts) as `_in_ts` from source where (ts) IS NOT NULL" + }, + { + "dsl.type": "spark-sql", + "name": "lat", + "cache": true, + "rule": "select *, (`__tmst` - `_in_ts`) as `latency` from `in_time`" + }, + { + "dsl.type": "spark-sql", + "name": "metric", + "rule": "select `__tmst`, cast(avg(`latency`) as bigint) as `avg`, max(`latency`) as `max`, min(`latency`) as `min` from `lat`", + "metric": { + "name": "timeliness" + } + }, + { + "dsl.type": "spark-sql", + "name": "slows", + "rule": "select * from 
`lat` where `latency` > 60000", + "record": { + "name": "lateRecords" + } + } + ] + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_uniqueness-batch-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_uniqueness-batch-griffindsl.json b/measure/src/test/resources/_uniqueness-batch-griffindsl.json new file mode 100644 index 0000000..28009e8 --- /dev/null +++ b/measure/src/test/resources/_uniqueness-batch-griffindsl.json @@ -0,0 +1,58 @@ +{ + "name": "unique_batch", + + "process.type": "batch", + + "data.sources": [ + { + "name": "source", + "baseline": true, + "connectors": [ + { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/users_info_src.avro" + } + } + ] + }, + { + "name": "target", + "connectors": [ + { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/users_info_src.avro" + } + } + ] + } + ], + + "evaluate.rule": { + "rules": [ + { + "dsl.type": "griffin-dsl", + "dq.type": "uniqueness", + "name": "dup", + "rule": "user_id", + "details": { + "source": "source", + "target": "target", + "total": "total", + "unique": "unique", + "dup": "dup", + "num": "num" + }, + "metric": { + "name": "unique" + }, + "record": { + "name": "dupRecords" + } + } + ] + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_uniqueness-streaming-griffindsl.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_uniqueness-streaming-griffindsl.json b/measure/src/test/resources/_uniqueness-streaming-griffindsl.json new file mode 100644 index 0000000..bc5cbd2 --- /dev/null +++ b/measure/src/test/resources/_uniqueness-streaming-griffindsl.json @@ -0,0 +1,119 @@ +{ + "name": "unique_streaming", + + "process.type": 
"streaming", + + "data.sources": [ + { + "name": "new", + "baseline": true, + "connectors": [ + { + "type": "kafka", + "version": "0.8", + "config": { + "kafka.config": { + "bootstrap.servers": "10.149.247.156:9092", + "group.id": "new", + "auto.offset.reset": "smallest", + "auto.commit.enable": "false" + }, + "topics": "ttt", + "key.type": "java.lang.String", + "value.type": "java.lang.String" + }, + "pre.proc": [ + { + "dsl.type": "df-opr", + "name": "${s1}", + "rule": "from_json", + "details": { + "df.name": "${this}" + } + }, + { + "dsl.type": "spark-sql", + "name": "${this}", + "rule": "select name, age from ${s1}" + } + ] + } + ], + "cache": { + "file.path": "hdfs://localhost/griffin/streaming/dump/new", + "info.path": "new", + "ready.time.interval": "10s", + "ready.time.delay": "0", + "time.range": ["0", "0"] + } + }, + { + "name": "old", + "connectors": [ + { + "type": "kafka", + "version": "0.8", + "config": { + "kafka.config": { + "bootstrap.servers": "10.149.247.156:9092", + "group.id": "old", + "auto.offset.reset": "smallest", + "auto.commit.enable": "false" + }, + "topics": "ttt", + "key.type": "java.lang.String", + "value.type": "java.lang.String" + }, + "pre.proc": [ + { + "dsl.type": "df-opr", + "name": "${s1}", + "rule": "from_json", + "details": { + "df.name": "${this}" + } + }, + { + "dsl.type": "spark-sql", + "name": "${this}", + "rule": "select name, age from ${s1}" + } + ] + } + ], + "cache": { + "file.path": "hdfs://localhost/griffin/streaming/dump/old", + "info.path": "old", + "ready.time.interval": "10s", + "ready.time.delay": "0", + "time.range": ["-24h", "0"] + } + } + ], + + "evaluate.rule": { + "rules": [ + { + "dsl.type": "griffin-dsl", + "dq.type": "uniqueness", + "name": "dup", + "rule": "name, age", + "details": { + "source": "new", + "target": "old", + "total": "total", + "unique": "unique", + "dup": "dup", + "num": "num", + "duplication.array": "dup" + }, + "metric": { + "name": "unique" + }, + "record": { + "name": "dupRecords" + 
} + } + ] + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/299aa476/measure/src/test/resources/_uniqueness-streaming-sparksql.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/_uniqueness-streaming-sparksql.json b/measure/src/test/resources/_uniqueness-streaming-sparksql.json new file mode 100644 index 0000000..7d13215 --- /dev/null +++ b/measure/src/test/resources/_uniqueness-streaming-sparksql.json @@ -0,0 +1,130 @@ +{ + "name": "unique_streaming", + + "process.type": "streaming", + + "data.sources": [ + { + "name": "new", + "baseline": true, + "connectors": [ + { + "type": "kafka", + "version": "0.8", + "config": { + "kafka.config": { + "bootstrap.servers": "10.149.247.156:9092", + "group.id": "new", + "auto.offset.reset": "smallest", + "auto.commit.enable": "false" + }, + "topics": "sss", + "key.type": "java.lang.String", + "value.type": "java.lang.String" + }, + "pre.proc": [ + { + "dsl.type": "df-opr", + "name": "${s1}", + "rule": "from_json", + "details": { + "df.name": "${this}" + } + }, + { + "dsl.type": "spark-sql", + "name": "${this}", + "rule": "select name, age from ${s1}" + } + ] + } + ], + "cache": { + "file.path": "hdfs://localhost/griffin/streaming/dump/new", + "info.path": "new", + "ready.time.interval": "10s", + "ready.time.delay": "0", + "time.range": ["0", "0"] + } + }, + { + "name": "old", + "connectors": [ + { + "type": "kafka", + "version": "0.8", + "config": { + "kafka.config": { + "bootstrap.servers": "10.149.247.156:9092", + "group.id": "old", + "auto.offset.reset": "smallest", + "auto.commit.enable": "false" + }, + "topics": "sss", + "key.type": "java.lang.String", + "value.type": "java.lang.String" + }, + "pre.proc": [ + { + "dsl.type": "df-opr", + "name": "${s1}", + "rule": "from_json", + "details": { + "df.name": "${this}" + } + }, + { + "dsl.type": "spark-sql", + "name": "${this}", + "rule": "select name, age from ${s1}" 
+ } + ] + } + ], + "cache": { + "file.path": "hdfs://localhost/griffin/streaming/dump/old", + "info.path": "old", + "ready.time.interval": "10s", + "ready.time.delay": "0", + "time.range": ["-24h", "0"] + } + } + ], + + "evaluate.rule": { + "rules": [ + { + "dsl.type": "spark-sql", + "name": "dist", + "rule": "SELECT DISTINCT * FROM new" + }, + { + "dsl.type": "spark-sql", + "name": "joined", + "rule": "SELECT dist.* FROM old RIGHT JOIN dist ON coalesce(old.name, '') = coalesce(dist.name, '') AND coalesce(old.age, '') = coalesce(dist.age, '')" + }, + { + "dsl.type": "spark-sql", + "name": "grouped", + "rule": "SELECT `__tmst`, `name`, `age`, count(*) as `dup_cnt` FROM joined GROUP BY `__tmst`, `name`, `age`" + }, + { + "dsl.type": "spark-sql", + "name": "dupRecs", + "cache": true, + "rule": "SELECT * FROM grouped WHERE `dup_cnt` > 1", + "record": { + "name": "dupRecords" + } + }, + { + "dsl.type": "spark-sql", + "name": "dupMetric", + "rule": "SELECT `__tmst`, `dup_cnt`, count(*) as `item_cnt` FROM dupRecs GROUP BY `__tmst`, `dup_cnt`", + "metric": { + "name": "dup" + } + } + ] + } +} \ No newline at end of file
