http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheInZK.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheInZK.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheInZK.scala deleted file mode 100644 index 9e020c2..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheInZK.scala +++ /dev/null @@ -1,214 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.context.streaming.offset - -import org.apache.curator.framework.imps.CuratorFrameworkState -import org.apache.curator.framework.recipes.locks.InterProcessMutex -import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory} -import org.apache.curator.retry.ExponentialBackoffRetry -import org.apache.curator.utils.ZKPaths -import org.apache.griffin.measure.context.streaming.lock.CacheLockInZK -import org.apache.zookeeper.CreateMode - -import scala.collection.JavaConverters._ - -/** - * leverage zookeeper for info cache - * @param config - * @param metricName - */ -case class OffsetCacheInZK(config: Map[String, Any], metricName: String) extends OffsetCache with OffsetOps { - - val Hosts = "hosts" - val Namespace = "namespace" - val Mode = "mode" - val InitClear = "init.clear" - val CloseClear = "close.clear" - val LockPath = "lock.path" - - val PersistRegex = """^(?i)persist$""".r - val EphemeralRegex = """^(?i)ephemeral$""".r - - final val separator = ZKPaths.PATH_SEPARATOR - - val hosts = config.getOrElse(Hosts, "").toString - val namespace = config.getOrElse(Namespace, "").toString - val mode: CreateMode = config.get(Mode) match { - case Some(s: String) => s match { - case PersistRegex() => CreateMode.PERSISTENT - case EphemeralRegex() => CreateMode.EPHEMERAL - case _ => CreateMode.PERSISTENT - } - case _ => CreateMode.PERSISTENT - } - val initClear = config.get(InitClear) match { - case Some(b: Boolean) => b - case _ => true - } - val closeClear = config.get(CloseClear) match { - case Some(b: Boolean) => b - case _ => false - } - val lockPath = config.getOrElse(LockPath, "lock").toString - - private val cacheNamespace: String = if (namespace.isEmpty) metricName else namespace + separator + metricName - private val builder = CuratorFrameworkFactory.builder() - .connectString(hosts) - .retryPolicy(new ExponentialBackoffRetry(1000, 3)) - .namespace(cacheNamespace) - private val client: CuratorFramework = builder.build - - def init(): Unit = { - client.start() - info("start zk info cache") - client.usingNamespace(cacheNamespace) - info(s"init with namespace: 
${cacheNamespace}") - delete(lockPath :: Nil) - if (initClear) { - clear - } - } - - def available(): Boolean = { - client.getState match { - case CuratorFrameworkState.STARTED => true - case _ => false - } - } - - def close(): Unit = { - if (closeClear) { - clear - } - info("close zk info cache") - client.close() - } - - def cache(kvs: Map[String, String]): Unit = { - kvs.foreach(kv => createOrUpdate(path(kv._1), kv._2)) - } - - def read(keys: Iterable[String]): Map[String, String] = { - keys.flatMap { key => - read(path(key)) match { - case Some(v) => Some((key, v)) - case _ => None - } - }.toMap - } - - def delete(keys: Iterable[String]): Unit = { - keys.foreach { key => delete(path(key)) } - } - - def clear(): Unit = { -// delete("/") - delete(finalCacheInfoPath :: Nil) - delete(infoPath :: Nil) - info("clear info") - } - - def listKeys(p: String): List[String] = { - children(path(p)) - } - - def genLock(s: String): CacheLockInZK = { - val lpt = if (s.isEmpty) path(lockPath) else path(lockPath) + separator + s - CacheLockInZK(new InterProcessMutex(client, lpt)) - } - - private def path(k: String): String = { - if (k.startsWith(separator)) k else separator + k - } - - private def children(path: String): List[String] = { - try { - client.getChildren().forPath(path).asScala.toList - } catch { - case e: Throwable => { - warn(s"list ${path} warn: ${e.getMessage}") - Nil - } - } - } - - private def createOrUpdate(path: String, content: String): Boolean = { - if (checkExists(path)) { - update(path, content) - } else { - create(path, content) - } - } - - private def create(path: String, content: String): Boolean = { - try { - client.create().creatingParentsIfNeeded().withMode(mode) - .forPath(path, content.getBytes("utf-8")) - true - } catch { - case e: Throwable => { - error(s"create ( ${path} -> ${content} ) error: ${e.getMessage}") - false - } - } - } - - private def update(path: String, content: String): Boolean = { - try { - client.setData().forPath(path, content.getBytes("utf-8")) - true - } catch { - case e: Throwable => { - error(s"update ( ${path} -> ${content} ) error: ${e.getMessage}") - false - } - } - } - - private def read(path: String): Option[String] = { - try { - Some(new String(client.getData().forPath(path), "utf-8")) - } catch { - case e: Throwable => { - warn(s"read ${path} warn: ${e.getMessage}") - None - } - } - } - - private def delete(path: String): Unit = { - try { - client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path) - } catch { - case e: Throwable => error(s"delete ${path} error: ${e.getMessage}") - } - } - - private def checkExists(path: String): Boolean = { - try { - client.checkExists().forPath(path) != null - } catch { - case e: Throwable => { - warn(s"check exists ${path} warn: ${e.getMessage}") - false - } - } - } - -}
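[editor's note] The deleted OffsetCacheInZK above (replaced elsewhere in this commit by a checkpoint-based equivalent; the imports in the following diffs reference org.apache.griffin.measure.context.streaming.checkpoint.offset) wraps Curator's check-exists/create/set-data calls. For readers following the rename, here is a minimal self-contained sketch of that create-or-update pattern; the connection string, namespace, and paths are placeholders, not Griffin configuration:

import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.zookeeper.CreateMode

// Illustrative helper mirroring the createOrUpdate logic of the deleted file;
// not the renamed Griffin class.
object ZkUpsertSketch {

  // create the node (with parents) if absent, otherwise overwrite its data
  def upsert(client: CuratorFramework, path: String, content: String): Unit = {
    val bytes = content.getBytes("utf-8")
    if (client.checkExists().forPath(path) != null) {
      client.setData().forPath(path, bytes)
    } else {
      client.create().creatingParentsIfNeeded()
        .withMode(CreateMode.PERSISTENT)
        .forPath(path, bytes)
    }
  }

  def main(args: Array[String]): Unit = {
    val client = CuratorFrameworkFactory.builder()
      .connectString("localhost:2181")                   // assumed local ZK
      .retryPolicy(new ExponentialBackoffRetry(1000, 3)) // same policy as above
      .namespace("griffin-demo")                         // hypothetical namespace
      .build()
    client.start()
    try {
      upsert(client, "/info/source/ready.time", "1537152000000")
      val v = new String(client.getData().forPath("/info/source/ready.time"), "utf-8")
      println(s"ready.time = $v")
    } finally {
      client.close()
    }
  }
}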
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetOps.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetOps.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetOps.scala deleted file mode 100644 index c8763ec..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetOps.scala +++ /dev/null @@ -1,125 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.context.streaming.offset - -trait OffsetOps extends Serializable { this: OffsetCache => - - val CacheTime = "cache.time" - val LastProcTime = "last.proc.time" - val ReadyTime = "ready.time" - val CleanTime = "clean.time" - val OldCacheIndex = "old.cache.index" - - def cacheTime(path: String): String = s"${path}/${CacheTime}" - def lastProcTime(path: String): String = s"${path}/${LastProcTime}" - def readyTime(path: String): String = s"${path}/${ReadyTime}" - def cleanTime(path: String): String = s"${path}/${CleanTime}" - def oldCacheIndex(path: String): String = s"${path}/${OldCacheIndex}" - - val infoPath = "info" - - val finalCacheInfoPath = "info.final" - val finalReadyTime = s"${finalCacheInfoPath}/${ReadyTime}" - val finalLastProcTime = s"${finalCacheInfoPath}/${LastProcTime}" - val finalCleanTime = s"${finalCacheInfoPath}/${CleanTime}" - - def startOffsetCache(): Unit = { - genFinalReadyTime - } - - def getTimeRange(): (Long, Long) = { - readTimeRange - } - - def getCleanTime(): Long = { - readCleanTime - } - - def endOffsetCache: Unit = { - genFinalLastProcTime - genFinalCleanTime - } - - private def genFinalReadyTime(): Unit = { - val subPath = listKeys(infoPath) - val keys = subPath.map { p => s"${infoPath}/${p}/${ReadyTime}" } - val result = read(keys) - val times = keys.flatMap { k => - getLongOpt(result, k) - } - if (times.nonEmpty) { - val time = times.min - val map = Map[String, String]((finalReadyTime -> time.toString)) - cache(map) - } - } - - private def genFinalLastProcTime(): Unit = { - val subPath = listKeys(infoPath) - val keys = subPath.map { p => s"${infoPath}/${p}/${LastProcTime}" } - val result = read(keys) - val times = keys.flatMap { k => - getLongOpt(result, k) - } - if (times.nonEmpty) { - val time = times.min - val map = Map[String, String]((finalLastProcTime -> time.toString)) - cache(map) - } - } - - private def genFinalCleanTime(): Unit = { - val subPath = listKeys(infoPath) - val keys = subPath.map { p => s"${infoPath}/${p}/${CleanTime}" } - val result = read(keys) - val times = keys.flatMap { k => - getLongOpt(result, k) - } - if (times.nonEmpty) { - val time = 
times.min - val map = Map[String, String]((finalCleanTime -> time.toString)) - cache(map) - } - } - - private def readTimeRange(): (Long, Long) = { - val map = read(List(finalLastProcTime, finalReadyTime)) - val lastProcTime = getLong(map, finalLastProcTime) - val curReadyTime = getLong(map, finalReadyTime) - (lastProcTime, curReadyTime) - } - - private def readCleanTime(): Long = { - val map = read(List(finalCleanTime)) - val cleanTime = getLong(map, finalCleanTime) - cleanTime - } - - private def getLongOpt(map: Map[String, String], key: String): Option[Long] = { - try { - map.get(key).map(_.toLong) - } catch { - case e: Throwable => None - } - } - private def getLong(map: Map[String, String], key: String) = { - getLongOpt(map, key).getOrElse(-1L) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala index 8bb0735..6bf6373 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala @@ -39,6 +39,8 @@ case class DataSource(name: String, streamingCacheClientOpt: Option[StreamingCacheClient] ) extends Loggable with Serializable { + val isBaseline: Boolean = dsParam.isBaseline + def init(): Unit = { dataConnectors.foreach(_.init) } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala index f48185e..7807dfd 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala @@ -50,7 +50,7 @@ object DataSourceFactory extends Loggable { // for streaming data cache val streamingCacheClientOpt = StreamingCacheClientFactory.getClientOpt( - sparkSession.sqlContext, dataSourceParam.getCacheOpt, name, index, timestampStorage) + sparkSession.sqlContext, dataSourceParam.getCheckpointOpt, name, index, timestampStorage) val dataConnectors: Seq[DataConnector] = connectorParams.flatMap { connectorParam => DataConnectorFactory.getDataConnector(sparkSession, ssc, connectorParam, http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala index cef5c53..0ebe6ba 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit import org.apache.griffin.measure.Loggable import org.apache.griffin.measure.context.TimeRange 
-import org.apache.griffin.measure.context.streaming.offset.OffsetCacheClient +import org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointClient import org.apache.griffin.measure.datasource.TimestampStorage import org.apache.griffin.measure.step.builder.ConstantColumns import org.apache.griffin.measure.utils.DataFrameUtil._ @@ -89,8 +89,8 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long] val _Updatable = "updatable" val updatable = param.getBoolean(_Updatable, false) - val newCacheLock = OffsetCacheClient.genLock(s"${cacheInfoPath}.new") - val oldCacheLock = OffsetCacheClient.genLock(s"${cacheInfoPath}.old") + val newCacheLock = OffsetCheckpointClient.genLock(s"${cacheInfoPath}.new") + val oldCacheLock = OffsetCheckpointClient.genLock(s"${cacheInfoPath}.old") val newFilePath = s"${filePath}/new" val oldFilePath = s"${filePath}/old" @@ -155,7 +155,7 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long] */ def readData(): (Option[DataFrame], TimeRange) = { // time range: (a, b] - val timeRange = OffsetCacheClient.getTimeRange + val timeRange = OffsetCheckpointClient.getTimeRange val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2) // read partition info @@ -349,7 +349,7 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long] */ def processFinish(): Unit = { // next last proc time - val timeRange = OffsetCacheClient.getTimeRange + val timeRange = OffsetCheckpointClient.getTimeRange submitLastProcTime(timeRange._2) // next clean time @@ -359,7 +359,7 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long] // read next clean time private def getNextCleanTime(): Long = { - val timeRange = OffsetCacheClient.getTimeRange + val timeRange = OffsetCheckpointClient.getTimeRange val nextCleanTime = timeRange._2 + deltaTimeRange._1 nextCleanTime } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala index 17b15e9..f991e2d 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala @@ -36,17 +36,17 @@ object StreamingCacheClientFactory extends Loggable { /** * create streaming cache client - * @param sqlContext sqlContext in spark environment - * @param cacheOpt data source cache config option - * @param name data source name - * @param index data source index - * @param tmstCache the same tmstCache instance inside a data source - * @return streaming cache client option + * @param sqlContext sqlContext in spark environment + * @param checkpointOpt data source checkpoint/cache config option + * @param name data source name + * @param index data source index + * @param tmstCache the same tmstCache instance inside a data source + * @return streaming cache client option */ - def getClientOpt(sqlContext: SQLContext, cacheOpt: Option[Map[String, Any]], + def getClientOpt(sqlContext: SQLContext, checkpointOpt: Option[Map[String, Any]], name: String, index: Int, tmstCache: 
TimestampStorage ): Option[StreamingCacheClient] = { - cacheOpt.flatMap { param => + checkpointOpt.flatMap { param => try { val tp = param.getString(_type, "") val dsCache = tp match { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala index 52c1650..7b7f506 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala @@ -19,7 +19,7 @@ under the License. package org.apache.griffin.measure.datasource.cache import org.apache.griffin.measure.Loggable -import org.apache.griffin.measure.context.streaming.offset.OffsetCacheClient +import org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointClient /** * timestamp offset of streaming data source cache @@ -30,30 +30,30 @@ trait StreamingOffsetCacheable extends Loggable with Serializable { val readyTimeInterval: Long val readyTimeDelay: Long - def selfCacheInfoPath = s"${OffsetCacheClient.infoPath}/${cacheInfoPath}" + def selfCacheInfoPath = s"${OffsetCheckpointClient.infoPath}/${cacheInfoPath}" - def selfCacheTime = OffsetCacheClient.cacheTime(selfCacheInfoPath) - def selfLastProcTime = OffsetCacheClient.lastProcTime(selfCacheInfoPath) - def selfReadyTime = OffsetCacheClient.readyTime(selfCacheInfoPath) - def selfCleanTime = OffsetCacheClient.cleanTime(selfCacheInfoPath) - def selfOldCacheIndex = OffsetCacheClient.oldCacheIndex(selfCacheInfoPath) + def selfCacheTime = OffsetCheckpointClient.cacheTime(selfCacheInfoPath) + def selfLastProcTime = OffsetCheckpointClient.lastProcTime(selfCacheInfoPath) + def selfReadyTime = OffsetCheckpointClient.readyTime(selfCacheInfoPath) + def selfCleanTime = OffsetCheckpointClient.cleanTime(selfCacheInfoPath) + def selfOldCacheIndex = OffsetCheckpointClient.oldCacheIndex(selfCacheInfoPath) protected def submitCacheTime(ms: Long): Unit = { val map = Map[String, String]((selfCacheTime -> ms.toString)) - OffsetCacheClient.cache(map) + OffsetCheckpointClient.cache(map) } protected def submitReadyTime(ms: Long): Unit = { val curReadyTime = ms - readyTimeDelay if (curReadyTime % readyTimeInterval == 0) { val map = Map[String, String]((selfReadyTime -> curReadyTime.toString)) - OffsetCacheClient.cache(map) + OffsetCheckpointClient.cache(map) } } protected def submitLastProcTime(ms: Long): Unit = { val map = Map[String, String]((selfLastProcTime -> ms.toString)) - OffsetCacheClient.cache(map) + OffsetCheckpointClient.cache(map) } protected def readLastProcTime(): Option[Long] = readSelfInfo(selfLastProcTime) @@ -61,7 +61,7 @@ trait StreamingOffsetCacheable extends Loggable with Serializable { protected def submitCleanTime(ms: Long): Unit = { val cleanTime = genCleanTime(ms) val map = Map[String, String]((selfCleanTime -> cleanTime.toString)) - OffsetCacheClient.cache(map) + OffsetCheckpointClient.cache(map) } protected def genCleanTime(ms: Long): Long = ms @@ -70,13 +70,13 @@ trait StreamingOffsetCacheable extends Loggable with Serializable { protected def submitOldCacheIndex(index: Long): Unit = { val map = Map[String, String]((selfOldCacheIndex -> 
index.toString)) - OffsetCacheClient.cache(map) + OffsetCheckpointClient.cache(map) } def readOldCacheIndex(): Option[Long] = readSelfInfo(selfOldCacheIndex) private def readSelfInfo(key: String): Option[Long] = { - OffsetCacheClient.read(key :: Nil).get(key).flatMap { v => + OffsetCheckpointClient.read(key :: Nil).get(key).flatMap { v => try { Some(v.toLong) } catch { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala index fc09661..caea078 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala @@ -21,13 +21,13 @@ package org.apache.griffin.measure.datasource.connector import java.util.concurrent.atomic.AtomicLong import org.apache.griffin.measure.Loggable -import org.apache.griffin.measure.configuration.enums.{BatchProcessType, DslType, SparkSqlType} +import org.apache.griffin.measure.configuration.enums.BatchProcessType import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam import org.apache.griffin.measure.context.{ContextId, DQContext, TimeRange} import org.apache.griffin.measure.datasource.TimestampStorage import org.apache.griffin.measure.job.builder.DQJobBuilder import org.apache.griffin.measure.step.builder.ConstantColumns -import org.apache.griffin.measure.step.builder.preproc.PreProcRuleParamGenerator +import org.apache.griffin.measure.step.builder.preproc.PreProcParamMaker import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.functions._ @@ -38,7 +38,6 @@ trait DataConnector extends Loggable with Serializable { val dcParam: DataConnectorParam val id: String = DataConnectorIdGenerator.genId - protected def thisName(suffix: String): String = s"this_${suffix}" val timestampStorage: TimestampStorage protected def saveTmst(t: Long) = timestampStorage.insert(t) @@ -59,13 +58,13 @@ trait DataConnector extends Loggable with Serializable { val timestamp = context.contextId.timestamp val suffix = context.contextId.id - val thisTable = thisName(suffix) + val dcDfName = dcParam.getDataFrameName("this") try { saveTmst(timestamp) // save timestamp dfOpt.flatMap { df => - val preProcRules = PreProcRuleParamGenerator.getNewPreProcRules(dcParam.getPreProcRules, suffix) + val (preProcRules, thisTable) = PreProcParamMaker.makePreProcRules(dcParam.getPreProcRules, suffix, dcDfName) // init data context.compileTableRegister.registerTable(thisTable) http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala index 8860408..3b19892 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala @@ -19,7 +19,7 @@ under the License. 
package org.apache.griffin.measure.launch import org.apache.griffin.measure.Loggable -import org.apache.griffin.measure.configuration.dqdefinition.{DQConfig, EnvConfig} +import org.apache.griffin.measure.configuration.dqdefinition.{DQConfig, EnvConfig, SinkParam} import scala.util.Try @@ -54,4 +54,11 @@ trait DQApp extends Loggable with Serializable { } } + protected def getSinkParams: Seq[SinkParam] = { + val validSinkTypes = dqParam.getValidSinkTypes + envParam.getSinkParams.flatMap { sinkParam => + if (validSinkTypes.contains(sinkParam.getType)) Some(sinkParam) else None + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala index a651f2e..8733789 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala @@ -41,7 +41,7 @@ case class BatchDQApp(allParam: GriffinConfig) extends DQApp { val metricName = dqParam.name // val dataSourceParams = dqParam.dataSources // val dataSourceNames = dataSourceParams.map(_.name) - val persistParams = envParam.persistParams + val sinkParams = getSinkParams var sqlContext: SQLContext = _ @@ -75,12 +75,12 @@ case class BatchDQApp(allParam: GriffinConfig) extends DQApp { // create dq context val dqContext: DQContext = DQContext( - contextId, metricName, dataSources, persistParams, BatchProcessType + contextId, metricName, dataSources, sinkParams, BatchProcessType )(sparkSession) // start id val applicationId = sparkSession.sparkContext.applicationId - dqContext.getPersist().start(applicationId) + dqContext.getSink().start(applicationId) // build job val dqJob = DQJobBuilder.buildDQJob(dqContext, dqParam.evaluateRule) @@ -90,13 +90,13 @@ case class BatchDQApp(allParam: GriffinConfig) extends DQApp { // end time val endTime = new Date().getTime - dqContext.getPersist().log(endTime, s"process using time: ${endTime - startTime} ms") + dqContext.getSink().log(endTime, s"process using time: ${endTime - startTime} ms") // clean context dqContext.clean() // finish - dqContext.getPersist().finish() + dqContext.getSink().finish() } def close: Try[_] = Try { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala index 126bedd..1768ae2 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala @@ -25,8 +25,8 @@ import org.apache.griffin.measure.Loggable import org.apache.griffin.measure.configuration.enums._ import org.apache.griffin.measure.configuration.dqdefinition._ import org.apache.griffin.measure.context._ +import org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointClient import org.apache.griffin.measure.datasource.DataSourceFactory -import 
org.apache.griffin.measure.context.streaming.offset.OffsetCacheClient import org.apache.griffin.measure.context.streaming.metric.CacheResults import org.apache.griffin.measure.job.builder.DQJobBuilder import org.apache.griffin.measure.launch.DQApp @@ -47,7 +47,7 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp { val metricName = dqParam.name // val dataSourceParams = dqParam.dataSources // val dataSourceNames = dataSourceParams.map(_.name) - val persistParams = envParam.persistParams + val sinkParams = getSinkParams var sqlContext: SQLContext = _ @@ -68,8 +68,8 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp { clearCpDir // init info cache instance - OffsetCacheClient.initClient(envParam.offsetCacheParams, metricName) - OffsetCacheClient.init + OffsetCheckpointClient.initClient(envParam.checkpointParams, metricName) + OffsetCheckpointClient.init // register udf GriffinUDFAgent.register(sqlContext) @@ -99,12 +99,12 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp { // create dq context val globalContext: DQContext = DQContext( - contextId, metricName, dataSources, persistParams, StreamingProcessType + contextId, metricName, dataSources, sinkParams, StreamingProcessType )(sparkSession) // start id val applicationId = sparkSession.sparkContext.applicationId - globalContext.getPersist().start(applicationId) + globalContext.getSink().start(applicationId) // process thread val dqCalculator = StreamingDQCalculator(globalContext, dqParam.evaluateRule) @@ -124,7 +124,7 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp { globalContext.clean() // finish - globalContext.getPersist().finish() + globalContext.getSink().finish() } @@ -163,8 +163,8 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp { evaluateRuleParam: EvaluateRuleParam ) extends Runnable with Loggable { - val lock = OffsetCacheClient.genLock("process") - val appPersist = globalContext.getPersist() + val lock = OffsetCheckpointClient.genLock("process") + val appSink = globalContext.getSink() def run(): Unit = { val updateTimeDate = new Date() @@ -174,10 +174,10 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp { if (locked) { try { - OffsetCacheClient.startOffsetCache + OffsetCheckpointClient.startOffsetCheckpoint val startTime = new Date().getTime - appPersist.log(startTime, "starting process ...") + appSink.log(startTime, "starting process ...") val contextId = ContextId(startTime) // create dq context @@ -194,9 +194,9 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp { // end time val endTime = new Date().getTime - appPersist.log(endTime, s"process using time: ${endTime - startTime} ms") + appSink.log(endTime, s"process using time: ${endTime - startTime} ms") - OffsetCacheClient.endOffsetCache + OffsetCheckpointClient.endOffsetCheckpoint // clean old data cleanData(dqContext) @@ -225,7 +225,7 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp { context.clean() - val cleanTime = OffsetCacheClient.getCleanTime + val cleanTime = OffsetCheckpointClient.getCleanTime CacheResults.refresh(cleanTime) } catch { case e: Throwable => error(s"clean data error: ${e.getMessage}") http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala 
b/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala index 9e0acd8..306befe 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala @@ -23,7 +23,7 @@ import org.apache.griffin.measure.utils.ParamUtil._ import org.apache.spark.rdd.RDD /** - * persist metric and record to console, for debug + * sink metric and record to console, for debug */ case class ConsoleSink(config: Map[String, Any], metricName: String, timeStamp: Long) extends Sink { @@ -46,7 +46,7 @@ case class ConsoleSink(config: Map[String, Any], metricName: String, timeStamp: println(s"[${timeStamp}] ${rt}: ${msg}") } - def persistRecords(records: RDD[String], name: String): Unit = { + def sinkRecords(records: RDD[String], name: String): Unit = { // println(s"${metricName} [${timeStamp}] records: ") // try { // val recordCount = records.count @@ -61,7 +61,7 @@ case class ConsoleSink(config: Map[String, Any], metricName: String, timeStamp: // } } - def persistRecords(records: Iterable[String], name: String): Unit = { + def sinkRecords(records: Iterable[String], name: String): Unit = { // println(s"${metricName} [${timeStamp}] records: ") // try { // val recordCount = records.size @@ -74,7 +74,7 @@ case class ConsoleSink(config: Map[String, Any], metricName: String, timeStamp: // } } - def persistMetrics(metrics: Map[String, Any]): Unit = { + def sinkMetrics(metrics: Map[String, Any]): Unit = { println(s"${metricName} [${timeStamp}] metrics: ") val json = JsonUtil.toJson(metrics) println(json) http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala new file mode 100644 index 0000000..e5f72d1 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala @@ -0,0 +1,81 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.sink + +import org.apache.griffin.measure.utils.ParamUtil._ +import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil, TimeUtil} +import org.apache.spark.rdd.RDD + +import scala.concurrent.Future + +/** + * sink metric and record through http request + */ +case class ElasticSearchSink(config: Map[String, Any], metricName: String, + timeStamp: Long, block: Boolean + ) extends Sink { + + val Api = "api" + val Method = "method" + val ConnectionTimeout = "connection.timeout" + val Retry = "retry" + + val api = config.getString(Api, "") + val method = config.getString(Method, "post") + val connectionTimeout = TimeUtil.milliseconds(config.getString(ConnectionTimeout, "")).getOrElse(-1L) + val retry = config.getInt(Retry, 10) + + val _Value = "value" + + def available(): Boolean = { + api.nonEmpty + } + + def start(msg: String): Unit = {} + def finish(): Unit = {} + + private def httpResult(dataMap: Map[String, Any]) = { + try { + val data = JsonUtil.toJson(dataMap) + // http request + val params = Map[String, Object]() + val header = Map[String, Object](("Content-Type","application/json")) + + def func(): (Long, Future[Boolean]) = { + import scala.concurrent.ExecutionContext.Implicits.global + (timeStamp, Future(HttpUtil.httpRequest(api, method, params, header, data))) + } + if (block) SinkTaskRunner.addBlockTask(func _, retry, connectionTimeout) + else SinkTaskRunner.addNonBlockTask(func _, retry) + } catch { + case e: Throwable => error(e.getMessage) + } + + } + + def log(rt: Long, msg: String): Unit = {} + + def sinkRecords(records: RDD[String], name: String): Unit = {} + def sinkRecords(records: Iterable[String], name: String): Unit = {} + + def sinkMetrics(metrics: Map[String, Any]): Unit = { + httpResult(metrics) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala index 5608e7d..718f1c1 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala @@ -25,7 +25,7 @@ import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil} import org.apache.spark.rdd.RDD /** - * persist metric and record to hdfs + * sink metric and record to hdfs */ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Long) extends Sink { @@ -111,7 +111,7 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon HdfsUtil.deleteHdfsPath(path) } - def persistRecords(records: RDD[String], name: String): Unit = { + def sinkRecords(records: RDD[String], name: String): Unit = { val path = filePath(name) clearOldRecords(path) try { @@ -121,7 +121,7 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt if (groupCount <= 1) { val recs = records.take(count.toInt) - persistRecords2Hdfs(path, recs) + sinkRecords2Hdfs(path, recs) } else { val groupedRecords: RDD[(Long, Iterable[String])] = records.zipWithIndex.flatMap { r => @@ -131,7 +131,7 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon groupedRecords.foreach { group => val (gid, recs) = group val hdfsPath = if (gid == 0) path else withSuffix(path, 
gid.toString) - persistRecords2Hdfs(hdfsPath, recs) + sinkRecords2Hdfs(hdfsPath, recs) } } } @@ -140,7 +140,7 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon } } - def persistRecords(records: Iterable[String], name: String): Unit = { + def sinkRecords(records: Iterable[String], name: String): Unit = { val path = filePath(name) clearOldRecords(path) try { @@ -150,13 +150,13 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon val groupCount = (count - 1) / maxLinesPerFile + 1 if (groupCount <= 1) { val recs = records.take(count.toInt) - persistRecords2Hdfs(path, recs) + sinkRecords2Hdfs(path, recs) } else { val groupedRecords = records.grouped(maxLinesPerFile).zipWithIndex groupedRecords.take(groupCount).foreach { group => val (recs, gid) = group val hdfsPath = getHdfsPath(path, gid) - persistRecords2Hdfs(hdfsPath, recs) + sinkRecords2Hdfs(hdfsPath, recs) } } } @@ -165,16 +165,16 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon } } - def persistMetrics(metrics: Map[String, Any]): Unit = { + def sinkMetrics(metrics: Map[String, Any]): Unit = { try { val json = JsonUtil.toJson(metrics) - persistRecords2Hdfs(MetricsFile, json :: Nil) + sinkRecords2Hdfs(MetricsFile, json :: Nil) } catch { case e: Throwable => error(e.getMessage) } } - private def persistRecords2Hdfs(hdfsPath: String, records: Iterable[String]): Unit = { + private def sinkRecords2Hdfs(hdfsPath: String, records: Iterable[String]): Unit = { try { val recStr = records.mkString("\n") HdfsUtil.writeContent(hdfsPath, recStr) http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/sink/HttpSink.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/HttpSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/HttpSink.scala deleted file mode 100644 index 63677c5..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/sink/HttpSink.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.sink - -import org.apache.griffin.measure.utils.ParamUtil._ -import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil, TimeUtil} -import org.apache.spark.rdd.RDD - -import scala.concurrent.Future - -/** - * persist metric and record through http request - */ -case class HttpSink(config: Map[String, Any], metricName: String, - timeStamp: Long, block: Boolean - ) extends Sink { - - val Api = "api" - val Method = "method" - val OverTime = "over.time" - val Retry = "retry" - - val api = config.getString(Api, "") - val method = config.getString(Method, "post") - val overTime = TimeUtil.milliseconds(config.getString(OverTime, "")).getOrElse(-1L) - val retry = config.getInt(Retry, 10) - - val _Value = "value" - - def available(): Boolean = { - api.nonEmpty - } - - def start(msg: String): Unit = {} - def finish(): Unit = {} - - private def httpResult(dataMap: Map[String, Any]) = { - try { - val data = JsonUtil.toJson(dataMap) - // http request - val params = Map[String, Object]() - val header = Map[String, Object](("Content-Type","application/json")) - - def func(): (Long, Future[Boolean]) = { - import scala.concurrent.ExecutionContext.Implicits.global - (timeStamp, Future(HttpUtil.httpRequest(api, method, params, header, data))) - } - if (block) SinkTaskRunner.addBlockTask(func _, retry, overTime) - else SinkTaskRunner.addNonBlockTask(func _, retry) - } catch { - case e: Throwable => error(e.getMessage) - } - - } - - def log(rt: Long, msg: String): Unit = {} - - def persistRecords(records: RDD[String], name: String): Unit = {} - def persistRecords(records: Iterable[String], name: String): Unit = {} - - def persistMetrics(metrics: Map[String, Any]): Unit = { - httpResult(metrics) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala index 6eb55e5..206f187 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala @@ -28,7 +28,7 @@ import org.mongodb.scala.result.UpdateResult import scala.concurrent.Future /** - * persist metric and record to mongo + * sink metric and record to mongo */ case class MongoSink(config: Map[String, Any], metricName: String, timeStamp: Long, block: Boolean @@ -53,10 +53,10 @@ case class MongoSink(config: Map[String, Any], metricName: String, def log(rt: Long, msg: String): Unit = {} - def persistRecords(records: RDD[String], name: String): Unit = {} - def persistRecords(records: Iterable[String], name: String): Unit = {} + def sinkRecords(records: RDD[String], name: String): Unit = {} + def sinkRecords(records: Iterable[String], name: String): Unit = {} - def persistMetrics(metrics: Map[String, Any]): Unit = { + def sinkMetrics(metrics: Map[String, Any]): Unit = { mongoInsert(metrics) } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/sink/MultiSinks.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/MultiSinks.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/MultiSinks.scala index ede52cc..cca1ff9 100644 --- 
a/measure/src/main/scala/org/apache/griffin/measure/sink/MultiSinks.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/sink/MultiSinks.scala @@ -21,13 +21,13 @@ package org.apache.griffin.measure.sink import org.apache.spark.rdd.RDD /** - * persist metric and record in multiple ways + * sink metric and record in multiple ways */ -case class MultiSinks(persists: Iterable[Sink]) extends Sink { +case class MultiSinks(sinks: Iterable[Sink]) extends Sink { val block: Boolean = false - val headSinkOpt: Option[Sink] = persists.headOption + val headSinkOpt: Option[Sink] = sinks.headOption val metricName: String = headSinkOpt.map(_.metricName).getOrElse("") @@ -35,45 +35,45 @@ case class MultiSinks(persists: Iterable[Sink]) extends Sink { val config: Map[String, Any] = Map[String, Any]() - def available(): Boolean = { persists.exists(_.available()) } + def available(): Boolean = { sinks.exists(_.available()) } - def start(msg: String): Unit = { persists.foreach(_.start(msg)) } - def finish(): Unit = { persists.foreach(_.finish()) } + def start(msg: String): Unit = { sinks.foreach(_.start(msg)) } + def finish(): Unit = { sinks.foreach(_.finish()) } def log(rt: Long, msg: String): Unit = { - persists.foreach { persist => + sinks.foreach { sink => try { - persist.log(rt, msg) + sink.log(rt, msg) } catch { case e: Throwable => error(s"log error: ${e.getMessage}") } } } - def persistRecords(records: RDD[String], name: String): Unit = { - persists.foreach { persist => + def sinkRecords(records: RDD[String], name: String): Unit = { + sinks.foreach { sink => try { - persist.persistRecords(records, name) + sink.sinkRecords(records, name) } catch { - case e: Throwable => error(s"persist records error: ${e.getMessage}") + case e: Throwable => error(s"sink records error: ${e.getMessage}") } } } - def persistRecords(records: Iterable[String], name: String): Unit = { - persists.foreach { persist => + def sinkRecords(records: Iterable[String], name: String): Unit = { + sinks.foreach { sink => try { - persist.persistRecords(records, name) + sink.sinkRecords(records, name) } catch { - case e: Throwable => error(s"persist records error: ${e.getMessage}") + case e: Throwable => error(s"sink records error: ${e.getMessage}") } } } - def persistMetrics(metrics: Map[String, Any]): Unit = { - persists.foreach { persist => + def sinkMetrics(metrics: Map[String, Any]): Unit = { + sinks.foreach { sink => try { - persist.persistMetrics(metrics) + sink.sinkMetrics(metrics) } catch { - case e: Throwable => error(s"persist metrics error: ${e.getMessage}") + case e: Throwable => error(s"sink metrics error: ${e.getMessage}") } } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala index f052ae1..0c03bd8 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala @@ -22,7 +22,7 @@ import org.apache.griffin.measure.Loggable import org.apache.spark.rdd.RDD /** - * persist metric and record + * sink metric and record */ trait Sink extends Loggable with Serializable { val metricName: String @@ -39,9 +39,9 @@ trait Sink extends Loggable with Serializable { def log(rt: Long, msg: String): Unit - def persistRecords(records: RDD[String], 
name: String): Unit - def persistRecords(records: Iterable[String], name: String): Unit + def sinkRecords(records: RDD[String], name: String): Unit + def sinkRecords(records: Iterable[String], name: String): Unit - def persistMetrics(metrics: Map[String, Any]): Unit + def sinkMetrics(metrics: Map[String, Any]): Unit } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala index 85bf1b1..26b0178 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala @@ -18,39 +18,36 @@ under the License. */ package org.apache.griffin.measure.sink -import org.apache.griffin.measure.configuration.dqdefinition.PersistParam +import org.apache.griffin.measure.configuration.dqdefinition.SinkParam +import org.apache.griffin.measure.configuration.enums._ import scala.util.{Success, Try} -case class SinkFactory(persistParams: Iterable[PersistParam], metricName: String) extends Serializable { - - val HDFS_REGEX = """^(?i)hdfs$""".r - val HTTP_REGEX = """^(?i)http$""".r - val LOG_REGEX = """^(?i)log$""".r - val MONGO_REGEX = """^(?i)mongo$""".r +case class SinkFactory(sinkParams: Iterable[SinkParam], metricName: String) extends Serializable { /** - * create persist - * @param timeStamp the timestamp of persist - * @param block persist write metric in block or non-block way - * @return persist + * create sink + * @param timeStamp the timestamp of sink + * @param block sink write metric in block or non-block way + * @return sink */ - def getPersists(timeStamp: Long, block: Boolean): MultiSinks = { - MultiSinks(persistParams.flatMap(param => getPersist(timeStamp, param, block))) + def getSinks(timeStamp: Long, block: Boolean): MultiSinks = { + MultiSinks(sinkParams.flatMap(param => getSink(timeStamp, param, block))) } - private def getPersist(timeStamp: Long, persistParam: PersistParam, block: Boolean): Option[Sink] = { - val config = persistParam.getConfig - val persistTry = persistParam.getType match { - case LOG_REGEX() => Try(ConsoleSink(config, metricName, timeStamp)) - case HDFS_REGEX() => Try(HdfsSink(config, metricName, timeStamp)) - case HTTP_REGEX() => Try(HttpSink(config, metricName, timeStamp, block)) - case MONGO_REGEX() => Try(MongoSink(config, metricName, timeStamp, block)) - case _ => throw new Exception("not supported persist type") + private def getSink(timeStamp: Long, sinkParam: SinkParam, block: Boolean): Option[Sink] = { + val config = sinkParam.getConfig + val sinkType = sinkParam.getType + val sinkTry = sinkType match { + case ConsoleSinkType => Try(ConsoleSink(config, metricName, timeStamp)) + case HdfsSinkType => Try(HdfsSink(config, metricName, timeStamp)) + case ElasticsearchSinkType => Try(ElasticSearchSink(config, metricName, timeStamp, block)) + case MongoSinkType => Try(MongoSink(config, metricName, timeStamp, block)) + case _ => throw new Exception(s"sink type ${sinkType} is not supported!") } - persistTry match { - case Success(persist) if (persist.available) => Some(persist) + sinkTry match { + case Success(sink) if (sink.available) => Some(sink) case _ => None } } 
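[editor's note] The persist-to-sink rename above changes every implementation the same way: persistRecords becomes sinkRecords, persistMetrics becomes sinkMetrics, and SinkFactory.getPersists becomes getSinks keyed by sink-type enums instead of regexes. A minimal sketch of the renamed contract with a console-style implementation; the trait is simplified here (Loggable and the RDD[String] overload are dropped so the sketch stays self-contained) and the names are illustrative:

// Simplified model of the renamed Sink contract; not the real Griffin trait.
trait SinkSketch extends Serializable {
  val metricName: String
  val timeStamp: Long

  def available(): Boolean
  def start(msg: String): Unit
  def finish(): Unit
  def log(rt: Long, msg: String): Unit

  // renamed from persistRecords / persistMetrics
  def sinkRecords(records: Iterable[String], name: String): Unit
  def sinkMetrics(metrics: Map[String, Any]): Unit
}

case class ConsoleSinkSketch(metricName: String, timeStamp: Long) extends SinkSketch {
  def available(): Boolean = true
  def start(msg: String): Unit = println(s"[$timeStamp] start: $msg")
  def finish(): Unit = println(s"[$timeStamp] finish")
  def log(rt: Long, msg: String): Unit = println(s"[$rt] $msg")
  def sinkRecords(records: Iterable[String], name: String): Unit =
    records.foreach(r => println(s"$metricName [$timeStamp] $name: $r"))
  def sinkMetrics(metrics: Map[String, Any]): Unit =
    println(s"$metricName [$timeStamp] metrics: $metrics")
}

object SinkSketchDemo {
  def main(args: Array[String]): Unit = {
    val sink = ConsoleSinkSketch("demo_metric", System.currentTimeMillis())
    sink.start("app-001")
    sink.sinkMetrics(Map("total" -> 100, "miss" -> 4, "matched" -> 96))
    sink.finish()
  }
}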
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala index 79f0bb1..1cc3f3e 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala @@ -28,7 +28,7 @@ import scala.concurrent.duration._ import scala.util.{Failure, Success} /** - * persist task runner, to persist metrics in block or non-block mode + * sink task runner, to sink metrics in block or non-block mode */ object SinkTaskRunner extends Loggable { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataFrameOpsDQStepBuilder.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataFrameOpsDQStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataFrameOpsDQStepBuilder.scala index f945856..796c797 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataFrameOpsDQStepBuilder.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataFrameOpsDQStepBuilder.scala @@ -26,9 +26,10 @@ import org.apache.griffin.measure.step.transform.DataFrameOpsTransformStep case class DataFrameOpsDQStepBuilder() extends RuleParamStepBuilder { def buildSteps(context: DQContext, ruleParam: RuleParam): Seq[DQStep] = { - val name = getStepName(ruleParam.getName) + val name = getStepName(ruleParam.getOutDfName()) + val inputDfName = getStepName(ruleParam.getInDfName()) val transformStep = DataFrameOpsTransformStep( - name, ruleParam.getRule, ruleParam.getDetails, ruleParam.getCache) + name, inputDfName, ruleParam.getRule, ruleParam.getDetails, ruleParam.getCache) transformStep +: buildDirectWriteSteps(ruleParam) } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala index 7dd3c73..d3c0e41 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala @@ -36,14 +36,14 @@ case class GriffinDslDQStepBuilder(dataSourceNames: Seq[String], val parser = GriffinDslParser(dataSourceNames, filteredFunctionNames) def buildSteps(context: DQContext, ruleParam: RuleParam): Seq[DQStep] = { - val name = getStepName(ruleParam.getName) + val name = getStepName(ruleParam.getOutDfName()) val rule = ruleParam.getRule val dqType = ruleParam.getDqType try { val result = parser.parseRule(rule, dqType) if (result.successful) { val expr = result.get - val expr2DQSteps = Expr2DQSteps(context, expr, ruleParam.replaceName(name)) + val expr2DQSteps = Expr2DQSteps(context, expr, ruleParam.replaceOutDfName(name)) expr2DQSteps.getDQSteps() } else { warn(s"parse rule [ ${rule} ] fails: 
\n${result}") http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala index f9f101e..176aed6 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala @@ -18,7 +18,7 @@ under the License. */ package org.apache.griffin.measure.step.builder -import org.apache.griffin.measure.configuration.enums.NormalizeType +import org.apache.griffin.measure.configuration.enums._ import org.apache.griffin.measure.configuration.dqdefinition.RuleParam import org.apache.griffin.measure.context.DQContext import org.apache.griffin.measure.step.write.{DataSourceUpdateWriteStep, MetricWriteStep, RecordWriteStep} @@ -41,18 +41,18 @@ trait RuleParamStepBuilder extends DQStepBuilder { def buildSteps(context: DQContext, ruleParam: RuleParam): Seq[DQStep] protected def buildDirectWriteSteps(ruleParam: RuleParam): Seq[DQStep] = { - val name = getStepName(ruleParam.getName) + val name = getStepName(ruleParam.getOutDfName()) // metric writer - val metricSteps = ruleParam.getMetricOpt.map { metric => - MetricWriteStep(metric.getNameOpt.getOrElse(name), name, NormalizeType(metric.collectType)) + val metricSteps = ruleParam.getOutputOpt(MetricOutputType).map { metric => + MetricWriteStep(metric.getNameOpt.getOrElse(name), name, FlattenType(metric.flatten)) }.toSeq // record writer - val recordSteps = ruleParam.getRecordOpt.map { record => + val recordSteps = ruleParam.getOutputOpt(RecordOutputType).map { record => RecordWriteStep(record.getNameOpt.getOrElse(name), name) }.toSeq // update writer - val dsCacheUpdateSteps = ruleParam.getDsCacheUpdateOpt.map { dsCacheUpdate => - DataSourceUpdateWriteStep(dsCacheUpdate.getDsNameOpt.getOrElse(""), name) + val dsCacheUpdateSteps = ruleParam.getOutputOpt(DscUpdateOutputType).map { dsCacheUpdate => + DataSourceUpdateWriteStep(dsCacheUpdate.getNameOpt.getOrElse(""), name) }.toSeq metricSteps ++ recordSteps ++ dsCacheUpdateSteps http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/SparkSqlDQStepBuilder.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/SparkSqlDQStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/SparkSqlDQStepBuilder.scala index f66192c..b5dfd0c 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/SparkSqlDQStepBuilder.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/SparkSqlDQStepBuilder.scala @@ -26,7 +26,7 @@ import org.apache.griffin.measure.step.transform.SparkSqlTransformStep case class SparkSqlDQStepBuilder() extends RuleParamStepBuilder { def buildSteps(context: DQContext, ruleParam: RuleParam): Seq[DQStep] = { - val name = getStepName(ruleParam.getName) + val name = getStepName(ruleParam.getOutDfName()) val transformStep = SparkSqlTransformStep( name, ruleParam.getRule, ruleParam.getDetails, ruleParam.getCache) transformStep +: buildDirectWriteSteps(ruleParam) 
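[editor's note] The step builders above now read their output configuration through ruleParam.getOutputOpt(outputType) instead of the dedicated getMetricOpt/getRecordOpt/getDsCacheUpdateOpt accessors, and key the step name off getOutDfName. Below is a hedged, self-contained model of that dispatch; the case classes are simplified stand-ins for Griffin's RuleParam, output enums, and write steps, not the real API:

// Simplified stand-ins for the output-type routing; illustrative only.
sealed trait OutputType
case object MetricOutputType extends OutputType
case object RecordOutputType extends OutputType
case object DscUpdateOutputType extends OutputType

case class Output(outputType: OutputType, nameOpt: Option[String])

sealed trait WriteStep
case class MetricWrite(metricName: String, inputName: String) extends WriteStep
case class RecordWrite(recordName: String, inputName: String) extends WriteStep
case class DscUpdateWrite(dsName: String, inputName: String) extends WriteStep

case class Rule(outDfName: String, outputs: Seq[Output]) {
  def getOutputOpt(tp: OutputType): Option[Output] = outputs.find(_.outputType == tp)
}

object DirectWriteStepsSketch {
  // mirrors the shape of buildDirectWriteSteps: one optional writer per output type
  def build(rule: Rule): Seq[WriteStep] = {
    val name = rule.outDfName
    val metricSteps = rule.getOutputOpt(MetricOutputType)
      .map(m => MetricWrite(m.nameOpt.getOrElse(name), name)).toSeq
    val recordSteps = rule.getOutputOpt(RecordOutputType)
      .map(r => RecordWrite(r.nameOpt.getOrElse(name), name)).toSeq
    val dscUpdateSteps = rule.getOutputOpt(DscUpdateOutputType)
      .map(d => DscUpdateWrite(d.nameOpt.getOrElse(""), name)).toSeq
    metricSteps ++ recordSteps ++ dscUpdateSteps
  }

  def main(args: Array[String]): Unit = {
    val rule = Rule("accu_df", Seq(
      Output(MetricOutputType, Some("accu")),
      Output(RecordOutputType, None)))
    build(rule).foreach(println)
    // MetricWrite(accu,accu_df)
    // RecordWrite(accu_df,accu_df)
  }
}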
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala index 27872c2..7c84d38 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala @@ -18,7 +18,7 @@ under the License. */ package org.apache.griffin.measure.step.builder.dsl.transform -import org.apache.griffin.measure.configuration.enums.{BatchProcessType, NormalizeType, StreamingProcessType} +import org.apache.griffin.measure.configuration.enums._ import org.apache.griffin.measure.configuration.dqdefinition.RuleParam import org.apache.griffin.measure.context.DQContext import org.apache.griffin.measure.step.DQStep @@ -82,7 +82,7 @@ case class AccuracyExpr2DQSteps(context: DQContext, val missRecordsTransStep = SparkSqlTransformStep(missRecordsTableName, missRecordsSql, emptyMap, true) val missRecordsWriteSteps = procType match { case BatchProcessType => { - val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(missRecordsTableName) + val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(missRecordsTableName) RecordWriteStep(rwName, missRecordsTableName) :: Nil } case StreamingProcessType => Nil @@ -90,7 +90,7 @@ case class AccuracyExpr2DQSteps(context: DQContext, val missRecordsUpdateWriteSteps = procType match { case BatchProcessType => Nil case StreamingProcessType => { - val dsName = ruleParam.getDsCacheUpdateOpt.flatMap(_.getDsNameOpt).getOrElse(sourceName) + val dsName = ruleParam.getOutputOpt(DscUpdateOutputType).flatMap(_.getNameOpt).getOrElse(sourceName) DataSourceUpdateWriteStep(dsName, missRecordsTableName) :: Nil } } @@ -114,7 +114,7 @@ case class AccuracyExpr2DQSteps(context: DQContext, val totalCountTransStep = SparkSqlTransformStep(totalCountTableName, totalCountSql, emptyMap) // 4.
accuracy metric - val accuracyTableName = ruleParam.name + val accuracyTableName = ruleParam.outDfName val matchedColName = details.getStringOrKey(_matched) val accuracyMetricSql = procType match { case BatchProcessType => { @@ -139,10 +139,10 @@ case class AccuracyExpr2DQSteps(context: DQContext, val accuracyTransStep = SparkSqlTransformStep(accuracyTableName, accuracyMetricSql, emptyMap) val accuracyMetricWriteSteps = procType match { case BatchProcessType => { - val metricOpt = ruleParam.getMetricOpt - val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.name) - val collectType = metricOpt.map(_.getCollectType).getOrElse(NormalizeType.default) - MetricWriteStep(mwName, accuracyTableName, collectType) :: Nil + val metricOpt = ruleParam.getOutputOpt(MetricOutputType) + val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.outDfName) + val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default) + MetricWriteStep(mwName, accuracyTableName, flattenType) :: Nil } case StreamingProcessType => Nil } @@ -159,18 +159,17 @@ case class AccuracyExpr2DQSteps(context: DQContext, val accuracyMetricTableName = "__accuracy" val accuracyMetricRule = DataFrameOps._accuracy val accuracyMetricDetails = Map[String, Any]( - (AccuracyOprKeys._dfName -> accuracyTableName), (AccuracyOprKeys._miss -> missColName), (AccuracyOprKeys._total -> totalColName), (AccuracyOprKeys._matched -> matchedColName) ) val accuracyMetricTransStep = DataFrameOpsTransformStep(accuracyMetricTableName, - accuracyMetricRule, accuracyMetricDetails) + accuracyTableName, accuracyMetricRule, accuracyMetricDetails) val accuracyMetricWriteStep = { - val metricOpt = ruleParam.getMetricOpt - val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.name) - val collectType = metricOpt.map(_.getCollectType).getOrElse(NormalizeType.default) - MetricWriteStep(mwName, accuracyMetricTableName, collectType) + val metricOpt = ruleParam.getOutputOpt(MetricOutputType) + val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.outDfName) + val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default) + MetricWriteStep(mwName, accuracyMetricTableName, flattenType) } // 6.
collect accuracy records @@ -184,7 +183,7 @@ case class AccuracyExpr2DQSteps(context: DQContext, val accuracyRecordTransStep = SparkSqlTransformStep( accuracyRecordTableName, accuracyRecordSql, emptyMap) val accuracyRecordWriteStep = { - val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(missRecordsTableName) + val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(missRecordsTableName) RecordWriteStep(rwName, missRecordsTableName, Some(accuracyRecordTableName)) } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala index 469b16e..347fabd 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala @@ -89,7 +89,7 @@ case class CompletenessExpr2DQSteps(context: DQContext, val incompleteRecordsSql = s"SELECT * FROM `${sourceAliasTableName}` WHERE ${incompleteWhereClause}" val incompleteRecordTransStep = SparkSqlTransformStep(incompleteRecordsTableName, incompleteRecordsSql, emptyMap, true) val incompleteRecordWriteStep = { - val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(incompleteRecordsTableName) + val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(incompleteRecordsTableName) RecordWriteStep(rwName, incompleteRecordsTableName) } @@ -112,7 +112,7 @@ case class CompletenessExpr2DQSteps(context: DQContext, val totalCountTransStep = SparkSqlTransformStep(totalCountTableName, totalCountSql, emptyMap) // 5.
complete metric - val completeTableName = ruleParam.name + val completeTableName = ruleParam.outDfName val completeColName = details.getStringOrKey(_complete) val completeMetricSql = procType match { case BatchProcessType => { @@ -136,10 +136,10 @@ case class CompletenessExpr2DQSteps(context: DQContext, } val completeTransStep = SparkSqlTransformStep(completeTableName, completeMetricSql, emptyMap) val completeWriteStep = { - val metricOpt = ruleParam.getMetricOpt + val metricOpt = ruleParam.getOutputOpt(MetricOutputType) val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(completeTableName) - val collectType = metricOpt.map(_.getCollectType).getOrElse(NormalizeType.default) - MetricWriteStep(mwName, completeTableName, collectType) + val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default) + MetricWriteStep(mwName, completeTableName, flattenType) } val transSteps = { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala index 3390263..6c56a77 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala @@ -18,7 +18,7 @@ under the License. */ package org.apache.griffin.measure.step.builder.dsl.transform -import org.apache.griffin.measure.configuration.enums.{ArrayNormalizeType, EntriesNormalizeType, ProcessType, StreamingProcessType} +import org.apache.griffin.measure.configuration.enums._ import org.apache.griffin.measure.configuration.dqdefinition.RuleParam import org.apache.griffin.measure.context.DQContext import org.apache.griffin.measure.step.DQStep @@ -111,7 +111,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext, } val totalTransStep = SparkSqlTransformStep(totalTableName, totalSql, emptyMap) val totalMetricWriteStep = { - MetricWriteStep(totalColName, totalTableName, EntriesNormalizeType, writeTimestampOpt) + MetricWriteStep(totalColName, totalTableName, EntriesFlattenType, writeTimestampOpt) } // 3.
group by self @@ -221,7 +221,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext, } val distTransStep = SparkSqlTransformStep(distTableName, distSql, emptyMap) val distMetricWriteStep = { - MetricWriteStep(distColName, distTableName, EntriesNormalizeType, writeTimestampOpt) + MetricWriteStep(distColName, distTableName, EntriesFlattenType, writeTimestampOpt) } val transSteps3 = distTransStep :: Nil @@ -271,7 +271,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext, } val dupItemsTransStep = SparkSqlTransformStep(dupItemsTableName, dupItemsSql, emptyMap) val dupItemsWriteStep = { - val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(dupItemsTableName) + val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(dupItemsTableName) RecordWriteStep(rwName, dupItemsTableName, None, writeTimestampOpt) } @@ -287,7 +287,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext, } val groupDupMetricTransStep = SparkSqlTransformStep(groupDupMetricTableName, groupDupMetricSql, emptyMap) val groupDupMetricWriteStep = { - MetricWriteStep(duplicationArrayName, groupDupMetricTableName, ArrayNormalizeType, writeTimestampOpt) + MetricWriteStep(duplicationArrayName, groupDupMetricTableName, ArrayFlattenType, writeTimestampOpt) } val msteps = { @@ -317,7 +317,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext, } val dupRecordTransStep = SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true) val dupRecordWriteStep = { - val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(dupRecordTableName) + val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(dupRecordTableName) RecordWriteStep(rwName, dupRecordTableName, None, writeTimestampOpt) } @@ -332,7 +332,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext, } val dupMetricTransStep = SparkSqlTransformStep(dupMetricTableName, dupMetricSql, emptyMap) val dupMetricWriteStep = { - MetricWriteStep(duplicationArrayName, dupMetricTableName, ArrayNormalizeType, writeTimestampOpt) + MetricWriteStep(duplicationArrayName, dupMetricTableName, ArrayFlattenType, writeTimestampOpt) } val msteps = { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala index 28fa96a..33d44c5 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala @@ -19,7 +19,7 @@ under the License.
package org.apache.griffin.measure.step.builder.dsl.transform import org.apache.commons.lang.StringUtils -import org.apache.griffin.measure.configuration.enums.{BatchProcessType, NormalizeType, StreamingProcessType} +import org.apache.griffin.measure.configuration.enums.{BatchProcessType, FlattenType, MetricOutputType, StreamingProcessType} import org.apache.griffin.measure.configuration.dqdefinition.RuleParam import org.apache.griffin.measure.context.DQContext import org.apache.griffin.measure.step.DQStep @@ -92,13 +92,13 @@ case class ProfilingExpr2DQSteps(context: DQContext, val profilingSql = { s"SELECT ${selCondition} ${selClause} ${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}" } - val profilingName = ruleParam.name + val profilingName = ruleParam.outDfName val profilingTransStep = SparkSqlTransformStep(profilingName, profilingSql, details) val profilingMetricWriteStep = { - val metricOpt = ruleParam.getMetricOpt - val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.name) - val collectType = metricOpt.map(_.getCollectType).getOrElse(NormalizeType.default) - MetricWriteStep(mwName, profilingName, collectType) + val metricOpt = ruleParam.getOutputOpt(MetricOutputType) + val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.outDfName) + val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default) + MetricWriteStep(mwName, profilingName, flattenType) } profilingTransStep :: profilingMetricWriteStep :: Nil } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala index e9eaa06..71e9f4b 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala @@ -108,7 +108,7 @@ case class TimelinessExpr2DQSteps(context: DQContext, val latencyTransStep = SparkSqlTransformStep(latencyTableName, latencySql, emptyMap, true) // 3.
timeliness metric - val metricTableName = ruleParam.name + val metricTableName = ruleParam.outDfName val totalColName = details.getStringOrKey(_total) val avgColName = details.getStringOrKey(_avg) val metricSql = procType match { @@ -131,10 +131,10 @@ case class TimelinessExpr2DQSteps(context: DQContext, } val metricTransStep = SparkSqlTransformStep(metricTableName, metricSql, emptyMap) val metricWriteStep = { - val metricOpt = ruleParam.getMetricOpt - val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.name) - val collectType = metricOpt.map(_.getCollectType).getOrElse(NormalizeType.default) - MetricWriteStep(mwName, metricTableName, collectType) + val metricOpt = ruleParam.getOutputOpt(MetricOutputType) + val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.outDfName) + val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default) + MetricWriteStep(mwName, metricTableName, flattenType) } // current steps @@ -150,7 +150,7 @@ case class TimelinessExpr2DQSteps(context: DQContext, } val recordTransStep = SparkSqlTransformStep(recordTableName, recordSql, emptyMap) val recordWriteStep = { - val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(recordTableName) + val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(recordTableName) RecordWriteStep(rwName, recordTableName, None) } (recordTransStep :: Nil, recordWriteStep :: Nil) @@ -191,7 +191,7 @@ case class TimelinessExpr2DQSteps(context: DQContext, } val rangeMetricTransStep = SparkSqlTransformStep(rangeMetricTableName, rangeMetricSql, emptyMap) val rangeMetricWriteStep = { - MetricWriteStep(stepColName, rangeMetricTableName, ArrayNormalizeType) + MetricWriteStep(stepColName, rangeMetricTableName, ArrayFlattenType) } (rangeTransStep :: rangeMetricTransStep :: Nil, rangeMetricWriteStep :: Nil) @@ -216,7 +216,7 @@ case class TimelinessExpr2DQSteps(context: DQContext, } val percentileTransStep = SparkSqlTransformStep(percentileTableName, percentileSql, emptyMap) val percentileWriteStep = { - MetricWriteStep(percentileTableName, percentileTableName, DefaultNormalizeType) + MetricWriteStep(percentileTableName, percentileTableName, DefaultFlattenType) } (percentileTransStep :: Nil, percentileWriteStep :: Nil) http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala index 8827bf1..443239c 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala @@ -129,7 +129,7 @@ case class UniquenessExpr2DQSteps(context: DQContext, } } val totalTransStep = SparkSqlTransformStep(totalTableName, totalSql, emptyMap) - val totalMetricWriteStep = MetricWriteStep(totalColName, totalTableName, EntriesNormalizeType) + val totalMetricWriteStep = MetricWriteStep(totalColName, totalTableName, EntriesFlattenType) // 6.
unique record val uniqueRecordTableName = "__uniqueRecord" @@ -151,7 +151,7 @@ case class UniquenessExpr2DQSteps(context: DQContext, } } val uniqueTransStep = SparkSqlTransformStep(uniqueTableName, uniqueSql, emptyMap) - val uniqueMetricWriteStep = MetricWriteStep(uniqueColName, uniqueTableName, EntriesNormalizeType) + val uniqueMetricWriteStep = MetricWriteStep(uniqueColName, uniqueTableName, EntriesFlattenType) val transSteps1 = sourceTransStep :: targetTransStep :: joinedTransStep :: groupTransStep :: totalTransStep :: uniqueRecordTransStep :: uniqueTransStep :: Nil @@ -166,7 +166,7 @@ case class UniquenessExpr2DQSteps(context: DQContext, } val dupRecordTransStep = SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true) val dupRecordWriteStep = { - val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(dupRecordTableName) + val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(dupRecordTableName) RecordWriteStep(rwName, dupRecordTableName) } @@ -189,7 +189,7 @@ case class UniquenessExpr2DQSteps(context: DQContext, } val dupMetricTransStep = SparkSqlTransformStep(dupMetricTableName, dupMetricSql, emptyMap) val dupMetricWriteStep = { - MetricWriteStep(duplicationArrayName, dupMetricTableName, ArrayNormalizeType) + MetricWriteStep(duplicationArrayName, dupMetricTableName, ArrayFlattenType) } (dupRecordTransStep :: dupMetricTransStep :: Nil, http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala new file mode 100644 index 0000000..eac3b2b --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala @@ -0,0 +1,67 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License.
+*/ +package org.apache.griffin.measure.step.builder.preproc + +import org.apache.griffin.measure.configuration.dqdefinition.RuleParam +import org.apache.griffin.measure.configuration.enums._ + +/** + * generate each entity pre-proc params by template defined in pre-proc param + */ +object PreProcParamMaker { + + case class StringAnyMap(values:Map[String,Any]) + + def makePreProcRules(rules: Seq[RuleParam], suffix: String, dfName: String): (Seq[RuleParam], String) = { + val len = rules.size + val (newRules, _) = rules.zipWithIndex.foldLeft((Nil: Seq[RuleParam], dfName)) { (ret, pair) => + val (rls, prevOutDfName) = ret + val (rule, i) = pair + val inName = rule.getInDfName(prevOutDfName) + val outName = if (i == len - 1) dfName else rule.getOutDfName(genNameWithIndex(dfName, i)) + val ruleWithNames = rule.replaceInOutDfName(inName, outName) + (rls :+ makeNewPreProcRule(ruleWithNames, suffix), outName) + } + (newRules, withSuffix(dfName, suffix)) + } + + private def makeNewPreProcRule(rule: RuleParam, suffix: String): RuleParam = { + val newInDfName = withSuffix(rule.getInDfName(), suffix) + val newOutDfName = withSuffix(rule.getOutDfName(), suffix) + val rpRule = rule.replaceInOutDfName(newInDfName, newOutDfName) + rule.getDslType match { + case DataFrameOpsType => rpRule + case _ => { + val newRule = replaceDfNameSuffix(rule.getRule, rule.getInDfName(), suffix) + rpRule.replaceRule(newRule) + } + } + } + + private def genNameWithIndex(name: String, i: Int): String = s"${name}${i}" + + private def replaceDfNameSuffix(str: String, dfName: String, suffix: String): String = { + val regexStr = s"(?i)${dfName}" + val replaceDfName = withSuffix(dfName, suffix) + str.replaceAll(regexStr, replaceDfName) + } + + def withSuffix(str: String, suffix: String): String = s"${str}_${suffix}" + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala deleted file mode 100644 index 64b8623..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala +++ /dev/null @@ -1,72 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License.
-*/ -package org.apache.griffin.measure.step.builder.preproc - -import org.apache.griffin.measure.configuration.dqdefinition.RuleParam - -/** - * generate rule params by template defined in pre-proc param - */ -object PreProcRuleParamGenerator { - - case class StringAnyMap(values:Map[String,Any]) - - val _name = "name" - - def getNewPreProcRules(rules: Seq[RuleParam], suffix: String): Seq[RuleParam] = { - rules.map { rule => - getNewPreProcRule(rule, suffix) - } - } - - private def getNewPreProcRule(param: RuleParam, suffix: String): RuleParam = { - val newName = genNewString(param.getName, suffix) - val newRule = genNewString(param.getRule, suffix) - val newDetails = getNewMap(param.getDetails, suffix) - param.replaceName(newName).replaceRule(newRule).replaceDetails(newDetails) - } - - private def getNewMap(details: Map[String, Any], suffix: String): Map[String, Any] = { - val keys = details.keys - keys.foldLeft(details) { (map, key) => - map.get(key) match { - case Some(s: String) => map + (key -> genNewString(s, suffix)) - case Some(subMap: StringAnyMap) => map + (key -> getNewMap(subMap.values, suffix)) - case Some(arr: Seq[_]) => map + (key -> getNewArr(arr, suffix)) - case _ => map - } - } - } - - private def getNewArr(paramArr: Seq[Any], suffix: String): Seq[Any] = { - paramArr.foldLeft(Nil: Seq[Any]) { (res, param) => - param match { - case s: String => res :+ genNewString(s, suffix) - case map: StringAnyMap => res :+ getNewMap(map.values, suffix) - case arr: Seq[_] => res :+ getNewArr(arr, suffix) - case _ => res :+ param - } - } - } - - private def genNewString(str: String, suffix: String): String = { - str.replaceAll("""\$\{(.*)\}""", s"$$1_${suffix}") - } - -}
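Comparing the new PreProcParamMaker with the deleted PreProcRuleParamGenerator, the main behavioural shift is in how pre-proc rule text is rewritten for each data source: the old code expanded ${...} templates everywhere in the rule, details, and nested collections, while the new code suffixes concrete dataframe names (matched case-insensitively) and threads explicit in/out names through the rule chain. The following self-contained sketch isolates just the two string rewrites; the helper bodies are copied from the code above, while the wrapper object and the sample inputs are illustrative assumptions.

object PreProcNamingSketch {
  // Old behaviour (PreProcRuleParamGenerator.genNewString): expand ${...} templates,
  // turning a placeholder into a suffixed name.
  def genNewString(str: String, suffix: String): String =
    str.replaceAll("""\$\{(.*)\}""", s"$$1_${suffix}")

  // New behaviour (PreProcParamMaker): suffix a concrete dataframe name,
  // matched case-insensitively, wherever it appears in the rule text.
  def withSuffix(str: String, suffix: String): String = s"${str}_${suffix}"
  def replaceDfNameSuffix(str: String, dfName: String, suffix: String): String =
    str.replaceAll(s"(?i)${dfName}", withSuffix(dfName, suffix))

  def main(args: Array[String]): Unit = {
    println(genNewString("SELECT * FROM ${this}", "0"))           // SELECT * FROM this_0
    println(replaceDfNameSuffix("SELECT * FROM src", "0", "0"))   // hypothetical df name "0" would over-match; see note
    println(replaceDfNameSuffix("SELECT * FROM src", "src", "0")) // SELECT * FROM src_0
  }
}

One consequence of the new design, visible in makePreProcRules above, is that chained pre-proc rules no longer need placeholder syntax: each rule's input defaults to the previous rule's output, and the last rule's output is pinned to the entity's dfName before the per-source suffix is applied.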
