add S2EdgePropertyHelper.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/d3a2e75d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/d3a2e75d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/d3a2e75d

Branch: refs/heads/master
Commit: d3a2e75dfd40de0c2dc4878f14b30cfd179344c9
Parents: 2b5df1d
Author: DO YUNG YOON <[email protected]>
Authored: Tue Nov 7 08:31:45 2017 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Tue Nov 7 09:00:19 2017 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/core/PostProcess.scala | 28 +++----
 .../org/apache/s2graph/core/QueryResult.scala | 4 +-
 .../scala/org/apache/s2graph/core/S2Edge.scala | 36 ++-------
 .../org/apache/s2graph/core/S2EdgeBuilder.scala | 11 +--
 .../org/apache/s2graph/core/S2EdgeLike.scala | 81 ++++++++------------
 .../s2graph/core/S2EdgePropertyHelper.scala | 70 +++++++++++++++++
 .../scala/org/apache/s2graph/core/S2Graph.scala | 6 +-
 .../org/apache/s2graph/core/S2VertexLike.scala | 2 +-
 .../apache/s2graph/core/TraversalHelper.scala | 16 ++--
 .../s2graph/core/parsers/WhereParser.scala | 2 +-
 .../s2graph/core/storage/StorageReadable.scala | 2 +-
 .../hbase/AsynchbaseStorageReadable.scala | 2 +-
 .../tall/SnapshotEdgeSerializable.scala | 4 +-
 .../wide/SnapshotEdgeSerializable.scala | 4 +-
 .../rest/play/controllers/EdgeController.scala | 2 +-
 15 files changed, 146 insertions(+), 124 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d3a2e75d/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala
index 2d2e183..5118600 100644
---
a/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/PostProcess.scala @@ -84,17 +84,17 @@ object PostProcess { val score = edgeWithScore.score val label = edgeWithScore.label if (isDegree) { - builder += ("from" -> anyValToJsValue(s2Edge.srcId).get) + builder += ("from" -> anyValToJsValue(s2Edge.srcVertex.innerIdVal).get) builder += ("label" -> anyValToJsValue(label.label).get) - builder += ("direction" -> anyValToJsValue(s2Edge.direction).get) + builder += ("direction" -> anyValToJsValue(s2Edge.getDirection()).get) builder += (LabelMeta.degree.name -> anyValToJsValue(s2Edge.propertyValueInner(LabelMeta.degree).innerVal.value).get) JsObject(builder) } else { if (queryOption.withScore) builder += ("score" -> anyValToJsValue(score).get) if (queryOption.selectColumns.isEmpty) { - builder += ("from" -> anyValToJsValue(s2Edge.srcId).get) - builder += ("to" -> anyValToJsValue(s2Edge.tgtId).get) + builder += ("from" -> anyValToJsValue(s2Edge.srcVertex.innerIdVal).get) + builder += ("to" -> anyValToJsValue(s2Edge.tgtVertex.innerIdVal).get) builder += ("label" -> anyValToJsValue(label.label).get) val innerProps = ArrayBuffer.empty[(String, JsValue)] @@ -107,23 +107,23 @@ object PostProcess { builder += ("props" -> JsObject(innerProps)) - builder += ("direction" -> anyValToJsValue(s2Edge.direction).get) - builder += ("timestamp" -> anyValToJsValue(s2Edge.tsInnerVal).get) - builder += ("_timestamp" -> anyValToJsValue(s2Edge.tsInnerVal).get) // backward compatibility + builder += ("direction" -> anyValToJsValue(s2Edge.getDirection()).get) + builder += ("timestamp" -> anyValToJsValue(s2Edge.getTsInnerValValue()).get) + builder += ("_timestamp" -> anyValToJsValue(s2Edge.getTsInnerValValue()).get) // backward compatibility if (parents != JsNull) builder += ("parents" -> parents) // Json.toJson(builder.result()) JsObject(builder) } else { queryOption.selectColumnsMap.foreach { case (columnName, _) => columnName 
match { - case "from" => builder += ("from" -> anyValToJsValue(s2Edge.srcId).get) - case "_from" => builder += ("_from" -> anyValToJsValue(s2Edge.srcId).get) - case "to" => builder += ("to" -> anyValToJsValue(s2Edge.tgtId).get) - case "_to" => builder += ("_to" -> anyValToJsValue(s2Edge.tgtId).get) + case "from" => builder += ("from" -> anyValToJsValue(s2Edge.srcVertex.innerIdVal).get) + case "_from" => builder += ("_from" -> anyValToJsValue(s2Edge.srcVertex.innerIdVal).get) + case "to" => builder += ("to" -> anyValToJsValue(s2Edge.tgtVertex.innerIdVal).get) + case "_to" => builder += ("_to" -> anyValToJsValue(s2Edge.tgtVertex.innerIdVal).get) case "label" => builder += ("label" -> anyValToJsValue(label.label).get) - case "direction" => builder += ("direction" -> anyValToJsValue(s2Edge.direction).get) - case "timestamp" => builder += ("timestamp" -> anyValToJsValue(s2Edge.tsInnerVal).get) - case "_timestamp" => builder += ("_timestamp" -> anyValToJsValue(s2Edge.tsInnerVal).get) + case "direction" => builder += ("direction" -> anyValToJsValue(s2Edge.getDirection()).get) + case "timestamp" => builder += ("timestamp" -> anyValToJsValue(s2Edge.getTsInnerValValue()).get) + case "_timestamp" => builder += ("_timestamp" -> anyValToJsValue(s2Edge.getTsInnerValValue()).get) case _ => // should not happen } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d3a2e75d/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala index 9bd3cdb..b654e71 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala @@ -237,7 +237,7 @@ object StepResult { // val newOrderByValues = updateScoreOnOrderByValues(globalQueryOption.scoreFieldIdx, t.orderByValues, newScore) val 
newOrderByValues = - if (globalQueryOption.orderByKeys.isEmpty) (newScore, t.edge.tsInnerVal, None, None) + if (globalQueryOption.orderByKeys.isEmpty) (newScore, t.edge.getTsInnerValValue(), None, None) else toTuple4(newT.toValues(globalQueryOption.orderByKeys)) val newGroupByValues = newT.toValues(globalQueryOption.groupBy.keys) @@ -262,7 +262,7 @@ object StepResult { // val newOrderByValues = updateScoreOnOrderByValues(globalQueryOption.scoreFieldIdx, t.orderByValues, newScore) val newOrderByValues = - if (globalQueryOption.orderByKeys.isEmpty) (newScore, t.edge.tsInnerVal, None, None) + if (globalQueryOption.orderByKeys.isEmpty) (newScore, t.edge.getTsInnerValValue(), None, None) else toTuple4(newT.toValues(globalQueryOption.orderByKeys)) val newGroupByValues = newT.toValues(globalQueryOption.groupBy.keys) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d3a2e75d/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala index 03678c8..9f5093c 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala @@ -592,7 +592,7 @@ object S2Edge { for { (requestEdge, func) <- requestWithFuncs } { - val (_newPropsWithTs, _) = func((prevPropsWithTs, propsToState(requestEdge.propsWithTs), requestEdge.ts, requestEdge.schemaVer)) + val (_newPropsWithTs, _) = func((prevPropsWithTs, propsToState(requestEdge.propsWithTs), requestEdge.ts, requestEdge.innerLabel.schemaVersion)) prevPropsWithTs = _newPropsWithTs // logger.debug(s"${requestEdge.toLogString}\n$oldPropsWithTs\n$prevPropsWithTs\n") } @@ -814,8 +814,8 @@ object S2Edge { if (vertex.id.column == ServiceColumn.Default) defaultServiceColumn else vertex.id.column def srcForVertex(e: S2EdgeLike): S2VertexLike = { - val belongLabelIds = 
Seq(e.labelWithDir.labelId) - if (e.labelWithDir.dir == GraphUtil.directions("in")) { + val belongLabelIds = Seq(e.getLabelId()) + if (e.getDir() == GraphUtil.directions("in")) { val tgtColumn = getServiceColumn(e.tgtVertex, e.innerLabel.tgtColumn) e.innerGraph.newVertex(VertexId(tgtColumn, e.tgtVertex.innerId), e.tgtVertex.ts, e.tgtVertex.props, belongLabelIds = belongLabelIds) } else { @@ -825,8 +825,8 @@ object S2Edge { } def tgtForVertex(e: S2EdgeLike): S2VertexLike = { - val belongLabelIds = Seq(e.labelWithDir.labelId) - if (e.labelWithDir.dir == GraphUtil.directions("in")) { + val belongLabelIds = Seq(e.getLabelId()) + if (e.getDir() == GraphUtil.directions("in")) { val srcColumn = getServiceColumn(e.srcVertex, e.innerLabel.srcColumn) e.innerGraph.newVertex(VertexId(srcColumn, e.srcVertex.innerId), e.srcVertex.ts, e.srcVertex.props, belongLabelIds = belongLabelIds) } else { @@ -835,28 +835,6 @@ object S2Edge { } } - def updatePropsWithTs(e: S2EdgeLike, others: Props = S2Edge.EmptyProps): Props = { - val emptyProp = S2Edge.EmptyProps - - e.getPropsWithTs().forEach(new BiConsumer[String, S2Property[_]] { - override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value) - }) - - others.forEach(new BiConsumer[String, S2Property[_]] { - override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value) - }) - - emptyProp - } - - def propertyValue(e: S2EdgeLike, key: String): Option[InnerValLikeWithTs] = { - key match { - case "from" | "_from" => Option(InnerValLikeWithTs(e.srcVertex.innerId, e.ts)) - case "to" | "_to" => Option(InnerValLikeWithTs(e.tgtVertex.innerId, e.ts)) - case "label" => Option(InnerValLikeWithTs(InnerVal.withStr(e.innerLabel.label, e.innerLabel.schemaVersion), e.ts)) - case "direction" => Option(InnerValLikeWithTs(InnerVal.withStr(e.direction, e.innerLabel.schemaVersion), e.ts)) - case _ => - e.innerLabel.metaPropsInvMap.get(key).map(labelMeta => e.propertyValueInner(labelMeta)) - } - } + def 
serializePropsWithTs(edge: S2EdgeLike): Array[Byte] = + HBaseSerializable.propsToKeyValuesWithTs(edge.getPropsWithTs().asScala.map(kv => kv._2.labelMeta.seq -> kv._2.innerValWithTs).toSeq) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d3a2e75d/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala index ea9598e..2ea1504 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeBuilder.scala @@ -22,7 +22,7 @@ class S2EdgeBuilder(edge: S2EdgeLike) { def propsPlusTsValid = edge.getPropsWithTs().asScala.filter(kv => LabelMeta.isValidSeq(kv._2.labelMeta.seq)).asJava - def labelOrders = LabelIndex.findByLabelIdAll(edge.labelWithDir.labelId) + def labelOrders = LabelIndex.findByLabelIdAll(edge.getLabelId()) def edgesWithIndex = for (labelOrder <- labelOrders) yield { IndexEdge(edge.innerGraph, edge.srcVertex, edge.tgtVertex, edge.innerLabel, edge.getDir(), edge.getOp(), @@ -35,7 +35,7 @@ class S2EdgeBuilder(edge: S2EdgeLike) { } def relatedEdges: Seq[S2EdgeLike] = { - if (edge.labelWithDir.isDirected) { + if (edge.isDirected()) { val skipReverse = edge.innerLabel.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false) if (skipReverse) Seq(edge) else Seq(edge, duplicateEdge) } else { @@ -93,11 +93,4 @@ class S2EdgeBuilder(edge: S2EdgeLike) { else EdgeId(VertexId(tgtColumn, edge.tgtVertex.id.innerId), VertexId(srcColumn, edge.srcVertex.id.innerId), edge.label(), "out", timestamp) } - - def propertyInner[V](key: String, value: V, ts: Long): Property[V] = { - val labelMeta = edge.innerLabel.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on Edge.")) - val newProp = new S2Property[V](edge, labelMeta, key, 
value, ts) - edge.getPropsWithTs().put(key, newProp) - newProp - } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d3a2e75d/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala index 9963be7..33e7e83 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgeLike.scala @@ -19,6 +19,7 @@ trait S2EdgeLike extends Edge with GraphElement { val builder: S2EdgeBuilder = new S2EdgeBuilder(this) + val innerGraph: S2Graph val srcVertex: S2VertexLike var tgtVertex: S2VertexLike @@ -35,20 +36,35 @@ trait S2EdgeLike extends Edge with GraphElement { val lockTs: Option[Long] = None // var tsInnerValOpt: Option[InnerValLike] = None - lazy val labelWithDir = LabelWithDirection(innerLabel.id.get, dir) - lazy val schemaVer = innerLabel.schemaVersion lazy val ts = propsWithTs.get(LabelMeta.timestamp.name).innerVal.value match { case b: BigDecimal => b.longValue() case l: Long => l case i: Int => i.toLong case _ => throw new RuntimeException("ts should be in [BigDecimal/Long/Int].") } - lazy val operation = GraphUtil.fromOp(op) - lazy val tsInnerVal = tsInnerValOpt.get.value - lazy val srcId = srcVertex.innerIdVal - lazy val tgtId = tgtVertex.innerIdVal - lazy val labelName = innerLabel.label - lazy val direction = GraphUtil.fromDirection(dir) + + private lazy val operation = GraphUtil.fromOp(op) + private lazy val direction = GraphUtil.fromDirection(dir) + private lazy val tsInnerVal = tsInnerValOpt.get.value + + def graph(): Graph = innerGraph + + lazy val edgeId: EdgeId = builder.edgeId + + def id(): AnyRef = edgeId + + def label(): String = innerLabel.label + + def getLabelId(): Int = innerLabel.id.get + + def getDirection(): String = direction + + def getOperation(): 
String = operation + + def getTsInnerValValue(): Any = tsInnerVal + + def isDirected(): Boolean = + getDir() == 0 || getDir() == 1 def getTs(): Long = ts def getOriginalEdgeOpt(): Option[S2EdgeLike] = originalEdgeOpt @@ -68,43 +84,20 @@ trait S2EdgeLike extends Edge with GraphElement { def toIndexEdge(labelIndexSeq: Byte): IndexEdge = IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelIndexSeq, propsWithTs) - def serializePropsWithTs(): Array[Byte] = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.asScala.map(kv => kv._2.labelMeta.seq -> kv._2.innerValWithTs).toSeq) - def updatePropsWithTs(others: Props = S2Edge.EmptyProps): Props = - S2Edge.updatePropsWithTs(this, others) + S2EdgePropertyHelper.updatePropsWithTs(this, others) - def propertyValue(key: String): Option[InnerValLikeWithTs] = S2Edge.propertyValue(this, key) + def propertyValue(key: String): Option[InnerValLikeWithTs] = S2EdgePropertyHelper.propertyValue(this, key) - def propertyValueInner(labelMeta: LabelMeta): InnerValLikeWithTs = { - // propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse() - if (propsWithTs.containsKey(labelMeta.name)) { - propsWithTs.get(labelMeta.name).innerValWithTs - } else { - innerLabel.metaPropsDefaultMapInner(labelMeta) - } - } + def propertyValueInner(labelMeta: LabelMeta): InnerValLikeWithTs = + S2EdgePropertyHelper.propertyValueInner(this, labelMeta) def propertyValues(keys: Seq[String] = Nil): Map[LabelMeta, InnerValLikeWithTs] = { - val labelMetas = for { - key <- keys - labelMeta <- innerLabel.metaPropsInvMap.get(key) - } yield labelMeta - - propertyValuesInner(labelMetas) + S2EdgePropertyHelper.propertyValuesInner(this, S2EdgePropertyHelper.toLabelMetas(this, keys)) } - def propertyValuesInner(labelMetas: Seq[LabelMeta] = Nil): Map[LabelMeta, InnerValLikeWithTs] = { - if (labelMetas.isEmpty) { - innerLabel.metaPropsDefaultMapInner.map { case (labelMeta, defaultVal) => - labelMeta -> propertyValueInner(labelMeta) - } - } else 
{ - // This is important since timestamp is required for all edges. - (LabelMeta.timestamp +: labelMetas).map { labelMeta => - labelMeta -> propertyValueInner(labelMeta) - }.toMap - } - } + def propertyValuesInner(labelMetas: Seq[LabelMeta] = Nil): Map[LabelMeta, InnerValLikeWithTs] = + S2EdgePropertyHelper.propertyValuesInner(this, labelMetas) def relatedEdges = builder.relatedEdges @@ -114,7 +107,6 @@ trait S2EdgeLike extends Edge with GraphElement { def duplicateEdge = builder.duplicateEdge - // def reverseDirEdge = copy(labelWithDir = labelWithDir.dirToggled) def reverseDirEdge = builder.reverseDirEdge def reverseSrcTgtEdge = builder.reverseSrcTgtEdge @@ -215,7 +207,8 @@ trait S2EdgeLike extends Edge with GraphElement { v } - def propertyInner[V](key: String, value: V, ts: Long): Property[V] = builder.propertyInner(key, value, ts) + def propertyInner[V](key: String, value: V, ts: Long): Property[V] = + S2EdgePropertyHelper.propertyInner(this, key, value, ts) def remove(): Unit = { if (graph.features().edge().supportsRemoveEdges()) { @@ -231,14 +224,6 @@ trait S2EdgeLike extends Edge with GraphElement { } } - def graph(): Graph = innerGraph - - lazy val edgeId: EdgeId = builder.edgeId - - def id(): AnyRef = edgeId - - def label(): String = innerLabel.label - def toLogString: String = { // val allPropsWithName = defaultPropsWithName ++ Json.toJson(propsWithName).asOpt[JsObject].getOrElse(Json.obj()) List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, innerLabel.label, propsWithTs).mkString("\t") http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d3a2e75d/s2core/src/main/scala/org/apache/s2graph/core/S2EdgePropertyHelper.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2EdgePropertyHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgePropertyHelper.scala new file mode 100644 index 0000000..2d24a2e --- /dev/null +++ 
b/s2core/src/main/scala/org/apache/s2graph/core/S2EdgePropertyHelper.scala @@ -0,0 +1,70 @@ +package org.apache.s2graph.core + +import java.util.function.BiConsumer + +import org.apache.s2graph.core.S2Edge.Props +import org.apache.s2graph.core.mysqls.{Label, LabelMeta} +import org.apache.s2graph.core.types.{InnerVal, InnerValLikeWithTs} +import org.apache.tinkerpop.gremlin.structure.Property + +object S2EdgePropertyHelper { + def propertyInner[V](edge: S2EdgeLike, key: String, value: V, ts: Long): Property[V] = { + val labelMeta = edge.innerLabel.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on Edge.")) + val newProp = new S2Property[V](edge, labelMeta, key, value, ts) + edge.getPropsWithTs().put(key, newProp) + newProp + } + def updatePropsWithTs(edge: S2EdgeLike, others: Props = S2Edge.EmptyProps): Props = { + val emptyProp = S2Edge.EmptyProps + + edge.getPropsWithTs().forEach(new BiConsumer[String, S2Property[_]] { + override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value) + }) + + others.forEach(new BiConsumer[String, S2Property[_]] { + override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value) + }) + + emptyProp + } + + def propertyValue(e: S2EdgeLike, key: String): Option[InnerValLikeWithTs] = { + key match { + case "from" | "_from" => Option(InnerValLikeWithTs(e.srcVertex.innerId, e.ts)) + case "to" | "_to" => Option(InnerValLikeWithTs(e.tgtVertex.innerId, e.ts)) + case "label" => Option(InnerValLikeWithTs(InnerVal.withStr(e.innerLabel.label, e.innerLabel.schemaVersion), e.ts)) + case "direction" => Option(InnerValLikeWithTs(InnerVal.withStr(e.getDirection(), e.innerLabel.schemaVersion), e.ts)) + case _ => + e.innerLabel.metaPropsInvMap.get(key).map(labelMeta => e.propertyValueInner(labelMeta)) + } + } + + def propertyValuesInner(edge: S2EdgeLike, labelMetas: Seq[LabelMeta] = Nil): Map[LabelMeta, InnerValLikeWithTs] = { + if (labelMetas.isEmpty) { + 
edge.innerLabel.metaPropsDefaultMapInner.map { case (labelMeta, defaultVal) => + labelMeta -> edge.propertyValueInner(labelMeta) + } + } else { + // This is important since timestamp is required for all edges. + (LabelMeta.timestamp +: labelMetas).map { labelMeta => + labelMeta -> propertyValueInner(edge, labelMeta) + }.toMap + } + } + + def propertyValueInner(edge: S2EdgeLike, labelMeta: LabelMeta): InnerValLikeWithTs = { + // propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse() + if (edge.getPropsWithTs().containsKey(labelMeta.name)) { + edge.getPropsWithTs().get(labelMeta.name).innerValWithTs + } else { + edge.innerLabel.metaPropsDefaultMapInner(labelMeta) + } + } + + def toLabelMetas(edge: S2EdgeLike, keys: Seq[String]): Seq[LabelMeta] = { + for { + key <- keys + labelMeta <- edge.innerLabel.metaPropsInvMap.get(key) + } yield labelMeta + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d3a2e75d/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala index f061160..3270e84 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala @@ -663,13 +663,13 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap buildLastStepInnerResult: Boolean = false): Future[StepResult] = { if (stepInnerResult.isEmpty) Future.successful(StepResult.Empty) else { - val (alreadyVisited: Map[(LabelWithDirection, S2VertexLike), Boolean], prevStepTgtVertexIdEdges: Map[VertexId, ArrayBuffer[EdgeWithScore]], queryRequests: Seq[QueryRequest]) = + val (_, prevStepTgtVertexIdEdges: Map[VertexId, ArrayBuffer[EdgeWithScore]], queryRequests: Seq[QueryRequest]) = traversalHelper.buildNextStepQueryRequests(orgQuery, stepIdx, stepInnerResult) val fetchedLs 
= fetches(queryRequests, prevStepTgtVertexIdEdges) traversalHelper.filterEdges(orgQuery, stepIdx, queryRequests, - fetchedLs, orgQuery.steps(stepIdx).queryParams, alreadyVisited, buildLastStepInnerResult, prevStepTgtVertexIdEdges)(ec) + fetchedLs, orgQuery.steps(stepIdx).queryParams, buildLastStepInnerResult, prevStepTgtVertexIdEdges)(ec) } } @@ -883,7 +883,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap val (strongDeleteAll, strongEdgesAll) = strongEdges.partition { case (edge, idx) => edge.getOp() == GraphUtil.operations("deleteAll") } val deleteAllFutures = strongDeleteAll.map { case (edge, idx) => - deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.innerLabel), edge.labelWithDir.dir, edge.ts).map(idx -> _) + deleteAllAdjacentEdges(Seq(edge.srcVertex), Seq(edge.innerLabel), edge.getDir(), edge.ts).map(idx -> _) } val strongEdgesFutures = strongEdgesAll.groupBy { case (edge, idx) => edge.innerLabel }.map { case (label, edgeGroup) => http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d3a2e75d/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala index b88c18d..9ec2ab0 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2VertexLike.scala @@ -73,7 +73,7 @@ trait S2VertexLike extends Vertex with GraphElement { override def accept(edge: Edge): Unit = { val s2Edge = edge.asInstanceOf[S2EdgeLike] - s2Edge.direction match { + s2Edge.getDirection() match { case "out" => arr.add(edge.inVertex()) case "in" => arr.add(edge.outVertex()) case _ => throw new IllegalStateException("only out/in direction can be found in S2Edge") 
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d3a2e75d/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala index 58da145..25b909e 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala @@ -43,12 +43,12 @@ object TraversalHelper { } } - def alreadyVisitedVertices(edgeWithScoreLs: Seq[EdgeWithScore]): Map[(LabelWithDirection, S2VertexLike), Boolean] = { + def alreadyVisitedVertices(edgeWithScoreLs: Seq[EdgeWithScore]): Map[(Int, Int, S2VertexLike), Boolean] = { val vertices = for { edgeWithScore <- edgeWithScoreLs edge = edgeWithScore.edge - vertex = if (edge.labelWithDir.dir == GraphUtil.directions("out")) edge.tgtVertex else edge.srcVertex - } yield (edge.labelWithDir, vertex) -> true + vertex = if (edge.getDir() == GraphUtil.directions("out")) edge.tgtVertex else edge.srcVertex + } yield (edge.getLabelId(), edge.getDir(), vertex) -> true vertices.toMap } @@ -114,7 +114,7 @@ object TraversalHelper { def toHashKey(queryParam: QueryParam, edge: S2EdgeLike, isDegree: Boolean): (HashKey, FilterHashKey) = { val src = edge.srcVertex.innerId.hashCode() val tgt = edge.tgtVertex.innerId.hashCode() - val hashKey = (src, edge.labelWithDir.labelId, edge.labelWithDir.dir, tgt, isDegree) + val hashKey = (src, edge.getLabelId(), edge.getDir(), tgt, isDegree) val filterHashKey = (src, tgt) (hashKey, filterHashKey) @@ -136,7 +136,7 @@ class TraversalHelper(graph: S2GraphLike) { val step = q.steps(stepIdx) val alreadyVisited = - if (stepIdx == 0) Map.empty[(LabelWithDirection, S2VertexLike), Boolean] + if (stepIdx == 0) Map.empty[(Int, Int, S2VertexLike), Boolean] else alreadyVisitedVertices(stepInnerResult.edgeWithScores) val initial 
= (Map.empty[S2VertexLike, Double], Map.empty[S2VertexLike, ArrayBuffer[EdgeWithScore]]) @@ -172,7 +172,6 @@ class TraversalHelper(graph: S2GraphLike) { queryRequests: Seq[QueryRequest], queryResultLsFuture: Future[Seq[StepResult]], queryParams: Seq[QueryParam], - alreadyVisited: Map[(LabelWithDirection, S2VertexLike), Boolean] = Map.empty, buildLastStepInnerResult: Boolean = true, parentEdges: Map[VertexId, Seq[EdgeWithScore]]) (implicit ec: scala.concurrent.ExecutionContext): Future[StepResult] = { @@ -214,7 +213,6 @@ class TraversalHelper(graph: S2GraphLike) { val _results = buildResult(q, stepIdx, currentStepResults, parentEdges) { (edgeWithScore, propsSelectColumns) => val edge = edgeWithScore.edge val score = edgeWithScore.score - val label = edgeWithScore.label /* Select */ val mergedPropsWithTs = edge.propertyValuesInner(propsSelectColumns) @@ -225,7 +223,7 @@ class TraversalHelper(graph: S2GraphLike) { val newEdgeWithScore = edgeWithScore.copy(edge = newEdge) /* OrderBy */ val orderByValues = - if (queryOption.orderByKeys.isEmpty) (score, edge.tsInnerVal, None, None) + if (queryOption.orderByKeys.isEmpty) (score, edge.getTsInnerValValue(), None, None) else StepResult.toTuple4(newEdgeWithScore.toValues(queryOption.orderByKeys)) /* StepGroupBy */ @@ -296,8 +294,6 @@ class TraversalHelper(graph: S2GraphLike) { val parents = if (shouldBuildParents) { parentEdges.getOrElse(queryRequest.vertex.id, Nil).map { edgeWithScore => val edge = edgeWithScore.edge - val score = edgeWithScore.score - val label = edgeWithScore.label /* Select */ val mergedPropsWithTs = http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d3a2e75d/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala index a0d56b2..d947066 100644 --- 
a/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/parsers/WhereParser.scala @@ -54,7 +54,7 @@ trait ExtractValue { val (propKey, _) = findParentEdge(edge, key) val labelMeta = label.metaPropsInvMap.getOrElse(propKey, throw WhereParserException(s"Where clause contains not existing property name: $propKey")) - val (srcColumn, tgtColumn) = label.srcTgtColumn(edge.labelWithDir.dir) + val (srcColumn, tgtColumn) = label.srcTgtColumn(edge.getDir()) val dataType = propKey match { case "_to" | "to" => tgtColumn.columnType case "_from" | "from" => srcColumn.columnType http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d3a2e75d/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala index 44bd4dc..79ca8aa 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala @@ -50,7 +50,7 @@ trait StorageReadable { def fetchSnapshotEdgeInner(edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[(Option[S2EdgeLike], Option[SKeyValue])] = { val queryParam = QueryParam(labelName = edge.innerLabel.label, - direction = GraphUtil.fromDirection(edge.labelWithDir.dir), + direction = GraphUtil.fromDirection(edge.getDir()), tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal), cacheTTLInMillis = -1) val q = Query.toQuery(Seq(edge.srcVertex), Seq(queryParam)) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d3a2e75d/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala ---------------------------------------------------------------------- diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala index bdb6e99..5f54e47 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala @@ -237,7 +237,7 @@ class AsynchbaseStorageReadable(val graph: S2Graph, serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION) .fromKeyValues(Seq(kv), None) - .filter(e => distinctLabels(e.innerLabel) && e.direction == "out" && !e.isDegree) + .filter(e => distinctLabels(e.innerLabel) && e.getDirection() == "out" && !e.isDegree) } } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d3a2e75d/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala index 24775e7..12edf54 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeSerializable.scala @@ -20,7 +20,7 @@ package org.apache.s2graph.core.storage.serde.snapshotedge.tall import org.apache.hadoop.hbase.util.Bytes -import org.apache.s2graph.core.SnapshotEdge +import org.apache.s2graph.core.{S2Edge, SnapshotEdge} import org.apache.s2graph.core.mysqls.LabelIndex import org.apache.s2graph.core.storage.serde._ import org.apache.s2graph.core.storage.serde.StorageSerializable._ @@ -54,7 +54,7 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[ case 
Some(pendingEdge) => val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.getOp()) val versionBytes = Array.empty[Byte] - val propsBytes = pendingEdge.serializePropsWithTs() + val propsBytes = S2Edge.serializePropsWithTs(pendingEdge) val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get) Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), Bytes.add(propsBytes, lockBytes)) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d3a2e75d/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala index 44f2596..02a72b1 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeSerializable.scala @@ -20,7 +20,7 @@ package org.apache.s2graph.core.storage.serde.snapshotedge.wide import org.apache.hadoop.hbase.util.Bytes -import org.apache.s2graph.core.SnapshotEdge +import org.apache.s2graph.core.{S2Edge, SnapshotEdge} import org.apache.s2graph.core.mysqls.LabelIndex import org.apache.s2graph.core.storage.serde.Serializable import org.apache.s2graph.core.storage.serde.StorageSerializable._ @@ -61,7 +61,7 @@ class SnapshotEdgeSerializable(snapshotEdge: SnapshotEdge) extends Serializable[ case Some(pendingEdge) => val opBytes = statusCodeWithOp(pendingEdge.statusCode, pendingEdge.getOp()) val versionBytes = Array.empty[Byte] - val propsBytes = pendingEdge.serializePropsWithTs() + val propsBytes = S2Edge.serializePropsWithTs(pendingEdge) val lockBytes = Bytes.toBytes(pendingEdge.lockTs.get) Bytes.add(Bytes.add(valueBytes(), opBytes, versionBytes), 
Bytes.add(propsBytes, lockBytes)) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/d3a2e75d/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala ---------------------------------------------------------------------- diff --git a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala index 2b31fcd..69878f8 100644 --- a/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala +++ b/s2rest_play/app/org/apache/s2graph/rest/play/controllers/EdgeController.scala @@ -95,7 +95,7 @@ object EdgeController extends Controller { results.get.zip(elements).map { case (r: MutateResponse, (e: S2EdgeLike, tsv: String)) if !r.isSuccess => val kafkaMessages = if(e.getOp() == GraphUtil.operations("deleteAll")){ - toDeleteAllFailMessages(Seq(e.srcVertex), Seq(e.innerLabel), e.labelWithDir.dir, e.ts) + toDeleteAllFailMessages(Seq(e.srcVertex), Seq(e.innerLabel), e.getDir(), e.ts) } else{ Seq(ExceptionHandler.toKafkaMessage(Config.KAFKA_MUTATE_FAIL_TOPIC, e, Some(tsv))) }
