Update the Griffin DSL to support select and from clauses
Author: Lionel Liu <[email protected]> Closes #128 from bhlx3lyx7/dsl-modify. Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/417c931f Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/417c931f Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/417c931f Branch: refs/heads/master Commit: 417c931f22e7faceda86fd7813fdf5a8c30bdc6e Parents: 43f9dbf Author: Lionel Liu <[email protected]> Authored: Tue Oct 10 14:44:48 2017 +0800 Committer: Lionel Liu <[email protected]> Committed: Tue Oct 10 14:44:48 2017 +0800 ---------------------------------------------------------------------- .gitignore | 3 + griffin-doc/dsl-guide.md | 22 +- measure/derby.log | 13 - .../rule/adaptor/GriffinDslAdaptor.scala | 20 +- .../rule/dsl/expr/ClauseExpression.scala | 46 +- .../measure/rule/dsl/parser/BasicParser.scala | 21 +- .../rule/dsl/parser/GriffinDslParser.scala | 8 +- .../test/resources/config-test-profiling.json | 3 +- measure/src/test/resources/input.msg | 1 - measure/src/test/resources/output.msg | 1 - .../measure/process/BatchProcessTest.scala | 292 +++--- .../griffin/measure/process/JsonParseTest.scala | 980 +++++++++---------- .../measure/process/StreamingProcessTest.scala | 294 +++--- .../rule/dsl/parser/BasicParserTest.scala | 14 + .../griffin/measure/utils/HdfsUtilTest.scala | 104 +- .../core/measure/repo/DataSourceRepo.java | 26 - .../griffin/core/measure/repo/RuleRepo.java | 26 - 17 files changed, 943 insertions(+), 931 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/417c931f/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 3764dc9..ad52fe5 100644 --- a/.gitignore +++ b/.gitignore @@ -31,3 +31,6 @@ ui/bower_components/* ui/node_modules/* ui/debug.log ui/package-lock.json + 
+derby.log +metastore_db http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/417c931f/griffin-doc/dsl-guide.md ---------------------------------------------------------------------- diff --git a/griffin-doc/dsl-guide.md b/griffin-doc/dsl-guide.md index 6a7b3f8..40a37a4 100644 --- a/griffin-doc/dsl-guide.md +++ b/griffin-doc/dsl-guide.md @@ -79,5 +79,25 @@ Griffin DSL is SQL-like, case insensitive, and easy to learn. e.g. `source.age between 3 and 30, source.age between (3, 30)` - **like**: like clause like sql. e.g. `source.name like "%abc%"` -- **logical factor**: +- **is null**: is null operator like sql. + e.g. `source.desc is not null` +- **is nan**: check if the value is not a number, the syntax like `is null` + e.g. `source.age is not nan` +- **logical factor**: math expression or logical expressions above or other logical expressions with brackets. + e.g. `(source.user_id = target.user_id AND source.age > target.age)` +- **unary logical expression**: unary logical operator with factor. + e.g. `NOT source.has_data` +- **binary logical expression**: logical factors with binary logical operators, including `and`, `or` and comparison operators. + e.g. `source.age = target.age OR source.ticket = target.tck` + +### Expression +- **expression**: logical expression and math expression. + +### Function +- **argument**: expression. +- **function**: function name with arguments between brackets. + e.g. 
`max(source.age, target.age), count(*)` + +### Clause +- **select clause**: \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/417c931f/measure/derby.log ---------------------------------------------------------------------- diff --git a/measure/derby.log b/measure/derby.log deleted file mode 100644 index 4b93055..0000000 --- a/measure/derby.log +++ /dev/null @@ -1,13 +0,0 @@ ----------------------------------------------------------------- -Fri Sep 29 15:53:18 CST 2017: -Booting Derby version The Apache Software Foundation - Apache Derby - 10.10.2.0 - (1582446): instance a816c00e-015e-cca0-1a8b-00000f890648 -on database directory /private/var/folders/p0/462y3wrn4lv1fptxx5bwy7b839572r/T/spark-890ab6e2-ee56-4d73-8c6a-0dcce204322e/metastore with class loader sun.misc.Launcher$AppClassLoader@18b4aac2 -Loaded from file:/Users/lliu13/.m2/repository/org/apache/derby/derby/10.10.2.0/derby-10.10.2.0.jar -java.vendor=Oracle Corporation -java.runtime.version=1.8.0_101-b13 -user.dir=/Users/lliu13/git/incubator-griffin/measure -os.name=Mac OS X -os.arch=x86_64 -os.version=10.12.6 -derby.system.home=null -Database Class Loader started - derby.database.classpath='' http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/417c931f/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala index 2a189d4..8199d80 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala @@ -261,13 +261,19 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], } } case ProfilingType => { - val sourceName = getNameOpt(details, 
ProfilingInfo._Source) match { - case Some(name) => name - case _ => dataSourceNames.head + val profilingClause = expr.asInstanceOf[ProfilingClause] + val sourceName = profilingClause.fromClauseOpt match { + case Some(fc) => fc.dataSource + case _ => { + getNameOpt(details, ProfilingInfo._Source) match { + case Some(name) => name + case _ => dataSourceNames.head + } + } } - val analyzer = ProfilingAnalyzer(expr.asInstanceOf[ProfilingClause], sourceName) + val analyzer = ProfilingAnalyzer(profilingClause, sourceName) - analyzer.selectionExprs.foreach(println) +// analyzer.selectionExprs.foreach(println) val selExprDescs = analyzer.selectionExprs.map { sel => val alias = sel match { @@ -284,6 +290,8 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], (s"`${GroupByColumn.tmst}`" +: selExprDescs).mkString(", ") } + val fromClause = profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc + // val tailClause = analyzer.tailsExprs.map(_.desc).mkString(" ") val tmstGroupbyClause = GroupbyClause(LiteralStringExpr(s"`${GroupByColumn.tmst}`") :: Nil, None) val mergedGroubbyClause = tmstGroupbyClause.merge(analyzer.groupbyExprOpt match { @@ -300,7 +308,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String], // 1. 
select statement val profilingSql = { // s"SELECT `${GroupByColumn.tmst}`, ${selClause} FROM ${sourceName} ${tailClause} GROUP BY `${GroupByColumn.tmst}`" - s"SELECT ${selClause} FROM ${sourceName} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}" + s"SELECT ${selClause} ${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}" } val profilingMetricName = resultName(details, ProfilingInfo._Profiling) val profilingStep = SparkSqlStep( http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/417c931f/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala index 26882b4..c0986e1 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala @@ -26,7 +26,14 @@ case class SelectClause(exprs: Seq[Expr]) extends ClauseExpression { addChildren(exprs) def desc: String = s"${exprs.map(_.desc).mkString(", ")}" - def coalesceDesc: String = s"${exprs.map(_.desc).mkString(", ")}" + def coalesceDesc: String = desc + +} + +case class FromClause(dataSource: String) extends ClauseExpression { + + def desc: String = s"FROM `${dataSource}`" + def coalesceDesc: String = desc } @@ -107,44 +114,61 @@ case class LimitClause(expr: Expr) extends ClauseExpression { def coalesceDesc: String = s"LIMIT ${expr.coalesceDesc}" } -case class CombinedClause(selectClause: SelectClause, tails: Seq[ClauseExpression] +case class CombinedClause(selectClause: SelectClause, fromClauseOpt: Option[FromClause], + tails: Seq[ClauseExpression] ) extends ClauseExpression { - addChildren(selectClause +: tails) + addChildren({ + val headClauses: Seq[ClauseExpression] = 
selectClause +: (fromClauseOpt.toSeq) + headClauses ++ tails + }) def desc: String = { - tails.foldLeft(selectClause.desc) { (head, tail) => + val selectDesc = s"SELECT ${selectClause.desc}" + val fromDesc = fromClauseOpt.map(_.desc).mkString(" ") + val headDesc = s"${selectDesc} ${fromDesc}" + tails.foldLeft(headDesc) { (head, tail) => s"${head} ${tail.desc}" } } def coalesceDesc: String = { - tails.foldLeft(selectClause.coalesceDesc) { (head, tail) => + val selectDesc = s"SELECT ${selectClause.coalesceDesc}" + val fromDesc = fromClauseOpt.map(_.coalesceDesc).mkString(" ") + val headDesc = s"${selectDesc} ${fromDesc}" + tails.foldLeft(headDesc) { (head, tail) => s"${head} ${tail.coalesceDesc}" } } } -case class ProfilingClause(selectClause: SelectClause, groupbyClauseOpt: Option[GroupbyClause], +case class ProfilingClause(selectClause: SelectClause, + fromClauseOpt: Option[FromClause], + groupbyClauseOpt: Option[GroupbyClause], preGroupbyClauses: Seq[ClauseExpression], postGroupbyClauses: Seq[ClauseExpression] ) extends ClauseExpression { - addChildren(groupbyClauseOpt match { - case Some(gc) => (selectClause +: preGroupbyClauses) ++ (gc +: postGroupbyClauses) - case _ => (selectClause +: preGroupbyClauses) ++ postGroupbyClauses + addChildren({ + val headClauses: Seq[ClauseExpression] = selectClause +: (fromClauseOpt.toSeq) + groupbyClauseOpt match { + case Some(gc) => (headClauses ++ preGroupbyClauses) ++ (gc +: postGroupbyClauses) + case _ => (headClauses ++ preGroupbyClauses) ++ postGroupbyClauses + } }) def desc: String = { val selectDesc = selectClause.desc + val fromDesc = fromClauseOpt.map(_.desc).mkString(" ") val groupbyDesc = groupbyClauseOpt.map(_.desc).mkString(" ") val preDesc = preGroupbyClauses.map(_.desc).mkString(" ") val postDesc = postGroupbyClauses.map(_.desc).mkString(" ") - s"${selectDesc} ${preDesc} ${groupbyDesc} ${postDesc}" + s"${selectDesc} ${fromDesc} ${preDesc} ${groupbyDesc} ${postDesc}" } def coalesceDesc: String = { val selectDesc = 
selectClause.coalesceDesc + val fromDesc = fromClauseOpt.map(_.coalesceDesc).mkString(" ") val groupbyDesc = groupbyClauseOpt.map(_.coalesceDesc).mkString(" ") val preDesc = preGroupbyClauses.map(_.coalesceDesc).mkString(" ") val postDesc = postGroupbyClauses.map(_.coalesceDesc).mkString(" ") - s"${selectDesc} ${preDesc} ${groupbyDesc} ${postDesc}" + s"${selectDesc} ${fromDesc} ${preDesc} ${groupbyDesc} ${postDesc}" } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/417c931f/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala index 0431354..f55b1f8 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala @@ -27,6 +27,14 @@ trait BasicParser extends JavaTokenParsers with Serializable { val dataSourceNames: Seq[String] val functionNames: Seq[String] + private def trim(str: String): String = { + val regex = """`(.*)`""".r + str match { + case regex(s) => s + case _ => str + } + } + /** * BNF for basic parser * @@ -118,6 +126,8 @@ trait BasicParser extends JavaTokenParsers with Serializable { val UQUOTE: Parser[String] = "`" val COMMA: Parser[String] = "," + val SELECT: Parser[String] = """(?i)select\s""".r + val FROM: Parser[String] = """(?i)from\s""".r val AS: Parser[String] = """(?i)as\s""".r val WHERE: Parser[String] = """(?i)where\s""".r val GROUP: Parser[String] = """(?i)group\s""".r @@ -307,7 +317,8 @@ trait BasicParser extends JavaTokenParsers with Serializable { * <limit-clause> = <limit> <expr> */ - def selectClause: Parser[SelectClause] = rep1sep(expression, COMMA) ^^ { SelectClause(_) } + def selectClause: 
Parser[SelectClause] = opt(SELECT) ~> rep1sep(expression, COMMA) ^^ { SelectClause(_) } + def fromClause: Parser[FromClause] = FROM ~> TableFieldName ^^ { ds => FromClause(trim(ds)) } def whereClause: Parser[WhereClause] = WHERE ~> expression ^^ { WhereClause(_) } def havingClause: Parser[Expr] = HAVING ~> expression def groupbyClause: Parser[GroupbyClause] = GROUP ~ BY ~ rep1sep(expression, COMMA) ~ opt(havingClause) ^^ { @@ -323,14 +334,14 @@ trait BasicParser extends JavaTokenParsers with Serializable { /** * -- combined clauses -- - * <combined-clauses> = <select-clause> [ <where-clause> ]+ [ <groupby-clause> ]+ [ <orderby-clause> ]+ [ <limit-clause> ]+ + * <combined-clauses> = <select-clause> [ <from-clause> ]+ [ <where-clause> ]+ [ <groupby-clause> ]+ [ <orderby-clause> ]+ [ <limit-clause> ]+ */ - def combinedClause: Parser[CombinedClause] = selectClause ~ opt(whereClause) ~ + def combinedClause: Parser[CombinedClause] = selectClause ~ opt(fromClause) ~ opt(whereClause) ~ opt(groupbyClause) ~ opt(orderbyClause) ~ opt(limitClause) ^^ { - case sel ~ whereOpt ~ groupbyOpt ~ orderbyOpt ~ limitOpt => { + case sel ~ fromOpt ~ whereOpt ~ groupbyOpt ~ orderbyOpt ~ limitOpt => { val tails = Seq(whereOpt, groupbyOpt, orderbyOpt, limitOpt).flatMap(opt => opt) - CombinedClause(sel, tails) + CombinedClause(sel, fromOpt, tails) } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/417c931f/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala index 637decb..0800f45 100644 --- a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala @@ -26,15 
+26,15 @@ case class GriffinDslParser(dataSourceNames: Seq[String], functionNames: Seq[Str /** * -- profiling clauses -- - * <profiling-clauses> = <select-clause> [ <where-clause> ]+ [ <groupby-clause> ]+ [ <orderby-clause> ]+ [ <limit-clause> ]+ + * <profiling-clauses> = <select-clause> [ <from-clause> ]+ [ <where-clause> ]+ [ <groupby-clause> ]+ [ <orderby-clause> ]+ [ <limit-clause> ]+ */ - def profilingClause: Parser[ProfilingClause] = selectClause ~ opt(whereClause) ~ + def profilingClause: Parser[ProfilingClause] = selectClause ~ opt(fromClause) ~ opt(whereClause) ~ opt(groupbyClause) ~ opt(orderbyClause) ~ opt(limitClause) ^^ { - case sel ~ whereOpt ~ groupbyOpt ~ orderbyOpt ~ limitOpt => { + case sel ~ fromOpt ~ whereOpt ~ groupbyOpt ~ orderbyOpt ~ limitOpt => { val preClauses = Seq(whereOpt).flatMap(opt => opt) val postClauses = Seq(orderbyOpt, limitOpt).flatMap(opt => opt) - ProfilingClause(sel, groupbyOpt, preClauses, postClauses) + ProfilingClause(sel, fromOpt, groupbyOpt, preClauses, postClauses) } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/417c931f/measure/src/test/resources/config-test-profiling.json ---------------------------------------------------------------------- diff --git a/measure/src/test/resources/config-test-profiling.json b/measure/src/test/resources/config-test-profiling.json index 187e88a..7c16f24 100644 --- a/measure/src/test/resources/config-test-profiling.json +++ b/measure/src/test/resources/config-test-profiling.json @@ -23,9 +23,8 @@ { "dsl.type": "griffin-dsl", "dq.type": "profiling", - "rule": "user_id as id, user_id.approx_count_distinct() as cnt group by user_id order by cnt desc, id desc limit 3", + "rule": "select user_id as id, user_id.count() as cnt from source group by user_id order by cnt desc, id desc limit 3", "details": { - "source": "source", "profiling": { "name": "count", "persist.type": "metric"
