Repository: incubator-s2graph Updated Branches: refs/heads/master 33f4d0550 -> 33e3d267e
- abstract traversing edges as Fetcher interface. Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/72c35a39 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/72c35a39 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/72c35a39 Branch: refs/heads/master Commit: 72c35a39e9f739d6df941d86db546811c9cb8a2a Parents: e674a25 Author: DO YUNG YOON <[email protected]> Authored: Thu Apr 26 14:26:06 2018 +0900 Committer: DO YUNG YOON <[email protected]> Committed: Thu Apr 26 14:26:06 2018 +0900 ---------------------------------------------------------------------- s2core/build.sbt | 3 +- .../scala/org/apache/s2graph/core/Fetcher.scala | 17 +++ .../org/apache/s2graph/core/Management.scala | 15 ++- .../org/apache/s2graph/core/QueryResult.scala | 4 +- .../scala/org/apache/s2graph/core/S2Graph.scala | 13 ++ .../org/apache/s2graph/core/S2GraphLike.scala | 7 + .../apache/s2graph/core/TraversalHelper.scala | 4 +- .../s2graph/core/model/AnnoyModelFetcher.scala | 87 ++++++++++++ .../s2graph/core/model/ImportStatus.scala | 40 ++++++ .../apache/s2graph/core/model/Importer.scala | 99 ++++++++++++++ .../s2graph/core/model/MemoryModelFetcher.scala | 41 ++++++ .../s2graph/core/model/ModelManager.scala | 87 ++++++++++++ .../org/apache/s2graph/core/schema/Label.scala | 20 +-- .../org/apache/s2graph/core/schema/Schema.scala | 4 +- .../apache/s2graph/core/schema/Service.scala | 2 +- .../apache/s2graph/core/storage/Storage.scala | 14 +- .../s2graph/core/storage/StorageReadable.scala | 22 +-- .../core/storage/hbase/AsynchbaseStorage.scala | 9 +- .../core/storage/rocks/RocksStorage.scala | 2 +- .../storage/rocks/RocksStorageReadable.scala | 2 +- .../core/storage/serde/MutationHelper.scala | 2 +- .../apache/s2graph/core/model/FetcherTest.scala | 134 +++++++++++++++++++ .../s2graph/core/model/HDFSImporterTest.scala | 80 +++++++++++ 23 files changed, 656 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/build.sbt ---------------------------------------------------------------------- diff --git a/s2core/build.sbt b/s2core/build.sbt index cc70e97..6e062cc 100644 --- a/s2core/build.sbt +++ b/s2core/build.sbt @@ -55,7 +55,8 @@ libraryDependencies ++= Seq( "com.sksamuel.elastic4s" %% "elastic4s-core" % elastic4sVersion excludeLogging(), "com.sksamuel.elastic4s" %% "elastic4s-http" % elastic4sVersion excludeLogging(), "com.sksamuel.elastic4s" %% "elastic4s-embedded" % elastic4sVersion excludeLogging(), - "org.scala-lang.modules" %% "scala-pickling" % "0.10.1" + "org.scala-lang.modules" %% "scala-pickling" % "0.10.1", + "com.spotify" % "annoy" % "0.2.5" ) libraryDependencies := { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/Fetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Fetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/Fetcher.scala new file mode 100644 index 0000000..737beb3 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/Fetcher.scala @@ -0,0 +1,17 @@ +package org.apache.s2graph.core + +import com.typesafe.config.Config +import org.apache.s2graph.core.types.VertexId + +import scala.concurrent.{ExecutionContext, Future} + +trait Fetcher { + + def init(config: Config)(implicit ec: ExecutionContext): Future[Fetcher] = + Future.successful(this) + + def fetches(queryRequests: Seq[QueryRequest], + prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] + + def close(): Unit = {} +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/Management.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala index d026e5b..868fac7 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala @@ -20,6 +20,7 @@ package org.apache.s2graph.core import java.util +import java.util.concurrent.Executors import com.typesafe.config.{Config, ConfigFactory} import org.apache.s2graph.core.GraphExceptions.{InvalidHTableException, LabelAlreadyExistException, LabelNameTooLongException, LabelNotExistException} @@ -28,8 +29,10 @@ import org.apache.s2graph.core.schema._ import org.apache.s2graph.core.types.HBaseType._ import org.apache.s2graph.core.types._ import org.apache.s2graph.core.JSONParser._ +import org.apache.s2graph.core.model.Importer import play.api.libs.json._ +import scala.concurrent.{ExecutionContext, Future} import scala.util.Try /** @@ -70,7 +73,6 @@ object Management { case class Index(name: String, propNames: Seq[String], direction: Option[Int] = None, options: Option[String] = None) } - def findService(serviceName: String) = { Service.findByName(serviceName, useCache = false) } @@ -298,9 +300,18 @@ object Management { class Management(graph: S2GraphLike) { + val importEx = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor()) import Management._ - import scala.collection.JavaConversions._ + + def importModel(labelName: String): Future[Importer] = { + val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName)) + val config = label.toFetcherConfig.getOrElse { + throw new IllegalArgumentException(s"${label.label} is not importable since there is no configuration on label.") + } + + graph.modelManager.importModel(label, config)(importEx) + } def createStorageTable(zkAddr: String, tableName: String, http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala index be57017..4a1018f 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/QueryResult.scala @@ -237,7 +237,7 @@ object StepResult { // val newOrderByValues = updateScoreOnOrderByValues(globalQueryOption.scoreFieldIdx, t.orderByValues, newScore) val newOrderByValues = - if (globalQueryOption.orderByKeys.isEmpty) (newScore, t.edge.getTsInnerValValue(), None, None) + if (globalQueryOption.orderByKeys.isEmpty) (newScore, t.edge.getTs(), None, None) else toTuple4(newT.toValues(globalQueryOption.orderByKeys)) val newGroupByValues = newT.toValues(globalQueryOption.groupBy.keys) @@ -262,7 +262,7 @@ object StepResult { // val newOrderByValues = updateScoreOnOrderByValues(globalQueryOption.scoreFieldIdx, t.orderByValues, newScore) val newOrderByValues = - if (globalQueryOption.orderByKeys.isEmpty) (newScore, t.edge.getTsInnerValValue(), None, None) + if (globalQueryOption.orderByKeys.isEmpty) (newScore, t.edge.getTs(), None, None) else toTuple4(newT.toValues(globalQueryOption.orderByKeys)) val newGroupByValues = newT.toValues(globalQueryOption.groupBy.keys) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala index 7816a63..43ab92c 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Graph.scala @@ -27,6 +27,7 @@ import com.typesafe.config.{Config, ConfigFactory} import org.apache.commons.configuration.{BaseConfiguration, Configuration} import org.apache.s2graph.core.index.IndexProvider import org.apache.s2graph.core.io.tinkerpop.optimize.S2GraphStepStrategy +import org.apache.s2graph.core.model.ModelManager import org.apache.s2graph.core.schema._ import org.apache.s2graph.core.storage.hbase.AsynchbaseStorage import org.apache.s2graph.core.storage.rocks.RocksStorage @@ -186,6 +187,8 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap override val management = new Management(this) + override val modelManager = new ModelManager(this) + override val indexProvider = IndexProvider.apply(config) override val elementBuilder = new GraphElementBuilder(this) @@ -247,6 +250,16 @@ class S2Graph(_config: Config)(implicit val ec: ExecutionContext) extends S2Grap storagePool.getOrElse(s"label:${label.label}", defaultStorage) } + //TODO: + override def getFetcher(column: ServiceColumn): Fetcher = { + getStorage(column.service).reader + } + + override def getFetcher(label: Label): Fetcher = { + if (label.fetchConfigExist) modelManager.getFetcher(label) + else getStorage(label).reader + } + override def flushStorage(): Unit = { storagePool.foreach { case (_, storage) => http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala index cbd31cc..fef0078 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2GraphLike.scala @@ -31,6 +31,7 @@ import org.apache.s2graph.core.GraphExceptions.LabelNotExistException import org.apache.s2graph.core.S2Graph.{DefaultColumnName, DefaultServiceName} import org.apache.s2graph.core.features.{S2Features, S2GraphVariables} import org.apache.s2graph.core.index.IndexProvider +import org.apache.s2graph.core.model.ModelManager import org.apache.s2graph.core.schema.{Label, LabelMeta, Service, ServiceColumn} import org.apache.s2graph.core.storage.{MutateResponse, Storage} import org.apache.s2graph.core.types.{InnerValLike, VertexId} @@ -68,6 +69,8 @@ trait S2GraphLike extends Graph { val traversalHelper: TraversalHelper + val modelManager: ModelManager + lazy val MaxRetryNum: Int = config.getInt("max.retry.number") lazy val MaxBackOff: Int = config.getInt("max.back.off") lazy val BackoffTimeout: Int = config.getInt("back.off.timeout") @@ -90,6 +93,10 @@ trait S2GraphLike extends Graph { def getStorage(label: Label): Storage + def getFetcher(column: ServiceColumn): Fetcher + + def getFetcher(label: Label): Fetcher + def flushStorage(): Unit def shutdown(modelDataDelete: Boolean = false): Unit http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala index 0dc2aa2..003a2d1 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/TraversalHelper.scala @@ -204,7 +204,7 @@ class TraversalHelper(graph: S2GraphLike) { val aggFuture = requestsPerLabel.foldLeft(Future.successful(Map.empty[Int, StepResult])) { case (prevFuture, (label, reqWithIdxs)) => for { prev <- prevFuture - cur <- graph.getStorage(label).fetches(reqWithIdxs.map(_._1), prevStepEdges) + cur <- graph.getFetcher(label).fetches(reqWithIdxs.map(_._1), prevStepEdges) } yield { prev ++ reqWithIdxs.map(_._2).zip(cur).toMap } @@ -389,7 +389,7 @@ class TraversalHelper(graph: S2GraphLike) { val newEdgeWithScore = edgeWithScore.copy(edge = newEdge) /* OrderBy */ val orderByValues = - if (queryOption.orderByKeys.isEmpty) (score, edge.getTsInnerValValue(), None, None) + if (queryOption.orderByKeys.isEmpty) (score, edge.getTs(), None, None) else StepResult.toTuple4(newEdgeWithScore.toValues(queryOption.orderByKeys)) /* StepGroupBy */ http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/model/AnnoyModelFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/model/AnnoyModelFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/model/AnnoyModelFetcher.scala new file mode 100644 index 0000000..df083fa --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/model/AnnoyModelFetcher.scala @@ -0,0 +1,87 @@ +package org.apache.s2graph.core.model + +import java.io.File + +import com.spotify.annoy.{ANNIndex, IndexType} +import com.typesafe.config.Config +import org.apache.s2graph.core.types.VertexId +import org.apache.s2graph.core._ + +import scala.concurrent.{ExecutionContext, Future} +import scala.io.Source +import scala.util.Try + +object AnnoyModelFetcher { + val IndexFilePathKey = "annoyIndexFilePath" + val DictFilePathKey = "annoyDictFilePath" + val DimensionKey = "annoyIndexDimension" + val IndexTypeKey = "annoyIndexType" + + def loadDictFromLocal(file: File): Array[String] = { + Source.fromFile(file).getLines().map { line => + line.stripMargin + }.toArray + } + + def buildIndex(config: Config): ANNIndexWithDict = { + val filePath = config.getString(IndexFilePathKey) + val dictPath = config.getString(DictFilePathKey) + + val dimension = config.getInt(DimensionKey) + val indexType = Try { config.getString(IndexTypeKey) }.toOption.map(IndexType.valueOf).getOrElse(IndexType.ANGULAR) + + val dict = loadDictFromLocal(new File(dictPath)) + val index = new ANNIndex(dimension, filePath, indexType) + ANNIndexWithDict(index, dict) + } +} + +case class ANNIndexWithDict(index: ANNIndex, dict: Array[String]) { + val dictRev = dict.zipWithIndex.toMap +} + +class AnnoyModelFetcher(val graph: S2GraphLike) extends Fetcher { + import scala.collection.JavaConverters._ + val builder = graph.elementBuilder + + var model: ANNIndexWithDict = _ + + override def init(config: Config)(implicit ec: ExecutionContext): Future[Fetcher] = { + Future { + model = AnnoyModelFetcher.buildIndex(config) + + this + } + } + + /** Fetch **/ + override def fetches(queryRequests: Seq[QueryRequest], + prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = { + val stepResultLs = queryRequests.map { queryRequest => + val vertex = queryRequest.vertex + val queryParam = queryRequest.queryParam + + val srcIndexOpt = model.dictRev.get(vertex.innerId.toIdString()) + + srcIndexOpt.map { srcIdx => + val srcVector = model.index.getItemVector(srcIdx) + val nns = model.index.getNearest(srcVector, queryParam.limit).asScala + + val edges = nns.map { tgtIdx => + val tgtVertexId = builder.newVertexId(queryParam.label.service, + queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), model.dict(tgtIdx)) + + graph.toEdge(vertex.innerId.value, tgtVertexId.innerId.value, queryParam.labelName, queryParam.direction) + } + val edgeWithScores = edges.map(e => EdgeWithScore(e, 1.0, queryParam.label)) + StepResult(edgeWithScores, Nil, Nil) + }.getOrElse(StepResult.Empty) + } + + Future.successful(stepResultLs) + } + + override def close(): Unit = { + // do clean up + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/model/ImportStatus.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/model/ImportStatus.scala b/s2core/src/main/scala/org/apache/s2graph/core/model/ImportStatus.scala new file mode 100644 index 0000000..63e8cdd --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/model/ImportStatus.scala @@ -0,0 +1,40 @@ +package org.apache.s2graph.core.model + +import java.util.concurrent.atomic.AtomicInteger + +trait ImportStatus { + val done: AtomicInteger + + def isCompleted: Boolean + + def percentage: Int + + val total: Int +} + +class ImportRunningStatus(val total: Int) extends ImportStatus { + require(total > 0, s"Total should be positive: $total") + + val done = new AtomicInteger(0) + + def isCompleted: Boolean = total == done.get + + def percentage = 100 * done.get / total +} + +case object ImportDoneStatus extends ImportStatus { + val total = 1 + + val done = new AtomicInteger(1) + + def isCompleted: Boolean = true + + def percentage = 100 +} + +object ImportStatus { + def apply(total: Int): ImportStatus = new ImportRunningStatus(total) + + def unapply(importResult: ImportStatus): Option[(Boolean, Int, Int)] = + Some((importResult.isCompleted, importResult.total, importResult.done.get)) +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/model/Importer.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/model/Importer.scala b/s2core/src/main/scala/org/apache/s2graph/core/model/Importer.scala new file mode 100644 index 0000000..5265483 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/model/Importer.scala @@ -0,0 +1,99 @@ +package org.apache.s2graph.core.model + +import java.io.File + +import com.typesafe.config.Config +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.s2graph.core.{Fetcher, S2GraphLike} +import org.apache.s2graph.core.utils.logger + +import scala.concurrent.{ExecutionContext, Future} + +object Importer { + def toHDFSConfiguration(hdfsConfDir: String): Configuration = { + val conf = new Configuration + + val hdfsConfDirectory = new File(hdfsConfDir) + if (hdfsConfDirectory.exists()) { + if (!hdfsConfDirectory.isDirectory || !hdfsConfDirectory.canRead) { + throw new IllegalStateException(s"HDFS configuration directory ($hdfsConfDirectory) cannot be read.") + } + + val path = hdfsConfDirectory.getAbsolutePath + conf.addResource(new Path(s"file:///$path/core-site.xml")) + conf.addResource(new Path(s"file:///$path/hdfs-site.xml")) + } else { + logger.warn("RocksDBImporter doesn't have valid hadoop configuration directory..") + } + conf + } +} + +trait Importer { + @volatile var isFinished: Boolean = false + def run(config: Config)(implicit ec: ExecutionContext): Future[Importer] + + def status: Boolean = isFinished + + def setStatus(otherStatus: Boolean): Boolean = { + this.isFinished = otherStatus + this.isFinished + } +// def status: ImportStatus + +// def getImportedStorage(graphExecutionContext: ExecutionContext): Storage[_, _] + def close(): Unit +} +case class IdentityImporter(graph: S2GraphLike) extends Importer { + override def run(config: Config)(implicit ec: ExecutionContext): Future[Importer] = { + Future.successful(this) + } + + override def close(): Unit = {} +} +object HDFSImporter { + import scala.collection.JavaConverters._ + val PathsKey = "paths" + val HDFSConfDirKey = "hdfsConfDir" + + def extractPaths(config: Config): Map[String, String] = { + config.getConfigList(PathsKey).asScala.map { e => + val key = e.getString("src") + val value = e.getString("tgt") + + key -> value + }.toMap + } +} +case class HDFSImporter(graph: S2GraphLike) extends Importer { + + import HDFSImporter._ + + override def run(config: Config)(implicit ec: ExecutionContext): Future[Importer] = { + Future { + val paths = extractPaths(config) + val hdfsConfiDir = config.getString(HDFSConfDirKey) + + val hadoopConfig = Importer.toHDFSConfiguration(hdfsConfiDir) + val fs = FileSystem.get(hadoopConfig) + + def copyToLocal(remoteSrc: String, localSrc: String): Unit = { + val remoteSrcPath = new Path(remoteSrc) + val localSrcPath = new Path(localSrc) + + fs.copyToLocalFile(remoteSrcPath, localSrcPath) + } + + paths.foreach { case (srcPath, tgtPath) => + copyToLocal(srcPath, tgtPath) + } + + this + } + } + +// override def status: ImportStatus = ??? + + override def close(): Unit = {} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/model/MemoryModelFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/model/MemoryModelFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/model/MemoryModelFetcher.scala new file mode 100644 index 0000000..1b0474a --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/model/MemoryModelFetcher.scala @@ -0,0 +1,41 @@ +package org.apache.s2graph.core.model + +import com.typesafe.config.Config +import org.apache.s2graph.core.types.{InnerValLikeWithTs, VertexId} +import org.apache.s2graph.core._ +import org.apache.s2graph.core.schema.LabelMeta + +import scala.concurrent.{ExecutionContext, Future} + +/** + * Reference implementation for Fetcher interface. + * it only produce constant edges. + */ +class MemoryModelFetcher(val graph: S2GraphLike) extends Fetcher { + val builder = graph.elementBuilder + val ranges = (0 until 10) + + override def init(config: Config)(implicit ec: ExecutionContext): Future[Fetcher] = { + Future.successful(this) + } + + override def fetches(queryRequests: Seq[QueryRequest], + prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = { + val stepResultLs = queryRequests.map { queryRequest => + val queryParam = queryRequest.queryParam + val edges = ranges.map { ith => + val tgtVertexId = builder.newVertexId(queryParam.label.service, queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), ith.toString) + + graph.toEdge(queryRequest.vertex.innerIdVal, + tgtVertexId.innerId.value, queryParam.label.label, queryParam.direction) + } + + val edgeWithScores = edges.map(e => EdgeWithScore(e, 1.0, queryParam.label)) + StepResult(edgeWithScores, Nil, Nil) + } + + Future.successful(stepResultLs) + } + + override def close(): Unit = {} +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/model/ModelManager.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/model/ModelManager.scala b/s2core/src/main/scala/org/apache/s2graph/core/model/ModelManager.scala new file mode 100644 index 0000000..4afd3e3 --- /dev/null +++ b/s2core/src/main/scala/org/apache/s2graph/core/model/ModelManager.scala @@ -0,0 +1,87 @@ +package org.apache.s2graph.core.model + +import com.typesafe.config.Config +import org.apache.s2graph.core.schema.Label +import org.apache.s2graph.core.utils.logger +import org.apache.s2graph.core.{Fetcher, S2GraphLike} + +import scala.concurrent.{ExecutionContext, Future} + +object ModelManager { + val FetcherClassNameKey = "fetchClassName" + val ImporterClassNameKey = "importerClassName" +} + +class ModelManager(s2GraphLike: S2GraphLike) { + import ModelManager._ + + private val fetcherPool = scala.collection.mutable.Map.empty[String, Fetcher] + private val ImportLock = new java.util.concurrent.ConcurrentHashMap[String, Importer] + + def toImportLockKey(label: Label): String = label.label + + def getFetcher(label: Label): Fetcher = { + fetcherPool.getOrElse(toImportLockKey(label), throw new IllegalStateException(s"$label is not imported.")) + } + + def initImporter(config: Config): Importer = { + val className = config.getString(ImporterClassNameKey) + + Class.forName(className) + .getConstructor(classOf[S2GraphLike]) + .newInstance(s2GraphLike) + .asInstanceOf[Importer] + } + + def initFetcher(config: Config)(implicit ec: ExecutionContext): Future[Fetcher] = { + val className = config.getString(FetcherClassNameKey) + + val fetcher = Class.forName(className) + .getConstructor(classOf[S2GraphLike]) + .newInstance(s2GraphLike) + .asInstanceOf[Fetcher] + + fetcher.init(config) + } + + def importModel(label: Label, config: Config)(implicit ec: ExecutionContext): Future[Importer] = { + val importer = ImportLock.computeIfAbsent(toImportLockKey(label), new java.util.function.Function[String, Importer] { + override def apply(k: String): Importer = { + val importer = initImporter(config.getConfig("importer")) + + //TODO: Update Label's extra options. + importer + .run(config.getConfig("importer")) + .map { importer => + logger.info(s"Close importer") + importer.close() + + initFetcher(config.getConfig("fetcher")).map { fetcher => + importer.setStatus(true) + + + fetcherPool + .remove(k) + .foreach { oldFetcher => + logger.info(s"Delete old storage ($k) => $oldFetcher") + oldFetcher.close() + } + + fetcherPool += (k -> fetcher) + true + } + + true + } + .onComplete { _ => + logger.info(s"ImportLock release: $k") + ImportLock.remove(k) + } + importer + } + }) + + Future.successful(importer) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala index 7fb1183..c671381 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala @@ -369,19 +369,6 @@ case class Label(id: Option[Int], label: String, prop <- metaProps if LabelMeta.isValidSeq(prop.seq) jsValue <- innerValToJsValue(toInnerVal(prop.defaultValue, prop.dataType, schemaVersion), prop.dataType) } yield prop -> jsValue).toMap -// lazy val extraOptions = Model.extraOptions(Option("""{ -// "storage": { -// "s2graph.storage.backend": "rocks", -// "rocks.db.path": "/tmp/db" -// } -// }""")) - - lazy val tokens: Set[String] = extraOptions.get("tokens").fold(Set.empty[String]) { - case JsArray(tokens) => tokens.map(_.as[String]).toSet - case _ => - logger.error("Invalid token JSON") - Set.empty[String] - } lazy val extraOptions = Schema.extraOptions(options) @@ -389,8 +376,13 @@ case class Label(id: Option[Int], label: String, lazy val storageConfigOpt: Option[Config] = toStorageConfig + lazy val fetchConfigExist: Boolean = toFetcherConfig.isDefined + + def toFetcherConfig: Option[Config] = { + Schema.toConfig(extraOptions, "fetcher") + } def toStorageConfig: Option[Config] = { - Schema.toStorageConfig(extraOptions) + Schema.toConfig(extraOptions, "storage") } def srcColumnWithDir(dir: Int) = { http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala index ebae966..255cd5a 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Schema.scala @@ -174,9 +174,9 @@ object Schema { } } - def toStorageConfig(options: Map[String, JsValue]): Option[Config] = { + def toConfig(options: Map[String, JsValue], key: String): Option[Config] = { try { - options.get("storage").map { jsValue => + options.get(key).map { jsValue => import scala.collection.JavaConverters._ val configMap = jsValue.as[JsObject].fieldSet.toMap.map { case (key, value) => key -> JSONParser.jsValueToAny(value).getOrElse(throw new RuntimeException("!!")) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala index 611a746..dbbfed7 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Service.scala @@ -129,5 +129,5 @@ case class Service(id: Option[Int], lazy val extraOptions = Schema.extraOptions(options) lazy val storageConfigOpt: Option[Config] = toStorageConfig def serviceColumns(useCache: Boolean): Seq[ServiceColumn] = ServiceColumn.findByServiceId(id.get, useCache = useCache) - def toStorageConfig: Option[Config] = Schema.toStorageConfig(extraOptions) + def toStorageConfig: Option[Config] = Schema.toConfig(extraOptions, "storage") } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala index 18f6b1e..6ad62b1 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala @@ -40,7 +40,7 @@ abstract class Storage(val graph: S2GraphLike, * Given QueryRequest/Vertex/Edge, fetch KeyValue from storage * then convert them into Edge/Vertex */ - val fetcher: StorageReadable + val reader: StorageReadable /* * Serialize Edge/Vertex, to common KeyValue, SKeyValue that @@ -60,7 +60,7 @@ abstract class Storage(val graph: S2GraphLike, * Note that it require storage backend specific implementations for * all of StorageWritable, StorageReadable, StorageSerDe, StorageIO */ - lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, mutator, fetcher) + lazy val conflictResolver: WriteWriteConflictResolver = new WriteWriteConflictResolver(graph, serDe, io, mutator, reader) lazy val mutationHelper: MutationHelper = new MutationHelper(this) @@ -74,17 +74,17 @@ abstract class Storage(val graph: S2GraphLike, /** Fetch **/ def fetches(queryRequests: Seq[QueryRequest], prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] = - fetcher.fetches(queryRequests, prevStepEdges) + reader.fetches(queryRequests, prevStepEdges) def fetchVertices(vertices: Seq[S2VertexLike])(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = - fetcher.fetchVertices(vertices) + reader.fetchVertices(vertices) - def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] = fetcher.fetchEdgesAll() + def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] = reader.fetchEdgesAll() - def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = fetcher.fetchVerticesAll() + def fetchVerticesAll()(implicit ec: ExecutionContext): Future[Seq[S2VertexLike]] = reader.fetchVerticesAll() def fetchSnapshotEdgeInner(edge: S2EdgeLike)(implicit ec: ExecutionContext): Future[(Option[S2EdgeLike], Option[SKeyValue])] = - fetcher.fetchSnapshotEdgeInner(edge) + reader.fetchSnapshotEdgeInner(edge) /** Management **/ def flush(): Unit = management.flush() http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala index 0965f68..b10feb9 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/StorageReadable.scala @@ -19,6 +19,7 @@ package org.apache.s2graph.core.storage +import com.typesafe.config.Config import org.apache.s2graph.core.GraphExceptions.FetchTimeoutException import org.apache.s2graph.core._ import org.apache.s2graph.core.types.VertexId @@ -26,18 +27,18 @@ import org.apache.s2graph.core.utils.logger import scala.concurrent.{ExecutionContext, Future} -trait StorageReadable { +trait StorageReadable extends Fetcher { val io: StorageIO val serDe: StorageSerDe - /** - * responsible to fire parallel fetch call into storage and create future that will return merged result. - * - * @param queryRequests - * @param prevStepEdges - * @return - */ - def fetches(queryRequests: Seq[QueryRequest], - prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] +// /** +// * responsible to fire parallel fetch call into storage and create future that will return merged result. +// * +// * @param queryRequests +// * @param prevStepEdges +// * @return +// */ +// def fetches(queryRequests: Seq[QueryRequest], +// prevStepEdges: Map[VertexId, Seq[EdgeWithScore]])(implicit ec: ExecutionContext): Future[Seq[StepResult]] def fetchEdgesAll()(implicit ec: ExecutionContext): Future[Seq[S2EdgeLike]] @@ -92,4 +93,5 @@ trait StorageReadable { Future.sequence(futures).map(_.flatten) } + } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala index 8b3d862..e233277 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala @@ -155,13 +155,6 @@ class AsynchbaseStorage(override val graph: S2GraphLike, override val serDe: StorageSerDe = new AsynchbaseStorageSerDe(graph) - override val fetcher: StorageReadable = new AsynchbaseStorageReadable(graph, config, client, serDe, io) - - // val hbaseExecutor: ExecutorService = - // if (config.getString("hbase.zookeeper.quorum") == "localhost") - // AsynchbaseStorage.initLocalHBase(config) - // else - // null - + override val reader: StorageReadable = new AsynchbaseStorageReadable(graph, config, client, serDe, io) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala index 11fae17..e53aeb3 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorage.scala @@ -156,5 +156,5 @@ class RocksStorage(override val graph: S2GraphLike, override val serDe = new RocksStorageSerDe(graph) - override val fetcher = new RocksStorageReadable(graph, config, db, vdb, serDe, io) + override val reader = new RocksStorageReadable(graph, config, db, vdb, serDe, io) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala index 5db02cc..27e3efd 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/rocks/RocksStorageReadable.scala @@ -27,7 +27,7 @@ import org.apache.s2graph.core._ import org.apache.s2graph.core.schema.{Label, ServiceColumn} import org.apache.s2graph.core.storage.rocks.RocksHelper.{GetRequest, RocksRPC, ScanWithRange} import org.apache.s2graph.core.storage.serde.StorageSerializable -import org.apache.s2graph.core.storage.{SKeyValue, StorageIO, StorageReadable, StorageSerDe} +import org.apache.s2graph.core.storage._ import org.apache.s2graph.core.types.{HBaseType, VertexId} import org.rocksdb.RocksDB http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala index 0748efb..fecc6ea 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/serde/MutationHelper.scala @@ -29,7 +29,7 @@ import scala.concurrent.{ExecutionContext, Future} class MutationHelper(storage: Storage) { val serDe = storage.serDe val io = storage.io - val fetcher = storage.fetcher + val fetcher = storage.reader val mutator = storage.mutator val conflictResolver = storage.conflictResolver http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/test/scala/org/apache/s2graph/core/model/FetcherTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/model/FetcherTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/model/FetcherTest.scala new file mode 100644 index 0000000..e89f8de --- /dev/null +++ b/s2core/src/test/scala/org/apache/s2graph/core/model/FetcherTest.scala @@ -0,0 +1,134 @@ +package org.apache.s2graph.core.model + +import com.typesafe.config.ConfigFactory +import org.apache.s2graph.core.Integrate.IntegrateCommon +import org.apache.s2graph.core.Management.JsonModel.{Index, Prop} +import org.apache.s2graph.core.schema.Label +import org.apache.s2graph.core.{Query, QueryParam} + +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext} + +class FetcherTest extends IntegrateCommon{ + import TestUtil._ + import scala.collection.JavaConverters._ + + test("MemoryModelFetcher") { + // 1. create label. + // 2. importLabel. + // 3. fetch. + val service = management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get + val serviceColumn = + management.createServiceColumn("s2graph", "user", "string", Seq(Prop("age", "0", "int", true))) + val labelName = "fetcher_test" + val options = s"""{ + | + | "importer": { + | "${ModelManager.ImporterClassNameKey}": "org.apache.s2graph.core.model.IdentityImporter" + | }, + | "fetcher": { + | "${ModelManager.FetcherClassNameKey}": "org.apache.s2graph.core.model.MemoryModelFetcher" + | } + |}""".stripMargin + + Label.findByName(labelName, useCache = false).foreach { label => Label.delete(label.id.get) } + + val label = management.createLabel( + labelName, + serviceColumn, + serviceColumn, + true, + service.serviceName, + Seq.empty[Index].asJava, + Seq.empty[Prop].asJava, + "strong", + null, + -1, + "v3", + "gz", + options + ) + val config = ConfigFactory.parseString(options) + val importerFuture = graph.modelManager.importModel(label, config)(ExecutionContext.Implicits.global) + Await.ready(importerFuture, Duration("60 seconds")) + + Thread.sleep(1000) + + val vertex = graph.elementBuilder.toVertex(service.serviceName, serviceColumn.columnName, "daewon") + val queryParam = QueryParam(labelName = labelName) + + val query = Query.toQuery(srcVertices = Seq(vertex), queryParams = Seq(queryParam)) + val stepResult = Await.result(graph.getEdges(query), Duration("60 seconds")) + + stepResult.edgeWithScores.foreach { es => + println(es.edge) + } + } + test("AnnoyModelFetcher") { + + val labelName = "annoy_model_fetcher_test" + val hdfsConfDir = "/usr/local/Cellar/hadoop/2.7.3/libexec/etc/hadoop/" + + val REMOTE_INDEX_FILE = "/Users/shon/Downloads/test-index.tree" + val LOCAL_INDEX_FILE = "./test-index.tree" + val REMOTE_DICT_FILE = "/Users/shon/Downloads/test-index.dict" + val LOCAL_DICT_FILE = "./test-index.dict" + + val service = management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get + val serviceColumn = + management.createServiceColumn("s2graph", "user", "string", Seq(Prop("age", "0", "int", true))) + + val options = s"""{ + | "importer": { + | "${ModelManager.ImporterClassNameKey}": "org.apache.s2graph.core.model.HDFSImporter", + | "${HDFSImporter.HDFSConfDirKey}": "$hdfsConfDir", + | "${HDFSImporter.PathsKey}": [{ + | "src": "${REMOTE_INDEX_FILE}", + | "tgt": "${LOCAL_INDEX_FILE}" + | }, { + | "src": "${REMOTE_DICT_FILE}", + | "tgt": "${LOCAL_DICT_FILE}" + | }] + | }, + | "fetcher": { + | "${ModelManager.FetcherClassNameKey}": "org.apache.s2graph.core.model.AnnoyModelFetcher", + | "${AnnoyModelFetcher.IndexFilePathKey}": "${LOCAL_INDEX_FILE}", + | "${AnnoyModelFetcher.DictFilePathKey}": "${LOCAL_DICT_FILE}", + | "${AnnoyModelFetcher.DimensionKey}": 10 + | } + |}""".stripMargin + + Label.findByName(labelName, useCache = false).foreach { label => Label.delete(label.id.get) } + + val label = management.createLabel( + labelName, + serviceColumn, + serviceColumn, + true, + service.serviceName, + Seq.empty[Index].asJava, + Seq.empty[Prop].asJava, + "strong", + null, + -1, + "v3", + "gz", + options + ) + val config = ConfigFactory.parseString(options) + val importerFuture = graph.modelManager.importModel(label, config)(ExecutionContext.Implicits.global) + Await.result(importerFuture, Duration("3 minutes")) + + Thread.sleep(10000) + + val vertex = graph.elementBuilder.toVertex(service.serviceName, serviceColumn.columnName, "0") + val queryParam = QueryParam(labelName = labelName, limit = 5) + + val query = Query.toQuery(srcVertices = Seq(vertex), queryParams = Seq(queryParam)) + val stepResult = Await.result(graph.getEdges(query), Duration("60 seconds")) + + stepResult.edgeWithScores.foreach { es => + println(es.edge) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/72c35a39/s2core/src/test/scala/org/apache/s2graph/core/model/HDFSImporterTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/model/HDFSImporterTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/model/HDFSImporterTest.scala new file mode 100644 index 0000000..385c9a7 --- /dev/null +++ b/s2core/src/test/scala/org/apache/s2graph/core/model/HDFSImporterTest.scala @@ -0,0 +1,80 @@ +package org.apache.s2graph.core.model + +import com.typesafe.config.ConfigFactory +import org.apache.s2graph.core.Integrate.IntegrateCommon +import org.apache.s2graph.core.Management.JsonModel.{Index, Prop} +import org.apache.s2graph.core.{Query, QueryParam} +import org.apache.s2graph.core.schema.Label + +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext} + +class HDFSImporterTest extends IntegrateCommon { + import scala.collection.JavaConverters._ + + test("hdfs test.") { + + val labelName = "hdfs_importer_test" + val hdfsConfDir = "/usr/local/Cellar/hadoop/2.7.3/libexec/etc/hadoop/" + + val REMOTE_INDEX_FILE = "/Users/shon/Downloads/test-index.tree" + val LOCAL_INDEX_FILE = "./test-index.tree" + val REMOTE_DICT_FILE = "/Users/shon/Downloads/test-index.dict" + val LOCAL_DICT_FILE = "./test-index.dict" + + val service = management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get + val serviceColumn = + management.createServiceColumn("s2graph", "user", "string", Seq(Prop("age", "0", "int", true))) + + val options = s"""{ + | "importer": { + | "${ModelManager.ImporterClassNameKey}": "org.apache.s2graph.core.model.HDFSImporter", + | "${HDFSImporter.HDFSConfDirKey}": "$hdfsConfDir", + | "${HDFSImporter.PathsKey}": [{ + | "src": "${REMOTE_INDEX_FILE}", + | "tgt": "${LOCAL_INDEX_FILE}" + | }, { + | "src": "${REMOTE_DICT_FILE}", + | "tgt": "${LOCAL_DICT_FILE}" + | } + | ] + | }, + | "fetcher": { + | "${ModelManager.FetcherClassNameKey}": "org.apache.s2graph.core.model.MemoryModelFetcher" + | } + |}""".stripMargin + + Label.findByName(labelName, useCache = false).foreach { label => Label.delete(label.id.get) } + + val label = management.createLabel( + labelName, + serviceColumn, + serviceColumn, + true, + service.serviceName, + Seq.empty[Index].asJava, + Seq.empty[Prop].asJava, + "strong", + null, + -1, + "v3", + "gz", + options + ) + val config = ConfigFactory.parseString(options) + val importerFuture = graph.modelManager.importModel(label, config)(ExecutionContext.Implicits.global) + Await.result(importerFuture, Duration("3 minutes")) + + Thread.sleep(10000) + + val vertex = graph.elementBuilder.toVertex(service.serviceName, serviceColumn.columnName, "0") + val queryParam = QueryParam(labelName = labelName, limit = 5) + + val query = Query.toQuery(srcVertices = Seq(vertex), queryParams = Seq(queryParam)) + val stepResult = Await.result(graph.getEdges(query), Duration("60 seconds")) + + stepResult.edgeWithScores.foreach { es => + println(es.edge) + } + } +}
