Update dsl modification

Support select and from clauses in the Griffin DSL.

Author: Lionel Liu <[email protected]>

Closes #128 from bhlx3lyx7/dsl-modify.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/417c931f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/417c931f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/417c931f

Branch: refs/heads/master
Commit: 417c931f22e7faceda86fd7813fdf5a8c30bdc6e
Parents: 43f9dbf
Author: Lionel Liu <[email protected]>
Authored: Tue Oct 10 14:44:48 2017 +0800
Committer: Lionel Liu <[email protected]>
Committed: Tue Oct 10 14:44:48 2017 +0800

----------------------------------------------------------------------
 .gitignore                                      |   3 +
 griffin-doc/dsl-guide.md                        |  22 +-
 measure/derby.log                               |  13 -
 .../rule/adaptor/GriffinDslAdaptor.scala        |  20 +-
 .../rule/dsl/expr/ClauseExpression.scala        |  46 +-
 .../measure/rule/dsl/parser/BasicParser.scala   |  21 +-
 .../rule/dsl/parser/GriffinDslParser.scala      |   8 +-
 .../test/resources/config-test-profiling.json   |   3 +-
 measure/src/test/resources/input.msg            |   1 -
 measure/src/test/resources/output.msg           |   1 -
 .../measure/process/BatchProcessTest.scala      | 292 +++---
 .../griffin/measure/process/JsonParseTest.scala | 980 +++++++++----------
 .../measure/process/StreamingProcessTest.scala  | 294 +++---
 .../rule/dsl/parser/BasicParserTest.scala       |  14 +
 .../griffin/measure/utils/HdfsUtilTest.scala    | 104 +-
 .../core/measure/repo/DataSourceRepo.java       |  26 -
 .../griffin/core/measure/repo/RuleRepo.java     |  26 -
 17 files changed, 943 insertions(+), 931 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/417c931f/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 3764dc9..ad52fe5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -31,3 +31,6 @@ ui/bower_components/*
 ui/node_modules/*
 ui/debug.log
 ui/package-lock.json
+
+derby.log
+metastore_db

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/417c931f/griffin-doc/dsl-guide.md
----------------------------------------------------------------------
diff --git a/griffin-doc/dsl-guide.md b/griffin-doc/dsl-guide.md
index 6a7b3f8..40a37a4 100644
--- a/griffin-doc/dsl-guide.md
+++ b/griffin-doc/dsl-guide.md
@@ -79,5 +79,25 @@ Griffin DSL is SQL-like, case insensitive, and easy to learn.
        e.g. `source.age between 3 and 30, source.age between (3, 30)`
 - **like**: like clause like sql.  
        e.g. `source.name like "%abc%"`
-- **logical factor**: 
+- **is null**: is null operator like sql.  
+       e.g. `source.desc is not null`
+- **is nan**: check if the value is not a number, the syntax like `is null`  
+       e.g. `source.age is not nan`
+- **logical factor**: math expression or logical expressions above or other logical expressions with brackets.  
+       e.g. `(source.user_id = target.user_id AND source.age > target.age)`
+- **unary logical expression**: unary logical operator with factor.  
+       e.g. `NOT source.has_data`
+- **binary logical expression**: logical factors with binary logical operators, including `and`, `or` and comparison operators.  
+       e.g. `source.age = target.age OR source.ticket = target.tck`
 
+
+### Expression
+- **expression**: logical expression and math expression.
+
+### Function
+- **argument**: expression.
+- **function**: function name with arguments between brackets.  
+       e.g. `max(source.age, target.age), count(*)`
+
+### Clause
+- **select clause**: 
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/417c931f/measure/derby.log
----------------------------------------------------------------------
diff --git a/measure/derby.log b/measure/derby.log
deleted file mode 100644
index 4b93055..0000000
--- a/measure/derby.log
+++ /dev/null
@@ -1,13 +0,0 @@
-----------------------------------------------------------------
-Fri Sep 29 15:53:18 CST 2017:
-Booting Derby version The Apache Software Foundation - Apache Derby - 
10.10.2.0 - (1582446): instance a816c00e-015e-cca0-1a8b-00000f890648 
-on database directory 
/private/var/folders/p0/462y3wrn4lv1fptxx5bwy7b839572r/T/spark-890ab6e2-ee56-4d73-8c6a-0dcce204322e/metastore
 with class loader sun.misc.Launcher$AppClassLoader@18b4aac2 
-Loaded from 
file:/Users/lliu13/.m2/repository/org/apache/derby/derby/10.10.2.0/derby-10.10.2.0.jar
-java.vendor=Oracle Corporation
-java.runtime.version=1.8.0_101-b13
-user.dir=/Users/lliu13/git/incubator-griffin/measure
-os.name=Mac OS X
-os.arch=x86_64
-os.version=10.12.6
-derby.system.home=null
-Database Class Loader started - derby.database.classpath=''

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/417c931f/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
index 2a189d4..8199d80 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/GriffinDslAdaptor.scala
@@ -261,13 +261,19 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
         }
       }
       case ProfilingType => {
-        val sourceName = getNameOpt(details, ProfilingInfo._Source) match {
-          case Some(name) => name
-          case _ => dataSourceNames.head
+        val profilingClause = expr.asInstanceOf[ProfilingClause]
+        val sourceName = profilingClause.fromClauseOpt match {
+          case Some(fc) => fc.dataSource
+          case _ => {
+            getNameOpt(details, ProfilingInfo._Source) match {
+              case Some(name) => name
+              case _ => dataSourceNames.head
+            }
+          }
         }
-        val analyzer = ProfilingAnalyzer(expr.asInstanceOf[ProfilingClause], 
sourceName)
+        val analyzer = ProfilingAnalyzer(profilingClause, sourceName)
 
-        analyzer.selectionExprs.foreach(println)
+//        analyzer.selectionExprs.foreach(println)
 
         val selExprDescs = analyzer.selectionExprs.map { sel =>
           val alias = sel match {
@@ -284,6 +290,8 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
           (s"`${GroupByColumn.tmst}`" +: selExprDescs).mkString(", ")
         }
 
+        val fromClause = 
profilingClause.fromClauseOpt.getOrElse(FromClause(sourceName)).desc
+
 //        val tailClause = analyzer.tailsExprs.map(_.desc).mkString(" ")
         val tmstGroupbyClause = 
GroupbyClause(LiteralStringExpr(s"`${GroupByColumn.tmst}`") :: Nil, None)
         val mergedGroubbyClause = 
tmstGroupbyClause.merge(analyzer.groupbyExprOpt match {
@@ -300,7 +308,7 @@ case class GriffinDslAdaptor(dataSourceNames: Seq[String],
           // 1. select statement
           val profilingSql = {
 //            s"SELECT `${GroupByColumn.tmst}`, ${selClause} FROM 
${sourceName} ${tailClause} GROUP BY `${GroupByColumn.tmst}`"
-            s"SELECT ${selClause} FROM ${sourceName} ${preGroupbyClause} 
${groupbyClause} ${postGroupbyClause}"
+            s"SELECT ${selClause} ${fromClause} ${preGroupbyClause} 
${groupbyClause} ${postGroupbyClause}"
           }
           val profilingMetricName = resultName(details, 
ProfilingInfo._Profiling)
           val profilingStep = SparkSqlStep(

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/417c931f/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
index 26882b4..c0986e1 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/expr/ClauseExpression.scala
@@ -26,7 +26,14 @@ case class SelectClause(exprs: Seq[Expr]) extends 
ClauseExpression {
   addChildren(exprs)
 
   def desc: String = s"${exprs.map(_.desc).mkString(", ")}"
-  def coalesceDesc: String = s"${exprs.map(_.desc).mkString(", ")}"
+  def coalesceDesc: String = desc
+
+}
+
+case class FromClause(dataSource: String) extends ClauseExpression {
+
+  def desc: String = s"FROM `${dataSource}`"
+  def coalesceDesc: String = desc
 
 }
 
@@ -107,44 +114,61 @@ case class LimitClause(expr: Expr) extends 
ClauseExpression {
   def coalesceDesc: String = s"LIMIT ${expr.coalesceDesc}"
 }
 
-case class CombinedClause(selectClause: SelectClause, tails: 
Seq[ClauseExpression]
+case class CombinedClause(selectClause: SelectClause, fromClauseOpt: 
Option[FromClause],
+                          tails: Seq[ClauseExpression]
                          ) extends ClauseExpression {
 
-  addChildren(selectClause +: tails)
+  addChildren({
+    val headClauses: Seq[ClauseExpression] = selectClause +: 
(fromClauseOpt.toSeq)
+    headClauses ++ tails
+  })
 
   def desc: String = {
-    tails.foldLeft(selectClause.desc) { (head, tail) =>
+    val selectDesc = s"SELECT ${selectClause.desc}"
+    val fromDesc = fromClauseOpt.map(_.desc).mkString(" ")
+    val headDesc = s"${selectDesc} ${fromDesc}"
+    tails.foldLeft(headDesc) { (head, tail) =>
       s"${head} ${tail.desc}"
     }
   }
   def coalesceDesc: String = {
-    tails.foldLeft(selectClause.coalesceDesc) { (head, tail) =>
+    val selectDesc = s"SELECT ${selectClause.coalesceDesc}"
+    val fromDesc = fromClauseOpt.map(_.coalesceDesc).mkString(" ")
+    val headDesc = s"${selectDesc} ${fromDesc}"
+    tails.foldLeft(headDesc) { (head, tail) =>
       s"${head} ${tail.coalesceDesc}"
     }
   }
 }
 
-case class ProfilingClause(selectClause: SelectClause, groupbyClauseOpt: 
Option[GroupbyClause],
+case class ProfilingClause(selectClause: SelectClause,
+                           fromClauseOpt: Option[FromClause],
+                           groupbyClauseOpt: Option[GroupbyClause],
                            preGroupbyClauses: Seq[ClauseExpression],
                            postGroupbyClauses: Seq[ClauseExpression]
                           ) extends ClauseExpression {
-  addChildren(groupbyClauseOpt match {
-    case Some(gc) => (selectClause +: preGroupbyClauses) ++ (gc +: 
postGroupbyClauses)
-    case _ => (selectClause +: preGroupbyClauses) ++ postGroupbyClauses
+  addChildren({
+    val headClauses: Seq[ClauseExpression] = selectClause +: 
(fromClauseOpt.toSeq)
+    groupbyClauseOpt match {
+      case Some(gc) => (headClauses ++ preGroupbyClauses) ++ (gc +: 
postGroupbyClauses)
+      case _ => (headClauses ++ preGroupbyClauses) ++ postGroupbyClauses
+    }
   })
 
   def desc: String = {
     val selectDesc = selectClause.desc
+    val fromDesc = fromClauseOpt.map(_.desc).mkString(" ")
     val groupbyDesc = groupbyClauseOpt.map(_.desc).mkString(" ")
     val preDesc = preGroupbyClauses.map(_.desc).mkString(" ")
     val postDesc = postGroupbyClauses.map(_.desc).mkString(" ")
-    s"${selectDesc} ${preDesc} ${groupbyDesc} ${postDesc}"
+    s"${selectDesc} ${fromDesc} ${preDesc} ${groupbyDesc} ${postDesc}"
   }
   def coalesceDesc: String = {
     val selectDesc = selectClause.coalesceDesc
+    val fromDesc = fromClauseOpt.map(_.coalesceDesc).mkString(" ")
     val groupbyDesc = groupbyClauseOpt.map(_.coalesceDesc).mkString(" ")
     val preDesc = preGroupbyClauses.map(_.coalesceDesc).mkString(" ")
     val postDesc = postGroupbyClauses.map(_.coalesceDesc).mkString(" ")
-    s"${selectDesc} ${preDesc} ${groupbyDesc} ${postDesc}"
+    s"${selectDesc} ${fromDesc} ${preDesc} ${groupbyDesc} ${postDesc}"
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/417c931f/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala
index 0431354..f55b1f8 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/BasicParser.scala
@@ -27,6 +27,14 @@ trait BasicParser extends JavaTokenParsers with Serializable 
{
   val dataSourceNames: Seq[String]
   val functionNames: Seq[String]
 
+  private def trim(str: String): String = {
+    val regex = """`(.*)`""".r
+    str match {
+      case regex(s) => s
+      case _ => str
+    }
+  }
+
   /**
     * BNF for basic parser
     *
@@ -118,6 +126,8 @@ trait BasicParser extends JavaTokenParsers with 
Serializable {
     val UQUOTE: Parser[String] = "`"
     val COMMA: Parser[String] = ","
 
+    val SELECT: Parser[String] = """(?i)select\s""".r
+    val FROM: Parser[String] = """(?i)from\s""".r
     val AS: Parser[String] = """(?i)as\s""".r
     val WHERE: Parser[String] = """(?i)where\s""".r
     val GROUP: Parser[String] = """(?i)group\s""".r
@@ -307,7 +317,8 @@ trait BasicParser extends JavaTokenParsers with 
Serializable {
     * <limit-clause> = <limit> <expr>
     */
 
-  def selectClause: Parser[SelectClause] = rep1sep(expression, COMMA) ^^ { 
SelectClause(_) }
+  def selectClause: Parser[SelectClause] = opt(SELECT) ~> rep1sep(expression, 
COMMA) ^^ { SelectClause(_) }
+  def fromClause: Parser[FromClause] = FROM ~> TableFieldName ^^ { ds => 
FromClause(trim(ds)) }
   def whereClause: Parser[WhereClause] = WHERE ~> expression ^^ { 
WhereClause(_) }
   def havingClause: Parser[Expr] = HAVING ~> expression
   def groupbyClause: Parser[GroupbyClause] = GROUP ~ BY ~ rep1sep(expression, 
COMMA) ~ opt(havingClause) ^^ {
@@ -323,14 +334,14 @@ trait BasicParser extends JavaTokenParsers with 
Serializable {
 
   /**
     * -- combined clauses --
-    * <combined-clauses> = <select-clause> [ <where-clause> ]+ [ 
<groupby-clause> ]+ [ <orderby-clause> ]+ [ <limit-clause> ]+
+    * <combined-clauses> = <select-clause> [ <from-clause> ]+ [ <where-clause> 
]+ [ <groupby-clause> ]+ [ <orderby-clause> ]+ [ <limit-clause> ]+
     */
 
-  def combinedClause: Parser[CombinedClause] = selectClause ~ opt(whereClause) 
~
+  def combinedClause: Parser[CombinedClause] = selectClause ~ opt(fromClause) 
~ opt(whereClause) ~
     opt(groupbyClause) ~ opt(orderbyClause) ~ opt(limitClause) ^^ {
-    case sel ~ whereOpt ~ groupbyOpt ~ orderbyOpt ~ limitOpt => {
+    case sel ~ fromOpt ~ whereOpt ~ groupbyOpt ~ orderbyOpt ~ limitOpt => {
       val tails = Seq(whereOpt, groupbyOpt, orderbyOpt, limitOpt).flatMap(opt 
=> opt)
-      CombinedClause(sel, tails)
+      CombinedClause(sel, fromOpt, tails)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/417c931f/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
----------------------------------------------------------------------
diff --git 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
index 637decb..0800f45 100644
--- 
a/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
+++ 
b/measure/src/main/scala/org/apache/griffin/measure/rule/dsl/parser/GriffinDslParser.scala
@@ -26,15 +26,15 @@ case class GriffinDslParser(dataSourceNames: Seq[String], 
functionNames: Seq[Str
 
   /**
     * -- profiling clauses --
-    * <profiling-clauses> = <select-clause> [ <where-clause> ]+ [ 
<groupby-clause> ]+ [ <orderby-clause> ]+ [ <limit-clause> ]+
+    * <profiling-clauses> = <select-clause> [ <from-clause> ]+ [ 
<where-clause> ]+ [ <groupby-clause> ]+ [ <orderby-clause> ]+ [ <limit-clause> 
]+
     */
 
-  def profilingClause: Parser[ProfilingClause] = selectClause ~ 
opt(whereClause) ~
+  def profilingClause: Parser[ProfilingClause] = selectClause ~ 
opt(fromClause) ~ opt(whereClause) ~
     opt(groupbyClause) ~ opt(orderbyClause) ~ opt(limitClause) ^^ {
-    case sel ~ whereOpt ~ groupbyOpt ~ orderbyOpt ~ limitOpt => {
+    case sel ~ fromOpt ~ whereOpt ~ groupbyOpt ~ orderbyOpt ~ limitOpt => {
       val preClauses = Seq(whereOpt).flatMap(opt => opt)
       val postClauses = Seq(orderbyOpt, limitOpt).flatMap(opt => opt)
-      ProfilingClause(sel, groupbyOpt, preClauses, postClauses)
+      ProfilingClause(sel, fromOpt, groupbyOpt, preClauses, postClauses)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/417c931f/measure/src/test/resources/config-test-profiling.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/config-test-profiling.json 
b/measure/src/test/resources/config-test-profiling.json
index 187e88a..7c16f24 100644
--- a/measure/src/test/resources/config-test-profiling.json
+++ b/measure/src/test/resources/config-test-profiling.json
@@ -23,9 +23,8 @@
       {
         "dsl.type": "griffin-dsl",
         "dq.type": "profiling",
-        "rule": "user_id as id, user_id.approx_count_distinct() as cnt group 
by user_id order by cnt desc, id desc limit 3",
+        "rule": "select user_id as id, user_id.count() as cnt from source 
group by user_id order by cnt desc, id desc limit 3",
         "details": {
-          "source": "source",
           "profiling": {
             "name": "count",
             "persist.type": "metric"

Reply via email to