add GraphElementBuilder.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/bc26642b Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/bc26642b Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/bc26642b Branch: refs/heads/master Commit: bc26642bc8c3c738205c8bc6c8239f7c0c78254b Parents: 42b7702 Author: DO YUNG YOON <[email protected]> Authored: Fri Nov 3 21:11:13 2017 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Fri Nov 3 21:11:13 2017 +0900 ---------------------------------------------------------------------- .../s2graph/core/GraphElementBuilder.scala | 294 +++++++++++++++++++ 1 file changed, 294 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/bc26642b/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala b/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala new file mode 100644 index 0000000..21179aa --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/GraphElementBuilder.scala @@ -0,0 +1,294 @@ +package org.apache.s2graph.core + +import org.apache.s2graph.core.GraphExceptions.LabelNotExistException +import org.apache.s2graph.core.JSONParser.{fromJsonToProperties, toInnerVal} +import org.apache.s2graph.core.mysqls._ +import org.apache.s2graph.core.types._ +import org.apache.s2graph.core.utils.logger +import play.api.libs.json.{JsObject, Json} + +import scala.util.Try + +class GraphElementBuilder(graph: S2Graph) { + + def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] = Try { + val parts = GraphUtil.split(s) + val logType = parts(2) + val element = if (logType == "edge" | logType == "e") { + /* current only edge is considered to be bulk loaded */ + labelMapping.get(parts(5)) match { + case None => + case Some(toReplace) => + parts(5) = toReplace + } + toEdge(parts) + } else if (logType == "vertex" | logType == "v") { + toVertex(parts) + } else { + throw new GraphExceptions.JsonParseException("log type is not exist in log.") + } + + element + } recover { + case e: Exception => + logger.error(s"[toElement]: $e", e) + None + } get + + + def toVertex(s: String): Option[S2VertexLike] = { + toVertex(GraphUtil.split(s)) + } + + def toEdge(s: String): Option[S2EdgeLike] = { + toEdge(GraphUtil.split(s)) + } + + def toEdge(parts: Array[String]): Option[S2EdgeLike] = Try { + val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5)) + val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any] + val tempDirection = if (parts.length >= 8) parts(7) else "out" + val direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection + val edge = toEdge(srcId, tgtId, label, direction, props, ts.toLong, operation) + Option(edge) + } recover { + case e: Exception => + logger.error(s"[toEdge]: $e", e) + throw e + } get + + def toVertex(parts: Array[String]): Option[S2VertexLike] = Try { + val (ts, operation, logType, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5)) + val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any] + val vertex = toVertex(serviceName, colName, srcId, props, ts.toLong, operation) + Option(vertex) + } recover { + case e: Throwable => + logger.error(s"[toVertex]: $e", e) + throw e + } get + + def toEdge(srcId: Any, + tgtId: Any, + labelName: String, + direction: String, + props: Map[String, Any] = Map.empty, + ts: Long = System.currentTimeMillis(), + operation: String = "insert"): S2EdgeLike = { + val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName)) + + val srcColumn = if (direction == "out") label.srcColumn else label.tgtColumn + val tgtColumn = if (direction == "out") label.tgtColumn else label.srcColumn + + val srcVertexIdInnerVal = toInnerVal(srcId, srcColumn.columnType, label.schemaVersion) + val tgtVertexIdInnerVal = toInnerVal(tgtId, tgtColumn.columnType, label.schemaVersion) + + val srcVertex = newVertex(new SourceVertexId(srcColumn, srcVertexIdInnerVal), System.currentTimeMillis()) + val tgtVertex = newVertex(new TargetVertexId(tgtColumn, tgtVertexIdInnerVal), System.currentTimeMillis()) + val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported.")) + + val propsPlusTs = props ++ Map(LabelMeta.timestamp.name -> ts) + val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts) + val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported.")) + + new S2Edge(graph, srcVertex, tgtVertex, label, dir, op = op, version = ts).copyEdgeWithState(propsWithTs) + } + + def toVertex(serviceName: String, + columnName: String, + id: Any, + props: Map[String, Any] = Map.empty, + ts: Long = System.currentTimeMillis(), + operation: String = "insert"): S2VertexLike = { + + val service = Service.findByName(serviceName).getOrElse(throw new java.lang.IllegalArgumentException(s"$serviceName is not found.")) + val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new java.lang.IllegalArgumentException(s"$columnName is not found.")) + val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported.")) + + val srcVertexId = id match { + case vid: VertexId => id.asInstanceOf[VertexId] + case _ => new VertexId(column, toInnerVal(id, column.columnType, column.schemaVersion)) + } + + val propsInner = column.propsToInnerVals(props) ++ + Map(ColumnMeta.timestamp -> InnerVal.withLong(ts, column.schemaVersion)) + + val vertex = new S2Vertex(graph, srcVertexId, ts, S2Vertex.EmptyProps, op) + S2Vertex.fillPropsWithTs(vertex, propsInner) + vertex + } + + + /** + * helper to create new Edge instance from given parameters on memory(not actually stored in storage). + * + * Since we are using mutable map for property value(propsWithTs), + * we should make sure that reference for mutable map never be shared between multiple Edge instances. + * To guarantee this, we never create Edge directly, but rather use this helper which is available on S2Graph. + * + * Note that we are using following convention + * 1. `add*` for method that actually store instance into storage, + * 2. `new*` for method that only create instance on memory, but not store it into storage. + * + * @param srcVertex + * @param tgtVertex + * @param innerLabel + * @param dir + * @param op + * @param version + * @param propsWithTs + * @param parentEdges + * @param originalEdgeOpt + * @param pendingEdgeOpt + * @param statusCode + * @param lockTs + * @param tsInnerValOpt + * @return + */ + def newEdge(srcVertex: S2VertexLike, + tgtVertex: S2VertexLike, + innerLabel: Label, + dir: Int, + op: Byte = GraphUtil.defaultOpByte, + version: Long = System.currentTimeMillis(), + propsWithTs: S2Edge.State, + parentEdges: Seq[EdgeWithScore] = Nil, + originalEdgeOpt: Option[S2EdgeLike] = None, + pendingEdgeOpt: Option[S2EdgeLike] = None, + statusCode: Byte = 0, + lockTs: Option[Long] = None, + tsInnerValOpt: Option[InnerValLike] = None): S2EdgeLike = { + val edge = S2Edge( + graph, + srcVertex, + tgtVertex, + innerLabel, + dir, + op, + version, + S2Edge.EmptyProps, + parentEdges, + originalEdgeOpt, + pendingEdgeOpt, + statusCode, + lockTs, + tsInnerValOpt) + S2Edge.fillPropsWithTs(edge, propsWithTs) + edge + } + + /** + * helper to create new SnapshotEdge instance from given parameters on memory(not actually stored in storage). + * + * Note that this is only available to S2Graph, not structure.Graph so only internal code should use this method. + * @param srcVertex + * @param tgtVertex + * @param label + * @param dir + * @param op + * @param version + * @param propsWithTs + * @param pendingEdgeOpt + * @param statusCode + * @param lockTs + * @param tsInnerValOpt + * @return + */ + private[core] def newSnapshotEdge(srcVertex: S2VertexLike, + tgtVertex: S2VertexLike, + label: Label, + dir: Int, + op: Byte, + version: Long, + propsWithTs: S2Edge.State, + pendingEdgeOpt: Option[S2EdgeLike], + statusCode: Byte = 0, + lockTs: Option[Long], + tsInnerValOpt: Option[InnerValLike] = None): SnapshotEdge = { + val snapshotEdge = new SnapshotEdge(graph, srcVertex, tgtVertex, label, dir, op, version, S2Edge.EmptyProps, + pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt) + S2Edge.fillPropsWithTs(snapshotEdge, propsWithTs) + snapshotEdge + } + + def newVertexId(serviceName: String)(columnName: String)(id: Any): VertexId = { + val service = Service.findByName(serviceName).getOrElse(throw new RuntimeException(s"$serviceName is not found.")) + val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new RuntimeException(s"$columnName is not found.")) + newVertexId(service, column, id) + } + + /** + * helper to create S2Graph's internal S2VertexId instance with given parameters. + * @param service + * @param column + * @param id + * @return + */ + def newVertexId(service: Service, + column: ServiceColumn, + id: Any): VertexId = { + val innerVal = CanInnerValLike.anyToInnerValLike.toInnerVal(id)(column.schemaVersion) + new VertexId(column, innerVal) + } + + def newVertex(id: VertexId, + ts: Long = System.currentTimeMillis(), + props: S2Vertex.Props = S2Vertex.EmptyProps, + op: Byte = 0, + belongLabelIds: Seq[Int] = Seq.empty): S2VertexLike = { + val vertex = new S2Vertex(graph, id, ts, S2Vertex.EmptyProps, op, belongLabelIds) + S2Vertex.fillPropsWithTs(vertex, props) + vertex + } + + def toRequestEdge(queryRequest: QueryRequest, parentEdges: Seq[EdgeWithScore]): S2EdgeLike = { + val srcVertex = queryRequest.vertex + val queryParam = queryRequest.queryParam + val tgtVertexIdOpt = queryParam.tgtVertexInnerIdOpt + val label = queryParam.label + val labelWithDir = queryParam.labelWithDir + val (srcColumn, tgtColumn) = label.srcTgtColumn(labelWithDir.dir) + val propsWithTs = label.EmptyPropsWithTs + + tgtVertexIdOpt match { + case Some(tgtVertexId) => // _to is given. + /* we use toSnapshotEdge so dont need to swap src, tgt */ + val src = srcVertex.innerId + val tgt = tgtVertexId + val (srcVId, tgtVId) = (new SourceVertexId(srcColumn, src), new TargetVertexId(tgtColumn, tgt)) + val (srcV, tgtV) = (newVertex(srcVId), newVertex(tgtVId)) + + newEdge(srcV, tgtV, label, labelWithDir.dir, propsWithTs = propsWithTs) + case None => + val src = srcVertex.innerId + val srcVId = new SourceVertexId(srcColumn, src) + val srcV = newVertex(srcVId) + + newEdge(srcV, srcV, label, labelWithDir.dir, propsWithTs = propsWithTs, parentEdges = parentEdges) + } + } + + def buildEdgesToDelete(edgeWithScoreLs: Seq[EdgeWithScore], requestTs: Long): Seq[EdgeWithScore] = { + if (edgeWithScoreLs.isEmpty) Nil + else { + val head = edgeWithScoreLs.head + val label = head.edge.innerLabel + + //Degree edge? + edgeWithScoreLs.map { case edgeWithScore => + val edge = edgeWithScore.edge + val copiedEdge = label.consistencyLevel match { + case "strong" => + edge.copyEdge(op = GraphUtil.operations("delete"), + version = requestTs, propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs) + case _ => + edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs) + } + + val edgeToDelete = edgeWithScore.copy(edge = copiedEdge) + // logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}") + edgeToDelete + } + } + } +}
