apply MutationHelper and GraphElementBuilder.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/aa66822b Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/aa66822b Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/aa66822b Branch: refs/heads/master Commit: aa66822b08d0045e3870af2a9b82523947f553ce Parents: 937b55a Author: DO YUNG YOON <[email protected]> Authored: Fri Nov 3 21:41:57 2017 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Sat Nov 4 07:01:07 2017 +0900 ---------------------------------------------------------------------- .../loader/subscriber/GraphSubscriber.scala | 2 +- .../loader/subscriber/TransferToHFile.scala | 10 +- .../s2graph/loader/subscriber/WalLogStat.scala | 3 +- .../loader/subscriber/WalLogToHDFS.scala | 3 +- .../scala/org/apache/s2graph/core/S2Graph.scala | 718 ++----------------- .../org/apache/s2graph/core/S2GraphLike.scala | 238 ++++++ .../s2graph/core/features/S2Features.scala | 19 + .../apache/s2graph/core/storage/Storage.scala | 112 +-- .../hbase/AsynchbaseStorageReadable.scala | 2 +- .../tall/SnapshotEdgeDeserializable.scala | 2 +- .../wide/SnapshotEdgeDeserializable.scala | 2 +- .../core/storage/hbase/IndexEdgeTest.scala | 4 +- 12 files changed, 386 insertions(+), 729 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/aa66822b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala index 2352cdf..a371b6b 100644 --- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala +++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/GraphSubscriber.scala @@ -106,7 +106,7 @@ object GraphSubscriberHelper extends WithKafka { (statFunc: (String, Int) => Unit): Iterable[GraphElement] = { (for (msg <- msgs) yield { statFunc("total", 1) - g.toGraphElement(msg, labelMapping) match { + g.elementBuilder.toGraphElement(msg, labelMapping) match { case Some(e) if e.isInstanceOf[S2Edge] => statFunc("EdgeParseOk", 1) e.asInstanceOf[S2Edge] http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/aa66822b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala index 618c1bd..a7b4e00 100644 --- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala +++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala @@ -84,11 +84,11 @@ object TransferToHFile extends SparkApp { } yield output } def buildPutRequests(snapshotEdge: SnapshotEdge): List[PutRequest] = { - val kvs = GraphSubscriberHelper.g.getStorage(snapshotEdge.label).snapshotEdgeSerializer(snapshotEdge).toKeyValues.toList + val kvs = GraphSubscriberHelper.g.getStorage(snapshotEdge.label).serDe.snapshotEdgeSerializer(snapshotEdge).toKeyValues.toList kvs.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) } } def buildPutRequests(indexEdge: IndexEdge): List[PutRequest] = { - val kvs = GraphSubscriberHelper.g.getStorage(indexEdge.label).indexEdgeSerializer(indexEdge).toKeyValues.toList + val kvs = GraphSubscriberHelper.g.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues.toList kvs.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) } } def buildDegreePutRequests(vertexId: String, labelName: String, direction: String, degreeVal: Long): List[PutRequest] = { @@ -104,7 +104,7 @@ object TransferToHFile extends SparkApp { val edge = GraphSubscriberHelper.g.newEdge(vertex, vertex, label, dir, propsWithTs=propsWithTs) edge.edgesWithIndex.flatMap { indexEdge => - GraphSubscriberHelper.g.getStorage(indexEdge.label).indexEdgeSerializer(indexEdge).toKeyValues.map { kv => + GraphSubscriberHelper.g.getStorage(indexEdge.label).serDe.indexEdgeSerializer(indexEdge).toKeyValues.map { kv => new PutRequest(kv.table, kv.row, kv.cf, Array.empty[Byte], Bytes.toBytes(degreeVal), kv.timestamp) } } @@ -125,7 +125,7 @@ object TransferToHFile extends SparkApp { def toKeyValues(strs: Seq[String], labelMapping: Map[String, String], autoEdgeCreate: Boolean): Iterator[KeyValue] = { val kvList = new java.util.ArrayList[KeyValue] for (s <- strs) { - val elementList = GraphSubscriberHelper.g.toGraphElement(s, labelMapping).toSeq + val elementList = GraphSubscriberHelper.g.elementBuilder.toGraphElement(s, labelMapping).toSeq for (element <- elementList) { if (element.isInstanceOf[S2Edge]) { val edge = element.asInstanceOf[S2Edge] @@ -136,7 +136,7 @@ object TransferToHFile extends SparkApp { } } else if (element.isInstanceOf[S2Vertex]) { val vertex = element.asInstanceOf[S2Vertex] - val putRequestList = GraphSubscriberHelper.g.getStorage(vertex.service).vertexSerializer(vertex).toKeyValues.map { kv => + val putRequestList = GraphSubscriberHelper.g.getStorage(vertex.service).serDe.vertexSerializer(vertex).toKeyValues.map { kv => new PutRequest(kv.table, kv.row, kv.cf, kv.qualifier, kv.value, kv.timestamp) } for (p <- putRequestList) { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/aa66822b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala index 40f936d..eca77f9 100644 --- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala +++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogStat.scala @@ -21,7 +21,6 @@ package org.apache.s2graph.loader.subscriber import kafka.producer.KeyedMessage import kafka.serializer.StringDecoder -import org.apache.s2graph.core.S2Graph$ import org.apache.s2graph.spark.spark.{WithKafka, SparkApp} import org.apache.spark.streaming.Durations._ import org.apache.spark.streaming.kafka.HasOffsetRanges @@ -69,7 +68,7 @@ object WalLogStat extends SparkApp with WithKafka { val phase = System.getProperty("phase") GraphSubscriberHelper.apply(phase, dbUrl, "none", brokerList) partition.map { case (key, msg) => - GraphSubscriberHelper.g.toGraphElement(msg) match { + GraphSubscriberHelper.g.elementBuilder.toGraphElement(msg) match { case Some(elem) => val serviceName = elem.serviceName msg.split("\t", 7) match { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/aa66822b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala ---------------------------------------------------------------------- diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala index 23c3cda..605a994 100644 --- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala +++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/WalLogToHDFS.scala @@ -22,7 +22,6 @@ package org.apache.s2graph.loader.subscriber import java.text.SimpleDateFormat import java.util.Date import kafka.serializer.StringDecoder -import org.apache.s2graph.core.S2Graph$ import org.apache.s2graph.spark.spark.{WithKafka, SparkApp, HashMapParam} import org.apache.spark.sql.hive.HiveContext import org.apache.spark.streaming.Durations._ @@ -92,7 +91,7 @@ object WalLogToHDFS extends SparkApp with WithKafka { GraphSubscriberHelper.apply(phase, dbUrl, "none", brokerList) partition.flatMap { case (key, msg) => - val optMsg = GraphSubscriberHelper.g.toGraphElement(msg).flatMap { element => + val optMsg = GraphSubscriberHelper.g.elementBuilder.toGraphElement(msg).flatMap { element => val arr = msg.split("\t", 7) val service = element.serviceName val label = arr(5) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/aa66822b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala index 82d0c6a..5e23f9b 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala @@ -483,11 +483,11 @@ object S2Graph { new Graph.OptOut(test="org.apache.tinkerpop.gremlin.structure.io.IoTest", method="*", reason="no") // all failed. )) -class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph { +class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2GraphLike { import S2Graph._ - private var apacheConfiguration: Configuration = _ + var apacheConfiguration: Configuration = _ def dbSession() = scalikejdbc.AutoSession @@ -575,6 +575,8 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph val indexProvider = IndexProvider.apply(config) + val elementBuilder = new GraphElementBuilder(this) + def getStorage(service: Service): Storage = { storagePool.getOrElse(s"service:${service.serviceName}", defaultStorage) } @@ -639,8 +641,6 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph Try { if (q.steps.isEmpty) fallback else { - - val queryOption = q.queryOption def fetch: Future[StepResult] = { val startStepInnerResult = QueryResult.fromVertices(this, q) q.steps.zipWithIndex.foldLeft(Future.successful(startStepInnerResult)) { case (prevStepInnerResultFuture, (step, stepIdx)) => @@ -795,7 +795,6 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph Query(vertices, Vector(step)) } - // Extensions.retryOnSuccessWithBackoff(MaxRetryNum, Random.nextInt(MaxBackOff) + 1) { val retryFuture = Extensions.retryOnSuccess(DeleteAllFetchCount) { fetchAndDeleteAll(queries, requestTs) } { case (allDeleted, deleteSuccess) => @@ -832,16 +831,19 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph def deleteAllFetchedEdgesLs(stepInnerResultLs: Seq[StepResult], requestTs: Long): Future[(Boolean, Boolean)] = { stepInnerResultLs.foreach { stepInnerResult => - logger.error(s"[!!!!!!]: ${stepInnerResult.edgeWithScores.size}") if (stepInnerResult.isFailure) throw new RuntimeException("fetched result is fallback.") } val futures = for { stepInnerResult <- stepInnerResultLs - deleteStepInnerResult = buildEdgesToDelete(stepInnerResult, requestTs) - if deleteStepInnerResult.edgeWithScores.nonEmpty + filtered = stepInnerResult.edgeWithScores.filter { edgeWithScore => + (edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree + } + edgesToDelete = elementBuilder.buildEdgesToDelete(filtered, requestTs) + if edgesToDelete.nonEmpty } yield { - val head = deleteStepInnerResult.edgeWithScores.head + val head = edgesToDelete.head val label = head.edge.innerLabel + val stepResult = StepResult(edgesToDelete, Nil, Nil, false) val ret = label.schemaVersion match { case HBaseType.VERSION3 | HBaseType.VERSION4 => if (label.consistencyLevel == "strong") { @@ -849,9 +851,9 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph * read: snapshotEdge on queryResult = O(N) * write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge)) */ - mutateEdges(deleteStepInnerResult.edgeWithScores.map(_.edge), withWait = true).map(_.forall(_.isSuccess)) + mutateEdges(edgesToDelete.map(_.edge), withWait = true).map(_.forall(_.isSuccess)) } else { - deleteAllFetchedEdgesAsyncOld(getStorage(label))(deleteStepInnerResult, requestTs, MaxRetryNum) + getStorage(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum) } case _ => @@ -859,7 +861,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph * read: x * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices) */ - deleteAllFetchedEdgesAsyncOld(getStorage(label))(deleteStepInnerResult, requestTs, MaxRetryNum) + getStorage(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum) } ret } @@ -872,71 +874,6 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph } } - private def deleteAllFetchedEdgesAsyncOld(storage: Storage)(stepInnerResult: StepResult, - requestTs: Long, - retryNum: Int): Future[Boolean] = { - if (stepInnerResult.isEmpty) Future.successful(true) - else { - val head = stepInnerResult.edgeWithScores.head - val zkQuorum = head.edge.innerLabel.hbaseZkAddr - val futures = for { - edgeWithScore <- stepInnerResult.edgeWithScores - } yield { - val edge = edgeWithScore.edge - val score = edgeWithScore.score - - val edgeSnapshot = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs())) - val reversedSnapshotEdgeMutations = storage.snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put)) - - val edgeForward = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs())) - val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge => - storage.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ - storage.buildIncrementsAsync(indexEdge, -1L) - } - - /* reverted direction */ - val edgeRevert = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs())) - val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge => - storage.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ - storage.buildIncrementsAsync(indexEdge, -1L) - } - - val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations - - storage.writeToStorage(zkQuorum, mutations, withWait = true) - } - - Future.sequence(futures).map { rets => rets.forall(_.isSuccess) } - } - } - - def buildEdgesToDelete(stepInnerResult: StepResult, requestTs: Long): StepResult = { - val filtered = stepInnerResult.edgeWithScores.filter { edgeWithScore => - (edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree - } - if (filtered.isEmpty) StepResult.Empty - else { - val head = filtered.head - val label = head.edge.innerLabel - val edgeWithScoreLs = filtered.map { edgeWithScore => - val edge = edgeWithScore.edge - val copiedEdge = label.consistencyLevel match { - case "strong" => - edge.copyEdge(op = GraphUtil.operations("delete"), - version = requestTs, propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs) - case _ => - edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs) - } - - val edgeToDelete = edgeWithScore.copy(edge = copiedEdge) - // logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}") - edgeToDelete - } - //Degree edge? - StepResult(edgeWithScores = edgeWithScoreLs, grouped = Nil, degreeEdges = Nil, false) - } - } - def mutateElements(elements: Seq[GraphElement], withWait: Boolean = false): Future[Seq[MutateResponse]] = { @@ -967,8 +904,6 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph } - // def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): Future[Seq[Boolean]] = storage.mutateEdges(edges, withWait) - def mutateEdges(edges: Seq[S2EdgeLike], withWait: Boolean = false): Future[Seq[MutateResponse]] = { val edgeWithIdxs = edges.zipWithIndex @@ -985,20 +920,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph val idxs = edgeGroup.map(_._2) /* multiple edges with weak consistency level will be processed as batch */ - val mutations = edges.flatMap { edge => - val (_, edgeUpdate) = - if (edge.getOp() == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge) - else S2Edge.buildOperation(None, Seq(edge)) - - val (bufferIncr, nonBufferIncr) = storage.increments(edgeUpdate.deepCopy) - - if (bufferIncr.nonEmpty) storage.writeToStorage(zkQuorum, bufferIncr, withWait = false) - storage.buildVertexPutsAsync(edge) ++ storage.indexedEdgeMutations(edgeUpdate.deepCopy) ++ storage.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr - } - - storage.writeToStorage(zkQuorum, mutations, withWait).map { ret => - idxs.map(idx => idx -> ret.isSuccess) - } + storage.mutateWeakEdges(zkQuorum, edges, withWait) } Future.sequence(futures) } @@ -1012,7 +934,8 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph val edges = edgeGroup.map(_._1) val idxs = edgeGroup.map(_._2) val storage = getStorage(label) - mutateStrongEdges(storage)(edges, withWait = true).map { rets => + val zkQuorum = label.hbaseZkAddr + storage.mutateStrongEdges(zkQuorum, edges, withWait = true).map { rets => idxs.zip(rets) } } @@ -1026,119 +949,24 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph } } - private def mutateStrongEdges(storage: Storage)(_edges: Seq[S2EdgeLike], withWait: Boolean): Future[Seq[Boolean]] = { - - val edgeWithIdxs = _edges.zipWithIndex - val grouped = edgeWithIdxs.groupBy { case (edge, idx) => - (edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId) - } toSeq - - val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) => - val edges = edgeGroup.map(_._1) - val idxs = edgeGroup.map(_._2) - // After deleteAll, process others - val mutateEdgeFutures = edges.toList match { - case head :: tail => - val edgeFuture = mutateEdgesInner(storage)(edges, checkConsistency = true , withWait) - - //TODO: decide what we will do on failure on vertex put - val puts = storage.buildVertexPutsAsync(head) - val vertexFuture = storage.writeToStorage(head.innerLabel.hbaseZkAddr, puts, withWait) - Seq(edgeFuture, vertexFuture) - case Nil => Nil - } - - val composed = for { - // deleteRet <- Future.sequence(deleteAllFutures) - mutateRet <- Future.sequence(mutateEdgeFutures) - } yield mutateRet - - composed.map(_.forall(_.isSuccess)).map { ret => idxs.map( idx => idx -> ret) } - } - - Future.sequence(mutateEdges).map { squashedRets => - squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2) - } - } - - - private def mutateEdgesInner(storage: Storage)(edges: Seq[S2EdgeLike], - checkConsistency: Boolean, - withWait: Boolean): Future[MutateResponse] = { - assert(edges.nonEmpty) - // TODO:: remove after code review: unreachable code - if (!checkConsistency) { - - val zkQuorum = edges.head.innerLabel.hbaseZkAddr - val futures = edges.map { edge => - val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge)) - - val (bufferIncr, nonBufferIncr) = storage.increments(edgeUpdate.deepCopy) - val mutations = - storage.indexedEdgeMutations(edgeUpdate.deepCopy) ++ storage.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr - - if (bufferIncr.nonEmpty) storage.writeToStorage(zkQuorum, bufferIncr, withWait = false) - - storage.writeToStorage(zkQuorum, mutations, withWait) - } - Future.sequence(futures).map { rets => new MutateResponse(rets.forall(_.isSuccess)) } - } else { - storage.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) => - storage.retry(1)(edges, 0, snapshotEdgeOpt).map(new MutateResponse(_)) - } - } - } - def mutateVertices(vertices: Seq[S2VertexLike], withWait: Boolean = false): Future[Seq[MutateResponse]] = { - def mutateVertex(storage: Storage)(vertex: S2VertexLike, withWait: Boolean): Future[MutateResponse] = { - if (vertex.op == GraphUtil.operations("delete")) { - storage.writeToStorage(vertex.hbaseZkAddr, - storage.vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait) - } else if (vertex.op == GraphUtil.operations("deleteAll")) { - logger.info(s"deleteAll for vertex is truncated. $vertex") - Future.successful(MutateResponse.Success) // Ignore withWait parameter, because deleteAll operation may takes long time - } else { - storage.writeToStorage(vertex.hbaseZkAddr, storage.buildPutsAll(vertex), withWait) - } - } - - def mutateVertices(storage: Storage)(vertices: Seq[S2VertexLike], - withWait: Boolean = false): Future[Seq[MutateResponse]] = { - val futures = vertices.map { vertex => mutateVertex(storage)(vertex, withWait) } + def mutateVertices(storage: Storage)(zkQuorum: String, vertices: Seq[S2VertexLike], + withWait: Boolean = false): Future[Seq[MutateResponse]] = { + val futures = vertices.map { vertex => storage.mutateVertex(zkQuorum, vertex, withWait) } Future.sequence(futures) } val verticesWithIdx = vertices.zipWithIndex val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) => - mutateVertices(getStorage(service))(vertexGroup.map(_._1), withWait).map(_.zip(vertexGroup.map(_._2))) + mutateVertices(getStorage(service))(service.cluster, vertexGroup.map(_._1), withWait).map(_.zip(vertexGroup.map(_._2))) } Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) } } - - def incrementCounts(edges: Seq[S2EdgeLike], withWait: Boolean): Future[Seq[MutateResponse]] = { - def incrementCounts(storage: Storage)(edges: Seq[S2EdgeLike], withWait: Boolean): Future[Seq[MutateResponse]] = { - val futures = for { - edge <- edges - } yield { - val kvs = for { - relEdge <- edge.relatedEdges - edgeWithIndex <- EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid) - } yield { - val countWithTs = edge.propertyValueInner(LabelMeta.count) - val countVal = countWithTs.innerVal.toString().toLong - storage.buildIncrementsCountAsync(edgeWithIndex, countVal).head - } - storage.writeToStorage(edge.innerLabel.hbaseZkAddr, kvs, withWait = withWait) - } - - Future.sequence(futures) - } - val edgesWithIdx = edges.zipWithIndex val futures = edgesWithIdx.groupBy { case (e, idx) => e.innerLabel }.map { case (label, edgeGroup) => - incrementCounts(getStorage(label))(edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2))) + getStorage(label).incrementCounts(label.hbaseZkAddr, edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2))) } Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) @@ -1147,11 +975,9 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph def updateDegree(edge: S2EdgeLike, degreeVal: Long = 0): Future[MutateResponse] = { val label = edge.innerLabel - val storage = getStorage(label) - val kvs = storage.buildDegreePuts(edge, degreeVal) - storage.writeToStorage(edge.innerLabel.service.cluster, kvs, withWait = true) + storage.updateDegree(label.hbaseZkAddr, edge, degreeVal) } def isRunning(): Boolean = running.get() @@ -1164,166 +990,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph localLongId.set(0l) } - def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] = Try { - val parts = GraphUtil.split(s) - val logType = parts(2) - val element = if (logType == "edge" | logType == "e") { - /* current only edge is considered to be bulk loaded */ - labelMapping.get(parts(5)) match { - case None => - case Some(toReplace) => - parts(5) = toReplace - } - toEdge(parts) - } else if (logType == "vertex" | logType == "v") { - toVertex(parts) - } else { - throw new GraphExceptions.JsonParseException("log type is not exist in log.") - } - - element - } recover { - case e: Exception => - logger.error(s"[toElement]: $e", e) - None - } get - - - def toVertex(s: String): Option[S2VertexLike] = { - toVertex(GraphUtil.split(s)) - } - - def toEdge(s: String): Option[S2EdgeLike] = { - toEdge(GraphUtil.split(s)) - } - - def toEdge(parts: Array[String]): Option[S2EdgeLike] = Try { - val (ts, operation, logType, srcId, tgtId, label) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5)) - val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any] - val tempDirection = if (parts.length >= 8) parts(7) else "out" - val direction = if (tempDirection != "out" && tempDirection != "in") "out" else tempDirection - val edge = toEdge(srcId, tgtId, label, direction, props, ts.toLong, operation) - Option(edge) - } recover { - case e: Exception => - logger.error(s"[toEdge]: $e", e) - throw e - } get - - def toVertex(parts: Array[String]): Option[S2VertexLike] = Try { - val (ts, operation, logType, srcId, serviceName, colName) = (parts(0), parts(1), parts(2), parts(3), parts(4), parts(5)) - val props = if (parts.length >= 7) fromJsonToProperties(Json.parse(parts(6)).asOpt[JsObject].getOrElse(Json.obj())) else Map.empty[String, Any] - val vertex = toVertex(serviceName, colName, srcId, props, ts.toLong, operation) - Option(vertex) - } recover { - case e: Throwable => - logger.error(s"[toVertex]: $e", e) - throw e - } get - - def toEdge(srcId: Any, - tgtId: Any, - labelName: String, - direction: String, - props: Map[String, Any] = Map.empty, - ts: Long = System.currentTimeMillis(), - operation: String = "insert"): S2EdgeLike = { - val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName)) - - val srcColumn = if (direction == "out") label.srcColumn else label.tgtColumn - val tgtColumn = if (direction == "out") label.tgtColumn else label.srcColumn - - val srcVertexIdInnerVal = toInnerVal(srcId, srcColumn.columnType, label.schemaVersion) - val tgtVertexIdInnerVal = toInnerVal(tgtId, tgtColumn.columnType, label.schemaVersion) - - val srcVertex = newVertex(SourceVertexId(srcColumn, srcVertexIdInnerVal), System.currentTimeMillis()) - val tgtVertex = newVertex(TargetVertexId(tgtColumn, tgtVertexIdInnerVal), System.currentTimeMillis()) - val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported.")) - - val propsPlusTs = props ++ Map(LabelMeta.timestamp.name -> ts) - val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts) - val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported.")) - - new S2Edge(this, srcVertex, tgtVertex, label, dir, op = op, version = ts).copyEdgeWithState(propsWithTs) - } - - def toVertex(serviceName: String, - columnName: String, - id: Any, - props: Map[String, Any] = Map.empty, - ts: Long = System.currentTimeMillis(), - operation: String = "insert"): S2VertexLike = { - - val service = Service.findByName(serviceName).getOrElse(throw new java.lang.IllegalArgumentException(s"$serviceName is not found.")) - val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new java.lang.IllegalArgumentException(s"$columnName is not found.")) - val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported.")) - - val srcVertexId = id match { - case vid: VertexId => id.asInstanceOf[VertexId] - case _ => VertexId(column, toInnerVal(id, column.columnType, column.schemaVersion)) - } - - val propsInner = column.propsToInnerVals(props) ++ - Map(ColumnMeta.timestamp -> InnerVal.withLong(ts, column.schemaVersion)) - - val vertex = new S2Vertex(this, srcVertexId, ts, S2Vertex.EmptyProps, op) - S2Vertex.fillPropsWithTs(vertex, propsInner) - vertex - } - - def toRequestEdge(queryRequest: QueryRequest, parentEdges: Seq[EdgeWithScore]): S2EdgeLike = { - val srcVertex = queryRequest.vertex - val queryParam = queryRequest.queryParam - val tgtVertexIdOpt = queryParam.tgtVertexInnerIdOpt - val label = queryParam.label - val labelWithDir = queryParam.labelWithDir - val (srcColumn, tgtColumn) = label.srcTgtColumn(labelWithDir.dir) - val propsWithTs = label.EmptyPropsWithTs - - tgtVertexIdOpt match { - case Some(tgtVertexId) => // _to is given. - /* we use toSnapshotEdge so dont need to swap src, tgt */ - val src = srcVertex.innerId - val tgt = tgtVertexId - val (srcVId, tgtVId) = (SourceVertexId(srcColumn, src), TargetVertexId(tgtColumn, tgt)) - val (srcV, tgtV) = (newVertex(srcVId), newVertex(tgtVId)) - - newEdge(srcV, tgtV, label, labelWithDir.dir, propsWithTs = propsWithTs) - case None => - val src = srcVertex.innerId - val srcVId = SourceVertexId(srcColumn, src) - val srcV = newVertex(srcVId) - - newEdge(srcV, srcV, label, labelWithDir.dir, propsWithTs = propsWithTs, parentEdges = parentEdges) - } - } - /** - * helper to create new Edge instance from given parameters on memory(not actually stored in storage). - * - * Since we are using mutable map for property value(propsWithTs), - * we should make sure that reference for mutable map never be shared between multiple Edge instances. - * To guarantee this, we never create Edge directly, but rather use this helper which is available on S2Graph. - * - * Note that we are using following convention - * 1. `add*` for method that actually store instance into storage, - * 2. `new*` for method that only create instance on memory, but not store it into storage. - * - * @param srcVertex - * @param tgtVertex - * @param innerLabel - * @param dir - * @param op - * @param version - * @param propsWithTs - * @param parentEdges - * @param originalEdgeOpt - * @param pendingEdgeOpt - * @param statusCode - * @param lockTs - * @param tsInnerValOpt - * @return - */ def newEdge(srcVertex: S2VertexLike, tgtVertex: S2VertexLike, innerLabel: Label, @@ -1336,89 +1003,22 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph pendingEdgeOpt: Option[S2EdgeLike] = None, statusCode: Byte = 0, lockTs: Option[Long] = None, - tsInnerValOpt: Option[InnerValLike] = None): S2EdgeLike = { - val edge = S2Edge( - this, - srcVertex, - tgtVertex, - innerLabel, - dir, - op, - version, - S2Edge.EmptyProps, - parentEdges, - originalEdgeOpt, - pendingEdgeOpt, - statusCode, - lockTs, - tsInnerValOpt) - S2Edge.fillPropsWithTs(edge, propsWithTs) - edge - } - - /** - * helper to create new SnapshotEdge instance from given parameters on memory(not actually stored in storage). - * - * Note that this is only available to S2Graph, not structure.Graph so only internal code should use this method. - * @param srcVertex - * @param tgtVertex - * @param label - * @param dir - * @param op - * @param version - * @param propsWithTs - * @param pendingEdgeOpt - * @param statusCode - * @param lockTs - * @param tsInnerValOpt - * @return - */ - private[core] def newSnapshotEdge(srcVertex: S2VertexLike, - tgtVertex: S2VertexLike, - label: Label, - dir: Int, - op: Byte, - version: Long, - propsWithTs: S2Edge.State, - pendingEdgeOpt: Option[S2EdgeLike], - statusCode: Byte = 0, - lockTs: Option[Long], - tsInnerValOpt: Option[InnerValLike] = None): SnapshotEdge = { - val snapshotEdge = new SnapshotEdge(this, srcVertex, tgtVertex, label, dir, op, version, S2Edge.EmptyProps, - pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt) - S2Edge.fillPropsWithTs(snapshotEdge, propsWithTs) - snapshotEdge - } - - def newVertexId(serviceName: String)(columnName: String)(id: Any): VertexId = { - val service = Service.findByName(serviceName).getOrElse(throw new RuntimeException(s"$serviceName is not found.")) - val column = ServiceColumn.find(service.id.get, columnName).getOrElse(throw new RuntimeException(s"$columnName is not found.")) - newVertexId(service, column, id) - } + tsInnerValOpt: Option[InnerValLike] = None): S2EdgeLike = + elementBuilder.newEdge(srcVertex, tgtVertex, innerLabel, dir, op, version, propsWithTs, + parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt) - /** - * helper to create S2Graph's internal VertexId instance with given parameters. - * @param service - * @param column - * @param id - * @return - */ def newVertexId(service: Service, column: ServiceColumn, - id: Any): VertexId = { - val innerVal = CanInnerValLike.anyToInnerValLike.toInnerVal(id)(column.schemaVersion) - new VertexId(column, innerVal) - } + id: Any): VertexId = + elementBuilder.newVertexId(service, column, id) def newVertex(id: VertexId, ts: Long = System.currentTimeMillis(), props: S2Vertex.Props = S2Vertex.EmptyProps, op: Byte = 0, - belongLabelIds: Seq[Int] = Seq.empty): S2VertexLike = { - val vertex = new S2Vertex(this, id, ts, S2Vertex.EmptyProps, op, belongLabelIds) - S2Vertex.fillPropsWithTs(vertex, props) - vertex - } + belongLabelIds: Seq[Int] = Seq.empty): S2VertexLike = + elementBuilder.newVertex(id, ts, props, op, belongLabelIds) + def getVertex(vertexId: VertexId): Option[S2VertexLike] = { val v = newVertex(vertexId) @@ -1441,248 +1041,26 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph stepResultLs.foreach(_.edgeWithScores.foreach(es => ls.add(es.edge))) ls.iterator() } -// getEdges(query).map { stepResult => -// val ls = new util.ArrayList[Edge]() -// stepResult.edgeWithScores.foreach(es => ls.add(es.edge)) -// ls.iterator() -// } - } - - /** - * used by graph.traversal().V() - * @param ids: array of VertexId values. note that last parameter can be used to control if actually fetch vertices from storage or not. - * since S2Graph use user-provided id as part of edge, it is possible to - * fetch edge without fetch start vertex. default is false which means we are not fetching vertices from storage. - * @return - */ - override def vertices(ids: AnyRef*): util.Iterator[structure.Vertex] = { - val fetchVertices = ids.lastOption.map { lastParam => - if (lastParam.isInstanceOf[Boolean]) lastParam.asInstanceOf[Boolean] - else true - }.getOrElse(true) - - if (ids.isEmpty) { - //TODO: default storage need to be fixed. - Await.result(defaultStorage.fetchVerticesAll(), WaitTimeout).iterator - } else { - val vertices = ids.collect { - case s2Vertex: S2VertexLike => s2Vertex - case vId: VertexId => newVertex(vId) - case vertex: Vertex => newVertex(vertex.id().asInstanceOf[VertexId]) - case other @ _ => newVertex(VertexId.fromString(other.toString)) - } - - if (fetchVertices) { - val future = getVertices(vertices).map { vs => - val ls = new util.ArrayList[structure.Vertex]() - ls.addAll(vs) - ls.iterator() - } - Await.result(future, WaitTimeout) - } else { - vertices.iterator - } - } - } - - override def edges(edgeIds: AnyRef*): util.Iterator[structure.Edge] = { - if (edgeIds.isEmpty) { - // FIXME - Await.result(defaultStorage.fetchEdgesAll(), WaitTimeout).iterator - } else { - Await.result(edgesAsync(edgeIds: _*), WaitTimeout) - } - } - - def edgesAsync(edgeIds: AnyRef*): Future[util.Iterator[structure.Edge]] = { - val s2EdgeIds = edgeIds.collect { - case s2Edge: S2EdgeLike => s2Edge.id().asInstanceOf[EdgeId] - case id: EdgeId => id - case s: String => EdgeId.fromString(s) - } - val edgesToFetch = for { - id <- s2EdgeIds - } yield { - toEdge(id.srcVertexId, id.tgtVertexId, id.labelName, id.direction) - } - - checkEdges(edgesToFetch).map { stepResult => - val ls = new util.ArrayList[structure.Edge] - stepResult.edgeWithScores.foreach { es => ls.add(es.edge) } - ls.iterator() - } - } - override def tx(): Transaction = { - if (!features.graph.supportsTransactions) throw Graph.Exceptions.transactionsNotSupported - ??? - } - - override def variables(): Variables = new S2GraphVariables - - override def configuration(): Configuration = apacheConfiguration - - override def addVertex(label: String): Vertex = { - if (label == null) throw Element.Exceptions.labelCanNotBeNull - if (label.isEmpty) throw Element.Exceptions.labelCanNotBeEmpty - - addVertex(Seq(T.label, label): _*) - } - - def makeVertex(idValue: AnyRef, kvsMap: Map[String, AnyRef]): S2VertexLike = { - idValue match { - case vId: VertexId => - toVertex(vId.column.service.serviceName, vId.column.columnName, vId, kvsMap) - case _ => - val serviceColumnNames = kvsMap.getOrElse(T.label.toString, DefaultColumnName).toString - - val names = serviceColumnNames.split(S2Vertex.VertexLabelDelimiter) - val (serviceName, columnName) = - if (names.length == 1) (DefaultServiceName, names(0)) - else throw new RuntimeException("malformed data on vertex label.") - - toVertex(serviceName, columnName, idValue, kvsMap) - } - } - - override def addVertex(kvs: AnyRef*): structure.Vertex = { - if (!features().vertex().supportsUserSuppliedIds() && kvs.contains(T.id)) { - throw Vertex.Exceptions.userSuppliedIdsNotSupported - } - - val kvsMap = S2Property.kvsToProps(kvs) - kvsMap.get(T.id.name()) match { - case Some(idValue) if !S2Property.validType(idValue) => - throw Vertex.Exceptions.userSuppliedIdsOfThisTypeNotSupported() - case _ => - } - - kvsMap.foreach { case (k, v) => S2Property.assertValidProp(k, v) } - - if (kvsMap.contains(T.label.name()) && kvsMap(T.label.name).toString.isEmpty) - throw Element.Exceptions.labelCanNotBeEmpty - - val vertex = kvsMap.get(T.id.name()) match { - case None => // do nothing - val id = nextLocalLongId - makeVertex(Long.box(id), kvsMap) - case Some(idValue) if S2Property.validType(idValue) => - makeVertex(idValue, kvsMap) - case _ => - throw Vertex.Exceptions.userSuppliedIdsOfThisTypeNotSupported - } - - addVertexInner(vertex) - - vertex - } - - def addVertex(id: VertexId, - ts: Long = System.currentTimeMillis(), - props: S2Vertex.Props = S2Vertex.EmptyProps, - op: Byte = 0, - belongLabelIds: Seq[Int] = Seq.empty): S2VertexLike = { - val vertex = newVertex(id, ts, props, op, belongLabelIds) - - val future = mutateVertices(Seq(vertex), withWait = true).map { rets => - if (rets.forall(_.isSuccess)) vertex - else throw new RuntimeException("addVertex failed.") - } - Await.ready(future, WaitTimeout) - - vertex - } - - def addVertexInner(vertex: S2VertexLike): S2VertexLike = { - val future = mutateVertices(Seq(vertex), withWait = true).flatMap { rets => - if (rets.forall(_.isSuccess)) { - indexProvider.mutateVerticesAsync(Seq(vertex)) - } else throw new RuntimeException("addVertex failed.") - } - Await.ready(future, WaitTimeout) - - vertex - } - - /* tp3 only */ - def addEdge(srcVertex: S2VertexLike, labelName: String, tgtVertex: Vertex, kvs: AnyRef*): Edge = { - val containsId = kvs.contains(T.id) - - tgtVertex match { - case otherV: S2VertexLike => - if (!features().edge().supportsUserSuppliedIds() && containsId) { - throw Exceptions.userSuppliedIdsNotSupported() - } - - val props = S2Property.kvsToProps(kvs) - - props.foreach { case (k, v) => S2Property.assertValidProp(k, v) } - - //TODO: direction, operation, _timestamp need to be reserved property key. - - try { - val direction = props.get("direction").getOrElse("out").toString - val ts = props.get(LabelMeta.timestamp.name).map(_.toString.toLong).getOrElse(System.currentTimeMillis()) - val operation = props.get("operation").map(_.toString).getOrElse("insert") - val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName)) - val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported.")) - val propsPlusTs = props ++ Map(LabelMeta.timestamp.name -> ts) - val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts) - val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported.")) - - val edge = newEdge(srcVertex, otherV, label, dir, op = op, version = ts, propsWithTs = propsWithTs) - - val future = mutateEdges(Seq(edge), withWait = true).flatMap { rets => - indexProvider.mutateEdgesAsync(Seq(edge)) - } - Await.ready(future, WaitTimeout) - - edge - } catch { - case e: LabelNotExistException => throw new java.lang.IllegalArgumentException(e) - } - case null => throw new java.lang.IllegalArgumentException - case _ => throw new RuntimeException("only S2Graph vertex can be used.") - } } - override def close(): Unit = { - shutdown() - } - - override def compute[C <: GraphComputer](aClass: Class[C]): C = ??? - - override def compute(): GraphComputer = { - if (!features.graph.supportsComputer) { - throw Graph.Exceptions.graphComputerNotSupported - } - ??? - } - - class S2GraphFeatures extends Features { - import org.apache.s2graph.core.{features => FS} - override def edge(): Features.EdgeFeatures = new FS.S2EdgeFeatures - - override def graph(): Features.GraphFeatures = new FS.S2GraphFeatures - - override def supports(featureClass: Class[_ <: Features.FeatureSet], feature: String): Boolean = - super.supports(featureClass, feature) - - override def vertex(): Features.VertexFeatures = new FS.S2VertexFeatures - - override def toString: String = { - s"FEATURES:\nEdgeFeatures:${edge}\nGraphFeatures:${graph}\nVertexFeatures:${vertex}" - } - } - - private val s2Features = new S2GraphFeatures - - override def features() = s2Features - - override def toString(): String = "[s2graph]" + def toVertex(serviceName: String, + columnName: String, + id: Any, + props: Map[String, Any] = Map.empty, + ts: Long = System.currentTimeMillis(), + operation: String = "insert"): S2VertexLike = + elementBuilder.toVertex(serviceName, columnName, id, props, ts, operation) - override def io[I <: Io[_ <: GraphReader.ReaderBuilder[_ <: GraphReader], _ <: GraphWriter.WriterBuilder[_ <: GraphWriter], _ <: Mapper.Builder[_]]](builder: Io.Builder[I]): I = { - builder.graph(this).registry(S2GraphIoRegistry.instance).create().asInstanceOf[I] - } + def toEdge(srcId: Any, + tgtId: Any, + labelName: String, + direction: String, + props: Map[String, Any] = Map.empty, + ts: Long = System.currentTimeMillis(), + operation: String = "insert"): S2EdgeLike = + elementBuilder.toEdge(srcId, tgtId, labelName, direction, props, ts, operation) + def toGraphElement(s: String, labelMapping: Map[String, String] = Map.empty): Option[GraphElement] = + elementBuilder.toGraphElement(s, labelMapping) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/aa66822b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala new file mode 100644 index 0000000..03a92c6 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala @@ -0,0 +1,238 @@ +package org.apache.s2graph.core +import java.util + +import org.apache.commons.configuration.Configuration +import org.apache.s2graph.core.GraphExceptions.LabelNotExistException +import org.apache.s2graph.core.S2Graph.{DefaultColumnName, DefaultServiceName} +import org.apache.s2graph.core.features.{S2Features, S2GraphVariables} +import org.apache.s2graph.core.mysqls.{Label, LabelMeta} +import org.apache.s2graph.core.types.VertexId +import org.apache.tinkerpop.gremlin.process.computer.GraphComputer +import org.apache.tinkerpop.gremlin.structure +import org.apache.tinkerpop.gremlin.structure.Edge.Exceptions +import org.apache.tinkerpop.gremlin.structure.Graph.{Features, Variables} +import org.apache.tinkerpop.gremlin.structure.io.{GraphReader, GraphWriter, Io, Mapper} +import org.apache.tinkerpop.gremlin.structure.{Edge, Element, Graph, T, Transaction, Vertex} + +import scala.concurrent.{Await, Future} +import scala.collection.JavaConversions._ + +trait S2GraphLike extends Graph { + this: S2Graph => + + var apacheConfiguration: Configuration + + private val s2Features = new S2Features + + override def features() = s2Features + + def vertices(ids: AnyRef*): util.Iterator[structure.Vertex] = { + val fetchVertices = ids.lastOption.map { lastParam => + if (lastParam.isInstanceOf[Boolean]) lastParam.asInstanceOf[Boolean] + else true + }.getOrElse(true) + + if (ids.isEmpty) { + //TODO: default storage need to be fixed. + Await.result(defaultStorage.fetchVerticesAll(), WaitTimeout).iterator + } else { + val vertices = ids.collect { + case s2Vertex: S2VertexLike => s2Vertex + case vId: VertexId => newVertex(vId) + case vertex: Vertex => newVertex(vertex.id().asInstanceOf[VertexId]) + case other@_ => newVertex(VertexId.fromString(other.toString)) + } + + if (fetchVertices) { + val future = getVertices(vertices).map { vs => + val ls = new util.ArrayList[structure.Vertex]() + ls.addAll(vs) + ls.iterator() + } + Await.result(future, WaitTimeout) + } else { + vertices.iterator + } + } + } + + def edges(edgeIds: AnyRef*): util.Iterator[structure.Edge] = { + if (edgeIds.isEmpty) { + // FIXME + Await.result(defaultStorage.fetchEdgesAll(), WaitTimeout).iterator + } else { + Await.result(edgesAsync(edgeIds: _*), WaitTimeout) + } + } + + def edgesAsync(edgeIds: AnyRef*): Future[util.Iterator[structure.Edge]] = { + val s2EdgeIds = edgeIds.collect { + case s2Edge: S2EdgeLike => s2Edge.id().asInstanceOf[EdgeId] + case id: EdgeId => id + case s: String => EdgeId.fromString(s) + } + val edgesToFetch = for { + id <- s2EdgeIds + } yield { + elementBuilder.toEdge(id.srcVertexId, id.tgtVertexId, id.labelName, id.direction) + } + + checkEdges(edgesToFetch).map { stepResult => + val ls = new util.ArrayList[structure.Edge] + stepResult.edgeWithScores.foreach { es => ls.add(es.edge) } + ls.iterator() + } + } + + def tx(): Transaction = { + if (!features.graph.supportsTransactions) throw Graph.Exceptions.transactionsNotSupported + ??? + } + + def variables(): Variables = new S2GraphVariables + + def configuration(): Configuration = apacheConfiguration + + def addVertex(label: String): Vertex = { + if (label == null) throw Element.Exceptions.labelCanNotBeNull + if (label.isEmpty) throw Element.Exceptions.labelCanNotBeEmpty + + addVertex(Seq(T.label, label): _*) + } + + def makeVertex(idValue: AnyRef, kvsMap: Map[String, AnyRef]): S2VertexLike = { + idValue match { + case vId: VertexId => + elementBuilder.toVertex(vId.column.service.serviceName, vId.column.columnName, vId, kvsMap) + case _ => + val serviceColumnNames = kvsMap.getOrElse(T.label.toString, DefaultColumnName).toString + + val names = serviceColumnNames.split(S2Vertex.VertexLabelDelimiter) + val (serviceName, columnName) = + if (names.length == 1) (DefaultServiceName, names(0)) + else throw new RuntimeException("malformed data on vertex label.") + + elementBuilder.toVertex(serviceName, columnName, idValue, kvsMap) + } + } + + def addVertex(kvs: AnyRef*): structure.Vertex = { + if (!features().vertex().supportsUserSuppliedIds() && kvs.contains(T.id)) { + throw Vertex.Exceptions.userSuppliedIdsNotSupported + } + + val kvsMap = S2Property.kvsToProps(kvs) + kvsMap.get(T.id.name()) match { + case Some(idValue) if !S2Property.validType(idValue) => + throw Vertex.Exceptions.userSuppliedIdsOfThisTypeNotSupported() + case _ => + } + + kvsMap.foreach { case (k, v) => S2Property.assertValidProp(k, v) } + + if (kvsMap.contains(T.label.name()) && kvsMap(T.label.name).toString.isEmpty) + throw Element.Exceptions.labelCanNotBeEmpty + + val vertex = kvsMap.get(T.id.name()) match { + case None => // do nothing + val id = nextLocalLongId + makeVertex(Long.box(id), kvsMap) + case Some(idValue) if S2Property.validType(idValue) => + makeVertex(idValue, kvsMap) + case _ => + throw Vertex.Exceptions.userSuppliedIdsOfThisTypeNotSupported + } + + addVertexInner(vertex) + + vertex + } + + def addVertex(id: VertexId, + ts: Long = System.currentTimeMillis(), + props: S2Vertex.Props = S2Vertex.EmptyProps, + op: Byte = 0, + belongLabelIds: Seq[Int] = Seq.empty): S2VertexLike = { + val vertex = newVertex(id, ts, props, op, belongLabelIds) + + val future = mutateVertices(Seq(vertex), withWait = true).map { rets => + if (rets.forall(_.isSuccess)) vertex + else throw new RuntimeException("addVertex failed.") + } + Await.ready(future, WaitTimeout) + + vertex + } + + def addVertexInner(vertex: S2VertexLike): S2VertexLike = { + val future = mutateVertices(Seq(vertex), withWait = true).flatMap { rets => + if (rets.forall(_.isSuccess)) { + indexProvider.mutateVerticesAsync(Seq(vertex)) + } else throw new RuntimeException("addVertex failed.") + } + Await.ready(future, WaitTimeout) + + vertex + } + + /* tp3 only */ + def addEdge(srcVertex: S2VertexLike, labelName: String, tgtVertex: Vertex, kvs: AnyRef*): Edge = { + val containsId = kvs.contains(T.id) + + tgtVertex match { + case otherV: S2VertexLike => + if (!features().edge().supportsUserSuppliedIds() && containsId) { + throw Exceptions.userSuppliedIdsNotSupported() + } + + val props = S2Property.kvsToProps(kvs) + + props.foreach { case (k, v) => S2Property.assertValidProp(k, v) } + + //TODO: direction, operation, _timestamp need to be reserved property key. + + try { + val direction = props.get("direction").getOrElse("out").toString + val ts = props.get(LabelMeta.timestamp.name).map(_.toString.toLong).getOrElse(System.currentTimeMillis()) + val operation = props.get("operation").map(_.toString).getOrElse("insert") + val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName)) + val dir = GraphUtil.toDir(direction).getOrElse(throw new RuntimeException(s"$direction is not supported.")) + val propsPlusTs = props ++ Map(LabelMeta.timestamp.name -> ts) + val propsWithTs = label.propsToInnerValsWithTs(propsPlusTs, ts) + val op = GraphUtil.toOp(operation).getOrElse(throw new RuntimeException(s"$operation is not supported.")) + + val edge = newEdge(srcVertex, otherV, label, dir, op = op, version = ts, propsWithTs = propsWithTs) + + val future = mutateEdges(Seq(edge), withWait = true).flatMap { rets => + indexProvider.mutateEdgesAsync(Seq(edge)) + } + Await.ready(future, WaitTimeout) + + edge + } catch { + case e: LabelNotExistException => throw new java.lang.IllegalArgumentException(e) + } + case null => throw new java.lang.IllegalArgumentException + case _ => throw new RuntimeException("only S2Graph vertex can be used.") + } + } + + def close(): Unit = { + shutdown() + } + + def compute[C <: GraphComputer](aClass: Class[C]): C = ??? + + def compute(): GraphComputer = { + if (!features.graph.supportsComputer) { + throw Graph.Exceptions.graphComputerNotSupported + } + ??? + } + + def io[I <: Io[_ <: GraphReader.ReaderBuilder[_ <: GraphReader], _ <: GraphWriter.WriterBuilder[_ <: GraphWriter], _ <: Mapper.Builder[_]]](builder: Io.Builder[I]): I = { + builder.graph(this).registry(S2GraphIoRegistry.instance).create().asInstanceOf[I] + } + + override def toString(): String = "[s2graph]" +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/aa66822b/s2core/src/main/scala/org/apache/s2graph/core/features/S2Features.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/features/S2Features.scala b/s2core/src/main/scala/org/apache/s2graph/core/features/S2Features.scala new file mode 100644 index 0000000..36c9ecc --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/features/S2Features.scala @@ -0,0 +1,19 @@ +package org.apache.s2graph.core.features + +import org.apache.tinkerpop.gremlin.structure.Graph.Features + +class S2Features extends Features { + import org.apache.s2graph.core.{features => FS} + override def edge(): Features.EdgeFeatures = new FS.S2EdgeFeatures + + override def graph(): Features.GraphFeatures = new FS.S2GraphFeatures + + override def supports(featureClass: Class[_ <: Features.FeatureSet], feature: String): Boolean = + super.supports(featureClass, feature) + + override def vertex(): Features.VertexFeatures = new FS.S2VertexFeatures + + override def toString: String = { + s"FEATURES:\nEdgeFeatures:${edge}\nGraphFeatures:${graph}\nVertexFeatures:${vertex}" + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/aa66822b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala index e6075ec..2a8f1e2 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala @@ -22,9 +22,10 @@ package org.apache.s2graph.core.storage import com.typesafe.config.Config import org.apache.s2graph.core._ -import org.apache.s2graph.core.storage.serde.Deserializable +import org.apache.s2graph.core.storage.serde.{Deserializable, MutationHelper} import org.apache.s2graph.core.storage.serde.indexedge.tall.IndexEdgeDeserializable import org.apache.s2graph.core.types._ + import scala.concurrent.{ExecutionContext, Future} abstract class Storage(val graph: S2Graph, @@ -61,49 +62,51 @@ abstract class Storage(val graph: S2Graph, */ lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, mutator, fetcher) - /** IO **/ - def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge): serde.Serializable[SnapshotEdge] = - serDe.snapshotEdgeSerializer(snapshotEdge) - - def indexEdgeSerializer(indexEdge: IndexEdge): serde.Serializable[IndexEdge] = - serDe.indexEdgeSerializer(indexEdge) - - def vertexSerializer(vertex: S2VertexLike): serde.Serializable[S2VertexLike] = - serDe.vertexSerializer(vertex) - - def snapshotEdgeDeserializer(schemaVer: String): Deserializable[SnapshotEdge] = - serDe.snapshotEdgeDeserializer(schemaVer) - - def indexEdgeDeserializer(schemaVer: String): IndexEdgeDeserializable = - serDe.indexEdgeDeserializer(schemaVer) - - def vertexDeserializer(schemaVer: String): Deserializable[S2VertexLike] = - serDe.vertexDeserializer(schemaVer) + lazy val mutationHelper: MutationHelper = new MutationHelper(this) + +// /** IO **/ +// def snapshotEdgeSerializer(snapshotEdge: SnapshotEdge): serde.Serializable[SnapshotEdge] = +// serDe.snapshotEdgeSerializer(snapshotEdge) +// +// def indexEdgeSerializer(indexEdge: IndexEdge): serde.Serializable[IndexEdge] = +// serDe.indexEdgeSerializer(indexEdge) +// +// def vertexSerializer(vertex: S2Vertex): serde.Serializable[S2Vertex] = +// serDe.vertexSerializer(vertex) +// +// def snapshotEdgeDeserializer(schemaVer: String): Deserializable[SnapshotEdge] = +// serDe.snapshotEdgeDeserializer(schemaVer) +// +// def indexEdgeDeserializer(schemaVer: String): IndexEdgeDeserializable = +// serDe.indexEdgeDeserializer(schemaVer) +// +// def vertexDeserializer(schemaVer: String): Deserializable[S2Vertex] = +// serDe.vertexDeserializer(schemaVer) /** Mutation Builder */ - def increments(edgeMutate: EdgeMutate): (Seq[SKeyValue], Seq[SKeyValue]) = - io.increments(edgeMutate) - - def indexedEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] = - io.indexedEdgeMutations(edgeMutate) - - def buildIncrementsAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = - io.buildIncrementsAsync(indexedEdge, amount) - - def buildIncrementsCountAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = - io.buildIncrementsCountAsync(indexedEdge, amount) - - def buildVertexPutsAsync(edge: S2EdgeLike): Seq[SKeyValue] = - io.buildVertexPutsAsync(edge) - - def snapshotEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] = - io.snapshotEdgeMutations(edgeMutate) - - def buildDegreePuts(edge: S2EdgeLike, degreeVal: Long): Seq[SKeyValue] = - io.buildDegreePuts(edge, degreeVal) - - def buildPutsAll(vertex: S2VertexLike): Seq[SKeyValue] = - io.buildPutsAll(vertex) +// def increments(edgeMutate: EdgeMutate): (Seq[SKeyValue], Seq[SKeyValue]) = +// io.increments(edgeMutate) +// +// def indexedEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] = +// io.indexedEdgeMutations(edgeMutate) +// +// def buildIncrementsAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = +// io.buildIncrementsAsync(indexedEdge, amount) +// +// def buildIncrementsCountAsync(indexedEdge: IndexEdge, amount: Long = 1L): Seq[SKeyValue] = +// io.buildIncrementsCountAsync(indexedEdge, amount) +// +// def buildVertexPutsAsync(edge: S2Edge): Seq[SKeyValue] = +// io.buildVertexPutsAsync(edge) +// +// def snapshotEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] = +// io.snapshotEdgeMutations(edgeMutate) +// +// def buildDegreePuts(edge: S2Edge, degreeVal: Long): Seq[SKeyValue] = +// io.buildDegreePuts(edge, degreeVal) +// +// def buildPutsAll(vertex: S2Vertex): Seq[SKeyValue] = +// io.buildPutsAll(vertex) /** Mutation **/ @@ -129,8 +132,8 @@ abstract class Storage(val graph: S2Graph, fetcher.fetchSnapshotEdgeInner(edge) /** Conflict Resolver **/ - def retry(tryNum: Int)(edges: Seq[S2EdgeLike], statusCode: Byte, fetchedSnapshotEdgeOpt: Option[S2EdgeLike])(implicit ec: ExecutionContext): Future[Boolean] = - conflictResolver.retry(tryNum)(edges, statusCode, fetchedSnapshotEdgeOpt) +// def retry(tryNum: Int)(edges: Seq[S2Edge], statusCode: Byte, fetchedSnapshotEdgeOpt: Option[S2Edge])(implicit ec: ExecutionContext): Future[Boolean] = +// conflictResolver.retry(tryNum)(edges, statusCode, fetchedSnapshotEdgeOpt) /** Management **/ @@ -145,4 +148,25 @@ abstract class Storage(val graph: S2Graph, def shutdown(): Unit = management.shutdown() def info: Map[String, String] = Map("className" -> this.getClass.getSimpleName) + + def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult, + requestTs: Long, + retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] = + mutationHelper.deleteAllFetchedEdgesAsyncOld(stepInnerResult, requestTs, retryNum) + + def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = + mutationHelper.mutateVertex(zkQuorum: String, vertex, withWait) + + def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = + mutationHelper.mutateStrongEdges(zkQuorum, _edges, withWait) + + + def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = + mutationHelper.mutateWeakEdges(zkQuorum, _edges, withWait) + + def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] = + mutationHelper.incrementCounts(zkQuorum, edges, withWait) + + def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] = + mutationHelper.updateDegree(zkQuorum, edge, degreeVal) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/aa66822b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala index 6be5f60..bdb6e99 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageReadable.scala @@ -303,7 +303,7 @@ class AsynchbaseStorageReadable(val graph: S2Graph, val cacheTTL = queryParam.cacheTTLInMillis /* with version 4, request's type is (Scanner, (Int, Int)). otherwise GetRequest. */ - val edge = graph.toRequestEdge(queryRequest, parentEdges) + val edge = graph.elementBuilder.toRequestEdge(queryRequest, parentEdges) val request = buildRequest(queryRequest, edge) val (intervalMaxBytes, intervalMinBytes) = queryParam.buildInterval(Option(edge)) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/aa66822b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala index b618962..408822a 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/tall/SnapshotEdgeDeserializable.scala @@ -97,7 +97,7 @@ class SnapshotEdgeDeserializable(graph: S2Graph) extends Deserializable[Snapshot Option(pendingEdge) } - val snapshotEdge = graph.newSnapshotEdge(graph.newVertex(srcVertexId, ts), graph.newVertex(tgtVertexId, ts), + val snapshotEdge = graph.elementBuilder.newSnapshotEdge(graph.newVertex(srcVertexId, ts), graph.newVertex(tgtVertexId, ts), label, labelWithDir.dir, op, version, props.toMap, statusCode = statusCode, pendingEdgeOpt = _pendingEdgeOpt, lockTs = None, tsInnerValOpt = Option(tsInnerVal)) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/aa66822b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala index 8c961ce..ff8eb80 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/snapshotedge/wide/SnapshotEdgeDeserializable.scala @@ -88,7 +88,7 @@ class SnapshotEdgeDeserializable(graph: S2Graph) extends Deserializable[Snapshot Option(pendingEdge) } - val snapshotEdge = graph.newSnapshotEdge(graph.newVertex(srcVertexId, ts), graph.newVertex(tgtVertexId, ts), + val snapshotEdge = graph.elementBuilder.newSnapshotEdge(graph.newVertex(srcVertexId, ts), graph.newVertex(tgtVertexId, ts), label, labelWithDir.dir, op, version, props.toMap, statusCode = statusCode, pendingEdgeOpt = _pendingEdgeOpt, lockTs = None, tsInnerValOpt = Option(tsInnerVal)) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/aa66822b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala index 0cbaa81..24e98a1 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/storage/hbase/IndexEdgeTest.scala @@ -46,8 +46,8 @@ class IndexEdgeTest extends FunSuite with Matchers with TestCommonWithModels { val labelOpt = Option(l) val edge = graph.newEdge(vertex, tgtVertex, l, labelWithDir.dir, 0, ts, props, tsInnerValOpt = Option(InnerVal.withLong(ts, l.schemaVersion))) val indexEdge = edge.edgesWithIndex.find(_.labelIndexSeq == LabelIndex.DefaultSeq).head - val kvs = graph.getStorage(l).indexEdgeSerializer(indexEdge).toKeyValues - val _indexEdgeOpt = graph.getStorage(l).indexEdgeDeserializer(l.schemaVersion).fromKeyValues(kvs, None) + val kvs = graph.getStorage(l).serDe.indexEdgeSerializer(indexEdge).toKeyValues + val _indexEdgeOpt = graph.getStorage(l).serDe.indexEdgeDeserializer(l.schemaVersion).fromKeyValues(kvs, None) _indexEdgeOpt should not be empty edge == _indexEdgeOpt.get should be(true)
