package org.apache.griffin.measure.batch.config.validator

import org.apache.griffin.measure.batch.config.params.Param

import scala.util.Try

/**
 * Validator that accepts every parameter.
 *
 * Placeholder implementation: the real validation rules are still to be
 * written (see the fixme below), so any param is reported as valid.
 */
case class AllParamValidator() extends ParamValidator {

  /** Validate the given param; currently always succeeds with `true`. */
  def validate[T <: Param](param: Param): Try[Boolean] = Try {
    // fixme: not done, need to validate param
    true
  }

}
package org.apache.griffin.measure.batch.config.validator

import org.apache.griffin.measure.batch.config.params.Param
import org.apache.griffin.measure.batch.log.Loggable

import scala.util.Try

/**
 * Contract for validating a configuration [[Param]].
 *
 * The result is wrapped in [[Try]] so callers can tell "param is invalid"
 * (Success(false)) apart from "validation itself blew up" (Failure).
 */
trait ParamValidator extends Loggable with Serializable {

  /** Validate `param`, returning whether it is acceptable. */
  def validate[T <: Param](param: Param): Try[Boolean]

}
package org.apache.griffin.measure.batch.connector

import org.apache.griffin.measure.batch.rule.expr._
import org.apache.griffin.measure.batch.utils.HdfsUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import com.databricks.spark.avro._

import java.nio.file.{Files, Paths}
import scala.util.{Success, Try}

/**
 * DataConnector that reads records from an Avro file (spark-avro reader).
 *
 * Each row is evaluated against the cache expressions, filtered by the
 * optional when-clause, and keyed by the groupby-expression tuple, producing
 * an RDD of (groupby key, cached values) pairs.
 */
case class AvroDataConnector(sqlContext: SQLContext, config: Map[String, Any],
                             groupbyExprs: Seq[MathExpr], cacheExprs: Iterable[Expr],
                             finalCacheExprs: Iterable[Expr], globalFinalCacheMap: Map[String, Any],
                             whenClauseOpt: Option[LogicalExpr]
                            ) extends DataConnector {

  // config keys
  val FilePath = "file.path"
  val FileName = "file.name"

  val filePath = config.getOrElse(FilePath, "").toString
  val fileName = config.getOrElse(FileName, "").toString

  // prepend the configured directory only when one is given
  // NOTE(review): no separator is inserted between path and name — presumably
  // "file.path" is expected to end with "/"; confirm against callers
  val concreteFileFullPath = if (pathPrefix) s"${filePath}${fileName}" else fileName

  private def pathPrefix(): Boolean = filePath.nonEmpty

  private def fileExist(): Boolean = HdfsUtil.existPath(concreteFileFullPath)

  /** True when a non-empty path is configured and it exists on HDFS. */
  def available(): Boolean = concreteFileFullPath.nonEmpty && fileExist

  /** Field name / type-name pairs taken from the Avro schema. */
  def metaData(): Try[Iterable[(String, String)]] = Try {
    val schema = sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath).schema
    schema.fields.map(field => (field.name, field.dataType.typeName))
  }

  /** Rows keyed by the groupby tuple; rows the when-clause rejects are dropped. */
  def data(): Try[RDD[(Product, Map[String, Any])]] = Try {
    loadDataFile.flatMap { row =>
      // evaluate cache expressions against this row, seeded with the global cache
      val rowCache: Map[String, Any] = cacheExprs.foldLeft(globalFinalCacheMap) { (acc, expr) =>
        CacheDataUtil.genCachedMap(Some(row), expr, acc)
      }
      val finalCache = CacheDataUtil.filterCachedMap(finalCacheExprs, rowCache)

      // when-clause acts as a row filter; an absent clause lets everything through
      val keep = whenClauseOpt match {
        case Some(whenClause) => whenClause.calculate(finalCache)
        case _ => None
      }

      keep match {
        case Some(false) => None
        case _ =>
          val keyValues: Seq[AnyRef] = groupbyExprs.flatMap { expr =>
            expr.calculate(finalCache).map(_.asInstanceOf[AnyRef])
          }
          Some((toTuple(keyValues), finalCache))
      }
    }
  }

  private def loadDataFile() =
    sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath)

  // build a TupleN reflectively from the groupby values;
  // an empty key falls back to None (which is itself a Product)
  private def toTuple[A <: AnyRef](as: Seq[A]): Product = {
    if (as.size > 0) {
      val tupleClass = Class.forName("scala.Tuple" + as.size)
      tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product]
    } else None
  }

}
package org.apache.griffin.measure.batch.connector

import org.apache.griffin.measure.batch.rule.expr._
import org.apache.spark.sql.Row

import scala.util.{Success, Try}

/**
 * Helpers for evaluating rule expressions against a data row and accumulating
 * the results into an expression-id -> value cache map.
 */
object CacheDataUtil {

  // for now, one expr only gets one value; an expr yielding multiple values is unsupported
  private def getCacheData(data: Option[Any], expr: Expr, cachedMap: Map[String, Any]): Option[Any] = {
    Try {
      expr match {
        case selection: SelectionExpr =>
          // walk the selector chain, narrowing the data step by step
          selection.selectors.foldLeft(data) { (dt, selector) =>
            getCacheData(dt, selector, cachedMap)
          }
        case selector: IndexFieldRangeSelectExpr =>
          data match {
            case Some(row: Row) =>
              // only a single index/field selector is supported against a Row
              if (selector.fields.size == 1) {
                selector.fields.head match {
                  case i: IndexDesc => Some(row.getAs[Any](i.index))
                  case f: FieldDesc => Some(row.getAs[Any](f.field))
                  case _ => None
                }
              } else None
            case _ => None
          }
        case _ => expr.calculate(cachedMap)
      }
    } match {
      case Success(v) => v
      case _ => None // evaluation errors are swallowed: a failed expr just contributes no value
    }
  }

  /** Evaluate one expr against `data` and add its value (if any) under the expr's id. */
  def genCachedMap(data: Option[Any], expr: Expr, initialCachedMap: Map[String, Any]): Map[String, Any] = {
    getCacheData(data, expr, initialCachedMap) match {
      case Some(v) => initialCachedMap + (expr._id -> v)
      case None => initialCachedMap
    }
  }

  /**
   * Evaluate several exprs against the same `data`.
   *
   * Bug fix: the original passed `None` instead of `data` to the single-expr
   * overload, so the supplied row data was silently ignored and selection
   * expressions could never resolve against it.
   */
  def genCachedMap(data: Option[Any], exprs: Iterable[Expr], initialCachedMap: Map[String, Any]): Map[String, Any] = {
    exprs.foldLeft(initialCachedMap) { (cachedMap, expr) =>
      genCachedMap(data, expr, cachedMap)
    }
  }

  /** Keep only the values of `exprs` that can be computed from `cachedMap`. */
  def filterCachedMap(exprs: Iterable[Expr], cachedMap: Map[String, Any]): Map[String, Any] = {
    exprs.foldLeft(Map[String, Any]()) { (newMap, expr) =>
      expr.calculate(cachedMap) match {
        case Some(v) => newMap + (expr._id -> v)
        case None => newMap
      }
    }
  }

}
package org.apache.griffin.measure.batch.connector

import org.apache.griffin.measure.batch.config.params.user._
import org.apache.griffin.measure.batch.rule.expr._
import org.apache.spark.sql.SQLContext

import scala.util.Try

/** Constructs the concrete DataConnector named by a connector param's type. */
object DataConnectorFactory {

  val HiveRegex = """^(?i)hive$""".r
  val AvroRegex = """^(?i)avro$""".r

  /**
   * Build a connector for the given param.
   * The Failure case carries the construction error, including unknown connector types.
   */
  def getDataConnector(sqlContext: SQLContext,
                       dataConnectorParam: DataConnectorParam,
                       groupbyExprs: Seq[MathExpr],
                       cacheExprs: Iterable[Expr],
                       finalCacheExprs: Iterable[Expr],
                       globalFinalCacheMap: Map[String, Any],
                       whenClauseOpt: Option[LogicalExpr]
                      ): Try[DataConnector] = {
    val conType = dataConnectorParam.conType
    val version = dataConnectorParam.version // not used yet; reserved for versioned connectors
    Try {
      conType match {
        case HiveRegex() =>
          HiveDataConnector(sqlContext, dataConnectorParam.config,
            groupbyExprs, cacheExprs, finalCacheExprs, globalFinalCacheMap, whenClauseOpt)
        case AvroRegex() =>
          AvroDataConnector(sqlContext, dataConnectorParam.config,
            groupbyExprs, cacheExprs, finalCacheExprs, globalFinalCacheMap, whenClauseOpt)
        case _ => throw new Exception("connector creation error!")
      }
    }
  }

}
package org.apache.griffin.measure.batch.connector

import org.apache.griffin.measure.batch.rule.expr._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}

import scala.util.{Success, Try}

/**
 * DataConnector that reads records from a Hive table, optionally restricted
 * to a set of partitions.
 *
 * Each row is evaluated against the cache expressions, filtered by the
 * optional when-clause, and keyed by the groupby-expression tuple.
 */
case class HiveDataConnector(sqlContext: SQLContext, config: Map[String, Any],
                             groupbyExprs: Seq[MathExpr], cacheExprs: Iterable[Expr],
                             finalCacheExprs: Iterable[Expr], globalFinalCacheMap: Map[String, Any],
                             whenClauseOpt: Option[LogicalExpr]
                            ) extends DataConnector {

  // config keys
  val Database = "database"
  val TableName = "table.name"
  val Partitions = "partitions"

  val database = config.getOrElse(Database, "").toString
  val tableName = config.getOrElse(TableName, "").toString
  val partitionsString = config.getOrElse(Partitions, "").toString

  // qualify with the database unless it is empty or the default one
  val concreteTableName = if (dbPrefix) s"${database}.${tableName}" else tableName
  // "a=1,b=2;a=3" -> partition groups, each group a conjunction of conditions
  val partitions = partitionsString.split(";").map(s => s.split(",").map(_.trim))

  private def dbPrefix(): Boolean = database.nonEmpty && !database.equals("default")

  /** True when the table can be found in the catalog. */
  def available(): Boolean = {
    tableName.nonEmpty && {
      val lookup = Try {
        if (dbPrefix) {
          sqlContext.tables(database).filter(tableExistsSql).collect.size
        } else {
          sqlContext.tables().filter(tableExistsSql).collect.size
        }
      }
      lookup match {
        case Success(size) => size > 0
        case _ => false
      }
    }
  }

  /** Column name/type pairs from DESCRIBE, with the partition-info section stripped. */
  def metaData(): Try[Iterable[(String, String)]] = Try {
    val rows = sqlContext.sql(metaDataSql).map(r => (r.getString(0), r.getString(1))).collect
    val partitionPos = rows.indexWhere(pair => pair._1.startsWith("# "))
    if (partitionPos < 0) rows else rows.take(partitionPos)
  }

  /** Rows keyed by the groupby tuple; rows the when-clause rejects are dropped. */
  def data(): Try[RDD[(Product, Map[String, Any])]] = Try {
    sqlContext.sql(dataSql).flatMap { row =>
      // evaluate cache expressions against this row, seeded with the global cache
      val rowCache: Map[String, Any] = cacheExprs.foldLeft(globalFinalCacheMap) { (acc, expr) =>
        CacheDataUtil.genCachedMap(Some(row), expr, acc)
      }
      val finalCache = CacheDataUtil.filterCachedMap(finalCacheExprs, rowCache)

      // when-clause acts as a row filter; an absent clause lets everything through
      val keep = whenClauseOpt match {
        case Some(whenClause) => whenClause.calculate(finalCache)
        case _ => None
      }

      keep match {
        case Some(false) => None
        case _ =>
          val keyValues: Seq[AnyRef] = groupbyExprs.flatMap { expr =>
            expr.calculate(finalCache).map(_.asInstanceOf[AnyRef])
          }
          Some((toTuple(keyValues), finalCache))
      }
    }
  }

  private def tableExistsSql(): String = {
    // "SHOW TABLES LIKE '...'" is hive sql but does not work through spark sql,
    // so filter the tables() DataFrame on its tableName column instead
    s"tableName LIKE '${tableName}'"
  }

  private def metaDataSql(): String = s"DESCRIBE ${concreteTableName}"

  // one SELECT per partition group, unioned; no WHERE when the group is empty
  private def dataSql(): String = {
    partitions.map { group =>
      val whereClause = group.mkString(" AND ")
      if (whereClause.isEmpty) s"SELECT * FROM ${concreteTableName}"
      else s"SELECT * FROM ${concreteTableName} WHERE ${whereClause}"
    }.mkString(" UNION ALL ")
  }

  // build a TupleN reflectively from the groupby values;
  // an empty key falls back to None (which is itself a Product)
  private def toTuple[A <: AnyRef](as: Seq[A]): Product = {
    if (as.size > 0) {
      val tupleClass = Class.forName("scala.Tuple" + as.size)
      tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product]
    } else None
  }

}
package org.apache.griffin.measure.batch.log

import org.slf4j.LoggerFactory

/**
 * Mixin exposing slf4j logging helpers.
 * The logger is transient and lazy so serialized instances (e.g. shipped to
 * Spark executors) re-create it on the remote side instead of serializing it.
 */
trait Loggable {

  @transient private lazy val logger = LoggerFactory.getLogger(getClass)

  protected def info(msg: String): Unit = logger.info(msg)

  protected def debug(msg: String): Unit = logger.debug(msg)

  protected def warn(msg: String): Unit = logger.warn(msg)

  protected def error(msg: String): Unit = logger.error(msg)

}
package org.apache.griffin.measure.batch.persist

import java.util.Date

import org.apache.griffin.measure.batch.result._
import org.apache.griffin.measure.batch.utils.HdfsUtil
import org.apache.spark.rdd.RDD

/**
 * Persist implementation writing marker, result, log and miss-record files
 * under `<path>/<metricName>/<timeStamp>/` on HDFS.
 */
case class HdfsPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist {

  // config keys
  val Path = "path"
  val MaxPersistLines = "max.persist.lines"
  val MaxLinesPerFile = "max.lines.per.file"

  val path = config.getOrElse(Path, "").toString
  val maxPersistLines = config.getOrElse(MaxPersistLines, -1).toString.toLong
  val maxLinesPerFile = config.getOrElse(MaxLinesPerFile, 10000).toString.toLong

  val separator = "/"

  val StartFile = filePath("_START")
  val FinishFile = filePath("_FINISH")
  val ResultFile = filePath("_RESULT")

  val MissRecFile = filePath("_MISSREC") // optional

  val LogFile = filePath("_LOG")

  // true only for the first log call, so the persist header is written exactly once
  var _init = true
  private def isInit = {
    val first = _init
    _init = false
    first
  }

  def available(): Boolean = path.nonEmpty && (maxPersistLines < Int.MaxValue)

  private def persistHead: String = {
    val dt = new Date(timeStamp)
    s"================ log of ${dt} ================\n"
  }

  private def timeHead(rt: Long): String = {
    val dt = new Date(rt)
    s"--- ${dt} ---\n"
  }

  /** Join parent and file name without doubling the separator. */
  protected def getFilePath(parentPath: String, fileName: String): String =
    if (parentPath.endsWith(separator)) parentPath + fileName else parentPath + separator + fileName

  protected def filePath(file: String): String =
    getFilePath(path, s"${metricName}/${timeStamp}/${file}")

  protected def withSuffix(path: String, suffix: String): String = s"${path}.${suffix}"

  def start(msg: String): Unit = HdfsUtil.writeContent(StartFile, msg)

  def finish(): Unit = HdfsUtil.createEmptyFile(FinishFile)

  def result(rt: Long, result: Result): Unit = {
    val resStr = result match {
      case ar: AccuracyResult =>
        s"match percentage: ${ar.matchPercentage}\ntotal count: ${ar.getTotal}\nmiss count: ${ar.getMiss}, match count: ${ar.getMatch}"
      case _ =>
        s"result: ${result}"
    }
    HdfsUtil.writeContent(ResultFile, timeHead(rt) + resStr)
    log(rt, resStr)

    info(resStr)
  }

  // need to avoid string too long: cap at max.persist.lines and split the
  // output across files of at most max.lines.per.file lines each
  def missRecords(records: RDD[String]): Unit = {
    val recordCount = records.count
    val count = if (maxPersistLines < 0) recordCount else scala.math.min(maxPersistLines, recordCount)
    if (count > 0) {
      val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt
      if (groupCount <= 1) {
        // small enough for a single file, collected on the driver
        persistRecords(MissRecFile, records.take(count.toInt))
      } else {
        // bucket records by line group and write one file per group
        // NOTE(review): foreach runs on executors, so HdfsUtil is invoked
        // from executor JVMs here — confirm that is intended
        val groupedRecords: RDD[(Long, Iterable[String])] =
          records.zipWithIndex.flatMap { r =>
            val gid = r._2 / maxLinesPerFile
            if (gid < groupCount) Some((gid, r._1)) else None
          }.groupByKey()
        groupedRecords.foreach { group =>
          val (gid, recs) = group
          val hdfsPath = if (gid == 0) MissRecFile else withSuffix(MissRecFile, gid.toString)
          persistRecords(hdfsPath, recs)
        }
      }
    }
  }

  private def persistRecords(hdfsPath: String, records: Iterable[String]): Unit =
    HdfsUtil.appendContent(hdfsPath, records.mkString("\n"))

  def log(rt: Long, msg: String): Unit = {
    val logStr = (if (isInit) persistHead else "") + timeHead(rt) + s"${msg}\n\n"
    HdfsUtil.appendContent(LogFile, logStr)
  }

}
package org.apache.griffin.measure.batch.persist

import org.apache.griffin.measure.batch.result._
import org.apache.griffin.measure.batch.utils.{HttpUtil, JsonUtil}
import org.apache.spark.rdd.RDD

/**
 * Persist that posts accuracy metric values to an HTTP endpoint.
 * Lifecycle hooks, record dumps and logging are deliberate no-ops here.
 */
case class HttpPersist(config: Map[String, Any], metricName: String, timeStamp: Long) extends Persist {

  // config keys
  val Api = "api"
  val Method = "method"

  val api = config.getOrElse(Api, "").toString
  val method = config.getOrElse(Method, "post").toString

  def available(): Boolean = api.nonEmpty

  def start(msg: String): Unit = {}
  def finish(): Unit = {}

  def result(rt: Long, result: Result): Unit = {
    result match {
      case ar: AccuracyResult =>
        // serialize the metric value and push it to the configured endpoint
        val dataMap = Map[String, Any](("metricName" -> metricName), ("timestamp" -> timeStamp), ("value" -> ar.matchPercentage))
        val data = JsonUtil.toJson(dataMap)

        // post
        val params = Map[String, Object]()
        val header = Map[String, Object](("content-type" -> "application/json"))
        val status = HttpUtil.httpRequest(api, method, params, header, data)
        info(s"${method} to ${api} response status: ${status}")
      case _ =>
        info(s"result: ${result}")
    }
  }

  def missRecords(records: RDD[String]): Unit = {}

  def log(rt: Long, msg: String): Unit = {}

}
package org.apache.griffin.measure.batch.persist

import org.apache.griffin.measure.batch.log.Loggable
import org.apache.griffin.measure.batch.result._
import org.apache.griffin.measure.batch.utils.{HttpUtil, JsonUtil}
import org.apache.spark.rdd.RDD

/**
 * Contract every persist sink implements: lifecycle markers, the measurement
 * result, the miss-record dump, and free-form logging.
 */
trait Persist extends Loggable with Serializable {
  val timeStamp: Long

  val config: Map[String, Any]

  def available(): Boolean

  def start(msg: String): Unit
  def finish(): Unit

  def result(rt: Long, result: Result): Unit

  def missRecords(records: RDD[String]): Unit

  def log(rt: Long, msg: String): Unit
}

/**
 * Composite Persist that fans every call out to each wrapped persist.
 */
case class MultiPersists(persists: Iterable[Persist]) extends Persist {

  // Bug fix: the original matched `persists` against `Nil`, which only works
  // when the Iterable happens to be a List; any other empty Iterable (Set,
  // Vector, ...) fell through to `persists.head` and threw
  // NoSuchElementException. headOption handles every Iterable safely.
  val timeStamp: Long = persists.headOption.map(_.timeStamp).getOrElse(0L)

  val config: Map[String, Any] = Map[String, Any]()

  /** Available when at least one wrapped persist is. */
  def available(): Boolean = persists.exists(_.available())

  def start(msg: String): Unit = persists.foreach(_.start(msg))
  def finish(): Unit = persists.foreach(_.finish())

  def result(rt: Long, result: Result): Unit = persists.foreach(_.result(rt, result))

  def missRecords(records: RDD[String]): Unit = persists.foreach(_.missRecords(records))

  def log(rt: Long, msg: String): Unit = persists.foreach(_.log(rt, msg))

}
package org.apache.griffin.measure.batch.persist

import org.apache.griffin.measure.batch.config.params.env._

import scala.util.{Success, Try}

/** Builds the set of available persist sinks for a metric at a given timestamp. */
case class PersistFactory(persistParams: Iterable[PersistParam], metricName: String) extends Serializable {

  val HDFS_REGEX = """^(?i)hdfs$""".r
  val HTTP_REGEX = """^(?i)http$""".r

  /** Wrap every usable persist in a single composite. */
  def getPersists(timeStamp: Long): MultiPersists =
    MultiPersists(persistParams.flatMap(param => getPersist(timeStamp, param)))

  // None when construction fails or the persist reports itself unavailable;
  // note: an unsupported persist type throws (fail fast) rather than returning None
  private def getPersist(timeStamp: Long, persistParam: PersistParam): Option[Persist] = {
    val persistConfig = persistParam.config
    val persistTry = persistParam.persistType match {
      case HDFS_REGEX() => Try(HdfsPersist(persistConfig, metricName, timeStamp))
      case HTTP_REGEX() => Try(HttpPersist(persistConfig, metricName, timeStamp))
      case _ => throw new Exception("not supported persist type")
    }
    persistTry match {
      case Success(persist) if (persist.available) => Some(persist)
      case _ => None
    }
  }

}
package org.apache.griffin.measure.batch.result

/**
 * Outcome of a measurement. The member type `T` pins the concrete result
 * type so update/compare operations stay type-safe.
 */
trait Result extends Serializable {

  type T <: Result

  def update(delta: T): T

  def eventual(): Boolean

  def differsFrom(other: T): Boolean

}

/**
 * Accuracy measurement: `miss` unmatched records out of `total`.
 */
case class AccuracyResult(miss: Long, total: Long) extends Result {

  type T = AccuracyResult

  /** Take the latest miss count from `delta`, keeping this result's total. */
  def update(delta: T): T = AccuracyResult(delta.miss, total)

  /** Finished once nothing is missing any more. */
  def eventual(): Boolean = miss <= 0

  def differsFrom(other: T): Boolean =
    (miss != other.miss) || (total != other.total)

  def getMiss = miss
  def getTotal = total
  def getMatch = total - miss

  /** Matched percentage in [0, 100]; 0 for an empty result set. */
  def matchPercentage: Double = if (getTotal <= 0) 0 else getMatch.toDouble / getTotal * 100

}
package org.apache.griffin.measure.batch.result

/**
 * Well-known metadata fields attached to measure results.
 * Each case object carries its map key and the hive-style type name `tp`.
 */
sealed trait ResultInfo {
  type T
  val key: String
  val tp: String
  /** Pair this info's key with a value, ready to merge into a result map. */
  def wrap(value: T) = (key -> value)
}

/** Timestamp group of the result. */
final case object TimeGroupInfo extends ResultInfo {
  type T = Long
  val key = "__time__"
  val tp = "bigint"
}

/** Next scheduled fire time. */
final case object NextFireTimeInfo extends ResultInfo {
  type T = Long
  val key = "__next_fire_time__"
  val tp = "bigint"
}

/** Description of a mismatch. */
final case object MismatchInfo extends ResultInfo {
  type T = String
  val key = "__mismatch__"
  val tp = "string"
}

/** The target-side record involved. */
final case object TargetInfo extends ResultInfo {
  type T = Map[String, Any]
  val key = "__target__"
  val tp = "map"
}

/** Error text attached to a result. */
final case object ErrorInfo extends ResultInfo {
  type T = String
  val key = "__error__"
  val tp = "string"
}
package org.apache.griffin.measure.batch.rule

import org.apache.griffin.measure.batch.config.params.user._
import org.apache.griffin.measure.batch.rule.expr._

import scala.util.{Failure, Success, Try}

/** Parses the user-configured rule text into a StatementExpr AST. */
case class RuleFactory(evaluateRuleParam: EvaluateRuleParam) {

  val ruleParser: RuleParser = RuleParser()

  /**
   * Parse the rule from the evaluate-rule param.
   * Throws the underlying parse exception when the rule text is invalid.
   */
  def generateRule(): StatementExpr = {
    parseExpr(evaluateRuleParam.rules) match {
      case Success(se) => se
      case Failure(ex) => throw ex
    }
  }

  // Failure when the combinator parser does not consume the whole input successfully
  private def parseExpr(rules: String): Try[StatementExpr] = Try {
    val result = ruleParser.parseAll(ruleParser.rule, rules)
    if (result.successful) result.get
    else throw new Exception("parse rule error!")
  }

}
// --- file: rule/RuleParser.scala ---
package org.apache.griffin.measure.batch.rule

import org.apache.griffin.measure.batch.rule.expr._

import scala.util.parsing.combinator._

/**
  * Combinator parser for the griffin rule DSL.
  *
  * BNF representation for grammar as below:
  *
  * <rule> ::= <logical-statement> [WHEN <logical-statement>]
  *   - mapping-rule: the first-level opr should better not be OR | NOT, otherwise
  *     the group-by column cannot be found automatically
  *   - when-rule: only contains general info of the data source, not per-row info
  *
  * <logical-statement> ::= [NOT] <logical-expression> [(AND | OR) <logical-expression>]+ | "(" <logical-statement> ")"
  *   logical-operator: "AND" | "&&", "OR" | "||", "NOT" | "!"
  *
  * <logical-expression> ::= <math-expr> (<compare-opr> <math-expr> | <range-opr> <range-expr>)
  *   e.g. $source.id = $target.id, $source.page_id IN ('3214', '4312', '60821')
  * <compare-opr> ::= "=" | "!=" | "<" | ">" | "<=" | ">="
  * <range-opr> ::= ["NOT"] "IN" | "BETWEEN"
  * <range-expr> ::= "(" [<math-expr>] [, <math-expr>]+ ")"
  *
  * <math-expr> ::= [<unary-opr>] <math-factor> [<binary-opr> <math-factor>]+
  * <binary-opr> ::= "+" | "-" | "*" | "/" | "%"
  * <unary-opr> ::= "+" | "-"
  * <math-factor> ::= <literal> | <selection> | "(" <math-expr> ")"
  *
  * <selection> ::= <selection-head> [ <field-sel> | <function-operation> | <index-field-range-sel> | <filter-sel> ]+
  *   e.g. $source.price, $source.json(), $source['state'], $source.numList[3]
  * <selection-head> ::= $source | $target
  * <field-sel> ::= "." <field-string>
  * <function-operation> ::= "." <function-name> "(" <arg> [, <arg>]+ ")"
  * <index-field-range-sel> ::= "[" <index-field-range> [, <index-field-range>]+ "]"
  * <index-field-range> ::= <index-field> | (<index-field>, <index-field>) | "*"
  * <index-field> ::= <index> | <field-quote> | <all-selection>
  *   index: 0 ~ n means position from start, -1 ~ -n means position from end
  * <field-quote> ::= ' <field-string> ' | " <field-string> "
  * <filter-sel> ::= "[" <field-quote> <filter-compare-opr> <math-expr> "]"
  *   e.g. ['name' = 'URL'], $source.man['age' > $source.graduate_age + 5]
  *
  * A <math-expr> inside a selection must not reference a different
  * <selection-head> (e.g. $source.tags[$target.first] is invalid) — that
  * check belongs to validation, not to this parser.
  *
  * <literal> ::= <literal-string> | <literal-number> | <literal-time> | <literal-boolean>
  */
case class RuleParser() extends JavaTokenParsers with Serializable {

  object Keyword {
    def WhenKeywords: Parser[String] = """(?i)when""".r
    def UnaryLogicalKeywords: Parser[String] = """(?i)not""".r
    def BinaryLogicalKeywords: Parser[String] = """(?i)and|or""".r
    def RangeKeywords: Parser[String] = """(?i)(not\s+)?(in|between)""".r
    def DataSourceKeywords: Parser[String] = """(?i)\$(source|target)""".r
    def Keywords: Parser[String] = WhenKeywords | UnaryLogicalKeywords | BinaryLogicalKeywords | RangeKeywords | DataSourceKeywords
  }
  import Keyword._

  object Operator {
    def NotLogicalOpr: Parser[String] = """(?i)not""".r | "!"
    def AndLogicalOpr: Parser[String] = """(?i)and""".r | "&&"
    def OrLogicalOpr: Parser[String] = """(?i)or""".r | "||"
    def CompareOpr: Parser[String] = """!?==?""".r | """<=?""".r | """>=?""".r
    def RangeOpr: Parser[String] = RangeKeywords

    def UnaryMathOpr: Parser[String] = "+" | "-"
    def BinaryMathOpr1: Parser[String] = "*" | "/" | "%"
    def BinaryMathOpr2: Parser[String] = "+" | "-"

    def FilterCompareOpr: Parser[String] = """!?==?""".r | """<=?""".r | """>=?""".r

    def SqBracketPair: (Parser[String], Parser[String]) = ("[", "]")
    def BracketPair: (Parser[String], Parser[String]) = ("(", ")")
    def Dot: Parser[String] = "."
    def AllSelection: Parser[String] = "*"
    def SQuote: Parser[String] = "'"
    def DQuote: Parser[String] = "\""
    def Comma: Parser[String] = ","
  }
  import Operator._

  object SomeString {
    def AnyString: Parser[String] = """[^'\"{}\[\]()=<>.$@,;+\-*/\\]*""".r
    def SimpleFieldString: Parser[String] = """\w+""".r
    def FieldString: Parser[String] = """[\w\s]+""".r
    def NameString: Parser[String] = """[a-zA-Z_]\w*""".r
  }
  import SomeString._

  object SomeNumber {
    def IntegerNumber: Parser[String] = """[+\-]?\d+""".r
    def DoubleNumber: Parser[String] = """[+\-]?(\.\d+|\d+\.\d*)""".r
    def IndexNumber: Parser[String] = IntegerNumber
  }
  import SomeNumber._

  // -- literal --
  // FIX: renamed the misspelled internal parsers "literial*" -> "literal*";
  // these names are only referenced inside this class.
  def literal: Parser[LiteralExpr] = literalString | literalTime | literalNumber | literalBoolean
  def literalString: Parser[LiteralStringExpr] = (SQuote ~> AnyString <~ SQuote | DQuote ~> AnyString <~ DQuote) ^^ { LiteralStringExpr(_) }
  def literalNumber: Parser[LiteralNumberExpr] = (DoubleNumber | IntegerNumber) ^^ { LiteralNumberExpr(_) }
  def literalTime: Parser[LiteralTimeExpr] = """(\d+(d|h|m|s|ms))+""".r ^^ { LiteralTimeExpr(_) }
  def literalBoolean: Parser[LiteralBooleanExpr] = ("""(?i)true""".r | """(?i)false""".r) ^^ { LiteralBooleanExpr(_) }

  // -- selection --
  // <selection> ::= <selection-head> [ <field-sel> | <function-operation> | <index-field-range-sel> | <filter-sel> ]+
  def selection: Parser[SelectionExpr] = selectionHead ~ rep(selector) ^^ {
    case head ~ selectors => SelectionExpr(head, selectors)
  }
  def selector: Parser[SelectExpr] = (fieldSelect | functionOperation | indexFieldRangeSelect | filterSelect)

  def selectionHead: Parser[SelectionHead] = DataSourceKeywords ^^ { SelectionHead(_) }
  // <field-sel> ::= "." <field-string> — a plain field access is modelled as a
  // single-element index-field-range selection.
  def fieldSelect: Parser[IndexFieldRangeSelectExpr] = Dot ~> SimpleFieldString ^^ {
    case field => IndexFieldRangeSelectExpr(FieldDesc(field) :: Nil)
  }
  // <function-operation> ::= "." <function-name> "(" <arg> [, <arg>]+ ")"
  def functionOperation: Parser[FunctionOperationExpr] = Dot ~ NameString ~ BracketPair._1 ~ repsep(argument, Comma) ~ BracketPair._2 ^^ {
    case _ ~ func ~ _ ~ args ~ _ => FunctionOperationExpr(func, args)
  }
  def argument: Parser[MathExpr] = mathExpr
  // <index-field-range-sel> ::= "[" <index-field-range> [, <index-field-range>]+ "]"
  def indexFieldRangeSelect: Parser[IndexFieldRangeSelectExpr] = SqBracketPair._1 ~> rep1sep(indexFieldRange, Comma) <~ SqBracketPair._2 ^^ {
    case ifrs => IndexFieldRangeSelectExpr(ifrs)
  }
  // <index-field-range> ::= <index-field> | (<index-field>, <index-field>) | "*"
  // (the ^^ binds only to the bracketed-pair alternative, by design)
  def indexFieldRange: Parser[FieldDescOnly] = indexField | BracketPair._1 ~ indexField ~ Comma ~ indexField ~ BracketPair._2 ^^ {
    case _ ~ if1 ~ _ ~ if2 ~ _ => FieldRangeDesc(if1, if2)
  }
  // <index-field> ::= <index> | <field-quote> | <all-selection>
  // (could in principle accept a <math-expr> here; kept simple for now)
  def indexField: Parser[FieldDescOnly] = IndexNumber ^^ { IndexDesc(_) } | fieldQuote | AllSelection ^^ { AllFieldsDesc(_) }
  // <field-quote> ::= ' <field-string> ' | " <field-string> "
  def fieldQuote: Parser[FieldDesc] = (SQuote ~> FieldString <~ SQuote | DQuote ~> FieldString <~ DQuote) ^^ { FieldDesc(_) }
  // <filter-sel> ::= "[" <field-quote> <filter-compare-opr> <math-expr> "]"
  def filterSelect: Parser[FilterSelectExpr] = SqBracketPair._1 ~> fieldQuote ~ FilterCompareOpr ~ mathExpr <~ SqBracketPair._2 ^^ {
    case field ~ compare ~ value => FilterSelectExpr(field, compare, value)
  }

  // -- math --
  // <math-factor> ::= <literal> | <selection> | "(" <math-expr> ")"
  def mathFactor: Parser[MathExpr] = (literal | selection | BracketPair._1 ~> mathExpr <~ BracketPair._2) ^^ { MathFactorExpr(_) }
  // <math-expr> with standard precedence: unary, then * / %, then + -
  def unaryMathExpr: Parser[MathExpr] = rep(UnaryMathOpr) ~ mathFactor ^^ {
    case Nil ~ a => a
    case list ~ a => UnaryMathExpr(list, a)
  }
  def binaryMathExpr1: Parser[MathExpr] = unaryMathExpr ~ rep(BinaryMathOpr1 ~ unaryMathExpr) ^^ {
    case a ~ Nil => a
    case a ~ list => BinaryMathExpr(a, list.map(c => (c._1, c._2)))
  }
  def binaryMathExpr2: Parser[MathExpr] = binaryMathExpr1 ~ rep(BinaryMathOpr2 ~ binaryMathExpr1) ^^ {
    case a ~ Nil => a
    case a ~ list => BinaryMathExpr(a, list.map(c => (c._1, c._2)))
  }
  def mathExpr: Parser[MathExpr] = binaryMathExpr2

  // -- logical expression --
  // <range-expr> ::= "(" [<math-expr>] [, <math-expr>]+ ")"
  def rangeExpr: Parser[RangeDesc] = BracketPair._1 ~> repsep(mathExpr, Comma) <~ BracketPair._2 ^^ { RangeDesc(_) }
  // <logical-expression> ::= <math-expr> (<compare-opr> <math-expr> | <range-opr> <range-expr>)
  def logicalExpr: Parser[LogicalExpr] = mathExpr ~ CompareOpr ~ mathExpr ^^ {
    case left ~ opr ~ right => LogicalCompareExpr(left, opr, right)
  } | mathExpr ~ RangeOpr ~ rangeExpr ^^ {
    case left ~ opr ~ range => LogicalRangeExpr(left, opr, range)
  } | mathExpr ^^ { LogicalSimpleExpr(_) }

  // -- logical statement --
  // precedence: NOT binds tighter than AND, which binds tighter than OR
  def logicalFactor: Parser[LogicalExpr] = logicalExpr | BracketPair._1 ~> logicalStatement <~ BracketPair._2
  def notLogicalStatement: Parser[LogicalExpr] = rep(NotLogicalOpr) ~ logicalFactor ^^ {
    case Nil ~ a => a
    case list ~ a => UnaryLogicalExpr(list, a)
  }
  def andLogicalStatement: Parser[LogicalExpr] = notLogicalStatement ~ rep(AndLogicalOpr ~ notLogicalStatement) ^^ {
    case a ~ Nil => a
    case a ~ list => BinaryLogicalExpr(a, list.map(c => (c._1, c._2)))
  }
  def orLogicalStatement: Parser[LogicalExpr] = andLogicalStatement ~ rep(OrLogicalOpr ~ andLogicalStatement) ^^ {
    case a ~ Nil => a
    case a ~ list => BinaryLogicalExpr(a, list.map(c => (c._1, c._2)))
  }
  def logicalStatement: Parser[LogicalExpr] = orLogicalStatement

  // -- rule --
  // <rule> ::= <logical-statement> [WHEN <logical-statement>]
  def rule: Parser[StatementExpr] = logicalStatement ~ opt(WhenKeywords ~> logicalStatement) ^^ {
    case ls ~ Some(ws) => WhenClauseStatementExpr(ls, ws)
    case ls ~ _ => SimpleStatementExpr(ls)
  }

  // NOTE(review): removed ~100 lines of dead, fully commented-out legacy
  // parser code (the old expr_old grammar) that followed here; it is
  // preserved in version control history if ever needed.
}
// --- file: rule/expr/AnalyzableExpr.scala ---
package org.apache.griffin.measure.batch.rule.expr

/**
  * Analysis hooks over a rule expression tree: group-by expression pairs
  * for source/target matching and the optional WHEN clause. Defaults are
  * "nothing found"; concrete expressions override as appropriate.
  */
trait AnalyzableExpr extends Serializable {
  def getGroupbyExprPairs(dsPair: (String, String)): Seq[(MathExpr, MathExpr)] = Nil
  def getWhenClauseExpr(): Option[LogicalExpr] = None
}

// --- file: rule/expr/Cacheable.scala ---
package org.apache.griffin.measure.batch.rule.expr

/**
  * Marks expressions that may be cached and/or persisted per data source.
  * An expression is cacheable for a data source `ds` only when it opts in
  * (`cacheUnit`), touches at most one data source, and either both `ds`
  * and its own source set are empty (global) or `ds` is among its sources.
  */
trait Cacheable extends DataSourceable {
  protected def cacheUnit: Boolean = false
  def cacheable(ds: String): Boolean = {
    cacheUnit && !conflict() && ((ds.isEmpty && dataSources.isEmpty) || (ds.nonEmpty && contains(ds)))
  }
  protected def getCacheExprs(ds: String): Iterable[Cacheable]

  protected def persistUnit: Boolean = false
  def persistable(ds: String): Boolean = {
    persistUnit && ((ds.isEmpty && dataSources.isEmpty) || (ds.nonEmpty && contains(ds)))
  }
  protected def getPersistExprs(ds: String): Iterable[Cacheable]
}

// --- file: rule/expr/Calculatable.scala ---
package org.apache.griffin.measure.batch.rule.expr

/** Anything that can be evaluated against a map of pre-computed values. */
trait Calculatable extends Serializable {
  def calculate(values: Map[String, Any]): Option[Any]
}

// --- file: rule/expr/DataSourceable.scala ---
package org.apache.griffin.measure.batch.rule.expr

/** Tracks which data sources ("source"/"target") an expression touches. */
trait DataSourceable extends Serializable {
  val dataSources: Set[String]
  // more than one data source means the expression cannot be cached per-source
  protected def conflict(): Boolean = dataSources.size > 1
  def contains(ds: String): Boolean = dataSources.contains(ds)
  def dataSourceOpt: Option[String] = {
    if (dataSources.size == 1) Some(dataSources.head) else None
  }
}

// --- file: rule/expr/Describable.scala ---
package org.apache.griffin.measure.batch.rule.expr

/** Provides a human-readable description; strings are single-quoted. */
trait Describable extends Serializable {

  val desc: String

  protected def describe(v: Any): String = {
    v match {
      case s: Describable => s"${s.desc}"
      case s: String => s"'${s}'"
      case a => s"${a}"
    }
  }
}

// --- file: rule/expr/Expr.scala ---
package org.apache.griffin.measure.batch.rule.expr

/**
  * Base of the rule expression tree. Every node gets a unique `_id` from
  * [[ExprIdCounter]]; `calculate` first looks up a pre-computed value for
  * that id in the supplied map and only falls back to `calculateOnly`.
  */
trait Expr extends Serializable with Describable with Cacheable with Calculatable {

  protected val _defaultId: String = ExprIdCounter.emptyId

  val _id = ExprIdCounter.genId(_defaultId)

  protected def getSubCacheExprs(ds: String): Iterable[Expr] = Nil
  final def getCacheExprs(ds: String): Iterable[Expr] = {
    if (cacheable(ds)) getSubCacheExprs(ds).toList :+ this else getSubCacheExprs(ds)
  }

  protected def getSubFinalCacheExprs(ds: String): Iterable[Expr] = Nil
  final def getFinalCacheExprs(ds: String): Iterable[Expr] = {
    // a cacheable node subsumes its children: only the node itself is kept
    if (cacheable(ds)) Nil :+ this else getSubFinalCacheExprs(ds)
  }

  protected def getSubPersistExprs(ds: String): Iterable[Expr] = Nil
  final def getPersistExprs(ds: String): Iterable[Expr] = {
    if (persistable(ds)) getSubPersistExprs(ds).toList :+ this else getSubPersistExprs(ds)
  }

  final def calculate(values: Map[String, Any]): Option[Any] = {
    values.get(_id) match {
      case Some(v) => Some(v)
      case _ => calculateOnly(values)
    }
  }
  protected def calculateOnly(values: Map[String, Any]): Option[Any]
}

// --- file: rule/expr/ExprDescOnly.scala ---
package org.apache.griffin.measure.batch.rule.expr

/** Description-only helpers that are not themselves evaluable expressions. */
trait ExprDescOnly extends Describable {
}

/** The "$source"/"$target" head of a selection; normalized to lower case. */
case class SelectionHead(expr: String) extends ExprDescOnly {
  private val headRegex = """\$(\w+)""".r
  val head: String = expr match {
    case headRegex(v) => v.toLowerCase
    case _ => expr
  }
  val desc: String = "$" + head
}

/** A parenthesized list of math expressions, e.g. the right side of IN. */
case class RangeDesc(elements: Iterable[MathExpr]) extends ExprDescOnly {
  val desc: String = {
    val rangeDesc = elements.map(_.desc).mkString(", ")
    s"(${rangeDesc})"
  }
}
java.util.concurrent.atomic.AtomicLong

import scala.collection.mutable.{Set => MutableSet}

/**
  * Allocates unique ids for [[Expr]] nodes. Empty or purely numeric
  * default ids (and ids already taken) are replaced by a fresh counter
  * value; otherwise the caller-supplied id is registered and returned.
  */
object ExprIdCounter {

  private val idCounter: AtomicLong = new AtomicLong(0L)

  private val existIdSet: MutableSet[String] = MutableSet.empty[String]

  // purely numeric user ids would collide with generated counter ids
  private val invalidIdRegex = """^\d+$""".r

  val emptyId: String = ""

  def genId(defaultId: String): String = {
    defaultId match {
      // FIX: `case emptyId =>` was a fresh variable binding, not a comparison
      // against the `emptyId` val — it matched EVERY input, making all later
      // cases unreachable and discarding user-supplied ids. Backticks make it
      // a stable-identifier pattern that compares for equality.
      case `emptyId` => increment().toString
      case invalidIdRegex() => increment().toString
      case defId if (exist(defId)) => s"${defId}"
      case _ => {
        insertUserId(defaultId)
        defaultId
      }
    }
  }

  private def exist(id: String): Boolean = {
    existIdSet.contains(id)
  }

  private def insertUserId(id: String): Unit = {
    existIdSet += id
  }

  private def increment(): Long = {
    idCounter.incrementAndGet()
  }
}

// --- file: rule/expr/FieldDescOnly.scala ---
package org.apache.griffin.measure.batch.rule.expr

import scala.util.{Success, Try}

/** Field/index descriptors used inside selection brackets. */
trait FieldDescOnly extends Describable with DataSourceable {
}

/** A positional index; 0..n from the start, -1..-n from the end. */
case class IndexDesc(expr: String) extends FieldDescOnly {
  val index: Int = {
    Try(expr.toInt) match {
      case Success(v) => v
      case _ => throw new Exception(s"${expr} is invalid index")
    }
  }
  val desc: String = describe(index)
  val dataSources: Set[String] = Set.empty[String]
}

// NOTE(review): `case class FieldDesc(...)` continues in the next chunk of
// this dump and is reconstructed there.
/** A named field, e.g. 'state' in $source['state']. */
case class FieldDesc(expr: String) extends FieldDescOnly {
  val field: String = expr
  val desc: String = describe(field)
  val dataSources: Set[String] = Set.empty[String]
}

/** The "*" wildcard selecting every field/item. */
case class AllFieldsDesc(expr: String) extends FieldDescOnly {
  val allFields: String = expr
  val desc: String = allFields
  val dataSources: Set[String] = Set.empty[String]
}

/** An inclusive (start, end) index range; only index pairs are valid. */
case class FieldRangeDesc(startField: FieldDescOnly, endField: FieldDescOnly) extends FieldDescOnly {
  val desc: String = {
    (startField, endField) match {
      case (f1: IndexDesc, f2: IndexDesc) => s"(${f1.desc}, ${f2.desc})"
      case _ => throw new Exception("invalid field range description")
    }
  }
  val dataSources: Set[String] = Set.empty[String]
}

// --- file: rule/expr/LiteralExpr.scala ---
package org.apache.griffin.measure.batch.rule.expr

import scala.util.{Failure, Success, Try}

/** A constant expression; its value is fixed at construction time. */
trait LiteralExpr extends Expr {
  val value: Option[Any]
  def calculateOnly(values: Map[String, Any]): Option[Any] = value
  val dataSources: Set[String] = Set.empty[String]
}

case class LiteralStringExpr(expr: String) extends LiteralExpr {
  val value: Option[String] = Some(expr)
  val desc: String = value.getOrElse("")
}

/** Numeric literal: parsed as Double when it contains '.', else as Long. */
case class LiteralNumberExpr(expr: String) extends LiteralExpr {
  val value: Option[Any] = {
    if (expr.contains(".")) {
      Try (expr.toDouble) match {
        case Success(v) => Some(v)
        case _ => throw new Exception(s"${expr} is invalid number")
      }
    } else {
      Try (expr.toLong) match {
        case Success(v) => Some(v)
        case _ => throw new Exception(s"${expr} is invalid number")
      }
    }
  }
  val desc: String = value.getOrElse("").toString
}

/**
  * Time literal converted to milliseconds.
  * NOTE(review): this regex accepts a single unit only, while the parser's
  * literalTime grammar admits repeated units like "1d2h" — such input would
  * throw here. TODO confirm intended behavior with the parser's author.
  */
case class LiteralTimeExpr(expr: String) extends LiteralExpr {
  final val TimeRegex = """(\d+)(d|h|m|s|ms)""".r
  val value: Option[Long] = {
    Try {
      expr match {
        case TimeRegex(time, unit) => {
          val t = time.toLong
          unit match {
            case "d" => t * 24 * 60 * 60 * 1000
            case "h" => t * 60 * 60 * 1000
            case "m" => t * 60 * 1000
            case "s" => t * 1000
            case "ms" => t
            case _ => throw new Exception(s"${expr} is invalid time format")
          }
        }
        case _ => throw new Exception(s"${expr} is invalid time format")
      }
    } match {
      case Success(v) => Some(v)
      case Failure(ex) => throw ex
    }
  }
  val desc: String = expr
}

/** Boolean literal; matching is case-insensitive. */
case class LiteralBooleanExpr(expr: String) extends LiteralExpr {
  final val TrueRegex = """(?i)true""".r
  final val FalseRegex = """(?i)false""".r
  val value: Option[Boolean] = expr match {
    case TrueRegex() => Some(true)
    case FalseRegex() => Some(false)
    case _ => throw new Exception(s"${expr} is invalid boolean")
  }
  val desc: String = value.getOrElse("").toString
}
org.apache.griffin.measure.batch.utils.CalculationUtil._

/** Boolean-valued expressions; cacheable by default. */
trait LogicalExpr extends Expr with AnalyzableExpr {
  override def cacheUnit: Boolean = true
}

/** Wraps a bare math expression used in a boolean position. */
case class LogicalSimpleExpr(expr: MathExpr) extends LogicalExpr {
  def calculateOnly(values: Map[String, Any]): Option[Any] = expr.calculate(values)
  val desc: String = expr.desc
  val dataSources: Set[String] = expr.dataSources
  override def cacheUnit: Boolean = false
  override def getSubCacheExprs(ds: String): Iterable[Expr] = expr.getCacheExprs(ds)
  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = expr.getFinalCacheExprs(ds)
  override def getSubPersistExprs(ds: String): Iterable[Expr] = expr.getPersistExprs(ds)
}

/** left <compare-opr> right; also yields group-by pairs for "="/"==" . */
case class LogicalCompareExpr(left: MathExpr, compare: String, right: MathExpr) extends LogicalExpr {
  private val (eqOpr, neqOpr, btOpr, bteOpr, ltOpr, lteOpr) = ("""==?""".r, """!==?""".r, ">", ">=", "<", "<=")
  def calculateOnly(values: Map[String, Any]): Option[Any] = {
    val (lv, rv) = (left.calculate(values), right.calculate(values))
    compare match {
      case this.eqOpr() => lv === rv
      case this.neqOpr() => lv =!= rv
      case this.btOpr => lv > rv
      case this.bteOpr => lv >= rv
      case this.ltOpr => lv < rv
      case this.lteOpr => lv <= rv
      case _ => None
    }
  }
  val desc: String = s"${left.desc} ${compare} ${right.desc}"
  val dataSources: Set[String] = left.dataSources ++ right.dataSources
  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
    left.getCacheExprs(ds) ++ right.getCacheExprs(ds)
  }
  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
    left.getFinalCacheExprs(ds) ++ right.getFinalCacheExprs(ds)
  }
  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
    left.getPersistExprs(ds) ++ right.getPersistExprs(ds)
  }

  override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(MathExpr, MathExpr)] = {
    // only an equality between one-source operands forms a group-by pair,
    // oriented as (source expr, target expr)
    if (compare == "=" || compare == "==") {
      (left.dataSourceOpt, right.dataSourceOpt) match {
        case (Some(dsPair._1), Some(dsPair._2)) => (left, right) :: Nil
        case (Some(dsPair._2), Some(dsPair._1)) => (right, left) :: Nil
        case _ => Nil
      }
    } else Nil
  }
}

/** left [NOT] IN/BETWEEN (e1, e2, ...). */
case class LogicalRangeExpr(left: MathExpr, rangeOpr: String, range: RangeDesc) extends LogicalExpr {
  private val (inOpr, ninOpr, btwnOpr, nbtwnOpr) = ("""(?i)in""".r, """(?i)not\s+in""".r, """(?i)between""".r, """(?i)not\s+between""".r)
  def calculateOnly(values: Map[String, Any]): Option[Any] = {
    val (lv, rvs) = (left.calculate(values), range.elements.map(_.calculate(values)))
    rangeOpr match {
      case this.inOpr() => lv in rvs
      case this.ninOpr() => lv not_in rvs
      case this.btwnOpr() => lv between rvs
      case this.nbtwnOpr() => lv not_between rvs
      case _ => None
    }
  }
  val desc: String = s"${left.desc} ${rangeOpr} ${range.desc}"
  val dataSources: Set[String] = left.dataSources ++ range.elements.flatMap(_.dataSources).toSet
  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
    left.getCacheExprs(ds) ++ range.elements.flatMap(_.getCacheExprs(ds))
  }
  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
    left.getFinalCacheExprs(ds) ++ range.elements.flatMap(_.getFinalCacheExprs(ds))
  }
  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
    left.getPersistExprs(ds) ++ range.elements.flatMap(_.getPersistExprs(ds))
  }
}

/** A chain of NOT/! prefixes applied to a logical factor. */
case class UnaryLogicalExpr(oprList: Iterable[String], factor: LogicalExpr) extends LogicalExpr {
  private val notOpr = """(?i)not|!""".r
  def calculateOnly(values: Map[String, Any]): Option[Any] = {
    val fv = factor.calculate(values)
    oprList.foldRight(fv) { (opr, v) =>
      opr match {
        case this.notOpr() => !v
        case _ => None
      }
    }
  }
  val desc: String = oprList.foldRight(factor.desc) { (prev, ex) => s"${prev}${ex}" }
  val dataSources: Set[String] = factor.dataSources
  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
    factor.getCacheExprs(ds)
  }
  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
    factor.getFinalCacheExprs(ds)
  }
  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
    factor.getPersistExprs(ds)
  }

  override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(MathExpr, MathExpr)] = {
    // an even number of NOTs cancels out; group-by pairs survive only then
    val notOprList = oprList.filter { opr =>
      opr match {
        case this.notOpr() => true
        case _ => false
      }
    }
    if (notOprList.size % 2 == 0) factor.getGroupbyExprPairs(dsPair) else Nil
  }
}

/** first (AND|OR next)* — a left-folded chain of logical operands. */
case class BinaryLogicalExpr(first: LogicalExpr, others: Iterable[(String, LogicalExpr)]) extends LogicalExpr {
  private val (andOpr, orOpr) = ("""(?i)and|&&""".r, """(?i)or|\|\|""".r)
  def calculateOnly(values: Map[String, Any]): Option[Any] = {
    val fv = first.calculate(values)
    others.foldLeft(fv) { (v, pair) =>
      val (opr, next) = pair
      val nv = next.calculate(values)
      opr match {
        case this.andOpr() => v && nv
        case this.orOpr() => v || nv
        case _ => None
      }
    }
  }
  val desc: String = others.foldLeft(first.desc) { (ex, next) => s"${ex} ${next._1} ${next._2.desc}" }
  val dataSources: Set[String] = first.dataSources ++ others.flatMap(_._2.dataSources).toSet
  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
    first.getCacheExprs(ds) ++ others.flatMap(_._2.getCacheExprs(ds))
  }
  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
    first.getFinalCacheExprs(ds) ++ others.flatMap(_._2.getFinalCacheExprs(ds))
  }
  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
    first.getPersistExprs(ds) ++ others.flatMap(_._2.getPersistExprs(ds))
  }

  override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(MathExpr, MathExpr)] = {
    if (others.isEmpty) first.getGroupbyExprPairs(dsPair)
    else {
      // only AND-combined operands contribute usable group-by pairs
      val isAnd = others.exists(_._1 match {
        case this.andOpr() => true
        case _ => false
      })
      if (isAnd) {
        first.getGroupbyExprPairs(dsPair) ++ others.flatMap(_._2.getGroupbyExprPairs(dsPair))
      } else Nil
    }
  }
}

// --- file: rule/expr/MathExpr.scala ---
package org.apache.griffin.measure.batch.rule.expr

import org.apache.griffin.measure.batch.utils.CalculationUtil._

/** Numeric/string-valued expressions. */
trait MathExpr extends Expr {
}

/** Wraps a literal, a selection, or a parenthesized math expression. */
case class MathFactorExpr(self: Expr) extends MathExpr {
  def calculateOnly(values: Map[String, Any]): Option[Any] = self.calculate(values)
  val desc: String = self.desc
  val dataSources: Set[String] = self.dataSources
  override def getSubCacheExprs(ds: String): Iterable[Expr] = {
    self.getCacheExprs(ds)
  }
  override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = {
    self.getFinalCacheExprs(ds)
  }
  override def getSubPersistExprs(ds: String): Iterable[Expr] = {
    self.getPersistExprs(ds)
  }
}

/** A chain of unary +/- prefixes applied to a math factor. */
case class UnaryMathExpr(oprList: Iterable[String], factor: Expr) extends MathExpr {
  private val (posOpr, negOpr) = ("+", "-")
  def calculateOnly(values: Map[String, Any]): Option[Any] = {
    val fv = factor.calculate(values)
    oprList.foldRight(fv) { (opr, v) =>
      opr match {
        case this.posOpr => v
        case this.negOpr => -v
        case _ => None
      }
    }
  }
  val desc: String = oprList.foldRight(factor.desc) { (prev, ex) => s"${prev}${ex}" }
  val dataSources: Set[String] = factor.dataSources
  // NOTE(review): the original continues past this chunk with further
  // overrides (presumably getSubCacheExprs etc.) — truncated here; TODO
  // confirm against the full file before relying on this reconstruction.
}
cacheUnit: Boolean = true + override def getSubCacheExprs(ds: String): Iterable[Expr] = { + factor.getCacheExprs(ds) + } + override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = { + factor.getFinalCacheExprs(ds) + } + override def getSubPersistExprs(ds: String): Iterable[Expr] = { + factor.getPersistExprs(ds) + } +} + +case class BinaryMathExpr(first: MathExpr, others: Iterable[(String, MathExpr)]) extends MathExpr { + private val (addOpr, subOpr, mulOpr, divOpr, modOpr) = ("+", "-", "*", "/", "%") + def calculateOnly(values: Map[String, Any]): Option[Any] = { + val fv = first.calculate(values) + others.foldLeft(fv) { (v, pair) => + val (opr, next) = pair + val nv = next.calculate(values) + opr match { + case this.addOpr => v + nv + case this.subOpr => v - nv + case this.mulOpr => v * nv + case this.divOpr => v / nv + case this.modOpr => v % nv + case _ => None + } + } + } + val desc: String = others.foldLeft(first.desc) { (ex, next) => s"${ex} ${next._1} ${next._2.desc}" } + val dataSources: Set[String] = first.dataSources ++ others.flatMap(_._2.dataSources).toSet + override def cacheUnit: Boolean = true + override def getSubCacheExprs(ds: String): Iterable[Expr] = { + first.getCacheExprs(ds) ++ others.flatMap(_._2.getCacheExprs(ds)) + } + override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = { + first.getFinalCacheExprs(ds) ++ others.flatMap(_._2.getFinalCacheExprs(ds)) + } + override def getSubPersistExprs(ds: String): Iterable[Expr] = { + first.getPersistExprs(ds) ++ others.flatMap(_._2.getPersistExprs(ds)) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/SelectExpr.scala ---------------------------------------------------------------------- diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/SelectExpr.scala 
b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/SelectExpr.scala new file mode 100644 index 0000000..52ebe21 --- /dev/null +++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/SelectExpr.scala @@ -0,0 +1,53 @@ +package org.apache.griffin.measure.batch.rule.expr + +trait SelectExpr extends Expr { + def calculateOnly(values: Map[String, Any]): Option[Any] = None +} + +case class IndexFieldRangeSelectExpr(fields: Iterable[FieldDescOnly]) extends SelectExpr { + val desc: String = s"[${fields.map(_.desc).mkString(",")}]" + val dataSources: Set[String] = Set.empty[String] +} + +case class FunctionOperationExpr(func: String, args: Iterable[MathExpr]) extends SelectExpr { + val desc: String = s".${func}(${args.map(_.desc).mkString(",")})" + val dataSources: Set[String] = args.flatMap(_.dataSources).toSet + override def getSubCacheExprs(ds: String): Iterable[Expr] = args.flatMap(_.getCacheExprs(ds)) + override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = args.flatMap(_.getFinalCacheExprs(ds)) + override def getSubPersistExprs(ds: String): Iterable[Expr] = args.flatMap(_.getPersistExprs(ds)) +} + +case class FilterSelectExpr(field: FieldDesc, compare: String, value: MathExpr) extends SelectExpr { + val desc: String = s"[${field.desc}${compare}${value.desc}]" + val dataSources: Set[String] = value.dataSources + override def getSubCacheExprs(ds: String): Iterable[Expr] = value.getCacheExprs(ds) + override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = value.getFinalCacheExprs(ds) + override def getSubPersistExprs(ds: String): Iterable[Expr] = value.getPersistExprs(ds) +} + +// -- selection -- +case class SelectionExpr(head: SelectionHead, selectors: Iterable[SelectExpr]) extends Expr { + def calculateOnly(values: Map[String, Any]): Option[Any] = values.get(_id) + + val desc: String = { + val argsString = selectors.map(_.desc).mkString("") + s"${head.desc}${argsString}" + } + val dataSources: 
Set[String] = { + val selectorDataSources = selectors.flatMap(_.dataSources).toSet + selectorDataSources + head.head + } + + override def cacheUnit: Boolean = true + override def getSubCacheExprs(ds: String): Iterable[Expr] = { + selectors.flatMap(_.getCacheExprs(ds)) + } + override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = { + selectors.flatMap(_.getFinalCacheExprs(ds)) + } + + override def persistUnit: Boolean = true + override def getSubPersistExprs(ds: String): Iterable[Expr] = { + selectors.flatMap(_.getPersistExprs(ds)) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f629d0f4/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/StatementExpr.scala ---------------------------------------------------------------------- diff --git a/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/StatementExpr.scala b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/StatementExpr.scala new file mode 100644 index 0000000..a872e18 --- /dev/null +++ b/measure/measure-batch/src/main/scala/org/apache/griffin/measure/batch/rule/expr/StatementExpr.scala @@ -0,0 +1,52 @@ +package org.apache.griffin.measure.batch.rule.expr + + +trait StatementExpr extends Expr with AnalyzableExpr { + def valid(values: Map[String, Any]): Boolean = true + override def cacheUnit: Boolean = true +} + +case class SimpleStatementExpr(expr: LogicalExpr) extends StatementExpr { + def calculateOnly(values: Map[String, Any]): Option[Any] = expr.calculate(values) + val desc: String = expr.desc + val dataSources: Set[String] = expr.dataSources + override def getSubCacheExprs(ds: String): Iterable[Expr] = { + expr.getCacheExprs(ds) + } + override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = { + expr.getFinalCacheExprs(ds) + } + override def getSubPersistExprs(ds: String): Iterable[Expr] = { + expr.getPersistExprs(ds) + } + + override def 
getGroupbyExprPairs(dsPair: (String, String)): Seq[(MathExpr, MathExpr)] = expr.getGroupbyExprPairs(dsPair) +} + +case class WhenClauseStatementExpr(expr: LogicalExpr, whenExpr: LogicalExpr) extends StatementExpr { + def calculateOnly(values: Map[String, Any]): Option[Any] = expr.calculate(values) + val desc: String = s"${expr.desc} when ${whenExpr.desc}" + + override def valid(values: Map[String, Any]): Boolean = { + whenExpr.calculate(values) match { + case Some(r: Boolean) => r + case _ => false + } + } + + val dataSources: Set[String] = expr.dataSources ++ whenExpr.dataSources + override def getSubCacheExprs(ds: String): Iterable[Expr] = { + expr.getCacheExprs(ds) ++ whenExpr.getCacheExprs(ds) + } + override def getSubFinalCacheExprs(ds: String): Iterable[Expr] = { + expr.getFinalCacheExprs(ds) ++ whenExpr.getFinalCacheExprs(ds) + } + override def getSubPersistExprs(ds: String): Iterable[Expr] = { + expr.getPersistExprs(ds) ++ whenExpr.getPersistExprs(ds) + } + + override def getGroupbyExprPairs(dsPair: (String, String)): Seq[(MathExpr, MathExpr)] = { + expr.getGroupbyExprPairs(dsPair) ++ whenExpr.getGroupbyExprPairs(dsPair) + } + override def getWhenClauseExpr(): Option[LogicalExpr] = Some(whenExpr) +} \ No newline at end of file
