http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-similarproduct-localmodel/build.sbt ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-parallel-similarproduct-localmodel/build.sbt b/examples/experimental/scala-parallel-similarproduct-localmodel/build.sbt deleted file mode 100644 index 87207e1..0000000 --- a/examples/experimental/scala-parallel-similarproduct-localmodel/build.sbt +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import AssemblyKeys._ - -assemblySettings - -name := "template-scala-parallel-similarproduct" - -organization := "org.apache.predictionio" - -libraryDependencies ++= Seq( - "org.apache.predictionio" %% "core" % "0.9.1" % "provided", - "org.apache.spark" %% "spark-core" % "1.2.0" % "provided", - "org.apache.spark" %% "spark-mllib" % "1.2.0" % "provided")
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-similarproduct-localmodel/data/import_eventserver.py ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-parallel-similarproduct-localmodel/data/import_eventserver.py b/examples/experimental/scala-parallel-similarproduct-localmodel/data/import_eventserver.py deleted file mode 100644 index 6107d1c..0000000 --- a/examples/experimental/scala-parallel-similarproduct-localmodel/data/import_eventserver.py +++ /dev/null @@ -1,90 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -""" -Import sample data for similar product engine -""" - -import predictionio -import argparse -import random - -SEED = 3 - -def import_events(client): - random.seed(SEED) - count = 0 - print client.get_status() - print "Importing data..." - - # generate 10 users, with user ids u1,u2,....,u10 - user_ids = ["u%s" % i for i in range(1, 11)] - for user_id in user_ids: - print "Set user", user_id - client.create_event( - event="$set", - entity_type="user", - entity_id=user_id - ) - count += 1 - - # generate 50 items, with item ids i1,i2,....,i50 - # random assign 1 to 4 categories among c1-c6 to items - categories = ["c%s" % i for i in range(1, 7)] - item_ids = ["i%s" % i for i in range(1, 51)] - for item_id in item_ids: - print "Set item", item_id - client.create_event( - event="$set", - entity_type="item", - entity_id=item_id, - properties={ - "categories" : random.sample(categories, random.randint(1, 4)) - } - ) - count += 1 - - # each user randomly viewed 10 items - for user_id in user_ids: - for viewed_item in random.sample(item_ids, 10): - print "User", user_id ,"views item", viewed_item - client.create_event( - event="view", - entity_type="user", - entity_id=user_id, - target_entity_type="item", - target_entity_id=viewed_item - ) - count += 1 - - print "%s events are imported." % count - -if __name__ == '__main__': - parser = argparse.ArgumentParser( - description="Import sample data for similar product engine") - parser.add_argument('--access_key', default='invald_access_key') - parser.add_argument('--url', default="http://localhost:7070") - - args = parser.parse_args() - print args - - client = predictionio.EventClient( - access_key=args.access_key, - url=args.url, - threads=5, - qsize=500) - import_events(client) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-similarproduct-localmodel/data/send_query.py ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-parallel-similarproduct-localmodel/data/send_query.py b/examples/experimental/scala-parallel-similarproduct-localmodel/data/send_query.py deleted file mode 100644 index 8678b15..0000000 --- a/examples/experimental/scala-parallel-similarproduct-localmodel/data/send_query.py +++ /dev/null @@ -1,24 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -""" -Send sample query to prediction engine -""" - -import predictionio -engine_client = predictionio.EngineClient(url="http://localhost:8000") -print engine_client.send_query({"items": ["i1", "i3"], "num": 4}) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-similarproduct-localmodel/engine.json ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-parallel-similarproduct-localmodel/engine.json b/examples/experimental/scala-parallel-similarproduct-localmodel/engine.json deleted file mode 100644 index c55849f..0000000 --- a/examples/experimental/scala-parallel-similarproduct-localmodel/engine.json +++ /dev/null @@ -1,21 +0,0 @@ -{ - "id": "default", - "description": "Default settings", - "engineFactory": "org.template.similarproduct.SimilarProductEngine", - "datasource": { - "params" : { - "appId": 9 - } - }, - "algorithms": [ - { - "name": "als", - "params": { - "rank": 10, - "numIterations" : 20, - "lambda": 0.01, - "seed": 3 - } - } - ] -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-similarproduct-localmodel/project/assembly.sbt ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-parallel-similarproduct-localmodel/project/assembly.sbt b/examples/experimental/scala-parallel-similarproduct-localmodel/project/assembly.sbt deleted file mode 100644 index 54c3252..0000000 --- a/examples/experimental/scala-parallel-similarproduct-localmodel/project/assembly.sbt +++ /dev/null @@ -1 +0,0 @@ -addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2") http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-similarproduct-localmodel/src/main/scala/ALSAlgorithm.scala ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-parallel-similarproduct-localmodel/src/main/scala/ALSAlgorithm.scala b/examples/experimental/scala-parallel-similarproduct-localmodel/src/main/scala/ALSAlgorithm.scala deleted file mode 100644 index 62c0c00..0000000 --- a/examples/experimental/scala-parallel-similarproduct-localmodel/src/main/scala/ALSAlgorithm.scala +++ /dev/null @@ -1,261 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.template.similarproduct - -import org.apache.predictionio.controller.P2LAlgorithm // ADDED -import org.apache.predictionio.controller.Params -import org.apache.predictionio.controller.IPersistentModel -import org.apache.predictionio.controller.IPersistentModelLoader -import org.apache.predictionio.data.storage.BiMap - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD -import org.apache.spark.mllib.recommendation.ALS -import org.apache.spark.mllib.recommendation.{Rating => MLlibRating} - -import grizzled.slf4j.Logger - -import scala.collection.mutable.PriorityQueue - -case class ALSAlgorithmParams( - rank: Int, - numIterations: Int, - lambda: Double, - seed: Option[Long]) extends Params - -class ALSLocalModel( // MODIFIED - val productFeatures: Map[Int, Array[Double]], // MODIFIED - val itemStringIntMap: BiMap[String, Int], - val items: Map[Int, Item] -) extends Serializable { // MODIFIED - - // MODIFIED - - @transient lazy val itemIntStringMap = itemStringIntMap.inverse - - override def toString = { - s" productFeatures: [${productFeatures.size}]" + // MODIFIED - s"(${productFeatures.take(2).toList}...)" + - s" itemStringIntMap: [${itemStringIntMap.size}]" + - s"(${itemStringIntMap.take(2).toString}...)]" + - s" items: [${items.size}]" + - s"(${items.take(2).toString}...)]" - } -} - -/** - * Use ALS to build item x feature matrix - */ -class ALSAlgorithm(val ap: ALSAlgorithmParams) - // MODIFIED - extends P2LAlgorithm[PreparedData, ALSLocalModel, Query, PredictedResult] { - - @transient lazy val logger = Logger[this.type] - - def train(data: PreparedData): ALSLocalModel = { // MODIFIED - require(!data.viewEvents.take(1).isEmpty, - s"viewEvents in PreparedData cannot be empty." + - " Please check if DataSource generates TrainingData" + - " and Preprator generates PreparedData correctly.") - require(!data.users.take(1).isEmpty, - s"users in PreparedData cannot be empty." + - " Please check if DataSource generates TrainingData" + - " and Preprator generates PreparedData correctly.") - require(!data.items.take(1).isEmpty, - s"items in PreparedData cannot be empty." + - " Please check if DataSource generates TrainingData" + - " and Preprator generates PreparedData correctly.") - // create User and item's String ID to integer index BiMap - val userStringIntMap = BiMap.stringInt(data.users.keys) - val itemStringIntMap = BiMap.stringInt(data.items.keys) - - // collect Item as Map and convert ID to Int index - val items: Map[Int, Item] = data.items.map { case (id, item) => - (itemStringIntMap(id), item) - }.collectAsMap.toMap - - val mllibRatings = data.viewEvents - .map { r => - // Convert user and item String IDs to Int index for MLlib - val uindex = userStringIntMap.getOrElse(r.user, -1) - val iindex = itemStringIntMap.getOrElse(r.item, -1) - - if (uindex == -1) - logger.info(s"Couldn't convert nonexistent user ID ${r.user}" - + " to Int index.") - - if (iindex == -1) - logger.info(s"Couldn't convert nonexistent item ID ${r.item}" - + " to Int index.") - - ((uindex, iindex), 1) - }.filter { case ((u, i), v) => - // keep events with valid user and item index - (u != -1) && (i != -1) - }.reduceByKey(_ + _) // aggregate all view events of same user-item pair - .map { case ((u, i), v) => - // MLlibRating requires integer index for user and item - MLlibRating(u, i, v) - } - - // MLLib ALS cannot handle empty training data. - require(!mllibRatings.take(1).isEmpty, - s"mllibRatings cannot be empty." + - " Please check if your events contain valid user and item ID.") - - // seed for MLlib ALS - val seed = ap.seed.getOrElse(System.nanoTime) - - val m = ALS.trainImplicit( - ratings = mllibRatings, - rank = ap.rank, - iterations = ap.numIterations, - lambda = ap.lambda, - blocks = -1, - alpha = 1.0, - seed = seed) - - new ALSLocalModel( // MODIFIED - productFeatures = m.productFeatures.collectAsMap.toMap, // MODIFIED - itemStringIntMap = itemStringIntMap, - items = items - ) - } - - // MODIFIED - def predict(model: ALSLocalModel, query: Query): PredictedResult = { - - // convert items to Int index - val queryList: Set[Int] = query.items.map(model.itemStringIntMap.get(_)) - .flatten.toSet - - // MODIFIED - val queryFeatures: Vector[Array[Double]] = queryList.toVector - .map { item => - // productFeatures may not contain the requested item - val qf: Option[Array[Double]] = model.productFeatures.get(item) - qf - }.flatten - - val whiteList: Option[Set[Int]] = query.whiteList.map( set => - set.map(model.itemStringIntMap.get(_)).flatten - ) - val blackList: Option[Set[Int]] = query.blackList.map ( set => - set.map(model.itemStringIntMap.get(_)).flatten - ) - - val ord = Ordering.by[(Int, Double), Double](_._2).reverse - - val indexScores: Array[(Int, Double)] = if (queryFeatures.isEmpty) { - logger.info(s"No productFeatures vector for query items ${query.items}.") - Array[(Int, Double)]() - } else { - model.productFeatures // MODIFIED - .mapValues { f => - queryFeatures.map{ qf => - cosine(qf, f) - }.reduce(_ + _) - } - .filter(_._2 > 0) // keep items with score > 0 - .toArray // MODIFIED - } - - val filteredScore = indexScores.view.filter { case (i, v) => - isCandidateItem( - i = i, - items = model.items, - categories = query.categories, - queryList = queryList, - whiteList = whiteList, - blackList = blackList - ) - } - - val topScores = getTopN(filteredScore, query.num)(ord).toArray - - val itemScores = topScores.map { case (i, s) => - new ItemScore( - item = model.itemIntStringMap(i), - score = s - ) - } - - new PredictedResult(itemScores) - } - - private - def getTopN[T](s: Seq[T], n: Int)(implicit ord: Ordering[T]): Seq[T] = { - - val q = PriorityQueue() - - for (x <- s) { - if (q.size < n) - q.enqueue(x) - else { - // q is full - if (ord.compare(x, q.head) < 0) { - q.dequeue() - q.enqueue(x) - } - } - } - - q.dequeueAll.toSeq.reverse - } - - private - def cosine(v1: Array[Double], v2: Array[Double]): Double = { - val size = v1.size - var i = 0 - var n1: Double = 0 - var n2: Double = 0 - var d: Double = 0 - while (i < size) { - n1 += v1(i) * v1(i) - n2 += v2(i) * v2(i) - d += v1(i) * v2(i) - i += 1 - } - val n1n2 = (math.sqrt(n1) * math.sqrt(n2)) - if (n1n2 == 0) 0 else (d / n1n2) - } - - private - def isCandidateItem( - i: Int, - items: Map[Int, Item], - categories: Option[Set[String]], - queryList: Set[Int], - whiteList: Option[Set[Int]], - blackList: Option[Set[Int]] - ): Boolean = { - whiteList.map(_.contains(i)).getOrElse(true) && - blackList.map(!_.contains(i)).getOrElse(true) && - // discard items in query as well - (!queryList.contains(i)) && - // filter categories - categories.map { cat => - items(i).categories.map { itemCat => - // keep this item if has ovelap categories with the query - !(itemCat.toSet.intersect(cat).isEmpty) - }.getOrElse(false) // discard this item if it has no categories - }.getOrElse(true) - } - -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-similarproduct-localmodel/src/main/scala/DataSource.scala ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-parallel-similarproduct-localmodel/src/main/scala/DataSource.scala b/examples/experimental/scala-parallel-similarproduct-localmodel/src/main/scala/DataSource.scala deleted file mode 100644 index af8b974..0000000 --- a/examples/experimental/scala-parallel-similarproduct-localmodel/src/main/scala/DataSource.scala +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.template.similarproduct - -import org.apache.predictionio.controller.PDataSource -import org.apache.predictionio.controller.EmptyEvaluationInfo -import org.apache.predictionio.controller.EmptyActualResult -import org.apache.predictionio.controller.Params -import org.apache.predictionio.data.storage.Event -import org.apache.predictionio.data.storage.Storage - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD - -import grizzled.slf4j.Logger - -case class DataSourceParams(appId: Int) extends Params - -class DataSource(val dsp: DataSourceParams) - extends PDataSource[TrainingData, - EmptyEvaluationInfo, Query, EmptyActualResult] { - - @transient lazy val logger = Logger[this.type] - - override - def readTraining(sc: SparkContext): TrainingData = { - val eventsDb = Storage.getPEvents() - - // create a RDD of (entityID, User) - val usersRDD: RDD[(String, User)] = eventsDb.aggregateProperties( - appId = dsp.appId, - entityType = "user" - )(sc).map { case (entityId, properties) => - val user = try { - User() - } catch { - case e: Exception => { - logger.error(s"Failed to get properties ${properties} of" + - s" user ${entityId}. Exception: ${e}.") - throw e - } - } - (entityId, user) - } - - // create a RDD of (entityID, Item) - val itemsRDD: RDD[(String, Item)] = eventsDb.aggregateProperties( - appId = dsp.appId, - entityType = "item" - )(sc).map { case (entityId, properties) => - val item = try { - // Assume categories is optional property of item. - Item(categories = properties.getOpt[List[String]]("categories")) - } catch { - case e: Exception => { - logger.error(s"Failed to get properties ${properties} of" + - s" item ${entityId}. Exception: ${e}.") - throw e - } - } - (entityId, item) - } - - // get all "user" "view" "item" events - val viewEventsRDD: RDD[ViewEvent] = eventsDb.find( - appId = dsp.appId, - entityType = Some("user"), - eventNames = Some(List("view")), - // targetEntityType is optional field of an event. - targetEntityType = Some(Some("item")))(sc) - // eventsDb.find() returns RDD[Event] - .map { event => - val viewEvent = try { - event.event match { - case "view" => ViewEvent( - user = event.entityId, - item = event.targetEntityId.get, - t = event.eventTime.getMillis) - case _ => throw new Exception(s"Unexpected event ${event} is read.") - } - } catch { - case e: Exception => { - logger.error(s"Cannot convert ${event} to ViewEvent." + - s" Exception: ${e}.") - throw e - } - } - viewEvent - } - - new TrainingData( - users = usersRDD, - items = itemsRDD, - viewEvents = viewEventsRDD - ) - } -} - -case class User() - -case class Item(categories: Option[List[String]]) - -case class ViewEvent(user: String, item: String, t: Long) - -class TrainingData( - val users: RDD[(String, User)], - val items: RDD[(String, Item)], - val viewEvents: RDD[ViewEvent] -) extends Serializable { - override def toString = { - s"users: [${users.count()} (${users.take(2).toList}...)]" + - s"items: [${items.count()} (${items.take(2).toList}...)]" + - s"viewEvents: [${viewEvents.count()}] (${viewEvents.take(2).toList}...)" - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-similarproduct-localmodel/src/main/scala/Engine.scala ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-parallel-similarproduct-localmodel/src/main/scala/Engine.scala b/examples/experimental/scala-parallel-similarproduct-localmodel/src/main/scala/Engine.scala deleted file mode 100644 index 52b19fe..0000000 --- a/examples/experimental/scala-parallel-similarproduct-localmodel/src/main/scala/Engine.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.template.similarproduct - -import org.apache.predictionio.controller.IEngineFactory -import org.apache.predictionio.controller.Engine - -case class Query( - items: List[String], - num: Int, - categories: Option[Set[String]], - whiteList: Option[Set[String]], - blackList: Option[Set[String]] -) - -case class PredictedResult( - itemScores: Array[ItemScore] -) - -case class ItemScore( - item: String, - score: Double -) - -object SimilarProductEngine extends IEngineFactory { - def apply() = { - new Engine( - classOf[DataSource], - classOf[Preparator], - Map("als" -> classOf[ALSAlgorithm]), - classOf[Serving]) - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-similarproduct-localmodel/src/main/scala/Preparator.scala ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-parallel-similarproduct-localmodel/src/main/scala/Preparator.scala b/examples/experimental/scala-parallel-similarproduct-localmodel/src/main/scala/Preparator.scala deleted file mode 100644 index e3394b0..0000000 --- a/examples/experimental/scala-parallel-similarproduct-localmodel/src/main/scala/Preparator.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.template.similarproduct - -import org.apache.predictionio.controller.PPreparator - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD - -class Preparator - extends PPreparator[TrainingData, PreparedData] { - - def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = { - new PreparedData( - users = trainingData.users, - items = trainingData.items, - viewEvents = trainingData.viewEvents) - } -} - -class PreparedData( - val users: RDD[(String, User)], - val items: RDD[(String, Item)], - val viewEvents: RDD[ViewEvent] -) extends Serializable http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-similarproduct-localmodel/src/main/scala/Serving.scala ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-parallel-similarproduct-localmodel/src/main/scala/Serving.scala b/examples/experimental/scala-parallel-similarproduct-localmodel/src/main/scala/Serving.scala deleted file mode 100644 index 3e115d5..0000000 --- a/examples/experimental/scala-parallel-similarproduct-localmodel/src/main/scala/Serving.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.template.similarproduct - -import org.apache.predictionio.controller.LServing - -class Serving - extends LServing[Query, PredictedResult] { - - override def serve(query: Query, - predictedResults: Seq[PredictedResult]): PredictedResult = { - predictedResults.head - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-trim-app/build.sbt ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-parallel-trim-app/build.sbt b/examples/experimental/scala-parallel-trim-app/build.sbt deleted file mode 100644 index fdce0d6..0000000 --- a/examples/experimental/scala-parallel-trim-app/build.sbt +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import AssemblyKeys._ - -assemblySettings - -name := "template-scala-parallel-vanilla" - -organization := "org.apache.predictionio" - -libraryDependencies ++= Seq( - "org.apache.predictionio" %% "core" % "0.9.1" % "provided", - "org.apache.spark" %% "spark-core" % "1.2.0" % "provided", - "org.apache.spark" %% "spark-mllib" % "1.2.0" % "provided") http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-trim-app/engine.json ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-parallel-trim-app/engine.json b/examples/experimental/scala-parallel-trim-app/engine.json deleted file mode 100644 index 5695856..0000000 --- a/examples/experimental/scala-parallel-trim-app/engine.json +++ /dev/null @@ -1,13 +0,0 @@ -{ - "id": "default", - "description": "Default settings", - "engineFactory": "org.apache.predictionio.examples.experimental.trimapp.VanillaEngine", - "datasource": { - "params" : { - "srcAppId": 13, - "dstAppId": 15, - "startTime": "2014-07-01T00:00:00.000Z", - "untilTime": "2015-01-01T00:00:00.000Z" - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-trim-app/project/assembly.sbt ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-parallel-trim-app/project/assembly.sbt b/examples/experimental/scala-parallel-trim-app/project/assembly.sbt deleted file mode 100644 index 54c3252..0000000 --- a/examples/experimental/scala-parallel-trim-app/project/assembly.sbt +++ /dev/null @@ -1 +0,0 @@ -addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2") http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-trim-app/src/main/scala/Algorithm.scala ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-parallel-trim-app/src/main/scala/Algorithm.scala b/examples/experimental/scala-parallel-trim-app/src/main/scala/Algorithm.scala deleted file mode 100644 index f46a7b7..0000000 --- a/examples/experimental/scala-parallel-trim-app/src/main/scala/Algorithm.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.predictionio.examples.experimental.trimapp - -import org.apache.predictionio.controller.P2LAlgorithm -import org.apache.predictionio.controller.Params - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD - -import grizzled.slf4j.Logger - -//case class AlgorithmParams(mult: Int) extends Params - -//class Algorithm(val ap: AlgorithmParams) -class Algorithm - extends P2LAlgorithm[TrainingData, Model, Query, PredictedResult] { - - @transient lazy val logger = Logger[this.type] - - def train(data: TrainingData): Model = { - new Model - } - - def predict(model: Model, query: Query): PredictedResult = { - // Prefix the query with the model data - PredictedResult(p = "") - } -} - -class Model extends Serializable { - override def toString = "Model" -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-trim-app/src/main/scala/DataSource.scala ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-parallel-trim-app/src/main/scala/DataSource.scala b/examples/experimental/scala-parallel-trim-app/src/main/scala/DataSource.scala deleted file mode 100644 index 4f03dba..0000000 --- a/examples/experimental/scala-parallel-trim-app/src/main/scala/DataSource.scala +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.predictionio.examples.experimental.trimapp - -import org.apache.predictionio.controller.PDataSource -import org.apache.predictionio.controller.EmptyEvaluationInfo -import org.apache.predictionio.controller.EmptyActualResult -import org.apache.predictionio.controller.Params -import org.apache.predictionio.data.storage.Event -import org.apache.predictionio.data.storage.Storage - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD -import com.github.nscala_time.time.Imports._ - -import grizzled.slf4j.Logger - -case class DataSourceParams( - srcAppId: Int, - dstAppId: Int, - startTime: Option[DateTime], - untilTime: Option[DateTime] -) extends Params - -class DataSource(val dsp: DataSourceParams) - extends PDataSource[TrainingData, - EmptyEvaluationInfo, Query, EmptyActualResult] { - - @transient lazy val logger = Logger[this.type] - - override - def readTraining(sc: SparkContext): TrainingData = { - val eventsDb = Storage.getPEvents() - logger.info(s"TrimApp: $dsp") - - - logger.info(s"Read events from appId ${dsp.srcAppId}") - val srcEvents: RDD[Event] = eventsDb.find( - appId = dsp.srcAppId, - startTime = dsp.startTime, - untilTime = dsp.untilTime - )(sc) - - val dstEvents: Array[Event] = eventsDb.find(appId = dsp.dstAppId)(sc).take(1) - - if (dstEvents.size > 0) { - throw new Exception(s"DstApp ${dsp.dstAppId} is not empty. Quitting.") - } - - logger.info(s"Write events to appId ${dsp.dstAppId}") - eventsDb.write(srcEvents, dsp.dstAppId)(sc) - - logger.info(s"Finish writing events to appId ${dsp.dstAppId}") - - new TrainingData() - } -} - -class TrainingData( -) extends Serializable { - override def toString = "" -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-trim-app/src/main/scala/Engine.scala ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-parallel-trim-app/src/main/scala/Engine.scala b/examples/experimental/scala-parallel-trim-app/src/main/scala/Engine.scala deleted file mode 100644 index d45baf3..0000000 --- a/examples/experimental/scala-parallel-trim-app/src/main/scala/Engine.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.predictionio.examples.experimental.trimapp - -import org.apache.predictionio.controller.IEngineFactory -import org.apache.predictionio.controller.Engine -import org.apache.predictionio.controller._ - -case class Query(q: String) extends Serializable - -case class PredictedResult(p: String) extends Serializable - -object VanillaEngine extends IEngineFactory { - def apply() = { - new Engine( - classOf[DataSource], - PIdentityPreparator(classOf[DataSource]), - Map("" -> classOf[Algorithm]), - classOf[Serving]) - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-trim-app/src/main/scala/Preparator.scala ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-parallel-trim-app/src/main/scala/Preparator.scala b/examples/experimental/scala-parallel-trim-app/src/main/scala/Preparator.scala deleted file mode 100644 index fd8efde..0000000 --- a/examples/experimental/scala-parallel-trim-app/src/main/scala/Preparator.scala +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.predictionio.examples.experimental.trimapp - -import org.apache.predictionio.controller.PPreparator -import org.apache.predictionio.data.storage.Event - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD - -/* -class Preparator - extends PPreparator[TrainingData, PreparedData] { - - def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = { - new PreparedData(events = trainingData.events) - } -} - -class PreparedData( - val events: RDD[Event] -) extends Serializable -*/ http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-parallel-trim-app/src/main/scala/Serving.scala ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-parallel-trim-app/src/main/scala/Serving.scala b/examples/experimental/scala-parallel-trim-app/src/main/scala/Serving.scala deleted file mode 100644 index 4fc376f..0000000 --- a/examples/experimental/scala-parallel-trim-app/src/main/scala/Serving.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.predictionio.examples.experimental.trimapp - -import org.apache.predictionio.controller.LServing - -class Serving - extends LServing[Query, PredictedResult] { - - override - def serve(query: Query, - predictedResults: Seq[PredictedResult]): PredictedResult = { - predictedResults.head - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-recommendations/README.md ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-recommendations/README.md b/examples/experimental/scala-recommendations/README.md deleted file mode 100644 index 1bb80da..0000000 --- a/examples/experimental/scala-recommendations/README.md +++ /dev/null @@ -1,146 +0,0 @@ -<!-- -Licensed to the Apache Software Foundation (ASF) under one or more -contributor license agreements. See the NOTICE file distributed with -this work for additional information regarding copyright ownership. -The ASF licenses this file to You under the Apache License, Version 2.0 -(the "License"); you may not use this file except in compliance with -the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. ---> - -Distributed Recommendation Engine with RDD-based Model using MLlib's ALS -======================================================================== - -This document describes a recommendation engine that is based on Apache Spark's -MLlib collaborative filtering algorithm. - - -Prerequisite ------------- - -Make sure you have built PredictionIO and setup storage described -[here](/README.md). - - -High Level Description ----------------------- - -This engine demonstrates how one can integrate MLlib's algorithms that produce -an RDD-based model, deploy it in production and serve real-time queries. - -For details about MLlib's collaborative filtering algorithms, please refer to -https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html. - -All code definition can be found [here](src/main/scala/Run.scala). - - -### Data Source - -Training data is located at `/examples/data/movielens.txt`. Values are delimited -by double colons (::). The first column are user IDs. The second column are item -IDs. The third column are ratings. In this example, they are represented as -`RDD[Rating]`, as described in the official MLlib guide. - - -### Preparator - -The preparator in this example is an identity function, i.e. no further -preparation is done on the training data. - - -### Algorithm - -This example engine contains one single algorithm that wraps around MLlib. The -`train()` method simply calls MLlib's `ALS.train()` method. - - -### Serving - -This example engine uses `FirstServing`, which serves only predictions from the -first algorithm. Since there is only one algorithm in this engine, predictions -from MLlib's ALS algorithm will be served. - - -Training a Model ----------------- - -This example provides a set of ready-to-use parameters for each component -mentioned in the previous section. They are located inside the `params` -subdirectory. - -Before training, you must let PredictionIO know about the engine. Run the -following command to build and register the engine. -``` -$ cd $PIO_HOME/examples/scala-recommendations -$ ../../bin/pio build -``` -where `$PIO_HOME` is the root directory of the PredictionIO code tree. - -To start training, use the following command. You need to install the -[`gfortran`](https://github.com/mikiobraun/jblas/wiki/Missing-Libraries) -runtime library if it is not already present on your nodes. For Debian and -Ubuntu systems this would be "`sudo apt-get install libgfortran3`". -``` -$ cd $PIO_HOME/examples/scala-recommendations -$ ../../bin/pio train -``` -This will train a model and save it in PredictionIO's metadata storage. Notice -that when the run is completed, it will display a run ID, like below. -``` -2014-08-27 23:13:54,596 INFO SparkContext - Job finished: saveAsObjectFile at Run.scala:68, took 0.299989372 s -2014-08-27 23:13:54,736 INFO APIDebugWorkflow$ - Saved engine instance with ID: txHBY2XRQTKFnxC-lYoVgA -``` - - -Deploying a Real-time Prediction Server ---------------------------------------- - -Following from instructions above, you should have trained a model. Use the -following command to start a server. -``` -$ cd $PIO_HOME/examples/scala-recommendations -$ ../../bin/pio deploy -``` -This will create a server that by default binds to http://localhost:8000. You -can visit that page in your web browser to check its status. - -To perform real-time predictions, try the following. This predicts on how user 1 will rate item (movie) 4. As in all collaborative filtering algorithms, it will not handle the case of a cold user (when the user has not rated any movies). -``` -$ curl -H "Content-Type: application/json" -d '[1,4]' http://localhost:8000/queries.json -``` -Congratulations! You have just trained an ALS model and is able to perform real -time prediction distributed across an Apache Spark cluster! - - -Production Prediction Server Deployment ---------------------------------------- - -Prediction servers support reloading models on the fly with the latest completed -run. - -1. Assuming you already have a running prediction server from the previous - section, go to http://localhost:8000 to check its status. Take note of the - **Run ID** at the top. - -2. Run training and deploy again. There is no need to manually terminate the previous deploy instance. - - ``` - $ cd $PIO_HOME/examples/scala-recommendations - $ ../../bin/pio train - $ ../../bin/pio deploy - ``` - -3. Refresh the page at http://localhost:8000, you should see the prediction - server status page with a new **Run ID** at the top. - -Congratulations! You have just experienced a production-ready setup that can -reload itself automatically after every training! Simply add the training or -evaluation command to your *crontab*, and your setup will be able to re-deploy -itself automatically at a regular interval. http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-recommendations/build.sbt ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-recommendations/build.sbt b/examples/experimental/scala-recommendations/build.sbt deleted file mode 100644 index bda1930..0000000 --- a/examples/experimental/scala-recommendations/build.sbt +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import AssemblyKeys._ - -assemblySettings - -name := "example-scala-recommendations" - -organization := "org.apache.predictionio" - -libraryDependencies ++= Seq( - "org.apache.predictionio" %% "core" % "0.9.1" % "provided", - "commons-io" % "commons-io" % "2.4", - "org.apache.spark" %% "spark-core" % "1.2.0" % "provided", - "org.apache.spark" %% "spark-mllib" % "1.2.0" % "provided", - "org.json4s" %% "json4s-native" % "3.2.10") http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-recommendations/engine.json ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-recommendations/engine.json b/examples/experimental/scala-recommendations/engine.json deleted file mode 100644 index a4c30e4..0000000 --- a/examples/experimental/scala-recommendations/engine.json +++ /dev/null @@ -1,21 +0,0 @@ -{ - "id": "default", - "description": "Item Recommendations examples", - "engineFactory": "org.apache.spark.mllib.recommendation.engine.RecommendationEngine", - "datasource": { - "params": { - "filepath": "../data/movielens.txt" - } - }, - "algorithms": [ - { - "name": "", - "params": { - "rank": 10, - "numIterations": 20, - "lambda": 0.01, - "persistModel": true - } - } - ] -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-recommendations/project/assembly.sbt ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-recommendations/project/assembly.sbt b/examples/experimental/scala-recommendations/project/assembly.sbt deleted file mode 100644 index 54c3252..0000000 --- a/examples/experimental/scala-recommendations/project/assembly.sbt +++ /dev/null @@ -1 +0,0 @@ -addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2") http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-recommendations/src/main/scala/Run.scala ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-recommendations/src/main/scala/Run.scala b/examples/experimental/scala-recommendations/src/main/scala/Run.scala deleted file mode 100644 index 7493d05..0000000 --- a/examples/experimental/scala-recommendations/src/main/scala/Run.scala +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.recommendation.engine - -import org.apache.predictionio.controller.Engine -import org.apache.predictionio.controller.IEngineFactory -import org.apache.predictionio.controller.IPersistentModel -import org.apache.predictionio.controller.IPersistentModelLoader -import org.apache.predictionio.controller.PDataSource -import org.apache.predictionio.controller.Params -import org.apache.predictionio.controller.PAlgorithm -import org.apache.predictionio.controller.PIdentityPreparator -import org.apache.predictionio.controller.LFirstServing -import org.apache.predictionio.controller.Utils -import org.apache.predictionio.controller.Workflow -import org.apache.predictionio.controller.WorkflowParams - -import org.apache.commons.io.FileUtils -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD -import org.apache.spark.mllib.recommendation.ALS -import org.apache.spark.mllib.recommendation.Rating -import org.apache.spark.mllib.recommendation.MatrixFactorizationModel -import org.json4s._ - -import scala.io.Source - -import java.io.File - -case class DataSourceParams(val filepath: String) extends Params - -case class DataSource(val dsp: DataSourceParams) - extends PDataSource[DataSourceParams, Null, RDD[Rating], (Int, Int), Double] { - - override - def read(sc: SparkContext) - : Seq[(Null, RDD[Rating], RDD[((Int, Int), Double)])] = { - val data = sc.textFile(dsp.filepath) - val ratings: RDD[Rating] = data.map(_.split("::") match { - case Array(user, item, rate) => - Rating(user.toInt, item.toInt, rate.toDouble) - }) - - val featureTargets: RDD[((Int, Int), Double)] = ratings.map { - case Rating(user, product, rate) => ((user, product), rate) - } - - Seq((null, ratings, featureTargets)) - } -} - -case class AlgorithmParams( - val rank: Int = 10, - val numIterations: Int = 20, - val lambda: Double = 0.01, - val persistModel: Boolean = false) extends Params - -class PMatrixFactorizationModel(rank: Int, - userFeatures: RDD[(Int, Array[Double])], - productFeatures: RDD[(Int, Array[Double])]) - extends MatrixFactorizationModel(rank, userFeatures, productFeatures) - with IPersistentModel[AlgorithmParams] { - def save(id: String, params: AlgorithmParams, sc: SparkContext): Boolean = { - if (params.persistModel) { - sc.parallelize(Seq(rank)).saveAsObjectFile(s"/tmp/${id}/rank") - userFeatures.saveAsObjectFile(s"/tmp/${id}/userFeatures") - productFeatures.saveAsObjectFile(s"/tmp/${id}/productFeatures") - } - params.persistModel - } -} - -object PMatrixFactorizationModel - extends IPersistentModelLoader[AlgorithmParams, PMatrixFactorizationModel] { - def apply(id: String, params: AlgorithmParams, sc: Option[SparkContext]) = { - new PMatrixFactorizationModel( - rank = sc.get.objectFile[Int](s"/tmp/${id}/rank").first, - userFeatures = sc.get.objectFile(s"/tmp/${id}/userFeatures"), - productFeatures = sc.get.objectFile(s"/tmp/${id}/productFeatures")) - } -} - -class ALSAlgorithm(val ap: AlgorithmParams) - extends PAlgorithm[AlgorithmParams, RDD[Rating], - PMatrixFactorizationModel, (Int, Int), Double] { - - def train(data: RDD[Rating]): PMatrixFactorizationModel = { - val m = ALS.train(data, ap.rank, ap.numIterations, ap.lambda) - new PMatrixFactorizationModel( - rank = m.rank, - userFeatures = m.userFeatures, - productFeatures = m.productFeatures) - } - - override - def batchPredict( - model: PMatrixFactorizationModel, - feature: RDD[(Long, (Int, Int))]): RDD[(Long, Double)] = { - val indexlessFeature = feature.values - - val prediction: RDD[Rating] = model.predict(indexlessFeature) - - val p: RDD[((Int, Int), Double)] = prediction.map { - r => ((r.user, r.product), r.rating) - } - - feature.map{ _.swap } - .join(p) - .map { case (up, (fi, r)) => (fi,r) } - } - - def predict( - model: PMatrixFactorizationModel, feature: (Int, Int)): Double = { - model.predict(feature._1, feature._2) - } - - @transient override lazy val querySerializer = - Utils.json4sDefaultFormats + new Tuple2IntSerializer -} - -object Run { - def main(args: Array[String]) { - val dsp = DataSourceParams("data/movielens.txt") - val ap = AlgorithmParams() - - Workflow.run( - dataSourceClassOpt = Some(classOf[DataSource]), - dataSourceParams = dsp, - preparatorClassOpt = Some(PIdentityPreparator(classOf[DataSource])), - algorithmClassMapOpt = Some(Map("" -> classOf[ALSAlgorithm])), - algorithmParamsList = Seq(("", ap)), - servingClassOpt = Some(LFirstServing(classOf[ALSAlgorithm])), - params = WorkflowParams( - batch = "Imagine: P Recommendations", - verbose = 1 - ) - ) - } -} - -object RecommendationEngine extends IEngineFactory { - def apply() = { - new Engine( - classOf[DataSource], - PIdentityPreparator(classOf[DataSource]), - Map("" -> classOf[ALSAlgorithm]), - LFirstServing(classOf[ALSAlgorithm])) - } -} - - -class Tuple2IntSerializer extends CustomSerializer[(Int, Int)](format => ( - { - case JArray(List(JInt(x), JInt(y))) => (x.intValue, y.intValue) - }, - { - case x: (Int, Int) => JArray(List(JInt(x._1), JInt(x._2))) - } -)) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-refactor-test/build.sbt ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-refactor-test/build.sbt b/examples/experimental/scala-refactor-test/build.sbt deleted file mode 100644 index c3935fe..0000000 --- a/examples/experimental/scala-refactor-test/build.sbt +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import AssemblyKeys._ - -assemblySettings - -name := "template-scala-parallel-vanilla" - -organization := "org.apache.predictionio" - -libraryDependencies ++= Seq( - //"org.apache.predictionio" %% "core" % "0.8.6" % "provided", - "org.apache.predictionio" %% "core" % "0.9.1" % "provided", - "org.apache.spark" %% "spark-core" % "1.2.0" % "provided", - "org.apache.spark" %% "spark-mllib" % "1.2.0" % "provided") http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-refactor-test/engine.json ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-refactor-test/engine.json b/examples/experimental/scala-refactor-test/engine.json deleted file mode 100644 index 4cfb64d..0000000 --- a/examples/experimental/scala-refactor-test/engine.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - "id": "default", - "description": "Default settings", - "engineFactory": "pio.refactor.VanillaEngine", - "datasource": { - "params" : { - "appId": 1 - } - }, - "algorithms": [ - { - "name": "algo", - "params": { - "mult" : 1 - } - } - ] -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-refactor-test/project/assembly.sbt ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-refactor-test/project/assembly.sbt b/examples/experimental/scala-refactor-test/project/assembly.sbt deleted file mode 100644 index 54c3252..0000000 --- a/examples/experimental/scala-refactor-test/project/assembly.sbt +++ /dev/null @@ -1 +0,0 @@ -addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2") http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-refactor-test/src/main/scala/Algorithm.scala ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-refactor-test/src/main/scala/Algorithm.scala b/examples/experimental/scala-refactor-test/src/main/scala/Algorithm.scala deleted file mode 100644 index 51d329e..0000000 --- a/examples/experimental/scala-refactor-test/src/main/scala/Algorithm.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package pio.refactor - -import org.apache.predictionio.controller.P2LAlgorithm -import org.apache.predictionio.controller.Params - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD - -import grizzled.slf4j.Logger - -case class AlgorithmParams(mult: Int) extends Params - -class Algorithm(val ap: AlgorithmParams) - // extends PAlgorithm if Model contains RDD[] - extends P2LAlgorithm[TrainingData, Model, Query, PredictedResult] { - - @transient lazy val logger = Logger[this.type] - - def train(data: TrainingData): Model = { - // Simply count number of events - // and multiple it by the algorithm parameter - // and store the number as model - val count = data.events.reduce(_ + _) * ap.mult - logger.error("Algo.train") - new Model(mc = count) - } - - def predict(model: Model, query: Query): PredictedResult = { - // Prefix the query with the model data - //val result = s"${model.mc}-${query.q}" - logger.error("Algo.predict") - PredictedResult(p = model.mc + query.q) - } -} - -class Model(val mc: Int) extends Serializable { - override def toString = s"mc=${mc}" -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-refactor-test/src/main/scala/DataSource.scala ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-refactor-test/src/main/scala/DataSource.scala b/examples/experimental/scala-refactor-test/src/main/scala/DataSource.scala deleted file mode 100644 index 1b25e42..0000000 --- a/examples/experimental/scala-refactor-test/src/main/scala/DataSource.scala +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package pio.refactor - -import org.apache.predictionio.controller.PDataSource -import org.apache.predictionio.controller.EmptyEvaluationInfo -import org.apache.predictionio.controller.EmptyActualResult -import org.apache.predictionio.controller.Params -import org.apache.predictionio.controller._ -import org.apache.predictionio.data.storage.Event -import org.apache.predictionio.data.storage.Storage - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD - -import grizzled.slf4j.Logger - -//case class DataSourceParams(appId: Int) extends Params - -class DataSource - extends PDataSource[ - TrainingData, - EmptyEvaluationInfo, - Query, - ActualResult] { - - @transient lazy val logger = Logger[this.type] - - override - def readTraining(sc: SparkContext): TrainingData = { - new TrainingData( - events = sc.parallelize(0 until 100) - ) - } - - override - def readEval(sc: SparkContext) - : Seq[(TrainingData, EmptyEvaluationInfo, RDD[(Query, ActualResult)])] = - { - logger.error("Datasource!!!") - (0 until 3).map { ex => - ( - readTraining(sc), - new EmptyEvaluationInfo(), - sc - .parallelize((0 until 20) - .map {i => (Query(i), new ActualResult())})) - } - } -} - -class TrainingData( - val events: RDD[Int] -) extends Serializable { - override def toString = { - s"events: [${events.count()}] (${events.take(2).toList}...)" - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-refactor-test/src/main/scala/Engine.scala ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-refactor-test/src/main/scala/Engine.scala b/examples/experimental/scala-refactor-test/src/main/scala/Engine.scala deleted file mode 100644 index b064004..0000000 --- a/examples/experimental/scala-refactor-test/src/main/scala/Engine.scala +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package pio.refactor - -import org.apache.predictionio.controller.IEngineFactory -import org.apache.predictionio.controller.Engine -import org.apache.predictionio.controller._ -//import org.apache.predictionio.workflow.CoreWorkflow -import grizzled.slf4j.Logger - -case class Query(q: Int) - -case class PredictedResult(p: Int) - -case class ActualResult() - -object VanillaEngine extends IEngineFactory { - def apply() = { - new Engine( - classOf[DataSource], - //classOf[Preparator], - PIdentityPreparator(classOf[DataSource]), - Map("algo" -> classOf[Algorithm]), - classOf[Serving]) - } -} - -object Runner { - @transient lazy val logger = Logger[this.type] - - def main(args: Array[String]) { - val engine = VanillaEngine() - val engineParams = EngineParams( - algorithmParamsList = Seq(("algo", AlgorithmParams(2))) - ) - - logger.error("Runner. before evaluation!!!") - val evaluator = new VanillaEvaluator() - - logger.error("Runner before runEval!!!") - Workflow.runEval( - engine = engine, - engineParams = engineParams, - evaluator = evaluator, - evaluatorParams = EmptyParams()) - - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-refactor-test/src/main/scala/Evaluator.scala ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-refactor-test/src/main/scala/Evaluator.scala b/examples/experimental/scala-refactor-test/src/main/scala/Evaluator.scala deleted file mode 100644 index 495a025..0000000 --- a/examples/experimental/scala-refactor-test/src/main/scala/Evaluator.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package pio.refactor - -import org.apache.predictionio.controller.IEngineFactory -import org.apache.predictionio.controller.Engine -import org.apache.predictionio.controller._ - -class VanillaEvaluator - extends Evaluator[EmptyEvaluationInfo, Query, PredictedResult, - ActualResult, Int, Int, String] { - - def evaluateUnit(q: Query, p: PredictedResult, a: ActualResult): Int = { - q.q - p.p - } - - def evaluateSet(evalInfo: EmptyEvaluationInfo, eus: Seq[Int]): Int = eus.sum - - def evaluateAll(input: Seq[(EmptyEvaluationInfo, Int)]): String = { - val sum = input.map(_._2).sum - s"VanillaEvaluator(${input.size}, $sum)" - } -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-refactor-test/src/main/scala/Preparator.scala ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-refactor-test/src/main/scala/Preparator.scala b/examples/experimental/scala-refactor-test/src/main/scala/Preparator.scala deleted file mode 100644 index ca05459..0000000 --- a/examples/experimental/scala-refactor-test/src/main/scala/Preparator.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* -package pio.refactor - -import org.apache.predictionio.controller.PPreparator -import org.apache.predictionio.data.storage.Event - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD - -class Preparator - extends PPreparator[TrainingData, PreparedData] { - - def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = { - new PreparedData(events = trainingData.events) - } -} - -class PreparedData( - val events: RDD[Event] -) extends Serializable - -*/ http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/dec9f84c/examples/experimental/scala-refactor-test/src/main/scala/Serving.scala ---------------------------------------------------------------------- diff --git a/examples/experimental/scala-refactor-test/src/main/scala/Serving.scala b/examples/experimental/scala-refactor-test/src/main/scala/Serving.scala deleted file mode 100644 index 839ab1e..0000000 --- a/examples/experimental/scala-refactor-test/src/main/scala/Serving.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package pio.refactor - -import org.apache.predictionio.controller.LServing -import grizzled.slf4j.Logger - -class Serving - extends LServing[Query, PredictedResult] { - - @transient lazy val logger = Logger[this.type] - override def serve(query: Query, - predictedResults: Seq[PredictedResult]): PredictedResult = { - logger.error("Serving.serve") - predictedResults.head - } -}
