Change the Annoy Java library to annoy4s.
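
In short: `AnnoyModelFetcher` drops com.spotify's `ANNIndex`, which needed a separate id dictionary and returned neighbour ids only (every edge was scored with a constant 1.0), in favour of annoy4s' `Annoy`, which takes typed keys and returns (id, score) pairs directly; the `annoy4s` dependency moves from s2jobs to s2core accordingly. A minimal sketch of the new query path, assuming an index already built under `./annoy_result` (the path the new test uses); the key "1" and limit 5 are illustrative, not from this commit:

    import annoy4s._
    import annoy4s.Converters.KeyConverter

    object AnnoyQuerySketch {
      def main(args: Array[String]): Unit = {
        // Load a previously built annoy4s index directory; keys are Strings,
        // matching the fetcher's `Annoy[String]` model. The implicit
        // KeyConverter comes from the same imports the fetcher uses.
        val index: Annoy[String] = Annoy.load[String]("./annoy_result")

        // query returns Option[Seq[(key, score)]]; Nil when the key is unknown.
        val neighbours: Seq[(String, Float)] = index.query("1", 5).getOrElse(Nil)
        neighbours.foreach { case (id, score) => println(s"$id -> $score") }
      }
    }
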
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/08a80cdd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/08a80cdd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/08a80cdd

Branch: refs/heads/master
Commit: 08a80cdd68d18a95cbddea91966b0dc765d77284
Parents: 2529af1
Author: DO YUNG YOON <[email protected]>
Authored: Fri May 4 14:25:24 2018 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Fri May 4 14:25:24 2018 +0900

----------------------------------------------------------------------
 example/movielens/jobdesc.template              |  13 +-
 example/run.sh                                  |  14 +-
 s2core/build.sbt                                |   2 +-
 .../s2graph/core/model/AnnoyModelFetcher.scala  | 137 +++++++++++--------
 s2jobs/build.sbt                                |   4 +-
 .../task/custom/process/ALSModelProcess.scala   |  22 ++-
 .../custom/process/ALSModelProcessTest.scala    | 126 ++++++++++++++++-
 7 files changed, 244 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/08a80cdd/example/movielens/jobdesc.template
----------------------------------------------------------------------
diff --git a/example/movielens/jobdesc.template b/example/movielens/jobdesc.template
index ca27e26..7cb1581 100644
--- a/example/movielens/jobdesc.template
+++ b/example/movielens/jobdesc.template
@@ -123,7 +123,7 @@
       ],
       "type": "sql",
       "options": {
-        "sql": "SELECT CAST(movieId as INT) as idx, CAST(movieId as INT) as movieId FROM movies"
+        "sql": "SELECT CAST(movieId as INT) as idx FROM movies where movieId != null"
       }
     }
   ],
@@ -155,6 +155,17 @@
       }
     },
     {
+      "name": "als_sink",
+      "inputs": [
+        "factorize_rating"
+      ],
+      "type": "file",
+      "options": {
+        "path": "/tmp/als_item",
+        "format": "json"
+      }
+    },
+    {
       "name": "annoy_index_build",
       "inputs": [
         "factorize_rating"

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/08a80cdd/example/run.sh
----------------------------------------------------------------------
diff --git a/example/run.sh b/example/run.sh
index 5256974..4312860 100644
--- a/example/run.sh
+++ b/example/run.sh
@@ -29,13 +29,13 @@ while IFS='' read -r line || [[ -n "$line" ]]; do
   info "$line"
 done < "$SERVICE/desc.md"
 
-q "First of all, we will check prerequisites"
-sh ./prepare.sh $SERVICE
-[ $? -ne 0 ] && { exit -1; }
-
-q "And now, we create vertex and edge schema using graphql"
-sh ./create_schema.sh $SERVICE
-[ $? -ne 0 ] && { exit -1; }
+#q "First of all, we will check prerequisites"
+#sh ./prepare.sh $SERVICE
+#[ $? -ne 0 ] && { exit -1; }
+#
+#q "And now, we create vertex and edge schema using graphql"
+#sh ./create_schema.sh $SERVICE
+#[ $? -ne 0 ] && { exit -1; }
 
 q "Finally, we import example data to service"
 sh ./import_data.sh $SERVICE

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/08a80cdd/s2core/build.sbt
----------------------------------------------------------------------
diff --git a/s2core/build.sbt b/s2core/build.sbt
index 0b83c3d..bd84c37 100644
--- a/s2core/build.sbt
+++ b/s2core/build.sbt
@@ -56,7 +56,7 @@ libraryDependencies ++= Seq(
   "com.sksamuel.elastic4s" %% "elastic4s-http" % elastic4sVersion excludeLogging(),
   "com.sksamuel.elastic4s" %% "elastic4s-embedded" % elastic4sVersion excludeLogging(),
   "org.scala-lang.modules" %% "scala-pickling" % "0.10.1",
-  "com.spotify" % "annoy" % "0.2.5",
+  "net.pishen" %% "annoy4s" % annoy4sVersion,
   "org.tensorflow" % "tensorflow" % tensorflowVersion
 )

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/08a80cdd/s2core/src/main/scala/org/apache/s2graph/core/model/AnnoyModelFetcher.scala
----------------------------------------------------------------------
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/model/AnnoyModelFetcher.scala b/s2core/src/main/scala/org/apache/s2graph/core/model/AnnoyModelFetcher.scala
index a3a0f58..2f2a40c 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/model/AnnoyModelFetcher.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/model/AnnoyModelFetcher.scala
@@ -1,15 +1,13 @@
 package org.apache.s2graph.core.model
 
-import java.io.File
-
-import com.spotify.annoy.{ANNIndex, IndexType}
+import annoy4s.Converters.KeyConverter
+import annoy4s._
 import com.typesafe.config.Config
-import org.apache.s2graph.core.types.VertexId
 import org.apache.s2graph.core._
+import org.apache.s2graph.core.model.AnnoyModelFetcher.IndexFilePathKey
+import org.apache.s2graph.core.types.VertexId
 
 import scala.concurrent.{ExecutionContext, Future}
-import scala.io.Source
-import scala.util.Try
 
 object AnnoyModelFetcher {
   val IndexFilePathKey = "annoyIndexFilePath"
@@ -17,56 +15,70 @@ object AnnoyModelFetcher {
   val DimensionKey = "annoyIndexDimension"
   val IndexTypeKey = "annoyIndexType"
 
-  def loadDictFromLocal(file: File): Map[Int, String] = {
-    val files = if (file.isDirectory) {
-      file.listFiles()
-    } else {
-      Array(file)
-    }
-
-    files.flatMap { file =>
-      Source.fromFile(file).getLines().zipWithIndex.flatMap { case (line, _idx) =>
-        val tokens = line.stripMargin.split(",")
-        try {
-          val tpl = if (tokens.length < 2) {
-            (_idx + 1, tokens.head)
-          } else {
-            (tokens.head.toInt, tokens.tail.head)
-          }
-          Seq(tpl)
-        } catch {
-          case e: Exception => Nil
-        }
-      }
-    }.toMap
+//  def loadDictFromLocal(file: File): Map[Int, String] = {
+//    val files = if (file.isDirectory) {
+//      file.listFiles()
+//    } else {
+//      Array(file)
+//    }
+//
+//    files.flatMap { file =>
+//      Source.fromFile(file).getLines().zipWithIndex.flatMap { case (line, _idx) =>
+//        val tokens = line.stripMargin.split(",")
+//        try {
+//          val tpl = if (tokens.length < 2) {
+//            (tokens.head.toInt, tokens.head)
+//          } else {
+//            (tokens.head.toInt, tokens.tail.head)
+//          }
+//          Seq(tpl)
+//        } catch {
+//          case e: Exception => Nil
+//        }
+//      }
+//    }.toMap
+//  }
+
+  def buildAnnoy4s[T](indexPath: String)(implicit converter: KeyConverter[T]): Annoy[T] = {
+    Annoy.load[T](indexPath)
   }
 
-  def buildIndex(config: Config): ANNIndexWithDict = {
-    val filePath = config.getString(IndexFilePathKey)
-    val dictPath = config.getString(DictFilePathKey)
-
-    val dimension = config.getInt(DimensionKey)
-    val indexType = Try { config.getString(IndexTypeKey) }.toOption.map(IndexType.valueOf).getOrElse(IndexType.ANGULAR)
-
-    val dict = loadDictFromLocal(new File(dictPath))
-    val index = new ANNIndex(dimension, filePath, indexType)
-    ANNIndexWithDict(index, dict)
-  }
+//  def buildIndex(indexPath: String,
+//                 dictPath: String,
+//                 dimension: Int,
+//                 indexType: IndexType): ANNIndexWithDict = {
+//    val dict = loadDictFromLocal(new File(dictPath))
+//    val index = new ANNIndex(dimension, indexPath, indexType)
+//
+//    ANNIndexWithDict(index, dict)
+//  }
+//
+//  def buildIndex(config: Config): ANNIndexWithDict = {
+//    val indexPath = config.getString(IndexFilePathKey)
+//    val dictPath = config.getString(DictFilePathKey)
+//
+//    val dimension = config.getInt(DimensionKey)
+//    val indexType = Try { config.getString(IndexTypeKey) }.toOption.map(IndexType.valueOf).getOrElse(IndexType.ANGULAR)
+//
+//    buildIndex(indexPath, dictPath, dimension, indexType)
+//  }
 }
 
-case class ANNIndexWithDict(index: ANNIndex, dict: Map[Int, String]) {
-  val dictRev = dict.map(kv => kv._2 -> kv._1)
-}
+//
+//case class ANNIndexWithDict(index: ANNIndex, dict: Map[Int, String]) {
+//  val dictRev = dict.map(kv => kv._2 -> kv._1)
+//}
 
 class AnnoyModelFetcher(val graph: S2GraphLike) extends Fetcher {
-  import scala.collection.JavaConverters._
 
   val builder = graph.elementBuilder
 
-  var model: ANNIndexWithDict = _
+  // var model: ANNIndexWithDict = _
+  var model: Annoy[String] = _
 
   override def init(config: Config)(implicit ec: ExecutionContext): Future[Fetcher] = {
     Future {
-      model = AnnoyModelFetcher.buildIndex(config)
+      model = AnnoyModelFetcher.buildAnnoy4s(config.getString(IndexFilePathKey))
+      // AnnoyModelFetcher.buildIndex(config)
 
       this
     }
@@ -79,21 +91,32 @@ class AnnoyModelFetcher(val graph: S2GraphLike) extends Fetcher {
       val vertex = queryRequest.vertex
       val queryParam = queryRequest.queryParam
 
-      val srcIndexOpt = model.dictRev.get(vertex.innerId.toIdString())
+      val edgeWithScores = model.query(vertex.innerId.toIdString(), queryParam.limit).getOrElse(Nil).map { case (tgtId, score) =>
+        val tgtVertexId = builder.newVertexId(queryParam.label.service,
+          queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), tgtId)
 
-      srcIndexOpt.map { srcIdx =>
-        val srcVector = model.index.getItemVector(srcIdx)
-        val nns = model.index.getNearest(srcVector, queryParam.limit).asScala
+        val edge = graph.toEdge(vertex.innerId.value, tgtVertexId.innerId.value, queryParam.labelName, queryParam.direction)
 
-        val edges = nns.map { tgtIdx =>
-          val tgtVertexId = builder.newVertexId(queryParam.label.service,
-            queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), model.dict(tgtIdx))
+        EdgeWithScore(edge, score, queryParam.label)
+      }
 
-          graph.toEdge(vertex.innerId.value, tgtVertexId.innerId.value, queryParam.labelName, queryParam.direction)
-        }
-        val edgeWithScores = edges.map(e => EdgeWithScore(e, 1.0, queryParam.label))
-        StepResult(edgeWithScores, Nil, Nil)
-      }.getOrElse(StepResult.Empty)
+      StepResult(edgeWithScores, Nil, Nil)
+//
+//      val srcIndexOpt = model.dictRev.get(vertex.innerId.toIdString())
+//
+//      srcIndexOpt.map { srcIdx =>
+//        val srcVector = model.index.getItemVector(srcIdx)
+//        val nns = model.index.getNearest(srcVector, queryParam.limit).asScala
+//
+//        val edges = nns.map { tgtIdx =>
+//          val tgtVertexId = builder.newVertexId(queryParam.label.service,
+//            queryParam.label.tgtColumnWithDir(queryParam.labelWithDir.dir), model.dict(tgtIdx))
+//
+//          graph.toEdge(vertex.innerId.value, tgtVertexId.innerId.value, queryParam.labelName, queryParam.direction)
+//        }
+//        val edgeWithScores = edges.map(e => EdgeWithScore(e, 1.0, queryParam.label))
+//        StepResult(edgeWithScores, Nil, Nil)
+//      }.getOrElse(StepResult.Empty)
     }
 
     Future.successful(stepResultLs)

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/08a80cdd/s2jobs/build.sbt
----------------------------------------------------------------------
diff --git a/s2jobs/build.sbt b/s2jobs/build.sbt
index ac231c1..f647040 100644
--- a/s2jobs/build.sbt
+++ b/s2jobs/build.sbt
@@ -38,9 +38,7 @@ libraryDependencies ++= Seq(
   "org.apache.hadoop" % "hadoop-distcp" % hadoopVersion,
   "org.elasticsearch" % "elasticsearch-spark-20_2.11" % elastic4sVersion,
   "com.github.scopt" %% "scopt" % "3.7.0",
-  "com.holdenkarau" %% "spark-testing-base" % "2.3.0_0.9.0" % Test,
-  "net.pishen" %% "annoy4s" % annoy4sVersion,
-  "org.tensorflow" % "tensorflow" % tensorflowVersion
+  "com.holdenkarau" %% "spark-testing-base" % "2.3.0_0.9.0" % Test
 )
 
 crossScalaVersions := Seq("2.10.6")

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/08a80cdd/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcess.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcess.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcess.scala
index 7b4b17b..9ffb341 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcess.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcess.scala
@@ -2,18 +2,21 @@ package org.apache.s2graph.s2jobs.task.custom.process
 
 import java.io.File
 
-import annoy4s.{Angular, Annoy}
+import annoy4s._
+//import org.apache.spark.ml.nn.Annoy
+
+//import annoy4s.{Angular, Annoy}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
 import org.apache.s2graph.s2jobs.task.{Sink, TaskConf}
-import org.apache.spark.ml.recommendation.{ALS, ALSModel}
+import org.apache.spark.ml.recommendation.ALS
 import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
 
 object ALSModelProcess {
   def runALS(ss: SparkSession,
-             conf: TaskConf,
-             dataFrame: DataFrame): DataFrame = {
+              conf: TaskConf,
+              dataFrame: DataFrame): DataFrame = {
     // als model params.
     val rank = conf.options.getOrElse("rank", "10").toInt
     val maxIter = conf.options.getOrElse("maxIter", "5").toInt
@@ -37,6 +40,17 @@ object ALSModelProcess {
     model.itemFactors
   }
 
+//  def buildAnnoyIndex(conf: TaskConf,
+//                      dataFrame: DataFrame): Unit = {
+//    val ann = new Annoy()
+//      .setNumTrees(2)
+//      .setFraction(0.1)
+//      .setIdCol("id")
+//      .setFeaturesCol("features")
+//
+//    val itemAnnModel = ann.fit(dataFrame)
+//    itemAnnModel.saveAsAnnoyBinary(conf.options("itemFactors"))
+//  }
   def buildAnnoyIndex(conf: TaskConf,
                       dataFrame: DataFrame): Unit = {
     // annoy tree params.

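The new `buildAnnoyIndex` body is truncated above after the params comment. annoy4s builds its tree forest from a plain text file with one `<id> <v1> <v2> ...` line per item and persists the result to a directory that `Annoy.load` can pick up later. A minimal, self-contained sketch of that flow; the paths are hypothetical and the `Annoy.create(inputFile, numOfTrees, outputDir, metric)` shape is assumed from annoy4s' documented example, not taken from this commit:

    import java.io.{File, PrintWriter}

    import annoy4s._

    object AnnoyBuildSketch {
      def main(args: Array[String]): Unit = {
        // One item per line: "<id> <v1> <v2> ...", the input format annoy4s reads.
        val input = new File("/tmp/annoy_input") // hypothetical path
        val out = new PrintWriter(input)
        Seq(1 -> Seq(0.1f, 0.2f), 2 -> Seq(0.2f, 0.1f), 3 -> Seq(0.9f, 0.8f)).foreach {
          case (id, vec) => out.println((id +: vec).mkString(" "))
        }
        out.close()

        // Build the random-projection trees and persist them for a later Annoy.load.
        // Assumed argument order: (inputFile, numOfTrees, outputDir, metric).
        val annoy = Annoy.create[Int]("/tmp/annoy_input", 10, "/tmp/annoy_result", Angular)

        // Sanity check: nearest neighbours of item 1 as (id, score) pairs.
        println(annoy.query(1, 3).getOrElse(Nil))
      }
    }
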
http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/08a80cdd/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala
index d16ebf0..a8479fe 100644
--- a/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala
+++ b/s2jobs/src/test/scala/org/apache/s2graph/s2jobs/task/custom/process/ALSModelProcessTest.scala
@@ -8,7 +8,7 @@ import org.apache.commons.io.FileUtils
 import org.apache.s2graph.core.Integrate.IntegrateCommon
 import org.apache.s2graph.core.Management.JsonModel.{Index, Prop}
 import org.apache.s2graph.core.{Query, QueryParam}
-import org.apache.s2graph.core.model.{AnnoyModelFetcher, HDFSImporter, ModelManager}
+import org.apache.s2graph.core.model.{ANNIndexWithDict, AnnoyModelFetcher, HDFSImporter, ModelManager}
 import org.apache.s2graph.core.schema.Label
 import org.apache.s2graph.s2jobs.task.TaskConf
 
@@ -101,4 +101,128 @@ class ALSModelProcessTest extends IntegrateCommon with DataFrameSuiteBase {
 //    FileUtils.deleteDirectory(new File(outputPath))
 //  }
 
+  def annoyLabelOptions(indexPath: String, dictPath: String): String = {
+    val options = s"""{
+                     | "importer": {
+                     |   "${ModelManager.ImporterClassNameKey}": "org.apache.s2graph.core.model.IdentityImporter"
+                     | },
+                     | "fetcher": {
+                     |   "${ModelManager.FetcherClassNameKey}": "org.apache.s2graph.core.model.AnnoyModelFetcher",
+                     |   "${AnnoyModelFetcher.IndexFilePathKey}": "${indexPath}",
+                     |   "${AnnoyModelFetcher.DictFilePathKey}": "${dictPath}",
+                     |   "${AnnoyModelFetcher.DimensionKey}": 10
+                     | }
+                     |}""".stripMargin
+    options
+  }
+
+  def labelImport(labelName: String, indexPath: String, dictPath: String): Label = {
+    val service = management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get
+    val serviceColumn =
+      management.createServiceColumn("s2graph", "movie", "string", Seq(Prop("age", "0", "int", true)))
+
+    val options = annoyLabelOptions(indexPath, dictPath)
+
+    Label.findByName(labelName, useCache = false).foreach { label => Label.delete(label.id.get) }
+
+    val label = management.createLabel(
+      labelName,
+      serviceColumn,
+      serviceColumn,
+      true,
+      service.serviceName,
+      Seq.empty[Index].asJava,
+      Seq.empty[Prop].asJava,
+      "strong",
+      null,
+      -1,
+      "v3",
+      "gz",
+      options
+    )
+
+    val config = ConfigFactory.parseString(options)
+    val importerFuture = graph.modelManager.importModel(label, config)(ExecutionContext.Implicits.global)
+    Await.result(importerFuture, Duration("3 minutes"))
+
+    Thread.sleep(10000)
+
+    label
+  }
+
+  def buildALS(ratingsPath: String, indexPath: String) = {
+    import spark.sqlContext.implicits._
+
+    FileUtils.deleteQuietly(new File(indexPath))
+
+    val buffer = scala.collection.mutable.ListBuffer.empty[(Int, Int, Float)]
+
+    val lines = Source.fromFile(ratingsPath).getLines()
+    // skip over header.
+    lines.next()
+
+    while (lines.hasNext) {
+      val line = lines.next()
+      try {
+        val Array(userId, movieId, rating, ts) = line.split(",")
+        buffer += ((userId.toInt, movieId.toInt, rating.toFloat))
+      } catch {
+        case e: Exception => // skip over.
+      }
+    }
+
+    val rating = buffer.toDF("userId", "movieId", "rating")
+
+    val processConf = TaskConf(name = "test", `type` = "test", inputs = Nil,
+      options = Map.empty)
+
+    val process = new ALSModelProcess(processConf)
+    val df = process.execute(spark, Map("test" -> rating))
+
+    val sinkConf = TaskConf(name = "sink", `type` = "sink", inputs = Nil,
+      options = Map("path" -> indexPath, "itemFactors" -> indexPath))
+
+    val sink = new AnnoyIndexBuildSink("sinkTest", sinkConf)
+    sink.write(df)
+  }
+
+  test("ALS ModelProcess and AnnoyIndexBuildSink") {
+    import spark.sqlContext.implicits._
+
+    val inputPath = "/Users/shon/Workspace/incubator-s2graph/example/movielens/input/ratings.csv"
+    val indexPath = "./annoy_result"
+    val dictPath = "./example/movielens/input/movie.dict"
+
+    buildALS(inputPath, indexPath)
+
+    val labelName = "annoy_index_test"
+    val label = labelImport(labelName, indexPath, dictPath)
+//    val options = annoyLabelOptions(indexPath, dictPath)
+//
+//    val config = ConfigFactory.parseString(label.options.get).getConfig("fetcher")
+//    val config = ConfigFactory.parseString(options).getConfig("fetcher")
+
+//    val ANNIndexWithDict(index, dict) = AnnoyModelFetcher.buildIndex(config)
+//    val v = index.getItemVector(1)
+//
+//    import scala.collection.JavaConverters._
+//    index.getNearest(v, 10).asScala.foreach { x =>
+//      println(x)
+//    }
+
+
+//
+    val service = management.createService("s2graph", "localhost", "s2graph_htable", -1, None).get
+    val serviceColumn =
+      management.createServiceColumn("s2graph", "user", "string", Seq(Prop("age", "0", "int", true)))
+
+    val vertex = graph.elementBuilder.toVertex(service.serviceName, serviceColumn.columnName, "1")
+    val queryParam = QueryParam(labelName = labelName, limit = 5)
+
+    val query = Query.toQuery(srcVertices = Seq(vertex), queryParams = Seq(queryParam))
+    val stepResult = Await.result(graph.getEdges(query), Duration("60 seconds"))
+
+    stepResult.edgeWithScores.foreach { es =>
+      println(es.edge.tgtVertex.innerIdVal)
+    }
+  }
 }
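
One behavioural change worth noting: the old fetch path scored every neighbour edge with a constant 1.0, while the new path passes annoy4s' query scores straight into `EdgeWithScore`. Assuming those scores are raw Annoy angular distances, i.e. sqrt(2 * (1 - cos(u, v))) where smaller means closer, a cosine-style similarity can be recovered if downstream ranking expects higher-is-better; a small sketch of that conversion:

    object AngularScoreSketch {
      // Annoy's angular distance is sqrt(2 * (1 - cos(u, v))),
      // so cosine similarity is recoverable as 1 - d^2 / 2.
      def cosineFromAngular(d: Float): Float = 1.0f - (d * d) / 2.0f

      def main(args: Array[String]): Unit = {
        println(cosineFromAngular(0.0f)) // identical direction -> 1.0
        println(cosineFromAngular(2.0f)) // opposite direction  -> -1.0
      }
    }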
