add MutationHelper.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/937b55a7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/937b55a7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/937b55a7 Branch: refs/heads/master Commit: 937b55a7405899d38fee230a35b54e63b47b1677 Parents: bc26642 Author: DO YUNG YOON <[email protected]> Authored: Fri Nov 3 21:14:04 2017 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Fri Nov 3 21:14:04 2017 +0900 ---------------------------------------------------------------------- .../core/storage/serde/MutationHelper.scala | 172 +++++++++++++++++++ 1 file changed, 172 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/937b55a7/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala new file mode 100644 index 0000000..e2621af --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala @@ -0,0 +1,172 @@ +package org.apache.s2graph.core.storage.serde + +import org.apache.s2graph.core.mysqls.LabelMeta +import org.apache.s2graph.core._ +import org.apache.s2graph.core.storage._ +import org.apache.s2graph.core.utils.logger + +import scala.concurrent.{ExecutionContext, Future} + +class MutationHelper(storage: Storage) { + val serDe = storage.serDe + val io = storage.io + val fetcher = storage.fetcher + val mutator = storage.mutator + val conflictResolver = storage.conflictResolver + + private def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = + mutator.writeToStorage(cluster, kvs, withWait) + + def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult, + requestTs: Long, + retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] = { + if (stepInnerResult.isEmpty) Future.successful(true) + else { + val head = stepInnerResult.edgeWithScores.head + val zkQuorum = head.edge.innerLabel.hbaseZkAddr + val futures = for { + edgeWithScore <- stepInnerResult.edgeWithScores + } yield { + val edge = edgeWithScore.edge + val score = edgeWithScore.score + + val edgeSnapshot = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs())) + val reversedSnapshotEdgeMutations = serDe.snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put)) + + val edgeForward = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs())) + val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge => + serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ + io.buildIncrementsAsync(indexEdge, -1L) + } + + /* reverted direction */ + val edgeRevert = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs())) + val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge => + serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ + io.buildIncrementsAsync(indexEdge, -1L) + } + + val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations + + writeToStorage(zkQuorum, mutations, withWait = true) + } + + Future.sequence(futures).map { rets => rets.forall(_.isSuccess) } + } + } + + def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = { + if (vertex.op == GraphUtil.operations("delete")) { + writeToStorage(zkQuorum, + serDe.vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait) + } else if (vertex.op == GraphUtil.operations("deleteAll")) { + logger.info(s"deleteAll for vertex is truncated. $vertex") + Future.successful(MutateResponse.Success) // Ignore withWait parameter, because deleteAll operation may takes long time + } else { + writeToStorage(zkQuorum, io.buildPutsAll(vertex), withWait) + } + } + + def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = { + val mutations = _edges.flatMap { edge => + val (_, edgeUpdate) = + if (edge.getOp() == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge) + else S2Edge.buildOperation(None, Seq(edge)) + + val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy) + + if (bufferIncr.nonEmpty) storage.writeToStorage(zkQuorum, bufferIncr, withWait = false) + io.buildVertexPutsAsync(edge) ++ io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr + } + + writeToStorage(zkQuorum, mutations, withWait).map { ret => + _edges.zipWithIndex.map { case (edge, idx) => + idx -> ret.isSuccess + } + } + } + + def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = { + def mutateEdgesInner(edges: Seq[S2EdgeLike], + checkConsistency: Boolean, + withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = { + assert(edges.nonEmpty) + // TODO:: remove after code review: unreachable code + if (!checkConsistency) { + + val futures = edges.map { edge => + val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge)) + + val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy) + val mutations = + io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr + + if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false) + + writeToStorage(zkQuorum, mutations, withWait) + } + Future.sequence(futures).map { rets => new MutateResponse(rets.forall(_.isSuccess)) } + } else { + fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) => + conflictResolver.retry(1)(edges, 0, snapshotEdgeOpt).map(new MutateResponse(_)) + } + } + } + + val edgeWithIdxs = _edges.zipWithIndex + val grouped = edgeWithIdxs.groupBy { case (edge, idx) => + (edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId) + } toSeq + + val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) => + val edges = edgeGroup.map(_._1) + val idxs = edgeGroup.map(_._2) + // After deleteAll, process others + val mutateEdgeFutures = edges.toList match { + case head :: tail => + val edgeFuture = mutateEdgesInner(edges, checkConsistency = true, withWait) + + //TODO: decide what we will do on failure on vertex put + val puts = io.buildVertexPutsAsync(head) + val vertexFuture = writeToStorage(head.innerLabel.hbaseZkAddr, puts, withWait) + Seq(edgeFuture, vertexFuture) + case Nil => Nil + } + + val composed = for { + // deleteRet <- Future.sequence(deleteAllFutures) + mutateRet <- Future.sequence(mutateEdgeFutures) + } yield mutateRet + + composed.map(_.forall(_.isSuccess)).map { ret => idxs.map(idx => idx -> ret) } + } + + Future.sequence(mutateEdges).map { squashedRets => + squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2) + } + } + + def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] = { + val futures = for { + edge <- edges + } yield { + val kvs = for { + relEdge <- edge.relatedEdges + edgeWithIndex <- EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid) + } yield { + val countWithTs = edge.propertyValueInner(LabelMeta.count) + val countVal = countWithTs.innerVal.toString().toLong + io.buildIncrementsCountAsync(edgeWithIndex, countVal).head + } + writeToStorage(zkQuorum, kvs, withWait = withWait) + } + + Future.sequence(futures) + } + + def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] = { + val kvs = io.buildDegreePuts(edge, degreeVal) + + mutator.writeToStorage(zkQuorum, kvs, withWait = true) + } +}
