http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala index e5f72d1..3c20a0e 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala @@ -18,11 +18,13 @@ under the License. */ package org.apache.griffin.measure.sink -import org.apache.griffin.measure.utils.ParamUtil._ -import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil, TimeUtil} +import scala.concurrent.Future + import org.apache.spark.rdd.RDD -import scala.concurrent.Future +import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil, TimeUtil} +import org.apache.griffin.measure.utils.ParamUtil._ + /** * sink metric and record through http request @@ -38,7 +40,10 @@ case class ElasticSearchSink(config: Map[String, Any], metricName: String, val api = config.getString(Api, "") val method = config.getString(Method, "post") - val connectionTimeout = TimeUtil.milliseconds(config.getString(ConnectionTimeout, "")).getOrElse(-1L) + + val connectionTimeout = + TimeUtil.milliseconds(config.getString(ConnectionTimeout, "")).getOrElse(-1L) + val retry = config.getInt(Retry, 10) val _Value = "value" @@ -55,7 +60,7 @@ case class ElasticSearchSink(config: Map[String, Any], metricName: String, val data = JsonUtil.toJson(dataMap) // http request val params = Map[String, Object]() - val header = Map[String, Object](("Content-Type","application/json")) + val header = Map[String, Object](("Content-Type", "application/json")) def func(): (Long, Future[Boolean]) = { import scala.concurrent.ExecutionContext.Implicits.global
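The ElasticSearchSink hunks above show the sink's core flow: build a metric map, serialize it with JsonUtil, and hand a (timestamp, Future[Boolean]) pair to the task runner via func(). Below is a minimal self-contained sketch of that flow, assuming only the JDK's HttpURLConnection in place of Griffin's HttpUtil; the names postJson and sinkMetric are illustrative, not part of the Griffin API.

    import java.net.{HttpURLConnection, URL}
    import java.nio.charset.StandardCharsets

    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global

    object HttpSinkSketch {
      // Hypothetical stand-in for Griffin's HttpUtil: POST a JSON body, report success.
      def postJson(api: String, body: String): Boolean = {
        val conn = new URL(api).openConnection().asInstanceOf[HttpURLConnection]
        conn.setRequestMethod("POST")
        conn.setRequestProperty("Content-Type", "application/json")
        conn.setDoOutput(true)
        val out = conn.getOutputStream
        try out.write(body.getBytes(StandardCharsets.UTF_8)) finally out.close()
        val ok = conn.getResponseCode < 300
        conn.disconnect()
        ok
      }

      // Mirrors the shape of the sink's func(): pair the batch timestamp with an
      // asynchronous send, so a runner can log the outcome and retry on failure.
      def sinkMetric(api: String, timestamp: Long, json: String): (Long, Future[Boolean]) =
        (timestamp, Future(postJson(api, json)))
    }

Note the import order in the sketch follows the convention this commit applies throughout: java, then scala, then third-party, then project packages, with blank lines between groups.
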
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala index 718f1c1..588fabf 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala @@ -20,10 +20,11 @@ package org.apache.griffin.measure.sink import java.util.Date -import org.apache.griffin.measure.utils.ParamUtil._ -import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil} import org.apache.spark.rdd.RDD +import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil} +import org.apache.griffin.measure.utils.ParamUtil._ + /** * sink metric and record to hdfs */ @@ -83,6 +84,7 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon case e: Throwable => error(e.getMessage) } } + def finish(): Unit = { try { HdfsUtil.createEmptyFile(FinishFile) @@ -103,6 +105,7 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon private def getHdfsPath(path: String, groupId: Int): String = { HdfsUtil.getHdfsFilePath(path, s"${groupId}") } + private def getHdfsPath(path: String, ptnId: Int, groupId: Int): String = { HdfsUtil.getHdfsFilePath(path, s"${ptnId}.${groupId}") } @@ -116,7 +119,10 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon clearOldRecords(path) try { val recordCount = records.count - val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount) + + val count = + if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount) + if (count > 0) { val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt if (groupCount <= 1) { @@ -145,7 +151,10 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon clearOldRecords(path) try { val recordCount = records.size - val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount) + + val count = + if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount) + if (count > 0) { val groupCount = (count - 1) / maxLinesPerFile + 1 if (groupCount <= 1) { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala index 206f187..ab59e59 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala @@ -18,14 +18,16 @@ under the License. 
*/ package org.apache.griffin.measure.sink -import org.apache.griffin.measure.utils.ParamUtil._ -import org.apache.griffin.measure.utils.TimeUtil +import scala.concurrent.Future + import org.apache.spark.rdd.RDD import org.mongodb.scala._ import org.mongodb.scala.model.{Filters, UpdateOptions, Updates} import org.mongodb.scala.result.UpdateResult -import scala.concurrent.Future +import org.apache.griffin.measure.utils.ParamUtil._ +import org.apache.griffin.measure.utils.TimeUtil + /** * sink metric and record to mongo @@ -98,7 +100,7 @@ object MongoConnection { var dataConf: MongoConf = _ private var dataCollection: MongoCollection[Document] = _ - def getDataCollection = dataCollection + def getDataCollection : MongoCollection[Document] = dataCollection def init(config: Map[String, Any]): Unit = { if (!initialed) { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala index 0c03bd8..9d598fb 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala @@ -18,9 +18,10 @@ under the License. */ package org.apache.griffin.measure.sink -import org.apache.griffin.measure.Loggable import org.apache.spark.rdd.RDD +import org.apache.griffin.measure.Loggable + /** * sink metric and record */ http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala index 26b0178..49818f2 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala @@ -18,10 +18,11 @@ under the License. 
*/ package org.apache.griffin.measure.sink +import scala.util.{Success, Try} + import org.apache.griffin.measure.configuration.dqdefinition.SinkParam import org.apache.griffin.measure.configuration.enums._ -import scala.util.{Success, Try} case class SinkFactory(sinkParams: Iterable[SinkParam], metricName: String) extends Serializable { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala index 1cc3f3e..ca38629 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala @@ -21,12 +21,13 @@ package org.apache.griffin.measure.sink import java.util.Date import java.util.concurrent.TimeUnit -import org.apache.griffin.measure.Loggable - import scala.concurrent._ import scala.concurrent.duration._ import scala.util.{Failure, Success} +import org.apache.griffin.measure.Loggable + + /** * sink task runner, to sink metrics in block or non-block mode */ @@ -52,11 +53,11 @@ object SinkTaskRunner extends Loggable { val st = new Date().getTime val (t, res) = func() res.onComplete { - case Success(value) => { + case Success(value) => val et = new Date().getTime info(s"task ${t} success with (${value}) [ using time ${et - st} ms ]") - } - case Failure(e) => { + + case Failure(e) => val et = new Date().getTime warn(s"task ${t} fails [ using time ${et - st} ms ] : ${e.getMessage}") if (nextRetry >= 0) { @@ -65,11 +66,11 @@ object SinkTaskRunner extends Loggable { } else { error(s"task fails: task ${t} retry ends but fails") } - } } } - private def blockExecute(func: () => (Long, Future[_]), retry: Int, waitDuration: Duration): Unit = { + private def blockExecute(func: () => (Long, Future[_]), + retry: Int, waitDuration: Duration): Unit = { val nextRetry = nextRetryCount(retry) val st = new Date().getTime val (t, res) = func() @@ -78,7 +79,7 @@ object SinkTaskRunner extends Loggable { val et = new Date().getTime info(s"task ${t} success with (${value}) [ using time ${et - st} ms ]") } catch { - case e: Throwable => { + case e: Throwable => val et = new Date().getTime warn(s"task ${t} fails [ using time ${et - st} ms ] : ${e.getMessage}") if (nextRetry >= 0) { @@ -87,7 +88,6 @@ object SinkTaskRunner extends Loggable { } else { error(s"task fails: task ${t} retry ends but fails") } - } } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala index c37b5a3..5ce2b14 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DQStepBuilder.scala @@ -19,9 +19,10 @@ under the License. 
package org.apache.griffin.measure.step.builder import org.apache.commons.lang.StringUtils + import org.apache.griffin.measure.Loggable -import org.apache.griffin.measure.configuration.enums._ import org.apache.griffin.measure.configuration.dqdefinition.{DataSourceParam, Param, RuleParam} +import org.apache.griffin.measure.configuration.enums._ import org.apache.griffin.measure.context.DQContext import org.apache.griffin.measure.step._ @@ -49,7 +50,8 @@ object DQStepBuilder { .flatMap(_.buildDQStep(context, dsParam)) } - private def getDataSourceParamStepBuilder(procType: ProcessType): Option[DataSourceParamStepBuilder] = { + private def getDataSourceParamStepBuilder(procType: ProcessType) + : Option[DataSourceParamStepBuilder] = { procType match { case BatchProcessType => Some(BatchDataSourceStepBuilder()) case StreamingProcessType => Some(StreamingDataSourceStepBuilder()) @@ -64,7 +66,9 @@ object DQStepBuilder { val funcNames = context.functionNames val dqStepOpt = getRuleParamStepBuilder(dslType, dsNames, funcNames) .flatMap(_.buildDQStep(context, ruleParam)) - dqStepOpt.toSeq.flatMap(_.getNames).foreach(name => context.compileTableRegister.registerTable(name)) + dqStepOpt.toSeq.flatMap(_.getNames).foreach(name => + context.compileTableRegister.registerTable(name) + ) dqStepOpt } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala index d3c0e41..aa43cf6 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala @@ -24,7 +24,6 @@ import org.apache.griffin.measure.step.DQStep import org.apache.griffin.measure.step.builder.dsl.parser.GriffinDslParser import org.apache.griffin.measure.step.builder.dsl.transform.Expr2DQSteps -import scala.util.{Failure, Success} case class GriffinDslDQStepBuilder(dataSourceNames: Seq[String], functionNames: Seq[String] @@ -50,10 +49,9 @@ case class GriffinDslDQStepBuilder(dataSourceNames: Seq[String], Nil } } catch { - case e: Throwable => { + case e: Throwable => error(s"generate rule plan ${name} fails: ${e.getMessage}") Nil - } } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala index 5a04c11..4af3ceb 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala @@ -18,11 +18,11 @@ under the License. 
*/ package org.apache.griffin.measure.step.builder -import org.apache.griffin.measure.configuration.enums._ import org.apache.griffin.measure.configuration.dqdefinition.RuleParam +import org.apache.griffin.measure.configuration.enums._ import org.apache.griffin.measure.context.DQContext -import org.apache.griffin.measure.step.write.{DataSourceUpdateWriteStep, MetricWriteStep, RecordWriteStep} import org.apache.griffin.measure.step.{DQStep, SeqDQStep} +import org.apache.griffin.measure.step.write.{DataSourceUpdateWriteStep, MetricWriteStep, RecordWriteStep} /** * build dq step by rule param http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ClauseExpression.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ClauseExpression.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ClauseExpression.scala index 17e678e..1c04e75 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ClauseExpression.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ClauseExpression.scala @@ -35,7 +35,8 @@ case class SelectClause(exprs: Seq[Expr], extraConditionOpt: Option[ExtraConditi def coalesceDesc: String = desc override def map(func: (Expr) => Expr): SelectClause = { - SelectClause(exprs.map(func(_)), extraConditionOpt.map(func(_).asInstanceOf[ExtraConditionExpr])) + SelectClause(exprs.map(func(_)), + extraConditionOpt.map(func(_).asInstanceOf[ExtraConditionExpr])) } } @@ -81,11 +82,10 @@ case class GroupbyClause(exprs: Seq[Expr], havingClauseOpt: Option[Expr]) extend def merge(other: GroupbyClause): GroupbyClause = { val newHavingClauseOpt = (havingClauseOpt, other.havingClauseOpt) match { - case (Some(hc), Some(ohc)) => { + case (Some(hc), Some(ohc)) => val logical1 = LogicalFactorExpr(hc, false, None) val logical2 = LogicalFactorExpr(ohc, false, None) Some(BinaryLogicalExpr(logical1, ("AND", logical2) :: Nil)) - } case (a @ Some(_), _) => a case (_, b @ Some(_)) => b case (_, _) => None @@ -250,7 +250,8 @@ case class DistinctnessClause(exprs: Seq[Expr]) extends ClauseExpression { def desc: String = exprs.map(_.desc).mkString(", ") def coalesceDesc: String = exprs.map(_.coalesceDesc).mkString(", ") - override def map(func: (Expr) => Expr): DistinctnessClause = DistinctnessClause(exprs.map(func(_))) + override def map(func: (Expr) => Expr) : DistinctnessClause = + DistinctnessClause(exprs.map(func(_))) } case class TimelinessClause(exprs: Seq[Expr]) extends ClauseExpression { @@ -266,5 +267,6 @@ case class CompletenessClause(exprs: Seq[Expr]) extends ClauseExpression { def desc: String = exprs.map(_.desc).mkString(", ") def coalesceDesc: String = exprs.map(_.coalesceDesc).mkString(", ") - override def map(func: (Expr) => Expr): CompletenessClause = CompletenessClause(exprs.map(func(_))) -} \ No newline at end of file + override def map(func: (Expr) => Expr): CompletenessClause = + CompletenessClause(exprs.map(func(_))) +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LogicalExpr.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LogicalExpr.scala 
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LogicalExpr.scala index 13317bb..426dbbb 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LogicalExpr.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LogicalExpr.scala @@ -185,7 +185,8 @@ case class UnaryLogicalExpr(oprs: Seq[String], factor: LogicalExpr) extends Logi } } -case class BinaryLogicalExpr(factor: LogicalExpr, tails: Seq[(String, LogicalExpr)]) extends LogicalExpr { +case class BinaryLogicalExpr(factor: LogicalExpr, tails: Seq[(String, LogicalExpr)]) + extends LogicalExpr { addChildren(factor +: tails.map(_._2)) @@ -220,4 +221,4 @@ case class BinaryLogicalExpr(factor: LogicalExpr, tails: Seq[(String, LogicalExp (pair._1, func(pair._2).asInstanceOf[LogicalExpr]) }) } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/SelectExpr.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/SelectExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/SelectExpr.scala index b64dae2..c4f80d1 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/SelectExpr.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/SelectExpr.scala @@ -96,17 +96,17 @@ case class FunctionSelectExpr(functionName: String, args: Seq[Expr]) extends Sel // ------------- -case class SelectionExpr(head: HeadExpr, selectors: Seq[SelectExpr], aliasOpt: Option[String]) extends SelectExpr { +case class SelectionExpr(head: HeadExpr, selectors: Seq[SelectExpr], aliasOpt: Option[String]) + extends SelectExpr { addChildren(head +: selectors) def desc: String = { selectors.foldLeft(head.desc) { (hd, sel) => sel match { - case FunctionSelectExpr(funcName, args) => { + case FunctionSelectExpr(funcName, args) => val nargs = hd +: args.map(_.desc) s"${funcName}(${nargs.mkString(", ")})" - } case _ => s"${hd}${sel.desc}" } } @@ -129,4 +129,4 @@ case class SelectionExpr(head: HeadExpr, selectors: Seq[SelectExpr], aliasOpt: O SelectionExpr(func(head).asInstanceOf[HeadExpr], selectors.map(func(_).asInstanceOf[SelectExpr]), aliasOpt) } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala index 4ce0fe6..b10fd57 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala @@ -24,13 +24,15 @@ trait TreeNode extends Serializable { var children = Seq[TreeNode]() - def addChild(expr: TreeNode) = { children :+= expr } - def addChildren(exprs: Seq[TreeNode]) = { children ++= exprs } + def addChild(expr: TreeNode) : Unit = { children :+= expr } + def addChildren(exprs: Seq[TreeNode]) : Unit = { children ++= exprs } - def preOrderTraverseDepthFirst[T, A <: TreeNode](z: T)(seqOp: (A, T) => T, combOp: (T, T) => T)(implicit tag: ClassTag[A]): T = { + def 
preOrderTraverseDepthFirst[T, A <: TreeNode](z: T) + (seqOp: (A, T) => T, combOp: (T, T) => T) + (implicit tag: ClassTag[A]): T = { val clazz = tag.runtimeClass - if(clazz.isAssignableFrom(this.getClass)){ + if(clazz.isAssignableFrom(this.getClass)) { val tv = seqOp(this.asInstanceOf[A], z) children.foldLeft(combOp(z, tv)) { (ov, tn) => combOp(ov, tn.preOrderTraverseDepthFirst(z)(seqOp, combOp)) @@ -41,16 +43,18 @@ trait TreeNode extends Serializable { } } - def postOrderTraverseDepthFirst[T, A <: TreeNode](z: T)(seqOp: (A, T) => T, combOp: (T, T) => T)(implicit tag: ClassTag[A]): T = { + def postOrderTraverseDepthFirst[T, A <: TreeNode](z: T) + (seqOp: (A, T) => T, combOp: (T, T) => T) + (implicit tag: ClassTag[A]): T = { val clazz = tag.runtimeClass - if(clazz.isAssignableFrom(this.getClass)){ + if(clazz.isAssignableFrom(this.getClass)) { val cv = children.foldLeft(z) { (ov, tn) => combOp(ov, tn.postOrderTraverseDepthFirst(z)(seqOp, combOp)) } combOp(z, seqOp(this.asInstanceOf[A], cv)) } - else{ + else { z } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/BasicParser.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/BasicParser.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/BasicParser.scala index 18f7754..fe6678d 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/BasicParser.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/BasicParser.scala @@ -18,9 +18,10 @@ under the License. */ package org.apache.griffin.measure.step.builder.dsl.parser +import scala.util.parsing.combinator.JavaTokenParsers + import org.apache.griffin.measure.step.builder.dsl.expr._ -import scala.util.parsing.combinator.JavaTokenParsers /** * basic parser for sql like syntax @@ -388,10 +389,9 @@ trait BasicParser extends JavaTokenParsers with Serializable { def combinedClause: Parser[CombinedClause] = selectClause ~ opt(fromClause) ~ opt(whereClause) ~ opt(groupbyClause) ~ opt(orderbyClause) ~ opt(limitClause) ^^ { - case sel ~ fromOpt ~ whereOpt ~ groupbyOpt ~ orderbyOpt ~ limitOpt => { + case sel ~ fromOpt ~ whereOpt ~ groupbyOpt ~ orderbyOpt ~ limitOpt => val tails = Seq(whereOpt, groupbyOpt, orderbyOpt, limitOpt).flatMap(opt => opt) CombinedClause(sel, fromOpt, tails) - } } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala index 2cd638c..77ed987 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala @@ -37,11 +37,10 @@ case class GriffinDslParser(dataSourceNames: Seq[String], functionNames: Seq[Str def profilingClause: Parser[ProfilingClause] = selectClause ~ opt(fromClause) ~ opt(whereClause) ~ opt(groupbyClause) ~ opt(orderbyClause) ~ opt(limitClause) ^^ { - case sel ~ fromOpt ~ whereOpt ~ groupbyOpt ~ orderbyOpt ~ limitOpt => { + case sel ~ fromOpt 
~ whereOpt ~ groupbyOpt ~ orderbyOpt ~ limitOpt => val preClauses = Seq(whereOpt).flatMap(opt => opt) val postClauses = Seq(orderbyOpt, limitOpt).flatMap(opt => opt) ProfilingClause(sel, fromOpt, groupbyOpt, preClauses, postClauses) - } } /** @@ -59,7 +58,7 @@ case class GriffinDslParser(dataSourceNames: Seq[String], functionNames: Seq[Str * <distinctness-clauses> = <distExpr> [, <distExpr>]+ */ def sqbrExpr: Parser[Expr] = LSQBR ~> expression <~ RSQBR ^^ { - case expr => { expr.tag = "[]"; expr} + case expr => expr.tag = "[]"; expr } def distExpr: Parser[Expr] = expression | sqbrExpr def distinctnessClause: Parser[DistinctnessClause] = rep1sep(distExpr, Operator.COMMA) ^^ { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala index b97a1a0..3bf7d04 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala @@ -18,15 +18,15 @@ under the License. */ package org.apache.griffin.measure.step.builder.dsl.transform -import org.apache.griffin.measure.configuration.enums._ import org.apache.griffin.measure.configuration.dqdefinition.RuleParam +import org.apache.griffin.measure.configuration.enums._ import org.apache.griffin.measure.context.DQContext import org.apache.griffin.measure.step.DQStep import org.apache.griffin.measure.step.builder.ConstantColumns import org.apache.griffin.measure.step.builder.dsl.expr._ import org.apache.griffin.measure.step.builder.dsl.transform.analyzer.AccuracyAnalyzer -import org.apache.griffin.measure.step.transform.DataFrameOps.AccuracyOprKeys import org.apache.griffin.measure.step.transform.{DataFrameOps, DataFrameOpsTransformStep, SparkSqlTransformStep} +import org.apache.griffin.measure.step.transform.DataFrameOps.AccuracyOprKeys import org.apache.griffin.measure.step.write.{DataSourceUpdateWriteStep, MetricWriteStep, RecordWriteStep} import org.apache.griffin.measure.utils.ParamUtil._ @@ -77,30 +77,37 @@ case class AccuracyExpr2DQSteps(context: DQContext, s"${sel.desc} IS NULL" }.mkString(" AND ") val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})" - s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}" + s"SELECT ${selClause} FROM `${sourceName}` " + + s"LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}" } - val missRecordsTransStep = SparkSqlTransformStep(missRecordsTableName, missRecordsSql, emptyMap, true) + val missRecordsTransStep = + SparkSqlTransformStep(missRecordsTableName, missRecordsSql, emptyMap, true) + val missRecordsWriteSteps = procType match { - case BatchProcessType => { - val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(missRecordsTableName) + case BatchProcessType => + val rwName = + ruleParam.getOutputOpt(RecordOutputType). 
+ flatMap(_.getNameOpt).getOrElse(missRecordsTableName) RecordWriteStep(rwName, missRecordsTableName) :: Nil - } case StreamingProcessType => Nil } val missRecordsUpdateWriteSteps = procType match { case BatchProcessType => Nil - case StreamingProcessType => { - val dsName = ruleParam.getOutputOpt(DscUpdateOutputType).flatMap(_.getNameOpt).getOrElse(sourceName) + case StreamingProcessType => + val dsName = + ruleParam.getOutputOpt(DscUpdateOutputType).flatMap(_.getNameOpt).getOrElse(sourceName) DataSourceUpdateWriteStep(dsName, missRecordsTableName) :: Nil - } } // 2. miss count val missCountTableName = "__missCount" val missColName = details.getStringOrKey(_miss) val missCountSql = procType match { - case BatchProcessType => s"SELECT COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}`" - case StreamingProcessType => s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}` GROUP BY `${ConstantColumns.tmst}`" + case BatchProcessType => + s"SELECT COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}`" + case StreamingProcessType => + s"SELECT `${ConstantColumns.tmst}`,COUNT(*) AS `${missColName}` " + + s"FROM `${missRecordsTableName}` GROUP BY `${ConstantColumns.tmst}`" } val missCountTransStep = SparkSqlTransformStep(missCountTableName, missCountSql, emptyMap) @@ -109,7 +116,9 @@ case class AccuracyExpr2DQSteps(context: DQContext, val totalColName = details.getStringOrKey(_total) val totalCountSql = procType match { case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`" - case StreamingProcessType => s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceName}` GROUP BY `${ConstantColumns.tmst}`" + case StreamingProcessType => + s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${totalColName}` " + + s"FROM `${sourceName}` GROUP BY `${ConstantColumns.tmst}`" } val totalCountTransStep = SparkSqlTransformStep(totalCountTableName, totalCountSql, emptyMap) @@ -117,15 +126,14 @@ case class AccuracyExpr2DQSteps(context: DQContext, val accuracyTableName = ruleParam.getOutDfName() val matchedColName = details.getStringOrKey(_matched) val accuracyMetricSql = procType match { - case BatchProcessType => { + case BatchProcessType => s""" |SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`, |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`, |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${missCountTableName}`.`${missColName}`, 0)) AS `${matchedColName}` |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}` """.stripMargin - } - case StreamingProcessType => { + case StreamingProcessType => s""" |SELECT `${totalCountTableName}`.`${ConstantColumns.tmst}` AS `${ConstantColumns.tmst}`, |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`, @@ -134,27 +142,26 @@ case class AccuracyExpr2DQSteps(context: DQContext, |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}` |ON `${totalCountTableName}`.`${ConstantColumns.tmst}` = `${missCountTableName}`.`${ConstantColumns.tmst}` """.stripMargin - } } val accuracyTransStep = SparkSqlTransformStep(accuracyTableName, accuracyMetricSql, emptyMap) val accuracyMetricWriteSteps = procType match { - case BatchProcessType => { + case BatchProcessType => val metricOpt = ruleParam.getOutputOpt(MetricOutputType) val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.getOutDfName()) val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default) 
MetricWriteStep(mwName, accuracyTableName, flattenType) :: Nil - } case StreamingProcessType => Nil } // accuracy current steps val transSteps1 = missRecordsTransStep :: missCountTransStep :: totalCountTransStep :: accuracyTransStep :: Nil - val writeSteps1 = accuracyMetricWriteSteps ++ missRecordsWriteSteps ++ missRecordsUpdateWriteSteps + val writeSteps1 = + accuracyMetricWriteSteps ++ missRecordsWriteSteps ++ missRecordsUpdateWriteSteps // streaming extra steps val (transSteps2, writeSteps2) = procType match { case BatchProcessType => (Nil, Nil) - case StreamingProcessType => { + case StreamingProcessType => // 5. accuracy metric merge val accuracyMetricTableName = "__accuracy" val accuracyMetricRule = DataFrameOps._accuracy @@ -183,14 +190,16 @@ case class AccuracyExpr2DQSteps(context: DQContext, val accuracyRecordTransStep = SparkSqlTransformStep( accuracyRecordTableName, accuracyRecordSql, emptyMap) val accuracyRecordWriteStep = { - val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(missRecordsTableName) + val rwName = + ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt) + .getOrElse(missRecordsTableName) + RecordWriteStep(rwName, missRecordsTableName, Some(accuracyRecordTableName)) } // extra steps (accuracyMetricTransStep :: accuracyRecordTransStep :: Nil, accuracyMetricWriteStep :: accuracyRecordWriteStep :: Nil) - } } // full steps http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala index 4852a5b..87cfa86 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala @@ -18,8 +18,8 @@ under the License. */ package org.apache.griffin.measure.step.builder.dsl.transform -import org.apache.griffin.measure.configuration.enums._ import org.apache.griffin.measure.configuration.dqdefinition.RuleParam +import org.apache.griffin.measure.configuration.enums._ import org.apache.griffin.measure.context.DQContext import org.apache.griffin.measure.step.DQStep import org.apache.griffin.measure.step.builder.ConstantColumns @@ -80,16 +80,23 @@ case class CompletenessExpr2DQSteps(context: DQContext, val sourceAliasSql = { s"SELECT ${selClause} FROM `${sourceName}`" } - val sourceAliasTransStep = SparkSqlTransformStep(sourceAliasTableName, sourceAliasSql, emptyMap, true) + val sourceAliasTransStep = + SparkSqlTransformStep(sourceAliasTableName, sourceAliasSql, emptyMap, true) // 2. 
incomplete record val incompleteRecordsTableName = "__incompleteRecords" val completeWhereClause = aliases.map(a => s"`${a}` IS NOT NULL").mkString(" AND ") val incompleteWhereClause = s"NOT (${completeWhereClause})" - val incompleteRecordsSql = s"SELECT * FROM `${sourceAliasTableName}` WHERE ${incompleteWhereClause}" - val incompleteRecordTransStep = SparkSqlTransformStep(incompleteRecordsTableName, incompleteRecordsSql, emptyMap, true) + + val incompleteRecordsSql = + s"SELECT * FROM `${sourceAliasTableName}` WHERE ${incompleteWhereClause}" + + val incompleteRecordTransStep = + SparkSqlTransformStep(incompleteRecordsTableName, incompleteRecordsSql, emptyMap, true) val incompleteRecordWriteStep = { - val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(incompleteRecordsTableName) + val rwName = + ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt) + .getOrElse(incompleteRecordsTableName) RecordWriteStep(rwName, incompleteRecordsTableName) } @@ -97,17 +104,24 @@ case class CompletenessExpr2DQSteps(context: DQContext, val incompleteCountTableName = "__incompleteCount" val incompleteColName = details.getStringOrKey(_incomplete) val incompleteCountSql = procType match { - case BatchProcessType => s"SELECT COUNT(*) AS `${incompleteColName}` FROM `${incompleteRecordsTableName}`" - case StreamingProcessType => s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${incompleteColName}` FROM `${incompleteRecordsTableName}` GROUP BY `${ConstantColumns.tmst}`" + case BatchProcessType => + s"SELECT COUNT(*) AS `${incompleteColName}` FROM `${incompleteRecordsTableName}`" + case StreamingProcessType => + s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${incompleteColName}` " + + s"FROM `${incompleteRecordsTableName}` GROUP BY `${ConstantColumns.tmst}`" } - val incompleteCountTransStep = SparkSqlTransformStep(incompleteCountTableName, incompleteCountSql, emptyMap) + val incompleteCountTransStep = + SparkSqlTransformStep(incompleteCountTableName, incompleteCountSql, emptyMap) // 4. 
total count val totalCountTableName = "__totalCount" val totalColName = details.getStringOrKey(_total) val totalCountSql = procType match { - case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}`" - case StreamingProcessType => s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}` GROUP BY `${ConstantColumns.tmst}`" + case BatchProcessType => + s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}`" + case StreamingProcessType => + s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${totalColName}` " + + s"FROM `${sourceAliasTableName}` GROUP BY `${ConstantColumns.tmst}`" } val totalCountTransStep = SparkSqlTransformStep(totalCountTableName, totalCountSql, emptyMap) @@ -115,15 +129,14 @@ case class CompletenessExpr2DQSteps(context: DQContext, val completeTableName = ruleParam.getOutDfName() val completeColName = details.getStringOrKey(_complete) val completeMetricSql = procType match { - case BatchProcessType => { + case BatchProcessType => s""" |SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`, |coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0) AS `${incompleteColName}`, |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0)) AS `${completeColName}` |FROM `${totalCountTableName}` LEFT JOIN `${incompleteCountTableName}` """.stripMargin - } - case StreamingProcessType => { + case StreamingProcessType => s""" |SELECT `${totalCountTableName}`.`${ConstantColumns.tmst}` AS `${ConstantColumns.tmst}`, |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`, @@ -132,7 +145,6 @@ case class CompletenessExpr2DQSteps(context: DQContext, |FROM `${totalCountTableName}` LEFT JOIN `${incompleteCountTableName}` |ON `${totalCountTableName}`.`${ConstantColumns.tmst}` = `${incompleteCountTableName}`.`${ConstantColumns.tmst}` """.stripMargin - } } val completeTransStep = SparkSqlTransformStep(completeTableName, completeMetricSql, emptyMap) val completeWriteStep = { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala index 6c56a77..70fee6c 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala @@ -18,8 +18,8 @@ under the License. 
*/ package org.apache.griffin.measure.step.builder.dsl.transform -import org.apache.griffin.measure.configuration.enums._ import org.apache.griffin.measure.configuration.dqdefinition.RuleParam +import org.apache.griffin.measure.configuration.enums._ import org.apache.griffin.measure.context.DQContext import org.apache.griffin.measure.step.DQStep import org.apache.griffin.measure.step.builder.ConstantColumns @@ -101,7 +101,8 @@ case class DistinctnessExpr2DQSteps(context: DQContext, val sourceAliasSql = { s"SELECT ${selClause} FROM `${sourceName}`" } - val sourceAliasTransStep = SparkSqlTransformStep(sourceAliasTableName, sourceAliasSql, emptyMap, true) + val sourceAliasTransStep = + SparkSqlTransformStep(sourceAliasTableName, sourceAliasSql, emptyMap, true) // 2. total metric val totalTableName = "__totalMetric" @@ -125,13 +126,14 @@ case class DistinctnessExpr2DQSteps(context: DQContext, |FROM `${sourceAliasTableName}` GROUP BY ${distAliasesClause} """.stripMargin } - val selfGroupTransStep = SparkSqlTransformStep(selfGroupTableName, selfGroupSql, emptyMap, true) + val selfGroupTransStep = + SparkSqlTransformStep(selfGroupTableName, selfGroupSql, emptyMap, true) val transSteps1 = sourceAliasTransStep :: totalTransStep :: selfGroupTransStep :: Nil val writeSteps1 = totalMetricWriteStep :: Nil val ((transSteps2, writeSteps2), dupCountTableName) = procType match { - case StreamingProcessType if (withOlderTable) => { + case StreamingProcessType if (withOlderTable) => // 4.0 update old data val targetDsUpdateWriteStep = DataSourceUpdateWriteStep(targetName, targetName) @@ -200,14 +202,14 @@ case class DistinctnessExpr2DQSteps(context: DQContext, |FROM `${groupTableName}` """.stripMargin } - val finalDupCountTransStep = SparkSqlTransformStep(finalDupCountTableName, finalDupCountSql, emptyMap, true) + val finalDupCountTransStep = + SparkSqlTransformStep(finalDupCountTableName, finalDupCountSql, emptyMap, true) - ((olderAliasTransStep :: joinedTransStep :: groupTransStep :: finalDupCountTransStep :: Nil, + ((olderAliasTransStep :: joinedTransStep + :: groupTransStep :: finalDupCountTransStep :: Nil, targetDsUpdateWriteStep :: Nil), finalDupCountTableName) - } - case _ => { + case _ => ((Nil, Nil), selfGroupTableName) - } } // 8. distinct metric @@ -285,9 +287,13 @@ case class DistinctnessExpr2DQSteps(context: DQContext, |FROM `${dupItemsTableName}` GROUP BY ${groupSelClause}, `${dupColName}` """.stripMargin } - val groupDupMetricTransStep = SparkSqlTransformStep(groupDupMetricTableName, groupDupMetricSql, emptyMap) + val groupDupMetricTransStep = + SparkSqlTransformStep(groupDupMetricTableName, groupDupMetricSql, emptyMap) val groupDupMetricWriteStep = { - MetricWriteStep(duplicationArrayName, groupDupMetricTableName, ArrayFlattenType, writeTimestampOpt) + MetricWriteStep(duplicationArrayName, + groupDupMetricTableName, + ArrayFlattenType, + writeTimestampOpt) } val msteps = { @@ -306,7 +312,9 @@ case class DistinctnessExpr2DQSteps(context: DQContext, // 9. 
duplicate record val dupRecordTableName = "__dupRecords" val dupRecordSelClause = procType match { - case StreamingProcessType if (withOlderTable) => s"${distAliasesClause}, `${dupColName}`, `${accuDupColName}`" + case StreamingProcessType if (withOlderTable) => + s"${distAliasesClause}, `${dupColName}`, `${accuDupColName}`" + case _ => s"${distAliasesClause}, `${dupColName}`" } val dupRecordSql = { @@ -315,9 +323,14 @@ case class DistinctnessExpr2DQSteps(context: DQContext, |FROM `${dupCountTableName}` WHERE `${dupColName}` > 0 """.stripMargin } - val dupRecordTransStep = SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true) + val dupRecordTransStep = + SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true) + val dupRecordWriteStep = { - val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(dupRecordTableName) + val rwName = + ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt) + .getOrElse(dupRecordTableName) + RecordWriteStep(rwName, dupRecordTableName, None, writeTimestampOpt) } @@ -332,7 +345,12 @@ case class DistinctnessExpr2DQSteps(context: DQContext, } val dupMetricTransStep = SparkSqlTransformStep(dupMetricTableName, dupMetricSql, emptyMap) val dupMetricWriteStep = { - MetricWriteStep(duplicationArrayName, dupMetricTableName, ArrayFlattenType, writeTimestampOpt) + MetricWriteStep( + duplicationArrayName, + dupMetricTableName, + ArrayFlattenType, + writeTimestampOpt + ) } val msteps = { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala index d986b56..492f4fd 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/Expr2DQSteps.scala @@ -19,8 +19,8 @@ under the License. package org.apache.griffin.measure.step.builder.dsl.transform import org.apache.griffin.measure.Loggable -import org.apache.griffin.measure.configuration.enums._ import org.apache.griffin.measure.configuration.dqdefinition.RuleParam +import org.apache.griffin.measure.configuration.enums._ import org.apache.griffin.measure.context.{ContextId, DQContext, TimeRange} import org.apache.griffin.measure.step.DQStep import org.apache.griffin.measure.step.builder.dsl.expr.Expr http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala index ecc115c..af493af 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala @@ -19,8 +19,9 @@ under the License. 
package org.apache.griffin.measure.step.builder.dsl.transform import org.apache.commons.lang.StringUtils -import org.apache.griffin.measure.configuration.enums.{BatchProcessType, FlattenType, MetricOutputType, StreamingProcessType} + import org.apache.griffin.measure.configuration.dqdefinition.RuleParam +import org.apache.griffin.measure.configuration.enums.{BatchProcessType, FlattenType, MetricOutputType, StreamingProcessType} import org.apache.griffin.measure.context.DQContext import org.apache.griffin.measure.step.DQStep import org.apache.griffin.measure.step.builder.ConstantColumns @@ -63,7 +64,9 @@ case class ProfilingExpr2DQSteps(context: DQContext, val analyzer = ProfilingAnalyzer(profilingExpr, sourceName) val selExprDescs = analyzer.selectionExprs.map { sel => val alias = sel match { - case s: AliasableExpr => s.alias.filter(StringUtils.isNotEmpty).map(a => s" AS `${a}`").getOrElse("") + case s: AliasableExpr => + s.alias.filter(StringUtils.isNotEmpty).map(a => s" AS `${a}`").getOrElse("") + case _ => "" } s"${sel.desc}${alias}" @@ -76,21 +79,22 @@ case class ProfilingExpr2DQSteps(context: DQContext, val groupByClauseOpt = analyzer.groupbyExprOpt val groupbyClause = procType match { case BatchProcessType => groupByClauseOpt.map(_.desc).getOrElse("") - case StreamingProcessType => { - val tmstGroupbyClause = GroupbyClause(LiteralStringExpr(s"`${ConstantColumns.tmst}`") :: Nil, None) + case StreamingProcessType => + val tmstGroupbyClause = + GroupbyClause(LiteralStringExpr(s"`${ConstantColumns.tmst}`") :: Nil, None) val mergedGroubbyClause = tmstGroupbyClause.merge(groupByClauseOpt match { case Some(gbc) => gbc case _ => GroupbyClause(Nil, None) }) mergedGroubbyClause.desc - } } val preGroupbyClause = analyzer.preGroupbyExprs.map(_.desc).mkString(" ") val postGroupbyClause = analyzer.postGroupbyExprs.map(_.desc).mkString(" ") // 1. select statement val profilingSql = { - s"SELECT ${selCondition} ${selClause} ${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}" + s"SELECT ${selCondition} ${selClause} " + + s"${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}" } val profilingName = ruleParam.getOutDfName() val profilingTransStep = SparkSqlTransformStep(profilingName, profilingSql, details) http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala index 8a3924b..3731da9 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala @@ -18,8 +18,8 @@ under the License. */ package org.apache.griffin.measure.step.builder.dsl.transform -import org.apache.griffin.measure.configuration.enums._ import org.apache.griffin.measure.configuration.dqdefinition.RuleParam +import org.apache.griffin.measure.configuration.enums._ import org.apache.griffin.measure.context.DQContext import org.apache.griffin.measure.step.DQStep import org.apache.griffin.measure.step.builder.ConstantColumns @@ -79,19 +79,17 @@ case class TimelinessExpr2DQSteps(context: DQContext, // 1. 
in time val inTimeTableName = "__inTime" val inTimeSql = etsSelOpt match { - case Some(etsSel) => { + case Some(etsSel) => s""" |SELECT *, (${btsSel}) AS `${ConstantColumns.beginTs}`, |(${etsSel}) AS `${ConstantColumns.endTs}` |FROM ${sourceName} WHERE (${btsSel}) IS NOT NULL AND (${etsSel}) IS NOT NULL """.stripMargin - } - case _ => { + case _ => s""" |SELECT *, (${btsSel}) AS `${ConstantColumns.beginTs}` |FROM ${sourceName} WHERE (${btsSel}) IS NOT NULL """.stripMargin - } } val inTimeTransStep = SparkSqlTransformStep(inTimeTableName, inTimeSql, emptyMap) @@ -103,7 +101,8 @@ case class TimelinessExpr2DQSteps(context: DQContext, case _ => ConstantColumns.tmst } val latencySql = { - s"SELECT *, (`${etsColName}` - `${ConstantColumns.beginTs}`) AS `${latencyColName}` FROM `${inTimeTableName}`" + s"SELECT *, (`${etsColName}` - `${ConstantColumns.beginTs}`) AS `${latencyColName}` " + + s"FROM `${inTimeTableName}`" } val latencyTransStep = SparkSqlTransformStep(latencyTableName, latencySql, emptyMap, true) @@ -112,14 +111,15 @@ case class TimelinessExpr2DQSteps(context: DQContext, val totalColName = details.getStringOrKey(_total) val avgColName = details.getStringOrKey(_avg) val metricSql = procType match { - case BatchProcessType => { + + case BatchProcessType => s""" |SELECT COUNT(*) AS `${totalColName}`, |CAST(AVG(`${latencyColName}`) AS BIGINT) AS `${avgColName}` |FROM `${latencyTableName}` """.stripMargin - } - case StreamingProcessType => { + + case StreamingProcessType => s""" |SELECT `${ConstantColumns.tmst}`, |COUNT(*) AS `${totalColName}`, @@ -127,7 +127,6 @@ case class TimelinessExpr2DQSteps(context: DQContext, |FROM `${latencyTableName}` |GROUP BY `${ConstantColumns.tmst}` """.stripMargin - } } val metricTransStep = SparkSqlTransformStep(metricTableName, metricSql, emptyMap) val metricWriteStep = { @@ -143,24 +142,26 @@ case class TimelinessExpr2DQSteps(context: DQContext, // 4. timeliness record val (transSteps2, writeSteps2) = TimeUtil.milliseconds(details.getString(_threshold, "")) match { - case Some(tsh) => { + case Some(tsh) => val recordTableName = "__lateRecords" val recordSql = { s"SELECT * FROM `${latencyTableName}` WHERE `${latencyColName}` > ${tsh}" } val recordTransStep = SparkSqlTransformStep(recordTableName, recordSql, emptyMap) val recordWriteStep = { - val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(recordTableName) + val rwName = + ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt) + .getOrElse(recordTableName) + RecordWriteStep(rwName, recordTableName, None) } (recordTransStep :: Nil, recordWriteStep :: Nil) - } case _ => (Nil, Nil) } // 5. 
ranges val (transSteps3, writeSteps3) = TimeUtil.milliseconds(details.getString(_stepSize, "")) match { - case Some(stepSize) => { + case Some(stepSize) => // 5.1 range val rangeTableName = "__range" val stepColName = details.getStringOrKey(_step) @@ -176,26 +177,24 @@ case class TimelinessExpr2DQSteps(context: DQContext, val rangeMetricTableName = "__rangeMetric" val countColName = details.getStringOrKey(_count) val rangeMetricSql = procType match { - case BatchProcessType => { + case BatchProcessType => s""" |SELECT `${stepColName}`, COUNT(*) AS `${countColName}` |FROM `${rangeTableName}` GROUP BY `${stepColName}` """.stripMargin - } - case StreamingProcessType => { + case StreamingProcessType => s""" |SELECT `${ConstantColumns.tmst}`, `${stepColName}`, COUNT(*) AS `${countColName}` |FROM `${rangeTableName}` GROUP BY `${ConstantColumns.tmst}`, `${stepColName}` """.stripMargin - } } - val rangeMetricTransStep = SparkSqlTransformStep(rangeMetricTableName, rangeMetricSql, emptyMap) + val rangeMetricTransStep = + SparkSqlTransformStep(rangeMetricTableName, rangeMetricSql, emptyMap) val rangeMetricWriteStep = { MetricWriteStep(stepColName, rangeMetricTableName, ArrayFlattenType) } (rangeTransStep :: rangeMetricTransStep :: Nil, rangeMetricWriteStep :: Nil) - } case _ => (Nil, Nil) } @@ -206,7 +205,8 @@ case class TimelinessExpr2DQSteps(context: DQContext, val percentileColName = details.getStringOrKey(_percentileColPrefix) val percentileCols = percentiles.map { pct => val pctName = (pct * 100).toInt.toString - s"floor(percentile_approx(${latencyColName}, ${pct})) AS `${percentileColName}_${pctName}`" + s"floor(percentile_approx(${latencyColName}, ${pct})) " + + s"AS `${percentileColName}_${pctName}`" }.mkString(", ") val percentileSql = { s""" @@ -214,7 +214,9 @@ case class TimelinessExpr2DQSteps(context: DQContext, |FROM `${latencyTableName}` """.stripMargin } - val percentileTransStep = SparkSqlTransformStep(percentileTableName, percentileSql, emptyMap) + val percentileTransStep = + SparkSqlTransformStep(percentileTableName, percentileSql, emptyMap) + val percentileWriteStep = { MetricWriteStep(percentileTableName, percentileTableName, DefaultFlattenType) } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala index 443239c..28e9d48 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala @@ -18,8 +18,8 @@ under the License. 
*/ package org.apache.griffin.measure.step.builder.dsl.transform -import org.apache.griffin.measure.configuration.enums._ import org.apache.griffin.measure.configuration.dqdefinition.RuleParam +import org.apache.griffin.measure.configuration.enums._ import org.apache.griffin.measure.context.DQContext import org.apache.griffin.measure.step.DQStep import org.apache.griffin.measure.step.builder.ConstantColumns @@ -112,7 +112,8 @@ case class UniquenessExpr2DQSteps(context: DQContext, }.mkString(", ") val dupColName = details.getStringOrKey(_dup) val groupSql = { - s"SELECT ${groupSelClause}, (COUNT(*) - 1) AS `${dupColName}` FROM `${joinedTableName}` GROUP BY ${groupSelClause}" + s"SELECT ${groupSelClause}, (COUNT(*) - 1) AS `${dupColName}` " + + s"FROM `${joinedTableName}` GROUP BY ${groupSelClause}" } val groupTransStep = SparkSqlTransformStep(groupTableName, groupSql, emptyMap, true) @@ -121,12 +122,11 @@ case class UniquenessExpr2DQSteps(context: DQContext, val totalColName = details.getStringOrKey(_total) val totalSql = procType match { case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`" - case StreamingProcessType => { + case StreamingProcessType => s""" |SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${totalColName}` |FROM `${sourceName}` GROUP BY `${ConstantColumns.tmst}` """.stripMargin - } } val totalTransStep = SparkSqlTransformStep(totalTableName, totalSql, emptyMap) val totalMetricWriteStep = MetricWriteStep(totalColName, totalTableName, EntriesFlattenType) @@ -136,22 +136,25 @@ case class UniquenessExpr2DQSteps(context: DQContext, val uniqueRecordSql = { s"SELECT * FROM `${groupTableName}` WHERE `${dupColName}` = 0" } - val uniqueRecordTransStep = SparkSqlTransformStep(uniqueRecordTableName, uniqueRecordSql, emptyMap) + val uniqueRecordTransStep = + SparkSqlTransformStep(uniqueRecordTableName, uniqueRecordSql, emptyMap) // 7. 
unique metric val uniqueTableName = "__uniqueMetric" val uniqueColName = details.getStringOrKey(_unique) val uniqueSql = procType match { - case BatchProcessType => s"SELECT COUNT(*) AS `${uniqueColName}` FROM `${uniqueRecordTableName}`" - case StreamingProcessType => { + case BatchProcessType => + s"SELECT COUNT(*) AS `${uniqueColName}` FROM `${uniqueRecordTableName}`" + case StreamingProcessType => s""" |SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${uniqueColName}` |FROM `${uniqueRecordTableName}` GROUP BY `${ConstantColumns.tmst}` """.stripMargin - } } val uniqueTransStep = SparkSqlTransformStep(uniqueTableName, uniqueSql, emptyMap) - val uniqueMetricWriteStep = MetricWriteStep(uniqueColName, uniqueTableName, EntriesFlattenType) + + val uniqueMetricWriteStep = + MetricWriteStep(uniqueColName, uniqueTableName, EntriesFlattenType) val transSteps1 = sourceTransStep :: targetTransStep :: joinedTransStep :: groupTransStep :: totalTransStep :: uniqueRecordTransStep :: uniqueTransStep :: Nil @@ -164,9 +167,14 @@ case class UniquenessExpr2DQSteps(context: DQContext, val dupRecordSql = { s"SELECT * FROM `${groupTableName}` WHERE `${dupColName}` > 0" } - val dupRecordTransStep = SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true) + val dupRecordTransStep = + SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true) + val dupRecordWriteStep = { - val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(dupRecordTableName) + val rwName = + ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt) + .getOrElse(dupRecordTableName) + RecordWriteStep(rwName, dupRecordTableName) } @@ -175,7 +183,9 @@ case class UniquenessExpr2DQSteps(context: DQContext, val numColName = details.getStringOrKey(_num) val dupMetricSelClause = procType match { case BatchProcessType => s"`${dupColName}`, COUNT(*) AS `${numColName}`" - case StreamingProcessType => s"`${ConstantColumns.tmst}`, `${dupColName}`, COUNT(*) AS `${numColName}`" + + case StreamingProcessType => + s"`${ConstantColumns.tmst}`, `${dupColName}`, COUNT(*) AS `${numColName}`" } val dupMetricGroupbyClause = procType match { case BatchProcessType => s"`${dupColName}`" http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/AccuracyAnalyzer.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/AccuracyAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/AccuracyAnalyzer.scala index b2a95ce..d57bf23 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/AccuracyAnalyzer.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/AccuracyAnalyzer.scala @@ -21,9 +21,11 @@ package org.apache.griffin.measure.step.builder.dsl.transform.analyzer import org.apache.griffin.measure.step.builder.dsl.expr._ -case class AccuracyAnalyzer(expr: LogicalExpr, sourceName: String, targetName: String) extends BasicAnalyzer { +case class AccuracyAnalyzer(expr: LogicalExpr, sourceName: String, targetName: String) + extends BasicAnalyzer { - val dataSourceNames = expr.preOrderTraverseDepthFirst(Set[String]())(seqDataSourceNames, combDataSourceNames) + val dataSourceNames = + expr.preOrderTraverseDepthFirst(Set[String]())(seqDataSourceNames, combDataSourceNames) 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/AccuracyAnalyzer.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/AccuracyAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/AccuracyAnalyzer.scala
index b2a95ce..d57bf23 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/AccuracyAnalyzer.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/AccuracyAnalyzer.scala
@@ -21,9 +21,11 @@ package org.apache.griffin.measure.step.builder.dsl.transform.analyzer

 import org.apache.griffin.measure.step.builder.dsl.expr._

-case class AccuracyAnalyzer(expr: LogicalExpr, sourceName: String, targetName: String) extends BasicAnalyzer {
+case class AccuracyAnalyzer(expr: LogicalExpr, sourceName: String, targetName: String)
+  extends BasicAnalyzer {

-  val dataSourceNames = expr.preOrderTraverseDepthFirst(Set[String]())(seqDataSourceNames, combDataSourceNames)
+  val dataSourceNames =
+    expr.preOrderTraverseDepthFirst(Set[String]())(seqDataSourceNames, combDataSourceNames)

   val sourceSelectionExprs = {
     val seq = seqSelectionExprs(sourceName)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/CompletenessAnalyzer.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/CompletenessAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/CompletenessAnalyzer.scala
index f8b872a..5a73bce 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/CompletenessAnalyzer.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/CompletenessAnalyzer.scala
@@ -21,7 +21,8 @@ package org.apache.griffin.measure.step.builder.dsl.transform.analyzer

 import org.apache.griffin.measure.step.builder.dsl.expr._

-case class CompletenessAnalyzer(expr: CompletenessClause, sourceName: String) extends BasicAnalyzer {
+case class CompletenessAnalyzer(expr: CompletenessClause, sourceName: String)
+  extends BasicAnalyzer {

   val seqAlias = (expr: Expr, v: Seq[String]) => {
     expr match {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/DistinctnessAnalyzer.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/DistinctnessAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/DistinctnessAnalyzer.scala
index 9cee8ab..b1f4229 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/DistinctnessAnalyzer.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/DistinctnessAnalyzer.scala
@@ -21,8 +21,8 @@ package org.apache.griffin.measure.step.builder.dsl.transform.analyzer

 import org.apache.griffin.measure.step.builder.dsl.expr._

-//case class DistinctnessAnalyzer(expr: DistinctnessClause, sourceName: String, targetName: String) extends BasicAnalyzer {
-case class DistinctnessAnalyzer(expr: DistinctnessClause, sourceName: String) extends BasicAnalyzer {
+case class DistinctnessAnalyzer(expr: DistinctnessClause, sourceName: String)
+  extends BasicAnalyzer {

   val seqAlias = (expr: Expr, v: Seq[String]) => {
     expr match {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/ProfilingAnalyzer.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/ProfilingAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/ProfilingAnalyzer.scala
index 049e6fd..f66d482 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/ProfilingAnalyzer.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/ProfilingAnalyzer.scala
@@ -23,7 +23,8 @@ import org.apache.griffin.measure.step.builder.dsl.expr._

 case class ProfilingAnalyzer(expr: ProfilingClause, sourceName: String) extends BasicAnalyzer {

-  val dataSourceNames = expr.preOrderTraverseDepthFirst(Set[String]())(seqDataSourceNames, combDataSourceNames)
+  val dataSourceNames =
+    expr.preOrderTraverseDepthFirst(Set[String]())(seqDataSourceNames, combDataSourceNames)

   val selectionExprs: Seq[Expr] = {
     expr.selectClause.exprs.map(_.extractSelf).flatMap { expr =>

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/UniquenessAnalyzer.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/UniquenessAnalyzer.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/UniquenessAnalyzer.scala
index 0976d6c..8a05d17 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/UniquenessAnalyzer.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/analyzer/UniquenessAnalyzer.scala
@@ -21,7 +21,8 @@ package org.apache.griffin.measure.step.builder.dsl.transform.analyzer

 import org.apache.griffin.measure.step.builder.dsl.expr.{AliasableExpr, _}

-case class UniquenessAnalyzer(expr: UniquenessClause, sourceName: String, targetName: String) extends BasicAnalyzer {
+case class UniquenessAnalyzer(expr: UniquenessClause, sourceName: String, targetName: String)
+  extends BasicAnalyzer {

   val seqAlias = (expr: Expr, v: Seq[String]) => {
     expr match {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala
index eac3b2b..713dc55 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala
@@ -26,9 +26,10 @@ import org.apache.griffin.measure.configuration.enums._
  */
 object PreProcParamMaker {

-  case class StringAnyMap(values:Map[String,Any])
+  case class StringAnyMap(values: Map[String, Any])

-  def makePreProcRules(rules: Seq[RuleParam], suffix: String, dfName: String): (Seq[RuleParam], String) = {
+  def makePreProcRules(rules: Seq[RuleParam],
+                       suffix: String, dfName: String): (Seq[RuleParam], String) = {
     val len = rules.size
     val (newRules, _) = rules.zipWithIndex.foldLeft((Nil: Seq[RuleParam], dfName)) { (ret, pair) =>
       val (rls, prevOutDfName) = ret
@@ -47,10 +48,9 @@ object PreProcParamMaker {
     val rpRule = rule.replaceInOutDfName(newInDfName, newOutDfName)
     rule.getDslType match {
       case DataFrameOpsType => rpRule
-      case _ => {
+      case _ =>
         val newRule = replaceDfNameSuffix(rule.getRule, rule.getInDfName(), suffix)
         rpRule.replaceRule(newRule)
-      }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala
index 8b1df82..9194da1 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/read/ReadStep.scala
@@ -18,9 +18,10 @@ under the License.
 */
 package org.apache.griffin.measure.step.read

+import org.apache.spark.sql._
+
 import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.step.DQStep
-import org.apache.spark.sql._

 trait ReadStep extends DQStep {
@@ -31,15 +32,13 @@ trait ReadStep extends DQStep {
   def execute(context: DQContext): Boolean = {
     info(s"read data source [${name}]")
     read(context) match {
-      case Some(df) => {
+      case Some(df) =>
         // if (needCache) context.dataFrameCache.cacheDataFrame(name, df)
         context.runTimeTableRegister.registerTable(name, df)
         true
-      }
-      case _ => {
+      case _ =>
         warn(s"read data source [${name}] fails")
         false
-      }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/read/UnionReadStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/read/UnionReadStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/read/UnionReadStep.scala
index 6dae1cb..88a39cf 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/read/UnionReadStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/read/UnionReadStep.scala
@@ -18,8 +18,9 @@ under the License.
 */
 package org.apache.griffin.measure.step.read

-import org.apache.griffin.measure.context.DQContext
 import org.apache.spark.sql._
+
+import org.apache.griffin.measure.context.DQContext
 import org.apache.griffin.measure.utils.DataFrameUtil._

 case class UnionReadStep(name: String,
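Both read-step diffs above drop the redundant braces around multi-line case bodies, the kind of cleanup a style checker typically enforces. A self-contained sketch of the brace-free form (readTable and every name in it are hypothetical, not Griffin API):

  object CaseStyleSketch {
    def readTable(name: String): Option[String] =
      if (name.nonEmpty) Some(s"df-for-$name") else None

    def register(name: String): Boolean = readTable(name) match {
      case Some(df) =>
        // a multi-line case body needs no braces; the next `case` ends it
        println(s"registering table [$name] as $df")
        true
      case _ =>
        println(s"read data source [$name] fails")
        false
    }

    def main(args: Array[String]): Unit = {
      register("src")
      register("")
    }
  }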
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
index 86b367c..088f328 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOps.scala
@@ -20,13 +20,14 @@ package org.apache.griffin.measure.step.transform

 import java.util.Date

+import org.apache.spark.sql.{Encoders, Row, SQLContext, _}
+import org.apache.spark.sql.types.{BooleanType, LongType, StructField, StructType}
+
 import org.apache.griffin.measure.context.ContextId
-import org.apache.griffin.measure.context.streaming.metric.CacheResults.CacheResult
 import org.apache.griffin.measure.context.streaming.metric._
+import org.apache.griffin.measure.context.streaming.metric.CacheResults.CacheResult
 import org.apache.griffin.measure.step.builder.ConstantColumns
 import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.spark.sql.types.{BooleanType, LongType, StructField, StructType}
-import org.apache.spark.sql.{Encoders, Row, SQLContext, _}

 /**
  * pre-defined data frame operations
@@ -44,7 +45,9 @@ object DataFrameOps {
     val _matched = "matched"
   }

-  def fromJson(sqlContext: SQLContext, inputDfName: String, details: Map[String, Any]): DataFrame = {
+  def fromJson(sqlContext: SQLContext,
+               inputDfName: String,
+               details: Map[String, Any]): DataFrame = {
     val _colName = "col.name"
     val colNameOpt = details.get(_colName).map(_.toString)
@@ -58,7 +61,10 @@ object DataFrameOps {
     sqlContext.read.json(rdd) // slow process
   }

-  def accuracy(sqlContext: SQLContext, inputDfName: String, contextId: ContextId, details: Map[String, Any]): DataFrame = {
+  def accuracy(sqlContext: SQLContext,
+               inputDfName: String,
+               contextId: ContextId,
+               details: Map[String, Any]): DataFrame = {
     import AccuracyOprKeys._

     val miss = details.getStringOrKey(_miss)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
index 5f99ed2..a2bf46e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/DataFrameOpsTransformStep.scala
@@ -35,7 +35,9 @@ case class DataFrameOpsTransformStep(name: String,
     try {
       val df = rule match {
         case DataFrameOps._fromJson => DataFrameOps.fromJson(sqlContext, inputDfName, details)
-        case DataFrameOps._accuracy => DataFrameOps.accuracy(sqlContext, inputDfName, context.contextId, details)
+        case DataFrameOps._accuracy =>
+          DataFrameOps.accuracy(sqlContext, inputDfName, context.contextId, details)
+
         case DataFrameOps._clear => DataFrameOps.clear(sqlContext, inputDfName, details)
         case _ => throw new Exception(s"df opr [ ${rule} ] not supported")
       }
@@ -43,10 +45,9 @@ case class DataFrameOpsTransformStep(name: String,
       context.runTimeTableRegister.registerTable(name, df)
       true
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"run data frame ops [ ${rule} ] error: ${e.getMessage}")
         false
-      }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
index ead7344..ca03f79 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/transform/SparkSqlTransformStep.scala
@@ -37,10 +37,9 @@ case class SparkSqlTransformStep(name: String,
       context.runTimeTableRegister.registerTable(name, df)
       true
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"run spark sql [ ${rule} ] error: ${e.getMessage}")
         false
-      }
     }
   }
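The DataFrameOps signatures above are re-wrapped to one parameter per line, presumably to satisfy a line-length limit. A generic sketch of the aligned style (every name below is illustrative, not the Griffin API):

  object SignatureWrapSketch {
    def accuracyLike(sqlContextName: String,
                     inputDfName: String,
                     contextId: Long,
                     details: Map[String, Any]): String =
      s"run [$inputDfName] in [$sqlContextName] at $contextId with ${details.size} option(s)"

    def main(args: Array[String]): Unit =
      println(accuracyLike("sqlCtx", "df0", 42L, Map("miss" -> "miss_count")))
  }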
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
index 9415998..f34e003 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
@@ -19,9 +19,10 @@ under the License.
 package org.apache.griffin.measure.step.write

 import org.apache.commons.lang.StringUtils
-import org.apache.griffin.measure.context.DQContext
 import org.apache.spark.sql.DataFrame

+import org.apache.griffin.measure.context.DQContext
+
 /**
  * update data source streaming cache
  */
@@ -34,12 +35,12 @@ case class DataSourceUpdateWriteStep(dsName: String,

   def execute(context: DQContext): Boolean = {
     getDataSourceCacheUpdateDf(context) match {
-      case Some(df) => {
-        context.dataSources.find(ds => StringUtils.equals(ds.name, dsName)).foreach(_.updateData(df))
-      }
-      case _ => {
+      case Some(df) =>
+        context.dataSources
+          .find(ds => StringUtils.equals(ds.name, dsName))
+          .foreach(_.updateData(df))
+      case _ =>
         warn(s"update ${dsName} from ${inputName} fails")
-      }
     }
     true
   }
@@ -49,13 +50,13 @@ case class DataSourceUpdateWriteStep(dsName: String,
       val df = context.sqlContext.table(s"`${name}`")
       Some(df)
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"get data frame ${name} fails")
         None
-      }
     }
   }

-  private def getDataSourceCacheUpdateDf(context: DQContext): Option[DataFrame] = getDataFrame(context, inputName)
+  private def getDataSourceCacheUpdateDf(context: DQContext): Option[DataFrame]
+    = getDataFrame(context, inputName)

 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
index 8f7d01c..e787d96 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricFlushStep.scala
@@ -36,10 +36,9 @@ case class MetricFlushStep() extends WriteStep {
         context.getSink(t).sinkMetrics(metric)
         true
       } catch {
-        case e: Throwable => {
+        case e: Throwable =>
           error(s"flush metrics error: ${e.getMessage}")
           false
-        }
       }
       ret && pr
     }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/18fc4cf4/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
index 4771891..bc721f2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/MetricWriteStep.scala
@@ -45,11 +45,12 @@ case class MetricWriteStep(name: String,
     // get timestamp and normalize metric
     val writeMode = writeTimestampOpt.map(_ => SimpleMode).getOrElse(context.writeMode)
     val timestampMetricMap: Map[Long, Map[String, Any]] = writeMode match {
-      case SimpleMode => {
+
+      case SimpleMode =>
         val metrics: Map[String, Any] = flattenMetric(metricMaps, name, flattenType)
         emptyMetricMap + (timestamp -> metrics)
-      }
-      case TimestampMode => {
+
+      case TimestampMode =>
         val tmstMetrics = metricMaps.map { metric =>
           val tmst = metric.getLong(ConstantColumns.tmst, timestamp)
           val pureMetric = metric.removeKeys(ConstantColumns.columns)
@@ -61,7 +62,6 @@ case class MetricWriteStep(name: String,
           val mtc = flattenMetric(maps, name, flattenType)
           (k, mtc)
         }
-      }
     }

     // write to metric wrapper
@@ -88,10 +88,9 @@ case class MetricWriteStep(name: String,
         }.toSeq
       } else Nil
     } catch {
-      case e: Throwable => {
+      case e: Throwable =>
         error(s"get metric ${name} fails")
         Nil
-      }
     }
   }
@@ -100,14 +99,12 @@ case class MetricWriteStep(name: String,
     flattenType match {
       case EntriesFlattenType => metrics.headOption.getOrElse(emptyMap)
       case ArrayFlattenType => Map[String, Any]((name -> metrics))
-      case MapFlattenType => {
+      case MapFlattenType =>
         val v = metrics.headOption.getOrElse(emptyMap)
         Map[String, Any]((name -> v))
-      }
-      case _ => {
+      case _ =>
         if (metrics.size > 1) Map[String, Any]((name -> metrics))
         else metrics.headOption.getOrElse(emptyMap)
-      }
     }
   }
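The final hunk only removes case braces; the flatten semantics are unchanged. A standalone sketch of that dispatch, with stand-in types in place of Griffin's FlattenType enum and helpers (all names below are stand-ins):

  object FlattenSketch {
    sealed trait FlattenType
    case object EntriesFlattenType extends FlattenType
    case object ArrayFlattenType extends FlattenType
    case object MapFlattenType extends FlattenType
    case object DefaultFlattenType extends FlattenType

    private val emptyMap = Map.empty[String, Any]

    // mirrors the reshaping done in MetricWriteStep.flattenMetric
    def flattenMetric(metrics: Seq[Map[String, Any]],
                      name: String,
                      flattenType: FlattenType): Map[String, Any] = {
      flattenType match {
        case EntriesFlattenType => metrics.headOption.getOrElse(emptyMap) // first map's entries directly
        case ArrayFlattenType => Map[String, Any]((name -> metrics)) // whole seq nested under name
        case MapFlattenType =>
          val v = metrics.headOption.getOrElse(emptyMap)
          Map[String, Any]((name -> v)) // first map nested under name
        case _ =>
          if (metrics.size > 1) Map[String, Any]((name -> metrics))
          else metrics.headOption.getOrElse(emptyMap)
      }
    }

    def main(args: Array[String]): Unit = {
      val ms = Seq(Map[String, Any]("total" -> 10L), Map[String, Any]("total" -> 7L))
      println(flattenMetric(ms, "uniq", EntriesFlattenType)) // Map(total -> 10)
      println(flattenMetric(ms, "uniq", ArrayFlattenType))   // Map(uniq -> List(Map(total -> 10), Map(total -> 7)))
    }
  }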