http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/LogicalExpr.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/LogicalExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/LogicalExpr.scala deleted file mode 100644 index b4c35f5..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/LogicalExpr.scala +++ /dev/null @@ -1,204 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
*/
package org.apache.griffin.measure.rule.dsl.expr

// Logical (boolean-valued) expression nodes of the griffin DSL AST.
// Each node renders itself as SQL text via `desc`, and as a null-tolerant
// variant via `coalesceDesc`.
trait LogicalExpr extends Expr {
}

/**
  * `<head> [NOT] IN (<range...>)`.
  *
  * @param head  expression tested for membership
  * @param is    true renders `IN`, false renders `NOT IN`
  * @param range candidate values
  */
case class InExpr(head: Expr, is: Boolean, range: Seq[Expr]) extends LogicalExpr {

  addChildren(head +: range)

  def desc: String = {
    val notStr = if (is) "" else " NOT"
    s"${head.desc}${notStr} IN (${range.map(_.desc).mkString(", ")})"
  }
  def coalesceDesc: String = {
    val notStr = if (is) "" else " NOT"
    s"${head.coalesceDesc}${notStr} IN (${range.map(_.coalesceDesc).mkString(", ")})"
  }

  override def map(func: (Expr) => Expr): InExpr = {
    InExpr(func(head), is, range.map(func(_)))
  }
}

/**
  * `<head> [NOT] BETWEEN <first> AND <second>`.
  *
  * @param head  expression tested
  * @param is    true renders `BETWEEN`, false renders `NOT BETWEEN`
  * @param range must contain at least two elements (lower and upper bound);
  *              extra elements are ignored, fewer raise an exception
  */
case class BetweenExpr(head: Expr, is: Boolean, range: Seq[Expr]) extends LogicalExpr {

  // Destructure the bounds once, at construction time.  `Seq(f, s, _*)`
  // matches every Seq implementation; the previous cons pattern
  // `first :: second :: _` only matched List and threw a bare MatchError
  // (instead of the intended descriptive exception) for e.g. a Vector.
  private val (first, second) = range match {
    case Seq(f, s, _*) => (f, s)
    case _ => throw new Exception("between expression exception: range less than 2")
  }

  addChildren(head :: first :: second :: Nil)

  def desc: String = {
    val notStr = if (is) "" else " NOT"
    s"${head.desc}${notStr} BETWEEN ${first.desc} AND ${second.desc}"
  }
  def coalesceDesc: String = {
    val notStr = if (is) "" else " NOT"
    s"${head.coalesceDesc}${notStr} BETWEEN ${first.coalesceDesc} AND ${second.coalesceDesc}"
  }

  override def map(func: (Expr) => Expr): BetweenExpr = {
    BetweenExpr(func(head), is, range.map(func(_)))
  }
}

/**
  * `<head> [NOT] LIKE <value>`.
  *
  * @param is true renders `LIKE`, false renders `NOT LIKE`
  */
case class LikeExpr(head: Expr, is: Boolean, value: Expr) extends LogicalExpr {

  addChildren(head :: value :: Nil)

  def desc: String = {
    val notStr = if (is) "" else " NOT"
    s"${head.desc}${notStr} LIKE ${value.desc}"
  }
  def coalesceDesc: String = {
    val notStr = if (is) "" else " NOT"
    s"${head.coalesceDesc}${notStr} LIKE ${value.coalesceDesc}"
  }

  override def map(func: (Expr) => Expr): LikeExpr = {
    LikeExpr(func(head), is, func(value))
  }
}

/**
  * `<head> IS [NOT] NULL`.
  *
  * @param is true renders `IS NULL`, false renders `IS NOT NULL`
  */
case class IsNullExpr(head: Expr, is: Boolean) extends LogicalExpr {

  addChild(head)

  def desc: String = {
    val notStr = if (is) "" else " NOT"
    s"${head.desc} IS${notStr} NULL"
  }
  // A null test needs no coalesce wrapping: it already handles null itself.
  def coalesceDesc: String = desc

  override def map(func: (Expr) => Expr): IsNullExpr = {
    IsNullExpr(func(head), is)
  }
}

/**
  * `[NOT] isnan(<head>)`.
  *
  * @param is true renders `isnan(...)`, false renders `NOT isnan(...)`
  */
case class IsNanExpr(head: Expr, is: Boolean) extends LogicalExpr {

  addChild(head)

  def desc: String = {
    val notStr = if (is) "" else "NOT "
    s"${notStr}isnan(${head.desc})"
  }
  def coalesceDesc: String = desc

  override def map(func: (Expr) => Expr): IsNanExpr = {
    IsNanExpr(func(head), is)
  }
}

// -----------

/**
  * Wraps any expression as a logical factor, optionally bracketed and aliased.
  * `extractSelf` unwraps the factor again when no alias is attached, so
  * redundant wrappers collapse out of the tree.
  */
case class LogicalFactorExpr(factor: Expr, withBracket: Boolean, aliasOpt: Option[String]
                            ) extends LogicalExpr with AliasableExpr {

  addChild(factor)

  def desc: String = if (withBracket) s"(${factor.desc})" else factor.desc
  def coalesceDesc: String = factor.coalesceDesc
  def alias: Option[String] = aliasOpt
  override def extractSelf: Expr = {
    if (aliasOpt.nonEmpty) this
    else factor.extractSelf
  }

  override def map(func: (Expr) => Expr): LogicalFactorExpr = {
    LogicalFactorExpr(func(factor), withBracket, aliasOpt)
  }
}

/**
  * A factor with zero or more prefixed unary logical operators, e.g. `!!x`.
  * Operators render right-to-left around the factor.
  */
case class UnaryLogicalExpr(oprs: Seq[String], factor: LogicalExpr) extends LogicalExpr {

  addChild(factor)

  def desc: String = {
    oprs.foldRight(factor.desc) { (opr, fac) =>
      s"(${trans(opr)} ${fac})"
    }
  }
  def coalesceDesc: String = {
    oprs.foldRight(factor.coalesceDesc) { (opr, fac) =>
      s"(${trans(opr)} ${fac})"
    }
  }
  // Normalize a DSL operator token to its SQL keyword.  The parser's keyword
  // tokens keep their trailing whitespace (they are matched as `not\s`), so we
  // trim here -- mirroring BinaryLogicalExpr.trans, which already trimmed;
  // previously the untrimmed token produced a stray space in the output.
  private def trans(s: String): String = {
    s match {
      case "!" => "NOT"
      case _ => s.trim.toUpperCase
    }
  }
  override def extractSelf: Expr = {
    if (oprs.nonEmpty) this
    else factor.extractSelf
  }

  override def map(func: (Expr) => Expr): UnaryLogicalExpr = {
    UnaryLogicalExpr(oprs, func(factor).asInstanceOf[LogicalExpr])
  }
}

/**
  * Left-associative chain of binary logical operators:
  * `<factor> (<opr> <factor>)*`, kept as a head factor plus (operator, operand)
  * tails.
  */
case class BinaryLogicalExpr(factor: LogicalExpr, tails: Seq[(String, LogicalExpr)]) extends LogicalExpr {

  addChildren(factor +: tails.map(_._2))

  // Removed the previous dead conditional `if (tails.size <= 0) res else
  // s"${res}"` -- both branches produced the same string.
  def desc: String = {
    tails.foldLeft(factor.desc) { (fac, tail) =>
      val (opr, expr) = tail
      s"${fac} ${trans(opr)} ${expr.desc}"
    }
  }
  def coalesceDesc: String = {
    tails.foldLeft(factor.coalesceDesc) { (fac, tail) =>
      val (opr, expr) = tail
      s"${fac} ${trans(opr)} ${expr.coalesceDesc}"
    }
  }
  // Normalize DSL operator tokens ("&&", "||", keyword forms) to SQL keywords.
  private def trans(s: String): String = {
    s match {
      case "&&" => "AND"
      case "||" => "OR"
      case _ => s.trim.toUpperCase
    }
  }
  override def extractSelf: Expr = {
    if (tails.nonEmpty) this
    else factor.extractSelf
  }

  override def map(func: (Expr) => Expr): BinaryLogicalExpr = {
    BinaryLogicalExpr(func(factor).asInstanceOf[LogicalExpr], tails.map { pair =>
      (pair._1, func(pair._2).asInstanceOf[LogicalExpr])
    })
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/MathExpr.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/MathExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/MathExpr.scala deleted file mode 100644 index 4217e44..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/MathExpr.scala +++ /dev/null @@ -1,94 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
*/
package org.apache.griffin.measure.rule.dsl.expr

// Arithmetic expression nodes of the griffin DSL AST.
trait MathExpr extends Expr {
}

/**
  * Wraps any expression as a math factor, optionally bracketed and aliased.
  * `extractSelf` unwraps the factor again when no alias is attached.
  */
case class MathFactorExpr(factor: Expr, withBracket: Boolean, aliasOpt: Option[String]
                         ) extends MathExpr with AliasableExpr {

  addChild(factor)

  def desc: String = if (withBracket) s"(${factor.desc})" else factor.desc
  def coalesceDesc: String = factor.coalesceDesc
  def alias: Option[String] = aliasOpt
  override def extractSelf: Expr = {
    if (aliasOpt.nonEmpty) this
    else factor.extractSelf
  }

  override def map(func: (Expr) => Expr): MathFactorExpr = {
    MathFactorExpr(func(factor), withBracket, aliasOpt)
  }
}

/**
  * A factor with zero or more prefixed unary operators (`+`/`-`),
  * rendered right-to-left, e.g. `(-(+x))`.
  */
case class UnaryMathExpr(oprs: Seq[String], factor: MathExpr) extends MathExpr {

  addChild(factor)

  def desc: String = {
    oprs.foldRight(factor.desc) { (opr, fac) =>
      s"(${opr}${fac})"
    }
  }
  def coalesceDesc: String = {
    oprs.foldRight(factor.coalesceDesc) { (opr, fac) =>
      s"(${opr}${fac})"
    }
  }
  override def extractSelf: Expr = {
    if (oprs.nonEmpty) this
    else factor.extractSelf
  }

  override def map(func: (Expr) => Expr): UnaryMathExpr = {
    UnaryMathExpr(oprs, func(factor).asInstanceOf[MathExpr])
  }
}

/**
  * Left-associative chain of binary arithmetic operators:
  * `<factor> (<opr> <factor>)*`, kept as a head factor plus
  * (operator, operand) tails.
  */
case class BinaryMathExpr(factor: MathExpr, tails: Seq[(String, MathExpr)]) extends MathExpr {

  addChildren(factor +: tails.map(_._2))

  // Removed the previous dead conditional `if (tails.size <= 0) res else
  // s"${res}"` from both renderers -- the branches were identical.
  def desc: String = {
    tails.foldLeft(factor.desc) { (fac, tail) =>
      val (opr, expr) = tail
      s"${fac} ${opr} ${expr.desc}"
    }
  }
  def coalesceDesc: String = {
    tails.foldLeft(factor.coalesceDesc) { (fac, tail) =>
      val (opr, expr) = tail
      s"${fac} ${opr} ${expr.coalesceDesc}"
    }
  }
  override def extractSelf: Expr = {
    if (tails.nonEmpty) this
    else factor.extractSelf
  }

  override def map(func: (Expr) => Expr): BinaryMathExpr = {
    BinaryMathExpr(func(factor).asInstanceOf[MathExpr], tails.map { pair =>
      (pair._1, func(pair._2).asInstanceOf[MathExpr])
    })
  }
}
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/SelectExpr.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/SelectExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/SelectExpr.scala deleted file mode 100644 index 4f84b10..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/SelectExpr.scala +++ /dev/null @@ -1,132 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
*/
package org.apache.griffin.measure.rule.dsl.expr

/**
  * Head of a selection chain (the part before the first `.field`/`[index]`
  * selector).  Heads carry no alias by default.
  */
trait HeadExpr extends Expr with AliasableExpr {
  def alias: Option[String] = None
}

/** Head referring to a registered data source; rendered back-quoted. */
case class DataSourceHeadExpr(name: String) extends HeadExpr {
  def desc: String = s"`${name}`"
  def coalesceDesc: String = desc
}

/** Head referring to a plain field name; its alias is the field itself. */
case class FieldNameHeadExpr(field: String) extends HeadExpr {
  def desc: String = s"`${field}`"
  def coalesceDesc: String = desc
  override def alias: Option[String] = Some(field)
}

/** Head for the `*` (select-all) form. */
case class AllSelectHeadExpr() extends HeadExpr {
  def desc: String = "*"
  def coalesceDesc: String = desc
}

/** Head wrapping an arbitrary expression (e.g. a function call). */
case class OtherHeadExpr(expr: Expr) extends HeadExpr {

  addChild(expr)

  def desc: String = expr.desc
  def coalesceDesc: String = expr.coalesceDesc
  override def alias: Option[String] = Some(expr.desc)

  override def map(func: (Expr) => Expr): OtherHeadExpr = {
    OtherHeadExpr(func(expr))
  }
}

// -------------

/** One selector step applied after a head: `.field`, `[index]`, `.func(...)`. */
trait SelectExpr extends Expr with AliasableExpr {
}

/** `.*` selector. */
case class AllFieldsSelectExpr() extends SelectExpr {
  // Plain literals: the previous `s"..."` interpolators had no substitutions.
  def desc: String = ".*"
  def coalesceDesc: String = desc
  def alias: Option[String] = None
}

/** `.field` selector; rendered back-quoted, aliased by the field name. */
case class FieldSelectExpr(field: String) extends SelectExpr {
  def desc: String = s".`${field}`"
  def coalesceDesc: String = desc
  override def alias: Option[String] = Some(field)
}

/** `[index]` selector. */
case class IndexSelectExpr(index: Expr) extends SelectExpr {

  addChild(index)

  def desc: String = s"[${index.desc}]"
  def coalesceDesc: String = desc
  def alias: Option[String] = Some(index.desc)

  override def map(func: (Expr) => Expr): IndexSelectExpr = {
    IndexSelectExpr(func(index))
  }
}

/**
  * `.func(args...)` selector.  `desc` is deliberately empty: the enclosing
  * SelectionExpr renders a function selector itself, threading the selection
  * built so far in as the first argument.
  */
case class FunctionSelectExpr(functionName: String, args: Seq[Expr]) extends SelectExpr {

  addChildren(args)

  def desc: String = ""
  def coalesceDesc: String = desc
  def alias: Option[String] = Some(functionName)

  override def map(func: (Expr) => Expr): FunctionSelectExpr = {
    FunctionSelectExpr(functionName, args.map(func(_)))
  }
}

// -------------
/**
  * A full selection chain: a head followed by selector steps, with an
  * optional explicit alias.  When no alias is given, one is synthesized by
  * joining the aliases of the head and every selector with underscores.
  */
case class SelectionExpr(head: HeadExpr, selectors: Seq[SelectExpr], aliasOpt: Option[String]) extends SelectExpr {

  addChildren(head +: selectors)

  def desc: String = {
    // Render left to right; a function selector rewrites the text built so
    // far into its own first argument instead of being appended.
    selectors.foldLeft(head.desc) { (soFar, sel) =>
      sel match {
        case FunctionSelectExpr(fn, fnArgs) =>
          val allArgs = soFar +: fnArgs.map(_.desc)
          s"${fn}(${allArgs.mkString(", ")})"
        case other =>
          s"${soFar}${other.desc}"
      }
    }
  }

  def coalesceDesc: String = {
    // Function results and bare heads are emitted as-is; anything else is
    // wrapped in coalesce(..., '') to null-proof the rendered value.
    selectors.lastOption match {
      case Some(_: FunctionSelectExpr) | None => desc
      case _ => s"coalesce(${desc}, '')"
    }
  }

  def alias: Option[String] = {
    aliasOpt.orElse {
      val pieces = (head +: selectors).flatMap(_.alias)
      if (pieces.nonEmpty) Some(pieces.mkString("_")) else None
    }
  }

  override def map(func: (Expr) => Expr): SelectionExpr = {
    val mappedHead = func(head).asInstanceOf[HeadExpr]
    val mappedSels = selectors.map(func(_).asInstanceOf[SelectExpr])
    SelectionExpr(mappedHead, mappedSels, aliasOpt)
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/TreeNode.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/TreeNode.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/TreeNode.scala
deleted file mode 100644
index aab16b4..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/TreeNode.scala
+++ /dev/null
@@ -1,45 +0,0 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.rule.dsl.expr

/**
  * Mutable n-ary tree node; the base of every DSL expression node.
  * Children are registered by subclasses during construction via
  * addChild/addChildren, in insertion order.
  */
trait TreeNode extends Serializable {

  // Mutable by design: expression case classes append their sub-expressions
  // here from their constructor bodies.
  var children = Seq[TreeNode]()

  def addChild(expr: TreeNode) = { children :+= expr }
  def addChildren(exprs: Seq[TreeNode]) = { children ++= exprs }

  // Pre-order fold: visit this node (seqOp), then fold over the children,
  // combining partial results with combOp.
  //
  // NOTE(review): `this.isInstanceOf[A]` cannot test the type parameter A at
  // runtime -- A is erased, so the check degenerates to isInstanceOf of the
  // erasure bound (TreeNode, i.e. always true here) and the compiler emits an
  // unchecked warning.  Filtering by node type through A therefore does not
  // work as written; a ClassTag would be needed -- TODO confirm intended use.
  def preOrderTraverseDepthFirst[T, A <: TreeNode](z: T)(seqOp: (A, T) => T, combOp: (T, T) => T): T = {
    if (this.isInstanceOf[A]) {
      val tv = seqOp(this.asInstanceOf[A], z)
      children.foldLeft(combOp(z, tv)) { (ov, tn) =>
        combOp(ov, tn.preOrderTraverseDepthFirst(z)(seqOp, combOp))
      }
    } else z
  }
  // Post-order fold: fold over the children first, then visit this node.
  // Same erasure caveat as above.
  def postOrderTraverseDepthFirst[T, A <: TreeNode](z: T)(seqOp: (A, T) => T, combOp: (T, T) => T): T = {
    if (this.isInstanceOf[A]) {
      val cv = children.foldLeft(z) { (ov, tn) =>
        combOp(ov, tn.postOrderTraverseDepthFirst(z)(seqOp, combOp))
      }
      combOp(z, seqOp(this.asInstanceOf[A], cv))
    } else z
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala
deleted file mode 100644
index 3a0d737..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala
+++ /dev/null
@@ -1,388 +0,0 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.
See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.rule.dsl.parser

import org.apache.griffin.measure.rule.dsl.expr._

import scala.util.parsing.combinator.JavaTokenParsers

/**
  * Combinator parser for the griffin DSL.  Implementors supply the data
  * source names and function names registered for the current rule; the
  * parser builds the Expr AST defined in ...rule.dsl.expr.
  *
  * NOTE(review): alternation order in the parsers below is significant
  * (PEG-style first match wins) -- e.g. `literal` must try time before
  * number, and `logicalFactor` tries in/between/like/is before a bare math
  * expression.  Keep ordering intact when editing.
  */
trait BasicParser extends JavaTokenParsers with Serializable {

  // Names registered by the caller; used to build the DataSourceName and
  // FunctionName token parsers.
  val dataSourceNames: Seq[String]
  val functionNames: Seq[String]

  // Strip one pair of surrounding back-quotes, if present.
  private def trim(str: String): String = {
    val regex = """`(.*)`""".r
    str match {
      case regex(s) => s
      case _ => str
    }
  }

  /**
    * BNF for basic parser
    *
    * -- literal --
    * <literal> ::= <literal-string> | <literal-number> | <literal-time> | <literal-boolean> | <literal-null> | <literal-nan>
    * <literal-string> ::= <any-string>
    * <literal-number> ::= <integer> | <double>
    * <literal-time> ::= <integer> ("d"|"h"|"m"|"s"|"ms")
    * <literal-boolean> ::= true | false
    * <literal-null> ::= null
    * <literal-nan> ::= nan
    *
    * -- selection --
    * <selection> ::= <selection-head> [ <field-sel> | <index-sel> | <function-sel> ]* [<as-alias>]?
    * <selection-head> ::= ("data source name registered") | <function> | <field-name> | <all-selection>
    * <field-sel> ::= "." <field-name> | "[" <quote-field-name> "]"
    * <index-sel> ::= "[" <arg> "]"
    * <function-sel> ::= "." <function-name> "(" [<arg>]? [, <arg>]* ")"
    * <arg> ::= <math-expr>
    *
    * -- as alias --
    * <as-alias> ::= <as> <field-name>
    *
    * -- math expr --
    * <math-factor> ::= <literal> | <function> | <selection> | "(" <math-expr> ")" [<as-alias>]?
    * <unary-math-expr> ::= [<unary-opr>]* <math-factor>
    * <binary-math-expr> ::= <unary-math-expr> [<binary-opr> <unary-math-expr>]+
    * <math-expr> ::= <binary-math-expr>
    *
    * -- logical expr --
    * <in-expr> ::= <math-expr> [<not>]? <in> <range-expr>
    * <between-expr> ::= <math-expr> [<not>]? <between> (<math-expr> <and> <math-expr> | <range-expr>)
    * <range-expr> ::= "(" [<math-expr>]? [, <math-expr>]+ ")"
    * <like-expr> ::= <math-expr> [<not>]? <like> <math-expr>
    * <is-null-expr> ::= <math-expr> <is> [<not>]? <null>
    * <is-nan-expr> ::= <math-expr> <is> [<not>]? <nan>
    *
    * <logical-factor> ::= <math-expr> | <in-expr> | <between-expr> | <like-expr> | <is-null-expr> | <is-nan-expr> | "(" <logical-expr> ")" [<as-alias>]?
    * <unary-logical-expr> ::= [<unary-logical-opr>]* <logical-factor>
    * <binary-logical-expr> ::= <unary-logical-expr> [<binary-logical-opr> <unary-logical-expr>]+
    * <logical-expr> ::= <binary-logical-expr>
    *
    * -- expression --
    * <expr> = <math-expr> | <logical-expr>
    *
    * -- function expr --
    * <function> ::= <function-name> "(" [<arg>] [, <arg>]+ ")" [<as-alias>]?
    * <function-name> ::= ("function name registered")
    * <arg> ::= <expr>
    *
    * -- clauses --
    * <select-clause> = <expr> [, <expr>]*
    * <where-clause> = <where> <expr>
    * <from-clause> = <from> ("data source name registered")
    * <having-clause> = <having> <expr>
    * <groupby-clause> = <group> <by> <expr> [ <having-clause> ]?
    * <orderby-item> = <expr> [ <DESC> ]?
    * <orderby-clause> = <order> <by> <orderby-item> [ , <orderby-item> ]*
    * <limit-clause> = <limit> <expr>
    *
    * -- combined clauses --
    * <combined-clauses> = <select-clause> [ <from-clause> ]+ [ <where-clause> ]+ [ <groupby-clause> ]+ [ <orderby-clause> ]+ [ <limit-clause> ]+
    */

  // Build an alternation over the registered names, case-insensitive, each
  // accepted with or without back-quotes.  `names.reverse` makes
  // later-registered names take precedence in the alternation -- presumably
  // intentional; confirm against registration order before changing.
  protected def genDataSourceNamesParser(names: Seq[String]): Parser[String] = {
    names.reverse.map {
      fn => s"""(?i)`${fn}`|${fn}""".r: Parser[String]
    }.reduce(_ | _)
  }
  protected def genFunctionNamesParser(names: Seq[String]): Parser[String] = {
    names.reverse.map {
      fn => s"""(?i)${fn}""".r: Parser[String]
    }.reduce(_ | _)
  }

  object Literal {
    val NULL: Parser[String] = """(?i)null""".r
    val NAN: Parser[String] = """(?i)nan""".r
  }
  import Literal._

  object Operator {
    // Keyword tokens are matched with a trailing `\s` so that e.g. "inx" is
    // not read as the IN keyword; the consumed whitespace stays on the token.
    val MATH_UNARY: Parser[String] = "+" | "-"
    // Precedence levels, tightest first: (*,/,%) binds tighter than (+,-).
    val MATH_BINARIES: Seq[Parser[String]] = Seq(("*" | "/" | "%"), ("+" | "-"))

    val NOT: Parser[String] = """(?i)not\s""".r | "!"
    val AND: Parser[String] = """(?i)and\s""".r | "&&"
    val OR: Parser[String] = """(?i)or\s""".r | "||"
    val IN: Parser[String] = """(?i)in\s""".r
    val BETWEEN: Parser[String] = """(?i)between\s""".r
    val AND_ONLY: Parser[String] = """(?i)and\s""".r
    val IS: Parser[String] = """(?i)is\s""".r
    val LIKE: Parser[String] = """(?i)like\s""".r
    val COMPARE: Parser[String] = "=" | "!=" | "<>" | "<=" | ">=" | "<" | ">"
    val LOGICAL_UNARY: Parser[String] = NOT
    // Precedence levels, tightest first: comparison, then AND, then OR.
    val LOGICAL_BINARIES: Seq[Parser[String]] = Seq((COMPARE), (AND), (OR))

    val LSQBR: Parser[String] = "["
    val RSQBR: Parser[String] = "]"
    val LBR: Parser[String] = "("
    val RBR: Parser[String] = ")"

    val DOT: Parser[String] = "."
    val ALLSL: Parser[String] = "*"
    val SQUOTE: Parser[String] = "'"
    val DQUOTE: Parser[String] = "\""
    val UQUOTE: Parser[String] = "`"
    val COMMA: Parser[String] = ","

    val SELECT: Parser[String] = """(?i)select\s""".r
    val DISTINCT: Parser[String] = """(?i)distinct""".r
//    val ALL: Parser[String] = """(?i)all""".r
    val FROM: Parser[String] = """(?i)from\s""".r
    val AS: Parser[String] = """(?i)as\s""".r
    val WHERE: Parser[String] = """(?i)where\s""".r
    val GROUP: Parser[String] = """(?i)group\s""".r
    val ORDER: Parser[String] = """(?i)order\s""".r
    val SORT: Parser[String] = """(?i)sort\s""".r
    val BY: Parser[String] = """(?i)by\s""".r
    val DESC: Parser[String] = """(?i)desc""".r
    val ASC: Parser[String] = """(?i)asc""".r
    val HAVING: Parser[String] = """(?i)having\s""".r
    val LIMIT: Parser[String] = """(?i)limit\s""".r
  }
  import Operator._

  object Strings {
    def AnyString: Parser[String] = """"(?:\"|[^\"])*"""".r | """'(?:\'|[^'])*'""".r
    def SimpleTableFieldName: Parser[String] = """[a-zA-Z_]\w*""".r
    def UnQuoteTableFieldName: Parser[String] = """`(?:[\\][`]|[^`])*`""".r
//    def FieldName: Parser[String] = UnQuoteTableFieldName | SimpleTableFieldName
    def DataSourceName: Parser[String] = genDataSourceNamesParser(dataSourceNames)
    def FunctionName: Parser[String] = genFunctionNamesParser(functionNames)

    def IntegerNumber: Parser[String] = """[+\-]?\d+""".r
    def DoubleNumber: Parser[String] = """[+\-]?(\.\d+|\d+\.\d*)""".r
    def IndexNumber: Parser[String] = IntegerNumber

    def TimeString: Parser[String] = """([+\-]?\d+)(d|h|m|s|ms)""".r
    def BooleanString: Parser[String] = """(?i)true|false""".r
  }
  import Strings._

  /**
    * -- literal --
    * <literal> ::= <literal-string> | <literal-number> | <literal-time> | <literal-boolean> | <literal-null> | <literal-nan>
    */
  // Order matters: time must be tried before number, or "3h" would parse as
  // the number 3 with a dangling "h".
  def literal: Parser[LiteralExpr] = literalNull | literalNan | literalBoolean | literalString | literalTime | literalNumber
  def literalNull: Parser[LiteralNullExpr] = NULL ^^ { LiteralNullExpr(_) }
  def literalNan: Parser[LiteralNanExpr] = NAN ^^ { LiteralNanExpr(_) }
  def literalString: Parser[LiteralStringExpr] = AnyString ^^ { LiteralStringExpr(_) }
  def literalNumber: Parser[LiteralNumberExpr] = (DoubleNumber | IntegerNumber) ^^ { LiteralNumberExpr(_) }
  def literalTime: Parser[LiteralTimeExpr] = TimeString ^^ { LiteralTimeExpr(_) }
  def literalBoolean: Parser[LiteralBooleanExpr] = BooleanString ^^ { LiteralBooleanExpr(_) }

  /**
    * -- selection --
    * <selection> ::= <selection-head> [ <field-sel> | <index-sel> | <function-sel> ]* [<as-alias>]?
    */
  def selection: Parser[SelectionExpr] = selectionHead ~ rep(selector) ~ opt(asAlias) ^^ {
    case head ~ sels ~ aliasOpt => SelectionExpr(head, sels, aliasOpt)
  }
  // Registered data source names win over plain field names of the same text.
  def selectionHead: Parser[HeadExpr] = DataSourceName ^^ {
    ds => DataSourceHeadExpr(trim(ds))
  } | function ^^ {
    OtherHeadExpr(_)
  } | SimpleTableFieldName ^^ {
    FieldNameHeadExpr(_)
  } | UnQuoteTableFieldName ^^ { s =>
    FieldNameHeadExpr(trim(s))
  } | ALLSL ^^ { _ =>
    AllSelectHeadExpr()
  }
  // functionSelect before fieldSelect: ".func(" would otherwise be consumed
  // as a field named "func".
  def selector: Parser[SelectExpr] = functionSelect | allFieldsSelect | fieldSelect | indexSelect
  def allFieldsSelect: Parser[AllFieldsSelectExpr] = DOT ~> ALLSL ^^ { _ => AllFieldsSelectExpr() }
  def fieldSelect: Parser[FieldSelectExpr] = DOT ~> (
    SimpleTableFieldName ^^ {
      FieldSelectExpr(_)
    } | UnQuoteTableFieldName ^^ { s =>
      FieldSelectExpr(trim(s))
    })
  def indexSelect: Parser[IndexSelectExpr] = LSQBR ~> argument <~ RSQBR ^^ { IndexSelectExpr(_) }
  def functionSelect: Parser[FunctionSelectExpr] = DOT ~ FunctionName ~ LBR ~ repsep(argument, COMMA) ~ RBR ^^ {
    case _ ~ name ~ _ ~ args ~ _ => FunctionSelectExpr(name, args)
  }

  /**
    * -- as alias --
    * <as-alias> ::= <as> <field-name>
    */
  def asAlias: Parser[String] = AS ~> (SimpleTableFieldName | UnQuoteTableFieldName ^^ { trim(_) })

  /**
    * -- math expr --
    * <math-factor> ::= <literal> | <function> | <selection> | "(" <math-expr> ")" [<as-alias>]?
    * <unary-math-expr> ::= [<unary-opr>]* <math-factor>
    * <binary-math-expr> ::= <unary-math-expr> [<binary-opr> <unary-math-expr>]+
    * <math-expr> ::= <binary-math-expr>
    */
  def mathFactor: Parser[MathExpr] = (literal | function | selection) ^^ {
    MathFactorExpr(_, false, None)
  } | LBR ~ mathExpression ~ RBR ~ opt(asAlias) ^^ {
    case _ ~ expr ~ _ ~ aliasOpt => MathFactorExpr(expr, true, aliasOpt)
  }
  def unaryMathExpression: Parser[MathExpr] = rep(MATH_UNARY) ~ mathFactor ^^ {
    case Nil ~ a => a
    case list ~ a => UnaryMathExpr(list, a)
  }
  // Fold the precedence levels of MATH_BINARIES into nested parsers; the
  // head of the resulting list is the loosest-binding (full) expression.
  def binaryMathExpressions: Seq[Parser[MathExpr]] =
    MATH_BINARIES.foldLeft(List[Parser[MathExpr]](unaryMathExpression)) { (parsers, binaryParser) =>
      val pre = parsers.head
      val cur = pre ~ rep(binaryParser ~ pre) ^^ {
        case a ~ Nil => a
        case a ~ list => BinaryMathExpr(a, list.map(c => (c._1, c._2)))
      }
      cur :: parsers
    }
  def mathExpression: Parser[MathExpr] = binaryMathExpressions.head

  /**
    * -- logical expr --
    * <logical-factor> ::= <math-expr> | <in-expr> | <between-expr> | <like-expr> | <is-null-expr> | <is-nan-expr> | "(" <logical-expr> ")" [<as-alias>]?
    * <unary-logical-expr> ::= [<unary-logical-opr>]* <logical-factor>
    * <binary-logical-expr> ::= <unary-logical-expr> [<binary-logical-opr> <unary-logical-expr>]+
    * <logical-expr> ::= <binary-logical-expr>
    */
  // `notOpt.isEmpty` maps absence of NOT to is=true (the positive form).
  def inExpr: Parser[LogicalExpr] = mathExpression ~ opt(NOT) ~ IN ~ LBR ~ repsep(mathExpression, COMMA) ~ RBR ^^ {
    case head ~ notOpt ~ _ ~ _ ~ list ~ _ => InExpr(head, notOpt.isEmpty, list)
  }
  def betweenExpr: Parser[LogicalExpr] = mathExpression ~ opt(NOT) ~ BETWEEN ~ LBR ~ repsep(mathExpression, COMMA) ~ RBR ^^ {
    case head ~ notOpt ~ _ ~ _ ~ list ~ _ => BetweenExpr(head, notOpt.isEmpty, list)
  } | mathExpression ~ opt(NOT) ~ BETWEEN ~ mathExpression ~ AND_ONLY ~ mathExpression ^^ {
    case head ~ notOpt ~ _ ~ first ~ _ ~ second => BetweenExpr(head, notOpt.isEmpty, Seq(first, second))
  }
  def likeExpr: Parser[LogicalExpr] = mathExpression ~ opt(NOT) ~ LIKE ~ mathExpression ^^ {
    case head ~ notOpt ~ _ ~ value => LikeExpr(head, notOpt.isEmpty, value)
  }
  def isNullExpr: Parser[LogicalExpr] = mathExpression ~ IS ~ opt(NOT) ~ NULL ^^ {
    case head ~ _ ~ notOpt ~ _ => IsNullExpr(head, notOpt.isEmpty)
  }
  def isNanExpr: Parser[LogicalExpr] = mathExpression ~ IS ~ opt(NOT) ~ NAN ^^ {
    case head ~ _ ~ notOpt ~ _ => IsNanExpr(head, notOpt.isEmpty)
  }

  def logicalFactor: Parser[LogicalExpr] = (inExpr | betweenExpr | likeExpr | isNullExpr | isNanExpr | mathExpression) ^^ {
    LogicalFactorExpr(_, false, None)
  } | LBR ~ logicalExpression ~ RBR ~ opt(asAlias) ^^ {
    case _ ~ expr ~ _ ~ aliasOpt => LogicalFactorExpr(expr, true, aliasOpt)
  }
  def unaryLogicalExpression: Parser[LogicalExpr] = rep(LOGICAL_UNARY) ~ logicalFactor ^^ {
    case Nil ~ a => a
    case list ~ a => UnaryLogicalExpr(list, a)
  }
  // Same precedence-folding scheme as binaryMathExpressions.
  def binaryLogicalExpressions: Seq[Parser[LogicalExpr]] =
    LOGICAL_BINARIES.foldLeft(List[Parser[LogicalExpr]](unaryLogicalExpression)) { (parsers, binaryParser) =>
      val pre = parsers.head
      val cur = pre ~ rep(binaryParser ~ pre) ^^ {
        case a ~ Nil => a
        case a ~ list => BinaryLogicalExpr(a, list.map(c => (c._1, c._2)))
      }
      cur :: parsers
    }
  def logicalExpression: Parser[LogicalExpr] = binaryLogicalExpressions.head

  /**
    * -- expression --
    * <expr> = <math-expr> | <logical-expr>
    */
  def expression: Parser[Expr] = logicalExpression | mathExpression

  /**
    * -- function expr --
    * <function> ::= <function-name> "(" [<arg>] [, <arg>]+ ")" [<as-alias>]?
    */
  def function: Parser[FunctionExpr] = FunctionName ~ LBR ~ opt(DISTINCT) ~ repsep(argument, COMMA) ~ RBR ~ opt(asAlias) ^^ {
    case name ~ _ ~ extraCdtnOpt ~ args ~ _ ~ aliasOpt =>
      FunctionExpr(name, args, extraCdtnOpt.map(ExtraConditionExpr(_)), aliasOpt)
  }
  def argument: Parser[Expr] = expression

  /**
    * -- clauses --
    * <select-clause> = <expr> [, <expr>]*
    * <where-clause> = <where> <expr>
    * <from-clause> = <from> ("data source name registered")
    * <having-clause> = <having> <expr>
    * <groupby-clause> = <group> <by> <expr> [ <having-clause> ]?
    * <orderby-item> = <expr> [ <DESC> ]?
    * <orderby-clause> = <order> <by> <orderby-item> [ , <orderby-item> ]*
    * <limit-clause> = <limit> <expr>
    */
  def selectClause: Parser[SelectClause] = opt(SELECT) ~> opt(DISTINCT) ~ rep1sep(expression, COMMA) ^^ {
    case extraCdtnOpt ~ exprs => SelectClause(exprs, extraCdtnOpt.map(ExtraConditionExpr(_)))
  }
  def fromClause: Parser[FromClause] = FROM ~> DataSourceName ^^ { ds => FromClause(trim(ds)) }
  def whereClause: Parser[WhereClause] = WHERE ~> expression ^^ { WhereClause(_) }
  def havingClause: Parser[Expr] = HAVING ~> expression
  def groupbyClause: Parser[GroupbyClause] = GROUP ~ BY ~ rep1sep(expression, COMMA) ~ opt(havingClause) ^^ {
    case _ ~ _ ~ cols ~ havingOpt => GroupbyClause(cols, havingOpt)
  }
  def orderItem: Parser[OrderItem] = expression ~ opt(DESC | ASC) ^^ {
    case expr ~ orderOpt => OrderItem(expr, orderOpt)
  }
  def orderbyClause: Parser[OrderbyClause] = ORDER ~ BY ~ rep1sep(orderItem, COMMA) ^^ {
    case _ ~ _ ~ cols => OrderbyClause(cols)
  }
  def sortbyClause: Parser[SortbyClause] = SORT ~ BY ~ rep1sep(orderItem, COMMA) ^^ {
    case _ ~ _ ~ cols => SortbyClause(cols)
  }
  def limitClause: Parser[LimitClause] = LIMIT ~> expression ^^ { LimitClause(_) }

  /**
    * -- combined clauses --
    * <combined-clauses> = <select-clause> [ <from-clause> ]+ [ <where-clause> ]+ [ <groupby-clause> ]+ [ <orderby-clause> ]+ [ <limit-clause> ]+
    */
  def combinedClause: Parser[CombinedClause] = selectClause ~ opt(fromClause) ~ opt(whereClause) ~
    opt(groupbyClause) ~ opt(orderbyClause) ~ opt(limitClause) ^^ {
    case sel ~ fromOpt ~ whereOpt ~ groupbyOpt ~ orderbyOpt ~ limitOpt => {
      val tails = Seq(whereOpt, groupbyOpt, orderbyOpt, limitOpt).flatMap(opt => opt)
      CombinedClause(sel, fromOpt, tails)
    }
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala deleted file mode 100644 index b4496e7..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala +++ /dev/null @@ -1,94 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.rule.dsl.parser - -import org.apache.griffin.measure.rule.dsl._ -import org.apache.griffin.measure.rule.dsl.expr._ - -case class GriffinDslParser(dataSourceNames: Seq[String], functionNames: Seq[String] - ) extends BasicParser { - - import Operator._ - - /** - * -- profiling clauses -- - * <profiling-clauses> = <select-clause> [ <from-clause> ]+ [ <where-clause> ]+ [ <groupby-clause> ]+ [ <orderby-clause> ]+ [ <limit-clause> ]+ - */ - - def profilingClause: Parser[ProfilingClause] = selectClause ~ opt(fromClause) ~ opt(whereClause) ~ - opt(groupbyClause) ~ opt(orderbyClause) ~ opt(limitClause) ^^ { - case sel ~ fromOpt ~ whereOpt ~ groupbyOpt ~ orderbyOpt ~ limitOpt => { - val preClauses = Seq(whereOpt).flatMap(opt => opt) - val postClauses = Seq(orderbyOpt, limitOpt).flatMap(opt => opt) - ProfilingClause(sel, fromOpt, groupbyOpt, preClauses, postClauses) - } - } - - /** - * -- uniqueness clauses -- - * <uniqueness-clauses> = <expr> [, <expr>]+ - */ - def uniquenessClause: Parser[UniquenessClause] = rep1sep(expression, Operator.COMMA) ^^ { - case exprs => UniquenessClause(exprs) - } - - /** - * -- distinctness clauses -- - * <sqbr-expr> = "[" <expr> "]" - * <dist-expr> = <sqbr-expr> | <expr> - * <distinctness-clauses> = <distExpr> [, <distExpr>]+ - */ - def sqbrExpr: Parser[Expr] = LSQBR ~> expression <~ RSQBR ^^ { - case expr => { expr.tag = "[]"; expr} - } - def distExpr: Parser[Expr] = expression | sqbrExpr - def distinctnessClause: Parser[DistinctnessClause] = rep1sep(distExpr, Operator.COMMA) ^^ { - case exprs => DistinctnessClause(exprs) - } - - /** - * -- timeliness clauses -- - * <timeliness-clauses> = <expr> [, <expr>]+ - */ - def timelinessClause: Parser[TimelinessClause] = rep1sep(expression, Operator.COMMA) ^^ { - case exprs => TimelinessClause(exprs) - } - - /** - * -- completeness clauses -- - * <completeness-clauses> = <expr> [, <expr>]+ - */ - def completenessClause: Parser[CompletenessClause] = 
rep1sep(expression, Operator.COMMA) ^^ { - case exprs => CompletenessClause(exprs) - } - - def parseRule(rule: String, dqType: DqType): ParseResult[Expr] = { - val rootExpr = dqType match { - case AccuracyType => logicalExpression - case ProfilingType => profilingClause - case UniquenessType => uniquenessClause - case DistinctnessType => distinctnessClause - case TimelinessType => timelinessClause - case CompletenessType => completenessClause - case _ => expression - } - parseAll(rootExpr, rule) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/plan/DfOprStep.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/DfOprStep.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/DfOprStep.scala deleted file mode 100644 index f0afc6c..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/DfOprStep.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.rule.plan - -import org.apache.griffin.measure.rule.dsl._ - -case class DfOprStep(name: String, - rule: String, - details: Map[String, Any], - cache: Boolean = false, - global: Boolean = false - ) extends RuleStep { - - val dslType: DslType = DfOprType - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/plan/DsUpdate.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/DsUpdate.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/DsUpdate.scala deleted file mode 100644 index 4956b29..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/DsUpdate.scala +++ /dev/null @@ -1,24 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.rule.plan - -case class DsUpdate(dsName: String, - stepName: String - ) extends Serializable { -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala deleted file mode 100644 index 84313e4..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/MetricExport.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.rule.plan - -import org.apache.griffin.measure.process.ExportMode -import org.apache.griffin.measure.rule.dsl._ - -case class MetricExport(name: String, - stepName: String, - collectType: CollectType, - defTimestamp: Long, - mode: ExportMode - ) extends RuleExport { - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala deleted file mode 100644 index c69dc55..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RecordExport.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.rule.plan - -import org.apache.griffin.measure.process.ExportMode - -case class RecordExport(name: String, - stepName: String, - dataSourceCacheOpt: Option[String], - originDFOpt: Option[String], - defTimestamp: Long, - mode: ExportMode - ) extends RuleExport { - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala deleted file mode 100644 index da5eb9d..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleExport.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.rule.plan - -import org.apache.griffin.measure.process.ExportMode - -trait RuleExport extends Serializable { - - val name: String // export name - - val stepName: String // the dependant step name - - val defTimestamp: Long // the default timestamp if tmst not in value - - val mode: ExportMode // export mode - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RulePlan.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RulePlan.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RulePlan.scala deleted file mode 100644 index 678ab3e..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RulePlan.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.rule.plan - -import scala.reflect.ClassTag - -case class RulePlan(ruleSteps: Seq[RuleStep], - ruleExports: Seq[RuleExport], - dsUpdates: Seq[DsUpdate] = Nil - ) extends Serializable { - - val globalRuleSteps = filterRuleSteps(_.global) - val normalRuleSteps = filterRuleSteps(!_.global) - - val metricExports = filterRuleExports[MetricExport](ruleExports) - val recordExports = filterRuleExports[RecordExport](ruleExports) - - private def filterRuleSteps(func: (RuleStep) => Boolean): Seq[RuleStep] = { - ruleSteps.filter(func) - } - - private def filterRuleExports[T <: RuleExport: ClassTag](exports: Seq[RuleExport]): Seq[T] = { - exports.flatMap { exp => - exp match { - case e: T => Some(e) - case _ => None - } - } - } - -// def ruleStepNames(func: (RuleStep) => Boolean): Seq[String] = { -// ruleSteps.filter(func).map(_.name) -// } - - def merge(rp: RulePlan): RulePlan = { - RulePlan( - this.ruleSteps ++ rp.ruleSteps, - this.ruleExports ++ rp.ruleExports, - this.dsUpdates ++ rp.dsUpdates - ) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleStep.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleStep.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleStep.scala deleted file mode 100644 index dbdb2d5..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/RuleStep.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.rule.plan - -import org.apache.griffin.measure.rule.dsl.DslType - -trait RuleStep extends Serializable { - - val dslType: DslType - - val name: String - - val rule: String - - val cache: Boolean - - val global: Boolean - - val details: Map[String, Any] - - def needCache: Boolean = cache || global - - def isGlobal: Boolean = global -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/plan/SparkSqlStep.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/SparkSqlStep.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/SparkSqlStep.scala deleted file mode 100644 index 16da9a5..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/SparkSqlStep.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.rule.plan - -import org.apache.griffin.measure.rule.dsl._ - -case class SparkSqlStep(name: String, - rule: String, - details: Map[String, Any], - cache: Boolean = false, - global: Boolean = false - ) extends RuleStep { - - val dslType: DslType = SparkSqlType - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/plan/TimeInfo.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/TimeInfo.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/plan/TimeInfo.scala deleted file mode 100644 index 129d068..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/plan/TimeInfo.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. 
See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.rule.plan - -trait TimeInfo extends Serializable { - val calcTime: Long -// val tmst: Long - val head: String - - def key: String = if (head.nonEmpty) s"${head}_${calcTime}" else s"${calcTime}" - def setHead(h: String): TimeInfo -} - -case class CalcTimeInfo(calcTime: Long, head: String = "") extends TimeInfo { -// val tmst: Long = calcTime - def setHead(h: String): TimeInfo = CalcTimeInfo(calcTime, h) -} - -//case class TmstTimeInfo(calcTime: Long, tmst: Long, head: String = "") extends TimeInfo { -// def setHead(h: String): TimeInfo = TmstTimeInfo(calcTime, tmst, h) -//} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/preproc/PreProcRuleGenerator.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/preproc/PreProcRuleGenerator.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/preproc/PreProcRuleGenerator.scala deleted file mode 100644 index 22d64d8..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/preproc/PreProcRuleGenerator.scala +++ /dev/null @@ -1,72 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.rule.preproc - -object PreProcRuleGenerator { - - val _name = "name" - - def genPreProcRules(rules: Seq[Map[String, Any]], suffix: String): Seq[Map[String, Any]] = { - if (rules == null) Nil else { - rules.map { rule => - genPreProcRule(rule, suffix) - } - } - } - - def getRuleNames(rules: Seq[Map[String, Any]]): Seq[String] = { - if (rules == null) Nil else { - rules.flatMap { rule => - rule.get(_name) match { - case Some(s: String) => Some(s) - case _ => None - } - } - } - } - - private def genPreProcRule(param: Map[String, Any], suffix: String - ): Map[String, Any] = { - val keys = param.keys - keys.foldLeft(param) { (map, key) => - map.get(key) match { - case Some(s: String) => map + (key -> genNewString(s, suffix)) - case Some(subMap: Map[String, Any]) => map + (key -> genPreProcRule(subMap, suffix)) - case Some(arr: Seq[_]) => map + (key -> genPreProcRule(arr, suffix)) - case _ => map - } - } - } - - private def genPreProcRule(paramArr: Seq[Any], suffix: String): Seq[Any] = { - paramArr.foldLeft(Nil: Seq[Any]) { (res, param) => - param match { - case s: String => res :+ genNewString(s, suffix) - case map: Map[String, Any] => res :+ genPreProcRule(map, suffix) - case arr: Seq[_] => res :+ genPreProcRule(arr, suffix) - case _ => res :+ param - } - } - } - - private def genNewString(str: String, suffix: String): String = { - str.replaceAll("""\$\{(.*)\}""", s"$$1_${suffix}") - } - -} 
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala deleted file mode 100644 index ec746d2..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/AccuracyRulePlanTrans.scala +++ /dev/null @@ -1,200 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ -package org.apache.griffin.measure.rule.trans - -import org.apache.griffin.measure.process.engine.DataFrameOprs.AccuracyOprKeys -import org.apache.griffin.measure.process.temp.TableRegisters -import org.apache.griffin.measure.process.{BatchProcessType, ExportMode, ProcessType, StreamingProcessType} -import org.apache.griffin.measure.rule.adaptor._ -import org.apache.griffin.measure.rule.adaptor.RuleParamKeys._ -import org.apache.griffin.measure.rule.dsl.analyzer.AccuracyAnalyzer -import org.apache.griffin.measure.rule.dsl.expr.{Expr, LogicalExpr} -import org.apache.griffin.measure.rule.plan._ -import org.apache.griffin.measure.utils.ParamUtil._ -import org.apache.griffin.measure.rule.trans.RuleExportFactory._ -import org.apache.griffin.measure.rule.trans.DsUpdateFactory._ - -import scala.util.Try - -case class AccuracyRulePlanTrans(dataSourceNames: Seq[String], - timeInfo: TimeInfo, name: String, expr: Expr, - param: Map[String, Any], procType: ProcessType - ) extends RulePlanTrans { - - private object AccuracyKeys { - val _source = "source" - val _target = "target" - val _miss = "miss" - val _total = "total" - val _matched = "matched" - } - import AccuracyKeys._ - - def trans(): Try[RulePlan] = Try { - val details = getDetails(param) - val sourceName = details.getString(_source, dataSourceNames.head) - val targetName = details.getString(_target, dataSourceNames.tail.head) - val analyzer = AccuracyAnalyzer(expr.asInstanceOf[LogicalExpr], sourceName, targetName) - - val mode = ExportMode.defaultMode(procType) - - val ct = timeInfo.calcTime - - if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) { - println(s"[${ct}] data source ${sourceName} not exists") - emptyRulePlan - } else { - // 1. 
miss record - val missRecordsTableName = "__missRecords" - val selClause = s"`${sourceName}`.*" - val missRecordsSql = if (!TableRegisters.existRunTempTable(timeInfo.key, targetName)) { - println(s"[${ct}] data source ${targetName} not exists") - s"SELECT ${selClause} FROM `${sourceName}`" - } else { - val onClause = expr.coalesceDesc - val sourceIsNull = analyzer.sourceSelectionExprs.map { sel => - s"${sel.desc} IS NULL" - }.mkString(" AND ") - val targetIsNull = analyzer.targetSelectionExprs.map { sel => - s"${sel.desc} IS NULL" - }.mkString(" AND ") - val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})" - s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}" - } - val missRecordsStep = SparkSqlStep(missRecordsTableName, missRecordsSql, emptyMap, true) - val missRecordsExports = procType match { - case BatchProcessType => { - val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap) - genRecordExport(recordParam, missRecordsTableName, missRecordsTableName, ct, mode) :: Nil - } - case StreamingProcessType => Nil - } - val missRecordsUpdates = procType match { - case BatchProcessType => Nil - case StreamingProcessType => { - val updateParam = emptyMap - genDsUpdate(updateParam, sourceName, missRecordsTableName) :: Nil - } - } - - // 2. miss count - val missCountTableName = "__missCount" - val missColName = details.getStringOrKey(_miss) - val missCountSql = procType match { - case BatchProcessType => s"SELECT COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}`" - case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}` GROUP BY `${InternalColumns.tmst}`" - } - val missCountStep = SparkSqlStep(missCountTableName, missCountSql, emptyMap) - - // 3. 
total count - val totalCountTableName = "__totalCount" - val totalColName = details.getStringOrKey(_total) - val totalCountSql = procType match { - case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`" - case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceName}` GROUP BY `${InternalColumns.tmst}`" - } - val totalCountStep = SparkSqlStep(totalCountTableName, totalCountSql, emptyMap) - - // 4. accuracy metric - val accuracyTableName = name - val matchedColName = details.getStringOrKey(_matched) - val accuracyMetricSql = procType match { - case BatchProcessType => { - s""" - |SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`, - |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`, - |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${missCountTableName}`.`${missColName}`, 0)) AS `${matchedColName}` - |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}` - """.stripMargin - } - case StreamingProcessType => { - s""" - |SELECT `${totalCountTableName}`.`${InternalColumns.tmst}` AS `${InternalColumns.tmst}`, - |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`, - |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`, - |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${missCountTableName}`.`${missColName}`, 0)) AS `${matchedColName}` - |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}` - |ON `${totalCountTableName}`.`${InternalColumns.tmst}` = `${missCountTableName}`.`${InternalColumns.tmst}` - """.stripMargin - } - } - val accuracyStep = SparkSqlStep(accuracyTableName, accuracyMetricSql, emptyMap) - val accuracyExports = procType match { - case BatchProcessType => { - val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap) - genMetricExport(metricParam, accuracyTableName, accuracyTableName, ct, mode) :: Nil - } - case StreamingProcessType => Nil - } - - 
// current accu plan - val accuSteps = missRecordsStep :: missCountStep :: totalCountStep :: accuracyStep :: Nil - val accuExports = missRecordsExports ++ accuracyExports - val accuUpdates = missRecordsUpdates - val accuPlan = RulePlan(accuSteps, accuExports, accuUpdates) - - // streaming extra accu plan - val streamingAccuPlan = procType match { - case BatchProcessType => emptyRulePlan - case StreamingProcessType => { - // 5. accuracy metric merge - val accuracyMetricTableName = "__accuracy" - val accuracyMetricRule = "accuracy" - val accuracyMetricDetails = Map[String, Any]( - (AccuracyOprKeys._dfName -> accuracyTableName), - (AccuracyOprKeys._miss -> missColName), - (AccuracyOprKeys._total -> totalColName), - (AccuracyOprKeys._matched -> matchedColName) - ) - val accuracyMetricStep = DfOprStep(accuracyMetricTableName, - accuracyMetricRule, accuracyMetricDetails) - val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap) - val accuracyMetricExports = genMetricExport(metricParam, name, accuracyMetricTableName, ct, mode) :: Nil - - // 6. 
collect accuracy records - val accuracyRecordTableName = "__accuracyRecords" - val accuracyRecordSql = { - s""" - |SELECT `${InternalColumns.tmst}`, `${InternalColumns.empty}` - |FROM `${accuracyMetricTableName}` WHERE `${InternalColumns.record}` - """.stripMargin - } - val accuracyRecordStep = SparkSqlStep(accuracyRecordTableName, accuracyRecordSql, emptyMap) - val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap) - val accuracyRecordParam = recordParam.addIfNotExist(ExportParamKeys._dataSourceCache, sourceName) - .addIfNotExist(ExportParamKeys._originDF, missRecordsTableName) - val accuracyRecordExports = genRecordExport( - accuracyRecordParam, missRecordsTableName, accuracyRecordTableName, ct, mode) :: Nil - - // gen accu plan - val extraSteps = accuracyMetricStep :: accuracyRecordStep :: Nil - val extraExports = accuracyMetricExports ++ accuracyRecordExports - val extraPlan = RulePlan(extraSteps, extraExports) - - extraPlan - } - } - - // return accu plan - accuPlan.merge(streamingAccuPlan) - - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala deleted file mode 100644 index 1b89587..0000000 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/trans/CompletenessRulePlanTrans.scala +++ /dev/null @@ -1,145 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. 
The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ -package org.apache.griffin.measure.rule.trans - -import org.apache.griffin.measure.process.temp.TableRegisters -import org.apache.griffin.measure.process.{BatchProcessType, ExportMode, ProcessType, StreamingProcessType} -import org.apache.griffin.measure.rule.adaptor.RuleParamKeys._ -import org.apache.griffin.measure.rule.adaptor._ -import org.apache.griffin.measure.rule.dsl.analyzer.CompletenessAnalyzer -import org.apache.griffin.measure.rule.dsl.expr._ -import org.apache.griffin.measure.rule.plan._ -import org.apache.griffin.measure.rule.trans.RuleExportFactory._ -import org.apache.griffin.measure.utils.ParamUtil._ - -import scala.util.Try - -case class CompletenessRulePlanTrans(dataSourceNames: Seq[String], - timeInfo: TimeInfo, name: String, expr: Expr, - param: Map[String, Any], procType: ProcessType - ) extends RulePlanTrans { - - private object CompletenessKeys { - val _source = "source" - val _total = "total" - val _complete = "complete" - val _incomplete = "incomplete" - } - import CompletenessKeys._ - - def trans(): Try[RulePlan] = Try { - val details = getDetails(param) - val completenessClause = expr.asInstanceOf[CompletenessClause] - val sourceName = details.getString(_source, dataSourceNames.head) - - val mode = ExportMode.defaultMode(procType) - - val ct = timeInfo.calcTime - - if (!TableRegisters.existRunTempTable(timeInfo.key, sourceName)) { - emptyRulePlan - } else { - val analyzer = 
CompletenessAnalyzer(completenessClause, sourceName) - - val selItemsClause = analyzer.selectionPairs.map { pair => - val (expr, alias) = pair - s"${expr.desc} AS `${alias}`" - }.mkString(", ") - val aliases = analyzer.selectionPairs.map(_._2) - - val selClause = procType match { - case BatchProcessType => selItemsClause - case StreamingProcessType => s"`${InternalColumns.tmst}`, ${selItemsClause}" - } - val selAliases = procType match { - case BatchProcessType => aliases - case StreamingProcessType => InternalColumns.tmst +: aliases - } - - // 1. source alias - val sourceAliasTableName = "__sourceAlias" - val sourceAliasSql = { - s"SELECT ${selClause} FROM `${sourceName}`" - } - val sourceAliasStep = SparkSqlStep(sourceAliasTableName, sourceAliasSql, emptyMap, true) - - // 2. incomplete record - val incompleteRecordsTableName = "__incompleteRecords" - val completeWhereClause = aliases.map(a => s"`${a}` IS NOT NULL").mkString(" AND ") - val incompleteWhereClause = s"NOT (${completeWhereClause})" - val incompleteRecordsSql = s"SELECT * FROM `${sourceAliasTableName}` WHERE ${incompleteWhereClause}" - val incompleteRecordStep = SparkSqlStep(incompleteRecordsTableName, incompleteRecordsSql, emptyMap, true) - val recordParam = RuleParamKeys.getRecordOpt(param).getOrElse(emptyMap) - val incompleteRecordExport = genRecordExport(recordParam, incompleteRecordsTableName, incompleteRecordsTableName, ct, mode) - - // 3. 
incomplete count - val incompleteCountTableName = "__incompleteCount" - val incompleteColName = details.getStringOrKey(_incomplete) - val incompleteCountSql = procType match { - case BatchProcessType => s"SELECT COUNT(*) AS `${incompleteColName}` FROM `${incompleteRecordsTableName}`" - case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${incompleteColName}` FROM `${incompleteRecordsTableName}` GROUP BY `${InternalColumns.tmst}`" - } - val incompleteCountStep = SparkSqlStep(incompleteCountTableName, incompleteCountSql, emptyMap) - - // 4. total count - val totalCountTableName = "__totalCount" - val totalColName = details.getStringOrKey(_total) - val totalCountSql = procType match { - case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}`" - case StreamingProcessType => s"SELECT `${InternalColumns.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}` GROUP BY `${InternalColumns.tmst}`" - } - val totalCountStep = SparkSqlStep(totalCountTableName, totalCountSql, emptyMap) - - // 5. 
complete metric - val completeTableName = name - val completeColName = details.getStringOrKey(_complete) - val completeMetricSql = procType match { - case BatchProcessType => { - s""" - |SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`, - |coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0) AS `${incompleteColName}`, - |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0)) AS `${completeColName}` - |FROM `${totalCountTableName}` LEFT JOIN `${incompleteCountTableName}` - """.stripMargin - } - case StreamingProcessType => { - s""" - |SELECT `${totalCountTableName}`.`${InternalColumns.tmst}` AS `${InternalColumns.tmst}`, - |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`, - |coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0) AS `${incompleteColName}`, - |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0)) AS `${completeColName}` - |FROM `${totalCountTableName}` LEFT JOIN `${incompleteCountTableName}` - |ON `${totalCountTableName}`.`${InternalColumns.tmst}` = `${incompleteCountTableName}`.`${InternalColumns.tmst}` - """.stripMargin - } - } - val completeStep = SparkSqlStep(completeTableName, completeMetricSql, emptyMap) - val metricParam = RuleParamKeys.getMetricOpt(param).getOrElse(emptyMap) - val completeExport = genMetricExport(metricParam, completeTableName, completeTableName, ct, mode) - - // complete plan - val completeSteps = sourceAliasStep :: incompleteRecordStep :: incompleteCountStep :: totalCountStep :: completeStep :: Nil - val completeExports = incompleteRecordExport :: completeExport :: Nil - val completePlan = RulePlan(completeSteps, completeExports) - - completePlan - } - } - -}
