Add Label.updateOption and an importModel HTTP endpoint

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/2529af1e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/2529af1e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/2529af1e

Branch: refs/heads/master
Commit: 2529af1e2f605b24bd9c14b7ff4a4fa90fa0a6f7
Parents: 130fed2
Author: daewon <[email protected]>
Authored: Thu May 3 19:58:48 2018 +0900
Committer: daewon <[email protected]>
Committed: Thu May 3 19:58:48 2018 +0900

----------------------------------------------------------------------
 example/movielens/jobdesc.template              | 21 ++++++++++++++++
 .../org/apache/s2graph/core/Management.scala    | 10 ++++----
 .../s2graph/core/model/AnnoyModelFetcher.scala  | 25 +++++++++++++++-----
 .../org/apache/s2graph/core/schema/Label.scala  | 15 ++++++++++++
 .../s2graph/core/model/HDFSImporterTest.scala   | 10 ++++++--
 .../apache/s2graph/graphql/GraphQLServer.scala  | 11 ++++++++-
 .../org/apache/s2graph/graphql/HttpServer.scala |  2 ++
 .../task/custom/process/ALSModelProcess.scala   |  2 ++
 8 files changed, 82 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2529af1e/example/movielens/jobdesc.template
----------------------------------------------------------------------
diff --git a/example/movielens/jobdesc.template b/example/movielens/jobdesc.template
index 13ef1cd..ca27e26 100644
--- a/example/movielens/jobdesc.template
+++ b/example/movielens/jobdesc.template
@@ -115,6 +115,16 @@
         "itemCol": "movieId",
         "ratingCol": "rating"
       }
+    },
+    {
+      "name": "build_movie_dict",
+      "inputs": [
+        "movies"
+      ],
+      "type": "sql",
+      "options": {
+        "sql": "SELECT CAST(movieId as INT) as idx, CAST(movieId as INT) as 
movieId FROM movies"
+      }
     }
   ],
   "sink": [
@@ -154,6 +164,17 @@
         "itemFactors": "/tmp/itemFactors",
         "path": "/tmp/annoy_result"
       }
+    },
+    {
+      "name": "dict_sink",
+      "inputs": [
+        "build_movie_dict"
+      ],
+      "type": "file",
+      "options": {
+        "format": "csv",
+        "path": "/tmp/annoy_dict"
+      }
     }
   ]
 }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2529af1e/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index 868fac7..7ff5a9e 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -304,11 +304,11 @@ class Management(graph: S2GraphLike) {
 
   import Management._
 
-  def importModel(labelName: String): Future[Importer] = {
-    val label = Label.findByName(labelName).getOrElse(throw new 
LabelNotExistException(labelName))
-    val config = label.toFetcherConfig.getOrElse {
-      throw new IllegalArgumentException(s"${label.label} is not importable 
since there is no configuration on label.")
-    }
+  def importModel(labelName: String, options: String): Future[Importer] = {
+    Label.updateOption(labelName, options)
+
+    val label = Label.findByName(labelName, false).getOrElse(throw new 
LabelNotExistException(labelName))
+    val config = ConfigFactory.parseString(options)
 
     graph.modelManager.importModel(label, config)(importEx)
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2529af1e/s2core/src/main/scala/org/apache/s2graph/core/model/AnnoyModelFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/model/AnnoyModelFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/model/AnnoyModelFetcher.scala
index 5e0b979..a3a0f58 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/model/AnnoyModelFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/model/AnnoyModelFetcher.scala
@@ -18,12 +18,25 @@ object AnnoyModelFetcher {
   val IndexTypeKey = "annoyIndexType"
 
   def loadDictFromLocal(file: File): Map[Int, String] = {
-    Source.fromFile(file).getLines().zipWithIndex.map { case (line, _idx) =>
-      val tokens = line.stripMargin.split("\t")
-      if (tokens.length < 2) {
-        (_idx + 1, tokens.head)
-      } else {
-        (tokens.head.toInt, tokens.tail.head)
+    val files = if (file.isDirectory) {
+      file.listFiles()
+    } else {
+      Array(file)
+    }
+
+    files.flatMap { file =>
+      Source.fromFile(file).getLines().zipWithIndex.flatMap { case (line, 
_idx) =>
+        val tokens = line.stripMargin.split(",")
+        try {
+          val tpl = if (tokens.length < 2) {
+            (_idx + 1, tokens.head)
+          } else {
+            (tokens.head.toInt, tokens.tail.head)
+          }
+          Seq(tpl)
+        } catch {
+          case e: Exception => Nil
+        }
       }
     }.toMap
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2529af1e/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala b/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
index c671381..cca1769 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/schema/Label.scala
@@ -259,6 +259,20 @@ object Label extends SQLSyntaxSupport[Label] {
     cnt
   }
 
+  def updateOption(labelName: String, options: String)(implicit session: 
DBSession = AutoSession) = {
+    scala.util.Try(Json.parse(options)).getOrElse(throw new 
RuntimeException("invalid Json option"))
+    logger.info(s"update options of label $labelName, ${options}")
+    val cnt = sql"""update labels set options = $options where label = 
$labelName""".update().apply()
+    val label = Label.findByName(labelName, useCache = false).get
+
+    val cacheKeys = List(s"id=${label.id.get}", s"label=${label.label}")
+    cacheKeys.foreach { key =>
+      expireCache(className + key)
+      expireCaches(className + key)
+    }
+    cnt
+  }
+
   def delete(id: Int)(implicit session: DBSession = AutoSession) = {
     val label = findById(id)
     logger.info(s"delete label: $label")
@@ -381,6 +395,7 @@ case class Label(id: Option[Int], label: String,
   def toFetcherConfig: Option[Config] = {
     Schema.toConfig(extraOptions, "fetcher")
   }
+
   def toStorageConfig: Option[Config] = {
     Schema.toConfig(extraOptions, "storage")
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2529af1e/s2core/src/test/scala/org/apache/s2graph/core/model/HDFSImporterTest.scala
----------------------------------------------------------------------
diff --git a/s2core/src/test/scala/org/apache/s2graph/core/model/HDFSImporterTest.scala b/s2core/src/test/scala/org/apache/s2graph/core/model/HDFSImporterTest.scala
index 3b274ab..8825a56 100644
--- a/s2core/src/test/scala/org/apache/s2graph/core/model/HDFSImporterTest.scala
+++ b/s2core/src/test/scala/org/apache/s2graph/core/model/HDFSImporterTest.scala
@@ -66,13 +66,19 @@ class HDFSImporterTest extends IntegrateCommon {
       -1,
       "v3",
       "gz",
-      options
+      ""
     )
+
+    Label.updateOption(label.label, options)
+
+    println("*" * 80)
+    println(Label.findByName(label.label, false).get.toFetcherConfig)
+
     val config = ConfigFactory.parseString(options)
     val importerFuture = graph.modelManager.importModel(label, 
config)(ExecutionContext.Implicits.global)
     Await.result(importerFuture, Duration("3 minutes"))
 
-    Thread.sleep(10000)
+    Thread.sleep(5000)
 
     val vertex = graph.elementBuilder.toVertex(service.serviceName, 
serviceColumn.columnName, "0")
     val queryParam = QueryParam(labelName = labelName, limit = 5)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2529af1e/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
----------------------------------------------------------------------
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
index 391a99f..22ef43d 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/GraphQLServer.scala
@@ -63,8 +63,17 @@ object GraphQLServer {
 
   val schemaCache = new SafeUpdateCache(schemaConfig)
 
-  def endpoint(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): 
Route = {
+  def importModel(requestJSON: spray.json.JsValue)(implicit e: 
ExecutionContext): Route = {
+    val spray.json.JsObject(fields) = requestJSON
+    val spray.json.JsString(labelName) = fields("label")
+    val jsOptions = fields("options")
+
+    complete {
+      s2graph.management.importModel(labelName, jsOptions.compactPrint).map(a 
=> OK -> JsString(""))
+    }
+  }
 
+  def endpoint(requestJSON: spray.json.JsValue)(implicit e: ExecutionContext): 
Route = {
     val spray.json.JsObject(fields) = requestJSON
     val spray.json.JsString(query) = fields("query")
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2529af1e/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala
----------------------------------------------------------------------
diff --git a/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala b/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala
index 685e87b..38cdce3 100644
--- a/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala
+++ b/s2graphql/src/main/scala/org/apache/s2graph/graphql/HttpServer.scala
@@ -44,6 +44,8 @@ object Server extends App {
 
   val route: Flow[HttpRequest, HttpResponse, Any] = (post & path("graphql")) {
     entity(as[spray.json.JsValue])(GraphQLServer.endpoint)
+  } ~ (post & path("importModel")) {
+    entity(as[spray.json.JsValue])(GraphQLServer.importModel)
   } ~ {
     getFromResource("assets/graphiql.html")
   }

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/2529af1e/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcess.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcess.scala
index 26ef6ad..7b4b17b 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcess.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcess.scala
@@ -48,6 +48,8 @@ object ALSModelProcess {
     val annoyResultPath = conf.options("path")
     val numDimensions = conf.options.getOrElse("dimensions", "10").toInt
 
+    FileUtil.fullyDelete(new File(tempInputPath))
+
     saveFeatures(dataFrame, itemFactorsPath)
     copyToLocal(dataFrame.sparkSession.sparkContext.hadoopConfiguration, 
itemFactorsPath, tempInputPath)
 

Reply via email to