http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala new file mode 100644 index 0000000..b641ec5 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelMeta.scala @@ -0,0 +1,150 @@ +package org.apache.s2graph.core.mysqls + +/** + * Created by shon on 6/3/15. + */ + +import org.apache.s2graph.core.GraphExceptions.MaxPropSizeReachedException +import org.apache.s2graph.core.{GraphExceptions, JSONParser} +import play.api.libs.json.Json +import scalikejdbc._ + +object LabelMeta extends Model[LabelMeta] with JSONParser { + + /** dummy sequences */ + + val fromSeq = -4.toByte + val toSeq = -5.toByte + val lastOpSeq = -3.toByte + val lastDeletedAt = -2.toByte + val timeStampSeq = 0.toByte + val countSeq = (Byte.MaxValue - 2).toByte + val degreeSeq = (Byte.MaxValue - 1).toByte + val maxValue = Byte.MaxValue + val emptyValue = Byte.MaxValue + + /** reserved sequences */ + // val deleted = LabelMeta(id = Some(lastDeletedAt), labelId = lastDeletedAt, name = "lastDeletedAt", + // seq = lastDeletedAt, defaultValue = "", dataType = "long") + val from = LabelMeta(id = Some(fromSeq), labelId = fromSeq, name = "_from", + seq = fromSeq, defaultValue = fromSeq.toString, dataType = "long") + val to = LabelMeta(id = Some(toSeq), labelId = toSeq, name = "_to", + seq = toSeq, defaultValue = toSeq.toString, dataType = "long") + val timestamp = LabelMeta(id = Some(-1), labelId = -1, name = "_timestamp", + seq = timeStampSeq, defaultValue = "0", dataType = "long") + val degree = LabelMeta(id = Some(-1), labelId = -1, name = "_degree", + seq = degreeSeq, defaultValue = "0", dataType = "long") + val count = LabelMeta(id = Some(-1), labelId = -1, name = "_count", + seq = countSeq, defaultValue = "-1", dataType = "long") + + // Each reserved column comes in a pair (e.g. _timestamp and timestamp) that shares the same seq; the name starting with '_' takes priority + val reservedMetas = List(from, to, degree, timestamp, count).flatMap { lm => List(lm, lm.copy(name = lm.name.drop(1))) }.reverse + val reservedMetasInner = List(from, to, degree, timestamp, count) + + def apply(rs: WrappedResultSet): LabelMeta = { + LabelMeta(Some(rs.int("id")), rs.int("label_id"), rs.string("name"), rs.byte("seq"), rs.string("default_value"), rs.string("data_type").toLowerCase) + } + + def isValidSeq(seq: Byte): Boolean = seq >= 0 && seq <= countSeq + def isValidSeqForAdmin(seq: Byte): Boolean = seq > 0 && seq < countSeq + + def findById(id: Int)(implicit session: DBSession = AutoSession): LabelMeta = { + val cacheKey = "id=" + id + + withCache(cacheKey) { + sql"""select * from label_metas where id = ${id}""".map { rs => LabelMeta(rs) }.single.apply + }.get + } + + def findAllByLabelId(labelId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): List[LabelMeta] = { + val cacheKey = "labelId=" + labelId + lazy val labelMetas = sql"""select * + from label_metas + where label_id = ${labelId} order by seq ASC""".map(LabelMeta(_)).list.apply() + + if (useCache) withCaches(cacheKey)(labelMetas) + else labelMetas + } + + def findByName(labelId: Int, name: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[LabelMeta] = { + name match { + case timestamp.name => Some(timestamp) + case from.name => 
Some(from) + case to.name => Some(to) + case _ => + val cacheKey = "labelId=" + labelId + ":name=" + name + lazy val labelMeta = sql""" + select * + from label_metas where label_id = ${labelId} and name = ${name}""" + .map { rs => LabelMeta(rs) }.single.apply() + + if (useCache) withCache(cacheKey)(labelMeta) + else labelMeta + } + } + + def insert(labelId: Int, name: String, defaultValue: String, dataType: String)(implicit session: DBSession = AutoSession) = { + val ls = findAllByLabelId(labelId, useCache = false) + val seq = ls.size + 1 + + if (seq < maxValue) { + sql"""insert into label_metas(label_id, name, seq, default_value, data_type) + select ${labelId}, ${name}, ${seq}, ${defaultValue}, ${dataType}""".updateAndReturnGeneratedKey.apply() + } else { + throw MaxPropSizeReachedException("max property size reached") + } + } + + def findOrInsert(labelId: Int, + name: String, + defaultValue: String, + dataType: String)(implicit session: DBSession = AutoSession): LabelMeta = { + + findByName(labelId, name) match { + case Some(c) => c + case None => + insert(labelId, name, defaultValue, dataType) + val cacheKey = "labelId=" + labelId + ":name=" + name + val cacheKeys = "labelId=" + labelId + expireCache(cacheKey) + expireCaches(cacheKeys) + findByName(labelId, name, useCache = false).get + } + } + + def delete(id: Int)(implicit session: DBSession = AutoSession) = { + val labelMeta = findById(id) + val (labelId, name) = (labelMeta.labelId, labelMeta.name) + sql"""delete from label_metas where id = ${id}""".execute.apply() + val cacheKeys = List(s"id=$id", s"labelId=$labelId", s"labelId=$labelId:name=$name") + cacheKeys.foreach { key => + expireCache(key) + expireCaches(key) + } + } + + def findAll()(implicit session: DBSession = AutoSession) = { + val ls = sql"""select * from label_metas""".map { rs => LabelMeta(rs) }.list.apply + putsToCache(ls.map { x => + val cacheKey = s"id=${x.id.get}" + cacheKey -> x + }) + putsToCache(ls.map { x => + val cacheKey = s"labelId=${x.labelId}:name=${x.name}" + cacheKey -> x + }) + putsToCache(ls.map { x => + val cacheKey = s"labelId=${x.labelId}:seq=${x.seq}" + cacheKey -> x + }) + + putsToCaches(ls.groupBy(x => x.labelId).map { case (labelId, ls) => + val cacheKey = s"labelId=${labelId}" + cacheKey -> ls + }.toList) + } +} + +case class LabelMeta(id: Option[Int], labelId: Int, name: String, seq: Byte, defaultValue: String, dataType: String) extends JSONParser { + lazy val toJson = Json.obj("name" -> name, "defaultValue" -> defaultValue, "dataType" -> dataType) +}
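A minimal usage sketch of the LabelMeta API above, assuming a configured connection pool (see Model.apply below); the label id and the "score" property here are hypothetical:

    // Reserved names such as "_timestamp" resolve to the built-in metas without touching the DB.
    val tsMeta: Option[LabelMeta] = LabelMeta.findByName(labelId = 1, name = "_timestamp")

    // User-defined properties go through the per-key cache; findOrInsert expires both
    // the single-entry cache and the per-label list cache before re-reading.
    val scoreMeta: LabelMeta = LabelMeta.findOrInsert(labelId = 1, name = "score", defaultValue = "0.0", dataType = "double")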
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala new file mode 100644 index 0000000..700b8d7 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Model.scala @@ -0,0 +1,100 @@ +package org.apache.s2graph.core.mysqls + +import java.util.concurrent.Executors + +import com.typesafe.config.Config +import org.apache.s2graph.core.utils.{SafeUpdateCache, logger} +import scalikejdbc._ + +import scala.concurrent.ExecutionContext +import scala.language.{higherKinds, implicitConversions} +import scala.util.{Failure, Try} + +object Model { + var maxSize = 10000 + var ttl = 60 + val numOfThread = Runtime.getRuntime.availableProcessors() + val threadPool = Executors.newFixedThreadPool(numOfThread) + val ec = ExecutionContext.fromExecutor(threadPool) + + def apply(config: Config) = { + maxSize = config.getInt("cache.max.size") + ttl = config.getInt("cache.ttl.seconds") + Class.forName(config.getString("db.default.driver")) + + val settings = ConnectionPoolSettings( + initialSize = 10, + maxSize = 10, + connectionTimeoutMillis = 30000L, + validationQuery = "select 1;") + + ConnectionPool.singleton( + config.getString("db.default.url"), + config.getString("db.default.user"), + config.getString("db.default.password"), + settings) + } + + def withTx[T](block: DBSession => T): Try[T] = { + using(DB(ConnectionPool.borrow())) { conn => + val res = Try { + conn.begin() + val session = conn.withinTxSession() + val result = block(session) + + conn.commit() + + result + } recoverWith { + case e: Exception => + conn.rollbackIfActive() + Failure(e) + } + + res + } + } + + def shutdown() = { + ConnectionPool.closeAll() + } + + def loadCache() = { + Service.findAll() + ServiceColumn.findAll() + Label.findAll() + LabelMeta.findAll() + LabelIndex.findAll() + ColumnMeta.findAll() + } +} + +trait Model[V] extends SQLSyntaxSupport[V] { + + import Model._ + + implicit val ec: ExecutionContext = Model.ec + + val cName = this.getClass.getSimpleName() + logger.info(s"LocalCache[$cName]: TTL[$ttl], MaxSize[$maxSize]") + + val optionCache = new SafeUpdateCache[Option[V]](cName, maxSize, ttl) + val listCache = new SafeUpdateCache[List[V]](cName, maxSize, ttl) + + val withCache = optionCache.withCache _ + + val withCaches = listCache.withCache _ + + val expireCache = optionCache.invalidate _ + + val expireCaches = listCache.invalidate _ + + def putsToCache(kvs: List[(String, V)]) = kvs.foreach { + case (key, value) => optionCache.put(key, Option(value)) + } + + def putsToCaches(kvs: List[(String, List[V])]) = kvs.foreach { + case (key, values) => listCache.put(key, values) + } +} + http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala new file mode 100644 index 0000000..98e1866 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/Service.scala @@ -0,0 +1,94 @@ +package org.apache.s2graph.core.mysqls + +import java.util.UUID + +import org.apache.s2graph.core.utils.logger +import 
play.api.libs.json.Json +import scalikejdbc._ + +object Service extends Model[Service] { + def apply(rs: WrappedResultSet): Service = { + Service(rs.intOpt("id"), rs.string("service_name"), rs.string("access_token"), + rs.string("cluster"), rs.string("hbase_table_name"), rs.int("pre_split_size"), rs.intOpt("hbase_table_ttl")) + } + + def findByAccessToken(accessToken: String)(implicit session: DBSession = AutoSession): Option[Service] = { + val cacheKey = s"accessToken=$accessToken" + withCache(cacheKey)( sql"""select * from services where access_token = ${accessToken}""".map { rs => Service(rs) }.single.apply) + } + + def findById(id: Int)(implicit session: DBSession = AutoSession): Service = { + val cacheKey = "id=" + id + withCache(cacheKey)( sql"""select * from services where id = ${id}""".map { rs => Service(rs) }.single.apply).get + } + + def findByName(serviceName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Service] = { + val cacheKey = "serviceName=" + serviceName + lazy val serviceOpt = sql""" + select * from services where service_name = ${serviceName} + """.map { rs => Service(rs) }.single.apply() + + if (useCache) withCache(cacheKey)(serviceOpt) + else serviceOpt + } + + def insert(serviceName: String, cluster: String, + hTableName: String, preSplitSize: Int, hTableTTL: Option[Int], + compressionAlgorithm: String)(implicit session: DBSession = AutoSession): Unit = { + logger.info(s"$serviceName, $cluster, $hTableName, $preSplitSize, $hTableTTL, $compressionAlgorithm") + val accessToken = UUID.randomUUID().toString() + sql"""insert into services(service_name, access_token, cluster, hbase_table_name, pre_split_size, hbase_table_ttl) + values(${serviceName}, ${accessToken}, ${cluster}, ${hTableName}, ${preSplitSize}, ${hTableTTL})""".execute.apply() + } + + def delete(id: Int)(implicit session: DBSession = AutoSession) = { + val service = findById(id) + val serviceName = service.serviceName + sql"""delete from services where id = ${id}""".execute.apply() + val cacheKeys = List(s"id=$id", s"serviceName=$serviceName") + cacheKeys.foreach { key => + expireCache(key) + expireCaches(key) + } + } + + def findOrInsert(serviceName: String, cluster: String, hTableName: String, + preSplitSize: Int, hTableTTL: Option[Int], compressionAlgorithm: String)(implicit session: DBSession = AutoSession): Service = { + findByName(serviceName) match { + case Some(s) => s + case None => + insert(serviceName, cluster, hTableName, preSplitSize, hTableTTL, compressionAlgorithm) + val cacheKey = "serviceName=" + serviceName + expireCache(cacheKey) + findByName(serviceName).get + } + } + + def findAll()(implicit session: DBSession = AutoSession) = { + val ls = sql"""select * from services""".map { rs => Service(rs) }.list.apply + putsToCache(ls.map { x => + val cacheKey = s"id=${x.id.get}" + (cacheKey -> x) + }) + + putsToCache(ls.map { x => + val cacheKey = s"serviceName=${x.serviceName}" + (cacheKey -> x) + }) + } + + def findAllConn()(implicit session: DBSession = AutoSession): List[String] = { + sql"""select distinct(cluster) from services""".map { rs => rs.string("cluster") }.list.apply + } +} + +case class Service(id: Option[Int], serviceName: String, accessToken: String, cluster: String, hTableName: String, preSplitSize: Int, hTableTTL: Option[Int]) { + lazy val toJson = + id match { + case Some(_id) => + Json.obj("id" -> _id, "name" -> serviceName, "accessToken" -> accessToken, "cluster" -> cluster, + "hTableName" -> hTableName, "preSplitSize" -> 
preSplitSize, "hTableTTL" -> hTableTTL) + case None => + Json.parse("{}") + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala new file mode 100644 index 0000000..6d33fe2 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/ServiceColumn.scala @@ -0,0 +1,82 @@ +package org.apache.s2graph.core.mysqls + +/** + * Created by shon on 6/3/15. + */ + +import org.apache.s2graph.core.JSONParser +import play.api.libs.json.Json +import scalikejdbc._ +object ServiceColumn extends Model[ServiceColumn] { + + def apply(rs: WrappedResultSet): ServiceColumn = { + ServiceColumn(rs.intOpt("id"), rs.int("service_id"), rs.string("column_name"), rs.string("column_type").toLowerCase(), rs.string("schema_version")) + } + + def findById(id: Int)(implicit session: DBSession = AutoSession): ServiceColumn = { +// val cacheKey = s"id=$id" + val cacheKey = "id=" + id + withCache(cacheKey)(sql"""select * from service_columns where id = ${id}""".map { x => ServiceColumn(x) }.single.apply).get + } + def find(serviceId: Int, columnName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[ServiceColumn] = { +// val cacheKey = s"serviceId=$serviceId:columnName=$columnName" + val cacheKey = "serviceId=" + serviceId + ":columnName=" + columnName + if (useCache) { + withCache(cacheKey) { + sql""" + select * from service_columns where service_id = ${serviceId} and column_name = ${columnName} + """.map { rs => ServiceColumn(rs) }.single.apply() + } + } else { + sql""" + select * from service_columns where service_id = ${serviceId} and column_name = ${columnName} + """.map { rs => ServiceColumn(rs) }.single.apply() + } + } + def insert(serviceId: Int, columnName: String, columnType: Option[String], schemaVersion: String)(implicit session: DBSession = AutoSession) = { + sql"""insert into service_columns(service_id, column_name, column_type, schema_version) + values(${serviceId}, ${columnName}, ${columnType}, ${schemaVersion})""".execute.apply() + } + def delete(id: Int)(implicit session: DBSession = AutoSession) = { + val serviceColumn = findById(id) + val (serviceId, columnName) = (serviceColumn.serviceId, serviceColumn.columnName) + sql"""delete from service_columns where id = ${id}""".execute.apply() + val cacheKeys = List(s"id=$id", s"serviceId=$serviceId:columnName=$columnName") + cacheKeys.foreach { key => + expireCache(key) + expireCaches(key) + } + } + def findOrInsert(serviceId: Int, columnName: String, columnType: Option[String], schemaVersion: String)(implicit session: DBSession = AutoSession): ServiceColumn = { + find(serviceId, columnName) match { + case Some(sc) => sc + case None => + insert(serviceId, columnName, columnType, schemaVersion) +// val cacheKey = s"serviceId=$serviceId:columnName=$columnName" + val cacheKey = "serviceId=" + serviceId + ":columnName=" + columnName + expireCache(cacheKey) + find(serviceId, columnName).get + } + } + def findAll()(implicit session: DBSession = AutoSession) = { + val ls = sql"""select * from service_columns""".map { rs => ServiceColumn(rs) }.list.apply + putsToCache(ls.map { x => + var cacheKey = s"id=${x.id.get}" + (cacheKey -> x) + }) + putsToCache(ls.map { x => + var cacheKey = 
s"serviceId=${x.serviceId}:columnName=${x.columnName}" + (cacheKey -> x) + }) + } +} +case class ServiceColumn(id: Option[Int], serviceId: Int, columnName: String, columnType: String, schemaVersion: String) extends JSONParser { + + lazy val service = Service.findById(serviceId) + lazy val metas = ColumnMeta.findAllByColumn(id.get) + lazy val metasInvMap = metas.map { meta => meta.name -> meta} toMap + lazy val metaNamesMap = (ColumnMeta.lastModifiedAtColumn :: metas).map(x => (x.seq.toInt, x.name)) toMap + lazy val toJson = Json.obj("serviceName" -> service.serviceName, "columnName" -> columnName, "columnType" -> columnType) + + +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala new file mode 100644 index 0000000..d5a3687 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala @@ -0,0 +1,207 @@ +package org.apache.s2graph.core.parsers + +import org.apache.s2graph.core.GraphExceptions.WhereParserException +import org.apache.s2graph.core.mysqls.{Label, LabelMeta} +import org.apache.s2graph.core.types.InnerValLike +import org.apache.s2graph.core.{Edge, GraphExceptions, JSONParser} + +import scala.annotation.tailrec +import scala.util.Try +import scala.util.parsing.combinator.JavaTokenParsers + +trait ExtractValue extends JSONParser { + val parent = "_parent." + + def propToInnerVal(edge: Edge, key: String) = { + val (propKey, parentEdge) = findParentEdge(edge, key) + + val label = parentEdge.label + val metaPropInvMap = label.metaPropsInvMap + val labelMeta = metaPropInvMap.getOrElse(propKey, throw WhereParserException(s"Where clause contains not existing property name: $propKey")) + val metaSeq = labelMeta.seq + + metaSeq match { + case LabelMeta.from.seq => parentEdge.srcVertex.innerId + case LabelMeta.to.seq => parentEdge.tgtVertex.innerId + case _ => parentEdge.propsWithTs.get(metaSeq) match { + case None => toInnerVal(labelMeta.defaultValue, labelMeta.dataType, label.schemaVersion) + case Some(edgeVal) => edgeVal.innerVal + } + } + } + + def valueToCompare(edge: Edge, key: String, value: String) = { + val label = edge.label + if (value.startsWith(parent) || label.metaPropsInvMap.contains(value)) propToInnerVal(edge, value) + else { + val (propKey, _) = findParentEdge(edge, key) + + val labelMeta = label.metaPropsInvMap.getOrElse(propKey, throw WhereParserException(s"Where clause contains not existing property name: $propKey")) + val (srcColumn, tgtColumn) = label.srcTgtColumn(edge.labelWithDir.dir) + val dataType = propKey match { + case "_to" | "to" => tgtColumn.columnType + case "_from" | "from" => srcColumn.columnType + case _ => labelMeta.dataType + } + toInnerVal(value, dataType, label.schemaVersion) + } + } + + @tailrec + private def findParent(edge: Edge, depth: Int): Edge = + if (depth > 0) findParent(edge.parentEdges.head.edge, depth - 1) + else edge + + private def findParentEdge(edge: Edge, key: String): (String, Edge) = { + if (!key.startsWith(parent)) (key, edge) + else { + val split = key.split(parent) + val depth = split.length - 1 + val propKey = split.last + + val parentEdge = findParent(edge, depth) + + (propKey, parentEdge) + } + } +} + +trait Clause extends ExtractValue { + def and(otherField: Clause): Clause = 
And(this, otherField) + + def or(otherField: Clause): Clause = Or(this, otherField) + + def filter(edge: Edge): Boolean + + def binaryOp(binOp: (InnerValLike, InnerValLike) => Boolean)(propKey: String, value: String)(edge: Edge): Boolean = { + val propValue = propToInnerVal(edge, propKey) + val compValue = valueToCompare(edge, propKey, value) + + binOp(propValue, compValue) + } +} + +case class Where(clauses: Seq[Clause] = Seq.empty[Clause]) { + def filter(edge: Edge) = + if (clauses.isEmpty) true else clauses.map(_.filter(edge)).forall(identity) +} + +case class Gt(propKey: String, value: String) extends Clause { + override def filter(edge: Edge): Boolean = binaryOp(_ > _)(propKey, value)(edge) +} + +case class Lt(propKey: String, value: String) extends Clause { + override def filter(edge: Edge): Boolean = binaryOp(_ < _)(propKey, value)(edge) +} + +case class Eq(propKey: String, value: String) extends Clause { + override def filter(edge: Edge): Boolean = binaryOp(_ == _)(propKey, value)(edge) +} + +case class InWithoutParent(label: Label, propKey: String, values: Set[String]) extends Clause { + val innerValLikeLs = values.map { value => + val labelMeta = label.metaPropsInvMap.getOrElse(propKey, throw WhereParserException(s"Where clause contains not existing property name: $propKey")) + val dataType = propKey match { + case "_to" | "to" => label.tgtColumn.columnType + case "_from" | "from" => label.srcColumn.columnType + case _ => labelMeta.dataType + } + toInnerVal(value, dataType, label.schemaVersion) + } + override def filter(edge: Edge): Boolean = { + val propVal = propToInnerVal(edge, propKey) + innerValLikeLs.contains(propVal) + } +} + +case class IN(propKey: String, values: Set[String]) extends Clause { + override def filter(edge: Edge): Boolean = { + val propVal = propToInnerVal(edge, propKey) + values.exists { value => + valueToCompare(edge, propKey, value) == propVal + } + } +} + +case class Between(propKey: String, minValue: String, maxValue: String) extends Clause { + override def filter(edge: Edge): Boolean = { + val propVal = propToInnerVal(edge, propKey) + val minVal = valueToCompare(edge, propKey, minValue) + val maxVal = valueToCompare(edge, propKey, maxValue) + + minVal <= propVal && propVal <= maxVal + } +} + +case class Not(self: Clause) extends Clause { + override def filter(edge: Edge) = !self.filter(edge) +} + +case class And(left: Clause, right: Clause) extends Clause { + override def filter(edge: Edge) = left.filter(edge) && right.filter(edge) +} + +case class Or(left: Clause, right: Clause) extends Clause { + override def filter(edge: Edge) = left.filter(edge) || right.filter(edge) +} + +object WhereParser { + val success = Where() +} + +case class WhereParser(label: Label) extends JavaTokenParsers with JSONParser { + + val anyStr = "[^\\s(),]+".r + + val and = "and|AND".r + + val or = "or|OR".r + + val between = "between|BETWEEN".r + + val in = "in|IN".r + + val notIn = "not in|NOT IN".r + + def where: Parser[Where] = rep(clause) ^^ (Where(_)) + + def paren: Parser[Clause] = "(" ~> clause <~ ")" + + def clause: Parser[Clause] = (predicate | paren) * (and ^^^ { (a: Clause, b: Clause) => And(a, b) } | or ^^^ { (a: Clause, b: Clause) => Or(a, b) }) + + def identWithDot: Parser[String] = repsep(ident, ".") ^^ { case values => values.mkString(".") } + + def predicate = { + identWithDot ~ ("!=" | "=") ~ anyStr ^^ { + case f ~ op ~ s => + if (op == "=") Eq(f, s) + else Not(Eq(f, s)) + } | identWithDot ~ (">=" | "<=" | ">" | "<") ~ anyStr ^^ { + case f ~ op ~ s => op 
match { + case ">" => Gt(f, s) + case ">=" => Or(Gt(f, s), Eq(f, s)) + case "<" => Lt(f, s) + case "<=" => Or(Lt(f, s), Eq(f, s)) + } + } | identWithDot ~ (between ~> anyStr <~ and) ~ anyStr ^^ { + case f ~ minV ~ maxV => Between(f, minV, maxV) + } | identWithDot ~ (notIn | in) ~ ("(" ~> repsep(anyStr, ",") <~ ")") ^^ { + case f ~ op ~ values => + val inClause = + if (f.startsWith("_parent")) IN(f, values.toSet) + else InWithoutParent(label, f, values.toSet) + if (op.toLowerCase == "in") inClause + else Not(inClause) + + + case _ => throw WhereParserException(s"Failed to parse where clause. ") + } + } + + def parse(sql: String): Try[Where] = Try { + parseAll(where, sql) match { + case Success(r, q) => r + case fail => throw WhereParserException(s"Where parsing error: ${fail.toString}") + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala new file mode 100644 index 0000000..b0af967 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala @@ -0,0 +1,610 @@ +package org.apache.s2graph.core.rest + +import java.util.concurrent.{Callable, TimeUnit} + +import com.google.common.cache.CacheBuilder +import com.typesafe.config.Config +import org.apache.s2graph.core.GraphExceptions.{BadQueryException, ModelNotFoundException} +import org.apache.s2graph.core._ +import org.apache.s2graph.core.mysqls._ +import org.apache.s2graph.core.parsers.{Where, WhereParser} +import org.apache.s2graph.core.types._ +import play.api.libs.json._ + +import scala.util.{Failure, Success, Try} + +object TemplateHelper { + val findVar = """\"?\$\{(.*?)\}\"?""".r + val num = """(next_day|next_hour|next_week|now)?\s*(-?\s*[0-9]+)?\s*(hour|day|week)?""".r + val hour = 60 * 60 * 1000L + val day = hour * 24L + val week = day * 7L + def calculate(now: Long, n: Int, unit: String): Long = { + val duration = unit match { + case "hour" | "HOUR" => n * hour + case "day" | "DAY" => n * day + case "week" | "WEEK" => n * week + case _ => n * day + } + + duration + now + } + + def replaceVariable(now: Long, body: String): String = { + findVar.replaceAllIn(body, m => { + val matched = m group 1 + + num.replaceSomeIn(matched, m => { + val (_pivot, n, unit) = (m.group(1), m.group(2), m.group(3)) + val ts = _pivot match { + case null => now + case "now" | "NOW" => now + case "next_week" | "NEXT_WEEK" => now / week * week + week + case "next_day" | "NEXT_DAY" => now / day * day + day + case "next_hour" | "NEXT_HOUR" => now / hour * hour + hour + } + + if (_pivot == null && n == null && unit == null) None + else if (n == null || unit == null) Option(ts.toString) + else Option(calculate(ts, n.replaceAll(" ", "").toInt, unit).toString) + }) + }) + } +} + +class RequestParser(config: Config) extends JSONParser { + + import Management.JsonModel._ + + val hardLimit = 100000 + val defaultLimit = 100 + val maxLimit = Int.MaxValue - 1 + val DefaultRpcTimeout = config.getInt("hbase.rpc.timeout") + val DefaultMaxAttempt = config.getInt("hbase.client.retries.number") + val DefaultCluster = config.getString("hbase.zookeeper.quorum") + val DefaultCompressionAlgorithm = config.getString("hbase.table.compression.algorithm") + val DefaultPhase = config.getString("phase") + val parserCache = 
CacheBuilder.newBuilder() + .expireAfterAccess(10000, TimeUnit.MILLISECONDS) + .expireAfterWrite(10000, TimeUnit.MILLISECONDS) + .maximumSize(10000) + .initialCapacity(1000) + .build[String, Try[Where]] + + private def extractScoring(labelId: Int, value: JsValue) = { + val ret = for { + js <- parse[Option[JsObject]](value, "scoring") + } yield { + for { + (k, v) <- js.fields + labelOrderType <- LabelMeta.findByName(labelId, k) + } yield { + val value = v match { + case n: JsNumber => n.as[Double] + case _ => throw new Exception("scoring weight should be double.") + } + (labelOrderType.seq, value) + } + } + ret + } + + def extractInterval(label: Label, _jsValue: JsValue) = { + val replaced = TemplateHelper.replaceVariable(System.currentTimeMillis(), _jsValue.toString()) + val jsValue = Json.parse(replaced) + + def extractKv(js: JsValue) = js match { + case JsObject(obj) => obj + case JsArray(arr) => arr.flatMap { + case JsObject(obj) => obj + case _ => throw new RuntimeException(s"cannot support json type $js") + } + case _ => throw new RuntimeException(s"cannot support json type: $js") + } + + val ret = for { + js <- parse[Option[JsObject]](jsValue, "interval") + fromJs <- (js \ "from").asOpt[JsValue] + toJs <- (js \ "to").asOpt[JsValue] + } yield { + val from = Management.toProps(label, extractKv(fromJs)) + val to = Management.toProps(label, extractKv(toJs)) + (from, to) + } + + ret + } + + def extractDuration(label: Label, _jsValue: JsValue) = { + val replaced = TemplateHelper.replaceVariable(System.currentTimeMillis(), _jsValue.toString()) + val jsValue = Json.parse(replaced) + + for { + js <- parse[Option[JsObject]](jsValue, "duration") + } yield { + val minTs = parse[Option[Long]](js, "from").getOrElse(Long.MaxValue) + val maxTs = parse[Option[Long]](js, "to").getOrElse(Long.MinValue) + + if (minTs > maxTs) { + throw new BadQueryException("Duration error. 
Timestamp of From cannot be larger than To.") + } + + (minTs, maxTs) + } + } + + def extractHas(label: Label, jsValue: JsValue) = { + val ret = for { + js <- parse[Option[JsObject]](jsValue, "has") + } yield { + for { + (k, v) <- js.fields + labelMeta <- LabelMeta.findByName(label.id.get, k) + value <- jsValueToInnerVal(v, labelMeta.dataType, label.schemaVersion) + } yield { + labelMeta.seq -> value + } + } + ret.map(_.toMap).getOrElse(Map.empty[Byte, InnerValLike]) + } + + def extractWhere(label: Label, whereClauseOpt: Option[String]): Try[Where] = { + whereClauseOpt match { + case None => Success(WhereParser.success) + case Some(_where) => + val where = TemplateHelper.replaceVariable(System.currentTimeMillis(), _where) + val whereParserKey = s"${label.label}_${where}" + parserCache.get(whereParserKey, new Callable[Try[Where]] { + override def call(): Try[Where] = { + WhereParser(label).parse(where) match { + case s@Success(_) => s + case Failure(ex) => throw BadQueryException(ex.getMessage, ex) + } + } + }) + } + } + + def toVertices(labelName: String, direction: String, ids: Seq[JsValue]): Seq[Vertex] = { + val vertices = for { + label <- Label.findByName(labelName).toSeq + serviceColumn = if (direction == "out") label.srcColumn else label.tgtColumn + id <- ids + innerId <- jsValueToInnerVal(id, serviceColumn.columnType, label.schemaVersion) + } yield { + Vertex(SourceVertexId(serviceColumn.id.get, innerId), System.currentTimeMillis()) + } + vertices.toSeq + } + + def toMultiQuery(jsValue: JsValue, isEdgeQuery: Boolean = true): MultiQuery = { + val queries = for { + queryJson <- (jsValue \ "queries").asOpt[Seq[JsValue]].getOrElse(Seq.empty) + } yield { + toQuery(queryJson, isEdgeQuery) + } + val weights = (jsValue \ "weights").asOpt[Seq[Double]].getOrElse(queries.map(_ => 1.0)) + MultiQuery(queries = queries, weights = weights, + queryOption = toQueryOption(jsValue), jsonQuery = jsValue) + } + + def toQueryOption(jsValue: JsValue): QueryOption = { + val filterOutFields = (jsValue \ "filterOutFields").asOpt[List[String]].getOrElse(List(LabelMeta.to.name)) + val filterOutQuery = (jsValue \ "filterOut").asOpt[JsValue].map { v => toQuery(v) }.map { q => + q.copy(queryOption = q.queryOption.copy(filterOutFields = filterOutFields)) + } + val removeCycle = (jsValue \ "removeCycle").asOpt[Boolean].getOrElse(true) + val selectColumns = (jsValue \ "select").asOpt[List[String]].getOrElse(List.empty) + val groupByColumns = (jsValue \ "groupBy").asOpt[List[String]].getOrElse(List.empty) + val orderByColumns: List[(String, Boolean)] = (jsValue \ "orderBy").asOpt[List[JsObject]].map { jsLs => + for { + js <- jsLs + (column, orderJs) <- js.fields + } yield { + val ascending = orderJs.as[String].toUpperCase match { + case "ASC" => true + case "DESC" => false + } + column -> ascending + } + }.getOrElse(List("score" -> false, "timestamp" -> false)) + val withScore = (jsValue \ "withScore").asOpt[Boolean].getOrElse(true) + val returnTree = (jsValue \ "returnTree").asOpt[Boolean].getOrElse(false) + //TODO: Refactor this + val limitOpt = (jsValue \ "limit").asOpt[Int] + val returnAgg = (jsValue \ "returnAgg").asOpt[Boolean].getOrElse(true) + val scoreThreshold = (jsValue \ "scoreThreshold").asOpt[Double].getOrElse(Double.MinValue) + val returnDegree = (jsValue \ "returnDegree").asOpt[Boolean].getOrElse(true) + + QueryOption(removeCycle = removeCycle, + selectColumns = selectColumns, + groupByColumns = groupByColumns, + orderByColumns = orderByColumns, + filterOutQuery = filterOutQuery, + filterOutFields = 
filterOutFields, + withScore = withScore, + returnTree = returnTree, + limitOpt = limitOpt, + returnAgg = returnAgg, + scoreThreshold = scoreThreshold, + returnDegree = returnDegree + ) + } + def toQuery(jsValue: JsValue, isEdgeQuery: Boolean = true): Query = { + try { + val vertices = + (for { + value <- parse[List[JsValue]](jsValue, "srcVertices") + serviceName = parse[String](value, "serviceName") + column = parse[String](value, "columnName") + } yield { + val service = Service.findByName(serviceName).getOrElse(throw BadQueryException("service not found")) + val col = ServiceColumn.find(service.id.get, column).getOrElse(throw BadQueryException("bad column name")) + val (idOpt, idsOpt) = ((value \ "id").asOpt[JsValue], (value \ "ids").asOpt[List[JsValue]]) + for { + idVal <- idOpt ++ idsOpt.toSeq.flatten + + /** bug, need to use label's schemaVersion */ + innerVal <- jsValueToInnerVal(idVal, col.columnType, col.schemaVersion) + } yield { + Vertex(SourceVertexId(col.id.get, innerVal), System.currentTimeMillis()) + } + }).flatten + + if (vertices.isEmpty) throw BadQueryException("srcVertices' id is empty") + val steps = parse[Vector[JsValue]](jsValue, "steps") + + val queryOption = toQueryOption(jsValue) + + val querySteps = + steps.zipWithIndex.map { case (step, stepIdx) => + val labelWeights = step match { + case obj: JsObject => + val converted = for { + (k, v) <- (obj \ "weights").asOpt[JsObject].getOrElse(Json.obj()).fields + l <- Label.findByName(k) + } yield { + l.id.get -> v.toString().toDouble + } + converted.toMap + case _ => Map.empty[Int, Double] + } + val queryParamJsVals = step match { + case arr: JsArray => arr.as[List[JsValue]] + case obj: JsObject => (obj \ "step").as[List[JsValue]] + case _ => List.empty[JsValue] + } + val nextStepScoreThreshold = step match { + case obj: JsObject => (obj \ "nextStepThreshold").asOpt[Double].getOrElse(QueryParam.DefaultThreshold) + case _ => QueryParam.DefaultThreshold + } + val nextStepLimit = step match { + case obj: JsObject => (obj \ "nextStepLimit").asOpt[Int].getOrElse(-1) + case _ => -1 + } + val cacheTTL = step match { + case obj: JsObject => (obj \ "cacheTTL").asOpt[Int].getOrElse(-1) + case _ => -1 + } + val queryParams = + for { + labelGroup <- queryParamJsVals + queryParam <- parseQueryParam(labelGroup) + } yield { + val (_, columnName) = + if (queryParam.labelWithDir.dir == GraphUtil.directions("out")) { + (queryParam.label.srcService.serviceName, queryParam.label.srcColumnName) + } else { + (queryParam.label.tgtService.serviceName, queryParam.label.tgtColumnName) + } + //FIXME: + if (stepIdx == 0 && vertices.nonEmpty && !vertices.exists(v => v.serviceColumn.columnName == columnName)) { + throw BadQueryException("srcVertices contains incompatible serviceName or columnName with first step.") + } + + queryParam + } + Step(queryParams.toList, labelWeights = labelWeights, + // scoreThreshold = stepThreshold, + nextStepScoreThreshold = nextStepScoreThreshold, + nextStepLimit = nextStepLimit, + cacheTTL = cacheTTL) + + } + + val ret = Query(vertices, querySteps, queryOption, jsValue) + // logger.debug(ret.toString) + ret + } catch { + case e: BadQueryException => + throw e + case e: ModelNotFoundException => + throw BadQueryException(e.getMessage, e) + case e: Exception => + throw BadQueryException(s"$jsValue, $e", e) + } + } + + private def parseQueryParam(labelGroup: JsValue): Option[QueryParam] = { + for { + labelName <- parse[Option[String]](labelGroup, "label") + } yield { + val label = 
Label.findByName(labelName).getOrElse(throw BadQueryException(s"$labelName not found")) + val direction = parse[Option[String]](labelGroup, "direction").map(GraphUtil.toDirection(_)).getOrElse(0) + val limit = { + parse[Option[Int]](labelGroup, "limit") match { + case None => defaultLimit + case Some(l) if l < 0 => maxLimit + case Some(l) if l >= 0 => + val default = hardLimit + Math.min(l, default) + } + } + val offset = parse[Option[Int]](labelGroup, "offset").getOrElse(0) + val interval = extractInterval(label, labelGroup) + val duration = extractDuration(label, labelGroup) + val scoring = extractScoring(label.id.get, labelGroup).getOrElse(List.empty[(Byte, Double)]).toList + val exclude = parse[Option[Boolean]](labelGroup, "exclude").getOrElse(false) + val include = parse[Option[Boolean]](labelGroup, "include").getOrElse(false) + val hasFilter = extractHas(label, labelGroup) + val labelWithDir = LabelWithDirection(label.id.get, direction) + val indexNameOpt = (labelGroup \ "index").asOpt[String] + val indexSeq = indexNameOpt match { + case None => label.indexSeqsMap.get(scoring.map(kv => kv._1)).map(_.seq).getOrElse(LabelIndex.DefaultSeq) + case Some(indexName) => label.indexNameMap.get(indexName).map(_.seq).getOrElse(throw new RuntimeException("cannot find index")) + } + val whereClauseOpt = (labelGroup \ "where").asOpt[String] + val where = extractWhere(label, whereClauseOpt) + val includeDegree = (labelGroup \ "includeDegree").asOpt[Boolean].getOrElse(true) + val rpcTimeout = (labelGroup \ "rpcTimeout").asOpt[Int].getOrElse(DefaultRpcTimeout) + val maxAttempt = (labelGroup \ "maxAttempt").asOpt[Int].getOrElse(DefaultMaxAttempt) + val tgtVertexInnerIdOpt = (labelGroup \ "_to").asOpt[JsValue].flatMap { jsVal => + jsValueToInnerVal(jsVal, label.tgtColumnWithDir(direction).columnType, label.schemaVersion) + } + val cacheTTL = (labelGroup \ "cacheTTL").asOpt[Long].getOrElse(-1L) + val timeDecayFactor = (labelGroup \ "timeDecay").asOpt[JsObject].map { jsVal => + val propName = (jsVal \ "propName").asOpt[String].getOrElse(LabelMeta.timestamp.name) + val propNameSeq = label.metaPropsInvMap.get(propName).map(_.seq).getOrElse(LabelMeta.timeStampSeq) + val initial = (jsVal \ "initial").asOpt[Double].getOrElse(1.0) + val decayRate = (jsVal \ "decayRate").asOpt[Double].getOrElse(0.1) + if (decayRate >= 1.0 || decayRate <= 0.0) throw new BadQueryException("decay rate should be 0.0 ~ 1.0") + val timeUnit = (jsVal \ "timeUnit").asOpt[Double].getOrElse(60 * 60 * 24.0) + TimeDecay(initial, decayRate, timeUnit, propNameSeq) + } + val threshold = (labelGroup \ "threshold").asOpt[Double].getOrElse(QueryParam.DefaultThreshold) + // TODO: refactor this. 
dirty + val duplicate = parse[Option[String]](labelGroup, "duplicate").map(s => Query.DuplicatePolicy(s)) + + val outputField = (labelGroup \ "outputField").asOpt[String].map(s => Json.arr(Json.arr(s))) + val transformer = if (outputField.isDefined) outputField else (labelGroup \ "transform").asOpt[JsValue] + val scorePropagateOp = (labelGroup \ "scorePropagateOp").asOpt[String].getOrElse("multiply") + val sample = (labelGroup \ "sample").asOpt[Int].getOrElse(-1) + val shouldNormalize = (labelGroup \ "normalize").asOpt[Boolean].getOrElse(false) + val cursorOpt = (labelGroup \ "cursor").asOpt[String] + // FIXME: Order of command matter + QueryParam(labelWithDir) + .sample(sample) + .limit(offset, limit) + .rank(RankParam(label.id.get, scoring)) + .exclude(exclude) + .include(include) + .duration(duration) + .has(hasFilter) + .labelOrderSeq(indexSeq) + .interval(interval) + .where(where) + .duplicatePolicy(duplicate) + .includeDegree(includeDegree) + .rpcTimeout(rpcTimeout) + .maxAttempt(maxAttempt) + .tgtVertexInnerIdOpt(tgtVertexInnerIdOpt) + .cacheTTLInMillis(cacheTTL) + .timeDecay(timeDecayFactor) + .threshold(threshold) + .transformer(transformer) + .scorePropagateOp(scorePropagateOp) + .shouldNormalize(shouldNormalize) + .cursorOpt(cursorOpt) + } + } + + private def parse[R](js: JsValue, key: String)(implicit read: Reads[R]): R = { + (js \ key).validate[R] + .fold( + errors => { + val msg = (JsError.toFlatJson(errors) \ "obj").as[List[JsValue]].map(x => x \ "msg") + val e = Json.obj("args" -> key, "error" -> msg) + throw new GraphExceptions.JsonParseException(Json.obj("error" -> key).toString) + }, + r => { + r + }) + } + + def toJsValues(jsValue: JsValue): List[JsValue] = { + jsValue match { + case obj: JsObject => List(obj) + case arr: JsArray => arr.as[List[JsValue]] + case _ => List.empty[JsValue] + } + + } + + def toEdgesWithOrg(jsValue: JsValue, operation: String): (List[Edge], List[JsValue]) = { + val jsValues = toJsValues(jsValue) + val edges = jsValues.flatMap(toEdge(_, operation)) + + (edges, jsValues) + } + + def toEdges(jsValue: JsValue, operation: String): List[Edge] = { + toJsValues(jsValue).flatMap { edgeJson => + toEdge(edgeJson, operation) + } + } + + + private def toEdge(jsValue: JsValue, operation: String): List[Edge] = { + + def parseId(js: JsValue) = js match { + case s: JsString => s.as[String] + case o@_ => s"${o}" + } + val srcId = (jsValue \ "from").asOpt[JsValue].toList.map(parseId(_)) + val tgtId = (jsValue \ "to").asOpt[JsValue].toList.map(parseId(_)) + val srcIds = (jsValue \ "froms").asOpt[List[JsValue]].toList.flatMap(froms => froms.map(js => parseId(js))) ++ srcId + val tgtIds = (jsValue \ "tos").asOpt[List[JsValue]].toList.flatMap(froms => froms.map(js => parseId(js))) ++ tgtId + + val label = parse[String](jsValue, "label") + val timestamp = parse[Long](jsValue, "timestamp") + val direction = parse[Option[String]](jsValue, "direction").getOrElse("") + val props = (jsValue \ "props").asOpt[JsValue].getOrElse("{}") + for { + srcId <- srcIds + tgtId <- tgtIds + } yield { + Management.toEdge(timestamp, operation, srcId, tgtId, label, direction, props.toString) + } + } + + def toVertices(jsValue: JsValue, operation: String, serviceName: Option[String] = None, columnName: Option[String] = None) = { + toJsValues(jsValue).map(toVertex(_, operation, serviceName, columnName)) + } + + def toVertex(jsValue: JsValue, operation: String, serviceName: Option[String] = None, columnName: Option[String] = None): Vertex = { + val id = parse[JsValue](jsValue, "id") + val 
ts = parse[Option[Long]](jsValue, "timestamp").getOrElse(System.currentTimeMillis()) + val sName = if (serviceName.isEmpty) parse[String](jsValue, "serviceName") else serviceName.get + val cName = if (columnName.isEmpty) parse[String](jsValue, "columnName") else columnName.get + val props = (jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj()) + Management.toVertex(ts, operation, id.toString, sName, cName, props.toString) + } + + def toPropElements(jsObj: JsValue) = Try { + val propName = (jsObj \ "name").as[String] + val dataType = InnerVal.toInnerDataType((jsObj \ "dataType").as[String]) + val defaultValue = (jsObj \ "defaultValue").as[JsValue] match { + case JsString(s) => s + case _@js => js.toString + } + Prop(propName, defaultValue, dataType) + } + + def toPropsElements(jsValue: JsValue): Seq[Prop] = for { + jsObj <- jsValue.asOpt[Seq[JsValue]].getOrElse(Nil) + } yield { + val propName = (jsObj \ "name").as[String] + val dataType = InnerVal.toInnerDataType((jsObj \ "dataType").as[String]) + val defaultValue = (jsObj \ "defaultValue").as[JsValue] match { + case JsString(s) => s + case _@js => js.toString + } + Prop(propName, defaultValue, dataType) + } + + def toIndicesElements(jsValue: JsValue): Seq[Index] = for { + jsObj <- jsValue.as[Seq[JsValue]] + indexName = (jsObj \ "name").as[String] + propNames = (jsObj \ "propNames").as[Seq[String]] + } yield Index(indexName, propNames) + + def toLabelElements(jsValue: JsValue) = Try { + val labelName = parse[String](jsValue, "label") + val srcServiceName = parse[String](jsValue, "srcServiceName") + val tgtServiceName = parse[String](jsValue, "tgtServiceName") + val srcColumnName = parse[String](jsValue, "srcColumnName") + val tgtColumnName = parse[String](jsValue, "tgtColumnName") + val srcColumnType = parse[String](jsValue, "srcColumnType") + val tgtColumnType = parse[String](jsValue, "tgtColumnType") + val serviceName = (jsValue \ "serviceName").asOpt[String].getOrElse(tgtServiceName) + val isDirected = (jsValue \ "isDirected").asOpt[Boolean].getOrElse(true) + + val allProps = toPropsElements(jsValue \ "props") + val indices = toIndicesElements(jsValue \ "indices") + + val consistencyLevel = (jsValue \ "consistencyLevel").asOpt[String].getOrElse("weak") + + // a new label is expected not to provide hTableName + val hTableName = (jsValue \ "hTableName").asOpt[String] + val hTableTTL = (jsValue \ "hTableTTL").asOpt[Int] + val schemaVersion = (jsValue \ "schemaVersion").asOpt[String].getOrElse(HBaseType.DEFAULT_VERSION) + val isAsync = (jsValue \ "isAsync").asOpt[Boolean].getOrElse(false) + val compressionAlgorithm = (jsValue \ "compressionAlgorithm").asOpt[String].getOrElse(DefaultCompressionAlgorithm) + + (labelName, srcServiceName, srcColumnName, srcColumnType, + tgtServiceName, tgtColumnName, tgtColumnType, isDirected, serviceName, + indices, allProps, consistencyLevel, hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm) + } + + def toIndexElements(jsValue: JsValue) = Try { + val labelName = parse[String](jsValue, "label") + val indices = toIndicesElements(jsValue \ "indices") + (labelName, indices) + } + + def toServiceElements(jsValue: JsValue) = { + val serviceName = parse[String](jsValue, "serviceName") + val cluster = (jsValue \ "cluster").asOpt[String].getOrElse(DefaultCluster) + val hTableName = (jsValue \ "hTableName").asOpt[String].getOrElse(s"${serviceName}-${DefaultPhase}") + val preSplitSize = (jsValue \ "preSplitSize").asOpt[Int].getOrElse(1) + val hTableTTL = (jsValue \ "hTableTTL").asOpt[Int] + val 
compressionAlgorithm = (jsValue \ "compressionAlgorithm").asOpt[String].getOrElse(DefaultCompressionAlgorithm) + (serviceName, cluster, hTableName, preSplitSize, hTableTTL, compressionAlgorithm) + } + + def toServiceColumnElements(jsValue: JsValue) = Try { + val serviceName = parse[String](jsValue, "serviceName") + val columnName = parse[String](jsValue, "columnName") + val columnType = parse[String](jsValue, "columnType") + val props = toPropsElements(jsValue \ "props") + (serviceName, columnName, columnType, props) + } + + def toCheckEdgeParam(jsValue: JsValue) = { + val params = jsValue.as[List[JsValue]] + var isReverted = false + val labelWithDirs = scala.collection.mutable.HashSet[LabelWithDirection]() + val quads = for { + param <- params + labelName <- (param \ "label").asOpt[String] + direction <- GraphUtil.toDir((param \ "direction").asOpt[String].getOrElse("out")) + label <- Label.findByName(labelName) + srcId <- jsValueToInnerVal((param \ "from").as[JsValue], label.srcColumnWithDir(direction.toInt).columnType, label.schemaVersion) + tgtId <- jsValueToInnerVal((param \ "to").as[JsValue], label.tgtColumnWithDir(direction.toInt).columnType, label.schemaVersion) + } yield { + val labelWithDir = LabelWithDirection(label.id.get, direction) + labelWithDirs += labelWithDir + val (src, tgt, dir) = if (direction == 1) { + isReverted = true + (Vertex(VertexId(label.tgtColumnWithDir(direction.toInt).id.get, tgtId)), + Vertex(VertexId(label.srcColumnWithDir(direction.toInt).id.get, srcId)), 0) + } else { + (Vertex(VertexId(label.srcColumnWithDir(direction.toInt).id.get, srcId)), + Vertex(VertexId(label.tgtColumnWithDir(direction.toInt).id.get, tgtId)), 0) + } + (src, tgt, QueryParam(LabelWithDirection(label.id.get, dir))) + } + (quads, isReverted) + } + + def toGraphElements(str: String): Seq[GraphElement] = { + val edgeStrs = str.split("\\n") + + for { + edgeStr <- edgeStrs + str <- GraphUtil.parseString(edgeStr) + element <- Graph.toGraphElement(str) + } yield element + } + + def toDeleteParam(json: JsValue) = { + val labelName = (json \ "label").as[String] + val labels = Label.findByName(labelName).map { l => Seq(l) }.getOrElse(Nil).filterNot(_.isAsync) + val direction = (json \ "direction").asOpt[String].getOrElse("out") + + val ids = (json \ "ids").asOpt[List[JsValue]].getOrElse(Nil) + val ts = (json \ "timestamp").asOpt[Long].getOrElse(System.currentTimeMillis()) + val vertices = toVertices(labelName, direction, ids) + (labels, direction, ids, ts, vertices) + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala new file mode 100644 index 0000000..a48bc7c --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RestHandler.scala @@ -0,0 +1,244 @@ +package org.apache.s2graph.core.rest + +import java.net.URL + +import org.apache.s2graph.core.GraphExceptions.BadQueryException +import org.apache.s2graph.core._ +import org.apache.s2graph.core.mysqls.{Bucket, Experiment, Service} +import org.apache.s2graph.core.utils.logger +import play.api.libs.json._ + +import scala.concurrent.{ExecutionContext, Future} + + +object RestHandler { + case class HandlerResult(body: Future[JsValue], headers: (String, String)*) +} + +/** + * Public API, only return Future.successful or 
Future.failed + * Don't throw exception + */ +class RestHandler(graph: Graph)(implicit ec: ExecutionContext) { + + import RestHandler._ + val requestParser = new RequestParser(graph.config) + + /** + * Public APIS + */ + def doPost(uri: String, body: String, impKeyOpt: => Option[String] = None): HandlerResult = { + try { + val jsQuery = Json.parse(body) + + uri match { + case "/graphs/getEdges" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.toSimpleVertexArrJson)) + case "/graphs/getEdges/grouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithListFormatted)) + case "/graphs/getEdgesExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.toSimpleVertexArrJson)) + case "/graphs/getEdgesExcluded/grouped" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted)) + case "/graphs/checkEdges" => checkEdges(jsQuery) + case "/graphs/getEdgesGrouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithList)) + case "/graphs/getEdgesGroupedExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExclude)) + case "/graphs/getEdgesGroupedExcludedFormatted" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted)) + case "/graphs/getVertices" => HandlerResult(getVertices(jsQuery)) + case uri if uri.startsWith("/graphs/experiment") => + val Array(accessToken, experimentName, uuid) = uri.split("/").takeRight(3) + experiment(jsQuery, accessToken, experimentName, uuid, impKeyOpt) + case _ => throw new RuntimeException("route is not found") + } + } catch { + case e: Exception => HandlerResult(Future.failed(e)) + } + } + + // TODO: Refactor to doGet + def checkEdges(jsValue: JsValue): HandlerResult = { + try { + val (quads, isReverted) = requestParser.toCheckEdgeParam(jsValue) + + HandlerResult(graph.checkEdges(quads).map { case queryRequestWithResultLs => + val edgeJsons = for { + queryRequestWithResult <- queryRequestWithResultLs + (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get + edgeWithScore <- queryResult.edgeWithScoreLs + (edge, score) = EdgeWithScore.unapply(edgeWithScore).get + convertedEdge = if (isReverted) edge.duplicateEdge else edge + edgeJson = PostProcess.edgeToJson(convertedEdge, score, queryRequest.query, queryRequest.queryParam) + } yield Json.toJson(edgeJson) + + Json.toJson(edgeJsons) + }) + } catch { + case e: Exception => HandlerResult(Future.failed(e)) + } + } + + + /** + * Private APIS + */ + private def experiment(contentsBody: JsValue, accessToken: String, experimentName: String, uuid: String, impKeyOpt: => Option[String]): HandlerResult = { + + try { + val bucketOpt = for { + service <- Service.findByAccessToken(accessToken) + experiment <- Experiment.findBy(service.id.get, experimentName) + bucket <- experiment.findBucket(uuid, impKeyOpt) + } yield bucket + + val bucket = bucketOpt.getOrElse(throw new RuntimeException("bucket is not found")) + if (bucket.isGraphQuery) { + val ret = buildRequestInner(contentsBody, bucket, uuid) + HandlerResult(ret.body, Experiment.impressionKey -> bucket.impressionId) + } + else throw new RuntimeException("not supported yet") + } catch { + case e: Exception => HandlerResult(Future.failed(e)) + } + } + + private def buildRequestInner(contentsBody: JsValue, bucket: Bucket, uuid: String): HandlerResult = { + if (bucket.isEmpty) HandlerResult(Future.successful(PostProcess.emptyResults)) + else { + val body = buildRequestBody(Option(contentsBody), bucket, 
uuid) + val url = new URL(bucket.apiPath) + val path = url.getPath + + // dummy log for sampling + val experimentLog = s"POST $path took -1 ms 200 -1 $body" + logger.debug(experimentLog) + + doPost(path, body) + } + } + + private def eachQuery(post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue)(q: Query): Future[JsValue] = { + val filterOutQueryResultsLs = q.filterOutQuery match { + case Some(filterOutQuery) => graph.getEdges(filterOutQuery) + case None => Future.successful(Seq.empty) + } + + for { + queryResultsLs <- graph.getEdges(q) + filterOutResultsLs <- filterOutQueryResultsLs + } yield { + val json = post(queryResultsLs, filterOutResultsLs) + json + } + } + + def getEdgesAsync(jsonQuery: JsValue) + (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = { + + val fetch = eachQuery(post) _ + jsonQuery match { + case JsArray(arr) => Future.traverse(arr.map(requestParser.toQuery(_)))(fetch).map(JsArray) + case obj@JsObject(_) => + (obj \ "queries").asOpt[JsValue] match { + case None => fetch(requestParser.toQuery(obj)) + case _ => + val multiQuery = requestParser.toMultiQuery(obj) + val filterOutFuture = multiQuery.queryOption.filterOutQuery match { + case Some(filterOutQuery) => graph.getEdges(filterOutQuery) + case None => Future.successful(Seq.empty) + } + val futures = multiQuery.queries.zip(multiQuery.weights).map { case (query, weight) => + val filterOutQueryResultsLs = query.queryOption.filterOutQuery match { + case Some(filterOutQuery) => graph.getEdges(filterOutQuery) + case None => Future.successful(Seq.empty) + } + for { + queryRequestWithResultLs <- graph.getEdges(query) + filterOutResultsLs <- filterOutQueryResultsLs + } yield { + val newQueryRequestWithResult = for { + queryRequestWithResult <- queryRequestWithResultLs + queryResult = queryRequestWithResult.queryResult + } yield { + val newEdgesWithScores = for { + edgeWithScore <- queryRequestWithResult.queryResult.edgeWithScoreLs + } yield { + edgeWithScore.copy(score = edgeWithScore.score * weight) + } + queryRequestWithResult.copy(queryResult = queryResult.copy(edgeWithScoreLs = newEdgesWithScores)) + } + logger.debug(s"[Size]: ${newQueryRequestWithResult.map(_.queryResult.edgeWithScoreLs.size).sum}") + (newQueryRequestWithResult, filterOutResultsLs) + } + } + for { + filterOut <- filterOutFuture + resultWithExcludeLs <- Future.sequence(futures) + } yield { + PostProcess.toSimpleVertexArrJsonMulti(multiQuery.queryOption, resultWithExcludeLs, filterOut) + // val initial = (ListBuffer.empty[QueryRequestWithResult], ListBuffer.empty[QueryRequestWithResult]) + // val (results, excludes) = resultWithExcludeLs.foldLeft(initial) { case ((prevResults, prevExcludes), (results, excludes)) => + // (prevResults ++= results, prevExcludes ++= excludes) + // } + // PostProcess.toSimpleVertexArrJson(multiQuery.queryOption, results, excludes ++ filterOut) + } + } + case _ => throw BadQueryException("Cannot support") + } + } + + private def getEdgesExcludedAsync(jsonQuery: JsValue) + (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = { + val q = requestParser.toQuery(jsonQuery) + val filterOutQuery = Query(q.vertices, Vector(q.steps.last)) + + val fetchFuture = graph.getEdges(q) + val excludeFuture = graph.getEdges(filterOutQuery) + + for { + queryResultLs <- fetchFuture + exclude <- excludeFuture + } yield { + post(queryResultLs, exclude) + } + } + + private def getVertices(jsValue: JsValue) = { + val jsonQuery = jsValue + 
val ts = System.currentTimeMillis() + val props = "{}" + + val vertices = jsonQuery.as[List[JsValue]].flatMap { js => + val serviceName = (js \ "serviceName").as[String] + val columnName = (js \ "columnName").as[String] + for (id <- (js \ "ids").asOpt[List[JsValue]].getOrElse(List.empty[JsValue])) yield { + Management.toVertex(ts, "insert", id.toString, serviceName, columnName, props) + } + } + + graph.getVertices(vertices) map { vertices => PostProcess.verticesToJson(vertices) } + } + + private def buildRequestBody(requestKeyJsonOpt: Option[JsValue], bucket: Bucket, uuid: String): String = { + var body = bucket.requestBody.replace("#uuid", uuid) + + // // replace variable + // body = TemplateHelper.replaceVariable(System.currentTimeMillis(), body) + + // replace param + for { + requestKeyJson <- requestKeyJsonOpt + jsObj <- requestKeyJson.asOpt[JsObject] + (key, value) <- jsObj.fieldSet + } { + val replacement = value match { + case JsString(s) => s + case _ => value.toString + } + body = body.replace(key, replacement) + } + + body + } + + def calcSize(js: JsValue): Int = js match { + case JsObject(obj) => (js \ "size").asOpt[Int].getOrElse(0) + case JsArray(seq) => seq.map(js => (js \ "size").asOpt[Int].getOrElse(0)).sum + case _ => 0 + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/storage/Deserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Deserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Deserializable.scala new file mode 100644 index 0000000..12e9547 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Deserializable.scala @@ -0,0 +1,24 @@ +package org.apache.s2graph.core.storage + +import org.apache.hadoop.hbase.util.Bytes +import org.apache.s2graph.core.types.{LabelWithDirection, SourceVertexId, VertexId} + + +trait Deserializable[E] extends StorageDeserializable[E] { + import StorageDeserializable._ + + type RowKeyRaw = (VertexId, LabelWithDirection, Byte, Boolean, Int) + + /** version 1 and version 2 share same code for parsing row key part */ + def parseRow(kv: SKeyValue, version: String): RowKeyRaw = { + var pos = 0 + val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, version) + pos += srcIdLen + val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4)) + pos += 4 + val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos) + + val rowLen = srcIdLen + 4 + 1 + (srcVertexId, labelWithDir, labelIdxSeq, isInverted, rowLen) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala new file mode 100644 index 0000000..5310248 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala @@ -0,0 +1,47 @@ +package org.apache.s2graph.core.storage + +import org.apache.hadoop.hbase.util.Bytes +import org.hbase.async.KeyValue + + +object SKeyValue { + val Put = 1 + val Delete = 2 + val Increment = 3 + val Default = Put +} +case class SKeyValue(table: Array[Byte], + row: Array[Byte], + cf: Array[Byte], + qualifier: Array[Byte], + value: 
Array[Byte], + timestamp: Long, + operation: Int = SKeyValue.Default) { + def toLogString = { + Map("table" -> table.toList, "row" -> row.toList, "cf" -> Bytes.toString(cf), + "qualifier" -> qualifier.toList, "value" -> value.toList, "timestamp" -> timestamp, + "operation" -> operation).toString + } + override def toString(): String = toLogString +} + +trait CanSKeyValue[T] { + def toSKeyValue(from: T): SKeyValue +} + +object CanSKeyValue { + + // For asyncbase KeyValues + implicit val asyncKeyValue = new CanSKeyValue[KeyValue] { + def toSKeyValue(kv: KeyValue): SKeyValue = { + SKeyValue(Array.empty[Byte], kv.key(), kv.family(), kv.qualifier(), kv.value(), kv.timestamp()) + } + } + + // For SKeyValue itself (identity conversion) + implicit val sKeyValue = new CanSKeyValue[SKeyValue] { + def toSKeyValue(kv: SKeyValue): SKeyValue = kv + } + + // For hbase KeyValues +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/org/apache/s2graph/core/storage/Serializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Serializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Serializable.scala new file mode 100644 index 0000000..08a3f73 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Serializable.scala @@ -0,0 +1,8 @@ +package org.apache.s2graph.core.storage + +object Serializable { + val vertexCf = "v".getBytes() + val edgeCf = "e".getBytes() +} + +trait Serializable[E] extends StorageSerializable[E]
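A minimal sketch of how the CanSKeyValue type class above can be used: storage code stays generic over asyncbase KeyValue and SKeyValue and normalizes through the implicit instances (the helper name toSKeyValues is hypothetical):

    import org.apache.s2graph.core.storage.{CanSKeyValue, SKeyValue}

    // Convert any supported key-value type to SKeyValue via its CanSKeyValue instance.
    def toSKeyValues[T: CanSKeyValue](kvs: Seq[T]): Seq[SKeyValue] =
      kvs.map(implicitly[CanSKeyValue[T]].toSKeyValue)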
