Separate interfaces from Storage.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/39544dc5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/39544dc5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/39544dc5 Branch: refs/heads/master Commit: 39544dc5debd4821c4f73db17e88bdbeb9f43b38 Parents: 3361320 Author: DO YUNG YOON <[email protected]> Authored: Sat Oct 28 09:27:51 2017 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Sat Oct 28 09:27:51 2017 +0900 ---------------------------------------------------------------------- .gitignore | 2 +- s2core/build.sbt | 3 +- .../org/apache/s2graph/core/Management.scala | 74 +- .../org/apache/s2graph/core/QueryParam.scala | 2 +- .../scala/org/apache/s2graph/core/S2Edge.scala | 2 +- .../scala/org/apache/s2graph/core/S2Graph.scala | 275 +++-- .../org/apache/s2graph/core/S2Vertex.scala | 40 +- .../s2graph/core/storage/Deserializable.scala | 43 - .../s2graph/core/storage/MutateResponse.scala | 12 + .../apache/s2graph/core/storage/SKeyValue.scala | 1 + .../s2graph/core/storage/Serializable.scala | 27 - .../apache/s2graph/core/storage/Storage.scala | 1146 ++---------------- .../core/storage/StorageDeserializable.scala | 120 -- .../apache/s2graph/core/storage/StorageIO.scala | 241 ++++ .../core/storage/StorageManagement.scala | 35 + .../s2graph/core/storage/StorageReadable.scala | 62 + .../s2graph/core/storage/StorageSerDe.scala | 59 + .../core/storage/StorageSerializable.scala | 82 -- .../s2graph/core/storage/StorageWritable.scala | 45 + .../storage/WriteWriteConflictResolver.scala | 438 +++++++ .../core/storage/hbase/AsynchbaseStorage.scala | 764 +----------- .../hbase/AsynchbaseStorageManagement.scala | 263 ++++ .../hbase/AsynchbaseStorageReadable.scala | 335 +++++ .../storage/hbase/AsynchbaseStorageSerDe.scala | 68 ++ .../hbase/AsynchbaseStorageWritable.scala | 118 ++ .../core/storage/serde/Deserializable.scala | 41 + .../core/storage/serde/Serializable.scala | 27 + .../storage/serde/StorageDeserializable.scala | 144 +++ .../storage/serde/StorageSerializable.scala | 90 ++ .../tall/IndexEdgeDeserializable.scala | 12 +- .../indexedge/tall/IndexEdgeSerializable.scala | 5 +- .../wide/IndexEdgeDeserializable.scala | 4 +- .../indexedge/wide/IndexEdgeSerializable.scala | 5 +- .../tall/SnapshotEdgeDeserializable.scala | 7 +- .../tall/SnapshotEdgeSerializable.scala | 4 +- .../wide/SnapshotEdgeDeserializable.scala | 9 +- .../wide/SnapshotEdgeSerializable.scala | 4 +- .../serde/vertex/VertexDeserializable.scala | 146 +-- .../serde/vertex/VertexSerializable.scala | 114 +- .../vertex/tall/VertexDeserializable.scala | 58 + .../serde/vertex/tall/VertexSerializable.scala | 54 + .../vertex/wide/VertexDeserializable.scala | 73 ++ .../serde/vertex/wide/VertexSerializable.scala | 52 + .../apache/s2graph/core/utils/DeferCache.scala | 8 +- .../s2graph/core/Integrate/CrudTest.scala | 8 +- .../LabelLabelIndexMutateOptionTest.scala | 6 +- .../s2graph/core/storage/StorageIOTest.scala | 59 + .../core/storage/hbase/IndexEdgeTest.scala | 3 +- .../core/storage/rocks/RocksStorageTest.scala | 33 + .../rest/play/controllers/EdgeController.scala | 11 +- .../play/controllers/VertexController.scala | 5 +- 51 files changed, 2913 insertions(+), 2326 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 9f295f2..ad5a5e9 100644 --- a/.gitignore +++ b/.gitignore @@ -107,4 +107,4 @@ server.pid .cache ### Local Embedded HBase Data ### -storage/ +#storage/ http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/build.sbt ---------------------------------------------------------------------- diff --git a/s2core/build.sbt b/s2core/build.sbt index 8033581..e7e602f 100644 --- a/s2core/build.sbt +++ b/s2core/build.sbt @@ -48,7 +48,8 @@ libraryDependencies ++= Seq( "org.specs2" %% "specs2-core" % specs2Version % "test", "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion , "org.apache.lucene" % "lucene-core" % "6.6.0", - "org.apache.lucene" % "lucene-queryparser" % "6.6.0" + "org.apache.lucene" % "lucene-queryparser" % "6.6.0", + "org.rocksdb" % "rocksdbjni" % "5.8.0" ) libraryDependencies := { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/Management.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala index a9741d2..49d3c0e 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala @@ -21,6 +21,7 @@ package org.apache.s2graph.core import java.util +import com.typesafe.config.{Config, ConfigFactory} import org.apache.s2graph.core.GraphExceptions.{InvalidHTableException, LabelAlreadyExistException, LabelNameTooLongException, LabelNotExistException} import org.apache.s2graph.core.Management.JsonModel.{Index, Prop} import org.apache.s2graph.core.mysqls._ @@ -36,9 +37,22 @@ import scala.util.Try * s2core never use this for finding models. */ object Management { - + import HBaseType._ import scala.collection.JavaConversions._ + val ZookeeperQuorum = "hbase.zookeeper.quorum" + val ColumnFamilies = "hbase.table.column.family" + val RegionMultiplier = "hbase.table.region.multiplier" + val Ttl = "hbase.table.ttl" + val CompressionAlgorithm = "hbase.table.compression.algorithm" + val ReplicationScope = "hbase.table.replication.scope" + val TotalRegionCount = "hbase.table.total.region.count" + + val DefaultColumnFamilies = Seq("e", "v") + val DefaultCompressionAlgorithm = "gz" + val LABEL_NAME_MAX_LENGTH = 100 + + def newProp(name: String, defaultValue: String, datatType: String): Prop = { new Prop(name, defaultValue, datatType) } @@ -56,10 +70,6 @@ object Management { case class Index(name: String, propNames: Seq[String], direction: Option[Int] = None, options: Option[String] = None) } - import HBaseType._ - - val LABEL_NAME_MAX_LENGTH = 100 - val DefaultCompressionAlgorithm = "gz" def findService(serviceName: String) = { Service.findByName(serviceName, useCache = false) @@ -263,6 +273,24 @@ object Management { Label.updateName(tempLabel, rightLabel) } } + def toConfig(params: Map[String, Any]): Config = { + import scala.collection.JavaConversions._ + + val filtered = params.filter { case (k, v) => + v match { + case None => false + case _ => true + } + }.map { case (k, v) => + val newV = v match { + case Some(value) => value + case _ => v + } + k -> newV + } + + ConfigFactory.parseMap(filtered) + } } class Management(graph: S2Graph) { @@ -277,7 +305,15 @@ class Management(graph: S2Graph) { compressionAlgorithm: String = DefaultCompressionAlgorithm, replicationScopeOpt: Option[Int] = None, totalRegionCount: Option[Int] = None): Unit = { - graph.defaultStorage.createTable(zkAddr, tableName, cfs, regionMultiplier, ttl, compressionAlgorithm, replicationScopeOpt, totalRegionCount) + val config = toConfig(Map( + ZookeeperQuorum -> zkAddr, +// ColumnFamilies -> cfs, + RegionMultiplier -> regionMultiplier, + Ttl -> ttl, + CompressionAlgorithm -> compressionAlgorithm, + TotalRegionCount -> totalRegionCount + )) + graph.defaultStorage.createTable(config, tableName) } @@ -299,8 +335,15 @@ class Management(graph: S2Graph) { Model withTx { implicit session => val service = Service.findOrInsert(serviceName, cluster, hTableName, preSplitSize, hTableTTL.orElse(Some(Integer.MAX_VALUE)), compressionAlgorithm, useCache = false) + val config = toConfig(Map( + ZookeeperQuorum -> service.cluster, +// ColumnFamilies -> List("e", "v"), + RegionMultiplier -> service.preSplitSize, + Ttl -> service.hTableTTL, + CompressionAlgorithm -> compressionAlgorithm + )) /* create hbase table for service */ - graph.getStorage(service).createTable(service.cluster, service.hTableName, List("e", "v"), service.preSplitSize, service.hTableTTL, compressionAlgorithm) + graph.getStorage(service).createTable(config, service.hTableName) service } } @@ -390,7 +433,14 @@ class Management(graph: S2Graph) { /* create hbase table */ val storage = graph.getStorage(newLabel) val service = newLabel.service - storage.createTable(service.cluster, newLabel.hbaseTableName, List("e", "v"), service.preSplitSize, newLabel.hTableTTL, newLabel.compressionAlgorithm) + val config = toConfig(Map( + ZookeeperQuorum -> service.cluster, +// ColumnFamilies -> List("e", "v"), + RegionMultiplier -> service.preSplitSize, + Ttl -> newLabel.hTableTTL, + CompressionAlgorithm -> newLabel.compressionAlgorithm + )) + storage.createTable(config, newLabel.hbaseTableName) newLabel } @@ -449,7 +499,9 @@ class Management(graph: S2Graph) { labelOpt.map { label => val storage = graph.getStorage(label) val zkAddr = label.service.cluster - storage.truncateTable(zkAddr, label.hbaseTableName) + + val config = toConfig(Map(ZookeeperQuorum -> zkAddr)) + storage.truncateTable(config, label.hbaseTableName) } } } @@ -459,7 +511,9 @@ class Management(graph: S2Graph) { labelOpt.map { label => val storage = graph.getStorage(label) val zkAddr = label.service.cluster - storage.deleteTable(zkAddr, label.hbaseTableName) + + val config = toConfig(Map(ZookeeperQuorum -> zkAddr)) + storage.deleteTable(config, label.hbaseTableName) } } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala index 7e95f58..1100f6c 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryParam.scala @@ -26,7 +26,7 @@ import org.apache.s2graph.core.GraphExceptions.LabelNotExistException import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta} import org.apache.s2graph.core.parsers.{Where, WhereParser} import org.apache.s2graph.core.rest.TemplateHelper -import org.apache.s2graph.core.storage.StorageSerializable._ +import org.apache.s2graph.core.storage.serde.StorageSerializable._ import org.apache.s2graph.core.types._ import org.apache.tinkerpop.gremlin.process.traversal.step.util.HasContainer import org.hbase.async.ColumnRangeFilter http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala index 7165579..51af831 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala @@ -684,7 +684,7 @@ case class S2Edge(innerGraph: S2Graph, // should we delete related edges also? val future = innerGraph.mutateEdges(Seq(edgeToDelete), withWait = true) val mutateSuccess = Await.result(future, innerGraph.WaitTimeout) - if (!mutateSuccess.forall(identity)) throw new RuntimeException("edge remove failed.") + if (!mutateSuccess.forall(_.isSuccess)) throw new RuntimeException("edge remove failed.") } else { throw Edge.Exceptions.edgeRemovalNotSupported() } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala index d1eda5e..32724b4 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala @@ -22,18 +22,17 @@ package org.apache.s2graph.core import java.util import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} import java.util.concurrent.{Executors, TimeUnit} -import java.util.function.Consumer - import com.typesafe.config.{Config, ConfigFactory} import org.apache.commons.configuration.{BaseConfiguration, Configuration} -import org.apache.s2graph.core.GraphExceptions.{FetchTimeoutException, LabelNotExistException} +import org.apache.s2graph.core.GraphExceptions.LabelNotExistException import org.apache.s2graph.core.JSONParser._ import org.apache.s2graph.core.features.S2GraphVariables -import org.apache.s2graph.core.index.{IndexProvider, LuceneIndexProvider} +import org.apache.s2graph.core.index.IndexProvider import org.apache.s2graph.core.io.tinkerpop.optimize.S2GraphStepStrategy import org.apache.s2graph.core.mysqls._ import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage -import org.apache.s2graph.core.storage.{SKeyValue, Storage} +import org.apache.s2graph.core.storage.rocks.RocksStorage +import org.apache.s2graph.core.storage.{ MutateResponse, SKeyValue, Storage} import org.apache.s2graph.core.types._ import org.apache.s2graph.core.utils.{DeferCache, Extensions, logger} import org.apache.tinkerpop.gremlin.process.computer.GraphComputer @@ -44,8 +43,6 @@ import org.apache.tinkerpop.gremlin.structure.Graph.{Features, Variables} import org.apache.tinkerpop.gremlin.structure.io.{GraphReader, GraphWriter, Io, Mapper} import org.apache.tinkerpop.gremlin.structure.{Edge, Element, Graph, T, Transaction, Vertex} import play.api.libs.json.{JsObject, Json} -import scalikejdbc.DBSession - import scala.annotation.tailrec import scala.collection.JavaConversions._ import scala.collection.mutable @@ -137,12 +134,13 @@ object S2Graph { new S2Graph(configuration)(ec) } - def initStorage(graph: S2Graph, config: Config)(ec: ExecutionContext): Storage[_, _] = { + def initStorage(graph: S2Graph, config: Config)(ec: ExecutionContext): Storage[_] = { val storageBackend = config.getString("s2graph.storage.backend") logger.info(s"[InitStorage]: $storageBackend") storageBackend match { - case "hbase" => new AsynchbaseStorage(graph, config)(ec) + case "hbase" => new AsynchbaseStorage(graph, config) + case "rocks" => new RocksStorage(graph, config) case _ => throw new RuntimeException("not supported storage.") } } @@ -912,7 +910,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph /** * TODO: we need to some way to handle malformed configuration for storage. */ - val storagePool: scala.collection.mutable.Map[String, Storage[_, _]] = { + val storagePool: scala.collection.mutable.Map[String, Storage[_]] = { val labels = Label.findAll() val services = Service.findAll() @@ -923,12 +921,12 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph confWithFallback(conf) }.toSet - val pools = new java.util.HashMap[Config, Storage[_, _]]() + val pools = new java.util.HashMap[Config, Storage[_]]() configs.foreach { config => pools.put(config, S2Graph.initStorage(this, config)(ec)) } - val m = new java.util.concurrent.ConcurrentHashMap[String, Storage[_, _]]() + val m = new java.util.concurrent.ConcurrentHashMap[String, Storage[_]]() labels.foreach { label => if (label.storageConfigOpt.isDefined) { @@ -945,7 +943,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph m } - val defaultStorage: Storage[_, _] = S2Graph.initStorage(this, config)(ec) + val defaultStorage: Storage[_] = S2Graph.initStorage(this, config)(ec) /** QueryLevel FutureCache */ val queryFutureCache = new DeferCache[StepResult, Promise, Future](parseCacheConfig(config, "query."), empty = StepResult.Empty) @@ -957,11 +955,11 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph val indexProvider = IndexProvider.apply(config) - def getStorage(service: Service): Storage[_, _] = { + def getStorage(service: Service): Storage[_] = { storagePool.getOrElse(s"service:${service.serviceName}", defaultStorage) } - def getStorage(label: Label): Storage[_, _] = { + def getStorage(label: Label): Storage[_] = { storagePool.getOrElse(s"label:${label.label}", defaultStorage) } @@ -979,8 +977,8 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph val futures = for { edge <- edges } yield { - fetchSnapshotEdge(edge).map { case (queryParam, edgeOpt, kvOpt) => - edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0, queryParam.label)) + getStorage(edge.innerLabel).fetchSnapshotEdgeInner(edge).map { case (_, edgeOpt, _) => + edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0, edge.innerLabel)) } } @@ -1147,39 +1145,33 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph fallback } get } + + def getVertices(vertices: Seq[S2Vertex]): Future[Seq[S2Vertex]] = { + def getVertices[Q](storage: Storage[Q])(vertices: Seq[S2Vertex]): Future[Seq[S2Vertex]] = { + def fromResult(kvs: Seq[SKeyValue], + version: String): Option[S2Vertex] = { + if (kvs.isEmpty) None + else storage.vertexDeserializer(version).fromKeyValues(kvs, None) + // .map(S2Vertex(graph, _)) + } - - def fetchSnapshotEdge(edge: S2Edge): Future[(QueryParam, Option[S2Edge], Option[SKeyValue])] = { - /* TODO: Fix this. currently fetchSnapshotEdge should not use future cache - * so use empty cacheKey. - * */ - val queryParam = QueryParam(labelName = edge.innerLabel.label, - direction = GraphUtil.fromDirection(edge.labelWithDir.dir), - tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal), - cacheTTLInMillis = -1) - val q = Query.toQuery(Seq(edge.srcVertex), Seq(queryParam)) - val queryRequest = QueryRequest(q, 0, edge.srcVertex, queryParam) - - val storage = getStorage(edge.innerLabel) - storage.fetchSnapshotEdgeKeyValues(queryRequest).map { kvs => - val (edgeOpt, kvOpt) = - if (kvs.isEmpty) (None, None) - else { - val snapshotEdgeOpt = storage.toSnapshotEdge(kvs.head, queryRequest, isInnerCall = true, parentEdges = Nil) - val _kvOpt = kvs.headOption - (snapshotEdgeOpt, _kvOpt) + val futures = vertices.map { vertex => + val queryParam = QueryParam.Empty + val q = Query.toQuery(Seq(vertex), Seq(queryParam)) + val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam) + val rpc = storage.buildRequest(queryRequest, vertex) + storage.fetchKeyValues(rpc).map { kvs => + fromResult(kvs, vertex.serviceColumn.schemaVersion) + } recoverWith { case ex: Throwable => + Future.successful(None) } - (queryParam, edgeOpt, kvOpt) - } recoverWith { case ex: Throwable => - logger.error(s"fetchQueryParam failed. fallback return.", ex) - throw FetchTimeoutException(s"${edge.toLogString}") - } - } + } - def getVertices(vertices: Seq[S2Vertex]): Future[Seq[S2Vertex]] = { + Future.sequence(futures).map { result => result.toList.flatten } + } val verticesWithIdx = vertices.zipWithIndex val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) => - getStorage(service).getVertices(vertexGroup.map(_._1)).map(_.zip(vertexGroup.map(_._2))) + getVertices(getStorage(service))(vertexGroup.map(_._1)).map(_.zip(vertexGroup.map(_._2))) } Future.sequence(futures).map { ls => @@ -1221,8 +1213,9 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph } def fetchAndDeleteAll(queries: Seq[Query], requestTs: Long): Future[(Boolean, Boolean)] = { + val futures = queries.map(getEdgesStepInner(_, true)) val future = for { - stepInnerResultLs <- Future.sequence(queries.map(getEdgesStepInner(_, true))) + stepInnerResultLs <- Future.sequence(futures) (allDeleted, ret) <- deleteAllFetchedEdgesLs(stepInnerResultLs, requestTs) } yield { // logger.debug(s"fetchAndDeleteAll: ${allDeleted}, ${ret}") @@ -1241,6 +1234,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph def deleteAllFetchedEdgesLs(stepInnerResultLs: Seq[StepResult], requestTs: Long): Future[(Boolean, Boolean)] = { stepInnerResultLs.foreach { stepInnerResult => + logger.error(s"[!!!!!!]: ${stepInnerResult.edgeWithScores.size}") if (stepInnerResult.isFailure) throw new RuntimeException("fetched result is fallback.") } val futures = for { @@ -1257,9 +1251,9 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph * read: snapshotEdge on queryResult = O(N) * write: N x (relatedEdges x indices(indexedEdge) + 1(snapshotEdge)) */ - mutateEdges(deleteStepInnerResult.edgeWithScores.map(_.edge), withWait = true).map(_.forall(identity)) + mutateEdges(deleteStepInnerResult.edgeWithScores.map(_.edge), withWait = true).map(_.forall(_.isSuccess)) } else { - getStorage(label).deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum) + deleteAllFetchedEdgesAsyncOld(getStorage(label))(deleteStepInnerResult, requestTs, MaxRetryNum) } case _ => @@ -1267,7 +1261,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph * read: x * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices) */ - getStorage(label).deleteAllFetchedEdgesAsyncOld(deleteStepInnerResult, requestTs, MaxRetryNum) + deleteAllFetchedEdgesAsyncOld(getStorage(label))(deleteStepInnerResult, requestTs, MaxRetryNum) } ret } @@ -1280,6 +1274,44 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph } } + private def deleteAllFetchedEdgesAsyncOld(storage: Storage[_])(stepInnerResult: StepResult, + requestTs: Long, + retryNum: Int): Future[Boolean] = { + if (stepInnerResult.isEmpty) Future.successful(true) + else { + val head = stepInnerResult.edgeWithScores.head + val zkQuorum = head.edge.innerLabel.hbaseZkAddr + val futures = for { + edgeWithScore <- stepInnerResult.edgeWithScores + } yield { + val edge = edgeWithScore.edge + val score = edgeWithScore.score + + val edgeSnapshot = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs())) + val reversedSnapshotEdgeMutations = storage.snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put)) + + val edgeForward = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs())) + val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge => + storage.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ + storage.buildIncrementsAsync(indexEdge, -1L) + } + + /* reverted direction */ + val edgeRevert = edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs())) + val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge => + storage.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++ + storage.buildIncrementsAsync(indexEdge, -1L) + } + + val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations + + storage.writeToStorage(zkQuorum, mutations, withWait = true) + } + + Future.sequence(futures).map { rets => rets.forall(_.isSuccess) } + } + } + def buildEdgesToDelete(stepInnerResult: StepResult, requestTs: Long): StepResult = { val filtered = stepInnerResult.edgeWithScores.filter { edgeWithScore => (edgeWithScore.edge.ts < requestTs) && !edgeWithScore.edge.isDegree @@ -1297,20 +1329,6 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph case _ => edge.copyEdge(propsWithTs = S2Edge.propsToState(edge.updatePropsWithTs()), ts = requestTs) } -// val (newOp, newVersion, newPropsWithTs) = label.consistencyLevel match { -// case "strong" => -// val edge = edgeWithScore.edge -// edge.property(LabelMeta.timestamp.name, requestTs) -// val _newPropsWithTs = edge.updatePropsWithTs() -// -// (GraphUtil.operations("delete"), requestTs, _newPropsWithTs) -// case _ => -// val oldEdge = edgeWithScore.edge -// (oldEdge.op, oldEdge.version, oldEdge.updatePropsWithTs()) -// } -// -// val copiedEdge = -// edgeWithScore.edge.copy(op = newOp, version = newVersion, propsWithTs = newPropsWithTs) val edgeToDelete = edgeWithScore.copy(edge = copiedEdge) // logger.debug(s"delete edge from deleteAll: ${edgeToDelete.edge.toLogString}") @@ -1321,11 +1339,9 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph } } - // def deleteAllAdjacentEdges(srcVertices: List[Vertex], labels: Seq[Label], dir: Int, ts: Long): Future[Boolean] = - // storage.deleteAllAdjacentEdges(srcVertices, labels, dir, ts) def mutateElements(elements: Seq[GraphElement], - withWait: Boolean = false): Future[Seq[Boolean]] = { + withWait: Boolean = false): Future[Seq[MutateResponse]] = { val edgeBuffer = ArrayBuffer[(S2Edge, Int)]() val vertexBuffer = ArrayBuffer[(S2Vertex, Int)]() @@ -1355,7 +1371,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph // def mutateEdges(edges: Seq[Edge], withWait: Boolean = false): Future[Seq[Boolean]] = storage.mutateEdges(edges, withWait) - def mutateEdges(edges: Seq[S2Edge], withWait: Boolean = false): Future[Seq[Boolean]] = { + def mutateEdges(edges: Seq[S2Edge], withWait: Boolean = false): Future[Seq[MutateResponse]] = { val edgeWithIdxs = edges.zipWithIndex val (strongEdges, weakEdges) = @@ -1383,7 +1399,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph } storage.writeToStorage(zkQuorum, mutations, withWait).map { ret => - idxs.map(idx => idx -> ret) + idxs.map(idx => idx -> ret.isSuccess) } } Future.sequence(futures) @@ -1398,7 +1414,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph val edges = edgeGroup.map(_._1) val idxs = edgeGroup.map(_._2) val storage = getStorage(label) - storage.mutateStrongEdges(edges, withWait = true).map { rets => + mutateStrongEdges(storage)(edges, withWait = true).map { rets => idxs.zip(rets) } } @@ -1408,27 +1424,130 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph deleteAll <- Future.sequence(deleteAllFutures) strong <- Future.sequence(strongEdgesFutures) } yield { - (deleteAll ++ weak.flatten.flatten ++ strong.flatten).sortBy(_._1).map(_._2) + (deleteAll ++ weak.flatten.flatten ++ strong.flatten).sortBy(_._1).map(r => new MutateResponse(r._2)) } } - def mutateVertices(vertices: Seq[S2Vertex], withWait: Boolean = false): Future[Seq[Boolean]] = { + private def mutateStrongEdges(storage: Storage[_])(_edges: Seq[S2Edge], withWait: Boolean): Future[Seq[Boolean]] = { + + val edgeWithIdxs = _edges.zipWithIndex + val grouped = edgeWithIdxs.groupBy { case (edge, idx) => + (edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId) + } toSeq + + val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) => + val edges = edgeGroup.map(_._1) + val idxs = edgeGroup.map(_._2) + // After deleteAll, process others + val mutateEdgeFutures = edges.toList match { + case head :: tail => + val edgeFuture = mutateEdgesInner(storage)(edges, checkConsistency = true , withWait) + + //TODO: decide what we will do on failure on vertex put + val puts = storage.buildVertexPutsAsync(head) + val vertexFuture = storage.writeToStorage(head.innerLabel.hbaseZkAddr, puts, withWait) + Seq(edgeFuture, vertexFuture) + case Nil => Nil + } + + val composed = for { + // deleteRet <- Future.sequence(deleteAllFutures) + mutateRet <- Future.sequence(mutateEdgeFutures) + } yield mutateRet + + composed.map(_.forall(_.isSuccess)).map { ret => idxs.map( idx => idx -> ret) } + } + + Future.sequence(mutateEdges).map { squashedRets => + squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2) + } + } + + + private def mutateEdgesInner(storage: Storage[_])(edges: Seq[S2Edge], + checkConsistency: Boolean, + withWait: Boolean): Future[MutateResponse] = { + assert(edges.nonEmpty) + // TODO:: remove after code review: unreachable code + if (!checkConsistency) { + + val zkQuorum = edges.head.innerLabel.hbaseZkAddr + val futures = edges.map { edge => + val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge)) + + val (bufferIncr, nonBufferIncr) = storage.increments(edgeUpdate.deepCopy) + val mutations = + storage.indexedEdgeMutations(edgeUpdate.deepCopy) ++ storage.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr + + if (bufferIncr.nonEmpty) storage.writeToStorage(zkQuorum, bufferIncr, withWait = false) + + storage.writeToStorage(zkQuorum, mutations, withWait) + } + Future.sequence(futures).map { rets => new MutateResponse(rets.forall(_.isSuccess)) } + } else { + storage.fetchSnapshotEdgeInner(edges.head).flatMap { case (queryParam, snapshotEdgeOpt, kvOpt) => + storage.retry(1)(edges, 0, snapshotEdgeOpt).map(new MutateResponse(_)) + } + } + } + + def mutateVertices(vertices: Seq[S2Vertex], withWait: Boolean = false): Future[Seq[MutateResponse]] = { + def mutateVertex(storage: Storage[_])(vertex: S2Vertex, withWait: Boolean): Future[MutateResponse] = { + if (vertex.op == GraphUtil.operations("delete")) { + storage.writeToStorage(vertex.hbaseZkAddr, + storage.vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait) + } else if (vertex.op == GraphUtil.operations("deleteAll")) { + logger.info(s"deleteAll for vertex is truncated. $vertex") + Future.successful(MutateResponse.Success) // Ignore withWait parameter, because deleteAll operation may takes long time + } else { + storage.writeToStorage(vertex.hbaseZkAddr, storage.buildPutsAll(vertex), withWait) + } + } + + def mutateVertices(storage: Storage[_])(vertices: Seq[S2Vertex], + withWait: Boolean = false): Future[Seq[MutateResponse]] = { + val futures = vertices.map { vertex => mutateVertex(storage)(vertex, withWait) } + Future.sequence(futures) + } + val verticesWithIdx = vertices.zipWithIndex val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) => - getStorage(service).mutateVertices(vertexGroup.map(_._1), withWait).map(_.zip(vertexGroup.map(_._2))) + mutateVertices(getStorage(service))(vertexGroup.map(_._1), withWait).map(_.zip(vertexGroup.map(_._2))) } Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) } } - def incrementCounts(edges: Seq[S2Edge], withWait: Boolean): Future[Seq[(Boolean, Long, Long)]] = { + + + def incrementCounts(edges: Seq[S2Edge], withWait: Boolean): Future[Seq[MutateResponse]] = { + def incrementCounts(storage: Storage[_])(edges: Seq[S2Edge], withWait: Boolean): Future[Seq[MutateResponse]] = { + val futures = for { + edge <- edges + } yield { + val kvs = for { + relEdge <- edge.relatedEdges + edgeWithIndex <- EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid) + } yield { + val countWithTs = edge.propertyValueInner(LabelMeta.count) + val countVal = countWithTs.innerVal.toString().toLong + storage.buildIncrementsCountAsync(edgeWithIndex, countVal).head + } + storage.writeToStorage(edge.innerLabel.hbaseZkAddr, kvs, withWait = withWait) + } + + Future.sequence(futures) + } + val edgesWithIdx = edges.zipWithIndex val futures = edgesWithIdx.groupBy { case (e, idx) => e.innerLabel }.map { case (label, edgeGroup) => - getStorage(label).incrementCounts(edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2))) + incrementCounts(getStorage(label))(edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2))) + } + Future.sequence(futures).map { ls => + ls.flatten.toSeq.sortBy(_._2).map(_._1) } - Future.sequence(futures).map { ls => ls.flatten.toSeq.sortBy(_._2).map(_._1) } } - def updateDegree(edge: S2Edge, degreeVal: Long = 0): Future[Boolean] = { + def updateDegree(edge: S2Edge, degreeVal: Long = 0): Future[MutateResponse] = { val label = edge.innerLabel val storage = getStorage(label) @@ -1840,7 +1959,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph val vertex = newVertex(id, ts, props, op, belongLabelIds) val future = mutateVertices(Seq(vertex), withWait = true).map { rets => - if (rets.forall(identity)) vertex + if (rets.forall(_.isSuccess)) vertex else throw new RuntimeException("addVertex failed.") } Await.ready(future, WaitTimeout) @@ -1850,7 +1969,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends Graph def addVertexInner(vertex: S2Vertex): S2Vertex = { val future = mutateVertices(Seq(vertex), withWait = true).flatMap { rets => - if (rets.forall(identity)) { + if (rets.forall(_.isSuccess)) { indexProvider.mutateVerticesAsync(Seq(vertex)) } else throw new RuntimeException("addVertex failed.") } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala index c0dc23b..177d859 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Vertex.scala @@ -169,6 +169,29 @@ case class S2Vertex(graph: S2Graph, graph.fetchEdges(this, labelNameWithDirs.distinct) } + private def edgesAsync(direction: Direction, labelNames: String*): Future[util.Iterator[Edge]] = { + val labelNameWithDirs = + if (labelNames.isEmpty) { + // TODO: Let's clarify direction + if (direction == Direction.BOTH) { + Label.findBySrcColumnId(id.colId).map(l => l.label -> Direction.OUT.name) ++ + Label.findByTgtColumnId(id.colId).map(l => l.label -> Direction.IN.name) + } else if (direction == Direction.IN) { + Label.findByTgtColumnId(id.colId).map(l => l.label -> direction.name) + } else { + Label.findBySrcColumnId(id.colId).map(l => l.label -> direction.name) + } + } else { + direction match { + case Direction.BOTH => + Seq(Direction.OUT, Direction.IN).flatMap { dir => labelNames.map(_ -> dir.name()) } + case _ => labelNames.map(_ -> direction.name()) + } + } + + graph.fetchEdgesAsync(this, labelNameWithDirs.distinct) + } + // do no save to storage def propertyInner[V](cardinality: Cardinality, key: String, value: V, objects: AnyRef*): VertexProperty[V] = { S2Property.assertValidProp(key, value) @@ -242,21 +265,18 @@ case class S2Vertex(graph: S2Graph, // remove edge // TODO: remove related edges also. implicit val ec = graph.ec - val ts = System.currentTimeMillis() - val outLabels = Label.findBySrcColumnId(id.colId) - val inLabels = Label.findByTgtColumnId(id.colId) + val verticesToDelete = Seq(this.copy(op = GraphUtil.operations("delete"))) - val outFuture = graph.deleteAllAdjacentEdges(verticesToDelete, outLabels, GraphUtil.directions("out"), ts) - val inFuture = graph.deleteAllAdjacentEdges(verticesToDelete, inLabels, GraphUtil.directions("in"), ts) + val vertexFuture = graph.mutateVertices(verticesToDelete, withWait = true) + val future = for { - outSuccess <- outFuture - inSuccess <- inFuture vertexSuccess <- vertexFuture + edges <- edgesAsync(Direction.BOTH) } yield { - if (!outSuccess) throw new RuntimeException("Vertex.remove out direction edge delete failed.") - if (!inSuccess) throw new RuntimeException("Vertex.remove in direction edge delete failed.") - if (!vertexSuccess.forall(identity)) throw new RuntimeException("Vertex.remove vertex delete failed.") + edges.asScala.toSeq.foreach { edge => edge.remove() } + if (!vertexSuccess.forall(_.isSuccess)) throw new RuntimeException("Vertex.remove vertex delete failed.") + true } Await.result(future, graph.WaitTimeout) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/Deserializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Deserializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Deserializable.scala deleted file mode 100644 index af20483..0000000 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Deserializable.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.core.storage - -import org.apache.hadoop.hbase.util.Bytes -import org.apache.s2graph.core.types.{HBaseType, LabelWithDirection, SourceVertexId, VertexId} - - -trait Deserializable[E] extends StorageDeserializable[E] { - import StorageDeserializable._ - - type RowKeyRaw = (VertexId, LabelWithDirection, Byte, Boolean, Int) - -// /** version 1 and version 2 share same code for parsing row key part */ -// def parseRow(kv: SKeyValue, version: String = HBaseType.DEFAULT_VERSION): RowKeyRaw = { -// var pos = 0 -// val (srcVertexId, srcIdLen) = SourceVertexId.fromBytes(kv.row, pos, kv.row.length, version) -// pos += srcIdLen -// val labelWithDir = LabelWithDirection(Bytes.toInt(kv.row, pos, 4)) -// pos += 4 -// val (labelIdxSeq, isInverted) = bytesToLabelIndexSeqWithIsInverted(kv.row, pos) -// -// val rowLen = srcIdLen + 4 + 1 -// (srcVertexId, labelWithDir, labelIdxSeq, isInverted, rowLen) -// } -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/MutateResponse.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/MutateResponse.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/MutateResponse.scala new file mode 100644 index 0000000..bed1152 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/MutateResponse.scala @@ -0,0 +1,12 @@ +package org.apache.s2graph.core.storage + +object MutateResponse { + val Success = new MutateResponse(isSuccess = true) + val Failure = new MutateResponse(isSuccess = false) + val IncrementFailure = new IncrementResponse(isSuccess = false, -1, -1) + val IncrementSuccess = new IncrementResponse(isSuccess = true, -1, -1) +} + +class MutateResponse(val isSuccess: Boolean) + +case class IncrementResponse(override val isSuccess: Boolean, afterCount: Long, beforeCount: Long) extends MutateResponse(isSuccess) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala index db9a9da..924d9a3 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/SKeyValue.scala @@ -26,6 +26,7 @@ import org.hbase.async.KeyValue object SKeyValue { val EdgeCf = "e".getBytes("UTF-8") + val VertexCf = "v".getBytes("UTF-8") val Put = 1 val Delete = 2 val Increment = 3 http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/39544dc5/s2core/src/main/scala/org/apache/s2graph/core/storage/Serializable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Serializable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Serializable.scala deleted file mode 100644 index 6de0b30..0000000 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Serializable.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.core.storage - -object Serializable { - val vertexCf = "v".getBytes("UTF-8") - val edgeCf = "e".getBytes("UTF-8") -} - -trait Serializable[E] extends StorageSerializable[E]
