Change the Annoy Java library (com.spotify:annoy) to annoy4s.

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/08a80cdd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/08a80cdd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/08a80cdd

Branch: refs/heads/master
Commit: 08a80cdd68d18a95cbddea91966b0dc765d77284
Parents: 2529af1
Author: DO YUNG YOON <[email protected]>
Authored: Fri May 4 14:25:24 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Fri May 4 14:25:24 2018 +0900

----------------------------------------------------------------------
 example/movielens/jobdesc.template              |  13 +-
 example/run.sh                                  |  14 +-
 s2core/build.sbt                                |   2 +-
 .../s2graph/core/model/AnnoyModelFetcher.scala  | 137 +++++++++++--------
 s2jobs/build.sbt                                |   4 +-
 .../task/custom/process/ALSModelProcess.scala   |  22 ++-
 .../custom/process/ALSModelProcessTest.scala    | 126 ++++++++++++++++-
 7 files changed, 244 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/08a80cdd/example/movielens/jobdesc.template
----------------------------------------------------------------------
diff --git a/example/movielens/jobdesc.template b/example/movielens/jobdesc.template
index ca27e26..7cb1581 100644
--- a/example/movielens/jobdesc.template
+++ b/example/movielens/jobdesc.template
@@ -123,7 +123,7 @@
       ],
       "type": "sql",
       "options": {
-        "sql": "SELECT CAST(movieId as INT) as idx, CAST(movieId as INT) as 
movieId FROM movies"
+        "sql": "SELECT CAST(movieId as INT) as idx FROM movies where movieId 
!= null"
       }
     }
   ],
@@ -155,6 +155,17 @@
       }
     },
     {
+      "name": "als_sink",
+      "inputs": [
+        "factorize_rating"
+      ],
+      "type": "file",
+      "options": {
+        "path": "/tmp/als_item",
+        "format": "json"
+      }
+    },
+    {
       "name": "annoy_index_build",
       "inputs": [
         "factorize_rating"

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/08a80cdd/example/run.sh
----------------------------------------------------------------------
diff --git a/example/run.sh b/example/run.sh
index 5256974..4312860 100644
--- a/example/run.sh
+++ b/example/run.sh
@@ -29,13 +29,13 @@ while IFS='' read -r line || [[ -n "$line" ]]; do
     info "$line"
 done < "$SERVICE/desc.md"
 
-q "First of all, we will check prerequisites"
-sh ./prepare.sh $SERVICE
-[ $? -ne 0 ] && { exit -1; }
-
-q "And now, we create vertex and edge schema using graphql"
-sh ./create_schema.sh $SERVICE
-[ $? -ne 0 ] && { exit -1; }
+#q "First of all, we will check prerequisites"
+#sh ./prepare.sh $SERVICE
+#[ $? -ne 0 ] && { exit -1; }
+#
+#q "And now, we create vertex and edge schema using graphql"
+#sh ./create_schema.sh $SERVICE
+#[ $? -ne 0 ] && { exit -1; }
 
 q "Finally, we import example data to service"
 sh ./import_data.sh $SERVICE

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/08a80cdd/s2core/build.sbt
----------------------------------------------------------------------
diff --git a/s2core/build.sbt b/s2core/build.sbt
index 0b83c3d..bd84c37 100644
--- a/s2core/build.sbt
+++ b/s2core/build.sbt
@@ -56,7 +56,7 @@ libraryDependencies ++= Seq(
   "com.sksamuel.elastic4s" %% "elastic4s-http" % elastic4sVersion 
excludeLogging(),
   "com.sksamuel.elastic4s" %% "elastic4s-embedded" % elastic4sVersion 
excludeLogging(),
   "org.scala-lang.modules" %% "scala-pickling" % "0.10.1",
-  "com.spotify" % "annoy" % "0.2.5",
+  "net.pishen" %% "annoy4s" % annoy4sVersion,
   "org.tensorflow" % "tensorflow" % tensorflowVersion
 )
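
For readers unfamiliar with annoy4s: the library loads an index directory and
answers nearest-neighbour queries directly, which is what the rewritten
AnnoyModelFetcher below relies on. A minimal sketch, mirroring the imports and
calls from the diff (the index path and key are illustrative):

    import annoy4s._
    import annoy4s.Converters.KeyConverter

    // Load an index directory previously written by annoy4s; the key
    // converter for String is resolved implicitly.
    val index: Annoy[String] = Annoy.load[String]("./annoy_result")

    // query returns Option[Seq[(key, score)]], or None for an unknown key.
    val neighbors: Seq[(String, Float)] = index.query("1", 10).getOrElse(Nil)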
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/08a80cdd/s2core/src/main/scala/org/apache/s2graph/core/model/AnnoyModelFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/model/AnnoyModelFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/model/AnnoyModelFetcher.scala
index a3a0f58..2f2a40c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/model/AnnoyModelFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/model/AnnoyModelFetcher.scala
@@ -1,15 +1,13 @@
 package org.apache.s2graph.core.model
 
-import java.io.File
-
-import com.spotify.annoy.{ANNIndex, IndexType}
+import annoy4s.Converters.KeyConverter
+import annoy4s._
 import com.typesafe.config.Config
-import org.apache.s2graph.core.types.VertexId
 import org.apache.s2graph.core._
+import org.apache.s2graph.core.model.AnnoyModelFetcher.IndexFilePathKey
+import org.apache.s2graph.core.types.VertexId
 
 import scala.concurrent.{ExecutionContext, Future}
-import scala.io.Source
-import scala.util.Try
 
 object AnnoyModelFetcher {
   val IndexFilePathKey = "annoyIndexFilePath"
@@ -17,56 +15,70 @@ object AnnoyModelFetcher {
   val DimensionKey = "annoyIndexDimension"
   val IndexTypeKey = "annoyIndexType"
 
-  def loadDictFromLocal(file: File): Map[Int, String] = {
-    val files = if (file.isDirectory) {
-      file.listFiles()
-    } else {
-      Array(file)
-    }
-
-    files.flatMap { file =>
-      Source.fromFile(file).getLines().zipWithIndex.flatMap { case (line, _idx) =>
-        val tokens = line.stripMargin.split(",")
-        try {
-          val tpl = if (tokens.length < 2) {
-            (_idx + 1, tokens.head)
-          } else {
-            (tokens.head.toInt, tokens.tail.head)
-          }
-          Seq(tpl)
-        } catch {
-          case e: Exception => Nil
-        }
-      }
-    }.toMap
+  //  def loadDictFromLocal(file: File): Map[Int, String] = {
+  //    val files = if (file.isDirectory) {
+  //      file.listFiles()
+  //    } else {
+  //      Array(file)
+  //    }
+  //
+  //    files.flatMap { file =>
+  //      Source.fromFile(file).getLines().zipWithIndex.flatMap { case (line, _idx) =>
+  //        val tokens = line.stripMargin.split(",")
+  //        try {
+  //          val tpl = if (tokens.length < 2) {
+  //            (tokens.head.toInt, tokens.head)
+  //          } else {
+  //            (tokens.head.toInt, tokens.tail.head)
+  //          }
+  //          Seq(tpl)
+  //        } catch {
+  //          case e: Exception => Nil
+  //        }
+  //      }
+  //    }.toMap
+  //  }
+
+  def buildAnnoy4s[T](indexPath: String)(implicit converter: KeyConverter[T]): Annoy[T] = {
+    Annoy.load[T](indexPath)
   }
 
-  def buildIndex(config: Config): ANNIndexWithDict = {
-    val filePath = config.getString(IndexFilePathKey)
-    val dictPath = config.getString(DictFilePathKey)
-
-    val dimension = config.getInt(DimensionKey)
-    val indexType = Try { config.getString(IndexTypeKey) }.toOption.map(IndexType.valueOf).getOrElse(IndexType.ANGULAR)
-
-    val dict = loadDictFromLocal(new File(dictPath))
-    val index = new ANNIndex(dimension, filePath, indexType)
-    ANNIndexWithDict(index, dict)
-  }
+  //  def buildIndex(indexPath: String,
+  //                 dictPath: String,
+  //                 dimension: Int,
+  //                 indexType: IndexType): ANNIndexWithDict = {
+  //    val dict = loadDictFromLocal(new File(dictPath))
+  //    val index = new ANNIndex(dimension, indexPath, indexType)
+  //
+  //    ANNIndexWithDict(index, dict)
+  //  }
+  //
+  //  def buildIndex(config: Config): ANNIndexWithDict = {
+  //    val indexPath = config.getString(IndexFilePathKey)
+  //    val dictPath = config.getString(DictFilePathKey)
+  //
+  //    val dimension = config.getInt(DimensionKey)
+  //    val indexType = Try { config.getString(IndexTypeKey) }.toOption.map(IndexType.valueOf).getOrElse(IndexType.ANGULAR)
+  //
+  //    buildIndex(indexPath, dictPath, dimension, indexType)
+  //  }
 }
 
-case class ANNIndexWithDict(index: ANNIndex, dict: Map[Int, String]) {
-  val dictRev = dict.map(kv => kv._2 -> kv._1)
-}
+//
+//case class ANNIndexWithDict(index: ANNIndex, dict: Map[Int, String]) {
+//  val dictRev = dict.map(kv => kv._2 -> kv._1)
+//}
 
 class AnnoyModelFetcher(val graph: S2GraphLike) extends Fetcher {
-  import scala.collection.JavaConverters._
   val builder = graph.elementBuilder
 
-  var model: ANNIndexWithDict = _
+  //  var model: ANNIndexWithDict = _
+  var model: Annoy[String] = _
 
  override def init(config: Config)(implicit ec: ExecutionContext): Future[Fetcher] = {
     Future {
-      model = AnnoyModelFetcher.buildIndex(config)
+      model = AnnoyModelFetcher.buildAnnoy4s(config.getString(IndexFilePathKey))
+      //        AnnoyModelFetcher.buildIndex(config)
 
       this
     }
@@ -79,21 +91,32 @@ class AnnoyModelFetcher(val graph: S2GraphLike) extends Fetcher {
       val vertex = queryRequest.vertex
       val queryParam = queryRequest.queryParam
 
-      val srcIndexOpt = model.dictRev.get(vertex.innerId.toIdString())
+      val edgeWithScores = model.query(vertex.innerId.toIdString(), queryParam.limit).getOrElse(Nil).map { case (tgtId, score) =>
+        val tgtVertexId = builder.newVertexId(queryParam.label.service,
+          queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), tgtId)
 
-      srcIndexOpt.map { srcIdx =>
-        val srcVector = model.index.getItemVector(srcIdx)
-        val nns = model.index.getNearest(srcVector, queryParam.limit).asScala
+        val edge = graph.toEdge(vertex.innerId.value, tgtVertexId.innerId.value, queryParam.labelName, queryParam.direction)
 
-        val edges = nns.map { tgtIdx =>
-          val tgtVertexId = builder.newVertexId(queryParam.label.service,
-            queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), model.dict(tgtIdx))
+        EdgeWithScore(edge, score, queryParam.label)
+      }
 
-          graph.toEdge(vertex.innerId.value, tgtVertexId.innerId.value, queryParam.labelName, queryParam.direction)
-        }
-        val edgeWithScores = edges.map(e => EdgeWithScore(e, 1.0, queryParam.label))
-        StepResult(edgeWithScores, Nil, Nil)
-      }.getOrElse(StepResult.Empty)
+      StepResult(edgeWithScores, Nil, Nil)
+      //
+      //      val srcIndexOpt = model.dictRev.get(vertex.innerId.toIdString())
+      //
+      //      srcIndexOpt.map { srcIdx =>
+      //        val srcVector = model.index.getItemVector(srcIdx)
+      //        val nns = model.index.getNearest(srcVector, queryParam.limit).asScala
+      //
+      //        val edges = nns.map { tgtIdx =>
+      //          val tgtVertexId = builder.newVertexId(queryParam.label.service,
+      //            queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), model.dict(tgtIdx))
+      //
+      //          graph.toEdge(vertex.innerId.value, tgtVertexId.innerId.value, queryParam.labelName, queryParam.direction)
+      //        }
+      //        val edgeWithScores = edges.map(e => EdgeWithScore(e, 1.0, queryParam.label))
+      //        StepResult(edgeWithScores, Nil, Nil)
+      //      }.getOrElse(StepResult.Empty)
     }
 
     Future.successful(stepResultLs)
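
Two behavioural consequences of this rewrite are worth spelling out. The
separate dictionary file is no longer read on the query path, since annoy4s
keeps its own key mapping alongside the index, and the edge score changes
from a constant 1.0 to the annoy4s distance. Condensed, the new fetch core is:

    // Identifiers as in the diff above: each annoy4s neighbour of the
    // source vertex becomes an edge, scored by the annoy4s distance
    // (the old code pinned every score to 1.0).
    val edgeWithScores = model
      .query(vertex.innerId.toIdString(), queryParam.limit)
      .getOrElse(Nil)
      .map { case (tgtId, score) =>
        val tgtVertexId = builder.newVertexId(queryParam.label.service,
          queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), tgtId)
        val edge = graph.toEdge(vertex.innerId.value, tgtVertexId.innerId.value,
          queryParam.labelName, queryParam.direction)
        EdgeWithScore(edge, score, queryParam.label)
      }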

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/08a80cdd/s2jobs/build.sbt
----------------------------------------------------------------------
diff --git a/s2jobs/build.sbt b/s2jobs/build.sbt
index ac231c1..f647040 100644
--- a/s2jobs/build.sbt
+++ b/s2jobs/build.sbt
@@ -38,9 +38,7 @@ libraryDependencies ++= Seq(
   "org.apache.hadoop" % "hadoop-distcp" % hadoopVersion,
   "org.elasticsearch" % "elasticsearch-spark-20_2.11" % elastic4sVersion,
   "com.github.scopt" %% "scopt" % "3.7.0",
-  "com.holdenkarau" %% "spark-testing-base" % "2.3.0_0.9.0" % Test,
-  "net.pishen" %% "annoy4s" % annoy4sVersion,
-  "org.tensorflow" % "tensorflow" % tensorflowVersion
+  "com.holdenkarau" %% "spark-testing-base" % "2.3.0_0.9.0" % Test
 )
 
 crossScalaVersions := Seq("2.10.6")
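
(annoy4s and tensorflow are removed here because they moved into
s2core/build.sbt above; s2jobs presumably still sees them transitively
through its dependency on s2core.)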

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/08a80cdd/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcess.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcess.scala
index 7b4b17b..9ffb341 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcess.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcess.scala
@@ -2,18 +2,21 @@ package org.apache.s2graph.s2jobs.task.custom.process
 
 import java.io.File
 
-import annoy4s.{Angular, Annoy}
+import annoy4s._
+//import org.apache.spark.ml.nn.Annoy
+
+//import annoy4s.{Angular, Annoy}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
 import org.apache.s2graph.s2jobs.task.{Sink, TaskConf}
-import org.apache.spark.ml.recommendation.{ALS, ALSModel}
+import org.apache.spark.ml.recommendation.ALS
 import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
 
 object ALSModelProcess {
 
   def runALS(ss: SparkSession,
-                      conf: TaskConf,
-                      dataFrame: DataFrame): DataFrame = {
+             conf: TaskConf,
+             dataFrame: DataFrame): DataFrame = {
     // als model params.
     val rank = conf.options.getOrElse("rank", "10").toInt
     val maxIter = conf.options.getOrElse("maxIter", "5").toInt
@@ -37,6 +40,17 @@ object ALSModelProcess {
     model.itemFactors
   }
 
+//  def buildAnnoyIndex(conf: TaskConf,
+//                      dataFrame: DataFrame): Unit = {
+//    val ann = new Annoy()
+//      .setNumTrees(2)
+//      .setFraction(0.1)
+//      .setIdCol("id")
+//      .setFeaturesCol("features")
+//
+//    val itemAnnModel = ann.fit(dataFrame)
+//    itemAnnModel.saveAsAnnoyBinary(conf.options("itemFactors"))
+//  }
   def buildAnnoyIndex(conf: TaskConf,
                       dataFrame: DataFrame): Unit = {
     // annoy tree params.
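
The hunk above is cut off before the body of buildAnnoyIndex. As a hedged
sketch only, not this commit's actual code (the paths, tree count, and local
staging step are illustrative), an annoy4s index build from ALS item factors
could look like:

    import java.io.{File, PrintWriter}
    import annoy4s._
    import org.apache.spark.sql.DataFrame

    // annoy4s builds its index from a text file with one
    // "<id> <v1> <v2> ..." line per item, so stage the factors first.
    def writeFactors(itemFactors: DataFrame, inputPath: String): Unit = {
      val writer = new PrintWriter(new File(inputPath))
      try {
        itemFactors.collect().foreach { row =>
          val id = row.getAs[Int]("id")
          val features = row.getAs[Seq[Float]]("features")
          writer.println((id.toString +: features.map(_.toString)).mkString(" "))
        }
      } finally writer.close()
    }

    writeFactors(itemFactors, "/tmp/als_item_factors")
    // Angular matches the old com.spotify.annoy IndexType.ANGULAR default.
    Annoy.create[Int]("/tmp/als_item_factors", numOfTrees = 2,
      outputDir = "/tmp/annoy_result", metric = Angular)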

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/08a80cdd/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala
index d16ebf0..a8479fe 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala
@@ -8,7 +8,7 @@ import org.apache.commons.io.FileUtils
 import org.apache.s2graph.core.Integrate.IntegrateCommon
 import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
 import org.apache.s2graph.core.{Query, QueryParam}
-import org.apache.s2graph.core.model.{AnnoyModelFetcher, HDFSImporter, ModelManager}
+import org.apache.s2graph.core.model.{ANNIndexWithDict, AnnoyModelFetcher, HDFSImporter, ModelManager}
 import org.apache.s2graph.core.schema.Label
 import org.apache.s2graph.s2jobs.task.TaskConf
 
@@ -101,4 +101,128 @@ class ALSModelProcessTest extends IntegrateCommon with DataFrameSuiteBase {
 //    FileUtils.deleteDirectory(new File(outputPath))
 //  }
 
+  def annoyLabelOptions(indexPath: String, dictPath: String): String = {
+    val options = s"""{
+                     | "importer": {
+                     |   "${ModelManager.ImporterClassNameKey}": 
"org.apache.s2graph.core.model.IdentityImporter"
+                     | },
+                     | "fetcher": {
+                     |   "${ModelManager.FetcherClassNameKey}": 
"org.apache.s2graph.core.model.AnnoyModelFetcher",
+                     |   "${AnnoyModelFetcher.IndexFilePathKey}": 
"${indexPath}",
+                     |   "${AnnoyModelFetcher.DictFilePathKey}": "${dictPath}",
+                     |   "${AnnoyModelFetcher.DimensionKey}": 10
+                     | }
+                     |}""".stripMargin
+    options
+  }
+  def labelImport(labelName: String, indexPath: String, dictPath: String): Label = {
+    val service = management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get
+    val serviceColumn =
+      management.createServiceColumn("s2graph", "movie", "string", 
Seq(Prop("age", "0", "int", true)))
+
+    val options = annoyLabelOptions(indexPath, dictPath)
+
+    Label.findByName(labelName, useCache = false).foreach { label => Label.delete(label.id.get) }
+
+    val label = management.createLabel(
+      labelName,
+      serviceColumn,
+      serviceColumn,
+      true,
+      service.serviceName,
+      Seq.empty[Index].asJava,
+      Seq.empty[Prop].asJava,
+      "strong",
+      null,
+      -1,
+      "v3",
+      "gz",
+      options
+    )
+
+    val config = ConfigFactory.parseString(options)
+    val importerFuture = graph.modelManager.importModel(label, config)(ExecutionContext.Implicits.global)
+    Await.result(importerFuture, Duration("3 minutes"))
+
+    Thread.sleep(10000)
+
+    label
+  }
+
+  def buildALS(ratingsPath: String, indexPath: String) = {
+    import spark.sqlContext.implicits._
+
+    FileUtils.deleteQuietly(new File(indexPath))
+
+    val buffer = scala.collection.mutable.ListBuffer.empty[(Int, Int, Float)]
+
+    val lines = Source.fromFile(ratingsPath).getLines()
+    // skip over header.
+    lines.next()
+
+    while (lines.hasNext) {
+      val line = lines.next()
+      try {
+        val Array(userId, movieId, rating, ts) = line.split(",")
+        buffer += ((userId.toInt, movieId.toInt, rating.toFloat))
+      } catch {
+        case e: Exception => // skip over.
+      }
+    }
+
+    val rating = buffer.toDF("userId", "movieId", "rating")
+
+    val processConf = TaskConf(name = "test", `type` = "test", inputs = Nil,
+      options = Map.empty)
+
+    val process = new ALSModelProcess(processConf)
+    val df = process.execute(spark, Map("test" -> rating))
+
+    val sinkConf = TaskConf(name = "sink", `type` = "sink", inputs = Nil,
+      options = Map("path" -> indexPath, "itemFactors" -> indexPath))
+
+    val sink = new AnnoyIndexBuildSink("sinkTest", sinkConf)
+    sink.write(df)
+  }
+
+  test("ALS ModelProcess and AnnoyIndexBuildSink") {
+    import spark.sqlContext.implicits._
+
+    val inputPath = "/Users/shon/Workspace/incubator-s2graph/example/movielens/input/ratings.csv"
+    val indexPath = "./annoy_result"
+    val dictPath = "./example/movielens/input/movie.dict"
+
+    buildALS(inputPath, indexPath)
+
+    val labelName = "annoy_index_test"
+    val label = labelImport(labelName, indexPath, dictPath)
+//    val options = annoyLabelOptions(indexPath, dictPath)
+//
+//    val config = ConfigFactory.parseString(label.options.get).getConfig("fetcher")
+//    val config = ConfigFactory.parseString(options).getConfig("fetcher")
+
+//    val ANNIndexWithDict(index, dict) = AnnoyModelFetcher.buildIndex(config)
+//    val v = index.getItemVector(1)
+//
+//    import scala.collection.JavaConverters._
+//    index.getNearest(v, 10).asScala.foreach { x =>
+//      println(x)
+//    }
+
+
+//
+    val service = management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get
+    val serviceColumn =
+      management.createServiceColumn("s2graph", "user", "string", 
Seq(Prop("age", "0", "int", true)))
+
+    val vertex = graph.elementBuilder.toVertex(service.serviceName, serviceColumn.columnName, "1")
+    val queryParam = QueryParam(labelName = labelName, limit = 5)
+
+    val query = Query.toQuery(srcVertices = Seq(vertex), queryParams = Seq(queryParam))
+    val stepResult = Await.result(graph.getEdges(query), Duration("60 seconds"))
+
+    stepResult.edgeWithScores.foreach { es =>
+      println(es.edge.tgtVertex.innerIdVal)
+    }
+  }
 }
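
One caveat on the test above: inputPath is hard-coded to an absolute path
under /Users/shon/..., so the test only passes on that machine. A small
tweak (the property name here is illustrative) would keep it runnable
elsewhere:

    // Allow overriding the ratings location, falling back to the in-repo copy.
    val inputPath = sys.props.getOrElse("ratings.csv.path",
      "./example/movielens/input/ratings.csv")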
