Repository: bigtop
Updated Branches:
  refs/heads/master 770d50b65 -> 0a66c49e5


[BigPetStore] Add Spark Product Recommender example


Project: http://git-wip-us.apache.org/repos/asf/bigtop/repo
Commit: http://git-wip-us.apache.org/repos/asf/bigtop/commit/0a66c49e
Tree: http://git-wip-us.apache.org/repos/asf/bigtop/tree/0a66c49e
Diff: http://git-wip-us.apache.org/repos/asf/bigtop/diff/0a66c49e

Branch: refs/heads/master
Commit: 0a66c49e518f744b53e3d952b0f8083d1e4228c4
Parents: 770d50b
Author: RJ Nowling <[email protected]>
Authored: Wed Feb 18 16:42:51 2015 -0600
Committer: jayunit100 <[email protected]>
Committed: Thu Feb 19 20:32:02 2015 -0500

----------------------------------------------------------------------
 bigtop-bigpetstore/bigpetstore-spark/README.md  |  11 ++
 .../spark/analytics/RecommendProducts.scala     | 118 +++++++++++++++++++
 .../bigpetstore/spark/datamodel/DataModel.scala |   4 +
 .../bigpetstore/spark/datamodel/IOUtils.scala   |  16 +++
 .../bigpetstore/spark/TestFullPipeline.scala    |   7 ++
 5 files changed, 156 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bigtop/blob/0a66c49e/bigtop-bigpetstore/bigpetstore-spark/README.md
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/bigpetstore-spark/README.md 
b/bigtop-bigpetstore/bigpetstore-spark/README.md
index 6ec7489..ac2f584 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/README.md
+++ b/bigtop-bigpetstore/bigpetstore-spark/README.md
@@ -146,3 +146,14 @@ This will output a JSON file to the /tmp directory, which 
has formatting (approx
 
 Of course, the above data is for a front end web app which will display 
charts/summary stats of the transactions.
 Keep tracking Apache BigTop for updates on this front !
+
+Running the Product Recommendation Component
+--------------------------------------------
+
+BigPetStore can recommend products to customers using the alternating least 
squares (ALS) algorithm. The recommender can be run as follows:
+
+```
+spark-submit --master local[2] --class 
org.apache.bigtop.bigpetstore.spark.analytics.RecommendProducts 
+bigpetstore-spark-X.jar transformed_data recommendations.json
+```
+
+The resulting json file will contain lists of customers, products, and 
products recommended to each customer.

http://git-wip-us.apache.org/repos/asf/bigtop/blob/0a66c49e/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/RecommendProducts.scala
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/RecommendProducts.scala
 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/RecommendProducts.scala
new file mode 100644
index 0000000..5ac0fba
--- /dev/null
+++ 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/RecommendProducts.scala
@@ -0,0 +1,118 @@
+/*
+*  Licensed to the Apache Software Foundation (ASF) under one or more
+*  contributor license agreements.  See the NOTICE file distributed with
+*  this work for additional information regarding copyright ownership.
+*  The ASF licenses this file to You under the Apache License, Version 2.0
+*  (the "License"); you may not use this file except in compliance with
+*  the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+*  Unless required by applicable law or agreed to in writing, software
+*  distributed under the License is distributed on an "AS IS" BASIS,
+*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+*  See the License for the specific language governing permissions and
+*  limitations under the License.
+*/
+
+package org.apache.bigtop.bigpetstore.spark.analytics
+
+import org.apache.bigtop.bigpetstore.spark.datamodel._
+import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd._
+import org.apache.spark.mllib.recommendation._
+
+import java.io.File
+
// Parsed command-line parameters: directory of ETL'd transaction data and
// the path of the JSON file the recommendations will be written to.
case class PRParameters(inputDir: String, outputFile: String)
+
object RecommendProducts {

  /** Prints command-line usage information to stdout. */
  private def printUsage() {
    val usage = "BigPetStore Product Recommendation Module\n" +
    "\n" +
    "Usage: transformed_data recommendations\n" +
    "\n" +
    "transformed_data - (string) directory of ETL'd data\n" +
    "recommendations - (string) output file of recommendations\n"

    println(usage)
  }

  /**
    * Parses the command-line arguments, printing usage and terminating the
    * JVM when the argument count is wrong.
    *
    * @param args expected: input directory, output file
    * @return the parsed parameters
    */
  def parseArgsOrDie(args: Array[String]): PRParameters = {
    if (args.length != 2) {
      printUsage()
      System.exit(1)
    }

    PRParameters(args(0), args(1))
  }

  /**
    * Builds implicit-feedback ratings from the transactions: each
    * (customer, product) pair is rated by its purchase count.
    *
    * NOTE(review): customer and product ids are Longs in the data model but
    * are narrowed with toInt here because MLlib's Rating uses Int ids —
    * ids above Int.MaxValue would silently wrap.
    */
  def prepareRatings(tx: RDD[Transaction]): RDD[Rating] = {
    val productPairs = tx.map { t => ((t.customerId, t.productId), 1) }
    val pairCounts = productPairs.reduceByKey { case (v1, v2) => v1 + v2 }
    val ratings = pairCounts.map { p => Rating(p._1._1.toInt, p._1._2.toInt, p._2) }

    ratings
  }

  /**
    * Trains an implicit-feedback ALS model on the given ratings.
    *
    * @param ratings     implicit ratings (purchase counts)
    * @param nIterations number of ALS iterations to run
    * @param rank        number of latent factors in the model
    * @param alpha       confidence parameter for implicit feedback
    * @param lambda      regularization parameter
    */
  def trainModel(ratings: RDD[Rating], nIterations: Int, rank: Int, alpha: Double,
    lambda: Double): MatrixFactorizationModel = {
    // ALS makes many passes over the ratings, so cache them.
    ratings.cache()
    // BUG FIX: ALS.trainImplicit's signature is
    // (ratings, rank, iterations, lambda, alpha). The previous code passed
    // nIterations in the rank position and rank in the iterations position,
    // silently training a model with the two hyperparameters swapped.
    val model = ALS.trainImplicit(ratings, rank, nIterations, lambda, alpha)

    model
  }

  /**
    * Produces the top-N product recommendations for every customer.
    *
    * Customers are collect()ed to the driver, so this assumes the customer
    * table fits in driver memory.
    *
    * @param sc unused; kept for interface compatibility with existing callers
    */
  def recommendProducts(customers: RDD[Customer],
    model: MatrixFactorizationModel, sc: SparkContext, nRecommendations: Int):
      Array[UserProductRecommendations] = {

    customers.collect().map { c =>
      val ratings = model.recommendProducts(c.customerId.toInt, nRecommendations)

      val productIds = ratings.map { r => r.product.toLong }

      UserProductRecommendations(c.customerId, productIds.toArray)
    }
  }

  /**
    * We keep a "run" method which can be called easily from tests and also is
    * used by main. Loads the ETL'd data, trains the ALS model, computes
    * per-customer recommendations, and writes them as JSON to recOutputFile.
    */
  def run(txInputDir: String, recOutputFile: String, sc: SparkContext,
  nIterations: Int = 20, alpha: Double = 40.0, rank: Int = 10, lambda: Double = 1.0,
  nRecommendations: Int = 5) {

    println("input : " + txInputDir)
    println(sc)
    // IOUtils.load returns a tuple of RDDs; positions 3-5 are
    // customers, products, and transactions respectively.
    val rdds = IOUtils.load(sc, txInputDir)
    val tx = rdds._5
    val products = rdds._4
    val customers = rdds._3
    System.out.println("Transaction count = " + tx.count())

    val ratings = prepareRatings(tx)
    val model = trainModel(ratings, nIterations, rank, alpha, lambda)
    val userProdRec = recommendProducts(customers, model, sc, nRecommendations)

    // Bundle the full customer/product tables with the recommendations so
    // the JSON output is self-contained for downstream consumers.
    val prodRec = ProductRecommendations(customers.collect(),
      products.collect(),
      userProdRec)

    IOUtils.saveLocalAsJSON(new File(recOutputFile), prodRec)
  }

  /** CLI entry point: parses args, builds a SparkContext, and delegates to run. */
  def main(args: Array[String]) {
    val params: PRParameters = parseArgsOrDie(args)

    val conf = new SparkConf().setAppName("BPS Product Recommendations")
    val sc = new SparkContext(conf)

    run(params.inputDir, params.outputFile, sc)

    sc.stop()
  }

}

http://git-wip-us.apache.org/repos/asf/bigtop/blob/0a66c49e/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
index 59cc304..4ec30e2 100644
--- 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
+++ 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
@@ -72,3 +72,7 @@ case class Transaction(customerId: Long, transactionId: Long, 
storeId: Long, dat
  */
 case class TransactionSQL(customerId: Long, transactionId: Long, storeId: 
Long, timestamp:Timestamp, productId: Long,
                           year:Int, month:Int, day:Int, hour:Int, minute:Int )
+
// Top-N product ids recommended for a single customer.
case class UserProductRecommendations(customerId: Long, productIds: Array[Long])

// Full recommender output: the customer and product tables plus the
// per-customer recommendations, bundled so the serialized JSON is self-contained.
case class ProductRecommendations(customers: Array[Customer], products: Array[Product], recommendations: Array[UserProductRecommendations])

http://git-wip-us.apache.org/repos/asf/bigtop/blob/0a66c49e/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
index 5432e3c..1485ac3 100644
--- 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
+++ 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
@@ -84,6 +84,22 @@ object IOUtils {
     
read[Statistics](scala.io.Source.fromFile(jsonFile).getLines.reduceLeft(_+_))
   }
 
  /**
    * Serializes the recommendations to JSON (via json4s) and writes them to a
    * local file, replacing any existing contents.
    *
    * @param outputDir destination file — NOTE(review): despite the name this
    *                  is a file path, not a directory; consider renaming
    * @param recommendations the recommendations to persist
    */
  def saveLocalAsJSON(outputDir: File, recommendations:ProductRecommendations) {
    //load the write/read methods.
    implicit val formats = Serialization.formats(NoTypeHints)
    val json:String = write(recommendations)
    Files.write(outputDir.toPath, json.getBytes(StandardCharsets.UTF_8))
  }
+
+  def readLocalAsProductRecommendations(jsonFile: File):ProductRecommendations 
= {
+    //load the write/read methods.
+    implicit val formats = Serialization.formats(NoTypeHints)
+    //Read file as String, and serialize it into Stats object.
+    //See http://json4s.org/ examples.
+    
read[ProductRecommendations](scala.io.Source.fromFile(jsonFile).getLines.reduceLeft(_+_))
+  }
+
+
   /**
     * Load RDDs of the data model from Sequence files.
     *

http://git-wip-us.apache.org/repos/asf/bigtop/blob/0a66c49e/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
 
b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
index 9d5cb84..1d8c875 100644
--- 
a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
+++ 
b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
@@ -1,6 +1,7 @@
 package org.apache.bigpetstore.spark
 
 import org.apache.bigtop.bigpetstore.spark.analytics.PetStoreStatistics
+import org.apache.bigtop.bigpetstore.spark.analytics.RecommendProducts
 import org.apache.bigtop.bigpetstore.spark.datamodel.{Statistics, IOUtils}
 import org.apache.bigtop.bigpetstore.spark.etl.ETLParameters
 import org.apache.bigtop.bigpetstore.spark.etl.SparkETL
@@ -74,6 +75,12 @@ class TestFullPipeline extends FunSuite with 
BeforeAndAfterAll {
     assert(stats.productDetails.length === products)
     assert(stats.transactionsByMonth.length === 12)
 
+    val recommJson = new File(tmpDir,"recommendations.json")
+    RecommendProducts.run(etlDir.getAbsolutePath,
+      recommJson.getAbsolutePath,
+      sc, nIterations=5)
+
+
     sc.stop()
   }
 }

Reply via email to