add MutationHelper.

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/937b55a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/937b55a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/937b55a7

Branch: refs/heads/master
Commit: 937b55a7405899d38fee230a35b54e63b47b1677
Parents: bc26642
Author: DO YUNG YOON <[email protected]>
Authored: Fri Nov 3 21:14:04 2017 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Fri Nov 3 21:14:04 2017 +0900

----------------------------------------------------------------------
 .../core/storage/serde/MutationHelper.scala     | 172 +++++++++++++++++++
 1 file changed, 172 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/937b55a7/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
----------------------------------------------------------------------
diff --git 
a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
new file mode 100644
index 0000000..e2621af
--- /dev/null
+++ 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala
@@ -0,0 +1,172 @@
+package org.apache.s2graph.core.storage.serde
+
+import org.apache.s2graph.core.mysqls.LabelMeta
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.storage._
+import org.apache.s2graph.core.utils.logger
+
+import scala.concurrent.{ExecutionContext, Future}
+
+class MutationHelper(storage: Storage) {
+  val serDe = storage.serDe
+  val io = storage.io
+  val fetcher = storage.fetcher
+  val mutator = storage.mutator
+  val conflictResolver = storage.conflictResolver
+
+  private def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: 
Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] =
+    mutator.writeToStorage(cluster, kvs, withWait)
+
+  def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
+                                    requestTs: Long,
+                                    retryNum: Int)(implicit ec: 
ExecutionContext): Future[Boolean] = {
+    if (stepInnerResult.isEmpty) Future.successful(true)
+    else {
+      val head = stepInnerResult.edgeWithScores.head
+      val zkQuorum = head.edge.innerLabel.hbaseZkAddr
+      val futures = for {
+        edgeWithScore <- stepInnerResult.edgeWithScores
+      } yield {
+        val edge = edgeWithScore.edge
+        val score = edgeWithScore.score
+
+        val edgeSnapshot = edge.copyEdge(propsWithTs = 
S2Edge.propsToState(edge.updatePropsWithTs()))
+        val reversedSnapshotEdgeMutations = 
serDe.snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation
 = SKeyValue.Put))
+
+        val edgeForward = edge.copyEdge(propsWithTs = 
S2Edge.propsToState(edge.updatePropsWithTs()))
+        val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { 
indexEdge =>
+          
serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = 
SKeyValue.Delete)) ++
+            io.buildIncrementsAsync(indexEdge, -1L)
+        }
+
+        /* reverted direction */
+        val edgeRevert = edge.copyEdge(propsWithTs = 
S2Edge.propsToState(edge.updatePropsWithTs()))
+        val reversedIndexedEdgesMutations = 
edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
+          
serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = 
SKeyValue.Delete)) ++
+            io.buildIncrementsAsync(indexEdge, -1L)
+        }
+
+        val mutations = reversedIndexedEdgesMutations ++ 
reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
+
+        writeToStorage(zkQuorum, mutations, withWait = true)
+      }
+
+      Future.sequence(futures).map { rets => rets.forall(_.isSuccess) }
+    }
+  }
+
+  def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: 
Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = {
+    if (vertex.op == GraphUtil.operations("delete")) {
+      writeToStorage(zkQuorum,
+        serDe.vertexSerializer(vertex).toKeyValues.map(_.copy(operation = 
SKeyValue.Delete)), withWait)
+    } else if (vertex.op == GraphUtil.operations("deleteAll")) {
+      logger.info(s"deleteAll for vertex is truncated. $vertex")
+      Future.successful(MutateResponse.Success) // Ignore withWait parameter, 
because deleteAll operation may takes long time
+    } else {
+      writeToStorage(zkQuorum, io.buildPutsAll(vertex), withWait)
+    }
+  }
+
+  def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: 
Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = {
+    val mutations = _edges.flatMap { edge =>
+      val (_, edgeUpdate) =
+        if (edge.getOp() == GraphUtil.operations("delete")) 
S2Edge.buildDeleteBulk(None, edge)
+        else S2Edge.buildOperation(None, Seq(edge))
+
+      val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
+
+      if (bufferIncr.nonEmpty) storage.writeToStorage(zkQuorum, bufferIncr, 
withWait = false)
+      io.buildVertexPutsAsync(edge) ++ 
io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ 
io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
+    }
+
+    writeToStorage(zkQuorum, mutations, withWait).map { ret =>
+      _edges.zipWithIndex.map { case (edge, idx) =>
+        idx -> ret.isSuccess
+      }
+    }
+  }
+
+  def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: 
Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
+    def mutateEdgesInner(edges: Seq[S2EdgeLike],
+                         checkConsistency: Boolean,
+                         withWait: Boolean)(implicit ec: ExecutionContext): 
Future[MutateResponse] = {
+      assert(edges.nonEmpty)
+      // TODO:: remove after code review: unreachable code
+      if (!checkConsistency) {
+
+        val futures = edges.map { edge =>
+          val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge))
+
+          val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
+          val mutations =
+            io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ 
io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
+
+          if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, 
withWait = false)
+
+          writeToStorage(zkQuorum, mutations, withWait)
+        }
+        Future.sequence(futures).map { rets => new 
MutateResponse(rets.forall(_.isSuccess)) }
+      } else {
+        fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case 
(snapshotEdgeOpt, kvOpt) =>
+          conflictResolver.retry(1)(edges, 0, snapshotEdgeOpt).map(new 
MutateResponse(_))
+        }
+      }
+    }
+
+    val edgeWithIdxs = _edges.zipWithIndex
+    val grouped = edgeWithIdxs.groupBy { case (edge, idx) =>
+      (edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId)
+    } toSeq
+
+    val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) =>
+      val edges = edgeGroup.map(_._1)
+      val idxs = edgeGroup.map(_._2)
+      // After deleteAll, process others
+      val mutateEdgeFutures = edges.toList match {
+        case head :: tail =>
+          val edgeFuture = mutateEdgesInner(edges, checkConsistency = true, 
withWait)
+
+          //TODO: decide what we will do on failure on vertex put
+          val puts = io.buildVertexPutsAsync(head)
+          val vertexFuture = writeToStorage(head.innerLabel.hbaseZkAddr, puts, 
withWait)
+          Seq(edgeFuture, vertexFuture)
+        case Nil => Nil
+      }
+
+      val composed = for {
+      //        deleteRet <- Future.sequence(deleteAllFutures)
+        mutateRet <- Future.sequence(mutateEdgeFutures)
+      } yield mutateRet
+
+      composed.map(_.forall(_.isSuccess)).map { ret => idxs.map(idx => idx -> 
ret) }
+    }
+
+    Future.sequence(mutateEdges).map { squashedRets =>
+      squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2)
+    }
+  }
+
+  def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: 
Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] = {
+    val futures = for {
+      edge <- edges
+    } yield {
+      val kvs = for {
+        relEdge <- edge.relatedEdges
+        edgeWithIndex <- 
EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid)
+      } yield {
+        val countWithTs = edge.propertyValueInner(LabelMeta.count)
+        val countVal = countWithTs.innerVal.toString().toLong
+        io.buildIncrementsCountAsync(edgeWithIndex, countVal).head
+      }
+      writeToStorage(zkQuorum, kvs, withWait = withWait)
+    }
+
+    Future.sequence(futures)
+  }
+
+  def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 
0)(implicit ec: ExecutionContext): Future[MutateResponse] = {
+    val kvs = io.buildDegreePuts(edge, degreeVal)
+
+    mutator.writeToStorage(zkQuorum, kvs, withWait = true)
+  }
+}

Reply via email to