Remove the bidirectional self-type dependency between S2EdgeLike and S2Edge.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/3514060a Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/3514060a Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/3514060a Branch: refs/heads/master Commit: 3514060ab112303efb0782929e4333dd5ee1f2cd Parents: d3a2e75 Author: DO YUNG YOON <[email protected]> Authored: Tue Nov 7 10:38:44 2017 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Tue Nov 7 11:13:27 2017 +0900 ---------------------------------------------------------------------- .../s2graph/core/GraphElementBuilder.scala | 11 +++++++--- .../org/apache/s2graph/core/PostProcess.scala | 8 ++++---- .../org/apache/s2graph/core/QueryParam.scala | 4 ++-- .../scala/org/apache/s2graph/core/S2Edge.scala | 8 ++++---- .../org/apache/s2graph/core/S2EdgeLike.scala | 21 ++++++++++++-------- .../s2graph/core/index/IndexProvider.scala | 2 +- .../s2graph/core/parsers/WhereParser.scala | 4 ++-- .../storage/WriteWriteConflictResolver.scala | 6 +++--- .../core/storage/serde/MutationHelper.scala | 7 +++---- .../tall/SnapshotEdgeSerializable.scala | 4 ++-- .../wide/SnapshotEdgeSerializable.scala | 4 ++-- 11 files changed, 44 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3514060a/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala b/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala index c9133b1..3ef7ba3 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala @@ -279,10 +279,15 @@ class GraphElementBuilder(graph: S2Graph) { 
val edge = edgeWithScore.edge val copiedEdge = label.consistencyLevel match { case "strong" => - edge.builder.copyEdge(op = GraphUtil.operations("delete"), - version = requestTs, propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs) + edge + .copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs())) + .copyTs(requestTs) + .copyOp(GraphUtil.operations("delete")) + .copyVersion(requestTs) case _ => - edge.builder.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs) + edge + .copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs())) + .copyTs(requestTs) } val edgeToDelete = edgeWithScore.copy(edge = copiedEdge) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3514060a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala index 5118600..900bbbd 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala @@ -64,9 +64,9 @@ object PostProcess { else { val ancestors = for { current <- parentEdges - parents = s2EdgeParent(graph, queryOption, current.edge.parentEdges) if parents != JsNull + parents = s2EdgeParent(graph, queryOption, current.edge.getParentEdges()) if parents != JsNull } yield { - val s2Edge = current.edge.originalEdgeOpt.getOrElse(current.edge) + val s2Edge = current.edge.getOriginalEdgeOpt().getOrElse(current.edge) s2EdgeToJsValue(queryOption, current.copy(edge = s2Edge), false, parents = parents, checkSelectColumns = true) } Json.toJson(ancestors) @@ -240,7 +240,7 @@ object PostProcess { // no group by specified on query. 
val results = if (limitOpt.isDefined) stepResult.edgeWithScores.take(limitOpt.get) else stepResult.edgeWithScores val ls = results.map { t => - val parents = if (queryOption.returnTree) s2EdgeParent(graph, queryOption, t.edge.parentEdges) else JsNull + val parents = if (queryOption.returnTree) s2EdgeParent(graph, queryOption, t.edge.getParentEdges()) else JsNull s2EdgeToJsValue(queryOption, t, false, parents) } @@ -266,7 +266,7 @@ object PostProcess { ) } else { val agg = edges.map { t => - val parents = if (queryOption.returnTree) s2EdgeParent(graph, queryOption, t.edge.parentEdges) else JsNull + val parents = if (queryOption.returnTree) s2EdgeParent(graph, queryOption, t.edge.getParentEdges()) else JsNull s2EdgeToJsValue(queryOption, t, false, parents) } val aggJson = Json.toJson(agg) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3514060a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala index 3d0d076..748e8c5 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala @@ -185,7 +185,7 @@ case class EdgeTransformer(jsValue: JsValue) { replace(queryParam, fmt, fieldNames.flatMap(fieldName => toInnerValOpt(queryParam, edge, fieldName)), nextStepOpt) } } - } yield edge.builder.updateTgtVertex(innerVal).copyOriginalEdgeOpt(Option(edge)) + } yield edge.updateTgtVertex(innerVal).copyOriginalEdgeOpt(Option(edge)) edges @@ -383,7 +383,7 @@ case class QueryParam(labelName: String, propValJs match { case JsString(in) if edgeOpt.isDefined && in.contains("_parent.") => val parentLen = in.split("_parent.").length - 1 - val edge = (0 until parentLen).foldLeft(edgeOpt.get) { case (acc, _) => acc.parentEdges.head.edge } + val edge = (0 until 
parentLen).foldLeft(edgeOpt.get) { case (acc, _) => acc.getParentEdges().head.edge } val timePivot = edge.ts val replaced = TemplateHelper.replaceVariable(timePivot, in).trim http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3514060a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala index 9f5093c..db530ac 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala @@ -515,7 +515,7 @@ object S2Edge { state.foreach { case (k, v) => edge.propertyInner(k.name, v.innerVal.value, v.ts) } - edge.propsWithTs + edge.getPropsWithTs() } def allPropsDeleted(props: Map[LabelMeta, InnerValLikeWithTs]): Boolean = @@ -559,7 +559,7 @@ object S2Edge { // logger.debug(s"requestEdge: ${requestEdge.toStringRaw}") val oldPropsWithTs = if (invertedEdge.isEmpty) Map.empty[LabelMeta, InnerValLikeWithTs] - else propsToState(invertedEdge.get.propsWithTs) + else propsToState(invertedEdge.get.getPropsWithTs()) val funcs = requestEdges.map { edge => if (edge.getOp() == GraphUtil.operations("insert")) { @@ -592,7 +592,7 @@ object S2Edge { for { (requestEdge, func) <- requestWithFuncs } { - val (_newPropsWithTs, _) = func((prevPropsWithTs, propsToState(requestEdge.propsWithTs), requestEdge.ts, requestEdge.innerLabel.schemaVersion)) + val (_newPropsWithTs, _) = func((prevPropsWithTs, propsToState(requestEdge.getPropsWithTs()), requestEdge.ts, requestEdge.innerLabel.schemaVersion)) prevPropsWithTs = _newPropsWithTs // logger.debug(s"${requestEdge.toLogString}\n$oldPropsWithTs\n$prevPropsWithTs\n") } @@ -628,7 +628,7 @@ object S2Edge { val newOp = snapshotEdgeOpt match { case None => requestEdge.getOp() case Some(old) => - val oldMaxTs = old.propsWithTs.asScala.map(_._2.ts).max + val oldMaxTs = 
old.getPropsWithTs().asScala.map(_._2.ts).max if (oldMaxTs > requestEdge.ts) old.getOp() else requestEdge.getOp() } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3514060a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala index 33e7e83..0b40cf5 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala @@ -15,26 +15,25 @@ import scala.concurrent.Await import scala.collection.JavaConverters._ trait S2EdgeLike extends Edge with GraphElement { - this: S2Edge => - - val builder: S2EdgeBuilder = new S2EdgeBuilder(this) - - val innerGraph: S2Graph val srcVertex: S2VertexLike var tgtVertex: S2VertexLike val innerLabel: Label val dir: Int -// var op: Byte = GraphUtil.defaultOpByte -// var version: Long = System.currentTimeMillis() + val builder: S2EdgeBuilder = new S2EdgeBuilder(this) + + var op: Byte + var version: Long + var tsInnerValOpt: Option[InnerValLike] + val propsWithTs: Props = S2Edge.EmptyProps + val parentEdges: Seq[EdgeWithScore] = Nil val originalEdgeOpt: Option[S2EdgeLike] = None val pendingEdgeOpt: Option[S2EdgeLike] = None val statusCode: Byte = 0 val lockTs: Option[Long] = None -// var tsInnerValOpt: Option[InnerValLike] = None lazy val ts = propsWithTs.get(LabelMeta.timestamp.name).innerVal.value match { case b: BigDecimal => b.longValue() @@ -149,6 +148,12 @@ trait S2EdgeLike extends Edge with GraphElement { builder.copyEdge(lockTs = newLockTs) } + def copyTs(newTs: Long): S2EdgeLike = + builder.copyEdge(ts = newTs) + + def updateTgtVertex(id: InnerValLike): S2EdgeLike = + builder.updateTgtVertex(id) + def vertices(direction: Direction): util.Iterator[structure.Vertex] = { val arr = new util.ArrayList[Vertex]() 
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3514060a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala index e5005b7..2411e65 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/index/IndexProvider.scala @@ -180,7 +180,7 @@ class LuceneIndexProvider(config: Config) extends IndexProvider { } private def toDocument(globalIndex: GlobalIndex, edge: S2EdgeLike): Option[Document] = { - val props = edge.propsWithTs.asScala + val props = edge.getPropsWithTs().asScala val exist = props.exists(t => globalIndex.propNamesSet(t._1)) if (!exist) None else { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3514060a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala index d947066..d9e6a7b 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala @@ -66,7 +66,7 @@ trait ExtractValue { @tailrec private def findParent(edge: S2EdgeLike, depth: Int): S2EdgeLike = - if (depth > 0) findParent(edge.parentEdges.head.edge, depth - 1) + if (depth > 0) findParent(edge.getParentEdges().head.edge, depth - 1) else edge private def findParentEdge(edge: S2EdgeLike, key: String): (String, S2EdgeLike) = { @@ -145,7 +145,7 @@ case class InWithoutParent(label: Label, propKey: String, values: Set[String]) e } override def filter(edge: S2EdgeLike): Boolean = { - if (edge.dir == 
GraphUtil.directions("in")) { + if (edge.getDir() == GraphUtil.directions("in")) { val propVal = propToInnerVal(edge, propKey) innerValLikeLsIn.contains(propVal) } else { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3514060a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala index 854fc18..af0d53d 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala @@ -143,7 +143,7 @@ class WriteWriteConflictResolver(graph: S2Graph, commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, lockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate) case Some(snapshotEdge) => - snapshotEdge.pendingEdgeOpt match { + snapshotEdge.getPendingEdgeOpt() match { case None => /* * others finished commit on this SN. but there is no contention. 
@@ -165,7 +165,7 @@ class WriteWriteConflictResolver(graph: S2Graph, commitProcess(statusCode, squashedEdge, fetchedSnapshotEdgeOpt, lockSnapshotEdge, releaseLockSnapshotEdge, edgeMutate) } case Some(pendingEdge) => - val isLockExpired = pendingEdge.lockTs.get + LockExpireDuration < System.currentTimeMillis() + val isLockExpired = pendingEdge.getLockTs().get + LockExpireDuration < System.currentTimeMillis() if (isLockExpired) { /* * if pendingEdge.ts == snapshotEdge.ts => @@ -219,7 +219,7 @@ class WriteWriteConflictResolver(graph: S2Graph, * releaseLock = (edgeMutate.newSnapshotEdge, None) */ val _edges = - if (fetchedSnapshotEdgeOpt.isDefined && fetchedSnapshotEdgeOpt.get.pendingEdgeOpt.isDefined) fetchedSnapshotEdgeOpt.get.pendingEdgeOpt.get +: edges + if (fetchedSnapshotEdgeOpt.isDefined && fetchedSnapshotEdgeOpt.get.getPendingEdgeOpt().isDefined) fetchedSnapshotEdgeOpt.get.pendingEdgeOpt.get +: edges else edges val (squashedEdge, edgeMutate) = S2Edge.buildOperation(fetchedSnapshotEdgeOpt, _edges) val newVersion = fetchedSnapshotEdgeOpt.map(_.getVersion()).getOrElse(squashedEdge.ts) + 2 http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3514060a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala index 79c9dc3..9312181 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala @@ -28,19 +28,18 @@ class MutationHelper(storage: Storage) { edgeWithScore <- stepInnerResult.edgeWithScores } yield { val edge = edgeWithScore.edge - val score = edgeWithScore.score - val edgeSnapshot = edge.builder.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs())) + 
val edgeSnapshot = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs())) val reversedSnapshotEdgeMutations = serDe.snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put)) - val edgeForward = edge.builder.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs())) + val edgeForward = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs())) val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge => serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ io.buildIncrementsAsync(indexEdge, -1L) } /* reverted direction */ - val edgeRevert = edge.builder.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs())) + val edgeRevert = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs())) val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge => serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ io.buildIncrementsAsync(indexEdge, -1L) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3514060a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala index 12edf54..02b2977 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala @@ -52,10 +52,10 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[ snapshotEdge.pendingEdgeOpt match { case None => 
valueBytes() case Some(pendingEdge) => - val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.getOp()) + val opBytes = statusCodeWithOp(pendingEdge.getStatusCode(), pendingEdge.getOp()) val versionBytes = Array.empty[Byte] val propsBytes = S2Edge.serializePropsWithTs(pendingEdge) - val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get) + val lockBytes = Bytes.toBytes(pendingEdge.getLockTs().get) Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes)) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/3514060a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala index 02a72b1..44d4a2a 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala @@ -59,10 +59,10 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[ snapshotEdge.pendingEdgeOpt match { case None => valueBytes() case Some(pendingEdge) => - val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.getOp()) + val opBytes = statusCodeWithOp(pendingEdge.getStatusCode(), pendingEdge.getOp()) val versionBytes = Array.empty[Byte] val propsBytes = S2Edge.serializePropsWithTs(pendingEdge) - val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get) + val lockBytes = Bytes.toBytes(pendingEdge.getLockTs().get) Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes)) }
