Repository: incubator-s2graph Updated Branches: refs/heads/master 7af37dbd3 -> 33f4d0550
- Add Fetcher/Mutator interface for query/mutation. - Refactor Storage to use Fetcher/Mutator interface. Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/2357d810 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/2357d810 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/2357d810 Branch: refs/heads/master Commit: 2357d810a6419011d4fd38af248089d9551a200c Parents: 7af37db Author: DO YUNG YOON <[email protected]> Authored: Tue May 8 16:27:07 2018 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Tue May 8 16:27:07 2018 +0900 ---------------------------------------------------------------------- project/Common.scala | 2 + s2core/build.sbt | 2 +- .../scala/org/apache/s2graph/core/Fetcher.scala | 36 ++++++ .../org/apache/s2graph/core/Management.scala | 15 ++- .../scala/org/apache/s2graph/core/Mutator.scala | 41 +++++++ .../org/apache/s2graph/core/QueryResult.scala | 4 +- .../scala/org/apache/s2graph/core/S2Graph.scala | 39 ++++-- .../org/apache/s2graph/core/S2GraphLike.scala | 11 ++ .../apache/s2graph/core/TraversalHelper.scala | 8 +- .../s2graph/core/model/ImportStatus.scala | 59 +++++++++ .../apache/s2graph/core/model/Importer.scala | 122 +++++++++++++++++++ .../s2graph/core/model/MemoryModelFetcher.scala | 59 +++++++++ .../s2graph/core/model/ModelManager.scala | 103 ++++++++++++++++ .../org/apache/s2graph/core/schema/Label.scala | 35 +++--- .../org/apache/s2graph/core/schema/Schema.scala | 4 +- .../apache/s2graph/core/schema/Service.scala | 2 +- .../apache/s2graph/core/storage/Storage.scala | 45 +++---- .../s2graph/core/storage/StorageReadable.scala | 22 ++-- .../s2graph/core/storage/StorageWritable.scala | 19 +-- .../storage/WriteWriteConflictResolver.scala | 2 +- .../core/storage/hbase/AsynchbaseStorage.scala | 12 +- .../hbase/AsynchbaseStorageWritable.scala | 11 +- .../core/storage/rocks/RocksStorage.scala | 11 +- .../storage/rocks/RocksStorageReadable.scala | 2 +- .../storage/rocks/RocksStorageWritable.scala | 10 +- .../core/storage/serde/MutationHelper.scala | 32 +++-- .../apache/s2graph/core/model/FetcherTest.scala | 87 +++++++++++++ .../apache/s2graph/graphql/GraphQLServer.scala | 18 ++- .../org/apache/s2graph/graphql/HttpServer.scala | 2 + 29 files changed, 695 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/project/Common.scala ---------------------------------------------------------------------- diff --git a/project/Common.scala b/project/Common.scala index 96109d3..b46d190 100644 --- a/project/Common.scala +++ b/project/Common.scala @@ -33,6 +33,8 @@ object Common { val KafkaVersion = "0.10.2.1" + val rocksVersion = "5.11.3" + /** use Log4j 1.2.17 as the SLF4j backend in runtime, with bridging libraries to forward JCL and JUL logs to SLF4j */ val loggingRuntime = Seq( "log4j" % "log4j" % "1.2.17", http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/build.sbt ---------------------------------------------------------------------- diff --git a/s2core/build.sbt b/s2core/build.sbt index cc70e97..12319d8 100644 --- a/s2core/build.sbt +++ b/s2core/build.sbt @@ -50,7 +50,7 @@ libraryDependencies ++= Seq( "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion , "org.apache.lucene" % "lucene-core" % "6.6.0", "org.apache.lucene" % "lucene-queryparser" % "6.6.0", - "org.rocksdb" % "rocksdbjni" % "5.8.0", + "org.rocksdb" % "rocksdbjni" % rocksVersion, "org.scala-lang.modules" %% "scala-java8-compat" % "0.8.0", "com.sksamuel.elastic4s" %% "elastic4s-core" % elastic4sVersion excludeLogging(), "com.sksamuel.elastic4s" %% "elastic4s-http" % elastic4sVersion excludeLogging(), http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/Fetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Fetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/Fetcher.scala new file mode 100644 index 0000000..57d2f29 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/Fetcher.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core + +import com.typesafe.config.Config +import org.apache.s2graph.core.types.VertexId + +import scala.concurrent.{ExecutionContext, Future} + +trait Fetcher { + + def init(config: Config)(implicit ec: ExecutionContext): Future[Fetcher] = + Future.successful(this) + + def fetches(queryRequests: Seq[QueryRequest], + prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] + + def close(): Unit = {} +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/Management.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala index d026e5b..7ff5a9e 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala @@ -20,6 +20,7 @@ package org.apache.s2graph.core import java.util +import java.util.concurrent.Executors import com.typesafe.config.{Config, ConfigFactory} import org.apache.s2graph.core.GraphExceptions.{InvalidHTableException, LabelAlreadyExistException, LabelNameTooLongException, LabelNotExistException} @@ -28,8 +29,10 @@ import org.apache.s2graph.core.schema._ import org.apache.s2graph.core.types.HBaseType._ import org.apache.s2graph.core.types._ import org.apache.s2graph.core.JSONParser._ +import org.apache.s2graph.core.model.Importer import play.api.libs.json._ +import scala.concurrent.{ExecutionContext, Future} import scala.util.Try /** @@ -70,7 +73,6 @@ object Management { case class Index(name: String, propNames: Seq[String], direction: Option[Int] = None, options: Option[String] = None) } - def findService(serviceName: String) = { Service.findByName(serviceName, useCache = false) } @@ -298,9 +300,18 @@ object Management { class Management(graph: S2GraphLike) { + val importEx = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor()) import Management._ - import scala.collection.JavaConversions._ + + def importModel(labelName: String, options: String): Future[Importer] = { + Label.updateOption(labelName, options) + + val label = Label.findByName(labelName, false).getOrElse(throw new LabelNotExistException(labelName)) + val config = ConfigFactory.parseString(options) + + graph.modelManager.importModel(label, config)(importEx) + } def createStorageTable(zkAddr: String, tableName: String, http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/Mutator.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Mutator.scala b/s2core/src/main/scala/org/apache/s2graph/core/Mutator.scala new file mode 100644 index 0000000..53161e1 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/Mutator.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core + +import org.apache.s2graph.core.storage.{MutateResponse, SKeyValue} + +import scala.concurrent.{ExecutionContext, Future} + +trait Mutator { + def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] + + def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] + + def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] + + def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] + + def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] + + def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult, + requestTs: Long, + retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] +} + http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala index be57017..4a1018f 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala @@ -237,7 +237,7 @@ object StepResult { // val newOrderByValues = updateScoreOnOrderByValues(globalQueryOption.scoreFieldIdx, t.orderByValues, newScore) val newOrderByValues = - if (globalQueryOption.orderByKeys.isEmpty) (newScore, t.edge.getTsInnerValValue(), None, None) + if (globalQueryOption.orderByKeys.isEmpty) (newScore, t.edge.getTs(), None, None) else toTuple4(newT.toValues(globalQueryOption.orderByKeys)) val newGroupByValues = newT.toValues(globalQueryOption.groupBy.keys) @@ -262,7 +262,7 @@ object StepResult { // val newOrderByValues = updateScoreOnOrderByValues(globalQueryOption.scoreFieldIdx, t.orderByValues, newScore) val newOrderByValues = - if (globalQueryOption.orderByKeys.isEmpty) (newScore, t.edge.getTsInnerValValue(), None, None) + if (globalQueryOption.orderByKeys.isEmpty) (newScore, t.edge.getTs(), None, None) else toTuple4(newT.toValues(globalQueryOption.orderByKeys)) val newGroupByValues = newT.toValues(globalQueryOption.groupBy.keys) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala index 7816a63..4b2274a 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala @@ -27,6 +27,7 @@ import com.typesafe.config.{Config, ConfigFactory} import org.apache.commons.configuration.{BaseConfiguration, Configuration} import org.apache.s2graph.core.index.IndexProvider import org.apache.s2graph.core.io.tinkerpop.optimize.S2GraphStepStrategy +import org.apache.s2graph.core.model.ModelManager import org.apache.s2graph.core.schema._ import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage import org.apache.s2graph.core.storage.rocks.RocksStorage @@ -186,6 +187,8 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap override val management = new Management(this) + override val modelManager = new ModelManager(this) + override val indexProvider = IndexProvider.apply(config) override val elementBuilder = new GraphElementBuilder(this) @@ -247,6 +250,25 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap storagePool.getOrElse(s"label:${label.label}", defaultStorage) } + //TODO: + override def getFetcher(column: ServiceColumn): Fetcher = { + getStorage(column.service).reader + } + + override def getFetcher(label: Label): Fetcher = { + if (label.fetchConfigExist) modelManager.getFetcher(label) + else getStorage(label).reader + } + + override def getMutator(column: ServiceColumn): Mutator = { + getStorage(column.service).mutator + } + + override def getMutator(label: Label): Mutator = { + getStorage(label).mutator + } + + //TODO: override def flushStorage(): Unit = { storagePool.foreach { case (_, storage) => @@ -302,7 +324,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap def mutateVertices(storage: Storage)(zkQuorum: String, vertices: Seq[S2VertexLike], withWait: Boolean = false): Future[Seq[MutateResponse]] = { val futures = vertices.map { vertex => - storage.mutateVertex(zkQuorum, vertex, withWait) + getMutator(vertex.serviceColumn).mutateVertex(zkQuorum, vertex, withWait) } Future.sequence(futures) } @@ -329,12 +351,12 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap val weakEdgesFutures = weakEdges.groupBy { case (edge, idx) => edge.innerLabel.hbaseZkAddr }.map { case (zkQuorum, edgeWithIdxs) => val futures = edgeWithIdxs.groupBy(_._1.innerLabel).map { case (label, edgeGroup) => - val storage = getStorage(label) + val mutator = getMutator(label) val edges = edgeGroup.map(_._1) val idxs = edgeGroup.map(_._2) /* multiple edges with weak consistency level will be processed as batch */ - storage.mutateWeakEdges(zkQuorum, edges, withWait) + mutator.mutateWeakEdges(zkQuorum, edges, withWait) } Future.sequence(futures) } @@ -347,9 +369,10 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap val strongEdgesFutures = strongEdgesAll.groupBy { case (edge, idx) => edge.innerLabel }.map { case (label, edgeGroup) => val edges = edgeGroup.map(_._1) val idxs = edgeGroup.map(_._2) - val storage = getStorage(label) + val mutator = getMutator(label) val zkQuorum = label.hbaseZkAddr - storage.mutateStrongEdges(zkQuorum, edges, withWait = true).map { rets => + + mutator.mutateStrongEdges(zkQuorum, edges, withWait = true).map { rets => idxs.zip(rets) } } @@ -474,7 +497,7 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap override def incrementCounts(edges: Seq[S2EdgeLike], withWait: Boolean): Future[Seq[MutateResponse]] = { val edgesWithIdx = edges.zipWithIndex val futures = edgesWithIdx.groupBy { case (e, idx) => e.innerLabel }.map { case (label, edgeGroup) => - getStorage(label).incrementCounts(label.hbaseZkAddr, edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2))) + getMutator(label).incrementCounts(label.hbaseZkAddr, edgeGroup.map(_._1), withWait).map(_.zip(edgeGroup.map(_._2))) } Future.sequence(futures).map { ls => @@ -484,9 +507,9 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap override def updateDegree(edge: S2EdgeLike, degreeVal: Long = 0): Future[MutateResponse] = { val label = edge.innerLabel - val storage = getStorage(label) + val mutator = getMutator(label) - storage.updateDegree(label.hbaseZkAddr, edge, degreeVal) + mutator.updateDegree(label.hbaseZkAddr, edge, degreeVal) } override def getVertex(vertexId: VertexId): Option[S2VertexLike] = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala index cbd31cc..6ed78b0 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala @@ -31,6 +31,7 @@ import org.apache.s2graph.core.GraphExceptions.LabelNotExistException import org.apache.s2graph.core.S2Graph.{DefaultColumnName, DefaultServiceName} import org.apache.s2graph.core.features.{S2Features, S2GraphVariables} import org.apache.s2graph.core.index.IndexProvider +import org.apache.s2graph.core.model.ModelManager import org.apache.s2graph.core.schema.{Label, LabelMeta, Service, ServiceColumn} import org.apache.s2graph.core.storage.{MutateResponse, Storage} import org.apache.s2graph.core.types.{InnerValLike, VertexId} @@ -68,6 +69,8 @@ trait S2GraphLike extends Graph { val traversalHelper: TraversalHelper + val modelManager: ModelManager + lazy val MaxRetryNum: Int = config.getInt("max.retry.number") lazy val MaxBackOff: Int = config.getInt("max.back.off") lazy val BackoffTimeout: Int = config.getInt("back.off.timeout") @@ -90,6 +93,14 @@ trait S2GraphLike extends Graph { def getStorage(label: Label): Storage + def getFetcher(column: ServiceColumn): Fetcher + + def getFetcher(label: Label): Fetcher + + def getMutator(label: Label): Mutator + + def getMutator(column: ServiceColumn): Mutator + def flushStorage(): Unit def shutdown(modelDataDelete: Boolean = false): Unit http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala index 0dc2aa2..d19dd1f 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala @@ -204,7 +204,7 @@ class TraversalHelper(graph: S2GraphLike) { val aggFuture = requestsPerLabel.foldLeft(Future.successful(Map.empty[Int, StepResult])) { case (prevFuture, (label, reqWithIdxs)) => for { prev <- prevFuture - cur <- graph.getStorage(label).fetches(reqWithIdxs.map(_._1), prevStepEdges) + cur <- graph.getFetcher(label).fetches(reqWithIdxs.map(_._1), prevStepEdges) } yield { prev ++ reqWithIdxs.map(_._2).zip(cur).toMap } @@ -256,7 +256,7 @@ class TraversalHelper(graph: S2GraphLike) { */ graph.mutateEdges(edgesToDelete.map(_.edge), withWait = true).map(_.forall(_.isSuccess)) } else { - graph.getStorage(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum) + graph.getMutator(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum) } case _ => @@ -264,7 +264,7 @@ class TraversalHelper(graph: S2GraphLike) { * read: x * write: N x ((1(snapshotEdge) + 2(1 for incr, 1 for delete) x indices) */ - graph.getStorage(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum) + graph.getMutator(label).deleteAllFetchedEdgesAsyncOld(stepResult, requestTs, MaxRetryNum) } ret } @@ -389,7 +389,7 @@ class TraversalHelper(graph: S2GraphLike) { val newEdgeWithScore = edgeWithScore.copy(edge = newEdge) /* OrderBy */ val orderByValues = - if (queryOption.orderByKeys.isEmpty) (score, edge.getTsInnerValValue(), None, None) + if (queryOption.orderByKeys.isEmpty) (score, edge.getTs(), None, None) else StepResult.toTuple4(newEdgeWithScore.toValues(queryOption.orderByKeys)) /* StepGroupBy */ http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/model/ImportStatus.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/model/ImportStatus.scala b/s2core/src/main/scala/org/apache/s2graph/core/model/ImportStatus.scala new file mode 100644 index 0000000..189a6d0 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/model/ImportStatus.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core.model + +import java.util.concurrent.atomic.AtomicInteger + +trait ImportStatus { + val done: AtomicInteger + + def isCompleted: Boolean + + def percentage: Int + + val total: Int +} + +class ImportRunningStatus(val total: Int) extends ImportStatus { + require(total > 0, s"Total should be positive: $total") + + val done = new AtomicInteger(0) + + def isCompleted: Boolean = total == done.get + + def percentage = 100 * done.get / total +} + +case object ImportDoneStatus extends ImportStatus { + val total = 1 + + val done = new AtomicInteger(1) + + def isCompleted: Boolean = true + + def percentage = 100 +} + +object ImportStatus { + def apply(total: Int): ImportStatus = new ImportRunningStatus(total) + + def unapply(importResult: ImportStatus): Option[(Boolean, Int, Int)] = + Some((importResult.isCompleted, importResult.total, importResult.done.get)) +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/model/Importer.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/model/Importer.scala b/s2core/src/main/scala/org/apache/s2graph/core/model/Importer.scala new file mode 100644 index 0000000..e3084dd --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/model/Importer.scala @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core.model + +import java.io.File + +import com.typesafe.config.Config +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.s2graph.core.{Fetcher, S2GraphLike} +import org.apache.s2graph.core.utils.logger + +import scala.concurrent.{ExecutionContext, Future} + +object Importer { + def toHDFSConfiguration(hdfsConfDir: String): Configuration = { + val conf = new Configuration + + val hdfsConfDirectory = new File(hdfsConfDir) + if (hdfsConfDirectory.exists()) { + if (!hdfsConfDirectory.isDirectory || !hdfsConfDirectory.canRead) { + throw new IllegalStateException(s"HDFS configuration directory ($hdfsConfDirectory) cannot be read.") + } + + val path = hdfsConfDirectory.getAbsolutePath + conf.addResource(new Path(s"file:///$path/core-site.xml")) + conf.addResource(new Path(s"file:///$path/hdfs-site.xml")) + } else { + logger.warn("RocksDBImporter doesn't have valid hadoop configuration directory..") + } + conf + } +} + +trait Importer { + @volatile var isFinished: Boolean = false + + def run(config: Config)(implicit ec: ExecutionContext): Future[Importer] + + def status: Boolean = isFinished + + def setStatus(otherStatus: Boolean): Boolean = { + this.isFinished = otherStatus + this.isFinished + } + + def close(): Unit +} + +case class IdentityImporter(graph: S2GraphLike) extends Importer { + override def run(config: Config)(implicit ec: ExecutionContext): Future[Importer] = { + Future.successful(this) + } + + override def close(): Unit = {} +} + +object HDFSImporter { + + import scala.collection.JavaConverters._ + + val PathsKey = "paths" + val HDFSConfDirKey = "hdfsConfDir" + + def extractPaths(config: Config): Map[String, String] = { + config.getConfigList(PathsKey).asScala.map { e => + val key = e.getString("src") + val value = e.getString("tgt") + + key -> value + }.toMap + } +} + +case class HDFSImporter(graph: S2GraphLike) extends Importer { + + import HDFSImporter._ + + override def run(config: Config)(implicit ec: ExecutionContext): Future[Importer] = { + Future { + val paths = extractPaths(config) + val hdfsConfiDir = config.getString(HDFSConfDirKey) + + val hadoopConfig = Importer.toHDFSConfiguration(hdfsConfiDir) + val fs = FileSystem.get(hadoopConfig) + + def copyToLocal(remoteSrc: String, localSrc: String): Unit = { + val remoteSrcPath = new Path(remoteSrc) + val localSrcPath = new Path(localSrc) + + fs.copyToLocalFile(remoteSrcPath, localSrcPath) + } + + paths.foreach { case (srcPath, tgtPath) => + copyToLocal(srcPath, tgtPath) + } + + this + } + } + + // override def status: ImportStatus = ??? + + override def close(): Unit = {} +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/model/MemoryModelFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/model/MemoryModelFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/model/MemoryModelFetcher.scala new file mode 100644 index 0000000..2130066 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/model/MemoryModelFetcher.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core.model + +import com.typesafe.config.Config +import org.apache.s2graph.core._ +import org.apache.s2graph.core.types.VertexId + +import scala.concurrent.{ExecutionContext, Future} + +/** + * Reference implementation for Fetcher interface. + * it only produce constant edges. + */ +class MemoryModelFetcher(val graph: S2GraphLike) extends Fetcher { + val builder = graph.elementBuilder + val ranges = (0 until 10) + + override def init(config: Config)(implicit ec: ExecutionContext): Future[Fetcher] = { + Future.successful(this) + } + + override def fetches(queryRequests: Seq[QueryRequest], + prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = { + val stepResultLs = queryRequests.map { queryRequest => + val queryParam = queryRequest.queryParam + val edges = ranges.map { ith => + val tgtVertexId = builder.newVertexId(queryParam.label.service, queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), ith.toString) + + graph.toEdge(queryRequest.vertex.innerIdVal, + tgtVertexId.innerId.value, queryParam.label.label, queryParam.direction) + } + + val edgeWithScores = edges.map(e => EdgeWithScore(e, 1.0, queryParam.label)) + StepResult(edgeWithScores, Nil, Nil) + } + + Future.successful(stepResultLs) + } + + override def close(): Unit = {} +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/model/ModelManager.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/model/ModelManager.scala b/s2core/src/main/scala/org/apache/s2graph/core/model/ModelManager.scala new file mode 100644 index 0000000..3cad13c --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/model/ModelManager.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core.model + +import com.typesafe.config.Config +import org.apache.s2graph.core.schema.Label +import org.apache.s2graph.core.utils.logger +import org.apache.s2graph.core.{Fetcher, S2GraphLike} + +import scala.concurrent.{ExecutionContext, Future} + +object ModelManager { + val ClassNameKey = "className" +} + +class ModelManager(s2GraphLike: S2GraphLike) { + + import ModelManager._ + + private val fetcherPool = scala.collection.mutable.Map.empty[String, Fetcher] + private val ImportLock = new java.util.concurrent.ConcurrentHashMap[String, Importer] + + def toImportLockKey(label: Label): String = label.label + + def getFetcher(label: Label): Fetcher = { + fetcherPool.getOrElse(toImportLockKey(label), throw new IllegalStateException(s"$label is not imported.")) + } + + def initImporter(config: Config): Importer = { + val className = config.getString(ClassNameKey) + + Class.forName(className) + .getConstructor(classOf[S2GraphLike]) + .newInstance(s2GraphLike) + .asInstanceOf[Importer] + } + + def initFetcher(config: Config)(implicit ec: ExecutionContext): Future[Fetcher] = { + val className = config.getString(ClassNameKey) + + val fetcher = Class.forName(className) + .getConstructor(classOf[S2GraphLike]) + .newInstance(s2GraphLike) + .asInstanceOf[Fetcher] + + fetcher.init(config) + } + + def importModel(label: Label, config: Config)(implicit ec: ExecutionContext): Future[Importer] = { + val importer = ImportLock.computeIfAbsent(toImportLockKey(label), new java.util.function.Function[String, Importer] { + override def apply(k: String): Importer = { + val importer = initImporter(config.getConfig("importer")) + + //TODO: Update Label's extra options. + importer + .run(config.getConfig("importer")) + .map { importer => + logger.info(s"Close importer") + importer.close() + + initFetcher(config.getConfig("fetcher")).map { fetcher => + importer.setStatus(true) + + fetcherPool + .remove(k) + .foreach { oldFetcher => + logger.info(s"Delete old storage ($k) => $oldFetcher") + oldFetcher.close() + } + + fetcherPool += (k -> fetcher) + } + } + .onComplete { _ => + logger.info(s"ImportLock release: $k") + ImportLock.remove(k) + } + + importer + } + }) + + Future.successful(importer) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala index 7fb1183..cca1769 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala @@ -259,6 +259,20 @@ object Label extends SQLSyntaxSupport[Label] { cnt } + def updateOption(labelName: String, options: String)(implicit session: DBSession = AutoSession) = { + scala.util.Try(Json.parse(options)).getOrElse(throw new RuntimeException("invalid Json option")) + logger.info(s"update options of label $labelName, ${options}") + val cnt = sql"""update labels set options = $options where label = $labelName""".update().apply() + val label = Label.findByName(labelName, useCache = false).get + + val cacheKeys = List(s"id=${label.id.get}", s"label=${label.label}") + cacheKeys.foreach { key => + expireCache(className + key) + expireCaches(className + key) + } + cnt + } + def delete(id: Int)(implicit session: DBSession = AutoSession) = { val label = findById(id) logger.info(s"delete label: $label") @@ -369,19 +383,6 @@ case class Label(id: Option[Int], label: String, prop <- metaProps if LabelMeta.isValidSeq(prop.seq) jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), prop.dataType) } yield prop -> jsValue).toMap -// lazy val extraOptions = Model.extraOptions(Option("""{ -// "storage": { -// "s2graph.storage.backend": "rocks", -// "rocks.db.path": "/tmp/db" -// } -// }""")) - - lazy val tokens: Set[String] = extraOptions.get("tokens").fold(Set.empty[String]) { - case JsArray(tokens) => tokens.map(_.as[String]).toSet - case _ => - logger.error("Invalid token JSON") - Set.empty[String] - } lazy val extraOptions = Schema.extraOptions(options) @@ -389,8 +390,14 @@ case class Label(id: Option[Int], label: String, lazy val storageConfigOpt: Option[Config] = toStorageConfig + lazy val fetchConfigExist: Boolean = toFetcherConfig.isDefined + + def toFetcherConfig: Option[Config] = { + Schema.toConfig(extraOptions, "fetcher") + } + def toStorageConfig: Option[Config] = { - Schema.toStorageConfig(extraOptions) + Schema.toConfig(extraOptions, "storage") } def srcColumnWithDir(dir: Int) = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala index c28df80..50c1b7f 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala @@ -175,9 +175,9 @@ object Schema { } } - def toStorageConfig(options: Map[String, JsValue]): Option[Config] = { + def toConfig(options: Map[String, JsValue], key: String): Option[Config] = { try { - options.get("storage").map { jsValue => + options.get(key).map { jsValue => import scala.collection.JavaConverters._ val configMap = jsValue.as[JsObject].fieldSet.toMap.map { case (key, value) => key -> JSONParser.jsValueToAny(value).getOrElse(throw new RuntimeException("!!")) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala index 611a746..dbbfed7 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala @@ -129,5 +129,5 @@ case class Service(id: Option[Int], lazy val extraOptions = Schema.extraOptions(options) lazy val storageConfigOpt: Option[Config] = toStorageConfig def serviceColumns(useCache: Boolean): Seq[ServiceColumn] = ServiceColumn.findByServiceId(id.get, useCache = useCache) - def toStorageConfig: Option[Config] = Schema.toStorageConfig(extraOptions) + def toStorageConfig: Option[Config] = Schema.toConfig(extraOptions, "storage") } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala index 18f6b1e..d2500a6 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala @@ -22,7 +22,7 @@ package org.apache.s2graph.core.storage import com.typesafe.config.Config import org.apache.s2graph.core._ -import org.apache.s2graph.core.storage.serde.{Deserializable, MutationHelper} +import org.apache.s2graph.core.storage.serde.Deserializable import org.apache.s2graph.core.storage.serde.indexedge.tall.IndexEdgeDeserializable import org.apache.s2graph.core.types._ @@ -33,14 +33,11 @@ abstract class Storage(val graph: S2GraphLike, /* Storage backend specific resource management */ val management: StorageManagement - /* Physically store given KeyValue into backend storage. */ - val mutator: StorageWritable - /* * Given QueryRequest/Vertex/Edge, fetch KeyValue from storage * then convert them into Edge/Vertex */ - val fetcher: StorageReadable + val reader: StorageReadable /* * Serialize Edge/Vertex, to common KeyValue, SKeyValue that @@ -50,6 +47,11 @@ abstract class Storage(val graph: S2GraphLike, val serDe: StorageSerDe /* + * Responsible to connect physical storage backend to store GraphElement(Edge/Vertex). + */ + val mutator: Mutator + + /* * Common helper to translate SKeyValue to Edge/Vertex and vice versa. * Note that it require storage backend specific implementation for serialize/deserialize. */ @@ -60,31 +62,24 @@ abstract class Storage(val graph: S2GraphLike, * Note that it require storage backend specific implementations for * all of StorageWritable, StorageReadable, StorageSerDe, StorageIO */ - lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, mutator, fetcher) - - lazy val mutationHelper: MutationHelper = new MutationHelper(this) - - /** Mutation **/ - def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = - mutator.writeToStorage(cluster, kvs, withWait) +// lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, mutator, reader) +// lazy val mutationHelper: MutationHelper = new MutationHelper(this) - def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext): Future[MutateResponse] = - mutator.writeLock(requestKeyValue, expectedOpt) /** Fetch **/ def fetches(queryRequests: Seq[QueryRequest], prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = - fetcher.fetches(queryRequests, prevStepEdges) + reader.fetches(queryRequests, prevStepEdges) def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = - fetcher.fetchVertices(vertices) + reader.fetchVertices(vertices) - def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] = fetcher.fetchEdgesAll() + def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] = reader.fetchEdgesAll() - def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = fetcher.fetchVerticesAll() + def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = reader.fetchVerticesAll() def fetchSnapshotEdgeInner(edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[(Option[S2EdgeLike], Option[SKeyValue])] = - fetcher.fetchSnapshotEdgeInner(edge) + reader.fetchSnapshotEdgeInner(edge) /** Management **/ def flush(): Unit = management.flush() @@ -102,21 +97,21 @@ abstract class Storage(val graph: S2GraphLike, def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult, requestTs: Long, retryNum: Int)(implicit ec: ExecutionContext): Future[Boolean] = - mutationHelper.deleteAllFetchedEdgesAsyncOld(stepInnerResult, requestTs, retryNum) + mutator.deleteAllFetchedEdgesAsyncOld(stepInnerResult, requestTs, retryNum) def mutateVertex(zkQuorum: String, vertex: S2VertexLike, withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = - mutationHelper.mutateVertex(zkQuorum: String, vertex, withWait) + mutator.mutateVertex(zkQuorum: String, vertex, withWait) def mutateStrongEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[Boolean]] = - mutationHelper.mutateStrongEdges(zkQuorum, _edges, withWait) + mutator.mutateStrongEdges(zkQuorum, _edges, withWait) def mutateWeakEdges(zkQuorum: String, _edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[(Int, Boolean)]] = - mutationHelper.mutateWeakEdges(zkQuorum, _edges, withWait) + mutator.mutateWeakEdges(zkQuorum, _edges, withWait) def incrementCounts(zkQuorum: String, edges: Seq[S2EdgeLike], withWait: Boolean)(implicit ec: ExecutionContext): Future[Seq[MutateResponse]] = - mutationHelper.incrementCounts(zkQuorum, edges, withWait) + mutator.incrementCounts(zkQuorum, edges, withWait) def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] = - mutationHelper.updateDegree(zkQuorum, edge, degreeVal) + mutator.updateDegree(zkQuorum, edge, degreeVal) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala index 0965f68..b10feb9 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala @@ -19,6 +19,7 @@ package org.apache.s2graph.core.storage +import com.typesafe.config.Config import org.apache.s2graph.core.GraphExceptions.FetchTimeoutException import org.apache.s2graph.core._ import org.apache.s2graph.core.types.VertexId @@ -26,18 +27,18 @@ import org.apache.s2graph.core.utils.logger import scala.concurrent.{ExecutionContext, Future} -trait StorageReadable { +trait StorageReadable extends Fetcher { val io: StorageIO val serDe: StorageSerDe - /** - * responsible to fire parallel fetch call into storage and create future that will return merged result. - * - * @param queryRequests - * @param prevStepEdges - * @return - */ - def fetches(queryRequests: Seq[QueryRequest], - prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] +// /** +// * responsible to fire parallel fetch call into storage and create future that will return merged result. +// * +// * @param queryRequests +// * @param prevStepEdges +// * @return +// */ +// def fetches(queryRequests: Seq[QueryRequest], +// prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] @@ -92,4 +93,5 @@ trait StorageReadable { Future.sequence(futures).map(_.flatten) } + } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala index 80da3a9..8c2fb27 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageWritable.scala @@ -19,20 +19,21 @@ package org.apache.s2graph.core.storage +import org.apache.s2graph.core.Mutator + import scala.concurrent.{ExecutionContext, Future} -trait StorageWritable { +trait OptimisticMutator extends Mutator { /** * decide how to store given key values Seq[SKeyValue] into storage using storage's client. * note that this should be return true on all success. * we assumes that each storage implementation has client as member variable. * - * - * @param cluster: where this key values should be stored. - * @param kvs: sequence of SKeyValue that need to be stored in storage. - * @param withWait: flag to control wait ack from storage. - * note that in AsynchbaseStorage(which support asynchronous operations), even with true, - * it never block thread, but rather submit work and notified by event loop when storage send ack back. + * @param cluster : where this key values should be stored. + * @param kvs : sequence of SKeyValue that need to be stored in storage. + * @param withWait : flag to control wait ack from storage. + * note that in AsynchbaseStorage(which support asynchronous operations), even with true, + * it never block thread, but rather submit work and notified by event loop when storage send ack back. * @return ack message from storage. */ def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] @@ -55,10 +56,10 @@ trait StorageWritable { * for storage that does not support concurrency control, then storage implementation * itself can maintain manual locks that synchronize read(fetchSnapshotEdgeKeyValues) * and write(writeLock). + * * @param requestKeyValue * @param expectedOpt * @return */ def writeLock(requestKeyValue: SKeyValue, expectedOpt: Option[SKeyValue])(implicit ec: ExecutionContext): Future[MutateResponse] - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala index dcef1cc..bfc5bc6 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/WriteWriteConflictResolver.scala @@ -31,7 +31,7 @@ import scala.util.Random class WriteWriteConflictResolver(graph: S2GraphLike, serDe: StorageSerDe, io: StorageIO, - mutator: StorageWritable, + mutator: OptimisticMutator, fetcher: StorageReadable) { val BackoffTimeout = graph.BackoffTimeout val MaxRetryNum = graph.MaxRetryNum http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala index 8b3d862..4be3767 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala @@ -151,17 +151,9 @@ class AsynchbaseStorage(override val graph: S2GraphLike, override val management: StorageManagement = new AsynchbaseStorageManagement(config, clients) - override val mutator: StorageWritable = new AsynchbaseStorageWritable(client, clientWithFlush) - override val serDe: StorageSerDe = new AsynchbaseStorageSerDe(graph) - override val fetcher: StorageReadable = new AsynchbaseStorageReadable(graph, config, client, serDe, io) - - // val hbaseExecutor: ExecutorService = - // if (config.getString("hbase.zookeeper.quorum") == "localhost") - // AsynchbaseStorage.initLocalHBase(config) - // else - // null - + override val reader: StorageReadable = new AsynchbaseStorageReadable(graph, config, client, serDe, io) + override val mutator: Mutator = new AsynchbaseStorageWritable(graph, serDe, reader, client, clientWithFlush) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala index 7ca3782..b4236b9 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorageWritable.scala @@ -20,14 +20,19 @@ package org.apache.s2graph.core.storage.hbase import org.apache.hadoop.hbase.util.Bytes -import org.apache.s2graph.core.storage.{IncrementResponse, MutateResponse, SKeyValue, StorageWritable} +import org.apache.s2graph.core.S2GraphLike +import org.apache.s2graph.core.storage._ import org.apache.s2graph.core.utils.{Extensions, logger} import org.hbase.async.{AtomicIncrementRequest, DeleteRequest, HBaseClient, PutRequest} + import scala.collection.mutable.ArrayBuffer import scala.concurrent.{ExecutionContext, Future} -class AsynchbaseStorageWritable(val client: HBaseClient, - val clientWithFlush: HBaseClient) extends StorageWritable { +class AsynchbaseStorageWritable(val graph: S2GraphLike, + val serDe: StorageSerDe, + val reader: StorageReadable, + val client: HBaseClient, + val clientWithFlush: HBaseClient) extends DefaultOptimisticMutator(graph, serDe, reader) { import Extensions.DeferOps private def client(withWait: Boolean): HBaseClient = if (withWait) clientWithFlush else client http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala index 11fae17..b24e375 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala @@ -26,7 +26,7 @@ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} import com.google.common.hash.Hashing import com.typesafe.config.Config import org.apache.s2graph.core._ -import org.apache.s2graph.core.storage.Storage +import org.apache.s2graph.core.storage.{Storage, StorageManagement, StorageReadable, StorageSerDe} import org.apache.s2graph.core.storage.rocks.RocksHelper.RocksRPC import org.apache.s2graph.core.utils.logger import org.rocksdb._ @@ -150,11 +150,12 @@ class RocksStorage(override val graph: S2GraphLike, .maximumSize(1000 * 10 * 10 * 10 * 10) .build[String, ReentrantLock](cacheLoader) - override val management = new RocksStorageManagement(config, vdb, db) + override val management: StorageManagement = new RocksStorageManagement(config, vdb, db) - override val mutator = new RocksStorageWritable(db, vdb, lockMap) + override val serDe: StorageSerDe = new RocksStorageSerDe(graph) - override val serDe = new RocksStorageSerDe(graph) + override val reader: StorageReadable = new RocksStorageReadable(graph, config, db, vdb, serDe, io) + + override val mutator: Mutator = new RocksStorageWritable(graph, serDe, reader, db, vdb, lockMap) - override val fetcher = new RocksStorageReadable(graph, config, db, vdb, serDe, io) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala index 5db02cc..27e3efd 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala @@ -27,7 +27,7 @@ import org.apache.s2graph.core._ import org.apache.s2graph.core.schema.{Label, ServiceColumn} import org.apache.s2graph.core.storage.rocks.RocksHelper.{GetRequest, RocksRPC, ScanWithRange} import org.apache.s2graph.core.storage.serde.StorageSerializable -import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageReadable, StorageSerDe} +import org.apache.s2graph.core.storage._ import org.apache.s2graph.core.types.{HBaseType, VertexId} import org.rocksdb.RocksDB http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala index 7ec147d..d29ccce 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageWritable.scala @@ -23,15 +23,19 @@ import java.util.concurrent.locks.ReentrantLock import com.google.common.cache.{Cache, LoadingCache} import org.apache.hadoop.hbase.util.Bytes -import org.apache.s2graph.core.storage.{MutateResponse, SKeyValue, StorageWritable} +import org.apache.s2graph.core.S2GraphLike +import org.apache.s2graph.core.storage._ import org.apache.s2graph.core.utils.logger import org.rocksdb.{RocksDB, RocksDBException, WriteBatch, WriteOptions} import scala.concurrent.{ExecutionContext, Future} -class RocksStorageWritable(val db: RocksDB, +class RocksStorageWritable(val graph: S2GraphLike, + val serDe: StorageSerDe, + val reader: StorageReadable, + val db: RocksDB, val vdb: RocksDB, - val lockMap: LoadingCache[String, ReentrantLock]) extends StorageWritable { + val lockMap: LoadingCache[String, ReentrantLock]) extends DefaultOptimisticMutator(graph, serDe, reader) { override def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext) = { if (kvs.isEmpty) { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala index 0748efb..8cd32d4 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala @@ -16,25 +16,23 @@ * specific language governing permissions and limitations * under the License. */ - -package org.apache.s2graph.core.storage.serde - -import org.apache.s2graph.core.schema.LabelMeta +package org.apache.s2graph.core.storage import org.apache.s2graph.core._ -import org.apache.s2graph.core.storage._ +import org.apache.s2graph.core.schema.LabelMeta import org.apache.s2graph.core.utils.logger import scala.concurrent.{ExecutionContext, Future} -class MutationHelper(storage: Storage) { - val serDe = storage.serDe - val io = storage.io - val fetcher = storage.fetcher - val mutator = storage.mutator - val conflictResolver = storage.conflictResolver +abstract class DefaultOptimisticMutator(graph: S2GraphLike, + serDe: StorageSerDe, + reader: StorageReadable) extends OptimisticMutator { + val fetcher = reader + + lazy val io: StorageIO = new StorageIO(graph, serDe) + lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, this, reader) - private def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = - mutator.writeToStorage(cluster, kvs, withWait) +// private def writeToStorage(cluster: String, kvs: Seq[SKeyValue], withWait: Boolean)(implicit ec: ExecutionContext): Future[MutateResponse] = +// mutator.writeToStorage(cluster, kvs, withWait) def deleteAllFetchedEdgesAsyncOld(stepInnerResult: StepResult, requestTs: Long, @@ -93,7 +91,7 @@ class MutationHelper(storage: Storage) { val (bufferIncr, nonBufferIncr) = io.increments(edgeUpdate.deepCopy) - if (bufferIncr.nonEmpty) storage.writeToStorage(zkQuorum, bufferIncr, withWait = false) + if (bufferIncr.nonEmpty) writeToStorage(zkQuorum, bufferIncr, withWait = false) io.buildVertexPutsAsync(edge) ++ io.indexedEdgeMutations(edgeUpdate.deepCopy) ++ io.snapshotEdgeMutations(edgeUpdate.deepCopy) ++ nonBufferIncr } @@ -152,7 +150,7 @@ class MutationHelper(storage: Storage) { } val composed = for { - // deleteRet <- Future.sequence(deleteAllFutures) + // deleteRet <- Future.sequence(deleteAllFutures) mutateRet <- Future.sequence(mutateEdgeFutures) } yield mutateRet @@ -185,6 +183,6 @@ class MutationHelper(storage: Storage) { def updateDegree(zkQuorum: String, edge: S2EdgeLike, degreeVal: Long = 0)(implicit ec: ExecutionContext): Future[MutateResponse] = { val kvs = io.buildDegreePuts(edge, degreeVal) - mutator.writeToStorage(zkQuorum, kvs, withWait = true) + writeToStorage(zkQuorum, kvs, withWait = true) } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2core/src/test/scala/org/apache/s2graph/core/model/FetcherTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/model/FetcherTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/model/FetcherTest.scala new file mode 100644 index 0000000..6c76cdf --- /dev/null +++ b/s2core/src/test/scala/org/apache/s2graph/core/model/FetcherTest.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core.model + +import com.typesafe.config.ConfigFactory +import org.apache.s2graph.core.Integrate.IntegrateCommon +import org.apache.s2graph.core.Management.JsonModel.{Index, Prop} +import org.apache.s2graph.core.schema.Label +import org.apache.s2graph.core.{Query, QueryParam} + +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext} + +class FetcherTest extends IntegrateCommon { + + import scala.collection.JavaConverters._ + + test("MemoryModelFetcher") { + // 1. create label. + // 2. importLabel. + // 3. fetch. + val service = management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get + val serviceColumn = + management.createServiceColumn("s2graph", "user", "string", Seq(Prop("age", "0", "int", true))) + val labelName = "fetcher_test" + val options = + s"""{ + | + | "importer": { + | "${ModelManager.ClassNameKey}": "org.apache.s2graph.core.model.IdentityImporter" + | }, + | "fetcher": { + | "${ModelManager.ClassNameKey}": "org.apache.s2graph.core.model.MemoryModelFetcher" + | } + |}""".stripMargin + + Label.findByName(labelName, useCache = false).foreach { label => Label.delete(label.id.get) } + + val label = management.createLabel( + labelName, + serviceColumn, + serviceColumn, + true, + service.serviceName, + Seq.empty[Index].asJava, + Seq.empty[Prop].asJava, + "strong", + null, + -1, + "v3", + "gz", + options + ) + val config = ConfigFactory.parseString(options) + val importerFuture = graph.modelManager.importModel(label, config)(ExecutionContext.Implicits.global) + Await.ready(importerFuture, Duration("60 seconds")) + + Thread.sleep(1000) + + val vertex = graph.elementBuilder.toVertex(service.serviceName, serviceColumn.columnName, "daewon") + val queryParam = QueryParam(labelName = labelName) + + val query = Query.toQuery(srcVertices = Seq(vertex), queryParams = Seq(queryParam)) + val stepResult = Await.result(graph.getEdges(query), Duration("60 seconds")) + + stepResult.edgeWithScores.foreach { es => + println(es.edge) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala ---------------------------------------------------------------------- diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala index 391a99f..0bf62d9 100644 --- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala +++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala @@ -37,7 +37,7 @@ import sangria.execution.deferred.DeferredResolver import sangria.marshalling.sprayJson._ import sangria.parser.QueryParser import sangria.schema.Schema -import spray.json.{JsObject, JsString} +import spray.json.{JsBoolean, JsObject, JsString} import scala.collection.JavaConverters._ import scala.concurrent.ExecutionContext @@ -63,8 +63,22 @@ object GraphQLServer { val schemaCache = new SafeUpdateCache(schemaConfig) - def endpoint(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): Route = { + def importModel(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): Route = { + val ret = Try { + val spray.json.JsObject(fields) = requestJSON + val spray.json.JsString(labelName) = fields("label") + val jsOptions = fields("options") + + s2graph.management.importModel(labelName, jsOptions.compactPrint) + } + ret match { + case Success(f) => complete(f.map(i => OK -> JsString("start"))) + case Failure(e) => complete(InternalServerError -> spray.json.JsObject("message" -> JsString(e.toString))) + } + } + + def endpoint(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): Route = { val spray.json.JsObject(fields) = requestJSON val spray.json.JsString(query) = fields("query") http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2357d810/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala ---------------------------------------------------------------------- diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala index 685e87b..38cdce3 100644 --- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala +++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala @@ -44,6 +44,8 @@ object Server extends App { val route: Flow[HttpRequest, HttpResponse, Any] = (post & path("graphql")) { entity(as[spray.json.JsValue])(GraphQLServer.endpoint) + } ~ (post & path("importModel")) { + entity(as[spray.json.JsValue])(GraphQLServer.importModel) } ~ { getFromResource("assets/graphiql.html") }
