http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-rateevent/build.sbt ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/add-rateevent/build.sbt b/examples/scala-parallel-similarproduct/add-rateevent/build.sbt deleted file mode 100644 index ef66b2f..0000000 --- a/examples/scala-parallel-similarproduct/add-rateevent/build.sbt +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import AssemblyKeys._ - -assemblySettings - -name := "template-scala-parallel-similarproduct" - -organization := "org.apache.predictionio" - -libraryDependencies ++= Seq( - "org.apache.predictionio" %% "core" % pioVersion.value % "provided", - "org.apache.spark" %% "spark-core" % "1.2.0" % "provided", - "org.apache.spark" %% "spark-mllib" % "1.2.0" % "provided")
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-rateevent/data/import_eventserver.py ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/add-rateevent/data/import_eventserver.py b/examples/scala-parallel-similarproduct/add-rateevent/data/import_eventserver.py deleted file mode 100644 index 6107d1c..0000000 --- a/examples/scala-parallel-similarproduct/add-rateevent/data/import_eventserver.py +++ /dev/null @@ -1,90 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -""" -Import sample data for similar product engine -""" - -import predictionio -import argparse -import random - -SEED = 3 - -def import_events(client): - random.seed(SEED) - count = 0 - print client.get_status() - print "Importing data..." - - # generate 10 users, with user ids u1,u2,....,u10 - user_ids = ["u%s" % i for i in range(1, 11)] - for user_id in user_ids: - print "Set user", user_id - client.create_event( - event="$set", - entity_type="user", - entity_id=user_id - ) - count += 1 - - # generate 50 items, with item ids i1,i2,....,i50 - # random assign 1 to 4 categories among c1-c6 to items - categories = ["c%s" % i for i in range(1, 7)] - item_ids = ["i%s" % i for i in range(1, 51)] - for item_id in item_ids: - print "Set item", item_id - client.create_event( - event="$set", - entity_type="item", - entity_id=item_id, - properties={ - "categories" : random.sample(categories, random.randint(1, 4)) - } - ) - count += 1 - - # each user randomly viewed 10 items - for user_id in user_ids: - for viewed_item in random.sample(item_ids, 10): - print "User", user_id ,"views item", viewed_item - client.create_event( - event="view", - entity_type="user", - entity_id=user_id, - target_entity_type="item", - target_entity_id=viewed_item - ) - count += 1 - - print "%s events are imported." % count - -if __name__ == '__main__': - parser = argparse.ArgumentParser( - description="Import sample data for similar product engine") - parser.add_argument('--access_key', default='invald_access_key') - parser.add_argument('--url', default="http://localhost:7070") - - args = parser.parse_args() - print args - - client = predictionio.EventClient( - access_key=args.access_key, - url=args.url, - threads=5, - qsize=500) - import_events(client) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-rateevent/data/send_query.py ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/add-rateevent/data/send_query.py b/examples/scala-parallel-similarproduct/add-rateevent/data/send_query.py deleted file mode 100644 index 8678b15..0000000 --- a/examples/scala-parallel-similarproduct/add-rateevent/data/send_query.py +++ /dev/null @@ -1,24 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -""" -Send sample query to prediction engine -""" - -import predictionio -engine_client = predictionio.EngineClient(url="http://localhost:8000") -print engine_client.send_query({"items": ["i1", "i3"], "num": 4}) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-rateevent/engine.json ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/add-rateevent/engine.json b/examples/scala-parallel-similarproduct/add-rateevent/engine.json deleted file mode 100644 index c55849f..0000000 --- a/examples/scala-parallel-similarproduct/add-rateevent/engine.json +++ /dev/null @@ -1,21 +0,0 @@ -{ - "id": "default", - "description": "Default settings", - "engineFactory": "org.template.similarproduct.SimilarProductEngine", - "datasource": { - "params" : { - "appId": 9 - } - }, - "algorithms": [ - { - "name": "als", - "params": { - "rank": 10, - "numIterations" : 20, - "lambda": 0.01, - "seed": 3 - } - } - ] -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-rateevent/project/assembly.sbt ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/add-rateevent/project/assembly.sbt b/examples/scala-parallel-similarproduct/add-rateevent/project/assembly.sbt deleted file mode 100644 index 54c3252..0000000 --- a/examples/scala-parallel-similarproduct/add-rateevent/project/assembly.sbt +++ /dev/null @@ -1 +0,0 @@ -addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2") http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-rateevent/project/pio-build.sbt ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/add-rateevent/project/pio-build.sbt b/examples/scala-parallel-similarproduct/add-rateevent/project/pio-build.sbt deleted file mode 100644 index 9aed0ee..0000000 --- a/examples/scala-parallel-similarproduct/add-rateevent/project/pio-build.sbt +++ /dev/null @@ -1 +0,0 @@ -addSbtPlugin("org.apache.predictionio" % "pio-build" % "0.9.0") http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/ALSAlgorithm.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/ALSAlgorithm.scala deleted file mode 100644 index b5c3d3f..0000000 --- a/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/ALSAlgorithm.scala +++ /dev/null @@ -1,262 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.template.similarproduct - -import org.apache.predictionio.controller.P2LAlgorithm -import org.apache.predictionio.controller.Params -import org.apache.predictionio.data.storage.BiMap - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.mllib.recommendation.ALS -import org.apache.spark.mllib.recommendation.{Rating => MLlibRating} - -import grizzled.slf4j.Logger - -import scala.collection.mutable.PriorityQueue - -case class ALSAlgorithmParams( - rank: Int, - numIterations: Int, - lambda: Double, - seed: Option[Long]) extends Params - -class ALSModel( - val productFeatures: Map[Int, Array[Double]], - val itemStringIntMap: BiMap[String, Int], - val items: Map[Int, Item] -) extends Serializable { - - @transient lazy val itemIntStringMap = itemStringIntMap.inverse - - override def toString = { - s" productFeatures: [${productFeatures.size}]" + - s"(${productFeatures.take(2).toList}...)" + - s" itemStringIntMap: [${itemStringIntMap.size}]" + - s"(${itemStringIntMap.take(2).toString}...)]" + - s" items: [${items.size}]" + - s"(${items.take(2).toString}...)]" - } -} - -/** - * Use ALS to build item x feature matrix - */ -class ALSAlgorithm(val ap: ALSAlgorithmParams) - extends P2LAlgorithm[PreparedData, ALSModel, Query, PredictedResult] { - - @transient lazy val logger = Logger[this.type] - - def train(sc:SparkContext ,data: PreparedData): ALSModel = { - require(!data.rateEvents.take(1).isEmpty, - s"rateEvents in PreparedData cannot be empty." + - " Please check if DataSource generates TrainingData" + - " and Preprator generates PreparedData correctly.") - require(!data.users.take(1).isEmpty, - s"users in PreparedData cannot be empty." + - " Please check if DataSource generates TrainingData" + - " and Preprator generates PreparedData correctly.") - require(!data.items.take(1).isEmpty, - s"items in PreparedData cannot be empty." + - " Please check if DataSource generates TrainingData" + - " and Preprator generates PreparedData correctly.") - // create User and item's String ID to integer index BiMap - val userStringIntMap = BiMap.stringInt(data.users.keys) - val itemStringIntMap = BiMap.stringInt(data.items.keys) - - // collect Item as Map and convert ID to Int index - val items: Map[Int, Item] = data.items.map { case (id, item) => - (itemStringIntMap(id), item) - }.collectAsMap.toMap - - val mllibRatings = data.rateEvents - .map { r => - // Convert user and item String IDs to Int index for MLlib - val uindex = userStringIntMap.getOrElse(r.user, -1) - val iindex = itemStringIntMap.getOrElse(r.item, -1) - - if (uindex == -1) - logger.info(s"Couldn't convert nonexistent user ID ${r.user}" - + " to Int index.") - - if (iindex == -1) - logger.info(s"Couldn't convert nonexistent item ID ${r.item}" - + " to Int index.") - - ((uindex, iindex), (r.rating,r.t)) //MODIFIED - }.filter { case ((u, i), v) => - // keep events with valid user and item index - (u != -1) && (i != -1) - } - .reduceByKey { case (v1, v2) => // MODIFIED - // if a user may rate same item with different value at different times, - // use the latest value for this case. - // Can remove this reduceByKey() if no need to support this case. - val (rating1, t1) = v1 - val (rating2, t2) = v2 - // keep the latest value - if (t1 > t2) v1 else v2 - } - .map { case ((u, i), (rating, t)) => // MODIFIED - // MLlibRating requires integer index for user and item - MLlibRating(u, i, rating) // MODIFIED - }.cache() - - // MLLib ALS cannot handle empty training data. - require(!mllibRatings.take(1).isEmpty, - s"mllibRatings cannot be empty." + - " Please check if your events contain valid user and item ID.") - - // seed for MLlib ALS - val seed = ap.seed.getOrElse(System.nanoTime) - - val m = ALS.train( - ratings = mllibRatings, - rank = ap.rank, - iterations = ap.numIterations, - lambda = ap.lambda, - blocks = -1, - seed = seed) - - new ALSModel( - productFeatures = m.productFeatures.collectAsMap.toMap, - itemStringIntMap = itemStringIntMap, - items = items - ) - } - - def predict(model: ALSModel, query: Query): PredictedResult = { - - val productFeatures = model.productFeatures - - // convert items to Int index - val queryList: Set[Int] = query.items.map(model.itemStringIntMap.get(_)) - .flatten.toSet - - val queryFeatures: Vector[Array[Double]] = queryList.toVector - // productFeatures may not contain the requested item - .map { item => productFeatures.get(item) } - .flatten - - val whiteList: Option[Set[Int]] = query.whiteList.map( set => - set.map(model.itemStringIntMap.get(_)).flatten - ) - val blackList: Option[Set[Int]] = query.blackList.map ( set => - set.map(model.itemStringIntMap.get(_)).flatten - ) - - val ord = Ordering.by[(Int, Double), Double](_._2).reverse - - val indexScores: Array[(Int, Double)] = if (queryFeatures.isEmpty) { - logger.info(s"No productFeatures vector for query items ${query.items}.") - Array[(Int, Double)]() - } else { - productFeatures.par // convert to parallel collection - .mapValues { f => - queryFeatures.map{ qf => - cosine(qf, f) - }.reduce(_ + _) - } - .filter(_._2 > 0) // keep items with score > 0 - .seq // convert back to sequential collection - .toArray - } - - val filteredScore = indexScores.view.filter { case (i, v) => - isCandidateItem( - i = i, - items = model.items, - categories = query.categories, - queryList = queryList, - whiteList = whiteList, - blackList = blackList - ) - } - - val topScores = getTopN(filteredScore, query.num)(ord).toArray - - val itemScores = topScores.map { case (i, s) => - new ItemScore( - item = model.itemIntStringMap(i), - score = s - ) - } - - new PredictedResult(itemScores) - } - - private - def getTopN[T](s: Seq[T], n: Int)(implicit ord: Ordering[T]): Seq[T] = { - - val q = PriorityQueue() - - for (x <- s) { - if (q.size < n) - q.enqueue(x) - else { - // q is full - if (ord.compare(x, q.head) < 0) { - q.dequeue() - q.enqueue(x) - } - } - } - - q.dequeueAll.toSeq.reverse - } - - private - def cosine(v1: Array[Double], v2: Array[Double]): Double = { - val size = v1.size - var i = 0 - var n1: Double = 0 - var n2: Double = 0 - var d: Double = 0 - while (i < size) { - n1 += v1(i) * v1(i) - n2 += v2(i) * v2(i) - d += v1(i) * v2(i) - i += 1 - } - val n1n2 = (math.sqrt(n1) * math.sqrt(n2)) - if (n1n2 == 0) 0 else (d / n1n2) - } - - private - def isCandidateItem( - i: Int, - items: Map[Int, Item], - categories: Option[Set[String]], - queryList: Set[Int], - whiteList: Option[Set[Int]], - blackList: Option[Set[Int]] - ): Boolean = { - whiteList.map(_.contains(i)).getOrElse(true) && - blackList.map(!_.contains(i)).getOrElse(true) && - // discard items in query as well - (!queryList.contains(i)) && - // filter categories - categories.map { cat => - items(i).categories.map { itemCat => - // keep this item if has ovelap categories with the query - !(itemCat.toSet.intersect(cat).isEmpty) - }.getOrElse(false) // discard this item if it has no categories - }.getOrElse(true) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/DataSource.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/DataSource.scala b/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/DataSource.scala deleted file mode 100644 index 46c4b7a..0000000 --- a/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/DataSource.scala +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.template.similarproduct - -import org.apache.predictionio.controller.PDataSource -import org.apache.predictionio.controller.EmptyEvaluationInfo -import org.apache.predictionio.controller.EmptyActualResult -import org.apache.predictionio.controller.Params -import org.apache.predictionio.data.storage.Event -import org.apache.predictionio.data.storage.Storage - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD - -import grizzled.slf4j.Logger - -case class DataSourceParams(appId: Int) extends Params - -class DataSource(val dsp: DataSourceParams) - extends PDataSource[TrainingData, - EmptyEvaluationInfo, Query, EmptyActualResult] { - - @transient lazy val logger = Logger[this.type] - - override - def readTraining(sc: SparkContext): TrainingData = { - val eventsDb = Storage.getPEvents() - - // create a RDD of (entityID, User) - val usersRDD: RDD[(String, User)] = eventsDb.aggregateProperties( - appId = dsp.appId, - entityType = "user" - )(sc).map { case (entityId, properties) => - val user = try { - User() - } catch { - case e: Exception => { - logger.error(s"Failed to get properties ${properties} of" + - s" user ${entityId}. Exception: ${e}.") - throw e - } - } - (entityId, user) - }.cache() - - // create a RDD of (entityID, Item) - val itemsRDD: RDD[(String, Item)] = eventsDb.aggregateProperties( - appId = dsp.appId, - entityType = "item" - )(sc).map { case (entityId, properties) => - val item = try { - // Assume categories is optional property of item. - Item(categories = properties.getOpt[List[String]]("categories")) - } catch { - case e: Exception => { - logger.error(s"Failed to get properties ${properties} of" + - s" item ${entityId}. Exception: ${e}.") - throw e - } - } - (entityId, item) - }.cache() - - // get all "user" "rate" "item" events - val rateEventsRDD: RDD[RateEvent] = eventsDb.find( //MODIFIED - appId = dsp.appId, - entityType = Some("user"), - eventNames = Some(List("rate")), //MODIFIED - // targetEntityType is optional field of an event. - targetEntityType = Some(Some("item")))(sc) - // eventsDb.find() returns RDD[Event] - .map { event => - val rateEvent = try { - event.event match { - case "rate" => RateEvent( //MODIFIED - user = event.entityId, - item = event.targetEntityId.get, - rating = event.properties.get[Double]("rating"), // ADDED - t = event.eventTime.getMillis) - case _ => throw new Exception(s"Unexpected event ${event} is read.") - } - } catch { - case e: Exception => { - logger.error(s"Cannot convert ${event} to RateEvent." + //MODIFIED - s" Exception: ${e}.") - throw e - } - } - rateEvent - }.cache() - - new TrainingData( - users = usersRDD, - items = itemsRDD, - rateEvents = rateEventsRDD - ) - } -} - -case class User() - -case class Item(categories: Option[List[String]]) - -case class RateEvent(user: String, item: String, rating: Double, t: Long) - -case class TrainingData( - users: RDD[(String, User)], - items: RDD[(String, Item)], - rateEvents: RDD[RateEvent] -) { - override def toString = { - s"users: [${users.count()} (${users.take(2).toList}...)]" + - s"items: [${items.count()} (${items.take(2).toList}...)]" + - s"rateEvents: [${rateEvents.count()}] (${rateEvents.take(2).toList}...)" - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/Engine.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/Engine.scala b/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/Engine.scala deleted file mode 100644 index 52b19fe..0000000 --- a/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/Engine.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.template.similarproduct - -import org.apache.predictionio.controller.IEngineFactory -import org.apache.predictionio.controller.Engine - -case class Query( - items: List[String], - num: Int, - categories: Option[Set[String]], - whiteList: Option[Set[String]], - blackList: Option[Set[String]] -) - -case class PredictedResult( - itemScores: Array[ItemScore] -) - -case class ItemScore( - item: String, - score: Double -) - -object SimilarProductEngine extends IEngineFactory { - def apply() = { - new Engine( - classOf[DataSource], - classOf[Preparator], - Map("als" -> classOf[ALSAlgorithm]), - classOf[Serving]) - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/Preparator.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/Preparator.scala b/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/Preparator.scala deleted file mode 100644 index c7c2b94..0000000 --- a/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/Preparator.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.template.similarproduct - -import org.apache.predictionio.controller.PPreparator - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD - -class Preparator - extends PPreparator[TrainingData, PreparedData] { - - def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = { - new PreparedData( - users = trainingData.users, - items = trainingData.items, - rateEvents = trainingData.rateEvents) - } -} - -class PreparedData( - val users: RDD[(String, User)], - val items: RDD[(String, Item)], - val rateEvents: RDD[RateEvent] -) extends Serializable http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/Serving.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/Serving.scala b/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/Serving.scala deleted file mode 100644 index 1180afd..0000000 --- a/examples/scala-parallel-similarproduct/add-rateevent/src/main/scala/Serving.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.template.similarproduct - -import org.apache.predictionio.controller.LServing - -class Serving - extends LServing[Query, PredictedResult] { - - override - def serve(query: Query, - predictedResults: Seq[PredictedResult]): PredictedResult = { - predictedResults.head - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-rateevent/template.json ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/add-rateevent/template.json b/examples/scala-parallel-similarproduct/add-rateevent/template.json deleted file mode 100644 index 932e603..0000000 --- a/examples/scala-parallel-similarproduct/add-rateevent/template.json +++ /dev/null @@ -1 +0,0 @@ -{"pio": {"version": { "min": "0.9.0" }}} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/filterbyyear/.gitignore ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/filterbyyear/.gitignore b/examples/scala-parallel-similarproduct/filterbyyear/.gitignore deleted file mode 100644 index 64fa18b..0000000 --- a/examples/scala-parallel-similarproduct/filterbyyear/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -manifest.json -target/ -pio.log http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/filterbyyear/README.md ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/filterbyyear/README.md b/examples/scala-parallel-similarproduct/filterbyyear/README.md deleted file mode 100644 index 5af7bc9..0000000 --- a/examples/scala-parallel-similarproduct/filterbyyear/README.md +++ /dev/null @@ -1,150 +0,0 @@ -<!-- -Licensed to the Apache Software Foundation (ASF) under one or more -contributor license agreements. See the NOTICE file distributed with -this work for additional information regarding copyright ownership. -The ASF licenses this file to You under the Apache License, Version 2.0 -(the "License"); you may not use this file except in compliance with -the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. ---> - -# Similar Product Template With Filter by Item Year support - -This example engine is based on Similar Product Tempplate version v0.1.1 and is modified to support filter recommendation by the item property 'year'. - -For example, recommend movies after year 1990. - -## Documentation - -Please refer to http://predictionio.incubator.apache.org/templates/similarproduct/quickstart/ - -## Development Notes - -### Sample data - -The movie lens 100k data which is in below format: - -UserID::MovieID::Rating::Timestamp - -### import ML-100K sample data - -Import movielens data using below repository -https://github.com/k4hoo/systest/tree/master/0.8/ml100k/demo-movielens - -``` -$ python -m batch_import <Access Key> http://127.0.0.1:7070 -``` - -### Changes to Engine.scala - - -1) Added ârecommendFromYearâ attribute to the Query class. we can pass the ârecommendFromYearâ attribute from the query request. - -```scala -case class Query( - items: List[String], - num: Int, - categories: Option[Set[String]], - whiteList: Option[Set[String]], - blackList: Option[Set[String]], - recommendFromYear: Option[Int] -) extends Serializable -``` - -2) Added âyearâ attribute to the class ItemScore. - -```scala -case class ItemScore( - item: String, - score: Double, - year: Int -) extends Serializable - -``` - -### Changes to DataSource.scala - -1) Added attribute âyearâ to the class Item - -```scala -case class Item(categories: Option[List[String]],year: Int) -``` - -2) In the eventsDb.aggregateProperties, adding year property - -```scala - Item(categories = properties.getOpt[List[String]]("categories"),year = properties.get[Int]("year")) -``` - -### Changes to ALSAlgorihm.scala - - -1) In the predict method, passing ârecommendFromYearâ attribute to the isCandidateItem method - -```scala - isCandidateItem( - i = i, - items = model.items, - categories = query.categories, - queryList = queryList, - whiteList = whiteList, - blackList = blackList, - recommendFromYear = query.recommendFromYear - ) -``` - -2) In âisCandidateItemâ method, verifying if Itemâs year is greater than ârecommendFromYearâ attribute. - -```scala - private def isCandidateItem( - i: Int, - items: Map[Int, Item], - categories: Option[Set[String]], - queryList: Set[Int], - whiteList: Option[Set[Int]], - blackList: Option[Set[Int]], - recommendFromYear: Option[Int] - ): Boolean = { - whiteList.map(_.contains(i)).getOrElse(true) && - blackList.map(!_.contains(i)).getOrElse(true) && - // discard items in query as well - (!queryList.contains(i)) && - // filter categories - items(i).year > recommendFromYear.getOrElse(1) && - categories.map { cat => - items(i).categories.map { itemCat => - // keep this item if has ovelap categories with the query - !(itemCat.toSet.intersect(cat).isEmpty) - }.getOrElse(false) // discard this item if it has no categories - }.getOrElse(true) - } -``` - -3) In the predict method, returning year as well as part of ItemScore - -```scala - val itemScores = topScores.map { case (i, s) => - new ItemScore( - item = model.itemIntStringMap(i), - score = s, - year = model.items(i).year - ) - } - - new PredictedResult(itemScores) -``` - -### Example Request - -``` -curl -H "Content-Type: application/json" \ --d '{ "items": ["171"], "num": 10, "recommendFromYear":1990 }' \ -http://localhost:8000/queries.json -``` http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/filterbyyear/build.sbt ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/filterbyyear/build.sbt b/examples/scala-parallel-similarproduct/filterbyyear/build.sbt deleted file mode 100644 index 1680f6b..0000000 --- a/examples/scala-parallel-similarproduct/filterbyyear/build.sbt +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import AssemblyKeys._ - -assemblySettings - -name := "template-scala-parallel-similarproduct" - -organization := "org.apache.predictionio" - -libraryDependencies ++= Seq( - "org.apache.predictionio" %% "core" % "0.10.0-SNAPSHOT" % "provided", - "org.apache.spark" %% "spark-core" % "1.2.0" % "provided", - "org.apache.spark" %% "spark-mllib" % "1.2.0" % "provided") http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/filterbyyear/data/import_eventserver.py ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/filterbyyear/data/import_eventserver.py b/examples/scala-parallel-similarproduct/filterbyyear/data/import_eventserver.py deleted file mode 100644 index 6107d1c..0000000 --- a/examples/scala-parallel-similarproduct/filterbyyear/data/import_eventserver.py +++ /dev/null @@ -1,90 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -""" -Import sample data for similar product engine -""" - -import predictionio -import argparse -import random - -SEED = 3 - -def import_events(client): - random.seed(SEED) - count = 0 - print client.get_status() - print "Importing data..." - - # generate 10 users, with user ids u1,u2,....,u10 - user_ids = ["u%s" % i for i in range(1, 11)] - for user_id in user_ids: - print "Set user", user_id - client.create_event( - event="$set", - entity_type="user", - entity_id=user_id - ) - count += 1 - - # generate 50 items, with item ids i1,i2,....,i50 - # random assign 1 to 4 categories among c1-c6 to items - categories = ["c%s" % i for i in range(1, 7)] - item_ids = ["i%s" % i for i in range(1, 51)] - for item_id in item_ids: - print "Set item", item_id - client.create_event( - event="$set", - entity_type="item", - entity_id=item_id, - properties={ - "categories" : random.sample(categories, random.randint(1, 4)) - } - ) - count += 1 - - # each user randomly viewed 10 items - for user_id in user_ids: - for viewed_item in random.sample(item_ids, 10): - print "User", user_id ,"views item", viewed_item - client.create_event( - event="view", - entity_type="user", - entity_id=user_id, - target_entity_type="item", - target_entity_id=viewed_item - ) - count += 1 - - print "%s events are imported." % count - -if __name__ == '__main__': - parser = argparse.ArgumentParser( - description="Import sample data for similar product engine") - parser.add_argument('--access_key', default='invald_access_key') - parser.add_argument('--url', default="http://localhost:7070") - - args = parser.parse_args() - print args - - client = predictionio.EventClient( - access_key=args.access_key, - url=args.url, - threads=5, - qsize=500) - import_events(client) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/filterbyyear/data/send_query.py ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/filterbyyear/data/send_query.py b/examples/scala-parallel-similarproduct/filterbyyear/data/send_query.py deleted file mode 100644 index 8678b15..0000000 --- a/examples/scala-parallel-similarproduct/filterbyyear/data/send_query.py +++ /dev/null @@ -1,24 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -""" -Send sample query to prediction engine -""" - -import predictionio -engine_client = predictionio.EngineClient(url="http://localhost:8000") -print engine_client.send_query({"items": ["i1", "i3"], "num": 4}) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/filterbyyear/engine.json ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/filterbyyear/engine.json b/examples/scala-parallel-similarproduct/filterbyyear/engine.json deleted file mode 100644 index 9fc4958..0000000 --- a/examples/scala-parallel-similarproduct/filterbyyear/engine.json +++ /dev/null @@ -1,26 +0,0 @@ -{ - "id": "default", - "description": "Default settings", - "engineFactory": "com.test.SimilarProductEngine", - "datasource": { - "params" : { - "appId": 1 - "eventWindow": { - "duration": "5 minutes", - "removeDuplicates": true, - "compressProperties": true - } - } - }, - "algorithms": [ - { - "name": "als", - "params": { - "rank": 10, - "numIterations" : 20, - "lambda": 0.01, - "seed": 3 - } - } - ] -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/filterbyyear/project/assembly.sbt ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/filterbyyear/project/assembly.sbt b/examples/scala-parallel-similarproduct/filterbyyear/project/assembly.sbt deleted file mode 100644 index 54c3252..0000000 --- a/examples/scala-parallel-similarproduct/filterbyyear/project/assembly.sbt +++ /dev/null @@ -1 +0,0 @@ -addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2") http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/ALSAlgorithm.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/ALSAlgorithm.scala deleted file mode 100644 index 1604d74..0000000 --- a/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/ALSAlgorithm.scala +++ /dev/null @@ -1,258 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.test - -import org.apache.predictionio.controller.P2LAlgorithm -import org.apache.predictionio.controller.Params -import org.apache.predictionio.data.storage.BiMap - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.mllib.recommendation.ALS -import org.apache.spark.mllib.recommendation.{Rating => MLlibRating} - -import grizzled.slf4j.Logger - -import scala.collection.mutable.PriorityQueue - -case class ALSAlgorithmParams( - rank: Int, - numIterations: Int, - lambda: Double, - seed: Option[Long]) extends Params - -class ALSModel( - val productFeatures: Map[Int, Array[Double]], - val itemStringIntMap: BiMap[String, Int], - val items: Map[Int, Item] -) extends Serializable { - - @transient lazy val itemIntStringMap = itemStringIntMap.inverse - - override def toString = { - s" productFeatures: [${productFeatures.size}]" + - s"(${productFeatures.take(2).toList}...)" + - s" itemStringIntMap: [${itemStringIntMap.size}]" + - s"(${itemStringIntMap.take(2).toString}...)]" + - s" items: [${items.size}]" + - s"(${items.take(2).toString}...)]" - } -} - -/** - * Use ALS to build item x feature matrix - */ -class ALSAlgorithm(val ap: ALSAlgorithmParams) - extends P2LAlgorithm[PreparedData, ALSModel, Query, PredictedResult] { - - @transient lazy val logger = Logger[this.type] - - def train(sc: SparkContext, data: PreparedData): ALSModel = { - require(!data.viewEvents.take(1).isEmpty, - s"viewEvents in PreparedData cannot be empty." + - " Please check if DataSource generates TrainingData" + - " and Preprator generates PreparedData correctly.") - require(!data.users.take(1).isEmpty, - s"users in PreparedData cannot be empty." + - " Please check if DataSource generates TrainingData" + - " and Preprator generates PreparedData correctly.") - require(!data.items.take(1).isEmpty, - s"items in PreparedData cannot be empty." + - " Please check if DataSource generates TrainingData" + - " and Preprator generates PreparedData correctly.") - // create User and item's String ID to integer index BiMap - val userStringIntMap = BiMap.stringInt(data.users.keys) - val itemStringIntMap = BiMap.stringInt(data.items.keys) - - // collect Item as Map and convert ID to Int index - val items: Map[Int, Item] = data.items.map { case (id, item) => - (itemStringIntMap(id), item) - }.collectAsMap.toMap - - val mllibRatings = data.viewEvents - .map { r => - // Convert user and item String IDs to Int index for MLlib - val uindex = userStringIntMap.getOrElse(r.user, -1) - val iindex = itemStringIntMap.getOrElse(r.item, -1) - - if (uindex == -1) - logger.info(s"Couldn't convert nonexistent user ID ${r.user}" - + " to Int index.") - - if (iindex == -1) - logger.info(s"Couldn't convert nonexistent item ID ${r.item}" - + " to Int index.") - - ((uindex, iindex), 1) - }.filter { case ((u, i), v) => - // keep events with valid user and item index - (u != -1) && (i != -1) - }.reduceByKey(_ + _) // aggregate all view events of same user-item pair - .map { case ((u, i), v) => - // MLlibRating requires integer index for user and item - MLlibRating(u, i, v) - } - - // MLLib ALS cannot handle empty training data. - require(!mllibRatings.take(1).isEmpty, - s"mllibRatings cannot be empty." + - " Please check if your events contain valid user and item ID.") - - // seed for MLlib ALS - val seed = ap.seed.getOrElse(System.nanoTime) - - val m = ALS.trainImplicit( - ratings = mllibRatings, - rank = ap.rank, - iterations = ap.numIterations, - lambda = ap.lambda, - blocks = -1, - alpha = 1.0, - seed = seed) - - new ALSModel( - productFeatures = m.productFeatures.collectAsMap.toMap, - itemStringIntMap = itemStringIntMap, - items = items - ) - } - - def predict(model: ALSModel, query: Query): PredictedResult = { - - val productFeatures = model.productFeatures - - // convert items to Int index - val queryList: Set[Int] = query.items.map(model.itemStringIntMap.get(_)) - .flatten.toSet - - val queryFeatures: Vector[Array[Double]] = queryList.toVector - // productFeatures may not contain the requested item - .map { item => productFeatures.get(item) } - .flatten - - val whiteList: Option[Set[Int]] = query.whiteList.map( set => - set.map(model.itemStringIntMap.get(_)).flatten - ) - val blackList: Option[Set[Int]] = query.blackList.map ( set => - set.map(model.itemStringIntMap.get(_)).flatten - ) - - val ord = Ordering.by[(Int, Double), Double](_._2).reverse - - val indexScores: Array[(Int, Double)] = if (queryFeatures.isEmpty) { - logger.info(s"No productFeatures vector for query items ${query.items}.") - Array[(Int, Double)]() - } else { - productFeatures.par // convert to parallel collection - .mapValues { f => - queryFeatures.map{ qf => - cosine(qf, f) - }.reduce(_ + _) - } - .filter(_._2 > 0) // keep items with score > 0 - .seq // convert back to sequential collection - .toArray - } - - val filteredScore = indexScores.view.filter { case (i, v) => - isCandidateItem( - i = i, - items = model.items, - categories = query.categories, - queryList = queryList, - whiteList = whiteList, - blackList = blackList, - recommendFromYear = query.recommendFromYear - ) - } - - val topScores = getTopN(filteredScore, query.num)(ord).toArray - - val itemScores = topScores.map { case (i, s) => - new ItemScore( - item = model.itemIntStringMap(i), - score = s, - year = model.items(i).year - ) - } - - new PredictedResult(itemScores) - } - - private - def getTopN[T](s: Seq[T], n: Int)(implicit ord: Ordering[T]): Seq[T] = { - - val q = PriorityQueue() - - for (x <- s) { - if (q.size < n) - q.enqueue(x) - else { - // q is full - if (ord.compare(x, q.head) < 0) { - q.dequeue() - q.enqueue(x) - } - } - } - - q.dequeueAll.toSeq.reverse - } - - private - def cosine(v1: Array[Double], v2: Array[Double]): Double = { - val size = v1.size - var i = 0 - var n1: Double = 0 - var n2: Double = 0 - var d: Double = 0 - while (i < size) { - n1 += v1(i) * v1(i) - n2 += v2(i) * v2(i) - d += v1(i) * v2(i) - i += 1 - } - val n1n2 = (math.sqrt(n1) * math.sqrt(n2)) - if (n1n2 == 0) 0 else (d / n1n2) - } - - private - def isCandidateItem( - i: Int, - items: Map[Int, Item], - categories: Option[Set[String]], - queryList: Set[Int], - whiteList: Option[Set[Int]], - blackList: Option[Set[Int]], - recommendFromYear: Option[Int] - ): Boolean = { - whiteList.map(_.contains(i)).getOrElse(true) && - blackList.map(!_.contains(i)).getOrElse(true) && - // discard items in query as well - (!queryList.contains(i)) && - items(i).year > recommendFromYear.getOrElse(1) && - // filter categories - categories.map { cat => - items(i).categories.map { itemCat => - // keep this item if has ovelap categories with the query - !(itemCat.toSet.intersect(cat).isEmpty) - }.getOrElse(false) // discard this item if it has no categories - }.getOrElse(true) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/DataSource.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/DataSource.scala b/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/DataSource.scala deleted file mode 100644 index 89e80ce..0000000 --- a/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/DataSource.scala +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.test - -import org.apache.predictionio.core.SelfCleaningDataSource -import org.apache.predictionio.core.EventWindow - -import org.apache.predictionio.controller.PDataSource -import org.apache.predictionio.controller.EmptyEvaluationInfo -import org.apache.predictionio.controller.EmptyActualResult -import org.apache.predictionio.controller.Params -import org.apache.predictionio.data.storage.Event -import org.apache.predictionio.data.storage.Storage - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD - -import grizzled.slf4j.Logger - -case class DataSourceParams(appName: String, eventWindow: Option[EventWindow], appId: Int) extends Params - -class DataSource(val dsp: DataSourceParams) - extends PDataSource[TrainingData, - EmptyEvaluationInfo, Query, EmptyActualResult] with SelfCleaningDataSource { - - @transient override lazy val logger = Logger[this.type] - - override def appName = dsp.appName - override def eventWindow = dsp.eventWindow - - override - def readTraining(sc: SparkContext): TrainingData = { - cleanPersistedPEvents(sc) - - val eventsDb = Storage.getPEvents() - - // create a RDD of (entityID, User) - val usersRDD: RDD[(String, User)] = eventsDb.aggregateProperties( - appId = dsp.appId, - entityType = "user" - )(sc).map { case (entityId, properties) => - val user = try { - User() - } catch { - case e: Exception => { - logger.error(s"Failed to get properties ${properties} of" + - s" user ${entityId}. Exception: ${e}.") - throw e - } - } - (entityId, user) - }.cache() - - // create a RDD of (entityID, Item) - val itemsRDD: RDD[(String, Item)] = eventsDb.aggregateProperties( - appId = dsp.appId, - entityType = "item" - )(sc).map { case (entityId, properties) => - val item = try { - // Assume categories is optional property of item. - Item(categories = properties.getOpt[List[String]]("categories"), year = properties.get[Int]("year")) - } catch { - case e: Exception => { - logger.error(s"Failed to get properties ${properties} of" + - s" item ${entityId}. Exception: ${e}.") - throw e - } - } - (entityId, item) - }.cache() - - // get all "user" "view" "item" events - val viewEventsRDD: RDD[ViewEvent] = eventsDb.find( - appId = dsp.appId, - entityType = Some("user"), - eventNames = Some(List("view")), - // targetEntityType is optional field of an event. - targetEntityType = Some(Some("item")))(sc) - // eventsDb.find() returns RDD[Event] - .map { event => - val viewEvent = try { - event.event match { - case "view" => ViewEvent( - user = event.entityId, - item = event.targetEntityId.get, - t = event.eventTime.getMillis) - case _ => throw new Exception(s"Unexpected event ${event} is read.") - } - } catch { - case e: Exception => { - logger.error(s"Cannot convert ${event} to ViewEvent." + - s" Exception: ${e}.") - throw e - } - } - viewEvent - }.cache() - - new TrainingData( - users = usersRDD, - items = itemsRDD, - viewEvents = viewEventsRDD - ) - } -} - -case class User() - -case class Item(categories: Option[List[String]], year:Int) - -case class ViewEvent(user: String, item: String, t: Long) - -class TrainingData( - val users: RDD[(String, User)], - val items: RDD[(String, Item)], - val viewEvents: RDD[ViewEvent] -) extends Serializable { - override def toString = { - s"users: [${users.count()} (${users.take(2).toList}...)]" + - s"items: [${items.count()} (${items.take(2).toList}...)]" + - s"viewEvents: [${viewEvents.count()}] (${viewEvents.take(2).toList}...)" - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/Engine.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/Engine.scala b/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/Engine.scala deleted file mode 100644 index 6800c7b..0000000 --- a/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/Engine.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.test - -import org.apache.predictionio.controller.IEngineFactory -import org.apache.predictionio.controller.Engine - -case class Query( - items: List[String], - num: Int, - categories: Option[Set[String]], - whiteList: Option[Set[String]], - blackList: Option[Set[String]], - recommendFromYear: Option[Int] -) - -case class PredictedResult( - itemScores: Array[ItemScore] -) - -case class ItemScore( - item: String, - score: Double, - year: Int -) - -object SimilarProductEngine extends IEngineFactory { - def apply() = { - new Engine( - classOf[DataSource], - classOf[Preparator], - Map("als" -> classOf[ALSAlgorithm]), - classOf[Serving]) - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/Preparator.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/Preparator.scala b/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/Preparator.scala deleted file mode 100644 index 189b7b3..0000000 --- a/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/Preparator.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.test - -import org.apache.predictionio.controller.PPreparator - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD - -class Preparator - extends PPreparator[TrainingData, PreparedData] { - - def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = { - new PreparedData( - users = trainingData.users, - items = trainingData.items, - viewEvents = trainingData.viewEvents) - } -} - -class PreparedData( - val users: RDD[(String, User)], - val items: RDD[(String, Item)], - val viewEvents: RDD[ViewEvent] -) extends Serializable \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/Serving.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/Serving.scala b/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/Serving.scala deleted file mode 100644 index cd1a715..0000000 --- a/examples/scala-parallel-similarproduct/filterbyyear/src/main/scala/Serving.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.test - -import org.apache.predictionio.controller.LServing - -class Serving - extends LServing[Query, PredictedResult] { - - override def serve(query: Query, - predictedResults: Seq[PredictedResult]): PredictedResult = { - predictedResults.head - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi-events-multi-algos/.gitignore ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/multi-events-multi-algos/.gitignore b/examples/scala-parallel-similarproduct/multi-events-multi-algos/.gitignore new file mode 100644 index 0000000..5dbe602 --- /dev/null +++ b/examples/scala-parallel-similarproduct/multi-events-multi-algos/.gitignore @@ -0,0 +1,4 @@ +manifest.json +target/ +pio.log +/pio.sbt \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi-events-multi-algos/build.sbt ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/multi-events-multi-algos/build.sbt b/examples/scala-parallel-similarproduct/multi-events-multi-algos/build.sbt new file mode 100644 index 0000000..1daded6 --- /dev/null +++ b/examples/scala-parallel-similarproduct/multi-events-multi-algos/build.sbt @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +name := "template-scala-parallel-similarproduct" + +organization := "org.apache.predictionio" +scalaVersion := "2.11.8" +libraryDependencies ++= Seq( + "org.apache.predictionio" %% "apache-predictionio-core" % "0.11.0-incubating" % "provided", + "org.apache.spark" %% "spark-mllib" % "2.1.1" % "provided") http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi-events-multi-algos/data/import_eventserver.py ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/multi-events-multi-algos/data/import_eventserver.py b/examples/scala-parallel-similarproduct/multi-events-multi-algos/data/import_eventserver.py new file mode 100644 index 0000000..4a5cccf --- /dev/null +++ b/examples/scala-parallel-similarproduct/multi-events-multi-algos/data/import_eventserver.py @@ -0,0 +1,113 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Import sample data for similar product engine +""" + +import predictionio +import argparse +import random + +SEED = 3 + +def import_events(client): + random.seed(SEED) + count = 0 + print(client.get_status()) + print("Importing data...") + + # generate 10 users, with user ids u1,u2,....,u10 + user_ids = ["u%s" % i for i in range(1, 11)] + for user_id in user_ids: + print("Set user", user_id) + client.create_event( + event="$set", + entity_type="user", + entity_id=user_id + ) + count += 1 + + # generate 50 items, with item ids i1,i2,....,i50 + # random assign 1 to 4 categories among c1-c6 to items + categories = ["c%s" % i for i in range(1, 7)] + item_ids = ["i%s" % i for i in range(1, 51)] + for item_id in item_ids: + print("Set item", item_id) + client.create_event( + event="$set", + entity_type="item", + entity_id=item_id, + properties={ + "categories" : random.sample(categories, random.randint(1, 4)) + } + ) + count += 1 + + # each user randomly viewed 10 items + for user_id in user_ids: + for viewed_item in random.sample(item_ids, 10): + print("User", user_id ,"views item", viewed_item) + client.create_event( + event="view", + entity_type="user", + entity_id=user_id, + target_entity_type="item", + target_entity_id=viewed_item + ) + count += 1 + + # each user randomly liked/disliked 10 items + for user_id in user_ids: + for viewed_item in random.sample(item_ids, 10): + if random.choice((False, True)) : + print "User", user_id ,"likes item", viewed_item + client.create_event( + event="like", + entity_type="user", + entity_id=user_id, + target_entity_type="item", + target_entity_id=viewed_item + ) + else: + print "User", user_id ,"dislikes item", viewed_item + client.create_event( + event="dislike", + entity_type="user", + entity_id=user_id, + target_entity_type="item", + target_entity_id=viewed_item + ) + count += 1 + + print("%s events are imported." % count) + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description="Import sample data for similar product engine") + parser.add_argument('--access_key', default='invald_access_key') + parser.add_argument('--url', default="http://localhost:7070") + + args = parser.parse_args() + print(args) + + client = predictionio.EventClient( + access_key=args.access_key, + url=args.url, + threads=5, + qsize=500) + import_events(client) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi-events-multi-algos/data/send_query.py ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/multi-events-multi-algos/data/send_query.py b/examples/scala-parallel-similarproduct/multi-events-multi-algos/data/send_query.py new file mode 100644 index 0000000..0a70f28 --- /dev/null +++ b/examples/scala-parallel-similarproduct/multi-events-multi-algos/data/send_query.py @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Send sample query to prediction engine +""" + +import predictionio +engine_client = predictionio.EngineClient(url="http://localhost:8000") +print(engine_client.send_query({"items": ["i1", "i3"], "num": 4})) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi-events-multi-algos/engine-cooccurrence.json ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/multi-events-multi-algos/engine-cooccurrence.json b/examples/scala-parallel-similarproduct/multi-events-multi-algos/engine-cooccurrence.json new file mode 100644 index 0000000..c31b88e --- /dev/null +++ b/examples/scala-parallel-similarproduct/multi-events-multi-algos/engine-cooccurrence.json @@ -0,0 +1,18 @@ +{ + "id": "default", + "description": "Default settings", + "engineFactory": "org.apache.predictionio.examples.similarproduct.SimilarProductEngine", + "datasource": { + "params" : { + "appName": "MyApp1" + } + }, + "algorithms": [ + { + "name": "cooccurrence", + "params": { + "n": 20 + } + } + ] +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi-events-multi-algos/engine.json ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/multi-events-multi-algos/engine.json b/examples/scala-parallel-similarproduct/multi-events-multi-algos/engine.json new file mode 100644 index 0000000..dd27a2d --- /dev/null +++ b/examples/scala-parallel-similarproduct/multi-events-multi-algos/engine.json @@ -0,0 +1,30 @@ +{ + "id": "default", + "description": "Default settings", + "engineFactory": "org.apache.predictionio.examples.similarproduct.SimilarProductEngine", + "datasource": { + "params" : { + "appName": "MyApp1" + } + }, + "algorithms": [ + { + "name": "als", + "params": { + "rank": 10, + "numIterations" : 20, + "lambda": 0.01, + "seed": 3 + } + }, + { + "name": "likealgo", + "params": { + "rank": 8, + "numIterations" : 15, + "lambda": 0.01, + "seed": 3 + } + } + ] +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi-events-multi-algos/project/assembly.sbt ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/multi-events-multi-algos/project/assembly.sbt b/examples/scala-parallel-similarproduct/multi-events-multi-algos/project/assembly.sbt new file mode 100644 index 0000000..e17409e --- /dev/null +++ b/examples/scala-parallel-similarproduct/multi-events-multi-algos/project/assembly.sbt @@ -0,0 +1 @@ +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4") http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi-events-multi-algos/project/build.properties ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/multi-events-multi-algos/project/build.properties b/examples/scala-parallel-similarproduct/multi-events-multi-algos/project/build.properties new file mode 100644 index 0000000..64317fd --- /dev/null +++ b/examples/scala-parallel-similarproduct/multi-events-multi-algos/project/build.properties @@ -0,0 +1 @@ +sbt.version=0.13.15 http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/ALSAlgorithm.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/ALSAlgorithm.scala new file mode 100644 index 0000000..aae8322 --- /dev/null +++ b/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/ALSAlgorithm.scala @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.examples.similarproduct + +import org.apache.predictionio.controller.P2LAlgorithm +import org.apache.predictionio.controller.Params +import org.apache.predictionio.data.storage.BiMap + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.recommendation.ALS +import org.apache.spark.mllib.recommendation.{Rating => MLlibRating} + +import grizzled.slf4j.Logger + +import scala.collection.mutable.PriorityQueue + +case class ALSAlgorithmParams( + rank: Int, + numIterations: Int, + lambda: Double, + seed: Option[Long]) extends Params + +class ALSModel( + val productFeatures: Map[Int, Array[Double]], + val itemStringIntMap: BiMap[String, Int], + val items: Map[Int, Item] +) extends Serializable { + + @transient lazy val itemIntStringMap = itemStringIntMap.inverse + + override def toString = { + s" productFeatures: [${productFeatures.size}]" + + s"(${productFeatures.take(2).toList}...)" + + s" itemStringIntMap: [${itemStringIntMap.size}]" + + s"(${itemStringIntMap.take(2).toString}...)]" + + s" items: [${items.size}]" + + s"(${items.take(2).toString}...)]" + } +} + +/** + * Use ALS to build item x feature matrix + */ +class ALSAlgorithm(val ap: ALSAlgorithmParams) + extends P2LAlgorithm[PreparedData, ALSModel, Query, PredictedResult] { + + @transient lazy val logger = Logger[this.type] + + def train(sc: SparkContext, data: PreparedData): ALSModel = { + require(!data.viewEvents.take(1).isEmpty, + s"viewEvents in PreparedData cannot be empty." + + " Please check if DataSource generates TrainingData" + + " and Preprator generates PreparedData correctly.") + require(!data.users.take(1).isEmpty, + s"users in PreparedData cannot be empty." + + " Please check if DataSource generates TrainingData" + + " and Preprator generates PreparedData correctly.") + require(!data.items.take(1).isEmpty, + s"items in PreparedData cannot be empty." + + " Please check if DataSource generates TrainingData" + + " and Preprator generates PreparedData correctly.") + // create User and item's String ID to integer index BiMap + val userStringIntMap = BiMap.stringInt(data.users.keys) + val itemStringIntMap = BiMap.stringInt(data.items.keys) + + // collect Item as Map and convert ID to Int index + val items: Map[Int, Item] = data.items.map { case (id, item) => + (itemStringIntMap(id), item) + }.collectAsMap.toMap + + val mllibRatings = data.viewEvents + .map { r => + // Convert user and item String IDs to Int index for MLlib + val uindex = userStringIntMap.getOrElse(r.user, -1) + val iindex = itemStringIntMap.getOrElse(r.item, -1) + + if (uindex == -1) + logger.info(s"Couldn't convert nonexistent user ID ${r.user}" + + " to Int index.") + + if (iindex == -1) + logger.info(s"Couldn't convert nonexistent item ID ${r.item}" + + " to Int index.") + + ((uindex, iindex), 1) + }.filter { case ((u, i), v) => + // keep events with valid user and item index + (u != -1) && (i != -1) + }.reduceByKey(_ + _) // aggregate all view events of same user-item pair + .map { case ((u, i), v) => + // MLlibRating requires integer index for user and item + MLlibRating(u, i, v) + } + .cache() + + // MLLib ALS cannot handle empty training data. + require(!mllibRatings.take(1).isEmpty, + s"mllibRatings cannot be empty." + + " Please check if your events contain valid user and item ID.") + + // seed for MLlib ALS + val seed = ap.seed.getOrElse(System.nanoTime) + + val m = ALS.trainImplicit( + ratings = mllibRatings, + rank = ap.rank, + iterations = ap.numIterations, + lambda = ap.lambda, + blocks = -1, + alpha = 1.0, + seed = seed) + + new ALSModel( + productFeatures = m.productFeatures.collectAsMap.toMap, + itemStringIntMap = itemStringIntMap, + items = items + ) + } + + def predict(model: ALSModel, query: Query): PredictedResult = { + + val productFeatures = model.productFeatures + + // convert items to Int index + val queryList: Set[Int] = query.items.map(model.itemStringIntMap.get(_)) + .flatten.toSet + + val queryFeatures: Vector[Array[Double]] = queryList.toVector + // productFeatures may not contain the requested item + .map { item => productFeatures.get(item) } + .flatten + + val whiteList: Option[Set[Int]] = query.whiteList.map( set => + set.map(model.itemStringIntMap.get(_)).flatten + ) + val blackList: Option[Set[Int]] = query.blackList.map ( set => + set.map(model.itemStringIntMap.get(_)).flatten + ) + + val ord = Ordering.by[(Int, Double), Double](_._2).reverse + + val indexScores: Array[(Int, Double)] = if (queryFeatures.isEmpty) { + logger.info(s"No productFeatures vector for query items ${query.items}.") + Array[(Int, Double)]() + } else { + productFeatures.par // convert to parallel collection + .mapValues { f => + queryFeatures.map{ qf => + cosine(qf, f) + }.reduce(_ + _) + } + .filter(_._2 > 0) // keep items with score > 0 + .seq // convert back to sequential collection + .toArray + } + + val filteredScore = indexScores.view.filter { case (i, v) => + isCandidateItem( + i = i, + items = model.items, + categories = query.categories, + categoryBlackList = query.categoryBlackList, + queryList = queryList, + whiteList = whiteList, + blackList = blackList + ) + } + + val topScores = getTopN(filteredScore, query.num)(ord).toArray + + val itemScores = topScores.map { case (i, s) => + new ItemScore( + item = model.itemIntStringMap(i), + score = s + ) + } + + new PredictedResult(itemScores) + } + + private + def getTopN[T](s: Seq[T], n: Int)(implicit ord: Ordering[T]): Seq[T] = { + + val q = PriorityQueue() + + for (x <- s) { + if (q.size < n) + q.enqueue(x) + else { + // q is full + if (ord.compare(x, q.head) < 0) { + q.dequeue() + q.enqueue(x) + } + } + } + + q.dequeueAll.toSeq.reverse + } + + private + def cosine(v1: Array[Double], v2: Array[Double]): Double = { + val size = v1.size + var i = 0 + var n1: Double = 0 + var n2: Double = 0 + var d: Double = 0 + while (i < size) { + n1 += v1(i) * v1(i) + n2 += v2(i) * v2(i) + d += v1(i) * v2(i) + i += 1 + } + val n1n2 = (math.sqrt(n1) * math.sqrt(n2)) + if (n1n2 == 0) 0 else (d / n1n2) + } + + private + def isCandidateItem( + i: Int, + items: Map[Int, Item], + categories: Option[Set[String]], + categoryBlackList: Option[Set[String]], + queryList: Set[Int], + whiteList: Option[Set[Int]], + blackList: Option[Set[Int]] + ): Boolean = { + whiteList.map(_.contains(i)).getOrElse(true) && + blackList.map(!_.contains(i)).getOrElse(true) && + // discard items in query as well + (!queryList.contains(i)) && + // filter categories + categories.map { cat => + items(i).categories.map { itemCat => + // keep this item if has ovelap categories with the query + !(itemCat.toSet.intersect(cat).isEmpty) + }.getOrElse(false) // discard this item if it has no categories + }.getOrElse(true) && + categoryBlackList.map { cat => + items(i).categories.map { itemCat => + // discard this item if has ovelap categories with the query + (itemCat.toSet.intersect(cat).isEmpty) + }.getOrElse(true) // keep this item if it has no categories + }.getOrElse(true) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/CooccurrenceAlgorithm.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/CooccurrenceAlgorithm.scala b/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/CooccurrenceAlgorithm.scala new file mode 100644 index 0000000..30d0b3e --- /dev/null +++ b/examples/scala-parallel-similarproduct/multi-events-multi-algos/src/main/scala/CooccurrenceAlgorithm.scala @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.examples.similarproduct + +import org.apache.predictionio.controller.P2LAlgorithm +import org.apache.predictionio.controller.Params +import org.apache.predictionio.data.storage.BiMap + +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD + +case class CooccurrenceAlgorithmParams( + n: Int // top co-occurrence +) extends Params + +class CooccurrenceModel( + val topCooccurrences: Map[Int, Array[(Int, Int)]], + val itemStringIntMap: BiMap[String, Int], + val items: Map[Int, Item] +) extends Serializable { + @transient lazy val itemIntStringMap = itemStringIntMap.inverse + + override def toString(): String = { + val s = topCooccurrences.mapValues { v => v.mkString(",") } + s.toString + } +} + +class CooccurrenceAlgorithm(val ap: CooccurrenceAlgorithmParams) + extends P2LAlgorithm[PreparedData, CooccurrenceModel, Query, PredictedResult] { + + def train(sc: SparkContext, data: PreparedData): CooccurrenceModel = { + + val itemStringIntMap = BiMap.stringInt(data.items.keys) + + val topCooccurrences = trainCooccurrence( + events = data.viewEvents, + n = ap.n, + itemStringIntMap = itemStringIntMap + ) + + // collect Item as Map and convert ID to Int index + val items: Map[Int, Item] = data.items.map { case (id, item) => + (itemStringIntMap(id), item) + }.collectAsMap.toMap + + new CooccurrenceModel( + topCooccurrences = topCooccurrences, + itemStringIntMap = itemStringIntMap, + items = items + ) + + } + + /* given the user-item events, find out top n co-occurrence pair for each item */ + def trainCooccurrence( + events: RDD[ViewEvent], + n: Int, + itemStringIntMap: BiMap[String, Int]): Map[Int, Array[(Int, Int)]] = { + + val userItem = events + // map item from string to integer index + .flatMap { + case ViewEvent(user, item, _) if itemStringIntMap.contains(item) => + Some(user, itemStringIntMap(item)) + case _ => None + } + // if user view same item multiple times, only count as once + .distinct() + .cache() + + val cooccurrences: RDD[((Int, Int), Int)] = userItem.join(userItem) + // remove duplicate pair in reversed order for each user. eg. (a,b) vs. (b,a) + .filter { case (user, (item1, item2)) => item1 < item2 } + .map { case (user, (item1, item2)) => ((item1, item2), 1) } + .reduceByKey{ (a: Int, b: Int) => a + b } + + val topCooccurrences = cooccurrences + .flatMap{ case (pair, count) => + Seq((pair._1, (pair._2, count)), (pair._2, (pair._1, count))) + } + .groupByKey + .map { case (item, itemCounts) => + (item, itemCounts.toArray.sortBy(_._2)(Ordering.Int.reverse).take(n)) + } + .collectAsMap.toMap + + topCooccurrences + } + + def predict(model: CooccurrenceModel, query: Query): PredictedResult = { + + // convert items to Int index + val queryList: Set[Int] = query.items + .flatMap(model.itemStringIntMap.get(_)) + .toSet + + val whiteList: Option[Set[Int]] = query.whiteList.map( set => + set.map(model.itemStringIntMap.get(_)).flatten + ) + + val blackList: Option[Set[Int]] = query.blackList.map ( set => + set.map(model.itemStringIntMap.get(_)).flatten + ) + + val counts: Array[(Int, Int)] = queryList.toVector + .flatMap { q => + model.topCooccurrences.getOrElse(q, Array()) + } + .groupBy { case (index, count) => index } + .map { case (index, indexCounts) => (index, indexCounts.map(_._2).sum) } + .toArray + + val itemScores = counts + .filter { case (i, v) => + isCandidateItem( + i = i, + items = model.items, + categories = query.categories, + queryList = queryList, + whiteList = whiteList, + blackList = blackList + ) + } + .sortBy(_._2)(Ordering.Int.reverse) + .take(query.num) + .map { case (index, count) => + ItemScore( + item = model.itemIntStringMap(index), + score = count + ) + } + + new PredictedResult(itemScores) + + } + + private + def isCandidateItem( + i: Int, + items: Map[Int, Item], + categories: Option[Set[String]], + queryList: Set[Int], + whiteList: Option[Set[Int]], + blackList: Option[Set[Int]] + ): Boolean = { + whiteList.map(_.contains(i)).getOrElse(true) && + blackList.map(!_.contains(i)).getOrElse(true) && + // discard items in query as well + (!queryList.contains(i)) && + // filter categories + categories.map { cat => + items(i).categories.map { itemCat => + // keep this item if has ovelap categories with the query + !(itemCat.toSet.intersect(cat).isEmpty) + }.getOrElse(false) // discard this item if it has no categories + }.getOrElse(true) + } + +}
