Repository: incubator-predictionio Updated Branches: refs/heads/livedoc 951d7a0a0 -> 018ea8e34
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/rid-user-set-event/src/main/scala/CooccurrenceAlgorithm.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/rid-user-set-event/src/main/scala/CooccurrenceAlgorithm.scala b/examples/scala-parallel-similarproduct/rid-user-set-event/src/main/scala/CooccurrenceAlgorithm.scala new file mode 100644 index 0000000..30d0b3e --- /dev/null +++ b/examples/scala-parallel-similarproduct/rid-user-set-event/src/main/scala/CooccurrenceAlgorithm.scala @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.examples.similarproduct + +import org.apache.predictionio.controller.P2LAlgorithm +import org.apache.predictionio.controller.Params +import org.apache.predictionio.data.storage.BiMap + +import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD + +/** Parameters of [[CooccurrenceAlgorithm]]. `n` is the number of top co-occurring items kept for each item. */ +case class CooccurrenceAlgorithmParams( + n: Int // top co-occurrence +) extends Params + +/** Trained model: for each item index, up to `n` (co-occurring item index, co-occurrence count) pairs with the highest count first, plus the item ID mapping and the item properties. */ +class CooccurrenceModel( + val topCooccurrences: Map[Int, Array[(Int, Int)]], + val itemStringIntMap: BiMap[String, Int], + val items: Map[Int, Item] +) extends Serializable { + @transient lazy val itemIntStringMap = itemStringIntMap.inverse + + override def toString(): String = { + val s = topCooccurrences.mapValues { v => v.mkString(",") } + s.toString + } +} + +/** Similar-product algorithm that ranks items by how many distinct users viewed them together with the query items. */ +class CooccurrenceAlgorithm(val ap: CooccurrenceAlgorithmParams) + extends P2LAlgorithm[PreparedData, CooccurrenceModel, Query, PredictedResult] { + + def train(sc: SparkContext, data: PreparedData): CooccurrenceModel = { + + val itemStringIntMap = BiMap.stringInt(data.items.keys) + + val topCooccurrences = trainCooccurrence( + events = data.viewEvents, + n = ap.n, + itemStringIntMap = itemStringIntMap + ) + + // collect Item as Map and convert ID to Int index + val items: Map[Int, Item] = data.items.map { case (id, item) => + (itemStringIntMap(id), item) + }.collectAsMap.toMap + + new CooccurrenceModel( + topCooccurrences = topCooccurrences, + itemStringIntMap = itemStringIntMap, + items = items + ) + + } + + /** Counts, for every pair of items, the number of distinct users who viewed both, and keeps the top `n` co-occurring items for each item. */ + def trainCooccurrence( + events: RDD[ViewEvent], + n: Int, + itemStringIntMap: BiMap[String, Int]): Map[Int, Array[(Int, Int)]] = { + + val userItem = events + // map item from string to integer index; events for unknown items are dropped + .flatMap { + case ViewEvent(user, item, _) if itemStringIntMap.contains(item) => + Some((user, itemStringIntMap(item))) + case _ => None + } + // if a user views the same item multiple times, count it only once + .distinct() + .cache() + + val 
cooccurrences: RDD[((Int, Int), Int)] = userItem.join(userItem) + // remove duplicate pair in reversed order for each user, e.g. (a,b) vs. (b,a); this also drops self-pairs (a,a) + .filter { case (user, (item1, item2)) => item1 < item2 } + .map { case (user, (item1, item2)) => ((item1, item2), 1) } + .reduceByKey{ (a: Int, b: Int) => a + b } + + val topCooccurrences = cooccurrences + .flatMap{ case (pair, count) => + Seq((pair._1, (pair._2, count)), (pair._2, (pair._1, count))) + } + .groupByKey + .map { case (item, itemCounts) => + (item, itemCounts.toArray.sortBy(_._2)(Ordering.Int.reverse).take(n)) + } + .collectAsMap.toMap + + topCooccurrences + } + + /** Scores candidates by summing their co-occurrence counts with the query items, applies the white/black list and category filters, and keeps the top `query.num`. */ + def predict(model: CooccurrenceModel, query: Query): PredictedResult = { + + // convert items to Int index + val queryList: Set[Int] = query.items + .flatMap(model.itemStringIntMap.get(_)) + .toSet + + val whiteList: Option[Set[Int]] = query.whiteList.map( set => + set.map(model.itemStringIntMap.get(_)).flatten + ) + + val blackList: Option[Set[Int]] = query.blackList.map ( set => + set.map(model.itemStringIntMap.get(_)).flatten + ) + + val counts: Array[(Int, Int)] = queryList.toVector + .flatMap { q => + model.topCooccurrences.getOrElse(q, Array()) + } + .groupBy { case (index, count) => index } + .map { case (index, indexCounts) => (index, indexCounts.map(_._2).sum) } + .toArray + + val itemScores = counts + .filter { case (i, v) => + isCandidateItem( + i = i, + items = model.items, + categories = query.categories, + categoryBlackList = query.categoryBlackList, + queryList = queryList, + whiteList = whiteList, + blackList = blackList + ) + } + .sortBy(_._2)(Ordering.Int.reverse) + .take(query.num) + .map { case (index, count) => + ItemScore( + item = model.itemIntStringMap(index), + score = count + ) + } + + new PredictedResult(itemScores) + + } + + /** True if item `i` passes every filter: white list, black list, not a query item, category match and category blacklist. */ + private + def 
isCandidateItem( + i: Int, + items: Map[Int, Item], + categories: Option[Set[String]], + categoryBlackList: Option[Set[String]], + queryList: Set[Int], + whiteList: Option[Set[Int]], + blackList: Option[Set[Int]] + ): Boolean = { + 
whiteList.map(_.contains(i)).getOrElse(true) && + blackList.map(!_.contains(i)).getOrElse(true) && + // discard items in query as well + (!queryList.contains(i)) && + // filter categories + categories.map { cat => + items(i).categories.map { itemCat => + // keep this item if it has overlapping categories with the query + !(itemCat.toSet.intersect(cat).isEmpty) + }.getOrElse(false) // discard this item if it has no categories + }.getOrElse(true) && + // filter out items in blacklisted categories + categoryBlackList.map { cat => + items(i).categories.map { itemCat => + // discard this item if it has overlapping categories with the blacklist + itemCat.toSet.intersect(cat).isEmpty + }.getOrElse(true) // keep this item if it has no categories + }.getOrElse(true) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/rid-user-set-event/src/main/scala/DataSource.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/rid-user-set-event/src/main/scala/DataSource.scala b/examples/scala-parallel-similarproduct/rid-user-set-event/src/main/scala/DataSource.scala new file mode 100644 index 0000000..cc7c188 --- /dev/null +++ b/examples/scala-parallel-similarproduct/rid-user-set-event/src/main/scala/DataSource.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.predictionio.examples.similarproduct + +import org.apache.predictionio.controller.PDataSource +import org.apache.predictionio.controller.EmptyEvaluationInfo +import org.apache.predictionio.controller.EmptyActualResult +import org.apache.predictionio.controller.Params +import org.apache.predictionio.data.storage.Event +import org.apache.predictionio.data.store.PEventStore + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD + +import grizzled.slf4j.Logger + +case class DataSourceParams(appName: String) extends Params + +/** Reads item properties and user "view" events of the configured app from the event store. */ +class DataSource(val dsp: DataSourceParams) + extends PDataSource[TrainingData, + EmptyEvaluationInfo, Query, EmptyActualResult] { + + @transient lazy val logger = Logger[this.type] + + override + def readTraining(sc: SparkContext): TrainingData = { + + // create an RDD of (entityID, Item) + val itemsRDD: RDD[(String, Item)] = PEventStore.aggregateProperties( + appName = dsp.appName, + entityType = "item" + )(sc).map { case (entityId, properties) => + val item = try { + // Assume categories is optional property of item. + Item(categories = properties.getOpt[List[String]]("categories")) + } catch { + case e: Exception => { + logger.error(s"Failed to get properties ${properties} of" + + s" item ${entityId}. Exception: ${e}.") + throw e + } + } + (entityId, item) + }.cache() + + // get all "user" "view" "item" events + val viewEventsRDD: RDD[ViewEvent] = PEventStore.find( + appName = dsp.appName, + entityType = Some("user"), + eventNames = Some(List("view")), + // targetEntityType is optional field of an event.
+ targetEntityType = Some(Some("item")))(sc) + // PEventStore.find() returns RDD[Event] + .map { event => + val viewEvent = try { + event.event match { + case "view" => ViewEvent( + user = event.entityId, + // fail with a descriptive message instead of an opaque None.get + item = event.targetEntityId.getOrElse(throw new Exception( + s"Missing targetEntityId in event ${event}.")), + t = event.eventTime.getMillis) + case _ => throw new Exception(s"Unexpected event ${event} is read.") + } + } catch { + case e: Exception => { + logger.error(s"Cannot convert ${event} to ViewEvent." + + s" Exception: ${e}.") + throw e + } + } + viewEvent + }.cache() + + new TrainingData( + items = itemsRDD, + viewEvents = viewEventsRDD + ) + } +} + +case class Item(categories: Option[List[String]]) + +case class ViewEvent(user: String, item: String, t: Long) + +class TrainingData( + val items: RDD[(String, Item)], + val viewEvents: RDD[ViewEvent] +) extends Serializable { + override def toString = { + s"items: [${items.count()}] (${items.take(2).toList}...)" + + s" viewEvents: [${viewEvents.count()}] (${viewEvents.take(2).toList}...)" + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/rid-user-set-event/src/main/scala/Engine.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/rid-user-set-event/src/main/scala/Engine.scala b/examples/scala-parallel-similarproduct/rid-user-set-event/src/main/scala/Engine.scala new file mode 100644 index 0000000..2563fdf --- /dev/null +++ b/examples/scala-parallel-similarproduct/rid-user-set-event/src/main/scala/Engine.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.examples.similarproduct + +import org.apache.predictionio.controller.EngineFactory +import org.apache.predictionio.controller.Engine + +/** Engine query: `items` to find similar items for, `num` results wanted, and optional filters (`categories`, `categoryBlackList`, `whiteList`, `blackList`). */ +case class Query( + items: List[String], + num: Int, + categories: Option[Set[String]], + categoryBlackList: Option[Set[String]], + whiteList: Option[Set[String]], + blackList: Option[Set[String]] +) extends Serializable + +/** Item scores returned for a [[Query]]; `toString` joins them with commas. */ +case class PredictedResult( + itemScores: Array[ItemScore] +) extends Serializable { + override def toString: String = itemScores.mkString(",") +} + +/** An item ID paired with its score. */ +case class ItemScore( + item: String, + score: Double +) extends Serializable + +/** Wires DataSource, Preparator, the "als" and "cooccurrence" algorithms, and Serving into one engine. */ +object SimilarProductEngine extends EngineFactory { + def apply() = { + new Engine( + classOf[DataSource], + classOf[Preparator], + Map( + "als" -> classOf[ALSAlgorithm], + "cooccurrence" -> classOf[CooccurrenceAlgorithm]), + classOf[Serving]) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/rid-user-set-event/src/main/scala/Preparator.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/rid-user-set-event/src/main/scala/Preparator.scala b/examples/scala-parallel-similarproduct/rid-user-set-event/src/main/scala/Preparator.scala new file mode 100644 index 0000000..908b9b8 --- /dev/null +++ b/examples/scala-parallel-similarproduct/rid-user-set-event/src/main/scala/Preparator.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.examples.similarproduct + +import org.apache.predictionio.controller.PPreparator + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD + +/** Passes the training data through unchanged: PreparedData receives the same items and viewEvents RDDs. */ +class Preparator + extends PPreparator[TrainingData, PreparedData] { + + def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = { + new PreparedData( + items = trainingData.items, + viewEvents = trainingData.viewEvents) + } +} + +/** Algorithm input: (item ID, Item) pairs and view events. */ +class PreparedData( + val items: RDD[(String, Item)], + val viewEvents: RDD[ViewEvent] +) extends Serializable http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/rid-user-set-event/src/main/scala/Serving.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/rid-user-set-event/src/main/scala/Serving.scala b/examples/scala-parallel-similarproduct/rid-user-set-event/src/main/scala/Serving.scala new file mode 100644 index 0000000..91abca6 --- /dev/null +++ b/examples/scala-parallel-similarproduct/rid-user-set-event/src/main/scala/Serving.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.examples.similarproduct + +import org.apache.predictionio.controller.LServing + +/** Returns the first algorithm's result; results from any additional configured algorithms are ignored. */ +class Serving + extends LServing[Query, PredictedResult] { + + override + def serve(query: Query, + predictedResults: Seq[PredictedResult]): PredictedResult = { + predictedResults.head + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/rid-user-set-event/template.json ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/rid-user-set-event/template.json b/examples/scala-parallel-similarproduct/rid-user-set-event/template.json new file mode 100644 index 0000000..d076ec5 --- /dev/null +++ b/examples/scala-parallel-similarproduct/rid-user-set-event/template.json @@ -0,0 +1 @@ +{"pio": {"version": { "min": "0.10.0-incubating" }}} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/train-with-rate-event/build.sbt ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/train-with-rate-event/build.sbt b/examples/scala-parallel-similarproduct/train-with-rate-event/build.sbt new file mode
100644 index 0000000..1daded6 --- /dev/null +++ b/examples/scala-parallel-similarproduct/train-with-rate-event/build.sbt @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +name := "template-scala-parallel-similarproduct" + +organization := "org.apache.predictionio" +scalaVersion := "2.11.8" +libraryDependencies ++= Seq( + "org.apache.predictionio" %% "apache-predictionio-core" % "0.11.0-incubating" % "provided", + "org.apache.spark" %% "spark-mllib" % "2.1.1" % "provided") http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/train-with-rate-event/data/import_eventserver.py ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/train-with-rate-event/data/import_eventserver.py b/examples/scala-parallel-similarproduct/train-with-rate-event/data/import_eventserver.py new file mode 100644 index 0000000..ae26352 --- /dev/null +++ b/examples/scala-parallel-similarproduct/train-with-rate-event/data/import_eventserver.py @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Import sample data for similar product engine +""" + +import predictionio +import argparse +import random + +SEED = 3 + +def import_events(client): + """Send sample users, items (with random categories), view events and random rate events through `client`, then print how many events were sent.""" + random.seed(SEED) + count = 0 + print(client.get_status()) + print("Importing data...") + + # generate 10 users, with user ids u1,u2,....,u10 + user_ids = ["u%s" % i for i in range(1, 11)] + for user_id in user_ids: + print("Set user", user_id) + client.create_event( + event="$set", + entity_type="user", + entity_id=user_id + ) + count += 1 + + # generate 50 items, with item ids i1,i2,....,i50 + # randomly assign 1 to 4 categories among c1-c6 to each item + categories = ["c%s" % i for i in range(1, 7)] + item_ids = ["i%s" % i for i in range(1, 51)] + for item_id in item_ids: + print("Set item", item_id) + client.create_event( + event="$set", + entity_type="item", + entity_id=item_id, + properties={ + "categories" : random.sample(categories, random.randint(1, 4)) + } + ) + count += 1 + + # each user views 10 randomly chosen items + for user_id in user_ids: + for viewed_item in random.sample(item_ids, 10): + print("User", user_id ,"views item", viewed_item) + client.create_event( + event="view", + entity_type="user", + entity_id=user_id, + target_entity_type="item", + target_entity_id=viewed_item + ) + count += 1 + # randomly rate some of the viewed
items (about half), with a rating from 1 to 5 + if random.choice([True, False]): + rating = random.choice(range(1,6)) + print("User", user_id ,"rates item", viewed_item, "rating", rating) + client.create_event( + event="rate", + entity_type="user", + entity_id=user_id, + target_entity_type="item", + target_entity_id=viewed_item, + properties={ + "rating": rating + } + ) + count += 1 + + print("%s events are imported." % count) + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description="Import sample data for similar product engine") + # NOTE(review): the default access key is only a placeholder; pass a real key with --access_key + parser.add_argument('--access_key', default='invald_access_key') + parser.add_argument('--url', default="http://localhost:7070") + + args = parser.parse_args() + print(args) + + client = predictionio.EventClient( + access_key=args.access_key, + url=args.url, + threads=5, + qsize=500) + import_events(client) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/train-with-rate-event/data/send_query.py ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/train-with-rate-event/data/send_query.py b/examples/scala-parallel-similarproduct/train-with-rate-event/data/send_query.py new file mode 100644 index 0000000..0a70f28 --- /dev/null +++ b/examples/scala-parallel-similarproduct/train-with-rate-event/data/send_query.py @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Send sample query to prediction engine +""" + +import predictionio +engine_client = predictionio.EngineClient(url="http://localhost:8000") +# Ask the engine at localhost:8000 for 4 items similar to i1 and i3, and print the response +print(engine_client.send_query({"items": ["i1", "i3"], "num": 4})) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/train-with-rate-event/engine-cooccurrence.json ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/train-with-rate-event/engine-cooccurrence.json b/examples/scala-parallel-similarproduct/train-with-rate-event/engine-cooccurrence.json new file mode 100644 index 0000000..c31b88e --- /dev/null +++ b/examples/scala-parallel-similarproduct/train-with-rate-event/engine-cooccurrence.json @@ -0,0 +1,18 @@ +{ + "id": "default", + "description": "Default settings", + "engineFactory": "org.apache.predictionio.examples.similarproduct.SimilarProductEngine", + "datasource": { + "params" : { + "appName": "MyApp1" + } + }, + "algorithms": [ + { + "name": "cooccurrence", + "params": { + "n": 20 + } + } + ] +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/train-with-rate-event/engine.json ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/train-with-rate-event/engine.json b/examples/scala-parallel-similarproduct/train-with-rate-event/engine.json new file mode 100644 index 0000000..a652ec4 --- /dev/null +++
b/examples/scala-parallel-similarproduct/train-with-rate-event/engine.json @@ -0,0 +1,21 @@ +{ + "id": "default", + "description": "Default settings", + "engineFactory": "org.apache.predictionio.examples.similarproduct.SimilarProductEngine", + "datasource": { + "params" : { + "appName": "MyApp1" + } + }, + "algorithms": [ + { + "name": "als", + "params": { + "rank": 10, + "numIterations" : 20, + "lambda": 0.01, + "seed": 3 + } + } + ] +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/train-with-rate-event/project/assembly.sbt ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/train-with-rate-event/project/assembly.sbt b/examples/scala-parallel-similarproduct/train-with-rate-event/project/assembly.sbt new file mode 100644 index 0000000..e17409e --- /dev/null +++ b/examples/scala-parallel-similarproduct/train-with-rate-event/project/assembly.sbt @@ -0,0 +1 @@ +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4") http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/train-with-rate-event/project/build.properties ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/train-with-rate-event/project/build.properties b/examples/scala-parallel-similarproduct/train-with-rate-event/project/build.properties new file mode 100644 index 0000000..64317fd --- /dev/null +++ b/examples/scala-parallel-similarproduct/train-with-rate-event/project/build.properties @@ -0,0 +1 @@ +sbt.version=0.13.15 http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/train-with-rate-event/src/main/scala/ALSAlgorithm.scala ---------------------------------------------------------------------- diff --git 
a/examples/scala-parallel-similarproduct/train-with-rate-event/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-similarproduct/train-with-rate-event/src/main/scala/ALSAlgorithm.scala new file mode 100644 index 0000000..eaefe17 --- /dev/null +++ b/examples/scala-parallel-similarproduct/train-with-rate-event/src/main/scala/ALSAlgorithm.scala @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.examples.similarproduct + +import org.apache.predictionio.controller.P2LAlgorithm +import org.apache.predictionio.controller.Params +import org.apache.predictionio.data.storage.BiMap + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.recommendation.ALS +import org.apache.spark.mllib.recommendation.{Rating => MLlibRating} + +import grizzled.slf4j.Logger + +import scala.collection.mutable.PriorityQueue + +case class ALSAlgorithmParams( + rank: Int, + numIterations: Int, + lambda: Double, + seed: Option[Long]) extends Params + +/** ALS item feature vectors keyed by item index, plus the item ID mapping and the item properties. */ +class ALSModel( + val productFeatures: Map[Int, Array[Double]], + val itemStringIntMap: BiMap[String, Int], + val items: Map[Int, Item] +) extends Serializable { + + @transient lazy val itemIntStringMap = itemStringIntMap.inverse + + override def toString = { + s" productFeatures: [${productFeatures.size}]" + + s"(${productFeatures.take(2).toList}...)" + + s" itemStringIntMap: [${itemStringIntMap.size}]" + + s"(${itemStringIntMap.take(2).toString}...)" + + s" items: [${items.size}]" + + s"(${items.take(2).toString}...)" + } +} + +/** + * Use ALS to build item x feature matrix + */ +class ALSAlgorithm(val ap: ALSAlgorithmParams) + extends P2LAlgorithm[PreparedData, ALSModel, Query, PredictedResult] { + + @transient lazy val logger = Logger[this.type] + + def train(sc:SparkContext ,data: PreparedData): ALSModel = { + require(!data.rateEvents.take(1).isEmpty, // MODIFIED + s"rateEvents in PreparedData cannot be empty." + // MODIFIED + " Please check if DataSource generates TrainingData" + + " and Preparator generates PreparedData correctly.") + require(!data.users.take(1).isEmpty, + s"users in PreparedData cannot be empty." + + " Please check if DataSource generates TrainingData" + + " and Preparator generates PreparedData correctly.") + require(!data.items.take(1).isEmpty, + s"items in PreparedData cannot be empty."
+ " Please check if DataSource generates TrainingData" + + " and Preparator generates PreparedData correctly.") + // create User and item's String ID to integer index BiMap + val userStringIntMap = BiMap.stringInt(data.users.keys) + val itemStringIntMap = BiMap.stringInt(data.items.keys) + + // collect Item as Map and convert ID to Int index + val items: Map[Int, Item] = data.items.map { case (id, item) => + (itemStringIntMap(id), item) + }.collectAsMap.toMap + + val mllibRatings = data.rateEvents // MODIFIED + .map { r => + // Convert user and item String IDs to Int index for MLlib + val uindex = userStringIntMap.getOrElse(r.user, -1) + val iindex = itemStringIntMap.getOrElse(r.item, -1) + + if (uindex == -1) + logger.info(s"Couldn't convert nonexistent user ID ${r.user}" + + " to Int index.") + + if (iindex == -1) + logger.info(s"Couldn't convert nonexistent item ID ${r.item}" + + " to Int index.") + + ((uindex, iindex), (r.rating,r.t)) //MODIFIED + }.filter { case ((u, i), v) => + // keep events with valid user and item index + (u != -1) && (i != -1) + } + .reduceByKey { case (v1, v2) => // MODIFIED + // if a user may rate same item with different value at different times, + // use the latest value for this case. + // Can remove this reduceByKey() if no need to support this case. + val (_, t1) = v1 + val (_, t2) = v2 + // keep the latest value + if (t1 > t2) v1 else v2 + } + .map { case ((u, i), (rating, t)) => // MODIFIED + // MLlibRating requires integer index for user and item + MLlibRating(u, i, rating) // MODIFIED + } + .cache() + + // MLLib ALS cannot handle empty training data. + require(!mllibRatings.take(1).isEmpty, + s"mllibRatings cannot be empty."
+ " Please check if your events contain valid user and item ID.") + + // seed for MLlib ALS + val seed = ap.seed.getOrElse(System.nanoTime) + + val m = ALS.train( // MODIFIED + ratings = mllibRatings, + rank = ap.rank, + iterations = ap.numIterations, + lambda = ap.lambda, + blocks = -1, + seed = seed) + + new ALSModel( + productFeatures = m.productFeatures.collectAsMap.toMap, + itemStringIntMap = itemStringIntMap, + items = items + ) + } + + /** Scores every item by the sum of cosine similarities between its feature vector and the query items' vectors, keeps positive scores, applies the filters and returns the top `query.num`. */ + def predict(model: ALSModel, query: Query): PredictedResult = { + + val productFeatures = model.productFeatures + + // convert items to Int index + val queryList: Set[Int] = query.items.map(model.itemStringIntMap.get(_)) + .flatten.toSet + + val queryFeatures: Vector[Array[Double]] = queryList.toVector + // productFeatures may not contain the requested item + .map { item => productFeatures.get(item) } + .flatten + + val whiteList: Option[Set[Int]] = query.whiteList.map( set => + set.map(model.itemStringIntMap.get(_)).flatten + ) + val blackList: Option[Set[Int]] = query.blackList.map ( set => + set.map(model.itemStringIntMap.get(_)).flatten + ) + + val ord = Ordering.by[(Int, Double), Double](_._2).reverse + + val indexScores: Array[(Int, Double)] = if (queryFeatures.isEmpty) { + logger.info(s"No productFeatures vector for query items ${query.items}.") + Array[(Int, Double)]() + } else { + productFeatures.par // convert to parallel collection + .mapValues { f => + queryFeatures.map{ qf => + cosine(qf, f) + }.reduce(_ + _) + } + .filter(_._2 > 0) // keep items with score > 0 + .seq // convert back to sequential collection + .toArray + } + + val filteredScore = indexScores.view.filter { case (i, v) => + isCandidateItem( + i = i, + items = model.items, + categories = query.categories, + categoryBlackList = query.categoryBlackList, + queryList = queryList, + whiteList = whiteList, + blackList = blackList + ) + } + + val topScores = getTopN(filteredScore, query.num)(ord).toArray + + val itemScores = topScores.map { case (i, s) =>
+ new ItemScore( + item = model.itemIntStringMap(i), + score = s + ) + } + + new PredictedResult(itemScores) + } + + /** Returns the `n` least elements of `s` under `ord`, least first, using a heap bounded to `n` elements; returns an empty Seq when `n <= 0`. */ + private + def getTopN[T](s: Seq[T], n: Int)(implicit ord: Ordering[T]): Seq[T] = { + + // max-heap under ord: its head is the worst of the elements kept so far + val q = PriorityQueue.empty[T](ord) + + // Guard n <= 0 (e.g. query.num = 0): otherwise q.head below would be called + // on an empty queue and throw NoSuchElementException. + if (n > 0) { + for (x <- s) { + if (q.size < n) + q.enqueue(x) + else { + // q is full + if (ord.compare(x, q.head) < 0) { + q.dequeue() + q.enqueue(x) + } + } + } + } + + q.dequeueAll.toSeq.reverse + } + + /** Cosine similarity of two vectors (assumed to be the same length); 0 if either has zero norm. */ + private + def cosine(v1: Array[Double], v2: Array[Double]): Double = { + val size = v1.size + var i = 0 + var n1: Double = 0 + var n2: Double = 0 + var d: Double = 0 + while (i < size) { + n1 += v1(i) * v1(i) + n2 += v2(i) * v2(i) + d += v1(i) * v2(i) + i += 1 + } + val n1n2 = (math.sqrt(n1) * math.sqrt(n2)) + if (n1n2 == 0) 0 else (d / n1n2) + } + + /** True if item `i` passes every filter: white list, black list, not a query item, category match and category blacklist. */ + private + def isCandidateItem( + i: Int, + items: Map[Int, Item], + categories: Option[Set[String]], + categoryBlackList: Option[Set[String]], + queryList: Set[Int], + whiteList: Option[Set[Int]], + blackList: Option[Set[Int]] + ): Boolean = { + whiteList.map(_.contains(i)).getOrElse(true) && + blackList.map(!_.contains(i)).getOrElse(true) && + // discard items in query as well + (!queryList.contains(i)) && + // filter categories + categories.map { cat => + items(i).categories.map { itemCat => + // keep this item if it has overlapping categories with the query + !(itemCat.toSet.intersect(cat).isEmpty) + }.getOrElse(false) // discard this item if it has no categories + }.getOrElse(true) && + categoryBlackList.map { cat => + items(i).categories.map { itemCat => + // discard this item if it has overlapping categories with the blacklist + (itemCat.toSet.intersect(cat).isEmpty) + }.getOrElse(true) // keep this item if it has no categories + }.getOrElse(true) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/train-with-rate-event/src/main/scala/CooccurrenceAlgorithm.scala ---------------------------------------------------------------------- diff --git
a/examples/scala-parallel-similarproduct/train-with-rate-event/src/main/scala/CooccurrenceAlgorithm.scala b/examples/scala-parallel-similarproduct/train-with-rate-event/src/main/scala/CooccurrenceAlgorithm.scala new file mode 100644 index 0000000..b5035f8 --- /dev/null +++ b/examples/scala-parallel-similarproduct/train-with-rate-event/src/main/scala/CooccurrenceAlgorithm.scala @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.predictionio.examples.similarproduct
+
+import org.apache.predictionio.controller.P2LAlgorithm
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.data.storage.BiMap
+
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+
+/** @param n how many top co-occurring items to keep per item */
+case class CooccurrenceAlgorithmParams(
+  n: Int // top co-occurrence
+) extends Params
+
+/**
+ * Trained co-occurrence model.
+ *
+ * @param topCooccurrences item index -> up to `n` (other item index, count)
+ *                         pairs, highest count first
+ * @param itemStringIntMap item ID <-> item index mapping
+ * @param items item index -> item properties
+ */
+class CooccurrenceModel(
+  val topCooccurrences: Map[Int, Array[(Int, Int)]],
+  val itemStringIntMap: BiMap[String, Int],
+  val items: Map[Int, Item]
+) extends Serializable {
+  @transient lazy val itemIntStringMap = itemStringIntMap.inverse
+
+  override def toString(): String = {
+    val s = topCooccurrences.mapValues { v => v.mkString(",") }
+    s.toString
+  }
+}
+
+/**
+ * Similar-product algorithm based on item co-occurrence. Two items co-occur
+ * when the same user has a "rate" event for both; the rating value itself is
+ * not used.
+ */
+class CooccurrenceAlgorithm(val ap: CooccurrenceAlgorithmParams)
+  extends P2LAlgorithm[PreparedData, CooccurrenceModel, Query, PredictedResult] {
+
+  def train(sc: SparkContext, data: PreparedData): CooccurrenceModel = {
+
+    val itemStringIntMap = BiMap.stringInt(data.items.keys)
+
+    val topCooccurrences = trainCooccurrence(
+      events = data.rateEvents, // MODIFIED
+      n = ap.n,
+      itemStringIntMap = itemStringIntMap
+    )
+
+    // collect Item as Map and convert ID to Int index
+    val items: Map[Int, Item] = data.items.map { case (id, item) =>
+      (itemStringIntMap(id), item)
+    }.collectAsMap.toMap
+
+    new CooccurrenceModel(
+      topCooccurrences = topCooccurrences,
+      itemStringIntMap = itemStringIntMap,
+      items = items
+    )
+
+  }
+
+  /**
+   * Given the user-item events, finds the top `n` co-occurring items for each
+   * item, as (other item index, co-occurrence count) pairs sorted by count,
+   * highest first. Events whose item is not in `itemStringIntMap` are skipped.
+   */
+  def trainCooccurrence(
+    events: RDD[RateEvent], // MODIFIED
+    n: Int,
+    itemStringIntMap: BiMap[String, Int]): Map[Int, Array[(Int, Int)]] = {
+
+    val userItem = events
+      // map item from string to integer index
+      .flatMap {
+        // MODIFIED
+        case RateEvent(user, item, _, _) if itemStringIntMap.contains(item) =>
+          Some((user, itemStringIntMap(item)))
+        case _ => None
+      }
+      // if a user rates the same item multiple times, only count it once
+      .distinct()
+      .cache()
+
+    val cooccurrences: RDD[((Int, Int), Int)] = userItem.join(userItem)
+      // Keep each unordered pair once per user, e.g. (a,b) but not (b,a).
+      // This also drops (a,a) self-pairs.
+      .filter { case (user, (item1, item2)) => item1 < item2 }
+      .map { case (user, (item1, item2)) => ((item1, item2), 1) }
+      .reduceByKey(_ + _)
+
+    val topCooccurrences = cooccurrences
+      // emit the count for both directions of every pair
+      .flatMap { case (pair, count) =>
+        Seq((pair._1, (pair._2, count)), (pair._2, (pair._1, count)))
+      }
+      .groupByKey
+      .map { case (item, itemCounts) =>
+        (item, itemCounts.toArray.sortBy(_._2)(Ordering.Int.reverse).take(n))
+      }
+      .collectAsMap.toMap
+
+    topCooccurrences
+  }
+
+  /**
+   * Scores each candidate item by summing its stored co-occurrence counts
+   * with every known query item. The scores cover only the top `n` lists kept
+   * per item.
+   *
+   * Returns at most `query.num` items that pass the white/black-list,
+   * category and category black-list filters, highest count first.
+   */
+  def predict(model: CooccurrenceModel, query: Query): PredictedResult = {
+
+    // convert items to Int index
+    val queryList: Set[Int] = query.items
+      .flatMap(model.itemStringIntMap.get(_))
+      .toSet
+
+    val whiteList: Option[Set[Int]] = query.whiteList.map { set =>
+      set.flatMap(model.itemStringIntMap.get(_))
+    }
+
+    val blackList: Option[Set[Int]] = query.blackList.map { set =>
+      set.flatMap(model.itemStringIntMap.get(_))
+    }
+
+    val counts: Array[(Int, Int)] = queryList.toVector
+      .flatMap { q =>
+        model.topCooccurrences.getOrElse(q, Array.empty[(Int, Int)])
+      }
+      .groupBy { case (index, count) => index }
+      .map { case (index, indexCounts) => (index, indexCounts.map(_._2).sum) }
+      .toArray
+
+    val itemScores = counts
+      .filter { case (i, v) =>
+        isCandidateItem(
+          i = i,
+          items = model.items,
+          categories = query.categories,
+          categoryBlackList = query.categoryBlackList,
+          queryList = queryList,
+          whiteList = whiteList,
+          blackList = blackList
+        )
+      }
+      .sortBy(_._2)(Ordering.Int.reverse)
+      .take(query.num)
+      .map { case (index, count) =>
+        ItemScore(
+          item = model.itemIntStringMap(index),
+          score = count
+        )
+      }
+
+    new PredictedResult(itemScores)
+
+  }
+
+  /**
+   * Whether item `i` may be returned. It must:
+   *  - be in `whiteList`, if one is given;
+   *  - not be in `blackList`;
+   *  - not be one of the query items;
+   *  - share at least one category with `categories`, if given;
+   *  - share no category with `categoryBlackList`, if given.
+   */
+  private
+  def isCandidateItem(
+    i: Int,
+    items: Map[Int, Item],
+    categories: Option[Set[String]],
+    categoryBlackList: Option[Set[String]],
+    queryList: Set[Int],
+    whiteList: Option[Set[Int]],
+    blackList: Option[Set[Int]]
+  ): Boolean = {
+    whiteList.map(_.contains(i)).getOrElse(true) &&
+    blackList.map(!_.contains(i)).getOrElse(true) &&
+    // discard items in query as well
+    (!queryList.contains(i)) &&
+    // filter categories
+    categories.map { cat =>
+      items(i).categories.map { itemCat =>
+        // keep this item if it has overlapping categories with the query
+        !(itemCat.toSet.intersect(cat).isEmpty)
+      }.getOrElse(false) // discard this item if it has no categories
+    }.getOrElse(true) &&
+    categoryBlackList.map { cat =>
+      items(i).categories.map { itemCat =>
+        // discard this item if it has any category in the black list
+        itemCat.toSet.intersect(cat).isEmpty
+      }.getOrElse(true) // keep this item if it has no categories
+    }.getOrElse(true)
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/train-with-rate-event/src/main/scala/DataSource.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/train-with-rate-event/src/main/scala/DataSource.scala b/examples/scala-parallel-similarproduct/train-with-rate-event/src/main/scala/DataSource.scala new file mode 100644 index 0000000..6cb567a --- /dev/null +++ b/examples/scala-parallel-similarproduct/train-with-rate-event/src/main/scala/DataSource.scala @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+
+package org.apache.predictionio.examples.similarproduct
+
+import org.apache.predictionio.controller.PDataSource
+import org.apache.predictionio.controller.EmptyEvaluationInfo
+import org.apache.predictionio.controller.EmptyActualResult
+import org.apache.predictionio.controller.Params
+import org.apache.predictionio.data.storage.Event
+import org.apache.predictionio.data.store.PEventStore
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+import grizzled.slf4j.Logger
+
+/** @param appName name of the app whose events are read */
+case class DataSourceParams(appName: String) extends Params
+
+/**
+ * Reads users, items (with optional "categories") and "user rate item"
+ * events for `dsp.appName`.
+ */
+class DataSource(val dsp: DataSourceParams)
+  extends PDataSource[TrainingData,
+      EmptyEvaluationInfo, Query, EmptyActualResult] {
+
+  @transient lazy val logger = Logger[this.type]
+
+  override
+  def readTraining(sc: SparkContext): TrainingData = {
+
+    // create a RDD of (entityID, User)
+    val usersRDD: RDD[(String, User)] = PEventStore.aggregateProperties(
+      appName = dsp.appName,
+      entityType = "user"
+    )(sc).map { case (entityId, properties) =>
+      val user = try {
+        User()
+      } catch {
+        case e: Exception => {
+          logger.error(s"Failed to get properties ${properties} of" +
+            s" user ${entityId}. Exception: ${e}.")
+          throw e
+        }
+      }
+      (entityId, user)
+    }.cache()
+
+    // create a RDD of (entityID, Item)
+    val itemsRDD: RDD[(String, Item)] = PEventStore.aggregateProperties(
+      appName = dsp.appName,
+      entityType = "item"
+    )(sc).map { case (entityId, properties) =>
+      val item = try {
+        // Assume categories is optional property of item.
+        Item(categories = properties.getOpt[List[String]]("categories"))
+      } catch {
+        case e: Exception => {
+          logger.error(s"Failed to get properties ${properties} of" +
+            s" item ${entityId}. Exception: ${e}.")
+          throw e
+        }
+      }
+      (entityId, item)
+    }.cache()
+
+    // get all "user" "rate" "item" events
+    val rateEventsRDD: RDD[RateEvent] = PEventStore.find( // MODIFIED
+      appName = dsp.appName,
+      entityType = Some("user"),
+      eventNames = Some(List("rate")), // MODIFIED
+      // targetEntityType is optional field of an event.
+      targetEntityType = Some(Some("item")))(sc)
+      // eventsDb.find() returns RDD[Event]
+      .map { event =>
+        val rateEvent = try { // MODIFIED
+          event.event match {
+            case "rate" => RateEvent( // MODIFIED
+              user = event.entityId,
+              item = event.targetEntityId.get,
+              rating = event.properties.get[Double]("rating"), // ADDED
+              t = event.eventTime.getMillis)
+            case _ => throw new Exception(s"Unexpected event ${event} is read.")
+          }
+        } catch {
+          case e: Exception => {
+            logger.error(s"Cannot convert ${event} to RateEvent." + // MODIFIED
+              s" Exception: ${e}.")
+            throw e
+          }
+        }
+        rateEvent // MODIFIED
+      }.cache()
+
+    new TrainingData(
+      users = usersRDD,
+      items = itemsRDD,
+      rateEvents = rateEventsRDD // MODIFIED
+    )
+  }
+}
+
+case class User()
+
+case class Item(categories: Option[List[String]])
+
+// MODIFIED
+/** A "rate" event; `t` is the event time from `eventTime.getMillis`. */
+case class RateEvent(user: String, item: String, rating: Double, t: Long)
+
+class TrainingData(
+  val users: RDD[(String, User)],
+  val items: RDD[(String, Item)],
+  val rateEvents: RDD[RateEvent] // MODIFIED
+) extends Serializable {
+  // Debug summary: the count and first two records of each RDD, comma
+  // separated. Calls count() and take() on every RDD.
+  override def toString: String = {
+    s"users: [${users.count()} (${users.take(2).toList}...)], " +
+    s"items: [${items.count()} (${items.take(2).toList}...)], " +
+    // MODIFIED
+    s"rateEvents: [${rateEvents.count()} (${rateEvents.take(2).toList}...)]"
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/train-with-rate-event/src/main/scala/Engine.scala ---------------------------------------------------------------------- diff --git
b/examples/scala-parallel-similarproduct/train-with-rate-event/src/main/scala/Engine.scala new file mode 100644 index 0000000..2563fdf --- /dev/null +++ b/examples/scala-parallel-similarproduct/train-with-rate-event/src/main/scala/Engine.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.predictionio.examples.similarproduct
+
+import org.apache.predictionio.controller.EngineFactory
+import org.apache.predictionio.controller.Engine
+
+/**
+ * A similar-product query. The fields are interpreted by each algorithm's
+ * `predict`.
+ *
+ * @param items IDs of the items to find similar items for
+ * @param num maximum number of results requested
+ * @param categories optional category filter
+ * @param categoryBlackList optional categories to exclude
+ * @param whiteList optional set of allowed item IDs
+ * @param blackList optional set of excluded item IDs
+ */
+case class Query(
+  items: List[String],
+  num: Int,
+  categories: Option[Set[String]],
+  categoryBlackList: Option[Set[String]],
+  whiteList: Option[Set[String]],
+  blackList: Option[Set[String]]
+) extends Serializable
+
+/** The ranked result; `toString` joins the item scores with commas. */
+case class PredictedResult(itemScores: Array[ItemScore]) extends Serializable {
+  override def toString: String = itemScores.mkString("", ",", "")
+}
+
+/** One recommended item ID together with its score. */
+case class ItemScore(item: String, score: Double) extends Serializable
+
+/** Wires the data source, preparator, the two algorithms and serving together. */
+object SimilarProductEngine extends EngineFactory {
+  def apply() =
+    new Engine(
+      classOf[DataSource],
+      classOf[Preparator],
+      Map(
+        "als" -> classOf[ALSAlgorithm],
+        "cooccurrence" -> classOf[CooccurrenceAlgorithm]),
+      classOf[Serving])
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/train-with-rate-event/src/main/scala/Preparator.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/train-with-rate-event/src/main/scala/Preparator.scala b/examples/scala-parallel-similarproduct/train-with-rate-event/src/main/scala/Preparator.scala new file mode 100644 index 0000000..187e423 --- /dev/null +++ b/examples/scala-parallel-similarproduct/train-with-rate-event/src/main/scala/Preparator.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
+
+package org.apache.predictionio.examples.similarproduct
+
+import org.apache.predictionio.controller.PPreparator
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+/** Hands the training data's RDDs to the algorithms unchanged. */
+class Preparator extends PPreparator[TrainingData, PreparedData] {
+
+  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData =
+    new PreparedData(
+      trainingData.users,
+      trainingData.items,
+      trainingData.rateEvents) // MODIFIED
+}
+
+/** Input of the algorithms: users, items and "rate" events. */
+class PreparedData(
+  val users: RDD[(String, User)],
+  val items: RDD[(String, Item)],
+  val rateEvents: RDD[RateEvent] // MODIFIED
+) extends Serializable
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/train-with-rate-event/src/main/scala/Serving.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/train-with-rate-event/src/main/scala/Serving.scala b/examples/scala-parallel-similarproduct/train-with-rate-event/src/main/scala/Serving.scala new file mode 100644 index 0000000..91abca6 --- /dev/null +++ b/examples/scala-parallel-similarproduct/train-with-rate-event/src/main/scala/Serving.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
+
+package org.apache.predictionio.examples.similarproduct
+
+import org.apache.predictionio.controller.LServing
+
+/**
+ * Answers with the result of the first algorithm; results from any other
+ * algorithms are ignored.
+ */
+class Serving extends LServing[Query, PredictedResult] {
+
+  override def serve(query: Query, predictedResults: Seq[PredictedResult]): PredictedResult = {
+    val firstResult = predictedResults.head
+    firstResult
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/train-with-rate-event/template.json ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/train-with-rate-event/template.json b/examples/scala-parallel-similarproduct/train-with-rate-event/template.json new file mode 100644 index 0000000..d076ec5 --- /dev/null +++ b/examples/scala-parallel-similarproduct/train-with-rate-event/template.json @@ -0,0 +1 @@ +{"pio": {"version": { "min": "0.10.0-incubating" }}}
