Abstract Mutator.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/b69421b0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/b69421b0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/b69421b0 Branch: refs/heads/master Commit: b69421b0b40083c5fd98b4f8139a41bf9a8ff55f Parents: 6f9d852 Author: DO YUNG YOON <steams...@apache.org> Authored: Thu May 3 14:27:48 2018 +0900 Committer: DO YUNG YOON <steams...@apache.org> Committed: Thu May 3 14:27:48 2018 +0900 ---------------------------------------------------------------------- s2core/build.sbt | 3 +- .../scala/org/apache/s2graph/core/Mutator.scala | 22 +++ .../scala/org/apache/s2graph/core/S2Graph.scala | 26 ++- .../org/apache/s2graph/core/S2GraphLike.scala | 4 + .../apache/s2graph/core/TraversalHelper.scala | 4 +- .../core/storage/DefaultOptimisticMutator.scala | 171 +++++++++++++++++ .../core/storage/OptimisticMutator.scala | 46 +++++ .../apache/s2graph/core/storage/Storage.scala | 33 ++-- .../s2graph/core/storage/StorageWritable.scala | 64 ------- .../storage/WriteWriteConflictResolver.scala | 2 +- .../core/storage/hbase/AsynchbaseStorage.scala | 3 +- .../hbase/AsynchbaseStorageWritable.scala | 11 +- .../core/storage/rocks/RocksStorage.scala | 11 +- .../storage/rocks/RocksStorageWritable.scala | 10 +- .../core/storage/serde/MutationHelper.scala | 190 ------------------- 15 files changed, 302 insertions(+), 298 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/build.sbt ---------------------------------------------------------------------- diff --git a/s2core/build.sbt b/s2core/build.sbt index 6e062cc..0b83c3d 100644 --- a/s2core/build.sbt +++ b/s2core/build.sbt @@ -56,7 +56,8 @@ libraryDependencies ++= Seq( "com.sksamuel.elastic4s" %% "elastic4s-http" % elastic4sVersion excludeLogging(), "com.sksamuel.elastic4s" %% "elastic4s-embedded" % elastic4sVersion excludeLogging(), "org.scala-lang.modules" %% "scala-pickling" % "0.10.1", - "com.spotify" % "annoy" % "0.2.5" + "com.spotify" % "annoy" % "0.2.5", + "org.tensorflow" % "tensorflow" % tensorflowVersion ) libraryDependencies := { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/Mutator.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Mutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/Mutator.scala new file mode 100644 index 0000000..a97dcff --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/Mutator.scala @@ -0,0 +1,22 @@ +package org.apache.s2graph.core + +import org.apache.s2graph.core.storage.{MutateResponse, SKeyValue} + +import scala.concurrent.{ExecutionContext, Future} + +trait Mutator { + def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] + + def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] + + def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] + + def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] + + def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] + + def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult, + requestTs: Long, + retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] +} + http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala index 43ab92c..4b2274a 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala @@ -260,6 +260,15 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap else getStorage(label).reader } + override def getMutator(column: ServiceColumn): Mutator = { + getStorage(column.service).mutator + } + + override def getMutator(label: Label): Mutator = { + getStorage(label).mutator + } + + //TODO: override def flushStorage(): Unit = { storagePool.foreach { case (_, storage) => @@ -315,7 +324,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap def mutateVertices(storage: Storage)(zkQuorum: String, vertices: Seq[S2VertexLike], withWait: Boolean = false): Future[Seq[MutateResponse]] = { val futures = vertices.map { vertex => - storage.mutateVertex(zkQuorum, vertex, withWait) + getMutator(vertex.serviceColumn).mutateVertex(zkQuorum, vertex, withWait) } Future.sequence(futures) } @@ -342,12 +351,12 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap val weakEdgesFutures = weakEdges.groupBy { case (edge, idx) => edge.innerLabel.hbaseZkAddr }.map { case (zkQuorum, edgeWithIdxs) => val futures = edgeWithIdxs.groupBy(_._1.innerLabel).map { case (label, edgeGroup) => - val storage = getStorage(label) + val mutator = getMutator(label) val edges = edgeGroup.map(_._1) val idxs = edgeGroup.map(_._2) /* multiple edges with weak consistency level will be processed as batch */ - storage.mutateWeakEdges(zkQuorum, edges, withWait) + mutator.mutateWeakEdges(zkQuorum, edges, withWait) } Future.sequence(futures) } @@ -360,9 +369,10 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap val strongEdgesFutures = strongEdgesAll.groupBy { case (edge, idx) => edge.innerLabel }.map { case (label, edgeGroup) => val edges = edgeGroup.map(_._1) val idxs = edgeGroup.map(_._2) - val storage = getStorage(label) + val mutator = getMutator(label) val zkQuorum = label.hbaseZkAddr - storage.mutateStrongEdges(zkQuorum, edges, withWait = true).map { rets => + + mutator.mutateStrongEdges(zkQuorum, edges, withWait = true).map { rets => idxs.zip(rets) } } @@ -487,7 +497,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap override def incrementCounts(edges: Seq[S2EdgeLike], withWait: Boolean): Future[Seq[MutateResponse]] = { val edgesWithIdx = edges.zipWithIndex val futures = edgesWithIdx.groupBy { case (e, idx) => e.innerLabel }.map { case (label, edgeGroup) => - getStorage(label).incrementCounts(label.hbaseZkAddr, edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2))) + getMutator(label).incrementCounts(label.hbaseZkAddr, edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2))) } Future.sequence(futures).map { ls => @@ -497,9 +507,9 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap override def updateDegree(edge: S2EdgeLike, degreeVal: Long = 0): Future[MutateResponse] = { val label = edge.innerLabel - val storage = getStorage(label) + val mutator = getMutator(label) - storage.updateDegree(label.hbaseZkAddr, edge, degreeVal) + mutator.updateDegree(label.hbaseZkAddr, edge, degreeVal) } override def getVertex(vertexId: VertexId): Option[S2VertexLike] = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala index fef0078..6ed78b0 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala @@ -97,6 +97,10 @@ trait S2GraphLike extends Graph { def getFetcher(label: Label): Fetcher + def getMutator(label: Label): Mutator + + def getMutator(column: ServiceColumn): Mutator + def flushStorage(): Unit def shutdown(modelDataDelete: Boolean = false): Unit http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala index 003a2d1..d19dd1f 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala @@ -256,7 +256,7 @@ class TraversalHelper(graph: S2GraphLike) { */ graph.mutateEdges(edgesToDelete.map(_.edge), withWait = true).map(_.forall(_.isSuccess)) } else { - graph.getStorage(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum) + graph.getMutator(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum) } case _ => @@ -264,7 +264,7 @@ class TraversalHelper(graph: S2GraphLike) { * read: x * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices) */ - graph.getStorage(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum) + graph.getMutator(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum) } ret } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala new file mode 100644 index 0000000..40f29c0 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala @@ -0,0 +1,171 @@ +package org.apache.s2graph.core.storage + +import org.apache.s2graph.core._ +import org.apache.s2graph.core.schema.LabelMeta +import org.apache.s2graph.core.utils.logger + +import scala.concurrent.{ExecutionContext, Future} + +abstract class DefaultOptimisticMutator(graph: S2GraphLike, + serDe: StorageSerDe, + reader: StorageReadable) extends OptimisticMutator { + val fetcher = reader + + lazy val io: StorageIO = new StorageIO(graph, serDe) + lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, this, reader) + +// private def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = +// mutator.writeToStorage(cluster, kvs, withWait) + + def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult, + requestTs: Long, + retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] = { + if (stepInnerResult.isEmpty) Future.successful(true) + else { + val head = stepInnerResult.edgeWithScores.head + val zkQuorum = head.edge.innerLabel.hbaseZkAddr + val futures = for { + edgeWithScore <- stepInnerResult.edgeWithScores + } yield { + val edge = edgeWithScore.edge + + val edgeSnapshot = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs())) + val reversedSnapshotEdgeMutations = serDe.snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put)) + + val edgeForward = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs())) + val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge => + serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ + io.buildIncrementsAsync(indexEdge, -1L) + } + + /* reverted direction */ + val edgeRevert = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs())) + val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge => + serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ + io.buildIncrementsAsync(indexEdge, -1L) + } + + val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations + + writeToStorage(zkQuorum, mutations, withWait = true) + } + + Future.sequence(futures).map { rets => rets.forall(_.isSuccess) } + } + } + + def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = { + if (vertex.op == GraphUtil.operations("delete")) { + writeToStorage(zkQuorum, + serDe.vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait) + } else if (vertex.op == GraphUtil.operations("deleteAll")) { + logger.info(s"deleteAll for vertex is truncated. $vertex") + Future.successful(MutateResponse.Success) // Ignore withWait parameter, because deleteAll operation may takes long time + } else { + writeToStorage(zkQuorum, io.buildPutsAll(vertex), withWait) + } + } + + def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = { + val mutations = _edges.flatMap { edge => + val (_, edgeUpdate) = + if (edge.getOp() == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge) + else S2Edge.buildOperation(None, Seq(edge)) + + val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy) + + if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false) + io.buildVertexPutsAsync(edge) ++ io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr + } + + writeToStorage(zkQuorum, mutations, withWait).map { ret => + _edges.zipWithIndex.map { case (edge, idx) => + idx -> ret.isSuccess + } + } + } + + def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = { + def mutateEdgesInner(edges: Seq[S2EdgeLike], + checkConsistency: Boolean, + withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = { + assert(edges.nonEmpty) + // TODO:: remove after code review: unreachable code + if (!checkConsistency) { + + val futures = edges.map { edge => + val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge)) + + val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy) + val mutations = + io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr + + if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false) + + writeToStorage(zkQuorum, mutations, withWait) + } + Future.sequence(futures).map { rets => new MutateResponse(rets.forall(_.isSuccess)) } + } else { + fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) => + conflictResolver.retry(1)(edges, 0, snapshotEdgeOpt).map(new MutateResponse(_)) + } + } + } + + val edgeWithIdxs = _edges.zipWithIndex + val grouped = edgeWithIdxs.groupBy { case (edge, idx) => + (edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId) + } toSeq + + val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) => + val edges = edgeGroup.map(_._1) + val idxs = edgeGroup.map(_._2) + // After deleteAll, process others + val mutateEdgeFutures = edges.toList match { + case head :: tail => + val edgeFuture = mutateEdgesInner(edges, checkConsistency = true, withWait) + + //TODO: decide what we will do on failure on vertex put + val puts = io.buildVertexPutsAsync(head) + val vertexFuture = writeToStorage(head.innerLabel.hbaseZkAddr, puts, withWait) + Seq(edgeFuture, vertexFuture) + case Nil => Nil + } + + val composed = for { + // deleteRet <- Future.sequence(deleteAllFutures) + mutateRet <- Future.sequence(mutateEdgeFutures) + } yield mutateRet + + composed.map(_.forall(_.isSuccess)).map { ret => idxs.map(idx => idx -> ret) } + } + + Future.sequence(mutateEdges).map { squashedRets => + squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2) + } + } + + def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] = { + val futures = for { + edge <- edges + } yield { + val kvs = for { + relEdge <- edge.relatedEdges + edgeWithIndex <- EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid) + } yield { + val countWithTs = edge.propertyValueInner(LabelMeta.count) + val countVal = countWithTs.innerVal.toString().toLong + io.buildIncrementsCountAsync(edgeWithIndex, countVal).head + } + writeToStorage(zkQuorum, kvs, withWait = withWait) + } + + Future.sequence(futures) + } + + def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] = { + val kvs = io.buildDegreePuts(edge, degreeVal) + + writeToStorage(zkQuorum, kvs, withWait = true) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticMutator.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticMutator.scala new file mode 100644 index 0000000..f9681a9 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticMutator.scala @@ -0,0 +1,46 @@ +package org.apache.s2graph.core.storage + +import org.apache.s2graph.core.Mutator + +import scala.concurrent.{ExecutionContext, Future} + +trait OptimisticMutator extends Mutator { + /** + * decide how to store given key values Seq[SKeyValue] into storage using storage's client. + * note that this should be return true on all success. + * we assumes that each storage implementation has client as member variable. + * + * @param cluster : where this key values should be stored. + * @param kvs : sequence of SKeyValue that need to be stored in storage. + * @param withWait : flag to control wait ack from storage. + * note that in AsynchbaseStorage(which support asynchronous operations), even with true, + * it never block thread, but rather submit work and notified by event loop when storage send ack back. + * @return ack message from storage. + */ + def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] + + /** + * write requestKeyValue into storage if the current value in storage that is stored matches. + * note that we only use SnapshotEdge as place for lock, so this method only change SnapshotEdge. + * + * Most important thing is this have to be 'atomic' operation. + * When this operation is mutating requestKeyValue's snapshotEdge, then other thread need to be + * either blocked or failed on write-write conflict case. + * + * Also while this method is still running, then fetchSnapshotEdgeKeyValues should be synchronized to + * prevent wrong data for read. + * + * Best is use storage's concurrency control(either pessimistic or optimistic) such as transaction, + * compareAndSet to synchronize. + * + * for example, AsynchbaseStorage use HBase's CheckAndSet atomic operation to guarantee 'atomicity'. + * for storage that does not support concurrency control, then storage implementation + * itself can maintain manual locks that synchronize read(fetchSnapshotEdgeKeyValues) + * and write(writeLock). + * + * @param requestKeyValue + * @param expectedOpt + * @return + */ + def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext): Future[MutateResponse] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala index 6ad62b1..d2500a6 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala @@ -22,7 +22,7 @@ package org.apache.s2graph.core.storage import com.typesafe.config.Config import org.apache.s2graph.core._ -import org.apache.s2graph.core.storage.serde.{Deserializable, MutationHelper} +import org.apache.s2graph.core.storage.serde.Deserializable import org.apache.s2graph.core.storage.serde.indexedge.tall.IndexEdgeDeserializable import org.apache.s2graph.core.types._ @@ -33,9 +33,6 @@ abstract class Storage(val graph: S2GraphLike, /* Storage backend specific resource management */ val management: StorageManagement - /* Physically store given KeyValue into backend storage. */ - val mutator: StorageWritable - /* * Given QueryRequest/Vertex/Edge, fetch KeyValue from storage * then convert them into Edge/Vertex @@ -50,6 +47,11 @@ abstract class Storage(val graph: S2GraphLike, val serDe: StorageSerDe /* + * Responsible to connect physical storage backend to store GraphElement(Edge/Vertex). + */ + val mutator: Mutator + + /* * Common helper to translate SKeyValue to Edge/Vertex and vice versa. * Note that it require storage backend specific implementation for serialize/deserialize. */ @@ -60,16 +62,9 @@ abstract class Storage(val graph: S2GraphLike, * Note that it require storage backend specific implementations for * all of StorageWritable, StorageReadable, StorageSerDe, StorageIO */ - lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, mutator, reader) - - lazy val mutationHelper: MutationHelper = new MutationHelper(this) - - /** Mutation **/ - def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = - mutator.writeToStorage(cluster, kvs, withWait) +// lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, mutator, reader) +// lazy val mutationHelper: MutationHelper = new MutationHelper(this) - def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext): Future[MutateResponse] = - mutator.writeLock(requestKeyValue, expectedOpt) /** Fetch **/ def fetches(queryRequests: Seq[QueryRequest], @@ -102,21 +97,21 @@ abstract class Storage(val graph: S2GraphLike, def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult, requestTs: Long, retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] = - mutationHelper.deleteAllFetchedEdgesAsyncOld(stepInnerResult, requestTs, retryNum) + mutator.deleteAllFetchedEdgesAsyncOld(stepInnerResult, requestTs, retryNum) def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = - mutationHelper.mutateVertex(zkQuorum: String, vertex, withWait) + mutator.mutateVertex(zkQuorum: String, vertex, withWait) def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = - mutationHelper.mutateStrongEdges(zkQuorum, _edges, withWait) + mutator.mutateStrongEdges(zkQuorum, _edges, withWait) def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = - mutationHelper.mutateWeakEdges(zkQuorum, _edges, withWait) + mutator.mutateWeakEdges(zkQuorum, _edges, withWait) def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] = - mutationHelper.incrementCounts(zkQuorum, edges, withWait) + mutator.incrementCounts(zkQuorum, edges, withWait) def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] = - mutationHelper.updateDegree(zkQuorum, edge, degreeVal) + mutator.updateDegree(zkQuorum, edge, degreeVal) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala deleted file mode 100644 index 80da3a9..0000000 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.core.storage - -import scala.concurrent.{ExecutionContext, Future} - -trait StorageWritable { - /** - * decide how to store given key values Seq[SKeyValue] into storage using storage's client. - * note that this should be return true on all success. - * we assumes that each storage implementation has client as member variable. - * - * - * @param cluster: where this key values should be stored. - * @param kvs: sequence of SKeyValue that need to be stored in storage. - * @param withWait: flag to control wait ack from storage. - * note that in AsynchbaseStorage(which support asynchronous operations), even with true, - * it never block thread, but rather submit work and notified by event loop when storage send ack back. - * @return ack message from storage. - */ - def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] - - /** - * write requestKeyValue into storage if the current value in storage that is stored matches. - * note that we only use SnapshotEdge as place for lock, so this method only change SnapshotEdge. - * - * Most important thing is this have to be 'atomic' operation. - * When this operation is mutating requestKeyValue's snapshotEdge, then other thread need to be - * either blocked or failed on write-write conflict case. - * - * Also while this method is still running, then fetchSnapshotEdgeKeyValues should be synchronized to - * prevent wrong data for read. - * - * Best is use storage's concurrency control(either pessimistic or optimistic) such as transaction, - * compareAndSet to synchronize. - * - * for example, AsynchbaseStorage use HBase's CheckAndSet atomic operation to guarantee 'atomicity'. - * for storage that does not support concurrency control, then storage implementation - * itself can maintain manual locks that synchronize read(fetchSnapshotEdgeKeyValues) - * and write(writeLock). - * @param requestKeyValue - * @param expectedOpt - * @return - */ - def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext): Future[MutateResponse] - -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala index dcef1cc..bfc5bc6 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala @@ -31,7 +31,7 @@ import scala.util.Random class WriteWriteConflictResolver(graph: S2GraphLike, serDe: StorageSerDe, io: StorageIO, - mutator: StorageWritable, + mutator: OptimisticMutator, fetcher: StorageReadable) { val BackoffTimeout = graph.BackoffTimeout val MaxRetryNum = graph.MaxRetryNum http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala index e233277..4be3767 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala @@ -151,10 +151,9 @@ class AsynchbaseStorage(override val graph: S2GraphLike, override val management: StorageManagement = new AsynchbaseStorageManagement(config, clients) - override val mutator: StorageWritable = new AsynchbaseStorageWritable(client, clientWithFlush) - override val serDe: StorageSerDe = new AsynchbaseStorageSerDe(graph) override val reader: StorageReadable = new AsynchbaseStorageReadable(graph, config, client, serDe, io) + override val mutator: Mutator = new AsynchbaseStorageWritable(graph, serDe, reader, client, clientWithFlush) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala index 7ca3782..b4236b9 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala @@ -20,14 +20,19 @@ package org.apache.s2graph.core.storage.hbase import org.apache.hadoop.hbase.util.Bytes -import org.apache.s2graph.core.storage.{IncrementResponse, MutateResponse, SKeyValue, StorageWritable} +import org.apache.s2graph.core.S2GraphLike +import org.apache.s2graph.core.storage._ import org.apache.s2graph.core.utils.{Extensions, logger} import org.hbase.async.{AtomicIncrementRequest, DeleteRequest, HBaseClient, PutRequest} + import scala.collection.mutable.ArrayBuffer import scala.concurrent.{ExecutionContext, Future} -class AsynchbaseStorageWritable(val client: HBaseClient, - val clientWithFlush: HBaseClient) extends StorageWritable { +class AsynchbaseStorageWritable(val graph: S2GraphLike, + val serDe: StorageSerDe, + val reader: StorageReadable, + val client: HBaseClient, + val clientWithFlush: HBaseClient) extends DefaultOptimisticMutator(graph, serDe, reader) { import Extensions.DeferOps private def client(withWait: Boolean): HBaseClient = if (withWait) clientWithFlush else client http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala index e53aeb3..b24e375 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala @@ -26,7 +26,7 @@ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} import com.google.common.hash.Hashing import com.typesafe.config.Config import org.apache.s2graph.core._ -import org.apache.s2graph.core.storage.Storage +import org.apache.s2graph.core.storage.{Storage, StorageManagement, StorageReadable, StorageSerDe} import org.apache.s2graph.core.storage.rocks.RocksHelper.RocksRPC import org.apache.s2graph.core.utils.logger import org.rocksdb._ @@ -150,11 +150,12 @@ class RocksStorage(override val graph: S2GraphLike, .maximumSize(1000 * 10 * 10 * 10 * 10) .build[String, ReentrantLock](cacheLoader) - override val management = new RocksStorageManagement(config, vdb, db) + override val management: StorageManagement = new RocksStorageManagement(config, vdb, db) - override val mutator = new RocksStorageWritable(db, vdb, lockMap) + override val serDe: StorageSerDe = new RocksStorageSerDe(graph) - override val serDe = new RocksStorageSerDe(graph) + override val reader: StorageReadable = new RocksStorageReadable(graph, config, db, vdb, serDe, io) + + override val mutator: Mutator = new RocksStorageWritable(graph, serDe, reader, db, vdb, lockMap) - override val reader = new RocksStorageReadable(graph, config, db, vdb, serDe, io) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala index 7ec147d..d29ccce 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala @@ -23,15 +23,19 @@ import java.util.concurrent.locks.ReentrantLock import com.google.common.cache.{Cache, LoadingCache} import org.apache.hadoop.hbase.util.Bytes -import org.apache.s2graph.core.storage.{MutateResponse, SKeyValue, StorageWritable} +import org.apache.s2graph.core.S2GraphLike +import org.apache.s2graph.core.storage._ import org.apache.s2graph.core.utils.logger import org.rocksdb.{RocksDB, RocksDBException, WriteBatch, WriteOptions} import scala.concurrent.{ExecutionContext, Future} -class RocksStorageWritable(val db: RocksDB, +class RocksStorageWritable(val graph: S2GraphLike, + val serDe: StorageSerDe, + val reader: StorageReadable, + val db: RocksDB, val vdb: RocksDB, - val lockMap: LoadingCache[String, ReentrantLock]) extends StorageWritable { + val lockMap: LoadingCache[String, ReentrantLock]) extends DefaultOptimisticMutator(graph, serDe, reader) { override def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext) = { if (kvs.isEmpty) { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b69421b0/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala deleted file mode 100644 index fecc6ea..0000000 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.core.storage.serde - -import org.apache.s2graph.core.schema.LabelMeta -import org.apache.s2graph.core._ -import org.apache.s2graph.core.storage._ -import org.apache.s2graph.core.utils.logger - -import scala.concurrent.{ExecutionContext, Future} - -class MutationHelper(storage: Storage) { - val serDe = storage.serDe - val io = storage.io - val fetcher = storage.reader - val mutator = storage.mutator - val conflictResolver = storage.conflictResolver - - private def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = - mutator.writeToStorage(cluster, kvs, withWait) - - def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult, - requestTs: Long, - retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] = { - if (stepInnerResult.isEmpty) Future.successful(true) - else { - val head = stepInnerResult.edgeWithScores.head - val zkQuorum = head.edge.innerLabel.hbaseZkAddr - val futures = for { - edgeWithScore <- stepInnerResult.edgeWithScores - } yield { - val edge = edgeWithScore.edge - - val edgeSnapshot = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs())) - val reversedSnapshotEdgeMutations = serDe.snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put)) - - val edgeForward = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs())) - val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge => - serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ - io.buildIncrementsAsync(indexEdge, -1L) - } - - /* reverted direction */ - val edgeRevert = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs())) - val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge => - serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ - io.buildIncrementsAsync(indexEdge, -1L) - } - - val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations - - writeToStorage(zkQuorum, mutations, withWait = true) - } - - Future.sequence(futures).map { rets => rets.forall(_.isSuccess) } - } - } - - def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = { - if (vertex.op == GraphUtil.operations("delete")) { - writeToStorage(zkQuorum, - serDe.vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait) - } else if (vertex.op == GraphUtil.operations("deleteAll")) { - logger.info(s"deleteAll for vertex is truncated. $vertex") - Future.successful(MutateResponse.Success) // Ignore withWait parameter, because deleteAll operation may takes long time - } else { - writeToStorage(zkQuorum, io.buildPutsAll(vertex), withWait) - } - } - - def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = { - val mutations = _edges.flatMap { edge => - val (_, edgeUpdate) = - if (edge.getOp() == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge) - else S2Edge.buildOperation(None, Seq(edge)) - - val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy) - - if (bufferIncr.nonEmpty) storage.writeToStorage(zkQuorum, bufferIncr, withWait = false) - io.buildVertexPutsAsync(edge) ++ io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr - } - - writeToStorage(zkQuorum, mutations, withWait).map { ret => - _edges.zipWithIndex.map { case (edge, idx) => - idx -> ret.isSuccess - } - } - } - - def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = { - def mutateEdgesInner(edges: Seq[S2EdgeLike], - checkConsistency: Boolean, - withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = { - assert(edges.nonEmpty) - // TODO:: remove after code review: unreachable code - if (!checkConsistency) { - - val futures = edges.map { edge => - val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge)) - - val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy) - val mutations = - io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr - - if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false) - - writeToStorage(zkQuorum, mutations, withWait) - } - Future.sequence(futures).map { rets => new MutateResponse(rets.forall(_.isSuccess)) } - } else { - fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) => - conflictResolver.retry(1)(edges, 0, snapshotEdgeOpt).map(new MutateResponse(_)) - } - } - } - - val edgeWithIdxs = _edges.zipWithIndex - val grouped = edgeWithIdxs.groupBy { case (edge, idx) => - (edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId) - } toSeq - - val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) => - val edges = edgeGroup.map(_._1) - val idxs = edgeGroup.map(_._2) - // After deleteAll, process others - val mutateEdgeFutures = edges.toList match { - case head :: tail => - val edgeFuture = mutateEdgesInner(edges, checkConsistency = true, withWait) - - //TODO: decide what we will do on failure on vertex put - val puts = io.buildVertexPutsAsync(head) - val vertexFuture = writeToStorage(head.innerLabel.hbaseZkAddr, puts, withWait) - Seq(edgeFuture, vertexFuture) - case Nil => Nil - } - - val composed = for { - // deleteRet <- Future.sequence(deleteAllFutures) - mutateRet <- Future.sequence(mutateEdgeFutures) - } yield mutateRet - - composed.map(_.forall(_.isSuccess)).map { ret => idxs.map(idx => idx -> ret) } - } - - Future.sequence(mutateEdges).map { squashedRets => - squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2) - } - } - - def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] = { - val futures = for { - edge <- edges - } yield { - val kvs = for { - relEdge <- edge.relatedEdges - edgeWithIndex <- EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid) - } yield { - val countWithTs = edge.propertyValueInner(LabelMeta.count) - val countVal = countWithTs.innerVal.toString().toLong - io.buildIncrementsCountAsync(edgeWithIndex, countVal).head - } - writeToStorage(zkQuorum, kvs, withWait = withWait) - } - - Future.sequence(futures) - } - - def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] = { - val kvs = io.buildDegreePuts(edge, degreeVal) - - mutator.writeToStorage(zkQuorum, kvs, withWait = true) - } -}