Repository: bigtop
Updated Branches:
  refs/heads/master b8ea3e0de -> 770d50b65


[BigPetStore] Clean up Spark SQL analytics module. Adds transactions/month and 
transactions/product output.


Project: http://git-wip-us.apache.org/repos/asf/bigtop/repo
Commit: http://git-wip-us.apache.org/repos/asf/bigtop/commit/770d50b6
Tree: http://git-wip-us.apache.org/repos/asf/bigtop/tree/770d50b6
Diff: http://git-wip-us.apache.org/repos/asf/bigtop/diff/770d50b6

Branch: refs/heads/master
Commit: 770d50b65a0585bc90251696910668377c6d24b6
Parents: b8ea3e0
Author: RJ Nowling <[email protected]>
Authored: Wed Feb 18 08:27:26 2015 -0600
Committer: jayunit100 <[email protected]>
Committed: Wed Feb 18 15:54:41 2015 -0500

----------------------------------------------------------------------
 bigtop-bigpetstore/bigpetstore-spark/README.md  |  11 +-
 .../spark/analytics/PetStoreStatistics.scala    | 189 +++++++++++--------
 .../bigpetstore/spark/datamodel/DataModel.scala |  22 ++-
 .../bigpetstore/spark/datamodel/IOUtils.scala   |   2 +-
 .../bigpetstore/spark/TestFullPipeline.scala    |  26 ++-
 .../spark/analytics/AnalyticsSuite.scala        |   2 +-
 .../spark/datamodel/IOUtilsSuite.scala          |   2 +-
 7 files changed, 150 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bigtop/blob/770d50b6/bigtop-bigpetstore/bigpetstore-spark/README.md
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/bigpetstore-spark/README.md 
b/bigtop-bigpetstore/bigpetstore-spark/README.md
index caf4276..6ec7489 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/README.md
+++ b/bigtop-bigpetstore/bigpetstore-spark/README.md
@@ -108,11 +108,18 @@ from files.  To run the analytics job, which outputs a 
JSON file at the end, you
 spark-submit --master local[2] --class 
org.apache.bigtop.bigpetstore.spark.analytics.PetStoreStatistics 
bigpetstore-spark-X.jar transformed\_data PetStoreStats.json
 ```
 
+Current queries include:
+
+1. Total Transactions
+2. Transaction Counts by Month
+3. Transaction Counts by Product
+4. Transaction Counts by Product and Store Zipcode
+
 This will output a JSON file to the /tmp directory, which has formatting 
(approximately) like this.
 
 ```
 {
-   "totalTransaction":12,
+   "totalTransaction":34586,
    "transactionsByZip":[
   
{"count":64,"productId":54,"zipcode":"94583"},{"count":38,"productId":18,"zipcode":"34761"},
    
{"count":158,"productId":14,"zipcode":"11368"},{"count":66,"productId":46,"zipcode":"33027"},
@@ -138,4 +145,4 @@ This will output a JSON file to the /tmp directory, which 
has formatting (approx
 ```
 
 Of course, the above data is for a front end web app which will display 
charts/summary stats of the transactions.
-Keep tracking Apache BigTop for updates on this front !
\ No newline at end of file
+Keep tracking Apache BigTop for updates on this front !

http://git-wip-us.apache.org/repos/asf/bigtop/blob/770d50b6/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala
 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala
index c97bf92..2d376a4 100644
--- 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala
+++ 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala
@@ -17,32 +17,29 @@
 
 package org.apache.bigtop.bigpetstore.spark.analytics
 
-
+import java.io.File
 import java.sql.Timestamp
 
-import org.joda.time.DateTime
+import scala.Nothing
 
-import _root_.org.apache.spark.SparkConf
-import org.apache.bigtop.bigpetstore.spark.datamodel._
-import org.apache.spark.sql._;
-import scala.Nothing;
-import org.apache.hadoop.fs.Path
-import scala.collection.JavaConversions._
+import org.apache.spark.sql._
 import org.apache.spark.{SparkContext, SparkConf}
 import org.apache.spark.SparkContext._
 import org.apache.spark.rdd._
 
-import java.util.{Calendar, ArrayList, Date}
-import scala.util.Random
-import java.io.File
+import org.joda.time.DateTime
 import org.json4s.JsonDSL.WithBigDecimal._
 
+import org.apache.bigtop.bigpetstore.spark.datamodel._
+
 object PetStoreStatistics {
 
     private def printUsage() {
-      val usage: String =
-        "BigPetStore Analytics Module. Usage: inputDir, outputDir.\n " +
-        "Ouptut is a JSON file in outputDir.  For schema, see the code." ;
+      val usage: String = "BigPetStore Analytics Module." +
+      "\n" +
+      "Usage: spark-submit ... inputDir outputFile\n " +
+      "inputDir - (string) Path to ETL'd data\n" +
+      "outputFile - (string) is a JSON file.  For schema, see the code.\n"
 
       System.err.println(usage)
     }
@@ -54,95 +51,131 @@ object PetStoreStatistics {
    * @param args
    * @return
    */
-    def parseArgs(args: Array[String]):(Option[Path],Option[File]) = {
-      (if(args.length < 1) { System.err.println("ERROR AT ARG 1: Missing INPUT 
path"); None } else Some(new Path(args(0))),
-       if(args.length < 2) { System.err.println("ERROR AT ARG 2: Missing 
OUTPUT path");; None } else Some(new File(args(1))))
+    def parseArgs(args: Array[String]):(Option[String],Option[String]) = {
+      if(args.length < 1) {
+        (None, None)
+      } else if (args.length == 1) {
+        (Some(args(0)), None)
+      } else {
+        (Some(args(0)), Some(args(1)))
+      }
     }
 
   def productMap(r:Array[Product]) : Map[Long,Product] = {
     r map (prod => prod.productId -> prod) toMap
   }
 
-    def totalTransactions(r:(RDD[Location], RDD[Store], RDD[Customer], 
RDD[Product], RDD[Transaction]),
-                          sc: SparkContext): Statistics = {
-      val sqlContext = new org.apache.spark.sql.SQLContext(sc);
-
-      import sqlContext._;
-
-      /**
-       * Transform the non-sparksql mappable calendar
-       * into a spark sql freindly field.
-       */
-      val mappableTransactions:RDD[TransactionSQL] =
-        /**
-        * Map the RDD[Transaction] -> RDD[TransactionSQL] so that we can run 
SparkSQL against it.
-        */
-        r._5.map(trans => trans.toSQL())
+  def queryTxByMonth(sqlContext: SQLContext): Array[StatisticsTxByMonth] = {
+    import sqlContext._
 
-        mappableTransactions.registerTempTable("transactions");
+    val results: SchemaRDD = sql("SELECT count(*), month FROM Transactions 
GROUP BY month")
+    val transactionsByMonth = results.collect()
+    for(x<-transactionsByMonth){
+      println(x)
+    }
 
-        r._2.registerTempTable("Stores")
+    transactionsByMonth.map { r =>
+      StatisticsTxByMonth(r.getInt(1), r.getLong(0))
+    }
+  }
 
-      val results: SchemaRDD = sql("SELECT month,count(*) FROM transactions 
group by month")
-      val transactionsByMonth = results.collect();
-      for(x<-transactionsByMonth){
-        println(x);
-      }
+  def queryTxByProductZip(sqlContext: SQLContext): 
Array[StatisticsTxByProductZip] = {
+    import sqlContext._
 
-      val results2: SchemaRDD = sql(
-        """SELECT count(*) c, productId , zipcode
-FROM transactions t
+    val results: SchemaRDD = sql(
+      """SELECT count(*) c, productId, zipcode
+FROM Transactions t
 JOIN Stores s ON t.storeId = s.storeId
 GROUP BY productId, zipcode""")
-      val groupedProductZips = results2.collect();
 
-      //get list of all transactionsData
-      for(x<-groupedProductZips){
-        println("grouped product:zip " + x);
-      }
+    val groupedProductZips = results.collect()
 
-      return Statistics(
-        results.count(), // Total number of transaction
-        results2.collect().map(r => {
-          //Map JDBC Row into a Serializable case class.
-          StatisticsTrByZip(r.getLong(0),r.getLong(1),r.getString(2))
-        }),
-        r._4.collect()); // Product details.
+    //get list of all transactionsData
+    for(x<-groupedProductZips){
+      println("grouped product:zip " + x)
     }
 
-    /**
-    * We keep a "run" method which can be called easily from tests and also is 
used by main.
-    */
-    def run(transactionsInputDir:String, sc:SparkContext): Statistics = {
-      System.out.println("input : " + transactionsInputDir);
-      val stats = totalTransactions(IOUtils.load(sc,transactionsInputDir), sc);
-      sc.stop()
-      stats
+    //Map JDBC Row into a Serializable case class.
+    groupedProductZips.map { r =>
+      StatisticsTxByProductZip(r.getLong(1),r.getString(2),r.getLong(0))
     }
+  }
 
-  def main(args: Array[String]) {
-      main(
-        args,
-        new SparkContext(new SparkConf().setAppName("PetStoreStatistics")));
+  def queryTxByProduct(sqlContext: SQLContext): Array[StatisticsTxByProduct] = 
{
+    import sqlContext._
+
+    val results: SchemaRDD = sql(
+      """SELECT count(*) c, productId FROM Transactions GROUP BY productId""")
+
+    val groupedProducts = results.collect()
+
+    //Map JDBC Row into a Serializable case class.
+    groupedProducts.map { r =>
+      StatisticsTxByProduct(r.getLong(1),r.getLong(0))
+    }
   }
 
-  def main(args: Array[String], context:SparkContext) = {
-      // Get or else : On failure (else) we exit.
-      val (inputPath,outputPath)= parseArgs(args);
 
-      if(! (inputPath.isDefined && outputPath.isDefined)) {
-        printUsage()
-        System.exit(1)
-      }
+  def runQueries(r:(RDD[Location], RDD[Store], RDD[Customer], RDD[Product],
+    RDD[Transaction]), sc: SparkContext): Statistics = {
+
+    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+    import sqlContext._
+
+    // Transform the Non-SparkSQL Calendar into a SparkSQL-friendly field.
+    val mappableTransactions:RDD[TransactionSQL] =
+      r._5.map { trans => trans.toSQL() }
+
+    r._1.registerTempTable("Locations")
+    r._2.registerTempTable("Stores")
+    r._3.registerTempTable("Customers")
+    r._4.registerTempTable("Product")
+    mappableTransactions.registerTempTable("Transactions")
+
+
+    val txByMonth = queryTxByMonth(sqlContext)
+    val txByProduct = queryTxByProduct(sqlContext)
+    val txByProductZip = queryTxByProductZip(sqlContext)
+
+    return Statistics(
+      txByMonth.map { s => s.count }.reduce(_+_),  // Total number of 
transactions
+      txByMonth,
+      txByProduct,
+      txByProductZip,
+      r._4.collect()) // Product details
+  }
 
-      System.out.println("Running w/ input = " + inputPath);
+    /**
+    * We keep a "run" method which can be called easily from tests and also is 
used by main.
+    */
+    def run(txInputDir:String, statsOutputFile:String,
+      sc:SparkContext) {
 
-      val stats:Statistics = run(inputPath.get.toUri.getPath, context);
+      System.out.println("Running w/ input = " + txInputDir)
 
-      IOUtils.saveLocalAsJSON(outputPath.get, stats)
+      System.out.println("input : " + txInputDir)
+      val etlData = IOUtils.load(sc, txInputDir)
 
-      System.out.println("Output JSON Stats stored : " + outputPath.get);
+      val stats = runQueries(etlData, sc)
 
+      IOUtils.saveLocalAsJSON(new File(statsOutputFile), stats)
+
+      System.out.println("Output JSON Stats stored : " + statsOutputFile)
     }
 
-}
\ No newline at end of file
+  def main(args: Array[String]) {
+    // Get or else : On failure (else) we exit.
+    val (inputPath,outputPath) = parseArgs(args)
+
+    if(! (inputPath.isDefined && outputPath.isDefined)) {
+      printUsage()
+      System.exit(1)
+    }
+
+    val sc = new SparkContext(new SparkConf().setAppName("PetStoreStatistics"))
+
+    run(inputPath.get, outputPath.get, sc)
+
+    sc.stop()
+  }
+}

http://git-wip-us.apache.org/repos/asf/bigtop/blob/770d50b6/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
index 8eaf707..59cc304 100644
--- 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
+++ 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
@@ -26,13 +26,20 @@ import org.json4s.CustomSerializer
 import org.json4s.JsonAST.{JString, JField, JInt, JObject}
 
 /**
- * Statistics phase.  Represents A JSON for a front end.
- * Currently, transactionByZip schema = count, productId, zip
- * */
+ * Statistics phase.  Represents JSON for a front end.
+ */
+
+case class StatisticsTxByMonth(month: Int, count: Long)
 
-case class StatisticsTrByZip(count:Long, productId:Long, zipcode:String)
+case class StatisticsTxByProductZip(productId:Long, zipcode:String, count:Long)
 
-case class Statistics(totalTransaction: Long, transactionsByZip: 
Array[StatisticsTrByZip], productDetails:Array[Product])
+case class StatisticsTxByProduct(count: Long, productId: Long)
+
+case class Statistics(totalTransactions: Long,
+  transactionsByMonth: Array[StatisticsTxByMonth],
+  transactionsByProduct: Array[StatisticsTxByProduct],
+  transactionsByProductZip: Array[StatisticsTxByProductZip],
+  productDetails:Array[Product])
 
 case class Customer(customerId: Long, firstName: String,
   lastName: String, zipcode: String)
@@ -44,13 +51,14 @@ case class Product(productId: Long, category: String, 
attributes: Map[String, St
 case class Store(storeId: Long, zipcode: String)
 
 case class Transaction(customerId: Long, transactionId: Long, storeId: Long, 
dateTime: Calendar, productId: Long){
+
   /**
    * Convert to TransactionSQL.
    * There possibly could be a conversion.
    */
   def toSQL(): TransactionSQL = {
-    val dt = new DateTime(dateTime);
-    val ts = new Timestamp(dt.getMillis);
+    val dt = new DateTime(dateTime)
+    val ts = new Timestamp(dt.getMillis)
     return TransactionSQL(customerId,transactionId,storeId,
       new Timestamp(
         new DateTime(dateTime).getMillis),

http://git-wip-us.apache.org/repos/asf/bigtop/blob/770d50b6/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
index a4d1486..5432e3c 100644
--- 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
+++ 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
@@ -81,7 +81,7 @@ object IOUtils {
     implicit val formats = Serialization.formats(NoTypeHints)
     //Read file as String, and serialize it into Stats object.
     //See http://json4s.org/ examples.
-    
read[Statistics](scala.io.Source.fromFile(jsonFile).getLines.reduceLeft(_+_));
+    
read[Statistics](scala.io.Source.fromFile(jsonFile).getLines.reduceLeft(_+_))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/bigtop/blob/770d50b6/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
 
b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
index e460e2e..9d5cb84 100644
--- 
a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
+++ 
b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
@@ -30,7 +30,7 @@ class TestFullPipeline extends FunSuite with 
BeforeAndAfterAll {
   val sc = new SparkContext(conf)
 
   override def afterAll() {
-    sc.stop();
+    sc.stop()
   }
 
   test("Full integration test.") {
@@ -40,7 +40,7 @@ class TestFullPipeline extends FunSuite with 
BeforeAndAfterAll {
 
     //stores, customers, days, randomSeed
     val parameters:Array[String] = Array(tmpDir.toString(), "10", "1000", 
"365.0","123456789")
-    SparkDriver.parseArgs(parameters);
+    SparkDriver.parseArgs(parameters)
 
     val transactionRDD = SparkDriver.generateData(sc)
     SparkDriver.writeData(transactionRDD)
@@ -49,33 +49,31 @@ class TestFullPipeline extends FunSuite with 
BeforeAndAfterAll {
     val etlDir:File = Files.createTempDirectory("BPSTest_ETL2").toFile()
     System.out.println(etlDir.getAbsolutePath + "== "+etlDir.list())
 
-    val (locations,stores,customers,products,transactions) = SparkETL.run(sc, 
new ETLParameters(tmpDir.getAbsolutePath,etlDir.getAbsolutePath));
+    val (locations,stores,customers,products,transactions) = SparkETL.run(sc, 
new ETLParameters(tmpDir.getAbsolutePath,etlDir.getAbsolutePath))
 
     // assert(locations==400L) TODO : This seems to vary (325,400,)
     assert(stores==10L)
     assert(customers==1000L)
     assert(products==55L)
     //assert(transactions==45349L)
-    val analyticsJson = new File(tmpDir,"analytics.json")
+
     //Now do the analytics.
+    val analyticsJson = new File(tmpDir,"analytics.json")
 
-    PetStoreStatistics.main(
-      Array(
-        etlDir.getAbsolutePath,
-        analyticsJson.getAbsolutePath),
-      sc);
+    PetStoreStatistics.run(etlDir.getAbsolutePath,
+      analyticsJson.getAbsolutePath, sc)
 
-    val stats:Statistics = IOUtils.readLocalAsStatistics(analyticsJson);
+    val stats:Statistics = IOUtils.readLocalAsStatistics(analyticsJson)
 
     /**
      * Assert some very generic features.  We will refine this later once
      * consistency is implemented.
      * See https://github.com/rnowling/bigpetstore-data-generator/issues/38
      */
-    assert(stats.totalTransaction > 5);
-    //TODO : Will add more assertions here, see comment above
-    assert(stats.productDetails.length > 10);
+    assert(stats.totalTransactions === transactions)
+    assert(stats.productDetails.length === products)
+    assert(stats.transactionsByMonth.length === 12)
 
     sc.stop()
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/bigtop/blob/770d50b6/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/analytics/AnalyticsSuite.scala
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/analytics/AnalyticsSuite.scala
 
b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/analytics/AnalyticsSuite.scala
index b7724cc..d9ed390 100644
--- 
a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/analytics/AnalyticsSuite.scala
+++ 
b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/analytics/AnalyticsSuite.scala
@@ -38,6 +38,6 @@ class AnalyticsSuite extends FunSuite with BeforeAndAfterAll {
 
   test("product mapper") {
     val p = Product(1L, "cat1", Map(("a","a1"), ("b","b1")))
-    assert(PetStoreStatistics.productMap(Array(p)).get(1L).get === p);
+    assert(PetStoreStatistics.productMap(Array(p)).get(1L).get === p)
   }
 }

http://git-wip-us.apache.org/repos/asf/bigtop/blob/770d50b6/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala
 
b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala
index 7b1e1f5..49ff1a4 100644
--- 
a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala
+++ 
b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala
@@ -40,7 +40,7 @@ class IOUtilsSuite extends FunSuite with BeforeAndAfterAll {
   val sc = new SparkContext(conf)
 
   override def afterAll() {
-    sc.stop();
+    sc.stop()
   }
 
   test("Saving & Loading data") {

Reply via email to