add S2EdgeBuilder.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/2b5df1dd Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/2b5df1dd Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/2b5df1dd Branch: refs/heads/master Commit: 2b5df1dd0ddc913dca92353b191e8982368f08c0 Parents: 7d08225 Author: DO YUNG YOON <[email protected]> Authored: Mon Nov 6 23:08:25 2017 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Tue Nov 7 00:50:54 2017 +0900 ---------------------------------------------------------------------- .../s2graph/core/GraphElementBuilder.scala | 4 +- .../org/apache/s2graph/core/QueryParam.scala | 21 +- .../scala/org/apache/s2graph/core/S2Edge.scala | 63 ++++- .../org/apache/s2graph/core/S2EdgeBuilder.scala | 103 ++++++++ .../org/apache/s2graph/core/S2EdgeLike.scala | 258 ++++--------------- .../apache/s2graph/core/storage/StorageIO.scala | 4 +- .../core/storage/serde/MutationHelper.scala | 6 +- 7 files changed, 234 insertions(+), 225 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b5df1dd/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala b/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala index 21179aa..c9133b1 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala @@ -279,10 +279,10 @@ class GraphElementBuilder(graph: S2Graph) { val edge = edgeWithScore.edge val copiedEdge = label.consistencyLevel match { case "strong" => - edge.copyEdge(op = GraphUtil.operations("delete"), + edge.builder.copyEdge(op = GraphUtil.operations("delete"), version = requestTs, propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs) case _ => - edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs) + edge.builder.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs) } val edgeToDelete = edgeWithScore.copy(edge = copiedEdge) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b5df1dd/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala index e98ef37..3d0d076 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala @@ -185,7 +185,7 @@ case class EdgeTransformer(jsValue: JsValue) { replace(queryParam, fmt, fieldNames.flatMap(fieldName => toInnerValOpt(queryParam, edge, fieldName)), nextStepOpt) } } - } yield edge.updateTgtVertex(innerVal).copy(originalEdgeOpt = Option(edge)) + } yield edge.builder.updateTgtVertex(innerVal).copyOriginalEdgeOpt(Option(edge)) edges @@ -251,7 +251,26 @@ case class RankParam(keySeqAndWeights: Seq[(LabelMeta, Double)] = Seq((LabelMeta } bytes } + + def score(edge: S2EdgeLike): Double = { + if (keySeqAndWeights.size <= 0) 1.0f + else { + var sum: Double = 0 + + for ((labelMeta, w) <- keySeqAndWeights) { + if (edge.getPropsWithTs().containsKey(labelMeta.name)) { + val innerValWithTs = edge.getPropsWithTs().get(labelMeta.name) + val cost = try innerValWithTs.innerVal.toString.toDouble catch { + case e: Exception => 1.0 + } + sum += w * cost + } + } + sum + } + } } + object QueryParam { lazy val Empty = QueryParam(labelName = "") lazy val DefaultThreshold = Double.MinValue http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b5df1dd/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala index 3529991..03678c8 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala @@ -40,6 +40,17 @@ import scala.concurrent.Await import scala.util.hashing.MurmurHash3 object SnapshotEdge { + def apply(e: S2EdgeLike): SnapshotEdge = { + val (smaller, larger) = (e.srcForVertex, e.tgtForVertex) + + val snapshotEdge = SnapshotEdge(e.innerGraph, smaller, larger, e.innerLabel, + GraphUtil.directions("out"), e.getOp(), e.getVersion(), e.getPropsWithTs(), + pendingEdgeOpt = e.getPendingEdgeOpt(), statusCode = e.getStatusCode(), lockTs = e.getLockTs(), tsInnerValOpt = e.getTsInnerValOpt()) + + snapshotEdge.property(LabelMeta.timestamp.name, e.ts, e.ts) + + snapshotEdge + } def copyFrom(e: SnapshotEdge): SnapshotEdge = { val copy = @@ -418,8 +429,8 @@ object EdgeMutate { } } -case class EdgeMutate(edgesToDelete: List[IndexEdge] = List.empty[IndexEdge], - edgesToInsert: List[IndexEdge] = List.empty[IndexEdge], +case class EdgeMutate(edgesToDelete: Seq[IndexEdge] = Nil, + edgesToInsert: Seq[IndexEdge] = Nil, newSnapshotEdge: Option[SnapshotEdge] = None) { def deepCopy: EdgeMutate = copy( @@ -799,5 +810,53 @@ object S2Edge { // def fromString(s: String): Option[Edge] = Graph.toEdge(s) + def getServiceColumn(vertex: S2VertexLike, defaultServiceColumn: ServiceColumn) = + if (vertex.id.column == ServiceColumn.Default) defaultServiceColumn else vertex.id.column + + def srcForVertex(e: S2EdgeLike): S2VertexLike = { + val belongLabelIds = Seq(e.labelWithDir.labelId) + if (e.labelWithDir.dir == GraphUtil.directions("in")) { + val tgtColumn = getServiceColumn(e.tgtVertex, e.innerLabel.tgtColumn) + e.innerGraph.newVertex(VertexId(tgtColumn, e.tgtVertex.innerId), e.tgtVertex.ts, e.tgtVertex.props, belongLabelIds = belongLabelIds) + } else { + val srcColumn = getServiceColumn(e.srcVertex, e.innerLabel.srcColumn) + e.innerGraph.newVertex(VertexId(srcColumn, e.srcVertex.innerId), e.srcVertex.ts, e.srcVertex.props, belongLabelIds = belongLabelIds) + } + } + def tgtForVertex(e: S2EdgeLike): S2VertexLike = { + val belongLabelIds = Seq(e.labelWithDir.labelId) + if (e.labelWithDir.dir == GraphUtil.directions("in")) { + val srcColumn = getServiceColumn(e.srcVertex, e.innerLabel.srcColumn) + e.innerGraph.newVertex(VertexId(srcColumn, e.srcVertex.innerId), e.srcVertex.ts, e.srcVertex.props, belongLabelIds = belongLabelIds) + } else { + val tgtColumn = getServiceColumn(e.tgtVertex, e.innerLabel.tgtColumn) + e.innerGraph.newVertex(VertexId(tgtColumn, e.tgtVertex.innerId), e.tgtVertex.ts, e.tgtVertex.props, belongLabelIds = belongLabelIds) + } + } + + def updatePropsWithTs(e: S2EdgeLike, others: Props = S2Edge.EmptyProps): Props = { + val emptyProp = S2Edge.EmptyProps + + e.getPropsWithTs().forEach(new BiConsumer[String, S2Property[_]] { + override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value) + }) + + others.forEach(new BiConsumer[String, S2Property[_]] { + override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value) + }) + + emptyProp + } + + def propertyValue(e: S2EdgeLike, key: String): Option[InnerValLikeWithTs] = { + key match { + case "from" | "_from" => Option(InnerValLikeWithTs(e.srcVertex.innerId, e.ts)) + case "to" | "_to" => Option(InnerValLikeWithTs(e.tgtVertex.innerId, e.ts)) + case "label" => Option(InnerValLikeWithTs(InnerVal.withStr(e.innerLabel.label, e.innerLabel.schemaVersion), e.ts)) + case "direction" => Option(InnerValLikeWithTs(InnerVal.withStr(e.direction, e.innerLabel.schemaVersion), e.ts)) + case _ => + e.innerLabel.metaPropsInvMap.get(key).map(labelMeta => e.propertyValueInner(labelMeta)) + } + } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b5df1dd/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala new file mode 100644 index 0000000..ea9598e --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala @@ -0,0 +1,103 @@ +package org.apache.s2graph.core + +import org.apache.s2graph.core.S2Edge.State +import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta} +import org.apache.s2graph.core.types.{InnerValLike, TargetVertexId, VertexId} +import org.apache.tinkerpop.gremlin.structure.Property + +import scala.collection.JavaConverters._ + +class S2EdgeBuilder(edge: S2EdgeLike) { + def srcForVertex = S2Edge.srcForVertex(edge) + + def tgtForVertex = S2Edge.tgtForVertex(edge) + + def duplicateEdge = reverseSrcTgtEdge.reverseDirEdge + + def reverseDirEdge = copyEdge(dir = GraphUtil.toggleDir(edge.getDir)) + + def reverseSrcTgtEdge = copyEdge(srcVertex = edge.tgtVertex, tgtVertex = edge.srcVertex) + + def isDegree = edge.getPropsWithTs().containsKey(LabelMeta.degree.name) + + def propsPlusTsValid = edge.getPropsWithTs().asScala.filter(kv => LabelMeta.isValidSeq(kv._2.labelMeta.seq)).asJava + + def labelOrders = LabelIndex.findByLabelIdAll(edge.labelWithDir.labelId) + + def edgesWithIndex = for (labelOrder <- labelOrders) yield { + IndexEdge(edge.innerGraph, edge.srcVertex, edge.tgtVertex, edge.innerLabel, edge.getDir(), edge.getOp(), + edge.getVersion(), labelOrder.seq, edge.getPropsWithTs(), tsInnerValOpt = edge.getTsInnerValOpt()) + } + + def edgesWithIndexValid = for (labelOrder <- labelOrders) yield { + IndexEdge(edge.innerGraph, edge.srcVertex, edge.tgtVertex, edge.innerLabel, edge.getDir(), edge.getOp(), + edge.getVersion(), labelOrder.seq, propsPlusTsValid, tsInnerValOpt = edge.getTsInnerValOpt()) + } + + def relatedEdges: Seq[S2EdgeLike] = { + if (edge.labelWithDir.isDirected) { + val skipReverse = edge.innerLabel.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false) + if (skipReverse) Seq(edge) else Seq(edge, duplicateEdge) + } else { + // val outDir = labelWithDir.copy(dir = GraphUtil.directions("out")) + // val base = copy(labelWithDir = outDir) + val base = copyEdge(dir = GraphUtil.directions("out")) + Seq(base, base.reverseSrcTgtEdge) + } + } + + def copyEdge(innerGraph: S2Graph = edge.innerGraph, + srcVertex: S2VertexLike = edge.srcVertex, + tgtVertex: S2VertexLike = edge.tgtVertex, + innerLabel: Label = edge.innerLabel, + dir: Int = edge.getDir(), + op: Byte = edge.getOp(), + version: Long = edge.getVersion(), + propsWithTs: State = S2Edge.propsToState(edge.getPropsWithTs()), + parentEdges: Seq[EdgeWithScore] = edge.getParentEdges(), + originalEdgeOpt: Option[S2EdgeLike] = edge.getOriginalEdgeOpt(), + pendingEdgeOpt: Option[S2EdgeLike] = edge.getPendingEdgeOpt(), + statusCode: Byte = edge.getStatusCode(), + lockTs: Option[Long] = edge.getLockTs(), + tsInnerValOpt: Option[InnerValLike] = edge.getTsInnerValOpt(), + ts: Long = edge.getTs()): S2EdgeLike = { + val edge = new S2Edge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, S2Edge.EmptyProps, + parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt) + S2Edge.fillPropsWithTs(edge, propsWithTs) + edge.propertyInner(LabelMeta.timestamp.name, ts, ts) + edge + } + + def copyEdgeWithState(state: State): S2EdgeLike = { + val newEdge = new S2Edge(edge.innerGraph, edge.srcVertex, edge.tgtVertex, edge.innerLabel, + edge.getDir(), edge.getOp(), edge.getVersion(), S2Edge.EmptyProps, + edge.getParentEdges(), edge.getOriginalEdgeOpt(), edge.getPendingEdgeOpt(), + edge.getStatusCode(), edge.getLockTs(), edge.getTsInnerValOpt()) + + S2Edge.fillPropsWithTs(newEdge, state) + newEdge + } + + def updateTgtVertex(id: InnerValLike): S2EdgeLike = { + val newId = TargetVertexId(edge.tgtVertex.id.column, id) + val newTgtVertex = edge.innerGraph.newVertex(newId, edge.tgtVertex.ts, edge.tgtVertex.props) + copyEdge(tgtVertex = newTgtVertex) + } + + def edgeId: EdgeId = { + val timestamp = if (edge.innerLabel.consistencyLevel == "strong") 0l else edge.ts + // EdgeId(srcVertex.innerId, tgtVertex.innerId, label(), "out", timestamp) + val (srcColumn, tgtColumn) = edge.innerLabel.srcTgtColumn(edge.getDir()) + if (edge.getDir() == GraphUtil.directions("out")) + EdgeId(VertexId(srcColumn, edge.srcVertex.id.innerId), VertexId(tgtColumn, edge.tgtVertex.id.innerId), edge.label(), "out", timestamp) + else + EdgeId(VertexId(tgtColumn, edge.tgtVertex.id.innerId), VertexId(srcColumn, edge.srcVertex.id.innerId), edge.label(), "out", timestamp) + } + + def propertyInner[V](key: String, value: V, ts: Long): Property[V] = { + val labelMeta = edge.innerLabel.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on Edge.")) + val newProp = new S2Property[V](edge, labelMeta, key, value, ts) + edge.getPropsWithTs().put(key, newProp) + newProp + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b5df1dd/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala index f823d60..9963be7 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala @@ -17,6 +17,8 @@ import scala.collection.JavaConverters._ trait S2EdgeLike extends Edge with GraphElement { this: S2Edge => + val builder: S2EdgeBuilder = new S2EdgeBuilder(this) + val innerGraph: S2Graph val srcVertex: S2VertexLike var tgtVertex: S2VertexLike @@ -48,6 +50,14 @@ trait S2EdgeLike extends Edge with GraphElement { lazy val labelName = innerLabel.label lazy val direction = GraphUtil.fromDirection(dir) + def getTs(): Long = ts + def getOriginalEdgeOpt(): Option[S2EdgeLike] = originalEdgeOpt + def getParentEdges(): Seq[EdgeWithScore] = parentEdges + def getPendingEdgeOpt(): Option[S2EdgeLike] = pendingEdgeOpt + def getPropsWithTs(): Props = propsWithTs + def getLockTs(): Option[Long] = lockTs + def getStatusCode(): Byte = statusCode + def getDir(): Int = dir def setTgtVertex(v: S2VertexLike): Unit = tgtVertex = v def getOp(): Byte = op def setOp(newOp: Byte): Unit = op = newOp @@ -60,30 +70,10 @@ trait S2EdgeLike extends Edge with GraphElement { def serializePropsWithTs(): Array[Byte] = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.asScala.map(kv => kv._2.labelMeta.seq -> kv._2.innerValWithTs).toSeq) - def updatePropsWithTs(others: Props = S2Edge.EmptyProps): Props = { - val emptyProp = S2Edge.EmptyProps - - propsWithTs.forEach(new BiConsumer[String, S2Property[_]] { - override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value) - }) + def updatePropsWithTs(others: Props = S2Edge.EmptyProps): Props = + S2Edge.updatePropsWithTs(this, others) - others.forEach(new BiConsumer[String, S2Property[_]] { - override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value) - }) - - emptyProp - } - - def propertyValue(key: String): Option[InnerValLikeWithTs] = { - key match { - case "from" | "_from" => Option(InnerValLikeWithTs(srcVertex.innerId, ts)) - case "to" | "_to" => Option(InnerValLikeWithTs(tgtVertex.innerId, ts)) - case "label" => Option(InnerValLikeWithTs(InnerVal.withStr(innerLabel.label, schemaVer), ts)) - case "direction" => Option(InnerValLikeWithTs(InnerVal.withStr(direction, schemaVer), ts)) - case _ => - innerLabel.metaPropsInvMap.get(key).map(labelMeta => propertyValueInner(labelMeta)) - } - } + def propertyValue(key: String): Option[InnerValLikeWithTs] = S2Edge.propertyValue(this, key) def propertyValueInner(labelMeta: LabelMeta): InnerValLikeWithTs = { // propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse() @@ -116,172 +106,55 @@ trait S2EdgeLike extends Edge with GraphElement { } } - lazy val properties = toProps() - - def props = propsWithTs.asScala.mapValues(_.innerVal) - - def relatedEdges = { - if (labelWithDir.isDirected) { - val skipReverse = innerLabel.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false) - if (skipReverse) List(this) else List(this, duplicateEdge) - } else { - // val outDir = labelWithDir.copy(dir = GraphUtil.directions("out")) - // val base = copy(labelWithDir = outDir) - val base = copy(dir = GraphUtil.directions("out")) - List(base, base.reverseSrcTgtEdge) - } - } + def relatedEdges = builder.relatedEdges - def srcForVertex = { - val belongLabelIds = Seq(labelWithDir.labelId) - if (labelWithDir.dir == GraphUtil.directions("in")) { - val tgtColumn = getServiceColumn(tgtVertex, innerLabel.tgtColumn) - innerGraph.newVertex(VertexId(tgtColumn, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds) - } else { - val srcColumn = getServiceColumn(srcVertex, innerLabel.srcColumn) - innerGraph.newVertex(VertexId(srcColumn, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds) - } - } + def srcForVertex = builder.srcForVertex - def tgtForVertex = { - val belongLabelIds = Seq(labelWithDir.labelId) - if (labelWithDir.dir == GraphUtil.directions("in")) { - val srcColumn = getServiceColumn(srcVertex, innerLabel.srcColumn) - innerGraph.newVertex(VertexId(srcColumn, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds) - } else { - val tgtColumn = getServiceColumn(tgtVertex, innerLabel.tgtColumn) - innerGraph.newVertex(VertexId(tgtColumn, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds) - } - } + def tgtForVertex = builder.tgtForVertex - def duplicateEdge = reverseSrcTgtEdge.reverseDirEdge + def duplicateEdge = builder.duplicateEdge // def reverseDirEdge = copy(labelWithDir = labelWithDir.dirToggled) - def reverseDirEdge = copy(dir = GraphUtil.toggleDir(dir)) + def reverseDirEdge = builder.reverseDirEdge - def reverseSrcTgtEdge = copy(srcVertex = tgtVertex, tgtVertex = srcVertex) + def reverseSrcTgtEdge = builder.reverseSrcTgtEdge - def labelOrders = LabelIndex.findByLabelIdAll(labelWithDir.labelId) + def isDegree = builder.isDegree - def isDegree = propsWithTs.containsKey(LabelMeta.degree.name) + def edgesWithIndex = builder.edgesWithIndex - def propsPlusTsValid = propsWithTs.asScala.filter(kv => LabelMeta.isValidSeq(kv._2.labelMeta.seq)).asJava - - def edgesWithIndex = for (labelOrder <- labelOrders) yield { - IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsWithTs, tsInnerValOpt = tsInnerValOpt) - } - - def edgesWithIndexValid = for (labelOrder <- labelOrders) yield { - IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsPlusTsValid, tsInnerValOpt = tsInnerValOpt) - } + def edgesWithIndexValid = builder.edgesWithIndexValid /** force direction as out on invertedEdge */ - def toSnapshotEdge: SnapshotEdge = { - val (smaller, larger) = (srcForVertex, tgtForVertex) - - // val newLabelWithDir = LabelWithDirection(labelWithDir.labelId, GraphUtil.directions("out")) - - propertyInner(LabelMeta.timestamp.name, ts, ts) - val ret = SnapshotEdge(innerGraph, smaller, larger, innerLabel, - GraphUtil.directions("out"), op, version, propsWithTs, - pendingEdgeOpt = pendingEdgeOpt, statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt) - ret - } - - def defaultPropsWithName = Json.obj("from" -> srcVertex.innerId.toString(), "to" -> tgtVertex.innerId.toString(), - "label" -> innerLabel.label, "service" -> innerLabel.serviceName) - - def propsWithName = - for { - (_, v) <- propsWithTs.asScala - meta = v.labelMeta - jsValue <- innerValToJsValue(v.innerVal, meta.dataType) - } yield meta.name -> jsValue - - def updateTgtVertex(id: InnerValLike) = { - val newId = TargetVertexId(tgtVertex.id.column, id) - val newTgtVertex = innerGraph.newVertex(newId, tgtVertex.ts, tgtVertex.props) - S2Edge(innerGraph, srcVertex, newTgtVertex, innerLabel, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt) - } - - def updateOriginalEdgeOpt(newOriginalEdgeOpt: S2EdgeLike): S2EdgeLike = { - S2Edge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, propsWithTs, parentEdges, - Option(newOriginalEdgeOpt), pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt) - } - - def rank(r: RankParam): Double = - if (r.keySeqAndWeights.size <= 0) 1.0f - else { - var sum: Double = 0 - - for ((labelMeta, w) <- r.keySeqAndWeights) { - if (propsWithTs.containsKey(labelMeta.name)) { - val innerValWithTs = propsWithTs.get(labelMeta.name) - val cost = try innerValWithTs.innerVal.toString.toDouble catch { - case e: Exception => - logger.error("toInnerval failed in rank", e) - 1.0 - } - sum += w * cost - } - } - sum - } + def toSnapshotEdge: SnapshotEdge = SnapshotEdge.apply(this) def checkProperty(key: String): Boolean = propsWithTs.containsKey(key) - def copyEdge(srcVertex: S2VertexLike = srcVertex, - tgtVertex: S2VertexLike = tgtVertex, - innerLabel: Label = innerLabel, - dir: Int = dir, - op: Byte = op, - version: Long = version, - propsWithTs: State = S2Edge.propsToState(this.propsWithTs), - parentEdges: Seq[EdgeWithScore] = parentEdges, - originalEdgeOpt: Option[S2EdgeLike] = originalEdgeOpt, - pendingEdgeOpt: Option[S2EdgeLike] = pendingEdgeOpt, - statusCode: Byte = statusCode, - lockTs: Option[Long] = lockTs, - tsInnerValOpt: Option[InnerValLike] = tsInnerValOpt, - ts: Long = ts): S2EdgeLike = { - val edge = new S2Edge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, S2Edge.EmptyProps, - parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt) - S2Edge.fillPropsWithTs(edge, propsWithTs) - edge.propertyInner(LabelMeta.timestamp.name, ts, ts) - edge - } - - def copyEdgeWithState(state: State, ts: Long): S2EdgeLike = { - val newEdge = copy(propsWithTs = S2Edge.EmptyProps) - S2Edge.fillPropsWithTs(newEdge, state) - newEdge.propertyInner(LabelMeta.timestamp.name, ts, ts) - newEdge - } - def copyEdgeWithState(state: State): S2EdgeLike = { - val newEdge = copy(propsWithTs = S2Edge.EmptyProps) - S2Edge.fillPropsWithTs(newEdge, state) - newEdge + builder.copyEdgeWithState(state) } def copyOp(newOp: Byte): S2EdgeLike = { - copy(op = newOp) + builder.copyEdge(op = newOp) } def copyVersion(newVersion: Long): S2EdgeLike = { - copy(version = newVersion) + builder.copyEdge(version = newVersion) } def copyParentEdges(parents: Seq[EdgeWithScore]): S2EdgeLike = { - copy(parentEdges = parents) + builder.copyEdge(parentEdges = parents) } + def copyOriginalEdgeOpt(newOriginalEdgeOpt: Option[S2EdgeLike]): S2EdgeLike = + builder.copyEdge(originalEdgeOpt = newOriginalEdgeOpt) + def copyStatusCode(newStatusCode: Byte): S2EdgeLike = { - copy(statusCode = newStatusCode) + builder.copyEdge(statusCode = newStatusCode) } def copyLockTs(newLockTs: Option[Long]): S2EdgeLike = { - copy(lockTs = newLockTs) + builder.copyEdge(lockTs = newLockTs) } def vertices(direction: Direction): util.Iterator[structure.Vertex] = { @@ -289,19 +162,9 @@ trait S2EdgeLike extends Edge with GraphElement { direction match { case Direction.OUT => - // val newVertexId = this.direction match { - // case "out" => VertexId(innerLabel.srcColumn, srcVertex.innerId) - // case "in" => VertexId(innerLabel.tgtColumn, tgtVertex.innerId) - // case _ => throw new IllegalArgumentException("direction can only be out/in.") - // } val newVertexId = edgeId.srcVertexId innerGraph.getVertex(newVertexId).foreach(arr.add) case Direction.IN => - // val newVertexId = this.direction match { - // case "in" => VertexId(innerLabel.srcColumn, srcVertex.innerId) - // case "out" => VertexId(innerLabel.tgtColumn, tgtVertex.innerId) - // case _ => throw new IllegalArgumentException("direction can only be out/in.") - // } val newVertexId = edgeId.tgtVertexId innerGraph.getVertex(newVertexId).foreach(arr.add) case _ => @@ -331,39 +194,33 @@ trait S2EdgeLike extends Edge with GraphElement { } def property[V](key: String): Property[V] = { - val labelMeta = innerLabel.metaPropsInvMap.getOrElse(key, throw new java.lang.IllegalStateException(s"$key is not configured on Edge.")) if (propsWithTs.containsKey(key)) propsWithTs.get(key).asInstanceOf[Property[V]] - else { - Property.empty() - // val default = innerLabel.metaPropsDefaultMapInner(labelMeta) - // propertyInner(key, default.innerVal.value, default.ts).asInstanceOf[Property[V]] - } + else Property.empty() } - // just for tinkerpop: save to storage, do not use for internal def property[V](key: String, value: V): Property[V] = { S2Property.assertValidProp(key, value) val v = propertyInner(key, value, System.currentTimeMillis()) - val newTs = props.get(LabelMeta.timestamp.name).map(_.toString.toLong + 1).getOrElse(System.currentTimeMillis()) - val newEdge = this.copyEdge(ts = newTs) + val newTs = + if (propsWithTs.containsKey(LabelMeta.timestamp.name)) + propsWithTs.get(LabelMeta.timestamp.name).innerVal.toString().toLong + 1 + else + System.currentTimeMillis() + + val newEdge = builder.copyEdge(ts = newTs) Await.result(innerGraph.mutateEdges(Seq(newEdge), withWait = true), innerGraph.WaitTimeout) v } - def propertyInner[V](key: String, value: V, ts: Long): Property[V] = { - val labelMeta = innerLabel.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on Edge.")) - val newProp = new S2Property[V](this, labelMeta, key, value, ts) - propsWithTs.put(key, newProp) - newProp - } + def propertyInner[V](key: String, value: V, ts: Long): Property[V] = builder.propertyInner(key, value, ts) def remove(): Unit = { if (graph.features().edge().supportsRemoveEdges()) { val requestTs = System.currentTimeMillis() - val edgeToDelete = this.copyEdge(op = GraphUtil.operations("delete"), + val edgeToDelete = builder.copyEdge(op = GraphUtil.operations("delete"), version = version + S2Edge.incrementVersion, propsWithTs = S2Edge.propsToState(updatePropsWithTs()), ts = requestTs) // should we delete related edges also? val future = innerGraph.mutateEdges(Seq(edgeToDelete), withWait = true) @@ -376,43 +233,14 @@ trait S2EdgeLike extends Edge with GraphElement { def graph(): Graph = innerGraph - lazy val edgeId: EdgeId = { - // NOTE: xxxForVertex makes direction to be "out" - val timestamp = if (this.innerLabel.consistencyLevel == "strong") 0l else ts - // EdgeId(srcVertex.innerId, tgtVertex.innerId, label(), "out", timestamp) - val (srcColumn, tgtColumn) = innerLabel.srcTgtColumn(dir) - if (direction == "out") - EdgeId(VertexId(srcColumn, srcVertex.id.innerId), VertexId(tgtColumn, tgtVertex.id.innerId), label(), "out", timestamp) - else - EdgeId(VertexId(tgtColumn, tgtVertex.id.innerId), VertexId(srcColumn, srcVertex.id.innerId), label(), "out", timestamp) - } + lazy val edgeId: EdgeId = builder.edgeId def id(): AnyRef = edgeId def label(): String = innerLabel.label - private def toProps(): Map[String, Any] = { - for { - (labelMeta, defaultVal) <- innerLabel.metaPropsDefaultMapInner - } yield { - // labelMeta.name -> propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse(defaultVal).innerVal.value - val value = - if (propsWithTs.containsKey(labelMeta.name)) { - propsWithTs.get(labelMeta.name).value - } else { - defaultVal.innerVal.value - } - labelMeta.name -> value - } - } - def toLogString: String = { // val allPropsWithName = defaultPropsWithName ++ Json.toJson(propsWithName).asOpt[JsObject].getOrElse(Json.obj()) List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, innerLabel.label, propsWithTs).mkString("\t") } - - private def getServiceColumn(vertex: S2VertexLike, defaultServiceColumn: ServiceColumn) = - if (vertex.id.column == ServiceColumn.Default) defaultServiceColumn else vertex.id.column - - } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b5df1dd/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala index 28b15d0..1b9c94b 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageIO.scala @@ -121,7 +121,7 @@ class StorageIO(val graph: S2Graph, val serDe: StorageSerDe) { if where == WhereParser.success || where.filter(edge) convertedEdge <- if (isDefaultTransformer) Seq(edge) else convertEdges(queryParam, edge, nextStepOpt) } yield { - val score = edge.rank(queryParam.rank) + val score = queryParam.rank.score(edge) EdgeWithScore(convertedEdge, score, label) } StepResult(edgeWithScores = edgeWithScores, grouped = Nil, degreeEdges = degreeEdges, cursors = lastCursor) @@ -134,7 +134,7 @@ class StorageIO(val graph: S2Graph, val serDe: StorageSerDe) { if where == WhereParser.success || where.filter(edge) convertedEdge <- if (isDefaultTransformer) Seq(edge) else convertEdges(queryParam, edge, nextStepOpt) } yield { - val edgeScore = edge.rank(queryParam.rank) + val edgeScore = queryParam.rank.score(edge) val score = queryParam.scorePropagateOp match { case "plus" => edgeScore + prevScore case "divide" => http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2b5df1dd/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala index e2621af..79c9dc3 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala @@ -30,17 +30,17 @@ class MutationHelper(storage: Storage) { val edge = edgeWithScore.edge val score = edgeWithScore.score - val edgeSnapshot = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs())) + val edgeSnapshot = edge.builder.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs())) val reversedSnapshotEdgeMutations = serDe.snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put)) - val edgeForward = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs())) + val edgeForward = edge.builder.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs())) val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge => serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ io.buildIncrementsAsync(indexEdge, -1L) } /* reverted direction */ - val edgeRevert = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs())) + val edgeRevert = edge.builder.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs())) val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge => serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ io.buildIncrementsAsync(indexEdge, -1L)
