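Add Label.updateOption and pass model-import options through Management.importModel and a new /importModel HTTP endpoint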
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/2529af1e Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/2529af1e Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/2529af1e Branch: refs/heads/master Commit: 2529af1e2f605b24bd9c14b7ff4a4fa90fa0a6f7 Parents: 130fed2 Author: daewon <[email protected]> Authored: Thu May 3 19:58:48 2018 +0900 Committer: daewon <[email protected]> Committed: Thu May 3 19:58:48 2018 +0900 ---------------------------------------------------------------------- example/movielens/jobdesc.template | 21 ++++++++++++++++ .../org/apache/s2graph/core/Management.scala | 10 ++++---- .../s2graph/core/model/AnnoyModelFetcher.scala | 25 +++++++++++++++----- .../org/apache/s2graph/core/schema/Label.scala | 15 ++++++++++++ .../s2graph/core/model/HDFSImporterTest.scala | 10 ++++++-- .../apache/s2graph/graphql/GraphQLServer.scala | 11 ++++++++- .../org/apache/s2graph/graphql/HttpServer.scala | 2 ++ .../task/custom/process/ALSModelProcess.scala | 2 ++ 8 files changed, 82 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2529af1e/example/movielens/jobdesc.template ---------------------------------------------------------------------- diff --git a/example/movielens/jobdesc.template b/example/movielens/jobdesc.template index 13ef1cd..ca27e26 100644 --- a/example/movielens/jobdesc.template +++ b/example/movielens/jobdesc.template @@ -115,6 +115,16 @@ "itemCol": "movieId", "ratingCol": "rating" } + }, + { + "name": "build_movie_dict", + "inputs": [ + "movies" + ], + "type": "sql", + "options": { + "sql": "SELECT CAST(movieId as INT) as idx, CAST(movieId as INT) as movieId FROM movies" + } } ], "sink": [ @@ -154,6 +164,17 @@ "itemFactors": "/tmp/itemFactors", "path": "/tmp/annoy_result" } + }, + { + "name": "dict_sink", 
+ "inputs": [ + "build_movie_dict" + ], + "type": "file", + "options": { + "format": "csv", + "path": "/tmp/annoy_dict" + } } ] } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2529af1e/s2core/src/main/scala/org/apache/s2graph/core/Management.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala index 868fac7..7ff5a9e 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala @@ -304,11 +304,11 @@ class Management(graph: S2GraphLike) { import Management._ - def importModel(labelName: String): Future[Importer] = { - val label = Label.findByName(labelName).getOrElse(throw new LabelNotExistException(labelName)) - val config = label.toFetcherConfig.getOrElse { - throw new IllegalArgumentException(s"${label.label} is not importable since there is no configuration on label.") - } + def importModel(labelName: String, options: String): Future[Importer] = { + Label.updateOption(labelName, options) + + val label = Label.findByName(labelName, false).getOrElse(throw new LabelNotExistException(labelName)) + val config = ConfigFactory.parseString(options) graph.modelManager.importModel(label, config)(importEx) } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2529af1e/s2core/src/main/scala/org/apache/s2graph/core/model/AnnoyModelFetcher.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/model/AnnoyModelFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/model/AnnoyModelFetcher.scala index 5e0b979..a3a0f58 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/model/AnnoyModelFetcher.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/model/AnnoyModelFetcher.scala @@ -18,12 +18,25 @@ object 
AnnoyModelFetcher { val IndexTypeKey = "annoyIndexType" def loadDictFromLocal(file: File): Map[Int, String] = { - Source.fromFile(file).getLines().zipWithIndex.map { case (line, _idx) => - val tokens = line.stripMargin.split("\t") - if (tokens.length < 2) { - (_idx + 1, tokens.head) - } else { - (tokens.head.toInt, tokens.tail.head) + val files = if (file.isDirectory) { + file.listFiles() + } else { + Array(file) + } + + files.flatMap { file => + Source.fromFile(file).getLines().zipWithIndex.flatMap { case (line, _idx) => + val tokens = line.stripMargin.split(",") + try { + val tpl = if (tokens.length < 2) { + (_idx + 1, tokens.head) + } else { + (tokens.head.toInt, tokens.tail.head) + } + Seq(tpl) + } catch { + case e: Exception => Nil + } } }.toMap } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2529af1e/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala ---------------------------------------------------------------------- diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala index c671381..cca1769 100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala @@ -259,6 +259,20 @@ object Label extends SQLSyntaxSupport[Label] { cnt } + def updateOption(labelName: String, options: String)(implicit session: DBSession = AutoSession) = { + scala.util.Try(Json.parse(options)).getOrElse(throw new RuntimeException("invalid Json option")) + logger.info(s"update options of label $labelName, ${options}") + val cnt = sql"""update labels set options = $options where label = $labelName""".update().apply() + val label = Label.findByName(labelName, useCache = false).get + + val cacheKeys = List(s"id=${label.id.get}", s"label=${label.label}") + cacheKeys.foreach { key => + expireCache(className + key) + expireCaches(className + key) + } + cnt + } + def delete(id: 
Int)(implicit session: DBSession = AutoSession) = { val label = findById(id) logger.info(s"delete label: $label") @@ -381,6 +395,7 @@ case class Label(id: Option[Int], label: String, def toFetcherConfig: Option[Config] = { Schema.toConfig(extraOptions, "fetcher") } + def toStorageConfig: Option[Config] = { Schema.toConfig(extraOptions, "storage") } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2529af1e/s2core/src/test/scala/org/apache/s2graph/core/model/HDFSImporterTest.scala ---------------------------------------------------------------------- diff --git a/s2core/src/test/scala/org/apache/s2graph/core/model/HDFSImporterTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/model/HDFSImporterTest.scala index 3b274ab..8825a56 100644 --- a/s2core/src/test/scala/org/apache/s2graph/core/model/HDFSImporterTest.scala +++ b/s2core/src/test/scala/org/apache/s2graph/core/model/HDFSImporterTest.scala @@ -66,13 +66,19 @@ class HDFSImporterTest extends IntegrateCommon { -1, "v3", "gz", - options + "" ) + + Label.updateOption(label.label, options) + + println("*" * 80) + println(Label.findByName(label.label, false).get.toFetcherConfig) + val config = ConfigFactory.parseString(options) val importerFuture = graph.modelManager.importModel(label, config)(ExecutionContext.Implicits.global) Await.result(importerFuture, Duration("3 minutes")) - Thread.sleep(10000) + Thread.sleep(5000) val vertex = graph.elementBuilder.toVertex(service.serviceName, serviceColumn.columnName, "0") val queryParam = QueryParam(labelName = labelName, limit = 5) http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2529af1e/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala ---------------------------------------------------------------------- diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala index 391a99f..22ef43d 100644 --- 
a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala +++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala @@ -63,8 +63,17 @@ object GraphQLServer { val schemaCache = new SafeUpdateCache(schemaConfig) - def endpoint(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): Route = { + def importModel(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): Route = { + val spray.json.JsObject(fields) = requestJSON + val spray.json.JsString(labelName) = fields("label") + val jsOptions = fields("options") + + complete { + s2graph.management.importModel(labelName, jsOptions.compactPrint).map(a => OK -> JsString("")) + } + } + def endpoint(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): Route = { val spray.json.JsObject(fields) = requestJSON val spray.json.JsString(query) = fields("query") http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2529af1e/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala ---------------------------------------------------------------------- diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala index 685e87b..38cdce3 100644 --- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala +++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala @@ -44,6 +44,8 @@ object Server extends App { val route: Flow[HttpRequest, HttpResponse, Any] = (post & path("graphql")) { entity(as[spray.json.JsValue])(GraphQLServer.endpoint) + } ~ (post & path("importModel")) { + entity(as[spray.json.JsValue])(GraphQLServer.importModel) } ~ { getFromResource("assets/graphiql.html") } http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2529af1e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcess.scala ---------------------------------------------------------------------- diff --git 
a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcess.scala index 26ef6ad..7b4b17b 100644 --- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcess.scala +++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcess.scala @@ -48,6 +48,8 @@ object ALSModelProcess { val annoyResultPath = conf.options("path") val numDimensions = conf.options.getOrElse("dimensions", "10").toInt + FileUtil.fullyDelete(new File(tempInputPath)) + saveFeatures(dataFrame, itemFactorsPath) copyToLocal(dataFrame.sparkSession.sparkContext.hadoopConfiguration, itemFactorsPath, tempInputPath)
