Start working on ModelServing examples.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/130fed26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/130fed26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/130fed26
Branch: refs/heads/master
Commit: 130fed262f5a4a529fed764103b5ee275a10e508
Parents: 8696d15
Author: DO YUNG YOON <[email protected]>
Authored: Thu May 3 18:07:46 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Thu May 3 18:07:46 2018 +0900

----------------------------------------------------------------------
 example/movielens/jobdesc.template              |  43 +++++++-
 example/run.sh                                  |   3 +
 .../apache/s2graph/s2jobs/JobDescription.scala  |   4 +-
 .../custom/process/ALSAnnoyBuildProcess.scala   |  87 ---------------
 .../task/custom/process/ALSModelProcess.scala   | 110 +++++++++++++++++++
 .../process/ALSAnnoyBuildProcessTest.scala      | 104 ------------------
 .../custom/process/ALSModelProcessTest.scala    | 104 ++++++++++++++++++
 7 files changed, 260 insertions(+), 195 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/130fed26/example/movielens/jobdesc.template
----------------------------------------------------------------------
diff --git a/example/movielens/jobdesc.template b/example/movielens/jobdesc.template
index 0b0f0ad..13ef1cd 100644
--- a/example/movielens/jobdesc.template
+++ b/example/movielens/jobdesc.template
@@ -56,7 +56,7 @@
       ],
       "type": "sql",
       "options": {
-        "sql": "SELECT \n(unix_timestamp() * 1000) as timestamp, \n'v' as elem, \nCAST(movieId AS LONG) AS id, \n'movielens' as service, \n'Movie' as column, \nto_json(\nnamed_struct(\n 'title', title, \n 'genres', genres\n)\n) as props \nFROM movies \nWHERE movieId != 'movieId'"
+        "sql": "SELECT \n(unix_timestamp() * 1000) as timestamp, \n'v' as elem, \nCAST(movieId AS INT) AS id, \n'movielens' as service, \n'Movie' as column, \nto_json(\nnamed_struct(\n 'title', title, \n 'genres', genres\n)\n) as props \nFROM movies \nWHERE movieId != 'movieId'"
       }
     },
     {
@@ -66,7 +66,7 @@
       ],
       "type": "sql",
       "options": {
-        "sql": "SELECT \nCAST(timestamp AS LONG) * 1000 AS timestamp, \n'e' as elem, \nCAST(userId AS LONG) as `from`, \nCAST(movieId AS LONG) as to, \n'rated' as label, \nto_json(\nnamed_struct(\n 'score', CAST(rating as float)\n)\n) as props \nFROM ratings \nWHERE userId != 'userId'"
+        "sql": "SELECT \nCAST(timestamp AS LONG) * 1000 AS timestamp, \n'e' as elem, \nCAST(userId AS INT) as `from`, \nCAST(movieId AS INT) as to, \n'rated' as label, \nto_json(\nnamed_struct(\n 'score', CAST(rating as float)\n)\n) as props \nFROM ratings \nWHERE userId != 'userId'"
       }
     },
     {
@@ -76,7 +76,7 @@
       ],
       "type": "sql",
       "options": {
-        "sql": "SELECT \nCAST(timestamp AS LONG) * 1000 AS timestamp, \n'e' as elem, \nCAST(userId AS LONG) as `from`, \nCAST(movieId AS LONG) as to, \n'tagged' as label, \nto_json(\nnamed_struct('tag', tag)\n) as props \nFROM tags \nWHERE userId != 'userId'"
+        "sql": "SELECT \nCAST(timestamp AS LONG) * 1000 AS timestamp, \n'e' as elem, \nCAST(userId AS INT) as `from`, \nCAST(movieId AS INT) as to, \n'tagged' as label, \nto_json(\nnamed_struct('tag', tag)\n) as props \nFROM tags \nWHERE userId != 'userId'"
       }
     },
     {
@@ -89,6 +89,32 @@
       "options": {
         "sql": "SELECT * FROM edge_rated UNION SELECT * FROM edge_tagged"
       }
+    },
+    {
+      "name": "build_als_input",
+      "inputs": [
+        "edge_rated"
+      ],
+      "type": "sql",
+      "options": {
+        "sql": "SELECT \n`from` as userId, `to` as movieId, 1.0 as rating FROM edge_rated"
+      }
+    },
+    {
+      "name": "factorize_rating",
+      "inputs": [
+        "build_als_input"
+      ],
+      "type": "custom",
+      "options": {
+        "class": "org.apache.s2graph.s2jobs.task.custom.process.ALSModelProcess",
+        "rank": "10",
+        "maxIter": "5",
+        "regParam": "0.01",
+        "userCol": "userId",
+        "itemCol": "movieId",
+        "ratingCol": "rating"
+      }
     }
   ],
   "sink": [
@@ -117,6 +143,17 @@
         "s2.spark.sql.streaming.sink.grouped.size": "10",
         "s2.spark.sql.streaming.sink.wait.time": "10"
       }
+    },
+    {
+      "name": "annoy_index_build",
+      "inputs": [
+        "factorize_rating"
+      ],
+      "type": "annoy",
+      "options": {
+        "itemFactors": "/tmp/itemFactors",
+        "path": "/tmp/annoy_result"
+      }
     }
   ]
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/130fed26/example/run.sh
----------------------------------------------------------------------
diff --git a/example/run.sh b/example/run.sh
index fd324ac..5256974 100644
--- a/example/run.sh
+++ b/example/run.sh
@@ -41,3 +41,6 @@ q "Finally, we import example data to service"
 sh ./import_data.sh $SERVICE
 [ $? -ne 0 ] && { exit -1; }
 
+#q "Run ML Model into S2Graph by importing Model."
+#sh ./import_model.sh $SERVICE
+#[ $? -ne 0 ] && { exit -1; }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/130fed26/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
index 6abbe86..dc32bc5 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/JobDescription.scala
@@ -21,6 +21,7 @@ package org.apache.s2graph.s2jobs
 
 import play.api.libs.json.{JsValue, Json}
 import org.apache.s2graph.s2jobs.task._
+import org.apache.s2graph.s2jobs.task.custom.process.AnnoyIndexBuildSink
 
 case class JobDescription(
                            name:String,
@@ -65,7 +66,7 @@ object JobDescription extends Logger {
         logger.debug(s"custom class init.. $customClass")
         Class.forName(customClass)
-          .getConstructor(TaskConf.getClass)
+          .getConstructor(classOf[TaskConf])
           .newInstance(conf)
           .asInstanceOf[task.Process]
 
@@ -82,6 +83,7 @@ object JobDescription extends Logger {
       case "file" => new FileSink(jobName, conf)
       case "es" => new ESSink(jobName, conf)
       case "s2graph" => new S2GraphSink(jobName, conf)
+      case "annoy" => new AnnoyIndexBuildSink(jobName, conf)
       case _ => throw new IllegalArgumentException(s"unsupported sink type : ${conf.`type`}")
     }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/130fed26/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSAnnoyBuildProcess.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSAnnoyBuildProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSAnnoyBuildProcess.scala
deleted file mode 100644
index 968bf5a..0000000
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSAnnoyBuildProcess.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-package org.apache.s2graph.s2jobs.task.custom.process
-
-import java.io.File
-
-import annoy4s.{Angular, Annoy}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
-import org.apache.s2graph.s2jobs.task.TaskConf
-import org.apache.spark.ml.recommendation.{ALS, ALSModel}
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
-
-object ALSAnnoyBuildProcess {
-
-  def buildAnnoyIndex(ss: SparkSession,
-                      conf: TaskConf,
-                      dataFrame: DataFrame): Unit = {
-    // annoy tree params.
-    val outputPath = conf.options("outputPath")
-    val localInputPath = conf.options("localInputPath")
-    val localIndexPath = conf.options("localIndexPath")
-    val numDimensions = conf.options.getOrElse("dimensions", "10").toInt
-
-    // als model params.
-    val rank = conf.options.getOrElse("rank", numDimensions.toString).toInt
-    val maxIter = conf.options.getOrElse("maxIter", "5").toInt
-    val regParam = conf.options.getOrElse("regParam", "0.01").toDouble
-    val userCol = conf.options.getOrElse("userCol", "userId")
-    val itemCol = conf.options.getOrElse("itemCol", "movieId")
-    val ratingCol = conf.options.getOrElse("ratingCol", "rating")
-
-    assert(rank == numDimensions)
-
-    val als = new ALS()
-      .setRank(rank)
-      .setMaxIter(maxIter)
-      .setRegParam(regParam)
-      .setUserCol(userCol)
-      .setItemCol(itemCol)
-      .setRatingCol(ratingCol)
-
-    val model = als.fit(dataFrame)
-
-    saveFeatures(ss, model.itemFactors, outputPath)
-    copyToLocal(ss.sparkContext.hadoopConfiguration, outputPath, localInputPath)
-
-    FileUtil.fullyDelete(new File(localIndexPath))
-
-    Annoy.create[Int](s"${localInputPath}", numDimensions, outputDir = s"$localIndexPath", Angular)
-  }
-
-  def saveFeatures(ss: SparkSession,
-                   dataFrame: DataFrame,
-                   outputPath: String,
-                   idCol: String = "id",
-                   featuresCol: String = "features"): Unit = {
-    import ss.sqlContext.implicits._
-
-    val result = dataFrame.map { row =>
-      val id = row.getAs[Int](idCol)
-      val vector = row.getAs[Seq[Float]](featuresCol)
-      (Seq(id) ++ vector).mkString(" ")
-    }
-
-    result.write.mode(SaveMode.Overwrite).csv(outputPath)
-  }
-
-  def copyToLocal(configuration: Configuration,
-                  remoteInputPath: String,
-                  localOutputPath: String,
-                  merge: Boolean = true): Unit = {
-    val fs = FileSystem.get(configuration)
-    val localFs = FileSystem.getLocal(configuration)
-    localFs.deleteOnExit(new Path(localOutputPath))
-
-    if (merge)
-      FileUtil.copyMerge(fs, new Path(remoteInputPath), localFs, new Path(localOutputPath), false, configuration, "")
-    else
-      fs.copyToLocalFile(new Path(remoteInputPath), new Path(localOutputPath))
-  }
-}
-
-class ALSAnnoyBuildProcess(conf: TaskConf) extends org.apache.s2graph.s2jobs.task.Process(conf) {
-  override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame = ???
-
-  override def mandatoryOptions: Set[String] = Set("outputPath")
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/130fed26/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcess.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcess.scala
new file mode 100644
index 0000000..26ef6ad
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcess.scala
@@ -0,0 +1,110 @@
+package org.apache.s2graph.s2jobs.task.custom.process
+
+import java.io.File
+
+import annoy4s.{Angular, Annoy}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
+import org.apache.s2graph.s2jobs.task.{Sink, TaskConf}
+import org.apache.spark.ml.recommendation.{ALS, ALSModel}
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+
+object ALSModelProcess {
+
+  def runALS(ss: SparkSession,
+             conf: TaskConf,
+             dataFrame: DataFrame): DataFrame = {
+    // als model params.
+    val rank = conf.options.getOrElse("rank", "10").toInt
+    val maxIter = conf.options.getOrElse("maxIter", "5").toInt
+    val regParam = conf.options.getOrElse("regParam", "0.01").toDouble
+    val userCol = conf.options.getOrElse("userCol", "userId")
+    val itemCol = conf.options.getOrElse("itemCol", "movieId")
+    val ratingCol = conf.options.getOrElse("ratingCol", "rating")
+
+//    assert(rank == numDimensions)
+
+    val als = new ALS()
+      .setRank(rank)
+      .setMaxIter(maxIter)
+      .setRegParam(regParam)
+      .setUserCol(userCol)
+      .setItemCol(itemCol)
+      .setRatingCol(ratingCol)
+
+    val model = als.fit(dataFrame)
+
+    model.itemFactors
+  }
+
+  def buildAnnoyIndex(conf: TaskConf,
+                      dataFrame: DataFrame): Unit = {
+    // annoy tree params.
+    val itemFactorsPath = conf.options("itemFactors")
+    val tempPath = conf.options.getOrElse("tempPath", "/tmp")
+
+    val tempInputPath = tempPath + "/_tmp"
+
+    val annoyResultPath = conf.options("path")
+    val numDimensions = conf.options.getOrElse("dimensions", "10").toInt
+
+    saveFeatures(dataFrame, itemFactorsPath)
+    copyToLocal(dataFrame.sparkSession.sparkContext.hadoopConfiguration, itemFactorsPath, tempInputPath)
+
+    FileUtil.fullyDelete(new File(annoyResultPath))
+
+    Annoy.create[Int](s"${tempInputPath}", numDimensions, outputDir = s"$annoyResultPath", Angular)
+  }
+
+  def saveFeatures(dataFrame: DataFrame,
+                   outputPath: String,
+                   idCol: String = "id",
+                   featuresCol: String = "features"): Unit = {
+
+    import dataFrame.sparkSession.implicits._
+
+    val result = dataFrame.map { row =>
+      val id = row.getAs[Int](idCol)
+      val vector = row.getAs[Seq[Float]](featuresCol)
+      (Seq(id) ++ vector).mkString(" ")
+    }
+
+    result.write.mode(SaveMode.Overwrite).csv(outputPath)
+  }
+
+  def copyToLocal(configuration: Configuration,
+                  remoteInputPath: String,
+                  localOutputPath: String,
+                  merge: Boolean = true): Unit = {
+    val fs = FileSystem.get(configuration)
+    val localFs = FileSystem.getLocal(configuration)
+    localFs.deleteOnExit(new Path(localOutputPath))
+
+    if (merge)
+      FileUtil.copyMerge(fs, new Path(remoteInputPath), localFs, new Path(localOutputPath), false, configuration, "")
+    else
+      fs.copyToLocalFile(new Path(remoteInputPath), new Path(localOutputPath))
+  }
+}
+
+class ALSModelProcess(conf: TaskConf) extends org.apache.s2graph.s2jobs.task.Process(conf) {
+  override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame = {
+    ALSModelProcess.runALS(ss, conf, inputMap.head._2)
+  }
+  override def mandatoryOptions: Set[String] = Set.empty
+}
+
+class AnnoyIndexBuildSink(queryName: String, conf: TaskConf) extends Sink(queryName, conf) {
+  override val FORMAT: String = "parquet"
+
+  override def mandatoryOptions: Set[String] = Set("path", "itemFactors")
+
+  override def write(inputDF: DataFrame): Unit = {
+    val df = repartition(preprocess(inputDF), inputDF.sparkSession.sparkContext.defaultParallelism)
+
+    if (inputDF.isStreaming) throw new IllegalStateException("AnnoyIndexBuildSink can not be run as streaming.")
+    else {
+      ALSModelProcess.buildAnnoyIndex(conf, inputDF)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/130fed26/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSAnnoyBuildProcessTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSAnnoyBuildProcessTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSAnnoyBuildProcessTest.scala
deleted file mode 100644
index 6279e82..0000000
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSAnnoyBuildProcessTest.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-package org.apache.s2graph.s2jobs.task.custom.process
-
-import java.io.File
-
-import com.holdenkarau.spark.testing.DataFrameSuiteBase
-import com.typesafe.config.ConfigFactory
-import org.apache.commons.io.FileUtils
-import org.apache.s2graph.core.Integrate.IntegrateCommon
-import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
-import org.apache.s2graph.core.{Query, QueryParam}
-import org.apache.s2graph.core.model.{AnnoyModelFetcher, HDFSImporter, ModelManager}
-import org.apache.s2graph.core.schema.Label
-import org.apache.s2graph.s2jobs.task.TaskConf
-
-import scala.concurrent.{Await, ExecutionContext}
-import scala.concurrent.duration.Duration
-import scala.io.Source
-
-class ALSAnnoyBuildProcessTest extends IntegrateCommon with DataFrameSuiteBase {
-  import scala.collection.JavaConverters._
-
-  // this test require adding movie lens rating data(u.data, movie.txt) under resources
-  // so ignore for now until figure out how to automate download dataset.
-  ignore("RUN ALS on movie lens rating data and build annoy index on itemFeatures, finally query.") {
-    import spark.sqlContext.implicits._
-    val ratingPath = this.getClass.getResource("/u.data").toURI.getPath
-
-    val ratings = Source.fromFile(new File(ratingPath)).getLines().toSeq.map { line =>
-      val tokens = line.split("\t")
-      (tokens(0).toInt, tokens(1).toInt, tokens(2).toFloat)
-    }.toDF("userId", "movieId", "rating")
-
-    val outputPath = "/tmp"
-    val localInputPath = "/tmp/annoy_input"
-    val localIndexPath = "/tmp/annoy_result"
-
-    val taskOptions = Map(
-      "outputPath" -> outputPath,
-      "localInputPath" -> localInputPath,
-      "localIndexPath" -> localIndexPath
-    )
-
-    val conf = TaskConf("test", "test", Nil, taskOptions)
-    ALSAnnoyBuildProcess.buildAnnoyIndex(spark, conf, ratings)
-
-    val labelName = "annoy_model_fetcher_test"
-
-    val remoteIndexFilePath = s"${localIndexPath}/annoy-index"
-    val remoteDictFilePath = this.getClass.getResource(s"/movie.dict").toURI.getPath
-
-    val service = management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get
-    val serviceColumn =
-      management.createServiceColumn("s2graph", "user", "string", Seq(Prop("age", "0", "int", true)))
-
-    val options = s"""{
-                     | "importer": {
-                     |   "${ModelManager.ImporterClassNameKey}": "org.apache.s2graph.core.model.IdentityImporter"
-                     | },
-                     | "fetcher": {
-                     |   "${ModelManager.FetcherClassNameKey}": "org.apache.s2graph.core.model.AnnoyModelFetcher",
-                     |   "${AnnoyModelFetcher.IndexFilePathKey}": "${remoteIndexFilePath}",
-                     |   "${AnnoyModelFetcher.DictFilePathKey}": "${remoteDictFilePath}",
-                     |   "${AnnoyModelFetcher.DimensionKey}": 10
-                     | }
-                     |}""".stripMargin
-
-    Label.findByName(labelName, useCache = false).foreach { label => Label.delete(label.id.get) }
-
-    val label = management.createLabel(
-      labelName,
-      serviceColumn,
-      serviceColumn,
-      true,
-      service.serviceName,
-      Seq.empty[Index].asJava,
-      Seq.empty[Prop].asJava,
-      "strong",
-      null,
-      -1,
-      "v3",
-      "gz",
-      options
-    )
-    val config = ConfigFactory.parseString(options)
-    val importerFuture = graph.modelManager.importModel(label, config)(ExecutionContext.Implicits.global)
-    Await.result(importerFuture, Duration("3 minutes"))
-
-    Thread.sleep(10000)
-
-    val vertex = graph.elementBuilder.toVertex(service.serviceName, serviceColumn.columnName, "Toy Story (1995)")
-    val queryParam = QueryParam(labelName = labelName, limit = 5)
-
-    val query = Query.toQuery(srcVertices = Seq(vertex), queryParams = Seq(queryParam))
-    val stepResult = Await.result(graph.getEdges(query), Duration("60 seconds"))
-
-    stepResult.edgeWithScores.foreach { es =>
-      println(es.edge.tgtVertex.innerIdVal)
-    }
-
-    // clean up temp directory.
-    FileUtils.deleteDirectory(new File(outputPath))
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/130fed26/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala
new file mode 100644
index 0000000..d16ebf0
--- /dev/null
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala
@@ -0,0 +1,104 @@
+package org.apache.s2graph.s2jobs.task.custom.process
+
+import java.io.File
+
+import com.holdenkarau.spark.testing.DataFrameSuiteBase
+import com.typesafe.config.ConfigFactory
+import org.apache.commons.io.FileUtils
+import org.apache.s2graph.core.Integrate.IntegrateCommon
+import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
+import org.apache.s2graph.core.{Query, QueryParam}
+import org.apache.s2graph.core.model.{AnnoyModelFetcher, HDFSImporter, ModelManager}
+import org.apache.s2graph.core.schema.Label
+import org.apache.s2graph.s2jobs.task.TaskConf
+
+import scala.concurrent.{Await, ExecutionContext}
+import scala.concurrent.duration.Duration
+import scala.io.Source
+
+class ALSModelProcessTest extends IntegrateCommon with DataFrameSuiteBase {
+  import scala.collection.JavaConverters._
+
+  // this test require adding movie lens rating data(u.data, movie.txt) under resources
+  // so ignore for now until figure out how to automate download dataset.
+//  ignore("RUN ALS on movie lens rating data and build annoy index on itemFeatures, finally query.") {
+//    import spark.sqlContext.implicits._
+//    val ratingPath = this.getClass.getResource("/u.data").toURI.getPath
+//
+//    val ratings = Source.fromFile(new File(ratingPath)).getLines().toSeq.map { line =>
+//      val tokens = line.split("\t")
+//      (tokens(0).toInt, tokens(1).toInt, tokens(2).toFloat)
+//    }.toDF("userId", "movieId", "rating")
+//
+//    val outputPath = "/tmp"
+//    val localInputPath = "/tmp/annoy_input"
+//    val localIndexPath = "/tmp/annoy_result"
+//
+//    val taskOptions = Map(
+//      "outputPath" -> outputPath,
+//      "localInputPath" -> localInputPath,
+//      "localIndexPath" -> localIndexPath
+//    )
+//
+//    val conf = TaskConf("test", "test", Nil, taskOptions)
+//    ALSModelProcess.buildAnnoyIndex(spark, conf, ratings)
+//
+//    val labelName = "annoy_model_fetcher_test"
+//
+//    val remoteIndexFilePath = s"${localIndexPath}/annoy-index"
+//    val remoteDictFilePath = this.getClass.getResource(s"/movie.dict").toURI.getPath
+//
+//    val service = management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get
+//    val serviceColumn =
+//      management.createServiceColumn("s2graph", "user", "string", Seq(Prop("age", "0", "int", true)))
+//
+//    val options = s"""{
+//                     | "importer": {
+//                     |   "${ModelManager.ImporterClassNameKey}": "org.apache.s2graph.core.model.IdentityImporter"
+//                     | },
+//                     | "fetcher": {
+//                     |   "${ModelManager.FetcherClassNameKey}": "org.apache.s2graph.core.model.AnnoyModelFetcher",
+//                     |   "${AnnoyModelFetcher.IndexFilePathKey}": "${remoteIndexFilePath}",
+//                     |   "${AnnoyModelFetcher.DictFilePathKey}": "${remoteDictFilePath}",
+//                     |   "${AnnoyModelFetcher.DimensionKey}": 10
+//                     | }
+//                     |}""".stripMargin
+//
+//    Label.findByName(labelName, useCache = false).foreach { label => Label.delete(label.id.get) }
+//
+//    val label = management.createLabel(
+//      labelName,
+//      serviceColumn,
+//      serviceColumn,
+//      true,
+//      service.serviceName,
+//      Seq.empty[Index].asJava,
+//      Seq.empty[Prop].asJava,
+//      "strong",
+//      null,
+//      -1,
+//      "v3",
+//      "gz",
+//      options
+//    )
+//    val config = ConfigFactory.parseString(options)
+//    val importerFuture = graph.modelManager.importModel(label, config)(ExecutionContext.Implicits.global)
+//    Await.result(importerFuture, Duration("3 minutes"))
+//
+//    Thread.sleep(10000)
+//
+//    val vertex = graph.elementBuilder.toVertex(service.serviceName, serviceColumn.columnName, "Toy Story (1995)")
+//    val queryParam = QueryParam(labelName = labelName, limit = 5)
+//
+//    val query = Query.toQuery(srcVertices = Seq(vertex), queryParams = Seq(queryParam))
+//    val stepResult = Await.result(graph.getEdges(query), Duration("60 seconds"))
+//
+//    stepResult.edgeWithScores.foreach { es =>
+//      println(es.edge.tgtVertex.innerIdVal)
+//    }
+//
+//    // clean up temp directory.
+//    FileUtils.deleteDirectory(new File(outputPath))
+//  }
+
+}
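
Note on the JobDescription change above: custom process classes referenced from a "custom" task's "class" option are now instantiated through a single-argument constructor taking a TaskConf (getConstructor(classOf[TaskConf])), which is the shape ALSModelProcess follows. Below is a minimal sketch of that contract; the class name PassThroughProcess is hypothetical and not part of this commit.

package org.apache.s2graph.s2jobs.task.custom.process

import org.apache.s2graph.s2jobs.task.TaskConf
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical example: any class wired in via "type": "custom" must expose a
// constructor that accepts a TaskConf, since JobDescription resolves it with
// getConstructor(classOf[TaskConf]) and calls newInstance(conf).
class PassThroughProcess(conf: TaskConf) extends org.apache.s2graph.s2jobs.task.Process(conf) {
  // this sketch does not require any options
  override def mandatoryOptions: Set[String] = Set.empty

  // return the first input DataFrame unchanged
  override def execute(ss: SparkSession, inputMap: Map[String, DataFrame]): DataFrame =
    inputMap.head._2
}

Such a class would be referenced from a job description the same way factorize_rating references ALSModelProcess, i.e. with "type": "custom" and a "class" option holding the fully qualified class name.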
