Repository: incubator-griffin
Updated Branches:
  refs/heads/master 77aad4955 -> 060fc28b8
add validation of parameters

Author: Lionel Liu <[email protected]>
Author: dodobel <[email protected]>

Closes #297 from bhlx3lyx7/spark2.

Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/060fc28b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/060fc28b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/060fc28b

Branch: refs/heads/master
Commit: 060fc28b81320ebb44604e967f7f47076384b2e0
Parents: 77aad49
Author: Lionel Liu <[email protected]>
Authored: Wed Jun 13 15:28:06 2018 +0800
Committer: Lionel Liu <[email protected]>
Committed: Wed Jun 13 15:28:06 2018 +0800

----------------------------------------------------------------------
 .../apache/griffin/measure/Application.scala    |  23 +-
 .../configuration/enums/NormalizeType.scala     |   3 +-
 .../measure/configuration/params/AllParam.scala |  36 ---
 .../measure/configuration/params/DQConfig.scala | 217 +++++++++++++++++++
 .../measure/configuration/params/DQParam.scala  | 183 ----------------
 .../configuration/params/EnvConfig.scala        | 112 ++++++++++
 .../measure/configuration/params/EnvParam.scala |  97 ---------
 .../configuration/params/GriffinConfig.scala    |  42 ++++
 .../measure/configuration/params/Param.scala    |   3 +-
 .../params/reader/ParamFileReader.scala         |   2 +-
 .../params/reader/ParamJsonReader.scala         |   2 +-
 .../params/reader/ParamReader.scala             |  10 +
 .../validator/ParamValidator.scala              |  38 ----
 .../context/datasource/DataSourceFactory.scala  |   8 +-
 .../cache/StreamingCacheClientFactory.scala     |   8 +-
 .../datasource/connector/DataConnector.scala    |   4 +-
 .../connector/DataConnectorFactory.scala        |   8 +-
 .../batch/AvroBatchDataConnector.scala          |   2 +-
 .../batch/HiveBatchDataConnector.scala          |   2 +-
 .../batch/TextDirBatchDataConnector.scala       |   2 +-
 .../streaming/KafkaStreamingDataConnector.scala |   2 +-
 .../streaming/info/InfoCacheFactory.scala       |   4 +-
 .../measure/context/writer/PersistFactory.scala |   4 +-
 .../measure/job/builder/DQJobBuilder.scala      |   8 +-
 .../apache/griffin/measure/launch/DQApp.scala   |   6 +-
 .../measure/launch/batch/BatchDQApp.scala       |  16 +-
 .../launch/streaming/StreamingDQApp.scala       |  26 +--
 .../measure/step/builder/DQStepBuilder.scala    |   4 +-
 .../builder/DataSourceParamStepBuilder.scala    |   2 +-
 .../step/builder/RuleParamStepBuilder.scala     |  12 +-
 .../dsl/transform/AccuracyExpr2DQSteps.scala    |  16 +-
 .../transform/CompletenessExpr2DQSteps.scala    |   8 +-
 .../transform/DistinctnessExpr2DQSteps.scala    |   4 +-
 .../dsl/transform/ProfilingExpr2DQSteps.scala   |   5 +-
 .../dsl/transform/TimelinessExpr2DQSteps.scala  |   7 +-
 .../dsl/transform/UniquenessExpr2DQSteps.scala  |   2 +-
 .../preproc/PreProcRuleParamGenerator.scala     |   6 +-
 37 files changed, 475 insertions(+), 459 deletions(-)
----------------------------------------------------------------------
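A note for readers skimming the diff: the old flow parsed both param files and then ran a
separate ParamValidator.validate(allParam) pass returning Try[Boolean]. After this commit
each Param validates itself (validate(): Unit backed by assert), and the readers invoke it
while loading, so an invalid config already surfaces as a Failure from readParamFile. A
minimal sketch of the new contract (the check helper is illustrative only, not part of
this commit):

    import scala.util.Try
    import org.apache.griffin.measure.configuration.params.{DQConfig, EnvConfig, GriffinConfig}

    // hypothetical helper: combine the parsed configs and validate them.
    // GriffinConfig.validate() asserts both configs are non-null, then
    // validates them recursively; a failed assert is an AssertionError,
    // which Try captures as a Failure carrying the assertion message.
    def check(envConfig: EnvConfig, dqConfig: DQConfig): Try[GriffinConfig] =
      Try {
        val conf = GriffinConfig(envConfig, dqConfig)
        conf.validate()
        conf
      }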
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/Application.scala b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
index dbbe970..566e8df 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/Application.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
@@ -20,9 +20,7 @@ package org.apache.griffin.measure
 
 import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.configuration.params.reader.ParamReaderFactory
-import org.apache.griffin.measure.configuration.params.{AllParam, DQParam, EnvParam, Param}
-import org.apache.griffin.measure.configuration.validator.ParamValidator
-import org.apache.griffin.measure.context.writer.PersistTaskRunner
+import org.apache.griffin.measure.configuration.params.{GriffinConfig, DQConfig, EnvConfig, Param}
 import org.apache.griffin.measure.launch.DQApp
 import org.apache.griffin.measure.launch.batch.BatchDQApp
 import org.apache.griffin.measure.launch.streaming.StreamingDQApp
@@ -48,35 +46,24 @@ object Application extends Loggable {
     info(dqParamFile)
 
     // read param files
-    val envParam = readParamFile[EnvParam](envParamFile) match {
+    val envParam = readParamFile[EnvConfig](envParamFile) match {
       case Success(p) => p
       case Failure(ex) => {
         error(ex.getMessage)
         sys.exit(-2)
       }
     }
-    val dqParam = readParamFile[DQParam](dqParamFile) match {
+    val dqParam = readParamFile[DQConfig](dqParamFile) match {
       case Success(p) => p
       case Failure(ex) => {
         error(ex.getMessage)
         sys.exit(-2)
       }
     }
-    val allParam: AllParam = AllParam(envParam, dqParam)
-
-    // validate param files
-    ParamValidator.validate(allParam) match {
-      case Failure(ex) => {
-        error(ex.getMessage)
-        sys.exit(-3)
-      }
-      case _ => {
-        info("params validation pass")
-      }
-    }
+    val allParam: GriffinConfig = GriffinConfig(envParam, dqParam)
 
     // choose process
-    val procType = ProcessType(allParam.dqParam.procType)
+    val procType = ProcessType(allParam.getDqConfig.procType)
     val dqApp: DQApp = procType match {
       case BatchProcessType => BatchDQApp(allParam)
       case StreamingProcessType => StreamingDQApp(allParam)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/NormalizeType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/NormalizeType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/NormalizeType.scala
index b353cbc..61bf27c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/NormalizeType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/NormalizeType.scala
@@ -30,11 +30,12 @@ sealed trait NormalizeType {
 }
 object NormalizeType {
   private val normalizeTypes: List[NormalizeType] = List(DefaultNormalizeType, EntriesNormalizeType, ArrayNormalizeType, MapNormalizeType)
+  val default = DefaultNormalizeType
   def apply(ptn: String): NormalizeType = {
     normalizeTypes.find(tp => ptn match {
       case tp.idPattern() => true
       case _ => false
-    }).getOrElse(DefaultNormalizeType)
+    }).getOrElse(default)
   }
   def unapply(pt: NormalizeType): Option[String] = Some(pt.desc)
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/AllParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/AllParam.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/AllParam.scala
deleted file mode 100644
index 4ba1a15..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/AllParam.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.configuration.params
-
-import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
-import com.fasterxml.jackson.annotation.JsonInclude.Include
-
-/**
- * full set of griffin configuration
- * @param envParam environment configuration (must)
- * @param dqParam dq measurement configuration (must)
- */
-@JsonInclude(Include.NON_NULL)
-case class AllParam( @JsonProperty("env") envParam: EnvParam,
-                     @JsonProperty("dq") dqParam: DQParam
-                   ) extends Param {
-  override def validate(): Boolean = {
-    envParam != null && dqParam != null
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/DQConfig.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/DQConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/DQConfig.scala
new file mode 100644
index 0000000..d07ab51
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/DQConfig.scala
@@ -0,0 +1,217 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.configuration.params
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import org.apache.commons.lang.StringUtils
+import org.apache.griffin.measure.configuration.enums._
+
+/**
+ * dq param
+ * @param name name of dq measurement (must)
+ * @param timestamp default timestamp of measure in batch mode (optional)
+ * @param procType batch mode or streaming mode (must)
+ * @param dataSources data sources (must)
+ * @param evaluateRule dq measurement (must)
+ */
+@JsonInclude(Include.NON_NULL)
+case class DQConfig(@JsonProperty("name") name: String,
+                    @JsonProperty("timestamp") timestamp: Long,
+                    @JsonProperty("process.type") procType: String,
+                    @JsonProperty("data.sources") dataSources: List[DataSourceParam],
+                    @JsonProperty("evaluate.rule") evaluateRule: EvaluateRuleParam
+                   ) extends Param {
+  def getName: String = name
+  def getTimestamp: Long = timestamp
+  def getProcType: String = procType
+  def getDataSources: Seq[DataSourceParam] = {
+    dataSources.foldLeft((Nil: Seq[DataSourceParam], Set[String]())) { (ret, ds) =>
+      val (seq, names) = ret
+      if (!names.contains(ds.getName)) {
+        (seq :+ ds, names + ds.getName)
+      } else ret
+    }._1
+  }
+  def getEvaluateRule: EvaluateRuleParam = evaluateRule
+
+  def validate(): Unit = {
+    assert(StringUtils.isNotBlank(name), "dq config name should not be blank")
+    assert(StringUtils.isNotBlank(procType), "process.type should not be blank")
+    assert((dataSources != null), "data.sources should not be null")
+    assert((evaluateRule != null), "evaluate.rule should not be null")
+    getDataSources.foreach(_.validate)
+    evaluateRule.validate
+  }
+}
+
+/**
+ * data source param
+ * @param name data source name (must)
+ * @param connectors data connectors (optional)
+ * @param cache data source cache configuration (must in streaming mode with streaming connectors)
+ */
+@JsonInclude(Include.NON_NULL)
+case class DataSourceParam( @JsonProperty("name") name: String,
+                            @JsonProperty("connectors") connectors: List[DataConnectorParam],
+                            @JsonProperty("cache") cache: Map[String, Any]
+                          ) extends Param {
+  def getName: String = name
+  def getConnectors: Seq[DataConnectorParam] = if (connectors != null) connectors else Nil
+  def getCacheOpt: Option[Map[String, Any]] = if (cache != null) Some(cache) else None
+
+  def validate(): Unit = {
+    assert(StringUtils.isNotBlank(name), "data source name should not be empty")
+    getConnectors.foreach(_.validate)
+  }
+}
+
+/**
+ * data connector param
+ * @param conType data connector type, e.g.: hive, avro, kafka (must)
+ * @param version data connector type version (optional)
+ * @param config detail configuration of data connector (must)
+ * @param preProc pre-process rules after load data (optional)
+ */
+@JsonInclude(Include.NON_NULL)
+case class DataConnectorParam( @JsonProperty("type") conType: String,
+                               @JsonProperty("version") version: String,
+                               @JsonProperty("config") config: Map[String, Any],
+                               @JsonProperty("pre.proc") preProc: List[RuleParam]
+                             ) extends Param {
+  def getType: String = conType
+  def getVersion: String = version
+  def getConfig: Map[String, Any] = if (config != null) config else Map[String, Any]()
+  def getPreProcRules: Seq[RuleParam] = if (preProc != null) preProc else Nil
+
+  def validate(): Unit = {
+    assert(StringUtils.isNotBlank(conType), "data connector type should not be empty")
+    getPreProcRules.foreach(_.validate)
+  }
+}
+
+/**
+ * evaluate rule param
+ * @param rules rules to define dq measurement (optional)
+ */
+@JsonInclude(Include.NON_NULL)
+case class EvaluateRuleParam( @JsonProperty("rules") rules: List[RuleParam]
+                            ) extends Param {
+  def getRules: Seq[RuleParam] = if (rules != null) rules else Nil
+
+  def validate(): Unit = {
+    getRules.foreach(_.validate)
+  }
+}
+
+/**
+ * rule param
+ * @param dslType dsl type of this rule (must)
+ * @param dqType dq type of this rule (must if dsl type is "griffin-dsl")
+ * @param name name of result calculated by this rule (must if for later usage)
+ * @param rule rule to define dq step calculation (must)
+ * @param details detail config of rule (optional)
+ * @param cache cache the result for multiple usage (optional, valid for "spark-sql" and "df-opr" mode)
+ * @param metric config for metric output (optional)
+ * @param record config for record output (optional)
+ * @param dsCacheUpdate config for data source cache update output (optional, valid in streaming mode)
+ */
+@JsonInclude(Include.NON_NULL)
+case class RuleParam( @JsonProperty("dsl.type") dslType: String,
+                      @JsonProperty("dq.type") dqType: String,
+                      @JsonProperty("name") name: String,
+                      @JsonProperty("rule") rule: String,
+                      @JsonProperty("details") details: Map[String, Any],
+                      @JsonProperty("cache") cache: Boolean,
+                      @JsonProperty("metric") metric: RuleMetricParam,
+                      @JsonProperty("record") record: RuleRecordParam,
+                      @JsonProperty("ds.cache.update") dsCacheUpdate: RuleDsCacheUpdateParam
+                    ) extends Param {
+  def getDslType: DslType = if (dslType != null) DslType(dslType) else DslType("")
+  def getDqType: DqType = if (dqType != null) DqType(dqType) else DqType("")
+  def getCache: Boolean = if (cache != null) cache else false
+
+  def getName: String = if (name != null) name else ""
+  def getRule: String = if (rule != null) rule else ""
+  def getDetails: Map[String, Any] = if (details != null) details else Map[String, Any]()
+
+  def getMetricOpt: Option[RuleMetricParam] = if (metric != null) Some(metric) else None
+  def getRecordOpt: Option[RuleRecordParam] = if (record != null) Some(record) else None
+  def getDsCacheUpdateOpt: Option[RuleDsCacheUpdateParam] = if (dsCacheUpdate != null) Some(dsCacheUpdate) else None
+
+  def replaceName(newName: String): RuleParam = {
+    if (StringUtils.equals(newName, name)) this
+    else RuleParam(dslType, dqType, newName, rule, details, cache, metric, record, dsCacheUpdate)
+  }
+  def replaceRule(newRule: String): RuleParam = {
+    if (StringUtils.equals(newRule, rule)) this
+    else RuleParam(dslType, dqType, name, newRule, details, cache, metric, record, dsCacheUpdate)
+  }
+  def replaceDetails(newDetails: Map[String, Any]): RuleParam = {
+    RuleParam(dslType, dqType, name, rule, newDetails, cache, metric, record, dsCacheUpdate)
+  }
+
+  def validate(): Unit = {
+    assert(!(getDslType.equals(GriffinDslType) && getDqType.equals(UnknownType)),
+      "unknown dq type for griffin dsl")
+
+    getMetricOpt.foreach(_.validate)
+    getRecordOpt.foreach(_.validate)
+    getDsCacheUpdateOpt.foreach(_.validate)
+  }
+}
+
+/**
+ * metric param of rule
+ * @param name name of metric to output (optional)
+ * @param collectType the normalize strategy to collect metric (optional)
+ */
+@JsonInclude(Include.NON_NULL)
+case class RuleMetricParam( @JsonProperty("name") name: String,
+                            @JsonProperty("collect.type") collectType: String
+                          ) extends Param {
+  def getNameOpt: Option[String] = if (StringUtils.isNotBlank(name)) Some(name) else None
+  def getCollectType: NormalizeType = if (StringUtils.isNotBlank(collectType)) NormalizeType(collectType) else NormalizeType("")
+
+  def validate(): Unit = {}
+}
+
+/**
+ * record param of rule
+ * @param name name of record to output (optional)
+ */
+@JsonInclude(Include.NON_NULL)
+case class RuleRecordParam( @JsonProperty("name") name: String
+                          ) extends Param {
+  def getNameOpt: Option[String] = if (StringUtils.isNotBlank(name)) Some(name) else None
+
+  def validate(): Unit = {}
+}
+
+/**
+ * data source cache update param of rule
+ * @param dsName name of data source to be updated by this rule result (must)
+ */
+@JsonInclude(Include.NON_NULL)
+case class RuleDsCacheUpdateParam( @JsonProperty("ds.name") dsName: String
+                                 ) extends Param {
+  def getDsNameOpt: Option[String] = if (StringUtils.isNotBlank(dsName)) Some(dsName) else None
+
+  def validate(): Unit = {}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/DQParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/DQParam.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/DQParam.scala
deleted file mode 100644
index 8d3c354..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/DQParam.scala
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/ -package org.apache.griffin.measure.configuration.params - -import com.fasterxml.jackson.annotation.JsonInclude.Include -import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} -import org.apache.commons.lang.StringUtils -import org.apache.griffin.measure.configuration.enums.{DqType, DslType} - -/** - * dq param - * @param name name of dq measurement (must) - * @param timestamp default timestamp of measure in batch mode (optional) - * @param procType batch mode or streaming mode (must) - * @param dataSourceParams data sources (optional) - * @param evaluateRuleParam dq measurement (optional) - */ -@JsonInclude(Include.NON_NULL) -case class DQParam( @JsonProperty("name") name: String, - @JsonProperty("timestamp") timestamp: Long, - @JsonProperty("process.type") procType: String, - @JsonProperty("data.sources") dataSourceParams: List[DataSourceParam], - @JsonProperty("evaluate.rule") evaluateRuleParam: EvaluateRuleParam - ) extends Param { - val dataSources = { - val (validDsParams, _) = dataSourceParams.foldLeft((Nil: Seq[DataSourceParam], Set[String]())) { (ret, dsParam) => - val (seq, names) = ret - if (dsParam.hasName && !names.contains(dsParam.name)) { - (seq :+ dsParam, names + dsParam.name) - } else ret - } - validDsParams - } - val evaluateRule: EvaluateRuleParam = { - if (evaluateRuleParam != null) evaluateRuleParam - else EvaluateRuleParam("", Nil) - } - - override def validate(): Boolean = { - dataSources.nonEmpty - } -} - -/** - * data source param - * @param name data source name (must) - * @param connectors data connectors (optional) - * @param cache data source cache configuration (must in streaming mode with streaming connectors) - */ -@JsonInclude(Include.NON_NULL) -case class DataSourceParam( @JsonProperty("name") name: String, - @JsonProperty("connectors") connectors: List[DataConnectorParam], - @JsonProperty("cache") cache: Map[String, Any] - ) extends Param { - def hasName: Boolean = StringUtils.isNotBlank(name) - def getConnectors: List[DataConnectorParam] = if (connectors != null) connectors else Nil - def hasCache: Boolean = (cache != null) - - override def validate(): Boolean = hasName -} - -/** - * data connector param - * @param conType data connector type, e.g.: hive, avro, kafka (must) - * @param version data connector type version (optional) - * @param config detail configuration of data connector (must) - * @param preProc pre-process rules after load data (optional) - */ -@JsonInclude(Include.NON_NULL) -case class DataConnectorParam( @JsonProperty("type") conType: String, - @JsonProperty("version") version: String, - @JsonProperty("config") config: Map[String, Any], - @JsonProperty("pre.proc") preProc: List[RuleParam] - ) extends Param { - override def validate(): Boolean = { - StringUtils.isNotBlank(conType) - } -} - -/** - * evaluate rule param - * @param dslType default dsl type for all rules (optional) - * @param rules rules to define dq measurement (optional) - */ -@JsonInclude(Include.NON_NULL) -case class EvaluateRuleParam( @JsonProperty("dsl.type") dslType: String, - @JsonProperty("rules") rules: List[RuleParam] - ) extends Param { - def getDslType: DslType = if (dslType != null) DslType(dslType) else DslType("") - def getRules: List[RuleParam] = if (rules != null) rules else Nil -} - -/** - * rule param - * @param dslType dsl type of this rule (must if default dsl type not set) - * @param dqType dq type of this rule (valid for "griffin-dsl") - * @param name name of result calculated by this rule (must if for later usage) - * @param rule 
rule to define dq step calculation (must) - * @param details detail config of rule (optional) - * @param cache cache the result for multiple usage (optional, valid for "spark-sql" and "df-opr" mode) - * @param metric config for metric output (optional) - * @param record config for record output (optional) - * @param dsCacheUpdate config for data source cache update output (optional, valid in streaming mode) - */ -@JsonInclude(Include.NON_NULL) -case class RuleParam( @JsonProperty("dsl.type") dslType: String, - @JsonProperty("dq.type") dqType: String, - @JsonProperty("name") name: String, - @JsonProperty("rule") rule: String, - @JsonProperty("details") details: Map[String, Any], - @JsonProperty("cache") cache: Boolean, - @JsonProperty("metric") metric: RuleMetricParam, - @JsonProperty("record") record: RuleRecordParam, - @JsonProperty("ds.cache.update") dsCacheUpdate: RuleDsCacheUpdateParam - ) extends Param { - def getDslType(defaultDslType: DslType): DslType = if (dslType != null) DslType(dslType) else defaultDslType - def getDqType: DqType = if (dqType != null) DqType(dqType) else DqType("") - def getName: String = if (name != null) name else "" - def getRule: String = if (rule != null) rule else "" - def getDetails: Map[String, Any] = if (details != null) details else Map[String, Any]() - def getCache: Boolean = if (cache != null) cache else false - - def metricOpt: Option[RuleMetricParam] = if (metric != null) Some(metric) else None - def recordOpt: Option[RuleRecordParam] = if (record != null) Some(record) else None - def dsCacheUpdateOpt: Option[RuleDsCacheUpdateParam] = if (dsCacheUpdate != null) Some(dsCacheUpdate) else None - - def replaceName(newName: String): RuleParam = { - if (StringUtils.equals(newName, name)) this - else RuleParam(dslType, dqType, newName, rule, details, cache, metric, record, dsCacheUpdate) - } - def replaceRule(newRule: String): RuleParam = { - if (StringUtils.equals(newRule, rule)) this - else RuleParam(dslType, dqType, name, newRule, details, cache, metric, record, dsCacheUpdate) - } - def replaceDetails(newDetails: Map[String, Any]): RuleParam = { - RuleParam(dslType, dqType, name, rule, newDetails, cache, metric, record, dsCacheUpdate) - } -} - -/** - * metric param of rule - * @param name name of metric to output (optional) - * @param collectType the normalize strategy to collect metric (optional) - */ -@JsonInclude(Include.NON_NULL) -case class RuleMetricParam( @JsonProperty("name") name: String, - @JsonProperty("collect.type") collectType: String - ) extends Param { -} - -/** - * record param of rule - * @param name name of record to output (optional) - */ -@JsonInclude(Include.NON_NULL) -case class RuleRecordParam( @JsonProperty("name") name: String - ) extends Param { -} - -/** - * data source cache update param of rule - * @param dsName name of data source to be updated by thie rule result (must) - */ -@JsonInclude(Include.NON_NULL) -case class RuleDsCacheUpdateParam( @JsonProperty("ds.name") dsName: String - ) extends Param { -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvConfig.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvConfig.scala new file mode 100644 index 0000000..bc6d50f --- /dev/null +++ 
b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvConfig.scala @@ -0,0 +1,112 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.configuration.params + +import com.fasterxml.jackson.annotation.JsonInclude.Include +import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} +import org.apache.commons.lang.StringUtils +import org.apache.griffin.measure.utils.TimeUtil + +/** + * environment param + * @param sparkParam config of spark environment (must) + * @param persistParams config of persist ways (optional) + * @param infoCacheParams config of information cache ways (required in streaming mode) + */ +@JsonInclude(Include.NON_NULL) +case class EnvConfig(@JsonProperty("spark") sparkParam: SparkParam, + @JsonProperty("persist") persistParams: List[PersistParam], + @JsonProperty("info.cache") infoCacheParams: List[InfoCacheParam] + ) extends Param { + def getSparkParam: SparkParam = sparkParam + def getPersistParams: Seq[PersistParam] = if (persistParams != null) persistParams else Nil + def getInfoCacheParams: Seq[InfoCacheParam] = if (infoCacheParams != null) infoCacheParams else Nil + + def validate(): Unit = { + assert((sparkParam != null), "spark param should not be null") + sparkParam.validate + getPersistParams.foreach(_.validate) + getInfoCacheParams.foreach(_.validate) + } +} + +/** + * spark param + * @param logLevel log level of spark application (optional) + * @param cpDir checkpoint directory for spark streaming (required in streaming mode) + * @param batchInterval batch interval for spark streaming (required in streaming mode) + * @param processInterval process interval for streaming dq calculation (required in streaming mode) + * @param config extra config for spark environment (optional) + * @param initClear clear checkpoint directory or not when initial (optional) + */ +@JsonInclude(Include.NON_NULL) +case class SparkParam( @JsonProperty("log.level") logLevel: String, + @JsonProperty("checkpoint.dir") cpDir: String, + @JsonProperty("batch.interval") batchInterval: String, + @JsonProperty("process.interval") processInterval: String, + @JsonProperty("config") config: Map[String, String], + @JsonProperty("init.clear") initClear: Boolean + ) extends Param { + def getLogLevel: String = if (logLevel != null) logLevel else "WARN" + def getCpDir: String = if (cpDir != null) cpDir else "" + def getBatchInterval: String = if (batchInterval != null) batchInterval else "" + def getProcessInterval: String = if (processInterval != null) processInterval else "" + def getConfig: Map[String, String] = if (config != null) config else Map[String, String]() + def needInitClear: Boolean = if (initClear != null) initClear else false + + def validate(): Unit = { +// assert(StringUtils.isNotBlank(cpDir), "checkpoint.dir 
should not be empty") +// assert(TimeUtil.milliseconds(getBatchInterval).nonEmpty, "batch.interval should be valid time string") +// assert(TimeUtil.milliseconds(getProcessInterval).nonEmpty, "process.interval should be valid time string") + } +} + +/** + * persist param + * @param persistType persist type, e.g.: log, hdfs, http, mongo (must) + * @param config config of persist way (must) + */ +@JsonInclude(Include.NON_NULL) +case class PersistParam( @JsonProperty("type") persistType: String, + @JsonProperty("config") config: Map[String, Any] + ) extends Param { + def getType: String = persistType + def getConfig: Map[String, Any] = if (config != null) config else Map[String, Any]() + + def validate(): Unit = { + assert(StringUtils.isNotBlank(persistType), "persist type should not be empty") + } +} + +/** + * info cache param + * @param cacheType information cache type, e.g.: zookeeper (must) + * @param config config of cache way + */ +@JsonInclude(Include.NON_NULL) +case class InfoCacheParam( @JsonProperty("type") cacheType: String, + @JsonProperty("config") config: Map[String, Any] + ) extends Param { + def getType: String = cacheType + def getConfig: Map[String, Any] = if (config != null) config else Map[String, Any]() + + def validate(): Unit = { + assert(StringUtils.isNotBlank(cacheType), "info cache type should not be empty") + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvParam.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvParam.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvParam.scala deleted file mode 100644 index 5ee0610..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvParam.scala +++ /dev/null @@ -1,97 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.configuration.params - -import com.fasterxml.jackson.annotation.JsonInclude.Include -import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty} -import org.apache.commons.lang.StringUtils - -/** - * environment param - * @param sparkParam config of spark environment (must) - * @param persistParams config of persist ways (optional) - * @param infoCacheParams config of information cache ways (must in streaming mode) - * @param cleanerParam config of cleaner (optional) - */ -@JsonInclude(Include.NON_NULL) -case class EnvParam( @JsonProperty("spark") sparkParam: SparkParam, - @JsonProperty("persist") persistParams: List[PersistParam], - @JsonProperty("info.cache") infoCacheParams: List[InfoCacheParam], - @JsonProperty("cleaner") cleanerParam: CleanerParam - ) extends Param { -} - -/** - * spark param - * @param logLevel log level of spark application (optional) - * @param cpDir checkpoint directory for spark streaming (must in streaming mode) - * @param batchInterval batch interval for spark streaming (must in streaming mode) - * @param processInterval process interval for streaming dq calculation (must in streaming mode) - * @param config extra config for spark environment (optional) - * @param initClear clear checkpoint directory or not when initial (optional) - */ -@JsonInclude(Include.NON_NULL) -case class SparkParam( @JsonProperty("log.level") logLevel: String, - @JsonProperty("checkpoint.dir") cpDir: String, - @JsonProperty("batch.interval") batchInterval: String, - @JsonProperty("process.interval") processInterval: String, - @JsonProperty("config") config: Map[String, String], - @JsonProperty("init.clear") initClear: Boolean - ) extends Param { - def getLogLevel: String = if (logLevel != null) logLevel else "WARN" - def needInitClear: Boolean = if (initClear != null) initClear else false -} - -/** - * persist param - * @param persistType persist type, e.g.: log, hdfs, http, mongo (must) - * @param config config of persist way (must) - */ -@JsonInclude(Include.NON_NULL) -case class PersistParam( @JsonProperty("type") persistType: String, - @JsonProperty("config") config: Map[String, Any] - ) extends Param { - override def validate(): Boolean = { - StringUtils.isNotBlank(persistType) - } -} - -/** - * info cache param - * @param cacheType information cache type, e.g.: zookeeper (must) - * @param config config of cache way - */ -@JsonInclude(Include.NON_NULL) -case class InfoCacheParam( @JsonProperty("type") cacheType: String, - @JsonProperty("config") config: Map[String, Any] - ) extends Param { - override def validate(): Boolean = { - StringUtils.isNotBlank(cacheType) - } -} - -/** - * cleaner param, invalid at current - * @param cleanInterval clean interval (optional) - */ -@JsonInclude(Include.NON_NULL) -case class CleanerParam( @JsonProperty("clean.interval") cleanInterval: String - ) extends Param { - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/GriffinConfig.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/GriffinConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/GriffinConfig.scala new file mode 100644 index 0000000..8debe48 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/GriffinConfig.scala @@ -0,0 +1,42 @@ +/* +Licensed 
to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.configuration.params
+
+import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+/**
+ * full set of griffin configuration
+ * @param envConfig environment configuration (must)
+ * @param dqConfig dq measurement configuration (must)
+ */
+@JsonInclude(Include.NON_NULL)
+case class GriffinConfig(@JsonProperty("env") envConfig: EnvConfig,
+                         @JsonProperty("dq") dqConfig: DQConfig
+                        ) extends Param {
+  def getEnvConfig: EnvConfig = envConfig
+  def getDqConfig: DQConfig = dqConfig
+
+  def validate(): Unit = {
+    assert((envConfig != null), "environment config should not be null")
+    assert((dqConfig != null), "dq config should not be null")
+    envConfig.validate
+    dqConfig.validate
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/Param.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/Param.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/Param.scala
index 87ad246..6116cdf 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/Param.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/Param.scala
@@ -22,8 +22,7 @@ trait Param extends Serializable {
 
   /**
     * validate param internally
-    * @return
     */
-  def validate(): Boolean = true
+  def validate(): Unit
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamFileReader.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamFileReader.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamFileReader.scala
index 0ff5b06..a528127 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamFileReader.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamFileReader.scala
@@ -36,7 +36,7 @@ case class ParamFileReader(filePath: String) extends ParamReader {
 
       val source = HdfsUtil.openFile(filePath)
       val param = JsonUtil.fromJson[T](source)
       source.close
-      param
+      validate(param)
     }
   }
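Both json-based readers now funnel through the same protected validate helper added to
ParamReader (two sections below): parse, call param.validate(), return the param. From a
caller's perspective the effect is roughly the following sketch (hedged: the file path is
made up, and it assumes a dq config json reachable through the HdfsUtil used above):

    import scala.util.{Failure, Success}
    import org.apache.griffin.measure.configuration.params.DQConfig
    import org.apache.griffin.measure.configuration.params.reader.ParamFileReader

    // readConfig wraps parsing *and* validation in one Try, so a file that
    // parses fine but fails an assertion (e.g. a blank "name") comes back
    // as a Failure instead of an invalid DQConfig instance
    ParamFileReader("hdfs:///griffin/dq.json").readConfig[DQConfig] match {
      case Success(dq) => println(s"loaded dq config: ${dq.getName}")
      case Failure(ex) => println(s"invalid dq config: ${ex.getMessage}")
    }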
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamJsonReader.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamJsonReader.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamJsonReader.scala
index 7a7eaf6..91ffa9a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamJsonReader.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamJsonReader.scala
@@ -32,7 +32,7 @@ case class ParamJsonReader(jsonString: String) extends ParamReader {
 
   def readConfig[T <: Param](implicit m : Manifest[T]): Try[T] = {
     Try {
       val param = JsonUtil.fromJson[T](jsonString)
-      param
+      validate(param)
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamReader.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamReader.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamReader.scala
index 77134ea..21ebccd 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamReader.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/reader/ParamReader.scala
@@ -32,4 +32,14 @@ trait ParamReader extends Loggable with Serializable {
     */
   def readConfig[T <: Param](implicit m : Manifest[T]): Try[T]
 
+  /**
+    * validate config param
+    * @param param param to be validated
+    * @return param itself
+    */
+  protected def validate[T <: Param](param: T): T = {
+    param.validate()
+    param
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/configuration/validator/ParamValidator.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/validator/ParamValidator.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/validator/ParamValidator.scala
deleted file mode 100644
index 4c9a4d6..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/validator/ParamValidator.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/ -package org.apache.griffin.measure.configuration.validator - -import org.apache.griffin.measure.Loggable -import org.apache.griffin.measure.configuration.params._ - -import scala.util.Try - -object ParamValidator extends Loggable with Serializable { - - /** - * validate param - * @param param param to be validated - * @tparam T type of param - * @return param valid or not - */ - def validate[T <: Param](param: T): Try[Boolean] = Try { - param.validate - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala index 95c8de7..edd88b6 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala @@ -32,9 +32,9 @@ object DataSourceFactory extends Loggable { def getDataSources(sparkSession: SparkSession, ssc: StreamingContext, - dataSourceParams: Seq[DataSourceParam] + dataSources: Seq[DataSourceParam] ): Seq[DataSource] = { - dataSourceParams.zipWithIndex.flatMap { pair => + dataSources.zipWithIndex.flatMap { pair => val (param, index) = pair getDataSource(sparkSession, ssc, param, index) } @@ -45,13 +45,13 @@ object DataSourceFactory extends Loggable { dataSourceParam: DataSourceParam, index: Int ): Option[DataSource] = { - val name = dataSourceParam.name + val name = dataSourceParam.getName val connectorParams = dataSourceParam.getConnectors val tmstCache = TmstCache() // for streaming data cache val streamingCacheClientOpt = StreamingCacheClientFactory.getClientOpt( - sparkSession.sqlContext, dataSourceParam.cache, name, index, tmstCache) + sparkSession.sqlContext, dataSourceParam.getCacheOpt, name, index, tmstCache) val dataConnectors: Seq[DataConnector] = connectorParams.flatMap { connectorParam => DataConnectorFactory.getDataConnector(sparkSession, ssc, connectorParam, http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala index 529b07a..fd9d231 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala @@ -37,16 +37,16 @@ object StreamingCacheClientFactory extends Loggable { /** * create streaming cache client * @param sqlContext sqlContext in spark environment - * @param param data source cache config + * @param cacheOpt data source cache config option * @param name data source name * @param index data source index * @param tmstCache the same tmstCache instance inside a data source * @return streaming cache client option */ - def getClientOpt(sqlContext: SQLContext, param: Map[String, Any], + def getClientOpt(sqlContext: SQLContext, cacheOpt: Option[Map[String, 
Any]], name: String, index: Int, tmstCache: TmstCache ): Option[StreamingCacheClient] = { - if (param != null) { + cacheOpt.flatMap { param => try { val tp = param.getString(_type, "") val dsCache = tp match { @@ -62,7 +62,7 @@ object StreamingCacheClientFactory extends Loggable { None } } - } else None + } } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnector.scala index 6dc17d1..a4c1995 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnector.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnector.scala @@ -65,14 +65,14 @@ trait DataConnector extends Loggable with Serializable { saveTmst(timestamp) // save timestamp dfOpt.flatMap { df => - val preProcRules = PreProcRuleParamGenerator.getNewPreProcRules(dcParam.preProc, suffix) + val preProcRules = PreProcRuleParamGenerator.getNewPreProcRules(dcParam.getPreProcRules, suffix) // init data context.compileTableRegister.registerTable(thisTable) context.runTimeTableRegister.registerTable(thisTable, df) // build job - val preprocJob = DQJobBuilder.buildDQJob(context, preProcRules, SparkSqlType) + val preprocJob = DQJobBuilder.buildDQJob(context, preProcRules) // job execute preprocJob.execute(context) http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala index 30352a9..4538fbb 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala @@ -53,8 +53,8 @@ object DataConnectorFactory extends Loggable { tmstCache: TmstCache, streamingCacheClientOpt: Option[StreamingCacheClient] ): Try[DataConnector] = { - val conType = dcParam.conType - val version = dcParam.version + val conType = dcParam.getType + val version = dcParam.getVersion Try { conType match { case HiveRegex() => HiveBatchDataConnector(sparkSession, dcParam, tmstCache) @@ -75,8 +75,8 @@ object DataConnectorFactory extends Loggable { streamingCacheClientOpt: Option[StreamingCacheClient] ): StreamingDataConnector = { if (ssc == null) throw new Exception("streaming context is null!") - val conType = dcParam.conType - val version = dcParam.version + val conType = dcParam.getType + val version = dcParam.getVersion conType match { case KafkaRegex() => getKafkaDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt) case _ => throw new Exception("streaming connector creation error!") http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/AvroBatchDataConnector.scala 
---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/AvroBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/AvroBatchDataConnector.scala index 1ee6e78..a906246 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/AvroBatchDataConnector.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/AvroBatchDataConnector.scala @@ -33,7 +33,7 @@ case class AvroBatchDataConnector(@transient sparkSession: SparkSession, tmstCache: TmstCache ) extends BatchDataConnector { - val config = dcParam.config + val config = dcParam.getConfig val FilePath = "file.path" val FileName = "file.name" http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/HiveBatchDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/HiveBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/HiveBatchDataConnector.scala index 85cd120..331f469 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/HiveBatchDataConnector.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/HiveBatchDataConnector.scala @@ -32,7 +32,7 @@ case class HiveBatchDataConnector(@transient sparkSession: SparkSession, tmstCache: TmstCache ) extends BatchDataConnector { - val config = dcParam.config + val config = dcParam.getConfig val Database = "database" val TableName = "table.name" http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/TextDirBatchDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/TextDirBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/TextDirBatchDataConnector.scala index ca5b7b5..bc76f9d 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/TextDirBatchDataConnector.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/TextDirBatchDataConnector.scala @@ -33,7 +33,7 @@ case class TextDirBatchDataConnector(@transient sparkSession: SparkSession, tmstCache: TmstCache ) extends BatchDataConnector { - val config = dcParam.config + val config = dcParam.getConfig val DirPath = "dir.path" val DataDirDepth = "data.dir.depth" http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala index de2822b..0f30b7f 100644 --- 
a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala @@ -33,7 +33,7 @@ trait KafkaStreamingDataConnector extends StreamingDataConnector { type VD <: Decoder[V] type OUT = (K, V) - val config = dcParam.config + val config = dcParam.getConfig val KafkaConfig = "kafka.config" val Topics = "topics" http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheFactory.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheFactory.scala index 85106b4..28ade3b 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheFactory.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheFactory.scala @@ -27,8 +27,8 @@ case class InfoCacheFactory(infoCacheParams: Iterable[InfoCacheParam], metricNam val ZK_REGEX = """^(?i)zk|zookeeper$""".r def getInfoCache(infoCacheParam: InfoCacheParam): Option[InfoCache] = { - val config = infoCacheParam.config - val infoCacheTry = infoCacheParam.cacheType match { + val config = infoCacheParam.getConfig + val infoCacheTry = infoCacheParam.getType match { case ZK_REGEX() => Try(ZKInfoCache(config, metricName)) case _ => throw new Exception("not supported info cache type") } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistFactory.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistFactory.scala index 9314876..0a2649e 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistFactory.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/context/writer/PersistFactory.scala @@ -41,8 +41,8 @@ case class PersistFactory(persistParams: Iterable[PersistParam], metricName: Str } private def getPersist(timeStamp: Long, persistParam: PersistParam, block: Boolean): Option[Persist] = { - val config = persistParam.config - val persistTry = persistParam.persistType match { + val config = persistParam.getConfig + val persistTry = persistParam.getType match { case LOG_REGEX() => Try(LoggerPersist(config, metricName, timeStamp)) case HDFS_REGEX() => Try(HdfsPersist(config, metricName, timeStamp)) case HTTP_REGEX() => Try(HttpPersist(config, metricName, timeStamp, block)) http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala index a8e5b26..074a74e 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala @@ -38,26 +38,24 @@ object 
DQJobBuilder { * @return dq job */ def buildDQJob(context: DQContext, evaluateRuleParam: EvaluateRuleParam): DQJob = { - val defaultDslType = evaluateRuleParam.getDslType val ruleParams = evaluateRuleParam.getRules - buildDQJob(context, ruleParams, defaultDslType) + buildDQJob(context, ruleParams) } /** * build dq job with rules in evaluate rule param or pre-proc param * @param context dq context * @param ruleParams rule params - * @param defaultDslType default dsl type in evaluate rule param * @return dq job */ - def buildDQJob(context: DQContext, ruleParams: Seq[RuleParam], defaultDslType: DslType): DQJob = { + def buildDQJob(context: DQContext, ruleParams: Seq[RuleParam]): DQJob = { // build steps by datasources val dsSteps = context.dataSources.flatMap { dataSource => DQStepBuilder.buildStepOptByDataSourceParam(context, dataSource.dsParam) } // build steps by rules val ruleSteps = ruleParams.flatMap { ruleParam => - DQStepBuilder.buildStepOptByRuleParam(context, ruleParam, defaultDslType) + DQStepBuilder.buildStepOptByRuleParam(context, ruleParam) } // metric flush step val metricFlushStep = MetricFlushStep() http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala index 9ec1641..79cef33 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala @@ -19,7 +19,7 @@ under the License. package org.apache.griffin.measure.launch import org.apache.griffin.measure.Loggable -import org.apache.griffin.measure.configuration.params.{DQParam, EnvParam} +import org.apache.griffin.measure.configuration.params.{DQConfig, EnvConfig} import scala.util.Try @@ -28,8 +28,8 @@ import scala.util.Try */ trait DQApp extends Loggable with Serializable { - val envParam: EnvParam - val dqParam: DQParam + val envParam: EnvConfig + val dqParam: DQConfig def init: Try[_] http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala index 1aa5039..06892a3 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala @@ -32,15 +32,15 @@ import org.apache.spark.sql.{SQLContext, SparkSession} import scala.util.Try -case class BatchDQApp(allParam: AllParam) extends DQApp { +case class BatchDQApp(allParam: GriffinConfig) extends DQApp { - val envParam: EnvParam = allParam.envParam - val dqParam: DQParam = allParam.dqParam + val envParam: EnvConfig = allParam.getEnvConfig + val dqParam: DQConfig = allParam.getDqConfig val sparkParam = envParam.sparkParam val metricName = dqParam.name - val dataSourceParams = dqParam.dataSources - val dataSourceNames = dataSourceParams.map(_.name) +// val dataSourceParams = dqParam.dataSources +// val dataSourceNames = dataSourceParams.map(_.name) val persistParams = envParam.persistParams var sqlContext: SQLContext = _ @@ -52,10 +52,10 @@ case class 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
index 1aa5039..06892a3 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
@@ -32,15 +32,15 @@ import org.apache.spark.sql.{SQLContext, SparkSession}

 import scala.util.Try

-case class BatchDQApp(allParam: AllParam) extends DQApp {
+case class BatchDQApp(allParam: GriffinConfig) extends DQApp {

-  val envParam: EnvParam = allParam.envParam
-  val dqParam: DQParam = allParam.dqParam
+  val envParam: EnvConfig = allParam.getEnvConfig
+  val dqParam: DQConfig = allParam.getDqConfig
   val sparkParam = envParam.sparkParam

   val metricName = dqParam.name
-  val dataSourceParams = dqParam.dataSources
-  val dataSourceNames = dataSourceParams.map(_.name)
+// val dataSourceParams = dqParam.dataSources
+// val dataSourceNames = dataSourceParams.map(_.name)
   val persistParams = envParam.persistParams

   var sqlContext: SQLContext = _

@@ -52,10 +52,10 @@ case class BatchDQApp(allParam: AllParam) extends DQApp {
   def init: Try[_] = Try {
     // build spark 2.0+ application context
     val conf = new SparkConf().setAppName(metricName)
-    conf.setAll(sparkParam.config)
+    conf.setAll(sparkParam.getConfig)
     conf.set("spark.sql.crossJoin.enabled", "true")
     sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
-    sparkSession.sparkContext.setLogLevel(sparkParam.logLevel)
+    sparkSession.sparkContext.setLogLevel(sparkParam.getLogLevel)
     sqlContext = sparkSession.sqlContext

     // register udf
@@ -70,7 +70,7 @@ case class BatchDQApp(allParam: AllParam) extends DQApp {
     val contextId = ContextId(measureTime)

     // get data sources
-    val dataSources = DataSourceFactory.getDataSources(sparkSession, null, dqParam.dataSources)
+    val dataSources = DataSourceFactory.getDataSources(sparkSession, null, dqParam.getDataSources)
     dataSources.foreach(_.init)

     // create dq context
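Reviewer note: BatchDQApp now receives the whole GriffinConfig and reads its parts through getters rather than public fields. A runnable stand-in showing the shape of that accessor change; these case classes are illustrative, not the actual configuration classes:

// Illustrative stand-ins, not the real Griffin configuration classes.
case class EnvConfig(sparkConfig: Map[String, String])
case class DQConfig(name: String)

case class GriffinConfig(envConfig: EnvConfig, dqConfig: DQConfig) {
  // getter-style access, matching the allParam.getEnvConfig / getDqConfig calls above
  def getEnvConfig: EnvConfig = envConfig
  def getDqConfig: DQConfig = dqConfig
}

object ConfigAccessDemo extends App {
  val all = GriffinConfig(EnvConfig(Map("spark.master" -> "local[*]")), DQConfig("batch_accu"))
  println(all.getDqConfig.name)  // batch_accu
}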
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
index d89b7e8..f990c8e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
@@ -38,15 +38,15 @@ import org.apache.spark.streaming.{Milliseconds, StreamingContext}

 import scala.util.Try

-case class StreamingDQApp(allParam: AllParam) extends DQApp {
+case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {

-  val envParam: EnvParam = allParam.envParam
-  val dqParam: DQParam = allParam.dqParam
+  val envParam: EnvConfig = allParam.getEnvConfig
+  val dqParam: DQConfig = allParam.getDqConfig
   val sparkParam = envParam.sparkParam

   val metricName = dqParam.name
-  val dataSourceParams = dqParam.dataSources
-  val dataSourceNames = dataSourceParams.map(_.name)
+// val dataSourceParams = dqParam.dataSources
+// val dataSourceNames = dataSourceParams.map(_.name)
   val persistParams = envParam.persistParams

   var sqlContext: SQLContext = _

@@ -58,10 +58,10 @@ case class StreamingDQApp(allParam: AllParam) extends DQApp {
   def init: Try[_] = Try {
     // build spark 2.0+ application context
     val conf = new SparkConf().setAppName(metricName)
-    conf.setAll(sparkParam.config)
+    conf.setAll(sparkParam.getConfig)
     conf.set("spark.sql.crossJoin.enabled", "true")
     sparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
-    sparkSession.sparkContext.setLogLevel(sparkParam.logLevel)
+    sparkSession.sparkContext.setLogLevel(sparkParam.getLogLevel)
     sqlContext = sparkSession.sqlContext

     // clear checkpoint directory
@@ -78,7 +78,7 @@ case class StreamingDQApp(allParam: AllParam) extends DQApp {
   def run: Try[_] = Try {
     // streaming context
-    val ssc = StreamingContext.getOrCreate(sparkParam.cpDir, () => {
+    val ssc = StreamingContext.getOrCreate(sparkParam.getCpDir, () => {
       try {
         createStreamingContext
       } catch {
@@ -94,7 +94,7 @@ case class StreamingDQApp(allParam: AllParam) extends DQApp {
     val contextId = ContextId(measureTime)

     // generate data sources
-    val dataSources = DataSourceFactory.getDataSources(sparkSession, ssc, dqParam.dataSources)
+    val dataSources = DataSourceFactory.getDataSources(sparkSession, ssc, dqParam.getDataSources)
     dataSources.foreach(_.init)

     // create dq context
@@ -109,7 +109,7 @@ case class StreamingDQApp(allParam: AllParam) extends DQApp {
     // process thread
     val dqCalculator = StreamingDQCalculator(globalContext, dqParam.evaluateRule)

-    val processInterval = TimeUtil.milliseconds(sparkParam.processInterval) match {
+    val processInterval = TimeUtil.milliseconds(sparkParam.getProcessInterval) match {
       case Some(interval) => interval
       case _ => throw new Exception("invalid batch interval")
     }
@@ -135,19 +135,19 @@ case class StreamingDQApp(allParam: AllParam) extends DQApp {
   def createStreamingContext: StreamingContext = {
-    val batchInterval = TimeUtil.milliseconds(sparkParam.batchInterval) match {
+    val batchInterval = TimeUtil.milliseconds(sparkParam.getBatchInterval) match {
       case Some(interval) => Milliseconds(interval)
       case _ => throw new Exception("invalid batch interval")
     }
     val ssc = new StreamingContext(sparkSession.sparkContext, batchInterval)
-    ssc.checkpoint(sparkParam.cpDir)
+    ssc.checkpoint(sparkParam.getCpDir)

     ssc
   }

   private def clearCpDir: Unit = {
     if (sparkParam.needInitClear) {
-      val cpDir = sparkParam.cpDir
+      val cpDir = sparkParam.getCpDir
       info(s"clear checkpoint directory ${cpDir}")
       HdfsUtil.deleteHdfsPath(cpDir)
     }
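Reviewer note: the streaming app keeps the same interval-parsing pattern while switching to getter access (getBatchInterval, getProcessInterval, getCpDir). A self-contained approximation of that pattern; the real TimeUtil.milliseconds may accept more formats than this sketch does:

// Approximation of the TimeUtil.milliseconds pattern above; Griffin's parser
// may support more units/formats than this sketch.
object IntervalDemo extends App {
  private val pattern = """(\d+)(ms|s|m|h)""".r

  def milliseconds(str: String): Option[Long] = str.trim match {
    case pattern(n, "ms") => Some(n.toLong)
    case pattern(n, "s")  => Some(n.toLong * 1000L)
    case pattern(n, "m")  => Some(n.toLong * 60000L)
    case pattern(n, "h")  => Some(n.toLong * 3600000L)
    case _                => None
  }

  // same Option-match-or-throw shape as createStreamingContext above
  val batchInterval = milliseconds("10s") match {
    case Some(interval) => interval
    case _ => throw new Exception("invalid batch interval")
  }
  println(batchInterval)  // 10000
}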
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
index f5b69ad..2ee308d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala
@@ -57,9 +57,9 @@ object DQStepBuilder {
     }
   }

-  def buildStepOptByRuleParam(context: DQContext, ruleParam: RuleParam, defaultDslType: DslType
+  def buildStepOptByRuleParam(context: DQContext, ruleParam: RuleParam
                              ): Option[DQStep] = {
-    val dslType = ruleParam.getDslType(defaultDslType)
+    val dslType = ruleParam.getDslType
     val dsNames = context.dataSourceNames
     val funcNames = context.functionNames
     val dqStepOpt = getRuleParamStepBuilder(dslType, dsNames, funcNames)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataSourceParamStepBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataSourceParamStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataSourceParamStepBuilder.scala
index b941211..333615d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataSourceParamStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataSourceParamStepBuilder.scala
@@ -31,7 +31,7 @@ trait DataSourceParamStepBuilder extends DQStepBuilder {
   type ParamType = DataSourceParam

   def buildDQStep(context: DQContext, param: ParamType): Option[DQStep] = {
-    val name = getStepName(param.name)
+    val name = getStepName(param.getName)
     val steps = param.getConnectors.flatMap { dc =>
       buildReadSteps(context, dc)
     }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
index fa9e38b..2a43b34 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
@@ -43,16 +43,16 @@ trait RuleParamStepBuilder extends DQStepBuilder {
   protected def buildDirectWriteSteps(ruleParam: RuleParam): Seq[DQStep] = {
     val name = getStepName(ruleParam.getName)
     // metric writer
-    val metricSteps = ruleParam.metricOpt.map { metric =>
-      MetricWriteStep(metric.name, name, NormalizeType(metric.collectType))
+    val metricSteps = ruleParam.getMetricOpt.map { metric =>
+      MetricWriteStep(metric.getNameOpt.getOrElse(name), name, NormalizeType(metric.collectType))
     }.toSeq
     // record writer
-    val recordSteps = ruleParam.recordOpt.map { record =>
-      RecordWriteStep(record.name, name)
+    val recordSteps = ruleParam.getRecordOpt.map { record =>
+      RecordWriteStep(record.getNameOpt.getOrElse(name), name)
     }.toSeq
     // update writer
-    val dsCacheUpdateSteps = ruleParam.dsCacheUpdateOpt.map { dsCacheUpdate =>
-      DataSourceUpdateWriteStep(dsCacheUpdate.dsName, name)
+    val dsCacheUpdateSteps = ruleParam.getDsCacheUpdateOpt.map { dsCacheUpdate =>
+      DataSourceUpdateWriteStep(dsCacheUpdate.getDsNameOpt.getOrElse(""), name)
     }.toSeq

     metricSteps ++ recordSteps ++ dsCacheUpdateSteps
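Reviewer note: all three builders above converge on the same Option-based fallback: take the configured writer name if present, otherwise fall back to the step name. A tiny runnable stand-in demonstrating it; RecordParam here is hypothetical:

// Hypothetical stand-in for the record/metric param types used above.
case class RecordParam(nameOpt: Option[String]) {
  def getNameOpt: Option[String] = nameOpt
}

object NameFallbackDemo extends App {
  val stepName = "rule_step_1"
  def writerName(recordOpt: Option[RecordParam]): String =
    recordOpt.flatMap(_.getNameOpt).getOrElse(stepName)

  println(writerName(Some(RecordParam(Some("my_records")))))  // my_records
  println(writerName(Some(RecordParam(None))))                // rule_step_1
  println(writerName(None))                                   // rule_step_1
}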
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
index 9c14325..0416dbb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
@@ -82,7 +82,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
     val missRecordsTransStep = SparkSqlTransformStep(missRecordsTableName, missRecordsSql, emptyMap, true)
     val missRecordsWriteSteps = procType match {
       case BatchProcessType => {
-        val rwName = ruleParam.recordOpt.map(_.name).getOrElse(missRecordsTableName)
+        val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(missRecordsTableName)
         RecordWriteStep(rwName, missRecordsTableName) :: Nil
       }
       case StreamingProcessType => Nil
@@ -90,7 +90,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
     val missRecordsUpdateWriteSteps = procType match {
       case BatchProcessType => Nil
       case StreamingProcessType => {
-        val dsName = ruleParam.dsCacheUpdateOpt.map(_.dsName).getOrElse(sourceName)
+        val dsName = ruleParam.getDsCacheUpdateOpt.flatMap(_.getDsNameOpt).getOrElse(sourceName)
         DataSourceUpdateWriteStep(dsName, missRecordsTableName) :: Nil
       }
     }
@@ -139,8 +139,9 @@ case class AccuracyExpr2DQSteps(context: DQContext,
     val accuracyTransStep = SparkSqlTransformStep(accuracyTableName, accuracyMetricSql, emptyMap)
     val accuracyMetricWriteSteps = procType match {
       case BatchProcessType => {
-        val mwName = ruleParam.metricOpt.map(_.name).getOrElse(ruleParam.name)
-        val collectType = NormalizeType(ruleParam.metricOpt.map(_.collectType).getOrElse(""))
+        val metricOpt = ruleParam.getMetricOpt
+        val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.name)
+        val collectType = metricOpt.map(_.getCollectType).getOrElse(NormalizeType.default)
         MetricWriteStep(mwName, accuracyTableName, collectType) :: Nil
       }
       case StreamingProcessType => Nil
@@ -166,8 +167,9 @@ case class AccuracyExpr2DQSteps(context: DQContext,
       val accuracyMetricTransStep = DataFrameOpsTransformStep(accuracyMetricTableName, accuracyMetricRule, accuracyMetricDetails)
       val accuracyMetricWriteStep = {
-        val mwName = ruleParam.metricOpt.map(_.name).getOrElse(ruleParam.name)
-        val collectType = NormalizeType(ruleParam.metricOpt.map(_.collectType).getOrElse(""))
+        val metricOpt = ruleParam.getMetricOpt
+        val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.name)
+        val collectType = metricOpt.map(_.getCollectType).getOrElse(NormalizeType.default)
         MetricWriteStep(mwName, accuracyMetricTableName, collectType)
       }

@@ -182,7 +184,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
       val accuracyRecordTransStep = SparkSqlTransformStep(
         accuracyRecordTableName, accuracyRecordSql, emptyMap)
       val accuracyRecordWriteStep = {
-        val rwName = ruleParam.recordOpt.map(_.name).getOrElse(missRecordsTableName)
+        val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(missRecordsTableName)
         RecordWriteStep(rwName, missRecordsTableName, Some(accuracyRecordTableName))
       }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
index fcb576c..bd03c7a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
@@ -89,7 +89,7 @@ case class CompletenessExpr2DQSteps(context: DQContext,
     val incompleteRecordsSql = s"SELECT * FROM `${sourceAliasTableName}` WHERE ${incompleteWhereClause}"
     val incompleteRecordTransStep = SparkSqlTransformStep(incompleteRecordsTableName, incompleteRecordsSql, emptyMap, true)
     val incompleteRecordWriteStep = {
-      val rwName = ruleParam.recordOpt.map(_.name).getOrElse(incompleteRecordsTableName)
+      val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(incompleteRecordsTableName)
       RecordWriteStep(rwName, incompleteRecordsTableName)
     }

@@ -136,8 +136,10 @@ case class CompletenessExpr2DQSteps(context: DQContext,
     }
     val completeTransStep = SparkSqlTransformStep(completeTableName, completeMetricSql, emptyMap)
     val completeWriteStep = {
-      val mwName = ruleParam.metricOpt.map(_.name).getOrElse(completeTableName)
-      MetricWriteStep(mwName, completeTableName, DefaultNormalizeType)
+      val metricOpt = ruleParam.getMetricOpt
+      val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(completeTableName)
+      val collectType = metricOpt.map(_.getCollectType).getOrElse(NormalizeType.default)
+      MetricWriteStep(mwName, completeTableName, collectType)
     }

     val transSteps = {
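Reviewer note: the accuracy and completeness steps replace parsing NormalizeType("") out of a possibly-empty string with a typed default, NormalizeType.default. A runnable stand-in sketch of why that is safer; this sealed trait is illustrative, the real NormalizeType lives in configuration.enums:

// Illustrative stand-in; not org.apache.griffin.measure.configuration.enums.NormalizeType.
sealed trait NormalizeTypeLike
case object DefaultNormalize extends NormalizeTypeLike
case object ArrayNormalize extends NormalizeTypeLike

object NormalizeTypeLike {
  def default: NormalizeTypeLike = DefaultNormalize
  def apply(s: String): NormalizeTypeLike = s match {
    case "array" | "list" => ArrayNormalize
    case _                => DefaultNormalize   // "" silently falls through here
  }
}

object CollectTypeDemo extends App {
  // old shape: round-trip a missing value through an empty string, then parse it
  val oldStyle = NormalizeTypeLike(None.getOrElse(""))
  // new shape: a typed value or a typed default, no string parsing involved
  val newStyle = None.getOrElse(NormalizeTypeLike.default)
  println(oldStyle == newStyle)  // true, but the new form never parses a sentinel string
}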
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
index 1cf94e0..cf886e3 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
@@ -271,7 +271,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
       }
       val dupItemsTransStep = SparkSqlTransformStep(dupItemsTableName, dupItemsSql, emptyMap)
       val dupItemsWriteStep = {
-        val rwName = ruleParam.recordOpt.map(_.name).getOrElse(dupItemsTableName)
+        val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(dupItemsTableName)
         RecordWriteStep(rwName, dupItemsTableName, None, writeTimestampOpt)
       }

@@ -317,7 +317,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
       }
       val dupRecordTransStep = SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true)
       val dupRecordWriteStep = {
-        val rwName = ruleParam.recordOpt.map(_.name).getOrElse(dupRecordTableName)
+        val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(dupRecordTableName)
         RecordWriteStep(rwName, dupRecordTableName, None, writeTimestampOpt)
       }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
index b4da7eb..bc7f620 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
@@ -94,8 +94,9 @@ case class ProfilingExpr2DQSteps(context: DQContext,
     val profilingName = ruleParam.name
     val profilingTransStep = SparkSqlTransformStep(profilingName, profilingSql, details)
     val profilingMetricWriteStep = {
-      val mwName = ruleParam.metricOpt.map(_.name).getOrElse(ruleParam.name)
-      val collectType = NormalizeType(ruleParam.metricOpt.map(_.collectType).getOrElse(""))
+      val metricOpt = ruleParam.getMetricOpt
+      val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.name)
+      val collectType = metricOpt.map(_.getCollectType).getOrElse(NormalizeType.default)
       MetricWriteStep(mwName, profilingName, collectType)
     }
     profilingTransStep :: profilingMetricWriteStep :: Nil
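Reviewer note: the same three lines (getMetricOpt, name fallback, collect-type fallback) now appear in the Accuracy, Completeness, Profiling, and Timeliness transforms. A hypothetical helper, not part of this commit, that could factor out the duplication; MetricParam here is a stand-in:

// Hypothetical refactor sketch, NOT in this commit. MetricParam is a stand-in.
case class MetricParam(nameOpt: Option[String], collectType: Option[String]) {
  def getNameOpt: Option[String] = nameOpt
  def getCollectType: String = collectType.getOrElse("default")
}

object MetricNamingHelper {
  // one place for the mwName/collectType pair computed in four transform classes above
  def nameAndType(metricOpt: Option[MetricParam], fallbackName: String): (String, String) = (
    metricOpt.flatMap(_.getNameOpt).getOrElse(fallbackName),
    metricOpt.map(_.getCollectType).getOrElse("default")
  )
}

object HelperDemo extends App {
  println(MetricNamingHelper.nameAndType(None, "profiling_table"))  // (profiling_table,default)
}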
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
index a56937c..9fa58f2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
@@ -131,8 +131,9 @@ case class TimelinessExpr2DQSteps(context: DQContext,
     }
     val metricTransStep = SparkSqlTransformStep(metricTableName, metricSql, emptyMap)
     val metricWriteStep = {
-      val mwName = ruleParam.metricOpt.map(_.name).getOrElse(ruleParam.name)
-      val collectType = NormalizeType(ruleParam.metricOpt.map(_.collectType).getOrElse(""))
+      val metricOpt = ruleParam.getMetricOpt
+      val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.name)
+      val collectType = metricOpt.map(_.getCollectType).getOrElse(NormalizeType.default)
       MetricWriteStep(mwName, metricTableName, collectType)
     }

@@ -149,7 +150,7 @@ case class TimelinessExpr2DQSteps(context: DQContext,
       }
       val recordTransStep = SparkSqlTransformStep(recordTableName, recordSql, emptyMap)
       val recordWriteStep = {
-        val rwName = ruleParam.recordOpt.map(_.name).getOrElse(recordTableName)
+        val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(recordTableName)
         RecordWriteStep(rwName, recordTableName, None)
       }
       (recordTransStep :: Nil, recordWriteStep :: Nil)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
index 9fecb6d..77a79d4 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
@@ -166,7 +166,7 @@ case class UniquenessExpr2DQSteps(context: DQContext,
     }
     val dupRecordTransStep = SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true)
     val dupRecordWriteStep = {
-      val rwName = ruleParam.recordOpt.map(_.name).getOrElse(dupRecordTableName)
+      val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(dupRecordTableName)
       RecordWriteStep(rwName, dupRecordTableName)
     }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/060fc28b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala
index f1543be..07b13ea 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala
@@ -28,10 +28,8 @@ object PreProcRuleParamGenerator {
   val _name = "name"

   def getNewPreProcRules(rules: Seq[RuleParam], suffix: String): Seq[RuleParam] = {
-    if (rules == null) Nil else {
-      rules.map { rule =>
-        getNewPreProcRule(rule, suffix)
-      }
+    rules.map { rule =>
+      getNewPreProcRule(rule, suffix)
     }
   }
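Reviewer note: dropping the null guard in getNewPreProcRules fits the parameter-validation theme of this commit; it implies the config getters normalize an absent rule list to an empty sequence, so callers never see null. A runnable stand-in illustrating that assumed contract; this DQConfigLike is illustrative, not the actual class:

// Illustrative stand-in: getters normalize null to Nil so downstream code,
// like getNewPreProcRules above, can drop its null checks.
case class DQConfigLike(private val rules: Seq[String]) {
  def getRules: Seq[String] = if (rules != null) rules else Nil
}

object NullSafeDemo extends App {
  val fromBadJson = DQConfigLike(null)            // e.g. a missing "rules" field after deserialization
  println(fromBadJson.getRules.map(_ + "_pre"))   // List(), no NullPointerException
}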
