Separate Storage into multiple small interfaces such as EdgeFetcher, VertexMutator, etc.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/43f627e5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/43f627e5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/43f627e5 Branch: refs/heads/master Commit: 43f627e551fd0b744a858c9e6e7feba7fd68e58c Parents: 2357d81 Author: DO YUNG YOON <[email protected]> Authored: Wed May 9 15:58:19 2018 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Wed May 9 16:56:37 2018 +0900 ---------------------------------------------------------------------- .../apache/s2graph/core/EdgeBulkFetcher.scala | 28 ++ .../org/apache/s2graph/core/EdgeFetcher.scala | 35 ++ .../org/apache/s2graph/core/EdgeMutator.scala | 38 ++ .../scala/org/apache/s2graph/core/Fetcher.scala | 36 -- .../org/apache/s2graph/core/Management.scala | 4 +- .../scala/org/apache/s2graph/core/Mutator.scala | 41 --- .../scala/org/apache/s2graph/core/S2Graph.scala | 51 ++- .../org/apache/s2graph/core/S2GraphLike.scala | 25 +- .../apache/s2graph/core/TraversalHelper.scala | 6 +- .../apache/s2graph/core/VertexBulkFetcher.scala | 26 ++ .../org/apache/s2graph/core/VertexFetcher.scala | 31 ++ .../org/apache/s2graph/core/VertexMutator.scala | 28 ++ .../s2graph/core/fetcher/FetcherManager.scala | 106 ++++++ .../core/fetcher/MemoryModelEdgeFetcher.scala | 54 +++ .../s2graph/core/model/ImportStatus.scala | 59 --- .../apache/s2graph/core/model/Importer.scala | 122 ------ .../s2graph/core/model/MemoryModelFetcher.scala | 59 --- .../s2graph/core/model/ModelManager.scala | 103 ------ .../core/storage/DefaultOptimisticMutator.scala | 190 ++++++++++ .../core/storage/OptimisticEdgeFetcher.scala | 56 +++ .../core/storage/OptimisticMutator.scala | 63 ++++ .../apache/s2graph/core/storage/Storage.scala | 72 +--- .../s2graph/core/storage/StorageReadable.scala | 49 +-- .../s2graph/core/storage/StorageWritable.scala | 65 ---- 
.../storage/WriteWriteConflictResolver.scala | 6 +- .../hbase/AsynchbaseEdgeBulkFetcher.scala | 69 ++++ .../storage/hbase/AsynchbaseEdgeFetcher.scala | 120 ++++++ .../hbase/AsynchbaseOptimisticEdgeFetcher.scala | 35 ++ .../hbase/AsynchbaseOptimisticMutator.scala | 142 +++++++ .../core/storage/hbase/AsynchbaseStorage.scala | 188 +++++++++- .../hbase/AsynchbaseStorageReadable.scala | 367 ------------------- .../hbase/AsynchbaseStorageWritable.scala | 142 ------- .../hbase/AsynchbaseVertexBulkFetcher.scala | 63 ++++ .../storage/hbase/AsynchbaseVertexFetcher.scala | 61 +++ .../storage/rocks/RocksEdgeBulkFetcher.scala | 68 ++++ .../core/storage/rocks/RocksEdgeFetcher.scala | 60 +++ .../rocks/RocksOptimisticEdgeFetcher.scala | 41 +++ .../storage/rocks/RocksOptimisticMutator.scala | 133 +++++++ .../core/storage/rocks/RocksStorage.scala | 104 +++++- .../storage/rocks/RocksStorageReadable.scala | 234 ------------ .../storage/rocks/RocksStorageWritable.scala | 133 ------- .../storage/rocks/RocksVertexBulkFetcher.scala | 88 +++++ .../core/storage/rocks/RocksVertexFetcher.scala | 61 +++ .../core/storage/serde/MutationHelper.scala | 188 ---------- .../s2graph/core/utils/ImportStatus.scala | 59 +++ .../apache/s2graph/core/utils/Importer.scala | 122 ++++++ .../s2graph/core/fetcher/EdgeFetcherTest.scala | 87 +++++ .../apache/s2graph/core/model/FetcherTest.scala | 87 ----- 48 files changed, 2244 insertions(+), 1761 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/EdgeBulkFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/EdgeBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/EdgeBulkFetcher.scala new file mode 100644 index 0000000..646f5f4 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/EdgeBulkFetcher.scala 
@@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core + +import com.typesafe.config.Config + +import scala.concurrent.{ExecutionContext, Future} + +trait EdgeBulkFetcher { + def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala new file mode 100644 index 0000000..f28a161 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/EdgeFetcher.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core + +import com.typesafe.config.Config +import org.apache.s2graph.core.types.VertexId + +import scala.concurrent.{ExecutionContext, Future} + +trait EdgeFetcher { + + def init(config: Config)(implicit ec: ExecutionContext): Unit = {} + + def fetches(queryRequests: Seq[QueryRequest], + prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] + + def close(): Unit = {} +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala new file mode 100644 index 0000000..dc0099e --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/EdgeMutator.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core + +import org.apache.s2graph.core.storage.MutateResponse + +import scala.concurrent.{ExecutionContext, Future} + +trait EdgeMutator { + def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] + + def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] + + def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] + + def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] + + def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult, + requestTs: Long, + retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/Fetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Fetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/Fetcher.scala deleted file mode 100644 index 57d2f29..0000000 --- a/s2core/src/main/scala/org/apache/s2graph/core/Fetcher.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.core - -import com.typesafe.config.Config -import org.apache.s2graph.core.types.VertexId - -import scala.concurrent.{ExecutionContext, Future} - -trait Fetcher { - - def init(config: Config)(implicit ec: ExecutionContext): Future[Fetcher] = - Future.successful(this) - - def fetches(queryRequests: Seq[QueryRequest], - prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] - - def close(): Unit = {} -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/Management.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala index 7ff5a9e..9046449 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala @@ -19,7 +19,7 @@ package org.apache.s2graph.core -import java.util + import java.util.concurrent.Executors import com.typesafe.config.{Config, ConfigFactory} @@ -29,7 +29,7 @@ import org.apache.s2graph.core.schema._ import org.apache.s2graph.core.types.HBaseType._ import 
org.apache.s2graph.core.types._ import org.apache.s2graph.core.JSONParser._ -import org.apache.s2graph.core.model.Importer +import org.apache.s2graph.core.utils.Importer import play.api.libs.json._ import scala.concurrent.{ExecutionContext, Future} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/Mutator.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Mutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/Mutator.scala deleted file mode 100644 index 53161e1..0000000 --- a/s2core/src/main/scala/org/apache/s2graph/core/Mutator.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.s2graph.core - -import org.apache.s2graph.core.storage.{MutateResponse, SKeyValue} - -import scala.concurrent.{ExecutionContext, Future} - -trait Mutator { - def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] - - def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] - - def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] - - def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] - - def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] - - def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult, - requestTs: Long, - retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] -} - http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala index 4b2274a..c4cb48f 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala @@ -27,11 +27,11 @@ import com.typesafe.config.{Config, ConfigFactory} import org.apache.commons.configuration.{BaseConfiguration, Configuration} import org.apache.s2graph.core.index.IndexProvider import org.apache.s2graph.core.io.tinkerpop.optimize.S2GraphStepStrategy -import org.apache.s2graph.core.model.ModelManager +import org.apache.s2graph.core.fetcher.FetcherManager import org.apache.s2graph.core.schema._ import 
org.apache.s2graph.core.storage.hbase.AsynchbaseStorage import org.apache.s2graph.core.storage.rocks.RocksStorage -import org.apache.s2graph.core.storage.{MutateResponse, Storage} +import org.apache.s2graph.core.storage.{MutateResponse, OptimisticEdgeFetcher, Storage} import org.apache.s2graph.core.types._ import org.apache.s2graph.core.utils.{DeferCache, Extensions, logger} import org.apache.tinkerpop.gremlin.process.traversal.{P, TraversalStrategies} @@ -187,7 +187,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap override val management = new Management(this) - override val modelManager = new ModelManager(this) + override val modelManager = new FetcherManager(this) override val indexProvider = IndexProvider.apply(config) @@ -251,21 +251,34 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap } //TODO: - override def getFetcher(column: ServiceColumn): Fetcher = { - getStorage(column.service).reader + override def getVertexFetcher(column: ServiceColumn): VertexFetcher = { + getStorage(column.service).vertexFetcher + } + override def getVertexBulkFetcher: VertexBulkFetcher = { + defaultStorage.vertexBulkFetcher } - override def getFetcher(label: Label): Fetcher = { + override def getEdgeFetcher(label: Label): EdgeFetcher = { if (label.fetchConfigExist) modelManager.getFetcher(label) - else getStorage(label).reader + else getStorage(label).edgeFetcher + } + + override def getEdgeBulkFetcher: EdgeBulkFetcher = { + defaultStorage.edgeBulkFetcher } - override def getMutator(column: ServiceColumn): Mutator = { - getStorage(column.service).mutator + override def getVertexMutator(column: ServiceColumn): VertexMutator = { + getStorage(column.service).vertexMutator } - override def getMutator(label: Label): Mutator = { - getStorage(label).mutator + override def getEdgeMutator(label: Label): EdgeMutator = { + getStorage(label).edgeMutator + } + + /** optional */ + override def getOptimisticEdgeFetcher(label: 
Label): OptimisticEdgeFetcher = { +// getStorage(label).optimisticEdgeFetcher + null } //TODO: @@ -296,8 +309,8 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap override def getVertices(vertices: Seq[S2VertexLike]): Future[Seq[S2VertexLike]] = { val verticesWithIdx = vertices.zipWithIndex - val futures = verticesWithIdx.groupBy { case (v, idx) => v.service }.map { case (service, vertexGroup) => - getStorage(service).fetchVertices(vertices).map(_.zip(vertexGroup.map(_._2))) + val futures = verticesWithIdx.groupBy { case (v, idx) => v.serviceColumn }.map { case (serviceColumn, vertexGroup) => + getVertexFetcher(serviceColumn).fetchVertices(vertices).map(_.zip(vertexGroup.map(_._2))) } Future.sequence(futures).map { ls => @@ -309,7 +322,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap val futures = for { edge <- edges } yield { - getStorage(edge.innerLabel).fetchSnapshotEdgeInner(edge).map { case (edgeOpt, _) => + getOptimisticEdgeFetcher(edge.innerLabel).fetchSnapshotEdgeInner(edge).map { case (edgeOpt, _) => edgeOpt.toSeq.map(e => EdgeWithScore(e, 1.0, edge.innerLabel)) } } @@ -324,7 +337,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap def mutateVertices(storage: Storage)(zkQuorum: String, vertices: Seq[S2VertexLike], withWait: Boolean = false): Future[Seq[MutateResponse]] = { val futures = vertices.map { vertex => - getMutator(vertex.serviceColumn).mutateVertex(zkQuorum, vertex, withWait) + getVertexMutator(vertex.serviceColumn).mutateVertex(zkQuorum, vertex, withWait) } Future.sequence(futures) } @@ -351,7 +364,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap val weakEdgesFutures = weakEdges.groupBy { case (edge, idx) => edge.innerLabel.hbaseZkAddr }.map { case (zkQuorum, edgeWithIdxs) => val futures = edgeWithIdxs.groupBy(_._1.innerLabel).map { case (label, edgeGroup) => - val mutator = getMutator(label) + val mutator 
= getEdgeMutator(label) val edges = edgeGroup.map(_._1) val idxs = edgeGroup.map(_._2) @@ -369,7 +382,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap val strongEdgesFutures = strongEdgesAll.groupBy { case (edge, idx) => edge.innerLabel }.map { case (label, edgeGroup) => val edges = edgeGroup.map(_._1) val idxs = edgeGroup.map(_._2) - val mutator = getMutator(label) + val mutator = getEdgeMutator(label) val zkQuorum = label.hbaseZkAddr mutator.mutateStrongEdges(zkQuorum, edges, withWait = true).map { rets => @@ -497,7 +510,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap override def incrementCounts(edges: Seq[S2EdgeLike], withWait: Boolean): Future[Seq[MutateResponse]] = { val edgesWithIdx = edges.zipWithIndex val futures = edgesWithIdx.groupBy { case (e, idx) => e.innerLabel }.map { case (label, edgeGroup) => - getMutator(label).incrementCounts(label.hbaseZkAddr, edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2))) + getEdgeMutator(label).incrementCounts(label.hbaseZkAddr, edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2))) } Future.sequence(futures).map { ls => @@ -507,7 +520,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap override def updateDegree(edge: S2EdgeLike, degreeVal: Long = 0): Future[MutateResponse] = { val label = edge.innerLabel - val mutator = getMutator(label) + val mutator = getEdgeMutator(label) mutator.updateDegree(label.hbaseZkAddr, edge, degreeVal) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala index 6ed78b0..5e2c168 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala +++ 
b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala @@ -31,9 +31,9 @@ import org.apache.s2graph.core.GraphExceptions.LabelNotExistException import org.apache.s2graph.core.S2Graph.{DefaultColumnName, DefaultServiceName} import org.apache.s2graph.core.features.{S2Features, S2GraphVariables} import org.apache.s2graph.core.index.IndexProvider -import org.apache.s2graph.core.model.ModelManager +import org.apache.s2graph.core.fetcher.FetcherManager import org.apache.s2graph.core.schema.{Label, LabelMeta, Service, ServiceColumn} -import org.apache.s2graph.core.storage.{MutateResponse, Storage} +import org.apache.s2graph.core.storage.{MutateResponse, OptimisticEdgeFetcher, Storage} import org.apache.s2graph.core.types.{InnerValLike, VertexId} import org.apache.tinkerpop.gremlin.process.computer.GraphComputer import org.apache.tinkerpop.gremlin.structure @@ -69,7 +69,7 @@ trait S2GraphLike extends Graph { val traversalHelper: TraversalHelper - val modelManager: ModelManager + val modelManager: FetcherManager lazy val MaxRetryNum: Int = config.getInt("max.retry.number") lazy val MaxBackOff: Int = config.getInt("max.back.off") @@ -93,13 +93,20 @@ trait S2GraphLike extends Graph { def getStorage(label: Label): Storage - def getFetcher(column: ServiceColumn): Fetcher + def getVertexFetcher(column: ServiceColumn): VertexFetcher - def getFetcher(label: Label): Fetcher + def getVertexBulkFetcher(): VertexBulkFetcher - def getMutator(label: Label): Mutator + def getEdgeFetcher(label: Label): EdgeFetcher - def getMutator(column: ServiceColumn): Mutator + def getEdgeBulkFetcher(): EdgeBulkFetcher + + /** optional */ + def getOptimisticEdgeFetcher(label: Label): OptimisticEdgeFetcher + + def getEdgeMutator(label: Label): EdgeMutator + + def getVertexMutator(column: ServiceColumn): VertexMutator def flushStorage(): Unit @@ -204,7 +211,7 @@ trait S2GraphLike extends Graph { if (ids.isEmpty) { //TODO: default storage need to be fixed. 
- Await.result(defaultStorage.fetchVerticesAll(), WaitTimeout).iterator + Await.result(getVertexBulkFetcher().fetchVerticesAll(), WaitTimeout).iterator } else { val vertices = ids.collect { case s2Vertex: S2VertexLike => s2Vertex @@ -229,7 +236,7 @@ trait S2GraphLike extends Graph { def edges(edgeIds: AnyRef*): util.Iterator[structure.Edge] = { if (edgeIds.isEmpty) { // FIXME - Await.result(defaultStorage.fetchEdgesAll(), WaitTimeout).iterator + Await.result(getEdgeBulkFetcher().fetchEdgesAll(), WaitTimeout).iterator } else { Await.result(edgesAsync(edgeIds: _*), WaitTimeout) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala index d19dd1f..0a4a49b 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala @@ -204,7 +204,7 @@ class TraversalHelper(graph: S2GraphLike) { val aggFuture = requestsPerLabel.foldLeft(Future.successful(Map.empty[Int, StepResult])) { case (prevFuture, (label, reqWithIdxs)) => for { prev <- prevFuture - cur <- graph.getFetcher(label).fetches(reqWithIdxs.map(_._1), prevStepEdges) + cur <- graph.getEdgeFetcher(label).fetches(reqWithIdxs.map(_._1), prevStepEdges) } yield { prev ++ reqWithIdxs.map(_._2).zip(cur).toMap } @@ -256,7 +256,7 @@ class TraversalHelper(graph: S2GraphLike) { */ graph.mutateEdges(edgesToDelete.map(_.edge), withWait = true).map(_.forall(_.isSuccess)) } else { - graph.getMutator(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum) + graph.getEdgeMutator(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum) } case _ => @@ -264,7 +264,7 @@ class TraversalHelper(graph: S2GraphLike) { * 
read: x * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices) */ - graph.getMutator(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum) + graph.getEdgeMutator(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum) } ret } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/VertexBulkFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/VertexBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/VertexBulkFetcher.scala new file mode 100644 index 0000000..cbebab5 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/VertexBulkFetcher.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.s2graph.core + +import scala.concurrent.{ExecutionContext, Future} + +trait VertexBulkFetcher { + def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala new file mode 100644 index 0000000..5c10d18 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/VertexFetcher.scala @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.s2graph.core + +import com.typesafe.config.Config +import org.apache.s2graph.core.types.VertexId + +import scala.concurrent.{ExecutionContext, Future} + +trait VertexFetcher { + def init(config: Config)(implicit ec: ExecutionContext): Unit = {} + def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] + def close(): Unit = {} +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala new file mode 100644 index 0000000..18be890 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/VertexMutator.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.s2graph.core + +import org.apache.s2graph.core.storage.MutateResponse + +import scala.concurrent.{ExecutionContext, Future} + +trait VertexMutator { + def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/fetcher/FetcherManager.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/FetcherManager.scala b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/FetcherManager.scala new file mode 100644 index 0000000..26db7ff --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/FetcherManager.scala @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.s2graph.core.fetcher
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core.schema.Label
+import org.apache.s2graph.core.utils.{Importer, logger}
+import org.apache.s2graph.core.{EdgeFetcher, S2GraphLike}
+
+import scala.concurrent.{ExecutionContext, Future}
+
+object FetcherManager {
+  /** Config key holding the fully-qualified name of the class to instantiate. */
+  val ClassNameKey = "className"
+}
+
+/**
+ * Keeps one [[EdgeFetcher]] per label and replaces it whenever an import for that label finishes.
+ *
+ * @param s2GraphLike graph instance handed to every reflectively created importer and fetcher
+ */
+class FetcherManager(s2GraphLike: S2GraphLike) {
+
+  import FetcherManager._
+
+  // Written from Future callbacks (importModel) and read by getFetcher, so it must tolerate concurrent access.
+  private val fetcherPool = scala.collection.concurrent.TrieMap.empty[String, EdgeFetcher]
+
+  // At most one in-flight import per label key; the entry is removed once that import has completed.
+  private val ImportLock = new java.util.concurrent.ConcurrentHashMap[String, Importer]
+
+  /** Key shared by the fetcher pool and the import lock: the label's name. */
+  def toImportLockKey(label: Label): String = label.label
+
+  /**
+   * @return the fetcher registered for `label`
+   * @throws IllegalStateException when no fetcher has been registered for `label`
+   */
+  def getFetcher(label: Label): EdgeFetcher = {
+    fetcherPool.getOrElse(toImportLockKey(label), throw new IllegalStateException(s"$label is not imported."))
+  }
+
+  /**
+   * Reflectively instantiates the class named by `config.className` through its
+   * single-argument `S2GraphLike` constructor and casts it to [[Importer]].
+   */
+  def initImporter(config: Config): Importer = {
+    val className = config.getString(ClassNameKey)
+
+    Class.forName(className)
+      .getConstructor(classOf[S2GraphLike])
+      .newInstance(s2GraphLike)
+      .asInstanceOf[Importer]
+  }
+
+  /**
+   * Reflectively instantiates the class named by `config.className` through its
+   * single-argument `S2GraphLike` constructor, calls `init(config)` on it and
+   * returns it in an already-completed future.
+   */
+  def initFetcher(config: Config)(implicit ec: ExecutionContext): Future[EdgeFetcher] = {
+    val className = config.getString(ClassNameKey)
+
+    val fetcher = Class.forName(className)
+      .getConstructor(classOf[S2GraphLike])
+      .newInstance(s2GraphLike)
+      .asInstanceOf[EdgeFetcher]
+
+    // NOTE(review): the result of init is not inspected here -- confirm that init completes synchronously.
+    fetcher.init(config)
+
+    Future.successful(fetcher)
+  }
+
+  /**
+   * Starts an import for `label` unless one is already in flight. When the import finishes, the
+   * fetcher described by the `fetcher` section is initialised and registered for the label, and
+   * the fetcher it replaces (if any) is closed.
+   *
+   * The returned future is already completed: it only hands back the importer that owns the
+   * (possibly still running) import.
+   *
+   * @param label  label whose fetcher is (re)built
+   * @param config must contain an `importer` and a `fetcher` section, each with a `className` entry
+   * @return the importer responsible for this label's import
+   */
+  def importModel(label: Label, config: Config)(implicit ec: ExecutionContext): Future[Importer] = {
+    val importer = ImportLock.computeIfAbsent(toImportLockKey(label), new java.util.function.Function[String, Importer] {
+      override def apply(k: String): Importer = {
+        val importer = initImporter(config.getConfig("importer"))
+
+        //TODO: Update Label's extra options.
+        importer
+          .run(config.getConfig("importer"))
+          // flatMap (not map): the lock below must be released only after the fetcher has been
+          // registered, and a failing initFetcher must not get lost inside a nested Future.
+          .flatMap { finished =>
+            logger.info(s"Close importer")
+            finished.close()
+
+            initFetcher(config.getConfig("fetcher")).map { fetcher =>
+              finished.setStatus(true)
+
+              // put swaps the fetcher atomically and hands back the one it replaced, if any.
+              fetcherPool
+                .put(k, fetcher)
+                .foreach { oldFetcher =>
+                  logger.info(s"Delete old storage ($k) => $oldFetcher")
+                  oldFetcher.close()
+                }
+            }
+          }
+          .onComplete { result =>
+            // Surface failures instead of dropping them; the lock is released either way.
+            result.failed.foreach { ex => logger.error(s"Import failed: $k", ex) }
+            logger.info(s"ImportLock release: $k")
+            ImportLock.remove(k)
+          }
+
+        importer
+      }
+    })
+
+    Future.successful(importer)
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/fetcher/MemoryModelEdgeFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/fetcher/MemoryModelEdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/MemoryModelEdgeFetcher.scala
new file mode 100644
index 0000000..bf90d69
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/fetcher/MemoryModelEdgeFetcher.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.fetcher
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.types.VertexId
+
+import scala.concurrent.{ExecutionContext, Future}
+
+/**
+ * Reference [[EdgeFetcher]] that fabricates its answer instead of reading it:
+ * for every query request it returns the same ten constant edges.
+ */
+class MemoryModelEdgeFetcher(val graph: S2GraphLike) extends EdgeFetcher {
+  val builder = graph.elementBuilder
+  val ranges = (0 until 10)
+
+  /**
+   * Answers each request with one edge per entry of `ranges`, going from the request's vertex
+   * to a target vertex whose id is the entry rendered as a string, each scored 1.0.
+   * `prevStepEdges` is not consulted.
+   *
+   * @return an already-completed future holding one [[StepResult]] per request, in request order
+   */
+  override def fetches(queryRequests: Seq[QueryRequest],
+                       prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = {
+    // Builds the constant edges for a single request.
+    def constantEdges(request: QueryRequest): Seq[EdgeWithScore] = {
+      val param = request.queryParam
+
+      for (i <- ranges) yield {
+        val target = builder.newVertexId(param.label.service, param.label.tgtColumnWithDir(param.labelWithDir.dir), i.toString)
+        val edge = graph.toEdge(request.vertex.innerIdVal, target.innerId.value, param.label.label, param.direction)
+
+        EdgeWithScore(edge, 1.0, param.label)
+      }
+    }
+
+    Future.successful(queryRequests.map(request => StepResult(constantEdges(request), Nil, Nil)))
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/model/ImportStatus.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/model/ImportStatus.scala b/s2core/src/main/scala/org/apache/s2graph/core/model/ImportStatus.scala
deleted file mode 100644
index 189a6d0..0000000
--- a/s2core/src/main/scala/org/apache/s2graph/core/model/ImportStatus.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.core.model - -import java.util.concurrent.atomic.AtomicInteger - -trait ImportStatus { - val done: AtomicInteger - - def isCompleted: Boolean - - def percentage: Int - - val total: Int -} - -class ImportRunningStatus(val total: Int) extends ImportStatus { - require(total > 0, s"Total should be positive: $total") - - val done = new AtomicInteger(0) - - def isCompleted: Boolean = total == done.get - - def percentage = 100 * done.get / total -} - -case object ImportDoneStatus extends ImportStatus { - val total = 1 - - val done = new AtomicInteger(1) - - def isCompleted: Boolean = true - - def percentage = 100 -} - -object ImportStatus { - def apply(total: Int): ImportStatus = new ImportRunningStatus(total) - - def unapply(importResult: ImportStatus): Option[(Boolean, Int, Int)] = - Some((importResult.isCompleted, importResult.total, importResult.done.get)) -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/model/Importer.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/model/Importer.scala b/s2core/src/main/scala/org/apache/s2graph/core/model/Importer.scala deleted file mode 100644 index e3084dd..0000000 --- a/s2core/src/main/scala/org/apache/s2graph/core/model/Importer.scala +++ /dev/null @@ -1,122 
+0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.core.model - -import java.io.File - -import com.typesafe.config.Config -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.s2graph.core.{Fetcher, S2GraphLike} -import org.apache.s2graph.core.utils.logger - -import scala.concurrent.{ExecutionContext, Future} - -object Importer { - def toHDFSConfiguration(hdfsConfDir: String): Configuration = { - val conf = new Configuration - - val hdfsConfDirectory = new File(hdfsConfDir) - if (hdfsConfDirectory.exists()) { - if (!hdfsConfDirectory.isDirectory || !hdfsConfDirectory.canRead) { - throw new IllegalStateException(s"HDFS configuration directory ($hdfsConfDirectory) cannot be read.") - } - - val path = hdfsConfDirectory.getAbsolutePath - conf.addResource(new Path(s"file:///$path/core-site.xml")) - conf.addResource(new Path(s"file:///$path/hdfs-site.xml")) - } else { - logger.warn("RocksDBImporter doesn't have valid hadoop configuration directory..") - } - conf - } -} - -trait Importer { - @volatile var isFinished: Boolean = false - - def run(config: Config)(implicit ec: ExecutionContext): Future[Importer] - - def status: 
Boolean = isFinished - - def setStatus(otherStatus: Boolean): Boolean = { - this.isFinished = otherStatus - this.isFinished - } - - def close(): Unit -} - -case class IdentityImporter(graph: S2GraphLike) extends Importer { - override def run(config: Config)(implicit ec: ExecutionContext): Future[Importer] = { - Future.successful(this) - } - - override def close(): Unit = {} -} - -object HDFSImporter { - - import scala.collection.JavaConverters._ - - val PathsKey = "paths" - val HDFSConfDirKey = "hdfsConfDir" - - def extractPaths(config: Config): Map[String, String] = { - config.getConfigList(PathsKey).asScala.map { e => - val key = e.getString("src") - val value = e.getString("tgt") - - key -> value - }.toMap - } -} - -case class HDFSImporter(graph: S2GraphLike) extends Importer { - - import HDFSImporter._ - - override def run(config: Config)(implicit ec: ExecutionContext): Future[Importer] = { - Future { - val paths = extractPaths(config) - val hdfsConfiDir = config.getString(HDFSConfDirKey) - - val hadoopConfig = Importer.toHDFSConfiguration(hdfsConfiDir) - val fs = FileSystem.get(hadoopConfig) - - def copyToLocal(remoteSrc: String, localSrc: String): Unit = { - val remoteSrcPath = new Path(remoteSrc) - val localSrcPath = new Path(localSrc) - - fs.copyToLocalFile(remoteSrcPath, localSrcPath) - } - - paths.foreach { case (srcPath, tgtPath) => - copyToLocal(srcPath, tgtPath) - } - - this - } - } - - // override def status: ImportStatus = ??? 
- - override def close(): Unit = {} -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/model/MemoryModelFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/model/MemoryModelFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/model/MemoryModelFetcher.scala deleted file mode 100644 index 2130066..0000000 --- a/s2core/src/main/scala/org/apache/s2graph/core/model/MemoryModelFetcher.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.core.model - -import com.typesafe.config.Config -import org.apache.s2graph.core._ -import org.apache.s2graph.core.types.VertexId - -import scala.concurrent.{ExecutionContext, Future} - -/** - * Reference implementation for Fetcher interface. - * it only produce constant edges. 
- */ -class MemoryModelFetcher(val graph: S2GraphLike) extends Fetcher { - val builder = graph.elementBuilder - val ranges = (0 until 10) - - override def init(config: Config)(implicit ec: ExecutionContext): Future[Fetcher] = { - Future.successful(this) - } - - override def fetches(queryRequests: Seq[QueryRequest], - prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = { - val stepResultLs = queryRequests.map { queryRequest => - val queryParam = queryRequest.queryParam - val edges = ranges.map { ith => - val tgtVertexId = builder.newVertexId(queryParam.label.service, queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), ith.toString) - - graph.toEdge(queryRequest.vertex.innerIdVal, - tgtVertexId.innerId.value, queryParam.label.label, queryParam.direction) - } - - val edgeWithScores = edges.map(e => EdgeWithScore(e, 1.0, queryParam.label)) - StepResult(edgeWithScores, Nil, Nil) - } - - Future.successful(stepResultLs) - } - - override def close(): Unit = {} -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/model/ModelManager.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/model/ModelManager.scala b/s2core/src/main/scala/org/apache/s2graph/core/model/ModelManager.scala deleted file mode 100644 index 3cad13c..0000000 --- a/s2core/src/main/scala/org/apache/s2graph/core/model/ModelManager.scala +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.core.model - -import com.typesafe.config.Config -import org.apache.s2graph.core.schema.Label -import org.apache.s2graph.core.utils.logger -import org.apache.s2graph.core.{Fetcher, S2GraphLike} - -import scala.concurrent.{ExecutionContext, Future} - -object ModelManager { - val ClassNameKey = "className" -} - -class ModelManager(s2GraphLike: S2GraphLike) { - - import ModelManager._ - - private val fetcherPool = scala.collection.mutable.Map.empty[String, Fetcher] - private val ImportLock = new java.util.concurrent.ConcurrentHashMap[String, Importer] - - def toImportLockKey(label: Label): String = label.label - - def getFetcher(label: Label): Fetcher = { - fetcherPool.getOrElse(toImportLockKey(label), throw new IllegalStateException(s"$label is not imported.")) - } - - def initImporter(config: Config): Importer = { - val className = config.getString(ClassNameKey) - - Class.forName(className) - .getConstructor(classOf[S2GraphLike]) - .newInstance(s2GraphLike) - .asInstanceOf[Importer] - } - - def initFetcher(config: Config)(implicit ec: ExecutionContext): Future[Fetcher] = { - val className = config.getString(ClassNameKey) - - val fetcher = Class.forName(className) - .getConstructor(classOf[S2GraphLike]) - .newInstance(s2GraphLike) - .asInstanceOf[Fetcher] - - fetcher.init(config) - } - - def importModel(label: Label, config: Config)(implicit ec: ExecutionContext): Future[Importer] = { - val importer = ImportLock.computeIfAbsent(toImportLockKey(label), new java.util.function.Function[String, Importer] { - override def 
apply(k: String): Importer = { - val importer = initImporter(config.getConfig("importer")) - - //TODO: Update Label's extra options. - importer - .run(config.getConfig("importer")) - .map { importer => - logger.info(s"Close importer") - importer.close() - - initFetcher(config.getConfig("fetcher")).map { fetcher => - importer.setStatus(true) - - fetcherPool - .remove(k) - .foreach { oldFetcher => - logger.info(s"Delete old storage ($k) => $oldFetcher") - oldFetcher.close() - } - - fetcherPool += (k -> fetcher) - } - } - .onComplete { _ => - logger.info(s"ImportLock release: $k") - ImportLock.remove(k) - } - - importer - } - }) - - Future.successful(importer) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala new file mode 100644 index 0000000..82cc27a --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/DefaultOptimisticMutator.scala @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.s2graph.core.storage
+
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.schema.LabelMeta
+import org.apache.s2graph.core.utils.logger
+
+import scala.concurrent.{ExecutionContext, Future}
+
+/**
+ * Default [[VertexMutator]] / [[EdgeMutator]] implementation layered on top of two storage-specific pieces:
+ * an [[OptimisticMutator]] that performs the raw key-value writes and an [[OptimisticEdgeFetcher]]
+ * that reads snapshot edges. Elements are turned into key values through `serDe` and `io`.
+ */
+class DefaultOptimisticMutator(graph: S2GraphLike,
+                               serDe: StorageSerDe,
+                               optimisticEdgeFetcher: OptimisticEdgeFetcher,
+                               optimisticMutator: OptimisticMutator) extends VertexMutator with EdgeMutator {
+
+  lazy val io: StorageIO = new StorageIO(graph, serDe)
+
+  lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, optimisticMutator, optimisticEdgeFetcher)
+
+  /** Thin delegate to `optimisticMutator.writeToStorage`. */
+  private def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] =
+    optimisticMutator.writeToStorage(cluster, kvs, withWait)
+
+  /**
+   * For every edge in `stepInnerResult`, writes (with `withWait = true`) a Put of its snapshot edge
+   * together with Deletes of its index edges in both directions plus the key values produced by
+   * `io.buildIncrementsAsync(indexEdge, -1L)`.
+   *
+   * All writes go to the cluster address of the first edge's label. `requestTs` and `retryNum`
+   * are not used by this implementation.
+   *
+   * @return true when the input is empty or every write reports success
+   */
+  def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult,
+                                    requestTs: Long,
+                                    retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] = {
+    if (stepInnerResult.isEmpty) Future.successful(true)
+    else {
+      val head = stepInnerResult.edgeWithScores.head
+      val zkQuorum = head.edge.innerLabel.hbaseZkAddr
+      val futures = for {
+        edgeWithScore <- stepInnerResult.edgeWithScores
+      } yield {
+        val edge = edgeWithScore.edge
+
+        // snapshot edge is re-written (Put), not deleted
+        val edgeSnapshot = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
+        val reversedSnapshotEdgeMutations = serDe.snapshotEdgeSerializer(edgeSnapshot.toSnapshotEdge).toKeyValues.map(_.copy(operation = SKeyValue.Put))
+
+        val edgeForward = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
+        val forwardIndexedEdgeMutations = edgeForward.edgesWithIndex.flatMap { indexEdge =>
+          serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
+            io.buildIncrementsAsync(indexEdge, -1L)
+        }
+
+        /* reverted direction */
+        val edgeRevert = edge.copyEdgeWithState(S2Edge.propsToState(edge.updatePropsWithTs()))
+        val reversedIndexedEdgesMutations = edgeRevert.duplicateEdge.edgesWithIndex.flatMap { indexEdge =>
+          serDe.indexEdgeSerializer(indexEdge).toKeyValues.map(_.copy(operation = SKeyValue.Delete)) ++
+            io.buildIncrementsAsync(indexEdge, -1L)
+        }
+
+        val mutations = reversedIndexedEdgesMutations ++ reversedSnapshotEdgeMutations ++ forwardIndexedEdgeMutations
+
+        writeToStorage(zkQuorum, mutations, withWait = true)
+      }
+
+      Future.sequence(futures).map { rets => rets.forall(_.isSuccess) }
+    }
+  }
+
+  /**
+   * Writes a vertex according to its operation:
+   * "delete" writes the vertex's key values as Deletes, "deleteAll" is only logged and answered
+   * with `MutateResponse.Success`, and anything else writes `io.buildPutsAll(vertex)`.
+   */
+  def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = {
+    if (vertex.op == GraphUtil.operations("delete")) {
+      writeToStorage(zkQuorum,
+        serDe.vertexSerializer(vertex).toKeyValues.map(_.copy(operation = SKeyValue.Delete)), withWait)
+    } else if (vertex.op == GraphUtil.operations("deleteAll")) {
+      logger.info(s"deleteAll for vertex is truncated. $vertex")
+      Future.successful(MutateResponse.Success) // Ignore withWait parameter, because deleteAll operation may takes long time
+    } else {
+      writeToStorage(zkQuorum, io.buildPutsAll(vertex), withWait)
+    }
+  }
+
+  /**
+   * Writes `_edges` without consulting what is currently stored: the edge update of every edge is
+   * built with `None` as its first argument and no snapshot edge is fetched.
+   *
+   * The first half of `io.increments` (`bufferIncr`) is written right away with `withWait = false`
+   * and its result is dropped; all remaining key values of all edges go out in one write.
+   *
+   * @return one `(index, success)` pair per input edge; every pair carries the success flag of that single write
+   */
+  def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = {
+    val mutations = _edges.flatMap { edge =>
+      val (_, edgeUpdate) =
+        if (edge.getOp() == GraphUtil.operations("delete")) S2Edge.buildDeleteBulk(None, edge)
+        else S2Edge.buildOperation(None, Seq(edge))
+
+      val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
+
+      // fire-and-forget: the returned future is not part of the result
+      if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false)
+      io.buildVertexPutsAsync(edge) ++ io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
+    }
+
+    writeToStorage(zkQuorum, mutations, withWait).map { ret =>
+      _edges.zipWithIndex.map { case (edge, idx) =>
+        idx -> ret.isSuccess
+      }
+    }
+  }
+
+  /**
+   * Writes `_edges` through the conflict resolver. Edges are grouped by (label, source id, target id);
+   * for each group the snapshot edge of the group's first edge is fetched and handed to
+   * `conflictResolver.retry`, and the vertex puts of the group's first edge are written to that
+   * edge's label cluster address.
+   *
+   * `zkQuorum` is referenced only by the `!checkConsistency` branch of the inner helper, which the
+   * only call site below never selects.
+   *
+   * @return one flag per input edge, in input order; all edges of a group share the flag, which is
+   *         true when both the edge mutation and the vertex put of that group succeeded
+   */
+  def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = {
+    def mutateEdgesInner(edges: Seq[S2EdgeLike],
+                         checkConsistency: Boolean,
+                         withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = {
+      assert(edges.nonEmpty)
+      // TODO:: remove after code review: unreachable code
+      if (!checkConsistency) {
+
+        val futures = edges.map { edge =>
+          val (_, edgeUpdate) = S2Edge.buildOperation(None, Seq(edge))
+
+          val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy)
+          val mutations =
+            io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr
+
+          if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false)
+
+          writeToStorage(zkQuorum, mutations, withWait)
+        }
+        Future.sequence(futures).map { rets => new MutateResponse(rets.forall(_.isSuccess)) }
+      } else {
+        optimisticEdgeFetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) =>
+          conflictResolver.retry(1)(edges, 0, snapshotEdgeOpt).map(new MutateResponse(_))
+        }
+      }
+    }
+
+    // remember each edge's position so results can be put back into input order at the end
+    val edgeWithIdxs = _edges.zipWithIndex
+    val grouped = edgeWithIdxs.groupBy { case (edge, idx) =>
+      (edge.innerLabel, edge.srcVertex.innerId, edge.tgtVertex.innerId)
+    } toSeq
+
+    val mutateEdges = grouped.map { case ((_, _, _), edgeGroup) =>
+      val edges = edgeGroup.map(_._1)
+      val idxs = edgeGroup.map(_._2)
+      // After deleteAll, process others
+      val mutateEdgeFutures = edges.toList match {
+        case head :: tail =>
+          val edgeFuture = mutateEdgesInner(edges, checkConsistency = true, withWait)
+
+          //TODO: decide what we will do on failure on vertex put
+          val puts = io.buildVertexPutsAsync(head)
+          val vertexFuture = writeToStorage(head.innerLabel.hbaseZkAddr, puts, withWait)
+          Seq(edgeFuture, vertexFuture)
+        case Nil => Nil
+      }
+
+      val composed = for {
+      // deleteRet <- Future.sequence(deleteAllFutures)
+        mutateRet <- Future.sequence(mutateEdgeFutures)
+      } yield mutateRet
+
+      composed.map(_.forall(_.isSuccess)).map { ret => idxs.map(idx => idx -> ret) }
+    }
+
+    Future.sequence(mutateEdges).map { squashedRets =>
+      squashedRets.flatten.sortBy { case (idx, ret) => idx }.map(_._2)
+    }
+  }
+
+  /**
+   * For every edge, reads its `LabelMeta.count` property as a Long and writes, for each index edge
+   * of each related edge that passes `EdgeMutate.filterIndexOption`, the first key value returned
+   * by `io.buildIncrementsCountAsync`.
+   *
+   * @return one response per input edge, in input order
+   */
+  def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] = {
+    val futures = for {
+      edge <- edges
+    } yield {
+      val kvs = for {
+        relEdge <- edge.relatedEdges
+        edgeWithIndex <- EdgeMutate.filterIndexOption(relEdge.edgesWithIndexValid)
+      } yield {
+        val countWithTs = edge.propertyValueInner(LabelMeta.count)
+        val countVal = countWithTs.innerVal.toString().toLong
+        io.buildIncrementsCountAsync(edgeWithIndex, countVal).head
+      }
+      writeToStorage(zkQuorum, kvs, withWait = withWait)
+    }
+
+    Future.sequence(futures)
+  }
+
+  /** Writes the key values of `io.buildDegreePuts(edge, degreeVal)`, always with `withWait = true`. */
+  def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] = {
+    val kvs = io.buildDegreePuts(edge, degreeVal)
+
+    writeToStorage(zkQuorum, kvs, withWait = true)
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticEdgeFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticEdgeFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticEdgeFetcher.scala
new file mode 100644
index 0000000..4111cc4
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticEdgeFetcher.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage
+
+import org.apache.s2graph.core.GraphExceptions.FetchTimeoutException
+import org.apache.s2graph.core._
+import org.apache.s2graph.core.utils.logger
+
+import scala.concurrent.{ExecutionContext, Future}
+
+/**
+ * Read side used by optimistic edge mutation: looks up the snapshot edge stored for an edge.
+ * Implementations supply the raw key-value lookup and the [[StorageIO]] used to decode it.
+ */
+trait OptimisticEdgeFetcher {
+  val io: StorageIO
+
+  /** Storage-specific lookup of the key values answering `queryRequest` for `edge`. */
+  protected def fetchKeyValues(queryRequest: QueryRequest,
+                               edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]]
+
+  /**
+   * Fetches the key values stored for `edge` (same label and direction, source to target, cache
+   * TTL of -1) and decodes the first one as a snapshot edge.
+   *
+   * @return `(None, None)` when nothing is stored; otherwise the decoded snapshot edge together
+   *         with the first key value it was decoded from. Any failure is logged and replaced by
+   *         a `FetchTimeoutException` carrying the edge's log string.
+   */
+  def fetchSnapshotEdgeInner(edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[(Option[S2EdgeLike], Option[SKeyValue])] = {
+    val param = QueryParam(labelName = edge.innerLabel.label,
+      direction = GraphUtil.fromDirection(edge.getDir()),
+      tgtVertexIdOpt = Option(edge.tgtVertex.innerIdVal),
+      cacheTTLInMillis = -1)
+    val request = QueryRequest(Query.toQuery(Seq(edge.srcVertex), Seq(param)), 0, edge.srcVertex, param)
+
+    val fetched = fetchKeyValues(request, edge).map { kvs =>
+      val found: (Option[S2EdgeLike], Option[SKeyValue]) = kvs.headOption match {
+        case None => (None, None)
+        case Some(first) =>
+          import CanSKeyValue._
+          (io.toSnapshotEdge(first, request, isInnerCall = true, parentEdges = Nil), Some(first))
+      }
+      found
+    }
+
+    fetched.recoverWith { case ex: Throwable =>
+      logger.error(s"fetchQueryParam failed. fallback return.", ex)
+      Future.failed(new FetchTimeoutException(s"${edge.toLogString}"))
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticMutator.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticMutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticMutator.scala
new file mode 100644
index 0000000..22269df
--- /dev/null
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/OptimisticMutator.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.s2graph.core.storage
+
+import scala.concurrent.{ExecutionContext, Future}
+
+trait OptimisticMutator {
+  /**
+   * decide how to store given key values Seq[SKeyValue] into storage using storage's client.
+   * note that this should be return true on all success.
+   * we assumes that each storage implementation has client as member variable.
+   *
+   * @param cluster : where this key values should be stored.
+   * @param kvs : sequence of SKeyValue that need to be stored in storage.
+   * @param withWait : flag controlling whether to wait for the ack from storage.
+   *                 note that in AsynchbaseStorage (which supports asynchronous operations), even with true,
+   *                 it never blocks the thread, but rather submits the work and is notified by the event loop when storage sends the ack back.
+   * @return ack message from storage.
+   */
+  def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse]
+
+  /**
+   * write requestKeyValue into storage only if the value currently stored in storage matches.
+   * note that we only use SnapshotEdge as the place for the lock, so this method only changes SnapshotEdge.
+   *
+   * Most importantly, this has to be an 'atomic' operation.
+   * While this operation is mutating requestKeyValue's snapshotEdge, any other thread needs to be
+   * either blocked or failed, as in the write-write conflict case.
+   *
+   * Also, while this method is still running, fetchSnapshotEdgeKeyValues should be synchronized to
+   * prevent reads from seeing wrong data.
+   *
+   * It is best to use the storage's own concurrency control (either pessimistic or optimistic), such as a transaction
+   * or compareAndSet, to synchronize.
+   *
+   * for example, AsynchbaseStorage uses HBase's CheckAndSet atomic operation to guarantee 'atomicity'.
+   * for a storage that does not support concurrency control, the storage implementation
+   * itself can maintain manual locks that synchronize read (fetchSnapshotEdgeKeyValues)
+   * and write (writeLock).
+   *
+   * @param requestKeyValue
+   * @param expectedOpt
+   * @return
+   */
+  def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext): Future[MutateResponse]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index d2500a6..36ecfcb 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -34,52 +34,30 @@ abstract class Storage(val graph: S2GraphLike, val management: StorageManagement /* - * Given QueryRequest/Vertex/Edge, fetch KeyValue from storage - * then convert them into Edge/Vertex - */ - val reader: StorageReadable - - /* - * Serialize Edge/Vertex, to common KeyValue, SKeyValue that - * can be stored aligned to backend storage's physical schema. - * Also Deserialize storage backend's KeyValue to SKeyValue. - */ + * Serialize Edge/Vertex, to common KeyValue, SKeyValue that + * can be stored aligned to backend storage's physical schema. + * Also Deserialize storage backend's KeyValue to SKeyValue. + */ val serDe: StorageSerDe - /* - * Responsible to connect physical storage backend to store GraphElement(Edge/Vertex). - */ - val mutator: Mutator + val edgeFetcher: EdgeFetcher - /* - * Common helper to translate SKeyValue to Edge/Vertex and vice versa. - * Note that it require storage backend specific implementation for serialize/deserialize. - */ - lazy val io: StorageIO = new StorageIO(graph, serDe) + val edgeBulkFetcher: EdgeBulkFetcher - /* - * Common helper to resolve write-write conflict on snapshot edge with same EdgeId.
- * Note that it require storage backend specific implementations for - * all of StorageWritable, StorageReadable, StorageSerDe, StorageIO - */ -// lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, mutator, reader) -// lazy val mutationHelper: MutationHelper = new MutationHelper(this) + val vertexFetcher: VertexFetcher + val vertexBulkFetcher: VertexBulkFetcher - /** Fetch **/ - def fetches(queryRequests: Seq[QueryRequest], - prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = - reader.fetches(queryRequests, prevStepEdges) + val edgeMutator: EdgeMutator - def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = - reader.fetchVertices(vertices) + val vertexMutator: VertexMutator - def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] = reader.fetchEdgesAll() - - def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = reader.fetchVerticesAll() + /* + * Common helper to translate SKeyValue to Edge/Vertex and vice versa. + * Note that it require storage backend specific implementation for serialize/deserialize. 
+ */ + lazy val io: StorageIO = new StorageIO(graph, serDe) - def fetchSnapshotEdgeInner(edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[(Option[S2EdgeLike], Option[SKeyValue])] = - reader.fetchSnapshotEdgeInner(edge) /** Management **/ def flush(): Unit = management.flush() @@ -94,24 +72,4 @@ abstract class Storage(val graph: S2GraphLike, def info: Map[String, String] = Map("className" -> this.getClass.getSimpleName) - def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult, - requestTs: Long, - retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] = - mutator.deleteAllFetchedEdgesAsyncOld(stepInnerResult, requestTs, retryNum) - - def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = - mutator.mutateVertex(zkQuorum: String, vertex, withWait) - - def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = - mutator.mutateStrongEdges(zkQuorum, _edges, withWait) - - - def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = - mutator.mutateWeakEdges(zkQuorum, _edges, withWait) - - def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] = - mutator.incrementCounts(zkQuorum, edges, withWait) - - def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] = - mutator.updateDegree(zkQuorum, edge, degreeVal) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala 
b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala index b10feb9..c3abd03 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala @@ -27,7 +27,7 @@ import org.apache.s2graph.core.utils.logger import scala.concurrent.{ExecutionContext, Future} -trait StorageReadable extends Fetcher { +trait StorageReadable extends EdgeFetcher { val io: StorageIO val serDe: StorageSerDe // /** @@ -44,9 +44,14 @@ trait StorageReadable extends Fetcher { def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] + + + + + protected def fetchKeyValues(queryRequest: QueryRequest, edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] - protected def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] +// protected def fetchKeyValues(queryRequest: QueryRequest, vertex: S2VertexLike)(implicit ec: ExecutionContext): Future[Seq[SKeyValue]] def fetchSnapshotEdgeInner(edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[(Option[S2EdgeLike], Option[SKeyValue])] = { @@ -73,25 +78,25 @@ trait StorageReadable extends Fetcher { } } - def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = { - def fromResult(kvs: Seq[SKeyValue], version: String): Seq[S2VertexLike] = { - if (kvs.isEmpty) Nil - else serDe.vertexDeserializer(version).fromKeyValues(kvs, None).toSeq - } - - val futures = vertices.map { vertex => - val queryParam = QueryParam.Empty - val q = Query.toQuery(Seq(vertex), Seq(queryParam)) - val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam) - - fetchKeyValues(queryRequest, vertex).map { kvs => - fromResult(kvs, vertex.serviceColumn.schemaVersion) - } recoverWith { - case ex: Throwable => Future.successful(Nil) - } - } - - Future.sequence(futures).map(_.flatten) - } +// def 
fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = { +// def fromResult(kvs: Seq[SKeyValue], version: String): Seq[S2VertexLike] = { +// if (kvs.isEmpty) Nil +// else serDe.vertexDeserializer(version).fromKeyValues(kvs, None).toSeq +// } +// +// val futures = vertices.map { vertex => +// val queryParam = QueryParam.Empty +// val q = Query.toQuery(Seq(vertex), Seq(queryParam)) +// val queryRequest = QueryRequest(q, stepIdx = -1, vertex, queryParam) +// +// fetchKeyValues(queryRequest, vertex).map { kvs => +// fromResult(kvs, vertex.serviceColumn.schemaVersion) +// } recoverWith { +// case ex: Throwable => Future.successful(Nil) +// } +// } +// +// Future.sequence(futures).map(_.flatten) +// } } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala deleted file mode 100644 index 8c2fb27..0000000 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.s2graph.core.storage - -import org.apache.s2graph.core.Mutator - -import scala.concurrent.{ExecutionContext, Future} - -trait OptimisticMutator extends Mutator { - /** - * decide how to store given key values Seq[SKeyValue] into storage using storage's client. - * note that this should be return true on all success. - * we assumes that each storage implementation has client as member variable. - * - * @param cluster : where this key values should be stored. - * @param kvs : sequence of SKeyValue that need to be stored in storage. - * @param withWait : flag to control wait ack from storage. - * note that in AsynchbaseStorage(which support asynchronous operations), even with true, - * it never block thread, but rather submit work and notified by event loop when storage send ack back. - * @return ack message from storage. - */ - def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] - - /** - * write requestKeyValue into storage if the current value in storage that is stored matches. - * note that we only use SnapshotEdge as place for lock, so this method only change SnapshotEdge. - * - * Most important thing is this have to be 'atomic' operation. - * When this operation is mutating requestKeyValue's snapshotEdge, then other thread need to be - * either blocked or failed on write-write conflict case. - * - * Also while this method is still running, then fetchSnapshotEdgeKeyValues should be synchronized to - * prevent wrong data for read. 
- * - * Best is use storage's concurrency control(either pessimistic or optimistic) such as transaction, - * compareAndSet to synchronize. - * - * for example, AsynchbaseStorage use HBase's CheckAndSet atomic operation to guarantee 'atomicity'. - * for storage that does not support concurrency control, then storage implementation - * itself can maintain manual locks that synchronize read(fetchSnapshotEdgeKeyValues) - * and write(writeLock). - * - * @param requestKeyValue - * @param expectedOpt - * @return - */ - def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext): Future[MutateResponse] -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala index bfc5bc6..18159f6 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala @@ -32,7 +32,7 @@ class WriteWriteConflictResolver(graph: S2GraphLike, serDe: StorageSerDe, io: StorageIO, mutator: OptimisticMutator, - fetcher: StorageReadable) { + optimisticEdgeFetcher: OptimisticEdgeFetcher) { val BackoffTimeout = graph.BackoffTimeout val MaxRetryNum = graph.MaxRetryNum val MaxBackOff = graph.MaxBackOff @@ -68,7 +68,7 @@ class WriteWriteConflictResolver(graph: S2GraphLike, case FetchTimeoutException(retryEdge) => logger.info(s"[Try: $tryNum], Fetch fail.\n${retryEdge}") /* fetch failed. 
re-fetch should be done */ - fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) => + optimisticEdgeFetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) => retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt) } @@ -90,7 +90,7 @@ class WriteWriteConflictResolver(graph: S2GraphLike, val future = if (failedStatusCode == 0) { // acquire Lock failed. other is mutating so this thead need to re-fetch snapshotEdge. /* fetch failed. re-fetch should be done */ - fetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) => + optimisticEdgeFetcher.fetchSnapshotEdgeInner(edges.head).flatMap { case (snapshotEdgeOpt, kvOpt) => retry(tryNum + 1)(edges, statusCode, snapshotEdgeOpt) } } else { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/43f627e5/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeBulkFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeBulkFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeBulkFetcher.scala new file mode 100644 index 0000000..3d25dd9 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseEdgeBulkFetcher.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core.storage.hbase + +import java.util + +import com.typesafe.config.Config +import org.apache.s2graph.core.schema.Label +import org.apache.s2graph.core.storage.serde.Serializable +import org.apache.s2graph.core.{EdgeBulkFetcher, S2EdgeLike, S2Graph, S2GraphLike} +import org.apache.s2graph.core.storage.{CanSKeyValue, StorageIO, StorageSerDe} +import org.apache.s2graph.core.types.HBaseType +import org.apache.s2graph.core.utils.{CanDefer, Extensions} +import org.hbase.async.{HBaseClient, KeyValue} + +import scala.concurrent.{ExecutionContext, Future} + +class AsynchbaseEdgeBulkFetcher(val graph: S2GraphLike, + val config: Config, + val client: HBaseClient, + val serDe: StorageSerDe, + val io: StorageIO) extends EdgeBulkFetcher { + import Extensions.DeferOps + import CanDefer._ + import scala.collection.JavaConverters._ + import AsynchbaseStorage._ + + override def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] = { + val futures = Label.findAll().groupBy(_.hbaseTableName).toSeq.map { case (hTableName, labels) => + val distinctLabels = labels.toSet + val scan = AsynchbasePatcher.newScanner(client, hTableName) + scan.setFamily(Serializable.edgeCf) + scan.setMaxVersions(1) + + scan.nextRows(S2Graph.FetchAllLimit).toFuture(emptyKeyValuesLs).map { + case null => Seq.empty + case kvsLs => + kvsLs.asScala.flatMap { kvs => + kvs.asScala.flatMap { kv => + val sKV = implicitly[CanSKeyValue[KeyValue]].toSKeyValue(kv) + + serDe.indexEdgeDeserializer(schemaVer = HBaseType.DEFAULT_VERSION) + 
.fromKeyValues(Seq(kv), None) + .filter(e => distinctLabels(e.innerLabel) && e.getDirection() == "out" && !e.isDegree) + } + } + } + } + + Future.sequence(futures).map(_.flatten) + } +}
