http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/Evaluation.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/Evaluation.scala b/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/Evaluation.scala new file mode 100644 index 0000000..a665496 --- /dev/null +++ b/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/Evaluation.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.examples.recommendation + +import org.apache.predictionio.controller.Evaluation +import org.apache.predictionio.controller.OptionAverageMetric +import org.apache.predictionio.controller.AverageMetric +import org.apache.predictionio.controller.EmptyEvaluationInfo +import org.apache.predictionio.controller.EngineParamsGenerator +import org.apache.predictionio.controller.EngineParams +import org.apache.predictionio.controller.MetricEvaluator + +// Usage: +// $ pio eval org.apache.predictionio.examples.recommendation.RecommendationEvaluation \ +// org.apache.predictionio.examples.recommendation.EngineParamsList + +case class PrecisionAtK(k: Int, ratingThreshold: Double = 2.0) + extends OptionAverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] { + require(k > 0, "k must be greater than 0") + + override def header = s"Precision@K (k=$k, threshold=$ratingThreshold)" + + def calculate(q: Query, p: PredictedResult, a: ActualResult): Option[Double] = { + val positives: Set[String] = a.ratings.filter(_.rating >= ratingThreshold).map(_.item).toSet + + // If there are no positive results, Precision is undefined. We don't consider this case in the + // metrics, hence we return None. 
+ if (positives.size == 0) { + None + } else { + val tpCount: Int = p.itemScores.take(k).filter(is => positives(is.item)).size + Some(tpCount.toDouble / math.min(k, positives.size)) + } + } +} + +case class PositiveCount(ratingThreshold: Double = 2.0) + extends AverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] { + override def header = s"PositiveCount (threshold=$ratingThreshold)" + + def calculate(q: Query, p: PredictedResult, a: ActualResult): Double = { + a.ratings.filter(_.rating >= ratingThreshold).size + } +} + +object RecommendationEvaluation extends Evaluation { + engineEvaluator = ( + RecommendationEngine(), + MetricEvaluator( + metric = PrecisionAtK(k = 10, ratingThreshold = 4.0), + otherMetrics = Seq( + PositiveCount(ratingThreshold = 4.0), + PrecisionAtK(k = 10, ratingThreshold = 2.0), + PositiveCount(ratingThreshold = 2.0), + PrecisionAtK(k = 10, ratingThreshold = 1.0), + PositiveCount(ratingThreshold = 1.0) + ))) +} + + +object ComprehensiveRecommendationEvaluation extends Evaluation { + val ratingThresholds = Seq(0.0, 2.0, 4.0) + val ks = Seq(1, 3, 10) + + engineEvaluator = ( + RecommendationEngine(), + MetricEvaluator( + metric = PrecisionAtK(k = 3, ratingThreshold = 2.0), + otherMetrics = ( + (for (r <- ratingThresholds) yield PositiveCount(ratingThreshold = r)) ++ + (for (r <- ratingThresholds; k <- ks) yield PrecisionAtK(k = k, ratingThreshold = r)) + ))) +} + + +trait BaseEngineParamsList extends EngineParamsGenerator { + protected val baseEP = EngineParams( + dataSourceParams = DataSourceParams( + appName = "MyApp1", + evalParams = Some(DataSourceEvalParams(kFold = 5, queryNum = 10)))) +} + +object EngineParamsList extends BaseEngineParamsList { + engineParamsList = for( + rank <- Seq(5, 10, 20); + numIterations <- Seq(1, 5, 10)) + yield baseEP.copy( + algorithmParamsList = Seq( + ("als", ALSAlgorithmParams(rank, numIterations, 0.01, Some(3))))) +}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/Preparator.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/Preparator.scala b/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/Preparator.scala new file mode 100644 index 0000000..6a41c47 --- /dev/null +++ b/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/Preparator.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.examples.recommendation + +import org.apache.predictionio.controller.PPreparator + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD + +class Preparator + extends PPreparator[TrainingData, PreparedData] { + + def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = { + new PreparedData(ratings = trainingData.ratings) + } +} + +class PreparedData( + val ratings: RDD[Rating] +) extends Serializable http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/Serving.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/Serving.scala b/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/Serving.scala new file mode 100644 index 0000000..c478455 --- /dev/null +++ b/examples/scala-parallel-recommendation/reading-custom-events/src/main/scala/Serving.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.examples.recommendation + +import org.apache.predictionio.controller.LServing + +class Serving + extends LServing[Query, PredictedResult] { + + override + def serve(query: Query, + predictedResults: Seq[PredictedResult]): PredictedResult = { + predictedResults.head + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/reading-custom-events/template.json ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/reading-custom-events/template.json b/examples/scala-parallel-recommendation/reading-custom-events/template.json new file mode 100644 index 0000000..d076ec5 --- /dev/null +++ b/examples/scala-parallel-recommendation/reading-custom-events/template.json @@ -0,0 +1 @@ +{"pio": {"version": { "min": "0.10.0-incubating" }}} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/train-with-view-event/.gitignore ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/train-with-view-event/.gitignore b/examples/scala-parallel-recommendation/train-with-view-event/.gitignore new file mode 100644 index 0000000..3f3403a --- /dev/null +++ b/examples/scala-parallel-recommendation/train-with-view-event/.gitignore @@ -0,0 +1,5 @@ +data/sample_movielens_data.txt +manifest.json +target/ +pio.log +/pio.sbt \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/train-with-view-event/build.sbt ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/train-with-view-event/build.sbt b/examples/scala-parallel-recommendation/train-with-view-event/build.sbt new file mode 100644 index 0000000..3d25df1 --- /dev/null +++ 
b/examples/scala-parallel-recommendation/train-with-view-event/build.sbt @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +name := "template-scala-parallel-recommendation" + +organization := "org.apache.predictionio" +scalaVersion := "2.11.8" +libraryDependencies ++= Seq( + "org.apache.predictionio" %% "apache-predictionio-core" % "0.11.0-incubating" % "provided", + "org.apache.spark" %% "spark-mllib" % "2.1.1" % "provided") \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/train-with-view-event/data/import_eventserver.py ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/train-with-view-event/data/import_eventserver.py b/examples/scala-parallel-recommendation/train-with-view-event/data/import_eventserver.py new file mode 100644 index 0000000..f6add25 --- /dev/null +++ b/examples/scala-parallel-recommendation/train-with-view-event/data/import_eventserver.py @@ -0,0 +1,81 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Import sample data for recommendation engine +""" + +import predictionio +import argparse +import random + +RATE_ACTIONS_DELIMITER = "::" +SEED = 3 + +def import_events(client, file): + f = open(file, 'r') + random.seed(SEED) + count = 0 + print("Importing data...") + for line in f: + data = line.rstrip('\r\n').split(RATE_ACTIONS_DELIMITER) + client.create_event( + event="view", + entity_type="user", + entity_id=data[0], + target_entity_type="item", + target_entity_id=data[1] + ) + count += 1 + # For demonstration purposes, randomly mix in either a rate or a buy event + if (random.randint(0, 1) == 1): + client.create_event( + event="rate", + entity_type="user", + entity_id=data[0], + target_entity_type="item", + target_entity_id=data[1], + properties= { "rating" : float(data[2]) } + ) + else: + client.create_event( + event="buy", + entity_type="user", + entity_id=data[0], + target_entity_type="item", + target_entity_id=data[1] + ) + count += 1 + f.close() + print("%s events are imported." 
% count) + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description="Import sample data for recommendation engine") + parser.add_argument('--access_key', default='invald_access_key') + parser.add_argument('--url', default="http://localhost:7070") + parser.add_argument('--file', default="./data/sample_movielens_data.txt") + + args = parser.parse_args() + print(args) + + client = predictionio.EventClient( + access_key=args.access_key, + url=args.url, + threads=5, + qsize=500) + import_events(client, args.file) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/train-with-view-event/data/send_query.py ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/train-with-view-event/data/send_query.py b/examples/scala-parallel-recommendation/train-with-view-event/data/send_query.py new file mode 100644 index 0000000..f6ec9ab --- /dev/null +++ b/examples/scala-parallel-recommendation/train-with-view-event/data/send_query.py @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +""" +Send sample query to prediction engine +""" + +import predictionio +engine_client = predictionio.EngineClient(url="http://localhost:8000") +print(engine_client.send_query({"user": "1", "num": 4})) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/train-with-view-event/engine.json ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/train-with-view-event/engine.json b/examples/scala-parallel-recommendation/train-with-view-event/engine.json new file mode 100644 index 0000000..718b0e1 --- /dev/null +++ b/examples/scala-parallel-recommendation/train-with-view-event/engine.json @@ -0,0 +1,21 @@ +{ + "id": "default", + "description": "Default settings", + "engineFactory": "org.apache.predictionio.examples.recommendation.RecommendationEngine", + "datasource": { + "params" : { + "appName": "MyApp1" + } + }, + "algorithms": [ + { + "name": "als", + "params": { + "rank": 10, + "numIterations": 20, + "lambda": 0.01, + "seed": 3 + } + } + ] +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/train-with-view-event/project/assembly.sbt ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/train-with-view-event/project/assembly.sbt b/examples/scala-parallel-recommendation/train-with-view-event/project/assembly.sbt new file mode 100644 index 0000000..92636bc --- /dev/null +++ b/examples/scala-parallel-recommendation/train-with-view-event/project/assembly.sbt @@ -0,0 +1 @@ +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4") \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/train-with-view-event/project/build.properties ---------------------------------------------------------------------- diff --git 
a/examples/scala-parallel-recommendation/train-with-view-event/project/build.properties b/examples/scala-parallel-recommendation/train-with-view-event/project/build.properties new file mode 100644 index 0000000..64317fd --- /dev/null +++ b/examples/scala-parallel-recommendation/train-with-view-event/project/build.properties @@ -0,0 +1 @@ +sbt.version=0.13.15 http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/ALSAlgorithm.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/ALSAlgorithm.scala new file mode 100644 index 0000000..3aa1953 --- /dev/null +++ b/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/ALSAlgorithm.scala @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.examples.recommendation + +import org.apache.predictionio.controller.PAlgorithm +import org.apache.predictionio.controller.Params +import org.apache.predictionio.data.storage.BiMap + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.recommendation.ALS +import org.apache.spark.mllib.recommendation.{Rating => MLlibRating} +import org.apache.spark.mllib.recommendation.ALSModel + +import grizzled.slf4j.Logger + +case class ALSAlgorithmParams( + rank: Int, + numIterations: Int, + lambda: Double, + seed: Option[Long]) extends Params + +class ALSAlgorithm(val ap: ALSAlgorithmParams) + extends PAlgorithm[PreparedData, ALSModel, Query, PredictedResult] { + + @transient lazy val logger = Logger[this.type] + + if (ap.numIterations > 30) { + logger.warn( + s"ALSAlgorithmParams.numIterations > 30, current: ${ap.numIterations}. " + + s"There is a chance of running to StackOverflowException." + + s"To remedy it, set lower numIterations or checkpoint parameters.") + } + + def train(sc: SparkContext, data: PreparedData): ALSModel = { + // MLLib ALS cannot handle empty training data. + require(!data.ratings.take(1).isEmpty, + s"RDD[Rating] in PreparedData cannot be empty." + + " Please check if DataSource generates TrainingData" + + " and Preparator generates PreparedData correctly.") + // Convert user and item String IDs to Int index for MLlib + + val userStringIntMap = BiMap.stringInt(data.ratings.map(_.user)) + val itemStringIntMap = BiMap.stringInt(data.ratings.map(_.item)) + val mllibRatings = data.ratings.map( r => + // MLlibRating requires integer index for user and item + MLlibRating(userStringIntMap(r.user), itemStringIntMap(r.item), r.rating) + ) + + // seed for MLlib ALS + val seed = ap.seed.getOrElse(System.nanoTime) + + // Set checkpoint directory + // sc.setCheckpointDir("checkpoint") + + // If you only have one type of implicit event (Eg. 
"view" event only), + // set implicitPrefs to true + // MODIFIED + val implicitPrefs = true + val als = new ALS() + als.setUserBlocks(-1) + als.setProductBlocks(-1) + als.setRank(ap.rank) + als.setIterations(ap.numIterations) + als.setLambda(ap.lambda) + als.setImplicitPrefs(implicitPrefs) + als.setAlpha(1.0) + als.setSeed(seed) + als.setCheckpointInterval(10) + val m = als.run(mllibRatings) + + new ALSModel( + rank = m.rank, + userFeatures = m.userFeatures, + productFeatures = m.productFeatures, + userStringIntMap = userStringIntMap, + itemStringIntMap = itemStringIntMap) + } + + def predict(model: ALSModel, query: Query): PredictedResult = { + // Convert String ID to Int index for Mllib + model.userStringIntMap.get(query.user).map { userInt => + // create inverse view of itemStringIntMap + val itemIntStringMap = model.itemStringIntMap.inverse + // recommendProducts() returns Array[MLlibRating], which uses item Int + // index. Convert it to String ID for returning PredictedResult + val itemScores = model.recommendProducts(userInt, query.num) + .map (r => ItemScore(itemIntStringMap(r.product), r.rating)) + new PredictedResult(itemScores) + }.getOrElse{ + logger.info(s"No prediction for unknown user ${query.user}.") + new PredictedResult(Array.empty) + } + } + + // This function is used by the evaluation module, where a batch of queries is sent to this engine + // for evaluation purpose. + override def batchPredict(model: ALSModel, queries: RDD[(Long, Query)]): RDD[(Long, PredictedResult)] = { + val userIxQueries: RDD[(Int, (Long, Query))] = queries + .map { case (ix, query) => { + // If user not found, then the index is -1 + val userIx = model.userStringIntMap.get(query.user).getOrElse(-1) + (userIx, (ix, query)) + }} + + // Cross product of all valid users from the queries and products in the model. + val usersProducts: RDD[(Int, Int)] = userIxQueries + .keys + .filter(_ != -1) + .cartesian(model.productFeatures.map(_._1)) + + // Call mllib ALS's predict function. 
+ val ratings: RDD[MLlibRating] = model.predict(usersProducts) + + // The following code constructs predicted results from mllib's ratings. + // Not optimal implementation. Instead of groupBy, should use combineByKey with a PriorityQueue + val userRatings: RDD[(Int, Iterable[MLlibRating])] = ratings.groupBy(_.user) + + userIxQueries.leftOuterJoin(userRatings) + .map { + // When there are ratings + case (userIx, ((ix, query), Some(ratings))) => { + val topItemScores: Array[ItemScore] = ratings + .toArray + .sortBy(_.rating)(Ordering.Double.reverse) // note: from large to small ordering + .take(query.num) + .map { rating => ItemScore( + model.itemStringIntMap.inverse(rating.product), + rating.rating) } + + (ix, PredictedResult(itemScores = topItemScores)) + } + // When user doesn't exist in training data + case (userIx, ((ix, query), None)) => { + require(userIx == -1) + (ix, PredictedResult(itemScores = Array.empty)) + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/ALSModel.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/ALSModel.scala b/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/ALSModel.scala new file mode 100644 index 0000000..898858d --- /dev/null +++ b/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/ALSModel.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.recommendation +// This must be the same package as Spark's MatrixFactorizationModel because +// MatrixFactorizationModel's constructor is private and we are using +// its constructor in order to save and load the model + +import org.apache.predictionio.examples.recommendation.ALSAlgorithmParams + +import org.apache.predictionio.controller.PersistentModel +import org.apache.predictionio.controller.PersistentModelLoader +import org.apache.predictionio.data.storage.BiMap + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD + +class ALSModel( + override val rank: Int, + override val userFeatures: RDD[(Int, Array[Double])], + override val productFeatures: RDD[(Int, Array[Double])], + val userStringIntMap: BiMap[String, Int], + val itemStringIntMap: BiMap[String, Int]) + extends MatrixFactorizationModel(rank, userFeatures, productFeatures) + with PersistentModel[ALSAlgorithmParams] { + + def save(id: String, params: ALSAlgorithmParams, + sc: SparkContext): Boolean = { + + sc.parallelize(Seq(rank)).saveAsObjectFile(s"/tmp/${id}/rank") + userFeatures.saveAsObjectFile(s"/tmp/${id}/userFeatures") + productFeatures.saveAsObjectFile(s"/tmp/${id}/productFeatures") + sc.parallelize(Seq(userStringIntMap)) + .saveAsObjectFile(s"/tmp/${id}/userStringIntMap") + sc.parallelize(Seq(itemStringIntMap)) + .saveAsObjectFile(s"/tmp/${id}/itemStringIntMap") + true + } + + override def toString = { + s"userFeatures: [${userFeatures.count()}]" + + 
s"(${userFeatures.take(2).toList}...)" + + s" productFeatures: [${productFeatures.count()}]" + + s"(${productFeatures.take(2).toList}...)" + + s" userStringIntMap: [${userStringIntMap.size}]" + + s"(${userStringIntMap.take(2)}...)" + + s" itemStringIntMap: [${itemStringIntMap.size}]" + + s"(${itemStringIntMap.take(2)}...)" + } +} + +object ALSModel + extends PersistentModelLoader[ALSAlgorithmParams, ALSModel] { + def apply(id: String, params: ALSAlgorithmParams, + sc: Option[SparkContext]) = { + new ALSModel( + rank = sc.get.objectFile[Int](s"/tmp/${id}/rank").first, + userFeatures = sc.get.objectFile(s"/tmp/${id}/userFeatures"), + productFeatures = sc.get.objectFile(s"/tmp/${id}/productFeatures"), + userStringIntMap = sc.get + .objectFile[BiMap[String, Int]](s"/tmp/${id}/userStringIntMap").first, + itemStringIntMap = sc.get + .objectFile[BiMap[String, Int]](s"/tmp/${id}/itemStringIntMap").first) + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/DataSource.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/DataSource.scala b/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/DataSource.scala new file mode 100644 index 0000000..21796db --- /dev/null +++ b/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/DataSource.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.predictionio.examples.recommendation + +import org.apache.predictionio.controller.PDataSource +import org.apache.predictionio.controller.EmptyEvaluationInfo +import org.apache.predictionio.controller.EmptyActualResult +import org.apache.predictionio.controller.Params +import org.apache.predictionio.data.storage.Event +import org.apache.predictionio.data.store.PEventStore + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD + +import grizzled.slf4j.Logger + +case class DataSourceEvalParams(kFold: Int, queryNum: Int) + +case class DataSourceParams( + appName: String, + evalParams: Option[DataSourceEvalParams]) extends Params + +class DataSource(val dsp: DataSourceParams) + extends PDataSource[TrainingData, + EmptyEvaluationInfo, Query, ActualResult] { + + @transient lazy val logger = Logger[this.type] + + def getRatings(sc: SparkContext): RDD[Rating] = { + + val eventsRDD: RDD[Event] = PEventStore.find( + appName = dsp.appName, + entityType = Some("user"), + eventNames = Some(List("view")), // MODIFIED + // targetEntityType is optional field of an event. + targetEntityType = Some(Some("item")))(sc) + + val ratingsRDD: RDD[Rating] = eventsRDD.map { event => + try { + val ratingValue: Double = event.event match { + case "view" => 1.0 // MODIFIED + case _ => throw new Exception(s"Unexpected event ${event} is read.") + } + // MODIFIED + // key is (user id, item id) + // value is the rating value, which is 1. 
+ ((event.entityId, event.targetEntityId.get), ratingValue) + } catch { + case e: Exception => { + logger.error(s"Cannot convert ${event} to Rating. Exception: ${e}.") + throw e + } + } + } + // MODIFIED + // sum all values for the same user id and item id key + .reduceByKey { case (a, b) => a + b } + .map { case ((uid, iid), r) => + Rating(uid, iid, r) + }.cache() + + ratingsRDD + } + + override + def readTraining(sc: SparkContext): TrainingData = { + new TrainingData(getRatings(sc)) + } + + override + def readEval(sc: SparkContext) + : Seq[(TrainingData, EmptyEvaluationInfo, RDD[(Query, ActualResult)])] = { + require(!dsp.evalParams.isEmpty, "Must specify evalParams") + val evalParams = dsp.evalParams.get + + val kFold = evalParams.kFold + val ratings: RDD[(Rating, Long)] = getRatings(sc).zipWithUniqueId + ratings.cache + + (0 until kFold).map { idx => { + val trainingRatings = ratings.filter(_._2 % kFold != idx).map(_._1) + val testingRatings = ratings.filter(_._2 % kFold == idx).map(_._1) + + val testingUsers: RDD[(String, Iterable[Rating])] = testingRatings.groupBy(_.user) + + (new TrainingData(trainingRatings), + new EmptyEvaluationInfo(), + testingUsers.map { + case (user, ratings) => (Query(user, evalParams.queryNum), ActualResult(ratings.toArray)) + } + ) + }} + } +} + +case class Rating( + user: String, + item: String, + rating: Double +) + +class TrainingData( + val ratings: RDD[Rating] +) extends Serializable { + override def toString = { + s"ratings: [${ratings.count()}] (${ratings.take(2).toList}...)" + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/Engine.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/Engine.scala b/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/Engine.scala new file mode 100644 
// Patch metadata that preceded this file in the original text (kept verbatim):
// index 0000000..b2a668b
// --- /dev/null
// +++ b/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/Engine.scala
// @@ -0,0 +1,49 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.predictionio.examples.recommendation

import org.apache.predictionio.controller.EngineFactory
import org.apache.predictionio.controller.Engine

/** Request for recommendations: a user id and a result count `num`. */
case class Query(user: String, num: Int) extends Serializable

/** Response: the scored items returned for a [[Query]]. */
case class PredictedResult(itemScores: Array[ItemScore]) extends Serializable

/** Ground truth used during evaluation: the ratings held out for one user. */
case class ActualResult(ratings: Array[Rating]) extends Serializable

/** A single recommended item together with its score. */
case class ItemScore(item: String, score: Double) extends Serializable

/** Factory wiring the engine's four components together; the only registered
  * algorithm is `ALSAlgorithm` under the name "als".
  */
object RecommendationEngine extends EngineFactory {
  def apply() =
    new Engine(
      classOf[DataSource],
      classOf[Preparator],
      Map("als" -> classOf[ALSAlgorithm]),
      classOf[Serving])
}

// Patch metadata that followed this file in the original text (kept verbatim):
// http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/Evaluation.scala
// ----------------------------------------------------------------------
// diff --git a/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/Evaluation.scala
b/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/Evaluation.scala new file mode 100644 index 0000000..a665496 --- /dev/null +++ b/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/Evaluation.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.predictionio.examples.recommendation

import org.apache.predictionio.controller.Evaluation
import org.apache.predictionio.controller.OptionAverageMetric
import org.apache.predictionio.controller.AverageMetric
import org.apache.predictionio.controller.EmptyEvaluationInfo
import org.apache.predictionio.controller.EngineParamsGenerator
import org.apache.predictionio.controller.EngineParams
import org.apache.predictionio.controller.MetricEvaluator

// Usage (class names must carry this file's package):
// $ pio eval org.apache.predictionio.examples.recommendation.RecommendationEvaluation \
//   org.apache.predictionio.examples.recommendation.EngineParamsList

/** Precision@K over the items whose actual rating reaches `ratingThreshold`.
  *
  * @param k               number of top predictions inspected; must be positive
  * @param ratingThreshold minimum actual rating for an item to count as positive
  */
case class PrecisionAtK(k: Int, ratingThreshold: Double = 2.0)
    extends OptionAverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] {
  require(k > 0, "k must be greater than 0")

  override def header = s"Precision@K (k=$k, threshold=$ratingThreshold)"

  /** Scores one query.
    *
    * @return `None` when the user has no positive item; otherwise the number of
    *         positives among the first `k` predictions divided by
    *         `min(k, number of positives)`
    */
  def calculate(q: Query, p: PredictedResult, a: ActualResult): Option[Double] = {
    val positives: Set[String] =
      a.ratings.iterator.filter(_.rating >= ratingThreshold).map(_.item).toSet

    // If there are no positive results, precision is undefined. Such queries
    // are left out of the average, hence we return None.
    if (positives.isEmpty) {
      None
    } else {
      val tpCount: Int = p.itemScores.take(k).count(is => positives(is.item))
      // The denominator is capped at the number of positives, so a user with
      // fewer than k positives can still reach a score of 1.0.
      Some(tpCount.toDouble / math.min(k, positives.size))
    }
  }
}

/** Average number of actual ratings per query that reach `ratingThreshold`. */
case class PositiveCount(ratingThreshold: Double = 2.0)
    extends AverageMetric[EmptyEvaluationInfo, Query, PredictedResult, ActualResult] {
  override def header = s"PositiveCount (threshold=$ratingThreshold)"

  def calculate(q: Query, p: PredictedResult, a: ActualResult): Double = {
    a.ratings.count(_.rating >= ratingThreshold).toDouble
  }
}

/** Evaluation whose primary metric is Precision@10 at threshold 4.0, with
  * Precision@10 and PositiveCount at further thresholds as secondary metrics.
  */
object RecommendationEvaluation extends Evaluation {
  engineEvaluator = (
    RecommendationEngine(),
    MetricEvaluator(
      metric = PrecisionAtK(k = 10, ratingThreshold = 4.0),
      otherMetrics = Seq(
        PositiveCount(ratingThreshold = 4.0),
        PrecisionAtK(k = 10, ratingThreshold = 2.0),
        PositiveCount(ratingThreshold = 2.0),
        PrecisionAtK(k = 10, ratingThreshold = 1.0),
        PositiveCount(ratingThreshold = 1.0)
      )))
}


/** Evaluation reporting PositiveCount for every threshold and Precision@K for
  * every (threshold, k) combination; the primary metric is Precision@3 at 2.0.
  */
object ComprehensiveRecommendationEvaluation extends Evaluation {
  val ratingThresholds = Seq(0.0, 2.0, 4.0)
  val ks = Seq(1, 3, 10)

  engineEvaluator = (
    RecommendationEngine(),
    MetricEvaluator(
      metric = PrecisionAtK(k = 3, ratingThreshold = 2.0),
      otherMetrics = (
        (for (r <- ratingThresholds) yield PositiveCount(ratingThreshold = r)) ++
        (for (r <- ratingThresholds; k <- ks) yield PrecisionAtK(k = k, ratingThreshold = r))
      )))
}


/** Shared base parameters: app "MyApp1", 5 folds, 10 results per query. */
trait BaseEngineParamsList extends EngineParamsGenerator {
  protected val baseEP = EngineParams(
    dataSourceParams = DataSourceParams(
      appName = "MyApp1",
      evalParams = Some(DataSourceEvalParams(kFold = 5, queryNum = 10))))
}

/** Grid of ALS settings: every combination of rank and iteration count below. */
object EngineParamsList extends BaseEngineParamsList {
  engineParamsList = for (
    rank <- Seq(5, 10, 20);
    numIterations <- Seq(1, 5, 10))
    yield baseEP.copy(
      algorithmParamsList = Seq(
        ("als", ALSAlgorithmParams(rank, numIterations, 0.01, Some(3)))))
}
http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/Preparator.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/Preparator.scala b/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/Preparator.scala new file mode 100644 index 0000000..6a41c47 --- /dev/null +++ b/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/Preparator.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.examples.recommendation + +import org.apache.predictionio.controller.PPreparator + +import org.apache.spark.SparkContext +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD + +/** Preparation stage: wraps the ratings RDD of the TrainingData in a PreparedData without transforming it. */ +class Preparator + extends PPreparator[TrainingData, PreparedData] { + + /** Returns a PreparedData holding the very same `ratings` RDD as `trainingData`; `sc` is not used. */ + def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = { + new PreparedData(ratings = trainingData.ratings) + } +} + +/** Output of Preparator.prepare: the ratings RDD it was given. */ +class PreparedData( + val ratings: RDD[Rating] +) extends Serializable http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/Serving.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/Serving.scala b/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/Serving.scala new file mode 100644 index 0000000..c478455 --- /dev/null +++ b/examples/scala-parallel-recommendation/train-with-view-event/src/main/scala/Serving.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.predictionio.examples.recommendation + +import org.apache.predictionio.controller.LServing + +/** Serving stage: answers a query with the first PredictedResult in the list it is given. */ +class Serving + extends LServing[Query, PredictedResult] { + + /** Returns the first element of `predictedResults`; `query` is not inspected. */ + override + def serve(query: Query, + predictedResults: Seq[PredictedResult]): PredictedResult = { + /* `head` throws NoSuchElementException if predictedResults is empty. */ + predictedResults.head + } +} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-recommendation/train-with-view-event/template.json ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-recommendation/train-with-view-event/template.json b/examples/scala-parallel-recommendation/train-with-view-event/template.json new file mode 100644 index 0000000..d076ec5 --- /dev/null +++ b/examples/scala-parallel-recommendation/train-with-view-event/template.json @@ -0,0 +1 @@ +{"pio": {"version": { "min": "0.10.0-incubating" }}} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/README.md ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/README.md b/examples/scala-parallel-similarproduct/README.md new file mode 100644 index 0000000..0a56220 --- /dev/null +++ b/examples/scala-parallel-similarproduct/README.md @@ -0,0 +1,20 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> + +This is based on Similar Product Template v0.11.0-incubating. + +Please refer to http://predictionio.incubator.apache.org/templates/similarproduct/how-to/ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-and-return-item-properties/.gitignore ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/add-and-return-item-properties/.gitignore b/examples/scala-parallel-similarproduct/add-and-return-item-properties/.gitignore deleted file mode 100644 index 57841c6..0000000 --- a/examples/scala-parallel-similarproduct/add-and-return-item-properties/.gitignore +++ /dev/null @@ -1,4 +0,0 @@ -manifest.json -target/ -pio.log -/pio.sbt http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-and-return-item-properties/README.md ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/add-and-return-item-properties/README.md b/examples/scala-parallel-similarproduct/add-and-return-item-properties/README.md deleted file mode 100644 index e877e1c..0000000 --- a/examples/scala-parallel-similarproduct/add-and-return-item-properties/README.md +++ /dev/null @@ -1,205 +0,0 @@ -<!-- -Licensed to the Apache Software Foundation (ASF) under one or more -contributor license agreements. See the NOTICE file distributed with -this work for additional information regarding copyright ownership. 
-The ASF licenses this file to You under the Apache License, Version 2.0 -(the "License"); you may not use this file except in compliance with -the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. ---> - ---- -#PredictionIO: Add Your Own Properties to Returned Items ---- - -This small how-to explains how to add user defined properties to items returned by PredictionIO engine. -This how-to is based on the [Similar Product Engine Template](http://predictionio.incubator.apache.org/templates/similarproduct/quickstart/) version v0.1.3 -To use this how-to you need to be familiar with scala programming language. -In this how-to we also suppose you was able to set up and run `Similar Product Engine` (see their [quick start guide](http://predictionio.incubator.apache.org/templates/similarproduct/quickstart/)). - -A full end-to-end example can be found on -[GitHub](https://github.com/apache/incubator-predictionio/tree/develop/examples/scala-parallel-similarproduct/add-and-return-item-properties). - -## THE TASK - -Suppose you would like to use [Similar Product Engine](http://predictionio.incubator.apache.org/templates/similarproduct/quickstart/) -for suggesting your users the videos they can also like. The `Similar Product Engine` will answer to you -with list of IDs for such videos. So, for example `REST` response from the engine right now -looks like the one below -```json -{"itemScores":[ - { - "item":"i12", - "score":1.1700499715209998 - },{ - "item":"i44", - "score":1.1153550716504106 - } -]} -``` - -But you want the engine to return more information about every video. 
Let's think you want add fields -`title`, `date`, and `imdbUrl` to every item, so, the resulting `REST` respose -for your case should look similar to the posted below -```json -{"itemScores":[ - { - "item":"i12", - "title":"title for movie i12", - "date":"1935", - "imdbUrl":"http://imdb.com/fake-url/i12", - "score":1.1700499715209998 - },{ - "item":"i44", - "title":"title for movie i44", - "date":"1974", - "imdbUrl":"http://imdb.com/fake-url/i44", - "score":1.1153550716504106 - } -]} -``` - -## SO, HOW TO? - -### The Main Idea - -Recall [the DASE Architecture](http://predictionio.incubator.apache.org/templates/similarproduct/dase/), a PredictionIO engine has -4 main components: `Data Source`, `Data Preparator`, `Algorithm`, and `Serving` -components. To achieve your goal, you will need provide the information about video to engine -(using sdk), and then let this information to pass from `Data Source` through all the engine -to the `Serving` component where the engine will send required information back to your application. - -### Implementation - -#### Modify The Item -In file [DataSource.scala#L104](https://github.com/apache/incubator-predictionio/blob/develop/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/DataSource.scala#L104) -you will find class `Item` defined in the next way -```scala -case class Item(categories: Option[List[String]]) -``` - -At the first, we need simply add required fields to this class -```scala -case class Item( - title: String, - date: String, - imdbUrl: String, - categories: Option[List[String]]) -``` - -#### Create The Item Properly -Now, your IDE (or compiler) will say you about all the places where you need make changes to create item -properly. 
For example, [DataSource.scala#L52](https://github.com/apache/incubator-predictionio/blob/develop/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/DataSource.scala#L52) -```scala -Item(categories = properties.getOpt[List[String]]("categories")) -``` -You need now to add needed properties to item -```scala -Item( - title = properties.get[String]("title"), - date = properties.get[String]("date"), - imdbUrl = properties.get[String]("imdbUrl"), - categories = properties.getOpt[List[String]]("categories")) -``` - -#### Modify The ItemScore -Now, when you've fixed item creation, take a look on class `ItemScore` from the file [Engine.scala](https://github.com/apache/incubator-predictionio/blob/develop/examples/scala-parallel-similarproduct/multi/src/main/scala/Engine.scala) -```scala -case class ItemScore( - item: String, - score: Double -) extends Serializable -``` -Engine will return class `PredictedResult` which contains property `itemScores: Array[ItemScore]`. -So, since your result items are of class`ItemScore`, you need modify this class too. -In our example after modification you will have something similar to below -```scala -case class ItemScore( - item: String, - title: String, - date: String, - imdbUrl: String, - score: Double -) extends Serializable -``` - -#### Create The ItemScore Properly - -Again, now you need to go through all the places where `ItemScore` is created and fix compiler errors. - -Result is initially created by the `Algorithm` component and then is passed to the `Serving` component. -Take a look on a place where object of class ItemScore is initially created in file [ALSAlgorithm.scala#L171](https://github.com/apache/incubator-predictionio/blob/develop/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/ALSAlgorithm.scala#L171). 
-```scala -new ItemScore( - item = model.itemIntStringMap(i), - score = s -) -``` -You code after changes will be similar to posted below -```scala -val it = model.items(i) -new ItemScore( - item = model.itemIntStringMap(i), - title = it.title, - date = it.date, - imdbUrl = it.imdbUrl, - score = s -) -``` -Using `model.items(i)` you can receive corresponding object of the `Item` class, -and now you can access its properties which you created during previous step. -Using `model.itemIntStringMap(i)` you can receive ID of corresponding item. - -#### Modify Script That Supplies Data For The Engine -And this is the final step. You should supply your data to the engine using new format now. -To get the idea take a look on this piece of code in our [sample python script](https://github.com/apache/incubator-predictionio/blob/develop/examples/scala-parallel-similarproduct/add-and-return-item-properties/data/import_eventserver.py#L34) -that creates test. - -Creating item before modification. -```python -client.create_event( - event="$set", - entity_type="item", - entity_id=item_id, - properties={ - "categories" : random.sample(categories, random.randint(1, 4)) - } -) -``` -Creating item after modification. -```python -client.create_event( - event="$set", - entity_type="item", - entity_id=item_id, - properties={ - "categories" : random.sample(categories, random.randint(1, 4)), - "title": "title for movie " + item_id, - "date": 1935 + random.randint(1, 25), - "imdbUrl": "http://imdb.com/fake-url/" + item_id - } -) -``` - -#### Try It! 
-When you are ready, don't forget to fill application with new data and then -```bash -$ pio build -$ pio train -$ pio deploy -``` - -Now, you should be able to see desired results by querying engine -```bash -curl -H "Content-Type: application/json" -d '{ "items": ["i1", "i3"], "num": 10}' http://localhost:8000/queries.json -``` - -A full end-to-end example can be found on -[GitHub](https://github.com/apache/incubator-predictionio/tree/develop/examples/scala-parallel-similarproduct/add-and-return-item-properties) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-and-return-item-properties/build.sbt ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/add-and-return-item-properties/build.sbt b/examples/scala-parallel-similarproduct/add-and-return-item-properties/build.sbt deleted file mode 100644 index ef66b2f..0000000 --- a/examples/scala-parallel-similarproduct/add-and-return-item-properties/build.sbt +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -import AssemblyKeys._ - -assemblySettings - -name := "template-scala-parallel-similarproduct" - -organization := "org.apache.predictionio" - -libraryDependencies ++= Seq( - "org.apache.predictionio" %% "core" % pioVersion.value % "provided", - "org.apache.spark" %% "spark-core" % "1.2.0" % "provided", - "org.apache.spark" %% "spark-mllib" % "1.2.0" % "provided") http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-and-return-item-properties/data/import_eventserver.py ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/add-and-return-item-properties/data/import_eventserver.py b/examples/scala-parallel-similarproduct/add-and-return-item-properties/data/import_eventserver.py deleted file mode 100644 index 15aa38c..0000000 --- a/examples/scala-parallel-similarproduct/add-and-return-item-properties/data/import_eventserver.py +++ /dev/null @@ -1,93 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -""" -Import sample data for similar product engine -""" - -import predictionio -import argparse -import random - -SEED = 3 - -def import_events(client): - random.seed(SEED) - count = 0 - print client.get_status() - print "Importing data..." - - # generate 10 users, with user ids u1,u2,....,u10 - user_ids = ["u%s" % i for i in range(1, 11)] - for user_id in user_ids: - print "Set user", user_id - client.create_event( - event="$set", - entity_type="user", - entity_id=user_id - ) - count += 1 - - # generate 50 items, with item ids i1,i2,....,i50 - # random assign 1 to 4 categories among c1-c6 to items - categories = ["c%s" % i for i in range(1, 7)] - item_ids = ["i%s" % i for i in range(1, 51)] - for item_id in item_ids: - print "Set item", item_id - client.create_event( - event="$set", - entity_type="item", - entity_id=item_id, - properties={ - "categories" : random.sample(categories, random.randint(1, 4)), - "title": "title for movie " + item_id, - "date": 1935 + random.randint(1, 25), - "imdbUrl": "http://imdb.com/fake-url/" + item_id - } - ) - count += 1 - - # each user randomly viewed 10 items - for user_id in user_ids: - for viewed_item in random.sample(item_ids, 10): - print "User", user_id ,"views item", viewed_item - client.create_event( - event="view", - entity_type="user", - entity_id=user_id, - target_entity_type="item", - target_entity_id=viewed_item - ) - count += 1 - - print "%s events are imported." 
% count - -if __name__ == '__main__': - parser = argparse.ArgumentParser( - description="Import sample data for similar product engine") - parser.add_argument('--access_key', default='invald_access_key') - parser.add_argument('--url', default="http://localhost:7070") - - args = parser.parse_args() - print args - - client = predictionio.EventClient( - access_key=args.access_key, - url=args.url, - threads=5, - qsize=500) - import_events(client) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-and-return-item-properties/data/send_query.py ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/add-and-return-item-properties/data/send_query.py b/examples/scala-parallel-similarproduct/add-and-return-item-properties/data/send_query.py deleted file mode 100644 index 8678b15..0000000 --- a/examples/scala-parallel-similarproduct/add-and-return-item-properties/data/send_query.py +++ /dev/null @@ -1,24 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -""" -Send sample query to prediction engine -""" - -import predictionio -engine_client = predictionio.EngineClient(url="http://localhost:8000") -print engine_client.send_query({"items": ["i1", "i3"], "num": 4}) http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-and-return-item-properties/engine.json ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/add-and-return-item-properties/engine.json b/examples/scala-parallel-similarproduct/add-and-return-item-properties/engine.json deleted file mode 100644 index 9aa6dfa..0000000 --- a/examples/scala-parallel-similarproduct/add-and-return-item-properties/engine.json +++ /dev/null @@ -1,21 +0,0 @@ -{ - "id": "default", - "description": "Default settings", - "engineFactory": "org.template.similarproduct.SimilarProductEngine", - "datasource": { - "params" : { - "appId": 2 - } - }, - "algorithms": [ - { - "name": "als", - "params": { - "rank": 10, - "numIterations" : 20, - "lambda": 0.01, - "seed": 3 - } - } - ] -} http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-and-return-item-properties/project/assembly.sbt ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/add-and-return-item-properties/project/assembly.sbt b/examples/scala-parallel-similarproduct/add-and-return-item-properties/project/assembly.sbt deleted file mode 100644 index 54c3252..0000000 --- a/examples/scala-parallel-similarproduct/add-and-return-item-properties/project/assembly.sbt +++ /dev/null @@ -1 +0,0 @@ -addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2") http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-and-return-item-properties/project/pio-build.sbt 
---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/add-and-return-item-properties/project/pio-build.sbt b/examples/scala-parallel-similarproduct/add-and-return-item-properties/project/pio-build.sbt deleted file mode 100644 index 9aed0ee..0000000 --- a/examples/scala-parallel-similarproduct/add-and-return-item-properties/project/pio-build.sbt +++ /dev/null @@ -1 +0,0 @@ -addSbtPlugin("org.apache.predictionio" % "pio-build" % "0.9.0") http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/ALSAlgorithm.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/ALSAlgorithm.scala b/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/ALSAlgorithm.scala deleted file mode 100644 index 3f0b628..0000000 --- a/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/ALSAlgorithm.scala +++ /dev/null @@ -1,259 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.template.similarproduct - -import org.apache.predictionio.controller.P2LAlgorithm -import org.apache.predictionio.controller.Params -import org.apache.predictionio.data.storage.BiMap - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.mllib.recommendation.ALS -import org.apache.spark.mllib.recommendation.{Rating => MLlibRating} - -import grizzled.slf4j.Logger - -import scala.collection.mutable.PriorityQueue - -case class ALSAlgorithmParams( - rank: Int, - numIterations: Int, - lambda: Double, - seed: Option[Long]) extends Params - -class ALSModel( - val productFeatures: Map[Int, Array[Double]], - val itemStringIntMap: BiMap[String, Int], - val items: Map[Int, Item] -) extends Serializable { - - @transient lazy val itemIntStringMap = itemStringIntMap.inverse - - override def toString = { - s" productFeatures: [${productFeatures.size}]" + - s"(${productFeatures.take(2).toList}...)" + - s" itemStringIntMap: [${itemStringIntMap.size}]" + - s"(${itemStringIntMap.take(2).toString}...)]" + - s" items: [${items.size}]" + - s"(${items.take(2).toString}...)]" - } -} - -/** - * Use ALS to build item x feature matrix - */ -class ALSAlgorithm(val ap: ALSAlgorithmParams) - extends P2LAlgorithm[PreparedData, ALSModel, Query, PredictedResult] { - - @transient lazy val logger = Logger[this.type] - - def train(sc: SparkContext, data: PreparedData): ALSModel = { - require(!data.viewEvents.take(1).isEmpty, - s"viewEvents in PreparedData cannot be empty." + - " Please check if DataSource generates TrainingData" + - " and Preprator generates PreparedData correctly.") - require(!data.users.take(1).isEmpty, - s"users in PreparedData cannot be empty." + - " Please check if DataSource generates TrainingData" + - " and Preprator generates PreparedData correctly.") - require(!data.items.take(1).isEmpty, - s"items in PreparedData cannot be empty." 
+ - " Please check if DataSource generates TrainingData" + - " and Preprator generates PreparedData correctly.") - // create User and item's String ID to integer index BiMap - val userStringIntMap = BiMap.stringInt(data.users.keys) - val itemStringIntMap = BiMap.stringInt(data.items.keys) - - // collect Item as Map and convert ID to Int index - val items: Map[Int, Item] = data.items.map { case (id, item) => - (itemStringIntMap(id), item) - }.collectAsMap.toMap - - val mllibRatings = data.viewEvents - .map { r => - // Convert user and item String IDs to Int index for MLlib - val uindex = userStringIntMap.getOrElse(r.user, -1) - val iindex = itemStringIntMap.getOrElse(r.item, -1) - - if (uindex == -1) - logger.info(s"Couldn't convert nonexistent user ID ${r.user}" - + " to Int index.") - - if (iindex == -1) - logger.info(s"Couldn't convert nonexistent item ID ${r.item}" - + " to Int index.") - - ((uindex, iindex), 1) - }.filter { case ((u, i), v) => - // keep events with valid user and item index - (u != -1) && (i != -1) - }.reduceByKey(_ + _) // aggregate all view events of same user-item pair - .map { case ((u, i), v) => - // MLlibRating requires integer index for user and item - MLlibRating(u, i, v) - } - .cache() - - // MLLib ALS cannot handle empty training data. - require(!mllibRatings.take(1).isEmpty, - s"mllibRatings cannot be empty." 
+ - " Please check if your events contain valid user and item ID.") - - // seed for MLlib ALS - val seed = ap.seed.getOrElse(System.nanoTime) - - val m = ALS.trainImplicit( - ratings = mllibRatings, - rank = ap.rank, - iterations = ap.numIterations, - lambda = ap.lambda, - blocks = -1, - alpha = 1.0, - seed = seed) - - new ALSModel( - productFeatures = m.productFeatures.collectAsMap.toMap, - itemStringIntMap = itemStringIntMap, - items = items - ) - } - - def predict(model: ALSModel, query: Query): PredictedResult = { - - val productFeatures = model.productFeatures - - // convert items to Int index - val queryList: Set[Int] = query.items.map(model.itemStringIntMap.get(_)) - .flatten.toSet - - val queryFeatures: Vector[Array[Double]] = queryList.toVector - // productFeatures may not contain the requested item - .map { item => productFeatures.get(item) } - .flatten - - val whiteList: Option[Set[Int]] = query.whiteList.map( set => - set.map(model.itemStringIntMap.get(_)).flatten - ) - val blackList: Option[Set[Int]] = query.blackList.map ( set => - set.map(model.itemStringIntMap.get(_)).flatten - ) - - val ord = Ordering.by[(Int, Double), Double](_._2).reverse - - val indexScores: Array[(Int, Double)] = if (queryFeatures.isEmpty) { - logger.info(s"No productFeatures vector for query items ${query.items}.") - Array[(Int, Double)]() - } else { - productFeatures.par // convert to parallel collection - .mapValues { f => - queryFeatures.map{ qf => - cosine(qf, f) - }.reduce(_ + _) - } - .filter(_._2 > 0) // keep items with score > 0 - .seq // convert back to sequential collection - .toArray - } - - val filteredScore = indexScores.view.filter { case (i, v) => - isCandidateItem( - i = i, - items = model.items, - categories = query.categories, - queryList = queryList, - whiteList = whiteList, - blackList = blackList - ) - } - - val topScores = getTopN(filteredScore, query.num)(ord).toArray - - val itemScores = topScores.map { case (i, s) => - val it = model.items(i) - new 
ItemScore( - item = model.itemIntStringMap(i), - title = it.title, - date = it.date, - imdbUrl = it.imdbUrl, - score = s - ) - } - - new PredictedResult(itemScores) - } - - private - def getTopN[T](s: Seq[T], n: Int)(implicit ord: Ordering[T]): Seq[T] = { - - val q = PriorityQueue() - - for (x <- s) { - if (q.size < n) - q.enqueue(x) - else { - // q is full - if (ord.compare(x, q.head) < 0) { - q.dequeue() - q.enqueue(x) - } - } - } - - q.dequeueAll.toSeq.reverse - } - - private - def cosine(v1: Array[Double], v2: Array[Double]): Double = { - val size = v1.size - var i = 0 - var n1: Double = 0 - var n2: Double = 0 - var d: Double = 0 - while (i < size) { - n1 += v1(i) * v1(i) - n2 += v2(i) * v2(i) - d += v1(i) * v2(i) - i += 1 - } - val n1n2 = (math.sqrt(n1) * math.sqrt(n2)) - if (n1n2 == 0) 0 else (d / n1n2) - } - - private - def isCandidateItem( - i: Int, - items: Map[Int, Item], - categories: Option[Set[String]], - queryList: Set[Int], - whiteList: Option[Set[Int]], - blackList: Option[Set[Int]] - ): Boolean = { - whiteList.map(_.contains(i)).getOrElse(true) && - blackList.map(!_.contains(i)).getOrElse(true) && - // discard items in query as well - (!queryList.contains(i)) && - // filter categories - categories.map { cat => - items(i).categories.map { itemCat => - // keep this item if has ovelap categories with the query - !(itemCat.toSet.intersect(cat).isEmpty) - }.getOrElse(false) // discard this item if it has no categories - }.getOrElse(true) - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/DataSource.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/DataSource.scala b/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/DataSource.scala deleted file mode 
100644 index 1c60e8a..0000000 --- a/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/DataSource.scala +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.template.similarproduct - -import org.apache.predictionio.controller.PDataSource -import org.apache.predictionio.controller.EmptyEvaluationInfo -import org.apache.predictionio.controller.EmptyActualResult -import org.apache.predictionio.controller.Params -import org.apache.predictionio.data.storage.Event -import org.apache.predictionio.data.storage.Storage - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD - -import grizzled.slf4j.Logger - -case class DataSourceParams(appId: Int) extends Params - -class DataSource(val dsp: DataSourceParams) - extends PDataSource[TrainingData, - EmptyEvaluationInfo, Query, EmptyActualResult] { - - @transient lazy val logger = Logger[this.type] - - override - def readTraining(sc: SparkContext): TrainingData = { - val eventsDb = Storage.getPEvents() - - // create a RDD of (entityID, User) - val usersRDD: RDD[(String, User)] = eventsDb.aggregateProperties( - appId = dsp.appId, - entityType = 
"user" - )(sc).map { case (entityId, properties) => - val user = try { - User() - } catch { - case e: Exception => { - logger.error(s"Failed to get properties ${properties} of" + - s" user ${entityId}. Exception: ${e}.") - throw e - } - } - (entityId, user) - }.cache() - - // create a RDD of (entityID, Item) - val itemsRDD: RDD[(String, Item)] = eventsDb.aggregateProperties( - appId = dsp.appId, - entityType = "item" - )(sc).map { case (entityId, properties) => - val item = try { - // Assume categories is optional property of item. - Item( - title = properties.get[String]("title"), - date = properties.get[String]("date"), - imdbUrl = properties.get[String]("imdbUrl"), - categories = properties.getOpt[List[String]]("categories")) - } catch { - case e: Exception => { - logger.error(s"Failed to get properties ${properties} of" + - s" item ${entityId}. Exception: ${e}.") - throw e - } - } - (entityId, item) - }.cache() - - // get all "user" "view" "item" events - val viewEventsRDD: RDD[ViewEvent] = eventsDb.find( - appId = dsp.appId, - entityType = Some("user"), - eventNames = Some(List("view")), - // targetEntityType is optional field of an event. - targetEntityType = Some(Some("item")))(sc) - // eventsDb.find() returns RDD[Event] - .map { event => - val viewEvent = try { - event.event match { - case "view" => ViewEvent( - user = event.entityId, - item = event.targetEntityId.get, - t = event.eventTime.getMillis) - case _ => throw new Exception(s"Unexpected event ${event} is read.") - } - } catch { - case e: Exception => { - logger.error(s"Cannot convert ${event} to ViewEvent." 
+ - s" Exception: ${e}.") - throw e - } - } - viewEvent - }.cache() - - new TrainingData( - users = usersRDD, - items = itemsRDD, - viewEvents = viewEventsRDD - ) - } -} - -case class User() - -case class Item( - title: String, - date: String, - imdbUrl: String, - categories: Option[List[String]]) - -case class ViewEvent(user: String, item: String, t: Long) - -class TrainingData( - val users: RDD[(String, User)], - val items: RDD[(String, Item)], - val viewEvents: RDD[ViewEvent] -) { - override def toString = { - s"users: [${users.count()} (${users.take(2).toList}...)]" + - s"items: [${items.count()} (${items.take(2).toList}...)]" + - s"viewEvents: [${viewEvents.count()}] (${viewEvents.take(2).toList}...)" - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/Engine.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/Engine.scala b/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/Engine.scala deleted file mode 100644 index 1133a61..0000000 --- a/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/Engine.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.template.similarproduct - -import org.apache.predictionio.controller.IEngineFactory -import org.apache.predictionio.controller.Engine - -case class Query( - items: List[String], - num: Int, - categories: Option[Set[String]], - whiteList: Option[Set[String]], - blackList: Option[Set[String]] -) - -case class PredictedResult( - itemScores: Array[ItemScore] -) - -case class ItemScore( - item: String, - title: String, - date: String, - imdbUrl: String, - score: Double -) - -object SimilarProductEngine extends IEngineFactory { - def apply() = { - new Engine( - classOf[DataSource], - classOf[Preparator], - Map("als" -> classOf[ALSAlgorithm]), - classOf[Serving]) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/Preparator.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/Preparator.scala b/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/Preparator.scala deleted file mode 100644 index d747a84..0000000 --- a/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/Preparator.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.template.similarproduct - -import org.apache.predictionio.controller.PPreparator - -import org.apache.spark.SparkContext -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.RDD - -class Preparator - extends PPreparator[TrainingData, PreparedData] { - - def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = { - new PreparedData( - users = trainingData.users, - items = trainingData.items, - viewEvents = trainingData.viewEvents) - } -} - -class PreparedData( - val users: RDD[(String, User)], - val items: RDD[(String, Item)], - val viewEvents: RDD[ViewEvent] -) extends Serializable \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/Serving.scala ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/Serving.scala b/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/Serving.scala deleted file mode 100644 index 58c3b4e..0000000 --- a/examples/scala-parallel-similarproduct/add-and-return-item-properties/src/main/scala/Serving.scala +++ /dev/null @@ -1,30 
+0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.template.similarproduct - -import org.apache.predictionio.controller.LServing - -class Serving - extends LServing[Query, PredictedResult] { - - override - def serve(query: Query, - predictedResults: Seq[PredictedResult]): PredictedResult = { - predictedResults.head - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-predictionio/blob/76f34090/examples/scala-parallel-similarproduct/add-rateevent/README.md ---------------------------------------------------------------------- diff --git a/examples/scala-parallel-similarproduct/add-rateevent/README.md b/examples/scala-parallel-similarproduct/add-rateevent/README.md deleted file mode 100644 index 6c5c471..0000000 --- a/examples/scala-parallel-similarproduct/add-rateevent/README.md +++ /dev/null @@ -1,156 +0,0 @@ -<!-- -Licensed to the Apache Software Foundation (ASF) under one or more -contributor license agreements. See the NOTICE file distributed with -this work for additional information regarding copyright ownership. 
-The ASF licenses this file to You under the Apache License, Version 2.0 -(the "License"); you may not use this file except in compliance with -the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. ---> - -# Similar Product Template Modified to Add Explicit Rate Event - -This example engine is based on Similar Product Tempplate version v0.1.2 and is modified to add Explicit Rate Event to training data. - -For example, An User would rate an item with a score or rating.The rating is used to train the model. - -## Documentation - -Please refer to http://predictionio.incubator.apache.org/templates/similarproduct/quickstart/ - - - -## Development Notes - -### Changes to DataSource.scala - -1) class "Rating" is created. -``` -case class Rating( - user: String, - item: String, - rating: Double, - t:Long -) -``` - -2) "rateEventsRDD" is initialized by filtering events of type "rate" from Events database as shown below. - -``` - val rateEventsRDD: RDD[RateEvent] = eventsDb.find( - appId = dsp.appId, - entityType = Some("user"), - eventNames = Some(List("rate")... - - val rateEvent = try { - event.event match { - case "rate" => RateEvent( - user = event.entityId, - item = event.targetEntityId.get, - rating = event.properties.get[Double]("rating")... -``` - -### Changes to Preparator.scala - -1) val "rateEvents" is added to class "PreparedData" - -``` - class PreparedData( - val users: RDD[(String, User)], - val items: RDD[(String, Item)], - val rateEvents: RDD[RateEvent] -``` - -2) val "rateEvents" is initialized in the Object of class "PreparedData". 
- -``` - new PreparedData( - users = trainingData.users, - items = trainingData.items, - rateEvents = trainingData.rateEvents) -``` - -### Changes to ALSAlgorithm.scala - -1) Changed the Signature of "train" method to include SparkContext and edited method definition to specify the debug message. - -``` - def train(sc:SparkContext ,data: PreparedData): ALSModel = { - require(!data.rateEvents.take(1).isEmpty, - s"rateEvents in PreparedData cannot be empty." + -``` - -2) MlibRatings are initialized from rateEvents. - -``` - val mllibRatings = data.rateEvents -``` - -3) Invoke "ALS.train" method to train explicit rate events. -``` - val m = ALS.train( - ratings = mllibRatings, - rank = ap.rank, - iterations = ap.numIterations, - lambda = ap.lambda, - blocks = -1, - seed = seed) -``` - -4) Define "rateEvent" RDD to filter rate events from events. - -``` - val rateEventsRDD: RDD[RateEvent] = eventsDb.find(...) -``` - -5) if a user may rate same item with different value at different times,use the latest value for this case. - -``` - .reduceByKey { case (v1, v2) => // MODIFIED - // if a user may rate same item with different value at different times, - // use the latest value for this case. - // Can remove this reduceByKey() if no need to support this case. - val (rating1, t1) = v1 - val (rating2, t2) = v2 - // keep the latest value - if (t1 > t2) v1 else v2 - } -``` - -6) persist mlibRating. -``` - .map { case ((u, i), (rating, t)) => // MODIFIED - // MLlibRating requires integer index for user and item - MLlibRating(u, i, rating) // MODIFIED - }.cache() -``` - - -7) Add "rateEvent" to class "TrainingData". - -``` - class TrainingData( - val users: RDD[(String, User)], - val items: RDD[(String, Item)], - val rateEvents: RDD[RateEvent] - ) -``` - -8) Add "rateEvent" to object "TrainingData". - -``` - new TrainingData( - users = usersRDD, - items = itemsRDD, - rateEvents = rateEventsRDD) -``` - - -
