http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ClauseExpression.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ClauseExpression.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ClauseExpression.scala new file mode 100644 index 0000000..17e678e --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ClauseExpression.scala @@ -0,0 +1,270 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.step.builder.dsl.expr + +trait ClauseExpression extends Expr { +} + +case class SelectClause(exprs: Seq[Expr], extraConditionOpt: Option[ExtraConditionExpr] + ) extends ClauseExpression { + + addChildren(exprs) + + def desc: String = { + extraConditionOpt match { + case Some(cdtn) => s"${cdtn.desc} ${exprs.map(_.desc).mkString(", ")}" + case _ => s"${exprs.map(_.desc).mkString(", ")}" + } + } + def coalesceDesc: String = desc + + override def map(func: (Expr) => Expr): SelectClause = { + SelectClause(exprs.map(func(_)), extraConditionOpt.map(func(_).asInstanceOf[ExtraConditionExpr])) + } + +} + +case class FromClause(dataSource: String) extends ClauseExpression { + + def desc: String = s"FROM `${dataSource}`" + def coalesceDesc: String = desc + +} + +case class WhereClause(expr: Expr) extends ClauseExpression { + + addChild(expr) + + def desc: String = s"WHERE ${expr.desc}" + def coalesceDesc: String = s"WHERE ${expr.coalesceDesc}" + + override def map(func: (Expr) => Expr): WhereClause = { + WhereClause(func(expr)) + } + +} + +case class GroupbyClause(exprs: Seq[Expr], havingClauseOpt: Option[Expr]) extends ClauseExpression { + + addChildren(exprs ++ havingClauseOpt.toSeq) + + def desc: String = { + val gbs = exprs.map(_.desc).mkString(", ") + havingClauseOpt match { + case Some(having) => s"GROUP BY ${gbs} HAVING ${having.desc}" + case _ => s"GROUP BY ${gbs}" + } + } + def coalesceDesc: String = { + val gbs = exprs.map(_.desc).mkString(", ") + havingClauseOpt match { + case Some(having) => s"GROUP BY ${gbs} HAVING ${having.coalesceDesc}" + case _ => s"GROUP BY ${gbs}" + } + } + + def merge(other: GroupbyClause): GroupbyClause = { + val newHavingClauseOpt = (havingClauseOpt, other.havingClauseOpt) match { + case (Some(hc), Some(ohc)) => { + val logical1 = LogicalFactorExpr(hc, false, None) + val logical2 = LogicalFactorExpr(ohc, false, None) + Some(BinaryLogicalExpr(logical1, ("AND", logical2) :: Nil)) + } + case (a @ 
Some(_), _) => a + case (_, b @ Some(_)) => b + case (_, _) => None + } + GroupbyClause(exprs ++ other.exprs, newHavingClauseOpt) + } + + override def map(func: (Expr) => Expr): GroupbyClause = { + GroupbyClause(exprs.map(func(_)), havingClauseOpt.map(func(_))) + } + +} + +case class OrderItem(expr: Expr, orderOpt: Option[String]) extends Expr { + addChild(expr) + def desc: String = { + orderOpt match { + case Some(os) => s"${expr.desc} ${os.toUpperCase}" + case _ => s"${expr.desc}" + } + } + def coalesceDesc: String = desc + + override def map(func: (Expr) => Expr): OrderItem = { + OrderItem(func(expr), orderOpt) + } +} + +case class OrderbyClause(items: Seq[OrderItem]) extends ClauseExpression { + + addChildren(items.map(_.expr)) + + def desc: String = { + val obs = items.map(_.desc).mkString(", ") + s"ORDER BY ${obs}" + } + def coalesceDesc: String = { + val obs = items.map(_.desc).mkString(", ") + s"ORDER BY ${obs}" + } + + override def map(func: (Expr) => Expr): OrderbyClause = { + OrderbyClause(items.map(func(_).asInstanceOf[OrderItem])) + } +} + +case class SortbyClause(items: Seq[OrderItem]) extends ClauseExpression { + + addChildren(items.map(_.expr)) + + def desc: String = { + val obs = items.map(_.desc).mkString(", ") + s"SORT BY ${obs}" + } + def coalesceDesc: String = { + val obs = items.map(_.desc).mkString(", ") + s"SORT BY ${obs}" + } + + override def map(func: (Expr) => Expr): SortbyClause = { + SortbyClause(items.map(func(_).asInstanceOf[OrderItem])) + } +} + +case class LimitClause(expr: Expr) extends ClauseExpression { + + addChild(expr) + + def desc: String = s"LIMIT ${expr.desc}" + def coalesceDesc: String = s"LIMIT ${expr.coalesceDesc}" + + override def map(func: (Expr) => Expr): LimitClause = { + LimitClause(func(expr)) + } +} + +case class CombinedClause(selectClause: SelectClause, fromClauseOpt: Option[FromClause], + tails: Seq[ClauseExpression] + ) extends ClauseExpression { + + addChildren({ + val headClauses: Seq[ClauseExpression] = 
selectClause +: (fromClauseOpt.toSeq) + headClauses ++ tails + }) + + def desc: String = { + val selectDesc = s"SELECT ${selectClause.desc}" + val fromDesc = fromClauseOpt.map(_.desc).mkString(" ") + val headDesc = s"${selectDesc} ${fromDesc}" + tails.foldLeft(headDesc) { (head, tail) => + s"${head} ${tail.desc}" + } + } + def coalesceDesc: String = { + val selectDesc = s"SELECT ${selectClause.coalesceDesc}" + val fromDesc = fromClauseOpt.map(_.coalesceDesc).mkString(" ") + val headDesc = s"${selectDesc} ${fromDesc}" + tails.foldLeft(headDesc) { (head, tail) => + s"${head} ${tail.coalesceDesc}" + } + } + + override def map(func: (Expr) => Expr): CombinedClause = { + CombinedClause(func(selectClause).asInstanceOf[SelectClause], + fromClauseOpt.map(func(_).asInstanceOf[FromClause]), + tails.map(func(_).asInstanceOf[ClauseExpression]) + ) + } +} + +case class ProfilingClause(selectClause: SelectClause, + fromClauseOpt: Option[FromClause], + groupbyClauseOpt: Option[GroupbyClause], + preGroupbyClauses: Seq[ClauseExpression], + postGroupbyClauses: Seq[ClauseExpression] + ) extends ClauseExpression { + addChildren({ + val headClauses: Seq[ClauseExpression] = selectClause +: (fromClauseOpt.toSeq) + groupbyClauseOpt match { + case Some(gc) => (headClauses ++ preGroupbyClauses) ++ (gc +: postGroupbyClauses) + case _ => (headClauses ++ preGroupbyClauses) ++ postGroupbyClauses + } + }) + + def desc: String = { + val selectDesc = selectClause.desc + val fromDesc = fromClauseOpt.map(_.desc).mkString(" ") + val groupbyDesc = groupbyClauseOpt.map(_.desc).mkString(" ") + val preDesc = preGroupbyClauses.map(_.desc).mkString(" ") + val postDesc = postGroupbyClauses.map(_.desc).mkString(" ") + s"${selectDesc} ${fromDesc} ${preDesc} ${groupbyDesc} ${postDesc}" + } + def coalesceDesc: String = { + val selectDesc = selectClause.coalesceDesc + val fromDesc = fromClauseOpt.map(_.coalesceDesc).mkString(" ") + val groupbyDesc = groupbyClauseOpt.map(_.coalesceDesc).mkString(" ") + val 
preDesc = preGroupbyClauses.map(_.coalesceDesc).mkString(" ") + val postDesc = postGroupbyClauses.map(_.coalesceDesc).mkString(" ") + s"${selectDesc} ${fromDesc} ${preDesc} ${groupbyDesc} ${postDesc}" + } + + override def map(func: (Expr) => Expr): ProfilingClause = { + ProfilingClause(func(selectClause).asInstanceOf[SelectClause], + fromClauseOpt.map(func(_).asInstanceOf[FromClause]), + groupbyClauseOpt.map(func(_).asInstanceOf[GroupbyClause]), + preGroupbyClauses.map(func(_).asInstanceOf[ClauseExpression]), + postGroupbyClauses.map(func(_).asInstanceOf[ClauseExpression]) + ) + } +} + +case class UniquenessClause(exprs: Seq[Expr]) extends ClauseExpression { + addChildren(exprs) + + def desc: String = exprs.map(_.desc).mkString(", ") + def coalesceDesc: String = exprs.map(_.coalesceDesc).mkString(", ") + override def map(func: (Expr) => Expr): UniquenessClause = UniquenessClause(exprs.map(func(_))) +} + +case class DistinctnessClause(exprs: Seq[Expr]) extends ClauseExpression { + addChildren(exprs) + + def desc: String = exprs.map(_.desc).mkString(", ") + def coalesceDesc: String = exprs.map(_.coalesceDesc).mkString(", ") + override def map(func: (Expr) => Expr): DistinctnessClause = DistinctnessClause(exprs.map(func(_))) +} + +case class TimelinessClause(exprs: Seq[Expr]) extends ClauseExpression { + addChildren(exprs) + + def desc: String = exprs.map(_.desc).mkString(", ") + def coalesceDesc: String = exprs.map(_.coalesceDesc).mkString(", ") + override def map(func: (Expr) => Expr): TimelinessClause = TimelinessClause(exprs.map(func(_))) +} + +case class CompletenessClause(exprs: Seq[Expr]) extends ClauseExpression { + addChildren(exprs) + + def desc: String = exprs.map(_.desc).mkString(", ") + def coalesceDesc: String = exprs.map(_.coalesceDesc).mkString(", ") + override def map(func: (Expr) => Expr): CompletenessClause = CompletenessClause(exprs.map(func(_))) +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/Expr.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/Expr.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/Expr.scala new file mode 100644 index 0000000..3cf9f39 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/Expr.scala @@ -0,0 +1,35 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.step.builder.dsl.expr + +/** + * expr parsed by griffin dsl + */ +trait Expr extends TreeNode with ExprTag with Serializable { + + def desc: String + + def coalesceDesc: String + + def extractSelf: Expr = this + + // execution + def map(func: (Expr) => Expr): Expr = func(this) + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ExprTag.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ExprTag.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ExprTag.scala new file mode 100644 index 0000000..583067f --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ExprTag.scala @@ -0,0 +1,23 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.step.builder.dsl.expr + +trait ExprTag { this: Expr => + var tag: String = "" +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ExtraConditionExpr.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ExtraConditionExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ExtraConditionExpr.scala new file mode 100644 index 0000000..856e263 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/ExtraConditionExpr.scala @@ -0,0 +1,27 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.step.builder.dsl.expr + +case class ExtraConditionExpr(cdtn: String) extends Expr { + + def desc: String = cdtn.toUpperCase + + def coalesceDesc: String = desc + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/FunctionExpr.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/FunctionExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/FunctionExpr.scala new file mode 100644 index 0000000..d87b7f7 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/FunctionExpr.scala @@ -0,0 +1,45 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.step.builder.dsl.expr + +case class FunctionExpr(functionName: String, args: Seq[Expr], + extraConditionOpt: Option[ExtraConditionExpr], + aliasOpt: Option[String] + ) extends Expr with AliasableExpr { + + addChildren(args) + + def desc: String = { + extraConditionOpt match { + case Some(cdtn) => s"${functionName}(${cdtn.desc} ${args.map(_.desc).mkString(", ")})" + case _ => s"${functionName}(${args.map(_.desc).mkString(", ")})" + } + } + def coalesceDesc: String = desc + def alias: Option[String] = { + if (aliasOpt.isEmpty) { + Some(functionName) + } else aliasOpt + } + + override def map(func: (Expr) => Expr): FunctionExpr = { + FunctionExpr(functionName, args.map(func(_)), + extraConditionOpt.map(func(_).asInstanceOf[ExtraConditionExpr]), aliasOpt) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LiteralExpr.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LiteralExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LiteralExpr.scala new file mode 100644 index 0000000..afc2dd9 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LiteralExpr.scala @@ -0,0 +1,72 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.step.builder.dsl.expr + +import org.apache.griffin.measure.utils.TimeUtil + +trait LiteralExpr extends Expr { + def coalesceDesc: String = desc +} + +case class LiteralNullExpr(str: String) extends LiteralExpr { + def desc: String = "NULL" +} + +case class LiteralNanExpr(str: String) extends LiteralExpr { + def desc: String = "NaN" +} + +case class LiteralStringExpr(str: String) extends LiteralExpr { + def desc: String = str +} + +case class LiteralNumberExpr(str: String) extends LiteralExpr { + def desc: String = { + try { + if (str.contains(".")) { + str.toDouble.toString + } else { + str.toLong.toString + } + } catch { + case e: Throwable => throw new Exception(s"${str} is invalid number") + } + } +} + +case class LiteralTimeExpr(str: String) extends LiteralExpr { + def desc: String = { + TimeUtil.milliseconds(str) match { + case Some(t) => t.toString + case _ => throw new Exception(s"${str} is invalid time") + } + } +} + +case class LiteralBooleanExpr(str: String) extends LiteralExpr { + final val TrueRegex = """(?i)true""".r + final val FalseRegex = """(?i)false""".r + def desc: String = { + str match { + case TrueRegex() => true.toString + case FalseRegex() => false.toString + case _ => throw new Exception(s"${str} is invalid boolean") + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LogicalExpr.scala ---------------------------------------------------------------------- diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LogicalExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LogicalExpr.scala new file mode 100644 index 0000000..af1223f --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/LogicalExpr.scala @@ -0,0 +1,204 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+*/ +package org.apache.griffin.measure.step.builder.dsl.expr + +trait LogicalExpr extends Expr { +} + +case class InExpr(head: Expr, is: Boolean, range: Seq[Expr]) extends LogicalExpr { + + addChildren(head +: range) + + def desc: String = { + val notStr = if (is) "" else " NOT" + s"${head.desc}${notStr} IN (${range.map(_.desc).mkString(", ")})" + } + def coalesceDesc: String = { + val notStr = if (is) "" else " NOT" + s"${head.coalesceDesc}${notStr} IN (${range.map(_.coalesceDesc).mkString(", ")})" + } + + override def map(func: (Expr) => Expr): InExpr = { + InExpr(func(head), is, range.map(func(_))) + } +} + +case class BetweenExpr(head: Expr, is: Boolean, range: Seq[Expr]) extends LogicalExpr { + + range match { + case first :: second :: _ => addChildren(head :: first :: second :: Nil) + case _ => throw new Exception("between expression exception: range less than 2") + } + + def desc: String = { + val notStr = if (is) "" else " NOT" + val rangeStr = range match { + case first :: second :: _ => s"${first.desc} AND ${second.desc}" + case _ => throw new Exception("between expression exception: range less than 2") + } + s"${head.desc}${notStr} BETWEEN ${rangeStr}" + } + def coalesceDesc: String = { + val notStr = if (is) "" else " NOT" + val rangeStr = range match { + case first :: second :: _ => s"${first.coalesceDesc} AND ${second.coalesceDesc}" + case _ => throw new Exception("between expression exception: range less than 2") + } + s"${head.coalesceDesc}${notStr} BETWEEN ${rangeStr}" + } + + override def map(func: (Expr) => Expr): BetweenExpr = { + BetweenExpr(func(head), is, range.map(func(_))) + } +} + +case class LikeExpr(head: Expr, is: Boolean, value: Expr) extends LogicalExpr { + + addChildren(head :: value :: Nil) + + def desc: String = { + val notStr = if (is) "" else " NOT" + s"${head.desc}${notStr} LIKE ${value.desc}" + } + def coalesceDesc: String = { + val notStr = if (is) "" else " NOT" + s"${head.coalesceDesc}${notStr} LIKE ${value.coalesceDesc}" + 
} + + override def map(func: (Expr) => Expr): LikeExpr = { + LikeExpr(func(head), is, func(value)) + } +} + +case class IsNullExpr(head: Expr, is: Boolean) extends LogicalExpr { + + addChild(head) + + def desc: String = { + val notStr = if (is) "" else " NOT" + s"${head.desc} IS${notStr} NULL" + } + def coalesceDesc: String = desc + + override def map(func: (Expr) => Expr): IsNullExpr = { + IsNullExpr(func(head), is) + } +} + +case class IsNanExpr(head: Expr, is: Boolean) extends LogicalExpr { + + addChild(head) + + def desc: String = { + val notStr = if (is) "" else "NOT " + s"${notStr}isnan(${head.desc})" + } + def coalesceDesc: String = desc + + override def map(func: (Expr) => Expr): IsNanExpr = { + IsNanExpr(func(head), is) + } +} + +// ----------- + +case class LogicalFactorExpr(factor: Expr, withBracket: Boolean, aliasOpt: Option[String] + ) extends LogicalExpr with AliasableExpr { + + addChild(factor) + + def desc: String = if (withBracket) s"(${factor.desc})" else factor.desc + def coalesceDesc: String = factor.coalesceDesc + def alias: Option[String] = aliasOpt + override def extractSelf: Expr = { + if (aliasOpt.nonEmpty) this + else factor.extractSelf + } + + override def map(func: (Expr) => Expr): LogicalFactorExpr = { + LogicalFactorExpr(func(factor), withBracket, aliasOpt) + } +} + +case class UnaryLogicalExpr(oprs: Seq[String], factor: LogicalExpr) extends LogicalExpr { + + addChild(factor) + + def desc: String = { + oprs.foldRight(factor.desc) { (opr, fac) => + s"(${trans(opr)} ${fac})" + } + } + def coalesceDesc: String = { + oprs.foldRight(factor.coalesceDesc) { (opr, fac) => + s"(${trans(opr)} ${fac})" + } + } + private def trans(s: String): String = { + s match { + case "!" 
=> "NOT" + case _ => s.toUpperCase + } + } + override def extractSelf: Expr = { + if (oprs.nonEmpty) this + else factor.extractSelf + } + + override def map(func: (Expr) => Expr): UnaryLogicalExpr = { + UnaryLogicalExpr(oprs, func(factor).asInstanceOf[LogicalExpr]) + } +} + +case class BinaryLogicalExpr(factor: LogicalExpr, tails: Seq[(String, LogicalExpr)]) extends LogicalExpr { + + addChildren(factor +: tails.map(_._2)) + + def desc: String = { + val res = tails.foldLeft(factor.desc) { (fac, tail) => + val (opr, expr) = tail + s"${fac} ${trans(opr)} ${expr.desc}" + } + if (tails.size <= 0) res else s"${res}" + } + def coalesceDesc: String = { + val res = tails.foldLeft(factor.coalesceDesc) { (fac, tail) => + val (opr, expr) = tail + s"${fac} ${trans(opr)} ${expr.coalesceDesc}" + } + if (tails.size <= 0) res else s"${res}" + } + private def trans(s: String): String = { + s match { + case "&&" => "AND" + case "||" => "OR" + case _ => s.trim.toUpperCase + } + } + override def extractSelf: Expr = { + if (tails.nonEmpty) this + else factor.extractSelf + } + + override def map(func: (Expr) => Expr): BinaryLogicalExpr = { + BinaryLogicalExpr(func(factor).asInstanceOf[LogicalExpr], tails.map{ pair => + (pair._1, func(pair._2).asInstanceOf[LogicalExpr]) + }) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/MathExpr.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/MathExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/MathExpr.scala new file mode 100644 index 0000000..38732a7 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/MathExpr.scala @@ -0,0 +1,94 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.griffin.measure.step.builder.dsl.expr + +trait MathExpr extends Expr { +} + +case class MathFactorExpr(factor: Expr, withBracket: Boolean, aliasOpt: Option[String] + ) extends MathExpr with AliasableExpr { + + addChild(factor) + + def desc: String = if (withBracket) s"(${factor.desc})" else factor.desc + def coalesceDesc: String = factor.coalesceDesc + def alias: Option[String] = aliasOpt + override def extractSelf: Expr = { + if (aliasOpt.nonEmpty) this + else factor.extractSelf + } + + override def map(func: (Expr) => Expr): MathFactorExpr = { + MathFactorExpr(func(factor), withBracket, aliasOpt) + } +} + +case class UnaryMathExpr(oprs: Seq[String], factor: MathExpr) extends MathExpr { + + addChild(factor) + + def desc: String = { + oprs.foldRight(factor.desc) { (opr, fac) => + s"(${opr}${fac})" + } + } + def coalesceDesc: String = { + oprs.foldRight(factor.coalesceDesc) { (opr, fac) => + s"(${opr}${fac})" + } + } + override def extractSelf: Expr = { + if (oprs.nonEmpty) this + else factor.extractSelf + } + + override def map(func: (Expr) => Expr): UnaryMathExpr = { + UnaryMathExpr(oprs, func(factor).asInstanceOf[MathExpr]) + } +} + +case class BinaryMathExpr(factor: MathExpr, tails: Seq[(String, MathExpr)]) extends MathExpr { + + addChildren(factor +: tails.map(_._2)) + + def desc: 
String = { + val res = tails.foldLeft(factor.desc) { (fac, tail) => + val (opr, expr) = tail + s"${fac} ${opr} ${expr.desc}" + } + if (tails.size <= 0) res else s"${res}" + } + def coalesceDesc: String = { + val res = tails.foldLeft(factor.coalesceDesc) { (fac, tail) => + val (opr, expr) = tail + s"${fac} ${opr} ${expr.coalesceDesc}" + } + if (tails.size <= 0) res else s"${res}" + } + override def extractSelf: Expr = { + if (tails.nonEmpty) this + else factor.extractSelf + } + + override def map(func: (Expr) => Expr): BinaryMathExpr = { + BinaryMathExpr(func(factor).asInstanceOf[MathExpr], tails.map{ pair => + (pair._1, func(pair._2).asInstanceOf[MathExpr]) + }) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/SelectExpr.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/SelectExpr.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/SelectExpr.scala new file mode 100644 index 0000000..b4ed2ac --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/SelectExpr.scala @@ -0,0 +1,132 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. 
See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.step.builder.dsl.expr

/**
 * Head of a selection: the token a selection chain starts from
 * (a registered data source, a function call, a field name, or "*").
 */
trait HeadExpr extends Expr with AliasableExpr {
  // heads carry no alias by default; concrete heads override where one exists
  def alias: Option[String] = None
}

/** Head referencing a registered data source, rendered back-quoted. */
case class DataSourceHeadExpr(name: String) extends HeadExpr {
  def desc: String = s"`${name}`"
  def coalesceDesc: String = desc
}

/** Head referencing a plain field name; its alias is the field itself. */
case class FieldNameHeadExpr(field: String) extends HeadExpr {
  def desc: String = s"`${field}`"
  def coalesceDesc: String = desc
  override def alias: Option[String] = Some(field)
}

/** Head for the "*" (select-all) token. */
case class AllSelectHeadExpr() extends HeadExpr {
  def desc: String = "*"
  def coalesceDesc: String = desc
}

/** Head wrapping any other expression (e.g. a function call used as head). */
case class OtherHeadExpr(expr: Expr) extends HeadExpr {

  addChild(expr)

  def desc: String = expr.desc
  def coalesceDesc: String = expr.coalesceDesc
  override def alias: Option[String] = Some(expr.desc)

  override def map(func: (Expr) => Expr): OtherHeadExpr = {
    OtherHeadExpr(func(expr))
  }
}

// -------------

/** A selector applied after a head: ".field", ".*", "[index]" or ".func(args)". */
trait SelectExpr extends Expr with AliasableExpr {
}

/** ".*" selector: selects all fields of the previous step. */
case class AllFieldsSelectExpr() extends SelectExpr {
  def desc: String = s".*"
  def coalesceDesc: String = desc
  def alias: Option[String] = None
}

/** ".`field`" selector; alias is the selected field name. */
case class FieldSelectExpr(field: String) extends SelectExpr {
  def desc: String = s".`${field}`"
  def coalesceDesc: String = desc
  override def alias: Option[String] = Some(field)
}

/** "[index]" selector with an arbitrary index expression. */
case class IndexSelectExpr(index: Expr) extends SelectExpr {

  addChild(index)

  def desc: String = s"[${index.desc}]"
  def coalesceDesc: String = desc
  def alias: Option[String] = Some(index.desc)

  override def map(func: (Expr) => Expr): IndexSelectExpr = {
    IndexSelectExpr(func(index))
  }
}

/**
 * ".func(args)" selector. Its own desc is intentionally empty:
 * SelectionExpr.desc rewrites the whole chain into "func(prev, args...)".
 */
case class FunctionSelectExpr(functionName: String, args: Seq[Expr]) extends SelectExpr {

  addChildren(args)

  def desc: String = ""
  def coalesceDesc: String = desc
  def alias: Option[String] = Some(functionName)

  override def map(func: (Expr) => Expr): FunctionSelectExpr = {
    FunctionSelectExpr(functionName, args.map(func(_)))
  }
}

// -------------

/**
 * A full selection: head plus a chain of selectors, with an optional "AS" alias.
 * A function selector folds the accumulated description into its argument list;
 * any other selector's description is appended in order.
 */
case class SelectionExpr(head: HeadExpr, selectors: Seq[SelectExpr], aliasOpt: Option[String]) extends SelectExpr {

  addChildren(head +: selectors)

  def desc: String = {
    selectors.foldLeft(head.desc) { (hd, sel) =>
      sel match {
        case FunctionSelectExpr(funcName, args) => {
          // the accumulated chain becomes the first argument of the function
          val nargs = hd +: args.map(_.desc)
          s"${funcName}(${nargs.mkString(", ")})"
        }
        case _ => s"${hd}${sel.desc}"
      }
    }
  }
  def coalesceDesc: String = {
    // a chain ending in a function call is not coalesced; others default to ''
    selectors.lastOption match {
      case None => desc
      case Some(sel: FunctionSelectExpr) => desc
      case _ => s"coalesce(${desc}, '')"
    }
  }
  def alias: Option[String] = {
    if (aliasOpt.isEmpty) {
      // derive an alias by joining the aliases of head and selectors with "_"
      val aliasSeq = (head +: selectors).flatMap(_.alias)
      if (aliasSeq.nonEmpty) Some(aliasSeq.mkString("_")) else None
    } else aliasOpt
  }

  override def map(func: (Expr) => Expr): SelectionExpr = {
    SelectionExpr(func(head).asInstanceOf[HeadExpr],
      selectors.map(func(_).asInstanceOf[SelectExpr]), aliasOpt)
  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala
new file mode 100644
index 0000000..32330cd
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/expr/TreeNode.scala
@@ -0,0 +1,45 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.
The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.step.builder.dsl.expr

/**
 * Mutable n-ary tree node; base of the expression hierarchy.
 * Children accumulate as sub-expressions register themselves via addChild(ren).
 */
trait TreeNode extends Serializable {

  // grows as child expressions are registered; not thread-safe
  var children = Seq[TreeNode]()

  def addChild(expr: TreeNode) = { children :+= expr }
  def addChildren(exprs: Seq[TreeNode]) = { children ++= exprs }

  /**
   * Fold this node before its children (depth-first pre-order).
   * NOTE(review): A is erased at runtime, so the isInstanceOf[A] guard cannot
   * discriminate subtypes of TreeNode here — confirm this is intended.
   *
   * @param z      initial accumulator value
   * @param seqOp  folds a visited node into an accumulator
   * @param combOp combines accumulators from this node and its subtrees
   */
  def preOrderTraverseDepthFirst[T, A <: TreeNode](z: T)(seqOp: (A, T) => T, combOp: (T, T) => T): T = {
    if (this.isInstanceOf[A]) {
      val tv = seqOp(this.asInstanceOf[A], z)
      children.foldLeft(combOp(z, tv)) { (ov, tn) =>
        combOp(ov, tn.preOrderTraverseDepthFirst(z)(seqOp, combOp))
      }
    } else z
  }
  /** Fold children first, then combine this node's value (depth-first post-order). */
  def postOrderTraverseDepthFirst[T, A <: TreeNode](z: T)(seqOp: (A, T) => T, combOp: (T, T) => T): T = {
    if (this.isInstanceOf[A]) {
      val cv = children.foldLeft(z) { (ov, tn) =>
        combOp(ov, tn.postOrderTraverseDepthFirst(z)(seqOp, combOp))
      }
      combOp(z, seqOp(this.asInstanceOf[A], cv))
    } else z
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/BasicParser.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/BasicParser.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/BasicParser.scala
new file mode 100644
index 0000000..ac540b0
--- /dev/null
+++
b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/BasicParser.scala
@@ -0,0 +1,391 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.step.builder.dsl.parser

import org.apache.griffin.measure.step.builder.dsl.expr._

import scala.util.parsing.combinator.JavaTokenParsers

/**
 * basic parser for sql like syntax
 */
trait BasicParser extends JavaTokenParsers with Serializable {

  val dataSourceNames: Seq[String]
  val functionNames: Seq[String]

  /** strip surrounding back-quotes from a name, if present */
  private def trim(str: String): String = {
    val regex = """`(.*)`""".r
    str match {
      case regex(s) => s
      case _ => str
    }
  }

  /**
    * BNF for basic parser
    *
    * -- literal --
    * <literal> ::= <literal-string> | <literal-number> | <literal-time> | <literal-boolean> | <literal-null> | <literal-nan>
    * <literal-string> ::= <any-string>
    * <literal-number> ::= <integer> | <double>
    * <literal-time> ::= <integer> ("d"|"h"|"m"|"s"|"ms")
    * <literal-boolean> ::= true | false
    * <literal-null> ::= null
    * <literal-nan> ::= nan
    *
    * -- selection --
    * <selection> ::= <selection-head> [ <field-sel> | <index-sel> | <function-sel> ]* [<as-alias>]?
    * <selection-head> ::= ("data source name registered") | <function> | <field-name> | <all-selection>
    * <field-sel> ::= "." <field-name> | "[" <quote-field-name> "]"
    * <index-sel> ::= "[" <arg> "]"
    * <function-sel> ::= "." <function-name> "(" [<arg>]? [, <arg>]* ")"
    * <arg> ::= <math-expr>
    *
    * -- as alias --
    * <as-alias> ::= <as> <field-name>
    *
    * -- math expr --
    * <math-factor> ::= <literal> | <function> | <selection> | "(" <math-expr> ")" [<as-alias>]?
    * <unary-math-expr> ::= [<unary-opr>]* <math-factor>
    * <binary-math-expr> ::= <unary-math-expr> [<binary-opr> <unary-math-expr>]+
    * <math-expr> ::= <binary-math-expr>
    *
    * -- logical expr --
    * <in-expr> ::= <math-expr> [<not>]? <in> <range-expr>
    * <between-expr> ::= <math-expr> [<not>]? <between> (<math-expr> <and> <math-expr> | <range-expr>)
    * <range-expr> ::= "(" [<math-expr>]? [, <math-expr>]+ ")"
    * <like-expr> ::= <math-expr> [<not>]? <like> <math-expr>
    * <is-null-expr> ::= <math-expr> <is> [<not>]? <null>
    * <is-nan-expr> ::= <math-expr> <is> [<not>]? <nan>
    *
    * <logical-factor> ::= <math-expr> | <in-expr> | <between-expr> | <like-expr> | <is-null-expr> | <is-nan-expr> | "(" <logical-expr> ")" [<as-alias>]?
    * <unary-logical-expr> ::= [<unary-logical-opr>]* <logical-factor>
    * <binary-logical-expr> ::= <unary-logical-expr> [<binary-logical-opr> <unary-logical-expr>]+
    * <logical-expr> ::= <binary-logical-expr>
    *
    * -- expression --
    * <expr> = <math-expr> | <logical-expr>
    *
    * -- function expr --
    * <function> ::= <function-name> "(" [<arg>] [, <arg>]+ ")" [<as-alias>]?
    * <function-name> ::= ("function name registered")
    * <arg> ::= <expr>
    *
    * -- clauses --
    * <select-clause> = <expr> [, <expr>]*
    * <where-clause> = <where> <expr>
    * <from-clause> = <from> ("data source name registered")
    * <having-clause> = <having> <expr>
    * <groupby-clause> = <group> <by> <expr> [ <having-clause> ]?
    * <orderby-item> = <expr> [ <DESC> ]?
    * <orderby-clause> = <order> <by> <orderby-item> [ , <orderby-item> ]*
    * <limit-clause> = <limit> <expr>
    *
    * -- combined clauses --
    * <combined-clauses> = <select-clause> [ <from-clause> ]+ [ <where-clause> ]+ [ <groupby-clause> ]+ [ <orderby-clause> ]+ [ <limit-clause> ]+
    */

  /**
   * alternation over registered data source names (back-quoted or bare, case-insensitive);
   * later registered names take precedence. Guards against an empty registry, which
   * previously threw UnsupportedOperationException from reduce at construction time.
   */
  protected def genDataSourceNamesParser(names: Seq[String]): Parser[String] = {
    names.reverse.map {
      fn => s"""(?i)`${fn}`|${fn}""".r: Parser[String]
    }.reduceOption(_ | _).getOrElse(failure("no data source name registered"))
  }
  /** alternation over registered function names; same empty-registry guard as above */
  protected def genFunctionNamesParser(names: Seq[String]): Parser[String] = {
    names.reverse.map {
      fn => s"""(?i)${fn}""".r: Parser[String]
    }.reduceOption(_ | _).getOrElse(failure("no function name registered"))
  }

  object Literal {
    val NULL: Parser[String] = """(?i)null""".r
    val NAN: Parser[String] = """(?i)nan""".r
  }
  import Literal._

  object Operator {
    val MATH_UNARY: Parser[String] = "+" | "-"
    // binary math operators grouped by precedence, highest first
    val MATH_BINARIES: Seq[Parser[String]] = Seq(("*" | "/" | "%"), ("+" | "-"))

    val NOT: Parser[String] = """(?i)not\s""".r | "!"
    val AND: Parser[String] = """(?i)and\s""".r | "&&"
    val OR: Parser[String] = """(?i)or\s""".r | "||"
    val IN: Parser[String] = """(?i)in\s""".r
    val BETWEEN: Parser[String] = """(?i)between\s""".r
    val AND_ONLY: Parser[String] = """(?i)and\s""".r
    val IS: Parser[String] = """(?i)is\s""".r
    val LIKE: Parser[String] = """(?i)like\s""".r
    val COMPARE: Parser[String] = "=" | "!=" | "<>" | "<=" | ">=" | "<" | ">"
    val LOGICAL_UNARY: Parser[String] = NOT
    // binary logical operators grouped by precedence, highest first
    val LOGICAL_BINARIES: Seq[Parser[String]] = Seq((COMPARE), (AND), (OR))

    val LSQBR: Parser[String] = "["
    val RSQBR: Parser[String] = "]"
    val LBR: Parser[String] = "("
    val RBR: Parser[String] = ")"

    val DOT: Parser[String] = "."
    val ALLSL: Parser[String] = "*"
    val SQUOTE: Parser[String] = "'"
    val DQUOTE: Parser[String] = "\""
    val UQUOTE: Parser[String] = "`"
    val COMMA: Parser[String] = ","

    val SELECT: Parser[String] = """(?i)select\s""".r
    val DISTINCT: Parser[String] = """(?i)distinct""".r
//    val ALL: Parser[String] = """(?i)all""".r
    val FROM: Parser[String] = """(?i)from\s""".r
    val AS: Parser[String] = """(?i)as\s""".r
    val WHERE: Parser[String] = """(?i)where\s""".r
    val GROUP: Parser[String] = """(?i)group\s""".r
    val ORDER: Parser[String] = """(?i)order\s""".r
    val SORT: Parser[String] = """(?i)sort\s""".r
    val BY: Parser[String] = """(?i)by\s""".r
    val DESC: Parser[String] = """(?i)desc""".r
    val ASC: Parser[String] = """(?i)asc""".r
    val HAVING: Parser[String] = """(?i)having\s""".r
    val LIMIT: Parser[String] = """(?i)limit\s""".r
  }
  import Operator._

  object Strings {
    def AnyString: Parser[String] = """"(?:\"|[^\"])*"""".r | """'(?:\'|[^'])*'""".r
    def SimpleTableFieldName: Parser[String] = """[a-zA-Z_]\w*""".r
    def UnQuoteTableFieldName: Parser[String] = """`(?:[\\][`]|[^`])*`""".r
//    def FieldName: Parser[String] = UnQuoteTableFieldName | SimpleTableFieldName
    def DataSourceName: Parser[String] = genDataSourceNamesParser(dataSourceNames)
    def FunctionName: Parser[String] = genFunctionNamesParser(functionNames)

    def IntegerNumber: Parser[String] = """[+\-]?\d+""".r
    def DoubleNumber: Parser[String] = """[+\-]?(\.\d+|\d+\.\d*)""".r
    def IndexNumber: Parser[String] = IntegerNumber

    def TimeString: Parser[String] = """([+\-]?\d+)(d|h|m|s|ms)""".r
    def BooleanString: Parser[String] = """(?i)true|false""".r
  }
  import Strings._

  // -- literal (see BNF above) --
  def literal: Parser[LiteralExpr] = literalNull | literalNan | literalBoolean | literalString | literalTime | literalNumber
  def literalNull: Parser[LiteralNullExpr] = NULL ^^ { LiteralNullExpr(_) }
  def literalNan: Parser[LiteralNanExpr] = NAN ^^ { LiteralNanExpr(_) }
  def literalString: Parser[LiteralStringExpr] = AnyString ^^ { LiteralStringExpr(_) }
  def literalNumber: Parser[LiteralNumberExpr] = (DoubleNumber | IntegerNumber) ^^ { LiteralNumberExpr(_) }
  def literalTime: Parser[LiteralTimeExpr] = TimeString ^^ { LiteralTimeExpr(_) }
  def literalBoolean: Parser[LiteralBooleanExpr] = BooleanString ^^ { LiteralBooleanExpr(_) }

  // -- selection (see BNF above) --
  def selection: Parser[SelectionExpr] = selectionHead ~ rep(selector) ~ opt(asAlias) ^^ {
    case head ~ sels ~ aliasOpt => SelectionExpr(head, sels, aliasOpt)
  }
  def selectionHead: Parser[HeadExpr] = DataSourceName ^^ {
    ds => DataSourceHeadExpr(trim(ds))
  } | function ^^ {
    OtherHeadExpr(_)
  } | SimpleTableFieldName ^^ {
    FieldNameHeadExpr(_)
  } | UnQuoteTableFieldName ^^ { s =>
    FieldNameHeadExpr(trim(s))
  } | ALLSL ^^ { _ =>
    AllSelectHeadExpr()
  }
  def selector: Parser[SelectExpr] = functionSelect | allFieldsSelect | fieldSelect | indexSelect
  def allFieldsSelect: Parser[AllFieldsSelectExpr] = DOT ~> ALLSL ^^ { _ => AllFieldsSelectExpr() }
  def fieldSelect: Parser[FieldSelectExpr] = DOT ~> (
    SimpleTableFieldName ^^ {
      FieldSelectExpr(_)
    } | UnQuoteTableFieldName ^^ { s =>
      FieldSelectExpr(trim(s))
    })
  def indexSelect: Parser[IndexSelectExpr] = LSQBR ~> argument <~ RSQBR ^^ { IndexSelectExpr(_) }
  def functionSelect: Parser[FunctionSelectExpr] = DOT ~ FunctionName ~ LBR ~ repsep(argument, COMMA) ~ RBR ^^ {
    case _ ~ name ~ _ ~ args ~ _ => FunctionSelectExpr(name, args)
  }

  // -- as alias --
  def asAlias: Parser[String] = AS ~> (SimpleTableFieldName | UnQuoteTableFieldName ^^ { trim(_) })

  // -- math expr (see BNF above) --
  def mathFactor: Parser[MathExpr] = (literal | function | selection) ^^ {
    MathFactorExpr(_, false, None)
  } | LBR ~ mathExpression ~ RBR ~ opt(asAlias) ^^ {
    case _ ~ expr ~ _ ~ aliasOpt => MathFactorExpr(expr, true, aliasOpt)
  }
  def unaryMathExpression: Parser[MathExpr] = rep(MATH_UNARY) ~ mathFactor ^^ {
    case Nil ~ a => a
    case list ~ a => UnaryMathExpr(list, a)
  }
  // builds one parser per precedence level; head is the lowest-precedence parser
  def binaryMathExpressions: Seq[Parser[MathExpr]] =
    MATH_BINARIES.foldLeft(List[Parser[MathExpr]](unaryMathExpression)) { (parsers, binaryParser) =>
      val pre = parsers.head
      val cur = pre ~ rep(binaryParser ~ pre) ^^ {
        case a ~ Nil => a
        case a ~ list => BinaryMathExpr(a, list.map(c => (c._1, c._2)))
      }
      cur :: parsers
    }
  def mathExpression: Parser[MathExpr] = binaryMathExpressions.head

  // -- logical expr (see BNF above) --
  def inExpr: Parser[LogicalExpr] = mathExpression ~ opt(NOT) ~ IN ~ LBR ~ repsep(mathExpression, COMMA) ~ RBR ^^ {
    case head ~ notOpt ~ _ ~ _ ~ list ~ _ => InExpr(head, notOpt.isEmpty, list)
  }
  def betweenExpr: Parser[LogicalExpr] = mathExpression ~ opt(NOT) ~ BETWEEN ~ LBR ~ repsep(mathExpression, COMMA) ~ RBR ^^ {
    case head ~ notOpt ~ _ ~ _ ~ list ~ _ => BetweenExpr(head, notOpt.isEmpty, list)
  } | mathExpression ~ opt(NOT) ~ BETWEEN ~ mathExpression ~ AND_ONLY ~ mathExpression ^^ {
    case head ~ notOpt ~ _ ~ first ~ _ ~ second => BetweenExpr(head, notOpt.isEmpty, Seq(first, second))
  }
  def likeExpr: Parser[LogicalExpr] = mathExpression ~ opt(NOT) ~ LIKE ~ mathExpression ^^ {
    case head ~ notOpt ~ _ ~ value => LikeExpr(head, notOpt.isEmpty, value)
  }
  def isNullExpr: Parser[LogicalExpr] = mathExpression ~ IS ~ opt(NOT) ~ NULL ^^ {
    case head ~ _ ~ notOpt ~ _ => IsNullExpr(head, notOpt.isEmpty)
  }
  def isNanExpr: Parser[LogicalExpr] = mathExpression ~ IS ~ opt(NOT) ~ NAN ^^ {
    case head ~ _ ~ notOpt ~ _ => IsNanExpr(head, notOpt.isEmpty)
  }

  def logicalFactor: Parser[LogicalExpr] = (inExpr | betweenExpr | likeExpr | isNullExpr | isNanExpr | mathExpression) ^^ {
    LogicalFactorExpr(_, false, None)
  } | LBR ~ logicalExpression ~ RBR ~ opt(asAlias) ^^ {
    case _ ~ expr ~ _ ~ aliasOpt => LogicalFactorExpr(expr, true, aliasOpt)
  }
  def unaryLogicalExpression: Parser[LogicalExpr] = rep(LOGICAL_UNARY) ~ logicalFactor ^^ {
    case Nil ~ a => a
    case list ~ a => UnaryLogicalExpr(list, a)
  }
  // same precedence-chain construction as binaryMathExpressions
  def binaryLogicalExpressions: Seq[Parser[LogicalExpr]] =
    LOGICAL_BINARIES.foldLeft(List[Parser[LogicalExpr]](unaryLogicalExpression)) { (parsers, binaryParser) =>
      val pre = parsers.head
      val cur = pre ~ rep(binaryParser ~ pre) ^^ {
        case a ~ Nil => a
        case a ~ list => BinaryLogicalExpr(a, list.map(c => (c._1, c._2)))
      }
      cur :: parsers
    }
  def logicalExpression: Parser[LogicalExpr] = binaryLogicalExpressions.head

  // -- expression --
  def expression: Parser[Expr] = logicalExpression | mathExpression

  // -- function expr --
  def function: Parser[FunctionExpr] = FunctionName ~ LBR ~ opt(DISTINCT) ~ repsep(argument, COMMA) ~ RBR ~ opt(asAlias) ^^ {
    case name ~ _ ~ extraCdtnOpt ~ args ~ _ ~ aliasOpt =>
      FunctionExpr(name, args, extraCdtnOpt.map(ExtraConditionExpr(_)), aliasOpt)
  }
  def argument: Parser[Expr] = expression

  // -- clauses (see BNF above) --
  def selectClause: Parser[SelectClause] = opt(SELECT) ~> opt(DISTINCT) ~ rep1sep(expression, COMMA) ^^ {
    case extraCdtnOpt ~ exprs => SelectClause(exprs, extraCdtnOpt.map(ExtraConditionExpr(_)))
  }
  def fromClause: Parser[FromClause] = FROM ~> DataSourceName ^^ { ds => FromClause(trim(ds)) }
  def whereClause: Parser[WhereClause] = WHERE ~> expression ^^ { WhereClause(_) }
  def havingClause: Parser[Expr] = HAVING ~> expression
  def groupbyClause: Parser[GroupbyClause] = GROUP ~ BY ~ rep1sep(expression, COMMA) ~ opt(havingClause) ^^ {
    case _ ~ _ ~ cols ~ havingOpt => GroupbyClause(cols, havingOpt)
  }
  def orderItem: Parser[OrderItem] = expression ~ opt(DESC | ASC) ^^ {
    case expr ~ orderOpt => OrderItem(expr, orderOpt)
  }
  def orderbyClause: Parser[OrderbyClause] = ORDER ~ BY ~ rep1sep(orderItem, COMMA) ^^ {
    case _ ~ _ ~ cols => OrderbyClause(cols)
  }
  def sortbyClause: Parser[SortbyClause] = SORT ~ BY ~ rep1sep(orderItem, COMMA) ^^ {
    case _ ~ _ ~ cols => SortbyClause(cols)
  }
  def limitClause: Parser[LimitClause] = LIMIT ~> expression ^^ { LimitClause(_) }

  // -- combined clauses --
  def combinedClause: Parser[CombinedClause] = selectClause ~ opt(fromClause) ~ opt(whereClause) ~
    opt(groupbyClause) ~ opt(orderbyClause) ~ opt(limitClause) ^^ {
    case sel ~ fromOpt ~ whereOpt ~ groupbyOpt ~ orderbyOpt ~ limitOpt => {
      val tails = Seq(whereOpt, groupbyOpt, orderbyOpt, limitOpt).flatMap(opt => opt)
      CombinedClause(sel, fromOpt, tails)
    }
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala
---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala new file mode 100644 index 0000000..2cd638c --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/parser/GriffinDslParser.scala @@ -0,0 +1,98 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
*/
package org.apache.griffin.measure.step.builder.dsl.parser

import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.step.builder.dsl._
import org.apache.griffin.measure.step.builder.dsl.expr._

/**
 * parser for griffin dsl rule
 */
case class GriffinDslParser(dataSourceNames: Seq[String], functionNames: Seq[String]
                           ) extends BasicParser {

  import Operator._

  /**
    * -- profiling clauses --
    * <profiling-clauses> = <select-clause> [ <from-clause> ]+ [ <where-clause> ]+ [ <groupby-clause> ]+ [ <orderby-clause> ]+ [ <limit-clause> ]+
    */
  def profilingClause: Parser[ProfilingClause] = selectClause ~ opt(fromClause) ~ opt(whereClause) ~
    opt(groupbyClause) ~ opt(orderbyClause) ~ opt(limitClause) ^^ {
    case sel ~ fromOpt ~ whereOpt ~ groupbyOpt ~ orderbyOpt ~ limitOpt => {
      // where goes before group-by; order-by/limit apply after it
      val preClauses = Seq(whereOpt).flatMap(opt => opt)
      val postClauses = Seq(orderbyOpt, limitOpt).flatMap(opt => opt)
      ProfilingClause(sel, fromOpt, groupbyOpt, preClauses, postClauses)
    }
  }

  /**
    * -- uniqueness clauses --
    * <uniqueness-clauses> = <expr> [, <expr>]+
    */
  def uniquenessClause: Parser[UniquenessClause] = rep1sep(expression, Operator.COMMA) ^^ {
    case exprs => UniquenessClause(exprs)
  }

  /**
    * -- distinctness clauses --
    * <sqbr-expr> = "[" <expr> "]"
    * <dist-expr> = <sqbr-expr> | <expr>
    * <distinctness-clauses> = <distExpr> [, <distExpr>]+
    */
  def sqbrExpr: Parser[Expr] = LSQBR ~> expression <~ RSQBR ^^ {
    case expr => { expr.tag = "[]"; expr }
  }
  def distExpr: Parser[Expr] = expression | sqbrExpr
  def distinctnessClause: Parser[DistinctnessClause] = rep1sep(distExpr, Operator.COMMA) ^^ {
    case exprs => DistinctnessClause(exprs)
  }

  /**
    * -- timeliness clauses --
    * <timeliness-clauses> = <expr> [, <expr>]+
    */
  def timelinessClause: Parser[TimelinessClause] = rep1sep(expression, Operator.COMMA) ^^ {
    case exprs => TimelinessClause(exprs)
  }

  /**
    * -- completeness clauses --
    * <completeness-clauses> = <expr> [, <expr>]+
    */
  def completenessClause: Parser[CompletenessClause] = rep1sep(expression, Operator.COMMA) ^^ {
    case exprs => CompletenessClause(exprs)
  }

  /** parse a rule string with the root parser selected by dq type */
  def parseRule(rule: String, dqType: DqType): ParseResult[Expr] = {
    val rootExpr = dqType match {
      case AccuracyType => logicalExpression
      case ProfilingType => profilingClause
      case UniquenessType => uniquenessClause
      case DistinctnessType => distinctnessClause
      case TimelinessType => timelinessClause
      case CompletenessType => completenessClause
      case _ => expression
    }
    parseAll(rootExpr, rule)
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
new file mode 100644
index 0000000..a0e5ca3
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
@@ -0,0 +1,200 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.
See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.step.builder.dsl.transform

import org.apache.griffin.measure.configuration.enums.{BatchProcessType, NormalizeType, StreamingProcessType}
import org.apache.griffin.measure.configuration.params.RuleParam
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep
import org.apache.griffin.measure.step.builder.ConstantColumns
import org.apache.griffin.measure.step.builder.dsl.expr._
import org.apache.griffin.measure.step.builder.dsl.transform.analyzer.AccuracyAnalyzer
import org.apache.griffin.measure.step.transform.DataFrameOps.AccuracyOprKeys
import org.apache.griffin.measure.step.transform.{DataFrameOps, DataFrameOpsTransformStep, SparkSqlTransformStep}
import org.apache.griffin.measure.step.write.{DsCacheUpdateWriteStep, MetricWriteStep, RecordWriteStep}
import org.apache.griffin.measure.utils.ParamUtil._

/**
 * generate accuracy dq steps
 */
case class AccuracyExpr2DQSteps(context: DQContext,
                                expr: Expr,
                                ruleParam: RuleParam
                               ) extends Expr2DQSteps {

  // keys looked up in the rule's detail map
  private object AccuracyKeys {
    val _source = "source"
    val _target = "target"
    val _miss = "miss"
    val _total = "total"
    val _matched = "matched"
  }
  import AccuracyKeys._

  /**
   * Build the transform/write steps for an accuracy rule:
   * miss records -> miss count -> total count -> accuracy metric,
   * plus streaming-only merge and record-collection steps.
   */
  def getDQSteps(): Seq[DQStep] = {
    val details = ruleParam.getDetails
    val accuracyExpr = expr.asInstanceOf[LogicalExpr]

    val sourceName = details.getString(_source, context.getDataSourceName(0))
    val targetName = details.getString(_target, context.getDataSourceName(1))
    val analyzer = AccuracyAnalyzer(accuracyExpr, sourceName, targetName)

    val procType = context.procType
    val timestamp = context.contextId.timestamp

    if (!context.runTimeTableRegister.existsTable(sourceName)) {
      warn(s"[${timestamp}] data source ${sourceName} not exists")
      Nil
    } else {
      // 1. miss record: source rows with no matching target row
      val missRecordsTableName = "__missRecords"
      val selClause = s"`${sourceName}`.*"
      val missRecordsSql = if (!context.runTimeTableRegister.existsTable(targetName)) {
        // no target table: every source row counts as missed
        warn(s"[${timestamp}] data source ${targetName} not exists")
        s"SELECT ${selClause} FROM `${sourceName}`"
      } else {
        val onClause = expr.coalesceDesc
        val sourceIsNull = analyzer.sourceSelectionExprs.map { sel =>
          s"${sel.desc} IS NULL"
        }.mkString(" AND ")
        val targetIsNull = analyzer.targetSelectionExprs.map { sel =>
          s"${sel.desc} IS NULL"
        }.mkString(" AND ")
        // keep rows where source columns are present but the joined target side is null
        val whereClause = s"(NOT (${sourceIsNull})) AND (${targetIsNull})"
        s"SELECT ${selClause} FROM `${sourceName}` LEFT JOIN `${targetName}` ON ${onClause} WHERE ${whereClause}"
      }
      val missRecordsTransStep = SparkSqlTransformStep(missRecordsTableName, missRecordsSql, emptyMap, true)
      val missRecordsWriteSteps = procType match {
        case BatchProcessType => {
          val rwName = ruleParam.recordOpt.map(_.name).getOrElse(missRecordsTableName)
          RecordWriteStep(rwName, missRecordsTableName) :: Nil
        }
        case StreamingProcessType => Nil
      }
      val missRecordsUpdateWriteSteps = procType match {
        case BatchProcessType => Nil
        case StreamingProcessType => {
          val dsName = ruleParam.dsCacheUpdateOpt.map(_.dsName).getOrElse(sourceName)
          DsCacheUpdateWriteStep(dsName, missRecordsTableName) :: Nil
        }
      }

      // 2. miss count (grouped by tmst in streaming mode)
      val missCountTableName = "__missCount"
      val missColName = details.getStringOrKey(_miss)
      val missCountSql = procType match {
        case BatchProcessType => s"SELECT COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}`"
        case StreamingProcessType => s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${missColName}` FROM `${missRecordsTableName}` GROUP BY `${ConstantColumns.tmst}`"
      }
      val missCountTransStep = SparkSqlTransformStep(missCountTableName, missCountSql, emptyMap)

      // 3. total count (grouped by tmst in streaming mode)
      val totalCountTableName = "__totalCount"
      val totalColName = details.getStringOrKey(_total)
      val totalCountSql = procType match {
        case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceName}`"
        case StreamingProcessType => s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceName}` GROUP BY `${ConstantColumns.tmst}`"
      }
      val totalCountTransStep = SparkSqlTransformStep(totalCountTableName, totalCountSql, emptyMap)

      // 4. accuracy metric: total, miss (coalesced to 0) and matched = total - miss
      val accuracyTableName = ruleParam.name
      val matchedColName = details.getStringOrKey(_matched)
      val accuracyMetricSql = procType match {
        case BatchProcessType => {
          s"""
             |SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
             |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`,
             |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${missCountTableName}`.`${missColName}`, 0)) AS `${matchedColName}`
             |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}`
         """.stripMargin
        }
        case StreamingProcessType => {
          s"""
             |SELECT `${totalCountTableName}`.`${ConstantColumns.tmst}` AS `${ConstantColumns.tmst}`,
             |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
             |coalesce(`${missCountTableName}`.`${missColName}`, 0) AS `${missColName}`,
             |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${missCountTableName}`.`${missColName}`, 0)) AS `${matchedColName}`
             |FROM `${totalCountTableName}` LEFT JOIN `${missCountTableName}`
             |ON `${totalCountTableName}`.`${ConstantColumns.tmst}` = `${missCountTableName}`.`${ConstantColumns.tmst}`
         """.stripMargin
        }
      }
      val accuracyTransStep = SparkSqlTransformStep(accuracyTableName, accuracyMetricSql, emptyMap)
      val accuracyMetricWriteSteps = procType match {
        case BatchProcessType => {
          val mwName = ruleParam.metricOpt.map(_.name).getOrElse(ruleParam.name)
          val collectType = NormalizeType(ruleParam.metricOpt.map(_.collectType).getOrElse(""))
          MetricWriteStep(mwName, accuracyTableName, collectType) :: Nil
        }
        case StreamingProcessType => Nil
      }

      // accuracy current steps
      val transSteps1 = missRecordsTransStep :: missCountTransStep :: totalCountTransStep :: accuracyTransStep :: Nil
      val writeSteps1 = accuracyMetricWriteSteps ++ missRecordsWriteSteps ++ missRecordsUpdateWriteSteps

      // streaming extra steps
      val (transSteps2, writeSteps2) = procType match {
        case BatchProcessType => (Nil, Nil)
        case StreamingProcessType => {
          // 5. accuracy metric merge via the dataframe-ops accuracy operator
          val accuracyMetricTableName = "__accuracy"
          val accuracyMetricRule = DataFrameOps._accuracy
          val accuracyMetricDetails = Map[String, Any](
            (AccuracyOprKeys._dfName -> accuracyTableName),
            (AccuracyOprKeys._miss -> missColName),
            (AccuracyOprKeys._total -> totalColName),
            (AccuracyOprKeys._matched -> matchedColName)
          )
          val accuracyMetricTransStep = DataFrameOpsTransformStep(accuracyMetricTableName,
            accuracyMetricRule, accuracyMetricDetails)
          val accuracyMetricWriteStep = {
            val mwName = ruleParam.metricOpt.map(_.name).getOrElse(ruleParam.name)
            val collectType = NormalizeType(ruleParam.metricOpt.map(_.collectType).getOrElse(""))
            MetricWriteStep(mwName, accuracyMetricTableName, collectType)
          }

          // 6. collect accuracy records flagged for output
          val accuracyRecordTableName = "__accuracyRecords"
          val accuracyRecordSql = {
            s"""
               |SELECT `${ConstantColumns.tmst}`, `${ConstantColumns.empty}`
               |FROM `${accuracyMetricTableName}` WHERE `${ConstantColumns.record}`
             """.stripMargin
          }
          val accuracyRecordTransStep = SparkSqlTransformStep(
            accuracyRecordTableName, accuracyRecordSql, emptyMap)
          val accuracyRecordWriteStep = {
            val rwName = ruleParam.recordOpt.map(_.name).getOrElse(missRecordsTableName)
            RecordWriteStep(rwName, missRecordsTableName, Some(accuracyRecordTableName))
          }

          // extra steps
          (accuracyMetricTransStep :: accuracyRecordTransStep :: Nil,
            accuracyMetricWriteStep :: accuracyRecordWriteStep :: Nil)
        }
      }

      // full steps
      transSteps1 ++ transSteps2 ++ writeSteps1 ++ writeSteps2
    }
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/1d7acd57/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
new file mode 100644
index 0000000..fcb576c
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
@@ -0,0 +1,157 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.
You may obtain a copy of the License at

 http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.step.builder.dsl.transform

import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.configuration.params.RuleParam
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep
import org.apache.griffin.measure.step.builder.ConstantColumns
import org.apache.griffin.measure.step.builder.dsl.expr._
import org.apache.griffin.measure.step.builder.dsl.transform.analyzer.CompletenessAnalyzer
import org.apache.griffin.measure.step.transform.SparkSqlTransformStep
import org.apache.griffin.measure.step.write.{MetricWriteStep, RecordWriteStep}
import org.apache.griffin.measure.utils.ParamUtil._

/**
 * generate completeness dq steps
 *
 * Translates a parsed [[CompletenessClause]] rule into the chain of Spark SQL
 * transform steps and write steps that compute the completeness metric
 * (total / complete / incomplete counts) for one registered data source.
 *
 * @param context   dq context carrying the process type, context id and the
 *                  run-time table register
 * @param expr      the parsed rule expression; must be a [[CompletenessClause]]
 * @param ruleParam rule configuration (details map, metric/record output options)
 */
case class CompletenessExpr2DQSteps(context: DQContext,
                                    expr: Expr,
                                    ruleParam: RuleParam
                                   ) extends Expr2DQSteps {

  // keys looked up in the rule "details" map; for the count columns the
  // key itself is the fallback column name (via getStringOrKey)
  private object CompletenessKeys {
    val _source = "source"
    val _total = "total"
    val _complete = "complete"
    val _incomplete = "incomplete"
  }
  import CompletenessKeys._

  /**
   * Build the DQ steps for this completeness rule.
   *
   * @return the transform steps followed by the write steps, or Nil when the
   *         configured data source table is not registered
   */
  def getDQSteps(): Seq[DQStep] = {
    val details = ruleParam.getDetails
    val completenessExpr = expr.asInstanceOf[CompletenessClause]

    // data source to measure; defaults to the first registered data source
    val sourceName = details.getString(_source, context.getDataSourceName(0))

    val procType = context.procType
    val timestamp = context.contextId.timestamp

    if (!context.runTimeTableRegister.existsTable(sourceName)) {
      warn(s"[${timestamp}] data source ${sourceName} not exists")
      Nil
    } else {
      val analyzer = CompletenessAnalyzer(completenessExpr, sourceName)

      // selection expressions rendered as "expr AS `alias`"
      val selItemsClause = analyzer.selectionPairs.map { pair =>
        val (expr, alias) = pair
        s"${expr.desc} AS `${alias}`"
      }.mkString(", ")
      val aliases = analyzer.selectionPairs.map(_._2)

      // streaming mode carries the tmst (timestamp) column through each step
      val selClause = procType match {
        case BatchProcessType => selItemsClause
        case StreamingProcessType => s"`${ConstantColumns.tmst}`, ${selItemsClause}"
      }

      // 1. source alias: project the measured columns under stable aliases
      val sourceAliasTableName = "__sourceAlias"
      val sourceAliasSql = {
        s"SELECT ${selClause} FROM `${sourceName}`"
      }
      val sourceAliasTransStep = SparkSqlTransformStep(sourceAliasTableName, sourceAliasSql, emptyMap, true)

      // 2. incomplete record: any row where at least one measured column is NULL
      val incompleteRecordsTableName = "__incompleteRecords"
      val completeWhereClause = aliases.map(a => s"`${a}` IS NOT NULL").mkString(" AND ")
      val incompleteWhereClause = s"NOT (${completeWhereClause})"
      val incompleteRecordsSql = s"SELECT * FROM `${sourceAliasTableName}` WHERE ${incompleteWhereClause}"
      val incompleteRecordTransStep = SparkSqlTransformStep(incompleteRecordsTableName, incompleteRecordsSql, emptyMap, true)
      val incompleteRecordWriteStep = {
        val rwName = ruleParam.recordOpt.map(_.name).getOrElse(incompleteRecordsTableName)
        RecordWriteStep(rwName, incompleteRecordsTableName)
      }

      // 3. incomplete count (grouped by tmst in streaming mode)
      val incompleteCountTableName = "__incompleteCount"
      val incompleteColName = details.getStringOrKey(_incomplete)
      val incompleteCountSql = procType match {
        case BatchProcessType => s"SELECT COUNT(*) AS `${incompleteColName}` FROM `${incompleteRecordsTableName}`"
        case StreamingProcessType => s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${incompleteColName}` FROM `${incompleteRecordsTableName}` GROUP BY `${ConstantColumns.tmst}`"
      }
      val incompleteCountTransStep = SparkSqlTransformStep(incompleteCountTableName, incompleteCountSql, emptyMap)

      // 4. total count (grouped by tmst in streaming mode)
      val totalCountTableName = "__totalCount"
      val totalColName = details.getStringOrKey(_total)
      val totalCountSql = procType match {
        case BatchProcessType => s"SELECT COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}`"
        case StreamingProcessType => s"SELECT `${ConstantColumns.tmst}`, COUNT(*) AS `${totalColName}` FROM `${sourceAliasTableName}` GROUP BY `${ConstantColumns.tmst}`"
      }
      val totalCountTransStep = SparkSqlTransformStep(totalCountTableName, totalCountSql, emptyMap)

      // 5. complete metric: complete = total - incomplete; LEFT JOIN keeps the
      // total row even when there are no incomplete records (coalesce -> 0)
      val completeTableName = ruleParam.name
      val completeColName = details.getStringOrKey(_complete)
      val completeMetricSql = procType match {
        case BatchProcessType => {
          s"""
             |SELECT `${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
             |coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0) AS `${incompleteColName}`,
             |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0)) AS `${completeColName}`
             |FROM `${totalCountTableName}` LEFT JOIN `${incompleteCountTableName}`
          """.stripMargin
        }
        case StreamingProcessType => {
          s"""
             |SELECT `${totalCountTableName}`.`${ConstantColumns.tmst}` AS `${ConstantColumns.tmst}`,
             |`${totalCountTableName}`.`${totalColName}` AS `${totalColName}`,
             |coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0) AS `${incompleteColName}`,
             |(`${totalCountTableName}`.`${totalColName}` - coalesce(`${incompleteCountTableName}`.`${incompleteColName}`, 0)) AS `${completeColName}`
             |FROM `${totalCountTableName}` LEFT JOIN `${incompleteCountTableName}`
             |ON `${totalCountTableName}`.`${ConstantColumns.tmst}` = `${incompleteCountTableName}`.`${ConstantColumns.tmst}`
          """.stripMargin
        }
      }
      val completeTransStep = SparkSqlTransformStep(completeTableName, completeMetricSql, emptyMap)
      val completeWriteStep = {
        val mwName = ruleParam.metricOpt.map(_.name).getOrElse(completeTableName)
        // honor the configured metric collect type, consistent with the accuracy
        // metric write steps; NormalizeType("") falls back to the default type,
        // so behavior is unchanged when no collect type is configured
        val collectType = NormalizeType(ruleParam.metricOpt.map(_.collectType).getOrElse(""))
        MetricWriteStep(mwName, completeTableName, collectType)
      }

      val transSteps = {
        sourceAliasTransStep :: incompleteRecordTransStep ::
          incompleteCountTransStep :: totalCountTransStep ::
          completeTransStep :: Nil
      }
      val writeSteps = {
        incompleteRecordWriteStep :: completeWriteStep :: Nil
      }

      // full steps
      transSteps ++ writeSteps
    }
  }

}
