Repository: bigtop Updated Branches: refs/heads/master 770d50b65 -> 0a66c49e5
[BigPetStore] Add Spark Product Recommender example Project: http://git-wip-us.apache.org/repos/asf/bigtop/repo Commit: http://git-wip-us.apache.org/repos/asf/bigtop/commit/0a66c49e Tree: http://git-wip-us.apache.org/repos/asf/bigtop/tree/0a66c49e Diff: http://git-wip-us.apache.org/repos/asf/bigtop/diff/0a66c49e Branch: refs/heads/master Commit: 0a66c49e518f744b53e3d952b0f8083d1e4228c4 Parents: 770d50b Author: RJ Nowling <[email protected]> Authored: Wed Feb 18 16:42:51 2015 -0600 Committer: jayunit100 <[email protected]> Committed: Thu Feb 19 20:32:02 2015 -0500 ---------------------------------------------------------------------- bigtop-bigpetstore/bigpetstore-spark/README.md | 11 ++ .../spark/analytics/RecommendProducts.scala | 118 +++++++++++++++++++ .../bigpetstore/spark/datamodel/DataModel.scala | 4 + .../bigpetstore/spark/datamodel/IOUtils.scala | 16 +++ .../bigpetstore/spark/TestFullPipeline.scala | 7 ++ 5 files changed, 156 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bigtop/blob/0a66c49e/bigtop-bigpetstore/bigpetstore-spark/README.md ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/bigpetstore-spark/README.md b/bigtop-bigpetstore/bigpetstore-spark/README.md index 6ec7489..ac2f584 100644 --- a/bigtop-bigpetstore/bigpetstore-spark/README.md +++ b/bigtop-bigpetstore/bigpetstore-spark/README.md @@ -146,3 +146,14 @@ This will output a JSON file to the /tmp directory, which has formatting (approx Of course, the above data is for a front end web app which will display charts/summary stats of the transactions. Keep tracking Apache BigTop for updates on this front ! + +Running the Product Recommendation Component +-------------------------------------------- + +BigPetStore can recommend products to customers using the alternating least squares (ALS) algorithm. 
The recommender can be run as follows: + +``` +spark-submit --master local[2] --class org.apache.bigtop.bigpetstore.spark.analytics.RecommendProducts bigpetstore-spark-X.jar transformed_data recommendations.json +``` + +The resulting JSON file will contain lists of customers, products, and products recommended to each customer. http://git-wip-us.apache.org/repos/asf/bigtop/blob/0a66c49e/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/RecommendProducts.scala ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/RecommendProducts.scala b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/RecommendProducts.scala new file mode 100644 index 0000000..5ac0fba --- /dev/null +++ b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/RecommendProducts.scala @@ -0,0 +1,118 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.bigtop.bigpetstore.spark.analytics + +import org.apache.bigtop.bigpetstore.spark.datamodel._ +import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd._ +import org.apache.spark.mllib.recommendation._ + +import java.io.File + +case class PRParameters(inputDir: String, outputFile: String) + +object RecommendProducts { + + private def printUsage() { + val usage = "BigPetStore Product Recommendation Module\n" + + "\n" + + "Usage: transformed_data recommendations\n" + + "\n" + + "transformed_data - (string) directory of ETL'd data\n" + + "recommendations - (string) output file of recommendations\n" + + println(usage) + } + + def parseArgsOrDie(args: Array[String]): PRParameters = { + if(args.length != 2) { + printUsage(); + System.exit(1) + } + + PRParameters(args(0), args(1)) + } + + def prepareRatings(tx: RDD[Transaction]): RDD[Rating] = { + val productPairs = tx.map { t => ((t.customerId, t.productId), 1) } + val pairCounts = productPairs.reduceByKey { case (v1, v2) => v1 + v2 } + val ratings = pairCounts.map { p => Rating(p._1._1.toInt, p._1._2.toInt, p._2) } + + ratings + } + + def trainModel(ratings: RDD[Rating], nIterations: Int, rank: Int, alpha: Double, + lambda: Double): MatrixFactorizationModel = { + ratings.cache() + val model = ALS.trainImplicit(ratings, nIterations, rank, lambda, alpha) + + model + } + + def recommendProducts(customers: RDD[Customer], + model: MatrixFactorizationModel, sc: SparkContext, nRecommendations: Int): + Array[UserProductRecommendations] = { + + customers.collect().map { c => + val ratings = model.recommendProducts(c.customerId.toInt, nRecommendations) + + val productIds = ratings.map { r => r.product.toLong} + + UserProductRecommendations(c.customerId, productIds.toArray) + } + } + + /** + * We keep a "run" method which can be called easily from tests and also is used by main. 
+ */ + def run(txInputDir: String, recOutputFile: String, sc: SparkContext, + nIterations: Int = 20, alpha: Double = 40.0, rank:Int = 10, lambda: Double = 1.0, + nRecommendations: Int = 5) { + + println("input : " + txInputDir) + println(sc) + val rdds = IOUtils.load(sc, txInputDir) + val tx = rdds._5 + val products = rdds._4 + val customers = rdds._3 + System.out.println("Transaction count = " + tx.count()) + + val ratings = prepareRatings(tx) + val model = trainModel(ratings, nIterations, rank, alpha, lambda) + val userProdRec = recommendProducts(customers, model, sc, nRecommendations) + + val prodRec = ProductRecommendations(customers.collect(), + products.collect(), + userProdRec) + + IOUtils.saveLocalAsJSON(new File(recOutputFile), prodRec) + } + + def main(args: Array[String]) { + val params: PRParameters = parseArgsOrDie(args) + + val conf = new SparkConf().setAppName("BPS Product Recommendations") + val sc = new SparkContext(conf) + + run(params.inputDir, params.outputFile, sc) + + sc.stop() + } + +} http://git-wip-us.apache.org/repos/asf/bigtop/blob/0a66c49e/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala index 59cc304..4ec30e2 100644 --- a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala +++ b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala @@ -72,3 +72,7 @@ case class Transaction(customerId: Long, transactionId: Long, storeId: Long, dat */ case class TransactionSQL(customerId: Long, transactionId: Long, storeId: Long, timestamp:Timestamp, productId: Long, year:Int, month:Int, day:Int, 
hour:Int, minute:Int ) + +case class UserProductRecommendations(customerId: Long, productIds: Array[Long]) + +case class ProductRecommendations(customers: Array[Customer], products: Array[Product], recommendations: Array[UserProductRecommendations]) http://git-wip-us.apache.org/repos/asf/bigtop/blob/0a66c49e/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala index 5432e3c..1485ac3 100644 --- a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala +++ b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala @@ -84,6 +84,22 @@ object IOUtils { read[Statistics](scala.io.Source.fromFile(jsonFile).getLines.reduceLeft(_+_)) } + def saveLocalAsJSON(outputDir: File, recommendations:ProductRecommendations) { + //load the write/read methods. + implicit val formats = Serialization.formats(NoTypeHints) + val json:String = write(recommendations) + Files.write(outputDir.toPath, json.getBytes(StandardCharsets.UTF_8)) + } + + def readLocalAsProductRecommendations(jsonFile: File):ProductRecommendations = { + //load the write/read methods. + implicit val formats = Serialization.formats(NoTypeHints) + //Read file as String, and serialize it into Stats object. + //See http://json4s.org/ examples. + read[ProductRecommendations](scala.io.Source.fromFile(jsonFile).getLines.reduceLeft(_+_)) + } + + /** * Load RDDs of the data model from Sequence files. 
* http://git-wip-us.apache.org/repos/asf/bigtop/blob/0a66c49e/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala index 9d5cb84..1d8c875 100644 --- a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala +++ b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala @@ -1,6 +1,7 @@ package org.apache.bigpetstore.spark import org.apache.bigtop.bigpetstore.spark.analytics.PetStoreStatistics +import org.apache.bigtop.bigpetstore.spark.analytics.RecommendProducts import org.apache.bigtop.bigpetstore.spark.datamodel.{Statistics, IOUtils} import org.apache.bigtop.bigpetstore.spark.etl.ETLParameters import org.apache.bigtop.bigpetstore.spark.etl.SparkETL @@ -74,6 +75,12 @@ class TestFullPipeline extends FunSuite with BeforeAndAfterAll { assert(stats.productDetails.length === products) assert(stats.transactionsByMonth.length === 12) + val recommJson = new File(tmpDir,"recommendations.json") + RecommendProducts.run(etlDir.getAbsolutePath, + recommJson.getAbsolutePath, + sc, nIterations=5) + + sc.stop() } }
