[S2GRAPH-130]: The Edge.propsWithTs data type should be changed to a mutable type to support the setter interface that exists in tp3. - Make Vertex/Edge/Graph implement Tinkerpop3. - Change the data type of Edge's propsWithTs to java.util.Map[String, S2Property[_]].
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/6356573e Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/6356573e Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/6356573e Branch: refs/heads/master Commit: 6356573e6a658dbfeb240bdee642d055991e5ac2 Parents: 292174e Author: DO YUNG YOON <steams...@apache.org> Authored: Thu Nov 24 12:14:08 2016 +0900 Committer: DO YUNG YOON <steams...@apache.org> Committed: Thu Nov 24 12:14:08 2016 +0900 ---------------------------------------------------------------------- .../loader/subscriber/GraphSubscriber.scala | 2 +- .../loader/subscriber/TransferToHFile.scala | 4 +- .../s2graph/loader/subscriber/WalLogStat.scala | 2 +- .../loader/subscriber/WalLogToHDFS.scala | 2 +- .../scala/org/apache/s2graph/core/Edge.scala | 442 +++++++++++++----- .../scala/org/apache/s2graph/core/Graph.scala | 246 +++++++--- .../org/apache/s2graph/core/QueryParam.scala | 4 +- .../org/apache/s2graph/core/QueryResult.scala | 2 +- .../org/apache/s2graph/core/S2Property.scala | 33 +- .../scala/org/apache/s2graph/core/Vertex.scala | 24 +- .../s2graph/core/rest/RequestParser.scala | 8 +- .../apache/s2graph/core/storage/Storage.scala | 58 ++- .../tall/IndexEdgeDeserializable.scala | 80 ++-- .../wide/IndexEdgeDeserializable.scala | 78 ++-- .../tall/SnapshotEdgeDeserializable.scala | 8 +- .../wide/SnapshotEdgeDeserializable.scala | 8 +- .../org/apache/s2graph/core/EdgeTest.scala | 457 +++---------------- .../core/Integrate/IntegrateCommon.scala | 2 + .../core/Integrate/WeakLabelDeleteTest.scala | 5 +- .../s2graph/core/parsers/WhereParserTest.scala | 31 +- .../core/storage/hbase/IndexEdgeTest.scala | 206 ++++----- .../loader/core/CounterEtlFunctions.scala | 5 +- 22 files changed, 850 insertions(+), 857 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala index 05aed34..b25bc84 100644 --- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala +++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala @@ -106,7 +106,7 @@ object GraphSubscriberHelper extends WithKafka { (statFunc: (String, Int) => Unit): Iterable[GraphElement] = { (for (msg <- msgs) yield { statFunc("total", 1) - Graph.toGraphElement(msg, labelMapping) match { + g.toGraphElement(msg, labelMapping) match { case Some(e) if e.isInstanceOf[Edge] => statFunc("EdgeParseOk", 1) e.asInstanceOf[Edge] http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala index 9ebff03..3345d56 100644 --- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala +++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala @@ -101,7 +101,7 @@ object TransferToHFile extends SparkApp { val ts = System.currentTimeMillis() val propsWithTs = Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(ts, ts, label.schemaVersion)) - val edge = Edge(vertex, vertex, label, dir, propsWithTs=propsWithTs) + val edge = GraphSubscriberHelper.g.newEdge(vertex, vertex, label, dir, propsWithTs=propsWithTs) edge.edgesWithIndex.flatMap { indexEdge => 
GraphSubscriberHelper.g.getStorage(indexEdge.label).indexEdgeSerializer(indexEdge).toKeyValues.map { kv => @@ -125,7 +125,7 @@ object TransferToHFile extends SparkApp { def toKeyValues(strs: Seq[String], labelMapping: Map[String, String], autoEdgeCreate: Boolean): Iterator[KeyValue] = { val kvs = for { s <- strs - element <- Graph.toGraphElement(s, labelMapping).toSeq if element.isInstanceOf[Edge] + element <- GraphSubscriberHelper.g.toGraphElement(s, labelMapping).toSeq if element.isInstanceOf[Edge] edge = element.asInstanceOf[Edge] putRequest <- insertBulkForLoaderAsync(edge, autoEdgeCreate) } yield { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala index d47e648..5b68754 100644 --- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala +++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala @@ -69,7 +69,7 @@ object WalLogStat extends SparkApp with WithKafka { val phase = System.getProperty("phase") GraphSubscriberHelper.apply(phase, dbUrl, "none", brokerList) partition.map { case (key, msg) => - Graph.toGraphElement(msg) match { + GraphSubscriberHelper.g.toGraphElement(msg) match { case Some(elem) => val serviceName = elem.serviceName msg.split("\t", 7) match { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala index a8fc4df..0f69dc7 100644 --- 
a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala +++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala @@ -92,7 +92,7 @@ object WalLogToHDFS extends SparkApp with WithKafka { GraphSubscriberHelper.apply(phase, dbUrl, "none", brokerList) partition.flatMap { case (key, msg) => - val optMsg = Graph.toGraphElement(msg).flatMap { element => + val optMsg = GraphSubscriberHelper.g.toGraphElement(msg).flatMap { element => val arr = msg.split("\t", 7) val service = element.serviceName val label = arr(5) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala index b27a05e..87f9cd7 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala @@ -20,50 +20,57 @@ package org.apache.s2graph.core import java.util +import java.util.function.BiConsumer +import org.apache.s2graph.core.Edge.{Props, State} import org.apache.s2graph.core.GraphExceptions.LabelNotExistException import org.apache.s2graph.core.JSONParser._ import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta} import org.apache.s2graph.core.types._ import org.apache.s2graph.core.utils.logger import org.apache.tinkerpop.gremlin.structure +import org.apache.tinkerpop.gremlin.structure.{Direction, Edge => TpEdge, Graph => TpGraph, Property} import play.api.libs.json.{JsNumber, JsObject, Json} + +import scala.collection.JavaConverters._ +import scala.collection.mutable.{Map => MutableMap} import scala.util.hashing.MurmurHash3 -import org.apache.tinkerpop.gremlin.structure.{Edge => TpEdge, Direction, Property, Graph => TpGraph} -case class SnapshotEdge(srcVertex: Vertex, +case class SnapshotEdge(graph: Graph, + 
srcVertex: Vertex, tgtVertex: Vertex, label: Label, - direction: Int, + dir: Int, op: Byte, version: Long, - private val propsWithTs: Map[LabelMeta, InnerValLikeWithTs], + private val propsWithTs: Props, pendingEdgeOpt: Option[Edge], statusCode: Byte = 0, lockTs: Option[Long], tsInnerValOpt: Option[InnerValLike] = None) { - - lazy val labelWithDir = LabelWithDirection(label.id.get, direction) - if (!propsWithTs.contains(LabelMeta.timestamp)) throw new Exception("Timestamp is required.") + lazy val direction = GraphUtil.fromDirection(dir) + lazy val operation = GraphUtil.fromOp(op) + lazy val edge = toEdge + lazy val labelWithDir = LabelWithDirection(label.id.get, dir) +// if (!propsWithTs.contains(LabelMeta.timestamp.name)) throw new Exception("Timestamp is required.") // val label = Label.findById(labelWithDir.labelId) lazy val schemaVer = label.schemaVersion - lazy val propsWithoutTs = propsWithTs.mapValues(_.innerVal) - lazy val ts = propsWithTs(LabelMeta.timestamp).innerVal.toString().toLong + lazy val ts = propsWithTs.get(LabelMeta.timestamp.name).innerVal.toString().toLong - def propsToKeyValuesWithTs = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.map(kv => kv._1.seq -> kv._2).toSeq) + def propsToKeyValuesWithTs = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.asScala.map(kv => kv._2.labelMeta.seq -> kv._2.innerValWithTs).toSeq) def allPropsDeleted = Edge.allPropsDeleted(propsWithTs) def toEdge: Edge = { - val ts = propsWithTs.get(LabelMeta.timestamp).map(v => v.ts).getOrElse(version) - Edge(srcVertex, tgtVertex, label, direction, op, + Edge(graph, srcVertex, tgtVertex, label, dir, op, version, propsWithTs, pendingEdgeOpt = pendingEdgeOpt, statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt) } def propsWithName = (for { - (meta, v) <- propsWithTs + (_, v) <- propsWithTs.asScala + meta = v.labelMeta jsValue <- innerValToJsValue(v.innerVal, meta.dataType) } yield meta.name -> jsValue) ++ Map("version" -> JsNumber(version)) @@ 
-71,26 +78,55 @@ case class SnapshotEdge(srcVertex: Vertex, def toLogString() = { List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, propsWithName).mkString("\t") } + + def property[V](key: String, value: V, ts: Long): S2Property[V] = { + val labelMeta = label.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on IndexEdge.")) + val newProps = new S2Property(edge, labelMeta, key, value, ts) + propsWithTs.put(key, newProps) + newProps + } + override def hashCode(): Int = { + MurmurHash3.stringHash(srcVertex.innerId + "," + labelWithDir + "," + tgtVertex.innerId) + } + + override def equals(other: Any): Boolean = other match { + case e: SnapshotEdge => + srcVertex.innerId == e.srcVertex.innerId && + tgtVertex.innerId == e.tgtVertex.innerId && + labelWithDir == e.labelWithDir && op == e.op && version == e.version && + pendingEdgeOpt == e.pendingEdgeOpt && lockTs == lockTs && statusCode == statusCode + case _ => false + } + + override def toString(): String = { + Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> label.label, "direction" -> direction, + "operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString, + "statusCode" -> statusCode, "lockTs" -> lockTs).toString + } } -case class IndexEdge(srcVertex: Vertex, +case class IndexEdge(graph: Graph, + srcVertex: Vertex, tgtVertex: Vertex, label: Label, - direction: Int, + dir: Int, op: Byte, version: Long, labelIndexSeq: Byte, - private val propsWithTs: Map[LabelMeta, InnerValLikeWithTs], + private val propsWithTs: Props, tsInnerValOpt: Option[InnerValLike] = None) { // if (!props.contains(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.") // assert(props.contains(LabelMeta.timeStampSeq)) - lazy val labelWithDir = LabelWithDirection(label.id.get, direction) + lazy val direction = GraphUtil.fromDirection(dir) + lazy val operation = 
GraphUtil.fromOp(op) + lazy val edge = toEdge + lazy val labelWithDir = LabelWithDirection(label.id.get, dir) lazy val isInEdge = labelWithDir.dir == GraphUtil.directions("in") lazy val isOutEdge = !isInEdge - lazy val ts = propsWithTs(LabelMeta.timestamp).innerVal.toString.toLong - lazy val degreeEdge = propsWithTs.contains(LabelMeta.degree) + lazy val ts = propsWithTs.get(LabelMeta.timestamp.name).innerVal.toString.toLong + lazy val degreeEdge = propsWithTs.containsKey(LabelMeta.degree.name) lazy val schemaVer = label.schemaVersion lazy val labelIndex = LabelIndex.findByLabelIdAndSeq(labelWithDir.labelId, labelIndexSeq).get @@ -103,8 +139,8 @@ case class IndexEdge(srcVertex: Vertex, /** TODO: make sure call of this class fill props as this assumes */ lazy val orders = for (meta <- labelIndexMetaSeqs) yield { - propsWithTs.get(meta) match { - case None => + propsWithTs.get(meta.name) match { + case null => /** * TODO: agly hack @@ -120,12 +156,12 @@ case class IndexEdge(srcVertex: Vertex, } meta -> v - case Some(v) => meta -> v.innerVal + case v => meta -> v.innerVal } } - lazy val ordersKeyMap = orders.map { case (byte, _) => byte }.toSet - lazy val metas = for ((meta, v) <- propsWithTs if !ordersKeyMap.contains(meta)) yield meta -> v.innerVal + lazy val ordersKeyMap = orders.map { case (meta, _) => meta.name }.toSet + lazy val metas = for ((meta, v) <- propsWithTs.asScala if !ordersKeyMap.contains(meta)) yield v.labelMeta -> v.innerVal // lazy val propsWithTs = props.map { case (k, v) => k -> InnerValLikeWithTs(v, version) } @@ -135,12 +171,13 @@ case class IndexEdge(srcVertex: Vertex, lazy val hasAllPropsForIndex = orders.length == labelIndexMetaSeqs.length def propsWithName = for { - (meta, v) <- propsWithTs if meta.seq >= 0 + (_, v) <- propsWithTs.asScala + meta = v.labelMeta jsValue <- innerValToJsValue(v.innerVal, meta.dataType) } yield meta.name -> jsValue - def toEdge: Edge = Edge(srcVertex, tgtVertex, label, direction, op, version, propsWithTs, 
tsInnerValOpt = tsInnerValOpt) + def toEdge: Edge = Edge(graph, srcVertex, tgtVertex, label, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt) // only for debug def toLogString() = { @@ -152,52 +189,99 @@ case class IndexEdge(srcVertex: Vertex, } def property(labelMeta: LabelMeta): InnerValLikeWithTs = { - propsWithTs.get(labelMeta).getOrElse(label.metaPropsDefaultMapInner(labelMeta)) +// propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse(label.metaPropsDefaultMapInner(labelMeta)) + if (propsWithTs.containsKey(labelMeta.name)) { + propsWithTs.get(labelMeta.name).innerValWithTs + } else { + label.metaPropsDefaultMapInner(labelMeta) + } } - def updatePropsWithTs(others: Map[LabelMeta, InnerValLikeWithTs] = Map.empty): Map[LabelMeta, InnerValLikeWithTs] = { + def updatePropsWithTs(others: Props = Edge.EmptyProps): Props = { if (others.isEmpty) propsWithTs - else propsWithTs ++ others + else { + val iter = others.entrySet().iterator() + while (iter.hasNext) { + val e = iter.next() + propsWithTs.put(e.getKey, e.getValue) + } + propsWithTs + } + } + + def property[V](key: String, value: V, ts: Long): S2Property[V] = { + val labelMeta = label.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on IndexEdge.")) + val newProps = new S2Property(edge, labelMeta, key, value, ts) + propsWithTs.put(key, newProps) + newProps + } + override def hashCode(): Int = { + MurmurHash3.stringHash(srcVertex.innerId + "," + labelWithDir + "," + tgtVertex.innerId + "," + labelIndexSeq) + } + + override def equals(other: Any): Boolean = other match { + case e: IndexEdge => + srcVertex.innerId == e.srcVertex.innerId && + tgtVertex.innerId == e.tgtVertex.innerId && + labelWithDir == e.labelWithDir && op == e.op && version == e.version && + labelIndexSeq == e.labelIndexSeq + case _ => false + } + + override def toString(): String = { + Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> label.label, 
"direction" -> dir, + "operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString + ).toString } } -case class Edge(srcVertex: Vertex, - tgtVertex: Vertex, +case class Edge(innerGraph: Graph, + srcVertex: Vertex, + var tgtVertex: Vertex, innerLabel: Label, dir: Int, - op: Byte = GraphUtil.defaultOpByte, - version: Long = System.currentTimeMillis(), - private val propsWithTs: Map[LabelMeta, InnerValLikeWithTs], + var op: Byte = GraphUtil.defaultOpByte, + var version: Long = System.currentTimeMillis(), + propsWithTs: Props = Edge.EmptyProps, parentEdges: Seq[EdgeWithScore] = Nil, originalEdgeOpt: Option[Edge] = None, pendingEdgeOpt: Option[Edge] = None, statusCode: Byte = 0, lockTs: Option[Long] = None, - tsInnerValOpt: Option[InnerValLike] = None) extends GraphElement with TpEdge { + var tsInnerValOpt: Option[InnerValLike] = None) extends GraphElement with TpEdge { lazy val labelWithDir = LabelWithDirection(innerLabel.id.get, dir) lazy val schemaVer = innerLabel.schemaVersion - lazy val ts = propsWithTs(LabelMeta.timestamp).innerVal.value match { + lazy val ts = propsWithTs.get(LabelMeta.timestamp.name).innerVal.value match { case b: BigDecimal => b.longValue() case l: Long => l case i: Int => i.toLong case _ => throw new RuntimeException("ts should be in [BigDecimal/Long/Int].") } - //FIXME + lazy val operation = GraphUtil.fromOp(op) lazy val tsInnerVal = tsInnerValOpt.get.value lazy val srcId = srcVertex.innerIdVal lazy val tgtId = tgtVertex.innerIdVal lazy val labelName = innerLabel.label lazy val direction = GraphUtil.fromDirection(dir) - def toIndexEdge(labelIndexSeq: Byte): IndexEdge = IndexEdge(srcVertex, tgtVertex, innerLabel, dir, op, version, labelIndexSeq, propsWithTs) + def toIndexEdge(labelIndexSeq: Byte): IndexEdge = IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelIndexSeq, propsWithTs) - def serializePropsWithTs(): Array[Byte] = 
HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.map(kv => kv._1.seq -> kv._2).toSeq) + def serializePropsWithTs(): Array[Byte] = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.asScala.map(kv => kv._2.labelMeta.seq -> kv._2.innerValWithTs).toSeq) - def updatePropsWithTs(others: Map[LabelMeta, InnerValLikeWithTs] = Map.empty): Map[LabelMeta, InnerValLikeWithTs] = { - if (others.isEmpty) propsWithTs - else propsWithTs ++ others + def updatePropsWithTs(others: Props = Edge.EmptyProps): Props = { + val emptyProp = Edge.EmptyProps + + propsWithTs.forEach(new BiConsumer[String, S2Property[_]] { + override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value) + }) + + others.forEach(new BiConsumer[String, S2Property[_]] { + override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value) + }) + + emptyProp } def propertyValue(key: String): Option[InnerValLikeWithTs] = { @@ -212,7 +296,12 @@ case class Edge(srcVertex: Vertex, } def propertyValueInner(labelMeta: LabelMeta): InnerValLikeWithTs= { - propsWithTs.getOrElse(labelMeta, innerLabel.metaPropsDefaultMapInner(labelMeta)) + // propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse() + if (propsWithTs.containsKey(labelMeta.name)) { + propsWithTs.get(labelMeta.name).innerValWithTs + } else { + innerLabel.metaPropsDefaultMapInner(labelMeta) + } } def propertyValues(keys: Seq[String] = Nil): Map[LabelMeta, InnerValLikeWithTs] = { @@ -242,14 +331,21 @@ case class Edge(srcVertex: Vertex, lazy val properties = toProps() - def props = propsWithTs.mapValues(_.innerVal) + def props = propsWithTs.asScala.mapValues(_.innerVal) private def toProps(): Map[String, Any] = { for { (labelMeta, defaultVal) <- innerLabel.metaPropsDefaultMapInner } yield { - labelMeta.name -> propsWithTs.getOrElse(labelMeta, defaultVal).innerVal.value + // labelMeta.name -> propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse(defaultVal).innerVal.value + val value = + if 
(propsWithTs.containsKey(labelMeta.name)) { + propsWithTs.get(labelMeta.name).value + } else { + defaultVal.innerVal.value + } + labelMeta.name -> value } } @@ -302,21 +398,21 @@ case class Edge(srcVertex: Vertex, override def isAsync = innerLabel.isAsync - def isDegree = propsWithTs.contains(LabelMeta.degree) + def isDegree = propsWithTs.containsKey(LabelMeta.degree.name) // def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match { // case Some(_) => props // case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer)) // } - def propsPlusTsValid = propsWithTs.filter(kv => LabelMeta.isValidSeq(kv._1.seq)) + def propsPlusTsValid = propsWithTs.asScala.filter(kv => LabelMeta.isValidSeq(kv._2.labelMeta.seq)).asJava def edgesWithIndex = for (labelOrder <- labelOrders) yield { - IndexEdge(srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsWithTs, tsInnerValOpt = tsInnerValOpt) + IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsWithTs, tsInnerValOpt = tsInnerValOpt) } def edgesWithIndexValid = for (labelOrder <- labelOrders) yield { - IndexEdge(srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsPlusTsValid, tsInnerValOpt = tsInnerValOpt) + IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsPlusTsValid, tsInnerValOpt = tsInnerValOpt) } /** force direction as out on invertedEdge */ @@ -325,38 +421,28 @@ case class Edge(srcVertex: Vertex, // val newLabelWithDir = LabelWithDirection(labelWithDir.labelId, GraphUtil.directions("out")) - val ret = SnapshotEdge(smaller, larger, innerLabel, GraphUtil.directions("out"), op, version, - Map(LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(ts, schemaVer), ts)) ++ propsWithTs, + property(LabelMeta.timestamp.name, ts, ts) + val ret = SnapshotEdge(innerGraph, smaller, larger, innerLabel, + GraphUtil.directions("out"), op, version, propsWithTs, pendingEdgeOpt = 
pendingEdgeOpt, statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt) ret } - override def hashCode(): Int = { - MurmurHash3.stringHash(srcVertex.innerId + "," + labelWithDir + "," + tgtVertex.innerId) - } - - override def equals(other: Any): Boolean = other match { - case e: Edge => - srcVertex.innerId == e.srcVertex.innerId && - tgtVertex.innerId == e.tgtVertex.innerId && - labelWithDir == e.labelWithDir - case _ => false - } - def defaultPropsWithName = Json.obj("from" -> srcVertex.innerId.toString(), "to" -> tgtVertex.innerId.toString(), "label" -> innerLabel.label, "service" -> innerLabel.serviceName) def propsWithName = for { - (meta, v) <- props if meta.seq > 0 - jsValue <- innerValToJsValue(v, meta.dataType) + (_, v) <- propsWithTs.asScala + meta = v.labelMeta + jsValue <- innerValToJsValue(v.innerVal, meta.dataType) } yield meta.name -> jsValue def updateTgtVertex(id: InnerValLike) = { val newId = TargetVertexId(tgtVertex.id.colId, id) val newTgtVertex = Vertex(newId, tgtVertex.ts, tgtVertex.props) - Edge(srcVertex, newTgtVertex, innerLabel, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt) + Edge(innerGraph, srcVertex, newTgtVertex, innerLabel, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt) } def rank(r: RankParam): Double = @@ -364,17 +450,15 @@ case class Edge(srcVertex: Vertex, else { var sum: Double = 0 - for ((seq, w) <- r.keySeqAndWeights) { - propsWithTs.get(seq) match { - case None => // do nothing - case Some(innerValWithTs) => { - val cost = try innerValWithTs.innerVal.toString.toDouble catch { - case e: Exception => - logger.error("toInnerval failed in rank", e) - 1.0 - } - sum += w * cost + for ((labelMeta, w) <- r.keySeqAndWeights) { + if (propsWithTs.containsKey(labelMeta.name)) { + val innerValWithTs = propsWithTs.get(labelMeta.name) + val cost = try innerValWithTs.innerVal.toString.toDouble catch { + case e: Exception => + logger.error("toInnerval failed in rank", e) + 1.0 } + sum += w * cost } 
} sum @@ -385,23 +469,92 @@ case class Edge(srcVertex: Vertex, List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, innerLabel.label, allPropsWithName).mkString("\t") } + override def hashCode(): Int = { + MurmurHash3.stringHash(srcVertex.innerId + "," + labelWithDir + "," + tgtVertex.innerId) + } + + override def equals(other: Any): Boolean = other match { + case e: Edge => + srcVertex.innerId == e.srcVertex.innerId && + tgtVertex.innerId == e.tgtVertex.innerId && + labelWithDir == e.labelWithDir && op == e.op && version == e.version && + pendingEdgeOpt == e.pendingEdgeOpt && lockTs == lockTs && statusCode == statusCode && + parentEdges == e.parentEdges && originalEdgeOpt == originalEdgeOpt + case _ => false + } + + override def toString(): String = { + Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> labelName, "direction" -> direction, + "operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString, + "parentEdges" -> parentEdges, "originalEdge" -> originalEdgeOpt, "statusCode" -> statusCode, "lockTs" -> lockTs + ).toString + } + + def checkProperty(key: String): Boolean = propsWithTs.containsKey(key) + + def copyEdge(srcVertex: Vertex = srcVertex, + tgtVertex: Vertex = tgtVertex, + innerLabel: Label = innerLabel, + dir: Int = dir, + op: Byte = op, + version: Long = version, + propsWithTs: State = Edge.propsToState(this.propsWithTs), + parentEdges: Seq[EdgeWithScore] = parentEdges, + originalEdgeOpt: Option[Edge] = originalEdgeOpt, + pendingEdgeOpt: Option[Edge] = pendingEdgeOpt, + statusCode: Byte = statusCode, + lockTs: Option[Long] = lockTs, + tsInnerValOpt: Option[InnerValLike] = tsInnerValOpt, + ts: Long = ts): Edge = { + val edge = new Edge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, Edge.EmptyProps, + parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt) + Edge.fillPropsWithTs(edge, propsWithTs) 
+ edge.property(LabelMeta.timestamp.name, ts, ts) + edge + } + + def copyEdgeWithState(state: State, ts: Long): Edge = { + val newEdge = copy(propsWithTs = Edge.EmptyProps) + Edge.fillPropsWithTs(newEdge, state) + newEdge.property(LabelMeta.timestamp.name, ts, ts) + newEdge + } + + def copyEdgeWithState(state: State): Edge = { + val newEdge = copy(propsWithTs = Edge.EmptyProps) + Edge.fillPropsWithTs(newEdge, state) + newEdge + } + override def vertices(direction: Direction): util.Iterator[structure.Vertex] = ??? override def properties[V](strings: String*): util.Iterator[Property[V]] = ??? - override def property[V](key: String): Property[V] = ??? + override def property[V](key: String): Property[V] = { + val labelMeta = innerLabel.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on Edge.")) + if (propsWithTs.containsKey(key)) propsWithTs.get(key).asInstanceOf[Property[V]] + else { + val default = innerLabel.metaPropsDefaultMapInner(labelMeta) + property(key, default.innerVal.value, default.ts).asInstanceOf[Property[V]] + } + } override def property[V](key: String, value: V): Property[V] = { property(key, value, System.currentTimeMillis()) } - def property[V](key: String, value: V, ts: Long): Property[V] = ??? + def property[V](key: String, value: V, ts: Long): Property[V] = { + val labelMeta = innerLabel.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on Edge.")) + val newProp = new S2Property[V](this, labelMeta, key, value, ts) + propsWithTs.put(key, newProp) + newProp + } - override def remove(): Unit = ??? + override def remove(): Unit = {} - override def graph(): TpGraph = ??? - - override def id(): AnyRef = ??? 
+ override def graph(): TpGraph = innerGraph + + override def id(): AnyRef = (srcVertex.innerId, labelWithDir, tgtVertex.innerId) override def label(): String = innerLabel.label } @@ -425,38 +578,63 @@ object Edge { val incrementVersion = 1L val minTsVal = 0L - def toEdge(srcId: Any, - tgtId: Any, - labelName: String, - direction: String, - props: Map[String, Any] = Map.empty, - ts: Long = System.currentTimeMillis(), - operation: String = "insert"): Edge = { - val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName)) - - val srcVertexId = toInnerVal(srcId.toString, label.srcColumn.columnType, label.schemaVersion) - val tgtVertexId = toInnerVal(tgtId.toString, label.tgtColumn.columnType, label.schemaVersion) + /** now version information is required also **/ + type Props = java.util.Map[String, S2Property[_]] + type State = Map[LabelMeta, InnerValLikeWithTs] + type PropsPairWithTs = (State, State, Long, String) + type MergeState = PropsPairWithTs => (State, Boolean) + type UpdateFunc = (Option[Edge], Edge, MergeState) - val srcColId = label.srcColumn.id.get - val tgtColId = label.tgtColumn.id.get + def EmptyProps = new java.util.HashMap[String, S2Property[_]] + def EmptyState = Map.empty[LabelMeta, InnerValLikeWithTs] + def sameProps(base: Props, other: Props): Boolean = { + if (base.size != other.size) false + else { + var ret = true + val iter = base.entrySet().iterator() + while (iter.hasNext) { + val e = iter.next() + if (!other.containsKey(e.getKey)) ret = false + else if (e.getValue != other.get(e.getKey)) ret = false + else { - val srcVertex = Vertex(SourceVertexId(srcColId, srcVertexId), System.currentTimeMillis()) - val tgtVertex = Vertex(TargetVertexId(tgtColId, tgtVertexId), System.currentTimeMillis()) - val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported.")) + } + } + val otherIter = other.entrySet().iterator() + while (otherIter.hasNext) { + val e = otherIter.next() 
+ if (!base.containsKey(e.getKey)) ret = false + else if (e.getValue != base.get(e.getKey)) ret = false + else { - val labelWithDir = LabelWithDirection(label.id.get, dir) - val propsPlusTs = props ++ Map(LabelMeta.timestamp.name -> ts) - val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts) - val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported.")) + } + } + ret + } +// base.sameElements(other) + } + def fillPropsWithTs(snapshotEdge: SnapshotEdge, state: State): Unit = { + state.foreach { case (k, v) => snapshotEdge.property(k.name, v.innerVal.value, v.ts) } + } + def fillPropsWithTs(indexEdge: IndexEdge, state: State): Unit = { + state.foreach { case (k, v) => indexEdge.property(k.name, v.innerVal.value, v.ts) } + } + def fillPropsWithTs(edge: Edge, state: State): Unit = { + state.foreach { case (k, v) => edge.property(k.name, v.innerVal.value, v.ts) } + } - new Edge(srcVertex, tgtVertex, label, dir, op = op, version = ts, propsWithTs = propsWithTs) + def propsToState(props: Props): State = { + props.asScala.map { case (k, v) => + v.labelMeta -> v.innerValWithTs + }.toMap } - /** now version information is required also **/ - type State = Map[LabelMeta, InnerValLikeWithTs] - type PropsPairWithTs = (State, State, Long, String) - type MergeState = PropsPairWithTs => (State, Boolean) - type UpdateFunc = (Option[Edge], Edge, MergeState) + def stateToProps(edge: Edge, state: State): Props = { + state.foreach { case (k, v) => + edge.property(k.name, v.innerVal.value, v.ts) + } + edge.propsWithTs + } def allPropsDeleted(props: Map[LabelMeta, InnerValLikeWithTs]): Boolean = if (!props.contains(LabelMeta.lastDeletedAt)) false @@ -467,6 +645,23 @@ object Edge { propsWithoutLastDeletedAt.forall { case (_, v) => v.ts <= lastDeletedAt } } + def allPropsDeleted(props: Props): Boolean = + if (!props.containsKey(LabelMeta.lastDeletedAt.name)) false + else { + val lastDeletedAt = props.get(LabelMeta.lastDeletedAt.name).ts 
+ props.remove(LabelMeta.lastDeletedAt.name) +// val propsWithoutLastDeletedAt = props +// +// propsWithoutLastDeletedAt.forall { case (_, v) => v.ts <= lastDeletedAt } + var ret = true + val iter = props.entrySet().iterator() + while (iter.hasNext) { + val e = iter.next() + if (e.getValue.ts > lastDeletedAt) ret = false + } + ret + } + def buildDeleteBulk(invertedEdge: Option[Edge], requestEdge: Edge): (Edge, EdgeMutate) = { // assert(invertedEdge.isEmpty) // assert(requestEdge.op == GraphUtil.operations("delete")) @@ -481,7 +676,8 @@ object Edge { // logger.debug(s"oldEdge: ${invertedEdge.map(_.toStringRaw)}") // logger.debug(s"requestEdge: ${requestEdge.toStringRaw}") val oldPropsWithTs = - if (invertedEdge.isEmpty) Map.empty[LabelMeta, InnerValLikeWithTs] else invertedEdge.get.propsWithTs + if (invertedEdge.isEmpty) Map.empty[LabelMeta, InnerValLikeWithTs] + else propsToState(invertedEdge.get.propsWithTs) val funcs = requestEdges.map { edge => if (edge.op == GraphUtil.operations("insert")) { @@ -514,7 +710,7 @@ object Edge { for { (requestEdge, func) <- requestWithFuncs } { - val (_newPropsWithTs, _) = func(prevPropsWithTs, requestEdge.propsWithTs, requestEdge.ts, requestEdge.schemaVer) + val (_newPropsWithTs, _) = func(prevPropsWithTs, propsToState(requestEdge.propsWithTs), requestEdge.ts, requestEdge.schemaVer) prevPropsWithTs = _newPropsWithTs // logger.debug(s"${requestEdge.toLogString}\n$oldPropsWithTs\n$prevPropsWithTs\n") } @@ -530,7 +726,9 @@ object Edge { // logger.debug(s"${edgeMutate.toLogString}\n${propsWithTs}") // logger.error(s"$propsWithTs") - (requestEdge.copy(propsWithTs = propsWithTs), edgeMutate) + val newEdge = requestEdge.copy(propsWithTs = EmptyProps) + fillPropsWithTs(newEdge, propsWithTs) + (newEdge, edgeMutate) } } @@ -540,7 +738,7 @@ object Edge { // both direction use same indices that is defined when label creation. 
true case Some(dir) => - if (dir != ie.direction) { + if (dir != ie.dir) { // current labelIndex's direction is different with indexEdge's direction so don't touch false } else { @@ -566,13 +764,14 @@ object Edge { val newOp = snapshotEdgeOpt match { case None => requestEdge.op case Some(old) => - val oldMaxTs = old.propsWithTs.map(_._2.ts).max + val oldMaxTs = old.propsWithTs.asScala.map(_._2.ts).max if (oldMaxTs > requestEdge.ts) old.op else requestEdge.op } - val newSnapshotEdgeOpt = - Option(requestEdge.copy(op = newOp, propsWithTs = newPropsWithTs, version = newVersion).toSnapshotEdge) + val newSnapshotEdge = requestEdge.copy(op = newOp, version = newVersion).copyEdgeWithState(newPropsWithTs) + + val newSnapshotEdgeOpt = Option(newSnapshotEdge.toSnapshotEdge) // delete request must always update snapshot. if (withOutDeletedAt == oldPropsWithTs && newPropsWithTs.contains(LabelMeta.lastDeletedAt)) { // no mutation on indexEdges. only snapshotEdge should be updated to record lastDeletedAt. 
@@ -587,12 +786,17 @@ object Edge { val edgesToInsert = if (newPropsWithTs.isEmpty || allPropsDeleted(newPropsWithTs)) Nil - else - requestEdge.copy( + else { + val newEdge = requestEdge.copy( version = newVersion, - propsWithTs = newPropsWithTs, + propsWithTs = Edge.EmptyProps, op = GraphUtil.defaultOpByte - ).relatedEdges.flatMap { relEdge => filterOutWithLabelOption(relEdge.edgesWithIndexValid) } + ) + newPropsWithTs.foreach { case (k, v) => newEdge.property(k.name, v.innerVal.value, v.ts) } + + newEdge.relatedEdges.flatMap { relEdge => filterOutWithLabelOption(relEdge.edgesWithIndexValid) } + } + EdgeMutate(edgesToDelete = edgesToDelete, edgesToInsert = edgesToInsert, http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala index a2b17ef..ec3f286 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Graph.scala @@ -19,10 +19,11 @@ package org.apache.s2graph.core +import java.util import java.util.concurrent.Executors import com.typesafe.config.{Config, ConfigFactory} -import org.apache.hadoop.fs.Path +import org.apache.commons.configuration.Configuration import org.apache.s2graph.core.GraphExceptions.{FetchAllStepFailException, FetchTimeoutException, LabelNotExistException} import org.apache.s2graph.core.JSONParser._ import org.apache.s2graph.core.mysqls.{Label, LabelMeta, Model, Service} @@ -30,8 +31,11 @@ import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage import org.apache.s2graph.core.storage.{SKeyValue, Storage} import org.apache.s2graph.core.types._ import org.apache.s2graph.core.utils.{DeferCache, Extensions, SafeUpdateCache, logger} +import org.apache.tinkerpop.gremlin.process.computer.GraphComputer 
+import org.apache.tinkerpop.gremlin.structure +import org.apache.tinkerpop.gremlin.structure.Graph.Variables +import org.apache.tinkerpop.gremlin.structure.{Graph => TpGraph, Transaction} import play.api.libs.json.{JsObject, Json} - import scala.annotation.tailrec import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, ListBuffer} @@ -83,62 +87,7 @@ object Graph { var DefaultConfig: Config = ConfigFactory.parseMap(DefaultConfigs) - def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] = Try { - val parts = GraphUtil.split(s) - val logType = parts(2) - val element = if (logType == "edge" | logType == "e") { - /** current only edge is considered to be bulk loaded */ - labelMapping.get(parts(5)) match { - case None => - case Some(toReplace) => - parts(5) = toReplace - } - toEdge(parts) - } else if (logType == "vertex" | logType == "v") { - toVertex(parts) - } else { - throw new GraphExceptions.JsonParseException("log type is not exist in log.") - } - element - } recover { - case e: Exception => - logger.error(s"[toElement]: $e", e) - None - } get - - - def toVertex(s: String): Option[Vertex] = { - toVertex(GraphUtil.split(s)) - } - - def toEdge(s: String): Option[Edge] = { - toEdge(GraphUtil.split(s)) - } - - def toEdge(parts: Array[String]): Option[Edge] = Try { - val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5)) - val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any] - val tempDirection = if (parts.length >= 8) parts(7) else "out" - val direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection - val edge = Edge.toEdge(srcId, tgtId, label, direction, props, ts.toLong, operation) - Option(edge) - } recover { - case e: Exception => - logger.error(s"[toEdge]: $e", e) - throw e - } get - - def toVertex(parts: 
Array[String]): Option[Vertex] = Try { - val (ts, operation, logType, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5)) - val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any] - val vertex = Vertex.toVertex(serviceName, colName, srcId, props, ts.toLong, operation) - Option(vertex) - } recover { - case e: Throwable => - logger.error(s"[toVertex]: $e", e) - throw e - } get def initStorage(graph: Graph, config: Config)(ec: ExecutionContext): Storage[_, _] = { val storageBackend = config.getString("s2graph.storage.backend") @@ -326,7 +275,9 @@ object Graph { /** Select */ val mergedPropsWithTs = edge.propertyValuesInner(propsSelectColumns) - val newEdge = edge.copy(propsWithTs = mergedPropsWithTs) +// val newEdge = edge.copy(propsWithTs = mergedPropsWithTs) + val newEdge = edge.copyEdgeWithState(mergedPropsWithTs) + val newEdgeWithScore = edgeWithScore.copy(edge = newEdge) /** OrderBy */ val orderByValues = @@ -410,7 +361,7 @@ object Graph { edge.propertyValues(queryOption.selectColumns) ++ initial } - val newEdge = edge.copy(propsWithTs = mergedPropsWithTs) + val newEdge = edge.copyEdgeWithState(mergedPropsWithTs) edgeWithScore.copy(edge = newEdge) } } else Nil @@ -544,7 +495,7 @@ object Graph { } -class Graph(_config: Config)(implicit val ec: ExecutionContext) { +class Graph(_config: Config)(implicit val ec: ExecutionContext) extends TpGraph { import Graph._ @@ -948,20 +899,28 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) { val head = filtered.head val label = head.edge.innerLabel val edgeWithScoreLs = filtered.map { edgeWithScore => - val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match { - case "strong" => - val _newPropsWithTs = edgeWithScore.edge.updatePropsWithTs( - Map(LabelMeta.timestamp -> InnerValLikeWithTs.withLong(requestTs, requestTs, label.schemaVersion)) - ) - - 
(GraphUtil.operations("delete"), requestTs, _newPropsWithTs) - case _ => - val oldEdge = edgeWithScore.edge - (oldEdge.op, oldEdge.version, oldEdge.updatePropsWithTs()) - } - - val copiedEdge = - edgeWithScore.edge.copy(op = newOp, version = newVersion, propsWithTs = newPropsWithTs) + val edge = edgeWithScore.edge + val copiedEdge = label.consistencyLevel match { + case "strong" => + edge.copyEdge(op = GraphUtil.operations("delete"), + version = requestTs, propsWithTs = Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs) + case _ => + edge.copyEdge(propsWithTs = Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs) + } +// val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match { +// case "strong" => +// val edge = edgeWithScore.edge +// edge.property(LabelMeta.timestamp.name, requestTs) +// val _newPropsWithTs = edge.updatePropsWithTs() +// +// (GraphUtil.operations("delete"), requestTs, _newPropsWithTs) +// case _ => +// val oldEdge = edgeWithScore.edge +// (oldEdge.op, oldEdge.version, oldEdge.updatePropsWithTs()) +// } +// +// val copiedEdge = +// edgeWithScore.edge.copy(op = newOp, version = newVersion, propsWithTs = newPropsWithTs) val edgeToDelete = edgeWithScore.copy(edge = copiedEdge) // logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}") @@ -1099,7 +1058,7 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) { operation: String = "insert", withWait: Boolean = true): Future[Boolean] = { - val innerEdges = Seq(Edge.toEdge(srcId, tgtId, labelName, direction, props.toMap, ts, operation)) + val innerEdges = Seq(toEdge(srcId, tgtId, labelName, direction, props.toMap, ts, operation)) mutateEdges(innerEdges, withWait).map(_.headOption.getOrElse(false)) } @@ -1113,4 +1072,141 @@ class Graph(_config: Config)(implicit val ec: ExecutionContext) { val innerVertices = Seq(Vertex.toVertex(serviceName, columnName, id, props.toMap, ts, operation)) mutateVertices(innerVertices, 
withWait).map(_.headOption.getOrElse(false)) } + + def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] = Try { + val parts = GraphUtil.split(s) + val logType = parts(2) + val element = if (logType == "edge" | logType == "e") { + /** current only edge is considered to be bulk loaded */ + labelMapping.get(parts(5)) match { + case None => + case Some(toReplace) => + parts(5) = toReplace + } + toEdge(parts) + } else if (logType == "vertex" | logType == "v") { + toVertex(parts) + } else { + throw new GraphExceptions.JsonParseException("log type is not exist in log.") + } + + element + } recover { + case e: Exception => + logger.error(s"[toElement]: $e", e) + None + } get + + + def toVertex(s: String): Option[Vertex] = { + toVertex(GraphUtil.split(s)) + } + + def toEdge(s: String): Option[Edge] = { + toEdge(GraphUtil.split(s)) + } + + def toEdge(parts: Array[String]): Option[Edge] = Try { + val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5)) + val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any] + val tempDirection = if (parts.length >= 8) parts(7) else "out" + val direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection + val edge = toEdge(srcId, tgtId, label, direction, props, ts.toLong, operation) + Option(edge) + } recover { + case e: Exception => + logger.error(s"[toEdge]: $e", e) + throw e + } get + + def toVertex(parts: Array[String]): Option[Vertex] = Try { + val (ts, operation, logType, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5)) + val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any] + val vertex = Vertex.toVertex(serviceName, colName, srcId, props, ts.toLong, operation) + Option(vertex) + } 
recover { + case e: Throwable => + logger.error(s"[toVertex]: $e", e) + throw e + } get + + def newSnapshotEdge(srcVertex: Vertex, + tgtVertex: Vertex, + label: Label, + dir: Int, + op: Byte, + version: Long, + propsWithTs: Edge.State, + pendingEdgeOpt: Option[Edge], + statusCode: Byte = 0, + lockTs: Option[Long], + tsInnerValOpt: Option[InnerValLike] = None): SnapshotEdge = { + val snapshotEdge = new SnapshotEdge(this, srcVertex, tgtVertex, label, dir, op, version, Edge.EmptyProps, + pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt) + Edge.fillPropsWithTs(snapshotEdge, propsWithTs) + snapshotEdge + } + + def newEdge(srcVertex: Vertex, + tgtVertex: Vertex, + innerLabel: Label, + dir: Int, + op: Byte = GraphUtil.defaultOpByte, + version: Long = System.currentTimeMillis(), + propsWithTs: Edge.State, + parentEdges: Seq[EdgeWithScore] = Nil, + originalEdgeOpt: Option[Edge] = None, + pendingEdgeOpt: Option[Edge] = None, + statusCode: Byte = 0, + lockTs: Option[Long] = None, + tsInnerValOpt: Option[InnerValLike] = None): Edge = { + val edge = new Edge(this, srcVertex, tgtVertex, innerLabel, dir, op, version, Edge.EmptyProps, + parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt) + Edge.fillPropsWithTs(edge, propsWithTs) + edge + } + def toEdge(srcId: Any, + tgtId: Any, + labelName: String, + direction: String, + props: Map[String, Any] = Map.empty, + ts: Long = System.currentTimeMillis(), + operation: String = "insert"): Edge = { + val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName)) + + val srcVertexId = toInnerVal(srcId.toString, label.srcColumn.columnType, label.schemaVersion) + val tgtVertexId = toInnerVal(tgtId.toString, label.tgtColumn.columnType, label.schemaVersion) + + val srcColId = label.srcColumn.id.get + val tgtColId = label.tgtColumn.id.get + + val srcVertex = Vertex(SourceVertexId(srcColId, srcVertexId), System.currentTimeMillis()) + val tgtVertex = Vertex(TargetVertexId(tgtColId, 
tgtVertexId), System.currentTimeMillis()) + val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported.")) + + val labelWithDir = LabelWithDirection(label.id.get, dir) + val propsPlusTs = props ++ Map(LabelMeta.timestamp.name -> ts) + val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts) + val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported.")) + + new Edge(this, srcVertex, tgtVertex, label, dir, op = op, version = ts).copyEdgeWithState(propsWithTs) + } + + override def vertices(objects: AnyRef*): util.Iterator[structure.Vertex] = ??? + + override def tx(): Transaction = ??? + + override def edges(objects: AnyRef*): util.Iterator[structure.Edge] = ??? + + override def variables(): Variables = ??? + + override def configuration(): Configuration = ??? + + override def addVertex(objects: AnyRef*): structure.Vertex = ??? + + override def close(): Unit = ??? + + override def compute[C <: GraphComputer](aClass: Class[C]): C = ??? + + override def compute(): GraphComputer = ??? 
} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala index 7b10709..170fd0b 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala @@ -376,9 +376,9 @@ case class QueryParam(labelName: String, val propVal = if (InnerVal.isNumericType(labelMeta.dataType)) { - InnerVal.withLong(edge.props(labelMeta).value.asInstanceOf[BigDecimal].longValue() + padding, label.schemaVersion) + InnerVal.withLong(edge.property(labelMeta.name).value.asInstanceOf[BigDecimal].longValue() + padding, label.schemaVersion) } else { - edge.props(labelMeta) + edge.property(labelMeta.name).asInstanceOf[S2Property[_]].innerVal } labelMeta -> propVal http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala index d8416c2..3753d0f 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala @@ -40,7 +40,7 @@ object QueryResult { val edgeWithScores = for { vertex <- query.vertices } yield { - val edge = Edge(vertex, vertex, label, queryParam.labelWithDir.dir, propsWithTs = propsWithTs) + val edge = graph.newEdge(vertex, vertex, label, queryParam.labelWithDir.dir, propsWithTs = propsWithTs) val edgeWithScore = EdgeWithScore(edge, Graph.DefaultScore, queryParam.label) edgeWithScore } 
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala index 67a9d4c..938a9bb 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Property.scala @@ -1,29 +1,48 @@ package org.apache.s2graph.core -import org.apache.hadoop.hbase.util.Bytes + import org.apache.s2graph.core.mysqls.LabelMeta -import org.apache.s2graph.core.types.CanInnerValLike -import org.apache.tinkerpop.gremlin.structure.{Element, Property} +import org.apache.s2graph.core.types.{InnerValLikeWithTs, CanInnerValLike} +import org.apache.tinkerpop.gremlin.structure.{Property} + +import scala.util.hashing.MurmurHash3 -case class S2Property[V](element: Element, +case class S2Property[V](element: Edge, labelMeta: LabelMeta, key: String, value: V, - ts: Long = System.currentTimeMillis()) extends Property[V] { + ts: Long) extends Property[V] { import CanInnerValLike._ - lazy val innerVal = anyToInnerValLike.toInnerVal(value, labelMeta.label.schemaVersion) + lazy val innerVal = anyToInnerValLike.toInnerVal(value)(element.innerLabel.schemaVersion) + lazy val innerValWithTs = InnerValLikeWithTs(innerVal, ts) def bytes: Array[Byte] = { innerVal.bytes } def bytesWithTs: Array[Byte] = { - Bytes.add(innerVal.bytes, Bytes.toBytes(ts)) + innerValWithTs.bytes } override def isPresent: Boolean = ??? override def remove(): Unit = ??? 
+ + override def hashCode(): Int = { + MurmurHash3.stringHash(labelMeta.labelId + "," + labelMeta.id.get + "," + key + "," + value + "," + ts) + } + + override def equals(other: Any): Boolean = other match { + case p: S2Property[_] => + labelMeta.labelId == p.labelMeta.labelId && + labelMeta.seq == p.labelMeta.seq && + key == p.key && value == p.value && ts == p.ts + case _ => false + } + + override def toString(): String = { + Map("labelMeta" -> labelMeta.toString, "key" -> key, "value" -> value, "ts" -> ts).toString + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala b/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala index bbd71ec..0ff4f98 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Vertex.scala @@ -19,15 +19,21 @@ package org.apache.s2graph.core +import java.util + import org.apache.s2graph.core.JSONParser._ import org.apache.s2graph.core.mysqls.{ColumnMeta, Service, ServiceColumn} import org.apache.s2graph.core.types.{InnerVal, InnerValLike, SourceVertexId, VertexId} +import org.apache.tinkerpop.gremlin.structure +import org.apache.tinkerpop.gremlin.structure.VertexProperty.Cardinality +import org.apache.tinkerpop.gremlin.structure.{Vertex => TpVertex, Direction, Edge, VertexProperty, Graph} import play.api.libs.json.Json + case class Vertex(id: VertexId, ts: Long = System.currentTimeMillis(), props: Map[Int, InnerValLike] = Map.empty[Int, InnerValLike], op: Byte = 0, - belongLabelIds: Seq[Int] = Seq.empty) extends GraphElement { + belongLabelIds: Seq[Int] = Seq.empty) extends GraphElement with TpVertex { val innerId = id.innerId @@ -97,6 +103,22 @@ case class Vertex(id: VertexId, else Seq(ts, GraphUtil.fromOp(op), "v", 
id.innerId, serviceName, columnName).mkString("\t") } + + override def vertices(direction: Direction, strings: String*): util.Iterator[TpVertex] = ??? + + override def edges(direction: Direction, strings: String*): util.Iterator[structure.Edge] = ??? + + override def property[V](cardinality: Cardinality, s: String, v: V, objects: AnyRef*): VertexProperty[V] = ??? + + override def addEdge(s: String, vertex: TpVertex, objects: AnyRef*): Edge = ??? + + override def properties[V](strings: String*): util.Iterator[VertexProperty[V]] = ??? + + override def remove(): Unit = ??? + + override def graph(): Graph = ??? + + override def label(): String = ??? } object Vertex { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala index 8baf787..805a544 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/rest/RequestParser.scala @@ -541,7 +541,7 @@ class RequestParser(graph: Graph) { val elementsWithTsv = for { edgeStr <- edgeStrs str <- GraphUtil.parseString(edgeStr) - element <- Graph.toGraphElement(str) + element <- graph.toGraphElement(str) } yield (element, str) elementsWithTsv @@ -566,7 +566,7 @@ class RequestParser(graph: Graph) { tgtId <- tgtIds.flatMap(jsValueToAny(_).toSeq) } yield { // val edge = Management.toEdge(graph, timestamp, operation, srcId, tgtId, label, direction, fromJsonToProperties(propsJson)) - val edge = Edge.toEdge(srcId, tgtId, label, direction, fromJsonToProperties(propsJson), ts = timestamp, operation = operation) + val edge = graph.toEdge(srcId, tgtId, label, direction, fromJsonToProperties(propsJson), ts = timestamp, operation = operation) val tsv = 
(jsValue \ "direction").asOpt[String] match { case None => Seq(timestamp, operation, "e", srcId, tgtId, label, propsJson.toString).mkString("\t") case Some(dir) => Seq(timestamp, operation, "e", srcId, tgtId, label, propsJson.toString, dir).mkString("\t") @@ -690,7 +690,7 @@ class RequestParser(graph: Graph) { labelName <- (json \ "label").asOpt[String] direction = (json \ "direction").asOpt[String].getOrElse("out") } yield { - Edge.toEdge(from, to, labelName, direction, Map.empty) + graph.toEdge(from, to, labelName, direction, Map.empty) } } @@ -700,7 +700,7 @@ class RequestParser(graph: Graph) { for { edgeStr <- edgeStrs str <- GraphUtil.parseString(edgeStr) - element <- Graph.toGraphElement(str) + element <- graph.toGraphElement(str) } yield element } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala index b1ef11d..26d6ad1 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala @@ -113,20 +113,20 @@ abstract class Storage[Q, R](val graph: Graph, * */ val snapshotEdgeDeserializers: Map[String, Deserializable[SnapshotEdge]] = Map( - VERSION1 -> new SnapshotEdgeDeserializable, - VERSION2 -> new SnapshotEdgeDeserializable, - VERSION3 -> new serde.snapshotedge.tall.SnapshotEdgeDeserializable, - VERSION4 -> new serde.snapshotedge.tall.SnapshotEdgeDeserializable + VERSION1 -> new SnapshotEdgeDeserializable(graph), + VERSION2 -> new SnapshotEdgeDeserializable(graph), + VERSION3 -> new serde.snapshotedge.tall.SnapshotEdgeDeserializable(graph), + VERSION4 -> new serde.snapshotedge.tall.SnapshotEdgeDeserializable(graph) ) def snapshotEdgeDeserializer(schemaVer: String) = 
snapshotEdgeDeserializers.get(schemaVer).getOrElse(throw new RuntimeException(s"not supported version: ${schemaVer}")) /** create deserializer that can parse stored CanSKeyValue into indexEdge. */ - val indexEdgeDeserializers: Map[String, Deserializable[IndexEdge]] = Map( - VERSION1 -> new IndexEdgeDeserializable, - VERSION2 -> new IndexEdgeDeserializable, - VERSION3 -> new IndexEdgeDeserializable, - VERSION4 -> new serde.indexedge.tall.IndexEdgeDeserializable + val indexEdgeDeserializers: Map[String, Deserializable[Edge]] = Map( + VERSION1 -> new IndexEdgeDeserializable(graph), + VERSION2 -> new IndexEdgeDeserializable(graph), + VERSION3 -> new IndexEdgeDeserializable(graph), + VERSION4 -> new serde.indexedge.tall.IndexEdgeDeserializable(graph) ) def indexEdgeDeserializer(schemaVer: String) = @@ -795,17 +795,25 @@ abstract class Storage[Q, R](val graph: Graph, } yield { val edge = edgeWithScore.edge val score = edgeWithScore.score - /** reverted direction */ - val reversedIndexedEdgesMutations = edge.duplicateEdge.edgesWithIndex.flatMap { indexEdge => + + val edgeSnapshot = edge.copyEdge(propsWithTs = Edge.propsToState(edge.updatePropsWithTs())) + val reversedSnapshotEdgeMutations = snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put)) + + val edgeForward = edge.copyEdge(propsWithTs = Edge.propsToState(edge.updatePropsWithTs())) + val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge => indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ buildIncrementsAsync(indexEdge, -1L) } - val reversedSnapshotEdgeMutations = snapshotEdgeSerializer(edge.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put)) - val forwardIndexedEdgeMutations = edge.edgesWithIndex.flatMap { indexEdge => + + /** reverted direction */ + val edgeRevert = edge.copyEdge(propsWithTs = Edge.propsToState(edge.updatePropsWithTs())) + val reversedIndexedEdgesMutations = 
edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge => indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ buildIncrementsAsync(indexEdge, -1L) } + val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations + writeToStorage(zkQuorum, mutations, withWait = true) } @@ -821,7 +829,7 @@ abstract class Storage[Q, R](val graph: Graph, /** Parsing Logic: parse from kv from Storage into Edge */ def toEdge[K: CanSKeyValue](kv: K, queryRequest: QueryRequest, - cacheElementOpt: Option[IndexEdge], + cacheElementOpt: Option[Edge], parentEdges: Seq[EdgeWithScore]): Option[Edge] = { logger.debug(s"toEdge: $kv") @@ -830,8 +838,8 @@ abstract class Storage[Q, R](val graph: Graph, val queryParam = queryRequest.queryParam val schemaVer = queryParam.label.schemaVersion val indexEdgeOpt = indexEdgeDeserializer(schemaVer).fromKeyValues(Option(queryParam.label), Seq(kv), queryParam.label.schemaVersion, cacheElementOpt) - if (!queryOption.returnTree) indexEdgeOpt.map(indexEdge => indexEdge.toEdge.copy(parentEdges = parentEdges)) - else indexEdgeOpt.map(indexEdge => indexEdge.toEdge) + if (!queryOption.returnTree) indexEdgeOpt.map(indexEdge => indexEdge.copy(parentEdges = parentEdges)) + else indexEdgeOpt } catch { case ex: Exception => logger.error(s"Fail on toEdge: ${kv.toString}, ${queryRequest}", ex) @@ -898,7 +906,7 @@ abstract class Storage[Q, R](val graph: Graph, val (degreeEdges, keyValues) = cacheElementOpt match { case None => (Nil, kvs) case Some(cacheElement) => - val head = cacheElement.toEdge + val head = cacheElement if (!head.isDegree) (Nil, kvs) else (Seq(EdgeWithScore(head, 1.0, label)), kvs.tail) } @@ -968,13 +976,13 @@ abstract class Storage[Q, R](val graph: Graph, val (srcVId, tgtVId) = (SourceVertexId(srcColumn.id.get, src), TargetVertexId(tgtColumn.id.get, tgt)) val (srcV, tgtV) = (Vertex(srcVId), Vertex(tgtVId)) - Edge(srcV, tgtV, label, labelWithDir.dir, propsWithTs = 
propsWithTs) + graph.newEdge(srcV, tgtV, label, labelWithDir.dir, propsWithTs = propsWithTs) case None => val src = InnerVal.convertVersion(srcVertex.innerId, srcColumn.columnType, label.schemaVersion) val srcVId = SourceVertexId(srcColumn.id.get, src) val srcV = Vertex(srcVId) - Edge(srcV, srcV, label, labelWithDir.dir, propsWithTs = propsWithTs, parentEdges = parentEdges) + graph.newEdge(srcV, srcV, label, labelWithDir.dir, propsWithTs = propsWithTs, parentEdges = parentEdges) } } @@ -1075,13 +1083,15 @@ abstract class Storage[Q, R](val graph: Graph, /** IndexEdge */ def buildIncrementsAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = { - val newProps = indexedEdge.updatePropsWithTs(Map(LabelMeta.degree -> InnerValLikeWithTs.withLong(amount, indexedEdge.ts, indexedEdge.schemaVer))) + val newProps = indexedEdge.updatePropsWithTs() + newProps.put(LabelMeta.degree.name, new S2Property(indexedEdge.toEdge, LabelMeta.degree, LabelMeta.degree.name, amount, indexedEdge.ts)) val _indexedEdge = indexedEdge.copy(propsWithTs = newProps) indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment, durability = _indexedEdge.label.durability)) } def buildIncrementsCountAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = { - val newProps = indexedEdge.updatePropsWithTs(Map(LabelMeta.count -> InnerValLikeWithTs.withLong(amount, indexedEdge.ts, indexedEdge.schemaVer))) + val newProps = indexedEdge.updatePropsWithTs() + newProps.put(LabelMeta.degree.name, new S2Property(indexedEdge.toEdge, LabelMeta.degree, LabelMeta.degree.name, amount, indexedEdge.ts)) val _indexedEdge = indexedEdge.copy(propsWithTs = newProps) indexEdgeSerializer(_indexedEdge).toKeyValues.map(_.copy(operation = SKeyValue.Increment, durability = _indexedEdge.label.durability)) } @@ -1109,10 +1119,8 @@ abstract class Storage[Q, R](val graph: Graph, } def buildDegreePuts(edge: Edge, degreeVal: Long): Seq[SKeyValue] = { - val kvs = 
edge.edgesWithIndexValid.flatMap { _indexEdge => - val newProps = Map(LabelMeta.degree -> InnerValLikeWithTs.withLong(degreeVal, _indexEdge.ts, _indexEdge.schemaVer)) - val indexEdge = _indexEdge.copy(propsWithTs = newProps) - + edge.property(LabelMeta.degree.name, degreeVal, edge.ts) + val kvs = edge.edgesWithIndexValid.flatMap { indexEdge => indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put, durability = indexEdge.label.durability)) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala index 2428173..c538e53 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/tall/IndexEdgeDeserializable.scala @@ -25,13 +25,14 @@ import org.apache.s2graph.core.storage.StorageDeserializable._ import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, StorageDeserializable} import org.apache.s2graph.core.types._ import org.apache.s2graph.core.utils.logger -import org.apache.s2graph.core.{GraphUtil, IndexEdge, Vertex} +import org.apache.s2graph.core._ import scala.collection.immutable object IndexEdgeDeserializable{ - def getNewInstance() = new IndexEdgeDeserializable() + def getNewInstance(graph: Graph) = new IndexEdgeDeserializable(graph) } -class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] { +class IndexEdgeDeserializable(graph: Graph, + bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends 
Deserializable[Edge] { import StorageDeserializable._ type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte, Boolean, Int) @@ -40,7 +41,7 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte override def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label], _kvs: Seq[T], schemaVer: String, - cacheElementOpt: Option[IndexEdge]): IndexEdge = { + cacheElementOpt: Option[Edge]): Edge = { assert(_kvs.size == 1) @@ -59,19 +60,25 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte pos += 1 val label = checkLabel.getOrElse(Label.findById(labelWithDir.labelId)) -// val op = kv.row(pos) -// pos += 1 + + val srcVertex = Vertex(srcVertexId, version) + //TODO: + val edge = graph.newEdge(srcVertex, null, + label, labelWithDir.dir, GraphUtil.defaultOpByte, version, Edge.EmptyState) + var tsVal = version if (pos == kv.row.length) { // degree // val degreeVal = Bytes.toLong(kv.value) val degreeVal = bytesToLongFunc(kv.value, 0) - val ts = kv.timestamp - val tsInnerValLikeWithTs = InnerValLikeWithTs.withLong(ts, ts, schemaVer) - val props = Map(LabelMeta.timestamp -> tsInnerValLikeWithTs, - LabelMeta.degree -> InnerValLikeWithTs.withLong(degreeVal, ts, schemaVer)) val tgtVertexId = VertexId(HBaseType.DEFAULT_COL_ID, InnerVal.withStr("0", schemaVer)) - IndexEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), label, labelWithDir.dir, GraphUtil.defaultOpByte, ts, labelIdxSeq, props, tsInnerValOpt = Option(tsInnerValLikeWithTs.innerVal)) + + edge.property(LabelMeta.timestamp.name, version, version) + edge.property(LabelMeta.degree.name, degreeVal, version) + edge.tgtVertex = Vertex(tgtVertexId, version) + edge.op = GraphUtil.defaultOpByte + edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer)) + edge } else { // not degree edge val (idxPropsRaw, endAt) = bytesToProps(kv.row, pos, schemaVer) @@ -85,60 +92,47 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = 
byte } val op = kv.row(kv.row.length-1) - val allProps = immutable.Map.newBuilder[LabelMeta, InnerValLikeWithTs] val index = label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${label.id.get}, ${labelIdxSeq}")) - /** process indexProps */ val size = idxPropsRaw.length (0 until size).foreach { ith => val meta = index.sortKeyTypesArray(ith) val (k, v) = idxPropsRaw(ith) - if (k == LabelMeta.degree) allProps += LabelMeta.degree -> InnerValLikeWithTs(v, version) - else allProps += meta -> InnerValLikeWithTs(v, version) + if (k == LabelMeta.timestamp) tsVal = v.value.asInstanceOf[BigDecimal].longValue() + + if (k == LabelMeta.degree) { + edge.property(LabelMeta.degree.name, v.value, version) + } else { + edge.property(meta.name, v.value, version) + } } -// for { -// (meta, (k, v)) <- index.sortKeyTypes.zip(idxPropsRaw) -// } { -// if (k == LabelMeta.degree) allProps += LabelMeta.degree -> InnerValLikeWithTs(v, version) -// else { -// allProps += meta -> InnerValLikeWithTs(v, version) -// } -// } /** process props */ if (op == GraphUtil.operations("incrementCount")) { // val countVal = Bytes.toLong(kv.value) val countVal = bytesToLongFunc(kv.value, 0) - allProps += (LabelMeta.count -> InnerValLikeWithTs.withLong(countVal, version, schemaVer)) + edge.property(LabelMeta.count.name, countVal, version) } else { val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer, label) props.foreach { case (k, v) => - allProps += (k -> InnerValLikeWithTs(v, version)) + if (k == LabelMeta.timestamp) tsVal = v.value.asInstanceOf[BigDecimal].longValue() + + edge.property(k.name, v.value, version) } } - val _mergedProps = allProps.result() - val (mergedProps, tsInnerValLikeWithTs) = _mergedProps.get(LabelMeta.timestamp) match { - case None => - val tsInnerVal = InnerValLikeWithTs.withLong(version, version, schemaVer) - val mergedProps = _mergedProps + (LabelMeta.timestamp -> InnerValLikeWithTs.withLong(version, version, schemaVer)) 
- (mergedProps, tsInnerVal) - case Some(tsInnerVal) => - (_mergedProps, tsInnerVal) - } -// val mergedProps = -// if (_mergedProps.contains(LabelMeta.timestamp)) _mergedProps -// else _mergedProps + (LabelMeta.timestamp -> InnerValLikeWithTs.withLong(version, version, schemaVer)) /** process tgtVertexId */ val tgtVertexId = - mergedProps.get(LabelMeta.to) match { - case None => tgtVertexIdRaw - case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal) - } - + if (edge.checkProperty(LabelMeta.to.name)) { + val vId = edge.property(LabelMeta.to.name).asInstanceOf[S2Property[_]].innerValWithTs + TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal) + } else tgtVertexIdRaw - IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), label, labelWithDir.dir, op, version, labelIdxSeq, mergedProps, tsInnerValOpt = Option(tsInnerValLikeWithTs.innerVal)) + edge.tgtVertex = Vertex(tgtVertexId, version) + edge.op = op + edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer)) + edge } } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala index 534667b..2b620a1 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/indexedge/wide/IndexEdgeDeserializable.scala @@ -23,10 +23,11 @@ import org.apache.s2graph.core.mysqls.{Label, LabelMeta} import org.apache.s2graph.core.storage.StorageDeserializable._ import org.apache.s2graph.core.storage._ import org.apache.s2graph.core.types._ -import 
org.apache.s2graph.core.{GraphUtil, IndexEdge, Vertex} +import org.apache.s2graph.core._ import scala.collection.immutable -class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[IndexEdge] { +class IndexEdgeDeserializable(graph: Graph, + bytesToLongFunc: (Array[Byte], Int) => Long = bytesToLong) extends Deserializable[Edge] { import StorageDeserializable._ type QualifierRaw = (Array[(LabelMeta, InnerValLike)], VertexId, Byte, Boolean, Int) @@ -67,7 +68,7 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte override def fromKeyValuesInner[T: CanSKeyValue](checkLabel: Option[Label], _kvs: Seq[T], schemaVer: String, - cacheElementOpt: Option[IndexEdge]): IndexEdge = { + cacheElementOpt: Option[Edge]): Edge = { assert(_kvs.size == 1) // val kvs = _kvs.map { kv => implicitly[CanSKeyValue[T]].toSKeyValue(kv) } @@ -75,17 +76,22 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte val kv = implicitly[CanSKeyValue[T]].toSKeyValue(_kvs.head) val version = kv.timestamp - val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e => - (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0) - }.getOrElse(parseRow(kv, schemaVer)) +// val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = cacheElementOpt.map { e => +// (e.srcVertex.id, e.labelWithDir, e.labelIndexSeq, false, 0) +// }.getOrElse(parseRow(kv, schemaVer)) + val (srcVertexId, labelWithDir, labelIdxSeq, _, _) = parseRow(kv, schemaVer) val label = checkLabel.getOrElse(Label.findById(labelWithDir.labelId)) + val srcVertex = Vertex(srcVertexId, version) + //TODO: + val edge = graph.newEdge(srcVertex, null, + label, labelWithDir.dir, GraphUtil.defaultOpByte, version, Edge.EmptyState) + var tsVal = version val (idxPropsRaw, tgtVertexIdRaw, op, tgtVertexIdInQualifier, _) = if (kv.qualifier.isEmpty) parseDegreeQualifier(kv, schemaVer) else parseQualifier(kv, schemaVer) - val allProps = 
immutable.Map.newBuilder[LabelMeta, InnerValLikeWithTs] val index = label.indicesMap.getOrElse(labelIdxSeq, throw new RuntimeException(s"invalid index seq: ${label.id.get}, ${labelIdxSeq}")) /** process indexProps */ @@ -93,52 +99,38 @@ class IndexEdgeDeserializable(bytesToLongFunc: (Array[Byte], Int) => Long = byte (0 until size).foreach { ith => val meta = index.sortKeyTypesArray(ith) val (k, v) = idxPropsRaw(ith) - if (k == LabelMeta.degree) allProps += LabelMeta.degree -> InnerValLikeWithTs(v, version) - else allProps += meta -> InnerValLikeWithTs(v, version) + if (k == LabelMeta.timestamp) tsVal = v.value.asInstanceOf[BigDecimal].longValue() + + if (k == LabelMeta.degree) { + edge.property(LabelMeta.degree.name, v.value, version) + } else { + edge.property(meta.name, v.value, version) + } } -// for { -// (seq, (k, v)) <- index.sortKeyTypes.zip(idxPropsRaw) -// } { -// if (k == LabelMeta.degree) allProps += LabelMeta.degree -> InnerValLikeWithTs(v, version) -// else allProps += seq -> InnerValLikeWithTs(v, version) -// } /** process props */ if (op == GraphUtil.operations("incrementCount")) { - // val countVal = Bytes.toLong(kv.value) + // val countVal = Bytes.toLong(kv.value) val countVal = bytesToLongFunc(kv.value, 0) - allProps += (LabelMeta.count -> InnerValLikeWithTs.withLong(countVal, version, schemaVer)) - } else if (kv.qualifier.isEmpty) { - val countVal = bytesToLongFunc(kv.value, 0) - allProps += (LabelMeta.degree -> InnerValLikeWithTs.withLong(countVal, version, schemaVer)) + edge.property(LabelMeta.count.name, countVal, version) } else { - val (props, _) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer, label) + val (props, endAt) = bytesToKeyValues(kv.value, 0, kv.value.length, schemaVer, label) props.foreach { case (k, v) => - allProps += (k -> InnerValLikeWithTs(v, version)) - } - } + if (k == LabelMeta.timestamp) tsVal = v.value.asInstanceOf[BigDecimal].longValue() - val _mergedProps = allProps.result() - val (mergedProps, 
tsInnerValLikeWithTs) = _mergedProps.get(LabelMeta.timestamp) match { - case None => - val tsInnerVal = InnerValLikeWithTs.withLong(version, version, schemaVer) - val mergedProps = _mergedProps + (LabelMeta.timestamp -> InnerValLikeWithTs.withLong(version, version, schemaVer)) - (mergedProps, tsInnerVal) - case Some(tsInnerVal) => - (_mergedProps, tsInnerVal) + edge.property(k.name, v.value, version) + } } -// val mergedProps = -// if (_mergedProps.contains(LabelMeta.timestamp)) _mergedProps -// else _mergedProps + (LabelMeta.timestamp -> InnerValLikeWithTs.withLong(version, version, schemaVer)) - /** process tgtVertexId */ val tgtVertexId = - mergedProps.get(LabelMeta.to) match { - case None => tgtVertexIdRaw - case Some(vId) => TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal) - } - - IndexEdge(Vertex(srcVertexId, version), Vertex(tgtVertexId, version), label, labelWithDir.dir, op, version, labelIdxSeq, mergedProps, tsInnerValOpt = Option(tsInnerValLikeWithTs.innerVal)) - + if (edge.checkProperty(LabelMeta.to.name)) { + val vId = edge.property(LabelMeta.to.name).asInstanceOf[S2Property[_]].innerValWithTs + TargetVertexId(HBaseType.DEFAULT_COL_ID, vId.innerVal) + } else tgtVertexIdRaw + + edge.tgtVertex = Vertex(tgtVertexId, version) + edge.op = op + edge.tsInnerValOpt = Option(InnerVal.withLong(tsVal, schemaVer)) + edge } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/6356573e/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala index 91b8db1..37aafcf 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala +++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala @@ -24,9 +24,9 @@ import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta} import org.apache.s2graph.core.storage.StorageDeserializable._ import org.apache.s2graph.core.storage.{CanSKeyValue, Deserializable, SKeyValue, StorageDeserializable} import org.apache.s2graph.core.types.{HBaseType, LabelWithDirection, SourceAndTargetVertexIdPair, SourceVertexId} -import org.apache.s2graph.core.{Edge, SnapshotEdge, Vertex} +import org.apache.s2graph.core.{Graph, Edge, SnapshotEdge, Vertex} -class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] { +class SnapshotEdgeDeserializable(graph: Graph) extends Deserializable[SnapshotEdge] { def statusCodeWithOp(byte: Byte): (Byte, Byte) = { val statusCode = byte >> 4 @@ -87,7 +87,7 @@ class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] { val lockTs = Option(Bytes.toLong(kv.value, pos, 8)) val pendingEdge = - Edge(Vertex(srcVertexId, cellVersion), + graph.newEdge(Vertex(srcVertexId, cellVersion), Vertex(tgtVertexId, cellVersion), label, labelWithDir.dir, pendingEdgeOp, cellVersion, pendingEdgeProps.toMap, @@ -98,7 +98,7 @@ class SnapshotEdgeDeserializable extends Deserializable[SnapshotEdge] { (kvsMap, op, ts, statusCode, _pendingEdgeOpt, tsInnerVal) } - SnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), + graph.newSnapshotEdge(Vertex(srcVertexId, ts), Vertex(tgtVertexId, ts), label, labelWithDir.dir, op, cellVersion, props, statusCode = statusCode, pendingEdgeOpt = _pendingEdgeOpt, lockTs = None, tsInnerValOpt = Option(tsInnerVal)) }