Fix case clauses

Author: William Guo <gu...@apache.org>
Closes #425 from guoyuepeng/fix_case_clauses.

Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/18fc4cf4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/18fc4cf4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/18fc4cf4

Branch: refs/heads/master
Commit: 18fc4cf4cd706aa3e793fb0790c12ece5e5342a3
Parents: 485c5cf
Author: William Guo <gu...@apache.org>
Authored: Sat Sep 29 18:21:42 2018 +0800
Committer: Lionel Liu <bhlx3l...@163.com>
Committed: Sat Sep 29 18:21:42 2018 +0800

----------------------------------------------------------------------
 .../apache/griffin/measure/Application.scala | 36 ++-
 .../configuration/dqdefinition/DQConfig.scala | 12 +-
 .../configuration/dqdefinition/EnvConfig.scala | 5 +-
 .../dqdefinition/reader/ParamFileReader.scala | 6 +-
 .../dqdefinition/reader/ParamJsonReader.scala | 5 +-
 .../dqdefinition/reader/ParamReader.scala | 5 +-
 .../measure/configuration/enums/DqType.scala | 8 +-
 .../configuration/enums/FlattenType.scala | 8 +-
 .../configuration/enums/OutputType.scala | 8 +-
 .../configuration/enums/ProcessType.scala | 6 +-
 .../measure/configuration/enums/SinkType.scala | 19 +-
 .../griffin/measure/context/DQContext.scala | 11 +-
 .../measure/context/DataFrameCache.scala | 12 +-
 .../griffin/measure/context/TableRegister.scala | 8 +-
 .../checkpoint/lock/CheckpointLockInZK.scala | 6 +-
 .../offset/OffsetCheckpointClient.scala | 2 +-
 .../offset/OffsetCheckpointFactory.scala | 5 +-
 .../offset/OffsetCheckpointInZK.scala | 29 ++-
 .../streaming/metric/AccuracyMetric.scala | 8 +-
 .../context/streaming/metric/CacheResults.scala | 6 +-
 .../griffin/measure/datasource/DataSource.scala | 11 +-
 .../measure/datasource/DataSourceFactory.scala | 8 +-
 .../measure/datasource/TimestampStorage.scala | 29 ++-
 .../datasource/cache/StreamingCacheClient.scala | 65 ++---
 .../cache/StreamingCacheClientFactory.scala | 18 +-
 .../cache/StreamingCacheJsonClient.scala | 3 +-
 .../cache/StreamingCacheOrcClient.scala | 3 +-
 .../cache/StreamingCacheParquetClient.scala | 10 +-
 .../cache/StreamingOffsetCacheable.scala | 18 +-
 .../measure/datasource/cache/WithFanIn.scala | 8 +-
 .../datasource/connector/DataConnector.scala | 15 +-
 .../connector/DataConnectorFactory.scala | 35 ++-
 .../batch/AvroBatchDataConnector.scala | 6 +-
 .../batch/HiveBatchDataConnector.scala | 6 +-
 .../batch/TextDirBatchDataConnector.scala | 18 +-
 .../streaming/KafkaStreamingDataConnector.scala | 6 +-
 .../KafkaStreamingStringDataConnector.scala | 12 +-
 .../streaming/StreamingDataConnector.scala | 10 +-
 .../measure/job/builder/DQJobBuilder.scala | 2 -
 .../apache/griffin/measure/launch/DQApp.scala | 3 +-
 .../measure/launch/batch/BatchDQApp.scala | 10 +-
 .../launch/streaming/StreamingDQApp.scala | 19 +-
 .../griffin/measure/sink/ConsoleSink.scala | 3 +-
 .../measure/sink/ElasticSearchSink.scala | 15 +-
 .../apache/griffin/measure/sink/HdfsSink.scala | 17 +-
 .../apache/griffin/measure/sink/MongoSink.scala | 10 +-
 .../org/apache/griffin/measure/sink/Sink.scala | 3 +-
 .../griffin/measure/sink/SinkFactory.scala | 3 +-
 .../griffin/measure/sink/SinkTaskRunner.scala | 18 +-
 .../measure/step/builder/DQStepBuilder.scala | 10 +-
 .../step/builder/GriffinDslDQStepBuilder.scala | 4 +-
 .../step/builder/RuleParamStepBuilder.scala | 4 +-
 .../builder/dsl/expr/ClauseExpression.scala | 14 +-
 .../step/builder/dsl/expr/LogicalExpr.scala | 5 +-
 .../step/builder/dsl/expr/SelectExpr.scala | 8 +-
 .../step/builder/dsl/expr/TreeNode.scala | 18 +-
 .../step/builder/dsl/parser/BasicParser.scala | 6 +-
 .../builder/dsl/parser/GriffinDslParser.scala | 5 +-
 .../dsl/transform/AccuracyExpr2DQSteps.scala | 55 +++--
 .../transform/CompletenessExpr2DQSteps.scala | 40 +--
 .../transform/DistinctnessExpr2DQSteps.scala | 48 ++--
 .../builder/dsl/transform/Expr2DQSteps.scala | 2 +-
 .../dsl/transform/ProfilingExpr2DQSteps.scala | 16 +-
 .../dsl/transform/TimelinessExpr2DQSteps.scala | 46 ++--
 .../dsl/transform/UniquenessExpr2DQSteps.scala | 34 ++-
 .../transform/analyzer/AccuracyAnalyzer.scala | 6 +-
 .../analyzer/CompletenessAnalyzer.scala | 3 +-
 .../analyzer/DistinctnessAnalyzer.scala | 4 +-
 .../transform/analyzer/ProfilingAnalyzer.scala | 3 +-
 .../transform/analyzer/UniquenessAnalyzer.scala | 3 +-
 .../builder/preproc/PreProcParamMaker.scala | 8 +-
 .../griffin/measure/step/read/ReadStep.scala | 9 +-
 .../measure/step/read/UnionReadStep.scala | 3 +-
 .../measure/step/transform/DataFrameOps.scala | 16 +-
 .../transform/DataFrameOpsTransformStep.scala | 7 +-
 .../step/transform/SparkSqlTransformStep.scala | 3 +-
 .../step/write/DataSourceUpdateWriteStep.scala | 19 +-
 .../measure/step/write/MetricFlushStep.scala | 3 +-
 .../measure/step/write/MetricWriteStep.scala | 17 +-
 .../measure/step/write/RecordWriteStep.scala | 38 ++-
 .../measure/step/write/SparkRowFormatter.scala | 6 +-
 .../apache/griffin/measure/utils/FSUtil.scala | 14 +-
 .../apache/griffin/measure/utils/HdfsUtil.scala | 30 +--
 .../apache/griffin/measure/utils/HttpUtil.scala | 24 +-
 .../apache/griffin/measure/utils/JsonUtil.scala | 5 +-
 .../griffin/measure/utils/ParamUtil.scala | 8 +-
 .../apache/griffin/measure/utils/TimeUtil.scala | 18 +-
 scalastyle-config.xml | 246 +++++++++++++++++++
 88 files changed, 887 insertions(+), 510 deletions(-)
----------------------------------------------------------------------
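Nearly every hunk below applies the same scalastyle-driven cleanup: a case clause needs no braces around its body, because the arrow and the next case (or the closing brace of the match) already delimit it. A before/after sketch of the pattern, adapted from the Application.scala hunks that follow:

    // before: braces around a multi-statement case body
    readParamFile[EnvConfig](envParamFile) match {
      case Success(p) => p
      case Failure(ex) => {
        error(ex.getMessage)
        sys.exit(-2)
      }
    }

    // after: the case arrow delimits the body, so the braces go away
    readParamFile[EnvConfig](envParamFile) match {
      case Success(p) => p
      case Failure(ex) =>
        error(ex.getMessage)
        sys.exit(-2)
    }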

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/Application.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/Application.scala b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
index e7df806..1bac17b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/Application.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
@@ -18,15 +18,16 @@ under the License.
 */
 package org.apache.griffin.measure
 
-import org.apache.griffin.measure.configuration.enums._
+import scala.reflect.ClassTag
+import scala.util.{Failure, Success, Try}
+
+import org.apache.griffin.measure.configuration.dqdefinition.{DQConfig, EnvConfig, GriffinConfig, Param}
 import org.apache.griffin.measure.configuration.dqdefinition.reader.ParamReaderFactory
-import org.apache.griffin.measure.configuration.dqdefinition.{GriffinConfig, DQConfig, EnvConfig, Param}
+import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.launch.DQApp
 import org.apache.griffin.measure.launch.batch.BatchDQApp
 import org.apache.griffin.measure.launch.streaming.StreamingDQApp
-import scala.reflect.ClassTag
-import scala.util.{Failure, Success, Try}
 
 /**
  * application entrance
@@ -49,17 +50,15 @@ object Application extends Loggable {
     // read param files
     val envParam = readParamFile[EnvConfig](envParamFile) match {
       case Success(p) => p
-      case Failure(ex) => {
+      case Failure(ex) =>
         error(ex.getMessage)
        sys.exit(-2)
-      }
     }
     val dqParam = readParamFile[DQConfig](dqParamFile) match {
       case Success(p) => p
-      case Failure(ex) => {
+      case Failure(ex) =>
         error(ex.getMessage)
         sys.exit(-2)
-      }
     }
     val allParam: GriffinConfig = GriffinConfig(envParam, dqParam)
@@ -68,32 +67,28 @@ object Application extends Loggable {
     val dqApp: DQApp = procType match {
       case BatchProcessType => BatchDQApp(allParam)
       case StreamingProcessType => StreamingDQApp(allParam)
-      case _ => {
+      case _ =>
         error(s"${procType} is unsupported process type!")
         sys.exit(-4)
-      }
     }
 
     startup
 
     // dq app init
     dqApp.init match {
-      case Success(_) => {
+      case Success(_) =>
         info("process init success")
-      }
-      case Failure(ex) => {
+      case Failure(ex) =>
         error(s"process init error: ${ex.getMessage}")
         shutdown
         sys.exit(-5)
-      }
     }
 
     // dq app run
     dqApp.run match {
-      case Success(_) => {
+      case Success(_) =>
         info("process run success")
-      }
-      case Failure(ex) => {
+      case Failure(ex) =>
         error(s"process run error: ${ex.getMessage}")
 
         if (dqApp.retryable) {
@@ -102,19 +97,16 @@ object Application extends Loggable {
           shutdown
           sys.exit(-5)
         }
-      }
     }
 
     // dq app end
     dqApp.close match {
-      case Success(_) => {
+      case Success(_) =>
         info("process end success")
-      }
-      case Failure(ex) => {
+      case Failure(ex) =>
         error(s"process end error: ${ex.getMessage}")
         shutdown
         sys.exit(-5)
-      }
     }
 
     shutdown
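The second recurring change is import ordering. The reshuffled import block in Application.scala above, like those in the files below, follows a single grouping convention: java/scala standard-library imports first, then third-party packages, then project-local org.apache.griffin packages, with blank lines between groups. An illustrative arrangement (a composite sketch of the convention, not a file from the patch):

    import scala.util.{Failure, Success, Try}   // scala standard library

    import org.apache.spark.sql.SparkSession    // third-party libraries

    import org.apache.griffin.measure.Loggable  // project-local packages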
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
index 4943329..b281481 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/DQConfig.scala
@@ -18,9 +18,10 @@ under the License.
 */
 package org.apache.griffin.measure.configuration.dqdefinition
 
-import com.fasterxml.jackson.annotation.JsonInclude.Include
 import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
 import org.apache.commons.lang.StringUtils
+
 import org.apache.griffin.measure.configuration.enums._
 
 /**
@@ -46,7 +47,7 @@ case class DQConfig(@JsonProperty("name") private val name: String,
   def getDataSources: Seq[DataSourceParam] = {
     dataSources.foldLeft((Nil: Seq[DataSourceParam], Set[String]())) { (ret, ds) =>
       val (seq, names) = ret
-      if (!names.contains(ds.getName)){
+      if (!names.contains(ds.getName)) {
         (seq :+ ds, names + ds.getName)
       } else ret
     }._1
@@ -133,8 +134,9 @@ case class EvaluateRuleParam( @JsonProperty("rules") private val rules: List[Rul
  * rule param
  * @param dslType dsl type of this rule (must)
  * @param dqType dq type of this rule (must if dsl type is "griffin-dsl")
- * @param inDfName name of input dataframe of this rule, by default will be the previous rule output dataframe name
- * @param outDfName name of output dataframe of this rule, by default will be generated as data connector dataframe name with index suffix
+ * @param inDfName name of input dataframe of this rule, by default will be the previous rule output dataframe name
+ * @param outDfName name of output dataframe of this rule, by default will be generated
+ *                  as data connector dataframe name with index suffix
  * @param rule rule to define dq step calculation (must)
  * @param details detail config of rule (optional)
  * @param cache cache the result for multiple usage (optional, valid for "spark-sql" and "df-ops" mode)
@@ -206,4 +208,4 @@ case class RuleOutputParam( @JsonProperty("type") private val outputType: String
   def getFlatten: FlattenType = if (StringUtils.isNotBlank(flatten)) FlattenType(flatten) else FlattenType("")
 
   def validate(): Unit = {}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
index bf77d13..2e227a3 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/EnvConfig.scala
@@ -18,9 +18,10 @@ under the License.
 */
 package org.apache.griffin.measure.configuration.dqdefinition
 
-import com.fasterxml.jackson.annotation.JsonInclude.Include
 import com.fasterxml.jackson.annotation.{JsonInclude, JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
 import org.apache.commons.lang.StringUtils
+
 import org.apache.griffin.measure.configuration.enums._
 
 /**
@@ -109,4 +110,4 @@ case class CheckpointParam(@JsonProperty("type") private val cpType: String,
   def validate(): Unit = {
     assert(StringUtils.isNotBlank(cpType), "griffin checkpoint type should not be empty")
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReader.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReader.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReader.scala
index 4a4aeed..2739f74 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReader.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamFileReader.scala
@@ -18,11 +18,13 @@ under the License.
 */
 package org.apache.griffin.measure.configuration.dqdefinition.reader
 
+import scala.reflect.ClassTag
+import scala.util.Try
+
 import org.apache.griffin.measure.configuration.dqdefinition.Param
 import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil}
-import scala.reflect.ClassTag
-import scala.util.Try
+
 
 /**
  * read params from config file path

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReader.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReader.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReader.scala
index 063dc7e..ba51d5f 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReader.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamJsonReader.scala
@@ -18,11 +18,12 @@ under the License.
 */
 package org.apache.griffin.measure.configuration.dqdefinition.reader
 
+import scala.reflect.ClassTag
+import scala.util.Try
+
 import org.apache.griffin.measure.configuration.dqdefinition.Param
 import org.apache.griffin.measure.utils.JsonUtil
-import scala.reflect.ClassTag
-import scala.util.Try
 
 
 /**
  * read params from json string directly

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReader.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReader.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReader.scala
index 9a9a46c..5c914c6 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReader.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/dqdefinition/reader/ParamReader.scala
@@ -18,11 +18,12 @@ under the License.
 */
 package org.apache.griffin.measure.configuration.dqdefinition.reader
 
+import scala.reflect.ClassTag
+import scala.util.Try
+
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.dqdefinition.Param
-import scala.reflect.ClassTag
-import scala.util.Try
 
 
 trait ParamReader extends Loggable with Serializable {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala
index cee8d98..bbeb04f 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/DqType.scala
@@ -31,7 +31,13 @@ sealed trait DqType {
 
 object DqType {
   private val dqTypes: List[DqType] = List(
-    AccuracyType, ProfilingType, UniquenessType, DistinctnessType, TimelinessType, CompletenessType, UnknownType
+    AccuracyType,
+    ProfilingType,
+    UniquenessType,
+    DistinctnessType,
+    TimelinessType,
+    CompletenessType,
+    UnknownType
   )
   def apply(ptn: String): DqType = {
     dqTypes.find(dqType => ptn match {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
index 160ecaf..cb2fba1 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/FlattenType.scala
@@ -29,7 +29,13 @@ sealed trait FlattenType {
 }
 
 object FlattenType {
-  private val flattenTypes: List[FlattenType] = List(DefaultFlattenType, EntriesFlattenType, ArrayFlattenType, MapFlattenType)
+  private val flattenTypes: List[FlattenType] = List(
+    DefaultFlattenType,
+    EntriesFlattenType,
+    ArrayFlattenType,
+    MapFlattenType
+  )
+
   val default = DefaultFlattenType
   def apply(ptn: String): FlattenType = {
     flattenTypes.find(tp => ptn match {
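Each of these enum companions resolves a configuration string the same way: scan the candidate objects, keep the first whose idPattern regex matches, and fall back to a default object otherwise. A usage sketch; the FlattenType idPattern regexes are not visible in this diff, so which strings match is an assumption:

    FlattenType("array")   // assumed to match ArrayFlattenType's idPattern
    FlattenType("bogus")   // matches nothing, so apply presumably falls back
    FlattenType.default    // DefaultFlattenType, per the val added above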
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
index 5b1d261..8a80044 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/OutputType.scala
@@ -29,7 +29,13 @@ sealed trait OutputType {
 }
 
 object OutputType {
-  private val outputTypes: List[OutputType] = List(MetricOutputType, RecordOutputType, DscUpdateOutputType, UnknownOutputType)
+  private val outputTypes: List[OutputType] = List(
+    MetricOutputType,
+    RecordOutputType,
+    DscUpdateOutputType,
+    UnknownOutputType
+  )
+
   val default = UnknownOutputType
   def apply(ptn: String): OutputType = {
     outputTypes.find(tp => ptn match {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala
index 4f799ed..3cbc749 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/ProcessType.scala
@@ -29,7 +29,11 @@ sealed trait ProcessType {
 }
 
 object ProcessType {
-  private val procTypes: List[ProcessType] = List(BatchProcessType, StreamingProcessType)
+  private val procTypes: List[ProcessType] = List(
+    BatchProcessType,
+    StreamingProcessType
+  )
+
   def apply(ptn: String): ProcessType = {
     procTypes.find(tp => ptn match {
       case tp.idPattern() => true

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
index 5471e83..d9e5d2b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/enums/SinkType.scala
@@ -30,15 +30,22 @@ sealed trait SinkType {
 
 object SinkType {
   private val sinkTypes: List[SinkType] = List(
-    ConsoleSinkType, HdfsSinkType, ElasticsearchSinkType, MongoSinkType, UnknownSinkType
+    ConsoleSinkType,
+    HdfsSinkType,
+    ElasticsearchSinkType,
+    MongoSinkType,
+    UnknownSinkType
   )
+
   def apply(ptn: String): SinkType = {
     sinkTypes.find(tp => ptn match {
       case tp.idPattern() => true
       case _ => false
     }).getOrElse(UnknownSinkType)
   }
+
+  def unapply(pt: SinkType): Option[String] = Some(pt.desc)
+
   def validSinkTypes(strs: Seq[String]): Seq[SinkType] = {
     val seq = strs.map(s => SinkType(s)).filter(_ != UnknownSinkType).distinct
     if (seq.size > 0) seq else Seq(ElasticsearchSinkType)
@@ -48,7 +55,7 @@ object SinkType {
 /**
  * console sink, will sink metric in console
 */
-  case object ConsoleSinkType extends SinkType {
+case object ConsoleSinkType extends SinkType {
   val idPattern = "^(?i)console|log$".r
   val desc = "console"
 }
@@ -56,7 +63,7 @@ object SinkType {
 /**
  * hdfs sink, will sink metric and record in hdfs
 */
-  case object HdfsSinkType extends SinkType {
+case object HdfsSinkType extends SinkType {
   val idPattern = "^(?i)hdfs$".r
   val desc = "hdfs"
 }
@@ -64,7 +71,7 @@ object SinkType {
 /**
  * elasticsearch sink, will sink metric in elasticsearch
 */
-  case object ElasticsearchSinkType extends SinkType {
+case object ElasticsearchSinkType extends SinkType {
   val idPattern = "^(?i)es|elasticsearch|http$".r
   val desc = "elasticsearch"
 }
@@ -72,12 +79,12 @@ object SinkType {
 /**
  * mongo sink, will sink metric in mongo db
 */
-  case object MongoSinkType extends SinkType {
+case object MongoSinkType extends SinkType {
   val idPattern = "^(?i)mongo|mongodb$".r
   val desc = "distinct"
 }
 
-  case object UnknownSinkType extends SinkType {
+case object UnknownSinkType extends SinkType {
   val idPattern = "".r
   val desc = "unknown"
 }
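The SinkType hunks above show the whole resolution pipeline: apply matches a string against each idPattern regex, and validSinkTypes drops unknown names, dedupes, and falls back to Elasticsearch when nothing valid remains. A usage sketch consistent with the code shown:

    SinkType("LOG")    // ConsoleSinkType, via "^(?i)console|log$"

    SinkType.validSinkTypes(Seq("console", "LOG", "es"))
    // => Seq(ConsoleSinkType, ElasticsearchSinkType), duplicates collapsed

    SinkType.validSinkTypes(Nil)
    // => Seq(ElasticsearchSinkType), the default fallback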
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
index a9f3da0..b0759c5 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
@@ -18,11 +18,12 @@ under the License.
 */
 package org.apache.griffin.measure.context
 
-import org.apache.griffin.measure.configuration.enums._
+import org.apache.spark.sql.{Encoders, SparkSession, SQLContext}
+
 import org.apache.griffin.measure.configuration.dqdefinition._
+import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.datasource._
 import org.apache.griffin.measure.sink.{Sink, SinkFactory}
-import org.apache.spark.sql.{Encoders, SQLContext, SparkSession}
 
 /**
  * dq context: the context of each calculation
@@ -58,6 +59,7 @@ case class DQContext(contextId: ContextId,
     }
   }
   dataSourceNames.foreach(name => compileTableRegister.registerTable(name))
+
   def getDataSourceName(index: Int): String = {
     if (dataSourceNames.size > index) dataSourceNames(index) else ""
   }
@@ -66,20 +68,25 @@ case class DQContext(contextId: ContextId,
   val functionNames: Seq[String] = sparkSession.catalog.listFunctions.map(_.name).collect.toSeq
 
   val dataSourceTimeRanges = loadDataSources()
+
   def loadDataSources(): Map[String, TimeRange] = {
     dataSources.map { ds =>
       (ds.name, ds.loadData(this))
     }.toMap
   }
+
   printTimeRanges
 
   private val sinkFactory = SinkFactory(sinkParams, name)
   private val defaultSink: Sink = createSink(contextId.timestamp)
+
   def getSink(timestamp: Long): Sink = {
     if (timestamp == contextId.timestamp) getSink()
     else createSink(timestamp)
   }
+
   def getSink(): Sink = defaultSink
+
   private def createSink(t: Long): Sink = {
     procType match {
       case BatchProcessType => sinkFactory.getSinks(t, true)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala
index 4e8f4df..0671565 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/DataFrameCache.scala
@@ -18,11 +18,11 @@ under the License.
 */
 package org.apache.griffin.measure.context
 
-import org.apache.griffin.measure.Loggable
+import scala.collection.mutable.{Map => MutableMap, MutableList}
+
 import org.apache.spark.sql.DataFrame
 
-import scala.collection.concurrent.{Map => ConcMap}
-import scala.collection.mutable.{MutableList, Map => MutableMap}
+import org.apache.griffin.measure.Loggable
 
 /**
  * cache and unpersist dataframes
@@ -42,17 +42,15 @@ case class DataFrameCache() extends Loggable {
   def cacheDataFrame(name: String, df: DataFrame): Unit = {
     info(s"try to cache data frame ${name}")
     dataFrames.get(name) match {
-      case Some(odf) => {
+      case Some(odf) =>
         trashDataFrame(odf)
         dataFrames += (name -> df)
         df.cache
         info("cache after replace old df")
-      }
-      case _ => {
+      case _ =>
         dataFrames += (name -> df)
         df.cache
         info("cache after replace no old df")
-      }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala b/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala
index 8d86170..c4dda3b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/TableRegister.scala
@@ -18,10 +18,12 @@ under the License.
 */
 package org.apache.griffin.measure.context
 
-import org.apache.griffin.measure.Loggable
+import scala.collection.mutable.{Set => MutableSet}
+
 import org.apache.spark.sql._
 
-import scala.collection.mutable.{Set => MutableSet}
+import org.apache.griffin.measure.Loggable
+
 
 /**
  * register table name
@@ -78,4 +80,4 @@ case class RunTimeTableRegister(@transient sqlContext: SQLContext) extends Table
     tables.clear
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockInZK.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockInZK.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockInZK.scala
index b1cbe0f..d78aedf 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockInZK.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/lock/CheckpointLockInZK.scala
@@ -32,10 +32,9 @@ case class CheckpointLockInZK(@transient mutex: InterProcessMutex) extends Check
         mutex.acquire(-1, null)
       }
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"lock error: ${e.getMessage}")
         false
-      }
     }
   }
@@ -44,9 +43,8 @@ case class CheckpointLockInZK(@transient mutex: InterProcessMutex) extends Check
     try {
       if (mutex.isAcquiredInThisProcess) mutex.release
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"unlock error: ${e.getMessage}")
-      }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointClient.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointClient.scala
index 8acfbeb..48337d5 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointClient.scala
@@ -24,7 +24,7 @@ import org.apache.griffin.measure.context.streaming.checkpoint.lock.{CheckpointL
 object OffsetCheckpointClient extends OffsetCheckpoint with OffsetOps {
   var offsetCheckpoints: Seq[OffsetCheckpoint] = Nil
 
-  def initClient(checkpointParams: Iterable[CheckpointParam], metricName: String) = {
+  def initClient(checkpointParams: Iterable[CheckpointParam], metricName: String) : Unit = {
     val fac = OffsetCheckpointFactory(checkpointParams, metricName)
     offsetCheckpoints = checkpointParams.flatMap(param => fac.getOffsetCheckpoint(param)).toList
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointFactory.scala
index 5fe8e15..f19542a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointFactory.scala
@@ -18,9 +18,10 @@ under the License.
 */
 package org.apache.griffin.measure.context.streaming.checkpoint.offset
 
+import scala.util.Try
+
 import org.apache.griffin.measure.configuration.dqdefinition.CheckpointParam
 
-import scala.util.Try
 
 case class OffsetCheckpointFactory(checkpointParams: Iterable[CheckpointParam], metricName: String
                                   ) extends Serializable {
@@ -36,4 +37,4 @@ case class OffsetCheckpointFactory(checkpointParams: Iterable[CheckpointParam],
     offsetCheckpointTry.toOption
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointInZK.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointInZK.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointInZK.scala
index e051779..b575573 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointInZK.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/checkpoint/offset/OffsetCheckpointInZK.scala
@@ -18,22 +18,24 @@ under the License.
 */
 package org.apache.griffin.measure.context.streaming.checkpoint.offset
 
+import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
 import org.apache.curator.framework.imps.CuratorFrameworkState
 import org.apache.curator.framework.recipes.locks.InterProcessMutex
-import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
 import org.apache.curator.retry.ExponentialBackoffRetry
 import org.apache.curator.utils.ZKPaths
-import org.apache.griffin.measure.context.streaming.checkpoint.lock.CheckpointLockInZK
 import org.apache.zookeeper.CreateMode
-
 import scala.collection.JavaConverters._
 
+import org.apache.griffin.measure.context.streaming.checkpoint.lock.CheckpointLockInZK
+
+
 /**
  * leverage zookeeper for info cache
  * @param config
  * @param metricName
 */
-case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) extends OffsetCheckpoint with OffsetOps {
+case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String)
+  extends OffsetCheckpoint with OffsetOps {
 
   val Hosts = "hosts"
   val Namespace = "namespace"
@@ -67,7 +69,9 @@ case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) ex
   }
   val lockPath = config.getOrElse(LockPath, "lock").toString
 
-  private val cacheNamespace: String = if (namespace.isEmpty) metricName else namespace + separator + metricName
+  private val cacheNamespace: String =
+    if (namespace.isEmpty) metricName else namespace + separator + metricName
+
   private val builder = CuratorFrameworkFactory.builder()
     .connectString(hosts)
     .retryPolicy(new ExponentialBackoffRetry(1000, 3))
@@ -141,10 +145,9 @@ case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) ex
     try {
       client.getChildren().forPath(path).asScala.toList
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         warn(s"list ${path} warn: ${e.getMessage}")
         Nil
-      }
     }
   }
@@ -162,10 +165,9 @@ case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) ex
         .forPath(path, content.getBytes("utf-8"))
       true
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"create ( ${path} -> ${content} ) error: ${e.getMessage}")
         false
-      }
     }
   }
@@ -174,10 +176,9 @@ case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) ex
       client.setData().forPath(path, content.getBytes("utf-8"))
       true
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"update ( ${path} -> ${content} ) error: ${e.getMessage}")
         false
-      }
     }
   }
@@ -185,10 +186,9 @@ case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) ex
     try {
       Some(new String(client.getData().forPath(path), "utf-8"))
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         warn(s"read ${path} warn: ${e.getMessage}")
         None
-      }
     }
   }
@@ -204,10 +204,9 @@ case class OffsetCheckpointInZK(config: Map[String, Any], metricName: String) ex
     try {
       client.checkExists().forPath(path) != null
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         warn(s"check exists ${path} warn: ${e.getMessage}")
         false
-      }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/AccuracyMetric.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/AccuracyMetric.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/AccuracyMetric.scala
index e69716e..19dfb9e 100644
---
 a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/AccuracyMetric.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/AccuracyMetric.scala
@@ -45,9 +45,11 @@ case class AccuracyMetric(miss: Long, total: Long) extends Metric {
     (this.miss != other.miss) || (this.total != other.total)
   }
 
-  def getMiss = miss
-  def getTotal = total
-  def getMatch = total - miss
+  def getMiss: Long = miss
+
+  def getTotal: Long = total
+
+  def getMatch: Long = total - miss
 
   def matchPercentage: Double = if (getTotal <= 0) 0 else getMatch.toDouble / getTotal * 100

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/CacheResults.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/CacheResults.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/CacheResults.scala
index cc8e772..6c99618 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/CacheResults.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/metric/CacheResults.scala
@@ -18,9 +18,10 @@ under the License.
 */
 package org.apache.griffin.measure.context.streaming.metric
 
+import scala.collection.mutable.{Map => MutableMap}
+
 import org.apache.griffin.measure.Loggable
 
-import scala.collection.mutable.{Map => MutableMap}
 
 /**
  * in streaming mode, some metrics may update,
@@ -32,10 +33,9 @@ object CacheResults extends Loggable {
     def olderThan(ut: Long): Boolean = updateTime < ut
     def update(ut: Long, r: Metric): Option[Metric] = {
       r match {
-        case m: result.T if (olderThan(ut)) => {
+        case m: result.T if (olderThan(ut)) =>
           val ur = result.update(m)
           if (result.differsFrom(ur)) Some(ur) else None
-        }
         case _ => None
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
index 6bf6373..f2cd0ec 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
@@ -18,13 +18,14 @@ under the License.
 */
 package org.apache.griffin.measure.datasource
 
+import org.apache.spark.sql._
+
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.dqdefinition.DataSourceParam
-import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
 import org.apache.griffin.measure.context.{DQContext, TimeRange}
+import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
 import org.apache.griffin.measure.datasource.connector.DataConnector
 import org.apache.griffin.measure.utils.DataFrameUtil._
-import org.apache.spark.sql._
 
 /**
  * data source
@@ -50,12 +51,10 @@ case class DataSource(name: String,
     val timestamp = context.contextId.timestamp
     val (dfOpt, timeRange) = data(timestamp)
     dfOpt match {
-      case Some(df) => {
+      case Some(df) =>
         context.runTimeTableRegister.registerTable(name, df)
-      }
-      case None => {
+      case None =>
         warn(s"load data source [${name}] fails")
-      }
     }
     timeRange
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
index 7807dfd..28e616b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
@@ -18,14 +18,16 @@ under the License.
 */
 package org.apache.griffin.measure.datasource
 
+import scala.util.Success
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.StreamingContext
+
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.dqdefinition.DataSourceParam
 import org.apache.griffin.measure.datasource.cache.StreamingCacheClientFactory
 import org.apache.griffin.measure.datasource.connector.{DataConnector, DataConnectorFactory}
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.streaming.StreamingContext
 
-import scala.util.Success
 
 object DataSourceFactory extends Loggable {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/TimestampStorage.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/TimestampStorage.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/TimestampStorage.scala
index a305563..04a7f85 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/TimestampStorage.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/TimestampStorage.scala
@@ -18,10 +18,9 @@ under the License.
 */
 package org.apache.griffin.measure.datasource
 
-import org.apache.griffin.measure.Loggable
-
 import scala.collection.mutable.{SortedSet => MutableSortedSet}
 
+import org.apache.griffin.measure.Loggable
 /**
  * tmst cache, CRUD of timestamps
 */
@@ -29,19 +28,19 @@ case class TimestampStorage() extends Loggable {
 
   private val tmstGroup: MutableSortedSet[Long] = MutableSortedSet.empty[Long]
 
-  //-- insert tmst into tmst group --
-  def insert(tmst: Long) = tmstGroup += tmst
-  def insert(tmsts: Iterable[Long]) = tmstGroup ++= tmsts
+  // -- insert tmst into tmst group --
+  def insert(tmst: Long) : MutableSortedSet[Long] = tmstGroup += tmst
+  def insert(tmsts: Iterable[Long]) : MutableSortedSet[Long] = tmstGroup ++= tmsts
 
-  //-- remove tmst from tmst group --
-  def remove(tmst: Long) = tmstGroup -= tmst
-  def remove(tmsts: Iterable[Long]) = tmstGroup --= tmsts
+  // -- remove tmst from tmst group --
+  def remove(tmst: Long) : MutableSortedSet[Long] = tmstGroup -= tmst
+  def remove(tmsts: Iterable[Long]) : MutableSortedSet[Long] = tmstGroup --= tmsts
 
-  //-- get subset of tmst group --
-  def fromUntil(from: Long, until: Long) = tmstGroup.range(from, until).toSet
-  def afterTil(after: Long, til: Long) = tmstGroup.range(after + 1, til + 1).toSet
-  def until(until: Long) = tmstGroup.until(until).toSet
-  def from(from: Long) = tmstGroup.from(from).toSet
-  def all = tmstGroup.toSet
+  // -- get subset of tmst group --
+  def fromUntil(from: Long, until: Long) : Set[Long] = tmstGroup.range(from, until).toSet
+  def afterTil(after: Long, til: Long) : Set[Long] = tmstGroup.range(after + 1, til + 1).toSet
+  def until(until: Long) : Set[Long] = tmstGroup.until(until).toSet
+  def from(from: Long) : Set[Long] = tmstGroup.from(from).toSet
+  def all : Set[Long] = tmstGroup.toSet
 
-}
\ No newline at end of file
+}
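Besides brace removal, TimestampStorage shows the other recurring fix in this commit: public members gain explicit result types, so inferred mutable collection types no longer leak into the API silently. Distilled from the hunks above:

    // before: result type inferred, MutableSortedSet[Long] leaks implicitly
    def insert(tmst: Long) = tmstGroup += tmst

    // after: the signature states what callers actually get
    def insert(tmst: Long): MutableSortedSet[Long] = tmstGroup += tmst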
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
index b351f82..a03a468 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
@@ -20,17 +20,19 @@ package org.apache.griffin.measure.datasource.cache
 
 import java.util.concurrent.TimeUnit
 
+import scala.util.Random
+
+import org.apache.spark.sql._
+
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.context.TimeRange
 import org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointClient
 import org.apache.griffin.measure.datasource.TimestampStorage
 import org.apache.griffin.measure.step.builder.ConstantColumns
+import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
 import org.apache.griffin.measure.utils.DataFrameUtil._
 import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
-import org.apache.spark.sql._
 
-import scala.util.Random
 
 /**
  * data source cache in streaming mode
@@ -38,7 +40,8 @@ import scala.util.Random
  * read data frame from hdfs in calculate phase
  * with update and clean actions for the cache data
 */
-trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long] with Loggable with Serializable {
+trait StreamingCacheClient
+  extends StreamingOffsetCacheable with WithFanIn[Long] with Loggable with Serializable {
 
   val sqlContext: SQLContext
   val param: Map[String, Any]
@@ -46,7 +49,9 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
   val index: Int
 
   val timestampStorage: TimestampStorage
-  protected def fromUntilRangeTmsts(from: Long, until: Long) = timestampStorage.fromUntil(from, until)
+  protected def fromUntilRangeTmsts(from: Long, until: Long) =
+    timestampStorage.fromUntil(from, until)
+
   protected def clearTmst(t: Long) = timestampStorage.remove(t)
   protected def clearTmstsUntil(until: Long) = {
     val outDateTmsts = timestampStorage.until(until)
@@ -67,17 +72,20 @@
   val filePath: String = param.getString(_FilePath, defFilePath)
   val cacheInfoPath: String = param.getString(_InfoPath, defInfoPath)
 
-  val readyTimeInterval: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeInterval, "1m")).getOrElse(60000L)
-  val readyTimeDelay: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeDelay, "1m")).getOrElse(60000L)
+  val readyTimeInterval: Long =
+    TimeUtil.milliseconds(param.getString(_ReadyTimeInterval, "1m")).getOrElse(60000L)
+
+  val readyTimeDelay: Long =
+    TimeUtil.milliseconds(param.getString(_ReadyTimeDelay, "1m")).getOrElse(60000L)
+
   val deltaTimeRange: (Long, Long) = {
     def negative(n: Long): Long = if (n <= 0) n else 0
     param.get(_TimeRange) match {
-      case Some(seq: Seq[String]) => {
+      case Some(seq: Seq[String]) =>
         val nseq = seq.flatMap(TimeUtil.milliseconds(_))
         val ns = negative(nseq.headOption.getOrElse(0))
         val ne = negative(nseq.tail.headOption.getOrElse(0))
         (ns, ne)
-      }
       case _ => (0, 0)
     }
   }
@@ -112,7 +120,7 @@
   def saveData(dfOpt: Option[DataFrame], ms: Long): Unit = {
     if (!readOnly) {
       dfOpt match {
-        case Some(df) => {
+        case Some(df) =>
           // cache df
           df.cache
@@ -137,10 +145,8 @@
           // uncache df
           df.unpersist
-        }
-        case _ => {
+        case _ =>
           info("no data frame to save")
-        }
       }
 
       // submit cache time and ready time
@@ -168,7 +174,9 @@
       s"`${ConstantColumns.tmst}` = ${reviseTimeRange._1}"
     } else {
       info(s"read time range: (${reviseTimeRange._1}, ${reviseTimeRange._2}]")
-      s"`${ConstantColumns.tmst}` > ${reviseTimeRange._1} AND `${ConstantColumns.tmst}` <= ${reviseTimeRange._2}"
+      s"`${ConstantColumns.tmst}` > ${reviseTimeRange._1} " +
+        s"AND `${ConstantColumns.tmst}` <= ${reviseTimeRange._2}"
     }
 
     // new cache data
@@ -176,10 +184,9 @@
       val dfr = sqlContext.read
       readDataFrameOpt(dfr, newFilePath).map(_.filter(filterStr))
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         warn(s"read data source cache warn: ${e.getMessage}")
         None
-      }
     }
 
     // old cache data
@@ -190,10 +197,9 @@
         val dfr = sqlContext.read
         readDataFrameOpt(dfr, oldDfPath).map(_.filter(filterStr))
       } catch {
-        case e: Throwable => {
+        case e: Throwable =>
           warn(s"read old data source cache warn: ${e.getMessage}")
           None
-        }
       }
     }
@@ -228,12 +234,11 @@
     }
     names.filter { name =>
       name match {
-        case regex(value) => {
+        case regex(value) =>
           str2Long(value) match {
             case Some(t) => func(t, bound)
             case _ => false
           }
-        }
         case _ => false
       }
     }.map(name => s"${path}/${name}")
@@ -258,7 +263,7 @@
     // new cache data
     val newCacheCleanTime = if (updatable) readLastProcTime else readCleanTime
     newCacheCleanTime match {
-      case Some(nct) => {
+      case Some(nct) =>
         // clean calculated new cache data
         val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
         if (newCacheLocked) {
@@ -271,16 +276,15 @@
             newCacheLock.unlock()
           }
         }
-      }
-      case _ => {
+      case _ =>
         // do nothing
-      }
+        info("should not happen")
     }
 
     // old cache data
     val oldCacheCleanTime = if (updatable) readCleanTime else None
     oldCacheCleanTime match {
-      case Some(oct) => {
+      case Some(oct) =>
         val oldCacheIndexOpt = readOldCacheIndex
         oldCacheIndexOpt.foreach { idx =>
           val oldDfPath = s"${oldFilePath}/${idx}"
@@ -298,10 +302,9 @@
             }
           }
         }
-      }
-      case _ => {
+      case _ =>
         // do nothing
-      }
+        info("should not happen")
     }
   }
@@ -313,7 +316,7 @@
   def updateData(dfOpt: Option[DataFrame]): Unit = {
     if (!readOnly && updatable) {
       dfOpt match {
-        case Some(df) => {
+        case Some(df) =>
           // old cache lock
           val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
           if (oldCacheLocked) {
@@ -339,10 +342,8 @@
              oldCacheLock.unlock()
            }
          }
-        }
-        case _ => {
+        case _ =>
           info("no data frame to update")
-        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
index f991e2d..eeda8ef 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
@@ -18,10 +18,11 @@ under the License.
 */
 package org.apache.griffin.measure.datasource.cache
 
+import org.apache.spark.sql.SQLContext
+
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.datasource.TimestampStorage
 import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.spark.sql.SQLContext
 
 object StreamingCacheClientFactory extends Loggable {
@@ -50,17 +51,20 @@ object StreamingCacheClientFactory extends Loggable {
     try {
       val tp = param.getString(_type, "")
       val dsCache = tp match {
-        case ParquetRegex() => StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
-        case JsonRegex() => StreamingCacheJsonClient(sqlContext, param, name, index, tmstCache)
-        case OrcRegex() => StreamingCacheOrcClient(sqlContext, param, name, index, tmstCache)
-        case _ => StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
+        case ParquetRegex() =>
+          StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
+        case JsonRegex() =>
+          StreamingCacheJsonClient(sqlContext, param, name, index, tmstCache)
+        case OrcRegex() =>
+          StreamingCacheOrcClient(sqlContext, param, name, index, tmstCache)
+        case _ =>
+          StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
       }
       Some(dsCache)
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error("generate data source cache fails")
         None
-      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
index a12ef87..c81d4d1 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
@@ -18,9 +18,10 @@ under the License.
 */
 package org.apache.griffin.measure.datasource.cache
 
-import org.apache.griffin.measure.datasource.TimestampStorage
 import org.apache.spark.sql._
 
+import org.apache.griffin.measure.datasource.TimestampStorage
+
 /**
  * data source cache in json format
 */

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
index 63e7104..0649b74 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
@@ -18,9 +18,10 @@ under the License.
 */
 package org.apache.griffin.measure.datasource.cache
 
-import org.apache.griffin.measure.datasource.TimestampStorage
 import org.apache.spark.sql._
 
+import org.apache.griffin.measure.datasource.TimestampStorage
+
 /**
  * data source cache in orc format
 */

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
index c275227..9c369ee 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
@@ -18,14 +18,18 @@ under the License.
 */
 package org.apache.griffin.measure.datasource.cache
 
-import org.apache.griffin.measure.datasource.TimestampStorage
 import org.apache.spark.sql._
 
+import org.apache.griffin.measure.datasource.TimestampStorage
+
 /**
  * data source cache in parquet format
 */
-case class StreamingCacheParquetClient(sqlContext: SQLContext, param: Map[String, Any],
-                                       dsName: String, index: Int, timestampStorage: TimestampStorage
+case class StreamingCacheParquetClient(sqlContext: SQLContext,
+                                       param: Map[String, Any],
+                                       dsName: String,
+                                       index: Int,
+                                       timestampStorage: TimestampStorage
                                       ) extends StreamingCacheClient {
 
   sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
index 7b7f506..e73a058 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
@@ -30,13 +30,13 @@ trait StreamingOffsetCacheable extends Loggable with Serializable {
   val readyTimeInterval: Long
   val readyTimeDelay: Long
 
-  def selfCacheInfoPath = s"${OffsetCheckpointClient.infoPath}/${cacheInfoPath}"
+  def selfCacheInfoPath : String = s"${OffsetCheckpointClient.infoPath}/${cacheInfoPath}"
 
-  def selfCacheTime = OffsetCheckpointClient.cacheTime(selfCacheInfoPath)
-  def selfLastProcTime = OffsetCheckpointClient.lastProcTime(selfCacheInfoPath)
-  def selfReadyTime = OffsetCheckpointClient.readyTime(selfCacheInfoPath)
-  def selfCleanTime = OffsetCheckpointClient.cleanTime(selfCacheInfoPath)
-  def selfOldCacheIndex = OffsetCheckpointClient.oldCacheIndex(selfCacheInfoPath)
+  def selfCacheTime : String = OffsetCheckpointClient.cacheTime(selfCacheInfoPath)
+  def selfLastProcTime : String = OffsetCheckpointClient.lastProcTime(selfCacheInfoPath)
+  def selfReadyTime : String = OffsetCheckpointClient.readyTime(selfCacheInfoPath)
+  def selfCleanTime : String = OffsetCheckpointClient.cleanTime(selfCacheInfoPath)
+  def selfOldCacheIndex : String = OffsetCheckpointClient.oldCacheIndex(selfCacheInfoPath)
 
   protected def submitCacheTime(ms: Long): Unit = {
     val map = Map[String, String]((selfCacheTime -> ms.toString))
@@ -80,9 +80,11 @@ trait StreamingOffsetCacheable extends Loggable with Serializable {
       try {
         Some(v.toLong)
       } catch {
-        case _:Throwable => error("try to read not existing value from OffsetCacheClient::readSelfInfo");None
+        case _: Throwable =>
+          error("try to read not existing value from OffsetCacheClient::readSelfInfo")
+          None
       }
     }
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/WithFanIn.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/WithFanIn.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/WithFanIn.scala
index 413b5a2..675f2f2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/WithFanIn.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/WithFanIn.scala
@@ -20,7 +20,7 @@ package org.apache.griffin.measure.datasource.cache
 
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.collection.concurrent.{TrieMap, Map => ConcMap}
+import scala.collection.concurrent.{Map => ConcMap, TrieMap}
 
 /**
  * fan in trait, for multiple input and one output
@@ -55,14 +55,12 @@ trait WithFanIn[T] {
 
   private def fanInc(key: T): Unit = {
     fanInCountMap.get(key) match {
-      case Some(n) => {
+      case Some(n) =>
         val suc = fanInCountMap.replace(key, n, n + 1)
         if (!suc) fanInc(key)
-      }
-      case _ => {
+      case _ =>
         val oldOpt = fanInCountMap.putIfAbsent(key, 1)
         if (oldOpt.nonEmpty) fanInc(key)
-      }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
index 05d3c75..ae6a18d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
@@ -20,16 +20,17 @@ package org.apache.griffin.measure.datasource.connector
 
 import java.util.concurrent.atomic.AtomicLong
 
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.functions._
+
 import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.enums.BatchProcessType
 import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
+import org.apache.griffin.measure.configuration.enums.BatchProcessType
 import org.apache.griffin.measure.context.{ContextId, DQContext, TimeRange}
 import org.apache.griffin.measure.datasource.TimestampStorage
 import org.apache.griffin.measure.job.builder.DQJobBuilder
 import org.apache.griffin.measure.step.builder.ConstantColumns
 import org.apache.griffin.measure.step.builder.preproc.PreProcParamMaker
-import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.functions._
 
 trait DataConnector extends Loggable with Serializable {
@@ -64,7 +65,8 @@ trait DataConnector extends Loggable with Serializable {
     saveTmst(timestamp) // save timestamp
 
     dfOpt.flatMap { df =>
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
index 05d3c75..ae6a18d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
@@ -20,16 +20,17 @@ package org.apache.griffin.measure.datasource.connector

 import java.util.concurrent.atomic.AtomicLong

+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.functions._
+
 import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.enums.BatchProcessType
 import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
+import org.apache.griffin.measure.configuration.enums.BatchProcessType
 import org.apache.griffin.measure.context.{ContextId, DQContext, TimeRange}
 import org.apache.griffin.measure.datasource.TimestampStorage
 import org.apache.griffin.measure.job.builder.DQJobBuilder
 import org.apache.griffin.measure.step.builder.ConstantColumns
 import org.apache.griffin.measure.step.builder.preproc.PreProcParamMaker
-import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.functions._

 trait DataConnector extends Loggable with Serializable {

@@ -64,7 +65,8 @@ trait DataConnector extends Loggable with Serializable {
     saveTmst(timestamp) // save timestamp

     dfOpt.flatMap { df =>
-      val (preProcRules, thisTable) = PreProcParamMaker.makePreProcRules(dcParam.getPreProcRules, suffix, dcDfName)
+      val (preProcRules, thisTable) =
+        PreProcParamMaker.makePreProcRules(dcParam.getPreProcRules, suffix, dcDfName)

       // init data
       context.compileTableRegister.registerTable(thisTable)
@@ -89,10 +91,9 @@ trait DataConnector extends Loggable with Serializable {
       }
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"pre-process of data connector [${id}] error: ${e.getMessage}")
         None
-      }
     }
   }
 }
@@ -108,4 +109,4 @@ object DataConnectorIdGenerator {
   private def increment: Long = {
     counter.incrementAndGet()
   }
-}
\ No newline at end of file
+}
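The catch rewrite in DataConnector above recurs in nearly every connector touched by this commit: catch Throwable, log, and degrade to None rather than failing the job. If one wanted to factor the pattern out, a helper might look like this sketch (loadOrNone is hypothetical; Griffin's error(...) comes from its Loggable trait and is stubbed here with println):

    object LoadGuard {
      // Run a loader; on any failure, log and fall back to None.
      def loadOrNone[T](what: String)(load: => T): Option[T] =
        try {
          Some(load)
        } catch {
          case e: Throwable =>
            println(s"load $what fails: ${e.getMessage}") // stand-in for Loggable.error
            None
        }

      def main(args: Array[String]): Unit = {
        println(loadOrNone("ok")(42))                          // Some(42)
        println(loadOrNone("boom")(sys.error("no such file"))) // None
      }
    }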
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
index f4911fc..b51d4fb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
@@ -18,17 +18,18 @@ under the License.
 */
 package org.apache.griffin.measure.datasource.connector

+import scala.util.Try
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.StreamingContext
+
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
 import org.apache.griffin.measure.datasource.TimestampStorage
 import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
 import org.apache.griffin.measure.datasource.connector.batch._
 import org.apache.griffin.measure.datasource.connector.streaming._
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.streaming.StreamingContext
-import scala.reflect.ClassTag
-import scala.util.Try

 object DataConnectorFactory extends Loggable {

@@ -60,9 +61,8 @@ object DataConnectorFactory extends Loggable {
       case HiveRegex() => HiveBatchDataConnector(sparkSession, dcParam, tmstCache)
       case AvroRegex() => AvroBatchDataConnector(sparkSession, dcParam, tmstCache)
       case TextDirRegex() => TextDirBatchDataConnector(sparkSession, dcParam, tmstCache)
-      case KafkaRegex() => {
+      case KafkaRegex() =>
         getStreamingDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
-      }
       case _ => throw new Exception("connector creation error!")
     }
   }
@@ -78,7 +78,8 @@ object DataConnectorFactory extends Loggable {
     val conType = dcParam.getType
     val version = dcParam.getVersion
     conType match {
-      case KafkaRegex() => getKafkaDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
+      case KafkaRegex() =>
+        getKafkaDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
       case _ => throw new Exception("streaming connector creation error!")
     }
   }
@@ -88,7 +89,7 @@ object DataConnectorFactory extends Loggable {
                                     dcParam: DataConnectorParam,
                                     tmstCache: TimestampStorage,
                                     streamingCacheClientOpt: Option[StreamingCacheClient]
-                                   ): KafkaStreamingDataConnector  = {
+                                   ): KafkaStreamingDataConnector = {
     val KeyType = "key.type"
     val ValueType = "value.type"
     val config = dcParam.getConfig
     val keyType = config.getOrElse(KeyType, "java.lang.String").toString
     val valueType = config.getOrElse(ValueType, "java.lang.String").toString

     (keyType, valueType) match {
-      case ("java.lang.String", "java.lang.String") => {
-        KafkaStreamingStringDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
-      }
-      case _ => {
+      case ("java.lang.String", "java.lang.String") =>
+        KafkaStreamingStringDataConnector(
+          sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
+      case _ =>
         throw new Exception("not supported type kafka data connector")
-      }
     }
   }

-//  def filterDataConnectors[T <: DataConnector : ClassTag](connectors: Seq[DataConnector]): Seq[T] = {
-//    connectors.flatMap { dc =>
-//      dc match {
-//        case mdc: T => Some(mdc)
-//        case _ => None
-//      }
-//    }
-//  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
index 7fa9080..bf71b2c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
@@ -18,11 +18,12 @@ under the License.
 */
 package org.apache.griffin.measure.datasource.connector.batch

+import org.apache.spark.sql.{DataFrame, SparkSession}
+
 import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
 import org.apache.griffin.measure.context.TimeRange
 import org.apache.griffin.measure.datasource.TimestampStorage
 import org.apache.griffin.measure.utils.HdfsUtil
-import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.griffin.measure.utils.ParamUtil._

 /**
@@ -58,10 +59,9 @@ case class AvroBatchDataConnector(@transient sparkSession: SparkSession,
       val preDfOpt = preProcess(dfOpt, ms)
       preDfOpt
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"load avro file ${concreteFileFullPath} fails")
         None
-      }
     }
     val tmsts = readTmst(ms)
     (dfOpt, TimeRange(ms, tmsts))
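DataConnectorFactory above dispatches on the configured connector type with regex extractors (HiveRegex(), AvroRegex(), KafkaRegex(), ...). The mechanism is plain Scala: a Regex used as a pattern with no capture groups matches when the whole string matches. A minimal sketch follows; the regex bodies are assumed for illustration, since the factory's actual pattern definitions are outside this diff:

    object ConnectorDispatch {
      // Assumed shapes; the real patterns live elsewhere in DataConnectorFactory.
      val HiveRegex = """^(?i)hive$""".r
      val KafkaRegex = """^(?i)kafka$""".r

      def describe(conType: String): String = conType match {
        case HiveRegex() => "batch connector (hive)"
        case KafkaRegex() => "streaming connector (kafka)"
        case _ => throw new Exception("connector creation error!")
      }

      def main(args: Array[String]): Unit = {
        println(describe("HIVE"))  // batch connector (hive)
        println(describe("kafka")) // streaming connector (kafka)
      }
    }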
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala
index ab1e823..1df3bd7 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala
@@ -18,10 +18,11 @@ under the License.
 */
 package org.apache.griffin.measure.datasource.connector.batch

+import org.apache.spark.sql.{DataFrame, SparkSession}
+
 import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
 import org.apache.griffin.measure.context.TimeRange
 import org.apache.griffin.measure.datasource.TimestampStorage
-import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.griffin.measure.utils.ParamUtil._

 /**
@@ -54,10 +55,9 @@ case class HiveBatchDataConnector(@transient sparkSession: SparkSession,
       val preDfOpt = preProcess(dfOpt, ms)
       preDfOpt
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"load hive table ${concreteTableName} fails: ${e.getMessage}")
         None
-      }
     }
     val tmsts = readTmst(ms)
     (dfOpt, TimeRange(ms, tmsts))
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
index 3dcab16..a7ab02e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
@@ -18,11 +18,12 @@ under the License.
 */
 package org.apache.griffin.measure.datasource.connector.batch

+import org.apache.spark.sql.{DataFrame, SparkSession}
+
 import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
 import org.apache.griffin.measure.context.TimeRange
 import org.apache.griffin.measure.datasource.TimestampStorage
 import org.apache.griffin.measure.utils.HdfsUtil
-import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.griffin.measure.utils.ParamUtil._

 /**
@@ -60,7 +61,7 @@ case class TextDirBatchDataConnector(@transient sparkSession: SparkSession,
       val validDataDirs = dataDirs.filter(dir => !emptyDir(dir))

       if (validDataDirs.nonEmpty) {
-        val df = sparkSession.read.text(validDataDirs: _*) 
+        val df = sparkSession.read.text(validDataDirs: _*)
         val dfOpt = Some(df)
         val preDfOpt = preProcess(dfOpt, ms)
         preDfOpt
@@ -68,16 +69,17 @@ case class TextDirBatchDataConnector(@transient sparkSession: SparkSession,
         None
       }
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"load text dir ${dirPath} fails: ${e.getMessage}")
         None
-      }
     }
     val tmsts = readTmst(ms)
     (dfOpt, TimeRange(ms, tmsts))
   }

-  private def listSubDirs(paths: Seq[String], depth: Int, filteFunc: (String) => Boolean): Seq[String] = {
+  private def listSubDirs(paths: Seq[String],
+                          depth: Int,
+                          filteFunc: (String) => Boolean): Seq[String] = {
     val subDirs = paths.flatMap { path => HdfsUtil.listSubPathsByType(path, "dir", true) }
     if (depth <= 0) {
       subDirs.filter(filteFunc)
@@ -90,7 +92,8 @@ case class TextDirBatchDataConnector(@transient sparkSession: SparkSession,

   private def isDone(dir: String): Boolean = HdfsUtil.existFileInDir(dir, doneFile)
   private def isSuccess(dir: String): Boolean = HdfsUtil.existFileInDir(dir, successFile)
-  private def touchDone(dir: String): Unit = HdfsUtil.createEmptyFile(HdfsUtil.getHdfsFilePath(dir, doneFile))
+  private def touchDone(dir: String): Unit =
+    HdfsUtil.createEmptyFile(HdfsUtil.getHdfsFilePath(dir, doneFile))

   private def emptyDir(dir: String): Boolean = {
     HdfsUtil.listSubPathsByType(dir, "file").filter(!_.startsWith(ignoreFilePrefix)).size == 0
@@ -98,7 +101,8 @@ case class TextDirBatchDataConnector(@transient sparkSession: SparkSession,

 //  def metaData(): Try[Iterable[(String, String)]] = {
 //    Try {
-//      val st = sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath).schema
+//      val st = sqlContext.read.format("com.databricks.spark.avro").
+  //      load(concreteFileFullPath).schema
 //      st.fields.map(f => (f.name, f.dataType.typeName))
 //    }
 //  }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
index 1475898..ec09ffc 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
@@ -18,10 +18,11 @@ under the License.
 */
 package org.apache.griffin.measure.datasource.connector.streaming

+import scala.util.{Failure, Success, Try}
+
 import kafka.serializer.Decoder
 import org.apache.spark.streaming.dstream.InputDStream

-import scala.util.{Failure, Success, Try}
 import org.apache.griffin.measure.utils.ParamUtil._

 /**
@@ -64,10 +65,9 @@ trait KafkaStreamingDataConnector extends StreamingDataConnector {
         // pre-process
         preProcess(dfOpt, ms)
       } catch {
-        case e: Throwable => {
+        case e: Throwable =>
           error(s"streaming data connector error: ${e.getMessage}")
           None
-        }
       }

       // save data frame
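The reformatted listSubDirs above descends one directory level per recursive call until the requested depth, then applies the filter. The same shape, with java.io.File standing in for Griffin's HdfsUtil so the sketch is self-contained (names and types adapted accordingly):

    import java.io.File

    object DirWalk {
      // Descend `depth` levels of subdirectories, then keep those passing `keep`.
      def listSubDirs(paths: Seq[File], depth: Int, keep: File => Boolean): Seq[File] = {
        val subDirs = paths.flatMap { p =>
          Option(p.listFiles).toSeq.flatten.filter(_.isDirectory)
        }
        if (depth <= 0) subDirs.filter(keep)
        else listSubDirs(subDirs, depth - 1, keep)
      }

      def main(args: Array[String]): Unit = {
        listSubDirs(Seq(new File(".")), 1, _ => true).foreach(println)
      }
    }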
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
index 8477fcf..ee5e497 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
@@ -19,16 +19,17 @@ under the License.
 package org.apache.griffin.measure.datasource.connector.streaming

 import kafka.serializer.StringDecoder
-import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
-import org.apache.griffin.measure.datasource.TimestampStorage
-import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.sql.{Row, _}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream.InputDStream
 import org.apache.spark.streaming.kafka.KafkaUtils

+import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
+
 /**
  * streaming data connector for kafka with string format key and value
  */
@@ -60,10 +61,9 @@ case class KafkaStreamingStringDataConnector(@transient sparkSession: SparkSessi
       val df = sparkSession.createDataFrame(rowRdd, schema)
       Some(df)
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
        error("streaming data transform fails")
        None
-      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnector.scala
index 5c2170c..323b0ac 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnector.scala
@@ -18,14 +18,16 @@ under the License.
 */
 package org.apache.griffin.measure.datasource.connector.streaming

-import org.apache.griffin.measure.context.TimeRange
-import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
-import org.apache.griffin.measure.datasource.connector.DataConnector
+import scala.util.Try
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.streaming.dstream.InputDStream

-import scala.util.Try
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
+import org.apache.griffin.measure.datasource.connector.DataConnector
+

 trait StreamingDataConnector extends DataConnector {
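The transform in KafkaStreamingStringDataConnector above builds a DataFrame by pairing an RDD[Row] with an explicit StructType, the standard Spark pattern when there is no case class to infer a schema from. A runnable sketch of that step (the local master and column names are illustrative, not necessarily Griffin's):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    object RowsToDf {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("rows-to-df").master("local[*]").getOrCreate()

        // Explicit schema, as the connector builds for kafka (key, value) pairs.
        val schema = StructType(Seq(
          StructField("key", StringType),
          StructField("value", StringType)))

        val rowRdd: RDD[Row] = spark.sparkContext.parallelize(
          Seq(Row("k1", "v1"), Row("k2", "v2")))

        val df = spark.createDataFrame(rowRdd, schema)
        df.show()

        spark.stop()
      }
    }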
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala
index 249d6d2..0917919 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/job/builder/DQJobBuilder.scala
@@ -18,11 +18,9 @@ under the License.
 */
 package org.apache.griffin.measure.job.builder

-import org.apache.griffin.measure.configuration.enums.DslType
 import org.apache.griffin.measure.configuration.dqdefinition._
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.job._
-import org.apache.griffin.measure.step.DQStep
 import org.apache.griffin.measure.step.builder.DQStepBuilder
 import org.apache.griffin.measure.step.write.MetricFlushStep

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
index b6cca98..92480d8 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
@@ -18,10 +18,11 @@ under the License.
 */
 package org.apache.griffin.measure.launch

+import scala.util.Try
+
 import org.apache.griffin.measure.Loggable
 import org.apache.griffin.measure.configuration.dqdefinition.{DQConfig, EnvConfig, SinkParam}

-import scala.util.Try

 /**
  * dq application process

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
index ba1f389..e2dbc8d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
@@ -20,17 +20,19 @@ package org.apache.griffin.measure.launch.batch

 import java.util.Date

-import org.apache.griffin.measure.configuration.enums._
+import scala.util.Try
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{SparkSession, SQLContext}
+
 import org.apache.griffin.measure.configuration.dqdefinition._
+import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.context._
 import org.apache.griffin.measure.datasource.DataSourceFactory
 import org.apache.griffin.measure.job.builder.DQJobBuilder
 import org.apache.griffin.measure.launch.DQApp
 import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.{SQLContext, SparkSession}
-import scala.util.Try

 case class BatchDQApp(allParam: GriffinConfig) extends DQApp {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
index ceecb78..eb31a5e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
@@ -21,22 +21,24 @@ package org.apache.griffin.measure.launch.streaming
 import java.util.{Date, Timer, TimerTask}
 import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}

+import scala.util.Try
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+
 import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.configuration.dqdefinition._
+import org.apache.griffin.measure.configuration.enums._
 import org.apache.griffin.measure.context._
 import org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointClient
-import org.apache.griffin.measure.datasource.DataSourceFactory
 import org.apache.griffin.measure.context.streaming.metric.CacheResults
+import org.apache.griffin.measure.datasource.DataSourceFactory
 import org.apache.griffin.measure.job.builder.DQJobBuilder
 import org.apache.griffin.measure.launch.DQApp
 import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent
 import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.{SQLContext, SparkSession}
-import org.apache.spark.streaming.{Milliseconds, StreamingContext}
-import scala.util.Try

 case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {

@@ -82,10 +84,9 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
       try {
         createStreamingContext
       } catch {
-        case e: Throwable => {
+        case e: Throwable =>
           error(s"create streaming context error: ${e.getMessage}")
           throw e
-        }
       }
     })

@@ -118,7 +119,7 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
     ssc.start()
     ssc.awaitTermination()
-    ssc.stop(stopSparkContext=true, stopGracefully=true)
+    ssc.stop(stopSparkContext = true, stopGracefully = true)

     // clean context
     globalContext.clean()

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
index 306befe..feebd91 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
@@ -18,9 +18,10 @@ under the License.
 */
 package org.apache.griffin.measure.sink

+import org.apache.spark.rdd.RDD
+
 import org.apache.griffin.measure.utils.JsonUtil
 import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.spark.rdd.RDD

 /**
  * sink metric and record to console, for debug
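The StreamingDQApp hunk above only adds spaces around `=` in ssc.stop(...), but naming the boolean arguments is what keeps that call readable. The surrounding streaming lifecycle, as a minimal self-contained sketch (the batch interval and local master are placeholders, not Griffin's configuration):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}

    object StreamingLifecycle {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("lifecycle").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Milliseconds(1000))

        // ... define input DStreams and output operations here, or start() will fail ...

        ssc.start()
        ssc.awaitTermination()
        // Named booleans document intent at the call site.
        ssc.stop(stopSparkContext = true, stopGracefully = true)
      }
    }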