Repository: bigtop
Updated Branches:
  refs/heads/master 771ca5670 -> b7e369edd


BIGTOP-1653. Add queries for customer, state, and product statistics w/ d3 
friendly JSON output to analytics phase.


Project: http://git-wip-us.apache.org/repos/asf/bigtop/repo
Commit: http://git-wip-us.apache.org/repos/asf/bigtop/commit/b7e369ed
Tree: http://git-wip-us.apache.org/repos/asf/bigtop/tree/b7e369ed
Diff: http://git-wip-us.apache.org/repos/asf/bigtop/diff/b7e369ed

Branch: refs/heads/master
Commit: b7e369edd6fa7b97ef4399c56ce66b766035f008
Parents: 771ca56
Author: jayunit100 <[email protected]>
Authored: Mon Feb 16 20:51:39 2015 -0500
Committer: jayunit100 <[email protected]>
Committed: Tue Feb 17 10:45:55 2015 -0500

----------------------------------------------------------------------
 bigtop-bigpetstore/bigpetstore-spark/README.md  |  42 +++++++
 .../bigpetstore-spark/build.gradle              |   3 +
 .../spark/analytics/PetStoreStatistics.scala    | 121 +++++++++++++------
 .../bigpetstore/spark/datamodel/DataModel.scala |  38 +++++-
 .../bigpetstore/spark/datamodel/IOUtils.scala   |  30 +++++
 .../bigpetstore/spark/TestFullPipeline.scala    |  23 +++-
 .../spark/analytics/AnalyticsSuite.scala        |  43 +++++++
 7 files changed, 259 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bigtop/blob/b7e369ed/bigtop-bigpetstore/bigpetstore-spark/README.md
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/bigpetstore-spark/README.md 
b/bigtop-bigpetstore/bigpetstore-spark/README.md
index 0ed1a05..caf4276 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/README.md
+++ b/bigtop-bigpetstore/bigpetstore-spark/README.md
@@ -97,3 +97,45 @@ After building the jar (see above), you can run the ETL 
component like so:
 ```
 spark-submit --master local[2] --class 
org.apache.bigtop.bigpetstore.spark.etl.SparkETL bigpetstore-spark-X.jar 
generated\_data transformed\_data
 ```
+
+Running the SparkSQL component
+-------------------------------
+
+Once ETL'd we can now process the data and do analytics on it.  The 
DataModel.scala class itself is used to read/write classes
+from files.  To run the analytics job, which outputs a JSON file at the end, 
you now will run the following:
+
+```
+spark-submit --master local[2] --class 
org.apache.bigtop.bigpetstore.spark.analytics.PetStoreStatistics 
bigpetstore-spark-X.jar transformed\_data PetStoreStats.json
+```
+
+This will output a JSON file to the /tmp directory, which has formatting 
(approximately) like this.
+
+```
+{
+   "totalTransaction":12,
+   "transactionsByZip":[
+  
{"count":64,"productId":54,"zipcode":"94583"},{"count":38,"productId":18,"zipcode":"34761"},
+   
{"count":158,"productId":14,"zipcode":"11368"},{"count":66,"productId":46,"zipcode":"33027"},
+   
{"count":52,"productId":27,"zipcode":"94583"},{"count":84,"productId":19,"zipcode":"33027"},
+   
{"count":143,"productId":0,"zipcode":"94583"},{"count":58,"productId":41,"zipcode":"72715"},
+   
{"count":76,"productId":54,"zipcode":"15014"},{"count":118,"productId":52,"zipcode":"45439"}},
+     ..... (several more) ....
+   "productDetails":[
+      {
+         "productId":0,
+         "category":"kitty litter",
+         "attributes":{
+            "category":"kitty litter",
+            "brand":"Pretty Cat",
+            "size":"7.0",
+            "per_unit_cost":"1.43"
+         }
+      },
+      {
+         "productId":2,
+         "category":"dry cat food",
+         "attributes":{
+```
+
+Of course, the above data is for a front end web app which will display 
charts/summary stats of the transactions.
+Keep tracking Apache BigTop for updates on this front !
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bigtop/blob/b7e369ed/bigtop-bigpetstore/bigpetstore-spark/build.gradle
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/bigpetstore-spark/build.gradle 
b/bigtop-bigpetstore/bigpetstore-spark/build.gradle
index 35ff7bc..6f3b2d0 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/build.gradle
+++ b/bigtop-bigpetstore/bigpetstore-spark/build.gradle
@@ -117,10 +117,13 @@ def updateDependencyVersion(dependencyDetails, 
dependencyString) {
 dependencies {
     compile "org.apache.spark:spark-assembly_2.10:${sparkVersion}"
     compile "com.github.rnowling.bigpetstore:bigpetstore-data-generator:0.2.1"
+    compile "joda-time:joda-time:2.7"
+    compile "org.json4s:json4s-jackson_2.10:3.1.0"
 
     testCompile "junit:junit:4.11"
     testCompile "org.hamcrest:hamcrest-all:1.3"
     testCompile "org.scalatest:scalatest_2.10:2.2.1"
+    testCompile "joda-time:joda-time:2.7"
 }
 
 task listJars << {

http://git-wip-us.apache.org/repos/asf/bigtop/blob/b7e369ed/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala
 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala
index 03d2012..c97bf92 100644
--- 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala
+++ 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala
@@ -15,32 +15,34 @@
 *  limitations under the License.
 */
 
-package org.apache.bigtop.bigpetstore.spark.generator
+package org.apache.bigtop.bigpetstore.spark.analytics
+
+
+import java.sql.Timestamp
+
+import org.joda.time.DateTime
 
 import _root_.org.apache.spark.SparkConf
-import org.apache.bigtop.bigpetstore.spark.datamodel.{IOUtils, 
Statistics,Transaction}
+import org.apache.bigtop.bigpetstore.spark.datamodel._
+import org.apache.spark.sql._;
 import scala.Nothing;
-import com.github.rnowling.bps.datagenerator.datamodels.inputs.ZipcodeRecord
-import com.github.rnowling.bps.datagenerator.datamodels._
-import 
com.github.rnowling.bps.datagenerator.{DataLoader,StoreGenerator,CustomerGenerator
 => CustGen, PurchasingProfileGenerator,TransactionGenerator}
-import com.github.rnowling.bps.datagenerator.framework.SeedFactory
 import org.apache.hadoop.fs.Path
 import scala.collection.JavaConversions._
 import org.apache.spark.{SparkContext, SparkConf}
 import org.apache.spark.SparkContext._
 import org.apache.spark.rdd._
 
-import java.util.ArrayList
+import java.util.{Calendar, ArrayList, Date}
 import scala.util.Random
 import java.io.File
-import java.util.Date
+import org.json4s.JsonDSL.WithBigDecimal._
 
 object PetStoreStatistics {
 
     private def printUsage() {
       val usage: String =
-        "BigPetStore Analytics Module.\n" +
-          "Usage: inputDir\n" ;
+        "BigPetStore Analytics Module. Usage: inputDir, outputDir.\n " +
+        "Ouptut is a JSON file in outputDir.  For schema, see the code." ;
 
       System.err.println(usage)
     }
@@ -52,44 +54,95 @@ object PetStoreStatistics {
    * @param args
    * @return
    */
-    def parseArgs(args: Array[String]):Option[Path] = {
-      if(args.length != 1) {
-        printUsage();
-        return None;
-      }
-      //success, return path.
-      Some(new Path(args(0)));
+    def parseArgs(args: Array[String]):(Option[Path],Option[File]) = {
+      (if(args.length < 1) { System.err.println("ERROR AT ARG 1: Missing INPUT 
path"); None } else Some(new Path(args(0))),
+       if(args.length < 2) { System.err.println("ERROR AT ARG 2: Missing 
OUTPUT path");; None } else Some(new File(args(1))))
     }
 
-    /**
-     * Here we generate an RDD of all the petstore transactions,
-     * by generating the static data first (stores, customers, ...)
-     * followed by running the simulation as a distributed spark task.
-     */
-    def totalTransactions(r:(_,_,_,_,RDD[Transaction]), sc: SparkContext): 
Statistics = {
-      return Statistics(r._5.count());
+  def productMap(r:Array[Product]) : Map[Long,Product] = {
+    r map (prod => prod.productId -> prod) toMap
+  }
+
+    def totalTransactions(r:(RDD[Location], RDD[Store], RDD[Customer], 
RDD[Product], RDD[Transaction]),
+                          sc: SparkContext): Statistics = {
+      val sqlContext = new org.apache.spark.sql.SQLContext(sc);
+
+      import sqlContext._;
+
+      /**
+       * Transform the non-sparksql mappable calendar
+       * into a spark sql freindly field.
+       */
+      val mappableTransactions:RDD[TransactionSQL] =
+        /**
+        * Map the RDD[Transaction] -> RDD[TransactionSQL] so that we can run 
SparkSQL against it.
+        */
+        r._5.map(trans => trans.toSQL())
+
+        mappableTransactions.registerTempTable("transactions");
+
+        r._2.registerTempTable("Stores")
+
+      val results: SchemaRDD = sql("SELECT month,count(*) FROM transactions 
group by month")
+      val transactionsByMonth = results.collect();
+      for(x<-transactionsByMonth){
+        println(x);
+      }
+
+      val results2: SchemaRDD = sql(
+        """SELECT count(*) c, productId , zipcode
+FROM transactions t
+JOIN Stores s ON t.storeId = s.storeId
+GROUP BY productId, zipcode""")
+      val groupedProductZips = results2.collect();
+
+      //get list of all transactionsData
+      for(x<-groupedProductZips){
+        println("grouped product:zip " + x);
+      }
+
+      return Statistics(
+        results.count(), // Total number of transaction
+        results2.collect().map(r => {
+          //Map JDBC Row into a Serializable case class.
+          StatisticsTrByZip(r.getLong(0),r.getLong(1),r.getString(2))
+        }),
+        r._4.collect()); // Product details.
     }
 
     /**
     * We keep a "run" method which can be called easily from tests and also is 
used by main.
     */
-    def run(transactionsInputDir:String, sc:SparkContext): Boolean = {
+    def run(transactionsInputDir:String, sc:SparkContext): Statistics = {
       System.out.println("input : " + transactionsInputDir);
-      val t=totalTransactions(IOUtils.load(sc,transactionsInputDir), sc);
-      System.out.println("Transaction count = " + t);
+      val stats = totalTransactions(IOUtils.load(sc,transactionsInputDir), sc);
       sc.stop()
-      true;
+      stats
     }
 
-    def main(args: Array[String]) {
+  def main(args: Array[String]) {
+      main(
+        args,
+        new SparkContext(new SparkConf().setAppName("PetStoreStatistics")));
+  }
+
+  def main(args: Array[String], context:SparkContext) = {
       // Get or else : On failure (else) we exit.
-      val inputPath = parseArgs(args).getOrElse {
-        System.exit(1);
-      };
+      val (inputPath,outputPath)= parseArgs(args);
+
+      if(! (inputPath.isDefined && outputPath.isDefined)) {
+        printUsage()
+        System.exit(1)
+      }
 
       System.out.println("Running w/ input = " + inputPath);
-      val conf = new SparkConf().setAppName("BPS Data Generator")
-      run(inputPath.toString,new SparkContext(conf));
+
+      val stats:Statistics = run(inputPath.get.toUri.getPath, context);
+
+      IOUtils.saveLocalAsJSON(outputPath.get, stats)
+
+      System.out.println("Output JSON Stats stored : " + outputPath.get);
+
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bigtop/blob/b7e369ed/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
index 19a4260..8eaf707 100644
--- 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
+++ 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
@@ -17,8 +17,23 @@
 
 package org.apache.bigtop.bigpetstore.spark.datamodel
 
+import java.sql.Timestamp
 import java.util.Calendar
 
+import org.apache.spark.sql
+import org.joda.time.DateTime
+import org.json4s.CustomSerializer
+import org.json4s.JsonAST.{JString, JField, JInt, JObject}
+
+/**
+ * Statistics phase.  Represents A JSON for a front end.
+ * Currently, transactionByZip schema = count, productId, zip
+ * */
+
+case class StatisticsTrByZip(count:Long, productId:Long, zipcode:String)
+
+case class Statistics(totalTransaction: Long, transactionsByZip: 
Array[StatisticsTrByZip], productDetails:Array[Product])
+
 case class Customer(customerId: Long, firstName: String,
   lastName: String, zipcode: String)
 
@@ -28,9 +43,24 @@ case class Product(productId: Long, category: String, 
attributes: Map[String, St
 
 case class Store(storeId: Long, zipcode: String)
 
-case class Transaction(customerId: Long, transactionId: Long, storeId: Long, 
dateTime: Calendar, productId: Long)
+case class Transaction(customerId: Long, transactionId: Long, storeId: Long, 
dateTime: Calendar, productId: Long){
+  /**
+   * Convert to TransactionSQL.
+   * There possibly could be a conversion.
+   */
+  def toSQL(): TransactionSQL = {
+    val dt = new DateTime(dateTime);
+    val ts = new Timestamp(dt.getMillis);
+    return TransactionSQL(customerId,transactionId,storeId,
+      new Timestamp(
+        new DateTime(dateTime).getMillis),
+        productId,
+        
dt.getYearOfEra,dt.getMonthOfYear,dt.getDayOfMonth,dt.getHourOfDay,dt.getMinuteOfHour)
+  }
+}
 
 /**
- * Statistics phase.  To be expanded...
- * */
-case class Statistics(transactions:Long)
+ * A Transaction which we can create from the natively stored transactions.
+ */
+case class TransactionSQL(customerId: Long, transactionId: Long, storeId: 
Long, timestamp:Timestamp, productId: Long,
+                          year:Int, month:Int, day:Int, hour:Int, minute:Int )

http://git-wip-us.apache.org/repos/asf/bigtop/blob/b7e369ed/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
index 8899abd..a4d1486 100644
--- 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
+++ 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
@@ -17,13 +17,24 @@
 
 package org.apache.bigtop.bigpetstore.spark.datamodel
 
+import java.io.File
 import java.util.Date
+import java.nio.file.{Path, Paths, Files}
+import java.nio.charset.StandardCharsets
 
 import org.apache.spark.{SparkContext, SparkConf}
 import org.apache.spark.SparkContext._
 import org.apache.spark.rdd._
 
 import org.apache.bigtop.bigpetstore.spark.datamodel._
+import org.json4s.JsonDSL._
+import org.json4s.JsonDSL.WithDouble._
+import org.json4s.JsonDSL.WithBigDecimal._
+import org.json4s.jackson.Serialization
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+import org.json4s.jackson.Serialization.{read, write}
 
 /**
   * Utility functions for loading and saving data model RDDs.
@@ -35,6 +46,8 @@ object IOUtils {
   private val PRODUCT_DIR = "products"
   private val TRANSACTION_DIR = "transactions"
 
+  private val ANALYTICS_STATS_DIR = "analytics_stats"
+
   /**
     * Save RDDs of the data model as Sequence files.
     *
@@ -56,11 +69,28 @@ object IOUtils {
     transactionRDD.saveAsObjectFile(outputDir + "/" + TRANSACTION_DIR)
   }
 
+  def saveLocalAsJSON(outputDir: File, statistics: Statistics) {
+    //load the write/read methods.
+    implicit val formats = Serialization.formats(NoTypeHints)
+    val json:String = write(statistics)
+    Files.write(outputDir.toPath, json.getBytes(StandardCharsets.UTF_8))
+  }
+
+  def readLocalAsStatistics(jsonFile: File):Statistics = {
+    //load the write/read methods.
+    implicit val formats = Serialization.formats(NoTypeHints)
+    //Read file as String, and serialize it into Stats object.
+    //See http://json4s.org/ examples.
+    
read[Statistics](scala.io.Source.fromFile(jsonFile).getLines.reduceLeft(_+_));
+  }
+
   /**
     * Load RDDs of the data model from Sequence files.
     *
     * @param sc SparkContext
     * @param inputDir Directory containing Sequence files
+    *
+    * TODO Should take path, not string, this makes input validation complex.
     */
   def load(sc: SparkContext, inputDir: String): (RDD[Location], RDD[Store],
     RDD[Customer], RDD[Product], RDD[Transaction]) = {

http://git-wip-us.apache.org/repos/asf/bigtop/blob/b7e369ed/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
 
b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
index 747a477..e460e2e 100644
--- 
a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
+++ 
b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
@@ -1,9 +1,10 @@
 package org.apache.bigpetstore.spark
 
+import org.apache.bigtop.bigpetstore.spark.analytics.PetStoreStatistics
+import org.apache.bigtop.bigpetstore.spark.datamodel.{Statistics, IOUtils}
 import org.apache.bigtop.bigpetstore.spark.etl.ETLParameters
 import org.apache.bigtop.bigpetstore.spark.etl.SparkETL
 import org.apache.bigtop.bigpetstore.spark.etl.{ETLParameters, SparkETL}
-import org.apache.bigtop.bigpetstore.spark.generator.PetStoreStatistics
 import org.apache.bigtop.bigpetstore.spark.generator.SparkDriver
 import org.apache.spark.SparkConf
 import org.apache.spark.SparkContext
@@ -55,9 +56,25 @@ class TestFullPipeline extends FunSuite with 
BeforeAndAfterAll {
     assert(customers==1000L)
     assert(products==55L)
     //assert(transactions==45349L)
-
+    val analyticsJson = new File(tmpDir,"analytics.json")
     //Now do the analytics.
-    PetStoreStatistics.run(etlDir.getAbsolutePath, sc);
+
+    PetStoreStatistics.main(
+      Array(
+        etlDir.getAbsolutePath,
+        analyticsJson.getAbsolutePath),
+      sc);
+
+    val stats:Statistics = IOUtils.readLocalAsStatistics(analyticsJson);
+
+    /**
+     * Assert some very generic features.  We will refine this later once
+     * consistency is implemented.
+     * See https://github.com/rnowling/bigpetstore-data-generator/issues/38
+     */
+    assert(stats.totalTransaction > 5);
+    //TODO : Will add more assertions here, see comment above
+    assert(stats.productDetails.length > 10);
 
     sc.stop()
   }

http://git-wip-us.apache.org/repos/asf/bigtop/blob/b7e369ed/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/analytics/AnalyticsSuite.scala
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/analytics/AnalyticsSuite.scala
 
b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/analytics/AnalyticsSuite.scala
new file mode 100644
index 0000000..b7724cc
--- /dev/null
+++ 
b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/analytics/AnalyticsSuite.scala
@@ -0,0 +1,43 @@
+package org.apache.bigpetstore.spark.analytics
+
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+import com.google.common.collect.ImmutableMap
+import org.apache.bigtop.bigpetstore.spark.analytics.PetStoreStatistics
+import org.apache.bigtop.bigpetstore.spark.datamodel.Product
+import org.junit.runner.RunWith
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+import org.scalatest.junit.JUnitRunner
+
+import Array._
+
+import java.io.File
+import java.nio.file.Files
+import java.util.Calendar
+import java.util.Locale
+
+
+// hack for running tests with Gradle
+@RunWith(classOf[JUnitRunner])
+class AnalyticsSuite extends FunSuite with BeforeAndAfterAll {
+
+  test("product mapper") {
+    val p = Product(1L, "cat1", Map(("a","a1"), ("b","b1")))
+    assert(PetStoreStatistics.productMap(Array(p)).get(1L).get === p);
+  }
+}

Reply via email to