http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-recommendation-custom-datasource/src/main/scala/Serving.scala
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-parallel-recommendation-custom-datasource/src/main/scala/Serving.scala b/examples/experimental/scala-parallel-recommendation-custom-datasource/src/main/scala/Serving.scala
deleted file mode 100644
index 48fda35..0000000
--- a/examples/experimental/scala-parallel-recommendation-custom-datasource/src/main/scala/Serving.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.LServing
-
-class Serving
-  extends LServing[Query, PredictedResult] {
-
-  override def serve(query: Query,
-    predictedResults: Seq[PredictedResult]): PredictedResult = {
-    predictedResults.head
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-recommendation-entitymap/.gitignore
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-parallel-recommendation-entitymap/.gitignore b/examples/experimental/scala-parallel-recommendation-entitymap/.gitignore
deleted file mode 100644
index ea4e89d..0000000
--- a/examples/experimental/scala-parallel-recommendation-entitymap/.gitignore
+++ /dev/null
@@ -1,2 +0,0 @@
-data/sample_movielens_data.txt
-manifest.json

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-recommendation-entitymap/build.sbt
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-parallel-recommendation-entitymap/build.sbt b/examples/experimental/scala-parallel-recommendation-entitymap/build.sbt
deleted file mode 100644
index 91e94f2..0000000
--- a/examples/experimental/scala-parallel-recommendation-entitymap/build.sbt
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import AssemblyKeys._
-
-assemblySettings
-
-name := "template-scala-parallel-recommendation"
-
-organization := "org.apache.predictionio"
-
-libraryDependencies ++= Seq(
-  "org.apache.predictionio" %% "core" % "0.9.1" % "provided",
-  "org.apache.spark" %% "spark-core" % "1.2.0" % "provided",
-  "org.apache.spark" %% "spark-mllib" % "1.2.0" % "provided")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-recommendation-entitymap/data/import_eventserver.py
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-parallel-recommendation-entitymap/data/import_eventserver.py b/examples/experimental/scala-parallel-recommendation-entitymap/data/import_eventserver.py
deleted file mode 100644
index 57a9528..0000000
--- a/examples/experimental/scala-parallel-recommendation-entitymap/data/import_eventserver.py
+++ /dev/null
@@ -1,107 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-Import sample data for recommendation engine
-"""
-
-import predictionio
-import argparse
-import random
-
-SEED = 3
-
-def import_events(client):
-  random.seed(SEED)
-  count = 0
-  print "Importing data..."
-
-  # generate 10 users, with user uid1,2,....,10
-  # with some random attributes
-  user_ids = [ ("uid"+str(i)) for i in range(1, 11)]
-  for user_id in user_ids:
-    print "Set user", user_id
-    client.create_event(
-      event="$set",
-      entity_type="user",
-      entity_id=user_id,
-      properties={
-        "attr0" : float(random.randint(0, 4)),
-        "attr1" : random.randint(10, 14),
-        "attr2" : random.randint(20, 24)
-      }
-    )
-    count += 1
-
-  # generate 50 items, with iid1,2,....,50
-  # with some random attributes
-  item_ids = [ ("iid"+str(i)) for i in range(1, 51)]
-  for item_id in item_ids:
-    print "Set item", item_id
-    client.create_event(
-      event="$set",
-      entity_type="item",
-      entity_id=item_id,
-      properties={
-        "attrA" : random.choice(["something1", "something2", "valueX"]),
-        "attrB" : random.randint(10, 30),
-        "attrC" : random.choice([True, False])
-      }
-    )
-    count += 1
-
-  # each user randomly rates or buys 10 items
-  for user_id in user_ids:
-    for viewed_item in random.sample(item_ids, 10):
-      if (random.randint(0, 1) == 1):
-        print "User", user_id ,"rates item", viewed_item
-        client.create_event(
-          event="rate",
-          entity_type="user",
-          entity_id=user_id,
-          target_entity_type="item",
-          target_entity_id=viewed_item,
-          properties= { "rating" : float(random.randint(1, 6)) }
-        )
-      else:
-        print "User", user_id ,"buys item", viewed_item
-        client.create_event(
-          event="buy",
-          entity_type="user",
-          entity_id=user_id,
-          target_entity_type="item",
-          target_entity_id=viewed_item
-        )
-      count += 1
-
-  print "%s events are imported." % count
-
-if __name__ == '__main__':
-  parser = argparse.ArgumentParser(
-    description="Import sample data for recommendation engine")
-  parser.add_argument('--access_key', default='invald_access_key')
-  parser.add_argument('--url', default="http://localhost:7070")
-
-  args = parser.parse_args()
-  print args
-
-  client = predictionio.EventClient(
-    access_key=args.access_key,
-    url=args.url,
-    threads=5,
-    qsize=500)
-  import_events(client)

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-recommendation-entitymap/data/send_query.py
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-parallel-recommendation-entitymap/data/send_query.py b/examples/experimental/scala-parallel-recommendation-entitymap/data/send_query.py
deleted file mode 100644
index cb453b4..0000000
--- a/examples/experimental/scala-parallel-recommendation-entitymap/data/send_query.py
+++ /dev/null
@@ -1,24 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-Send sample query to prediction engine
-"""
-
-import predictionio
-engine_client = predictionio.EngineClient(url="http://localhost:8000")
-print engine_client.send_query({"user": "uid1", "num": 4})

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-recommendation-entitymap/engine.json
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-parallel-recommendation-entitymap/engine.json b/examples/experimental/scala-parallel-recommendation-entitymap/engine.json
deleted file mode 100644
index 256a153..0000000
--- a/examples/experimental/scala-parallel-recommendation-entitymap/engine.json
+++ /dev/null
@@ -1,20 +0,0 @@
-{
-  "id": "default",
-  "description": "Default settings",
-  "engineFactory": "org.template.recommendation.RecommendationEngine",
-  "datasource": {
-    "params": {
-      "appId": 1
-    }
-  },
-  "algorithms": [
-    {
-      "name": "als",
-      "params": {
-        "rank": 10,
-        "numIterations": 20,
-        "lambda": 0.01
-      }
-    }
-  ]
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-recommendation-entitymap/project/assembly.sbt
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-parallel-recommendation-entitymap/project/assembly.sbt b/examples/experimental/scala-parallel-recommendation-entitymap/project/assembly.sbt
deleted file mode 100644
index 54c3252..0000000
--- a/examples/experimental/scala-parallel-recommendation-entitymap/project/assembly.sbt
+++ /dev/null
@@ -1 +0,0 @@
-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-recommendation-entitymap/src/main/scala/ALSAlgorithm.scala
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-parallel-recommendation-entitymap/src/main/scala/ALSAlgorithm.scala b/examples/experimental/scala-parallel-recommendation-entitymap/src/main/scala/ALSAlgorithm.scala
deleted file mode 100644
index 211ac7d..0000000
--- a/examples/experimental/scala-parallel-recommendation-entitymap/src/main/scala/ALSAlgorithm.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.PAlgorithm
-import org.apache.predictionio.controller.Params
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.mllib.recommendation.ALS
-import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
-import org.apache.spark.mllib.recommendation.ALSModel
-
-import grizzled.slf4j.Logger
-
-case class ALSAlgorithmParams(
-  rank: Int,
-  numIterations: Int,
-  lambda: Double) extends Params
-
-class ALSAlgorithm(val ap: ALSAlgorithmParams)
-  extends PAlgorithm[PreparedData, ALSModel, Query, PredictedResult] {
-
-  @transient lazy val logger = Logger[this.type]
-
-  def train(data: PreparedData): ALSModel = {
-    // Convert user and item String IDs to Int index for MLlib
-    val mllibRatings = data.ratings.map( r =>
-      // MLlibRating requires integer index for user and item
-      MLlibRating(data.users(r.user).toInt,
-        data.items(r.item).toInt, r.rating)
-    )
-    val m = ALS.train(mllibRatings, ap.rank, ap.numIterations, ap.lambda)
-    new ALSModel(
-      rank = m.rank,
-      userFeatures = m.userFeatures,
-      productFeatures = m.productFeatures,
-      users = data.users,
-      items = data.items)
-  }
-
-  def predict(model: ALSModel, query: Query): PredictedResult = {
-    // Convert String ID to Int index for Mllib
-    model.users.get(query.user).map { userInt =>
-      // recommendProducts() returns Array[MLlibRating], which uses item Int
-      // index. Convert it to String ID for returning PredictedResult
-      val itemScores = model.recommendProducts(userInt.toInt, query.num)
-        .map (r => ItemScore(model.items(r.product.toLong), r.rating))
-      new PredictedResult(itemScores)
-    }.getOrElse{
-      logger.info(s"No prediction for unknown user ${query.user}.")
-      new PredictedResult(Array.empty)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-recommendation-entitymap/src/main/scala/ALSModel.scala
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-parallel-recommendation-entitymap/src/main/scala/ALSModel.scala b/examples/experimental/scala-parallel-recommendation-entitymap/src/main/scala/ALSModel.scala
deleted file mode 100644
index 856f735..0000000
--- a/examples/experimental/scala-parallel-recommendation-entitymap/src/main/scala/ALSModel.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.recommendation
-// This must be the same package as Spark's MatrixFactorizationModel because
-// MatrixFactorizationModel's constructor is private and we are using
-// its constructor in order to save and load the model
-
-import org.template.recommendation.ALSAlgorithmParams
-
-import org.template.recommendation.User
-import org.template.recommendation.Item
-
-import org.apache.predictionio.controller.IPersistentModel
-import org.apache.predictionio.controller.IPersistentModelLoader
-import org.apache.predictionio.data.storage.EntityMap
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-class ALSModel(
-    override val rank: Int,
-    override val userFeatures: RDD[(Int, Array[Double])],
-    override val productFeatures: RDD[(Int, Array[Double])],
-    val users: EntityMap[User],
-    val items: EntityMap[Item])
-  extends MatrixFactorizationModel(rank, userFeatures, productFeatures)
-  with IPersistentModel[ALSAlgorithmParams] {
-
-  def save(id: String, params: ALSAlgorithmParams,
-    sc: SparkContext): Boolean = {
-
-    sc.parallelize(Seq(rank)).saveAsObjectFile(s"/tmp/${id}/rank")
-    userFeatures.saveAsObjectFile(s"/tmp/${id}/userFeatures")
-    productFeatures.saveAsObjectFile(s"/tmp/${id}/productFeatures")
-    sc.parallelize(Seq(users))
-      .saveAsObjectFile(s"/tmp/${id}/users")
-    sc.parallelize(Seq(items))
-      .saveAsObjectFile(s"/tmp/${id}/items")
-    true
-  }
-
-  override def toString = {
-    s"userFeatures: [${userFeatures.count()}]" +
-    s"(${userFeatures.take(2).toList}...)" +
-    s" productFeatures: [${productFeatures.count()}]" +
-    s"(${productFeatures.take(2).toList}...)" +
-    s" users: [${users.size}]" +
-    s"(${users.take(2)}...)" +
-    s" items: [${items.size}]" +
-    s"(${items.take(2)}...)"
-  }
-}

-object ALSModel
-  extends IPersistentModelLoader[ALSAlgorithmParams, ALSModel] {
-  def apply(id: String, params: ALSAlgorithmParams,
-    sc: Option[SparkContext]) = {
-    new ALSModel(
-      rank = sc.get.objectFile[Int](s"/tmp/${id}/rank").first,
-      userFeatures = sc.get.objectFile(s"/tmp/${id}/userFeatures"),
-      productFeatures = sc.get.objectFile(s"/tmp/${id}/productFeatures"),
-      users = sc.get
-        .objectFile[EntityMap[User]](s"/tmp/${id}/users").first,
-      items = sc.get
-        .objectFile[EntityMap[Item]](s"/tmp/${id}/items").first)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-recommendation-entitymap/src/main/scala/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-parallel-recommendation-entitymap/src/main/scala/DataSource.scala b/examples/experimental/scala-parallel-recommendation-entitymap/src/main/scala/DataSource.scala
deleted file mode 100644
index 72dcbf9..0000000
--- a/examples/experimental/scala-parallel-recommendation-entitymap/src/main/scala/DataSource.scala
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.PDataSource
-import org.apache.predictionio.controller.EmptyEvaluationInfo
-import org.apache.predictionio.controller.EmptyActualResult
-import org.apache.predictionio.controller.Params
-import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.storage.Storage
-import org.apache.predictionio.data.storage.EntityMap
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-import grizzled.slf4j.Logger
-
-case class DataSourceParams(appId: Int) extends Params
-
-class DataSource(val dsp: DataSourceParams)
-  extends PDataSource[TrainingData,
-      EmptyEvaluationInfo, Query, EmptyActualResult] {
-
-  @transient lazy val logger = Logger[this.type]
-
-  override
-  def readTraining(sc: SparkContext): TrainingData = {
-    val eventsDb = Storage.getPEvents()
-
-    val users: EntityMap[User] = eventsDb.extractEntityMap[User](
-      appId = dsp.appId,
-      entityType = "user",
-      required = Some(Seq("attr0", "attr1", "attr2"))
-    )(sc) { dm =>
-      User(
-        attr0 = dm.get[Double]("attr0"),
-        attr1 = dm.get[Int]("attr1"),
-        attr2 = dm.get[Int]("attr2")
-      )
-    }
-
-    val items: EntityMap[Item] = eventsDb.extractEntityMap[Item](
-      appId = dsp.appId,
-      entityType = "item",
-      required = Some(Seq("attrA", "attrB", "attrC"))
-    )(sc) { dm =>
-      Item(
-        attrA = dm.get[String]("attrA"),
-        attrB = dm.get[Int]("attrB"),
-        attrC = dm.get[Boolean]("attrC")
-      )
-    }
-
-    val eventsRDD: RDD[Event] = eventsDb.find(
-      appId = dsp.appId,
-      entityType = Some("user"),
-      eventNames = Some(List("rate", "buy")), // read "rate" and "buy" event
-      // targetEntityType is optional field of an event.
-      targetEntityType = Some(Some("item")))(sc)
-
-    val ratingsRDD: RDD[Rating] = eventsRDD.map { event =>
-      val rating = try {
-        val ratingValue: Double = event.event match {
-          case "rate" => event.properties.get[Double]("rating")
-          case "buy" => 4.0 // map buy event to rating value of 4
-          case _ => throw new Exception(s"Unexpected event ${event} is read.")
-        }
-        // entityId and targetEntityId is String
-        Rating(event.entityId,
-          event.targetEntityId.get,
-          ratingValue)
-      } catch {
-        case e: Exception => {
-          logger.error(s"Cannot convert ${event} to Rating. Exception: ${e}.")
-          throw e
-        }
-      }
-      rating
-    }
-    new TrainingData(users, items, ratingsRDD)
-  }
-}
-
-case class User(
-  attr0: Double,
-  attr1: Int,
-  attr2: Int
-)
-
-case class Item(
-  attrA: String,
-  attrB: Int,
-  attrC: Boolean
-)
-
-case class Rating(
-  user: String,
-  item: String,
-  rating: Double
-)
-
-class TrainingData(
-  val users: EntityMap[User],
-  val items: EntityMap[Item],
-  val ratings: RDD[Rating]
-) extends Serializable {
-  override def toString = {
-    s"users: [${users.size} (${users.take(2).toString}...)]" +
-    s"items: [${items.size} (${items.take(2).toString}...)]" +
-    s"ratings: [${ratings.count()}] (${ratings.take(2).toList}...)"
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-recommendation-entitymap/src/main/scala/Engine.scala
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-parallel-recommendation-entitymap/src/main/scala/Engine.scala b/examples/experimental/scala-parallel-recommendation-entitymap/src/main/scala/Engine.scala
deleted file mode 100644
index 1446ca4..0000000
--- a/examples/experimental/scala-parallel-recommendation-entitymap/src/main/scala/Engine.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.IEngineFactory
-import org.apache.predictionio.controller.Engine
-
-case class Query(
-  user: String,
-  num: Int
-)
-
-case class PredictedResult(
-  itemScores: Array[ItemScore]
-)
-
-case class ItemScore(
-  item: String,
-  score: Double
-)
-
-object RecommendationEngine extends IEngineFactory {
-  def apply() = {
-    new Engine(
-      classOf[DataSource],
-      classOf[Preparator],
-      Map("als" -> classOf[ALSAlgorithm]),
-      classOf[Serving])
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-recommendation-entitymap/src/main/scala/Preparator.scala
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-parallel-recommendation-entitymap/src/main/scala/Preparator.scala b/examples/experimental/scala-parallel-recommendation-entitymap/src/main/scala/Preparator.scala
deleted file mode 100644
index 968b526..0000000
--- a/examples/experimental/scala-parallel-recommendation-entitymap/src/main/scala/Preparator.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.PPreparator
-import org.apache.predictionio.data.storage.EntityMap
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-class Preparator
-  extends PPreparator[TrainingData, PreparedData] {
-
-  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
-    new PreparedData(
-      users = trainingData.users,
-      items = trainingData.items,
-      ratings = trainingData.ratings)
-  }
-}
-
-class PreparedData(
-  val users: EntityMap[User],
-  val items: EntityMap[Item],
-  val ratings: RDD[Rating]
-) extends Serializable

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-recommendation-entitymap/src/main/scala/Serving.scala
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-parallel-recommendation-entitymap/src/main/scala/Serving.scala b/examples/experimental/scala-parallel-recommendation-entitymap/src/main/scala/Serving.scala
deleted file mode 100644
index 02eb0ec..0000000
--- a/examples/experimental/scala-parallel-recommendation-entitymap/src/main/scala/Serving.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.LServing
-
-class Serving
-  extends LServing[Query, PredictedResult] {
-
-  override
-  def serve(query: Query,
-    predictedResults: Seq[PredictedResult]): PredictedResult = {
-    predictedResults.head
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-recommendation-mongo-datasource/.gitignore
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-parallel-recommendation-mongo-datasource/.gitignore b/examples/experimental/scala-parallel-recommendation-mongo-datasource/.gitignore
deleted file mode 100644
index ea4e89d..0000000
--- a/examples/experimental/scala-parallel-recommendation-mongo-datasource/.gitignore
+++ /dev/null
@@ -1,2 +0,0 @@
-data/sample_movielens_data.txt
-manifest.json

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-recommendation-mongo-datasource/build.sbt
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-parallel-recommendation-mongo-datasource/build.sbt b/examples/experimental/scala-parallel-recommendation-mongo-datasource/build.sbt
deleted file mode 100644
index c2b4242..0000000
--- a/examples/experimental/scala-parallel-recommendation-mongo-datasource/build.sbt
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import AssemblyKeys._
-
-assemblySettings
-
-name := "template-scala-parallel-recommendation"
-
-organization := "org.apache.predictionio"
-
-libraryDependencies ++= Seq(
-  "org.apache.predictionio" %% "core" % "0.9.1" % "provided",
-  "org.apache.spark" %% "spark-core" % "1.2.0" % "provided",
-  "org.apache.spark" %% "spark-mllib" % "1.2.0" % "provided")
-
-// ADDED FOR READING FROM MONGO IN DATASOURCE
-libraryDependencies ++= Seq(
-  "org.mongodb" % "mongo-hadoop-core" % "1.3.0"
-    exclude("org.apache.hadoop", "hadoop-yarn-api")
-    exclude("org.apache.hadoop", "hadoop-common"))

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-recommendation-mongo-datasource/data/insert_sample_ratings_mongo.js
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-parallel-recommendation-mongo-datasource/data/insert_sample_ratings_mongo.js b/examples/experimental/scala-parallel-recommendation-mongo-datasource/data/insert_sample_ratings_mongo.js
deleted file mode 100644
index 39edb3a..0000000
--- a/examples/experimental/scala-parallel-recommendation-mongo-datasource/data/insert_sample_ratings_mongo.js
+++ /dev/null
@@ -1,24 +0,0 @@
-// MongoDB script to insert sample random ratings data into MongoDB
-db = connect("localhost:27017/test");
-
-print("Remove old data in test.sample_ratings collection...")
-db.sample_ratings.remove();
-
-// min <= x < max
-function getRandomInt(min, max) {
-  return Math.floor(Math.random() * (max - min)) + min;
-}
-
-print("Insert random movie rating data into test.sample_ratings collection...")
-// for each user 0 to 10, randomly view 10 items between 0 to 49
-for (var uid = 0; uid < 10; uid++) {
-  for (var n = 0; n < 10; n++) {
-    db.sample_ratings.insert( {
-      "uid" : uid.toString(),
-      "iid" : getRandomInt(0, 50).toString(), // 0 <= iid < 50
-      "rating" : getRandomInt(1, 6) // 1 <= rating < 6
-    })
-  }
-}
-
-print("done.")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-recommendation-mongo-datasource/data/send_query.py
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-parallel-recommendation-mongo-datasource/data/send_query.py b/examples/experimental/scala-parallel-recommendation-mongo-datasource/data/send_query.py
deleted file mode 100644
index ca19dc5..0000000
--- a/examples/experimental/scala-parallel-recommendation-mongo-datasource/data/send_query.py
+++ /dev/null
@@ -1,24 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""
-Send sample query to prediction engine
-"""
-
-import predictionio
-engine_client = predictionio.EngineClient(url="http://localhost:8000")
-print engine_client.send_query({"user": "1", "num": 4})

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-recommendation-mongo-datasource/engine.json
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-parallel-recommendation-mongo-datasource/engine.json b/examples/experimental/scala-parallel-recommendation-mongo-datasource/engine.json
deleted file mode 100644
index a60eefa..0000000
--- a/examples/experimental/scala-parallel-recommendation-mongo-datasource/engine.json
+++ /dev/null
@@ -1,23 +0,0 @@
-{
-  "id": "default",
-  "description": "Default settings",
-  "engineFactory": "org.template.recommendation.RecommendationEngine",
-  "datasource": {
-    "params": {
-      "host": "127.0.0.1",
-      "port": 27017,
-      "db": "test",
-      "collection": "sample_ratings"
-    }
-  },
-  "algorithms": [
-    {
-      "name": "als",
-      "params": {
-        "rank": 10,
-        "numIterations": 20,
-        "lambda": 0.01
-      }
-    }
-  ]
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-recommendation-mongo-datasource/project/assembly.sbt
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-parallel-recommendation-mongo-datasource/project/assembly.sbt b/examples/experimental/scala-parallel-recommendation-mongo-datasource/project/assembly.sbt
deleted file mode 100644
index 54c3252..0000000
--- a/examples/experimental/scala-parallel-recommendation-mongo-datasource/project/assembly.sbt
+++ /dev/null
@@ -1 +0,0 @@
-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-recommendation-mongo-datasource/src/main/scala/ALSAlgorithm.scala
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-parallel-recommendation-mongo-datasource/src/main/scala/ALSAlgorithm.scala b/examples/experimental/scala-parallel-recommendation-mongo-datasource/src/main/scala/ALSAlgorithm.scala
deleted file mode 100644
index fd93407..0000000
--- a/examples/experimental/scala-parallel-recommendation-mongo-datasource/src/main/scala/ALSAlgorithm.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.PAlgorithm
-import org.apache.predictionio.controller.Params
-import org.apache.predictionio.data.storage.BiMap
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.mllib.recommendation.ALS
-import org.apache.spark.mllib.recommendation.{Rating => MLlibRating}
-import org.apache.spark.mllib.recommendation.ALSModel
-
-import grizzled.slf4j.Logger
-
-case class ALSAlgorithmParams(
-  rank: Int,
-  numIterations: Int,
-  lambda: Double) extends Params
-
-class ALSAlgorithm(val ap: ALSAlgorithmParams)
-  extends PAlgorithm[PreparedData, ALSModel, Query, PredictedResult] {
-
-  @transient lazy val logger = Logger[this.type]
-
-  def train(data: PreparedData): ALSModel = {
-    // Convert user and item String IDs to Int index for MLlib
-    val userStringIntMap = BiMap.stringInt(data.ratings.map(_.user))
-    val itemStringIntMap = BiMap.stringInt(data.ratings.map(_.item))
-    val mllibRatings = data.ratings.map( r =>
-      // MLlibRating requires integer index for user and item
-      MLlibRating(userStringIntMap(r.user), itemStringIntMap(r.item), r.rating)
-    )
-    val m = ALS.train(mllibRatings, ap.rank, ap.numIterations, ap.lambda)
-    new ALSModel(
-      rank = m.rank,
-      userFeatures = m.userFeatures,
-      productFeatures = m.productFeatures,
-      userStringIntMap = userStringIntMap,
-      itemStringIntMap = itemStringIntMap)
-  }
-
-  def predict(model: ALSModel, query: Query): PredictedResult = {
-    // Convert String ID to Int index for Mllib
-    model.userStringIntMap.get(query.user).map { userInt =>
-      // create inverse view of itemStringIntMap
-      val itemIntStringMap = model.itemStringIntMap.inverse
-      // recommendProducts() returns Array[MLlibRating], which uses item Int
-      // index. Convert it to String ID for returning PredictedResult
-      val itemScores = model.recommendProducts(userInt, query.num)
-        .map (r => ItemScore(itemIntStringMap(r.product), r.rating))
-      new PredictedResult(itemScores)
-    }.getOrElse{
-      logger.info(s"No prediction for unknown user ${query.user}.")
-      new PredictedResult(Array.empty)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-recommendation-mongo-datasource/src/main/scala/ALSModel.scala
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-parallel-recommendation-mongo-datasource/src/main/scala/ALSModel.scala b/examples/experimental/scala-parallel-recommendation-mongo-datasource/src/main/scala/ALSModel.scala
deleted file mode 100644
index 4697732..0000000
--- a/examples/experimental/scala-parallel-recommendation-mongo-datasource/src/main/scala/ALSModel.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.mllib.recommendation
-// This must be the same package as Spark's MatrixFactorizationModel because
-// MatrixFactorizationModel's constructor is private and we are using
-// its constructor in order to save and load the model
-
-import org.template.recommendation.ALSAlgorithmParams
-
-import org.apache.predictionio.controller.IPersistentModel
-import org.apache.predictionio.controller.IPersistentModelLoader
-import org.apache.predictionio.data.storage.BiMap
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-class ALSModel(
-    override val rank: Int,
-    override val userFeatures: RDD[(Int, Array[Double])],
-    override val productFeatures: RDD[(Int, Array[Double])],
-    val userStringIntMap: BiMap[String, Int],
-    val itemStringIntMap: BiMap[String, Int])
-  extends MatrixFactorizationModel(rank, userFeatures, productFeatures)
-  with IPersistentModel[ALSAlgorithmParams] {
-
-  def save(id: String, params: ALSAlgorithmParams,
-    sc: SparkContext): Boolean = {
-
-    sc.parallelize(Seq(rank)).saveAsObjectFile(s"/tmp/${id}/rank")
-    userFeatures.saveAsObjectFile(s"/tmp/${id}/userFeatures")
-    productFeatures.saveAsObjectFile(s"/tmp/${id}/productFeatures")
-    sc.parallelize(Seq(userStringIntMap))
-      .saveAsObjectFile(s"/tmp/${id}/userStringIntMap")
-    sc.parallelize(Seq(itemStringIntMap))
-      .saveAsObjectFile(s"/tmp/${id}/itemStringIntMap")
-    true
-  }
-
-  override def toString = {
-    s"userFeatures: [${userFeatures.count()}]" +
-    s"(${userFeatures.take(2).toList}...)" +
-    s" productFeatures: [${productFeatures.count()}]" +
-    s"(${productFeatures.take(2).toList}...)" +
-    s" userStringIntMap: [${userStringIntMap.size}]" +
-    s"(${userStringIntMap.take(2)}...)" +
-    s" itemStringIntMap: [${itemStringIntMap.size}]" +
-    s"(${itemStringIntMap.take(2)}...)"
-  }
-}

-object ALSModel
-  extends IPersistentModelLoader[ALSAlgorithmParams, ALSModel] {
-  def apply(id: String, params: ALSAlgorithmParams,
-    sc: Option[SparkContext]) = {
-    new ALSModel(
-      rank = sc.get.objectFile[Int](s"/tmp/${id}/rank").first,
-      userFeatures = sc.get.objectFile(s"/tmp/${id}/userFeatures"),
-      productFeatures = sc.get.objectFile(s"/tmp/${id}/productFeatures"),
-      userStringIntMap = sc.get
-        .objectFile[BiMap[String, Int]](s"/tmp/${id}/userStringIntMap").first,
-      itemStringIntMap = sc.get
-        .objectFile[BiMap[String, Int]](s"/tmp/${id}/itemStringIntMap").first)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-recommendation-mongo-datasource/src/main/scala/DataSource.scala
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-parallel-recommendation-mongo-datasource/src/main/scala/DataSource.scala b/examples/experimental/scala-parallel-recommendation-mongo-datasource/src/main/scala/DataSource.scala
deleted file mode 100644
index a891cc5..0000000
--- a/examples/experimental/scala-parallel-recommendation-mongo-datasource/src/main/scala/DataSource.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.PDataSource
-import org.apache.predictionio.controller.EmptyEvaluationInfo
-import org.apache.predictionio.controller.EmptyActualResult
-import org.apache.predictionio.controller.Params
-import org.apache.predictionio.data.storage.Event
-import org.apache.predictionio.data.storage.Storage
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-import grizzled.slf4j.Logger
-
-import org.apache.hadoop.conf.Configuration // ADDED
-import org.bson.BSONObject // ADDED
-import com.mongodb.hadoop.MongoInputFormat // ADDED
-
-case class DataSourceParams( // CHANGED
-  host: String,
-  port: Int,
-  db: String, // DB name
-  collection: String // collection name
-) extends Params
-
-class DataSource(val dsp: DataSourceParams)
-  extends PDataSource[TrainingData,
-      EmptyEvaluationInfo, Query, EmptyActualResult] {
-
-  @transient lazy val logger = Logger[this.type]
-
-  override
-  def readTraining(sc: SparkContext): TrainingData = {
-    // CHANGED
-    val config = new Configuration()
-    config.set("mongo.input.uri",
-      s"mongodb://${dsp.host}:${dsp.port}/${dsp.db}.${dsp.collection}")
-
-    val mongoRDD = sc.newAPIHadoopRDD(config,
-      classOf[MongoInputFormat],
-      classOf[Object],
-      classOf[BSONObject])
-
-    // mongoRDD contains tuples of (ObjectId, BSONObject)
-    val ratings = mongoRDD.map { case (id, bson) =>
-      Rating(bson.get("uid").asInstanceOf[String],
-        bson.get("iid").asInstanceOf[String],
-        bson.get("rating").asInstanceOf[Double])
-    }
-    new TrainingData(ratings)
-  }
-}
-
-case class Rating(
-  user: String,
-  item: String,
-  rating: Double
-)
-
-class TrainingData(
-  val ratings: RDD[Rating]
-) extends Serializable {
-  override def toString = {
-    s"ratings: [${ratings.count()}] (${ratings.take(2).toList}...)"
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-recommendation-mongo-datasource/src/main/scala/Engine.scala
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-parallel-recommendation-mongo-datasource/src/main/scala/Engine.scala b/examples/experimental/scala-parallel-recommendation-mongo-datasource/src/main/scala/Engine.scala
deleted file mode 100644
index 1446ca4..0000000
--- a/examples/experimental/scala-parallel-recommendation-mongo-datasource/src/main/scala/Engine.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.IEngineFactory
-import org.apache.predictionio.controller.Engine
-
-case class Query(
-  user: String,
-  num: Int
-)
-
-case class PredictedResult(
-  itemScores: Array[ItemScore]
-)
-
-case class ItemScore(
-  item: String,
-  score: Double
-)
-
-object RecommendationEngine extends IEngineFactory {
-  def apply() = {
-    new Engine(
-      classOf[DataSource],
-      classOf[Preparator],
-      Map("als" -> classOf[ALSAlgorithm]),
-      classOf[Serving])
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-recommendation-mongo-datasource/src/main/scala/Preparator.scala
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-parallel-recommendation-mongo-datasource/src/main/scala/Preparator.scala b/examples/experimental/scala-parallel-recommendation-mongo-datasource/src/main/scala/Preparator.scala
deleted file mode 100644
index 0bab35b..0000000
--- a/examples/experimental/scala-parallel-recommendation-mongo-datasource/src/main/scala/Preparator.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.PPreparator
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-
-class Preparator
-  extends PPreparator[TrainingData, PreparedData] {
-
-  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
-    new PreparedData(ratings = trainingData.ratings)
-  }
-}
-
-class PreparedData(
-  val ratings: RDD[Rating]
-) extends Serializable

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-recommendation-mongo-datasource/src/main/scala/Serving.scala
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-parallel-recommendation-mongo-datasource/src/main/scala/Serving.scala b/examples/experimental/scala-parallel-recommendation-mongo-datasource/src/main/scala/Serving.scala
deleted file mode 100644
index 48fda35..0000000
--- a/examples/experimental/scala-parallel-recommendation-mongo-datasource/src/main/scala/Serving.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.template.recommendation
-
-import org.apache.predictionio.controller.LServing
-
-class Serving
-  extends LServing[Query, PredictedResult] {
-
-  override def serve(query: Query,
-    predictedResults: Seq[PredictedResult]): PredictedResult = {
-    predictedResults.head
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-regression/README.md
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-parallel-regression/README.md b/examples/experimental/scala-parallel-regression/README.md
deleted file mode 100644
index cfb8c60..0000000
--- a/examples/experimental/scala-parallel-regression/README.md
+++ /dev/null
@@ -1,56 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
-   http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
--->
-
-# Parallel Regression Engine
-
-## Configuration
-
-This sample regression engine reads data from the file system.
-
-Edit the file path in `engine.json`: change `filepath` of `datasource` to an absolute path that points to
-[lr_data.txt](../data/lr_data.txt)
-
-```
-$ cat engine.json
-...
-"datasource": {
-  "filepath": <absolute_path_to_lr_data.txt>,
-  "k": 3,
-  "seed": 9527
-}
-...
-
-```
-
-## Register engine, train, and deploy.
-
-```
-$ pio build
-$ pio train
-$ pio deploy --port 9998
-```
-
-## Query the Engine Instance
-
-```
-$ curl -X POST http://localhost:9998/queries.json -d \
-  '[1.80,0.87,2.41,0.35,-0.21,1.35,0.51,1.55,-0.20,1.32]'
-
-0.8912731719174509
-```
-
-0.89... is the prediction result.

http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-regression/Run.scala
----------------------------------------------------------------------
diff --git a/examples/experimental/scala-parallel-regression/Run.scala b/examples/experimental/scala-parallel-regression/Run.scala
deleted file mode 100644
index 232e61f..0000000
--- a/examples/experimental/scala-parallel-regression/Run.scala
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.predictionio.examples.regression.parallel
-
-import org.apache.predictionio.controller.Engine
-import org.apache.predictionio.controller.Params
-import org.apache.predictionio.controller.PDataSource
-import org.apache.predictionio.controller.P2LAlgorithm
-import org.apache.predictionio.controller.IdentityPreparator
-import org.apache.predictionio.controller.IEngineFactory
-import org.apache.predictionio.controller.LAverageServing
-import org.apache.predictionio.controller.MeanSquareError
-import org.apache.predictionio.controller.Utils
-import org.apache.predictionio.controller.Workflow
-import org.apache.predictionio.controller.WorkflowParams
-
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.spark.mllib.linalg.DenseVector
-import org.apache.spark.mllib.linalg.Vector
-import org.apache.spark.mllib.linalg.Vectors
-import org.apache.spark.mllib.regression.LabeledPoint
-import org.apache.spark.mllib.regression.LinearRegressionWithSGD
-import org.apache.spark.mllib.regression.RegressionModel
-import org.apache.spark.mllib.util.MLUtils
-import org.apache.spark.rdd.RDD
-import org.json4s._
-import java.io.File
-
-case class DataSourceParams(
-  val filepath: String, val k: Int = 3, val seed: Int = 9527)
-extends Params
-
-case class ParallelDataSource(val dsp: DataSourceParams)
-  extends PDataSource[
-      DataSourceParams, Integer,
-      RDD[LabeledPoint], Vector, Double] {
-  override
-  def read(sc: SparkContext)
-  : Seq[(Integer, RDD[LabeledPoint], RDD[(Vector, Double)])] = {
-    val input = sc.textFile(dsp.filepath)
-    val points = input.map { line =>
-      val parts = line.split(' ').map(_.toDouble)
-      LabeledPoint(parts(0), Vectors.dense(parts.drop(1)))
-    }
-
-    MLUtils.kFold(points, dsp.k, dsp.seed)
-      .zipWithIndex
-      .map { case (dataSet, index) =>
-        (Int.box(index), dataSet._1, dataSet._2.map(p => (p.features, p.label)))
-      }
-  }
-}
-
-case class AlgorithmParams(
-  val numIterations: Int = 200, val stepSize: Double = 0.1) extends Params
-
-case class ParallelSGDAlgorithm(val ap: AlgorithmParams)
-  extends P2LAlgorithm[
-      AlgorithmParams, RDD[LabeledPoint], RegressionModel, Vector, Double] {
-
-  def train(data: RDD[LabeledPoint]): RegressionModel = {
-    LinearRegressionWithSGD.train(data, ap.numIterations, ap.stepSize)
-  }
-
-  def predict(model: RegressionModel, feature: Vector): Double = {
-    model.predict(feature)
-  }
-
-  @transient override lazy val querySerializer =
-    Utils.json4sDefaultFormats + new VectorSerializer
-}
-
-object RegressionEngineFactory extends IEngineFactory {
-  def apply() = {
-    new Engine(
-      classOf[ParallelDataSource],
-      classOf[IdentityPreparator[RDD[LabeledPoint]]],
-      Map("SGD" -> classOf[ParallelSGDAlgorithm]),
-      LAverageServing(classOf[ParallelSGDAlgorithm]))
-  }
-}
-
-object Run {
-  def main(args: Array[String]) {
-    val filepath = new File("../data/lr_data.txt").getCanonicalPath
-    val dataSourceParams = DataSourceParams(filepath, 3)
-    val SGD = "SGD"
-    val algorithmParamsList = Seq(
-      (SGD, AlgorithmParams(stepSize = 0.1)),
-      (SGD, AlgorithmParams(stepSize = 0.2)),
-      (SGD, AlgorithmParams(stepSize = 0.4)))
-
-    Workflow.run(
-      dataSourceClassOpt = Some(classOf[ParallelDataSource]),
-      dataSourceParams = dataSourceParams,
-      preparatorClassOpt =
-        Some(classOf[IdentityPreparator[RDD[LabeledPoint]]]),
-      algorithmClassMapOpt = Some(Map(SGD -> classOf[ParallelSGDAlgorithm])),
-      algorithmParamsList = algorithmParamsList,
-      servingClassOpt = Some(LAverageServing(classOf[ParallelSGDAlgorithm])),
-      evaluatorClassOpt = Some(classOf[MeanSquareError]),
= Some(classOf[MeanSquareError]), - params = WorkflowParams( - batch = "Imagine: Parallel Regression")) - } -} - -class VectorSerializer extends CustomSerializer[Vector](format => ( - { - case JArray(x) => - val v = x.toArray.map { y => - y match { - case JDouble(z) => z - } - } - new DenseVector(v) - }, - { - case x: Vector => - JArray(x.toArray.toList.map(d => JDouble(d))) - } -)) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-regression/build.sbt ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-parallel-regression/build.sbt b/examples/experimental/scala-parallel-regression/build.sbt deleted file mode 100644 index e053ba8..0000000 --- a/examples/experimental/scala-parallel-regression/build.sbt +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import AssemblyKeys._ - -assemblySettings - -name := "scala-parallel-regression" - -organization := "myorg" - -version := "0.0.1-SNAPSHOT" - -libraryDependencies ++= Seq( - "org.apache.predictionio" %% "core" % "0.9.1" % "provided", - "org.apache.spark" %% "spark-core" % "1.2.0" % "provided", - "org.apache.spark" %% "spark-mllib" % "1.2.0" - exclude("org.apache.spark", "spark-core_2.10") - exclude("org.eclipse.jetty", "jetty-server")) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-regression/engine.json ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-parallel-regression/engine.json b/examples/experimental/scala-parallel-regression/engine.json deleted file mode 100644 index e078fad..0000000 --- a/examples/experimental/scala-parallel-regression/engine.json +++ /dev/null @@ -1,21 +0,0 @@ -{ - "id": "scala-parallel-regression", - "description": "scala-parallel-regression example", - "engineFactory": "org.apache.predictionio.examples.regression.parallel.RegressionEngineFactory", - "datasource": { - "params": { - "filepath": "../data/lr_data.txt", - "k": 3, - "seed": 9527 - } - }, - "algorithms": [ - { - "name": "SGD", - "params": { - "numIterations": 800, - "stepSize": 0.1 - } - } - ] -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-regression/project/assembly.sbt ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-parallel-regression/project/assembly.sbt b/examples/experimental/scala-parallel-regression/project/assembly.sbt deleted file mode 100644 index 54c3252..0000000 --- a/examples/experimental/scala-parallel-regression/project/assembly.sbt +++ /dev/null @@ -1 +0,0 @@ 
-addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2") http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-similarproduct-dimsum/.gitignore ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-parallel-similarproduct-dimsum/.gitignore b/examples/experimental/scala-parallel-similarproduct-dimsum/.gitignore deleted file mode 100644 index ea4e89d..0000000 --- a/examples/experimental/scala-parallel-similarproduct-dimsum/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -data/sample_movielens_data.txt -manifest.json http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-similarproduct-dimsum/build.sbt ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-parallel-similarproduct-dimsum/build.sbt b/examples/experimental/scala-parallel-similarproduct-dimsum/build.sbt deleted file mode 100644 index 5ef8f87..0000000 --- a/examples/experimental/scala-parallel-similarproduct-dimsum/build.sbt +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -import AssemblyKeys._ - -assemblySettings - -name := "template-scala-parallel-similarproduct-dimsum" - -organization := "org.apache.predictionio" - -libraryDependencies ++= Seq( - "org.apache.predictionio" %% "core" % "0.9.1" % "provided", - "org.apache.spark" %% "spark-core" % "1.2.0" % "provided", - "org.apache.spark" %% "spark-mllib" % "1.2.0" % "provided") http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-similarproduct-dimsum/engine.json ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-parallel-similarproduct-dimsum/engine.json b/examples/experimental/scala-parallel-similarproduct-dimsum/engine.json deleted file mode 100644 index 312501f..0000000 --- a/examples/experimental/scala-parallel-similarproduct-dimsum/engine.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - "id": "default", - "description": "Default settings", - "engineFactory": "org.template.similarproduct.SimilarProductEngine", - "datasource": { - "params" : { - "appId": 9 - } - }, - "algorithms": [ - { - "name": "dimsum", - "params": { - "threshold": 1 - } - } - ] -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-similarproduct-dimsum/project/assembly.sbt ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-parallel-similarproduct-dimsum/project/assembly.sbt b/examples/experimental/scala-parallel-similarproduct-dimsum/project/assembly.sbt deleted file mode 100644 index 54c3252..0000000 --- a/examples/experimental/scala-parallel-similarproduct-dimsum/project/assembly.sbt +++ /dev/null @@ -1 +0,0 @@ -addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2") http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-similarproduct-dimsum/src/main/scala/DIMSUMAlgorithm.scala ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-parallel-similarproduct-dimsum/src/main/scala/DIMSUMAlgorithm.scala b/examples/experimental/scala-parallel-similarproduct-dimsum/src/main/scala/DIMSUMAlgorithm.scala deleted file mode 100644 index 73e15b8..0000000 --- a/examples/experimental/scala-parallel-similarproduct-dimsum/src/main/scala/DIMSUMAlgorithm.scala +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.template.similarproduct - -import org.apache.predictionio.controller.PAlgorithm -import org.apache.predictionio.controller.Params -import org.apache.predictionio.controller.IPersistentModel -import org.apache.predictionio.controller.IPersistentModelLoader -import org.apache.predictionio.data.storage.BiMap - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD -import org.apache.spark.mllib.linalg.Vectors -import org.apache.spark.mllib.linalg.Vector -import org.apache.spark.mllib.linalg.SparseVector -import org.apache.spark.mllib.linalg.distributed.MatrixEntry -import org.apache.spark.mllib.linalg.distributed.RowMatrix -import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix - -import grizzled.slf4j.Logger - -import scala.collection.mutable.PriorityQueue - -case class DIMSUMAlgorithmParams(threshold: Double) extends Params - -class DIMSUMModel( - val similarities: RDD[(Int, SparseVector)], - val itemStringIntMap: BiMap[String, Int], - val items: Map[Int, Item] - ) extends IPersistentModel[DIMSUMAlgorithmParams] { - - @transient lazy val itemIntStringMap = itemStringIntMap.inverse - - def save(id: String, params: DIMSUMAlgorithmParams, - sc: SparkContext): Boolean = { - - similarities.saveAsObjectFile(s"/tmp/${id}/similarities") - sc.parallelize(Seq(itemStringIntMap)) - .saveAsObjectFile(s"/tmp/${id}/itemStringIntMap") - sc.parallelize(Seq(items)) - .saveAsObjectFile(s"/tmp/${id}/items") - true - } - - override def toString = { - s"similarities: [${similarities.count()}]" + - s"(${similarities.take(2).toList}...)" + - s" itemStringIntMap: [${itemStringIntMap.size}]" + - s"(${itemStringIntMap.take(2).toString}...)]" + - s" items: [${items.size}]" + - s"(${items.take(2).toString}...)]" - } -} - -object DIMSUMModel - extends IPersistentModelLoader[DIMSUMAlgorithmParams, DIMSUMModel] { - def apply(id: String, params: DIMSUMAlgorithmParams, - sc: Option[SparkContext]) = { - new DIMSUMModel( - similarities = sc.get.objectFile(s"/tmp/${id}/similarities"), - itemStringIntMap = sc.get - .objectFile[BiMap[String, Int]](s"/tmp/${id}/itemStringIntMap").first, - items = sc.get - .objectFile[Map[Int, Item]](s"/tmp/${id}/items").first) - } -} - -class DIMSUMAlgorithm(val ap: DIMSUMAlgorithmParams) - extends PAlgorithm[PreparedData, DIMSUMModel, Query, PredictedResult] { - - @transient lazy val logger = Logger[this.type] - - def train(data: PreparedData): DIMSUMModel = { - - // create User and item's String ID to integer index BiMap - val userStringIntMap = BiMap.stringInt(data.users.keys) - val itemStringIntMap = BiMap.stringInt(data.items.keys) - - // collect Item as Map and convert ID to Int index - val items: Map[Int, Item] = data.items.map { case (id, item) => - (itemStringIntMap(id), item) - }.collectAsMap.toMap - val itemCount = items.size - - // each row is a sparse vector of rated items by this user - val rows: RDD[Vector] = data.viewEvents - .map { r => - // Convert user and item String IDs to Int index for MLlib - val uindex = userStringIntMap.getOrElse(r.user, -1) - val iindex = itemStringIntMap.getOrElse(r.item, -1) - - if (uindex == -1) - logger.info(s"Couldn't convert nonexistent user ID ${r.user}" - + " to Int index.") - - if (iindex == -1) - logger.info(s"Couldn't convert nonexistent item ID ${r.item}" - + " to Int index.") - - (uindex, (iindex, 1.0)) - }.filter { case (uindex, (iindex, v)) => - // keep events with valid user and item index - (uindex != -1) && (iindex != -1) - }.groupByKey().map { case 
(u, ir) => - // de-duplicate if user has multiple events on same item - val irDedup: Map[Int, Double] = ir.groupBy(_._1) // group By item index - .map { case (i, irGroup) => - // same item index group of (item index, rating value) tuple - val r = irGroup.reduce { (a, b) => - // Simply keep one copy. - a - // You may modify here to reduce same item tuple differently, - // such as summing all values: - //(a._1, (a._2 + b._2)) - } - (i, r._2) - } - - // NOTE: index array must be strictly increasing for Sparse Vector - val irSorted = irDedup.toArray.sortBy(_._1) - val indexes = irSorted.map(_._1) - val values = irSorted.map(_._2) - Vectors.sparse(itemCount, indexes, values) - } - - val mat = new RowMatrix(rows) - val scores = mat.columnSimilarities(ap.threshold) - val reversedEntries: RDD[MatrixEntry] = scores.entries - .map(e => new MatrixEntry(e.j, e.i, e.value)) - val combined = new CoordinateMatrix(scores.entries.union(reversedEntries)) - val similarities = combined.toIndexedRowMatrix.rows - .map( row => (row.index.toInt, row.vector.asInstanceOf[SparseVector])) - - new DIMSUMModel( - similarities = similarities, - itemStringIntMap = itemStringIntMap, - items = items - ) - } - - def predict(model: DIMSUMModel, query: Query): PredictedResult = { - // convert the white and black list items to Int index - val whiteList: Option[Set[Int]] = query.whiteList.map( set => - set.map(model.itemStringIntMap.get(_)).flatten - ) - val blackList: Option[Set[Int]] = query.blackList.map ( set => - set.map(model.itemStringIntMap.get(_)).flatten - ) - - val queryList: Set[Int] = query.items.map(model.itemStringIntMap.get(_)) - .flatten.toSet - - val indexScores = query.items.flatMap { iid => - model.itemStringIntMap.get(iid).map { itemInt => - val simsSeq = model.similarities.lookup(itemInt) - if (simsSeq.isEmpty) { - logger.info(s"No similar items found for ${iid}.") - Array.empty[(Int, Double)] - } else { - val sims = simsSeq.head - sims.indices.zip(sims.values).filter { case (i, v) => - whiteList.map(_.contains(i)).getOrElse(true) && - blackList.map(!_.contains(i)).getOrElse(true) && - // discard items in query as well - (!queryList.contains(i)) && - // filter categories - query.categories.map { cat => - model.items(i).categories.map { itemCat => - // keep this item if has ovelap categories with the query - !(itemCat.toSet.intersect(cat).isEmpty) - }.getOrElse(false) // discard this item if it has no categories - }.getOrElse(true) - } - } - }.getOrElse { - logger.info(s"No similar items for unknown item ${iid}.") - Array.empty[(Int, Double)] - } - } - - val aggregatedScores = indexScores.groupBy(_._1) - .mapValues(_.foldLeft[Double](0)( (b,a) => b + a._2)) - .toList - - val ord = Ordering.by[(Int, Double), Double](_._2).reverse - val itemScores = getTopN(aggregatedScores, query.num)(ord) - .map{ case (i, s) => - new ItemScore( - item = model.itemIntStringMap(i), - score = s - ) - }.toArray - - new PredictedResult(itemScores) - } - - private - def getTopN[T](s: Seq[T], n: Int)(implicit ord: Ordering[T]): Seq[T] = { - - val q = PriorityQueue() - - for (x <- s) { - if (q.size < n) - q.enqueue(x) - else { - // q is full - if (ord.compare(x, q.head) < 0) { - q.dequeue() - q.enqueue(x) - } - } - } - - q.dequeueAll.toSeq.reverse - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-similarproduct-dimsum/src/main/scala/DataSource.scala ---------------------------------------------------------------------- diff --git 
a/examples/experimental/scala-parallel-similarproduct-dimsum/src/main/scala/DataSource.scala b/examples/experimental/scala-parallel-similarproduct-dimsum/src/main/scala/DataSource.scala deleted file mode 100644 index 23d1d2b..0000000 --- a/examples/experimental/scala-parallel-similarproduct-dimsum/src/main/scala/DataSource.scala +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.template.similarproduct - -import org.apache.predictionio.controller.PDataSource -import org.apache.predictionio.controller.EmptyEvaluationInfo -import org.apache.predictionio.controller.EmptyActualResult -import org.apache.predictionio.controller.Params -import org.apache.predictionio.data.storage.Event -import org.apache.predictionio.data.storage.Storage - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD - -import grizzled.slf4j.Logger - -case class DataSourceParams(appId: Int) extends Params - -class DataSource(val dsp: DataSourceParams) - extends PDataSource[TrainingData, - EmptyEvaluationInfo, Query, EmptyActualResult] { - - @transient lazy val logger = Logger[this.type] - - override - def readTraining(sc: SparkContext): TrainingData = { - val eventsDb = Storage.getPEvents() - - // create a RDD of (entityID, User) - val usersRDD: RDD[(String, User)] = eventsDb.aggregateProperties( - appId = dsp.appId, - entityType = "user" - )(sc).map { case (entityId, properties) => - val user = try { - User() - } catch { - case e: Exception => { - logger.error(s"Failed to get properties ${properties} of" + - s" user ${entityId}. Exception: ${e}.") - throw e - } - } - (entityId, user) - } - - // create a RDD of (entityID, Item) - val itemsRDD: RDD[(String, Item)] = eventsDb.aggregateProperties( - appId = dsp.appId, - entityType = "item" - )(sc).map { case (entityId, properties) => - val item = try { - // Assume categories is optional property of item. - Item(categories = properties.getOpt[List[String]]("categories")) - } catch { - case e: Exception => { - logger.error(s"Failed to get properties ${properties} of" + - s" item ${entityId}. Exception: ${e}.") - throw e - } - } - (entityId, item) - } - - // get all "user" "view" "item" events - val eventsRDD: RDD[Event] = eventsDb.find( - appId = dsp.appId, - entityType = Some("user"), - eventNames = Some(List("view")), - // targetEntityType is optional field of an event. 
- targetEntityType = Some(Some("item")))(sc) - - val viewEventsRDD: RDD[ViewEvent] = eventsRDD.map { event => - val viewEvent = try { - event.event match { - case "view" => ViewEvent( - user = event.entityId, - item = event.targetEntityId.get, - t = event.eventTime.getMillis) - case _ => throw new Exception(s"Unexpected event ${event} is read.") - } - } catch { - case e: Exception => { - logger.error(s"Cannot convert ${event} to U2IEvent. Exception: ${e}.") - throw e - } - } - viewEvent - } - - new TrainingData( - users = usersRDD, - items = itemsRDD, - viewEvents = viewEventsRDD - ) - } -} - -case class User() - -case class Item(categories: Option[List[String]]) - -case class ViewEvent( - user: String, - item: String, - t: Long -) - -class TrainingData( - val users: RDD[(String, User)], - val items: RDD[(String, Item)], - val viewEvents: RDD[ViewEvent] -) extends Serializable { - override def toString = { - s"users: [${users.count()} (${users.take(2).toList}...)]" + - s"items: [${items.count()} (${items.take(2).toList}...)]" + - s"ratings: [${viewEvents.count()}] (${viewEvents.take(2).toList}...)" - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-similarproduct-dimsum/src/main/scala/Engine.scala ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-parallel-similarproduct-dimsum/src/main/scala/Engine.scala b/examples/experimental/scala-parallel-similarproduct-dimsum/src/main/scala/Engine.scala deleted file mode 100644 index ba929ad..0000000 --- a/examples/experimental/scala-parallel-similarproduct-dimsum/src/main/scala/Engine.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.template.similarproduct - -import org.apache.predictionio.controller.IEngineFactory -import org.apache.predictionio.controller.Engine - -case class Query( - items: List[String], - num: Int, - categories: Option[Set[String]], - whiteList: Option[Set[String]], - blackList: Option[Set[String]] -) - -case class PredictedResult( - itemScores: Array[ItemScore] -) - -case class ItemScore( - item: String, - score: Double -) - -object SimilarProductEngine extends IEngineFactory { - def apply() = { - new Engine( - classOf[DataSource], - classOf[Preparator], - Map("dimsum" -> classOf[DIMSUMAlgorithm]), - classOf[Serving]) - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-similarproduct-dimsum/src/main/scala/Preparator.scala ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-parallel-similarproduct-dimsum/src/main/scala/Preparator.scala b/examples/experimental/scala-parallel-similarproduct-dimsum/src/main/scala/Preparator.scala deleted file mode 100644 index e3394b0..0000000 --- a/examples/experimental/scala-parallel-similarproduct-dimsum/src/main/scala/Preparator.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.template.similarproduct - -import org.apache.predictionio.controller.PPreparator - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD - -class Preparator - extends PPreparator[TrainingData, PreparedData] { - - def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = { - new PreparedData( - users = trainingData.users, - items = trainingData.items, - viewEvents = trainingData.viewEvents) - } -} - -class PreparedData( - val users: RDD[(String, User)], - val items: RDD[(String, Item)], - val viewEvents: RDD[ViewEvent] -) extends Serializable http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-similarproduct-dimsum/src/main/scala/Serving.scala ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-parallel-similarproduct-dimsum/src/main/scala/Serving.scala b/examples/experimental/scala-parallel-similarproduct-dimsum/src/main/scala/Serving.scala deleted file mode 100644 index 3e115d5..0000000 --- a/examples/experimental/scala-parallel-similarproduct-dimsum/src/main/scala/Serving.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.template.similarproduct - -import org.apache.predictionio.controller.LServing - -class Serving - extends LServing[Query, PredictedResult] { - - override def serve(query: Query, - predictedResults: Seq[PredictedResult]): PredictedResult = { - predictedResults.head - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-similarproduct-localmodel/.gitignore ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-parallel-similarproduct-localmodel/.gitignore b/examples/experimental/scala-parallel-similarproduct-localmodel/.gitignore deleted file mode 100644 index ea4e89d..0000000 --- a/examples/experimental/scala-parallel-similarproduct-localmodel/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -data/sample_movielens_data.txt -manifest.json
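The deleted DIMSUMAlgorithm above boils down to one MLlib call: RowMatrix.columnSimilarities(threshold), which runs DIMSUM sampling over per-user sparse item vectors and returns the upper triangle of the item-item cosine-similarity matrix. A minimal, self-contained sketch of that step on toy data follows; the local SparkContext, the three 4-item vectors, and the 0.1 threshold are illustrative assumptions only (the deleted engine.json configures threshold = 1 and PredictionIO supplies the SparkContext).

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.{SparseVector, Vectors}
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry, RowMatrix}

object DimsumSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative local context; not part of the deleted engine.
    val sc = new SparkContext(new SparkConf().setAppName("dimsum-sketch").setMaster("local[2]"))

    // One sparse vector per user over 4 items; 1.0 marks a viewed item,
    // mirroring the rows built in the deleted DIMSUMAlgorithm.train.
    val rows = sc.parallelize(Seq(
      Vectors.sparse(4, Array(0, 1), Array(1.0, 1.0)),
      Vectors.sparse(4, Array(1, 2), Array(1.0, 1.0)),
      Vectors.sparse(4, Array(0, 2, 3), Array(1.0, 1.0, 1.0))))

    // columnSimilarities(threshold) returns only the upper-triangular entries
    // of the item-item cosine-similarity matrix.
    val upper = new RowMatrix(rows).columnSimilarities(0.1).entries

    // Mirror the entries so every item gets a full similarity row, then collapse
    // to (item index, sparse similarity vector), the shape the deleted model stores.
    val full = new CoordinateMatrix(upper.union(upper.map(e => MatrixEntry(e.j, e.i, e.value))))
    val similarities = full.toIndexedRowMatrix.rows
      .map(row => (row.index.toInt, row.vector.asInstanceOf[SparseVector]))

    similarities.collect().foreach { case (item, vec) => println(s"item $item -> $vec") }
    sc.stop()
  }
}

At serving time the deleted predict method simply looks up each query item's row in this similarities RDD, applies the whiteList/blackList and category filters, and aggregates the neighbours' scores into the top-N result.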

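Another easily missed piece of the deleted code is the VectorSerializer at the end of the regression Run.scala above: the regression query is an MLlib Vector, which json4s cannot extract out of the box, so the algorithm overrides querySerializer with a CustomSerializer mapping a Vector to a plain JSON array of doubles. Below is a standalone sketch of the same idea, not the PredictionIO wiring itself; it assumes json4s-native is on the classpath and uses DefaultFormats where the deleted code uses Utils.json4sDefaultFormats.

import org.apache.spark.mllib.linalg.{DenseVector, Vector, Vectors}
import org.json4s._
import org.json4s.native.Serialization

// Same shape as the deleted serializer: Vector <-> JSON array of numbers.
class VectorSerializer extends CustomSerializer[Vector](_ => (
  {
    case JArray(xs) =>
      // Also accept integer literals (e.g. [1, 2]); the deleted serializer handled only JDouble.
      new DenseVector(xs.collect { case JDouble(d) => d; case JInt(i) => i.toDouble }.toArray)
  },
  { case v: Vector => JArray(v.toArray.toList.map(d => JDouble(d))) }
))

object VectorSerializerSketch {
  def main(args: Array[String]): Unit = {
    implicit val formats: Formats = DefaultFormats + new VectorSerializer
    val json = Serialization.write(Vectors.dense(1.0, 2.0, 3.0)) // renders as a JSON array of doubles
    val back = Serialization.read[Vector](json)
    println(s"$json -> $back")
  }
}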