// http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala
// ----------------------------------------------------------------------
// diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala
// new file mode 100644
// index 0000000..6070189
// --- /dev/null
// +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParser.scala
// @@ -0,0 +1,298 @@
package org.apache.griffin.measure.batch.rule

import org.apache.griffin.measure.batch.rule.expr._

import scala.util.parsing.combinator._

/**
 * Parser for the measure rule language described below, built on Scala parser combinators.
 *
 * The top-level production is [[rule]]; every grammar element has its own public parser.
 *
 * Combinator alternation (`|`) is ordered choice: the first alternative that succeeds wins and is
 * never revisited, so the order of alternatives matters wherever one can match a prefix of another.
 */
case class RuleParser() extends JavaTokenParsers with Serializable {

  /**
   * BNF representation of the grammar.
   * Notation: [x] means x is optional; [x]+ means x repeated zero or more times.
   * Keywords (NOT, AND, OR, IN, BETWEEN, WHEN, true, false) and $source / $target are case-insensitive.
   *
   * <rule> ::= <logical-statement> [WHEN <logical-statement>]
   *   rule: mapping-rule [WHEN when-rule]
   *   - mapping-rule: the top-level operator should preferably not be OR or NOT, otherwise the
   *     group-by columns cannot be found automatically
   *   - when-rule: should only use information about the data source as a whole, not values of
   *     individual data rows
   *
   * <logical-statement> ::= [NOT] <logical-expression> [(AND | OR) <logical-expression>]+ | "(" <logical-statement> ")"
   *   logical-statement: evaluates to a boolean value
   *   logical-operator: "AND" | "&&", "OR" | "||", "NOT" | "!"
   *   precedence: NOT binds tightest, then AND, then OR
   *
   * <logical-expression> ::= <math-expr> (<compare-opr> <math-expr> | <range-opr> <range-expr>)
   *   logical-expression example: $source.id = $target.id, $source.page_id IN ('3214', '4312', '60821')
   *
   * <compare-opr> ::= "=" | "!=" | "<" | ">" | "<=" | ">="   ("==" and "!==" are also accepted)
   * <range-opr> ::= ["NOT"] ("IN" | "BETWEEN")
   * <range-expr> ::= "(" [<math-expr> [, <math-expr>]+] ")"
   *   range-expr example: ('3214', '4312', '60821'), (10, 15), ()
   *
   * <math-expr> ::= [<unary-opr>] <math-factor> [<binary-opr> <math-factor>]+
   *   math-expr example: $source.price * $target.count, "hello" + " " + "world" + 123
   *   precedence: unary operators first, then "*" "/" "%", then "+" "-"
   *
   * <binary-opr> ::= "+" | "-" | "*" | "/" | "%"
   * <unary-opr> ::= "+" | "-"
   *
   * <math-factor> ::= <literal> | <selection> | "(" <math-expr> ")"
   *
   * <selection> ::= <selection-head> [ <field-sel> | <function-operation> | <index-field-range-sel> | <filter-sel> ]+
   *   selection example: $source.price, $source.json(), $source['state'], $source.numList[3], $target.json().mails['org' = 'apache'].names[*]
   *
   * <selection-head> ::= $source | $target
   *
   * <field-sel> ::= "." <field-string>
   *
   * <function-operation> ::= "." <function-name> "(" [<arg> [, <arg>]+] ")"
   * <function-name> ::= <name-string>
   * <arg> ::= <math-expr>
   *
   * <index-field-range-sel> ::= "[" <index-field-range> [, <index-field-range>]+ "]"
   * <index-field-range> ::= <index-field> | (<index-field>, <index-field>) | "*"
   *   index-field-range: 2 means the 3rd item, (0, 3) means the first 4 items, * means all items, 'age' means item 'age'
   * <index-field> ::= <index> | <field-quote> | <all-selection>
   *   index: 0 ~ n is the position from the start, -1 ~ -n is the position from the end
   * <field-quote> ::= ' <field-string> ' | " <field-string> "
   *
   * <filter-sel> ::= "[" <field-quote> <filter-compare-opr> <math-expr> "]"
   * <filter-compare-opr> ::= "=" | "!=" | "<" | ">" | "<=" | ">="
   *   filter-sel example: ['name' = 'URL'], $source.man['age' > $source.graduate_age + 5 ]
   *
   * A <math-expr> inside a selection must not refer to a different <selection-head>, for example:
   *   $source.tags[1+2]            valid
   *   $source.tags[$source.first]  valid
   *   $source.tags[$target.first]  invalid
   *   -- this is a job for validation, not for the parser
   *
   * <literal> ::= <literal-string> | <literal-number> | <literal-time> | <literal-boolean>
   * <literal-string> ::= ' <any-string> ' | " <any-string> "
   * <literal-number> ::= <integer> | <double>
   * <literal-time> ::= <digits> ("d" | "h" | "m" | "s" | "ms")   (a single segment, e.g. 30s or 500ms)
   * <literal-boolean> ::= true | false
   */

  /** Reserved words of the rule language (all case-insensitive). */
  object Keyword {
    def WhenKeywords: Parser[String] = """(?i)when""".r
    def UnaryLogicalKeywords: Parser[String] = """(?i)not""".r
    def BinaryLogicalKeywords: Parser[String] = """(?i)and|or""".r
    def RangeKeywords: Parser[String] = """(?i)(not\s+)?(in|between)""".r
    def DataSourceKeywords: Parser[String] = """(?i)\$(source|target)""".r
    def Keywords: Parser[String] = WhenKeywords | UnaryLogicalKeywords | BinaryLogicalKeywords | RangeKeywords | DataSourceKeywords
  }
  import Keyword._

  /** Operator and punctuation tokens. */
  object Operator {
    def NotLogicalOpr: Parser[String] = """(?i)not""".r | "!"
    def AndLogicalOpr: Parser[String] = """(?i)and""".r | "&&"
    def OrLogicalOpr: Parser[String] = """(?i)or""".r | "||"
    // Matches "=", "==", "!=", "!==", "<", "<=", ">", ">=".
    def CompareOpr: Parser[String] = """!?==?""".r | """<=?""".r | """>=?""".r
    def RangeOpr: Parser[String] = RangeKeywords

    def UnaryMathOpr: Parser[String] = "+" | "-"
    // Multiplicative operators (bind tighter than BinaryMathOpr2).
    def BinaryMathOpr1: Parser[String] = "*" | "/" | "%"
    // Additive operators.
    def BinaryMathOpr2: Parser[String] = "+" | "-"

    def FilterCompareOpr: Parser[String] = """!?==?""".r | """<=?""".r | """>=?""".r

    def SqBracketPair: (Parser[String], Parser[String]) = ("[", "]")
    def BracketPair: (Parser[String], Parser[String]) = ("(", ")")
    def Dot: Parser[String] = "."
    def AllSelection: Parser[String] = "*"
    def SQuote: Parser[String] = "'"
    def DQuote: Parser[String] = "\""
    def Comma: Parser[String] = ","
  }
  import Operator._

  /** String-like tokens. */
  object SomeString {
    // Body of a quoted literal: any run of characters except quotes, brackets, operators and a few separators.
    def AnyString: Parser[String] = """[^'\"{}\[\]()=<>.$@,;+\-*/\\]*""".r
    def SimpleFieldString: Parser[String] = """\w+""".r
    def FieldString: Parser[String] = """[\w\s]+""".r
    def NameString: Parser[String] = """[a-zA-Z_]\w*""".r
  }
  import SomeString._

  /** Numeric tokens. */
  object SomeNumber {
    def IntegerNumber: Parser[String] = """[+\-]?\d+""".r
    def DoubleNumber: Parser[String] = """[+\-]?(\.\d+|\d+\.\d*)""".r
    def IndexNumber: Parser[String] = IntegerNumber
  }
  import SomeNumber._

  // -- literal --
  // <literal> ::= <literal-string> | <literal-number> | <literal-time> | <literal-boolean>
  // literialTime is tried before literialNumber so that "10s" is not read as the number 10 followed by "s".
  def literal: Parser[LiteralExpr] = literialString | literialTime | literialNumber | literialBoolean
  def literialString: Parser[LiteralStringExpr] = (SQuote ~> AnyString <~ SQuote | DQuote ~> AnyString <~ DQuote) ^^ { LiteralStringExpr(_) }
  def literialNumber: Parser[LiteralNumberExpr] = (DoubleNumber | IntegerNumber) ^^ { LiteralNumberExpr(_) }
  // <literal-time> ::= <digits> ("d" | "h" | "m" | "s" | "ms"), one segment only (LiteralTimeExpr rejects more).
  // "ms" is listed before "m": the regex is matched as a prefix, so with "m" first "10ms" was read as
  // "10m" followed by a stray "s" and could not be parsed.
  def literialTime: Parser[LiteralTimeExpr] = """\d+(ms|d|h|m|s)""".r ^^ { LiteralTimeExpr(_) }
  def literialBoolean: Parser[LiteralBooleanExpr] = ("""(?i)true""".r | """(?i)false""".r) ^^ { LiteralBooleanExpr(_) }

  // -- selection --
  // <selection> ::= <selection-head> [ <field-sel> | <function-operation> | <index-field-range-sel> | <filter-sel> ]+
  def selection: Parser[SelectionExpr] = selectionHead ~ rep(selector) ^^ {
    case head ~ selectors => SelectionExpr(head, selectors)
  }
  // functionOperation must be tried before fieldSelect: fieldSelect also matches the ".name" prefix of
  // ".name(...)", and because `|` commits to the first success the "(...)" would be left unconsumed,
  // so selections such as $source.json() failed to parse.
  def selector: Parser[SelectExpr] = (functionOperation | fieldSelect | indexFieldRangeSelect | filterSelect)

  def selectionHead: Parser[SelectionHead] = DataSourceKeywords ^^ { SelectionHead(_) }
  // <field-sel> ::= "." <field-string>
  def fieldSelect: Parser[IndexFieldRangeSelectExpr] = Dot ~> SimpleFieldString ^^ {
    case field => IndexFieldRangeSelectExpr(FieldDesc(field) :: Nil)
  }
  // <function-operation> ::= "." <function-name> "(" [<arg> [, <arg>]+] ")"
  def functionOperation: Parser[FunctionOperationExpr] = Dot ~ NameString ~ BracketPair._1 ~ repsep(argument, Comma) ~ BracketPair._2 ^^ {
    case _ ~ func ~ _ ~ args ~ _ => FunctionOperationExpr(func, args)
  }
  def argument: Parser[MathExpr] = mathExpr
  // <index-field-range-sel> ::= "[" <index-field-range> [, <index-field-range>]+ "]"
  def indexFieldRangeSelect: Parser[IndexFieldRangeSelectExpr] = SqBracketPair._1 ~> rep1sep(indexFieldRange, Comma) <~ SqBracketPair._2 ^^ {
    case ifrs => IndexFieldRangeSelectExpr(ifrs)
  }
  // <index-field-range> ::= <index-field> | (<index-field>, <index-field>) | "*"
  def indexFieldRange: Parser[FieldDescOnly] = indexField | BracketPair._1 ~ indexField ~ Comma ~ indexField ~ BracketPair._2 ^^ {
    case _ ~ if1 ~ _ ~ if2 ~ _ => FieldRangeDesc(if1, if2)
  }
  // <index-field> ::= <index> | <field-quote> | <all-selection>
  // *here it can parse <math-expr>, but for simple situation, not supported now*
  def indexField: Parser[FieldDescOnly] = IndexNumber ^^ { IndexDesc(_) } | fieldQuote | AllSelection ^^ { AllFieldsDesc(_) }
  // <field-quote> ::= ' <field-string> ' | " <field-string> "
  def fieldQuote: Parser[FieldDesc] = (SQuote ~> FieldString <~ SQuote | DQuote ~> FieldString <~ DQuote) ^^ { FieldDesc(_) }
  // <filter-sel> ::= "[" <field-quote> <filter-compare-opr> <math-expr> "]"
  // Tried after indexFieldRangeSelect; a bracket like ['name' = ...] fails there and falls through to here.
  def filterSelect: Parser[FilterSelectExpr] = SqBracketPair._1 ~> fieldQuote ~ FilterCompareOpr ~ mathExpr <~ SqBracketPair._2 ^^ {
    case field ~ compare ~ value => FilterSelectExpr(field, compare, value)
  }

  // -- math --
  // <math-factor> ::= <literal> | <selection> | "(" <math-expr> ")"
  def mathFactor: Parser[MathExpr] = (literal | selection | BracketPair._1 ~> mathExpr <~ BracketPair._2) ^^ { MathFactorExpr(_) }
  // <math-expr> ::= [<unary-opr>] <math-factor> [<binary-opr> <math-factor>]+
  // <unary-opr> ::= "+" | "-"
  def unaryMathExpr: Parser[MathExpr] = rep(UnaryMathOpr) ~ mathFactor ^^ {
    case Nil ~ a => a
    case list ~ a => UnaryMathExpr(list, a)
  }
  // <binary-opr> ::= "+" | "-" | "*" | "/" | "%"
  // Two levels give "*", "/", "%" higher precedence than "+", "-".
  def binaryMathExpr1: Parser[MathExpr] = unaryMathExpr ~ rep(BinaryMathOpr1 ~ unaryMathExpr) ^^ {
    case a ~ Nil => a
    case a ~ list => BinaryMathExpr(a, list.map(c => (c._1, c._2)))
  }
  def binaryMathExpr2: Parser[MathExpr] = binaryMathExpr1 ~ rep(BinaryMathOpr2 ~ binaryMathExpr1) ^^ {
    case a ~ Nil => a
    case a ~ list => BinaryMathExpr(a, list.map(c => (c._1, c._2)))
  }
  def mathExpr: Parser[MathExpr] = binaryMathExpr2

  // -- logical expression --
  // <range-expr> ::= "(" [<math-expr> [, <math-expr>]+] ")"
  def rangeExpr: Parser[RangeDesc] = BracketPair._1 ~> repsep(mathExpr, Comma) <~ BracketPair._2 ^^ { RangeDesc(_) }
  // <logical-expression> ::= <math-expr> (<compare-opr> <math-expr> | <range-opr> <range-expr>)
  def logicalExpr: Parser[LogicalExpr] = mathExpr ~ CompareOpr ~ mathExpr ^^ {
    case left ~ opr ~ right => LogicalCompareExpr(left, opr, right)
  } | mathExpr ~ RangeOpr ~ rangeExpr ^^ {
    case left ~ opr ~ range => LogicalRangeExpr(left, opr, range)
  }

  // -- logical statement --
  // Precedence from tightest to loosest: NOT, AND, OR.
  def logicalFactor: Parser[LogicalExpr] = logicalExpr | BracketPair._1 ~> logicalStatement <~ BracketPair._2
  def notLogicalStatement: Parser[LogicalExpr] = rep(NotLogicalOpr) ~ logicalFactor ^^ {
    case Nil ~ a => a
    case list ~ a => UnaryLogicalExpr(list, a)
  }
  def andLogicalStatement: Parser[LogicalExpr] = notLogicalStatement ~ rep(AndLogicalOpr ~ notLogicalStatement) ^^ {
    case a ~ Nil => a
    case a ~ list => BinaryLogicalExpr(a, list.map(c => (c._1, c._2)))
  }
  def orLogicalStatement: Parser[LogicalExpr] = andLogicalStatement ~ rep(OrLogicalOpr ~ andLogicalStatement) ^^ {
    case a ~ Nil => a
    case a ~ list => BinaryLogicalExpr(a, list.map(c => (c._1, c._2)))
  }
  // <logical-statement> ::= [NOT] <logical-expression> [(AND | OR) <logical-expression>]+ | "(" <logical-statement> ")"
  def logicalStatement: Parser[LogicalExpr] = orLogicalStatement

  // -- rule --
  // <rule> ::= <logical-statement> [WHEN <logical-statement>]
  def rule: Parser[StatementExpr] = logicalStatement ~ opt(WhenKeywords ~> logicalStatement) ^^ {
    case ls ~ Some(ws) => WhenClauseStatementExpr(ls, ws)
    case ls ~ _ => SimpleStatementExpr(ls)
  }

  // ---- Legacy grammar, superseded by the parsers above; kept commented out for reference only ----

  // for compile only
//  case class NullStatementExpr(expression: String) extends StatementExpr {
//    def genValue(values: Map[String, Any]): Option[Any] = None
//    def getDataRelatedExprs(dataSign: String): Iterable[DataExpr] = Nil
//  }
//  def statementsExpr = mathExpr ^^ { NullStatementExpr(_) }


//
//  // basic
//  val anyString: Parser[String] = """[^'{}\[\]()=<>.$@;+\-*/\\\"]*""".r
//  val variable: Parser[String] = """[a-zA-Z_]\w*""".r
//  val number: Parser[String] = """[+\-]?\d+""".r
//  val time: Parser[String] = """\d+(y|M|w|d|h|m|s|ms)""".r
//
//  val numPosition: Parser[String] = """\d+""".r
//  val anyPosition: Parser[String] = "*"
//
//  val filterOpr: Parser[String] = "=" | "!=" | ">" | "<" | ">=" | "<="
//
//  val opr1: Parser[String] = "*" | "/" | "%"
//  val opr2: Parser[String] = "+" | "-"
//
//  val assignOpr: Parser[String] = "="
//  val compareOpr: Parser[String] = "==" | "!=" | ">" | "<" | ">=" | "<="
//  val mappingOpr: Parser[String] = "==="
//
//  val exprSep: Parser[String] = ";"
//
//  // simple
//  def variableString: Parser[VariableExpr] = (("'" ~> anyString <~ "'") | ("\"" ~> anyString <~ "\"")) ^^ { VariableStringExpr(_) }
//  def constString: Parser[ConstExpr] = (("'" ~> anyString <~ "'") | ("\"" ~> anyString <~ "\"")) ^^ { ConstStringExpr(_) }
//  def constValue: Parser[ConstExpr] = time ^^ { ConstTimeExpr(_) } | number ^^ { ConstNumberExpr(_)} | constString
//  def variableValue: Parser[VariableExpr] = variable ^^ { VariableStringExpr(_) }
//  def quoteVariableValue: Parser[QuoteVariableExpr] = "${" ~> variable <~ "}" ^^ { QuoteVariableExpr(_) }
//  def position: Parser[SelectExpr] = anyPosition ^^ { AnyPositionExpr(_) } | """\d+""".r ^^ { NumPositionExpr(_) } | (("'" ~> anyString <~ "'") | ("\"" ~> anyString <~ "\"")) ^^ { StringPositionExpr(_) }
//  def argument: Parser[ConstExpr] = constValue
//  def annotationExpr: Parser[AnnotationExpr] = "@" ~> variable ^^ { AnnotationExpr(_) }
//
//  // selector
//  def filterOpration: Parser[SelectExpr] = (variableString ~ filterOpr ~ constString) ^^ {
//    case v ~ opr ~ c => FilterOprExpr(opr, v, c)
//  }
//  def positionExpr: Parser[SelectExpr] = "[" ~> (filterOpration | position) <~ "]"
//  def functionExpr: Parser[SelectExpr] = "." ~ variable ~ "(" ~ repsep(argument, ",") ~ ")" ^^ {
//    case _ ~ v ~ _ ~ args ~ _ => FunctionExpr(v, args)
//  }
//  def selectorExpr: Parser[SelectExpr] = positionExpr | functionExpr
//
//  // data
//  def selectorsExpr: Parser[DataExpr] = quoteVariableValue ~ rep(selectorExpr) ^^ {
//    case q ~ tails => SelectionExpr(q, tails)
//  }
//
//  // calculation
//  def factor: Parser[ElementExpr] = (constValue | selectorsExpr | "(" ~> expr <~ ")") ^^ { FactorExpr(_) }
//  def term: Parser[ElementExpr] = factor ~ rep(opr1 ~ factor) ^^ {
//    case a ~ Nil => a
//    case a ~ list => CalculationExpr(a, list.map(c => (c._1, c._2)))
//  }
//  def expr: Parser[ElementExpr] = term ~ rep(opr2 ~ term) ^^ {
//    case a ~ Nil => a
//    case a ~ list => CalculationExpr(a, list.map(c => (c._1, c._2)))
//  }
//
//  // statement
//  def assignExpr: Parser[StatementExpr] = variableValue ~ assignOpr ~ expr ^^ {
//    case v ~ opr ~ c => AssignExpr(opr, v, c)
//  }
//  def conditionExpr: Parser[StatementExpr] = rep(annotationExpr) ~ expr ~ compareOpr ~ expr ^^ {
//    case anos ~ le ~ opr ~ re => ConditionExpr(opr, le, re, anos)
//  }
//  def mappingExpr: Parser[StatementExpr] = rep(annotationExpr) ~ expr ~ mappingOpr ~ expr ^^ {
//    case anos ~ le ~ opr ~ re => MappingExpr(opr, le, re, anos)
//  }
//  def statementExpr: Parser[StatementExpr] = assignExpr | conditionExpr | mappingExpr
//
//  // statements
//  def statementsExpr: Parser[StatementExpr] = repsep(statementExpr, exprSep) ^^ { StatementsExpr(_) }

}
// http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParserDescription.scala
// ----------------------------------------------------------------------
// diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParserDescription.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParserDescription.scala
// new file mode 100644
// index 0000000..18810b3
// --- /dev/null
// +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/RuleParserDescription.scala
// @@ -0,0 +1,84 @@
/**
 * BNF representation of the rule grammar (implemented by RuleParser).
 * Notation: [x] means x is optional; [x]+ means x repeated zero or more times.
 *
 * <rule> ::= <logical-statement> [WHEN <logical-statement>]
 *   rule: mapping-rule [WHEN when-rule]
 *   - mapping-rule: the top-level operator should preferably not be OR or NOT, otherwise the
 *     group-by columns cannot be found automatically
 *   - when-rule: should only use information about the data source as a whole, not values of
 *     individual data rows
 *
 * <logical-statement> ::= [NOT] <logical-expression> [(AND | OR) <logical-expression>]+ | "(" <logical-statement> ")"
 *   logical-statement: evaluates to a boolean value
 *   logical-operator: "AND" | "&&", "OR" | "||", "NOT" | "!"
 *
 * <logical-expression> ::= <math-expr> (<compare-opr> <math-expr> | <range-opr> <range-expr>)
 *   logical-expression example: $source.id = $target.id, $source.page_id IN ('3214', '4312', '60821')
 *
 * <compare-opr> ::= "=" | "!=" | "<" | ">" | "<=" | ">="
 * <range-opr> ::= ["NOT"] ("IN" | "BETWEEN")
 * <range-expr> ::= "(" [<math-expr> [, <math-expr>]+] ")"
 *   range-expr example: ('3214', '4312', '60821'), (10, 15), ()
 *
 * <math-expr> ::= [<unary-opr>] <math-factor> [<binary-opr> <math-factor>]+
 *   math-expr example: $source.price * $target.count, "hello" + " " + "world" + 123
 *
 * <binary-opr> ::= "+" | "-" | "*" | "/" | "%"
 * <unary-opr> ::= "+" | "-"
 *
 * <math-factor> ::= <literal> | <selection> | "(" <math-expr> ")"
 *
 * <selection> ::= <selection-head> [ <field-sel> | <function-operation> | <index-field-range-sel> | <filter-sel> ]+
 *   selection example: $source.price, $source.json(), $source['state'], $source.numList[3], $target.json().mails['org' = 'apache'].names[*]
 *
 * <selection-head> ::= $source | $target
 *
 * <field-sel> ::= "." <field-string>
 *
 * <function-operation> ::= "." <function-name> "(" [<arg> [, <arg>]+] ")"
 * <function-name> ::= <name-string>
 * <arg> ::= <math-expr>
 *
 * <index-field-range-sel> ::= "[" <index-field-range> [, <index-field-range>]+ "]"
 * <index-field-range> ::= <index-field> | (<index-field>, <index-field>) | "*"
 *   index-field-range: 2 means the 3rd item, (0, 3) means the first 4 items, * means all items, 'age' means item 'age'
 * <index-field> ::= <index> | <field-quote> | <all-selection>
 *   index: 0 ~ n is the position from the start, -1 ~ -n is the position from the end
 * <field-quote> ::= ' <field-string> ' | " <field-string> "
 *
 * <filter-sel> ::= "[" <field-quote> <filter-compare-opr> <math-expr> "]"
 * <filter-compare-opr> ::= "=" | "!=" | "<" | ">" | "<=" | ">="
 *   filter-sel example: ['name' = 'URL'], $source.man['age' > $source.graduate_age + 5 ]
 *
 * A <math-expr> inside a selection must not refer to a different <selection-head>, for example:
 *   $source.tags[1+2]            valid
 *   $source.tags[$source.first]  valid
 *   $source.tags[$target.first]  invalid
 *   -- this is a job for validation, not for the parser
 *
 *
 * <literal> ::= <literal-string> | <literal-number> | <literal-time> | <literal-boolean>
 * <literal-string> ::= ' <any-string> ' | " <any-string> "
 * <literal-number> ::= <integer> | <double>
 * <literal-time> ::= <integer> ("d" | "h" | "m" | "s" | "ms")   (a single segment)
 * <literal-boolean> ::= true | false
 *
 */


/**
 * BACK_UP: an earlier draft of the index and filter selection grammar, kept for reference.
 *
 * <index-field-range-sel> ::= "[" <index-field-range> [, <index-field-range>]+ "]"
 * <index-field-range> ::= <index-field> | (<index-field>, <index-field>) | "*"
 *   index-field-range: 2 means the 3rd item, (0, 3) means the first 4 items, * means all items, 'age' means item 'age'
 * <index-field> ::= <index> | <field-quote> | <math-result>
 * <field-quote> ::= ' <any-string> ' | \" <any-string> \"
 * <index> ::= <integer>
 *   index: 0 ~ n is the position from the start, -1 ~ -n is the position from the end
 * <math-result> ::= "${" <math-expr> "}"
 *
 * <filter-sel> ::= "[" <field-quote> <filter-compare-opr> <filter-value> "]"
 * <filter-compare-opr> ::= "=" | "!=" | "<" | ">" | "<=" | ">="
 * <filter-value> ::= <literal> | <math-result>
 *   filter-sel example: ['name' = 'URL'], $source.man['age' > ${ $source.graduate_age + 5 }]
 *
 */
// http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/AnalyzableExpr.scala
// ----------------------------------------------------------------------
// diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/AnalyzableExpr.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/AnalyzableExpr.scala
// new file mode 100644
// index 0000000..0d54707
// --- /dev/null
// +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/AnalyzableExpr.scala
// @@ -0,0 +1,7 @@
package org.apache.griffin.measure.batch.rule.expr


/** Analysis hooks for rule expressions; the defaults report nothing. */
trait AnalyzableExpr extends Serializable {

  /** Pairs of expressions to group by for the two data sources named in `dsPair`; none by default. */
  def getGroupbyExprPairs(dsPair: (String, String)): Seq[(MathExpr, MathExpr)] = Nil

  /** The WHEN-clause expression, if this expression carries one; None by default. */
  def getWhenClauseExpr(): Option[LogicalExpr] = None
}
// \ No newline at end of file
// http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Cacheable.scala
// ----------------------------------------------------------------------
// diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Cacheable.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Cacheable.scala
// new file mode 100644
// index 0000000..e062376
// --- /dev/null
// +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Cacheable.scala
// @@ -0,0 +1,15 @@
package org.apache.griffin.measure.batch.rule.expr

/**
 * Decides whether an expression may be cached or persisted for a given data source.
 *
 * An empty data-source name selects expressions that reference no data source at all;
 * a non-empty name selects expressions that reference that data source.
 */
trait Cacheable extends DataSourceable {

  /** Whether this kind of expression is a cache unit at all. */
  protected def cacheUnit: Boolean = false

  /** True for a cache unit whose data sources do not conflict and that matches `ds`. */
  def cacheable(ds: String): Boolean =
    cacheUnit && !conflict() && targetsDataSource(ds)

  protected def getCacheExprs(ds: String): Iterable[Cacheable]

  /** Whether this kind of expression is a persist unit at all. */
  protected def persistUnit: Boolean = false

  /** True for a persist unit that matches `ds` (data-source conflicts are not checked here). */
  def persistable(ds: String): Boolean =
    persistUnit && targetsDataSource(ds)

  protected def getPersistExprs(ds: String): Iterable[Cacheable]

  // Empty `ds`: this expression must reference no data source; otherwise it must reference `ds`.
  private def targetsDataSource(ds: String): Boolean =
    if (ds.isEmpty) dataSources.isEmpty else contains(ds)
}
// http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Calculatable.scala
// ----------------------------------------------------------------------
// diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Calculatable.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Calculatable.scala
// new file mode 100644
// index 0000000..8018c19
// --- /dev/null
// +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Calculatable.scala
// @@ -0,0 +1,7 @@
package org.apache.griffin.measure.batch.rule.expr

/** Something that can be evaluated against a map of named values. */
trait Calculatable extends Serializable {

  /** Evaluates against `values`, producing an optional result. */
  def calculate(values: Map[String, Any]): Option[Any]

}
// http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/DataSourceable.scala
// ----------------------------------------------------------------------
// diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/DataSourceable.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/DataSourceable.scala
// new file mode 100644
// index 0000000..f18798a
// --- /dev/null
// +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/DataSourceable.scala
// @@ -0,0 +1,10 @@
package org.apache.griffin.measure.batch.rule.expr

/** Tracks which data sources an element refers to. */
trait DataSourceable extends Serializable {

  /** Names of the data sources referenced by this element. */
  val dataSources: Set[String]

  /** True when more than one data source is referenced. */
  protected def conflict(): Boolean = dataSources.size >= 2

  def contains(ds: String): Boolean = dataSources.contains(ds)

  /** The referenced data source when there is exactly one, otherwise None. */
  def dataSourceOpt: Option[String] = dataSources.toList match {
    case single :: Nil => Some(single)
    case _             => None
  }
}
// \ No newline at end of file
// http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Describable.scala
// ----------------------------------------------------------------------
// diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Describable.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Describable.scala
// new file mode 100644
// index 0000000..38758a2
// --- /dev/null
// +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Describable.scala
// @@ -0,0 +1,15 @@
package org.apache.griffin.measure.batch.rule.expr

/** An element with a textual description. */
trait Describable extends Serializable {

  val desc: String

  /**
   * Renders a value for use in a description: another Describable contributes its `desc`,
   * a String is wrapped in single quotes, anything else uses its string form.
   */
  protected def describe(v: Any): String = v match {
    case described: Describable => s"${described.desc}"
    case text: String           => "'" + text + "'"
    case other                  => s"$other"
  }

}
// http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Expr.scala
// ----------------------------------------------------------------------
// diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Expr.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Expr.scala
// new file mode 100644
index 0000000..d7810aa --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/Expr.scala @@ -0,0 +1,33 @@ +package org.apache.griffin.measure.batch.rule.expr + /* Base trait for rule expressions: a description (desc), data-source tracking, cache and persist selection, and evaluation that first looks for a precomputed value. */ +trait Expr extends Serializable with Describable with Cacheable with Calculatable { + + protected val _defaultId: String = ExprIdCounter.emptyId + + /* Key under which calculate() looks for a precomputed value. NOTE(review): _defaultId is read while this trait is being initialised, so a subclass overriding it with a plain val would still see null here; confirm no subclass does that. */ val _id = ExprIdCounter.genId(_defaultId) + + protected def getSubCacheExprs(ds: String): Iterable[Expr] = Nil + /* Cacheable sub-expressions for ds, followed by this expression when it is cacheable itself. */ final def getCacheExprs(ds: String): Iterable[Expr] = { + if (cacheable(ds)) getSubCacheExprs(ds).toList :+ this else getSubCacheExprs(ds) + } + + protected def getSubFinalCacheExprs(ds: String): Iterable[Expr] = Nil + /* Outermost cacheable expressions only: when this expression is cacheable, its sub-expressions are not returned. */ final def getFinalCacheExprs(ds: String): Iterable[Expr] = { + if (cacheable(ds)) Nil :+ this else getSubFinalCacheExprs(ds) + } + + protected def getSubPersistExprs(ds: String): Iterable[Expr] = Nil + /* Same as getCacheExprs, but selected with persistable instead of cacheable. */ final def getPersistExprs(ds: String): Iterable[Expr] = { + if (persistable(ds)) getSubPersistExprs(ds).toList :+ this else getSubPersistExprs(ds) + } + + /* Returns the value already stored in values under _id when present; otherwise evaluates via calculateOnly. */ final def calculate(values: Map[String, Any]): Option[Any] = { + values.get(_id) match { + case Some(v) => Some(v) + case _ => calculateOnly(values) + } + } + protected def calculateOnly(values: Map[String, Any]): Option[Any] + +} + http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprDescOnly.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprDescOnly.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprDescOnly.scala new file mode 100644 index 0000000..0bb5085 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprDescOnly.scala @@ -0,0 +1,22 @@ +package
org.apache.griffin.measure.batch.rule.expr + +trait ExprDescOnly extends Describable { + +} + + /* Data-source head such as $source or $target: head is the name after the dollar sign in lower case, or expr itself when it does not match; desc is a dollar sign followed by head. */ +case class SelectionHead(expr: String) extends ExprDescOnly { + private val headRegex = """\$(\w+)""".r + val head: String = expr match { + case headRegex(v) => v.toLowerCase + case _ => expr + } + val desc: String = "$" + head +} + /* Comma-separated list of elements in parentheses, used as the operand of IN / BETWEEN (see LogicalRangeExpr). */ +case class RangeDesc(elements: Iterable[MathExpr]) extends ExprDescOnly { + val desc: String = { + val rangeDesc = elements.map(_.desc).mkString(", ") + s"(${rangeDesc})" + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprIdCounter.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprIdCounter.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprIdCounter.scala new file mode 100644 index 0000000..56e7daa --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/ExprIdCounter.scala @@ -0,0 +1,42 @@ +package org.apache.griffin.measure.batch.rule.expr + +import java.util.concurrent.atomic.AtomicLong + +import scala.collection.mutable.{Set => MutableSet} + /* Id generator for Expr._id. NOTE(review): idCounter is atomic, but existIdSet is an unsynchronised mutable set; confirm genId is never called concurrently. */ +object ExprIdCounter { + + private val idCounter: AtomicLong = new AtomicLong(0L) + + private val existIdSet: MutableSet[String] = MutableSet.empty[String] + + private val invalidIdRegex = """^\d+$""".r + + val emptyId: String = "" + + /* Apparent intent: a fresh numeric id for an empty or all-digit defaultId, otherwise defaultId itself (registered on first use, reused afterwards); but see the BUG note on the first case. */ def genId(defaultId: String): String = { + defaultId match { + /* BUG(review): a lower-case identifier in a pattern is a variable pattern, so emptyId here binds a new variable that matches ANY defaultId instead of comparing against ExprIdCounter.emptyId. Every call therefore returns a fresh numeric id and all cases below are unreachable. Comparing with the constant needs backticks around emptyId; fixing it changes which ids get shared, so confirm the intended behaviour first. */ case emptyId => increment.toString + case invalidIdRegex() => increment.toString +// case defId if (exist(defId)) => s"${increment}#${defId}" + case defId if (exist(defId)) => s"${defId}" + case _ => { + insertUserId(defaultId) + defaultId + } + } + } + + private def exist(id: String): Boolean = { + existIdSet.contains(id) + } +
+ private def insertUserId(id: String): Unit = { + existIdSet += id + } + + private def increment(): Long = { + idCounter.incrementAndGet() + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/FieldDescOnly.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/FieldDescOnly.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/FieldDescOnly.scala new file mode 100644 index 0000000..f13f15a --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/FieldDescOnly.scala @@ -0,0 +1,40 @@ +package org.apache.griffin.measure.batch.rule.expr + +import scala.util.{Success, Try} + +trait FieldDescOnly extends Describable with DataSourceable { + +} + /* Position index; throws if expr is not a valid Int. */ +case class IndexDesc(expr: String) extends FieldDescOnly { + val index: Int = { + Try(expr.toInt) match { + case Success(v) => v + case _ => throw new Exception(s"${expr} is invalid index") + } + } + val desc: String = describe(index) + val dataSources: Set[String] = Set.empty[String] +} + +case class FieldDesc(expr: String) extends FieldDescOnly { + val field: String = expr + val desc: String = describe(field) + val dataSources: Set[String] = Set.empty[String] +} + +case class AllFieldsDesc(expr: String) extends FieldDescOnly { + val allFields: String = expr + val desc: String = allFields + val dataSources: Set[String] = Set.empty[String] +} + /* Range between two fields. Only a pair of IndexDesc values is supported; any other combination throws while desc is initialised, i.e. on construction. */ +case class FieldRangeDesc(startField: FieldDescOnly, endField: FieldDescOnly) extends FieldDescOnly { + val desc: String = { + (startField, endField) match { + case (f1: IndexDesc, f2: IndexDesc) => s"(${f1.desc}, ${f2.desc})" + case _ => throw new Exception("invalid field range description") + } + } + val dataSources: Set[String]
= Set.empty[String] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LiteralExpr.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LiteralExpr.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LiteralExpr.scala new file mode 100644 index 0000000..020ddc2 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LiteralExpr.scala @@ -0,0 +1,68 @@ +package org.apache.griffin.measure.batch.rule.expr + +import scala.util.{Failure, Success, Try} + +trait LiteralExpr extends Expr { + val value: Option[Any] + def calculateOnly(values: Map[String, Any]): Option[Any] = value + val dataSources: Set[String] = Set.empty[String] +} + +case class LiteralStringExpr(expr: String) extends LiteralExpr { + val value: Option[String] = Some(expr) + val desc: String = value.getOrElse("") +} + /* Parsed as Double when expr contains a dot, otherwise as Long; malformed text throws. */ +case class LiteralNumberExpr(expr: String) extends LiteralExpr { + val value: Option[Any] = { + if (expr.contains(".")) { + Try (expr.toDouble) match { + case Success(v) => Some(v) + case _ => throw new Exception(s"${expr} is invalid number") + } + } else { + Try (expr.toLong) match { + case Success(v) => Some(v) + case _ => throw new Exception(s"${expr} is invalid number") + } + } + } + val desc: String = value.getOrElse("").toString +} + /* Duration in milliseconds. Exactly one digits-plus-unit segment with unit d, h, m, s or ms is accepted; anything else, including multi-segment text such as 1h30m, throws. */ +case class LiteralTimeExpr(expr: String) extends LiteralExpr { + final val TimeRegex = """(\d+)(d|h|m|s|ms)""".r + val value: Option[Long] = { + Try { + expr match { + case TimeRegex(time, unit) => { + val t = time.toLong + unit match { + case "d" => t * 24 * 60 * 60 * 1000 + case "h" => t * 60 * 60 * 1000 + case "m" => t * 60 * 1000 + case "s" => t * 1000 + case "ms" =>
t + case _ => throw new Exception(s"${expr} is invalid time format") + } + } + case _ => throw new Exception(s"${expr} is invalid time format") + } + } match { + case Success(v) => Some(v) + case Failure(ex) => throw ex + } + } + val desc: String = expr +} + /* Case-insensitive true or false; any other text throws. */ +case class LiteralBooleanExpr(expr: String) extends LiteralExpr { + final val TrueRegex = """(?i)true""".r + final val FalseRegex = """(?i)false""".r + val value: Option[Boolean] = expr match { + case TrueRegex() => Some(true) + case FalseRegex() => Some(false) + case _ => throw new Exception(s"${expr} is invalid boolean") + } + val desc: String = value.getOrElse("").toString +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LogicalExpr.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LogicalExpr.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LogicalExpr.scala new file mode 100644 index 0000000..74e80e3 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/LogicalExpr.scala @@ -0,0 +1,149 @@ +package org.apache.griffin.measure.batch.rule.expr + +import org.apache.griffin.measure.batch.utils.CalculationUtil._ + /* Logical expressions; every one of them is a cache unit (cacheUnit is true). */ +trait LogicalExpr extends Expr with AnalyzableExpr { + override def cacheUnit: Boolean = true +} + +case class LogicalCompareExpr(left: MathExpr, compare: String, right: MathExpr) extends LogicalExpr { + /* Operator matchers: eqOpr accepts "=" or "==" and neqOpr "!=" or "!=="; btOpr, bteOpr, ltOpr and lteOpr are the plain strings ">", ">=", "<" and "<=". The comparison operators applied below are imported from CalculationUtil, which is not shown here. */ private val (eqOpr, neqOpr, btOpr, bteOpr, ltOpr, lteOpr) = ("""==?""".r, """!==?""".r, ">", ">=", "<", "<=") + def calculateOnly(values: Map[String, Any]): Option[Any] = { + val (lv, rv) = (left.calculate(values), right.calculate(values)) + compare match { + case this.eqOpr() => lv === rv + case
this.neqOpr() => lv =!= rv + case this.btOpr => lv > rv + case this.bteOpr => lv >= rv + case this.ltOpr => lv < rv + case this.lteOpr => lv <= rv + case _ => None + } + } + val desc: String = s"${left.desc} ${compare} ${right.desc}" + val dataSources: Set[String] = left.dataSources ++ right.dataSources + override def getSubCacheExprs(ds: String): Iterable[Expr] = { + left.getCacheExprs(ds) ++ right.getCacheExprs(ds) + } + override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = { + left.getFinalCacheExprs(ds) ++ right.getFinalCacheExprs(ds) + } + override def getSubPersistExprs(ds: String): Iterable[Expr] = { + left.getPersistExprs(ds) ++ right.getPersistExprs(ds) + } + + /* Only "=" or "==" comparisons whose two sides each reference exactly one data source, matching dsPair in either order, yield a pair; the pair is always ordered as (dsPair._1 side, dsPair._2 side). */ override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(MathExpr, MathExpr)] = { + if (compare == "=" || compare == "==") { + (left.dataSourceOpt, right.dataSourceOpt) match { + case (Some(dsPair._1), Some(dsPair._2)) => (left, right) :: Nil + case (Some(dsPair._2), Some(dsPair._1)) => (right, left) :: Nil + case _ => Nil + } + } else Nil + } +} + +case class LogicalRangeExpr(left: MathExpr, rangeOpr: String, range: RangeDesc) extends LogicalExpr { + private val (inOpr, ninOpr, btwnOpr, nbtwnOpr) = ("""(?i)in""".r, """(?i)not\s+in""".r, """(?i)between""".r, """(?i)not\s+between""".r) + def calculateOnly(values: Map[String, Any]): Option[Any] = { + val (lv, rvs) = (left.calculate(values), range.elements.map(_.calculate(values))) + rangeOpr match { + case this.inOpr() => lv in rvs + case this.ninOpr() => lv not_in rvs + case this.btwnOpr() => lv between rvs + case this.nbtwnOpr() => lv not_between rvs + case _ => None + } + } + val desc: String = s"${left.desc} ${rangeOpr} ${range.desc}" + val dataSources: Set[String] = left.dataSources ++ range.elements.flatMap(_.dataSources).toSet + override def getSubCacheExprs(ds: String): Iterable[Expr] = { + left.getCacheExprs(ds) ++ range.elements.flatMap(_.getCacheExprs(ds)) + } + override def getSubFinalCacheExprs(ds: String):
Iterable[Expr] = { + left.getFinalCacheExprs(ds) ++ range.elements.flatMap(_.getFinalCacheExprs(ds)) + } + override def getSubPersistExprs(ds: String): Iterable[Expr] = { + left.getPersistExprs(ds) ++ range.elements.flatMap(_.getPersistExprs(ds)) + } +} + +// -- logical statement -- +//case class LogicalFactorExpr(self: LogicalExpr) extends LogicalExpr { +// def calculate(values: Map[String, Any]): Option[Any] = self.calculate(values) +// val desc: String = self.desc +//} + +case class UnaryLogicalExpr(oprList: Iterable[String], factor: LogicalExpr) extends LogicalExpr { + private val notOpr = """(?i)not|!""".r + def calculateOnly(values: Map[String, Any]): Option[Any] = { + val fv = factor.calculate(values) + oprList.foldRight(fv) { (opr, v) => + opr match { + case this.notOpr() => !v + case _ => None + } + } + } + val desc: String = oprList.foldRight(factor.desc) { (prev, ex) => s"${prev}${ex}" } + val dataSources: Set[String] = factor.dataSources + override def getSubCacheExprs(ds: String): Iterable[Expr] = { + factor.getCacheExprs(ds) + } + override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = { + factor.getFinalCacheExprs(ds) + } + override def getSubPersistExprs(ds: String): Iterable[Expr] = { + factor.getPersistExprs(ds) + } + + override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(MathExpr, MathExpr)] = { + val notOprList = oprList.filter { opr => + opr match { + case this.notOpr() => true + case _ => false + } + } + if (notOprList.size % 2 == 0) factor.getGroupbyExprPairs(dsPair) else Nil + } +} + +case class BinaryLogicalExpr(first: LogicalExpr, others: Iterable[(String, LogicalExpr)]) extends LogicalExpr { + private val (andOpr, orOpr) = ("""(?i)and|&&""".r, """(?i)or|\|\|""".r) + def calculateOnly(values: Map[String, Any]): Option[Any] = { + val fv = first.calculate(values) + others.foldLeft(fv) { (v, pair) => + val (opr, next) = pair + val nv = next.calculate(values) + opr match { + case this.andOpr() => v && nv + case 
this.orOpr() => v || nv + case _ => None + } + } + } + val desc: String = others.foldLeft(first.desc) { (ex, next) => s"${ex} ${next._1} ${next._2.desc}" } + val dataSources: Set[String] = first.dataSources ++ others.flatMap(_._2.dataSources).toSet + override def getSubCacheExprs(ds: String): Iterable[Expr] = { + first.getCacheExprs(ds) ++ others.flatMap(_._2.getCacheExprs(ds)) + } + override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = { + first.getFinalCacheExprs(ds) ++ others.flatMap(_._2.getFinalCacheExprs(ds)) + } + override def getSubPersistExprs(ds: String): Iterable[Expr] = { + first.getPersistExprs(ds) ++ others.flatMap(_._2.getPersistExprs(ds)) + } + + override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(MathExpr, MathExpr)] = { + if (others.isEmpty) first.getGroupbyExprPairs(dsPair) + else { + val isAnd = others.exists(_._1 match { + case this.andOpr() => true + case _ => false + }) + if (isAnd) { + first.getGroupbyExprPairs(dsPair) ++ others.flatMap(_._2.getGroupbyExprPairs(dsPair)) + } else Nil + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/MathExpr.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/MathExpr.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/MathExpr.scala new file mode 100644 index 0000000..db09a0c --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/MathExpr.scala @@ -0,0 +1,79 @@ +package org.apache.griffin.measure.batch.rule.expr + +import org.apache.griffin.measure.batch.utils.CalculationUtil._ + +trait MathExpr extends Expr { + +} + +case class MathFactorExpr(self: Expr) extends MathExpr { + def 
calculateOnly(values: Map[String, Any]): Option[Any] = self.calculate(values) + val desc: String = self.desc + val dataSources: Set[String] = self.dataSources + override def getSubCacheExprs(ds: String): Iterable[Expr] = { + self.getCacheExprs(ds) + } + override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = { + self.getFinalCacheExprs(ds) + } + override def getSubPersistExprs(ds: String): Iterable[Expr] = { + self.getPersistExprs(ds) + } +} + +case class UnaryMathExpr(oprList: Iterable[String], factor: Expr) extends MathExpr { + private val (posOpr, negOpr) = ("+", "-") + def calculateOnly(values: Map[String, Any]): Option[Any] = { + val fv = factor.calculate(values) + oprList.foldRight(fv) { (opr, v) => + opr match { + case this.posOpr => v + case this.negOpr => -v + case _ => None + } + } + } + val desc: String = oprList.foldRight(factor.desc) { (prev, ex) => s"${prev}${ex}" } + val dataSources: Set[String] = factor.dataSources + override def cacheUnit: Boolean = true + override def getSubCacheExprs(ds: String): Iterable[Expr] = { + factor.getCacheExprs(ds) + } + override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = { + factor.getFinalCacheExprs(ds) + } + override def getSubPersistExprs(ds: String): Iterable[Expr] = { + factor.getPersistExprs(ds) + } +} + +case class BinaryMathExpr(first: MathExpr, others: Iterable[(String, MathExpr)]) extends MathExpr { + private val (addOpr, subOpr, mulOpr, divOpr, modOpr) = ("+", "-", "*", "/", "%") + def calculateOnly(values: Map[String, Any]): Option[Any] = { + val fv = first.calculate(values) + others.foldLeft(fv) { (v, pair) => + val (opr, next) = pair + val nv = next.calculate(values) + opr match { + case this.addOpr => v + nv + case this.subOpr => v - nv + case this.mulOpr => v * nv + case this.divOpr => v / nv + case this.modOpr => v % nv + case _ => None + } + } + } + val desc: String = others.foldLeft(first.desc) { (ex, next) => s"${ex} ${next._1} ${next._2.desc}" } + val dataSources: Set[String] 
= first.dataSources ++ others.flatMap(_._2.dataSources).toSet + override def cacheUnit: Boolean = true + override def getSubCacheExprs(ds: String): Iterable[Expr] = { + first.getCacheExprs(ds) ++ others.flatMap(_._2.getCacheExprs(ds)) + } + override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = { + first.getFinalCacheExprs(ds) ++ others.flatMap(_._2.getFinalCacheExprs(ds)) + } + override def getSubPersistExprs(ds: String): Iterable[Expr] = { + first.getPersistExprs(ds) ++ others.flatMap(_._2.getPersistExprs(ds)) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/SelectExpr.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/SelectExpr.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/SelectExpr.scala new file mode 100644 index 0000000..52ebe21 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/SelectExpr.scala @@ -0,0 +1,53 @@ +package org.apache.griffin.measure.batch.rule.expr + +trait SelectExpr extends Expr { + def calculateOnly(values: Map[String, Any]): Option[Any] = None +} + +case class IndexFieldRangeSelectExpr(fields: Iterable[FieldDescOnly]) extends SelectExpr { + val desc: String = s"[${fields.map(_.desc).mkString(",")}]" + val dataSources: Set[String] = Set.empty[String] +} + +case class FunctionOperationExpr(func: String, args: Iterable[MathExpr]) extends SelectExpr { + val desc: String = s".${func}(${args.map(_.desc).mkString(",")})" + val dataSources: Set[String] = args.flatMap(_.dataSources).toSet + override def getSubCacheExprs(ds: String): Iterable[Expr] = args.flatMap(_.getCacheExprs(ds)) + override def getSubFinalCacheExprs(ds: String): 
Iterable[Expr] = args.flatMap(_.getFinalCacheExprs(ds)) + override def getSubPersistExprs(ds: String): Iterable[Expr] = args.flatMap(_.getPersistExprs(ds)) +} + +case class FilterSelectExpr(field: FieldDesc, compare: String, value: MathExpr) extends SelectExpr { + val desc: String = s"[${field.desc}${compare}${value.desc}]" + val dataSources: Set[String] = value.dataSources + override def getSubCacheExprs(ds: String): Iterable[Expr] = value.getCacheExprs(ds) + override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = value.getFinalCacheExprs(ds) + override def getSubPersistExprs(ds: String): Iterable[Expr] = value.getPersistExprs(ds) +} + +// -- selection -- +case class SelectionExpr(head: SelectionHead, selectors: Iterable[SelectExpr]) extends Expr { + def calculateOnly(values: Map[String, Any]): Option[Any] = values.get(_id) + + val desc: String = { + val argsString = selectors.map(_.desc).mkString("") + s"${head.desc}${argsString}" + } + val dataSources: Set[String] = { + val selectorDataSources = selectors.flatMap(_.dataSources).toSet + selectorDataSources + head.head + } + + override def cacheUnit: Boolean = true + override def getSubCacheExprs(ds: String): Iterable[Expr] = { + selectors.flatMap(_.getCacheExprs(ds)) + } + override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = { + selectors.flatMap(_.getFinalCacheExprs(ds)) + } + + override def persistUnit: Boolean = true + override def getSubPersistExprs(ds: String): Iterable[Expr] = { + selectors.flatMap(_.getPersistExprs(ds)) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/StatementExpr.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/StatementExpr.scala 
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/StatementExpr.scala new file mode 100644 index 0000000..3ae5139 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/StatementExpr.scala @@ -0,0 +1,50 @@ +package org.apache.griffin.measure.batch.rule.expr + +/** Top-level rule statement: valid is true unless a subclass overrides it, and every statement reports cacheUnit = true. */ +trait StatementExpr extends Expr with AnalyzableExpr { + def valid(values: Map[String, Any]): Boolean = true + override def cacheUnit: Boolean = true +} +/** Statement made of a single logical expression; every call delegates to `expr`. */ +case class SimpleStatementExpr(expr: LogicalExpr) extends StatementExpr { + def calculateOnly(values: Map[String, Any]): Option[Any] = expr.calculate(values) + val desc: String = expr.desc + val dataSources: Set[String] = expr.dataSources + override def getSubCacheExprs(ds: String): Iterable[Expr] = { + expr.getCacheExprs(ds) + } + override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = { + expr.getFinalCacheExprs(ds) + } + override def getSubPersistExprs(ds: String): Iterable[Expr] = { + expr.getPersistExprs(ds) + } + + override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(MathExpr, MathExpr)] = expr.getGroupbyExprPairs(dsPair) +} +/** `expr WHEN whenExpr`: the value comes from `expr`, valid holds only when whenExpr evaluates to Some(true), and group-by pairs come from `expr` alone. */ +case class WhenClauseStatementExpr(expr: LogicalExpr, whenExpr: LogicalExpr) extends StatementExpr { + def calculateOnly(values: Map[String, Any]): Option[Any] = expr.calculate(values) + val desc: String = s"${expr.desc} when ${whenExpr.desc}" + + override def valid(values: Map[String, Any]): Boolean = { + whenExpr.calculate(values) match { + case Some(r: Boolean) => r + case _ => false + } + } + + val dataSources: Set[String] = expr.dataSources ++ whenExpr.dataSources + override def getSubCacheExprs(ds: String): Iterable[Expr] = { + expr.getCacheExprs(ds) ++ whenExpr.getCacheExprs(ds) + } + override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = { + expr.getFinalCacheExprs(ds) ++ whenExpr.getFinalCacheExprs(ds) + } + override def getSubPersistExprs(ds: String): Iterable[Expr] = {
expr.getPersistExprs(ds) ++ whenExpr.getPersistExprs(ds) + } + + override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(MathExpr, MathExpr)] = expr.getGroupbyExprPairs(dsPair) + override def getWhenClauseExpr(): Option[LogicalExpr] = Some(whenExpr) +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/AnnotationExpr.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/AnnotationExpr.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/AnnotationExpr.scala new file mode 100644 index 0000000..2152bf9 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/AnnotationExpr.scala @@ -0,0 +1,16 @@ +package org.apache.griffin.measure.batch.rule.expr_old +/** Annotation of a legacy statement; isKey is true when the text is exactly `Key` (case-insensitive), and genValue returns the raw text. */ +case class AnnotationExpr(expression: String) extends Expr with Calculatable { + + val Key = """^(?i)Key$""".r + + def isKey: Boolean = { + expression match { + case Key() => true + case _ => false + } + } + + def genValue(values: Map[String, Any]): Option[String] = Some(expression) + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/Calculatable.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/Calculatable.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/Calculatable.scala new file mode 100644 index 0000000..9b8a577 --- /dev/null +++
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/Calculatable.scala @@ -0,0 +1,7 @@ +package org.apache.griffin.measure.batch.rule.expr_old +/** Something that computes a value, possibly reading it from `values`; the result may be None. */ +trait Calculatable extends Serializable { + + def genValue(values: Map[String, Any]): Option[Any] + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/ConstExpr.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/ConstExpr.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/ConstExpr.scala new file mode 100644 index 0000000..c8e623b --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/ConstExpr.scala @@ -0,0 +1,59 @@ +package org.apache.griffin.measure.batch.rule.expr_old + +import scala.util.{Success, Try} +/** Constant expression exposing its parsed `value`. */ +trait ConstExpr extends Expr with Calculatable { + + val value: Any + +} + +/** String constant; the value is the raw text. */ +case class ConstStringExpr(expression: String) extends ConstExpr { + + val value: String = expression + def genValue(values: Map[String, Any]): Option[String] = Some(value) + +} +/** Duration constant in milliseconds. Units: y = 365 days (previously miscomputed as 365 * 30 days), M = 30 days, w = 7 days, d, h, m, s, ms; text that does not match yields 0L. */ +case class ConstTimeExpr(expression: String) extends ConstExpr { + + val TimeRegex = """(\d+)(y|M|w|d|h|m|s|ms)""".r + + val value: Long = expression match { + case TimeRegex(time, unit) => { + val t = time.toLong + val r = unit match { + case "y" => t * 365 * 24 * 60 * 60 * 1000 + case "M" => t * 30 * 24 * 60 * 60 * 1000 + case "w" => t * 7 * 24 * 60 * 60 * 1000 + case "d" => t * 24 * 60 * 60 * 1000 + case "h" => t * 60 * 60 * 1000 + case "m" => t * 60 * 1000 + case "s" => t * 1000 + case "ms" => t + case _ => t + } + r + } + case _ => 0L + } + + def genValue(values: Map[String, Any]): Option[Long] = Some(value) + +} +/** Integer constant; text that does not parse as Long yields 0L. */ +case class
ConstNumberExpr(expression: String) extends ConstExpr { + + val value: Long = { + Try { + expression.toLong + } match { + case Success(v) => v + case _ => 0L + } + } + + def genValue(values: Map[String, Any]): Option[Long] = Some(value) + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/DataExpr.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/DataExpr.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/DataExpr.scala new file mode 100644 index 0000000..e0f24ef --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/DataExpr.scala @@ -0,0 +1,27 @@ +package org.apache.griffin.measure.batch.rule.expr_old +/** Data selection: a head variable followed by selector args. */ +trait DataExpr extends Expr with Calculatable with Recordable { + + def head: QuoteVariableExpr + def args: Iterable[SelectExpr] + +} + +/** Legacy selection. recordName and _defaultId are set in an early initializer so they are already defined when Expr computes _id from _defaultId; genValue reads the value stored under _id. */ +case class SelectionExpr(head: QuoteVariableExpr, args: Iterable[SelectExpr]) extends { + + val recordName = { + val argsString = args.map(_.recordName).mkString("") + s"${head.recordName}${argsString}" + } + + override protected val _defaultId = recordName + +} with DataExpr { + + val expression: String = "" + + def genValue(values: Map[String, Any]): Option[Any] = values.get(_id) + +} + http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/ElementExpr.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/ElementExpr.scala
b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/ElementExpr.scala new file mode 100644 index 0000000..999c1ee --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/ElementExpr.scala @@ -0,0 +1,52 @@ +package org.apache.griffin.measure.batch.rule.expr_old + +import org.apache.griffin.measure.batch.utils.CalculationUtil._ +/** Value-producing element of a legacy statement that can also report its data-related exprs. */ +trait ElementExpr extends Expr with ExprAnalyzable with Calculatable { + +} + +/** Single operand wrapping `self`; only DataExpr and CalculationExpr operands contribute data-related exprs. */ +// self: const | selection | calculation +case class FactorExpr(self: Expr with Calculatable) extends ElementExpr { + + val expression: String = self.expression + + def genValue(values: Map[String, Any]): Option[Any] = self.genValue(values) + + def getDataRelatedExprs(dataSign: String): Iterable[DataExpr] = { + self match { + case expr: DataExpr => if (expr.head.name == dataSign) expr :: Nil else Nil + case expr: CalculationExpr => expr.getDataRelatedExprs(dataSign) + case _ => Nil + } + } + +} +/** Chain `first op1 e1 op2 e2 ...` of the operators +, -, *, /, % folded left to right; an unrecognized operator leaves the running value unchanged. expression joins each operand's own expression text (not its toString). */ +case class CalculationExpr(first: ElementExpr, others: Iterable[(String, ElementExpr)]) extends ElementExpr { + + val expression: String = others.foldLeft(first.expression) { (ex, next) => s"${ex} ${next._1} ${next._2.expression}" } + + def genValue(values: Map[String, Any]): Option[Any] = { + others.foldLeft(first.genValue(values)) { (v, next) => + val (opr, ele) = next + val eleValue = ele.genValue(values) + opr match { + case "+" => v + eleValue + case "-" => v - eleValue + case "*" => v * eleValue + case "/" => v / eleValue + case "%" => v % eleValue + case _ => v + } + } + } + + def getDataRelatedExprs(dataSign: String): Iterable[DataExpr] = { + others.foldLeft(first.getDataRelatedExprs(dataSign)) { (origin, next) => + origin ++ next._2.getDataRelatedExprs(dataSign) + } + } + +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/Expr.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/Expr.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/Expr.scala new file mode 100644 index 0000000..17adf16 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/Expr.scala @@ -0,0 +1,11 @@ +package org.apache.griffin.measure.batch.rule.expr_old +/** Base of legacy rule expressions. _id is computed from _defaultId while this trait initializes, so a subclass override of _defaultId is only seen here when it is supplied in an early initializer. */ +trait Expr extends Serializable { + + val expression: String + + protected val _defaultId: String = "" + + val _id = ExprIdCounter.genId(_defaultId) + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/ExprAnalyzable.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/ExprAnalyzable.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/ExprAnalyzable.scala new file mode 100644 index 0000000..29c47cd --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/ExprAnalyzable.scala @@ -0,0 +1,8 @@ +package org.apache.griffin.measure.batch.rule.expr_old + +/** Collects the DataExpr nodes whose data source matches dataSign (FactorExpr compares it to the selection head's name). */ +trait ExprAnalyzable extends Serializable { + + def getDataRelatedExprs(dataSign: String): Iterable[DataExpr] + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/ExprIdCounter.scala
---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/ExprIdCounter.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/ExprIdCounter.scala new file mode 100644 index 0000000..c24f617 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/ExprIdCounter.scala @@ -0,0 +1,39 @@ +package org.apache.griffin.measure.batch.rule.expr_old + +import java.util.concurrent.atomic.AtomicLong +import scala.collection.mutable.{Set => MutableSet} +/** Generates expression ids: an empty or all-digit default id gets the next value of an atomic counter (so user ids never collide with generated ones); any other default id is returned unchanged, so equal default ids share one id. */ +object ExprIdCounter { + + private val idCounter: AtomicLong = new AtomicLong(0L) +/* NOTE(review): a plain mutable Set, so unlike idCounter it is not safe for concurrent genId calls. */ + private val existIdSet: MutableSet[String] = MutableSet.empty[String] + + private val invalidIdRegex = """^\d+$""".r + + def genId(defaultId: String): String = { + defaultId match { + case "" => increment.toString + case invalidIdRegex() => increment.toString +// case defId if (exist(defId)) => s"${increment}#${defId}" + case defId if (exist(defId)) => s"${defId}" + case _ => { + insertUserId(defaultId) + defaultId + } + } + } + + private def exist(id: String): Boolean = { + existIdSet.contains(id) + } + + private def insertUserId(id: String): Unit = { + existIdSet += id + } + + private def increment(): Long = { + idCounter.incrementAndGet() + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/Recordable.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/Recordable.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/Recordable.scala new file mode 100644 index 0000000..27e5a48 --- /dev/null
+++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/Recordable.scala @@ -0,0 +1,15 @@ +package org.apache.griffin.measure.batch.rule.expr_old + +/** Has a textual record name; value2RecordString single-quotes strings and uses toString for any other value. */ +trait Recordable extends Serializable { + + val recordName: String + + protected def value2RecordString(v: Any): String = { + v match { + case s: String => s"'${s}'" + case a => s"${a}" + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/SelectExpr.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/SelectExpr.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/SelectExpr.scala new file mode 100644 index 0000000..4dae38d --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/SelectExpr.scala @@ -0,0 +1,48 @@ +package org.apache.griffin.measure.batch.rule.expr_old +/** Selector step of a legacy selection; its recordName is appended to the selection's name. */ +trait SelectExpr extends Expr with Recordable { + +} + +/** Numeric index selector rendered as `[n]`; the text must parse as Int (toInt throws otherwise). */ +case class NumPositionExpr(expression: String) extends SelectExpr { + + val index: Int = expression.toInt + + val recordName = s"[${value2RecordString(index)}]" + +} +/** Field-name selector rendered as `['name']`. */ +case class StringPositionExpr(expression: String) extends SelectExpr { + + val field: String = expression + + val recordName = s"[${value2RecordString(field)}]" + +} +/** Selector whose text is used verbatim inside brackets, e.g. `[*]`. */ +case class AnyPositionExpr(expression: String) extends SelectExpr { + + val recordName = s"[${expression}]" + +} +/** Filter selector rendered as `['field'<op><value>]`, built from a variable name and a constant's value. */ +case class FilterOprExpr(expression: String, left: VariableExpr, right: ConstExpr) extends SelectExpr { + + val field: String = left.name + val value: Any = right.value + + val recordName = { + s"[${value2RecordString(field)}${expression}${value2RecordString(value)}]" + } + +} +/** Function-call selector rendered as `.name(arg1,arg2,...)`; each argument is rendered from its constant value (as FilterOprExpr does), not from the ConstExpr object's toString. */ +case class FunctionExpr(expression:
String, args: Iterable[ConstExpr]) extends SelectExpr { + + val recordName = { + val argsStr = args.map(arg => value2RecordString(arg.value)).mkString(",") + s".${expression}(${argsStr})" + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/StatementAnalyzable.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/StatementAnalyzable.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/StatementAnalyzable.scala new file mode 100644 index 0000000..57968e9 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/StatementAnalyzable.scala @@ -0,0 +1,14 @@ +package org.apache.griffin.measure.batch.rule.expr_old + +/** Statement-analysis hooks; every accessor returns Nil unless a statement overrides it. */ +trait StatementAnalyzable extends Serializable { + + def getAssigns(): Iterable[AssignExpr] = Nil + + def getConditions(): Iterable[ConditionExpr] = Nil + + def getMappings(): Iterable[MappingExpr] = Nil + + def getKeyMappings(): Iterable[MappingExpr] = Nil + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/StatementExpr.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/StatementExpr.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/StatementExpr.scala new file mode 100644 index 0000000..6669f83 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/StatementExpr.scala @@ -0,0 +1,85 @@
+package org.apache.griffin.measure.batch.rule.expr_old +/** Statement of the legacy rule language: combines statement analysis, data-expr analysis and value computation. */ +trait StatementExpr extends Expr with StatementAnalyzable with ExprAnalyzable with Calculatable { + +} +/** Assignment statement; its value is `right`'s value and it reports itself from getAssigns. */ +case class AssignExpr(expression: String, left: VariableExpr, right: ElementExpr) extends StatementExpr { + + def genValue(values: Map[String, Any]): Option[Any] = right.genValue(values) + + def getDataRelatedExprs(dataSign: String): Iterable[DataExpr] = right.getDataRelatedExprs(dataSign) + + override def getAssigns(): Iterable[AssignExpr] = this :: Nil + +} +/** Condition statement; genValue is not implemented yet and always returns Some(true). */ +case class ConditionExpr(expression: String, left: ElementExpr, right: ElementExpr, annotations: Iterable[AnnotationExpr]) extends StatementExpr { + + def genValue(values: Map[String, Any]): Option[Boolean] = { + Some(true) // fixme: not done, need calculation + } + + def getDataRelatedExprs(dataSign: String): Iterable[DataExpr] = { + left.getDataRelatedExprs(dataSign) ++ right.getDataRelatedExprs(dataSign) + } + + override def getConditions(): Iterable[ConditionExpr] = this :: Nil + +} +/** Mapping statement: genValue is Some(true) when both sides have equal values or both are None, and it is a key mapping when any annotation is `Key`. NOTE(review): the `case a: AnnotationExpr` test in isKey matches every non-null element, so isKey behaves like annotations.exists(_.isKey) for non-null annotations. */ +case class MappingExpr(expression: String, left: ElementExpr, right: ElementExpr, annotations: Iterable[AnnotationExpr]) extends StatementExpr { + + def isKey: Boolean = { + annotations.exists { e => + e match { + case a: AnnotationExpr => a.isKey + case _ => false + } + } + } + + def genValue(values: Map[String, Any]): Option[Boolean] = { + (left.genValue(values), right.genValue(values)) match { + case (Some(v1), Some(v2)) => Some(v1 == v2) + case (None, None) => Some(true) + case _ => Some(false) + } + } + + def getDataRelatedExprs(dataSign: String): Iterable[DataExpr] = { + left.getDataRelatedExprs(dataSign) ++ right.getDataRelatedExprs(dataSign) + } + + override def getMappings(): Iterable[MappingExpr] = this :: Nil + override def getKeyMappings(): Iterable[MappingExpr] = if (isKey) this :: Nil else Nil + +} +/** Sequence of statements: expression joins their texts with newlines, analysis results are concatenated over all statements, and genValue returns None. */ +case class StatementsExpr(statements: Iterable[StatementExpr]) extends StatementExpr { + + val expression: String =
statements.map(_.expression).mkString("\n") + + def genValue(values: Map[String, Any]): Option[Any] = None + + def getDataRelatedExprs(dataSign: String): Iterable[DataExpr] = { + statements.flatMap(_.getDataRelatedExprs(dataSign)) + } + + override def getAssigns(): Iterable[AssignExpr] = { + statements.flatMap(_.getAssigns()) + } + + override def getConditions(): Iterable[ConditionExpr] = { + statements.flatMap(_.getConditions()) + } + + override def getMappings(): Iterable[MappingExpr] = { + statements.flatMap(_.getMappings()) + } + + override def getKeyMappings(): Iterable[MappingExpr] = { + statements.flatMap(_.getKeyMappings()) + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/VariableExpr.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/VariableExpr.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/VariableExpr.scala new file mode 100644 index 0000000..87424fc --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr_old/VariableExpr.scala @@ -0,0 +1,26 @@ +package org.apache.griffin.measure.batch.rule.expr_old +/** Named variable of the legacy rule language. */ +trait VariableExpr extends Expr with Recordable { + + val name: String + +} + +/** Plain variable whose name and record name are the raw text. */ +case class VariableStringExpr(expression: String) extends VariableExpr { + + val name = expression + + val recordName = name + +} +/** Variable reference whose name and record name are the raw text; genValue looks the name up in `values`. */ +case class QuoteVariableExpr(expression: String) extends VariableExpr with Calculatable { + + val name = expression + + val recordName = name + + def genValue(values: Map[String, Any]): Option[Any] = values.get(name) + +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/CalculationUtil.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/CalculationUtil.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/CalculationUtil.scala new file mode 100644 index 0000000..2d10fb4 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/CalculationUtil.scala @@ -0,0 +1,259 @@ +package org.apache.griffin.measure.batch.utils + +import scala.util.{Success, Try} + + +/** Arithmetic, comparison and logical operators over Option values. Most operators absorb missing operands and conversion failures by returning a default result instead of throwing. */ object CalculationUtil { + + /** Implicitly wraps any Option so the CalculationValue operators can be applied to it directly. */ implicit def option2CalculationValue(v: Option[_]): CalculationValue = CalculationValue(v) + + /** A possibly-absent operand. Arithmetic operators coerce the right operand by parsing its toString into the left operand's type; on unsupported types or a failed conversion they return the left value unchanged. */ case class CalculationValue(value: Option[_]) extends Serializable { + + /** String concatenation when the left value is a String, numeric addition otherwise. When the left value is None, the right operand is returned. NOTE(review): Byte and Short sums are widened to Int by Scala arithmetic. */ def + (other: Option[_]): Option[_] = { + Try { + (value, other) match { + case (Some(v1: String), Some(v2)) => Some(v1 + v2.toString) + case (Some(v1: Byte), Some(v2)) => Some(v1 + v2.toString.toByte) + case (Some(v1: Short), Some(v2)) => Some(v1 + v2.toString.toShort) + case (Some(v1: Int), Some(v2)) => Some(v1 + v2.toString.toInt) + case (Some(v1: Long), Some(v2)) => Some(v1 + v2.toString.toLong) + case (Some(v1: Float), Some(v2)) => Some(v1 + v2.toString.toFloat) + case (Some(v1: Double), Some(v2)) => Some(v1 + v2.toString.toDouble) + case (None, Some(v2)) => other + case _ => value + } + } match { + case Success(opt) => opt + case _ => value + } + } + + /** Numeric subtraction. Unsupported or missing operands and parse failures return the left value unchanged. */ def - (other: Option[_]): Option[_] = { + Try { + (value, other) match { + case (Some(v1: Byte), Some(v2)) => Some(v1 - v2.toString.toByte) + case (Some(v1: Short), Some(v2)) => Some(v1 - v2.toString.toShort) + case (Some(v1: Int), Some(v2)) => Some(v1 - v2.toString.toInt) + case (Some(v1: Long), Some(v2)) => Some(v1 - v2.toString.toLong) + case
(Some(v1: Float), Some(v2)) => Some(v1 - v2.toString.toFloat) + case (Some(v1: Double), Some(v2)) => Some(v1 - v2.toString.toDouble) + case _ => value + } + } match { + case Success(opt) => opt + case _ => value + } + } + + /** String repetition when the left value is a String and the right is an Int or Long; numeric multiplication otherwise. Unsupported or missing operands and parse failures return the left value unchanged. */ def * (other: Option[_]): Option[_] = { + Try { + (value, other) match { + case (Some(s1: String), Some(n2: Int)) => Some(s1 * n2) + case (Some(s1: String), Some(n2: Long)) => Some(s1 * n2.toInt) + case (Some(v1: Byte), Some(v2)) => Some(v1 * v2.toString.toByte) + case (Some(v1: Short), Some(v2)) => Some(v1 * v2.toString.toShort) + case (Some(v1: Int), Some(v2)) => Some(v1 * v2.toString.toInt) + case (Some(v1: Long), Some(v2)) => Some(v1 * v2.toString.toLong) + case (Some(v1: Float), Some(v2)) => Some(v1 * v2.toString.toFloat) + case (Some(v1: Double), Some(v2)) => Some(v1 * v2.toString.toDouble) + case _ => value + } + } match { + case Success(opt) => opt + case _ => value + } + } + + /** Numeric division. Unsupported or missing operands and parse failures return the left value unchanged. NOTE(review): integral division by zero throws inside the Try and so also silently returns the left value, whereas Float or Double division by zero yields Infinity or NaN. */ def / (other: Option[_]): Option[_] = { + Try { + (value, other) match { + case (Some(v1: Byte), Some(v2)) => Some(v1 / v2.toString.toByte) + case (Some(v1: Short), Some(v2)) => Some(v1 / v2.toString.toShort) + case (Some(v1: Int), Some(v2)) => Some(v1 / v2.toString.toInt) + case (Some(v1: Long), Some(v2)) => Some(v1 / v2.toString.toLong) + case (Some(v1: Float), Some(v2)) => Some(v1 / v2.toString.toFloat) + case (Some(v1: Double), Some(v2)) => Some(v1 / v2.toString.toDouble) + case _ => value + } + } match { + case Success(opt) => opt + case _ => value + } + } + + /** Numeric remainder. Unsupported or missing operands and parse failures return the left value unchanged; NOTE(review): an integral remainder by zero also silently returns the left value. */ def % (other: Option[_]): Option[_] = { + Try { + (value, other) match { + case (Some(v1: Byte), Some(v2)) => Some(v1 % v2.toString.toByte) + case (Some(v1: Short), Some(v2)) => Some(v1 % v2.toString.toShort) + case (Some(v1: Int), Some(v2)) => Some(v1 % v2.toString.toInt) + case (Some(v1: Long), Some(v2)) => Some(v1 % v2.toString.toLong) + case (Some(v1: Float), Some(v2)) => Some(v1 % v2.toString.toFloat) + case (Some(v1: Double), Some(v2)) => Some(v1 % v2.toString.toDouble) + case _ => value + } + }
match { + case Success(opt) => opt + case _ => value + } + } + + /** Negation: numbers are negated, Booleans inverted and Strings reversed. Any other present value passes through unchanged, and None stays None. */ def unary_- (): Option[_] = { + value match { + case Some(v: String) => Some(v.reverse.toString) + case Some(v: Boolean) => Some(!v) + case Some(v: Byte) => Some(-v) + case Some(v: Short) => Some(-v) + case Some(v: Int) => Some(-v) + case Some(v: Long) => Some(-v) + case Some(v: Float) => Some(-v) + case Some(v: Double) => Some(-v) + case Some(v) => Some(v) + case _ => None + } + } + + + /** Equality of two present values; Some(false) if either side is None. */ def === (other: Option[_]): Option[Boolean] = { + (value, other) match { + case (Some(v1), Some(v2)) => Some(v1 == v2) + case _ => Some(false) + } + } + + /** Inequality of two present values; Some(true) if either side is None. */ def =!= (other: Option[_]): Option[Boolean] = { + (value, other) match { + case (Some(v1), Some(v2)) => Some(v1 != v2) + case _ => Some(true) + } + } + + /** Greater-than. Two Strings compare lexicographically, and a numeric left value is compared with the right operand parsed as Double. Unsupported or missing operands give Some(false); a right operand that cannot be parsed gives None. */ def > (other: Option[_]): Option[Boolean] = { + Try { + (value, other) match { + case (Some(v1: String), Some(v2: String)) => Some(v1 > v2) + case (Some(v1: Byte), Some(v2)) => Some(v1 > v2.toString.toDouble) + case (Some(v1: Short), Some(v2)) => Some(v1 > v2.toString.toDouble) + case (Some(v1: Int), Some(v2)) => Some(v1 > v2.toString.toDouble) + case (Some(v1: Long), Some(v2)) => Some(v1 > v2.toString.toDouble) + case (Some(v1: Float), Some(v2)) => Some(v1 > v2.toString.toDouble) + case (Some(v1: Double), Some(v2)) => Some(v1 > v2.toString.toDouble) + case _ => Some(false) + } + } match { + case Success(opt) => opt + case _ => None + } + } + + /** Greater-or-equal, with the same operand conventions as the greater-than operator. */ def >= (other: Option[_]): Option[Boolean] = { + Try { + (value, other) match { + case (Some(v1: String), Some(v2: String)) => Some(v1 >= v2) + case (Some(v1: Byte), Some(v2)) => Some(v1 >= v2.toString.toDouble) + case (Some(v1: Short), Some(v2)) => Some(v1 >= v2.toString.toDouble) + case (Some(v1: Int), Some(v2)) => Some(v1 >= v2.toString.toDouble) + case (Some(v1: Long), Some(v2)) => Some(v1 >= v2.toString.toDouble) + case (Some(v1: Float), Some(v2)) => Some(v1 >= v2.toString.toDouble) + case (Some(v1: Double), Some(v2)) => Some(v1 >= v2.toString.toDouble) + case _ =>
Some(false) + } + } match { + case Success(opt) => opt + case _ => None + } + } + + /** Less-than, with the same operand conventions as the greater-than operator. */ def < (other: Option[_]): Option[Boolean] = { + Try { + (value, other) match { + case (Some(v1: String), Some(v2: String)) => Some(v1 < v2) + case (Some(v1: Byte), Some(v2)) => Some(v1 < v2.toString.toDouble) + case (Some(v1: Short), Some(v2)) => Some(v1 < v2.toString.toDouble) + case (Some(v1: Int), Some(v2)) => Some(v1 < v2.toString.toDouble) + case (Some(v1: Long), Some(v2)) => Some(v1 < v2.toString.toDouble) + case (Some(v1: Float), Some(v2)) => Some(v1 < v2.toString.toDouble) + case (Some(v1: Double), Some(v2)) => Some(v1 < v2.toString.toDouble) + case _ => Some(false) + } + } match { + case Success(opt) => opt + case _ => None + } + } + + /** Less-or-equal, with the same operand conventions as the greater-than operator. */ def <= (other: Option[_]): Option[Boolean] = { + Try { + (value, other) match { + case (Some(v1: String), Some(v2: String)) => Some(v1 <= v2) + case (Some(v1: Byte), Some(v2)) => Some(v1 <= v2.toString.toDouble) + case (Some(v1: Short), Some(v2)) => Some(v1 <= v2.toString.toDouble) + case (Some(v1: Int), Some(v2)) => Some(v1 <= v2.toString.toDouble) + case (Some(v1: Long), Some(v2)) => Some(v1 <= v2.toString.toDouble) + case (Some(v1: Float), Some(v2)) => Some(v1 <= v2.toString.toDouble) + case (Some(v1: Double), Some(v2)) => Some(v1 <= v2.toString.toDouble) + case _ => Some(false) + } + } match { + case Success(opt) => opt + case _ => None + } + } + + + /** True if the value equals (===) any element of other; false for an empty other. */ def in (other: Iterable[Option[_]]): Option[Boolean] = { + Some(other.foldLeft(false) { (res, next) => + res || ===(next).getOrElse(false) + }) + } + + /** True unless some element of other equals the value. A missing value on either side counts as different (=!=). */ def not_in (other: Iterable[Option[_]]): Option[Boolean] = { + Some(other.foldLeft(true) { (res, next) => + res && =!=(next).getOrElse(false) + }) + } + + /** Inclusive range check using the first two elements of other as lower and upper bounds (extra elements are ignored). Returns None if fewer than two bounds are given or either comparison yields None. */ def between (other: Iterable[Option[_]]): Option[Boolean] = { + if (other.size < 2) None else { + val (begin, end) = (other.head, other.tail.head) + (>=(begin), <=(end)) match { + case (Some(b1), Some(b2)) => Some(b1 && b2) + case _ => None + } + } + } + + /** True if the value is strictly below the first bound or strictly above the second. Returns None if fewer than two bounds are given or either comparison yields None. */ def not_between (other:
Iterable[Option[_]]): Option[Boolean] = { + if (other.size < 2) None else { + val (begin, end) = (other.head, other.tail.head) + (<(begin), >(end)) match { + case (Some(b1), Some(b2)) => Some(b1 || b2) + case _ => None + } + } + } + + /** Logical NOT of a Boolean. Any other present value gives Some(false), and None gives None. */ def unary_! (): Option[Boolean] = { + value match { + case Some(v: Boolean) => Some(!v) + case Some(v) => Some(false) + case _ => None + } + } + + /** Logical AND of two present Booleans; None otherwise. */ def && (other: Option[_]): Option[Boolean] = { + (value, other) match { + case (Some(b1: Boolean), Some(b2: Boolean)) => Some(b1 && b2) + case _ => None + } + } + + /** Logical OR of two present Booleans; None otherwise. */ def || (other: Option[_]): Option[Boolean] = { + (value, other) match { + case (Some(b1: Boolean), Some(b2: Boolean)) => Some(b1 || b2) + case _ => None + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala new file mode 100644 index 0000000..b48478a --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HdfsUtil.scala @@ -0,0 +1,62 @@ +package org.apache.griffin.measure.batch.utils + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, FileSystem, Path} + +/** Helpers over a single Hadoop FileSystem, obtained at initialization from a default Configuration with dfs.support.append set to true. */ object HdfsUtil { + + private val seprator = "/" + + private val conf = new Configuration() + conf.set("dfs.support.append", "true") + + private val dfs = FileSystem.get(conf) + + /** True if filePath exists. */ def existPath(filePath: String): Boolean = { + val path = new Path(filePath) + dfs.exists(path) + } + + /** Creates filePath after first deleting (recursively) anything already at that path. The caller must close the returned stream. */ def createFile(filePath: String): FSDataOutputStream = { + val path = new Path(filePath) + if (dfs.exists(path)) dfs.delete(path, true) + return
dfs.create(path) + } + + /** Opens filePath for appending if it exists, otherwise creates it. The caller must close the returned stream. */ def appendOrCreateFile(filePath: String): FSDataOutputStream = { + val path = new Path(filePath) + if (dfs.exists(path)) dfs.append(path) else createFile(filePath) + } + + /** Opens filePath for reading; the caller must close the returned stream. */ def openFile(filePath: String): FSDataInputStream = { + val path = new Path(filePath) + dfs.open(path) + } + + /** Replaces the content of filePath with message encoded as UTF-8. NOTE(review): the stream is never closed if write throws; a try/finally around the write would prevent the leak. */ def writeContent(filePath: String, message: String): Unit = { + val out = createFile(filePath) + out.write(message.getBytes("utf-8")) + out.close + } + + /** Appends message encoded as UTF-8 to filePath, creating the file if needed. NOTE(review): the stream is never closed if write throws; a try/finally around the write would prevent the leak. */ def appendContent(filePath: String, message: String): Unit = { + val out = appendOrCreateFile(filePath) + out.write(message.getBytes("utf-8")) + out.close + } + + /** Creates an empty file at filePath, replacing whatever was there. */ def createEmptyFile(filePath: String): Unit = { + val out = createFile(filePath) + out.close + } + + + /** Joins parentPath and fileName, inserting a slash separator unless parentPath already ends with one. */ def getHdfsFilePath(parentPath: String, fileName: String): String = { + if (parentPath.endsWith(seprator)) parentPath + fileName else parentPath + seprator + fileName + } + + /** Recursively deletes dirPath if it exists; does nothing otherwise. */ def deleteHdfsPath(dirPath: String): Unit = { + val path = new Path(dirPath) + if (dfs.exists(path)) dfs.delete(path, true) + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala new file mode 100644 index 0000000..747d0fa --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/HttpUtil.scala @@ -0,0 +1,30 @@ +package org.apache.griffin.measure.batch.utils + +import scalaj.http._ + +/** HTTP helpers built on scalaj-http. Param and header values are sent via toString, and responses are reduced to their status code as a String. */ object HttpUtil { + + /** Case-insensitive, whole-string matchers for HTTP method names. */ val GET_REGEX = """^(?i)get$""".r + val POST_REGEX = """^(?i)post$""".r + val PUT_REGEX = """^(?i)put$""".r + val DELETE_REGEX = """^(?i)delete$""".r + + /** POSTs data with the given params and headers and returns the response status code as a String. */ def postData(url: String, params: Map[String,
Object], headers: Map[String, Object], data: String): String = { + val response = Http(url).params(convertObjMap2StrMap(params)).headers(convertObjMap2StrMap(headers)).postData(data).asString + response.code.toString + } + + /** Sends data using the HTTP method named by method (matched case-insensitively) and returns the status code as a String. NOTE(review): only POST and PUT are handled; any other method, including GET and DELETE despite their regexes above, returns the literal text wrong method without sending a request. */ def httpRequest(url: String, method: String, params: Map[String, Object], headers: Map[String, Object], data: String): String = { + val httpReq = Http(url).params(convertObjMap2StrMap(params)).headers(convertObjMap2StrMap(headers)) + method match { + case POST_REGEX() => httpReq.postData(data).asString.code.toString + case PUT_REGEX() => httpReq.put(data).asString.code.toString + case _ => "wrong method" + } + } + + /** Converts every map value to its toString form; keys are kept as-is. */ private def convertObjMap2StrMap(map: Map[String, Object]): Map[String, String] = { + map.map(pair => pair._1 -> pair._2.toString) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala new file mode 100644 index 0000000..cdd470a --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/JsonUtil.scala @@ -0,0 +1,32 @@ +package org.apache.griffin.measure.batch.utils + +import java.io.InputStream + +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} +import com.fasterxml.jackson.module.scala.DefaultScalaModule + +import scala.reflect._ + +/** Jackson-based JSON helpers that share one ObjectMapper. The Scala module is registered and FAIL_ON_UNKNOWN_PROPERTIES is disabled, so unknown JSON properties are ignored when deserializing. */ object JsonUtil { + val mapper = new ObjectMapper() + mapper.registerModule(DefaultScalaModule) + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + + /** Serializes a Symbol-keyed map, using each symbol's name as the JSON key. */ def toJson(value: Map[Symbol, Any]): String = { + toJson(value map { case (k,v) => k.name -> v}) + } + + /** Serializes any value with the shared mapper. */ def toJson(value:
Any): String = { + mapper.writeValueAsString(value) + } + + /** Parses a JSON object into a Map with String keys and V values. */ def toMap[V](json:String)(implicit m: Manifest[V]) = fromJson[Map[String,V]](json) + + /** Deserializes a JSON string into T. NOTE(review): only the erased runtime class of T is passed to Jackson, so generic type arguments of T are not enforced. */ def fromJson[T: ClassTag](json: String)(implicit m : Manifest[T]): T = { + mapper.readValue[T](json, classTag[T].runtimeClass.asInstanceOf[Class[T]]) + } + + /** Deserializes JSON read from an InputStream into T. NOTE(review): only the erased runtime class of T is passed to Jackson, so generic type arguments of T are not enforced. */ def fromJson[T: ClassTag](is: InputStream)(implicit m : Manifest[T]): T = { + mapper.readValue[T](is, classTag[T].runtimeClass.asInstanceOf[Class[T]]) + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/StringParseUtil.scala ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/StringParseUtil.scala b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/StringParseUtil.scala new file mode 100644 index 0000000..7f2b355 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/main/scala/org/apache/griffin/measure/batch/utils/StringParseUtil.scala @@ -0,0 +1,10 @@ +package org.apache.griffin.measure.batch.utils + +object StringParseUtil { + + /** Splits str around sep and trims every piece. NOTE(review): String.split treats sep as a regular expression, so separators such as a pipe or a dot need escaping, and trailing empty pieces are dropped. */ def sepStrings(str: String, sep: String): Iterable[String] = { + val strings = str.split(sep) + strings.map(_.trim) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/a0117811/griffin-measure/griffin-measure-batch/src/test/resources/config.json ---------------------------------------------------------------------- diff --git a/griffin-measure/griffin-measure-batch/src/test/resources/config.json b/griffin-measure/griffin-measure-batch/src/test/resources/config.json new file mode 100644 index 0000000..65e0ed9 --- /dev/null +++ b/griffin-measure/griffin-measure-batch/src/test/resources/config.json @@ -0,0 +1,25 @@ +{ + "name": "accu1", + "type": "accuracy", + + "source": { + "type": "avro", + "version": "1.7", + "config": { +
"file.name": "src/test/resources/users_info_src.avro" + } + }, + + "target": { + "type": "avro", + "version": "1.7", + "config": { + "file.name": "src/test/resources/users_info_target.avro" + } + }, + + "evaluateRule": { + "sampleRatio": 1, + "rules": "$source.user_id > 10020 AND $source.user_id + 5 = $target.user_id + (2 + 3) AND $source.first_name + 12 = $target.first_name + (10 + 2) AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code" + } +} \ No newline at end of file
