http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala new file mode 100644 index 0000000..c1a5738 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala @@ -0,0 +1,567 @@
package org.apache.s2graph.core

import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta}
import org.apache.s2graph.core.types._
import org.apache.s2graph.core.utils.logger
import play.api.libs.json.{JsNumber, Json}

import scala.collection.JavaConversions._
import scala.util.hashing.MurmurHash3

/**
 * Snapshot ("inverted") representation of an edge: one record per (src, tgt, label),
 * always stored in the "out" direction, carrying the full property state with
 * per-property timestamps plus optional pending-edge / lock bookkeeping used by
 * the strong-consistency write path.
 *
 * Invariant: `props` must contain the timestamp property (`LabelMeta.timeStampSeq`).
 */
case class SnapshotEdge(srcVertex: Vertex,
                        tgtVertex: Vertex,
                        labelWithDir: LabelWithDirection,
                        op: Byte,
                        version: Long,
                        props: Map[Byte, InnerValLikeWithTs],
                        pendingEdgeOpt: Option[Edge],
                        statusCode: Byte = 0,
                        lockTs: Option[Long]) extends JSONParser {

  // NOTE: use the Scala Map's own `contains`; the original `containsKey` only
  // compiled through the deprecated JavaConversions implicit.
  if (!props.contains(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.")

  val label = Label.findById(labelWithDir.labelId)
  val schemaVer = label.schemaVersion
  lazy val propsWithoutTs = props.mapValues(_.innerVal)
  // Safe: presence of timeStampSeq is enforced by the constructor check above.
  val ts = props(LabelMeta.timeStampSeq).innerVal.toString().toLong

  /** Converts this snapshot back into a mutable [[Edge]], preserving pending/lock state. */
  def toEdge: Edge =
    Edge(srcVertex, tgtVertex, labelWithDir, op,
      version, props, pendingEdgeOpt = pendingEdgeOpt,
      statusCode = statusCode, lockTs = lockTs)

  /** Human-readable property map (meta name -> JSON value), plus the snapshot version. */
  def propsWithName = (for {
    (seq, v) <- props
    meta <- label.metaPropsMap.get(seq)
    jsValue <- innerValToJsValue(v.innerVal, meta.dataType)
  } yield meta.name -> jsValue) ++ Map("version" -> JsNumber(version))

  // only for debug; same tab-separated layout as IndexEdge/Edge.toLogString
  def toLogString() = {
    List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, Json.toJson(propsWithName)).mkString("\t")
  }
}

/**
 * One row of a secondary index for an edge: property values ordered by the
 * label index's declared meta sequence, with the remaining properties as metas.
 *
 * Invariant: `props` must contain the timestamp property (`LabelMeta.timeStampSeq`).
 */
case class IndexEdge(srcVertex: Vertex,
                     tgtVertex: Vertex,
                     labelWithDir: LabelWithDirection,
                     op: Byte,
                     version: Long,
                     labelIndexSeq: Byte,
                     props: Map[Byte, InnerValLike]) extends JSONParser {

  if (!props.contains(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.")

  val ts = props(LabelMeta.timeStampSeq).toString.toLong
  // degree edges are synthetic counters, not real edges
  val degreeEdge = props.contains(LabelMeta.degreeSeq)
  lazy val label = Label.findById(labelWithDir.labelId)
  val schemaVer = label.schemaVersion
  lazy val labelIndex = LabelIndex.findByLabelIdAndSeq(labelWithDir.labelId, labelIndexSeq).get
  // default values for index keys that are absent from `props`
  lazy val defaultIndexMetas = labelIndex.sortKeyTypes.map { meta =>
    val innerVal = toInnerVal(meta.defaultValue, meta.dataType, schemaVer)
    meta.seq -> innerVal
  }.toMap

  lazy val labelIndexMetaSeqs = labelIndex.metaSeqs

  /** TODO: make sure call of this class fill props as this assumes */
  lazy val orders = for (k <- labelIndexMetaSeqs) yield {
    props.get(k) match {
      case None =>

        /**
         * TODO: ugly hack
         * now we double store target vertex.innerId/srcVertex.innerId for easy development. later fix this to only store id once
         */
        val v = k match {
          case LabelMeta.timeStampSeq => InnerVal.withLong(version, schemaVer)
          case LabelMeta.toSeq => tgtVertex.innerId
          case LabelMeta.fromSeq => //srcVertex.innerId
            // for now, it does not make sense to build index on srcVertex.innerId since all edges have same data.
            throw new RuntimeException("_from on indexProps is not supported")
          case _ => defaultIndexMetas(k)
        }

        k -> v
      case Some(v) => k -> v
    }
  }

  lazy val ordersKeyMap = orders.map { case (byte, _) => byte }.toSet
  // properties that are not part of the index key
  lazy val metas = for ((k, v) <- props if !ordersKeyMap.contains(k)) yield k -> v

  // all properties are stamped with this row's version when lifted to timestamped form
  lazy val propsWithTs = props.map { case (k, v) => k -> InnerValLikeWithTs(v, version) }

  //TODO:
  // lazy val kvs = Graph.client.indexedEdgeSerializer(this).toKeyValues.toList

  lazy val hasAllPropsForIndex = orders.length == labelIndexMetaSeqs.length

  /** Human-readable property map (meta name -> JSON value); negative seqs are internal. */
  def propsWithName = for {
    (seq, v) <- props
    meta <- label.metaPropsMap.get(seq) if seq >= 0
    jsValue <- innerValToJsValue(v, meta.dataType)
  } yield meta.name -> jsValue

  def toEdge: Edge = Edge(srcVertex, tgtVertex, labelWithDir, op, version, propsWithTs)

  // only for debug
  def toLogString() = {
    List(version, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, Json.toJson(propsWithName)).mkString("\t")
  }
}

/**
 * The primary edge model: source/target vertices, a directed label, an operation,
 * a version, and timestamped properties. Also carries traversal bookkeeping
 * (parentEdges) and strong-consistency state (pendingEdgeOpt, statusCode, lockTs).
 *
 * Invariant: `propsWithTs` must contain the timestamp property.
 */
case class Edge(srcVertex: Vertex,
                tgtVertex: Vertex,
                labelWithDir: LabelWithDirection,
                op: Byte = GraphUtil.defaultOpByte,
                version: Long = System.currentTimeMillis(),
                propsWithTs: Map[Byte, InnerValLikeWithTs],
                parentEdges: Seq[EdgeWithScore] = Nil,
                originalEdgeOpt: Option[Edge] = None,
                pendingEdgeOpt: Option[Edge] = None,
                statusCode: Byte = 0,
                lockTs: Option[Long] = None) extends GraphElement with JSONParser {

  if (!props.contains(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.")

  val schemaVer = label.schemaVersion
  val ts = propsWithTs(LabelMeta.timeStampSeq).innerVal.toString.toLong

  /** Property values without their timestamps. */
  def props = propsWithTs.mapValues(_.innerVal)

  /**
   * All physical edges this logical edge expands to: for a directed label,
   * itself plus its duplicate (reversed src/tgt and direction); for an
   * undirected label, both orientations stored with direction "out".
   */
  def relatedEdges = {
    if (labelWithDir.isDirected) List(this, duplicateEdge)
    else {
      val outDir = labelWithDir.copy(dir = GraphUtil.directions("out"))
      val base = copy(labelWithDir = outDir)
      List(base, base.reverseSrcTgtEdge)
    }
  }

  // def relatedEdges = List(this)

  /** The vertex that plays the "source" role once direction is normalized. */
  def srcForVertex = {
    val belongLabelIds = Seq(labelWithDir.labelId)
    if (labelWithDir.dir == GraphUtil.directions("in")) {
      Vertex(VertexId(label.tgtColumn.id.get, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
    } else {
      Vertex(VertexId(label.srcColumn.id.get, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
    }
  }

  /** The vertex that plays the "target" role once direction is normalized. */
  def tgtForVertex = {
    val belongLabelIds = Seq(labelWithDir.labelId)
    if (labelWithDir.dir == GraphUtil.directions("in")) {
      Vertex(VertexId(label.srcColumn.id.get, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds)
    } else {
      Vertex(VertexId(label.tgtColumn.id.get, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds)
    }
  }

  def duplicateEdge = reverseSrcTgtEdge.reverseDirEdge

  def reverseDirEdge = copy(labelWithDir = labelWithDir.dirToggled)

  def reverseSrcTgtEdge = copy(srcVertex = tgtVertex, tgtVertex = srcVertex)

  def label = Label.findById(labelWithDir.labelId)

  def labelOrders = LabelIndex.findByLabelIdAll(labelWithDir.labelId)

  override def serviceName = label.serviceName

  override def queueKey = Seq(ts.toString, tgtVertex.serviceName).mkString("|")

  override def queuePartitionKey = Seq(srcVertex.innerId, tgtVertex.innerId).mkString("|")

  override def isAsync = label.isAsync

  def isDegree = propsWithTs.contains(LabelMeta.degreeSeq)

  /** Properties with the timestamp property guaranteed to be present. */
  def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match {
    case Some(_) => props
    case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer))
  }

  // negative seqs are internal-only properties and excluded from user-visible indexes
  def propsPlusTsValid = propsPlusTs.filter(kv => kv._1 >= 0)

  /** One IndexEdge per label index, including internal properties. */
  def edgesWithIndex = for (labelOrder <- labelOrders) yield {
    IndexEdge(srcVertex, tgtVertex, labelWithDir, op, version, labelOrder.seq, propsPlusTs)
  }

  /** One IndexEdge per label index, excluding internal (negative-seq) properties. */
  def edgesWithIndexValid = for (labelOrder <- labelOrders) yield {
    IndexEdge(srcVertex, tgtVertex, labelWithDir, op, version, labelOrder.seq, propsPlusTsValid)
  }

  /** force direction as out on invertedEdge */
  def toSnapshotEdge: SnapshotEdge = {
    val (smaller, larger) = (srcForVertex, tgtForVertex)

    val newLabelWithDir = LabelWithDirection(labelWithDir.labelId, GraphUtil.directions("out"))

    val ret = SnapshotEdge(smaller, larger, newLabelWithDir, op, version,
      Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(ts, schemaVer), ts)) ++ propsWithTs,
      pendingEdgeOpt = pendingEdgeOpt, statusCode = statusCode, lockTs = lockTs)
    ret
  }

  // identity = (src, labelWithDir, tgt); must stay consistent with equals below
  override def hashCode(): Int = {
    MurmurHash3.stringHash(srcVertex.innerId + "," + labelWithDir + "," + tgtVertex.innerId)
  }

  override def equals(other: Any): Boolean = other match {
    case e: Edge =>
      srcVertex.innerId == e.srcVertex.innerId &&
        tgtVertex.innerId == e.tgtVertex.innerId &&
        labelWithDir == e.labelWithDir
    case _ => false
  }

  /** Human-readable property map (meta name -> JSON value); non-positive seqs are internal. */
  def propsWithName = for {
    (seq, v) <- props
    meta <- label.metaPropsMap.get(seq) if seq > 0
    jsValue <- innerValToJsValue(v, meta.dataType)
  } yield meta.name -> jsValue

  /** Copy of this edge pointing at a new target vertex id (same column). */
  def updateTgtVertex(id: InnerValLike) = {
    val newId = TargetVertexId(tgtVertex.id.colId, id)
    val newTgtVertex = Vertex(newId, tgtVertex.ts, tgtVertex.props)
    Edge(srcVertex, newTgtVertex, labelWithDir, op, version, propsWithTs)
  }

  /**
   * Weighted score of this edge for ranking: sum of weight * property value
   * over the rank parameters; `countSeq` contributes a flat 1 per occurrence.
   * Unparseable property values fall back to cost 1.0.
   */
  def rank(r: RankParam): Double =
    if (r.keySeqAndWeights.size <= 0) 1.0
    else {
      var sum: Double = 0

      for ((seq, w) <- r.keySeqAndWeights) {
        seq match {
          case LabelMeta.countSeq => sum += 1
          case _ => {
            propsWithTs.get(seq) match {
              case None => // do nothing
              case Some(innerValWithTs) => {
                val cost = try innerValWithTs.innerVal.toString.toDouble catch {
                  case e: Exception =>
                    logger.error("toInnerval failed in rank", e)
                    1.0
                }
                sum += w * cost
              }
            }
          }
        }
      }
      sum
    }

  def toLogString: String = {
    val ret =
      if (propsWithName.nonEmpty)
        List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, Json.toJson(propsWithName))
      else
        List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label)

    ret.mkString("\t")
  }
}

/**
 * The set of storage mutations produced for one logical edge operation:
 * index rows to delete, index rows to insert, and the new snapshot (if any).
 */
case class EdgeMutate(edgesToDelete: List[IndexEdge] = List.empty[IndexEdge],
                      edgesToInsert: List[IndexEdge] = List.empty[IndexEdge],
                      newSnapshotEdge: Option[SnapshotEdge] = None) {

  def toLogString: String = {
    val l = (0 until 50).map(_ => "-").mkString("")
    val deletes = s"deletes: ${edgesToDelete.map(e => e.toLogString).mkString("\n")}"
    val inserts = s"inserts: ${edgesToInsert.map(e => e.toLogString).mkString("\n")}"
    val updates = s"snapshot: ${newSnapshotEdge.map(e => e.toLogString).mkString("\n")}"

    List("\n", l, deletes, inserts, updates, l, "\n").mkString("\n")
  }
}

object Edge extends JSONParser {
  val incrementVersion = 1L
  val minTsVal = 0L

  /** now version information is required also **/
  type State = Map[Byte, InnerValLikeWithTs]
  type PropsPairWithTs = (State, State, Long, String)
  type MergeState = PropsPairWithTs => (State, Boolean)
  type UpdateFunc = (Option[Edge], Edge, MergeState)

  /**
   * True when a lastDeletedAt marker exists and every other property's
   * timestamp is at or before it — i.e. the whole edge is logically deleted.
   */
  def allPropsDeleted(props: Map[Byte, InnerValLikeWithTs]): Boolean =
    props.get(LabelMeta.lastDeletedAt) match {
      case None => false
      case Some(deletedAtVal) =>
        val propsWithoutLastDeletedAt = props - LabelMeta.lastDeletedAt
        propsWithoutLastDeletedAt.forall { case (_, v) => v.ts <= deletedAtVal.ts }
    }

  /** Builds the mutation for a bulk delete: drop all valid index rows, write a snapshot. */
  def buildDeleteBulk(invertedEdge: Option[Edge], requestEdge: Edge): (Edge, EdgeMutate) = {
    // assert(invertedEdge.isEmpty)
    // assert(requestEdge.op == GraphUtil.operations("delete"))

    val edgesToDelete = requestEdge.relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid }
    val edgeInverted = Option(requestEdge.toSnapshotEdge)

    (requestEdge, EdgeMutate(edgesToDelete, edgesToInsert = Nil, edgeInverted))
  }

  /**
   * Folds a batch of request edges over the current snapshot state, choosing a
   * merge function per edge op (and per label consistency level), and returns
   * the winning edge plus the mutation to apply.
   *
   * Edges whose ts equals the snapshot's ts are dropped; the rest are applied
   * in ascending ts order.
   */
  def buildOperation(invertedEdge: Option[Edge], requestEdges: Seq[Edge]): (Edge, EdgeMutate) = {
    val oldPropsWithTs =
      if (invertedEdge.isEmpty) Map.empty[Byte, InnerValLikeWithTs] else invertedEdge.get.propsWithTs

    val funcs = requestEdges.map { edge =>
      if (edge.op == GraphUtil.operations("insert")) {
        edge.label.consistencyLevel match {
          case "strong" => Edge.mergeUpsert _
          case _ => Edge.mergeInsertBulk _
        }
      } else if (edge.op == GraphUtil.operations("insertBulk")) {
        Edge.mergeInsertBulk _
      } else if (edge.op == GraphUtil.operations("delete")) {
        edge.label.consistencyLevel match {
          case "strong" => Edge.mergeDelete _
          case _ => throw new RuntimeException("not supported")
        }
      }
      else if (edge.op == GraphUtil.operations("update")) Edge.mergeUpdate _
      else if (edge.op == GraphUtil.operations("increment")) Edge.mergeIncrement _
      else throw new RuntimeException(s"not supported operation on edge: $edge")
    }

    val oldTs = invertedEdge.map(_.ts).getOrElse(minTsVal)
    val requestWithFuncs = requestEdges.zip(funcs).filter(oldTs != _._1.ts).sortBy(_._1.ts)

    if (requestWithFuncs.isEmpty) {
      (requestEdges.head, EdgeMutate())
    } else {
      val requestEdge = requestWithFuncs.last._1
      var prevPropsWithTs = oldPropsWithTs

      for {
        (requestEdge, func) <- requestWithFuncs
      } {
        val (_newPropsWithTs, _) = func(prevPropsWithTs, requestEdge.propsWithTs, requestEdge.ts, requestEdge.schemaVer)
        prevPropsWithTs = _newPropsWithTs
      }
      val requestTs = requestEdge.ts
      /** version should be monotoniously increasing so our RPC mutation should be applied safely */
      val newVersion = invertedEdge.map(e => e.version + incrementVersion).getOrElse(requestTs)
      val maxTs = prevPropsWithTs.map(_._2.ts).max
      val newTs = if (maxTs > requestTs) maxTs else requestTs
      val propsWithTs = prevPropsWithTs ++
        Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(newTs, requestEdge.label.schemaVersion), newTs))
      val edgeMutate = buildMutation(invertedEdge, requestEdge, newVersion, oldPropsWithTs, propsWithTs)

      (requestEdge, edgeMutate)
    }
  }

  /**
   * Compares old vs new property state and decides which index rows to delete,
   * which to insert, and whether a new snapshot must be written.
   */
  def buildMutation(snapshotEdgeOpt: Option[Edge],
                    requestEdge: Edge,
                    newVersion: Long,
                    oldPropsWithTs: Map[Byte, InnerValLikeWithTs],
                    newPropsWithTs: Map[Byte, InnerValLikeWithTs]): EdgeMutate = {
    if (oldPropsWithTs == newPropsWithTs) {
      // all requests should be dropped. so empty mutation.
      EdgeMutate(edgesToDelete = Nil, edgesToInsert = Nil, newSnapshotEdge = None)
    } else {
      val withOutDeletedAt = newPropsWithTs.filter(kv => kv._1 != LabelMeta.lastDeletedAt)
      // keep the old op if the snapshot is strictly newer than the request
      val newOp = snapshotEdgeOpt match {
        case None => requestEdge.op
        case Some(old) =>
          val oldMaxTs = old.propsWithTs.map(_._2.ts).max
          if (oldMaxTs > requestEdge.ts) old.op
          else requestEdge.op
      }

      val newSnapshotEdgeOpt =
        Option(requestEdge.copy(op = newOp, propsWithTs = newPropsWithTs, version = newVersion).toSnapshotEdge)
      // delete request must always update snapshot.
      if (withOutDeletedAt == oldPropsWithTs && newPropsWithTs.contains(LabelMeta.lastDeletedAt)) {
        // no mutation on indexEdges. only snapshotEdge should be updated to record lastDeletedAt.
        EdgeMutate(edgesToDelete = Nil, edgesToInsert = Nil, newSnapshotEdge = newSnapshotEdgeOpt)
      } else {
        val edgesToDelete = snapshotEdgeOpt match {
          case Some(snapshotEdge) if snapshotEdge.op != GraphUtil.operations("delete") =>
            snapshotEdge.copy(op = GraphUtil.defaultOpByte).
              relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid }
          case _ => Nil
        }

        val edgesToInsert =
          if (newPropsWithTs.isEmpty || allPropsDeleted(newPropsWithTs)) Nil
          else
            requestEdge.copy(version = newVersion, propsWithTs = newPropsWithTs, op = GraphUtil.defaultOpByte).
              relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid }

        EdgeMutate(edgesToDelete = edgesToDelete, edgesToInsert = edgesToInsert, newSnapshotEdge = newSnapshotEdgeOpt)
      }
    }
  }

  /**
   * Upsert merge: newer values win per property; old properties older than the
   * request ts are dropped (unless internal, seq < 0); new properties are added
   * when newer than lastDeletedAt. Returns (merged state, anything changed).
   */
  def mergeUpsert(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
    var shouldReplace = false
    val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs
    val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt).map(v => v.ts).getOrElse(minTsVal)
    val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
      propsWithTs.get(k) match {
        case Some(newValWithTs) =>
          assert(oldValWithTs.ts >= lastDeletedAt)
          val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs
          else {
            shouldReplace = true
            newValWithTs
          }
          Some(k -> v)

        case None =>
          assert(oldValWithTs.ts >= lastDeletedAt)
          if (oldValWithTs.ts >= requestTs || k < 0) Some(k -> oldValWithTs)
          else {
            shouldReplace = true
            None
          }
      }
    }
    val existInNew =
      for {
        (k, newValWithTs) <- propsWithTs if !oldPropsWithTs.contains(k) && newValWithTs.ts > lastDeletedAt
      } yield {
        shouldReplace = true
        Some(k -> newValWithTs)
      }

    ((existInOld.flatten ++ existInNew.flatten).toMap, shouldReplace)
  }

  /**
   * Update merge: like upsert, but old properties absent from the request are
   * always kept (update must preserve previous valid values).
   */
  def mergeUpdate(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
    var shouldReplace = false
    val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs
    val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt).map(v => v.ts).getOrElse(minTsVal)
    val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
      propsWithTs.get(k) match {
        case Some(newValWithTs) =>
          assert(oldValWithTs.ts >= lastDeletedAt)
          val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs
          else {
            shouldReplace = true
            newValWithTs
          }
          Some(k -> v)
        case None =>
          // important: update need to merge previous valid values.
          assert(oldValWithTs.ts >= lastDeletedAt)
          Some(k -> oldValWithTs)
      }
    }
    val existInNew = for {
      (k, newValWithTs) <- propsWithTs if !oldPropsWithTs.contains(k) && newValWithTs.ts > lastDeletedAt
    } yield {
      shouldReplace = true
      Some(k -> newValWithTs)
    }

    ((existInOld.flatten ++ existInNew.flatten).toMap, shouldReplace)
  }

  /**
   * Increment merge: numeric properties are summed (keeping the older ts);
   * the timestamp property itself takes the newer value.
   */
  def mergeIncrement(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
    var shouldReplace = false
    val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs
    val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt).map(v => v.ts).getOrElse(minTsVal)
    val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
      propsWithTs.get(k) match {
        case Some(newValWithTs) =>
          if (k == LabelMeta.timeStampSeq) {
            val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs
            else {
              shouldReplace = true
              newValWithTs
            }
            Some(k -> v)
          } else {
            if (oldValWithTs.ts >= newValWithTs.ts) {
              Some(k -> oldValWithTs)
            } else {
              assert(oldValWithTs.ts < newValWithTs.ts && oldValWithTs.ts >= lastDeletedAt)
              shouldReplace = true
              // incr(t0), incr(t2), d(t1) => deleted
              Some(k -> InnerValLikeWithTs(oldValWithTs.innerVal + newValWithTs.innerVal, oldValWithTs.ts))
            }
          }

        case None =>
          assert(oldValWithTs.ts >= lastDeletedAt)
          Some(k -> oldValWithTs)
        // if (oldValWithTs.ts >= lastDeletedAt) Some(k -> oldValWithTs) else None
      }
    }
    val existInNew = for {
      (k, newValWithTs) <- propsWithTs if !oldPropsWithTs.contains(k) && newValWithTs.ts > lastDeletedAt
    } yield {
      shouldReplace = true
      Some(k -> newValWithTs)
    }

    ((existInOld.flatten ++ existInNew.flatten).toMap, shouldReplace)
  }

  /**
   * Delete merge: advance lastDeletedAt to the request ts (if newer), refresh
   * the timestamp property, and drop all properties older than lastDeletedAt.
   */
  def mergeDelete(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
    var shouldReplace = false
    val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs
    val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt) match {
      case Some(prevDeletedAt) =>
        if (prevDeletedAt.ts >= requestTs) prevDeletedAt.ts
        else {
          shouldReplace = true
          requestTs
        }
      case None => {
        shouldReplace = true
        requestTs
      }
    }
    val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield {
      if (k == LabelMeta.timeStampSeq) {
        if (oldValWithTs.ts >= requestTs) Some(k -> oldValWithTs)
        else {
          shouldReplace = true
          Some(k -> InnerValLikeWithTs.withLong(requestTs, requestTs, version))
        }
      } else {
        if (oldValWithTs.ts >= lastDeletedAt) Some(k -> oldValWithTs)
        else {
          shouldReplace = true
          None
        }
      }
    }
    val mustExistInNew = Map(LabelMeta.lastDeletedAt -> InnerValLikeWithTs.withLong(lastDeletedAt, lastDeletedAt, version))
    ((existInOld.flatten ++ mustExistInNew).toMap, shouldReplace)
  }

  /** Bulk insert merge: the request state simply replaces the old state. */
  def mergeInsertBulk(propsPairWithTs: PropsPairWithTs): (State, Boolean) = {
    val (_, propsWithTs, _, _) = propsPairWithTs
    (propsWithTs, true)
  }

  def fromString(s: String): Option[Edge] = Graph.toEdge(s)

}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala new file mode 100644 index 0000000..d3177b8 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/ExceptionHandler.scala @@ -0,0 +1,159 @@
package org.apache.s2graph.core

import java.util.Properties
import java.util.concurrent.atomic.AtomicLong

import akka.actor._
import akka.routing.{Broadcast, RoundRobinPool}
import com.typesafe.config.Config
import org.apache.kafka.clients.producer._

import scala.concurrent.duration._
import scala.util.control.NonFatal

/**
 * Routes failed mutations to a Kafka topic via a small pool of actors.
 * Initialized once with `apply(config)`; if no broker list is configured
 * (or producer creation fails) all enqueues become no-ops.
 */
object ExceptionHandler {

  var producer: Option[Producer[Key, Val]] = None
  var properties: Option[Properties] = None
  val numOfRoutees = 1
  val actorSystem = ActorSystem("ExceptionHandler")
  var routees: Option[ActorRef] = None
  // dotted form instead of postfix `1000 millis` (postfix ops need a language import)
  var shutdownTime = 1000.millis
  var phase = "dev"
  lazy val failTopic = s"mutateFailed_${phase}"

  /** Reads kafka/phase settings from config and (best-effort) builds the producer. */
  def apply(config: Config) = {
    properties =
      if (config.hasPath("kafka.metadata.broker.list")) Option(kafkaConfig(config))
      else None
    phase = if (config.hasPath("phase")) config.getString("phase") else "dev"
    producer = for {
      props <- properties
      p <- try {
        Option(new KafkaProducer[Key, Val](props))
      } catch {
        // best-effort: run without a producer, but never swallow fatal errors
        case NonFatal(e) => None
      }
    } yield {
      p
    }
    init()
  }

  def props(producer: Producer[Key, Val]) = Props(classOf[KafkaAggregatorActor], producer)

  /** Spins up the routee pool; no-op when no producer is available. */
  def init() = {
    for {
      p <- producer
    } {
      routees = Option(actorSystem.actorOf(RoundRobinPool(numOfRoutees).props(props(p))))
    }
  }

  /** Broadcasts PoisonPill to routees, then waits briefly for in-flight sends. */
  def shutdown() = {
    routees.foreach(_ ! Broadcast(PoisonPill))
    Thread.sleep(shutdownTime.length)
  }

  def enqueues(msgs: Seq[KafkaMessage]) = {
    msgs.foreach(enqueue)
  }

  /** Fire-and-forget: silently dropped when routees were never initialized. */
  def enqueue(msg: KafkaMessage) = {
    routees.foreach(_ ! msg)
  }

  /** Builds new-producer-API settings; broker list defaults to localhost. */
  def kafkaConfig(config: Config) = {
    val props = new Properties()

    /** all default configuration for new producer */
    val brokers =
      if (config.hasPath("kafka.metadata.broker.list")) config.getString("kafka.metadata.broker.list")
      else "localhost"
    props.put("bootstrap.servers", brokers)
    props.put("acks", "1")
    props.put("buffer.memory", "33554432")
    props.put("compression.type", "snappy")
    props.put("retries", "0")
    props.put("batch.size", "16384")
    props.put("linger.ms", "0")
    props.put("max.request.size", "1048576")
    props.put("receive.buffer.bytes", "32768")
    props.put("send.buffer.bytes", "131072")
    props.put("timeout.ms", "30000")
    props.put("block.on.buffer.full", "false")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props
  }

  type Key = String
  type Val = String

  /** Wraps a graph element into a Kafka record keyed by its partition key. */
  def toKafkaMessage(topic: String = failTopic, element: GraphElement, originalString: Option[String] = None) = {
    KafkaMessage(new ProducerRecord[Key, Val](topic, element.queuePartitionKey,
      originalString.getOrElse(element.toLogString())))
  }

  case class KafkaMessage(msg: ProducerRecord[Key, Val])

  case class Message(topic: String, msg: String)

  case class BufferedKafkaMessage(msgs: Seq[ProducerRecord[Key, Val]], bufferSize: Int)

  case class BufferedMessage(topic: String, bufferedMsgs: String, bufferSize: Int)

  case object FlushBuffer

  case class UpdateHealth(isHealty: Boolean)

  case object ShowMetrics

}

/**
 * Actor that forwards KafkaMessages to the producer. On async send failure the
 * message is re-sent to self; on synchronous failure (e.g. exhausted buffer)
 * the message is stashed and replayed after the next successful send.
 * Emits a metrics log line every 10 seconds.
 */
class KafkaAggregatorActor(kafkaProducer: Producer[String, String]) extends Stash with ActorLogging {

  import ExceptionHandler._

  val failedCount = new AtomicLong(0L)
  val successCount = new AtomicLong(0L)
  val stashCount = new AtomicLong(0L)

  implicit val ex = context.system.dispatcher

  context.system.scheduler.schedule(0.millis, 10.seconds) {
    self ! ShowMetrics
  }

  override def receive = {
    case ShowMetrics =>
      log.info(s"[Stats]: failed[${failedCount.get}], stashed[${stashCount.get}], success[${successCount.get}]")

    case m: KafkaMessage =>
      // capture self: the callback runs on a kafka thread where `self` is unavailable
      val replayTo = self
      try {
        kafkaProducer.send(m.msg, new Callback() {
          override def onCompletion(meta: RecordMetadata, e: Exception) = {
            if (e == null) {
              // success: replay anything stashed during the outage
              successCount.incrementAndGet()
              unstashAll()
              stashCount.set(0L)
            } else {
              // failure: retry by re-sending to self
              log.error(s"onCompletion: $e", e)
              failedCount.incrementAndGet()
              replayTo ! m
            }
          }
        })
      } catch {
        // NonFatal instead of the original `_: BufferExhaustedException | _: Throwable`,
        // which redundantly matched everything and swallowed fatal errors
        case NonFatal(e) =>
          log.error(s"$e", e)
          log.info(s"stash")
          stash()
          stashCount.incrementAndGet()
      }

  }
} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala new file mode 100644 index 0000000..b2cbd19 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala @@ -0,0 +1,379 @@
package org.apache.s2graph.core

import java.util
import java.util.concurrent.ConcurrentHashMap

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.s2graph.core.mysqls.{Label, Model}
import org.apache.s2graph.core.parsers.WhereParser
import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage
import org.apache.s2graph.core.types.{InnerVal, LabelWithDirection}
import org.apache.s2graph.core.utils.logger

import scala.collection.JavaConversions._
import scala.collection._
import scala.collection.mutable.ListBuffer
import scala.concurrent._
import
scala.util.Try + +object Graph { + val DefaultScore = 1.0 + + private val DefaultConfigs: Map[String, AnyRef] = Map( + "hbase.zookeeper.quorum" -> "localhost", + "hbase.table.name" -> "s2graph", + "hbase.table.compression.algorithm" -> "gz", + "phase" -> "dev", + "db.default.driver" -> "com.mysql.jdbc.Driver", + "db.default.url" -> "jdbc:mysql://localhost:3306/graph_dev", + "db.default.password" -> "graph", + "db.default.user" -> "graph", + "cache.max.size" -> java.lang.Integer.valueOf(10000), + "cache.ttl.seconds" -> java.lang.Integer.valueOf(60), + "hbase.client.retries.number" -> java.lang.Integer.valueOf(20), + "hbase.rpcs.buffered_flush_interval" -> java.lang.Short.valueOf(100.toShort), + "hbase.rpc.timeout" -> java.lang.Integer.valueOf(1000), + "max.retry.number" -> java.lang.Integer.valueOf(100), + "lock.expire.time" -> java.lang.Integer.valueOf(1000 * 60 * 10), + "max.back.off" -> java.lang.Integer.valueOf(100), + "hbase.fail.prob" -> java.lang.Double.valueOf(-0.1), + "delete.all.fetch.size" -> java.lang.Integer.valueOf(1000), + "future.cache.max.size" -> java.lang.Integer.valueOf(100000), + "future.cache.expire.after.write" -> java.lang.Integer.valueOf(10000), + "future.cache.expire.after.access" -> java.lang.Integer.valueOf(5000), + "s2graph.storage.backend" -> "hbase" + ) + + var DefaultConfig: Config = ConfigFactory.parseMap(DefaultConfigs) + + /** helpers for filterEdges */ + type HashKey = (Int, Int, Int, Int, Boolean) + type FilterHashKey = (Int, Int) + type Result = (ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]], + ConcurrentHashMap[HashKey, (FilterHashKey, Edge, Double)], + ListBuffer[(HashKey, FilterHashKey, Edge, Double)]) + + def toHashKey(queryParam: QueryParam, edge: Edge, isDegree: Boolean): (HashKey, FilterHashKey) = { + val src = edge.srcVertex.innerId.hashCode() + val tgt = edge.tgtVertex.innerId.hashCode() + val hashKey = (src, edge.labelWithDir.labelId, edge.labelWithDir.dir, tgt, isDegree) + val filterHashKey = (src, tgt) + + 
(hashKey, filterHashKey) + } + + def alreadyVisitedVertices(queryResultLs: Seq[QueryResult]): Map[(LabelWithDirection, Vertex), Boolean] = { + val vertices = for { + queryResult <- queryResultLs + edgeWithScore <- queryResult.edgeWithScoreLs + edge = edgeWithScore.edge + vertex = if (edge.labelWithDir.dir == GraphUtil.directions("out")) edge.tgtVertex else edge.srcVertex + } yield (edge.labelWithDir, vertex) -> true + + vertices.toMap + } + + /** common methods for filter out, transform, aggregate queryResult */ + def convertEdges(queryParam: QueryParam, edge: Edge, nextStepOpt: Option[Step]): Seq[Edge] = { + for { + convertedEdge <- queryParam.transformer.transform(edge, nextStepOpt) if !edge.isDegree + } yield convertedEdge + } + + def processTimeDecay(queryParam: QueryParam, edge: Edge) = { + /** process time decay */ + val tsVal = queryParam.timeDecay match { + case None => 1.0 + case Some(timeDecay) => + val tsVal = try { + val labelMeta = edge.label.metaPropsMap(timeDecay.labelMetaSeq) + val innerValWithTsOpt = edge.propsWithTs.get(timeDecay.labelMetaSeq) + innerValWithTsOpt.map { innerValWithTs => + val innerVal = innerValWithTs.innerVal + labelMeta.dataType match { + case InnerVal.LONG => innerVal.value match { + case n: BigDecimal => n.bigDecimal.longValue() + case _ => innerVal.toString().toLong + } + case _ => innerVal.toString().toLong + } + } getOrElse(edge.ts) + } catch { + case e: Exception => + logger.error(s"processTimeDecay error. 
${edge.toLogString}", e) + edge.ts + } + val timeDiff = queryParam.timestamp - tsVal + timeDecay.decay(timeDiff) + } + + tsVal + } + + def aggregateScore(newScore: Double, + resultEdges: ConcurrentHashMap[HashKey, (FilterHashKey, Edge, Double)], + duplicateEdges: ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]], + edgeWithScoreSorted: ListBuffer[(HashKey, FilterHashKey, Edge, Double)], + hashKey: HashKey, + filterHashKey: FilterHashKey, + queryParam: QueryParam, + convertedEdge: Edge) = { + + /** skip duplicate policy check if consistencyLevel is strong */ + if (queryParam.label.consistencyLevel != "strong" && resultEdges.containsKey(hashKey)) { + val (oldFilterHashKey, oldEdge, oldScore) = resultEdges.get(hashKey) + //TODO: + queryParam.duplicatePolicy match { + case Query.DuplicatePolicy.First => // do nothing + case Query.DuplicatePolicy.Raw => + if (duplicateEdges.containsKey(hashKey)) { + duplicateEdges.get(hashKey).append(convertedEdge -> newScore) + } else { + val newBuffer = new ListBuffer[(Edge, Double)] + newBuffer.append(convertedEdge -> newScore) + duplicateEdges.put(hashKey, newBuffer) + } + case Query.DuplicatePolicy.CountSum => + resultEdges.put(hashKey, (filterHashKey, oldEdge, oldScore + 1)) + case _ => + resultEdges.put(hashKey, (filterHashKey, oldEdge, oldScore + newScore)) + } + } else { + resultEdges.put(hashKey, (filterHashKey, convertedEdge, newScore)) + edgeWithScoreSorted.append((hashKey, filterHashKey, convertedEdge, newScore)) + } + } + + def aggregateResults(queryRequestWithResult: QueryRequestWithResult, + queryParamResult: Result, + edgesToInclude: util.HashSet[FilterHashKey], + edgesToExclude: util.HashSet[FilterHashKey]): QueryRequestWithResult = { + val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get + val (query, stepIdx, _, queryParam) = QueryRequest.unapply(queryRequest).get + + val (duplicateEdges, resultEdges, edgeWithScoreSorted) = queryParamResult + val edgesWithScores = for { + 
(hashKey, filterHashKey, edge, _) <- edgeWithScoreSorted if !edgesToExclude.contains(filterHashKey) || edgesToInclude.contains(filterHashKey) + score = resultEdges.get(hashKey)._3 + (duplicateEdge, aggregatedScore) <- fetchDuplicatedEdges(edge, score, hashKey, duplicateEdges) if aggregatedScore >= queryParam.threshold + } yield EdgeWithScore(duplicateEdge, aggregatedScore) + + QueryRequestWithResult(queryRequest, QueryResult(edgesWithScores)) + } + + def fetchDuplicatedEdges(edge: Edge, + score: Double, + hashKey: HashKey, + duplicateEdges: ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]]) = { + (edge -> score) +: (if (duplicateEdges.containsKey(hashKey)) duplicateEdges.get(hashKey) else Seq.empty) + } + + def queryResultWithFilter(queryRequestWithResult: QueryRequestWithResult) = { + val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get + val (_, _, _, queryParam) = QueryRequest.unapply(queryRequest).get + val whereFilter = queryParam.where.get + if (whereFilter == WhereParser.success) queryResult.edgeWithScoreLs + else queryResult.edgeWithScoreLs.withFilter(edgeWithScore => whereFilter.filter(edgeWithScore.edge)) + } + + def filterEdges(queryResultLsFuture: Future[Seq[QueryRequestWithResult]], + alreadyVisited: Map[(LabelWithDirection, Vertex), Boolean] = Map.empty[(LabelWithDirection, Vertex), Boolean]) + (implicit ec: scala.concurrent.ExecutionContext): Future[Seq[QueryRequestWithResult]] = { + + queryResultLsFuture.map { queryRequestWithResultLs => + if (queryRequestWithResultLs.isEmpty) Nil + else { + val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResultLs.head).get + val (q, stepIdx, srcVertex, queryParam) = QueryRequest.unapply(queryRequest).get + val step = q.steps(stepIdx) + + val nextStepOpt = if (stepIdx < q.steps.size - 1) Option(q.steps(stepIdx + 1)) else None + + val excludeLabelWithDirSet = new util.HashSet[(Int, Int)] + val includeLabelWithDirSet = new util.HashSet[(Int, 
Int)] + step.queryParams.filter(_.exclude).foreach(l => excludeLabelWithDirSet.add(l.labelWithDir.labelId -> l.labelWithDir.dir)) + step.queryParams.filter(_.include).foreach(l => includeLabelWithDirSet.add(l.labelWithDir.labelId -> l.labelWithDir.dir)) + + val edgesToExclude = new util.HashSet[FilterHashKey]() + val edgesToInclude = new util.HashSet[FilterHashKey]() + + val queryParamResultLs = new ListBuffer[Result] + queryRequestWithResultLs.foreach { queryRequestWithResult => + val (queryRequest, queryResult) = QueryRequestWithResult.unapply(queryRequestWithResult).get + val queryParam = queryRequest.queryParam + val duplicateEdges = new util.concurrent.ConcurrentHashMap[HashKey, ListBuffer[(Edge, Double)]]() + val resultEdges = new util.concurrent.ConcurrentHashMap[HashKey, (FilterHashKey, Edge, Double)]() + val edgeWithScoreSorted = new ListBuffer[(HashKey, FilterHashKey, Edge, Double)] + val labelWeight = step.labelWeights.getOrElse(queryParam.labelWithDir.labelId, 1.0) + + // store degree value with Array.empty so if degree edge exist, it comes at very first. + def checkDegree() = queryResult.edgeWithScoreLs.headOption.exists { edgeWithScore => + edgeWithScore.edge.isDegree + } + var isDegree = checkDegree() + + val includeExcludeKey = queryParam.labelWithDir.labelId -> queryParam.labelWithDir.dir + val shouldBeExcluded = excludeLabelWithDirSet.contains(includeExcludeKey) + val shouldBeIncluded = includeLabelWithDirSet.contains(includeExcludeKey) + + queryResultWithFilter(queryRequestWithResult).foreach { edgeWithScore => + val (edge, score) = EdgeWithScore.unapply(edgeWithScore).get + if (queryParam.transformer.isDefault) { + val convertedEdge = edge + + val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree) + + /** check if this edge should be exlcuded. 
*/ + if (shouldBeExcluded && !isDegree) { + edgesToExclude.add(filterHashKey) + } else { + if (shouldBeIncluded && !isDegree) { + edgesToInclude.add(filterHashKey) + } + val tsVal = processTimeDecay(queryParam, convertedEdge) + val newScore = labelWeight * score * tsVal + aggregateScore(newScore, resultEdges, duplicateEdges, edgeWithScoreSorted, hashKey, filterHashKey, queryParam, convertedEdge) + } + } else { + convertEdges(queryParam, edge, nextStepOpt).foreach { convertedEdge => + val (hashKey, filterHashKey) = toHashKey(queryParam, convertedEdge, isDegree) + + /** check if this edge should be exlcuded. */ + if (shouldBeExcluded && !isDegree) { + edgesToExclude.add(filterHashKey) + } else { + if (shouldBeIncluded && !isDegree) { + edgesToInclude.add(filterHashKey) + } + val tsVal = processTimeDecay(queryParam, convertedEdge) + val newScore = labelWeight * score * tsVal + aggregateScore(newScore, resultEdges, duplicateEdges, edgeWithScoreSorted, hashKey, filterHashKey, queryParam, convertedEdge) + } + } + } + isDegree = false + } + val ret = (duplicateEdges, resultEdges, edgeWithScoreSorted) + queryParamResultLs.append(ret) + } + + val aggregatedResults = for { + (queryRequestWithResult, queryParamResult) <- queryRequestWithResultLs.zip(queryParamResultLs) + } yield { + aggregateResults(queryRequestWithResult, queryParamResult, edgesToInclude, edgesToExclude) + } + + aggregatedResults + } + } + } + + def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] = Try { + val parts = GraphUtil.split(s) + val logType = parts(2) + val element = if (logType == "edge" | logType == "e") { + /** current only edge is considered to be bulk loaded */ + labelMapping.get(parts(5)) match { + case None => + case Some(toReplace) => + parts(5) = toReplace + } + toEdge(parts) + } else if (logType == "vertex" | logType == "v") { + toVertex(parts) + } else { + throw new GraphExceptions.JsonParseException("log type is not exist in log.") + } + + 
element + } recover { + case e: Exception => + logger.error(s"$e", e) + None + } get + + + def toVertex(s: String): Option[Vertex] = { + toVertex(GraphUtil.split(s)) + } + + def toEdge(s: String): Option[Edge] = { + toEdge(GraphUtil.split(s)) + } + + //"1418342849000\tu\te\t3286249\t71770\ttalk_friend\t{\"is_hidden\":false}" + //{"from":1,"to":101,"label":"graph_test","props":{"time":-1, "weight":10},"timestamp":1417616431}, + def toEdge(parts: Array[String]): Option[Edge] = Try { + val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5)) + val props = if (parts.length >= 7) parts(6) else "{}" + val tempDirection = if (parts.length >= 8) parts(7) else "out" + val direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection + + val edge = Management.toEdge(ts.toLong, operation, srcId, tgtId, label, direction, props) + // logger.debug(s"toEdge: $edge") + Some(edge) + } recover { + case e: Exception => + logger.error(s"toEdge: $e", e) + throw e + } get + + //"1418342850000\ti\tv\t168756793\ttalk_user_id\t{\"country_iso\":\"KR\"}" + def toVertex(parts: Array[String]): Option[Vertex] = Try { + val (ts, operation, logType, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5)) + val props = if (parts.length >= 7) parts(6) else "{}" + Some(Management.toVertex(ts.toLong, operation, srcId, serviceName, colName, props)) + } recover { + case e: Throwable => + logger.error(s"toVertex: $e", e) + throw e + } get + + def initStorage(config: Config)(ec: ExecutionContext) = { + config.getString("s2graph.storage.backend") match { + case "hbase" => new AsynchbaseStorage(config)(ec) + case _ => throw new RuntimeException("not supported storage.") + } + } +} + +class Graph(_config: Config)(implicit val ec: ExecutionContext) { + val config = _config.withFallback(Graph.DefaultConfig) + + Model.apply(config) + Model.loadCache() + + // TODO: Make storage client by 
config param + val storage = Graph.initStorage(config)(ec) + + + for { + entry <- config.entrySet() if Graph.DefaultConfigs.contains(entry.getKey) + (k, v) = (entry.getKey, entry.getValue) + } logger.info(s"[Initialized]: $k, ${this.config.getAnyRef(k)}") + + /** select */ + def checkEdges(params: Seq[(Vertex, Vertex, QueryParam)]): Future[Seq[QueryRequestWithResult]] = storage.checkEdges(params) + + def getEdges(q: Query): Future[Seq[QueryRequestWithResult]] = storage.getEdges(q) + + def getVertices(vertices: Seq[Vertex]): Future[Seq[Vertex]] = storage.getVertices(vertices) + + /** write */ + def deleteAllAdjacentEdges(srcVertices: List[Vertex], labels: Seq[Label], dir: Int, ts: Long): Future[Boolean] = + storage.deleteAllAdjacentEdges(srcVertices, labels, dir, ts) + + def mutateElements(elements: Seq[GraphElement], withWait: Boolean = false): Future[Seq[Boolean]] = + storage.mutateElements(elements, withWait) + + def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): Future[Seq[Boolean]] = storage.mutateEdges(edges, withWait) + + def mutateVertices(vertices: Seq[Vertex], withWait: Boolean = false): Future[Seq[Boolean]] = storage.mutateVertices(vertices, withWait) + + def incrementCounts(edges: Seq[Edge], withWait: Boolean): Future[Seq[(Boolean, Long)]] = storage.incrementCounts(edges, withWait) + + def shutdown(): Unit = { + storage.flush() + Model.shutdown() + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/GraphElement.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/GraphElement.scala b/s2core/src/main/scala/org/apache/s2graph/core/GraphElement.scala new file mode 100644 index 0000000..12bb941 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphElement.scala @@ -0,0 +1,10 @@ +package org.apache.s2graph.core + +trait GraphElement { + def serviceName: String + def ts: Long + def 
isAsync: Boolean + def queueKey: String + def queuePartitionKey: String + def toLogString(): String +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala b/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala new file mode 100644 index 0000000..e7d2d76 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphExceptions.scala @@ -0,0 +1,28 @@ +package org.apache.s2graph.core + +object GraphExceptions { + + case class JsonParseException(msg: String) extends Exception(msg) + + case class LabelNotExistException(msg: String) extends Exception(msg) + + case class ModelNotFoundException(msg: String) extends Exception(msg) + + case class MaxPropSizeReachedException(msg: String) extends Exception(msg) + + case class LabelAlreadyExistException(msg: String) extends Exception(msg) + + case class InternalException(msg: String) extends Exception(msg) + + case class IllegalDataTypeException(msg: String) extends Exception(msg) + + case class WhereParserException(msg: String, ex: Exception = null) extends Exception(msg, ex) + + case class BadQueryException(msg: String, ex: Throwable = null) extends Exception(msg, ex) + + case class InvalidHTableException(msg: String) extends Exception(msg) + + case class FetchTimeoutException(msg: String) extends Exception(msg) + + case class DropRequestException(msg: String) extends Exception(msg) +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala b/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala new file mode 100644 index 0000000..1a2b916 --- 
/dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphUtil.scala @@ -0,0 +1,139 @@ +package org.apache.s2graph.core + +import java.util.regex.Pattern + +import play.api.libs.json.Json + +import scala.util.hashing.MurmurHash3 + +object GraphUtil { + private val TOKEN_DELIMITER = Pattern.compile("[\t]") + val operations = Map("i" -> 0, "insert" -> 0, "u" -> 1, "update" -> 1, + "increment" -> 2, "d" -> 3, "delete" -> 3, + "deleteAll" -> 4, "insertBulk" -> 5, "incrementCount" -> 6).map { + case (k, v) => + k -> v.toByte + } + val BitsForMurMurHash = 16 + val bytesForMurMurHash = 2 + val defaultOpByte = operations("insert") + val directions = Map("out" -> 0, "in" -> 1, "undirected" -> 2, "u" -> 2, "directed" -> 0, "d" -> 0) + val consistencyLevel = Map("weak" -> 0, "strong" -> 1) + + def toType(t: String) = { + t.trim().toLowerCase match { + case "e" | "edge" => "edge" + case "v" | "vertex" => "vertex" + } + } + + def toDir(direction: String): Option[Byte] = { + val d = direction.trim().toLowerCase match { + case "directed" | "d" => Some(0) + case "undirected" | "u" => Some(2) + case "out" => Some(0) + case "in" => Some(1) + case _ => None + } + d.map(x => x.toByte) + } + + def toDirection(direction: String): Int = { + direction.trim().toLowerCase match { + case "directed" | "d" => 0 + case "undirected" | "u" => 2 + case "out" => 0 + case "in" => 1 + case _ => 2 + } + } + + def fromDirection(direction: Int) = { + direction match { + case 0 => "out" + case 1 => "in" + case 2 => "undirected" + } + } + + def toggleDir(dir: Int) = { + dir match { + case 0 => 1 + case 1 => 0 + case 2 => 2 + case _ => throw new UnsupportedOperationException(s"toggleDirection: $dir") + } + } + + def toOp(op: String): Option[Byte] = { + op.trim() match { + case "i" | "insert" => Some(0) + case "d" | "delete" => Some(3) + case "u" | "update" => Some(1) + case "increment" => Some(2) + case "deleteAll" => Some(4) + case "insertBulk" => Some(5) + case "incrementCount" => Option(6) + 
case _ => None + } + } + + def fromOp(op: Byte): String = { + op match { + case 0 => "insert" + case 3 => "delete" + case 1 => "update" + case 2 => "increment" + case 4 => "deleteAll" + case 5 => "insertBulk" + case 6 => "incrementCount" + case _ => + throw new UnsupportedOperationException(s"op : $op (only support 0(insert),1(delete),2(updaet),3(increment))") + } + } + + // def toggleOp(op: Byte): Byte = { + // val ret = op match { + // case 0 => 1 + // case 1 => 0 + // case x: Byte => x + // } + // ret.toByte + // } + // 2^31 - 1 + + def transformHash(h: Int): Int = { + // h / 2 + (Int.MaxValue / 2 - 1) + if (h < 0) -1 * (h + 1) else h + } + def murmur3Int(s: String): Int = { + val hash = MurmurHash3.stringHash(s) + transformHash(hash) + } + def murmur3(s: String): Short = { + val hash = MurmurHash3.stringHash(s) + val positiveHash = transformHash(hash) >> BitsForMurMurHash + positiveHash.toShort +// Random.nextInt(Short.MaxValue).toShort + } + + def smartSplit(s: String, delemiter: String) = { + val trimed_string = s.trim() + if (trimed_string.equals("")) { + Seq[String]() + } else { + trimed_string.split(delemiter).toList + } + } + + def split(line: String) = TOKEN_DELIMITER.split(line) + + def parseString(s: String): List[String] = { + if (!s.startsWith("[")) { + s.split("\n").toList + } else { + Json.parse(s).asOpt[List[String]].getOrElse(List.empty[String]) + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala new file mode 100644 index 0000000..8b9a228 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/JSONParser.scala @@ -0,0 +1,134 @@ +package org.apache.s2graph.core + +import org.apache.s2graph.core.types.{InnerVal, InnerValLike} +import 
org.apache.s2graph.core.utils.logger +import play.api.libs.json._ + + +trait JSONParser { + + //TODO: check result notation on bigDecimal. + def innerValToJsValue(innerVal: InnerValLike, dataType: String): Option[JsValue] = { + try { + val dType = InnerVal.toInnerDataType(dataType) + val jsValue = dType match { + case InnerVal.STRING => JsString(innerVal.value.asInstanceOf[String]) + case InnerVal.BOOLEAN => JsBoolean(innerVal.value.asInstanceOf[Boolean]) + case InnerVal.BYTE | InnerVal.SHORT | InnerVal.INT | InnerVal.LONG | InnerVal.FLOAT | InnerVal.DOUBLE => + // case t if InnerVal.NUMERICS.contains(t) => + innerVal.value match { + case l: Long => JsNumber(l) + case i: Int => JsNumber(i) + case s: Short => JsNumber(s.toLong) + case b: Byte => JsNumber(b.toLong) + case f: Float => JsNumber(f.toDouble) + case d: Double => + // JsNumber(d) + dType match { + case InnerVal.BYTE => JsNumber(d.toInt) + case InnerVal.SHORT => JsNumber(d.toInt) + case InnerVal.INT => JsNumber(d.toInt) + case InnerVal.LONG => JsNumber(d.toLong) + case InnerVal.FLOAT => JsNumber(d.toDouble) + case InnerVal.DOUBLE => JsNumber(d.toDouble) + case _ => throw new RuntimeException(s"$innerVal, $dType => $dataType") + } + case num: BigDecimal => + // JsNumber(num) + // JsNumber(InnerVal.scaleNumber(num.asInstanceOf[BigDecimal], dType)) + dType match { + case InnerVal.BYTE => JsNumber(num.toInt) + case InnerVal.SHORT => JsNumber(num.toInt) + case InnerVal.INT => JsNumber(num.toInt) + case InnerVal.LONG => JsNumber(num.toLong) + case InnerVal.FLOAT => JsNumber(num.toDouble) + case InnerVal.DOUBLE => JsNumber(num.toDouble) + case _ => throw new RuntimeException(s"$innerVal, $dType => $dataType") + } + // JsNumber(num.toLong) + case _ => throw new RuntimeException(s"$innerVal, Numeric Unknown => $dataType") + } + // JsNumber(InnerVal.scaleNumber(innerVal.asInstanceOf[BigDecimal], dType)) + case _ => throw new RuntimeException(s"$innerVal, Unknown => $dataType") + } + Some(jsValue) + } catch { + case 
e: Exception => + logger.info(s"JSONParser.innerValToJsValue: $e") + None + } + } + + // def innerValToString(innerVal: InnerValLike, dataType: String): String = { + // val dType = InnerVal.toInnerDataType(dataType) + // InnerVal.toInnerDataType(dType) match { + // case InnerVal.STRING => innerVal.toString + // case InnerVal.BOOLEAN => innerVal.toString + // // case t if InnerVal.NUMERICS.contains(t) => + // case InnerVal.BYTE | InnerVal.SHORT | InnerVal.INT | InnerVal.LONG | InnerVal.FLOAT | InnerVal.DOUBLE => + // BigDecimal(innerVal.toString).bigDecimal.toPlainString + // case _ => innerVal.toString + // // throw new RuntimeException("innerVal to jsValue failed.") + // } + // } + + def toInnerVal(str: String, dataType: String, version: String): InnerValLike = { + //TODO: + // logger.error(s"toInnerVal: $str, $dataType, $version") + val s = + if (str.startsWith("\"") && str.endsWith("\"")) str.substring(1, str.length - 1) + else str + val dType = InnerVal.toInnerDataType(dataType) + + dType match { + case InnerVal.STRING => InnerVal.withStr(s, version) + // case t if InnerVal.NUMERICS.contains(t) => InnerVal.withNumber(BigDecimal(s), version) + case InnerVal.BYTE | InnerVal.SHORT | InnerVal.INT | InnerVal.LONG | InnerVal.FLOAT | InnerVal.DOUBLE => + InnerVal.withNumber(BigDecimal(s), version) + case InnerVal.BOOLEAN => InnerVal.withBoolean(s.toBoolean, version) + case InnerVal.BLOB => InnerVal.withBlob(s.getBytes, version) + case _ => + // InnerVal.withStr("") + throw new RuntimeException(s"illegal datatype for string: dataType is $dataType for $s") + } + } + + def jsValueToInnerVal(jsValue: JsValue, dataType: String, version: String): Option[InnerValLike] = { + val ret = try { + val dType = InnerVal.toInnerDataType(dataType.toLowerCase()) + jsValue match { + case n: JsNumber => + dType match { + case InnerVal.STRING => Some(InnerVal.withStr(jsValue.toString, version)) + // case t if InnerVal.NUMERICS.contains(t) => + case InnerVal.BYTE | InnerVal.SHORT | 
InnerVal.INT | InnerVal.LONG | InnerVal.FLOAT | InnerVal.DOUBLE => + Some(InnerVal.withNumber(n.value, version)) + case _ => None + } + case s: JsString => + dType match { + case InnerVal.STRING => Some(InnerVal.withStr(s.value, version)) + case InnerVal.BOOLEAN => Some(InnerVal.withBoolean(s.as[String].toBoolean, version)) + // case t if InnerVal.NUMERICS.contains(t) => + case InnerVal.BYTE | InnerVal.SHORT | InnerVal.INT | InnerVal.LONG | InnerVal.FLOAT | InnerVal.DOUBLE => + Some(InnerVal.withNumber(BigDecimal(s.value), version)) + case _ => None + } + case b: JsBoolean => + dType match { + case InnerVal.STRING => Some(InnerVal.withStr(b.toString, version)) + case InnerVal.BOOLEAN => Some(InnerVal.withBoolean(b.value, version)) + case _ => None + } + case _ => + None + } + } catch { + case e: Exception => + logger.error(e.getMessage) + None + } + + ret + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/Management.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala new file mode 100644 index 0000000..b355757 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala @@ -0,0 +1,368 @@ +package org.apache.s2graph.core + +import org.apache.s2graph.core.GraphExceptions.{InvalidHTableException, LabelAlreadyExistException, LabelNotExistException} +import org.apache.s2graph.core.Management.JsonModel.{Index, Prop} +import org.apache.s2graph.core.mysqls._ +import org.apache.s2graph.core.types.HBaseType._ +import org.apache.s2graph.core.types._ +import play.api.libs.json.Reads._ +import play.api.libs.json._ + +import scala.util.Try + +/** + * This is designed to be bridge between rest to s2core. + * s2core never use this for finding models. 
+ */ +object Management extends JSONParser { + + object JsonModel { + + case class Prop(name: String, defaultValue: String, datatType: String) + + object Prop extends ((String, String, String) => Prop) + + case class Index(name: String, propNames: Seq[String]) + + } + + import HBaseType._ + + val DefaultCompressionAlgorithm = "gz" + + + def findService(serviceName: String) = { + Service.findByName(serviceName, useCache = false) + } + + def deleteService(serviceName: String) = { + Service.findByName(serviceName).foreach { service => + // service.deleteAll() + } + } + + def updateHTable(labelName: String, newHTableName: String): Try[Int] = Try { + val targetLabel = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(s"Target label $labelName does not exist.")) + if (targetLabel.hTableName == newHTableName) throw new InvalidHTableException(s"New HTable name is already in use for target label.") + + Label.updateHTableName(targetLabel.label, newHTableName) + } + + + + def createServiceColumn(serviceName: String, + columnName: String, + columnType: String, + props: Seq[Prop], + schemaVersion: String = DEFAULT_VERSION) = { + + Model withTx { implicit session => + val serviceOpt = Service.findByName(serviceName) + serviceOpt match { + case None => throw new RuntimeException(s"create service $serviceName has not been created.") + case Some(service) => + val serviceColumn = ServiceColumn.findOrInsert(service.id.get, columnName, Some(columnType), schemaVersion) + for { + Prop(propName, defaultValue, dataType) <- props + } yield { + ColumnMeta.findOrInsert(serviceColumn.id.get, propName, dataType) + } + } + } + } + + def deleteColumn(serviceName: String, columnName: String, schemaVersion: String = DEFAULT_VERSION) = { + Model withTx { implicit session => + val service = Service.findByName(serviceName, useCache = false).getOrElse(throw new RuntimeException("Service not Found")) + val serviceColumns = ServiceColumn.find(service.id.get, columnName, useCache = 
false) + val columnNames = serviceColumns.map { serviceColumn => + ServiceColumn.delete(serviceColumn.id.get) + serviceColumn.columnName + } + + columnNames.getOrElse(throw new RuntimeException("column not found")) + } + } + + def findLabel(labelName: String): Option[Label] = { + Label.findByName(labelName, useCache = false) + } + + def deleteLabel(labelName: String) = { + Model withTx { implicit session => + Label.findByName(labelName, useCache = false).foreach { label => + Label.deleteAll(label) + } + labelName + } + } + + def addIndex(labelStr: String, indices: Seq[Index]): Try[Label] = { + Model withTx { implicit session => + val label = Label.findByName(labelStr).getOrElse(throw LabelNotExistException(s"$labelStr not found")) + val labelMetaMap = label.metaPropsInvMap + + indices.foreach { index => + val metaSeq = index.propNames.map { name => labelMetaMap(name).seq } + LabelIndex.findOrInsert(label.id.get, index.name, metaSeq.toList, "none") + } + + label + } + } + + def addProp(labelStr: String, prop: Prop) = { + Model withTx { implicit session => + val labelOpt = Label.findByName(labelStr) + val label = labelOpt.getOrElse(throw LabelNotExistException(s"$labelStr not found")) + + LabelMeta.findOrInsert(label.id.get, prop.name, prop.defaultValue, prop.datatType) + } + } + + def addProps(labelStr: String, props: Seq[Prop]) = { + Model withTx { implicit session => + val labelOpt = Label.findByName(labelStr) + val label = labelOpt.getOrElse(throw LabelNotExistException(s"$labelStr not found")) + + props.map { + case Prop(propName, defaultValue, dataType) => + LabelMeta.findOrInsert(label.id.get, propName, defaultValue, dataType) + } + } + } + + def addVertexProp(serviceName: String, + columnName: String, + propsName: String, + propsType: String, + schemaVersion: String = DEFAULT_VERSION): ColumnMeta = { + val result = for { + service <- Service.findByName(serviceName, useCache = false) + serviceColumn <- ServiceColumn.find(service.id.get, columnName) + } yield { 
+ ColumnMeta.findOrInsert(serviceColumn.id.get, propsName, propsType) + } + result.getOrElse({ + throw new RuntimeException(s"add property on vertex failed") + }) + } + + def getServiceLable(label: String): Option[Label] = { + Label.findByName(label, useCache = true) + } + + /** + * + */ + + def toLabelWithDirectionAndOp(label: Label, direction: String): Option[LabelWithDirection] = { + for { + labelId <- label.id + dir = GraphUtil.toDirection(direction) + } yield LabelWithDirection(labelId, dir) + } + + def tryOption[A, R](key: A, f: A => Option[R]) = { + f(key) match { + case None => throw new GraphExceptions.InternalException(s"$key is not found in DB. create $key first.") + case Some(r) => r + } + } + + def toEdge(ts: Long, operation: String, srcId: String, tgtId: String, + labelStr: String, direction: String = "", props: String): Edge = { + + val label = tryOption(labelStr, getServiceLable) + val dir = + if (direction == "") +// GraphUtil.toDirection(label.direction) + GraphUtil.directions("out") + else + GraphUtil.toDirection(direction) + + // logger.debug(s"$srcId, ${label.srcColumnWithDir(dir)}") + // logger.debug(s"$tgtId, ${label.tgtColumnWithDir(dir)}") + + val srcVertexId = toInnerVal(srcId, label.srcColumn.columnType, label.schemaVersion) + val tgtVertexId = toInnerVal(tgtId, label.tgtColumn.columnType, label.schemaVersion) + + val srcColId = label.srcColumn.id.get + val tgtColId = label.tgtColumn.id.get + val (srcVertex, tgtVertex) = if (dir == GraphUtil.directions("out")) { + (Vertex(SourceVertexId(srcColId, srcVertexId), System.currentTimeMillis()), + Vertex(TargetVertexId(tgtColId, tgtVertexId), System.currentTimeMillis())) + } else { + (Vertex(SourceVertexId(tgtColId, tgtVertexId), System.currentTimeMillis()), + Vertex(TargetVertexId(srcColId, srcVertexId), System.currentTimeMillis())) + } + + // val dir = if (direction == "") GraphUtil.toDirection(label.direction) else GraphUtil.toDirection(direction) + val labelWithDir = 
LabelWithDirection(label.id.get, dir) + val op = tryOption(operation, GraphUtil.toOp) + + val jsObject = Json.parse(props).asOpt[JsObject].getOrElse(Json.obj()) + val parsedProps = toProps(label, jsObject.fields).toMap + val propsWithTs = parsedProps.map(kv => (kv._1 -> InnerValLikeWithTs(kv._2, ts))) ++ + Map(LabelMeta.timeStampSeq -> InnerValLikeWithTs(InnerVal.withLong(ts, label.schemaVersion), ts)) + + Edge(srcVertex, tgtVertex, labelWithDir, op, version = ts, propsWithTs = propsWithTs) + + } + + def toVertex(ts: Long, operation: String, id: String, serviceName: String, columnName: String, props: String): Vertex = { + Service.findByName(serviceName) match { + case None => throw new RuntimeException(s"$serviceName does not exist. create service first.") + case Some(service) => + ServiceColumn.find(service.id.get, columnName) match { + case None => throw new RuntimeException(s"$columnName is not exist. create service column first.") + case Some(col) => + val idVal = toInnerVal(id, col.columnType, col.schemaVersion) + val op = tryOption(operation, GraphUtil.toOp) + val jsObject = Json.parse(props).asOpt[JsObject].getOrElse(Json.obj()) + val parsedProps = toProps(col, jsObject).toMap + Vertex(VertexId(col.id.get, idVal), ts, parsedProps, op = op) + } + } + } + + def toProps(column: ServiceColumn, js: JsObject): Seq[(Int, InnerValLike)] = { + + val props = for { + (k, v) <- js.fields + meta <- column.metasInvMap.get(k) + } yield { + val innerVal = jsValueToInnerVal(v, meta.dataType, column.schemaVersion).getOrElse( + throw new RuntimeException(s"$k is not defined. create schema for vertex.")) + + (meta.seq.toInt, innerVal) + } + props + + } + + def toProps(label: Label, js: Seq[(String, JsValue)]): Seq[(Byte, InnerValLike)] = { + val props = for { + (k, v) <- js + meta <- label.metaPropsInvMap.get(k) + innerVal <- jsValueToInnerVal(v, meta.dataType, label.schemaVersion) + } yield (meta.seq, innerVal) + + props + } + + + /** + * update label name. 
+ */ + def updateLabelName(oldLabelName: String, newLabelName: String) = { + for { + old <- Label.findByName(oldLabelName) + } { + Label.findByName(newLabelName) match { + case None => + Label.updateName(oldLabelName, newLabelName) + case Some(_) => + // throw new RuntimeException(s"$newLabelName already exist") + } + } + } +} + +class Management(graph: Graph) { + import Management._ + val storage = graph.storage + + def createTable(zkAddr: String, + tableName: String, + cfs: List[String], + regionMultiplier: Int, + ttl: Option[Int], + compressionAlgorithm: String = DefaultCompressionAlgorithm): Unit = + storage.createTable(zkAddr, tableName, cfs, regionMultiplier, ttl, compressionAlgorithm) + + /** HBase specific code */ + def createService(serviceName: String, + cluster: String, hTableName: String, + preSplitSize: Int, hTableTTL: Option[Int], + compressionAlgorithm: String = DefaultCompressionAlgorithm): Try[Service] = { + + Model withTx { implicit session => + val service = Service.findOrInsert(serviceName, cluster, hTableName, preSplitSize, hTableTTL, compressionAlgorithm) + /** create hbase table for service */ + storage.createTable(cluster, hTableName, List("e", "v"), preSplitSize, hTableTTL, compressionAlgorithm) + service + } + } + + /** HBase specific code */ + def createLabel(label: String, + srcServiceName: String, + srcColumnName: String, + srcColumnType: String, + tgtServiceName: String, + tgtColumnName: String, + tgtColumnType: String, + isDirected: Boolean = true, + serviceName: String, + indices: Seq[Index], + props: Seq[Prop], + consistencyLevel: String, + hTableName: Option[String], + hTableTTL: Option[Int], + schemaVersion: String = DEFAULT_VERSION, + isAsync: Boolean, + compressionAlgorithm: String = "gz"): Try[Label] = { + + val labelOpt = Label.findByName(label, useCache = false) + + Model withTx { implicit session => + labelOpt match { + case Some(l) => + throw new GraphExceptions.LabelAlreadyExistException(s"Label name ${l.label} already 
exist.") + case None => + /** create all models */ + val newLabel = Label.insertAll(label, + srcServiceName, srcColumnName, srcColumnType, + tgtServiceName, tgtColumnName, tgtColumnType, + isDirected, serviceName, indices, props, consistencyLevel, + hTableName, hTableTTL, schemaVersion, isAsync, compressionAlgorithm) + + /** create hbase table */ + val service = newLabel.service + (hTableName, hTableTTL) match { + case (None, None) => // do nothing + case (None, Some(hbaseTableTTL)) => throw new RuntimeException("if want to specify ttl, give hbaseTableName also") + case (Some(hbaseTableName), None) => + // create own hbase table with default ttl on service level. + storage.createTable(service.cluster, hbaseTableName, List("e", "v"), service.preSplitSize, service.hTableTTL, compressionAlgorithm) + case (Some(hbaseTableName), Some(hbaseTableTTL)) => + // create own hbase table with own ttl. + storage.createTable(service.cluster, hbaseTableName, List("e", "v"), service.preSplitSize, hTableTTL, compressionAlgorithm) + } + newLabel + } + } + } + + /** + * label + */ + /** + * copy label when if oldLabel exist and newLabel do not exist. + * copy label: only used by bulk load job. not sure if we need to parameterize hbase cluster. 
+ */ + def copyLabel(oldLabelName: String, newLabelName: String, hTableName: Option[String]) = { + val old = Label.findByName(oldLabelName).getOrElse(throw new LabelAlreadyExistException(s"Old label $oldLabelName not exists.")) + if (Label.findByName(newLabelName).isDefined) throw new LabelAlreadyExistException(s"New label $newLabelName already exists.") + + val allProps = old.metas.map { labelMeta => Prop(labelMeta.name, labelMeta.defaultValue, labelMeta.dataType) } + val allIndices = old.indices.map { index => Index(index.name, index.propNames) } + + createLabel(newLabelName, old.srcService.serviceName, old.srcColumnName, old.srcColumnType, + old.tgtService.serviceName, old.tgtColumnName, old.tgtColumnType, + old.isDirected, old.serviceName, + allIndices, allProps, + old.consistencyLevel, hTableName, old.hTableTTL, old.schemaVersion, old.isAsync, old.compressionAlgorithm) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala b/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala new file mode 100644 index 0000000..c31aa79 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/OrderingUtil.scala @@ -0,0 +1,146 @@ +package org.apache.s2graph.core + +import org.apache.s2graph.core.types.InnerValLike +import play.api.libs.json.{JsNumber, JsString, JsValue} + +object OrderingUtil { + + implicit object JsValueOrdering extends Ordering[JsValue] { + override def compare(x: JsValue, y: JsValue): Int = { + (x, y) match { + case (JsNumber(xv), JsNumber(yv)) => + Ordering.BigDecimal.compare(xv, yv) + case (JsString(xv), JsString(yv)) => + Ordering.String.compare(xv, yv) + case _ => throw new Exception(s"unsupported type") + } + } + } + + implicit object InnerValLikeOrdering extends 
Ordering[InnerValLike] {
    // Delegate to InnerValLike's own compare.
    override def compare(x: InnerValLike, y: InnerValLike): Int = {
      x.compare(y)
    }
  }

  // Runtime-typed comparison for heterogeneous sort keys. Both operands must share the
  // same runtime type; any unlisted or mixed-type pair throws scala.MatchError.
  // NOTE(review): the match is non-exhaustive — confirm callers always pass same-typed pairs.
  implicit object MultiValueOrdering extends Ordering[Any] {
    override def compare(x: Any, y: Any): Int = {
      (x, y) match {
        case (xv: Int, yv: Int) => implicitly[Ordering[Int]].compare(xv, yv)
        case (xv: Long, yv: Long) => implicitly[Ordering[Long]].compare(xv, yv)
        case (xv: Double, yv: Double) => implicitly[Ordering[Double]].compare(xv, yv)
        case (xv: String, yv: String) => implicitly[Ordering[String]].compare(xv, yv)
        case (xv: JsValue, yv: JsValue) => implicitly[Ordering[JsValue]].compare(xv, yv)
        case (xv: InnerValLike, yv: InnerValLike) => implicitly[Ordering[InnerValLike]].compare(xv, yv)
      }
    }
  }

  // Ordering for 4-tuples where each position independently sorts ascending or
  // descending: ascendingLs(i) == false flips position i by swapping the operands
  // handed to ord. Positions beyond ascendingLs.length never decide the result.
  def TupleMultiOrdering[T](ascendingLs: Seq[Boolean])(implicit ord: Ordering[T]): Ordering[(T, T, T, T)] = {
    new Ordering[(T, T, T, T)] {
      override def compare(tx: (T, T, T, T), ty: (T, T, T, T)): Int = {
        val len = ascendingLs.length
        val it = ascendingLs.iterator
        if (len >= 1) {
          // Swap the tuples (not the result sign) when this position is descending.
          val (x, y) = it.next() match {
            case true => tx -> ty
            case false => ty -> tx
          }
          val compare1 = ord.compare(x._1, y._1)
          if (compare1 != 0) return compare1
        }

        if (len >= 2) {
          val (x, y) = it.next() match {
            case true => tx -> ty
            case false => ty -> tx
          }
          val compare2 = ord.compare(x._2, y._2)
          if (compare2 != 0) return compare2
        }

        if (len >= 3) {
          val (x, y) = it.next() match {
            case true => tx -> ty
            case false => ty -> tx
          }
          val compare3 = ord.compare(x._3, y._3)
          if (compare3 != 0) return compare3
        }

        if (len >= 4) {
          val (x, y) = it.next() match {
            case true => tx -> ty
            case false => ty -> tx
          }
          val compare4 = ord.compare(x._4, y._4)
          if (compare4 != 0) return compare4
        }
        // All considered positions were equal.
        0
      }
    }
  }
}

// Lexicographic ordering over sequences of sort keys with a per-position ascending
// flag; positions past the end of ascendingLs fall back to defaultAscending.
class SeqMultiOrdering[T](ascendingLs: Seq[Boolean], defaultAscending: Boolean = true)(implicit ord: Ordering[T]) extends Ordering[Seq[T]] {
  override def compare(x: Seq[T], y: Seq[T]): Int = {
    val xe =
x.iterator
    val ye = y.iterator
    val oe = ascendingLs.iterator

    // Walk both sequences in lockstep; the first differing position decides.
    while (xe.hasNext && ye.hasNext) {
      // Direction for this position: the next explicit flag, else defaultAscending.
      val ascending = if (oe.hasNext) oe.next() else defaultAscending
      // Descending is implemented by swapping the operands handed to ord.
      val (xev, yev) = ascending match {
        case true => xe.next() -> ye.next()
        case false => ye.next() -> xe.next()
      }
      val res = ord.compare(xev, yev)
      if (res != 0) return res
    }

    // All shared positions were equal: a strict prefix (shorter sequence) sorts first.
    Ordering.Boolean.compare(xe.hasNext, ye.hasNext)
  }
}

//class TupleMultiOrdering[T](ascendingLs: Seq[Boolean])(implicit ord: Ordering[T]) extends Ordering[(T, T, T, T)] {
//  override def compare(tx: (T, T, T, T), ty: (T, T, T, T)): Int = {
//    val len = ascendingLs.length
//    val it = ascendingLs.iterator
//    if (len >= 1) {
//      val (x, y) = it.next() match {
//        case true => tx -> ty
//        case false => ty -> tx
//      }
//      val compare1 = ord.compare(x._1, y._1)
//      if (compare1 != 0) return compare1
//    }
//
//    if (len >= 2) {
//      val (x, y) = it.next() match {
//        case true => tx -> ty
//        case false => ty -> tx
//      }
//      val compare2 = ord.compare(x._2, y._2)
//      if (compare2 != 0) return compare2
//    }
//
//    if (len >= 3) {
//      val (x, y) = it.next() match {
//        case true => tx -> ty
//        case false => ty -> tx
//      }
//      val compare3 = ord.compare(x._3, y._3)
//      if (compare3 != 0) return compare3
//    }
//
//    if (len >= 4) {
//      val (x, y) = it.next() match {
//        case true => tx -> ty
//        case false => ty -> tx
//      }
//      val compare4 = ord.compare(x._4, y._4)
//      if (compare4 != 0) return compare4
//    }
//    0
//  }
//}
