package com.kakao.s2graph.core.mysqls

import com.kakao.s2graph.core.JSONParser
import com.kakao.s2graph.core.GraphExceptions.MaxPropSizeReachedException
import play.api.libs.json.Json
import scalikejdbc._

/**
 * DAO for the `label_metas` table.
 *
 * In addition to the persisted rows, a handful of "reserved" metas
 * (`_from`, `_to`, `_timestamp`, `_degree`, `_count`) are defined here with
 * negative or near-`Byte.MaxValue` sequence numbers so they can never collide
 * with user-defined property sequences; those are resolved purely in memory.
 */
object LabelMeta extends Model[LabelMeta] with JSONParser {

  /** Dummy sequence numbers reserved for synthetic properties. */
  val fromSeq = -4.toByte
  val toSeq = -5.toByte
  val lastOpSeq = -3.toByte
  val lastDeletedAt = -2.toByte
  val timeStampSeq = 0.toByte
  val countSeq = (Byte.MaxValue - 2).toByte
  val degreeSeq = (Byte.MaxValue - 1).toByte
  val maxValue = Byte.MaxValue
  val emptyValue = Byte.MaxValue

  /** Reserved metas — never stored in the database. */
  val from = LabelMeta(id = Some(fromSeq), labelId = fromSeq, name = "_from",
    seq = fromSeq, defaultValue = fromSeq.toString, dataType = "long")
  val to = LabelMeta(id = Some(toSeq), labelId = toSeq, name = "_to",
    seq = toSeq, defaultValue = toSeq.toString, dataType = "long")
  val timestamp = LabelMeta(id = Some(-1), labelId = -1, name = "_timestamp",
    seq = timeStampSeq, defaultValue = "0", dataType = "long")
  val degree = LabelMeta(id = Some(-1), labelId = -1, name = "_degree",
    seq = degreeSeq, defaultValue = "0", dataType = "long")
  val count = LabelMeta(id = Some(-1), labelId = -1, name = "_count",
    seq = countSeq, defaultValue = "-1", dataType = "long")

  // Each reserved meta is exposed under two names sharing one seq
  // ("_timestamp" and "timestamp"); the underscore-prefixed form has priority.
  val reservedMetas = List(from, to, degree, timestamp, count).flatMap { lm =>
    List(lm, lm.copy(name = lm.name.drop(1)))
  }.reverse
  val reservedMetasInner = List(from, to, degree, timestamp, count)

  /** Maps one JDBC result row onto a [[LabelMeta]]; `data_type` is lower-cased. */
  def apply(rs: WrappedResultSet): LabelMeta =
    LabelMeta(Some(rs.int("id")), rs.int("label_id"), rs.string("name"),
      rs.byte("seq"), rs.string("default_value"), rs.string("data_type").toLowerCase)

  def isValidSeq(seq: Byte): Boolean = seq >= 0 && seq <= countSeq

  def isValidSeqForAdmin(seq: Byte): Boolean = seq > 0 && seq < countSeq

  /** Looks up by primary key through the cache; throws if the id does not exist. */
  def findById(id: Int)(implicit session: DBSession = AutoSession): LabelMeta = {
    val cacheKey = s"id=$id"
    withCache(cacheKey) {
      sql"""select * from label_metas where id = ${id}""".map(LabelMeta(_)).single.apply
    }.get
  }

  /** All metas of a label, ordered by seq ascending. */
  def findAllByLabelId(labelId: Int, useCache: Boolean = true)(implicit session: DBSession = AutoSession): List[LabelMeta] = {
    val cacheKey = s"labelId=$labelId"
    // lazy so the query only runs on a cache miss (or when caching is disabled)
    lazy val labelMetas =
      sql"""select *
            from label_metas
            where label_id = ${labelId} order by seq ASC""".map(LabelMeta(_)).list.apply()

    if (useCache) withCaches(cacheKey)(labelMetas) else labelMetas
  }

  /** Resolves the reserved names (_timestamp/_from/_to) in memory; everything else via cache/DB. */
  def findByName(labelId: Int, name: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[LabelMeta] =
    name match {
      case timestamp.name => Some(timestamp)
      case from.name => Some(from)
      case to.name => Some(to)
      case _ =>
        val cacheKey = s"labelId=$labelId:name=$name"
        lazy val labelMeta =
          sql"""
              select *
              from label_metas where label_id = ${labelId} and name = ${name}"""
            .map(LabelMeta(_)).single.apply()

        if (useCache) withCache(cacheKey)(labelMeta) else labelMeta
    }

  /**
   * Inserts a new user-defined property; the new seq is the current count + 1.
   * Throws [[MaxPropSizeReachedException]] once the Byte sequence space is full.
   */
  def insert(labelId: Int, name: String, defaultValue: String, dataType: String)(implicit session: DBSession = AutoSession) = {
    val existing = findAllByLabelId(labelId, useCache = false)
    val seq = existing.size + 1

    if (seq < maxValue) {
      sql"""insert into label_metas(label_id, name, seq, default_value, data_type)
            select ${labelId}, ${name}, ${seq}, ${defaultValue}, ${dataType}""".updateAndReturnGeneratedKey.apply()
    } else {
      throw MaxPropSizeReachedException("max property size reached")
    }
  }

  /** Returns the existing meta or inserts it, expiring both cache shapes before re-reading. */
  def findOrInsert(labelId: Int,
                   name: String,
                   defaultValue: String,
                   dataType: String)(implicit session: DBSession = AutoSession): LabelMeta =
    findByName(labelId, name) match {
      case Some(c) => c
      case None =>
        insert(labelId, name, defaultValue, dataType)
        expireCache(s"labelId=$labelId:name=$name")
        expireCaches(s"labelId=$labelId")
        findByName(labelId, name, useCache = false).get
    }

  /** Deletes one meta row and invalidates every cache key derived from it. */
  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
    val labelMeta = findById(id)
    val (labelId, name) = (labelMeta.labelId, labelMeta.name)
    sql"""delete from label_metas where id = ${id}""".execute.apply()
    List(s"id=$id", s"labelId=$labelId", s"labelId=$labelId:name=$name").foreach { key =>
      expireCache(key)
      expireCaches(key)
    }
  }

  /** Loads every row and warms all cache key shapes used by the finders above. */
  def findAll()(implicit session: DBSession = AutoSession) = {
    val ls = sql"""select * from label_metas""".map(LabelMeta(_)).list.apply
    putsToCache(ls.map(x => s"id=${x.id.get}" -> x))
    putsToCache(ls.map(x => s"labelId=${x.labelId}:name=${x.name}" -> x))
    putsToCache(ls.map(x => s"labelId=${x.labelId}:seq=${x.seq}" -> x))
    putsToCaches(ls.groupBy(_.labelId).map { case (labelId, metas) =>
      s"labelId=$labelId" -> metas
    }.toList)
  }
}

/** One (possibly reserved) property attached to a label. */
case class LabelMeta(id: Option[Int], labelId: Int, name: String, seq: Byte, defaultValue: String, dataType: String) extends JSONParser {
  lazy val toJson =
    Json.obj("name" -> name, "defaultValue" -> defaultValue, "dataType" -> dataType)
}
// ---- s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Model.scala ----

package com.kakao.s2graph.core.mysqls

import java.util.concurrent.Executors

import com.kakao.s2graph.core.utils.{SafeUpdateCache, logger}
import com.typesafe.config.Config
import scalikejdbc._

import scala.concurrent.ExecutionContext
import scala.language.{higherKinds, implicitConversions}
import scala.util.{Failure, Try}

/**
 * Global configuration holder: cache sizing, the shared execution context for
 * cache refreshes, the scalikejdbc singleton connection pool, and a simple
 * transaction helper.
 */
object Model {
  // mutable on purpose: overwritten once from config in apply(config)
  var maxSize = 10000
  var ttl = 60
  val numOfThread = Runtime.getRuntime.availableProcessors()
  val threadPool = Executors.newFixedThreadPool(numOfThread)
  val ec = ExecutionContext.fromExecutor(threadPool)

  /** Reads cache sizes, loads the JDBC driver and initializes the singleton pool. */
  def apply(config: Config) = {
    maxSize = config.getInt("cache.max.size")
    ttl = config.getInt("cache.ttl.seconds")
    Class.forName(config.getString("db.default.driver"))

    val settings = ConnectionPoolSettings(
      initialSize = 10,
      maxSize = 10,
      connectionTimeoutMillis = 30000L,
      validationQuery = "select 1;")

    ConnectionPool.singleton(
      config.getString("db.default.url"),
      config.getString("db.default.user"),
      config.getString("db.default.password"),
      settings)
  }

  /**
   * Runs `block` inside a transaction on a borrowed connection.
   * Commits on success; rolls back (if still active) and returns Failure on any exception.
   */
  def withTx[T](block: DBSession => T): Try[T] = {
    using(DB(ConnectionPool.borrow())) { conn =>
      val res = Try {
        conn.begin()
        val session = conn.withinTxSession()
        val result = block(session)

        conn.commit()

        result
      } recoverWith {
        case e: Exception =>
          conn.rollbackIfActive()
          Failure(e)
      }

      res
    }
  }

  def shutdown() = {
    ConnectionPool.closeAll()
  }

  /** Warms every model cache at startup (see each companion's findAll). */
  def loadCache() = {
    Service.findAll()
    ServiceColumn.findAll()
    Label.findAll()
    LabelMeta.findAll()
    LabelIndex.findAll()
    ColumnMeta.findAll()
  }
}

/**
 * Base trait for the mysqls DAOs. Wires two [[SafeUpdateCache]]s — one for
 * single rows (`Option[V]`) and one for row lists (`List[V]`) — and exposes
 * lookup/invalidate/bulk-put helpers to the concrete companion objects.
 */
trait Model[V] extends SQLSyntaxSupport[V] {

  import Model._

  implicit val ec: ExecutionContext = Model.ec

  val cName = this.getClass.getSimpleName()
  logger.info(s"LocalCache[$cName]: TTL[$ttl], MaxSize[$maxSize]")

  val optionCache = new SafeUpdateCache[Option[V]](cName, maxSize, ttl)
  val listCache = new SafeUpdateCache[List[V]](cName, maxSize, ttl)

  val withCache = optionCache.withCache _

  val withCaches = listCache.withCache _

  val expireCache = optionCache.invalidate _

  val expireCaches = listCache.invalidate _

  /** Bulk-populates the single-row cache. */
  def putsToCache(kvs: List[(String, V)]) = kvs.foreach {
    case (key, value) => optionCache.put(key, Option(value))
  }

  /** Bulk-populates the row-list cache. */
  def putsToCaches(kvs: List[(String, List[V])]) = kvs.foreach {
    case (key, values) => listCache.put(key, values)
  }
}

// ---- s2core/src/main/scala/com/kakao/s2graph/core/mysqls/Service.scala ----

package com.kakao.s2graph.core.mysqls

import java.util.UUID

import com.kakao.s2graph.core.utils.logger
import com.kakao.s2graph.core.{Management}
import play.api.libs.json.Json
import scalikejdbc._

/** DAO for the `services` table. */
object Service extends Model[Service] {

  /** Maps one JDBC result row onto a [[Service]]. */
  def apply(rs: WrappedResultSet): Service = {
    Service(rs.intOpt("id"), rs.string("service_name"), rs.string("access_token"),
      rs.string("cluster"), rs.string("hbase_table_name"), rs.int("pre_split_size"), rs.intOpt("hbase_table_ttl"))
  }

  def findByAccessToken(accessToken: String)(implicit session: DBSession = AutoSession): Option[Service] = {
    val cacheKey = s"accessToken=$accessToken"
    withCache(cacheKey)(sql"""select * from services where access_token = ${accessToken}""".map(Service(_)).single.apply)
  }

  /** Looks up by primary key through the cache; throws if the id does not exist. */
  def findById(id: Int)(implicit session: DBSession = AutoSession): Service = {
    val cacheKey = s"id=$id"
    withCache(cacheKey)(sql"""select * from services where id = ${id}""".map(Service(_)).single.apply).get
  }

  def findByName(serviceName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[Service] = {
    val cacheKey = s"serviceName=$serviceName"
    // lazy so the query only runs on a cache miss (or when caching is disabled)
    lazy val serviceOpt = sql"""
          select * from services where service_name = ${serviceName}
          """.map(Service(_)).single.apply()

    if (useCache) withCache(cacheKey)(serviceOpt)
    else serviceOpt
  }

  /**
   * Inserts a new service row with a freshly generated UUID access token.
   * NOTE(review): `compressionAlgorithm` is accepted for interface compatibility
   * but is not persisted by this statement — presumably it only affects HBase
   * table creation elsewhere; confirm against callers.
   */
  def insert(serviceName: String, cluster: String,
             hTableName: String, preSplitSize: Int, hTableTTL: Option[Int],
             compressionAlgorithm: String)(implicit session: DBSession = AutoSession): Unit = {
    logger.info(s"$serviceName, $cluster, $hTableName, $preSplitSize, $hTableTTL, $compressionAlgorithm")
    val accessToken = UUID.randomUUID().toString()
    sql"""insert into services(service_name, access_token, cluster, hbase_table_name, pre_split_size, hbase_table_ttl)
          values(${serviceName}, ${accessToken}, ${cluster}, ${hTableName}, ${preSplitSize}, ${hTableTTL})""".execute.apply()
  }

  /** Deletes one service row and invalidates its cache keys. */
  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
    val service = findById(id)
    val serviceName = service.serviceName
    // BUGFIX: previously deleted from `service_columns` (matching an unrelated
    // column row by its own primary key) and left the service row in place.
    sql"""delete from services where id = ${id}""".execute.apply()
    val cacheKeys = List(s"id=$id", s"serviceName=$serviceName")
    cacheKeys.foreach { key =>
      expireCache(key)
      expireCaches(key)
    }
  }

  /** Returns the existing service or inserts it, expiring the name cache before re-reading. */
  def findOrInsert(serviceName: String, cluster: String, hTableName: String,
                   preSplitSize: Int, hTableTTL: Option[Int], compressionAlgorithm: String)(implicit session: DBSession = AutoSession): Service = {
    findByName(serviceName) match {
      case Some(s) => s
      case None =>
        insert(serviceName, cluster, hTableName, preSplitSize, hTableTTL, compressionAlgorithm)
        expireCache(s"serviceName=$serviceName")
        findByName(serviceName).get
    }
  }

  /** Loads every row and warms both cache key shapes used by the finders above. */
  def findAll()(implicit session: DBSession = AutoSession) = {
    val ls = sql"""select * from services""".map(Service(_)).list.apply
    putsToCache(ls.map(x => s"id=${x.id.get}" -> x))
    putsToCache(ls.map(x => s"serviceName=${x.serviceName}" -> x))
  }

  /** All distinct HBase cluster addresses referenced by any service. */
  def findAllConn()(implicit session: DBSession = AutoSession): List[String] = {
    sql"""select distinct(cluster) from services""".map(rs => rs.string("cluster")).list.apply
  }
}

/** One row of the `services` table. */
case class Service(id: Option[Int], serviceName: String, accessToken: String, cluster: String, hTableName: String, preSplitSize: Int, hTableTTL: Option[Int]) {
  /** JSON view; an unsaved service (no id) serializes to an empty object. */
  lazy val toJson =
    id match {
      case Some(_id) =>
        Json.obj("id" -> _id, "name" -> serviceName, "accessToken" -> accessToken, "cluster" -> cluster,
          "hTableName" -> hTableName, "preSplitSize" -> preSplitSize, "hTableTTL" -> hTableTTL)
      case None =>
        Json.parse("{}")
    }
}

// ---- s2core/src/main/scala/com/kakao/s2graph/core/mysqls/ServiceColumn.scala ----

package com.kakao.s2graph.core.mysqls

import com.kakao.s2graph.core.JSONParser
import play.api.libs.json.Json
import scalikejdbc._

/** DAO for the `service_columns` table. */
object ServiceColumn extends Model[ServiceColumn] {

  /** Maps one JDBC result row onto a [[ServiceColumn]]; `column_type` is lower-cased. */
  def apply(rs: WrappedResultSet): ServiceColumn = {
    ServiceColumn(rs.intOpt("id"), rs.int("service_id"), rs.string("column_name"), rs.string("column_type").toLowerCase(), rs.string("schema_version"))
  }

  /** Looks up by primary key through the cache; throws if the id does not exist. */
  def findById(id: Int)(implicit session: DBSession = AutoSession): ServiceColumn = {
    val cacheKey = s"id=$id"
    withCache(cacheKey)(sql"""select * from service_columns where id = ${id}""".map(ServiceColumn(_)).single.apply).get
  }

  def find(serviceId: Int, columnName: String, useCache: Boolean = true)(implicit session: DBSession = AutoSession): Option[ServiceColumn] = {
    val cacheKey = s"serviceId=$serviceId:columnName=$columnName"
    // Single query definition shared by both branches (the original duplicated it);
    // lazy so it only runs on a cache miss or when caching is disabled.
    lazy val columnOpt = sql"""
          select * from service_columns where service_id = ${serviceId} and column_name = ${columnName}
          """.map(ServiceColumn(_)).single.apply()

    if (useCache) withCache(cacheKey)(columnOpt)
    else columnOpt
  }

  def insert(serviceId: Int, columnName: String, columnType: Option[String], schemaVersion: String)(implicit session: DBSession = AutoSession) = {
    sql"""insert into service_columns(service_id, column_name, column_type, schema_version)
          values(${serviceId}, ${columnName}, ${columnType}, ${schemaVersion})""".execute.apply()
  }

  /** Deletes one column row and invalidates its cache keys. */
  def delete(id: Int)(implicit session: DBSession = AutoSession) = {
    val serviceColumn = findById(id)
    val (serviceId, columnName) = (serviceColumn.serviceId, serviceColumn.columnName)
    sql"""delete from service_columns where id = ${id}""".execute.apply()
    val cacheKeys = List(s"id=$id", s"serviceId=$serviceId:columnName=$columnName")
    cacheKeys.foreach { key =>
      expireCache(key)
      expireCaches(key)
    }
  }

  /** Returns the existing column or inserts it, expiring the cache before re-reading. */
  def findOrInsert(serviceId: Int, columnName: String, columnType: Option[String], schemaVersion: String)(implicit session: DBSession = AutoSession): ServiceColumn = {
    find(serviceId, columnName) match {
      case Some(sc) => sc
      case None =>
        insert(serviceId, columnName, columnType, schemaVersion)
        expireCache(s"serviceId=$serviceId:columnName=$columnName")
        find(serviceId, columnName).get
    }
  }

  /** Loads every row and warms both cache key shapes used by the finders above. */
  def findAll()(implicit session: DBSession = AutoSession) = {
    val ls = sql"""select * from service_columns""".map(ServiceColumn(_)).list.apply
    putsToCache(ls.map { x =>
      val cacheKey = s"id=${x.id.get}"
      cacheKey -> x
    })
    putsToCache(ls.map { x =>
      val cacheKey = s"serviceId=${x.serviceId}:columnName=${x.columnName}"
      cacheKey -> x
    })
  }
}

/** One vertex column belonging to a service. */
case class ServiceColumn(id: Option[Int], serviceId: Int, columnName: String, columnType: String, schemaVersion: String) extends JSONParser {

  lazy val service = Service.findById(serviceId)
  lazy val metas = ColumnMeta.findAllByColumn(id.get)
  lazy val metasInvMap = metas.map { meta => meta.name -> meta }.toMap
  lazy val metaNamesMap = (ColumnMeta.lastModifiedAtColumn :: metas).map(x => (x.seq.toInt, x.name)).toMap
  lazy val toJson = Json.obj("serviceName" -> service.serviceName, "columnName" -> columnName, "columnType" -> columnType)
}
-1,207 +0,0 @@ -package com.kakao.s2graph.core.parsers - -import com.kakao.s2graph.core.GraphExceptions.WhereParserException -import com.kakao.s2graph.core._ -import com.kakao.s2graph.core.mysqls._ -import com.kakao.s2graph.core.types.InnerValLike - -import scala.annotation.tailrec -import scala.util.Try -import scala.util.parsing.combinator.JavaTokenParsers - -trait ExtractValue extends JSONParser { - val parent = "_parent." - - def propToInnerVal(edge: Edge, key: String) = { - val (propKey, parentEdge) = findParentEdge(edge, key) - - val label = parentEdge.label - val metaPropInvMap = label.metaPropsInvMap - val labelMeta = metaPropInvMap.getOrElse(propKey, throw WhereParserException(s"Where clause contains not existing property name: $propKey")) - val metaSeq = labelMeta.seq - - metaSeq match { - case LabelMeta.from.seq => parentEdge.srcVertex.innerId - case LabelMeta.to.seq => parentEdge.tgtVertex.innerId - case _ => parentEdge.propsWithTs.get(metaSeq) match { - case None => toInnerVal(labelMeta.defaultValue, labelMeta.dataType, label.schemaVersion) - case Some(edgeVal) => edgeVal.innerVal - } - } - } - - def valueToCompare(edge: Edge, key: String, value: String) = { - val label = edge.label - if (value.startsWith(parent) || label.metaPropsInvMap.contains(value)) propToInnerVal(edge, value) - else { - val (propKey, _) = findParentEdge(edge, key) - - val labelMeta = label.metaPropsInvMap.getOrElse(propKey, throw WhereParserException(s"Where clause contains not existing property name: $propKey")) - val (srcColumn, tgtColumn) = label.srcTgtColumn(edge.labelWithDir.dir) - val dataType = propKey match { - case "_to" | "to" => tgtColumn.columnType - case "_from" | "from" => srcColumn.columnType - case _ => labelMeta.dataType - } - toInnerVal(value, dataType, label.schemaVersion) - } - } - - @tailrec - private def findParent(edge: Edge, depth: Int): Edge = - if (depth > 0) findParent(edge.parentEdges.head.edge, depth - 1) - else edge - - private def 
findParentEdge(edge: Edge, key: String): (String, Edge) = { - if (!key.startsWith(parent)) (key, edge) - else { - val split = key.split(parent) - val depth = split.length - 1 - val propKey = split.last - - val parentEdge = findParent(edge, depth) - - (propKey, parentEdge) - } - } -} - -trait Clause extends ExtractValue { - def and(otherField: Clause): Clause = And(this, otherField) - - def or(otherField: Clause): Clause = Or(this, otherField) - - def filter(edge: Edge): Boolean - - def binaryOp(binOp: (InnerValLike, InnerValLike) => Boolean)(propKey: String, value: String)(edge: Edge): Boolean = { - val propValue = propToInnerVal(edge, propKey) - val compValue = valueToCompare(edge, propKey, value) - - binOp(propValue, compValue) - } -} - -case class Where(clauses: Seq[Clause] = Seq.empty[Clause]) { - def filter(edge: Edge) = - if (clauses.isEmpty) true else clauses.map(_.filter(edge)).forall(identity) -} - -case class Gt(propKey: String, value: String) extends Clause { - override def filter(edge: Edge): Boolean = binaryOp(_ > _)(propKey, value)(edge) -} - -case class Lt(propKey: String, value: String) extends Clause { - override def filter(edge: Edge): Boolean = binaryOp(_ < _)(propKey, value)(edge) -} - -case class Eq(propKey: String, value: String) extends Clause { - override def filter(edge: Edge): Boolean = binaryOp(_ == _)(propKey, value)(edge) -} - -case class InWithoutParent(label: Label, propKey: String, values: Set[String]) extends Clause { - val innerValLikeLs = values.map { value => - val labelMeta = label.metaPropsInvMap.getOrElse(propKey, throw WhereParserException(s"Where clause contains not existing property name: $propKey")) - val dataType = propKey match { - case "_to" | "to" => label.tgtColumn.columnType - case "_from" | "from" => label.srcColumn.columnType - case _ => labelMeta.dataType - } - toInnerVal(value, dataType, label.schemaVersion) - } - override def filter(edge: Edge): Boolean = { - val propVal = propToInnerVal(edge, propKey) - 
innerValLikeLs.contains(propVal) - } -} - -case class IN(propKey: String, values: Set[String]) extends Clause { - override def filter(edge: Edge): Boolean = { - val propVal = propToInnerVal(edge, propKey) - values.exists { value => - valueToCompare(edge, propKey, value) == propVal - } - } -} - -case class Between(propKey: String, minValue: String, maxValue: String) extends Clause { - override def filter(edge: Edge): Boolean = { - val propVal = propToInnerVal(edge, propKey) - val minVal = valueToCompare(edge, propKey, minValue) - val maxVal = valueToCompare(edge, propKey, maxValue) - - minVal <= propVal && propVal <= maxVal - } -} - -case class Not(self: Clause) extends Clause { - override def filter(edge: Edge) = !self.filter(edge) -} - -case class And(left: Clause, right: Clause) extends Clause { - override def filter(edge: Edge) = left.filter(edge) && right.filter(edge) -} - -case class Or(left: Clause, right: Clause) extends Clause { - override def filter(edge: Edge) = left.filter(edge) || right.filter(edge) -} - -object WhereParser { - val success = Where() -} - -case class WhereParser(label: Label) extends JavaTokenParsers with JSONParser { - - val anyStr = "[^\\s(),]+".r - - val and = "and|AND".r - - val or = "or|OR".r - - val between = "between|BETWEEN".r - - val in = "in|IN".r - - val notIn = "not in|NOT IN".r - - def where: Parser[Where] = rep(clause) ^^ (Where(_)) - - def paren: Parser[Clause] = "(" ~> clause <~ ")" - - def clause: Parser[Clause] = (predicate | paren) * (and ^^^ { (a: Clause, b: Clause) => And(a, b) } | or ^^^ { (a: Clause, b: Clause) => Or(a, b) }) - - def identWithDot: Parser[String] = repsep(ident, ".") ^^ { case values => values.mkString(".") } - - def predicate = { - identWithDot ~ ("!=" | "=") ~ anyStr ^^ { - case f ~ op ~ s => - if (op == "=") Eq(f, s) - else Not(Eq(f, s)) - } | identWithDot ~ (">=" | "<=" | ">" | "<") ~ anyStr ^^ { - case f ~ op ~ s => op match { - case ">" => Gt(f, s) - case ">=" => Or(Gt(f, s), Eq(f, s)) - case 
"<" => Lt(f, s) - case "<=" => Or(Lt(f, s), Eq(f, s)) - } - } | identWithDot ~ (between ~> anyStr <~ and) ~ anyStr ^^ { - case f ~ minV ~ maxV => Between(f, minV, maxV) - } | identWithDot ~ (notIn | in) ~ ("(" ~> repsep(anyStr, ",") <~ ")") ^^ { - case f ~ op ~ values => - val inClause = - if (f.startsWith("_parent")) IN(f, values.toSet) - else InWithoutParent(label, f, values.toSet) - if (op.toLowerCase == "in") inClause - else Not(inClause) - - - case _ => throw WhereParserException(s"Failed to parse where clause. ") - } - } - - def parse(sql: String): Try[Where] = Try { - parseAll(where, sql) match { - case Success(r, q) => r - case fail => throw WhereParserException(s"Where parsing error: ${fail.toString}") - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala deleted file mode 100644 index f8129db..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RequestParser.scala +++ /dev/null @@ -1,609 +0,0 @@ -package com.kakao.s2graph.core.rest - -import java.util.concurrent.{Callable, TimeUnit} - -import com.google.common.cache.{CacheLoader, CacheBuilder} -import com.kakao.s2graph.core.GraphExceptions.{BadQueryException, ModelNotFoundException} -import com.kakao.s2graph.core._ -import com.kakao.s2graph.core.mysqls._ -import com.kakao.s2graph.core.parsers.{Where, WhereParser} -import com.kakao.s2graph.core.types._ -import com.typesafe.config.Config -import play.api.libs.json._ -import scala.util.{Failure, Success, Try} - -object TemplateHelper { - val findVar = """\"?\$\{(.*?)\}\"?""".r - val num = """(next_day|next_hour|next_week|now)?\s*(-?\s*[0-9]+)?\s*(hour|day|week)?""".r - val hour = 60 * 60 * 1000L - val day = hour * 24L - val week 
= day * 7L - def calculate(now: Long, n: Int, unit: String): Long = { - val duration = unit match { - case "hour" | "HOUR" => n * hour - case "day" | "DAY" => n * day - case "week" | "WEEK" => n * week - case _ => n * day - } - - duration + now - } - - def replaceVariable(now: Long, body: String): String = { - findVar.replaceAllIn(body, m => { - val matched = m group 1 - - num.replaceSomeIn(matched, m => { - val (_pivot, n, unit) = (m.group(1), m.group(2), m.group(3)) - val ts = _pivot match { - case null => now - case "now" | "NOW" => now - case "next_week" | "NEXT_WEEK" => now / week * week + week - case "next_day" | "NEXT_DAY" => now / day * day + day - case "next_hour" | "NEXT_HOUR" => now / hour * hour + hour - } - - if (_pivot == null && n == null && unit == null) None - else if (n == null || unit == null) Option(ts.toString) - else Option(calculate(ts, n.replaceAll(" ", "").toInt, unit).toString) - }) - }) - } -} - -class RequestParser(config: Config) extends JSONParser { - - import Management.JsonModel._ - - val hardLimit = 100000 - val defaultLimit = 100 - val maxLimit = Int.MaxValue - 1 - val DefaultRpcTimeout = config.getInt("hbase.rpc.timeout") - val DefaultMaxAttempt = config.getInt("hbase.client.retries.number") - val DefaultCluster = config.getString("hbase.zookeeper.quorum") - val DefaultCompressionAlgorithm = config.getString("hbase.table.compression.algorithm") - val DefaultPhase = config.getString("phase") - val parserCache = CacheBuilder.newBuilder() - .expireAfterAccess(10000, TimeUnit.MILLISECONDS) - .expireAfterWrite(10000, TimeUnit.MILLISECONDS) - .maximumSize(10000) - .initialCapacity(1000) - .build[String, Try[Where]] - - private def extractScoring(labelId: Int, value: JsValue) = { - val ret = for { - js <- parse[Option[JsObject]](value, "scoring") - } yield { - for { - (k, v) <- js.fields - labelOrderType <- LabelMeta.findByName(labelId, k) - } yield { - val value = v match { - case n: JsNumber => n.as[Double] - case _ => throw new 
Exception("scoring weight should be double.") - } - (labelOrderType.seq, value) - } - } - ret - } - - def extractInterval(label: Label, _jsValue: JsValue) = { - val replaced = TemplateHelper.replaceVariable(System.currentTimeMillis(), _jsValue.toString()) - val jsValue = Json.parse(replaced) - - def extractKv(js: JsValue) = js match { - case JsObject(obj) => obj - case JsArray(arr) => arr.flatMap { - case JsObject(obj) => obj - case _ => throw new RuntimeException(s"cannot support json type $js") - } - case _ => throw new RuntimeException(s"cannot support json type: $js") - } - - val ret = for { - js <- parse[Option[JsObject]](jsValue, "interval") - fromJs <- (js \ "from").asOpt[JsValue] - toJs <- (js \ "to").asOpt[JsValue] - } yield { - val from = Management.toProps(label, extractKv(fromJs)) - val to = Management.toProps(label, extractKv(toJs)) - (from, to) - } - - ret - } - - def extractDuration(label: Label, _jsValue: JsValue) = { - val replaced = TemplateHelper.replaceVariable(System.currentTimeMillis(), _jsValue.toString()) - val jsValue = Json.parse(replaced) - - for { - js <- parse[Option[JsObject]](jsValue, "duration") - } yield { - val minTs = parse[Option[Long]](js, "from").getOrElse(Long.MaxValue) - val maxTs = parse[Option[Long]](js, "to").getOrElse(Long.MinValue) - - if (minTs > maxTs) { - throw new BadQueryException("Duration error. 
Timestamp of From cannot be larger than To.") - } - - (minTs, maxTs) - } - } - - def extractHas(label: Label, jsValue: JsValue) = { - val ret = for { - js <- parse[Option[JsObject]](jsValue, "has") - } yield { - for { - (k, v) <- js.fields - labelMeta <- LabelMeta.findByName(label.id.get, k) - value <- jsValueToInnerVal(v, labelMeta.dataType, label.schemaVersion) - } yield { - labelMeta.seq -> value - } - } - ret.map(_.toMap).getOrElse(Map.empty[Byte, InnerValLike]) - } - - def extractWhere(label: Label, whereClauseOpt: Option[String]): Try[Where] = { - whereClauseOpt match { - case None => Success(WhereParser.success) - case Some(_where) => - val where = TemplateHelper.replaceVariable(System.currentTimeMillis(), _where) - val whereParserKey = s"${label.label}_${where}" - parserCache.get(whereParserKey, new Callable[Try[Where]] { - override def call(): Try[Where] = { - WhereParser(label).parse(where) match { - case s@Success(_) => s - case Failure(ex) => throw BadQueryException(ex.getMessage, ex) - } - } - }) - } - } - - def toVertices(labelName: String, direction: String, ids: Seq[JsValue]): Seq[Vertex] = { - val vertices = for { - label <- Label.findByName(labelName).toSeq - serviceColumn = if (direction == "out") label.srcColumn else label.tgtColumn - id <- ids - innerId <- jsValueToInnerVal(id, serviceColumn.columnType, label.schemaVersion) - } yield { - Vertex(SourceVertexId(serviceColumn.id.get, innerId), System.currentTimeMillis()) - } - vertices.toSeq - } - - def toMultiQuery(jsValue: JsValue, isEdgeQuery: Boolean = true): MultiQuery = { - val queries = for { - queryJson <- (jsValue \ "queries").asOpt[Seq[JsValue]].getOrElse(Seq.empty) - } yield { - toQuery(queryJson, isEdgeQuery) - } - val weights = (jsValue \ "weights").asOpt[Seq[Double]].getOrElse(queries.map(_ => 1.0)) - MultiQuery(queries = queries, weights = weights, - queryOption = toQueryOption(jsValue), jsonQuery = jsValue) - } - - def toQueryOption(jsValue: JsValue): QueryOption = { - val 
filterOutFields = (jsValue \ "filterOutFields").asOpt[List[String]].getOrElse(List(LabelMeta.to.name)) - val filterOutQuery = (jsValue \ "filterOut").asOpt[JsValue].map { v => toQuery(v) }.map { q => - q.copy(queryOption = q.queryOption.copy(filterOutFields = filterOutFields)) - } - val removeCycle = (jsValue \ "removeCycle").asOpt[Boolean].getOrElse(true) - val selectColumns = (jsValue \ "select").asOpt[List[String]].getOrElse(List.empty) - val groupByColumns = (jsValue \ "groupBy").asOpt[List[String]].getOrElse(List.empty) - val orderByColumns: List[(String, Boolean)] = (jsValue \ "orderBy").asOpt[List[JsObject]].map { jsLs => - for { - js <- jsLs - (column, orderJs) <- js.fields - } yield { - val ascending = orderJs.as[String].toUpperCase match { - case "ASC" => true - case "DESC" => false - } - column -> ascending - } - }.getOrElse(List("score" -> false, "timestamp" -> false)) - val withScore = (jsValue \ "withScore").asOpt[Boolean].getOrElse(true) - val returnTree = (jsValue \ "returnTree").asOpt[Boolean].getOrElse(false) - //TODO: Refactor this - val limitOpt = (jsValue \ "limit").asOpt[Int] - val returnAgg = (jsValue \ "returnAgg").asOpt[Boolean].getOrElse(true) - val scoreThreshold = (jsValue \ "scoreThreshold").asOpt[Double].getOrElse(Double.MinValue) - val returnDegree = (jsValue \ "returnDegree").asOpt[Boolean].getOrElse(true) - - QueryOption(removeCycle = removeCycle, - selectColumns = selectColumns, - groupByColumns = groupByColumns, - orderByColumns = orderByColumns, - filterOutQuery = filterOutQuery, - filterOutFields = filterOutFields, - withScore = withScore, - returnTree = returnTree, - limitOpt = limitOpt, - returnAgg = returnAgg, - scoreThreshold = scoreThreshold, - returnDegree = returnDegree - ) - } - def toQuery(jsValue: JsValue, isEdgeQuery: Boolean = true): Query = { - try { - val vertices = - (for { - value <- parse[List[JsValue]](jsValue, "srcVertices") - serviceName = parse[String](value, "serviceName") - column = parse[String](value, 
"columnName") - } yield { - val service = Service.findByName(serviceName).getOrElse(throw BadQueryException("service not found")) - val col = ServiceColumn.find(service.id.get, column).getOrElse(throw BadQueryException("bad column name")) - val (idOpt, idsOpt) = ((value \ "id").asOpt[JsValue], (value \ "ids").asOpt[List[JsValue]]) - for { - idVal <- idOpt ++ idsOpt.toSeq.flatten - - /** bug, need to use labels schemaVersion */ - innerVal <- jsValueToInnerVal(idVal, col.columnType, col.schemaVersion) - } yield { - Vertex(SourceVertexId(col.id.get, innerVal), System.currentTimeMillis()) - } - }).flatten - - if (vertices.isEmpty) throw BadQueryException("srcVertices`s id is empty") - val steps = parse[Vector[JsValue]](jsValue, "steps") - - val queryOption = toQueryOption(jsValue) - - val querySteps = - steps.zipWithIndex.map { case (step, stepIdx) => - val labelWeights = step match { - case obj: JsObject => - val converted = for { - (k, v) <- (obj \ "weights").asOpt[JsObject].getOrElse(Json.obj()).fields - l <- Label.findByName(k) - } yield { - l.id.get -> v.toString().toDouble - } - converted.toMap - case _ => Map.empty[Int, Double] - } - val queryParamJsVals = step match { - case arr: JsArray => arr.as[List[JsValue]] - case obj: JsObject => (obj \ "step").as[List[JsValue]] - case _ => List.empty[JsValue] - } - val nextStepScoreThreshold = step match { - case obj: JsObject => (obj \ "nextStepThreshold").asOpt[Double].getOrElse(QueryParam.DefaultThreshold) - case _ => QueryParam.DefaultThreshold - } - val nextStepLimit = step match { - case obj: JsObject => (obj \ "nextStepLimit").asOpt[Int].getOrElse(-1) - case _ => -1 - } - val cacheTTL = step match { - case obj: JsObject => (obj \ "cacheTTL").asOpt[Int].getOrElse(-1) - case _ => -1 - } - val queryParams = - for { - labelGroup <- queryParamJsVals - queryParam <- parseQueryParam(labelGroup) - } yield { - val (_, columnName) = - if (queryParam.labelWithDir.dir == GraphUtil.directions("out")) { - 
(queryParam.label.srcService.serviceName, queryParam.label.srcColumnName) - } else { - (queryParam.label.tgtService.serviceName, queryParam.label.tgtColumnName) - } - //FIXME: - if (stepIdx == 0 && vertices.nonEmpty && !vertices.exists(v => v.serviceColumn.columnName == columnName)) { - throw BadQueryException("srcVertices contains incompatiable serviceName or columnName with first step.") - } - - queryParam - } - Step(queryParams.toList, labelWeights = labelWeights, - // scoreThreshold = stepThreshold, - nextStepScoreThreshold = nextStepScoreThreshold, - nextStepLimit = nextStepLimit, - cacheTTL = cacheTTL) - - } - - val ret = Query(vertices, querySteps, queryOption, jsValue) - // logger.debug(ret.toString) - ret - } catch { - case e: BadQueryException => - throw e - case e: ModelNotFoundException => - throw BadQueryException(e.getMessage, e) - case e: Exception => - throw BadQueryException(s"$jsValue, $e", e) - } - } - - private def parseQueryParam(labelGroup: JsValue): Option[QueryParam] = { - for { - labelName <- parse[Option[String]](labelGroup, "label") - } yield { - val label = Label.findByName(labelName).getOrElse(throw BadQueryException(s"$labelName not found")) - val direction = parse[Option[String]](labelGroup, "direction").map(GraphUtil.toDirection(_)).getOrElse(0) - val limit = { - parse[Option[Int]](labelGroup, "limit") match { - case None => defaultLimit - case Some(l) if l < 0 => maxLimit - case Some(l) if l >= 0 => - val default = hardLimit - Math.min(l, default) - } - } - val offset = parse[Option[Int]](labelGroup, "offset").getOrElse(0) - val interval = extractInterval(label, labelGroup) - val duration = extractDuration(label, labelGroup) - val scoring = extractScoring(label.id.get, labelGroup).getOrElse(List.empty[(Byte, Double)]).toList - val exclude = parse[Option[Boolean]](labelGroup, "exclude").getOrElse(false) - val include = parse[Option[Boolean]](labelGroup, "include").getOrElse(false) - val hasFilter = extractHas(label, labelGroup) - val 
labelWithDir = LabelWithDirection(label.id.get, direction) - val indexNameOpt = (labelGroup \ "index").asOpt[String] - val indexSeq = indexNameOpt match { - case None => label.indexSeqsMap.get(scoring.map(kv => kv._1)).map(_.seq).getOrElse(LabelIndex.DefaultSeq) - case Some(indexName) => label.indexNameMap.get(indexName).map(_.seq).getOrElse(throw new RuntimeException("cannot find index")) - } - val whereClauseOpt = (labelGroup \ "where").asOpt[String] - val where = extractWhere(label, whereClauseOpt) - val includeDegree = (labelGroup \ "includeDegree").asOpt[Boolean].getOrElse(true) - val rpcTimeout = (labelGroup \ "rpcTimeout").asOpt[Int].getOrElse(DefaultRpcTimeout) - val maxAttempt = (labelGroup \ "maxAttempt").asOpt[Int].getOrElse(DefaultMaxAttempt) - val tgtVertexInnerIdOpt = (labelGroup \ "_to").asOpt[JsValue].flatMap { jsVal => - jsValueToInnerVal(jsVal, label.tgtColumnWithDir(direction).columnType, label.schemaVersion) - } - val cacheTTL = (labelGroup \ "cacheTTL").asOpt[Long].getOrElse(-1L) - val timeDecayFactor = (labelGroup \ "timeDecay").asOpt[JsObject].map { jsVal => - val propName = (jsVal \ "propName").asOpt[String].getOrElse(LabelMeta.timestamp.name) - val propNameSeq = label.metaPropsInvMap.get(propName).map(_.seq).getOrElse(LabelMeta.timeStampSeq) - val initial = (jsVal \ "initial").asOpt[Double].getOrElse(1.0) - val decayRate = (jsVal \ "decayRate").asOpt[Double].getOrElse(0.1) - if (decayRate >= 1.0 || decayRate <= 0.0) throw new BadQueryException("decay rate should be 0.0 ~ 1.0") - val timeUnit = (jsVal \ "timeUnit").asOpt[Double].getOrElse(60 * 60 * 24.0) - TimeDecay(initial, decayRate, timeUnit, propNameSeq) - } - val threshold = (labelGroup \ "threshold").asOpt[Double].getOrElse(QueryParam.DefaultThreshold) - // TODO: refactor this. 
dirty - val duplicate = parse[Option[String]](labelGroup, "duplicate").map(s => Query.DuplicatePolicy(s)) - - val outputField = (labelGroup \ "outputField").asOpt[String].map(s => Json.arr(Json.arr(s))) - val transformer = if (outputField.isDefined) outputField else (labelGroup \ "transform").asOpt[JsValue] - val scorePropagateOp = (labelGroup \ "scorePropagateOp").asOpt[String].getOrElse("multiply") - val sample = (labelGroup \ "sample").asOpt[Int].getOrElse(-1) - val shouldNormalize = (labelGroup \ "normalize").asOpt[Boolean].getOrElse(false) - val cursorOpt = (labelGroup \ "cursor").asOpt[String] - // FIXME: Order of command matter - QueryParam(labelWithDir) - .sample(sample) - .limit(offset, limit) - .rank(RankParam(label.id.get, scoring)) - .exclude(exclude) - .include(include) - .duration(duration) - .has(hasFilter) - .labelOrderSeq(indexSeq) - .interval(interval) - .where(where) - .duplicatePolicy(duplicate) - .includeDegree(includeDegree) - .rpcTimeout(rpcTimeout) - .maxAttempt(maxAttempt) - .tgtVertexInnerIdOpt(tgtVertexInnerIdOpt) - .cacheTTLInMillis(cacheTTL) - .timeDecay(timeDecayFactor) - .threshold(threshold) - .transformer(transformer) - .scorePropagateOp(scorePropagateOp) - .shouldNormalize(shouldNormalize) - .cursorOpt(cursorOpt) - } - } - - private def parse[R](js: JsValue, key: String)(implicit read: Reads[R]): R = { - (js \ key).validate[R] - .fold( - errors => { - val msg = (JsError.toFlatJson(errors) \ "obj").as[List[JsValue]].map(x => x \ "msg") - val e = Json.obj("args" -> key, "error" -> msg) - throw new GraphExceptions.JsonParseException(Json.obj("error" -> key).toString) - }, - r => { - r - }) - } - - def toJsValues(jsValue: JsValue): List[JsValue] = { - jsValue match { - case obj: JsObject => List(obj) - case arr: JsArray => arr.as[List[JsValue]] - case _ => List.empty[JsValue] - } - - } - - def toEdgesWithOrg(jsValue: JsValue, operation: String): (List[Edge], List[JsValue]) = { - val jsValues = toJsValues(jsValue) - val edges = 
jsValues.flatMap(toEdge(_, operation)) - - (edges, jsValues) - } - - def toEdges(jsValue: JsValue, operation: String): List[Edge] = { - toJsValues(jsValue).flatMap { edgeJson => - toEdge(edgeJson, operation) - } - } - - - private def toEdge(jsValue: JsValue, operation: String): List[Edge] = { - - def parseId(js: JsValue) = js match { - case s: JsString => s.as[String] - case o@_ => s"${o}" - } - val srcId = (jsValue \ "from").asOpt[JsValue].toList.map(parseId(_)) - val tgtId = (jsValue \ "to").asOpt[JsValue].toList.map(parseId(_)) - val srcIds = (jsValue \ "froms").asOpt[List[JsValue]].toList.flatMap(froms => froms.map(js => parseId(js))) ++ srcId - val tgtIds = (jsValue \ "tos").asOpt[List[JsValue]].toList.flatMap(froms => froms.map(js => parseId(js))) ++ tgtId - - val label = parse[String](jsValue, "label") - val timestamp = parse[Long](jsValue, "timestamp") - val direction = parse[Option[String]](jsValue, "direction").getOrElse("") - val props = (jsValue \ "props").asOpt[JsValue].getOrElse("{}") - for { - srcId <- srcIds - tgtId <- tgtIds - } yield { - Management.toEdge(timestamp, operation, srcId, tgtId, label, direction, props.toString) - } - } - - def toVertices(jsValue: JsValue, operation: String, serviceName: Option[String] = None, columnName: Option[String] = None) = { - toJsValues(jsValue).map(toVertex(_, operation, serviceName, columnName)) - } - - def toVertex(jsValue: JsValue, operation: String, serviceName: Option[String] = None, columnName: Option[String] = None): Vertex = { - val id = parse[JsValue](jsValue, "id") - val ts = parse[Option[Long]](jsValue, "timestamp").getOrElse(System.currentTimeMillis()) - val sName = if (serviceName.isEmpty) parse[String](jsValue, "serviceName") else serviceName.get - val cName = if (columnName.isEmpty) parse[String](jsValue, "columnName") else columnName.get - val props = (jsValue \ "props").asOpt[JsObject].getOrElse(Json.obj()) - Management.toVertex(ts, operation, id.toString, sName, cName, props.toString) - } - - 
def toPropElements(jsObj: JsValue) = Try { - val propName = (jsObj \ "name").as[String] - val dataType = InnerVal.toInnerDataType((jsObj \ "dataType").as[String]) - val defaultValue = (jsObj \ "defaultValue").as[JsValue] match { - case JsString(s) => s - case _@js => js.toString - } - Prop(propName, defaultValue, dataType) - } - - def toPropsElements(jsValue: JsValue): Seq[Prop] = for { - jsObj <- jsValue.asOpt[Seq[JsValue]].getOrElse(Nil) - } yield { - val propName = (jsObj \ "name").as[String] - val dataType = InnerVal.toInnerDataType((jsObj \ "dataType").as[String]) - val defaultValue = (jsObj \ "defaultValue").as[JsValue] match { - case JsString(s) => s - case _@js => js.toString - } - Prop(propName, defaultValue, dataType) - } - - def toIndicesElements(jsValue: JsValue): Seq[Index] = for { - jsObj <- jsValue.as[Seq[JsValue]] - indexName = (jsObj \ "name").as[String] - propNames = (jsObj \ "propNames").as[Seq[String]] - } yield Index(indexName, propNames) - - def toLabelElements(jsValue: JsValue) = Try { - val labelName = parse[String](jsValue, "label") - val srcServiceName = parse[String](jsValue, "srcServiceName") - val tgtServiceName = parse[String](jsValue, "tgtServiceName") - val srcColumnName = parse[String](jsValue, "srcColumnName") - val tgtColumnName = parse[String](jsValue, "tgtColumnName") - val srcColumnType = parse[String](jsValue, "srcColumnType") - val tgtColumnType = parse[String](jsValue, "tgtColumnType") - val serviceName = (jsValue \ "serviceName").asOpt[String].getOrElse(tgtServiceName) - val isDirected = (jsValue \ "isDirected").asOpt[Boolean].getOrElse(true) - - val allProps = toPropsElements(jsValue \ "props") - val indices = toIndicesElements(jsValue \ "indices") - - val consistencyLevel = (jsValue \ "consistencyLevel").asOpt[String].getOrElse("weak") - - // expect new label don`t provide hTableName - val hTableName = (jsValue \ "hTableName").asOpt[String] - val hTableTTL = (jsValue \ "hTableTTL").asOpt[Int] - val schemaVersion = 
(jsValue \ "schemaVersion").asOpt[String].getOrElse(HBaseType.DEFAULT_VERSION) - val isAsync = (jsValue \ "isAsync").asOpt[Boolean].getOrElse(false) - val compressionAlgorithm = (jsValue \ "compressionAlgorithm").asOpt[String].getOrElse(DefaultCompressionAlgorithm) - - (labelName, srcServiceName, srcColumnName, srcColumnType, - tgtServiceName, tgtColumnName, tgtColumnType, isDirected, serviceName, - indices, allProps, consistencyLevel, hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm) - } - - def toIndexElements(jsValue: JsValue) = Try { - val labelName = parse[String](jsValue, "label") - val indices = toIndicesElements(jsValue \ "indices") - (labelName, indices) - } - - def toServiceElements(jsValue: JsValue) = { - val serviceName = parse[String](jsValue, "serviceName") - val cluster = (jsValue \ "cluster").asOpt[String].getOrElse(DefaultCluster) - val hTableName = (jsValue \ "hTableName").asOpt[String].getOrElse(s"${serviceName}-${DefaultPhase}") - val preSplitSize = (jsValue \ "preSplitSize").asOpt[Int].getOrElse(1) - val hTableTTL = (jsValue \ "hTableTTL").asOpt[Int] - val compressionAlgorithm = (jsValue \ "compressionAlgorithm").asOpt[String].getOrElse(DefaultCompressionAlgorithm) - (serviceName, cluster, hTableName, preSplitSize, hTableTTL, compressionAlgorithm) - } - - def toServiceColumnElements(jsValue: JsValue) = Try { - val serviceName = parse[String](jsValue, "serviceName") - val columnName = parse[String](jsValue, "columnName") - val columnType = parse[String](jsValue, "columnType") - val props = toPropsElements(jsValue \ "props") - (serviceName, columnName, columnType, props) - } - - def toCheckEdgeParam(jsValue: JsValue) = { - val params = jsValue.as[List[JsValue]] - var isReverted = false - val labelWithDirs = scala.collection.mutable.HashSet[LabelWithDirection]() - val quads = for { - param <- params - labelName <- (param \ "label").asOpt[String] - direction <- GraphUtil.toDir((param \ "direction").asOpt[String].getOrElse("out")) 
- label <- Label.findByName(labelName) - srcId <- jsValueToInnerVal((param \ "from").as[JsValue], label.srcColumnWithDir(direction.toInt).columnType, label.schemaVersion) - tgtId <- jsValueToInnerVal((param \ "to").as[JsValue], label.tgtColumnWithDir(direction.toInt).columnType, label.schemaVersion) - } yield { - val labelWithDir = LabelWithDirection(label.id.get, direction) - labelWithDirs += labelWithDir - val (src, tgt, dir) = if (direction == 1) { - isReverted = true - (Vertex(VertexId(label.tgtColumnWithDir(direction.toInt).id.get, tgtId)), - Vertex(VertexId(label.srcColumnWithDir(direction.toInt).id.get, srcId)), 0) - } else { - (Vertex(VertexId(label.srcColumnWithDir(direction.toInt).id.get, srcId)), - Vertex(VertexId(label.tgtColumnWithDir(direction.toInt).id.get, tgtId)), 0) - } - (src, tgt, QueryParam(LabelWithDirection(label.id.get, dir))) - } - (quads, isReverted) - } - - def toGraphElements(str: String): Seq[GraphElement] = { - val edgeStrs = str.split("\\n") - - for { - edgeStr <- edgeStrs - str <- GraphUtil.parseString(edgeStr) - element <- Graph.toGraphElement(str) - } yield element - } - - def toDeleteParam(json: JsValue) = { - val labelName = (json \ "label").as[String] - val labels = Label.findByName(labelName).map { l => Seq(l) }.getOrElse(Nil).filterNot(_.isAsync) - val direction = (json \ "direction").asOpt[String].getOrElse("out") - - val ids = (json \ "ids").asOpt[List[JsValue]].getOrElse(Nil) - val ts = (json \ "timestamp").asOpt[Long].getOrElse(System.currentTimeMillis()) - val vertices = toVertices(labelName, direction, ids) - (labels, direction, ids, ts, vertices) - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala b/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala deleted file 
mode 100644 index b130854..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/rest/RestHandler.scala +++ /dev/null @@ -1,245 +0,0 @@ -package com.kakao.s2graph.core.rest - -import java.net.URL - -import com.kakao.s2graph.core.GraphExceptions.BadQueryException -import com.kakao.s2graph.core._ -import com.kakao.s2graph.core.mysqls.{Bucket, Experiment, Service} -import com.kakao.s2graph.core.utils.logger -import play.api.libs.json._ - -import scala.concurrent.{ExecutionContext, Future} -import scala.util.Try - - -object RestHandler { - case class HandlerResult(body: Future[JsValue], headers: (String, String)*) -} - -/** - * Public API, only return Future.successful or Future.failed - * Don't throw exception - */ -class RestHandler(graph: Graph)(implicit ec: ExecutionContext) { - - import RestHandler._ - val requestParser = new RequestParser(graph.config) - - /** - * Public APIS - */ - def doPost(uri: String, body: String, impKeyOpt: => Option[String] = None): HandlerResult = { - try { - val jsQuery = Json.parse(body) - - uri match { - case "/graphs/getEdges" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.toSimpleVertexArrJson)) - case "/graphs/getEdges/grouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithListFormatted)) - case "/graphs/getEdgesExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.toSimpleVertexArrJson)) - case "/graphs/getEdgesExcluded/grouped" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted)) - case "/graphs/checkEdges" => checkEdges(jsQuery) - case "/graphs/getEdgesGrouped" => HandlerResult(getEdgesAsync(jsQuery)(PostProcess.summarizeWithList)) - case "/graphs/getEdgesGroupedExcluded" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExclude)) - case "/graphs/getEdgesGroupedExcludedFormatted" => HandlerResult(getEdgesExcludedAsync(jsQuery)(PostProcess.summarizeWithListExcludeFormatted)) - case "/graphs/getVertices" => 
HandlerResult(getVertices(jsQuery)) - case uri if uri.startsWith("/graphs/experiment") => - val Array(accessToken, experimentName, uuid) = uri.split("/").takeRight(3) - experiment(jsQuery, accessToken, experimentName, uuid, impKeyOpt) - case _ => throw new RuntimeException("route is not found") - } - } catch { - case e: Exception => HandlerResult(Future.failed(e)) - } - } - - // TODO: Refactor to doGet - def checkEdges(jsValue: JsValue): HandlerResult = { - try { - val (quads, isReverted) = requestParser.toCheckEdgeParam(jsValue) - - HandlerResult(graph.checkEdges(quads).map { case queryRequestWithResultLs => - val edgeJsons = for { - queryRequestWithResult <- queryRequestWithResultLs - (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get - edgeWithScore <- queryResult.edgeWithScoreLs - (edge, score) = EdgeWithScore.unapply(edgeWithScore).get - convertedEdge = if (isReverted) edge.duplicateEdge else edge - edgeJson = PostProcess.edgeToJson(convertedEdge, score, queryRequest.query, queryRequest.queryParam) - } yield Json.toJson(edgeJson) - - Json.toJson(edgeJsons) - }) - } catch { - case e: Exception => HandlerResult(Future.failed(e)) - } - } - - - /** - * Private APIS - */ - private def experiment(contentsBody: JsValue, accessToken: String, experimentName: String, uuid: String, impKeyOpt: => Option[String]): HandlerResult = { - - try { - val bucketOpt = for { - service <- Service.findByAccessToken(accessToken) - experiment <- Experiment.findBy(service.id.get, experimentName) - bucket <- experiment.findBucket(uuid, impKeyOpt) - } yield bucket - - val bucket = bucketOpt.getOrElse(throw new RuntimeException("bucket is not found")) - if (bucket.isGraphQuery) { - val ret = buildRequestInner(contentsBody, bucket, uuid) - HandlerResult(ret.body, Experiment.impressionKey -> bucket.impressionId) - } - else throw new RuntimeException("not supported yet") - } catch { - case e: Exception => HandlerResult(Future.failed(e)) - } - } - - private 
def buildRequestInner(contentsBody: JsValue, bucket: Bucket, uuid: String): HandlerResult = { - if (bucket.isEmpty) HandlerResult(Future.successful(PostProcess.emptyResults)) - else { - val body = buildRequestBody(Option(contentsBody), bucket, uuid) - val url = new URL(bucket.apiPath) - val path = url.getPath - - // dummy log for sampling - val experimentLog = s"POST $path took -1 ms 200 -1 $body" - logger.debug(experimentLog) - - doPost(path, body) - } - } - - private def eachQuery(post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue)(q: Query): Future[JsValue] = { - val filterOutQueryResultsLs = q.filterOutQuery match { - case Some(filterOutQuery) => graph.getEdges(filterOutQuery) - case None => Future.successful(Seq.empty) - } - - for { - queryResultsLs <- graph.getEdges(q) - filterOutResultsLs <- filterOutQueryResultsLs - } yield { - val json = post(queryResultsLs, filterOutResultsLs) - json - } - } - - def getEdgesAsync(jsonQuery: JsValue) - (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = { - - val fetch = eachQuery(post) _ - jsonQuery match { - case JsArray(arr) => Future.traverse(arr.map(requestParser.toQuery(_)))(fetch).map(JsArray) - case obj@JsObject(_) => - (obj \ "queries").asOpt[JsValue] match { - case None => fetch(requestParser.toQuery(obj)) - case _ => - val multiQuery = requestParser.toMultiQuery(obj) - val filterOutFuture = multiQuery.queryOption.filterOutQuery match { - case Some(filterOutQuery) => graph.getEdges(filterOutQuery) - case None => Future.successful(Seq.empty) - } - val futures = multiQuery.queries.zip(multiQuery.weights).map { case (query, weight) => - val filterOutQueryResultsLs = query.queryOption.filterOutQuery match { - case Some(filterOutQuery) => graph.getEdges(filterOutQuery) - case None => Future.successful(Seq.empty) - } - for { - queryRequestWithResultLs <- graph.getEdges(query) - filterOutResultsLs <- filterOutQueryResultsLs - } yield { - val 
newQueryRequestWithResult = for { - queryRequestWithResult <- queryRequestWithResultLs - queryResult = queryRequestWithResult.queryResult - } yield { - val newEdgesWithScores = for { - edgeWithScore <- queryRequestWithResult.queryResult.edgeWithScoreLs - } yield { - edgeWithScore.copy(score = edgeWithScore.score * weight) - } - queryRequestWithResult.copy(queryResult = queryResult.copy(edgeWithScoreLs = newEdgesWithScores)) - } - logger.debug(s"[Size]: ${newQueryRequestWithResult.map(_.queryResult.edgeWithScoreLs.size).sum}") - (newQueryRequestWithResult, filterOutResultsLs) - } - } - for { - filterOut <- filterOutFuture - resultWithExcludeLs <- Future.sequence(futures) - } yield { - PostProcess.toSimpleVertexArrJsonMulti(multiQuery.queryOption, resultWithExcludeLs, filterOut) - // val initial = (ListBuffer.empty[QueryRequestWithResult], ListBuffer.empty[QueryRequestWithResult]) - // val (results, excludes) = resultWithExcludeLs.foldLeft(initial) { case ((prevResults, prevExcludes), (results, excludes)) => - // (prevResults ++= results, prevExcludes ++= excludes) - // } - // PostProcess.toSimpleVertexArrJson(multiQuery.queryOption, results, excludes ++ filterOut) - } - } - case _ => throw BadQueryException("Cannot support") - } - } - - private def getEdgesExcludedAsync(jsonQuery: JsValue) - (post: (Seq[QueryRequestWithResult], Seq[QueryRequestWithResult]) => JsValue): Future[JsValue] = { - val q = requestParser.toQuery(jsonQuery) - val filterOutQuery = Query(q.vertices, Vector(q.steps.last)) - - val fetchFuture = graph.getEdges(q) - val excludeFuture = graph.getEdges(filterOutQuery) - - for { - queryResultLs <- fetchFuture - exclude <- excludeFuture - } yield { - post(queryResultLs, exclude) - } - } - - private def getVertices(jsValue: JsValue) = { - val jsonQuery = jsValue - val ts = System.currentTimeMillis() - val props = "{}" - - val vertices = jsonQuery.as[List[JsValue]].flatMap { js => - val serviceName = (js \ "serviceName").as[String] - val columnName = (js 
\ "columnName").as[String] - for (id <- (js \ "ids").asOpt[List[JsValue]].getOrElse(List.empty[JsValue])) yield { - Management.toVertex(ts, "insert", id.toString, serviceName, columnName, props) - } - } - - graph.getVertices(vertices) map { vertices => PostProcess.verticesToJson(vertices) } - } - - private def buildRequestBody(requestKeyJsonOpt: Option[JsValue], bucket: Bucket, uuid: String): String = { - var body = bucket.requestBody.replace("#uuid", uuid) - - // // replace variable - // body = TemplateHelper.replaceVariable(System.currentTimeMillis(), body) - - // replace param - for { - requestKeyJson <- requestKeyJsonOpt - jsObj <- requestKeyJson.asOpt[JsObject] - (key, value) <- jsObj.fieldSet - } { - val replacement = value match { - case JsString(s) => s - case _ => value.toString - } - body = body.replace(key, replacement) - } - - body - } - - def calcSize(js: JsValue): Int = js match { - case JsObject(obj) => (js \ "size").asOpt[Int].getOrElse(0) - case JsArray(seq) => seq.map(js => (js \ "size").asOpt[Int].getOrElse(0)).sum - case _ => 0 - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/com/kakao/s2graph/core/storage/Deserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Deserializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Deserializable.scala deleted file mode 100644 index cff968f..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Deserializable.scala +++ /dev/null @@ -1,25 +0,0 @@ -package com.kakao.s2graph.core.storage - -import com.kakao.s2graph.core.storage.{SKeyValue, StorageDeserializable} -import com.kakao.s2graph.core.types.{LabelWithDirection, SourceVertexId, VertexId} -import org.apache.hadoop.hbase.util.Bytes - - -trait Deserializable[E] extends StorageDeserializable[E] { - import StorageDeserializable._ - - type RowKeyRaw = (VertexId, 
LabelWithDirection, Byte, Boolean, Int) - - /** version 1 and version 2 share same code for parsing row key part */ - def parseRow(kv: SKeyValue, version: String): RowKeyRaw = { - var pos = 0 - val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, version) - pos += srcIdLen - val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4)) - pos += 4 - val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos) - - val rowLen = srcIdLen + 4 + 1 - (srcVertexId, labelWithDir, labelIdxSeq, isInverted, rowLen) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/com/kakao/s2graph/core/storage/SKeyValue.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/SKeyValue.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/SKeyValue.scala deleted file mode 100644 index a0aa261..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/SKeyValue.scala +++ /dev/null @@ -1,47 +0,0 @@ -package com.kakao.s2graph.core.storage - -import org.apache.hadoop.hbase.util.Bytes -import org.hbase.async.KeyValue - - -object SKeyValue { - val Put = 1 - val Delete = 2 - val Increment = 3 - val Default = Put -} -case class SKeyValue(table: Array[Byte], - row: Array[Byte], - cf: Array[Byte], - qualifier: Array[Byte], - value: Array[Byte], - timestamp: Long, - operation: Int = SKeyValue.Default) { - def toLogString = { - Map("table" -> table.toList, "row" -> row.toList, "cf" -> Bytes.toString(cf), - "qualifier" -> qualifier.toList, "value" -> value.toList, "timestamp" -> timestamp, - "operation" -> operation).toString - } - override def toString(): String = toLogString -} - -trait CanSKeyValue[T] { - def toSKeyValue(from: T): SKeyValue -} - -object CanSKeyValue { - - // For asyncbase KeyValues - implicit val asyncKeyValue = new CanSKeyValue[KeyValue] { - def 
toSKeyValue(kv: KeyValue): SKeyValue = { - SKeyValue(Array.empty[Byte], kv.key(), kv.family(), kv.qualifier(), kv.value(), kv.timestamp()) - } - } - - // For asyncbase KeyValues - implicit val sKeyValue = new CanSKeyValue[SKeyValue] { - def toSKeyValue(kv: SKeyValue): SKeyValue = kv - } - - // For hbase KeyValues -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/com/kakao/s2graph/core/storage/Serializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Serializable.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Serializable.scala deleted file mode 100644 index bee064b..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Serializable.scala +++ /dev/null @@ -1,10 +0,0 @@ -package com.kakao.s2graph.core.storage - -import com.kakao.s2graph.core.storage.StorageSerializable - -object Serializable { - val vertexCf = "v".getBytes() - val edgeCf = "e".getBytes() -} - -trait Serializable[E] extends StorageSerializable[E]
