http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/rule/CalculationUtil.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/CalculationUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/CalculationUtil.scala new file mode 100644 index 0000000..c969012 --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/CalculationUtil.scala @@ -0,0 +1,315 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
*/
package org.apache.griffin.measure.rule

import scala.util.{Success, Try}


object CalculationUtil {

  implicit def option2CalculationValue(v: Option[_]): CalculationValue = CalculationValue(v)

  /**
    * Redefines the calculation semantics of DSL operators over Option-wrapped values.
    *
    * Conventions shared by all operators:
    *  - None or Some(null) on either side generally yields None (no value);
    *  - the right operand is converted via toString and parsed to the left operand's
    *    numeric type, so e.g. Some(5) + Some("3") == Some(8);
    *  - any conversion/arithmetic failure is swallowed and yields None.
    */
  case class CalculationValue(value: Option[_]) extends Serializable {

    /**
      * Addition. A String left operand concatenates with the right operand's
      * string form; numeric left operands parse the right operand and add.
      */
    def + (other: Option[_]): Option[_] = {
      (value, other) match {
        // string concatenation (only when the right side holds a real value)
        case (Some(v1: String), Some(v2)) if v2 != null => Try(Some(v1 + v2.toString)).getOrElse(None)
        case _ => numericBinaryOp(other)(_ + _, _ + _, _ + _, _ + _)
      }
    }

    /** Subtraction over numeric operands; None on missing values or parse failure. */
    def - (other: Option[_]): Option[_] = {
      numericBinaryOp(other)(_ - _, _ - _, _ - _, _ - _)
    }

    /**
      * Multiplication. A String left operand with an Int/Long right operand is
      * string repetition ("ab" * 2 == "abab"); otherwise numeric multiplication.
      */
    def * (other: Option[_]): Option[_] = {
      (value, other) match {
        case (Some(s1: String), Some(n2: Int)) => Try(Some(s1 * n2)).getOrElse(None)
        case (Some(s1: String), Some(n2: Long)) => Try(Some(s1 * n2.toInt)).getOrElse(None)
        case _ => numericBinaryOp(other)(_ * _, _ * _, _ * _, _ * _)
      }
    }

    /** Division; integer division-by-zero throws inside Try and so yields None. */
    def / (other: Option[_]): Option[_] = {
      numericBinaryOp(other)(_ / _, _ / _, _ / _, _ / _)
    }

    /** Modulo; zero modulus on integral types yields None via the Try wrapper. */
    def % (other: Option[_]): Option[_] = {
      numericBinaryOp(other)(_ % _, _ % _, _ % _, _ % _)
    }

    /**
      * Shared dispatch for binary arithmetic.
      * Byte/Short/Int use intOp (Scala widens sub-int arithmetic to Int, matching
      * the original per-type cases); Long/Float/Double keep their own width.
      * A non-numeric, non-null left operand falls through unchanged (returns value).
      */
    private def numericBinaryOp(other: Option[_])
                               (intOp: (Int, Int) => Int,
                                longOp: (Long, Long) => Long,
                                floatOp: (Float, Float) => Float,
                                doubleOp: (Double, Double) => Double): Option[_] = {
      Try {
        (value, other) match {
          case (None, _) | (_, None) => None
          case (Some(null), _) | (_, Some(null)) => None
          case (Some(v1: Byte), Some(v2)) => Some(intOp(v1, v2.toString.toByte))
          case (Some(v1: Short), Some(v2)) => Some(intOp(v1, v2.toString.toShort))
          case (Some(v1: Int), Some(v2)) => Some(intOp(v1, v2.toString.toInt))
          case (Some(v1: Long), Some(v2)) => Some(longOp(v1, v2.toString.toLong))
          case (Some(v1: Float), Some(v2)) => Some(floatOp(v1, v2.toString.toFloat))
          case (Some(v1: Double), Some(v2)) => Some(doubleOp(v1, v2.toString.toDouble))
          case _ => value
        }
      } match {
        case Success(opt) => opt
        case _ => None
      }
    }

    /**
      * Unary minus. Negates numbers, inverts booleans, reverses strings
      * (the DSL's chosen meaning for "-string"); other values pass through.
      */
    def unary_- (): Option[_] = {
      value match {
        case None => None
        case Some(null) => None
        case Some(v: String) => Some(v.reverse.toString)
        case Some(v: Boolean) => Some(!v)
        case Some(v: Byte) => Some(-v)
        case Some(v: Short) => Some(-v)
        case Some(v: Int) => Some(-v)
        case Some(v: Long) => Some(-v)
        case Some(v: Float) => Some(-v)
        case Some(v: Double) => Some(-v)
        // any other concrete value is left untouched (previous trailing
        // `case _ => None` was unreachable and has been removed)
        case Some(v) => Some(v)
      }
    }


    /** Equality; two missing values (None, None) compare equal. */
    def === (other: Option[_]): Option[Boolean] = {
      (value, other) match {
        case (None, None) => Some(true)
        case (Some(v1), Some(v2)) => Some(v1 == v2)
        case _ => Some(false)
      }
    }

    /** Inequality; complement of ===. */
    def =!= (other: Option[_]): Option[Boolean] = {
      (value, other) match {
        case (None, None) => Some(false)
        case (Some(v1), Some(v2)) => Some(v1 != v2)
        case _ => Some(true)
      }
    }

    def > (other: Option[_]): Option[Boolean] = compareOp(other, nullsEqual = false)(_ > _, _ > _)

    // >= and <= treat two missing values (or two nulls) as equal, hence true
    def >= (other: Option[_]): Option[Boolean] = compareOp(other, nullsEqual = true)(_ >= _, _ >= _)

    def < (other: Option[_]): Option[Boolean] = compareOp(other, nullsEqual = false)(_ < _, _ < _)

    def <= (other: Option[_]): Option[Boolean] = compareOp(other, nullsEqual = true)(_ <= _, _ <= _)

    /**
      * Shared dispatch for the four ordering comparisons.
      * Strings compare lexicographically with strOp; every numeric left operand
      * compares through Double with numOp (right operand parsed via toString).
      * nullsEqual controls whether (None, None) / (Some(null), Some(null))
      * count as equal (true for >= and <=, false for > and <).
      */
    private def compareOp(other: Option[_], nullsEqual: Boolean)
                         (strOp: (String, String) => Boolean,
                          numOp: (Double, Double) => Boolean): Option[Boolean] = {
      Try {
        (value, other) match {
          case (None, None) | (Some(null), Some(null)) if nullsEqual => Some(true)
          case (None, _) | (_, None) => None
          case (Some(null), _) | (_, Some(null)) => None
          case (Some(v1: String), Some(v2: String)) => Some(strOp(v1, v2))
          case (Some(v1: Byte), Some(v2)) => Some(numOp(v1, v2.toString.toDouble))
          case (Some(v1: Short), Some(v2)) => Some(numOp(v1, v2.toString.toDouble))
          case (Some(v1: Int), Some(v2)) => Some(numOp(v1, v2.toString.toDouble))
          case (Some(v1: Long), Some(v2)) => Some(numOp(v1, v2.toString.toDouble))
          case (Some(v1: Float), Some(v2)) => Some(numOp(v1, v2.toString.toDouble))
          case (Some(v1: Double), Some(v2)) => Some(numOp(v1, v2.toString.toDouble))
          case _ => None
        }
      } match {
        case Success(opt) => opt
        case _ => None
      }
    }


    /** Membership: true if this value === any element; empty iterable gives Some(false). */
    def in (other: Iterable[Option[_]]): Option[Boolean] = {
      other.foldLeft(Some(false): Option[Boolean]) { (res, next) =>
        optOr(res, ===(next))
      }
    }

    /** Non-membership: true if this value =!= every element; empty iterable gives Some(true). */
    def not_in (other: Iterable[Option[_]]): Option[Boolean] = {
      other.foldLeft(Some(true): Option[Boolean]) { (res, next) =>
        optAnd(res, =!=(next))
      }
    }

    /**
      * Range test against the first two elements of `other`.
      * When both bounds are missing, a missing value is considered in range.
      */
    def between (other: Iterable[Option[_]]): Option[Boolean] = {
      if (other.size < 2) None else {
        val (begin, end) = (other.head, other.tail.head)
        if (begin.isEmpty && end.isEmpty) Some(value.isEmpty)
        else optAnd(>=(begin), <=(end))
      }
    }

    /** Complement of between, with the mirrored missing-bounds convention. */
    def not_between (other: Iterable[Option[_]]): Option[Boolean] = {
      if (other.size < 2) None else {
        val (begin, end) = (other.head, other.tail.head)
        if (begin.isEmpty && end.isEmpty) Some(value.nonEmpty)
        else optOr(<(begin), >(end))
      }
    }

    /** Logical negation; only defined for boolean values, otherwise None. */
    def unary_! (): Option[Boolean] = {
      optNot(value)
    }

    def && (other: Option[_]): Option[Boolean] = {
      optAnd(value, other)
    }

    def || (other: Option[_]): Option[Boolean] = {
      optOr(value, other)
    }


    private def optNot(a: Option[_]): Option[Boolean] = {
      a match {
        case None => None
        case Some(null) => None
        case Some(v: Boolean) => Some(!v)
        case _ => None
      }
    }
    // NOTE: missing values dominate — (Some(false), None) yields None, not Some(false);
    // short-circuit on false only applies when both sides are present.
    private def optAnd(a: Option[_], b: Option[_]): Option[Boolean] = {
      (a, b) match {
        case (None, _) | (_, None) => None
        case (Some(null), _) | (_, Some(null)) => None
        case (Some(false), _) | (_, Some(false)) => Some(false)
        case (Some(b1: Boolean), Some(b2: Boolean)) => Some(b1 && b2)
        case _ => None
      }
    }
    // same convention as optAnd: a missing side poisons the result before the
    // true short-circuit is considered
    private def optOr(a: Option[_], b: Option[_]): Option[Boolean] = {
      (a, b) match {
        case (None, _) | (_, None) => None
        case (Some(null), _) | (_, Some(null)) => None
        case (Some(true), _) | (_, Some(true)) => Some(true)
        case (Some(b1: Boolean), Some(b2: Boolean)) => Some(b1 || b2)
        case _ => None
      }
    }
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6fd22ae7/measure/src/main/scala/org/apache/griffin/measure/rule/DataTypeCalculationUtil.scala ---------------------------------------------------------------------- diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/DataTypeCalculationUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/DataTypeCalculationUtil.scala new file mode 100644 index 0000000..9d027ec --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/DataTypeCalculationUtil.scala @@ -0,0 +1,159 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
*/
package org.apache.griffin.measure.rule

import org.apache.spark.sql.types._

// Utilities for inferring and combining Spark SQL DataTypes from raw values.
// Used to accumulate a consistent per-field schema across many data rows.
object DataTypeCalculationUtil {

  implicit def dataType2CalculationType(tp: DataType): CalculationType = CalculationType(tp)

  // Wraps a DataType with operator-result typing rules for the DSL.
  case class CalculationType(tp: DataType) extends Serializable {
    // A binary operation involving a NullType operand is NullType;
    // otherwise the left operand's type wins.
    def binaryOpr (other: DataType): DataType = {
      (tp, other) match {
        case (NullType, _) | (_, NullType) => NullType
        case (t, _) => t
      }
    }
    // A unary operation preserves the operand's type.
    def unaryOpr (): DataType = {
      tp
    }
  }

  // Raised when two data types cannot be unified into a common type.
  case class DataTypeException() extends Exception {}

  // Maps a runtime value to its Spark SQL DataType.
  // Maps and Iterables recurse through getSameDataType; anything
  // unrecognized (including null) becomes NullType.
  def getDataType(value: Any): DataType = {
    value match {
      case v: String => StringType
      case v: Boolean => BooleanType
      case v: Long => LongType
      case v: Int => IntegerType
      case v: Short => ShortType
      case v: Byte => ByteType
      case v: Double => DoubleType
      case v: Float => FloatType
      case v: Map[_, _] => MapType(getSameDataType(v.keys), getSameDataType(v.values))
      case v: Iterable[_] => ArrayType(getSameDataType(v))
      case _ => NullType
    }
  }

  // Folds a collection of values into the narrowest common DataType,
  // starting from NullType (the identity for genericTypeOf).
  private def getSameDataType(values: Iterable[Any]): DataType = {
    values.foldLeft(NullType: DataType)((a, c) => genericTypeOf(a, getDataType(c)))
  }

  // Unifies two DataTypes into a common "generic" type following the
  // promotion lattice:
  //   NullType < Byte < Short < Integer < Long < Float < Double < String
  // Boolean only unifies with Boolean or String; Map/Array unify
  // element-wise. Incompatible pairs raise DataTypeException.
  // NOTE: case ordering inside each branch is significant — each numeric
  // type lists the strictly-wider types (promote to dt2) before the
  // narrower-or-equal ones (keep dt1).
  private def genericTypeOf(dt1: DataType, dt2: DataType): DataType = {
    if (dt1 == dt2) dt1 else {
      dt1 match {
        case NullType => dt2
        // String absorbs any other type
        case StringType => dt1
        case DoubleType => {
          dt2 match {
            case StringType => dt2
            case DoubleType | FloatType | LongType | IntegerType | ShortType | ByteType => dt1
            case _ => throw DataTypeException()
          }
        }
        case FloatType => {
          dt2 match {
            case StringType | DoubleType => dt2
            case FloatType | LongType | IntegerType | ShortType | ByteType => dt1
            case _ => throw DataTypeException()
          }
        }
        case LongType => {
          dt2 match {
            case StringType | DoubleType | FloatType => dt2
            case LongType | IntegerType | ShortType | ByteType => dt1
            case _ => throw DataTypeException()
          }
        }
        case IntegerType => {
          dt2 match {
            case StringType | DoubleType | FloatType | LongType => dt2
            case IntegerType | ShortType | ByteType => dt1
            case _ => throw DataTypeException()
          }
        }
        case ShortType => {
          dt2 match {
            case StringType | DoubleType | FloatType | LongType | IntegerType => dt2
            case ShortType | ByteType => dt1
            case _ => throw DataTypeException()
          }
        }
        case ByteType => {
          dt2 match {
            case StringType | DoubleType | FloatType | LongType | IntegerType | ShortType => dt2
            case ByteType => dt1
            case _ => throw DataTypeException()
          }
        }
        case BooleanType => {
          dt2 match {
            case StringType => dt2
            case BooleanType => dt1
            case _ => throw DataTypeException()
          }
        }
        // maps unify key types and value types independently
        case MapType(kdt1, vdt1, _) => {
          dt2 match {
            case MapType(kdt2, vdt2, _) => MapType(genericTypeOf(kdt1, kdt2), genericTypeOf(vdt1, vdt2))
            case _ => throw DataTypeException()
          }
        }
        // arrays unify their element types
        case ArrayType(vdt1, _) => {
          dt2 match {
            case ArrayType(vdt2, _) => ArrayType(genericTypeOf(vdt1, vdt2))
            case _ => throw DataTypeException()
          }
        }
        case _ => throw DataTypeException()
      }
    }
  }

  // Folds one row's (field -> value) map into the accumulated
  // (field -> DataType) schema; fields whose type cannot be determined
  // or unified are silently skipped (best-effort schema inference).
  def sequenceDataTypeMap(aggr: Map[String, DataType], value: Map[String, Any]): Map[String, DataType] = {
    val dataTypes = value.foldLeft(Map[String, DataType]()) { (map, pair) =>
      val (k, v) = pair
      try {
        map + (k -> getDataType(v))
      } catch {
        case e: DataTypeException => map
      }
    }
    combineDataTypeMap(aggr, dataTypes)
  }

  // Merges two schema maps field-by-field; on a unification conflict the
  // existing (aggr1) type is kept, again best-effort.
  def combineDataTypeMap(aggr1: Map[String, DataType], aggr2: Map[String, DataType]): Map[String, DataType] = {
    aggr2.foldLeft(aggr1) { (a, c) =>
      a.get(c._1) match {
        case Some(t) => {
          try {
            a + (c._1 -> genericTypeOf(t, c._2))
          } catch {
            case e: DataTypeException => a
          }
        }
        case _ => a + c
      }
    }
  }

}
b/measure/src/main/scala/org/apache/griffin/measure/rule/ExprValueUtil.scala new file mode 100644 index 0000000..940d0cb --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/ExprValueUtil.scala @@ -0,0 +1,263 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
*/
package org.apache.griffin.measure.rule

import org.apache.griffin.measure.rule.expr._
import org.apache.griffin.measure.rule.func._
import org.apache.spark.sql.Row

import scala.util.control.NonFatal
import scala.util.{Success, Try}

// Evaluates rule expressions against raw data (Spark Rows, maps, sequences),
// producing expression-id -> value maps used by the rule engine.
object ExprValueUtil {

  // append one selection step to a data path
  private def append(path: List[String], step: String): List[String] = {
    path :+ step
  }

  // wrap an optional value as a single-entry map; empty map when value is None
  private def value2Map(key: String, value: Option[Any]): Map[String, Any] = {
    value.flatMap(v => Some((key -> v))).toMap
  }

  // Extract one field/index from a Row, Map or Seq.
  // Returns None for a missing field, an out-of-range index, or an
  // unsupported data/descriptor combination.
  private def getSingleValue(data: Option[Any], desc: FieldDescOnly): Option[Any] = {
    data match {
      case Some(row: Row) => {
        desc match {
          // NonFatal instead of a bare `case _` so fatal errors still propagate
          case i: IndexDesc => try { Some(row.getAs[Any](i.index)) } catch { case NonFatal(_) => None }
          case f: FieldDesc => try { Some(row.getAs[Any](f.field)) } catch { case NonFatal(_) => None }
          case _ => None
        }
      }
      case Some(d: Map[String, Any]) => {
        desc match {
          case f: FieldDesc => d.get(f.field)
          case _ => None
        }
      }
      case Some(d: Seq[Any]) => {
        desc match {
          case i: IndexDesc => if (i.index >= 0 && i.index < d.size) Some(d(i.index)) else None
          case _ => None
        }
      }
      // fix: original match was non-exhaustive and threw MatchError on None
      // or on any other data shape
      case _ => None
    }
  }

  // Evaluate one expression over a list of (path, data) pairs, producing the
  // expanded list of (path, value) pairs. Any failure yields Nil.
  private def calcExprValues(pathDatas: List[(List[String], Option[Any])], expr: Expr, existExprValueMap: Map[String, Any]): List[(List[String], Option[Any])] = {
    Try {
      expr match {
        case selection: SelectionExpr => {
          // apply each selector in turn, threading the expanded path list
          selection.selectors.foldLeft(pathDatas) { (pds, selector) =>
            calcExprValues(pds, selector, existExprValueMap)
          }
        }
        case selector: IndexFieldRangeSelectExpr => {
          pathDatas.flatMap { pathData =>
            val (path, data) = pathData
            data match {
              case Some(row: Row) => {
                selector.fields.flatMap { field =>
                  field match {
                    case (_: IndexDesc) | (_: FieldDesc) => {
                      getSingleValue(data, field).map { v => (append(path, field.desc), Some(v)) }
                    }
                    case a: AllFieldsDesc => {
                      // expand `*` into every column of the row
                      (0 until row.size).flatMap { i =>
                        getSingleValue(data, IndexDesc(i.toString)).map { v =>
                          (append(path, s"${a.desc}_${i}"), Some(v))
                        }
                      }.toList
                    }
                    case r: FieldRangeDesc => {
                      (r.startField, r.endField) match {
                        case (si: IndexDesc, ei: IndexDesc) => {
                          // fix: removed a discarded duplicate tuple expression here
                          (si.index to ei.index).flatMap { i =>
                            getSingleValue(data, IndexDesc(i.toString)).map { v =>
                              (append(path, s"${r.desc}_${i}"), Some(v))
                            }
                          }.toList
                        }
                        case _ => Nil
                      }
                    }
                    case _ => Nil
                  }
                }
              }
              case Some(d: Map[String, Any]) => {
                selector.fields.flatMap { field =>
                  field match {
                    case (_: IndexDesc) | (_: FieldDesc) => {
                      getSingleValue(data, field).map { v => (append(path, field.desc), Some(v)) }
                    }
                    case a: AllFieldsDesc => {
                      // expand `*` into every key of the map
                      d.keySet.flatMap { k =>
                        getSingleValue(data, FieldDesc(k)).map { v =>
                          (append(path, s"${a.desc}_${k}"), Some(v))
                        }
                      }
                    }
                    case _ => None
                  }
                }
              }
              case Some(d: Seq[Any]) => {
                selector.fields.flatMap { field =>
                  field match {
                    case (_: IndexDesc) | (_: FieldDesc) => {
                      getSingleValue(data, field).map { v => (append(path, field.desc), Some(v)) }
                    }
                    case a: AllFieldsDesc => {
                      // fix: removed a discarded duplicate tuple expression here
                      (0 until d.size).flatMap { i =>
                        getSingleValue(data, IndexDesc(i.toString)).map { v =>
                          (append(path, s"${a.desc}_${i}"), Some(v))
                        }
                      }.toList
                    }
                    case r: FieldRangeDesc => {
                      (r.startField, r.endField) match {
                        case (si: IndexDesc, ei: IndexDesc) => {
                          // fix: removed a discarded duplicate tuple expression here
                          (si.index to ei.index).flatMap { i =>
                            getSingleValue(data, IndexDesc(i.toString)).map { v =>
                              (append(path, s"${r.desc}_${i}"), Some(v))
                            }
                          }.toList
                        }
                        case _ => None
                      }
                    }
                    case _ => None
                  }
                }
              }
              // fix: unsupported data shapes now just contribute nothing for
              // this row instead of aborting the whole calculation via MatchError
              case _ => Nil
            }
          }
        }
        case selector: FunctionOperationExpr => {
          // pre-compute argument values once; the data value is prepended as arg 0
          val args: Array[Option[Any]] = selector.args.map { arg =>
            arg.calculate(existExprValueMap)
          }.toArray
          pathDatas.flatMap { pathData =>
            val (path, data) = pathData
            data match {
              case Some(d: String) => {
                val res = FunctionUtil.invoke(selector.func, Some(d) +: args)
                val residx = res.zipWithIndex
                residx.map { vi =>
                  val (v, i) = vi
                  // the first result keeps the plain desc; extras get an index suffix
                  val step = if (i == 0) s"${selector.desc}" else s"${selector.desc}_${i}"
                  (append(path, step), v)
                }
              }
              case _ => None
            }
          }
        }
        case selector: FilterSelectExpr => { // filter means select the items fitting the condition
          pathDatas.flatMap { pathData =>
            val (path, data) = pathData
            data match {
              case Some(row: Row) => {
                // right value could not be a selection; evaluate it once up front
                val rmap = value2Map(selector.value._id, selector.value.calculate(existExprValueMap))
                (0 until row.size).flatMap { i =>
                  val dt = getSingleValue(data, IndexDesc(i.toString))
                  val lmap = value2Map(selector.fieldKey, getSingleValue(dt, selector.field))
                  val partValueMap = lmap ++ rmap
                  selector.calculate(partValueMap) match {
                    case Some(true) => Some((append(path, s"${selector.desc}_${i}"), dt))
                    case _ => None
                  }
                }
              }
              case Some(d: Map[String, Any]) => {
                val rmap = value2Map(selector.value._id, selector.value.calculate(existExprValueMap))
                d.keySet.flatMap { k =>
                  val dt = getSingleValue(data, FieldDesc(k))
                  val lmap = value2Map(selector.fieldKey, getSingleValue(dt, selector.field))
                  val partValueMap = lmap ++ rmap
                  selector.calculate(partValueMap) match {
                    case Some(true) => Some((append(path, s"${selector.desc}_${k}"), dt))
                    case _ => None
                  }
                }
              }
              case Some(d: Seq[Any]) => {
                val rmap = value2Map(selector.value._id, selector.value.calculate(existExprValueMap))
                (0 until d.size).flatMap { i =>
                  val dt = getSingleValue(data, IndexDesc(i.toString))
                  val lmap = value2Map(selector.fieldKey, getSingleValue(dt, selector.field))
                  val partValueMap = lmap ++ rmap
                  selector.calculate(partValueMap) match {
                    case Some(true) => Some((append(path, s"${selector.desc}_${i}"), dt))
                    case _ => None
                  }
                }
              }
              // fix: non-exhaustive match — skip unsupported data shapes
              case _ => Nil
            }
          }
        }
        case _ => {
          // any other expression is computed directly from the value map
          (expr.desc :: Nil, expr.calculate(existExprValueMap)) :: Nil
        }
      }
    } match {
      case Success(v) => v
      case _ => Nil
    }
  }

  // Evaluate a set of selection exprs against one datum and cartesian-combine
  // the per-expression value lists into concrete value maps.
  private def calcExprsValues(data: Option[Any], exprs: Iterable[Expr], existExprValueMap: Map[String, Any]): List[Map[String, Any]] = {
    val selectionValues: Map[String, List[(List[String], Any)]] = exprs.map { expr =>
      (expr._id, calcExprValues((Nil, data) :: Nil, expr, existExprValueMap).flatMap { pair =>
        pair._2 match {
          case Some(v) => Some((pair._1, v))
          case _ => None
        }
      })
    }.toMap
    // if exprs is empty, return an empty value map for each row
    if (selectionValues.isEmpty) List(Map[String, Any]())
    else SchemaValueCombineUtil.cartesian(selectionValues)
  }

  // try to calculate some exprs from data and initExprValueMap, generate a new expression value map
  // depends on origin data and existed expr value map
  def genExprValueMaps(data: Option[Any], exprs: Iterable[Expr], initExprValueMap: Map[String, Any]): List[Map[String, Any]] = {
    val (selections, nonSelections) = exprs.partition(_.isInstanceOf[SelectionExpr])
    val valueMaps = calcExprsValues(data, selections, initExprValueMap)
    updateExprValueMaps(nonSelections, valueMaps)
  }

  // with exprValueMap, calculate expressions, update the expression value map
  // only depends on existed expr value map, only calculation, not need origin data
  def updateExprValueMaps(exprs: Iterable[Expr], exprValueMaps: List[Map[String, Any]]): List[Map[String, Any]] = {
    exprValueMaps.map { valueMap =>
      exprs.foldLeft(valueMap) { (em, expr) =>
        expr.calculate(em) match {
          case Some(v) => em + (expr._id -> v)
          case _ => em
        }
      }
    }
  }

}
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.rule

import org.apache.griffin.measure.rule.expr._

// Splits a parsed rule statement into the expression groups needed per data
// source (const / source / target): cached, final-cached, persisted and
// group-by expressions.
case class RuleAnalyzer(rule: StatementExpr) extends Serializable {

  // data source keys the rule is queried with
  val constData = ""
  private val sourceKey = "source"
  private val targetKey = "target"

  // constant expressions are shared by both data sources
  val constCacheExprs: Iterable[Expr] = rule.getCacheExprs(constData)
  val constFinalCacheExprs: Iterable[Expr] = rule.getFinalCacheExprs(constData).toSet

  // equal-statement pairs: _1 belongs to source, _2 to target
  private val groupbyExprPairs: Seq[(Expr, Expr)] = rule.getGroupbyExprPairs((sourceKey, targetKey))

  val sourceRuleExprs: RuleExprs = exprsFor(sourceKey, groupbyExprPairs.map(_._1))
  val targetRuleExprs: RuleExprs = exprsFor(targetKey, groupbyExprPairs.map(_._2))

  // assemble the RuleExprs bundle for one data source key
  private def exprsFor(key: String, groupbyExprs: Seq[Expr]): RuleExprs = {
    val cacheExprs = rule.getCacheExprs(key)
    val persistExprs = rule.getPersistExprs(key)
    // final cache always includes the persisted expressions
    val finalCacheExprs = rule.getFinalCacheExprs(key).toSet ++ persistExprs.toSet
    RuleExprs(groupbyExprs, cacheExprs, finalCacheExprs, persistExprs)
  }

}


// for a single data source
// groupbyExprs: in accuracy case, these exprs could be groupby exprs
//   Data keys for accuracy case, generated by the equal statements, to improve the calculation efficiency
// cacheExprs: the exprs value could be calculated independently, and cached for later use
//   Cached for the finalCacheExprs calculation
// finalCacheExprs: the root of cachedExprs, cached for later use, plus with persistExprs
//   Cached for the calculation usage, and can be saved for the re-calculation in streaming mode
// persistExprs: the expr values should be persisted, only the direct selection exprs are persistable
//   Persisted for record usage, to record the missing data, need be readable as raw data
case class RuleExprs(groupbyExprs: Seq[Expr],
                     cacheExprs: Iterable[Expr],
                     finalCacheExprs: Iterable[Expr],
                     persistExprs: Iterable[Expr]
                    ) extends Serializable {
  // example: for rule "$source.name = $target.name AND $source.age < $target.age + (3 * 4)"
  // the targetRuleExprs for the target data source look like:
  //   groupbyExprs:    $target.name
  //   cacheExprs:      $target.name, $target.age, $target.age + (3 * 4)
  //   finalCacheExprs: $target.name, $target.age + (3 * 4), $target.age
  //   persistExprs:    $target.name, $target.age
}
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.measure.rule

import org.apache.griffin.measure.config.params.user._

import scala.util.Failure
import org.apache.griffin.measure.rule.expr._

import scala.util.{Success, Try}


// Builds a StatementExpr from the rule string configured in the user params.
case class RuleFactory(evaluateRuleParam: EvaluateRuleParam) {

  val ruleParser: RuleParser = RuleParser()

  // Parses the configured rule text; a parse failure is rethrown to the caller
  // (Try.get rethrows the wrapped exception on Failure).
  def generateRule(): StatementExpr = {
    parseExpr(evaluateRuleParam.rules).get
  }

  // Runs the parser over the whole rule string, wrapping any failure in Try.
  private def parseExpr(rules: String): Try[StatementExpr] = Try {
    val parseResult = ruleParser.parseAll(ruleParser.rule, rules)
    if (!parseResult.successful) throw new Exception("parse rule error!")
    parseResult.get
  }

}
/**
 * Combinator parser for the Griffin rule DSL. Produces the expression AST
 * (StatementExpr and friends) consumed by the rule engine.
 *
 * BNF representation for grammar as below:
 *
 * <rule> ::= <logical-statement> [WHEN <logical-statement>]
 * rule: mapping-rule [WHEN when-rule]
 * - mapping-rule: the first level opr should better not be OR | NOT, otherwise it can't automatically find the groupby column
 * - when-rule: only contain the general info of data source, not the special info of each data row
 *
 * <logical-statement> ::= [NOT] <logical-expression> [(AND | OR) <logical-expression>]+ | "(" <logical-statement> ")"
 * logical-statement: return boolean value
 * logical-operator: "AND" | "&&", "OR" | "||", "NOT" | "!"
 *
 * <logical-expression> ::= <math-expr> (<compare-opr> <math-expr> | <range-opr> <range-expr>)
 * logical-expression example: $source.id = $target.id, $source.page_id IN ('3214', '4312', '60821')
 *
 * <compare-opr> ::= "=" | "!=" | "<" | ">" | "<=" | ">="
 * <range-opr> ::= ["NOT"] "IN" | "BETWEEN"
 * <range-expr> ::= "(" [<math-expr>] [, <math-expr>]+ ")"
 * range-expr example: ('3214', '4312', '60821'), (10, 15), ()
 *
 * <math-expr> ::= [<unary-opr>] <math-factor> [<binary-opr> <math-factor>]+
 * math-expr example: $source.price * $target.count, "hello" + " " + "world" + 123
 *
 * <binary-opr> ::= "+" | "-" | "*" | "/" | "%"
 * <unary-opr> ::= "+" | "-"
 *
 * <math-factor> ::= <literal> | <selection> | "(" <math-expr> ")"
 *
 * <selection> ::= <selection-head> [ <field-sel> | <function-operation> | <index-field-range-sel> | <filter-sel> ]+
 * selection example: $source.price, $source.json(), $source['state'], $source.numList[3], $target.json().mails['org' = 'apache'].names[*]
 *
 * <selection-head> ::= $source | $target
 *
 * <field-sel> ::= "." <field-string>
 *
 * <function-operation> ::= "." <function-name> "(" <arg> [, <arg>]+ ")"
 * <function-name> ::= <name-string>
 * <arg> ::= <math-expr>
 *
 * <index-field-range-sel> ::= "[" <index-field-range> [, <index-field-range>]+ "]"
 * <index-field-range> ::= <index-field> | (<index-field>, <index-field>) | "*"
 * index-field-range: 2 means the 3rd item, (0, 3) means first 4 items, * means all items, 'age' means item 'age'
 * <index-field> ::= <index> | <field-quote> | <all-selection>
 * index: 0 ~ n means position from start, -1 ~ -n means position from end
 * <field-quote> ::= ' <field-string> ' | " <field-string> "
 *
 * <filter-sel> ::= "[" <field-quote> <filter-compare-opr> <math-expr> "]"
 * <filter-compare-opr> ::= "=" | "!=" | "<" | ">" | "<=" | ">="
 * filter-sel example: ['name' = 'URL'], $source.man['age' > $source.graduate_age + 5 ]
 *
 * When <math-expr> in the selection, it mustn't contain the different <selection-head>, for example:
 * $source.tags[1+2] valid
 * $source.tags[$source.first] valid
 * $source.tags[$target.first] invalid
 * -- Such job is for validation, not for parser
 *
 * <literal> ::= <literal-string> | <literal-number> | <literal-time> | <literal-boolean> | <literal-null> | <literal-none>
 * <literal-string> ::= <any-string>
 * <literal-number> ::= <integer> | <double>
 * <literal-time> ::= <integer> ("d"|"h"|"m"|"s"|"ms")
 * <literal-boolean> ::= true | false
 * <literal-null> ::= null | undefined
 * <literal-none> ::= none
 */
case class RuleParser() extends JavaTokenParsers with Serializable {

  // Reserved words of the DSL; all case-insensitive.
  object Keyword {
    def WhenKeywords: Parser[String] = """(?i)when""".r
    def UnaryLogicalKeywords: Parser[String] = """(?i)not""".r
    def BinaryLogicalKeywords: Parser[String] = """(?i)and|or""".r
    def RangeKeywords: Parser[String] = """(?i)(not\s+)?(in|between)""".r
    def DataSourceKeywords: Parser[String] = """(?i)\$(source|target)""".r
    def Keywords: Parser[String] = WhenKeywords | UnaryLogicalKeywords | BinaryLogicalKeywords | RangeKeywords | DataSourceKeywords
  }
  import Keyword._

  // Operator tokens; word forms and symbolic forms are interchangeable.
  object Operator {
    def NotLogicalOpr: Parser[String] = """(?i)not""".r | "!"
    def AndLogicalOpr: Parser[String] = """(?i)and""".r | "&&"
    def OrLogicalOpr: Parser[String] = """(?i)or""".r | "||"
    def CompareOpr: Parser[String] = """!?==?""".r | """<=?""".r | """>=?""".r
    def RangeOpr: Parser[String] = RangeKeywords

    def UnaryMathOpr: Parser[String] = "+" | "-"
    // BinaryMathOpr1 binds tighter than BinaryMathOpr2 (see binaryMathExpr1/2 below)
    def BinaryMathOpr1: Parser[String] = "*" | "/" | "%"
    def BinaryMathOpr2: Parser[String] = "+" | "-"

    def FilterCompareOpr: Parser[String] = """!?==?""".r | """<=?""".r | """>=?""".r

    def SqBracketPair: (Parser[String], Parser[String]) = ("[", "]")
    def BracketPair: (Parser[String], Parser[String]) = ("(", ")")
    def Dot: Parser[String] = "."
    def AllSelection: Parser[String] = "*"
    def SQuote: Parser[String] = "'"
    def DQuote: Parser[String] = "\""
    def Comma: Parser[String] = ","
  }
  import Operator._

  object SomeString {
//    def AnyString: Parser[String] = """[^'\"{}\[\]()=<>.$@,;+\-*/\\]*""".r
    // NOTE(review): a quoted literal may not contain either quote character,
    // even the one it is not delimited by -- confirm whether that is intended
    def AnyString: Parser[String] = """[^'\"]*""".r
    def SimpleFieldString: Parser[String] = """\w+""".r
    def FieldString: Parser[String] = """[\w\s]+""".r
    def NameString: Parser[String] = """[a-zA-Z_]\w*""".r
  }
  import SomeString._

  object SomeNumber {
    def IntegerNumber: Parser[String] = """[+\-]?\d+""".r
    def DoubleNumber: Parser[String] = """[+\-]?(\.\d+|\d+\.\d*)""".r
    def IndexNumber: Parser[String] = IntegerNumber
  }
  import SomeNumber._

  // -- literal --
  // Alternative order matters: time before number (so "5d" is not read as 5),
  // double before integer (so "1.5" is not read as 1).
  def literal: Parser[LiteralExpr] = literialString | literialTime | literialNumber | literialBoolean | literialNull | literialNone
  def literialString: Parser[LiteralStringExpr] = (SQuote ~> AnyString <~ SQuote | DQuote ~> AnyString <~ DQuote) ^^ { LiteralStringExpr(_) }
  def literialNumber: Parser[LiteralNumberExpr] = (DoubleNumber | IntegerNumber) ^^ { LiteralNumberExpr(_) }
  def literialTime: Parser[LiteralTimeExpr] = """(\d+(d|h|m|s|ms))+""".r ^^ { LiteralTimeExpr(_) }
  def literialBoolean: Parser[LiteralBooleanExpr] = ("""(?i)true""".r | """(?i)false""".r) ^^ { LiteralBooleanExpr(_) }
  def literialNull: Parser[LiteralNullExpr] = ("""(?i)null""".r | """(?i)undefined""".r) ^^ { LiteralNullExpr(_) }
  def literialNone: Parser[LiteralNoneExpr] = """(?i)none""".r ^^ { LiteralNoneExpr(_) }

  // -- selection --
  // <selection> ::= <selection-head> [ <field-sel> | <function-operation> | <index-field-range-sel> | <filter-sel> ]+
  def selection: Parser[SelectionExpr] = selectionHead ~ rep(selector) ^^ {
    case head ~ selectors => SelectionExpr(head, selectors)
  }
  // functionOperation must be tried before fieldSelect, otherwise ".json()" would
  // stop at the field name and leave "()" unconsumed
  def selector: Parser[SelectExpr] = (functionOperation | fieldSelect | indexFieldRangeSelect | filterSelect)

  def selectionHead: Parser[SelectionHead] = DataSourceKeywords ^^ { SelectionHead(_) }
  // <field-sel> ::= "." <field-string>
  // A plain ".field" is modeled as a one-element index/field-range selection.
  def fieldSelect: Parser[IndexFieldRangeSelectExpr] = Dot ~> SimpleFieldString ^^ {
    case field => IndexFieldRangeSelectExpr(FieldDesc(field) :: Nil)
  }
  // <function-operation> ::= "." <function-name> "(" <arg> [, <arg>]+ ")"
  def functionOperation: Parser[FunctionOperationExpr] = Dot ~ NameString ~ BracketPair._1 ~ repsep(argument, Comma) ~ BracketPair._2 ^^ {
    case _ ~ func ~ _ ~ args ~ _ => FunctionOperationExpr(func, args)
  }
  def argument: Parser[MathExpr] = mathExpr
  // <index-field-range-sel> ::= "[" <index-field-range> [, <index-field-range>]+ "]"
  def indexFieldRangeSelect: Parser[IndexFieldRangeSelectExpr] = SqBracketPair._1 ~> rep1sep(indexFieldRange, Comma) <~ SqBracketPair._2 ^^ {
    case ifrs => IndexFieldRangeSelectExpr(ifrs)
  }
  // <index-field-range> ::= <index-field> | (<index-field>, <index-field>) | "*"
  def indexFieldRange: Parser[FieldDescOnly] = indexField | BracketPair._1 ~ indexField ~ Comma ~ indexField ~ BracketPair._2 ^^ {
    case _ ~ if1 ~ _ ~ if2 ~ _ => FieldRangeDesc(if1, if2)
  }
  // <index-field> ::= <index> | <field-quote> | <all-selection>
  // *here it can parse <math-expr>, but for simple situation, not supported now*
  def indexField: Parser[FieldDescOnly] = IndexNumber ^^ { IndexDesc(_) } | fieldQuote | AllSelection ^^ { AllFieldsDesc(_) }
  // <field-quote> ::= ' <field-string> ' | " <field-string> "
  def fieldQuote: Parser[FieldDesc] = (SQuote ~> FieldString <~ SQuote | DQuote ~> FieldString <~ DQuote) ^^ { FieldDesc(_) }
  // <filter-sel> ::= "[" <field-quote> <filter-compare-opr> <math-expr> "]"
  def filterSelect: Parser[FilterSelectExpr] = SqBracketPair._1 ~> fieldQuote ~ FilterCompareOpr ~ mathExpr <~ SqBracketPair._2 ^^ {
    case field ~ compare ~ value => FilterSelectExpr(field, compare, value)
  }

  // -- math --
  // <math-factor> ::= <literal> | <selection> | "(" <math-expr> ")"
  def mathFactor: Parser[MathExpr] = (literal | selection | BracketPair._1 ~> mathExpr <~ BracketPair._2) ^^ { MathFactorExpr(_) }
  // <math-expr> ::= [<unary-opr>] <math-factor> [<binary-opr> <math-factor>]+
  // <unary-opr> ::= "+" | "-"
  def unaryMathExpr: Parser[MathExpr] = rep(UnaryMathOpr) ~ mathFactor ^^ {
    case Nil ~ a => a
    case list ~ a => UnaryMathExpr(list, a)
  }
  // <binary-opr> ::= "+" | "-" | "*" | "/" | "%"
  // Two levels encode standard precedence: * / % bind tighter than + -.
  def binaryMathExpr1: Parser[MathExpr] = unaryMathExpr ~ rep(BinaryMathOpr1 ~ unaryMathExpr) ^^ {
    case a ~ Nil => a
    case a ~ list => BinaryMathExpr(a, list.map(c => (c._1, c._2)))
  }
  def binaryMathExpr2: Parser[MathExpr] = binaryMathExpr1 ~ rep(BinaryMathOpr2 ~ binaryMathExpr1) ^^ {
    case a ~ Nil => a
    case a ~ list => BinaryMathExpr(a, list.map(c => (c._1, c._2)))
  }
  def mathExpr: Parser[MathExpr] = binaryMathExpr2

  // -- logical expression --
  // <range-expr> ::= "(" [<math-expr>] [, <math-expr>]+ ")"
  def rangeExpr: Parser[RangeDesc] = BracketPair._1 ~> repsep(mathExpr, Comma) <~ BracketPair._2 ^^ { RangeDesc(_) }
  // <logical-expression> ::= <math-expr> (<compare-opr> <math-expr> | <range-opr> <range-expr>)
  // Alternative order matters: compare and range forms before the bare math-expr,
  // otherwise the bare form would always win.
  def logicalExpr: Parser[LogicalExpr] = mathExpr ~ CompareOpr ~ mathExpr ^^ {
    case left ~ opr ~ right => LogicalCompareExpr(left, opr, right)
  } | mathExpr ~ RangeOpr ~ rangeExpr ^^ {
    case left ~ opr ~ range => LogicalRangeExpr(left, opr, range)
  } | mathExpr ^^ { LogicalSimpleExpr(_) }

  // -- logical statement --
  // NOT binds tighter than AND, which binds tighter than OR.
  def logicalFactor: Parser[LogicalExpr] = logicalExpr | BracketPair._1 ~> logicalStatement <~ BracketPair._2
  def notLogicalStatement: Parser[LogicalExpr] = rep(NotLogicalOpr) ~ logicalFactor ^^ {
    case Nil ~ a => a
    case list ~ a => UnaryLogicalExpr(list, a)
  }
  def andLogicalStatement: Parser[LogicalExpr] = notLogicalStatement ~ rep(AndLogicalOpr ~ notLogicalStatement) ^^ {
    case a ~ Nil => a
    case a ~ list => BinaryLogicalExpr(a, list.map(c => (c._1, c._2)))
  }
  def orLogicalStatement: Parser[LogicalExpr] = andLogicalStatement ~ rep(OrLogicalOpr ~ andLogicalStatement) ^^ {
    case a ~ Nil => a
    case a ~ list => BinaryLogicalExpr(a, list.map(c => (c._1, c._2)))
  }
  // <logical-statement> ::= [NOT] <logical-expression> [(AND | OR) <logical-expression>]+ | "(" <logical-statement> ")"
  def logicalStatement: Parser[LogicalExpr] = orLogicalStatement

  // -- clause statement --
  def whereClause: Parser[WhereClauseExpr] = logicalStatement ^^ { WhereClauseExpr(_) }
  def whenClause: Parser[WhenClauseExpr] = WhenKeywords ~> logicalStatement ^^ { WhenClauseExpr(_) }

  // -- rule --
  // <rule> ::= <logical-statement> [WHEN <logical-statement>]
  def rule: Parser[StatementExpr] = whereClause ~ opt(whenClause) ^^ {
    case a ~ b => StatementExpr(a, b)
  }

}
/**
 * Combines per-key (path, value) lists into flattened row maps by building a
 * path tree and taking cartesian products across independent branches.
 */
object SchemaValueCombineUtil {

  // Map[String, List[(List[String], T)]]: Map[key, List[(path, value)]]
  /**
   * Turns { key -> [(path, value)...] } into a list of { key -> value } maps,
   * one map per combination of values that share compatible paths.
   */
  def cartesian[T](valuesMap: Map[String, List[(List[String], T)]]): List[Map[String, T]] = {
    val fieldsList: List[(String, List[(List[String], T)])] = valuesMap.toList

    // List[key, List[(path, value)]] to List[(path, (key, value))]
    val valueList: List[(List[String], (String, T))] = fieldsList.flatMap { fields =>
      val (key, list) = fields
      list.map { pv =>
        val (path, value) = pv
        (path, (key, value))
      }
    }

    // 1. generate tree from value list, and return root node
    val root = TreeUtil.genRootTree(valueList)

    // 2. deep first visit tree from root, merge datas into value map list
    val valueMapList: List[Map[String, _]] = TreeUtil.mergeDatasIntoMap(root, Nil)

    // 3. simple change
    // NOTE(review): unchecked cast back to T -- safe only because every value
    // inserted above was a T; re-check if the tree code is ever generalized
    val result = valueMapList.map { mp =>
      mp.map { kv =>
        val (k, v) = kv
        (k, v.asInstanceOf[T])
      }
    }

    result

  }


  // Mutable tree node: `key` is one path segment, `datas` holds the
  // (field-key, value) pairs attached at this node.
  case class TreeNode(key: String, var datas: List[(String, _)]) {
    var children = List[TreeNode]()
    def addChild(node: TreeNode): Unit = children = children :+ node
    // absorbs another node's datas into this one (used when keys collide)
    def mergeSelf(node: TreeNode): Unit = datas = datas ::: node.datas
  }

  object TreeUtil {
    // Builds a single-branch tree for one path; `datas` attach to the leaf.
    // An empty path yields None.
    private def genTree(path: List[String], datas: List[(String, _)]): Option[TreeNode] = {
      path match {
        case Nil => None
        case head :: tail => {
          genTree(tail, datas) match {
            case Some(child) => {
              val curNode = TreeNode(head, Nil)
              curNode.addChild(child)
              Some(curNode)
            }
            case _ => Some(TreeNode(head, datas))
          }
        }
      }
    }

    // Merges a new single-branch tree into a forest, collapsing nodes with
    // equal keys recursively and appending otherwise.
    private def mergeTrees(trees: List[TreeNode], newTreeOpt: Option[TreeNode]): List[TreeNode] = {
      newTreeOpt match {
        case Some(newTree) => {
          trees.find(tree => tree.key == newTree.key) match {
            case Some(tree) => {
              // children merge
              for (child <- newTree.children) {
                tree.children = mergeTrees(tree.children, Some(child))
              }
              // self data merge
              tree.mergeSelf(newTree)
              trees
            }
            case _ => trees :+ newTree
          }
        }
        case _ => trees
      }
    }

    // sentinel root with empty key; real paths hang off its children
    private def root(): TreeNode = TreeNode("", Nil)

    // Builds the merged path tree for all (path, (key, value)) entries.
    def genRootTree(values: List[(List[String], (String, _))]): TreeNode = {
      val rootNode = root()
      val nodeOpts = values.map(value => genTree(value._1, value._2 :: Nil))
      rootNode.children = nodeOpts.foldLeft(List[TreeNode]()) { (trees, treeOpt) =>
        mergeTrees(trees, treeOpt)
      }
      rootNode
    }

    // union of two partial-result lists (same key set: alternatives)
    private def add(mapList1: List[Map[String, _]], mapList2: List[Map[String, _]]): List[Map[String, _]] = {
      mapList1 ::: mapList2
    }
    // cartesian product of two partial-result lists (different key sets)
    private def multiply(mapList1: List[Map[String, _]], mapList2: List[Map[String, _]]): List[Map[String, _]] = {
      mapList1.flatMap { map1 =>
        mapList2.map { map2 =>
          map1 ++ map2
        }
      }
    }

    // key set of a map list, read from its first element (all elements are
    // assumed to share the same keys)
    private def keysList(mapList: List[Map[String, _]]): List[String] = {
      val keySet = mapList match {
        case Nil => Set[String]()
        case head :: _ => head.keySet
      }
      keySet.toList
    }

    // Depth-first: groups children's results by key set, `add`s within a group,
    // `multiply`s across groups, then folds in this node's own child datas.
    // NOTE(review): the `mapDatas` parameter is never read in this body (the
    // fold below starts from `childrenMergeMaps`) -- confirm whether it can be
    // dropped at the next interface change; callers currently pass Nil.
    def mergeDatasIntoMap(root: TreeNode, mapDatas: List[Map[String, _]]): List[Map[String, _]] = {
      val childrenKeysMapDatas = root.children.foldLeft(Map[List[String], List[Map[String, _]]]()) { (keysMap, child) =>
        val childMdts = mergeDatasIntoMap(child, List[Map[String, _]]())
        childMdts match {
          case Nil => keysMap
          case _ => {
            val keys = keysList(childMdts)
            val afterList = keysMap.get(keys) match {
              case Some(list) => add(list, childMdts)
              case _ => childMdts
            }
            keysMap + (keys -> afterList)
          }
        }
      }
      val childrenMergeMaps = childrenKeysMapDatas.values.foldLeft(List[Map[String, _]]()) { (originList, list) =>
        originList match {
          case Nil => list
          case _ => multiply(originList, list)
        }
      }
      val result = mergeNodeChildrenDatasIntoMap(root, childrenMergeMaps)
      result
    }

    // Distributes the datas stored directly on `node`'s children over the
    // already-merged maps: each distinct data key multiplies the result set.
    private def mergeNodeChildrenDatasIntoMap(node: TreeNode, mapDatas: List[Map[String, _]]): List[Map[String, _]] = {
      val datas: List[(String, (String, Any))] = node.children.flatMap { child =>
        child.datas.map(dt => (dt._1, (child.key, dt._2)))
      }
      val childrenDataKeys: Set[String] = datas.map(_._1).toSet
      val childrenDataLists: Map[String, List[(String, _)]] = datas.foldLeft(childrenDataKeys.map(k => (k, List[(String, _)]())).toMap) { (maps, data) =>
        maps.get(data._1) match {
          case Some(list) => maps + (data._1 -> (list :+ data._2))
          case _ => maps
        }
      }

      // multiply different key datas
      childrenDataLists.foldLeft(mapDatas) { (mdts, klPair) =>
        val (key, list) = klPair
        mdts match {
          case Nil => list.map(pr => Map[String, Any]((key -> pr._2)))
          case _ => {
            list.flatMap { kvPair =>
              val (path, value) = kvPair
              mdts.map { mp =>
                mp + (key -> value)
              }
            }
          }
        }
      }

    }
  }




}
/**
 * Mixed into expressions that can be analyzed for group-by key extraction.
 */
trait AnalyzableExpr extends Serializable {
  /**
   * Pairs of (source-side expr, target-side expr) usable as group-by keys for
   * the given (source, target) data source pair; empty unless overridden.
   */
  def getGroupbyExprPairs(dsPair: (String, String)): Seq[(Expr, Expr)] = Seq.empty[(Expr, Expr)]
}
/**
 * Cache/persist eligibility for expressions, scoped by data source name.
 * An empty `ds` argument stands for "no particular data source".
 */
trait Cacheable extends DataSourceable {

  // whether this expression itself is a cache candidate; off by default
  protected def cacheUnit: Boolean = false

  /** True when this expr may be cached while processing data source `ds`. */
  def cacheable(ds: String): Boolean = {
    // scope matches when ds is empty and we reference no source, or when
    // we reference the given non-empty source
    val scopeMatches = if (ds.isEmpty) dataSources.isEmpty else contains(ds)
    cacheUnit && !conflict() && scopeMatches
  }
  protected def getCacheExprs(ds: String): Iterable[Cacheable]

  // whether this expression itself is a persist candidate; off by default
  protected def persistUnit: Boolean = false

  /** True when this expr should be persisted while processing data source `ds`. */
  def persistable(ds: String): Boolean = {
    val scopeMatches = if (ds.isEmpty) dataSources.isEmpty else contains(ds)
    persistUnit && scopeMatches
  }
  protected def getPersistExprs(ds: String): Iterable[Cacheable]
}
/**
 * An expression that can be evaluated against a row of values.
 */
trait Calculatable extends Serializable {

  // `values` maps identifiers (expression ids / field names) to computed
  // values; implementations return None when no value can be produced
  def calculate(values: Map[String, Any]): Option[Any]

}
/**
 * Top-level clause expressions of a rule: WHERE, WHEN, and the full statement.
 */
trait ClauseExpr extends Expr with AnalyzableExpr {
  // whether this clause admits the given row; default: always valid
  def valid(values: Map[String, Any]): Boolean = true
  // clause results are cache candidates by default
  override def cacheUnit: Boolean = true
}

/** The mapping (WHERE) part of a rule; delegates everything to its logical expr. */
case class WhereClauseExpr(expr: LogicalExpr) extends ClauseExpr {
  def calculateOnly(values: Map[String, Any]): Option[Any] = expr.calculate(values)
  val desc: String = expr.desc
  val dataSources: Set[String] = expr.dataSources
  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
    expr.getCacheExprs(ds)
  }
  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
    expr.getFinalCacheExprs(ds)
  }
  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
    expr.getPersistExprs(ds)
  }

  override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(Expr, Expr)] = expr.getGroupbyExprPairs(dsPair)
}

/** The optional WHEN part of a rule; delegates everything to its logical expr. */
case class WhenClauseExpr(expr: LogicalExpr) extends ClauseExpr {
  def calculateOnly(values: Map[String, Any]): Option[Any] = expr.calculate(values)
  val desc: String = s"WHEN ${expr.desc}"
  val dataSources: Set[String] = expr.dataSources
  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
    expr.getCacheExprs(ds)
  }
  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
    expr.getFinalCacheExprs(ds)
  }
  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
    expr.getPersistExprs(ds)
  }

  override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(Expr, Expr)] = expr.getGroupbyExprPairs(dsPair)
}

/**
 * A complete rule: a WHERE clause plus an optional WHEN clause.
 * Calculation and analysis delegate to the WHERE clause only.
 */
case class StatementExpr(whereClause: WhereClauseExpr, whenClauseOpt: Option[WhenClauseExpr]) extends ClauseExpr {
  def calculateOnly(values: Map[String, Any]): Option[Any] = whereClause.calculate(values)
  val desc: String = {
    whenClauseOpt match {
      case Some(expr) => s"${whereClause.desc} ${expr.desc}"
      case _ => whereClause.desc
    }
  }
  // NOTE(review): the WHEN clause's data sources and cache/persist exprs are
  // ignored here (only whereClause is consulted) -- confirm this is intended;
  // the grammar notes say when-rules only carry general data-source info
  val dataSources: Set[String] = whereClause.dataSources
  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
    whereClause.getCacheExprs(ds)
  }
  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
    whereClause.getFinalCacheExprs(ds)
  }
  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
    whereClause.getPersistExprs(ds)
  }

  override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(Expr, Expr)] = whereClause.getGroupbyExprPairs(dsPair)
}
/**
 * Something that references zero or more named data sources.
 */
trait DataSourceable extends Serializable {

  /** Names of the data sources this element refers to. */
  val dataSources: Set[String]

  // referencing more than one source means the element spans sources
  protected def conflict(): Boolean = dataSources.size > 1

  /** Whether the given data source name is referenced. */
  def contains(ds: String): Boolean = dataSources.contains(ds)

  /** The single referenced data source, defined only when there is exactly one. */
  def dataSourceOpt: Option[String] = {
    dataSources.toList match {
      case only :: Nil => Some(only)
      case _ => None
    }
  }
}
/**
 * Something with a human-readable description.
 */
trait Describable extends Serializable {

  /** Human-readable description of this element. */
  val desc: String

  // Renders an arbitrary value for inclusion in a description:
  // Describable -> its own desc, String -> single-quoted, else -> string form.
  protected def describe(v: Any): String = {
    v match {
      case d: Describable => d.desc
      case str: String => "'" + str + "'"
      case other => String.valueOf(other)
    }
  }

}
/**
 * Base trait of all rule expressions: describable, cacheable, calculatable.
 * Each instance gets a unique id used as its cache key in the `values` map.
 */
trait Expr extends Serializable with Describable with Cacheable with Calculatable {

  // id to request at construction; empty means "auto-generate a numeric id"
  protected val _defaultId: String = ExprIdCounter.emptyId

  // unique id of this expression instance; lookup key in `values` below
  val _id = ExprIdCounter.genId(_defaultId)

  protected def getSubCacheExprs(ds: String): Iterable[Expr] = Nil
  // all cacheable sub-exprs for data source `ds`, plus this expr when it is
  // itself cacheable
  final def getCacheExprs(ds: String): Iterable[Expr] = {
    if (cacheable(ds)) getSubCacheExprs(ds).toList :+ this else getSubCacheExprs(ds)
  }

  protected def getSubFinalCacheExprs(ds: String): Iterable[Expr] = Nil
  // outermost cacheable exprs only: once an expr is cacheable, its subtree is
  // not descended further
  final def getFinalCacheExprs(ds: String): Iterable[Expr] = {
    if (cacheable(ds)) Nil :+ this else getSubFinalCacheExprs(ds)
  }

  protected def getSubPersistExprs(ds: String): Iterable[Expr] = Nil
  // all persistable sub-exprs for `ds`, plus this expr when itself persistable
  final def getPersistExprs(ds: String): Iterable[Expr] = {
    if (persistable(ds)) getSubPersistExprs(ds).toList :+ this else getSubPersistExprs(ds)
  }

  // returns the pre-computed value keyed by _id when present, otherwise
  // delegates to the concrete computation
  final def calculate(values: Map[String, Any]): Option[Any] = {
    values.get(_id) match {
      case Some(v) => Some(v)
      case _ => calculateOnly(values)
    }
  }
  // the actual computation, implemented by each concrete expression type
  protected def calculateOnly(values: Map[String, Any]): Option[Any]

}
/** Marker for DSL elements that only carry a description, no calculation. */
trait ExprDescOnly extends Describable {

}


/**
 * Head of a selection such as "$source": strips the leading '$' and
 * lower-cases the name; anything not of that shape is kept verbatim.
 */
case class SelectionHead(expr: String) extends ExprDescOnly {
  private val HeadPattern = """\$(\w+)""".r
  val head: String = expr match {
    case HeadPattern(name) => name.toLowerCase
    case _                 => expr
  }
  val desc: String = "$" + head
}

/** Comma-separated, parenthesized rendering of a range of math expressions. */
case class RangeDesc(elements: Iterable[MathExpr]) extends ExprDescOnly {
  val desc: String = elements.map(_.desc).mkString("(", ", ", ")")
}
/**
 * Generates unique string ids for expressions.
 *
 * Empty or purely-numeric default ids are replaced by an auto-incremented
 * number; any other user-supplied id is registered and returned as-is
 * (repeat registrations return the same id).
 *
 * NOTE(review): idCounter is atomic but existIdSet is an unsynchronized
 * mutable set — assumes single-threaded id generation; confirm callers.
 */
object ExprIdCounter {

  private val idCounter: AtomicLong = new AtomicLong(0L)

  // user-provided ids registered so far
  private val existIdSet: MutableSet[String] = MutableSet.empty[String]

  // purely numeric ids are reserved for auto-generated ids
  private val invalidIdRegex = """^\d+$""".r

  val emptyId: String = ""

  def genId(defaultId: String): String = {
    defaultId match {
      // BUG FIX: the original `case emptyId =>` was a lowercase variable
      // pattern that bound ANY input, so every id was auto-generated and
      // the later cases were unreachable. Backticks make it a stable-
      // identifier pattern comparing against the emptyId val.
      case `emptyId` => increment().toString
      case invalidIdRegex() => increment().toString
      case defId if exist(defId) => defId
      case _ => {
        insertUserId(defaultId)
        defaultId
      }
    }
  }

  private def exist(id: String): Boolean = {
    existIdSet.contains(id)
  }

  private def insertUserId(id: String): Unit = {
    existIdSet += id
  }

  private def increment(): Long = {
    idCounter.incrementAndGet()
  }

}
b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/FieldDescOnly.scala new file mode 100644 index 0000000..dca037b --- /dev/null +++ b/measure/src/main/scala/org/apache/griffin/measure/rule/expr/FieldDescOnly.scala @@ -0,0 +1,58 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
/** Marker for field descriptors: description plus data-source tracking only. */
trait FieldDescOnly extends Describable with DataSourceable {

}

/** A numeric field index; construction fails for non-integer input. */
case class IndexDesc(expr: String) extends FieldDescOnly {
  val index: Int =
    Try(expr.toInt).getOrElse(throw new Exception(s"${expr} is invalid index"))
  val desc: String = describe(index)
  val dataSources: Set[String] = Set.empty[String]
}

/** A field referenced by name. */
case class FieldDesc(expr: String) extends FieldDescOnly {
  val field: String = expr
  val desc: String = describe(field)
  val dataSources: Set[String] = Set.empty[String]
}

/** The wildcard selector covering all fields. */
case class AllFieldsDesc(expr: String) extends FieldDescOnly {
  val allFields: String = expr
  val desc: String = allFields
  val dataSources: Set[String] = Set.empty[String]
}

/** A range of fields; only index-to-index ranges are valid. */
case class FieldRangeDesc(startField: FieldDescOnly, endField: FieldDescOnly) extends FieldDescOnly {
  val desc: String = (startField, endField) match {
    case (from: IndexDesc, to: IndexDesc) => s"(${from.desc}, ${to.desc})"
    case _ => throw new Exception("invalid field range description")
  }
  val dataSources: Set[String] = Set.empty[String]
}
/** Base for literal expressions: the value is fixed at construction time. */
trait LiteralExpr extends Expr {
  val value: Option[Any]
  // A literal ignores the value map entirely.
  def calculateOnly(values: Map[String, Any]): Option[Any] = value
  val dataSources: Set[String] = Set.empty[String]
}

/** Wraps an already-computed literal value. */
case class LiteralValueExpr(value: Option[Any]) extends LiteralExpr {
  val desc: String = value.getOrElse("").toString
}

/** A string literal; described in single quotes. */
case class LiteralStringExpr(expr: String) extends LiteralExpr {
  val value: Option[String] = Some(expr)
  val desc: String = "'" + value.getOrElse("") + "'"
}

/**
 * A numeric literal: parsed as Double when it contains a decimal point,
 * otherwise as Long; construction fails on unparseable input.
 */
case class LiteralNumberExpr(expr: String) extends LiteralExpr {
  val value: Option[Any] = {
    val parsed = if (expr.contains(".")) Try(expr.toDouble) else Try(expr.toLong)
    parsed match {
      case Success(v) => Some(v)
      case _ => throw new Exception(s"${expr} is invalid number")
    }
  }
  val desc: String = value.getOrElse("").toString
}

/** A duration literal like "5m", converted to milliseconds by TimeUtil. */
case class LiteralTimeExpr(expr: String) extends LiteralExpr {
  final val TimeRegex = """(\d+)(d|h|m|s|ms)""".r
  val value: Option[Long] = TimeUtil.milliseconds(expr)
  val desc: String = expr
}

/** A boolean literal, matched case-insensitively; fails on anything else. */
case class LiteralBooleanExpr(expr: String) extends LiteralExpr {
  final val TrueRegex = """(?i)true""".r
  final val FalseRegex = """(?i)false""".r
  val value: Option[Boolean] =
    if (TrueRegex.pattern.matcher(expr).matches) Some(true)
    else if (FalseRegex.pattern.matcher(expr).matches) Some(false)
    else throw new Exception(s"${expr} is invalid boolean")
  val desc: String = value.getOrElse("").toString
}

/** The null literal: present, but holding null. */
case class LiteralNullExpr(expr: String) extends LiteralExpr {
  val value: Option[Any] = Some(null)
  val desc: String = "null"
}

/** The none literal: no value at all. */
case class LiteralNoneExpr(expr: String) extends LiteralExpr {
  val value: Option[Any] = None
  val desc: String = "none"
}
