[S2GRAPH-131]: Add actual implementation on interfaces from TinkerPop3 structure package. - Change core.Edge/Vertex/Graph to core.S2Edge/S2Vertex/S2Graph. - Implement base interfaces for tinkerpop3 structure package.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/e8c0bf20 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/e8c0bf20 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/e8c0bf20 Branch: refs/heads/master Commit: e8c0bf20b517a2e9a752df63c156fc265b7365b6 Parents: 189bc41 Author: DO YUNG YOON <[email protected]> Authored: Mon Nov 28 12:09:52 2016 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Mon Nov 28 12:09:52 2016 +0900 ---------------------------------------------------------------------- .../loader/subscriber/GraphSubscriber.scala | 14 +- .../loader/subscriber/TransferToHFile.scala | 6 +- .../s2graph/loader/subscriber/WalLogStat.scala | 2 +- .../loader/subscriber/WalLogToHDFS.scala | 2 +- .../scala/org/apache/s2graph/core/Edge.scala | 956 ------------ .../scala/org/apache/s2graph/core/Graph.scala | 1238 ---------------- .../org/apache/s2graph/core/Management.scala | 2 +- .../org/apache/s2graph/core/PostProcess.scala | 8 +- .../org/apache/s2graph/core/QueryParam.scala | 23 +- .../org/apache/s2graph/core/QueryResult.scala | 10 +- .../scala/org/apache/s2graph/core/S2Edge.scala | 969 ++++++++++++ .../scala/org/apache/s2graph/core/S2Graph.scala | 1397 ++++++++++++++++++ .../org/apache/s2graph/core/S2Property.scala | 7 +- .../org/apache/s2graph/core/S2Vertex.scala | 221 +++ .../apache/s2graph/core/S2VertexProperty.scala | 31 +- .../scala/org/apache/s2graph/core/Vertex.scala | 132 -- .../apache/s2graph/core/mysqls/ColumnMeta.scala | 12 +- .../apache/s2graph/core/mysqls/LabelMeta.scala | 1 - .../s2graph/core/mysqls/ServiceColumn.scala | 15 +- .../s2graph/core/parsers/WhereParser.scala | 34 +- .../s2graph/core/rest/RequestParser.scala | 10 +- .../apache/s2graph/core/rest/RestHandler.scala | 4 +- .../apache/s2graph/core/storage/Storage.scala | 96 +- .../core/storage/hbase/AsynchbaseStorage.scala | 10 +- .../tall/IndexEdgeDeserializable.scala | 10 +- .../wide/IndexEdgeDeserializable.scala | 8 +- .../tall/SnapshotEdgeDeserializable.scala | 4 +- .../wide/SnapshotEdgeDeserializable.scala | 4 +- .../serde/vertex/VertexDeserializable.scala | 23 +- .../serde/vertex/VertexSerializable.scala | 13 +- .../s2graph/core/types/InnerValLike.scala | 17 + .../apache/s2graph/core/types/VertexId.scala | 2 +- .../org/apache/s2graph/core/EdgeTest.scala | 210 --- .../core/Integrate/IntegrateCommon.scala | 6 +- .../Integrate/tinkerpop/S2S2GraphTest.scala | 130 ++ .../org/apache/s2graph/core/S2EdgeTest.scala | 210 +++ .../s2graph/core/TestCommonWithModels.scala | 4 +- .../core/benchmark/BenchmarkCommon.scala | 2 +- .../s2graph/core/parsers/WhereParserTest.scala | 2 +- .../s2graph/counter/helper/CounterAdmin.scala | 4 +- .../counter/core/RankingCounterSpec.scala | 4 +- .../loader/core/CounterEtlFunctions.scala | 14 +- .../loader/core/CounterEtlFunctionsSpec.scala | 4 +- .../org/apache/s2graph/rest/netty/Server.scala | 4 +- .../apache/s2graph/rest/play/Bootstrap.scala | 6 +- .../s2graph/rest/play/actors/QueueActor.scala | 8 +- .../rest/play/controllers/EdgeController.scala | 12 +- .../play/controllers/VertexController.scala | 4 +- 48 files changed, 3179 insertions(+), 2726 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala index b25bc84..2352cdf 100644 --- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala +++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala @@ -48,7 +48,7 @@ object GraphConfig { if (kafkaBrokerList.isEmpty) Map("hbase.zookeeper.quorum" -> zkQuorum, "db.default.url" -> database, "cache.ttl.seconds" -> cacheTTL) else Map("hbase.zookeeper.quorum" -> zkQuorum, "db.default.url" -> database, "kafka.metadata.broker.list" -> kafkaBrokers, "cache.ttl.seconds" -> cacheTTL) - ConfigFactory.parseMap(newConf).withFallback(Graph.DefaultConfig) + ConfigFactory.parseMap(newConf).withFallback(S2Graph.DefaultConfig) } } @@ -64,7 +64,7 @@ object GraphSubscriberHelper extends WithKafka { private val sleepPeriod = 10000 private val maxTryNum = 10 - var g: Graph = null + var g: S2Graph = null var management: Management = null val conns = new scala.collection.mutable.HashMap[String, Connection]() @@ -80,7 +80,7 @@ object GraphSubscriberHelper extends WithKafka { if (g == null) { val ec = ExecutionContext.Implicits.global - g = new Graph(config)(ec) + g = new S2Graph(config)(ec) management = new Management(g) } } @@ -107,12 +107,12 @@ object GraphSubscriberHelper extends WithKafka { (for (msg <- msgs) yield { statFunc("total", 1) g.toGraphElement(msg, labelMapping) match { - case Some(e) if e.isInstanceOf[Edge] => + case Some(e) if e.isInstanceOf[S2Edge] => statFunc("EdgeParseOk", 1) - e.asInstanceOf[Edge] - case Some(v) if v.isInstanceOf[Vertex] => + e.asInstanceOf[S2Edge] + case Some(v) if v.isInstanceOf[S2Vertex] => statFunc("VertexParseOk", 1) - v.asInstanceOf[Vertex] + v.asInstanceOf[S2Vertex] case Some(x) => throw new RuntimeException(s">>>>> GraphSubscriber.toGraphElements: parsing failed. ${x.serviceName}") case None => http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala index d1da319..dce085e 100644 --- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala +++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala @@ -57,7 +57,7 @@ object TransferToHFile extends SparkApp { /** build key values */ case class DegreeKey(vertexIdStr: String, labelName: String, direction: String) - private def insertBulkForLoaderAsync(edge: Edge, createRelEdges: Boolean = true): List[PutRequest] = { + private def insertBulkForLoaderAsync(edge: S2Edge, createRelEdges: Boolean = true): List[PutRequest] = { val relEdges = if (createRelEdges) edge.relatedEdges else List(edge) buildPutRequests(edge.toSnapshotEdge) ++ relEdges.toList.flatMap { e => e.edgesWithIndex.flatMap { indexEdge => buildPutRequests(indexEdge) } @@ -125,8 +125,8 @@ object TransferToHFile extends SparkApp { def toKeyValues(strs: Seq[String], labelMapping: Map[String, String], autoEdgeCreate: Boolean): Iterator[KeyValue] = { val kvs = for { s <- strs - element <- GraphSubscriberHelper.g.toGraphElement(s, labelMapping).toSeq if element.isInstanceOf[Edge] - edge = element.asInstanceOf[Edge] + element <- GraphSubscriberHelper.g.toGraphElement(s, labelMapping).toSeq if element.isInstanceOf[S2Edge] + edge = element.asInstanceOf[S2Edge] putRequest <- insertBulkForLoaderAsync(edge, autoEdgeCreate) } yield { val p = putRequest http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala index 5b68754..40f936d 100644 --- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala +++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala @@ -21,7 +21,7 @@ package org.apache.s2graph.loader.subscriber import kafka.producer.KeyedMessage import kafka.serializer.StringDecoder -import org.apache.s2graph.core.Graph +import org.apache.s2graph.core.S2Graph$ import org.apache.s2graph.spark.spark.{WithKafka, SparkApp} import org.apache.spark.streaming.Durations._ import org.apache.spark.streaming.kafka.HasOffsetRanges http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala index 0f69dc7..23c3cda 100644 --- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala +++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala @@ -22,7 +22,7 @@ package org.apache.s2graph.loader.subscriber import java.text.SimpleDateFormat import java.util.Date import kafka.serializer.StringDecoder -import org.apache.s2graph.core.Graph +import org.apache.s2graph.core.S2Graph$ import org.apache.s2graph.spark.spark.{WithKafka, SparkApp, HashMapParam} import org.apache.spark.sql.hive.HiveContext import org.apache.spark.streaming.Durations._ http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/e8c0bf20/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala deleted file mode 100644 index f10b4db..0000000 --- a/s2core/src/main/scala/org/apache/s2graph/core/Edge.scala +++ /dev/null @@ -1,956 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.core - -import java.util -import java.util.function.BiConsumer - -import org.apache.s2graph.core.Edge.{Props, State} -import org.apache.s2graph.core.GraphExceptions.LabelNotExistException -import org.apache.s2graph.core.JSONParser._ -import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta} -import org.apache.s2graph.core.types._ -import org.apache.s2graph.core.utils.logger -import org.apache.tinkerpop.gremlin.structure -import org.apache.tinkerpop.gremlin.structure.{Direction, Edge => TpEdge, Graph => TpGraph, Property} -import play.api.libs.json.{JsNumber, JsObject, Json} - -import scala.collection.JavaConverters._ -import scala.collection.mutable.{Map => MutableMap} -import scala.util.hashing.MurmurHash3 - -case class SnapshotEdge(graph: Graph, - srcVertex: Vertex, - tgtVertex: Vertex, - label: Label, - dir: Int, - op: Byte, - version: Long, - private val propsWithTs: Props, - pendingEdgeOpt: Option[Edge], - statusCode: Byte = 0, - lockTs: Option[Long], - tsInnerValOpt: Option[InnerValLike] = None) { - lazy val direction = GraphUtil.fromDirection(dir) - lazy val operation = GraphUtil.fromOp(op) - lazy val edge = toEdge - lazy val labelWithDir = LabelWithDirection(label.id.get, dir) -// if (!propsWithTs.contains(LabelMeta.timestamp.name)) throw new Exception("Timestamp is required.") - -// val label = Label.findById(labelWithDir.labelId) - lazy val schemaVer = label.schemaVersion - lazy val ts = propsWithTs.get(LabelMeta.timestamp.name).innerVal.toString().toLong - - def propsToKeyValuesWithTs = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.asScala.map(kv => kv._2.labelMeta.seq -> kv._2.innerValWithTs).toSeq) - - def allPropsDeleted = Edge.allPropsDeleted(propsWithTs) - - def toEdge: Edge = { - Edge(graph, srcVertex, tgtVertex, label, dir, op, - version, propsWithTs, pendingEdgeOpt = pendingEdgeOpt, - statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt) - } - - def propsWithName = (for { - (_, v) <- propsWithTs.asScala - meta = v.labelMeta - jsValue <- innerValToJsValue(v.innerVal, meta.dataType) - } yield meta.name -> jsValue) ++ Map("version" -> JsNumber(version)) - - // only for debug - def toLogString() = { - List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, propsWithName).mkString("\t") - } - - def property[V](key: String, value: V, ts: Long): S2Property[V] = { - val labelMeta = label.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on IndexEdge.")) - val newProps = new S2Property(edge, labelMeta, key, value, ts) - propsWithTs.put(key, newProps) - newProps - } - override def hashCode(): Int = { - MurmurHash3.stringHash(srcVertex.innerId + "," + labelWithDir + "," + tgtVertex.innerId) - } - - override def equals(other: Any): Boolean = other match { - case e: SnapshotEdge => - srcVertex.innerId == e.srcVertex.innerId && - tgtVertex.innerId == e.tgtVertex.innerId && - labelWithDir == e.labelWithDir && op == e.op && version == e.version && - pendingEdgeOpt == e.pendingEdgeOpt && lockTs == lockTs && statusCode == statusCode - case _ => false - } - - override def toString(): String = { - Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> label.label, "direction" -> direction, - "operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString, - "statusCode" -> statusCode, "lockTs" -> lockTs).toString - } -} - -case class IndexEdge(graph: Graph, - srcVertex: Vertex, - tgtVertex: Vertex, - label: Label, - dir: Int, - op: Byte, - version: Long, - labelIndexSeq: Byte, - private val propsWithTs: Props, - tsInnerValOpt: Option[InnerValLike] = None) { -// if (!props.contains(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.") - // assert(props.contains(LabelMeta.timeStampSeq)) - lazy val direction = GraphUtil.fromDirection(dir) - lazy val operation = GraphUtil.fromOp(op) - lazy val edge = toEdge - lazy val labelWithDir = LabelWithDirection(label.id.get, dir) - - lazy val isInEdge = labelWithDir.dir == GraphUtil.directions("in") - lazy val isOutEdge = !isInEdge - - lazy val ts = propsWithTs.get(LabelMeta.timestamp.name).innerVal.toString.toLong - lazy val degreeEdge = propsWithTs.containsKey(LabelMeta.degree.name) - - lazy val schemaVer = label.schemaVersion - lazy val labelIndex = LabelIndex.findByLabelIdAndSeq(labelWithDir.labelId, labelIndexSeq).get - lazy val defaultIndexMetas = labelIndex.sortKeyTypes.map { meta => - val innerVal = toInnerVal(meta.defaultValue, meta.dataType, schemaVer) - meta.seq -> innerVal - }.toMap - - lazy val labelIndexMetaSeqs = labelIndex.sortKeyTypes - - /** TODO: make sure call of this class fill props as this assumes */ - lazy val orders = for (meta <- labelIndexMetaSeqs) yield { - propsWithTs.get(meta.name) match { - case null => - - /** - * TODO: agly hack - * now we double store target vertex.innerId/srcVertex.innerId for easy development. later fix this to only store id once - */ - val v = meta match { - case LabelMeta.timestamp=> InnerVal.withLong(version, schemaVer) - case LabelMeta.to => toEdge.tgtVertex.innerId - case LabelMeta.from => toEdge.srcVertex.innerId - // for now, it does not make sense to build index on srcVertex.innerId since all edges have same data. - // throw new RuntimeException("_from on indexProps is not supported") - case _ => toInnerVal(meta.defaultValue, meta.dataType, schemaVer) - } - - meta -> v - case v => meta -> v.innerVal - } - } - - lazy val ordersKeyMap = orders.map { case (meta, _) => meta.name }.toSet - lazy val metas = for ((meta, v) <- propsWithTs.asScala if !ordersKeyMap.contains(meta)) yield v.labelMeta -> v.innerVal - -// lazy val propsWithTs = props.map { case (k, v) => k -> InnerValLikeWithTs(v, version) } - - //TODO: - // lazy val kvs = Graph.client.indexedEdgeSerializer(this).toKeyValues.toList - - lazy val hasAllPropsForIndex = orders.length == labelIndexMetaSeqs.length - - def propsWithName = for { - (_, v) <- propsWithTs.asScala - meta = v.labelMeta - jsValue <- innerValToJsValue(v.innerVal, meta.dataType) - } yield meta.name -> jsValue - - - def toEdge: Edge = Edge(graph, srcVertex, tgtVertex, label, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt) - - // only for debug - def toLogString() = { - List(version, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, Json.toJson(propsWithName)).mkString("\t") - } - - def property(key: String): Option[InnerValLikeWithTs] = { - label.metaPropsInvMap.get(key).map(labelMeta => property(labelMeta)) - } - - def property(labelMeta: LabelMeta): InnerValLikeWithTs = { -// propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse(label.metaPropsDefaultMapInner(labelMeta)) - if (propsWithTs.containsKey(labelMeta.name)) { - propsWithTs.get(labelMeta.name).innerValWithTs - } else { - label.metaPropsDefaultMapInner(labelMeta) - } - } - - def updatePropsWithTs(others: Props = Edge.EmptyProps): Props = { - if (others.isEmpty) propsWithTs - else { - val iter = others.entrySet().iterator() - while (iter.hasNext) { - val e = iter.next() - propsWithTs.put(e.getKey, e.getValue) - } - propsWithTs - } - } - - def property[V](key: String, value: V, ts: Long): S2Property[V] = { - val labelMeta = label.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on IndexEdge.")) - val newProps = new S2Property(edge, labelMeta, key, value, ts) - propsWithTs.put(key, newProps) - newProps - } - override def hashCode(): Int = { - MurmurHash3.stringHash(srcVertex.innerId + "," + labelWithDir + "," + tgtVertex.innerId + "," + labelIndexSeq) - } - - override def equals(other: Any): Boolean = other match { - case e: IndexEdge => - srcVertex.innerId == e.srcVertex.innerId && - tgtVertex.innerId == e.tgtVertex.innerId && - labelWithDir == e.labelWithDir && op == e.op && version == e.version && - labelIndexSeq == e.labelIndexSeq - case _ => false - } - - override def toString(): String = { - Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> label.label, "direction" -> dir, - "operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString - ).toString - } -} - -case class Edge(innerGraph: Graph, - srcVertex: Vertex, - var tgtVertex: Vertex, - innerLabel: Label, - dir: Int, - var op: Byte = GraphUtil.defaultOpByte, - var version: Long = System.currentTimeMillis(), - propsWithTs: Props = Edge.EmptyProps, - parentEdges: Seq[EdgeWithScore] = Nil, - originalEdgeOpt: Option[Edge] = None, - pendingEdgeOpt: Option[Edge] = None, - statusCode: Byte = 0, - lockTs: Option[Long] = None, - var tsInnerValOpt: Option[InnerValLike] = None) extends GraphElement with TpEdge { - - lazy val labelWithDir = LabelWithDirection(innerLabel.id.get, dir) - lazy val schemaVer = innerLabel.schemaVersion - lazy val ts = propsWithTs.get(LabelMeta.timestamp.name).innerVal.value match { - case b: BigDecimal => b.longValue() - case l: Long => l - case i: Int => i.toLong - case _ => throw new RuntimeException("ts should be in [BigDecimal/Long/Int].") - } - - lazy val operation = GraphUtil.fromOp(op) - lazy val tsInnerVal = tsInnerValOpt.get.value - lazy val srcId = srcVertex.innerIdVal - lazy val tgtId = tgtVertex.innerIdVal - lazy val labelName = innerLabel.label - lazy val direction = GraphUtil.fromDirection(dir) - - def toIndexEdge(labelIndexSeq: Byte): IndexEdge = IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelIndexSeq, propsWithTs) - - def serializePropsWithTs(): Array[Byte] = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.asScala.map(kv => kv._2.labelMeta.seq -> kv._2.innerValWithTs).toSeq) - - def updatePropsWithTs(others: Props = Edge.EmptyProps): Props = { - val emptyProp = Edge.EmptyProps - - propsWithTs.forEach(new BiConsumer[String, S2Property[_]] { - override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value) - }) - - others.forEach(new BiConsumer[String, S2Property[_]] { - override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value) - }) - - emptyProp - } - - def propertyValue(key: String): Option[InnerValLikeWithTs] = { - key match { - case "from" | "_from" => Option(InnerValLikeWithTs(srcVertex.innerId, ts)) - case "to" | "_to" => Option(InnerValLikeWithTs(tgtVertex.innerId, ts)) - case "label" => Option(InnerValLikeWithTs(InnerVal.withStr(innerLabel.label, schemaVer), ts)) - case "direction" => Option(InnerValLikeWithTs(InnerVal.withStr(direction, schemaVer), ts)) - case _ => - innerLabel.metaPropsInvMap.get(key).map(labelMeta => propertyValueInner(labelMeta)) - } - } - - def propertyValueInner(labelMeta: LabelMeta): InnerValLikeWithTs= { - // propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse() - if (propsWithTs.containsKey(labelMeta.name)) { - propsWithTs.get(labelMeta.name).innerValWithTs - } else { - innerLabel.metaPropsDefaultMapInner(labelMeta) - } - } - - def propertyValues(keys: Seq[String] = Nil): Map[LabelMeta, InnerValLikeWithTs] = { - val labelMetas = for { - key <- keys - labelMeta <- innerLabel.metaPropsInvMap.get(key) - } yield labelMeta - - propertyValuesInner(labelMetas) - } - - def propertyValuesInner(labelMetas: Seq[LabelMeta] = Nil): Map[LabelMeta, InnerValLikeWithTs] = { - if (labelMetas.isEmpty) { - innerLabel.metaPropsDefaultMapInner.map { case (labelMeta, defaultVal) => - labelMeta -> propertyValueInner(labelMeta) - } - } else { - // This is important since timestamp is required for all edges. - (LabelMeta.timestamp +: labelMetas).map { labelMeta => - labelMeta -> propertyValueInner(labelMeta) - }.toMap - } - } - -// if (!props.contains(LabelMeta.timestamp)) throw new Exception("Timestamp is required.") - // assert(propsWithTs.contains(LabelMeta.timeStampSeq)) - - lazy val properties = toProps() - - def props = propsWithTs.asScala.mapValues(_.innerVal) - - - private def toProps(): Map[String, Any] = { - for { - (labelMeta, defaultVal) <- innerLabel.metaPropsDefaultMapInner - } yield { - // labelMeta.name -> propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse(defaultVal).innerVal.value - val value = - if (propsWithTs.containsKey(labelMeta.name)) { - propsWithTs.get(labelMeta.name).value - } else { - defaultVal.innerVal.value - } - labelMeta.name -> value - } - } - - def relatedEdges = { - if (labelWithDir.isDirected) { - val skipReverse = innerLabel.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false) - if (skipReverse) List(this) else List(this, duplicateEdge) - } else { -// val outDir = labelWithDir.copy(dir = GraphUtil.directions("out")) -// val base = copy(labelWithDir = outDir) - val base = copy(dir = GraphUtil.directions("out")) - List(base, base.reverseSrcTgtEdge) - } - } - - // def relatedEdges = List(this) - - def srcForVertex = { - val belongLabelIds = Seq(labelWithDir.labelId) - if (labelWithDir.dir == GraphUtil.directions("in")) { - innerGraph.newVertex(VertexId(innerLabel.tgtColumn, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds) - } else { - innerGraph.newVertex(VertexId(innerLabel.srcColumn, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds) - } - } - - def tgtForVertex = { - val belongLabelIds = Seq(labelWithDir.labelId) - if (labelWithDir.dir == GraphUtil.directions("in")) { - innerGraph.newVertex(VertexId(innerLabel.srcColumn, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds) - } else { - innerGraph.newVertex(VertexId(innerLabel.tgtColumn, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds) - } - } - - def duplicateEdge = reverseSrcTgtEdge.reverseDirEdge - -// def reverseDirEdge = copy(labelWithDir = labelWithDir.dirToggled) - def reverseDirEdge = copy(dir = GraphUtil.toggleDir(dir)) - - def reverseSrcTgtEdge = copy(srcVertex = tgtVertex, tgtVertex = srcVertex) - - def labelOrders = LabelIndex.findByLabelIdAll(labelWithDir.labelId) - - override def serviceName = innerLabel.serviceName - - override def queueKey = Seq(ts.toString, tgtVertex.serviceName).mkString("|") - - override def queuePartitionKey = Seq(srcVertex.innerId, tgtVertex.innerId).mkString("|") - - override def isAsync = innerLabel.isAsync - - def isDegree = propsWithTs.containsKey(LabelMeta.degree.name) - -// def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match { -// case Some(_) => props -// case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer)) -// } - - def propsPlusTsValid = propsWithTs.asScala.filter(kv => LabelMeta.isValidSeq(kv._2.labelMeta.seq)).asJava - - def edgesWithIndex = for (labelOrder <- labelOrders) yield { - IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsWithTs, tsInnerValOpt = tsInnerValOpt) - } - - def edgesWithIndexValid = for (labelOrder <- labelOrders) yield { - IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsPlusTsValid, tsInnerValOpt = tsInnerValOpt) - } - - /** force direction as out on invertedEdge */ - def toSnapshotEdge: SnapshotEdge = { - val (smaller, larger) = (srcForVertex, tgtForVertex) - -// val newLabelWithDir = LabelWithDirection(labelWithDir.labelId, GraphUtil.directions("out")) - - property(LabelMeta.timestamp.name, ts, ts) - val ret = SnapshotEdge(innerGraph, smaller, larger, innerLabel, - GraphUtil.directions("out"), op, version, propsWithTs, - pendingEdgeOpt = pendingEdgeOpt, statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt) - ret - } - - def defaultPropsWithName = Json.obj("from" -> srcVertex.innerId.toString(), "to" -> tgtVertex.innerId.toString(), - "label" -> innerLabel.label, "service" -> innerLabel.serviceName) - - def propsWithName = - for { - (_, v) <- propsWithTs.asScala - meta = v.labelMeta - jsValue <- innerValToJsValue(v.innerVal, meta.dataType) - } yield meta.name -> jsValue - - - def updateTgtVertex(id: InnerValLike) = { - val newId = TargetVertexId(tgtVertex.id.column, id) - val newTgtVertex = innerGraph.newVertex(newId, tgtVertex.ts, tgtVertex.props) - Edge(innerGraph, srcVertex, newTgtVertex, innerLabel, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt) - } - - def rank(r: RankParam): Double = - if (r.keySeqAndWeights.size <= 0) 1.0f - else { - var sum: Double = 0 - - for ((labelMeta, w) <- r.keySeqAndWeights) { - if (propsWithTs.containsKey(labelMeta.name)) { - val innerValWithTs = propsWithTs.get(labelMeta.name) - val cost = try innerValWithTs.innerVal.toString.toDouble catch { - case e: Exception => - logger.error("toInnerval failed in rank", e) - 1.0 - } - sum += w * cost - } - } - sum - } - - def toLogString: String = { - val allPropsWithName = defaultPropsWithName ++ Json.toJson(propsWithName).asOpt[JsObject].getOrElse(Json.obj()) - List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, innerLabel.label, allPropsWithName).mkString("\t") - } - - override def hashCode(): Int = { - MurmurHash3.stringHash(srcVertex.innerId + "," + labelWithDir + "," + tgtVertex.innerId) - } - - override def equals(other: Any): Boolean = other match { - case e: Edge => - srcVertex.innerId == e.srcVertex.innerId && - tgtVertex.innerId == e.tgtVertex.innerId && - labelWithDir == e.labelWithDir && op == e.op && version == e.version && - pendingEdgeOpt == e.pendingEdgeOpt && lockTs == lockTs && statusCode == statusCode && - parentEdges == e.parentEdges && originalEdgeOpt == originalEdgeOpt - case _ => false - } - - override def toString(): String = { - Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> labelName, "direction" -> direction, - "operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString, - "parentEdges" -> parentEdges, "originalEdge" -> originalEdgeOpt, "statusCode" -> statusCode, "lockTs" -> lockTs - ).toString - } - - def checkProperty(key: String): Boolean = propsWithTs.containsKey(key) - - def copyEdge(srcVertex: Vertex = srcVertex, - tgtVertex: Vertex = tgtVertex, - innerLabel: Label = innerLabel, - dir: Int = dir, - op: Byte = op, - version: Long = version, - propsWithTs: State = Edge.propsToState(this.propsWithTs), - parentEdges: Seq[EdgeWithScore] = parentEdges, - originalEdgeOpt: Option[Edge] = originalEdgeOpt, - pendingEdgeOpt: Option[Edge] = pendingEdgeOpt, - statusCode: Byte = statusCode, - lockTs: Option[Long] = lockTs, - tsInnerValOpt: Option[InnerValLike] = tsInnerValOpt, - ts: Long = ts): Edge = { - val edge = new Edge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, Edge.EmptyProps, - parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt) - Edge.fillPropsWithTs(edge, propsWithTs) - edge.property(LabelMeta.timestamp.name, ts, ts) - edge - } - - def copyEdgeWithState(state: State, ts: Long): Edge = { - val newEdge = copy(propsWithTs = Edge.EmptyProps) - Edge.fillPropsWithTs(newEdge, state) - newEdge.property(LabelMeta.timestamp.name, ts, ts) - newEdge - } - - def copyEdgeWithState(state: State): Edge = { - val newEdge = copy(propsWithTs = Edge.EmptyProps) - Edge.fillPropsWithTs(newEdge, state) - newEdge - } - - override def vertices(direction: Direction): util.Iterator[structure.Vertex] = ??? - - override def properties[V](strings: String*): util.Iterator[Property[V]] = ??? - - override def property[V](key: String): Property[V] = { - val labelMeta = innerLabel.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on Edge.")) - if (propsWithTs.containsKey(key)) propsWithTs.get(key).asInstanceOf[Property[V]] - else { - val default = innerLabel.metaPropsDefaultMapInner(labelMeta) - property(key, default.innerVal.value, default.ts).asInstanceOf[Property[V]] - } - } - - override def property[V](key: String, value: V): Property[V] = { - property(key, value, System.currentTimeMillis()) - } - - def property[V](key: String, value: V, ts: Long): Property[V] = { - val labelMeta = innerLabel.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on Edge.")) - val newProp = new S2Property[V](this, labelMeta, key, value, ts) - propsWithTs.put(key, newProp) - newProp - } - - override def remove(): Unit = {} - - override def graph(): TpGraph = innerGraph - - override def id(): AnyRef = (srcVertex.innerId, labelWithDir, tgtVertex.innerId) - - override def label(): String = innerLabel.label -} - - -case class EdgeMutate(edgesToDelete: List[IndexEdge] = List.empty[IndexEdge], - edgesToInsert: List[IndexEdge] = List.empty[IndexEdge], - newSnapshotEdge: Option[SnapshotEdge] = None) { - - def toLogString: String = { - val l = (0 until 50).map(_ => "-").mkString("") - val deletes = s"deletes: ${edgesToDelete.map(e => e.toLogString).mkString("\n")}" - val inserts = s"inserts: ${edgesToInsert.map(e => e.toLogString).mkString("\n")}" - val updates = s"snapshot: ${newSnapshotEdge.map(e => e.toLogString).mkString("\n")}" - - List("\n", l, deletes, inserts, updates, l, "\n").mkString("\n") - } -} - -object Edge { - val incrementVersion = 1L - val minTsVal = 0L - - /** now version information is required also **/ - type Props = java.util.Map[String, S2Property[_]] - type State = Map[LabelMeta, InnerValLikeWithTs] - type PropsPairWithTs = (State, State, Long, String) - type MergeState = PropsPairWithTs => (State, Boolean) - type UpdateFunc = (Option[Edge], Edge, MergeState) - - def EmptyProps = new java.util.HashMap[String, S2Property[_]] - def EmptyState = Map.empty[LabelMeta, InnerValLikeWithTs] - def sameProps(base: Props, other: Props): Boolean = { - if (base.size != other.size) false - else { - var ret = true - val iter = base.entrySet().iterator() - while (iter.hasNext) { - val e = iter.next() - if (!other.containsKey(e.getKey)) ret = false - else if (e.getValue != other.get(e.getKey)) ret = false - else { - - } - } - val otherIter = other.entrySet().iterator() - while (otherIter.hasNext) { - val e = otherIter.next() - if (!base.containsKey(e.getKey)) ret = false - else if (e.getValue != base.get(e.getKey)) ret = false - else { - - } - } - ret - } -// base.sameElements(other) - } - def fillPropsWithTs(snapshotEdge: SnapshotEdge, state: State): Unit = { - state.foreach { case (k, v) => snapshotEdge.property(k.name, v.innerVal.value, v.ts) } - } - def fillPropsWithTs(indexEdge: IndexEdge, state: State): Unit = { - state.foreach { case (k, v) => indexEdge.property(k.name, v.innerVal.value, v.ts) } - } - def fillPropsWithTs(edge: Edge, state: State): Unit = { - state.foreach { case (k, v) => edge.property(k.name, v.innerVal.value, v.ts) } - } - - def propsToState(props: Props): State = { - props.asScala.map { case (k, v) => - v.labelMeta -> v.innerValWithTs - }.toMap - } - - def stateToProps(edge: Edge, state: State): Props = { - state.foreach { case (k, v) => - edge.property(k.name, v.innerVal.value, v.ts) - } - edge.propsWithTs - } - - def allPropsDeleted(props: Map[LabelMeta, InnerValLikeWithTs]): Boolean = - if (!props.contains(LabelMeta.lastDeletedAt)) false - else { - val lastDeletedAt = props.get(LabelMeta.lastDeletedAt).get.ts - val propsWithoutLastDeletedAt = props - LabelMeta.lastDeletedAt - - propsWithoutLastDeletedAt.forall { case (_, v) => v.ts <= lastDeletedAt } - } - - def allPropsDeleted(props: Props): Boolean = - if (!props.containsKey(LabelMeta.lastDeletedAt.name)) false - else { - val lastDeletedAt = props.get(LabelMeta.lastDeletedAt.name).ts - props.remove(LabelMeta.lastDeletedAt.name) -// val propsWithoutLastDeletedAt = props -// -// propsWithoutLastDeletedAt.forall { case (_, v) => v.ts <= lastDeletedAt } - var ret = true - val iter = props.entrySet().iterator() - while (iter.hasNext) { - val e = iter.next() - if (e.getValue.ts > lastDeletedAt) ret = false - } - ret - } - - def buildDeleteBulk(invertedEdge: Option[Edge], requestEdge: Edge): (Edge, EdgeMutate) = { - // assert(invertedEdge.isEmpty) - // assert(requestEdge.op == GraphUtil.operations("delete")) - - val edgesToDelete = requestEdge.relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid } - val edgeInverted = Option(requestEdge.toSnapshotEdge) - - (requestEdge, EdgeMutate(edgesToDelete, edgesToInsert = Nil, newSnapshotEdge = edgeInverted)) - } - - def buildOperation(invertedEdge: Option[Edge], requestEdges: Seq[Edge]): (Edge, EdgeMutate) = { - // logger.debug(s"oldEdge: ${invertedEdge.map(_.toStringRaw)}") - // logger.debug(s"requestEdge: ${requestEdge.toStringRaw}") - val oldPropsWithTs = - if (invertedEdge.isEmpty) Map.empty[LabelMeta, InnerValLikeWithTs] - else propsToState(invertedEdge.get.propsWithTs) - - val funcs = requestEdges.map { edge => - if (edge.op == GraphUtil.operations("insert")) { - edge.innerLabel.consistencyLevel match { - case "strong" => Edge.mergeUpsert _ - case _ => Edge.mergeInsertBulk _ - } - } else if (edge.op == GraphUtil.operations("insertBulk")) { - Edge.mergeInsertBulk _ - } else if (edge.op == GraphUtil.operations("delete")) { - edge.innerLabel.consistencyLevel match { - case "strong" => Edge.mergeDelete _ - case _ => throw new RuntimeException("not supported") - } - } - else if (edge.op == GraphUtil.operations("update")) Edge.mergeUpdate _ - else if (edge.op == GraphUtil.operations("increment")) Edge.mergeIncrement _ - else throw new RuntimeException(s"not supported operation on edge: $edge") - } - - val oldTs = invertedEdge.map(_.ts).getOrElse(minTsVal) - val requestWithFuncs = requestEdges.zip(funcs).filter(oldTs != _._1.ts).sortBy(_._1.ts) - - if (requestWithFuncs.isEmpty) { - (requestEdges.head, EdgeMutate()) - } else { - val requestEdge = requestWithFuncs.last._1 - var prevPropsWithTs = oldPropsWithTs - - for { - (requestEdge, func) <- requestWithFuncs - } { - val (_newPropsWithTs, _) = func(prevPropsWithTs, propsToState(requestEdge.propsWithTs), requestEdge.ts, requestEdge.schemaVer) - prevPropsWithTs = _newPropsWithTs - // logger.debug(s"${requestEdge.toLogString}\n$oldPropsWithTs\n$prevPropsWithTs\n") - } - val requestTs = requestEdge.ts - /** version should be monotoniously increasing so our RPC mutation should be applied safely */ - val newVersion = invertedEdge.map(e => e.version + incrementVersion).getOrElse(requestTs) - val maxTs = prevPropsWithTs.map(_._2.ts).max - val newTs = if (maxTs > requestTs) maxTs else requestTs - val propsWithTs = prevPropsWithTs ++ - Map(LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(newTs, requestEdge.innerLabel.schemaVersion), newTs)) - - val edgeMutate = buildMutation(invertedEdge, requestEdge, newVersion, oldPropsWithTs, propsWithTs) - - // logger.debug(s"${edgeMutate.toLogString}\n${propsWithTs}") - // logger.error(s"$propsWithTs") - val newEdge = requestEdge.copy(propsWithTs = EmptyProps) - fillPropsWithTs(newEdge, propsWithTs) - (newEdge, edgeMutate) - } - } - - def filterOutWithLabelOption(ls: Seq[IndexEdge]): Seq[IndexEdge] = ls.filter { ie => - ie.labelIndex.dir match { - case None => - // both direction use same indices that is defined when label creation. - true - case Some(dir) => - if (dir != ie.dir) { - // current labelIndex's direction is different with indexEdge's direction so don't touch - false - } else { - ie.labelIndex.writeOption.map { option => - val hashValueOpt = ie.orders.find { case (k, v) => k == LabelMeta.fromHash }.map{ case (k, v) => v.value.toString.toLong } - option.sample(ie, hashValueOpt) - }.getOrElse(true) - } - } - } - - def buildMutation(snapshotEdgeOpt: Option[Edge], - requestEdge: Edge, - newVersion: Long, - oldPropsWithTs: Map[LabelMeta, InnerValLikeWithTs], - newPropsWithTs: Map[LabelMeta, InnerValLikeWithTs]): EdgeMutate = { - - if (oldPropsWithTs == newPropsWithTs) { - // all requests should be dropped. so empty mutation. - EdgeMutate(edgesToDelete = Nil, edgesToInsert = Nil, newSnapshotEdge = None) - } else { - val withOutDeletedAt = newPropsWithTs.filter(kv => kv._1 != LabelMeta.lastDeletedAtSeq) - val newOp = snapshotEdgeOpt match { - case None => requestEdge.op - case Some(old) => - val oldMaxTs = old.propsWithTs.asScala.map(_._2.ts).max - if (oldMaxTs > requestEdge.ts) old.op - else requestEdge.op - } - - val newSnapshotEdge = requestEdge.copy(op = newOp, version = newVersion).copyEdgeWithState(newPropsWithTs) - - val newSnapshotEdgeOpt = Option(newSnapshotEdge.toSnapshotEdge) - // delete request must always update snapshot. - if (withOutDeletedAt == oldPropsWithTs && newPropsWithTs.contains(LabelMeta.lastDeletedAt)) { - // no mutation on indexEdges. only snapshotEdge should be updated to record lastDeletedAt. - EdgeMutate(edgesToDelete = Nil, edgesToInsert = Nil, newSnapshotEdge = newSnapshotEdgeOpt) - } else { - val edgesToDelete = snapshotEdgeOpt match { - case Some(snapshotEdge) if snapshotEdge.op != GraphUtil.operations("delete") => - snapshotEdge.copy(op = GraphUtil.defaultOpByte) - .relatedEdges.flatMap { relEdge => filterOutWithLabelOption(relEdge.edgesWithIndexValid) } - case _ => Nil - } - - val edgesToInsert = - if (newPropsWithTs.isEmpty || allPropsDeleted(newPropsWithTs)) Nil - else { - val newEdge = requestEdge.copy( - version = newVersion, - propsWithTs = Edge.EmptyProps, - op = GraphUtil.defaultOpByte - ) - newPropsWithTs.foreach { case (k, v) => newEdge.property(k.name, v.innerVal.value, v.ts) } - - newEdge.relatedEdges.flatMap { relEdge => filterOutWithLabelOption(relEdge.edgesWithIndexValid) } - } - - - EdgeMutate(edgesToDelete = edgesToDelete, - edgesToInsert = edgesToInsert, - newSnapshotEdge = newSnapshotEdgeOpt) - } - } - } - - def mergeUpsert(propsPairWithTs: PropsPairWithTs): (State, Boolean) = { - var shouldReplace = false - val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs - val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt).map(v => v.ts).getOrElse(minTsVal) - val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield { - propsWithTs.get(k) match { - case Some(newValWithTs) => - assert(oldValWithTs.ts >= lastDeletedAt) - val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs - else { - shouldReplace = true - newValWithTs - } - Some(k -> v) - - case None => - assert(oldValWithTs.ts >= lastDeletedAt) - if (oldValWithTs.ts >= requestTs || k.seq < 0) Some(k -> oldValWithTs) - else { - shouldReplace = true - None - } - } - } - val existInNew = - for { - (k, newValWithTs) <- propsWithTs if !oldPropsWithTs.contains(k) && newValWithTs.ts > lastDeletedAt - } yield { - shouldReplace = true - Some(k -> newValWithTs) - } - - ((existInOld.flatten ++ existInNew.flatten).toMap, shouldReplace) - } - - def mergeUpdate(propsPairWithTs: PropsPairWithTs): (State, Boolean) = { - var shouldReplace = false - val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs - val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt).map(v => v.ts).getOrElse(minTsVal) - val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield { - propsWithTs.get(k) match { - case Some(newValWithTs) => - assert(oldValWithTs.ts >= lastDeletedAt) - val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs - else { - shouldReplace = true - newValWithTs - } - Some(k -> v) - case None => - // important: update need to merge previous valid values. - assert(oldValWithTs.ts >= lastDeletedAt) - Some(k -> oldValWithTs) - } - } - val existInNew = for { - (k, newValWithTs) <- propsWithTs if !oldPropsWithTs.contains(k) && newValWithTs.ts > lastDeletedAt - } yield { - shouldReplace = true - Some(k -> newValWithTs) - } - - ((existInOld.flatten ++ existInNew.flatten).toMap, shouldReplace) - } - - def mergeIncrement(propsPairWithTs: PropsPairWithTs): (State, Boolean) = { - var shouldReplace = false - val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs - val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt).map(v => v.ts).getOrElse(minTsVal) - val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield { - propsWithTs.get(k) match { - case Some(newValWithTs) => - if (k == LabelMeta.timestamp) { - val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs - else { - shouldReplace = true - newValWithTs - } - Some(k -> v) - } else { - if (oldValWithTs.ts >= newValWithTs.ts) { - Some(k -> oldValWithTs) - } else { - assert(oldValWithTs.ts < newValWithTs.ts && oldValWithTs.ts >= lastDeletedAt) - shouldReplace = true - // incr(t0), incr(t2), d(t1) => deleted - Some(k -> InnerValLikeWithTs(oldValWithTs.innerVal + newValWithTs.innerVal, oldValWithTs.ts)) - } - } - - case None => - assert(oldValWithTs.ts >= lastDeletedAt) - Some(k -> oldValWithTs) - // if (oldValWithTs.ts >= lastDeletedAt) Some(k -> oldValWithTs) else None - } - } - val existInNew = for { - (k, newValWithTs) <- propsWithTs if !oldPropsWithTs.contains(k) && newValWithTs.ts > lastDeletedAt - } yield { - shouldReplace = true - Some(k -> newValWithTs) - } - - ((existInOld.flatten ++ existInNew.flatten).toMap, shouldReplace) - } - - def mergeDelete(propsPairWithTs: PropsPairWithTs): (State, Boolean) = { - var shouldReplace = false - val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs - val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt) match { - case Some(prevDeletedAt) => - if (prevDeletedAt.ts >= requestTs) prevDeletedAt.ts - else { - shouldReplace = true - requestTs - } - case None => { - shouldReplace = true - requestTs - } - } - val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield { - if (k == LabelMeta.timestamp) { - if (oldValWithTs.ts >= requestTs) Some(k -> oldValWithTs) - else { - shouldReplace = true - Some(k -> InnerValLikeWithTs.withLong(requestTs, requestTs, version)) - } - } else { - if (oldValWithTs.ts >= lastDeletedAt) Some(k -> oldValWithTs) - else { - shouldReplace = true - None - } - } - } - val mustExistInNew = Map(LabelMeta.lastDeletedAt -> InnerValLikeWithTs.withLong(lastDeletedAt, lastDeletedAt, version)) - ((existInOld.flatten ++ mustExistInNew).toMap, shouldReplace) - } - - def mergeInsertBulk(propsPairWithTs: PropsPairWithTs): (State, Boolean) = { - val (_, propsWithTs, _, _) = propsPairWithTs - (propsWithTs, true) - } - -// def fromString(s: String): Option[Edge] = Graph.toEdge(s) - - -}
