create S2EdgeLike.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/7413aad4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/7413aad4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/7413aad4 Branch: refs/heads/master Commit: 7413aad429b8f91ff223583ecb7e85688408c76d Parents: f717023 Author: DO YUNG YOON <[email protected]> Authored: Fri Nov 3 13:51:27 2017 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Fri Nov 3 13:51:27 2017 +0900 ---------------------------------------------------------------------- .../scala/org/apache/s2graph/core/S2Edge.scala | 398 ++----------------- .../org/apache/s2graph/core/S2EdgeLike.scala | 379 ++++++++++++++++++ 2 files changed, 406 insertions(+), 371 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7413aad4/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala index 641db74..97abd26 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala @@ -301,167 +301,25 @@ case class IndexEdge(graph: S2Graph, } } -case class S2Edge(innerGraph: S2Graph, - srcVertex: S2VertexLike, - var tgtVertex: S2VertexLike, - innerLabel: Label, - dir: Int, - var op: Byte = GraphUtil.defaultOpByte, - var version: Long = System.currentTimeMillis(), - propsWithTs: Props = S2Edge.EmptyProps, - parentEdges: Seq[EdgeWithScore] = Nil, - originalEdgeOpt: Option[S2Edge] = None, - pendingEdgeOpt: Option[S2Edge] = None, - statusCode: Byte = 0, - lockTs: Option[Long] = None, - var tsInnerValOpt: Option[InnerValLike] = None) extends GraphElement with Edge { - - lazy val labelWithDir = LabelWithDirection(innerLabel.id.get, dir) - lazy val schemaVer = innerLabel.schemaVersion - lazy val ts = propsWithTs.get(LabelMeta.timestamp.name).innerVal.value match { - case b: BigDecimal => b.longValue() - case l: Long => l - case i: Int => i.toLong - case _ => throw new RuntimeException("ts should be in [BigDecimal/Long/Int].") - } - - lazy val operation = GraphUtil.fromOp(op) - lazy val tsInnerVal = tsInnerValOpt.get.value - lazy val srcId = srcVertex.innerIdVal - lazy val tgtId = tgtVertex.innerIdVal - lazy val labelName = innerLabel.label - lazy val direction = GraphUtil.fromDirection(dir) - - def toIndexEdge(labelIndexSeq: Byte): IndexEdge = IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelIndexSeq, propsWithTs) - - def serializePropsWithTs(): Array[Byte] = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.asScala.map(kv => kv._2.labelMeta.seq -> kv._2.innerValWithTs).toSeq) - - def updatePropsWithTs(others: Props = S2Edge.EmptyProps): Props = { - val emptyProp = S2Edge.EmptyProps - - propsWithTs.forEach(new BiConsumer[String, S2Property[_]] { - override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value) - }) - - others.forEach(new BiConsumer[String, S2Property[_]] { - override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value) - }) - - emptyProp - } - - def propertyValue(key: String): Option[InnerValLikeWithTs] = { - key match { - case "from" | "_from" => Option(InnerValLikeWithTs(srcVertex.innerId, ts)) - case "to" | "_to" => Option(InnerValLikeWithTs(tgtVertex.innerId, ts)) - case "label" => Option(InnerValLikeWithTs(InnerVal.withStr(innerLabel.label, schemaVer), ts)) - case "direction" => Option(InnerValLikeWithTs(InnerVal.withStr(direction, schemaVer), ts)) - case _ => - innerLabel.metaPropsInvMap.get(key).map(labelMeta => propertyValueInner(labelMeta)) - } - } - - def propertyValueInner(labelMeta: LabelMeta): InnerValLikeWithTs= { - // propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse() - if (propsWithTs.containsKey(labelMeta.name)) { - propsWithTs.get(labelMeta.name).innerValWithTs - } else { - innerLabel.metaPropsDefaultMapInner(labelMeta) - } - } - - def propertyValues(keys: Seq[String] = Nil): Map[LabelMeta, InnerValLikeWithTs] = { - val labelMetas = for { - key <- keys - labelMeta <- innerLabel.metaPropsInvMap.get(key) - } yield labelMeta - - propertyValuesInner(labelMetas) - } - - def propertyValuesInner(labelMetas: Seq[LabelMeta] = Nil): Map[LabelMeta, InnerValLikeWithTs] = { - if (labelMetas.isEmpty) { - innerLabel.metaPropsDefaultMapInner.map { case (labelMeta, defaultVal) => - labelMeta -> propertyValueInner(labelMeta) - } - } else { - // This is important since timestamp is required for all edges. - (LabelMeta.timestamp +: labelMetas).map { labelMeta => - labelMeta -> propertyValueInner(labelMeta) - }.toMap - } - } - -// if (!props.contains(LabelMeta.timestamp)) throw new Exception("Timestamp is required.") +case class S2Edge(override val innerGraph: S2Graph, + override val srcVertex: S2VertexLike, + override var tgtVertex: S2VertexLike, + override val innerLabel: Label, + override val dir: Int, + var op: Byte = GraphUtil.defaultOpByte, + var version: Long = System.currentTimeMillis(), + override val propsWithTs: Props = S2Edge.EmptyProps, + override val parentEdges: Seq[EdgeWithScore] = Nil, + override val originalEdgeOpt: Option[S2Edge] = None, + override val pendingEdgeOpt: Option[S2Edge] = None, + override val statusCode: Byte = 0, + override val lockTs: Option[Long] = None, + var tsInnerValOpt: Option[InnerValLike] = None) extends S2EdgeLike with GraphElement { + + // if (!props.contains(LabelMeta.timestamp)) throw new Exception("Timestamp is required.") // assert(propsWithTs.contains(LabelMeta.timeStampSeq)) - lazy val properties = toProps() - - def props = propsWithTs.asScala.mapValues(_.innerVal) - - - private def toProps(): Map[String, Any] = { - for { - (labelMeta, defaultVal) <- innerLabel.metaPropsDefaultMapInner - } yield { - // labelMeta.name -> propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse(defaultVal).innerVal.value - val value = - if (propsWithTs.containsKey(labelMeta.name)) { - propsWithTs.get(labelMeta.name).value - } else { - defaultVal.innerVal.value - } - labelMeta.name -> value - } - } - - def relatedEdges = { - if (labelWithDir.isDirected) { - val skipReverse = innerLabel.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false) - if (skipReverse) List(this) else List(this, duplicateEdge) - } else { -// val outDir = labelWithDir.copy(dir = GraphUtil.directions("out")) -// val base = copy(labelWithDir = outDir) - val base = copy(dir = GraphUtil.directions("out")) - List(base, base.reverseSrcTgtEdge) - } - } - - // def relatedEdges = List(this) - - private def getServiceColumn(vertex: S2VertexLike, defaultServiceColumn: ServiceColumn) = - if (vertex.id.column == ServiceColumn.Default) defaultServiceColumn else vertex.id.column - - def srcForVertex = { - val belongLabelIds = Seq(labelWithDir.labelId) - if (labelWithDir.dir == GraphUtil.directions("in")) { - val tgtColumn = getServiceColumn(tgtVertex, innerLabel.tgtColumn) - innerGraph.newVertex(VertexId(tgtColumn, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds) - } else { - val srcColumn = getServiceColumn(srcVertex, innerLabel.srcColumn) - innerGraph.newVertex(VertexId(srcColumn, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds) - } - } - - def tgtForVertex = { - val belongLabelIds = Seq(labelWithDir.labelId) - if (labelWithDir.dir == GraphUtil.directions("in")) { - val srcColumn = getServiceColumn(srcVertex, innerLabel.srcColumn) - innerGraph.newVertex(VertexId(srcColumn, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds) - } else { - val tgtColumn = getServiceColumn(tgtVertex, innerLabel.tgtColumn) - innerGraph.newVertex(VertexId(tgtColumn, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds) - } - } - - def duplicateEdge = reverseSrcTgtEdge.reverseDirEdge - -// def reverseDirEdge = copy(labelWithDir = labelWithDir.dirToggled) - def reverseDirEdge = copy(dir = GraphUtil.toggleDir(dir)) - - def reverseSrcTgtEdge = copy(srcVertex = tgtVertex, tgtVertex = srcVertex) - def labelOrders = LabelIndex.findByLabelIdAll(labelWithDir.labelId) override def serviceName = innerLabel.serviceName @@ -471,74 +329,14 @@ case class S2Edge(innerGraph: S2Graph, override def isAsync = innerLabel.isAsync - def isDegree = propsWithTs.containsKey(LabelMeta.degree.name) + // def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match { + // case Some(_) => props + // case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer)) + // } -// def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match { -// case Some(_) => props -// case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer)) -// } - - def propsPlusTsValid = propsWithTs.asScala.filter(kv => LabelMeta.isValidSeq(kv._2.labelMeta.seq)).asJava - - def edgesWithIndex = for (labelOrder <- labelOrders) yield { - IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsWithTs, tsInnerValOpt = tsInnerValOpt) - } - - def edgesWithIndexValid = for (labelOrder <- labelOrders) yield { - IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsPlusTsValid, tsInnerValOpt = tsInnerValOpt) - } - - /** force direction as out on invertedEdge */ - def toSnapshotEdge: SnapshotEdge = { - val (smaller, larger) = (srcForVertex, tgtForVertex) - -// val newLabelWithDir = LabelWithDirection(labelWithDir.labelId, GraphUtil.directions("out")) - - propertyInner(LabelMeta.timestamp.name, ts, ts) - val ret = SnapshotEdge(innerGraph, smaller, larger, innerLabel, - GraphUtil.directions("out"), op, version, propsWithTs, - pendingEdgeOpt = pendingEdgeOpt, statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt) - ret - } - - def defaultPropsWithName = Json.obj("from" -> srcVertex.innerId.toString(), "to" -> tgtVertex.innerId.toString(), - "label" -> innerLabel.label, "service" -> innerLabel.serviceName) - - def propsWithName = - for { - (_, v) <- propsWithTs.asScala - meta = v.labelMeta - jsValue <- innerValToJsValue(v.innerVal, meta.dataType) - } yield meta.name -> jsValue - - - def updateTgtVertex(id: InnerValLike) = { - val newId = TargetVertexId(tgtVertex.id.column, id) - val newTgtVertex = innerGraph.newVertex(newId, tgtVertex.ts, tgtVertex.props) - S2Edge(innerGraph, srcVertex, newTgtVertex, innerLabel, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt) - } - - def rank(r: RankParam): Double = - if (r.keySeqAndWeights.size <= 0) 1.0f - else { - var sum: Double = 0 - - for ((labelMeta, w) <- r.keySeqAndWeights) { - if (propsWithTs.containsKey(labelMeta.name)) { - val innerValWithTs = propsWithTs.get(labelMeta.name) - val cost = try innerValWithTs.innerVal.toString.toDouble catch { - case e: Exception => - logger.error("toInnerval failed in rank", e) - 1.0 - } - sum += w * cost - } - } - sum - } def toLogString: String = { -// val allPropsWithName = defaultPropsWithName ++ Json.toJson(propsWithName).asOpt[JsObject].getOrElse(Json.obj()) + // val allPropsWithName = defaultPropsWithName ++ Json.toJson(propsWithName).asOpt[JsObject].getOrElse(Json.obj()) List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, innerLabel.label, propsWithTs).mkString("\t") } @@ -551,12 +349,12 @@ case class S2Edge(innerGraph: S2Graph, case _ => false } -// override def toString(): String = { -// Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> labelName, "direction" -> direction, -// "operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString, -// "parentEdges" -> parentEdges, "originalEdge" -> originalEdgeOpt, "statusCode" -> statusCode, "lockTs" -> lockTs -// ).toString -// } + // override def toString(): String = { + // Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> labelName, "direction" -> direction, + // "operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString, + // "parentEdges" -> parentEdges, "originalEdge" -> originalEdgeOpt, "statusCode" -> statusCode, "lockTs" -> lockTs + // ).toString + // } override def toString: String = { // E + L_BRACKET + edge.id() + R_BRACKET + L_BRACKET + edge.outVertex().id() + DASH + edge.label() + ARROW + edge.inVertex().id() + R_BRACKET; @@ -564,148 +362,6 @@ case class S2Edge(innerGraph: S2Graph, // s"e[${srcForVertex.id}-${innerLabel.label}->${tgtForVertex.id}]" } - def checkProperty(key: String): Boolean = propsWithTs.containsKey(key) - - def copyEdge(srcVertex: S2VertexLike = srcVertex, - tgtVertex: S2VertexLike = tgtVertex, - innerLabel: Label = innerLabel, - dir: Int = dir, - op: Byte = op, - version: Long = version, - propsWithTs: State = S2Edge.propsToState(this.propsWithTs), - parentEdges: Seq[EdgeWithScore] = parentEdges, - originalEdgeOpt: Option[S2Edge] = originalEdgeOpt, - pendingEdgeOpt: Option[S2Edge] = pendingEdgeOpt, - statusCode: Byte = statusCode, - lockTs: Option[Long] = lockTs, - tsInnerValOpt: Option[InnerValLike] = tsInnerValOpt, - ts: Long = ts): S2Edge = { - val edge = new S2Edge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, S2Edge.EmptyProps, - parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt) - S2Edge.fillPropsWithTs(edge, propsWithTs) - edge.propertyInner(LabelMeta.timestamp.name, ts, ts) - edge - } - - def copyEdgeWithState(state: State, ts: Long): S2Edge = { - val newEdge = copy(propsWithTs = S2Edge.EmptyProps) - S2Edge.fillPropsWithTs(newEdge, state) - newEdge.propertyInner(LabelMeta.timestamp.name, ts, ts) - newEdge - } - - def copyEdgeWithState(state: State): S2Edge = { - val newEdge = copy(propsWithTs = S2Edge.EmptyProps) - S2Edge.fillPropsWithTs(newEdge, state) - newEdge - } - - override def vertices(direction: Direction): util.Iterator[structure.Vertex] = { - val arr = new util.ArrayList[Vertex]() - - direction match { - case Direction.OUT => -// val newVertexId = this.direction match { -// case "out" => VertexId(innerLabel.srcColumn, srcVertex.innerId) -// case "in" => VertexId(innerLabel.tgtColumn, tgtVertex.innerId) -// case _ => throw new IllegalArgumentException("direction can only be out/in.") -// } - val newVertexId = edgeId.srcVertexId - innerGraph.getVertex(newVertexId).foreach(arr.add) - case Direction.IN => -// val newVertexId = this.direction match { -// case "in" => VertexId(innerLabel.srcColumn, srcVertex.innerId) -// case "out" => VertexId(innerLabel.tgtColumn, tgtVertex.innerId) -// case _ => throw new IllegalArgumentException("direction can only be out/in.") -// } - val newVertexId = edgeId.tgtVertexId - innerGraph.getVertex(newVertexId).foreach(arr.add) - case _ => - import scala.collection.JavaConversions._ - vertices(Direction.OUT).foreach(arr.add) - vertices(Direction.IN).foreach(arr.add) - } - arr.iterator() - } - - override def properties[V](keys: String*): util.Iterator[Property[V]] = { - val ls = new util.ArrayList[Property[V]]() - if (keys.isEmpty) { - propsWithTs.forEach(new BiConsumer[String, S2Property[_]] { - override def accept(key: String, property: S2Property[_]): Unit = { - if (!LabelMeta.reservedMetaNamesSet(key) && property.isPresent && key != T.id.name) - ls.add(property.asInstanceOf[S2Property[V]]) - } - }) - } else { - keys.foreach { key => - val prop = property[V](key) - if (prop.isPresent) ls.add(prop) - } - } - ls.iterator() - } - - override def property[V](key: String): Property[V] = { - val labelMeta = innerLabel.metaPropsInvMap.getOrElse(key, throw new java.lang.IllegalStateException(s"$key is not configured on Edge.")) - if (propsWithTs.containsKey(key)) propsWithTs.get(key).asInstanceOf[Property[V]] - else { - Property.empty() -// val default = innerLabel.metaPropsDefaultMapInner(labelMeta) -// propertyInner(key, default.innerVal.value, default.ts).asInstanceOf[Property[V]] - } - } - - // just for tinkerpop: save to storage, do not use for internal - override def property[V](key: String, value: V): Property[V] = { - S2Property.assertValidProp(key, value) - - val v = propertyInner(key, value, System.currentTimeMillis()) - val newTs = props.get(LabelMeta.timestamp.name).map(_.toString.toLong + 1).getOrElse(System.currentTimeMillis()) - val newEdge = this.copyEdge(ts = newTs) - - Await.result(innerGraph.mutateEdges(Seq(newEdge), withWait = true), innerGraph.WaitTimeout) - - v - } - - def propertyInner[V](key: String, value: V, ts: Long): Property[V] = { - val labelMeta = innerLabel.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on Edge.")) - val newProp = new S2Property[V](this, labelMeta, key, value, ts) - propsWithTs.put(key, newProp) - newProp - } - - override def remove(): Unit = { - if (graph.features().edge().supportsRemoveEdges()) { - val requestTs = System.currentTimeMillis() - val edgeToDelete = this.copyEdge(op = GraphUtil.operations("delete"), - version = version + S2Edge.incrementVersion, propsWithTs = S2Edge.propsToState(updatePropsWithTs()), ts = requestTs) - // should we delete related edges also? - val future = innerGraph.mutateEdges(Seq(edgeToDelete), withWait = true) - val mutateSuccess = Await.result(future, innerGraph.WaitTimeout) - if (!mutateSuccess.forall(_.isSuccess)) throw new RuntimeException("edge remove failed.") - } else { - throw Edge.Exceptions.edgeRemovalNotSupported() - } - } - - override def graph(): Graph = innerGraph - - lazy val edgeId: EdgeId = { - // NOTE: xxxForVertex makes direction to be "out" - val timestamp = if (this.innerLabel.consistencyLevel == "strong") 0l else ts -// EdgeId(srcVertex.innerId, tgtVertex.innerId, label(), "out", timestamp) - val (srcColumn, tgtColumn) = innerLabel.srcTgtColumn(dir) - if (direction == "out") - EdgeId(VertexId(srcColumn, srcVertex.id.innerId), VertexId(tgtColumn, tgtVertex.id.innerId), label(), "out", timestamp) - else - EdgeId(VertexId(tgtColumn, tgtVertex.id.innerId), VertexId(srcColumn, srcVertex.id.innerId), label(), "out", timestamp) - } - - override def id(): AnyRef = edgeId - - override def label(): String = innerLabel.label } object EdgeId { val EdgeIdDelimiter = "," http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/7413aad4/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala new file mode 100644 index 0000000..04ed7ab --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala @@ -0,0 +1,379 @@ +package org.apache.s2graph.core +import java.util +import java.util.function.BiConsumer + +import org.apache.s2graph.core.JSONParser.innerValToJsValue +import org.apache.s2graph.core.S2Edge.{Props, State} +import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta, ServiceColumn} +import org.apache.s2graph.core.types._ +import org.apache.s2graph.core.utils.logger +import org.apache.tinkerpop.gremlin.structure +import org.apache.tinkerpop.gremlin.structure.{Direction, Edge, Graph, Property, T, Vertex} +import play.api.libs.json.Json + +import scala.concurrent.Await +import scala.collection.JavaConverters._ + +trait S2EdgeLike extends Edge { + this: S2Edge => + + val innerGraph: S2Graph + val srcVertex: S2VertexLike + var tgtVertex: S2VertexLike + val innerLabel: Label + val dir: Int + +// var op: Byte = GraphUtil.defaultOpByte +// var version: Long = System.currentTimeMillis() + val propsWithTs: Props = S2Edge.EmptyProps + val parentEdges: Seq[EdgeWithScore] = Nil + val originalEdgeOpt: Option[S2Edge] = None + val pendingEdgeOpt: Option[S2Edge] = None + val statusCode: Byte = 0 + val lockTs: Option[Long] = None +// var tsInnerValOpt: Option[InnerValLike] = None + + lazy val labelWithDir = LabelWithDirection(innerLabel.id.get, dir) + lazy val schemaVer = innerLabel.schemaVersion + lazy val ts = propsWithTs.get(LabelMeta.timestamp.name).innerVal.value match { + case b: BigDecimal => b.longValue() + case l: Long => l + case i: Int => i.toLong + case _ => throw new RuntimeException("ts should be in [BigDecimal/Long/Int].") + } + lazy val operation = GraphUtil.fromOp(op) + lazy val tsInnerVal = tsInnerValOpt.get.value + lazy val srcId = srcVertex.innerIdVal + lazy val tgtId = tgtVertex.innerIdVal + lazy val labelName = innerLabel.label + lazy val direction = GraphUtil.fromDirection(dir) + + def toIndexEdge(labelIndexSeq: Byte): IndexEdge = IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelIndexSeq, propsWithTs) + + def serializePropsWithTs(): Array[Byte] = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.asScala.map(kv => kv._2.labelMeta.seq -> kv._2.innerValWithTs).toSeq) + + def updatePropsWithTs(others: Props = S2Edge.EmptyProps): Props = { + val emptyProp = S2Edge.EmptyProps + + propsWithTs.forEach(new BiConsumer[String, S2Property[_]] { + override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value) + }) + + others.forEach(new BiConsumer[String, S2Property[_]] { + override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value) + }) + + emptyProp + } + + def propertyValue(key: String): Option[InnerValLikeWithTs] = { + key match { + case "from" | "_from" => Option(InnerValLikeWithTs(srcVertex.innerId, ts)) + case "to" | "_to" => Option(InnerValLikeWithTs(tgtVertex.innerId, ts)) + case "label" => Option(InnerValLikeWithTs(InnerVal.withStr(innerLabel.label, schemaVer), ts)) + case "direction" => Option(InnerValLikeWithTs(InnerVal.withStr(direction, schemaVer), ts)) + case _ => + innerLabel.metaPropsInvMap.get(key).map(labelMeta => propertyValueInner(labelMeta)) + } + } + + def propertyValueInner(labelMeta: LabelMeta): InnerValLikeWithTs = { + // propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse() + if (propsWithTs.containsKey(labelMeta.name)) { + propsWithTs.get(labelMeta.name).innerValWithTs + } else { + innerLabel.metaPropsDefaultMapInner(labelMeta) + } + } + + def propertyValues(keys: Seq[String] = Nil): Map[LabelMeta, InnerValLikeWithTs] = { + val labelMetas = for { + key <- keys + labelMeta <- innerLabel.metaPropsInvMap.get(key) + } yield labelMeta + + propertyValuesInner(labelMetas) + } + + def propertyValuesInner(labelMetas: Seq[LabelMeta] = Nil): Map[LabelMeta, InnerValLikeWithTs] = { + if (labelMetas.isEmpty) { + innerLabel.metaPropsDefaultMapInner.map { case (labelMeta, defaultVal) => + labelMeta -> propertyValueInner(labelMeta) + } + } else { + // This is important since timestamp is required for all edges. + (LabelMeta.timestamp +: labelMetas).map { labelMeta => + labelMeta -> propertyValueInner(labelMeta) + }.toMap + } + } + + lazy val properties = toProps() + + def props = propsWithTs.asScala.mapValues(_.innerVal) + + def relatedEdges = { + if (labelWithDir.isDirected) { + val skipReverse = innerLabel.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false) + if (skipReverse) List(this) else List(this, duplicateEdge) + } else { + // val outDir = labelWithDir.copy(dir = GraphUtil.directions("out")) + // val base = copy(labelWithDir = outDir) + val base = copy(dir = GraphUtil.directions("out")) + List(base, base.reverseSrcTgtEdge) + } + } + + def srcForVertex = { + val belongLabelIds = Seq(labelWithDir.labelId) + if (labelWithDir.dir == GraphUtil.directions("in")) { + val tgtColumn = getServiceColumn(tgtVertex, innerLabel.tgtColumn) + innerGraph.newVertex(VertexId(tgtColumn, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds) + } else { + val srcColumn = getServiceColumn(srcVertex, innerLabel.srcColumn) + innerGraph.newVertex(VertexId(srcColumn, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds) + } + } + + def tgtForVertex = { + val belongLabelIds = Seq(labelWithDir.labelId) + if (labelWithDir.dir == GraphUtil.directions("in")) { + val srcColumn = getServiceColumn(srcVertex, innerLabel.srcColumn) + innerGraph.newVertex(VertexId(srcColumn, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds) + } else { + val tgtColumn = getServiceColumn(tgtVertex, innerLabel.tgtColumn) + innerGraph.newVertex(VertexId(tgtColumn, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds) + } + } + + def duplicateEdge = reverseSrcTgtEdge.reverseDirEdge + + // def reverseDirEdge = copy(labelWithDir = labelWithDir.dirToggled) + def reverseDirEdge = copy(dir = GraphUtil.toggleDir(dir)) + + def reverseSrcTgtEdge = copy(srcVertex = tgtVertex, tgtVertex = srcVertex) + + def labelOrders = LabelIndex.findByLabelIdAll(labelWithDir.labelId) + + def isDegree = propsWithTs.containsKey(LabelMeta.degree.name) + + def propsPlusTsValid = propsWithTs.asScala.filter(kv => LabelMeta.isValidSeq(kv._2.labelMeta.seq)).asJava + + def edgesWithIndex = for (labelOrder <- labelOrders) yield { + IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsWithTs, tsInnerValOpt = tsInnerValOpt) + } + + def edgesWithIndexValid = for (labelOrder <- labelOrders) yield { + IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsPlusTsValid, tsInnerValOpt = tsInnerValOpt) + } + + /** force direction as out on invertedEdge */ + def toSnapshotEdge: SnapshotEdge = { + val (smaller, larger) = (srcForVertex, tgtForVertex) + + // val newLabelWithDir = LabelWithDirection(labelWithDir.labelId, GraphUtil.directions("out")) + + propertyInner(LabelMeta.timestamp.name, ts, ts) + val ret = SnapshotEdge(innerGraph, smaller, larger, innerLabel, + GraphUtil.directions("out"), op, version, propsWithTs, + pendingEdgeOpt = pendingEdgeOpt, statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt) + ret + } + + def defaultPropsWithName = Json.obj("from" -> srcVertex.innerId.toString(), "to" -> tgtVertex.innerId.toString(), + "label" -> innerLabel.label, "service" -> innerLabel.serviceName) + + def propsWithName = + for { + (_, v) <- propsWithTs.asScala + meta = v.labelMeta + jsValue <- innerValToJsValue(v.innerVal, meta.dataType) + } yield meta.name -> jsValue + + def updateTgtVertex(id: InnerValLike) = { + val newId = TargetVertexId(tgtVertex.id.column, id) + val newTgtVertex = innerGraph.newVertex(newId, tgtVertex.ts, tgtVertex.props) + S2Edge(innerGraph, srcVertex, newTgtVertex, innerLabel, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt) + } + + def rank(r: RankParam): Double = + if (r.keySeqAndWeights.size <= 0) 1.0f + else { + var sum: Double = 0 + + for ((labelMeta, w) <- r.keySeqAndWeights) { + if (propsWithTs.containsKey(labelMeta.name)) { + val innerValWithTs = propsWithTs.get(labelMeta.name) + val cost = try innerValWithTs.innerVal.toString.toDouble catch { + case e: Exception => + logger.error("toInnerval failed in rank", e) + 1.0 + } + sum += w * cost + } + } + sum + } + + def checkProperty(key: String): Boolean = propsWithTs.containsKey(key) + + def copyEdge(srcVertex: S2VertexLike = srcVertex, + tgtVertex: S2VertexLike = tgtVertex, + innerLabel: Label = innerLabel, + dir: Int = dir, + op: Byte = op, + version: Long = version, + propsWithTs: State = S2Edge.propsToState(this.propsWithTs), + parentEdges: Seq[EdgeWithScore] = parentEdges, + originalEdgeOpt: Option[S2Edge] = originalEdgeOpt, + pendingEdgeOpt: Option[S2Edge] = pendingEdgeOpt, + statusCode: Byte = statusCode, + lockTs: Option[Long] = lockTs, + tsInnerValOpt: Option[InnerValLike] = tsInnerValOpt, + ts: Long = ts): S2Edge = { + val edge = new S2Edge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, S2Edge.EmptyProps, + parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt) + S2Edge.fillPropsWithTs(edge, propsWithTs) + edge.propertyInner(LabelMeta.timestamp.name, ts, ts) + edge + } + + def copyEdgeWithState(state: State, ts: Long): S2Edge = { + val newEdge = copy(propsWithTs = S2Edge.EmptyProps) + S2Edge.fillPropsWithTs(newEdge, state) + newEdge.propertyInner(LabelMeta.timestamp.name, ts, ts) + newEdge + } + + def copyEdgeWithState(state: State): S2Edge = { + val newEdge = copy(propsWithTs = S2Edge.EmptyProps) + S2Edge.fillPropsWithTs(newEdge, state) + newEdge + } + + def vertices(direction: Direction): util.Iterator[structure.Vertex] = { + val arr = new util.ArrayList[Vertex]() + + direction match { + case Direction.OUT => + // val newVertexId = this.direction match { + // case "out" => VertexId(innerLabel.srcColumn, srcVertex.innerId) + // case "in" => VertexId(innerLabel.tgtColumn, tgtVertex.innerId) + // case _ => throw new IllegalArgumentException("direction can only be out/in.") + // } + val newVertexId = edgeId.srcVertexId + innerGraph.getVertex(newVertexId).foreach(arr.add) + case Direction.IN => + // val newVertexId = this.direction match { + // case "in" => VertexId(innerLabel.srcColumn, srcVertex.innerId) + // case "out" => VertexId(innerLabel.tgtColumn, tgtVertex.innerId) + // case _ => throw new IllegalArgumentException("direction can only be out/in.") + // } + val newVertexId = edgeId.tgtVertexId + innerGraph.getVertex(newVertexId).foreach(arr.add) + case _ => + import scala.collection.JavaConversions._ + vertices(Direction.OUT).foreach(arr.add) + vertices(Direction.IN).foreach(arr.add) + } + arr.iterator() + } + + def properties[V](keys: String*): util.Iterator[Property[V]] = { + val ls = new util.ArrayList[Property[V]]() + if (keys.isEmpty) { + propsWithTs.forEach(new BiConsumer[String, S2Property[_]] { + override def accept(key: String, property: S2Property[_]): Unit = { + if (!LabelMeta.reservedMetaNamesSet(key) && property.isPresent && key != T.id.name) + ls.add(property.asInstanceOf[S2Property[V]]) + } + }) + } else { + keys.foreach { key => + val prop = property[V](key) + if (prop.isPresent) ls.add(prop) + } + } + ls.iterator() + } + + def property[V](key: String): Property[V] = { + val labelMeta = innerLabel.metaPropsInvMap.getOrElse(key, throw new java.lang.IllegalStateException(s"$key is not configured on Edge.")) + if (propsWithTs.containsKey(key)) propsWithTs.get(key).asInstanceOf[Property[V]] + else { + Property.empty() + // val default = innerLabel.metaPropsDefaultMapInner(labelMeta) + // propertyInner(key, default.innerVal.value, default.ts).asInstanceOf[Property[V]] + } + } + + // just for tinkerpop: save to storage, do not use for internal + def property[V](key: String, value: V): Property[V] = { + S2Property.assertValidProp(key, value) + + val v = propertyInner(key, value, System.currentTimeMillis()) + val newTs = props.get(LabelMeta.timestamp.name).map(_.toString.toLong + 1).getOrElse(System.currentTimeMillis()) + val newEdge = this.copyEdge(ts = newTs) + + Await.result(innerGraph.mutateEdges(Seq(newEdge), withWait = true), innerGraph.WaitTimeout) + + v + } + + def propertyInner[V](key: String, value: V, ts: Long): Property[V] = { + val labelMeta = innerLabel.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on Edge.")) + val newProp = new S2Property[V](this, labelMeta, key, value, ts) + propsWithTs.put(key, newProp) + newProp + } + + def remove(): Unit = { + if (graph.features().edge().supportsRemoveEdges()) { + val requestTs = System.currentTimeMillis() + val edgeToDelete = this.copyEdge(op = GraphUtil.operations("delete"), + version = version + S2Edge.incrementVersion, propsWithTs = S2Edge.propsToState(updatePropsWithTs()), ts = requestTs) + // should we delete related edges also? + val future = innerGraph.mutateEdges(Seq(edgeToDelete), withWait = true) + val mutateSuccess = Await.result(future, innerGraph.WaitTimeout) + if (!mutateSuccess.forall(_.isSuccess)) throw new RuntimeException("edge remove failed.") + } else { + throw Edge.Exceptions.edgeRemovalNotSupported() + } + } + + def graph(): Graph = innerGraph + + lazy val edgeId: EdgeId = { + // NOTE: xxxForVertex makes direction to be "out" + val timestamp = if (this.innerLabel.consistencyLevel == "strong") 0l else ts + // EdgeId(srcVertex.innerId, tgtVertex.innerId, label(), "out", timestamp) + val (srcColumn, tgtColumn) = innerLabel.srcTgtColumn(dir) + if (direction == "out") + EdgeId(VertexId(srcColumn, srcVertex.id.innerId), VertexId(tgtColumn, tgtVertex.id.innerId), label(), "out", timestamp) + else + EdgeId(VertexId(tgtColumn, tgtVertex.id.innerId), VertexId(srcColumn, srcVertex.id.innerId), label(), "out", timestamp) + } + + def id(): AnyRef = edgeId + + def label(): String = innerLabel.label + + private def toProps(): Map[String, Any] = { + for { + (labelMeta, defaultVal) <- innerLabel.metaPropsDefaultMapInner + } yield { + // labelMeta.name -> propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse(defaultVal).innerVal.value + val value = + if (propsWithTs.containsKey(labelMeta.name)) { + propsWithTs.get(labelMeta.name).value + } else { + defaultVal.innerVal.value + } + labelMeta.name -> value + } + } + + private def getServiceColumn(vertex: S2VertexLike, defaultServiceColumn: ServiceColumn) = + if (vertex.id.column == ServiceColumn.Default) defaultServiceColumn else vertex.id.column + +}
