// Patch header that preceded this file in the source text (kept for provenance):
// http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/loader/src/test/scala/subscriber/TransferToHFileTest.scala
// ----------------------------------------------------------------------
// diff --git a/loader/src/test/scala/subscriber/TransferToHFileTest.scala b/loader/src/test/scala/subscriber/TransferToHFileTest.scala
// deleted file mode 100644
// index 7b3f72f..0000000
// --- a/loader/src/test/scala/subscriber/TransferToHFileTest.scala
// +++ /dev/null
// @@ -1,169 +0,0 @@
package subscriber

import com.kakao.s2graph.core.Management
import com.kakao.s2graph.core.types.HBaseType
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest._
import subscriber.TransferToHFile._

/**
 * Created by Eric on 2015. 12. 2..
 *
 * Spec for the helpers in [[TransferToHFile]]: `buildDegreePutRequests`, `toKeyValues`
 * and `buildDegrees`. The cases run against a local Spark context that is created once
 * in `beforeAll` and stopped in `afterAll`.
 *
 * `beforeAll` creates the "loader-test" service and the "friends_rel" label through
 * [[Management]] when they are missing, so the suite presumably needs the metadata
 * store behind `Management` to be reachable -- TODO confirm.
 */
class TransferToHFileTest extends FlatSpec with BeforeAndAfterAll with Matchers {

  private val master = "local[2]"
  private val appName = "example-spark"

  // Assigned in beforeAll; stays null if set-up fails before the context is created.
  private var sc: SparkContext = _

  // NOTE(review): the fields below are separated by single spaces in this copy of the
  // file. Whitespace in the text this was recovered from had been collapsed, so the
  // original separators may have been tabs -- confirm against the edge parser.

  /** Five edges in bulk format without a trailing direction column. */
  val dataWithoutDir =
    """
      |1447686000000 insertBulk e a b friends_rel {}
      |1447686000000 insertBulk e a c friends_rel {}
      |1447686000000 insertBulk e a d friends_rel {}
      |1447686000000 insertBulk e b d friends_rel {}
      |1447686000000 insertBulk e b e friends_rel {}
    """.stripMargin.trim

  /** The same five edges, each listed once as "out" and once reversed as "in". */
  val dataWithDir =
    """
      |1447686000000 insertBulk e a b friends_rel {} out
      |1447686000000 insertBulk e b a friends_rel {} in
      |1447686000000 insertBulk e a c friends_rel {} out
      |1447686000000 insertBulk e c a friends_rel {} in
      |1447686000000 insertBulk e a d friends_rel {} out
      |1447686000000 insertBulk e d a friends_rel {} in
      |1447686000000 insertBulk e b d friends_rel {} out
      |1447686000000 insertBulk e d b friends_rel {} in
      |1447686000000 insertBulk e b e friends_rel {} out
      |1447686000000 insertBulk e e b friends_rel {} in
    """.stripMargin.trim

  /** Initialises the graph helper, ensures the service and label exist, then starts Spark. */
  override def beforeAll(): Unit = {
    println("### beforeAll")

    GraphSubscriberHelper.apply("dev", "none", "none", "none")
    // 1. create service
    if (Management.findService("loader-test").isEmpty) {
      println(">>> create service...")
      Management.createService("loader-test", "localhost", "loader-test-dev", 1, None, "gz")
    }

    // 2. create label
    if (Management.findLabel("friends_rel").isEmpty) {
      println(">>> create label...")
      Management.createLabel(
        "friends_rel",
        "loader-test", "user_id", "string",
        "loader-test", "user_id", "string",
        true,
        "loader-test",
        Seq(),
        Seq(),
        "weak",
        None, None,
        HBaseType.DEFAULT_VERSION,
        false,
        Management.defaultCompressionAlgorithm
      )
    }

    // create spark context
    val conf = new SparkConf()
      .setMaster(master)
      .setAppName(appName)

    sc = new SparkContext(conf)
  }

  /**
   * Stops the Spark context (when one was created) and removes the test label.
   *
   * The label is deleted in a `finally` block so that a failure while stopping Spark
   * does not leave "friends_rel" behind for later runs; the failure itself still
   * propagates to the test framework.
   */
  override def afterAll(): Unit = {
    println("### afterALL")
    try {
      Option(sc).foreach(_.stop())
    } finally {
      Management.deleteLabel("friends_rel")
    }
  }

  /**
   * Runs `TransferToHFile.buildDegrees` over `lines` (one edge per line), sums the
   * emitted counts per key and collects the result on the driver.
   *
   * Only called from the driver side; it must not be used inside an RDD closure
   * because that would pull this (non-serializable) suite into the closure.
   *
   * @param lines      newline-separated edges in bulk format
   * @param autoCreate third argument handed to `buildDegrees` (the cases below pass
   *                   `true` to have reverse-direction degrees produced as well)
   */
  private def degreesOf(lines: String, autoCreate: Boolean) = {
    val rdd = sc.parallelize(lines.split("\n"))
    TransferToHFile.buildDegrees(rdd, Map.empty[String, String], autoCreate)
      .reduceByKey(_ + _)
      .collectAsMap()
  }

  "buildDegreePutRequest" should "transform degree to PutRequest" in {
    val putReqs = buildDegreePutRequests("a", "friends_rel", "out", 3L)
    putReqs.size should equal(1)
  }

  "toKeyValues" should "transform edges to KeyValues on edge format data without direction" in {
    val rdd = sc.parallelize(dataWithoutDir.split("\n"))

    val kvs = rdd.mapPartitions { iter =>
      // Initialised again inside the partition function, in addition to beforeAll --
      // presumably so the helper is ready wherever the task executes; TODO confirm.
      GraphSubscriberHelper.apply("dev", "none", "none", "none")
      TransferToHFile.toKeyValues(iter.toSeq, Map.empty[String, String], false)
    }
    kvs.foreach(println)
    // edges * 2 (snapshot edges + indexed edges)
    kvs.count() should equal(10)

    val kvsAutoCreated = rdd.mapPartitions { iter =>
      GraphSubscriberHelper.apply("dev", "none", "none", "none")
      TransferToHFile.toKeyValues(iter.toSeq, Map.empty[String, String], true)
    }

    // edges * 3 (snapshot edges + indexed edges + reverse edges)
    kvsAutoCreated.count() should equal(15)
  }

  "toKeyValues" should "transform edges to KeyValues on edge format data with direction" in {
    val rdd = sc.parallelize(dataWithDir.split("\n"))

    val kvs = rdd.mapPartitions { iter =>
      GraphSubscriberHelper.apply("dev", "none", "none", "none")
      TransferToHFile.toKeyValues(iter.toSeq, Map.empty[String, String], false)
    }

    // edges * 2 (snapshot edges + indexed edges)
    kvs.count() should equal(20)
  }

  "buildDegrees" should "build degrees on edge format data without direction" in {
    // autoCreate = false
    val degrees = degreesOf(dataWithoutDir, autoCreate = false)
    degrees.size should equal(2)

    degrees should contain(DegreeKey("a", "friends_rel", "out") -> 3L)
    degrees should contain(DegreeKey("b", "friends_rel", "out") -> 2L)

    // autoCreate = true
    val degreesAutoCreated = degreesOf(dataWithoutDir, autoCreate = true)
    degreesAutoCreated.size should equal(6)

    degreesAutoCreated should contain(DegreeKey("a", "friends_rel", "out") -> 3L)
    degreesAutoCreated should contain(DegreeKey("b", "friends_rel", "out") -> 2L)
    degreesAutoCreated should contain(DegreeKey("b", "friends_rel", "in") -> 1L)
    degreesAutoCreated should contain(DegreeKey("c", "friends_rel", "in") -> 1L)
    degreesAutoCreated should contain(DegreeKey("d", "friends_rel", "in") -> 2L)
    degreesAutoCreated should contain(DegreeKey("e", "friends_rel", "in") -> 1L)
  }

  "buildDegrees" should "build degrees on edge format data with direction" in {
    val degrees = degreesOf(dataWithDir, autoCreate = false)

    degrees.size should equal(6)

    degrees should contain(DegreeKey("a", "friends_rel", "out") -> 3L)
    degrees should contain(DegreeKey("b", "friends_rel", "out") -> 2L)
    degrees should contain(DegreeKey("b", "friends_rel", "in") -> 1L)
    degrees should contain(DegreeKey("c", "friends_rel", "in") -> 1L)
    degrees should contain(DegreeKey("d", "friends_rel", "in") -> 2L)
    degrees should contain(DegreeKey("e", "friends_rel", "in") -> 1L)
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/Edge.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/Edge.scala b/s2core/src/main/scala/com/kakao/s2graph/core/Edge.scala deleted file mode 100644 index 8e6ad7d..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/Edge.scala +++ /dev/null @@ -1,568 +0,0 @@ -package com.kakao.s2graph.core - - -import com.kakao.s2graph.core.mysqls._ -import com.kakao.s2graph.core.types._ -import com.kakao.s2graph.core.utils.logger -import play.api.libs.json.{JsNumber, Json} - -import scala.collection.JavaConversions._ -import scala.util.hashing.MurmurHash3 - - -case class SnapshotEdge(srcVertex: Vertex, - tgtVertex: Vertex, - labelWithDir: LabelWithDirection, - op: Byte, - version: Long, - props: Map[Byte, InnerValLikeWithTs], - pendingEdgeOpt: Option[Edge], - statusCode: Byte = 0, - lockTs: Option[Long]) extends JSONParser { - - if (!props.containsKey(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.") - - val label = Label.findById(labelWithDir.labelId) - val schemaVer = label.schemaVersion - lazy val propsWithoutTs = props.mapValues(_.innerVal) - val ts = props(LabelMeta.timeStampSeq).innerVal.toString().toLong - - def toEdge: Edge = { - val ts = props.get(LabelMeta.timeStampSeq).map(v => v.ts).getOrElse(version) - Edge(srcVertex, tgtVertex, labelWithDir, op, - version, props, pendingEdgeOpt = pendingEdgeOpt, - statusCode = statusCode, lockTs = lockTs) - } - - def propsWithName = (for { - (seq, v) <- props - meta <- label.metaPropsMap.get(seq) - jsValue <- innerValToJsValue(v.innerVal, meta.dataType) - } yield meta.name -> jsValue) ++ Map("version" -> JsNumber(version)) - - // only for debug - def toLogString() = { - List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, propsWithName).mkString("\t") - } -} - -case class 
IndexEdge(srcVertex: Vertex, - tgtVertex: Vertex, - labelWithDir: LabelWithDirection, - op: Byte, - version: Long, - labelIndexSeq: Byte, - props: Map[Byte, InnerValLike]) extends JSONParser { - if (!props.containsKey(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.") - // assert(props.containsKey(LabelMeta.timeStampSeq)) - - val ts = props(LabelMeta.timeStampSeq).toString.toLong - val degreeEdge = props.contains(LabelMeta.degreeSeq) - lazy val label = Label.findById(labelWithDir.labelId) - val schemaVer = label.schemaVersion - lazy val labelIndex = LabelIndex.findByLabelIdAndSeq(labelWithDir.labelId, labelIndexSeq).get - lazy val defaultIndexMetas = labelIndex.sortKeyTypes.map { meta => - val innerVal = toInnerVal(meta.defaultValue, meta.dataType, schemaVer) - meta.seq -> innerVal - }.toMap - - lazy val labelIndexMetaSeqs = labelIndex.metaSeqs - - /** TODO: make sure call of this class fill props as this assumes */ - lazy val orders = for (k <- labelIndexMetaSeqs) yield { - props.get(k) match { - case None => - - /** - * TODO: agly hack - * now we double store target vertex.innerId/srcVertex.innerId for easy development. later fix this to only store id once - */ - val v = k match { - case LabelMeta.timeStampSeq => InnerVal.withLong(version, schemaVer) - case LabelMeta.toSeq => tgtVertex.innerId - case LabelMeta.fromSeq => //srcVertex.innerId - // for now, it does not make sense to build index on srcVertex.innerId since all edges have same data. 
- throw new RuntimeException("_from on indexProps is not supported") - case _ => defaultIndexMetas(k) - } - - k -> v - case Some(v) => k -> v - } - } - - lazy val ordersKeyMap = orders.map { case (byte, _) => byte }.toSet - lazy val metas = for ((k, v) <- props if !ordersKeyMap.contains(k)) yield k -> v - - lazy val propsWithTs = props.map { case (k, v) => k -> InnerValLikeWithTs(v, version) } - - //TODO: - // lazy val kvs = Graph.client.indexedEdgeSerializer(this).toKeyValues.toList - - lazy val hasAllPropsForIndex = orders.length == labelIndexMetaSeqs.length - - def propsWithName = for { - (seq, v) <- props - meta <- label.metaPropsMap.get(seq) if seq >= 0 - jsValue <- innerValToJsValue(v, meta.dataType) - } yield meta.name -> jsValue - - - def toEdge: Edge = Edge(srcVertex, tgtVertex, labelWithDir, op, version, propsWithTs) - - // only for debug - def toLogString() = { - List(version, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, Json.toJson(propsWithName)).mkString("\t") - } -} - -case class Edge(srcVertex: Vertex, - tgtVertex: Vertex, - labelWithDir: LabelWithDirection, - op: Byte = GraphUtil.defaultOpByte, - version: Long = System.currentTimeMillis(), - propsWithTs: Map[Byte, InnerValLikeWithTs], - parentEdges: Seq[EdgeWithScore] = Nil, - originalEdgeOpt: Option[Edge] = None, - pendingEdgeOpt: Option[Edge] = None, - statusCode: Byte = 0, - lockTs: Option[Long] = None) extends GraphElement with JSONParser { - - if (!props.containsKey(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.") - // assert(propsWithTs.containsKey(LabelMeta.timeStampSeq)) - val schemaVer = label.schemaVersion - val ts = propsWithTs(LabelMeta.timeStampSeq).innerVal.toString.toLong - - def props = propsWithTs.mapValues(_.innerVal) - - def relatedEdges = { - if (labelWithDir.isDirected) List(this, duplicateEdge) - else { - val outDir = labelWithDir.copy(dir = GraphUtil.directions("out")) - val base = copy(labelWithDir = outDir) - List(base, 
base.reverseSrcTgtEdge) - } - } - - // def relatedEdges = List(this) - - def srcForVertex = { - val belongLabelIds = Seq(labelWithDir.labelId) - if (labelWithDir.dir == GraphUtil.directions("in")) { - Vertex(VertexId(label.tgtColumn.id.get, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds) - } else { - Vertex(VertexId(label.srcColumn.id.get, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds) - } - } - - def tgtForVertex = { - val belongLabelIds = Seq(labelWithDir.labelId) - if (labelWithDir.dir == GraphUtil.directions("in")) { - Vertex(VertexId(label.srcColumn.id.get, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds) - } else { - Vertex(VertexId(label.tgtColumn.id.get, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds) - } - } - - def duplicateEdge = reverseSrcTgtEdge.reverseDirEdge - - def reverseDirEdge = copy(labelWithDir = labelWithDir.dirToggled) - - def reverseSrcTgtEdge = copy(srcVertex = tgtVertex, tgtVertex = srcVertex) - - def label = Label.findById(labelWithDir.labelId) - - def labelOrders = LabelIndex.findByLabelIdAll(labelWithDir.labelId) - - override def serviceName = label.serviceName - - override def queueKey = Seq(ts.toString, tgtVertex.serviceName).mkString("|") - - override def queuePartitionKey = Seq(srcVertex.innerId, tgtVertex.innerId).mkString("|") - - override def isAsync = label.isAsync - - def isDegree = propsWithTs.contains(LabelMeta.degreeSeq) - - def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match { - case Some(_) => props - case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer)) - } - - def propsPlusTsValid = propsPlusTs.filter(kv => kv._1 >= 0) - - def edgesWithIndex = for (labelOrder <- labelOrders) yield { - IndexEdge(srcVertex, tgtVertex, labelWithDir, op, version, labelOrder.seq, propsPlusTs) - } - - def edgesWithIndexValid = for (labelOrder <- labelOrders) 
yield { - IndexEdge(srcVertex, tgtVertex, labelWithDir, op, version, labelOrder.seq, propsPlusTsValid) - } - - /** force direction as out on invertedEdge */ - def toSnapshotEdge: SnapshotEdge = { - val (smaller, larger) = (srcForVertex, tgtForVertex) - - val newLabelWithDir = LabelWithDirection(labelWithDir.labelId, GraphUtil.directions("out")) - - val ret = SnapshotEdge(smaller, larger, newLabelWithDir, op, version, - Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(ts, schemaVer), ts)) ++ propsWithTs, - pendingEdgeOpt = pendingEdgeOpt, statusCode = statusCode, lockTs = lockTs) - ret - } - - override def hashCode(): Int = { - MurmurHash3.stringHash(srcVertex.innerId + "," + labelWithDir + "," + tgtVertex.innerId) - } - - override def equals(other: Any): Boolean = other match { - case e: Edge => - srcVertex.innerId == e.srcVertex.innerId && - tgtVertex.innerId == e.tgtVertex.innerId && - labelWithDir == e.labelWithDir - case _ => false - } - - def propsWithName = for { - (seq, v) <- props - meta <- label.metaPropsMap.get(seq) if seq > 0 - jsValue <- innerValToJsValue(v, meta.dataType) - } yield meta.name -> jsValue - - def updateTgtVertex(id: InnerValLike) = { - val newId = TargetVertexId(tgtVertex.id.colId, id) - val newTgtVertex = Vertex(newId, tgtVertex.ts, tgtVertex.props) - Edge(srcVertex, newTgtVertex, labelWithDir, op, version, propsWithTs) - } - - def rank(r: RankParam): Double = - if (r.keySeqAndWeights.size <= 0) 1.0f - else { - var sum: Double = 0 - - for ((seq, w) <- r.keySeqAndWeights) { - seq match { - case LabelMeta.countSeq => sum += 1 - case _ => { - propsWithTs.get(seq) match { - case None => // do nothing - case Some(innerValWithTs) => { - val cost = try innerValWithTs.innerVal.toString.toDouble catch { - case e: Exception => - logger.error("toInnerval failed in rank", e) - 1.0 - } - sum += w * cost - } - } - } - } - } - sum - } - - def toLogString: String = { - val ret = - if (propsWithName.nonEmpty) - List(ts, 
GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, Json.toJson(propsWithName)) - else - List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label) - - ret.mkString("\t") - } -} - -case class EdgeMutate(edgesToDelete: List[IndexEdge] = List.empty[IndexEdge], - edgesToInsert: List[IndexEdge] = List.empty[IndexEdge], - newSnapshotEdge: Option[SnapshotEdge] = None) { - - def toLogString: String = { - val l = (0 until 50).map(_ => "-").mkString("") - val deletes = s"deletes: ${edgesToDelete.map(e => e.toLogString).mkString("\n")}" - val inserts = s"inserts: ${edgesToInsert.map(e => e.toLogString).mkString("\n")}" - val updates = s"snapshot: ${newSnapshotEdge.map(e => e.toLogString).mkString("\n")}" - - List("\n", l, deletes, inserts, updates, l, "\n").mkString("\n") - } -} - -object Edge extends JSONParser { - val incrementVersion = 1L - val minTsVal = 0L - - /** now version information is required also **/ - type State = Map[Byte, InnerValLikeWithTs] - type PropsPairWithTs = (State, State, Long, String) - type MergeState = PropsPairWithTs => (State, Boolean) - type UpdateFunc = (Option[Edge], Edge, MergeState) - - def allPropsDeleted(props: Map[Byte, InnerValLikeWithTs]): Boolean = - if (!props.containsKey(LabelMeta.lastDeletedAt)) false - else { - val lastDeletedAt = props.get(LabelMeta.lastDeletedAt).get.ts - val propsWithoutLastDeletedAt = props - LabelMeta.lastDeletedAt - - propsWithoutLastDeletedAt.forall { case (_, v) => v.ts <= lastDeletedAt } - } - - def buildDeleteBulk(invertedEdge: Option[Edge], requestEdge: Edge): (Edge, EdgeMutate) = { - // assert(invertedEdge.isEmpty) - // assert(requestEdge.op == GraphUtil.operations("delete")) - - val edgesToDelete = requestEdge.relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid } - val edgeInverted = Option(requestEdge.toSnapshotEdge) - - (requestEdge, EdgeMutate(edgesToDelete, edgesToInsert = Nil, edgeInverted)) - } - - def buildOperation(invertedEdge: 
Option[Edge], requestEdges: Seq[Edge]): (Edge, EdgeMutate) = { - // logger.debug(s"oldEdge: ${invertedEdge.map(_.toStringRaw)}") - // logger.debug(s"requestEdge: ${requestEdge.toStringRaw}") - val oldPropsWithTs = - if (invertedEdge.isEmpty) Map.empty[Byte, InnerValLikeWithTs] else invertedEdge.get.propsWithTs - - val funcs = requestEdges.map { edge => - if (edge.op == GraphUtil.operations("insert")) { - edge.label.consistencyLevel match { - case "strong" => Edge.mergeUpsert _ - case _ => Edge.mergeInsertBulk _ - } - } else if (edge.op == GraphUtil.operations("insertBulk")) { - Edge.mergeInsertBulk _ - } else if (edge.op == GraphUtil.operations("delete")) { - edge.label.consistencyLevel match { - case "strong" => Edge.mergeDelete _ - case _ => throw new RuntimeException("not supported") - } - } - else if (edge.op == GraphUtil.operations("update")) Edge.mergeUpdate _ - else if (edge.op == GraphUtil.operations("increment")) Edge.mergeIncrement _ - else throw new RuntimeException(s"not supported operation on edge: $edge") - } - - val oldTs = invertedEdge.map(_.ts).getOrElse(minTsVal) - val requestWithFuncs = requestEdges.zip(funcs).filter(oldTs != _._1.ts).sortBy(_._1.ts) - - if (requestWithFuncs.isEmpty) { - (requestEdges.head, EdgeMutate()) - } else { - val requestEdge = requestWithFuncs.last._1 - var prevPropsWithTs = oldPropsWithTs - - for { - (requestEdge, func) <- requestWithFuncs - } { - val (_newPropsWithTs, _) = func(prevPropsWithTs, requestEdge.propsWithTs, requestEdge.ts, requestEdge.schemaVer) - prevPropsWithTs = _newPropsWithTs - // logger.debug(s"${requestEdge.toLogString}\n$oldPropsWithTs\n$prevPropsWithTs\n") - } - val requestTs = requestEdge.ts - /** version should be monotoniously increasing so our RPC mutation should be applied safely */ - val newVersion = invertedEdge.map(e => e.version + incrementVersion).getOrElse(requestTs) - val maxTs = prevPropsWithTs.map(_._2.ts).max - val newTs = if (maxTs > requestTs) maxTs else requestTs - val propsWithTs 
= prevPropsWithTs ++ - Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(newTs, requestEdge.label.schemaVersion), newTs)) - val edgeMutate = buildMutation(invertedEdge, requestEdge, newVersion, oldPropsWithTs, propsWithTs) - - // logger.debug(s"${edgeMutate.toLogString}\n${propsWithTs}") - // logger.error(s"$propsWithTs") - (requestEdge, edgeMutate) - } - } - - def buildMutation(snapshotEdgeOpt: Option[Edge], - requestEdge: Edge, - newVersion: Long, - oldPropsWithTs: Map[Byte, InnerValLikeWithTs], - newPropsWithTs: Map[Byte, InnerValLikeWithTs]): EdgeMutate = { - if (oldPropsWithTs == newPropsWithTs) { - // all requests should be dropped. so empty mutation. - // logger.error(s"Case 1") - EdgeMutate(edgesToDelete = Nil, edgesToInsert = Nil, newSnapshotEdge = None) - } else { - val withOutDeletedAt = newPropsWithTs.filter(kv => kv._1 != LabelMeta.lastDeletedAt) - val newOp = snapshotEdgeOpt match { - case None => requestEdge.op - case Some(old) => - val oldMaxTs = old.propsWithTs.map(_._2.ts).max - if (oldMaxTs > requestEdge.ts) old.op - else requestEdge.op - } - - val newSnapshotEdgeOpt = - Option(requestEdge.copy(op = newOp, propsWithTs = newPropsWithTs, version = newVersion).toSnapshotEdge) - // delete request must always update snapshot. - if (withOutDeletedAt == oldPropsWithTs && newPropsWithTs.containsKey(LabelMeta.lastDeletedAt)) { - // no mutation on indexEdges. only snapshotEdge should be updated to record lastDeletedAt. - // logger.error(s"Case 2") - EdgeMutate(edgesToDelete = Nil, edgesToInsert = Nil, newSnapshotEdge = newSnapshotEdgeOpt) - } else { - // logger.error(s"Case 3") - val edgesToDelete = snapshotEdgeOpt match { - case Some(snapshotEdge) if snapshotEdge.op != GraphUtil.operations("delete") => - snapshotEdge.copy(op = GraphUtil.defaultOpByte). 
- relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid } - case _ => Nil - } - - val edgesToInsert = - if (newPropsWithTs.isEmpty || allPropsDeleted(newPropsWithTs)) Nil - else - requestEdge.copy(version = newVersion, propsWithTs = newPropsWithTs, op = GraphUtil.defaultOpByte). - relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid } - - EdgeMutate(edgesToDelete = edgesToDelete, edgesToInsert = edgesToInsert, newSnapshotEdge = newSnapshotEdgeOpt) - } - } - } - - def mergeUpsert(propsPairWithTs: PropsPairWithTs): (State, Boolean) = { - var shouldReplace = false - val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs - val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt).map(v => v.ts).getOrElse(minTsVal) - val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield { - propsWithTs.get(k) match { - case Some(newValWithTs) => - assert(oldValWithTs.ts >= lastDeletedAt) - val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs - else { - shouldReplace = true - newValWithTs - } - Some(k -> v) - - case None => - assert(oldValWithTs.ts >= lastDeletedAt) - if (oldValWithTs.ts >= requestTs || k < 0) Some(k -> oldValWithTs) - else { - shouldReplace = true - None - } - } - } - val existInNew = - for { - (k, newValWithTs) <- propsWithTs if !oldPropsWithTs.contains(k) && newValWithTs.ts > lastDeletedAt - } yield { - shouldReplace = true - Some(k -> newValWithTs) - } - - ((existInOld.flatten ++ existInNew.flatten).toMap, shouldReplace) - } - - def mergeUpdate(propsPairWithTs: PropsPairWithTs): (State, Boolean) = { - var shouldReplace = false - val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs - val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt).map(v => v.ts).getOrElse(minTsVal) - val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield { - propsWithTs.get(k) match { - case Some(newValWithTs) => - assert(oldValWithTs.ts >= lastDeletedAt) - val v = if (oldValWithTs.ts >= 
newValWithTs.ts) oldValWithTs - else { - shouldReplace = true - newValWithTs - } - Some(k -> v) - case None => - // important: update need to merge previous valid values. - assert(oldValWithTs.ts >= lastDeletedAt) - Some(k -> oldValWithTs) - } - } - val existInNew = for { - (k, newValWithTs) <- propsWithTs if !oldPropsWithTs.contains(k) && newValWithTs.ts > lastDeletedAt - } yield { - shouldReplace = true - Some(k -> newValWithTs) - } - - ((existInOld.flatten ++ existInNew.flatten).toMap, shouldReplace) - } - - def mergeIncrement(propsPairWithTs: PropsPairWithTs): (State, Boolean) = { - var shouldReplace = false - val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs - val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt).map(v => v.ts).getOrElse(minTsVal) - val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield { - propsWithTs.get(k) match { - case Some(newValWithTs) => - if (k == LabelMeta.timeStampSeq) { - val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs - else { - shouldReplace = true - newValWithTs - } - Some(k -> v) - } else { - if (oldValWithTs.ts >= newValWithTs.ts) { - Some(k -> oldValWithTs) - } else { - assert(oldValWithTs.ts < newValWithTs.ts && oldValWithTs.ts >= lastDeletedAt) - shouldReplace = true - // incr(t0), incr(t2), d(t1) => deleted - Some(k -> InnerValLikeWithTs(oldValWithTs.innerVal + newValWithTs.innerVal, oldValWithTs.ts)) - } - } - - case None => - assert(oldValWithTs.ts >= lastDeletedAt) - Some(k -> oldValWithTs) - // if (oldValWithTs.ts >= lastDeletedAt) Some(k -> oldValWithTs) else None - } - } - val existInNew = for { - (k, newValWithTs) <- propsWithTs if !oldPropsWithTs.contains(k) && newValWithTs.ts > lastDeletedAt - } yield { - shouldReplace = true - Some(k -> newValWithTs) - } - - ((existInOld.flatten ++ existInNew.flatten).toMap, shouldReplace) - } - - def mergeDelete(propsPairWithTs: PropsPairWithTs): (State, Boolean) = { - var shouldReplace = false - val (oldPropsWithTs, 
propsWithTs, requestTs, version) = propsPairWithTs - val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt) match { - case Some(prevDeletedAt) => - if (prevDeletedAt.ts >= requestTs) prevDeletedAt.ts - else { - shouldReplace = true - requestTs - } - case None => { - shouldReplace = true - requestTs - } - } - val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield { - if (k == LabelMeta.timeStampSeq) { - if (oldValWithTs.ts >= requestTs) Some(k -> oldValWithTs) - else { - shouldReplace = true - Some(k -> InnerValLikeWithTs.withLong(requestTs, requestTs, version)) - } - } else { - if (oldValWithTs.ts >= lastDeletedAt) Some(k -> oldValWithTs) - else { - shouldReplace = true - None - } - } - } - val mustExistInNew = Map(LabelMeta.lastDeletedAt -> InnerValLikeWithTs.withLong(lastDeletedAt, lastDeletedAt, version)) - ((existInOld.flatten ++ mustExistInNew).toMap, shouldReplace) - } - - def mergeInsertBulk(propsPairWithTs: PropsPairWithTs): (State, Boolean) = { - val (_, propsWithTs, _, _) = propsPairWithTs - (propsWithTs, true) - } - - def fromString(s: String): Option[Edge] = Graph.toEdge(s) - - -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/ExceptionHandler.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/ExceptionHandler.scala b/s2core/src/main/scala/com/kakao/s2graph/core/ExceptionHandler.scala deleted file mode 100644 index a965e90..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/ExceptionHandler.scala +++ /dev/null @@ -1,162 +0,0 @@ -package com.kakao.s2graph.core - -import java.util.Properties -import java.util.concurrent.atomic.AtomicLong - -import akka.actor._ -import akka.routing.{Broadcast, RoundRobinPool} -import com.typesafe.config.Config -import org.apache.kafka.clients.producer._ - -import scala.concurrent.duration._ - -/** - * Created by shon on 7/16/15. 
- */ -object ExceptionHandler { - - var producer: Option[Producer[Key, Val]] = None - var properties: Option[Properties] = None - val numOfRoutees = 1 - val actorSystem = ActorSystem("ExceptionHandler") - var routees: Option[ActorRef] = None - var shutdownTime = 1000 millis - var phase = "dev" - lazy val failTopic = s"mutateFailed_${phase}" - - def apply(config: Config) = { - properties = - if (config.hasPath("kafka.metadata.broker.list")) Option(kafkaConfig(config)) - else None - phase = if (config.hasPath("phase")) config.getString("phase") else "dev" - producer = for { - props <- properties - p <- try { - Option(new KafkaProducer[Key, Val](props)) - } catch { - case e: Throwable => None - } - } yield { - p - } - init() - } - - def props(producer: Producer[Key, Val]) = Props(classOf[KafkaAggregatorActor], producer) - - def init() = { - for { - p <- producer - } { - routees = Option(actorSystem.actorOf(RoundRobinPool(numOfRoutees).props(props(p)))) - } - } - - def shutdown() = { - routees.map(_ ! Broadcast(PoisonPill)) - Thread.sleep(shutdownTime.length) - } - - def enqueues(msgs: Seq[KafkaMessage]) = { - msgs.foreach(enqueue) - } - - def enqueue(msg: KafkaMessage) = { - routees.map(_ ! 
msg) - } - - - def kafkaConfig(config: Config) = { - val props = new Properties(); - - /** all default configuration for new producer */ - val brokers = - if (config.hasPath("kafka.metadata.broker.list")) config.getString("kafka.metadata.broker.list") - else "localhost" - props.put("bootstrap.servers", brokers) - props.put("acks", "1") - props.put("buffer.memory", "33554432") - props.put("compression.type", "snappy") - props.put("retries", "0") - props.put("batch.size", "16384") - props.put("linger.ms", "0") - props.put("max.request.size", "1048576") - props.put("receive.buffer.bytes", "32768") - props.put("send.buffer.bytes", "131072") - props.put("timeout.ms", "30000") - props.put("block.on.buffer.full", "false") - props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") - props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") - props - } - - type Key = String - type Val = String - - def toKafkaMessage(topic: String = failTopic, element: GraphElement, originalString: Option[String] = None) = { - KafkaMessage(new ProducerRecord[Key, Val](topic, element.queuePartitionKey, - originalString.getOrElse(element.toLogString()))) - } - - case class KafkaMessage(msg: ProducerRecord[Key, Val]) - - case class Message(topic: String, msg: String) - - case class BufferedKafkaMessage(msgs: Seq[ProducerRecord[Key, Val]], bufferSize: Int) - - case class BufferedMessage(topic: String, bufferedMsgs: String, bufferSize: Int) - - case object FlushBuffer - - case class UpdateHealth(isHealty: Boolean) - - case object ShowMetrics - -} - -class KafkaAggregatorActor(kafkaProducer: Producer[String, String]) extends Stash with ActorLogging { - - import ExceptionHandler._ - - val failedCount = new AtomicLong(0L) - val successCount = new AtomicLong(0L) - val stashCount = new AtomicLong(0L) - - implicit val ex = context.system.dispatcher - - context.system.scheduler.schedule(0 millis, 10 seconds) { - self ! 
ShowMetrics - } - - override def receive = { - case ShowMetrics => - log.info(s"[Stats]: failed[${failedCount.get}], stashed[${stashCount.get}], success[${successCount.get}]") - - case m: KafkaMessage => - val replayTo = self - try { - kafkaProducer.send(m.msg, new Callback() { - override def onCompletion(meta: RecordMetadata, e: Exception) = { - if (e == null) { - // success - successCount.incrementAndGet() - unstashAll() - stashCount.set(0L) - } else { - // failure - log.error(s"onCompletion: $e", e) - failedCount.incrementAndGet() - replayTo ! m - } - } - }) - } catch { - case e@(_: org.apache.kafka.clients.producer.BufferExhaustedException | _: Throwable) => - log.error(s"$e", e) - log.info(s"stash") - stash() - stashCount.incrementAndGet() - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala b/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala deleted file mode 100644 index fdc8553..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/Graph.scala +++ /dev/null @@ -1,381 +0,0 @@ -package com.kakao.s2graph.core - -import java.util -import java.util.concurrent.ConcurrentHashMap - -import com.google.common.cache.CacheBuilder -import com.kakao.s2graph.core.mysqls._ -import com.kakao.s2graph.core.parsers.WhereParser -import com.kakao.s2graph.core.storage.Storage -import com.kakao.s2graph.core.storage.hbase._ -import com.kakao.s2graph.core.types._ -import com.kakao.s2graph.core.utils.logger -import com.typesafe.config.{Config, ConfigFactory} - -import scala.collection.JavaConversions._ -import scala.collection._ -import scala.collection.mutable.ListBuffer -import scala.concurrent._ -import scala.util.Try - -object Graph { - val DefaultScore = 1.0 - - private val DefaultConfigs: Map[String, AnyRef] = Map( - "hbase.zookeeper.quorum" 
-> "localhost", - "hbase.table.name" -> "s2graph", - "hbase.table.compression.algorithm" -> "gz", - "phase" -> "dev", - "db.default.driver" -> "com.mysql.jdbc.Driver", - "db.default.url" -> "jdbc:mysql://localhost:3306/graph_dev", - "db.default.password" -> "graph", - "db.default.user" -> "graph", - "cache.max.size" -> java.lang.Integer.valueOf(10000), - "cache.ttl.seconds" -> java.lang.Integer.valueOf(60), - "hbase.client.retries.number" -> java.lang.Integer.valueOf(20), - "hbase.rpcs.buffered_flush_interval" -> java.lang.Short.valueOf(100.toShort), - "hbase.rpc.timeout" -> java.lang.Integer.valueOf(1000), - "max.retry.number" -> java.lang.Integer.valueOf(100), - "lock.expire.time" -> java.lang.Integer.valueOf(1000 * 60 * 10), - "max.back.off" -> java.lang.Integer.valueOf(100), - "hbase.fail.prob" -> java.lang.Double.valueOf(-0.1), - "delete.all.fetch.size" -> java.lang.Integer.valueOf(1000), - "future.cache.max.size" -> java.lang.Integer.valueOf(100000), - "future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000), - "future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000), - "s2graph.storage.backend" -> "hbase" - ) - - var DefaultConfig: Config = ConfigFactory.parseMap(DefaultConfigs) - - /** helpers for filterEdges */ - type HashKey = (Int, Int, Int, Int, Boolean) - type FilterHashKey = (Int, Int) - type Result = (ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]], - ConcurrentHashMap[HashKey, (FilterHashKey, Edge, Double)], - ListBuffer[(HashKey, FilterHashKey, Edge, Double)]) - - def toHashKey(queryParam: QueryParam, edge: Edge, isDegree: Boolean): (HashKey, FilterHashKey) = { - val src = edge.srcVertex.innerId.hashCode() - val tgt = edge.tgtVertex.innerId.hashCode() - val hashKey = (src, edge.labelWithDir.labelId, edge.labelWithDir.dir, tgt, isDegree) - val filterHashKey = (src, tgt) - - (hashKey, filterHashKey) - } - - def alreadyVisitedVertices(queryResultLs: Seq[QueryResult]): Map[(LabelWithDirection, Vertex), Boolean] = { - 
val vertices = for { - queryResult <- queryResultLs - edgeWithScore <- queryResult.edgeWithScoreLs - edge = edgeWithScore.edge - vertex = if (edge.labelWithDir.dir == GraphUtil.directions("out")) edge.tgtVertex else edge.srcVertex - } yield (edge.labelWithDir, vertex) -> true - - vertices.toMap - } - - /** common methods for filter out, transform, aggregate queryResult */ - def convertEdges(queryParam: QueryParam, edge: Edge, nextStepOpt: Option[Step]): Seq[Edge] = { - for { - convertedEdge <- queryParam.transformer.transform(edge, nextStepOpt) if !edge.isDegree - } yield convertedEdge - } - - def processTimeDecay(queryParam: QueryParam, edge: Edge) = { - /** process time decay */ - val tsVal = queryParam.timeDecay match { - case None => 1.0 - case Some(timeDecay) => - val tsVal = try { - val labelMeta = edge.label.metaPropsMap(timeDecay.labelMetaSeq) - val innerValWithTsOpt = edge.propsWithTs.get(timeDecay.labelMetaSeq) - innerValWithTsOpt.map { innerValWithTs => - val innerVal = innerValWithTs.innerVal - labelMeta.dataType match { - case InnerVal.LONG => innerVal.value match { - case n: BigDecimal => n.bigDecimal.longValue() - case _ => innerVal.toString().toLong - } - case _ => innerVal.toString().toLong - } - } getOrElse(edge.ts) - } catch { - case e: Exception => - logger.error(s"processTimeDecay error. 
${edge.toLogString}", e) - edge.ts - } - val timeDiff = queryParam.timestamp - tsVal - timeDecay.decay(timeDiff) - } - - tsVal - } - - def aggregateScore(newScore: Double, - resultEdges: ConcurrentHashMap[HashKey, (FilterHashKey, Edge, Double)], - duplicateEdges: ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]], - edgeWithScoreSorted: ListBuffer[(HashKey, FilterHashKey, Edge, Double)], - hashKey: HashKey, - filterHashKey: FilterHashKey, - queryParam: QueryParam, - convertedEdge: Edge) = { - - /** skip duplicate policy check if consistencyLevel is strong */ - if (queryParam.label.consistencyLevel != "strong" && resultEdges.containsKey(hashKey)) { - val (oldFilterHashKey, oldEdge, oldScore) = resultEdges.get(hashKey) - //TODO: - queryParam.duplicatePolicy match { - case Query.DuplicatePolicy.First => // do nothing - case Query.DuplicatePolicy.Raw => - if (duplicateEdges.containsKey(hashKey)) { - duplicateEdges.get(hashKey).append(convertedEdge -> newScore) - } else { - val newBuffer = new ListBuffer[(Edge, Double)] - newBuffer.append(convertedEdge -> newScore) - duplicateEdges.put(hashKey, newBuffer) - } - case Query.DuplicatePolicy.CountSum => - resultEdges.put(hashKey, (filterHashKey, oldEdge, oldScore + 1)) - case _ => - resultEdges.put(hashKey, (filterHashKey, oldEdge, oldScore + newScore)) - } - } else { - resultEdges.put(hashKey, (filterHashKey, convertedEdge, newScore)) - edgeWithScoreSorted.append((hashKey, filterHashKey, convertedEdge, newScore)) - } - } - - def aggregateResults(queryRequestWithResult: QueryRequestWithResult, - queryParamResult: Result, - edgesToInclude: util.HashSet[FilterHashKey], - edgesToExclude: util.HashSet[FilterHashKey]): QueryRequestWithResult = { - val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get - val (query, stepIdx, _, queryParam) = QueryRequest.unapply(queryRequest).get - - val (duplicateEdges, resultEdges, edgeWithScoreSorted) = queryParamResult - val edgesWithScores = for { - 
(hashKey, filterHashKey, edge, _) <- edgeWithScoreSorted if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey) - score = resultEdges.get(hashKey)._3 - (duplicateEdge, aggregatedScore) <- fetchDuplicatedEdges(edge, score, hashKey, duplicateEdges) if aggregatedScore >= queryParam.threshold - } yield EdgeWithScore(duplicateEdge, aggregatedScore) - - QueryRequestWithResult(queryRequest, QueryResult(edgesWithScores)) - } - - def fetchDuplicatedEdges(edge: Edge, - score: Double, - hashKey: HashKey, - duplicateEdges: ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]]) = { - (edge -> score) +: (if (duplicateEdges.containsKey(hashKey)) duplicateEdges.get(hashKey) else Seq.empty) - } - - def queryResultWithFilter(queryRequestWithResult: QueryRequestWithResult) = { - val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get - val (_, _, _, queryParam) = QueryRequest.unapply(queryRequest).get - val whereFilter = queryParam.where.get - if (whereFilter == WhereParser.success) queryResult.edgeWithScoreLs - else queryResult.edgeWithScoreLs.withFilter(edgeWithScore => whereFilter.filter(edgeWithScore.edge)) - } - - def filterEdges(queryResultLsFuture: Future[Seq[QueryRequestWithResult]], - alreadyVisited: Map[(LabelWithDirection, Vertex), Boolean] = Map.empty[(LabelWithDirection, Vertex), Boolean]) - (implicit ec: scala.concurrent.ExecutionContext): Future[Seq[QueryRequestWithResult]] = { - - queryResultLsFuture.map { queryRequestWithResultLs => - if (queryRequestWithResultLs.isEmpty) Nil - else { - val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResultLs.head).get - val (q, stepIdx, srcVertex, queryParam) = QueryRequest.unapply(queryRequest).get - val step = q.steps(stepIdx) - - val nextStepOpt = if (stepIdx < q.steps.size - 1) Option(q.steps(stepIdx + 1)) else None - - val excludeLabelWithDirSet = new util.HashSet[(Int, Int)] - val includeLabelWithDirSet = new util.HashSet[(Int, 
Int)] - step.queryParams.filter(_.exclude).foreach(l => excludeLabelWithDirSet.add(l.labelWithDir.labelId -> l.labelWithDir.dir)) - step.queryParams.filter(_.include).foreach(l => includeLabelWithDirSet.add(l.labelWithDir.labelId -> l.labelWithDir.dir)) - - val edgesToExclude = new util.HashSet[FilterHashKey]() - val edgesToInclude = new util.HashSet[FilterHashKey]() - - val queryParamResultLs = new ListBuffer[Result] - queryRequestWithResultLs.foreach { queryRequestWithResult => - val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get - val queryParam = queryRequest.queryParam - val duplicateEdges = new util.concurrent.ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]]() - val resultEdges = new util.concurrent.ConcurrentHashMap[HashKey, (FilterHashKey, Edge, Double)]() - val edgeWithScoreSorted = new ListBuffer[(HashKey, FilterHashKey, Edge, Double)] - val labelWeight = step.labelWeights.getOrElse(queryParam.labelWithDir.labelId, 1.0) - - // store degree value with Array.empty so if degree edge exist, it comes at very first. - def checkDegree() = queryResult.edgeWithScoreLs.headOption.exists { edgeWithScore => - edgeWithScore.edge.isDegree - } - var isDegree = checkDegree() - - val includeExcludeKey = queryParam.labelWithDir.labelId -> queryParam.labelWithDir.dir - val shouldBeExcluded = excludeLabelWithDirSet.contains(includeExcludeKey) - val shouldBeIncluded = includeLabelWithDirSet.contains(includeExcludeKey) - - queryResultWithFilter(queryRequestWithResult).foreach { edgeWithScore => - val (edge, score) = EdgeWithScore.unapply(edgeWithScore).get - if (queryParam.transformer.isDefault) { - val convertedEdge = edge - - val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree) - - /** check if this edge should be exlcuded. 
*/ - if (shouldBeExcluded && !isDegree) { - edgesToExclude.add(filterHashKey) - } else { - if (shouldBeIncluded && !isDegree) { - edgesToInclude.add(filterHashKey) - } - val tsVal = processTimeDecay(queryParam, convertedEdge) - val newScore = labelWeight * score * tsVal - aggregateScore(newScore, resultEdges, duplicateEdges, edgeWithScoreSorted, hashKey, filterHashKey, queryParam, convertedEdge) - } - } else { - convertEdges(queryParam, edge, nextStepOpt).foreach { convertedEdge => - val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree) - - /** check if this edge should be exlcuded. */ - if (shouldBeExcluded && !isDegree) { - edgesToExclude.add(filterHashKey) - } else { - if (shouldBeIncluded && !isDegree) { - edgesToInclude.add(filterHashKey) - } - val tsVal = processTimeDecay(queryParam, convertedEdge) - val newScore = labelWeight * score * tsVal - aggregateScore(newScore, resultEdges, duplicateEdges, edgeWithScoreSorted, hashKey, filterHashKey, queryParam, convertedEdge) - } - } - } - isDegree = false - } - val ret = (duplicateEdges, resultEdges, edgeWithScoreSorted) - queryParamResultLs.append(ret) - } - - val aggregatedResults = for { - (queryRequestWithResult, queryParamResult) <- queryRequestWithResultLs.zip(queryParamResultLs) - } yield { - aggregateResults(queryRequestWithResult, queryParamResult, edgesToInclude, edgesToExclude) - } - - aggregatedResults - } - } - } - - def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] = Try { - val parts = GraphUtil.split(s) - val logType = parts(2) - val element = if (logType == "edge" | logType == "e") { - /** current only edge is considered to be bulk loaded */ - labelMapping.get(parts(5)) match { - case None => - case Some(toReplace) => - parts(5) = toReplace - } - toEdge(parts) - } else if (logType == "vertex" | logType == "v") { - toVertex(parts) - } else { - throw new GraphExceptions.JsonParseException("log type is not exist in log.") - } - - 
element - } recover { - case e: Exception => - logger.error(s"$e", e) - None - } get - - - def toVertex(s: String): Option[Vertex] = { - toVertex(GraphUtil.split(s)) - } - - def toEdge(s: String): Option[Edge] = { - toEdge(GraphUtil.split(s)) - } - - //"1418342849000\tu\te\t3286249\t71770\ttalk_friend\t{\"is_hidden\":false}" - //{"from":1,"to":101,"label":"graph_test","props":{"time":-1, "weight":10},"timestamp":1417616431}, - def toEdge(parts: Array[String]): Option[Edge] = Try { - val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5)) - val props = if (parts.length >= 7) parts(6) else "{}" - val tempDirection = if (parts.length >= 8) parts(7) else "out" - val direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection - - val edge = Management.toEdge(ts.toLong, operation, srcId, tgtId, label, direction, props) - // logger.debug(s"toEdge: $edge") - Some(edge) - } recover { - case e: Exception => - logger.error(s"toEdge: $e", e) - throw e - } get - - //"1418342850000\ti\tv\t168756793\ttalk_user_id\t{\"country_iso\":\"KR\"}" - def toVertex(parts: Array[String]): Option[Vertex] = Try { - val (ts, operation, logType, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5)) - val props = if (parts.length >= 7) parts(6) else "{}" - Some(Management.toVertex(ts.toLong, operation, srcId, serviceName, colName, props)) - } recover { - case e: Throwable => - logger.error(s"toVertex: $e", e) - throw e - } get - - def initStorage(config: Config)(ec: ExecutionContext) = { - config.getString("s2graph.storage.backend") match { - case "hbase" => new AsynchbaseStorage(config)(ec) - case _ => throw new RuntimeException("not supported storage.") - } - } -} - -class Graph(_config: Config)(implicit val ec: ExecutionContext) { - val config = _config.withFallback(Graph.DefaultConfig) - - Model.apply(config) - Model.loadCache() - - // TODO: Make storage client by 
config param - val storage = Graph.initStorage(config)(ec) - - - for { - entry <- config.entrySet() if Graph.DefaultConfigs.contains(entry.getKey) - (k, v) = (entry.getKey, entry.getValue) - } logger.info(s"[Initialized]: $k, ${this.config.getAnyRef(k)}") - - /** select */ - def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[Seq[QueryRequestWithResult]] = storage.checkEdges(params) - - def getEdges(q: Query): Future[Seq[QueryRequestWithResult]] = storage.getEdges(q) - - def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = storage.getVertices(vertices) - - /** write */ - def deleteAllAdjacentEdges(srcVertices: List[Vertex], labels: Seq[Label], dir: Int, ts: Long): Future[Boolean] = - storage.deleteAllAdjacentEdges(srcVertices, labels, dir, ts) - - def mutateElements(elements: Seq[GraphElement], withWait: Boolean = false): Future[Seq[Boolean]] = - storage.mutateElements(elements, withWait) - - def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): Future[Seq[Boolean]] = storage.mutateEdges(edges, withWait) - - def mutateVertices(vertices: Seq[Vertex], withWait: Boolean = false): Future[Seq[Boolean]] = storage.mutateVertices(vertices, withWait) - - def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long)]] = storage.incrementCounts(edges, withWait) - - def shutdown(): Unit = { - storage.flush() - Model.shutdown() - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/GraphElement.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/GraphElement.scala b/s2core/src/main/scala/com/kakao/s2graph/core/GraphElement.scala deleted file mode 100644 index e6c043d..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/GraphElement.scala +++ /dev/null @@ -1,12 +0,0 @@ -package com.kakao.s2graph.core - -import org.hbase.async.{HBaseRpc} - -trait GraphElement { - def 
serviceName: String - def ts: Long - def isAsync: Boolean - def queueKey: String - def queuePartitionKey: String - def toLogString(): String -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/GraphExceptions.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/GraphExceptions.scala b/s2core/src/main/scala/com/kakao/s2graph/core/GraphExceptions.scala deleted file mode 100644 index fa186c2..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/GraphExceptions.scala +++ /dev/null @@ -1,29 +0,0 @@ -package com.kakao.s2graph.core - - -object GraphExceptions { - - case class JsonParseException(msg: String) extends Exception(msg) - - case class LabelNotExistException(msg: String) extends Exception(msg) - - case class ModelNotFoundException(msg: String) extends Exception(msg) - - case class MaxPropSizeReachedException(msg: String) extends Exception(msg) - - case class LabelAlreadyExistException(msg: String) extends Exception(msg) - - case class InternalException(msg: String) extends Exception(msg) - - case class IllegalDataTypeException(msg: String) extends Exception(msg) - - case class WhereParserException(msg: String, ex: Exception = null) extends Exception(msg, ex) - - case class BadQueryException(msg: String, ex: Throwable = null) extends Exception(msg, ex) - - case class InvalidHTableException(msg: String) extends Exception(msg) - - case class FetchTimeoutException(msg: String) extends Exception(msg) - - case class DropRequestException(msg: String) extends Exception(msg) -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/GraphUtil.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/GraphUtil.scala b/s2core/src/main/scala/com/kakao/s2graph/core/GraphUtil.scala deleted 
file mode 100644 index 0359963..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/GraphUtil.scala +++ /dev/null @@ -1,138 +0,0 @@ -package com.kakao.s2graph.core - -import scala.util.Random -import scala.util.hashing.MurmurHash3 -import java.util.regex.Pattern -import play.api.libs.json.Json - -object GraphUtil { - private val TOKEN_DELIMITER = Pattern.compile("[\t]") - val operations = Map("i" -> 0, "insert" -> 0, "u" -> 1, "update" -> 1, - "increment" -> 2, "d" -> 3, "delete" -> 3, - "deleteAll" -> 4, "insertBulk" -> 5, "incrementCount" -> 6).map { - case (k, v) => - k -> v.toByte - } - val BitsForMurMurHash = 16 - val bytesForMurMurHash = 2 - val defaultOpByte = operations("insert") - val directions = Map("out" -> 0, "in" -> 1, "undirected" -> 2, "u" -> 2, "directed" -> 0, "d" -> 0) - val consistencyLevel = Map("weak" -> 0, "strong" -> 1) - - def toType(t: String) = { - t.trim().toLowerCase match { - case "e" | "edge" => "edge" - case "v" | "vertex" => "vertex" - } - } - - def toDir(direction: String): Option[Byte] = { - val d = direction.trim().toLowerCase match { - case "directed" | "d" => Some(0) - case "undirected" | "u" => Some(2) - case "out" => Some(0) - case "in" => Some(1) - case _ => None - } - d.map(x => x.toByte) - } - - def toDirection(direction: String): Int = { - direction.trim().toLowerCase match { - case "directed" | "d" => 0 - case "undirected" | "u" => 2 - case "out" => 0 - case "in" => 1 - case _ => 2 - } - } - - def fromDirection(direction: Int) = { - direction match { - case 0 => "out" - case 1 => "in" - case 2 => "undirected" - } - } - - def toggleDir(dir: Int) = { - dir match { - case 0 => 1 - case 1 => 0 - case 2 => 2 - case _ => throw new UnsupportedOperationException(s"toggleDirection: $dir") - } - } - - def toOp(op: String): Option[Byte] = { - op.trim() match { - case "i" | "insert" => Some(0) - case "d" | "delete" => Some(3) - case "u" | "update" => Some(1) - case "increment" => Some(2) - case "deleteAll" => Some(4) - case 
"insertBulk" => Some(5) - case "incrementCount" => Option(6) - case _ => None - } - } - - def fromOp(op: Byte): String = { - op match { - case 0 => "insert" - case 3 => "delete" - case 1 => "update" - case 2 => "increment" - case 4 => "deleteAll" - case 5 => "insertBulk" - case 6 => "incrementCount" - case _ => - throw new UnsupportedOperationException(s"op : $op (only support 0(insert),1(delete),2(updaet),3(increment))") - } - } - - // def toggleOp(op: Byte): Byte = { - // val ret = op match { - // case 0 => 1 - // case 1 => 0 - // case x: Byte => x - // } - // ret.toByte - // } - // 2^31 - 1 - - def transformHash(h: Int): Int = { - // h / 2 + (Int.MaxValue / 2 - 1) - if (h < 0) -1 * (h + 1) else h - } - def murmur3Int(s: String): Int = { - val hash = MurmurHash3.stringHash(s) - transformHash(hash) - } - def murmur3(s: String): Short = { - val hash = MurmurHash3.stringHash(s) - val positiveHash = transformHash(hash) >> BitsForMurMurHash - positiveHash.toShort -// Random.nextInt(Short.MaxValue).toShort - } - - def smartSplit(s: String, delemiter: String) = { - val trimed_string = s.trim() - if (trimed_string.equals("")) { - Seq[String]() - } else { - trimed_string.split(delemiter).toList - } - } - - def split(line: String) = TOKEN_DELIMITER.split(line) - - def parseString(s: String): List[String] = { - if (!s.startsWith("[")) { - s.split("\n").toList - } else { - Json.parse(s).asOpt[List[String]].getOrElse(List.empty[String]) - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/JSONParser.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/JSONParser.scala b/s2core/src/main/scala/com/kakao/s2graph/core/JSONParser.scala deleted file mode 100644 index 9663f69..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/JSONParser.scala +++ /dev/null @@ -1,134 +0,0 @@ -package com.kakao.s2graph.core - -import 
com.kakao.s2graph.core.types.{InnerVal, InnerValLike} -import com.kakao.s2graph.core.utils.logger -import play.api.libs.json._ - - -trait JSONParser { - - //TODO: check result notation on bigDecimal. - def innerValToJsValue(innerVal: InnerValLike, dataType: String): Option[JsValue] = { - try { - val dType = InnerVal.toInnerDataType(dataType) - val jsValue = dType match { - case InnerVal.STRING => JsString(innerVal.value.asInstanceOf[String]) - case InnerVal.BOOLEAN => JsBoolean(innerVal.value.asInstanceOf[Boolean]) - case InnerVal.BYTE | InnerVal.SHORT | InnerVal.INT | InnerVal.LONG | InnerVal.FLOAT | InnerVal.DOUBLE => - // case t if InnerVal.NUMERICS.contains(t) => - innerVal.value match { - case l: Long => JsNumber(l) - case i: Int => JsNumber(i) - case s: Short => JsNumber(s.toLong) - case b: Byte => JsNumber(b.toLong) - case f: Float => JsNumber(f.toDouble) - case d: Double => - // JsNumber(d) - dType match { - case InnerVal.BYTE => JsNumber(d.toInt) - case InnerVal.SHORT => JsNumber(d.toInt) - case InnerVal.INT => JsNumber(d.toInt) - case InnerVal.LONG => JsNumber(d.toLong) - case InnerVal.FLOAT => JsNumber(d.toDouble) - case InnerVal.DOUBLE => JsNumber(d.toDouble) - case _ => throw new RuntimeException(s"$innerVal, $dType => $dataType") - } - case num: BigDecimal => - // JsNumber(num) - // JsNumber(InnerVal.scaleNumber(num.asInstanceOf[BigDecimal], dType)) - dType match { - case InnerVal.BYTE => JsNumber(num.toInt) - case InnerVal.SHORT => JsNumber(num.toInt) - case InnerVal.INT => JsNumber(num.toInt) - case InnerVal.LONG => JsNumber(num.toLong) - case InnerVal.FLOAT => JsNumber(num.toDouble) - case InnerVal.DOUBLE => JsNumber(num.toDouble) - case _ => throw new RuntimeException(s"$innerVal, $dType => $dataType") - } - // JsNumber(num.toLong) - case _ => throw new RuntimeException(s"$innerVal, Numeric Unknown => $dataType") - } - // JsNumber(InnerVal.scaleNumber(innerVal.asInstanceOf[BigDecimal], dType)) - case _ => throw new RuntimeException(s"$innerVal, 
Unknown => $dataType") - } - Some(jsValue) - } catch { - case e: Exception => - logger.info(s"JSONParser.innerValToJsValue: $e") - None - } - } - - // def innerValToString(innerVal: InnerValLike, dataType: String): String = { - // val dType = InnerVal.toInnerDataType(dataType) - // InnerVal.toInnerDataType(dType) match { - // case InnerVal.STRING => innerVal.toString - // case InnerVal.BOOLEAN => innerVal.toString - // // case t if InnerVal.NUMERICS.contains(t) => - // case InnerVal.BYTE | InnerVal.SHORT | InnerVal.INT | InnerVal.LONG | InnerVal.FLOAT | InnerVal.DOUBLE => - // BigDecimal(innerVal.toString).bigDecimal.toPlainString - // case _ => innerVal.toString - // // throw new RuntimeException("innerVal to jsValue failed.") - // } - // } - - def toInnerVal(str: String, dataType: String, version: String): InnerValLike = { - //TODO: - // logger.error(s"toInnerVal: $str, $dataType, $version") - val s = - if (str.startsWith("\"") && str.endsWith("\"")) str.substring(1, str.length - 1) - else str - val dType = InnerVal.toInnerDataType(dataType) - - dType match { - case InnerVal.STRING => InnerVal.withStr(s, version) - // case t if InnerVal.NUMERICS.contains(t) => InnerVal.withNumber(BigDecimal(s), version) - case InnerVal.BYTE | InnerVal.SHORT | InnerVal.INT | InnerVal.LONG | InnerVal.FLOAT | InnerVal.DOUBLE => - InnerVal.withNumber(BigDecimal(s), version) - case InnerVal.BOOLEAN => InnerVal.withBoolean(s.toBoolean, version) - case InnerVal.BLOB => InnerVal.withBlob(s.getBytes, version) - case _ => - // InnerVal.withStr("") - throw new RuntimeException(s"illegal datatype for string: dataType is $dataType for $s") - } - } - - def jsValueToInnerVal(jsValue: JsValue, dataType: String, version: String): Option[InnerValLike] = { - val ret = try { - val dType = InnerVal.toInnerDataType(dataType.toLowerCase()) - jsValue match { - case n: JsNumber => - dType match { - case InnerVal.STRING => Some(InnerVal.withStr(jsValue.toString, version)) - // case t if 
InnerVal.NUMERICS.contains(t) => - case InnerVal.BYTE | InnerVal.SHORT | InnerVal.INT | InnerVal.LONG | InnerVal.FLOAT | InnerVal.DOUBLE => - Some(InnerVal.withNumber(n.value, version)) - case _ => None - } - case s: JsString => - dType match { - case InnerVal.STRING => Some(InnerVal.withStr(s.value, version)) - case InnerVal.BOOLEAN => Some(InnerVal.withBoolean(s.as[String].toBoolean, version)) - // case t if InnerVal.NUMERICS.contains(t) => - case InnerVal.BYTE | InnerVal.SHORT | InnerVal.INT | InnerVal.LONG | InnerVal.FLOAT | InnerVal.DOUBLE => - Some(InnerVal.withNumber(BigDecimal(s.value), version)) - case _ => None - } - case b: JsBoolean => - dType match { - case InnerVal.STRING => Some(InnerVal.withStr(b.toString, version)) - case InnerVal.BOOLEAN => Some(InnerVal.withBoolean(b.value, version)) - case _ => None - } - case _ => - None - } - } catch { - case e: Exception => - logger.error(e.getMessage) - None - } - - ret - } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala b/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala deleted file mode 100644 index ccf9d1f..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala +++ /dev/null @@ -1,368 +0,0 @@ -package com.kakao.s2graph.core - - -import com.kakao.s2graph.core.GraphExceptions.{InvalidHTableException, LabelAlreadyExistException, LabelNotExistException} -import com.kakao.s2graph.core.Management.JsonModel.{Index, Prop} -import com.kakao.s2graph.core.mysqls._ -import com.kakao.s2graph.core.types.HBaseType._ -import com.kakao.s2graph.core.types._ -import play.api.libs.json.Reads._ -import play.api.libs.json._ -import scala.util.Try - -/** - * This is designed to be bridge between rest to s2core. - * s2core never use this for finding models. 
- */ -object Management extends JSONParser { - - object JsonModel { - - case class Prop(name: String, defaultValue: String, datatType: String) - - object Prop extends ((String, String, String) => Prop) - - case class Index(name: String, propNames: Seq[String]) - - } - - import HBaseType._ - - val DefaultCompressionAlgorithm = "gz" - - - def findService(serviceName: String) = { - Service.findByName(serviceName, useCache = false) - } - - def deleteService(serviceName: String) = { - Service.findByName(serviceName).foreach { service => - // service.deleteAll() - } - } - - def updateHTable(labelName: String, newHTableName: String): Try[Int] = Try { - val targetLabel = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(s"Target label $labelName does not exist.")) - if (targetLabel.hTableName == newHTableName) throw new InvalidHTableException(s"New HTable name is already in use for target label.") - - Label.updateHTableName(targetLabel.label, newHTableName) - } - - - - def createServiceColumn(serviceName: String, - columnName: String, - columnType: String, - props: Seq[Prop], - schemaVersion: String = DEFAULT_VERSION) = { - - Model withTx { implicit session => - val serviceOpt = Service.findByName(serviceName) - serviceOpt match { - case None => throw new RuntimeException(s"create service $serviceName has not been created.") - case Some(service) => - val serviceColumn = ServiceColumn.findOrInsert(service.id.get, columnName, Some(columnType), schemaVersion) - for { - Prop(propName, defaultValue, dataType) <- props - } yield { - ColumnMeta.findOrInsert(serviceColumn.id.get, propName, dataType) - } - } - } - } - - def deleteColumn(serviceName: String, columnName: String, schemaVersion: String = DEFAULT_VERSION) = { - Model withTx { implicit session => - val service = Service.findByName(serviceName, useCache = false).getOrElse(throw new RuntimeException("Service not Found")) - val serviceColumns = ServiceColumn.find(service.id.get, columnName, useCache = 
false) - val columnNames = serviceColumns.map { serviceColumn => - ServiceColumn.delete(serviceColumn.id.get) - serviceColumn.columnName - } - - columnNames.getOrElse(throw new RuntimeException("column not found")) - } - } - - def findLabel(labelName: String): Option[Label] = { - Label.findByName(labelName, useCache = false) - } - - def deleteLabel(labelName: String) = { - Model withTx { implicit session => - Label.findByName(labelName, useCache = false).foreach { label => - Label.deleteAll(label) - } - labelName - } - } - - def addIndex(labelStr: String, indices: Seq[Index]): Try[Label] = { - Model withTx { implicit session => - val label = Label.findByName(labelStr).getOrElse(throw LabelNotExistException(s"$labelStr not found")) - val labelMetaMap = label.metaPropsInvMap - - indices.foreach { index => - val metaSeq = index.propNames.map { name => labelMetaMap(name).seq } - LabelIndex.findOrInsert(label.id.get, index.name, metaSeq.toList, "none") - } - - label - } - } - - def addProp(labelStr: String, prop: Prop) = { - Model withTx { implicit session => - val labelOpt = Label.findByName(labelStr) - val label = labelOpt.getOrElse(throw LabelNotExistException(s"$labelStr not found")) - - LabelMeta.findOrInsert(label.id.get, prop.name, prop.defaultValue, prop.datatType) - } - } - - def addProps(labelStr: String, props: Seq[Prop]) = { - Model withTx { implicit session => - val labelOpt = Label.findByName(labelStr) - val label = labelOpt.getOrElse(throw LabelNotExistException(s"$labelStr not found")) - - props.map { - case Prop(propName, defaultValue, dataType) => - LabelMeta.findOrInsert(label.id.get, propName, defaultValue, dataType) - } - } - } - - def addVertexProp(serviceName: String, - columnName: String, - propsName: String, - propsType: String, - schemaVersion: String = DEFAULT_VERSION): ColumnMeta = { - val result = for { - service <- Service.findByName(serviceName, useCache = false) - serviceColumn <- ServiceColumn.find(service.id.get, columnName) - } yield { 
- ColumnMeta.findOrInsert(serviceColumn.id.get, propsName, propsType) - } - result.getOrElse({ - throw new RuntimeException(s"add property on vertex failed") - }) - } - - def getServiceLable(label: String): Option[Label] = { - Label.findByName(label, useCache = true) - } - - /** - * - */ - - def toLabelWithDirectionAndOp(label: Label, direction: String): Option[LabelWithDirection] = { - for { - labelId <- label.id - dir = GraphUtil.toDirection(direction) - } yield LabelWithDirection(labelId, dir) - } - - def tryOption[A, R](key: A, f: A => Option[R]) = { - f(key) match { - case None => throw new GraphExceptions.InternalException(s"$key is not found in DB. create $key first.") - case Some(r) => r - } - } - - def toEdge(ts: Long, operation: String, srcId: String, tgtId: String, - labelStr: String, direction: String = "", props: String): Edge = { - - val label = tryOption(labelStr, getServiceLable) - val dir = - if (direction == "") -// GraphUtil.toDirection(label.direction) - GraphUtil.directions("out") - else - GraphUtil.toDirection(direction) - - // logger.debug(s"$srcId, ${label.srcColumnWithDir(dir)}") - // logger.debug(s"$tgtId, ${label.tgtColumnWithDir(dir)}") - - val srcVertexId = toInnerVal(srcId, label.srcColumn.columnType, label.schemaVersion) - val tgtVertexId = toInnerVal(tgtId, label.tgtColumn.columnType, label.schemaVersion) - - val srcColId = label.srcColumn.id.get - val tgtColId = label.tgtColumn.id.get - val (srcVertex, tgtVertex) = if (dir == GraphUtil.directions("out")) { - (Vertex(SourceVertexId(srcColId, srcVertexId), System.currentTimeMillis()), - Vertex(TargetVertexId(tgtColId, tgtVertexId), System.currentTimeMillis())) - } else { - (Vertex(SourceVertexId(tgtColId, tgtVertexId), System.currentTimeMillis()), - Vertex(TargetVertexId(srcColId, srcVertexId), System.currentTimeMillis())) - } - - // val dir = if (direction == "") GraphUtil.toDirection(label.direction) else GraphUtil.toDirection(direction) - val labelWithDir = 
LabelWithDirection(label.id.get, dir) - val op = tryOption(operation, GraphUtil.toOp) - - val jsObject = Json.parse(props).asOpt[JsObject].getOrElse(Json.obj()) - val parsedProps = toProps(label, jsObject.fields).toMap - val propsWithTs = parsedProps.map(kv => (kv._1 -> InnerValLikeWithTs(kv._2, ts))) ++ - Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(ts, label.schemaVersion), ts)) - - Edge(srcVertex, tgtVertex, labelWithDir, op, version = ts, propsWithTs = propsWithTs) - - } - - def toVertex(ts: Long, operation: String, id: String, serviceName: String, columnName: String, props: String): Vertex = { - Service.findByName(serviceName) match { - case None => throw new RuntimeException(s"$serviceName does not exist. create service first.") - case Some(service) => - ServiceColumn.find(service.id.get, columnName) match { - case None => throw new RuntimeException(s"$columnName is not exist. create service column first.") - case Some(col) => - val idVal = toInnerVal(id, col.columnType, col.schemaVersion) - val op = tryOption(operation, GraphUtil.toOp) - val jsObject = Json.parse(props).asOpt[JsObject].getOrElse(Json.obj()) - val parsedProps = toProps(col, jsObject).toMap - Vertex(VertexId(col.id.get, idVal), ts, parsedProps, op = op) - } - } - } - - def toProps(column: ServiceColumn, js: JsObject): Seq[(Int, InnerValLike)] = { - - val props = for { - (k, v) <- js.fields - meta <- column.metasInvMap.get(k) - } yield { - val innerVal = jsValueToInnerVal(v, meta.dataType, column.schemaVersion).getOrElse( - throw new RuntimeException(s"$k is not defined. create schema for vertex.")) - - (meta.seq.toInt, innerVal) - } - props - - } - - def toProps(label: Label, js: Seq[(String, JsValue)]): Seq[(Byte, InnerValLike)] = { - val props = for { - (k, v) <- js - meta <- label.metaPropsInvMap.get(k) - innerVal <- jsValueToInnerVal(v, meta.dataType, label.schemaVersion) - } yield (meta.seq, innerVal) - - props - } - - - /** - * update label name. 
- */ - def updateLabelName(oldLabelName: String, newLabelName: String) = { - for { - old <- Label.findByName(oldLabelName) - } { - Label.findByName(newLabelName) match { - case None => - Label.updateName(oldLabelName, newLabelName) - case Some(_) => - // throw new RuntimeException(s"$newLabelName already exist") - } - } - } -} - -class Management(graph: Graph) { - import Management._ - val storage = graph.storage - - def createTable(zkAddr: String, - tableName: String, - cfs: List[String], - regionMultiplier: Int, - ttl: Option[Int], - compressionAlgorithm: String = DefaultCompressionAlgorithm): Unit = - storage.createTable(zkAddr, tableName, cfs, regionMultiplier, ttl, compressionAlgorithm) - - /** HBase specific code */ - def createService(serviceName: String, - cluster: String, hTableName: String, - preSplitSize: Int, hTableTTL: Option[Int], - compressionAlgorithm: String = DefaultCompressionAlgorithm): Try[Service] = { - - Model withTx { implicit session => - val service = Service.findOrInsert(serviceName, cluster, hTableName, preSplitSize, hTableTTL, compressionAlgorithm) - /** create hbase table for service */ - storage.createTable(cluster, hTableName, List("e", "v"), preSplitSize, hTableTTL, compressionAlgorithm) - service - } - } - - /** HBase specific code */ - def createLabel(label: String, - srcServiceName: String, - srcColumnName: String, - srcColumnType: String, - tgtServiceName: String, - tgtColumnName: String, - tgtColumnType: String, - isDirected: Boolean = true, - serviceName: String, - indices: Seq[Index], - props: Seq[Prop], - consistencyLevel: String, - hTableName: Option[String], - hTableTTL: Option[Int], - schemaVersion: String = DEFAULT_VERSION, - isAsync: Boolean, - compressionAlgorithm: String = "gz"): Try[Label] = { - - val labelOpt = Label.findByName(label, useCache = false) - - Model withTx { implicit session => - labelOpt match { - case Some(l) => - throw new GraphExceptions.LabelAlreadyExistException(s"Label name ${l.label} already 
exist.") - case None => - /** create all models */ - val newLabel = Label.insertAll(label, - srcServiceName, srcColumnName, srcColumnType, - tgtServiceName, tgtColumnName, tgtColumnType, - isDirected, serviceName, indices, props, consistencyLevel, - hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm) - - /** create hbase table */ - val service = newLabel.service - (hTableName, hTableTTL) match { - case (None, None) => // do nothing - case (None, Some(hbaseTableTTL)) => throw new RuntimeException("if want to specify ttl, give hbaseTableName also") - case (Some(hbaseTableName), None) => - // create own hbase table with default ttl on service level. - storage.createTable(service.cluster, hbaseTableName, List("e", "v"), service.preSplitSize, service.hTableTTL, compressionAlgorithm) - case (Some(hbaseTableName), Some(hbaseTableTTL)) => - // create own hbase table with own ttl. - storage.createTable(service.cluster, hbaseTableName, List("e", "v"), service.preSplitSize, hTableTTL, compressionAlgorithm) - } - newLabel - } - } - } - - /** - * label - */ - /** - * copy label when if oldLabel exist and newLabel do not exist. - * copy label: only used by bulk load job. not sure if we need to parameterize hbase cluster. 
- */ - def copyLabel(oldLabelName: String, newLabelName: String, hTableName: Option[String]) = { - val old = Label.findByName(oldLabelName).getOrElse(throw new LabelAlreadyExistException(s"Old label $oldLabelName not exists.")) - if (Label.findByName(newLabelName).isDefined) throw new LabelAlreadyExistException(s"New label $newLabelName already exists.") - - val allProps = old.metas.map { labelMeta => Prop(labelMeta.name, labelMeta.defaultValue, labelMeta.dataType) } - val allIndices = old.indices.map { index => Index(index.name, index.propNames) } - - createLabel(newLabelName, old.srcService.serviceName, old.srcColumnName, old.srcColumnType, - old.tgtService.serviceName, old.tgtColumnName, old.tgtColumnType, - old.isDirected, old.serviceName, - allIndices, allProps, - old.consistencyLevel, hTableName, old.hTableTTL, old.schemaVersion, old.isAsync, old.compressionAlgorithm) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b6fe32fc/s2core/src/main/scala/com/kakao/s2graph/core/OrderingUtil.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/OrderingUtil.scala b/s2core/src/main/scala/com/kakao/s2graph/core/OrderingUtil.scala deleted file mode 100644 index 33ec7d9..0000000 --- a/s2core/src/main/scala/com/kakao/s2graph/core/OrderingUtil.scala +++ /dev/null @@ -1,146 +0,0 @@ -package com.kakao.s2graph.core - -import com.kakao.s2graph.core.types.InnerValLike -import play.api.libs.json.{JsNumber, JsString, JsValue} - -object OrderingUtil { - - implicit object JsValueOrdering extends Ordering[JsValue] { - override def compare(x: JsValue, y: JsValue): Int = { - (x, y) match { - case (JsNumber(xv), JsNumber(yv)) => - Ordering.BigDecimal.compare(xv, yv) - case (JsString(xv), JsString(yv)) => - Ordering.String.compare(xv, yv) - case _ => throw new Exception(s"unsupported type") - } - } - } - - implicit object InnerValLikeOrdering extends 
Ordering[InnerValLike] { - override def compare(x: InnerValLike, y: InnerValLike): Int = { - x.compare(y) - } - } - - implicit object MultiValueOrdering extends Ordering[Any] { - override def compare(x: Any, y: Any): Int = { - (x, y) match { - case (xv: Int, yv: Int) => implicitly[Ordering[Int]].compare(xv, yv) - case (xv: Long, yv: Long) => implicitly[Ordering[Long]].compare(xv, yv) - case (xv: Double, yv: Double) => implicitly[Ordering[Double]].compare(xv, yv) - case (xv: String, yv: String) => implicitly[Ordering[String]].compare(xv, yv) - case (xv: JsValue, yv: JsValue) => implicitly[Ordering[JsValue]].compare(xv, yv) - case (xv: InnerValLike, yv: InnerValLike) => implicitly[Ordering[InnerValLike]].compare(xv, yv) - } - } - } - - def TupleMultiOrdering[T](ascendingLs: Seq[Boolean])(implicit ord: Ordering[T]): Ordering[(T, T, T, T)] = { - new Ordering[(T, T, T, T)] { - override def compare(tx: (T, T, T, T), ty: (T, T, T, T)): Int = { - val len = ascendingLs.length - val it = ascendingLs.iterator - if (len >= 1) { - val (x, y) = it.next() match { - case true => tx -> ty - case false => ty -> tx - } - val compare1 = ord.compare(x._1, y._1) - if (compare1 != 0) return compare1 - } - - if (len >= 2) { - val (x, y) = it.next() match { - case true => tx -> ty - case false => ty -> tx - } - val compare2 = ord.compare(x._2, y._2) - if (compare2 != 0) return compare2 - } - - if (len >= 3) { - val (x, y) = it.next() match { - case true => tx -> ty - case false => ty -> tx - } - val compare3 = ord.compare(x._3, y._3) - if (compare3 != 0) return compare3 - } - - if (len >= 4) { - val (x, y) = it.next() match { - case true => tx -> ty - case false => ty -> tx - } - val compare4 = ord.compare(x._4, y._4) - if (compare4 != 0) return compare4 - } - 0 - } - } - } -} - -class SeqMultiOrdering[T](ascendingLs: Seq[Boolean], defaultAscending: Boolean = true)(implicit ord: Ordering[T]) extends Ordering[Seq[T]] { - override def compare(x: Seq[T], y: Seq[T]): Int = { - val xe = 
x.iterator - val ye = y.iterator - val oe = ascendingLs.iterator - - while (xe.hasNext && ye.hasNext) { - val ascending = if (oe.hasNext) oe.next() else defaultAscending - val (xev, yev) = ascending match { - case true => xe.next() -> ye.next() - case false => ye.next() -> xe.next() - } - val res = ord.compare(xev, yev) - if (res != 0) return res - } - - Ordering.Boolean.compare(xe.hasNext, ye.hasNext) - } -} - -//class TupleMultiOrdering[T](ascendingLs: Seq[Boolean])(implicit ord: Ordering[T]) extends Ordering[(T, T, T, T)] { -// override def compare(tx: (T, T, T, T), ty: (T, T, T, T)): Int = { -// val len = ascendingLs.length -// val it = ascendingLs.iterator -// if (len >= 1) { -// val (x, y) = it.next() match { -// case true => tx -> ty -// case false => ty -> tx -// } -// val compare1 = ord.compare(x._1, y._1) -// if (compare1 != 0) return compare1 -// } -// -// if (len >= 2) { -// val (x, y) = it.next() match { -// case true => tx -> ty -// case false => ty -> tx -// } -// val compare2 = ord.compare(x._2, y._2) -// if (compare2 != 0) return compare2 -// } -// -// if (len >= 3) { -// val (x, y) = it.next() match { -// case true => tx -> ty -// case false => ty -> tx -// } -// val compare3 = ord.compare(x._3, y._3) -// if (compare3 != 0) return compare3 -// } -// -// if (len >= 4) { -// val (x, y) = it.next() match { -// case true => tx -> ty -// case false => ty -> tx -// } -// val compare4 = ord.compare(x._4, y._4) -// if (compare4 != 0) return compare4 -// } -// 0 -// } -//}
