Repository: incubator-s2graph Updated Branches: refs/heads/master 146094b50 -> c099da6fb
[S2GRAPH-132] add functionality for buffering increment Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/478db844 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/478db844 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/478db844 Branch: refs/heads/master Commit: 478db844a0c7322f51c2e75f2fbee1bf4c492902 Parents: 20bdf92 Author: daewon <[email protected]> Authored: Fri Dec 2 23:52:26 2016 +0900 Committer: daewon <[email protected]> Committed: Fri Dec 2 23:52:26 2016 +0900 ---------------------------------------------------------------------- .../scala/org/apache/s2graph/core/S2Edge.scala | 72 +++++++++++++++++++- .../scala/org/apache/s2graph/core/S2Graph.scala | 5 +- .../apache/s2graph/core/mysqls/LabelIndex.scala | 8 +-- .../apache/s2graph/core/storage/Storage.scala | 22 +++--- .../core/storage/hbase/AsynchbaseStorage.scala | 2 +- 5 files changed, 92 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/478db844/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala index 5c2a5dc..9408ed5 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala @@ -35,6 +35,30 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.{Map => MutableMap} import scala.util.hashing.MurmurHash3 +object SnapshotEdge { + + def copyFrom(e: SnapshotEdge): SnapshotEdge = { + val copy = + SnapshotEdge( + e.graph, + e.srcVertex, + e.tgtVertex, + e.label, + e.dir, + e.op, + e.version, + S2Edge.EmptyProps, + e.pendingEdgeOpt, + e.statusCode, + e.lockTs, + e.tsInnerValOpt) + + copy.updatePropsWithTs(e.propsWithTs) + + copy + } +} + case class SnapshotEdge(graph: S2Graph, srcVertex: S2Vertex, tgtVertex: S2Vertex, @@ -73,6 +97,18 @@ case class SnapshotEdge(graph: S2Graph, jsValue <- innerValToJsValue(v.innerVal, meta.dataType) } yield meta.name -> jsValue) ++ Map("version" -> JsNumber(version)) + def updatePropsWithTs(others: Props = S2Edge.EmptyProps): Props = { + if (others.isEmpty) propsWithTs + else { + val iter = others.entrySet().iterator() + while (iter.hasNext) { + val e = iter.next() + propsWithTs.put(e.getKey, e.getValue) + } + propsWithTs + } + } + // only for debug def toLogString() = { List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, propsWithName).mkString("\t") @@ -104,6 +140,27 @@ case class SnapshotEdge(graph: S2Graph, } } +object IndexEdge { + def copyFrom(e: IndexEdge): IndexEdge = { + val copy = IndexEdge( + e.graph, + e.srcVertex, + e.tgtVertex, + e.label, + e.dir, + e.op, + e.version, + e.labelIndexSeq, + S2Edge.EmptyProps, + e.tsInnerValOpt + ) + + copy.updatePropsWithTs(e.propsWithTs) + + copy + } +} + case class IndexEdge(graph: S2Graph, srcVertex: S2Vertex, tgtVertex: S2Vertex, @@ -579,6 +636,11 @@ case class S2Edge(innerGraph: S2Graph, object EdgeMutate { + + def partitionBufferedIncrement(edges: Seq[IndexEdge]): (Seq[IndexEdge], Seq[IndexEdge]) = { + edges.partition(_.indexOption.fold(false)(_.isBufferIncrement)) + } + def filterIndexOptionForDegree(edges: Seq[IndexEdge]): Seq[IndexEdge] = edges.filter { ie => ie.indexOption.fold(true)(_.storeDegree) } @@ -598,6 +660,12 @@ case class EdgeMutate(edgesToDelete: List[IndexEdge] = List.empty[IndexEdge], edgesToInsert: List[IndexEdge] = List.empty[IndexEdge], newSnapshotEdge: Option[SnapshotEdge] = None) { + def deepCopy: EdgeMutate = copy( + edgesToDelete = edgesToDelete.map(IndexEdge.copyFrom), + edgesToInsert = edgesToInsert.map(IndexEdge.copyFrom), + newSnapshotEdge = newSnapshotEdge.map(SnapshotEdge.copyFrom) + ) + val edgesToInsertWithIndexOpt: Seq[IndexEdge] = EdgeMutate.filterIndexOption(edgesToInsert) val edgesToDeleteWithIndexOpt: Seq[IndexEdge] = EdgeMutate.filterIndexOption(edgesToDelete) @@ -821,9 +889,7 @@ object S2Edge { } - EdgeMutate(edgesToDelete = edgesToDelete, - edgesToInsert = edgesToInsert, - newSnapshotEdge = newSnapshotEdgeOpt) + EdgeMutate(edgesToDelete = edgesToDelete, edgesToInsert = edgesToInsert, newSnapshotEdge = newSnapshotEdgeOpt) } } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/478db844/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala index b3f3ac8..1b022f7 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala @@ -991,7 +991,10 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph if (edge.op == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge) else S2Edge.buildOperation(None, Seq(edge)) - storage.buildVertexPutsAsync(edge) ++ storage.indexedEdgeMutations(edgeUpdate) ++ storage.snapshotEdgeMutations(edgeUpdate) ++ storage.increments(edgeUpdate) + val (bufferIncr, nonBufferIncr) = storage.increments(edgeUpdate.deepCopy) + + if (bufferIncr.nonEmpty) storage.writeToStorage(zkQuorum, bufferIncr, withWait = false) + storage.buildVertexPutsAsync(edge) ++ storage.indexedEdgeMutations(edgeUpdate.deepCopy) ++ storage.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr } storage.writeToStorage(zkQuorum, mutations, withWait).map { ret => http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/478db844/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala index fa61149..7d4d715 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/mysqls/LabelIndex.scala @@ -20,7 +20,7 @@ package org.apache.s2graph.core.mysqls import org.apache.s2graph.core.GraphUtil -import org.apache.s2graph.core.mysqls.LabelIndex.WriteOption +import org.apache.s2graph.core.mysqls.LabelIndex.indexOption import org.apache.s2graph.core.utils.logger import play.api.libs.json.{JsObject, JsString, Json} import scalikejdbc._ @@ -42,7 +42,7 @@ object LabelIndex extends Model[LabelIndex] { ) } - case class WriteOption(dir: Byte, + case class indexOption(dir: Byte, method: String, rate: Double, totalModular: Long, @@ -191,7 +191,7 @@ case class LabelIndex(id: Option[Int], labelId: Int, name: String, seq: Byte, me ) } - def parseOption(dir: String): Option[WriteOption] = try { + def parseOption(dir: String): Option[indexOption] = try { options.map { string => val jsObj = Json.parse(string) \ dir @@ -200,7 +200,7 @@ case class LabelIndex(id: Option[Int], labelId: Int, name: String, seq: Byte, me val totalModular = (jsObj \ "totalModular").asOpt[Long].getOrElse(100L) val storeDegree = (jsObj \ "storeDegree").asOpt[Boolean].getOrElse(true) - WriteOption(GraphUtil.directions(dir).toByte, method, rate, totalModular, storeDegree) + indexOption(GraphUtil.directions(dir).toByte, method, rate, totalModular, storeDegree) } } catch { case e: Exception => http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/478db844/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala index 421bec3..f31c0ec 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala @@ -362,9 +362,11 @@ abstract class Storage[Q, R](val graph: S2Graph, val futures = edges.map { edge => val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge)) + val (bufferIncr, nonBufferIncr) = increments(edgeUpdate.deepCopy) val mutations = - indexedEdgeMutations(edgeUpdate) ++ snapshotEdgeMutations(edgeUpdate) ++ increments(edgeUpdate) + indexedEdgeMutations(edgeUpdate.deepCopy) ++ snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr + if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false) writeToStorage(zkQuorum, mutations, withWait) } @@ -764,8 +766,10 @@ abstract class Storage[Q, R](val graph: S2Graph, val p = Random.nextDouble() if (p < FailProb) Future.failed(new PartialFailureException(squashedEdge, 2, s"$p")) else { - val incrs = increments(edgeMutate) - _write(incrs, true) + val (bufferIncr, nonBufferIncr) = increments(edgeMutate.deepCopy) + + if (bufferIncr.nonEmpty) _write(bufferIncr, withWait = false) + _write(nonBufferIncr, withWait = true) } } } @@ -1037,23 +1041,25 @@ abstract class Storage[Q, R](val graph: S2Graph, def snapshotEdgeMutations(edgeMutate: EdgeMutate): Seq[SKeyValue] = edgeMutate.newSnapshotEdge.map(e => snapshotEdgeSerializer(e).toKeyValues.map(_.copy(durability = e.label.durability))).getOrElse(Nil) - def increments(edgeMutate: EdgeMutate): Seq[SKeyValue] = { + def increments(edgeMutate: EdgeMutate): (Seq[SKeyValue], Seq[SKeyValue]) = { (edgeMutate.edgesToDeleteWithIndexOptForDegree.isEmpty, edgeMutate.edgesToInsertWithIndexOptForDegree.isEmpty) match { case (true, true) => /** when there is no need to update. shouldUpdate == false */ - Nil + Nil -> Nil case (true, false) => /** no edges to delete but there is new edges to insert so increase degree by 1 */ - edgeMutate.edgesToInsertWithIndexOptForDegree.flatMap(buildIncrementsAsync(_)) + val (buffer, nonBuffer) = EdgeMutate.partitionBufferedIncrement(edgeMutate.edgesToInsertWithIndexOptForDegree) + buffer.flatMap(buildIncrementsAsync(_)) -> nonBuffer.flatMap(buildIncrementsAsync(_)) case (false, true) => /** no edges to insert but there is old edges to delete so decrease degree by 1 */ - edgeMutate.edgesToDeleteWithIndexOptForDegree.flatMap(buildIncrementsAsync(_, -1)) + val (buffer, nonBuffer) = EdgeMutate.partitionBufferedIncrement(edgeMutate.edgesToDeleteWithIndexOptForDegree) + buffer.flatMap(buildIncrementsAsync(_, -1)) -> nonBuffer.flatMap(buildIncrementsAsync(_, -1)) case (false, false) => /** update on existing edges so no change on degree */ - Nil + Nil -> Nil } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/478db844/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala index 93b2454..5a9235e 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala @@ -432,7 +432,7 @@ class AsynchbaseStorage(override val graph: S2Graph, } yield { val futures: List[Deferred[(Boolean, Long, Long)]] = for { relEdge <- edge.relatedEdges - edgeWithIndex <- relEdge.edgesWithIndexValid + edgeWithIndex <- EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid) } yield { val countWithTs = edge.propertyValueInner(LabelMeta.count) val countVal = countWithTs.innerVal.toString().toLong
