http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-query/src/main/scala/ALSAlgorithm.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/custom-query/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-recommendation/custom-query/src/main/scala/ALSAlgorithm.scala deleted file mode 100644 index 917153b..0000000 --- a/examples/scala-parallel-recommendation/custom-query/src/main/scala/ALSAlgorithm.scala +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.template.recommendation - -import org.apache.predictionio.controller.PAlgorithm -import org.apache.predictionio.controller.Params -import org.apache.predictionio.data.storage.BiMap - -import org.apache.spark.SparkContext._ -import org.apache.spark.mllib.recommendation.ALS -import org.apache.spark.mllib.recommendation.{Rating => MLlibRating} - -import grizzled.slf4j.Logger - -case class ALSAlgorithmParams(rank: Int, numIterations: Int, lambda: Double, - seed: Option[Long]) extends Params - -/** - * Use ALS to build item x feature matrix - */ -class ALSAlgorithm(val ap: ALSAlgorithmParams) - extends PAlgorithm[PreparedData, ALSModel, Query, PredictedResult] { - - @transient lazy val logger = Logger[this.type] - - def train(data: PreparedData): ALSModel = { - require(!data.ratings.take(1).isEmpty, - s"viewEvents in PreparedData cannot be empty." + - " Please check if DataSource generates TrainingData" + - " and Preparator generates PreparedData correctly.") - require(!data.items.take(1).isEmpty, - s"items in PreparedData cannot be empty." + - " Please check if DataSource generates TrainingData" + - " and Preparator generates PreparedData correctly.") - // create item's String ID to integer index BiMap - val itemStringIntMap = BiMap.stringInt(data.items.keys) - val userStringIntMap = BiMap.stringInt(data.ratings.map(_.user)) - - // HOWTO: collect Item as Map and convert ID to Int index - val items: Map[Int, Item] = data.items.map { case (id, item) ⇒ - (itemStringIntMap(id), item) - }.collectAsMap.toMap - - val mllibRatings = data.ratings.map { r => - // Convert user and item String IDs to Int index for MLlib - val iindex = itemStringIntMap.getOrElse(r.item, -1) - val uindex = userStringIntMap.getOrElse(r.user, -1) - - if (iindex == -1) - logger.info(s"Couldn't convert nonexistent item ID ${r.item}" - + " to Int index.") - - (uindex -> iindex) -> 1 - }.filter { case ((u, i), v) => (i != -1) && (u != -1) } - .reduceByKey(_ + _) // aggregate all view events of same item - .map { case ((u, i), v) => MLlibRating(u, i, v) } - - // MLLib ALS cannot handle empty training data.
- require(!mllibRatings.take(1).isEmpty, - s"mllibRatings cannot be empty." + - " Please check if your events contain valid user and item ID.") - - // seed for MLlib ALS - val seed = ap.seed.getOrElse(System.nanoTime) - - val m = ALS.trainImplicit( - ratings = mllibRatings, - rank = ap.rank, - iterations = ap.numIterations, - lambda = ap.lambda, - blocks = -1, - alpha = 1.0, - seed = seed) - - new ALSModel(productFeatures = m.productFeatures, - itemStringIntMap = itemStringIntMap, items = items) - } - - def predict(model: ALSModel, query: Query): PredictedResult = { - val queryFeatures = - model.items.keys.flatMap(model.productFeatures.lookup(_).headOption) - - val indexScores = if (queryFeatures.isEmpty) { - logger.info(s"No productFeatures found for query ${query}.") - Array[(Int, Double)]() - } else { - model.productFeatures.mapValues { f ⇒ - queryFeatures.map(cosine(_, f)).reduce(_ + _) - }.filter(_._2 > 0) // keep items with score > 0 - .collect() - } - - // HOWTO: filter predicted results by query. - val filteredScores = filterItems(indexScores, model.items, query) - - implicit val ord = Ordering.by[(Int, Double), Double](_._2) - val topScores = getTopN(filteredScores, query.num).toArray - - val itemScores = topScores.map { case (i, s) ⇒ - new ItemScore(item = model.itemIntStringMap(i), score = s, - creationYear = model.items(i).creationYear) - } - - new PredictedResult(itemScores) - } - - private def getTopN[T](s: Seq[T], n: Int) - (implicit ord: Ordering[T]): Iterable[T] = { - - var result = List.empty[T] - - for (x <- s) { - if (result.size < n) - result = x :: result - else { - val min = result.min - if (ord.compare(x, min) < 0) { - result = x :: result.filter(_ != min) - } - } - } - - result.sorted.reverse - } - - private def cosine(v1: Array[Double], v2: Array[Double]): Double = { - val size = v1.size - var i = 0 - var n1: Double = 0 - var n2: Double = 0 - var d: Double = 0 - while (i < size) { - n1 += v1(i) * v1(i) - n2 += v2(i) * v2(i) - d += v1(i) * v2(i) - i += 1 - } - val n1n2 = (math.sqrt(n1) * math.sqrt(n2)) - if (n1n2 == 0) 0 else (d / n1n2) - } - - // HOWTO: actual filter of predicted movie results. - // filter selects all movies - // that were made after the year specified in the query - private def filterItems(selectedScores: Array[(Int, Double)], - items: Map[Int, Item], - query: Query) = - selectedScores.view.filter { case (iId, _) ⇒ - items(iId).creationYear.map(icr ⇒ query.creationYear.forall(icr >= _)) - .getOrElse(true) - } -}
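[Editor's note] The deleted custom-query ALSAlgorithm above scores each item by summing its cosine similarity against the feature vectors of the query's items, and then filterItems keeps only movies created in or after query.creationYear (movies with an unknown year pass through). The following is a minimal, self-contained sketch of that scoring-and-filtering idea on toy data; the object name YearFilterSketch, the feature vectors and the years are invented for illustration and this is not code from the template.

import scala.math.sqrt

object YearFilterSketch {

  // Toy stand-in for the template's Item case class
  case class Item(creationYear: Option[Int])

  // Same cosine-similarity idea as the algorithm's private cosine helper
  def cosine(v1: Array[Double], v2: Array[Double]): Double = {
    val dot  = v1.zip(v2).map { case (a, b) => a * b }.sum
    val norm = sqrt(v1.map(x => x * x).sum) * sqrt(v2.map(x => x * x).sum)
    if (norm == 0) 0 else dot / norm
  }

  def main(args: Array[String]): Unit = {
    // item Int index -> feature vector (stand-in for model.productFeatures)
    val productFeatures = Map(
      1 -> Array(1.0, 0.0),
      2 -> Array(0.9, 0.1),
      3 -> Array(0.0, 1.0))

    // item Int index -> metadata (stand-in for model.items)
    val items = Map(
      1 -> Item(Some(1994)),
      2 -> Item(Some(2001)),
      3 -> Item(None)) // missing year is kept, mirroring filterItems

    val queryFeatures = Seq(Array(1.0, 0.0)) // features of the query's items
    val queryCreationYear: Option[Int] = Some(2000)

    // score = summed cosine similarity, keep positive scores only
    val indexScores = productFeatures.map { case (id, f) =>
      id -> queryFeatures.map(cosine(_, f)).sum
    }.filter(_._2 > 0)

    // keep items created in or after the requested year (unknown year passes)
    val filtered = indexScores.filter { case (id, _) =>
      items(id).creationYear.map(year => queryCreationYear.forall(year >= _))
        .getOrElse(true)
    }

    println(filtered.toSeq.sortBy { case (_, s) => -s })
  }
}

With these toy values, item 1 (from 1994) is filtered out and item 3 scores zero, so only item 2 is returned, which is the behaviour a query asking for movies from 2000 onward would see from the template's predict method.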
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-query/src/main/scala/ALSModel.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/custom-query/src/main/scala/ALSModel.scala b/examples/scala-parallel-recommendation/custom-query/src/main/scala/ALSModel.scala deleted file mode 100644 index d463204..0000000 --- a/examples/scala-parallel-recommendation/custom-query/src/main/scala/ALSModel.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.template.recommendation - -import org.apache.predictionio.controller.IPersistentModel -import org.apache.predictionio.controller.IPersistentModelLoader -import org.apache.predictionio.data.storage.BiMap - -import org.apache.spark.SparkContext -import org.apache.spark.rdd.RDD - -class ALSModel( - val productFeatures: RDD[(Int, Array[Double])], - val itemStringIntMap: BiMap[String, Int], - // HOWTO: added a map of `generatedItemIntId -> Item` to the algo data model. 
- val items: Map[Int, Item]) - extends IPersistentModel[ALSAlgorithmParams] with Serializable { - - @transient lazy val itemIntStringMap = itemStringIntMap.inverse - - def save(id: String, params: ALSAlgorithmParams, - sc: SparkContext): Boolean = { - - productFeatures.saveAsObjectFile(s"/tmp/${id}/productFeatures") - sc.parallelize(Seq(itemStringIntMap)) - .saveAsObjectFile(s"/tmp/${id}/itemStringIntMap") - // HOWTO: save items too as part of algo model - sc.parallelize(Seq(items)) - .saveAsObjectFile(s"/tmp/${id}/items") - true - } - - override def toString = { - s" productFeatures: [${productFeatures.count()}]" + - s"(${productFeatures.take(2).toList}...)" + - s" itemStringIntMap: [${itemStringIntMap.size}]" + - s"(${itemStringIntMap.take(2).toString}...)]" + - s" items: [${items.size}]" + - s"(${items.take(2).toString}...)]" - } -} - -object ALSModel extends IPersistentModelLoader[ALSAlgorithmParams, ALSModel] { - def apply(id: String, params: ALSAlgorithmParams, sc: Option[SparkContext]) = - new ALSModel( - productFeatures = sc.get.objectFile(s"/tmp/${id}/productFeatures"), - itemStringIntMap = sc.get - .objectFile[BiMap[String, Int]](s"/tmp/${id}/itemStringIntMap").first, - // HOWTO: read items too as part of algo model - items = sc.get - .objectFile[Map[Int, Item]](s"/tmp/${id}/items").first) -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-query/src/main/scala/DataSource.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/custom-query/src/main/scala/DataSource.scala b/examples/scala-parallel-recommendation/custom-query/src/main/scala/DataSource.scala deleted file mode 100644 index 942afb1..0000000 --- a/examples/scala-parallel-recommendation/custom-query/src/main/scala/DataSource.scala +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.template.recommendation - -import org.apache.predictionio.controller.PDataSource -import org.apache.predictionio.controller.EmptyEvaluationInfo -import org.apache.predictionio.controller.EmptyActualResult -import org.apache.predictionio.controller.Params -import org.apache.predictionio.data.storage.{DataMap, Event, Storage} - -import org.apache.spark.SparkContext -import org.apache.spark.rdd.RDD - -import grizzled.slf4j.Logger - -case class DataSourceParams(appId: Int) extends Params - -case class Item(creationYear: Option[Int]) -object Item { - object Fields { - val CreationYear = "creationYear" - } -} - -class DataSource(val dsp: DataSourceParams) - extends PDataSource[TrainingData, - EmptyEvaluationInfo, Query, EmptyActualResult] { - - @transient lazy val logger = Logger[this.type] - private lazy val EntityType = "movie" - - override - def readTraining(sc: SparkContext): TrainingData = { - val eventsDb = Storage.getPEvents() - - // create a RDD of (entityID, Item) - // HOWTO: collecting items(movies) - val itemsRDD = eventsDb.aggregateProperties( - appId = dsp.appId, - entityType = "item" - )(sc).flatMap { case (entityId, properties) ⇒ - ItemMarshaller.unmarshall(properties).map(entityId → _) - } - - // get all user rate events - val rateEventsRDD: RDD[Event] = eventsDb.find( - appId = dsp.appId, - entityType = Some("user"), - eventNames = Some(List("rate")), // read "rate" - // targetEntityType is optional field of an event. - targetEntityType = Some(Some(EntityType)))(sc) - - // collect ratings - val ratingsRDD = rateEventsRDD.flatMap { event ⇒ - try { - (event.event match { - case "rate" => event.properties.getOpt[Double]("rating") - case _ ⇒ None - }).map(Rating(event.entityId, event.targetEntityId.get, _)) - } catch { case e: Exception ⇒ - logger.error(s"Cannot convert ${event} to Rating. Exception: ${e}.") - throw e - } - }.cache() - - new TrainingData(ratingsRDD, itemsRDD) - } -} - -object ItemMarshaller { - // HOWTO: implemented unmarshaller to collect properties for filtering. - def unmarshall(properties: DataMap): Option[Item] = - Some(Item(properties.getOpt[Int](Item.Fields.CreationYear))) -} - -case class Rating(user: String, item: String, rating: Double) - -class TrainingData(val ratings: RDD[Rating], val items: RDD[(String, Item)]) - extends Serializable { - - override def toString = - s"ratings: [${ratings.count()}] (${ratings.take(2).toList}...)" + - s"items: [${items.count()} (${items.take(2).toList}...)]" -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-query/src/main/scala/Engine.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/custom-query/src/main/scala/Engine.scala b/examples/scala-parallel-recommendation/custom-query/src/main/scala/Engine.scala deleted file mode 100644 index 369128b..0000000 --- a/examples/scala-parallel-recommendation/custom-query/src/main/scala/Engine.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.template.recommendation - -import org.apache.predictionio.controller.IEngineFactory -import org.apache.predictionio.controller.Engine - -case class Query(user: String, num: Int, creationYear: Option[Int] = None) - -case class PredictedResult(itemScores: Array[ItemScore]) - -// HOWTO: added movie creation year to predicted result. -case class ItemScore(item: String, score: Double, creationYear: Option[Int]) - -object RecommendationEngine extends IEngineFactory { - def apply() = - new Engine(classOf[DataSource], - classOf[Preparator], - Map("als" → classOf[ALSAlgorithm]), - classOf[Serving]) -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-query/src/main/scala/Preparator.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/custom-query/src/main/scala/Preparator.scala b/examples/scala-parallel-recommendation/custom-query/src/main/scala/Preparator.scala deleted file mode 100644 index ef40ac6..0000000 --- a/examples/scala-parallel-recommendation/custom-query/src/main/scala/Preparator.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.template.recommendation - -import org.apache.predictionio.controller.PPreparator - -import org.apache.spark.SparkContext -import org.apache.spark.rdd.RDD - -class Preparator extends PPreparator[TrainingData, PreparedData] { - def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = - new PreparedData(ratings = trainingData.ratings, items = trainingData.items) -} - -// HOWTO: added items(movies) list to prepared data to have possibility to sort -// them in predict stage.
-class PreparedData(val ratings: RDD[Rating], val items: RDD[(String, Item)]) - extends Serializable http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-query/src/main/scala/Serving.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/custom-query/src/main/scala/Serving.scala b/examples/scala-parallel-recommendation/custom-query/src/main/scala/Serving.scala deleted file mode 100644 index c14137d..0000000 --- a/examples/scala-parallel-recommendation/custom-query/src/main/scala/Serving.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.template.recommendation - -import org.apache.predictionio.controller.LServing - -class Serving extends LServing[Query, PredictedResult] { - - override def serve(query: Query, - predictedResults: Seq[PredictedResult]): PredictedResult = - predictedResults.headOption.map { result ⇒ - val preparedItems = result.itemScores - .sortBy { case ItemScore(item, score, year) ⇒ year }( - Ordering.Option[Int].reverse) - new PredictedResult(preparedItems) - }.getOrElse(new PredictedResult(Array.empty[ItemScore])) -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-serving/.gitignore ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/custom-serving/.gitignore b/examples/scala-parallel-recommendation/custom-serving/.gitignore deleted file mode 100644 index df6668d..0000000 --- a/examples/scala-parallel-recommendation/custom-serving/.gitignore +++ /dev/null @@ -1,5 +0,0 @@ -data/sample_movielens_data.txt -manifest.json -target/ -/pio.sbt -pio.log http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-serving/build.sbt ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/custom-serving/build.sbt b/examples/scala-parallel-recommendation/custom-serving/build.sbt deleted file mode 100644 index 81cd3ec..0000000 --- a/examples/scala-parallel-recommendation/custom-serving/build.sbt +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import AssemblyKeys._ - -assemblySettings - -name := "template-scala-parallel-recommendation" - -organization := "org.apache.predictionio" - -libraryDependencies ++= Seq( - "org.apache.predictionio" %% "core" % pioVersion.value % "provided", - "org.apache.spark" %% "spark-core" % "1.2.0" % "provided", - "org.apache.spark" %% "spark-mllib" % "1.2.0" % "provided") http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-serving/data/sample_disabled_items.txt ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/custom-serving/data/sample_disabled_items.txt b/examples/scala-parallel-recommendation/custom-serving/data/sample_disabled_items.txt deleted file mode 100644 index e69de29..0000000 http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-serving/engine.json ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/custom-serving/engine.json b/examples/scala-parallel-recommendation/custom-serving/engine.json deleted file mode 100644 index 54ea3e8..0000000 --- a/examples/scala-parallel-recommendation/custom-serving/engine.json +++ /dev/null @@ -1,26 +0,0 @@ -{ - "id": "default", - "description": "Default settings", - "engineFactory": "org.template.recommendation.RecommendationEngine", - "datasource": { - "params": { - "appId": 1 - } - }, - "algorithms": [ - { - "name": "als", - "params": { - "rank": 10, - "numIterations": 20, - "lambda": 0.01, - "seed": 3 - } - } - ], - "serving": { - "params": { - "filepath": "./data/sample_disabled_items.txt" - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-serving/project/assembly.sbt ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/custom-serving/project/assembly.sbt b/examples/scala-parallel-recommendation/custom-serving/project/assembly.sbt deleted file mode 100644 index 54c3252..0000000 --- a/examples/scala-parallel-recommendation/custom-serving/project/assembly.sbt +++ /dev/null @@ -1 +0,0 @@ -addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2") http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-serving/project/pio-build.sbt ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/custom-serving/project/pio-build.sbt b/examples/scala-parallel-recommendation/custom-serving/project/pio-build.sbt deleted file mode 100644 index 9aed0ee..0000000 --- a/examples/scala-parallel-recommendation/custom-serving/project/pio-build.sbt +++ /dev/null @@ -1 +0,0 @@ -addSbtPlugin("org.apache.predictionio" % "pio-build" % "0.9.0") http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-serving/src/main/scala/ALSAlgorithm.scala 
---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/custom-serving/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-recommendation/custom-serving/src/main/scala/ALSAlgorithm.scala deleted file mode 100644 index 22904af..0000000 --- a/examples/scala-parallel-recommendation/custom-serving/src/main/scala/ALSAlgorithm.scala +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.template.recommendation - -import org.apache.predictionio.controller.PAlgorithm -import org.apache.predictionio.controller.Params -import org.apache.predictionio.data.storage.BiMap - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD -import org.apache.spark.mllib.recommendation.ALS -import org.apache.spark.mllib.recommendation.{Rating => MLlibRating} -import org.apache.spark.mllib.recommendation.ALSModel - -import grizzled.slf4j.Logger - -case class ALSAlgorithmParams( - rank: Int, - numIterations: Int, - lambda: Double, - seed: Option[Long]) extends Params - -class ALSAlgorithm(val ap: ALSAlgorithmParams) - extends PAlgorithm[PreparedData, ALSModel, Query, PredictedResult] { - - @transient lazy val logger = Logger[this.type] - - def train(sc: SparkContext, data: PreparedData): ALSModel = { - // MLLib ALS cannot handle empty training data. - require(!data.ratings.take(1).isEmpty, - s"RDD[Rating] in PreparedData cannot be empty." + - " Please check if DataSource generates TrainingData" + - " and Preprator generates PreparedData correctly.") - // Convert user and item String IDs to Int index for MLlib - val userStringIntMap = BiMap.stringInt(data.ratings.map(_.user)) - val itemStringIntMap = BiMap.stringInt(data.ratings.map(_.item)) - val mllibRatings = data.ratings.map( r => - // MLlibRating requires integer index for user and item - MLlibRating(userStringIntMap(r.user), itemStringIntMap(r.item), r.rating) - ) - - // seed for MLlib ALS - val seed = ap.seed.getOrElse(System.nanoTime) - - // If you only have one type of implicit event (Eg. "view" event only), - // replace ALS.train(...) 
with - //val m = ALS.trainImplicit( - //ratings = mllibRatings, - //rank = ap.rank, - //iterations = ap.numIterations, - //lambda = ap.lambda, - //blocks = -1, - //alpha = 1.0, - //seed = seed) - - val m = ALS.train( - ratings = mllibRatings, - rank = ap.rank, - iterations = ap.numIterations, - lambda = ap.lambda, - blocks = -1, - seed = seed) - - new ALSModel( - rank = m.rank, - userFeatures = m.userFeatures, - productFeatures = m.productFeatures, - userStringIntMap = userStringIntMap, - itemStringIntMap = itemStringIntMap) - } - - def predict(model: ALSModel, query: Query): PredictedResult = { - // Convert String ID to Int index for Mllib - model.userStringIntMap.get(query.user).map { userInt => - // create inverse view of itemStringIntMap - val itemIntStringMap = model.itemStringIntMap.inverse - // recommendProducts() returns Array[MLlibRating], which uses item Int - // index. Convert it to String ID for returning PredictedResult - val itemScores = model.recommendProducts(userInt, query.num) - .map (r => ItemScore(itemIntStringMap(r.product), r.rating)) - new PredictedResult(itemScores) - }.getOrElse{ - logger.info(s"No prediction for unknown user ${query.user}.") - new PredictedResult(Array.empty) - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-serving/src/main/scala/ALSModel.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/custom-serving/src/main/scala/ALSModel.scala b/examples/scala-parallel-recommendation/custom-serving/src/main/scala/ALSModel.scala deleted file mode 100644 index 4697732..0000000 --- a/examples/scala-parallel-recommendation/custom-serving/src/main/scala/ALSModel.scala +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.mllib.recommendation -// This must be the same package as Spark's MatrixFactorizationModel because -// MatrixFactorizationModel's constructor is private and we are using -// its constructor in order to save and load the model - -import org.template.recommendation.ALSAlgorithmParams - -import org.apache.predictionio.controller.IPersistentModel -import org.apache.predictionio.controller.IPersistentModelLoader -import org.apache.predictionio.data.storage.BiMap - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD - -class ALSModel( - override val rank: Int, - override val userFeatures: RDD[(Int, Array[Double])], - override val productFeatures: RDD[(Int, Array[Double])], - val userStringIntMap: BiMap[String, Int], - val itemStringIntMap: BiMap[String, Int]) - extends MatrixFactorizationModel(rank, userFeatures, productFeatures) - with IPersistentModel[ALSAlgorithmParams] { - - def save(id: String, params: ALSAlgorithmParams, - sc: SparkContext): Boolean = { - - sc.parallelize(Seq(rank)).saveAsObjectFile(s"/tmp/${id}/rank") - userFeatures.saveAsObjectFile(s"/tmp/${id}/userFeatures") - productFeatures.saveAsObjectFile(s"/tmp/${id}/productFeatures") - sc.parallelize(Seq(userStringIntMap)) - .saveAsObjectFile(s"/tmp/${id}/userStringIntMap") - sc.parallelize(Seq(itemStringIntMap)) - .saveAsObjectFile(s"/tmp/${id}/itemStringIntMap") - true - } - - override def toString = { - s"userFeatures: [${userFeatures.count()}]" + - s"(${userFeatures.take(2).toList}...)" + - s" productFeatures: [${productFeatures.count()}]" + - s"(${productFeatures.take(2).toList}...)" + - s" userStringIntMap: [${userStringIntMap.size}]" + - s"(${userStringIntMap.take(2)}...)" + - s" itemStringIntMap: [${itemStringIntMap.size}]" + - s"(${itemStringIntMap.take(2)}...)" - } -} - -object ALSModel - extends IPersistentModelLoader[ALSAlgorithmParams, ALSModel] { - def apply(id: String, params: ALSAlgorithmParams, - sc: Option[SparkContext]) = { - new ALSModel( - rank = sc.get.objectFile[Int](s"/tmp/${id}/rank").first, - userFeatures = sc.get.objectFile(s"/tmp/${id}/userFeatures"), - productFeatures = sc.get.objectFile(s"/tmp/${id}/productFeatures"), - userStringIntMap = sc.get - .objectFile[BiMap[String, Int]](s"/tmp/${id}/userStringIntMap").first, - itemStringIntMap = sc.get - .objectFile[BiMap[String, Int]](s"/tmp/${id}/itemStringIntMap").first) - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-serving/src/main/scala/DataSource.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/custom-serving/src/main/scala/DataSource.scala b/examples/scala-parallel-recommendation/custom-serving/src/main/scala/DataSource.scala deleted file mode 100644 index 1d3f0df..0000000 --- a/examples/scala-parallel-recommendation/custom-serving/src/main/scala/DataSource.scala +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.template.recommendation - -import org.apache.predictionio.controller.PDataSource -import org.apache.predictionio.controller.EmptyEvaluationInfo -import org.apache.predictionio.controller.EmptyActualResult -import org.apache.predictionio.controller.Params -import org.apache.predictionio.data.storage.Event -import org.apache.predictionio.data.storage.Storage - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD - -import grizzled.slf4j.Logger - -case class DataSourceParams(appId: Int) extends Params - -class DataSource(val dsp: DataSourceParams) - extends PDataSource[TrainingData, - EmptyEvaluationInfo, Query, EmptyActualResult] { - - @transient lazy val logger = Logger[this.type] - - override - def readTraining(sc: SparkContext): TrainingData = { - val eventsDb = Storage.getPEvents() - val eventsRDD: RDD[Event] = eventsDb.find( - appId = dsp.appId, - entityType = Some("user"), - eventNames = Some(List("rate", "buy")), // read "rate" and "buy" event - // targetEntityType is optional field of an event. - targetEntityType = Some(Some("item")))(sc) - - val ratingsRDD: RDD[Rating] = eventsRDD.map { event => - val rating = try { - val ratingValue: Double = event.event match { - case "rate" => event.properties.get[Double]("rating") - case "buy" => 4.0 // map buy event to rating value of 4 - case _ => throw new Exception(s"Unexpected event ${event} is read.") - } - // entityId and targetEntityId is String - Rating(event.entityId, - event.targetEntityId.get, - ratingValue) - } catch { - case e: Exception => { - logger.error(s"Cannot convert ${event} to Rating. Exception: ${e}.") - throw e - } - } - rating - } - new TrainingData(ratingsRDD) - } -} - -case class Rating( - user: String, - item: String, - rating: Double -) - -class TrainingData( - val ratings: RDD[Rating] -) extends Serializable { - override def toString = { - s"ratings: [${ratings.count()}] (${ratings.take(2).toList}...)" - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-serving/src/main/scala/Engine.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/custom-serving/src/main/scala/Engine.scala b/examples/scala-parallel-recommendation/custom-serving/src/main/scala/Engine.scala deleted file mode 100644 index 1446ca4..0000000 --- a/examples/scala-parallel-recommendation/custom-serving/src/main/scala/Engine.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.template.recommendation - -import org.apache.predictionio.controller.IEngineFactory -import org.apache.predictionio.controller.Engine - -case class Query( - user: String, - num: Int -) - -case class PredictedResult( - itemScores: Array[ItemScore] -) - -case class ItemScore( - item: String, - score: Double -) - -object RecommendationEngine extends IEngineFactory { - def apply() = { - new Engine( - classOf[DataSource], - classOf[Preparator], - Map("als" -> classOf[ALSAlgorithm]), - classOf[Serving]) - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-serving/src/main/scala/Preparator.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/custom-serving/src/main/scala/Preparator.scala b/examples/scala-parallel-recommendation/custom-serving/src/main/scala/Preparator.scala deleted file mode 100644 index 4cd812c..0000000 --- a/examples/scala-parallel-recommendation/custom-serving/src/main/scala/Preparator.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.template.recommendation - -import org.apache.predictionio.controller.PPreparator - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD - -class Preparator - extends PPreparator[TrainingData, PreparedData] { - - def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = { - new PreparedData(ratings = trainingData.ratings) - } -} - -case class PreparedData( - ratings: RDD[Rating] -) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/custom-serving/src/main/scala/Serving.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/custom-serving/src/main/scala/Serving.scala b/examples/scala-parallel-recommendation/custom-serving/src/main/scala/Serving.scala deleted file mode 100644 index 52d40e1..0000000 --- a/examples/scala-parallel-recommendation/custom-serving/src/main/scala/Serving.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.template.recommendation - -import org.apache.predictionio.controller.LServing - -import scala.io.Source - -import org.apache.predictionio.controller.Params // ADDED - -// ADDED ServingParams to specify the blacklisting file location. -case class ServingParams(filepath: String) extends Params - -class Serving(val params: ServingParams) - extends LServing[Query, PredictedResult] { - - override - def serve(query: Query, predictedResults: Seq[PredictedResult]) - : PredictedResult = { - val disabledProducts: Set[String] = Source - .fromFile(params.filepath) - .getLines - .toSet - - val itemScores = predictedResults.head.itemScores - PredictedResult(itemScores.filter(ps => !disabledProducts(ps.item))) - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-data-prep/.gitignore ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/customize-data-prep/.gitignore b/examples/scala-parallel-recommendation/customize-data-prep/.gitignore new file mode 100644 index 0000000..3f3403a --- /dev/null +++ b/examples/scala-parallel-recommendation/customize-data-prep/.gitignore @@ -0,0 +1,5 @@ +data/sample_movielens_data.txt +manifest.json +target/ +pio.log +/pio.sbt \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-data-prep/build.sbt ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/customize-data-prep/build.sbt b/examples/scala-parallel-recommendation/customize-data-prep/build.sbt new file mode 100644 index 0000000..3d25df1 --- /dev/null +++ b/examples/scala-parallel-recommendation/customize-data-prep/build.sbt @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +name := "template-scala-parallel-recommendation" + +organization := "org.apache.predictionio" +scalaVersion := "2.11.8" +libraryDependencies ++= Seq( + "org.apache.predictionio" %% "apache-predictionio-core" % "0.11.0-incubating" % "provided", + "org.apache.spark" %% "spark-mllib" % "2.1.1" % "provided") \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-data-prep/data/import_eventserver.py ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/customize-data-prep/data/import_eventserver.py b/examples/scala-parallel-recommendation/customize-data-prep/data/import_eventserver.py new file mode 100644 index 0000000..63694cf --- /dev/null +++ b/examples/scala-parallel-recommendation/customize-data-prep/data/import_eventserver.py @@ -0,0 +1,73 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Import sample data for recommendation engine +""" + +import predictionio +import argparse +import random + +RATE_ACTIONS_DELIMITER = "::" +SEED = 3 + +def import_events(client, file): + f = open(file, 'r') + random.seed(SEED) + count = 0 + print("Importing data...") + for line in f: + data = line.rstrip('\r\n').split(RATE_ACTIONS_DELIMITER) + # For demonstration purpose, randomly mix in some buy events + if (random.randint(0, 1) == 1): + client.create_event( + event="rate", + entity_type="user", + entity_id=data[0], + target_entity_type="item", + target_entity_id=data[1], + properties= { "rating" : float(data[2]) } + ) + else: + client.create_event( + event="buy", + entity_type="user", + entity_id=data[0], + target_entity_type="item", + target_entity_id=data[1] + ) + count += 1 + f.close() + print("%s events are imported." 
% count) + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description="Import sample data for recommendation engine") + parser.add_argument('--access_key', default='invald_access_key') + parser.add_argument('--url', default="http://localhost:7070") + parser.add_argument('--file', default="./data/sample_movielens_data.txt") + + args = parser.parse_args() + print(args) + + client = predictionio.EventClient( + access_key=args.access_key, + url=args.url, + threads=5, + qsize=500) + import_events(client, args.file) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-data-prep/data/sample_not_train_data.txt ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/customize-data-prep/data/sample_not_train_data.txt b/examples/scala-parallel-recommendation/customize-data-prep/data/sample_not_train_data.txt new file mode 100644 index 0000000..077e96b --- /dev/null +++ b/examples/scala-parallel-recommendation/customize-data-prep/data/sample_not_train_data.txt @@ -0,0 +1,8 @@ +3 +4 +10 +22 +34 +54 +65 +89 http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-data-prep/data/send_query.py ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/customize-data-prep/data/send_query.py b/examples/scala-parallel-recommendation/customize-data-prep/data/send_query.py new file mode 100644 index 0000000..f6ec9ab --- /dev/null +++ b/examples/scala-parallel-recommendation/customize-data-prep/data/send_query.py @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +""" +Send sample query to prediction engine +""" + +import predictionio +engine_client = predictionio.EngineClient(url="http://localhost:8000") +print(engine_client.send_query({"user": "1", "num": 4})) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-data-prep/engine.json ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/customize-data-prep/engine.json b/examples/scala-parallel-recommendation/customize-data-prep/engine.json new file mode 100644 index 0000000..23fa1c9 --- /dev/null +++ b/examples/scala-parallel-recommendation/customize-data-prep/engine.json @@ -0,0 +1,26 @@ +{ + "id": "default", + "description": "Default settings", + "engineFactory": "org.apache.predictionio.examples.recommendation.RecommendationEngine", + "datasource": { + "params" : { + "appName": "MyApp1" + } + }, + "preparator": { + "params": { + "filepath": "./data/sample_not_train_data.txt" + } + }, + "algorithms": [ + { + "name": "als", + "params": { + "rank": 10, + "numIterations": 20, + "lambda": 0.01, + "seed": 3 + } + } + ] +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-data-prep/project/assembly.sbt ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/customize-data-prep/project/assembly.sbt b/examples/scala-parallel-recommendation/customize-data-prep/project/assembly.sbt new file mode 100644 index 0000000..92636bc --- /dev/null +++ b/examples/scala-parallel-recommendation/customize-data-prep/project/assembly.sbt @@ -0,0 +1 @@ +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4") \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-data-prep/project/build.properties ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/customize-data-prep/project/build.properties b/examples/scala-parallel-recommendation/customize-data-prep/project/build.properties new file mode 100644 index 0000000..64317fd --- /dev/null +++ b/examples/scala-parallel-recommendation/customize-data-prep/project/build.properties @@ -0,0 +1 @@ +sbt.version=0.13.15 http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/ALSAlgorithm.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/ALSAlgorithm.scala new file mode 100644 index 0000000..b0f874d --- /dev/null +++ b/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/ALSAlgorithm.scala @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.examples.recommendation + +import org.apache.predictionio.controller.PAlgorithm +import org.apache.predictionio.controller.Params +import org.apache.predictionio.data.storage.BiMap + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.recommendation.ALS +import org.apache.spark.mllib.recommendation.{Rating => MLlibRating} +import org.apache.spark.mllib.recommendation.ALSModel + +import grizzled.slf4j.Logger + +case class ALSAlgorithmParams( + rank: Int, + numIterations: Int, + lambda: Double, + seed: Option[Long]) extends Params + +class ALSAlgorithm(val ap: ALSAlgorithmParams) + extends PAlgorithm[PreparedData, ALSModel, Query, PredictedResult] { + + @transient lazy val logger = Logger[this.type] + + if (ap.numIterations > 30) { + logger.warn( + s"ALSAlgorithmParams.numIterations > 30, current: ${ap.numIterations}. " + + s"There is a chance of running to StackOverflowException." + + s"To remedy it, set lower numIterations or checkpoint parameters.") + } + + def train(sc: SparkContext, data: PreparedData): ALSModel = { + // MLLib ALS cannot handle empty training data. + require(!data.ratings.take(1).isEmpty, + s"RDD[Rating] in PreparedData cannot be empty." + + " Please check if DataSource generates TrainingData" + + " and Preparator generates PreparedData correctly.") + // Convert user and item String IDs to Int index for MLlib + + val userStringIntMap = BiMap.stringInt(data.ratings.map(_.user)) + val itemStringIntMap = BiMap.stringInt(data.ratings.map(_.item)) + val mllibRatings = data.ratings.map( r => + // MLlibRating requires integer index for user and item + MLlibRating(userStringIntMap(r.user), itemStringIntMap(r.item), r.rating) + ) + + // seed for MLlib ALS + val seed = ap.seed.getOrElse(System.nanoTime) + + // Set checkpoint directory + // sc.setCheckpointDir("checkpoint") + + // If you only have one type of implicit event (Eg. "view" event only), + // set implicitPrefs to true + val implicitPrefs = false + val als = new ALS() + als.setUserBlocks(-1) + als.setProductBlocks(-1) + als.setRank(ap.rank) + als.setIterations(ap.numIterations) + als.setLambda(ap.lambda) + als.setImplicitPrefs(implicitPrefs) + als.setAlpha(1.0) + als.setSeed(seed) + als.setCheckpointInterval(10) + val m = als.run(mllibRatings) + + new ALSModel( + rank = m.rank, + userFeatures = m.userFeatures, + productFeatures = m.productFeatures, + userStringIntMap = userStringIntMap, + itemStringIntMap = itemStringIntMap) + } + + def predict(model: ALSModel, query: Query): PredictedResult = { + // Convert String ID to Int index for Mllib + model.userStringIntMap.get(query.user).map { userInt => + // create inverse view of itemStringIntMap + val itemIntStringMap = model.itemStringIntMap.inverse + // recommendProducts() returns Array[MLlibRating], which uses item Int + // index. 
Convert it to String ID for returning PredictedResult + val itemScores = model.recommendProducts(userInt, query.num) + .map (r => ItemScore(itemIntStringMap(r.product), r.rating)) + new PredictedResult(itemScores) + }.getOrElse{ + logger.info(s"No prediction for unknown user ${query.user}.") + new PredictedResult(Array.empty) + } + } + + // This function is used by the evaluation module, where a batch of queries is sent to this engine + // for evaluation purpose. + override def batchPredict(model: ALSModel, queries: RDD[(Long, Query)]): RDD[(Long, PredictedResult)] = { + val userIxQueries: RDD[(Int, (Long, Query))] = queries + .map { case (ix, query) => { + // If user not found, then the index is -1 + val userIx = model.userStringIntMap.get(query.user).getOrElse(-1) + (userIx, (ix, query)) + }} + + // Cross product of all valid users from the queries and products in the model. + val usersProducts: RDD[(Int, Int)] = userIxQueries + .keys + .filter(_ != -1) + .cartesian(model.productFeatures.map(_._1)) + + // Call mllib ALS's predict function. + val ratings: RDD[MLlibRating] = model.predict(usersProducts) + + // The following code construct predicted results from mllib's ratings. + // Not optimal implementation. Instead of groupBy, should use combineByKey with a PriorityQueue + val userRatings: RDD[(Int, Iterable[MLlibRating])] = ratings.groupBy(_.user) + + userIxQueries.leftOuterJoin(userRatings) + .map { + // When there are ratings + case (userIx, ((ix, query), Some(ratings))) => { + val topItemScores: Array[ItemScore] = ratings + .toArray + .sortBy(_.rating)(Ordering.Double.reverse) // note: from large to small ordering + .take(query.num) + .map { rating => ItemScore( + model.itemStringIntMap.inverse(rating.product), + rating.rating) } + + (ix, PredictedResult(itemScores = topItemScores)) + } + // When user doesn't exist in training data + case (userIx, ((ix, query), None)) => { + require(userIx == -1) + (ix, PredictedResult(itemScores = Array.empty)) + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/ALSModel.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/ALSModel.scala b/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/ALSModel.scala new file mode 100644 index 0000000..898858d --- /dev/null +++ b/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/ALSModel.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.mllib.recommendation +// This must be the same package as Spark's MatrixFactorizationModel because +// MatrixFactorizationModel's constructor is private and we are using +// its constructor in order to save and load the model + +import org.apache.predictionio.examples.recommendation.ALSAlgorithmParams + +import org.apache.predictionio.controller.PersistentModel +import org.apache.predictionio.controller.PersistentModelLoader +import org.apache.predictionio.data.storage.BiMap + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD + +class ALSModel( + override val rank: Int, + override val userFeatures: RDD[(Int, Array[Double])], + override val productFeatures: RDD[(Int, Array[Double])], + val userStringIntMap: BiMap[String, Int], + val itemStringIntMap: BiMap[String, Int]) + extends MatrixFactorizationModel(rank, userFeatures, productFeatures) + with PersistentModel[ALSAlgorithmParams] { + + def save(id: String, params: ALSAlgorithmParams, + sc: SparkContext): Boolean = { + + sc.parallelize(Seq(rank)).saveAsObjectFile(s"/tmp/${id}/rank") + userFeatures.saveAsObjectFile(s"/tmp/${id}/userFeatures") + productFeatures.saveAsObjectFile(s"/tmp/${id}/productFeatures") + sc.parallelize(Seq(userStringIntMap)) + .saveAsObjectFile(s"/tmp/${id}/userStringIntMap") + sc.parallelize(Seq(itemStringIntMap)) + .saveAsObjectFile(s"/tmp/${id}/itemStringIntMap") + true + } + + override def toString = { + s"userFeatures: [${userFeatures.count()}]" + + s"(${userFeatures.take(2).toList}...)" + + s" productFeatures: [${productFeatures.count()}]" + + s"(${productFeatures.take(2).toList}...)" + + s" userStringIntMap: [${userStringIntMap.size}]" + + s"(${userStringIntMap.take(2)}...)" + + s" itemStringIntMap: [${itemStringIntMap.size}]" + + s"(${itemStringIntMap.take(2)}...)" + } +} + +object ALSModel + extends PersistentModelLoader[ALSAlgorithmParams, ALSModel] { + def apply(id: String, params: ALSAlgorithmParams, + sc: Option[SparkContext]) = { + new ALSModel( + rank = sc.get.objectFile[Int](s"/tmp/${id}/rank").first, + userFeatures = sc.get.objectFile(s"/tmp/${id}/userFeatures"), + productFeatures = sc.get.objectFile(s"/tmp/${id}/productFeatures"), + userStringIntMap = sc.get + .objectFile[BiMap[String, Int]](s"/tmp/${id}/userStringIntMap").first, + itemStringIntMap = sc.get + .objectFile[BiMap[String, Int]](s"/tmp/${id}/itemStringIntMap").first) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/DataSource.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/DataSource.scala b/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/DataSource.scala new file mode 100644 index 0000000..d606ad3 --- /dev/null +++ b/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/DataSource.scala @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.examples.recommendation + +import org.apache.predictionio.controller.PDataSource +import org.apache.predictionio.controller.EmptyEvaluationInfo +import org.apache.predictionio.controller.EmptyActualResult +import org.apache.predictionio.controller.Params +import org.apache.predictionio.data.storage.Event +import org.apache.predictionio.data.store.PEventStore + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD + +import grizzled.slf4j.Logger + +case class DataSourceEvalParams(kFold: Int, queryNum: Int) + +case class DataSourceParams( + appName: String, + evalParams: Option[DataSourceEvalParams]) extends Params + +class DataSource(val dsp: DataSourceParams) + extends PDataSource[TrainingData, + EmptyEvaluationInfo, Query, ActualResult] { + + @transient lazy val logger = Logger[this.type] + + def getRatings(sc: SparkContext): RDD[Rating] = { + + val eventsRDD: RDD[Event] = PEventStore.find( + appName = dsp.appName, + entityType = Some("user"), + eventNames = Some(List("rate", "buy")), // read "rate" and "buy" event + // targetEntityType is optional field of an event. + targetEntityType = Some(Some("item")))(sc) + + val ratingsRDD: RDD[Rating] = eventsRDD.map { event => + val rating = try { + val ratingValue: Double = event.event match { + case "rate" => event.properties.get[Double]("rating") + case "buy" => 4.0 // map buy event to rating value of 4 + case _ => throw new Exception(s"Unexpected event ${event} is read.") + } + // entityId and targetEntityId is String + Rating(event.entityId, + event.targetEntityId.get, + ratingValue) + } catch { + case e: Exception => { + logger.error(s"Cannot convert ${event} to Rating. 
Exception: ${e}.") + throw e + } + } + rating + }.cache() + + ratingsRDD + } + + override + def readTraining(sc: SparkContext): TrainingData = { + new TrainingData(getRatings(sc)) + } + + override + def readEval(sc: SparkContext) + : Seq[(TrainingData, EmptyEvaluationInfo, RDD[(Query, ActualResult)])] = { + require(!dsp.evalParams.isEmpty, "Must specify evalParams") + val evalParams = dsp.evalParams.get + + val kFold = evalParams.kFold + val ratings: RDD[(Rating, Long)] = getRatings(sc).zipWithUniqueId + ratings.cache + + (0 until kFold).map { idx => { + val trainingRatings = ratings.filter(_._2 % kFold != idx).map(_._1) + val testingRatings = ratings.filter(_._2 % kFold == idx).map(_._1) + + val testingUsers: RDD[(String, Iterable[Rating])] = testingRatings.groupBy(_.user) + + (new TrainingData(trainingRatings), + new EmptyEvaluationInfo(), + testingUsers.map { + case (user, ratings) => (Query(user, evalParams.queryNum), ActualResult(ratings.toArray)) + } + ) + }} + } +} + +case class Rating( + user: String, + item: String, + rating: Double +) + +class TrainingData( + val ratings: RDD[Rating] +) extends Serializable { + override def toString = { + s"ratings: [${ratings.count()}] (${ratings.take(2).toList}...)" + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/Engine.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/Engine.scala b/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/Engine.scala new file mode 100644 index 0000000..b2a668b --- /dev/null +++ b/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/Engine.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.examples.recommendation + +import org.apache.predictionio.controller.EngineFactory +import org.apache.predictionio.controller.Engine + +case class Query( + user: String, + num: Int +) extends Serializable + +case class PredictedResult( + itemScores: Array[ItemScore] +) extends Serializable + +case class ActualResult( + ratings: Array[Rating] +) extends Serializable + +case class ItemScore( + item: String, + score: Double +) extends Serializable + +object RecommendationEngine extends EngineFactory { + def apply() = { + new Engine( + classOf[DataSource], + classOf[Preparator], + Map("als" -> classOf[ALSAlgorithm]), + classOf[Serving]) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/Evaluation.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/Evaluation.scala b/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/Evaluation.scala new file mode 100644 index 0000000..a665496 --- /dev/null +++ b/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/Evaluation.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.examples.recommendation + +import org.apache.predictionio.controller.Evaluation +import org.apache.predictionio.controller.OptionAverageMetric +import org.apache.predictionio.controller.AverageMetric +import org.apache.predictionio.controller.EmptyEvaluationInfo +import org.apache.predictionio.controller.EngineParamsGenerator +import org.apache.predictionio.controller.EngineParams +import org.apache.predictionio.controller.MetricEvaluator + +// Usage: +// $ pio eval org.example.recommendation.RecommendationEvaluation \ +// org.example.recommendation.EngineParamsList + +case class PrecisionAtK(k: Int, ratingThreshold: Double = 2.0) + extends OptionAverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] { + require(k > 0, "k must be greater than 0") + + override def header = s"Precision@K (k=$k, threshold=$ratingThreshold)" + + def calculate(q: Query, p: PredictedResult, a: ActualResult): Option[Double] = { + val positives: Set[String] = a.ratings.filter(_.rating >= ratingThreshold).map(_.item).toSet + + // If there is no positive results, Precision is undefined. We don't consider this case in the + // metrics, hence we return None. 
+ if (positives.size == 0) { + None + } else { + val tpCount: Int = p.itemScores.take(k).filter(is => positives(is.item)).size + Some(tpCount.toDouble / math.min(k, positives.size)) + } + } +} + +case class PositiveCount(ratingThreshold: Double = 2.0) + extends AverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] { + override def header = s"PositiveCount (threshold=$ratingThreshold)" + + def calculate(q: Query, p: PredictedResult, a: ActualResult): Double = { + a.ratings.filter(_.rating >= ratingThreshold).size + } +} + +object RecommendationEvaluation extends Evaluation { + engineEvaluator = ( + RecommendationEngine(), + MetricEvaluator( + metric = PrecisionAtK(k = 10, ratingThreshold = 4.0), + otherMetrics = Seq( + PositiveCount(ratingThreshold = 4.0), + PrecisionAtK(k = 10, ratingThreshold = 2.0), + PositiveCount(ratingThreshold = 2.0), + PrecisionAtK(k = 10, ratingThreshold = 1.0), + PositiveCount(ratingThreshold = 1.0) + ))) +} + + +object ComprehensiveRecommendationEvaluation extends Evaluation { + val ratingThresholds = Seq(0.0, 2.0, 4.0) + val ks = Seq(1, 3, 10) + + engineEvaluator = ( + RecommendationEngine(), + MetricEvaluator( + metric = PrecisionAtK(k = 3, ratingThreshold = 2.0), + otherMetrics = ( + (for (r <- ratingThresholds) yield PositiveCount(ratingThreshold = r)) ++ + (for (r <- ratingThresholds; k <- ks) yield PrecisionAtK(k = k, ratingThreshold = r)) + ))) +} + + +trait BaseEngineParamsList extends EngineParamsGenerator { + protected val baseEP = EngineParams( + dataSourceParams = DataSourceParams( + appName = "MyApp1", + evalParams = Some(DataSourceEvalParams(kFold = 5, queryNum = 10)))) +} + +object EngineParamsList extends BaseEngineParamsList { + engineParamsList = for( + rank <- Seq(5, 10, 20); + numIterations <- Seq(1, 5, 10)) + yield baseEP.copy( + algorithmParamsList = Seq( + ("als", ALSAlgorithmParams(rank, numIterations, 0.01, Some(3))))) +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/Preparator.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/Preparator.scala b/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/Preparator.scala new file mode 100644 index 0000000..cf792af --- /dev/null +++ b/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/Preparator.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.examples.recommendation + +import org.apache.predictionio.controller.PPreparator + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD + +import scala.io.Source // ADDED +import org.apache.predictionio.controller.Params // ADDED + +// ADDED CustomPreparatorParams case class +case class CustomPreparatorParams( + filepath: String +) extends Params + +class Preparator(pp: CustomPreparatorParams) // ADDED CustomPreparatorParams + extends PPreparator[TrainingData, PreparedData] { + + def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = { + val noTrainItems = Source.fromFile(pp.filepath).getLines.toSet // CHANGED + val ratings = trainingData.ratings.filter( r => + !noTrainItems.contains(r.item) + ) + new PreparedData(ratings) + } +} + +class PreparedData( + val ratings: RDD[Rating] +) extends Serializable http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/Serving.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/Serving.scala b/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/Serving.scala new file mode 100644 index 0000000..c478455 --- /dev/null +++ b/examples/scala-parallel-recommendation/customize-data-prep/src/main/scala/Serving.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.examples.recommendation + +import org.apache.predictionio.controller.LServing + +class Serving + extends LServing[Query, PredictedResult] { + + override + def serve(query: Query, + predictedResults: Seq[PredictedResult]): PredictedResult = { + predictedResults.head + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-data-prep/template.json ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/customize-data-prep/template.json b/examples/scala-parallel-recommendation/customize-data-prep/template.json new file mode 100644 index 0000000..d076ec5 --- /dev/null +++ b/examples/scala-parallel-recommendation/customize-data-prep/template.json @@ -0,0 +1 @@ +{"pio": {"version": { "min": "0.10.0-incubating" }}} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-serving/.gitignore ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/customize-serving/.gitignore b/examples/scala-parallel-recommendation/customize-serving/.gitignore new file mode 100644 index 0000000..3f3403a --- /dev/null +++ b/examples/scala-parallel-recommendation/customize-serving/.gitignore @@ -0,0 +1,5 @@ +data/sample_movielens_data.txt +manifest.json +target/ +pio.log +/pio.sbt \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-serving/build.sbt ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/customize-serving/build.sbt b/examples/scala-parallel-recommendation/customize-serving/build.sbt new file mode 100644 index 0000000..3d25df1 --- /dev/null +++ b/examples/scala-parallel-recommendation/customize-serving/build.sbt @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +name := "template-scala-parallel-recommendation" + +organization := "org.apache.predictionio" +scalaVersion := "2.11.8" +libraryDependencies ++= Seq( + "org.apache.predictionio" %% "apache-predictionio-core" % "0.11.0-incubating" % "provided", + "org.apache.spark" %% "spark-mllib" % "2.1.1" % "provided") \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-serving/data/import_eventserver.py ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/customize-serving/data/import_eventserver.py b/examples/scala-parallel-recommendation/customize-serving/data/import_eventserver.py new file mode 100644 index 0000000..63694cf --- /dev/null +++ b/examples/scala-parallel-recommendation/customize-serving/data/import_eventserver.py @@ -0,0 +1,73 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Import sample data for recommendation engine +""" + +import predictionio +import argparse +import random + +RATE_ACTIONS_DELIMITER = "::" +SEED = 3 + +def import_events(client, file): + f = open(file, 'r') + random.seed(SEED) + count = 0 + print("Importing data...") + for line in f: + data = line.rstrip('\r\n').split(RATE_ACTIONS_DELIMITER) + # For demonstration purpose, randomly mix in some buy events + if (random.randint(0, 1) == 1): + client.create_event( + event="rate", + entity_type="user", + entity_id=data[0], + target_entity_type="item", + target_entity_id=data[1], + properties= { "rating" : float(data[2]) } + ) + else: + client.create_event( + event="buy", + entity_type="user", + entity_id=data[0], + target_entity_type="item", + target_entity_id=data[1] + ) + count += 1 + f.close() + print("%s events are imported." 
% count) + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description="Import sample data for recommendation engine") + parser.add_argument('--access_key', default='invald_access_key') + parser.add_argument('--url', default="http://localhost:7070") + parser.add_argument('--file', default="./data/sample_movielens_data.txt") + + args = parser.parse_args() + print(args) + + client = predictionio.EventClient( + access_key=args.access_key, + url=args.url, + threads=5, + qsize=500) + import_events(client, args.file) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-serving/data/sample_disabled_items.txt ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/customize-serving/data/sample_disabled_items.txt b/examples/scala-parallel-recommendation/customize-serving/data/sample_disabled_items.txt new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-serving/data/send_query.py ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/customize-serving/data/send_query.py b/examples/scala-parallel-recommendation/customize-serving/data/send_query.py new file mode 100644 index 0000000..f6ec9ab --- /dev/null +++ b/examples/scala-parallel-recommendation/customize-serving/data/send_query.py @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +""" +Send sample query to prediction engine +""" + +import predictionio +engine_client = predictionio.EngineClient(url="http://localhost:8000") +print(engine_client.send_query({"user": "1", "num": 4})) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-serving/engine.json ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/customize-serving/engine.json b/examples/scala-parallel-recommendation/customize-serving/engine.json new file mode 100644 index 0000000..75400bc --- /dev/null +++ b/examples/scala-parallel-recommendation/customize-serving/engine.json @@ -0,0 +1,26 @@ +{ + "id": "default", + "description": "Default settings", + "engineFactory": "org.apache.predictionio.examples.recommendation.RecommendationEngine", + "datasource": { + "params" : { + "appName": "MyApp1" + } + }, + "algorithms": [ + { + "name": "als", + "params": { + "rank": 10, + "numIterations": 20, + "lambda": 0.01, + "seed": 3 + } + } + ], + "serving": { + "params": { + "filepath": "./data/sample_disabled_items.txt" + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-serving/project/assembly.sbt ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/customize-serving/project/assembly.sbt b/examples/scala-parallel-recommendation/customize-serving/project/assembly.sbt new file mode 100644 index 0000000..92636bc --- /dev/null +++ b/examples/scala-parallel-recommendation/customize-serving/project/assembly.sbt @@ -0,0 +1 @@ +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4") \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/customize-serving/project/build.properties ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/customize-serving/project/build.properties b/examples/scala-parallel-recommendation/customize-serving/project/build.properties new file mode 100644 index 0000000..64317fd --- /dev/null +++ b/examples/scala-parallel-recommendation/customize-serving/project/build.properties @@ -0,0 +1 @@ +sbt.version=0.13.15
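A note on ALSAlgorithm.batchPredict earlier in this diff: its own comment flags the groupBy-based ranking as not optimal and suggests combineByKey with a PriorityQueue instead. The sketch below is one way that could look, assuming a hypothetical helper topNPerUser and using aggregateByKey (a thin wrapper over combineByKey); it keeps only the num highest-rated items per user while aggregating, instead of materialising every rating per user as groupBy does. It is illustrative only, not part of the committed code, and because query.num can differ per query the cap would have to be the largest requested num, with the existing take(query.num) still applied after the join.

import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}

import scala.collection.mutable

object TopNPerUser {
  // Keep only the `num` highest-rated items per user while aggregating,
  // instead of collecting every rating per user as groupBy does.
  def topNPerUser(ratings: RDD[MLlibRating], num: Int): RDD[(Int, Array[MLlibRating])] = {
    val byRating = Ordering.by[MLlibRating, Double](_.rating)
    ratings
      .map(r => (r.user, r))
      // The reversed ordering makes the queue head the weakest candidate seen so far.
      .aggregateByKey(mutable.PriorityQueue.empty[MLlibRating](byRating.reverse))(
        (queue, rating) => {
          queue.enqueue(rating)
          if (queue.size > num) queue.dequeue() // drop the current minimum
          queue
        },
        (q1, q2) => {
          q2.foreach(q1.enqueue(_))
          while (q1.size > num) q1.dequeue()
          q1
        })
      // Largest rating first, matching the descending sort used in batchPredict.
      .mapValues(_.toArray.sortBy(-_.rating))
  }
}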
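For the PrecisionAtK metric in Evaluation.scala earlier in this diff, a small self-contained check of the arithmetic may be useful; the item IDs and ranking below are made up purely for illustration. With k = 3, three items rated at or above the threshold, and two of them appearing in the top three predictions, the metric yields 2 / min(3, 3) ≈ 0.67.

object PrecisionAtKExample extends App {
  val k = 3
  // Items the user actually rated at or above the threshold (illustrative IDs).
  val positives = Set("i1", "i4", "i7")
  // Predicted items, best first (also illustrative).
  val predictedItems = Seq("i1", "i9", "i4", "i2")

  // Mirrors PrecisionAtK.calculate: true positives in the top k,
  // divided by min(k, number of positives).
  val tpCount = predictedItems.take(k).count(positives.contains)
  val precisionAtK = tpCount.toDouble / math.min(k, positives.size)
  println(precisionAtK) // prints 0.666...
}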