Repository: bigtop
Updated Branches:
  refs/heads/master 2c3c598a6 -> 1472a0e11


BIGTOP-1536. Add Basic Sales Analytics Example to BPS Spark.


Project: http://git-wip-us.apache.org/repos/asf/bigtop/repo
Commit: http://git-wip-us.apache.org/repos/asf/bigtop/commit/1472a0e1
Tree: http://git-wip-us.apache.org/repos/asf/bigtop/tree/1472a0e1
Diff: http://git-wip-us.apache.org/repos/asf/bigtop/diff/1472a0e1

Branch: refs/heads/master
Commit: 1472a0e11a9d557d12798b1e0c5a77784fc81910
Parents: 2c3c598
Author: jayunit100 <[email protected]>
Authored: Wed Feb 4 04:41:08 2015 -0500
Committer: jayunit100 <[email protected]>
Committed: Fri Feb 6 21:26:33 2015 -0500

----------------------------------------------------------------------
 .../spark/analytics/PetStoreStatistics.scala    | 95 ++++++++++++++++++++
 .../bigpetstore/spark/datamodel/DataModel.scala |  8 +-
 .../org/apache/bigpetstore/spark/etl/ETL.scala  | 41 ++++++---
 .../bigpetstore/spark/TestFullPipeline.scala    | 64 +++++++++++++
 .../spark/datamodel/IOUtilsSuite.scala          | 15 ++--
 .../apache/bigpetstore/spark/etl/ETLSuite.scala | 17 ++--
 .../spark/generator/SparkDriverSuite.scala      | 30 +++++--
 7 files changed, 233 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bigtop/blob/1472a0e1/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala
 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala
new file mode 100644
index 0000000..03d2012
--- /dev/null
+++ 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala
@@ -0,0 +1,95 @@
/*
*  Licensed to the Apache Software Foundation (ASF) under one or more
*  contributor license agreements.  See the NOTICE file distributed with
*  this work for additional information regarding copyright ownership.
*  The ASF licenses this file to You under the Apache License, Version 2.0
*  (the "License"); you may not use this file except in compliance with
*  the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
*  Unless required by applicable law or agreed to in writing, software
*  distributed under the License is distributed on an "AS IS" BASIS,
*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*  See the License for the specific language governing permissions and
*  limitations under the License.
*/

// NOTE(review): the declared package ("...spark.generator") does not match the
// directory this file lives in ("...spark/analytics"). It is left unchanged here
// because callers (e.g. TestFullPipeline) import PetStoreStatistics from the
// generator package; renaming must be coordinated with them. TODO: confirm and align.
package org.apache.bigtop.bigpetstore.spark.generator

import org.apache.bigtop.bigpetstore.spark.datamodel.{IOUtils, Statistics, Transaction}
import org.apache.hadoop.fs.Path
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
 * Basic sales analytics over the BigPetStore normalized data model.
 * Currently it computes a single statistic: the total transaction count.
 */
object PetStoreStatistics {

    /** Print the command-line usage banner to stderr. */
    private def printUsage() {
      val usage: String =
        "BigPetStore Analytics Module.\n" +
          "Usage: inputDir\n"

      System.err.println(usage)
    }

  /**
   * Scala details. Some or None are an idiomatic way, in scala, to
   * return an optional value.  This allows us to signify, to the caller, that the
   * method may fail.  The caller can decide how to deal with failure (i.e. by
   * pattern matching or getOrElse).
   *
   * @param args command-line arguments; exactly one (the input directory) is expected.
   * @return Some(path) on success, None (after printing usage) on bad arity.
   */
    def parseArgs(args: Array[String]): Option[Path] = {
      if (args.length != 1) {
        printUsage()
        None
      } else {
        // Success: return the input path.
        Some(new Path(args(0)))
      }
    }

    /**
     * Compute summary statistics over the loaded data model.
     *
     * @param r the 5-tuple produced by IOUtils.load; only the transactions
     *          RDD (5th element) is consumed at the moment.
     * @param sc unused today; kept for interface stability and future stats.
     * @return a Statistics record holding the total transaction count.
     */
    def totalTransactions(r: (_, _, _, _, RDD[Transaction]), sc: SparkContext): Statistics = {
      Statistics(r._5.count())
    }

    /**
     * We keep a "run" method which can be called easily from tests and also is
     * used by main.  It loads the ETL output from transactionsInputDir, prints
     * the computed statistics, and returns true.  It deliberately does NOT stop
     * the SparkContext: the caller owns the context's lifecycle (tests reuse it).
     */
    def run(transactionsInputDir: String, sc: SparkContext): Boolean = {
      System.out.println("input : " + transactionsInputDir)
      val t = totalTransactions(IOUtils.load(sc, transactionsInputDir), sc)
      // Print the count itself, not the Statistics wrapper's toString.
      System.out.println("Transaction count = " + t.transactions)
      true
    }

    def main(args: Array[String]) {
      // Pattern match keeps inputPath strongly typed as Path; the previous
      // getOrElse { System.exit(1) } form widened the result to Any.
      parseArgs(args) match {
        case Some(inputPath) =>
          System.out.println("Running w/ input = " + inputPath)
          val conf = new SparkConf().setAppName("BPS Data Generator")
          val sc = new SparkContext(conf)
          run(inputPath.toString, sc)
          // main owns the context, so main stops it.
          sc.stop()
        case None =>
          // Argument parsing failed; usage was already printed.
          System.exit(1)
      }
    }

}

http://git-wip-us.apache.org/repos/asf/bigtop/blob/1472a0e1/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
index b385bb9..19a4260 100644
--- 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
+++ 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
@@ -28,5 +28,9 @@ case class Product(productId: Long, category: String, 
attributes: Map[String, St
 
 case class Store(storeId: Long, zipcode: String)
 
-case class Transaction(customerId: Long, transactionId: Long,
-  storeId: Long, dateTime: Calendar, productId: Long)
+case class Transaction(customerId: Long, transactionId: Long, storeId: Long, 
dateTime: Calendar, productId: Long)
+
+/**
+ * Statistics phase.  To be expanded...
+ * */
+case class Statistics(transactions:Long)

http://git-wip-us.apache.org/repos/asf/bigtop/blob/1472a0e1/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/etl/ETL.scala
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/etl/ETL.scala
 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/etl/ETL.scala
index 5f9aa8a..66d2f52 100644
--- 
a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/etl/ETL.scala
+++ 
b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/etl/ETL.scala
@@ -31,7 +31,7 @@ import java.util._
 case class TransactionProduct(customerId: Long, transactionId: Long,
   storeId: Long, dateTime: Calendar, product: String)
 
-case class Parameters(inputDir: String, outputDir: String)
+case class ETLParameters(inputDir: String, outputDir: String)
 
 object SparkETL {
 
@@ -48,13 +48,13 @@ object SparkETL {
     println(usage)
   }
 
-  def parseArgs(args: Array[String]): Parameters = {
+  def parseArgs(args: Array[String]): ETLParameters = {
     if(args.length != NPARAMS) {
       printUsage()
       System.exit(1)
     }
 
-    Parameters(args(0), args(1))
+    ETLParameters(args(0), args(1))
   }
 
   def readRawData(sc: SparkContext, inputDir: String): RDD[String] = {
@@ -179,16 +179,10 @@ object SparkETL {
     (locationRDD, storeRDD, customerRDD, productRDD, transactionRDD)
   }
 
-
-  def main(args: Array[String]) {
-    val parameters = parseArgs(args)
-
-    println("Creating SparkConf")
-    val conf = new SparkConf().setAppName("BPS Data Generator")
-
-    println("Creating SparkContext")
-    val sc = new SparkContext(conf)
-
+  /**
+   * Runs the ETL and returns the total number of 
locations,stores,customers,products,transactions.
+   */
+  def run(sc:SparkContext, parameters:ETLParameters) : 
(Long,Long,Long,Long,Long) = {
     val rawStringRDD = readRawData(sc, parameters.inputDir)
     val rawRecordRDD = parseRawData(rawStringRDD)
     val normalizedRDDs = normalizeData(rawRecordRDD)
@@ -202,6 +196,27 @@ object SparkETL {
     IOUtils.save(parameters.outputDir, locationRDD, storeRDD,
       customerRDD, productRDD, transactionRDD)
 
+    return (locationRDD.count(),
+        storeRDD.count(),
+        customerRDD.count(),
+        productRDD.count(),
+        transactionRDD.count()
+        );
+  }
+
+  def main(args: Array[String]) {
+    val parameters = parseArgs(args)
+
+    println("Creating SparkConf")
+
+    val conf = new SparkConf().setAppName("BPS Data Generator")
+
+    println("Creating SparkContext")
+
+    val sc = new SparkContext(conf)
+
+    run(sc, parameters)
+
     sc.stop()
   }
 }

http://git-wip-us.apache.org/repos/asf/bigtop/blob/1472a0e1/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
 
b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
new file mode 100644
index 0000000..747a477
--- /dev/null
+++ 
b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala
@@ -0,0 +1,64 @@
/*
*  Licensed to the Apache Software Foundation (ASF) under one or more
*  contributor license agreements.  See the NOTICE file distributed with
*  this work for additional information regarding copyright ownership.
*  The ASF licenses this file to You under the Apache License, Version 2.0
*  (the "License"); you may not use this file except in compliance with
*  the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
*  Unless required by applicable law or agreed to in writing, software
*  distributed under the License is distributed on an "AS IS" BASIS,
*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*  See the License for the specific language governing permissions and
*  limitations under the License.
*/

package org.apache.bigpetstore.spark

import java.io.File
import java.nio.file.Files

import org.apache.bigtop.bigpetstore.spark.etl.{ETLParameters, SparkETL}
import org.apache.bigtop.bigpetstore.spark.generator.{PetStoreStatistics, SparkDriver}
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.runner.RunWith
import org.scalatest.{BeforeAndAfterAll, FunSuite}
import org.scalatest.junit.JUnitRunner

/**
 * End-to-end pipeline test: generate raw data, ETL it into the
 * normalized data model, then run the analytics phase over the result.
 */
// RunWith annotation is a hack for running ScalaTest suites with Gradle.
@RunWith(classOf[JUnitRunner])
class TestFullPipeline extends FunSuite with BeforeAndAfterAll {

  val conf = new SparkConf().setAppName("BPS Data Generator Test Suite").setMaster("local[2]")
  val sc = new SparkContext(conf)

  override def afterAll() {
    sc.stop()
  }

  test("Full integration test.") {

    // Phase 1: generate the raw transaction data.
    val tmpDir: File = Files.createTempDirectory("sparkDriverSuiteGeneratedData2").toFile()

    // outputDir, stores, customers, days, randomSeed
    val parameters: Array[String] = Array(tmpDir.toString(), "10", "1000", "365.0", "123456789")
    SparkDriver.parseArgs(parameters)

    val transactionRDD = SparkDriver.generateData(sc)
    SparkDriver.writeData(transactionRDD)

    // Phase 2: ETL the raw data into the normalized data model.
    val etlDir: File = Files.createTempDirectory("BPSTest_ETL2").toFile()
    System.out.println(etlDir.getAbsolutePath + "== " + etlDir.list())

    val (locations, stores, customers, products, transactions) =
      SparkETL.run(sc, new ETLParameters(tmpDir.getAbsolutePath, etlDir.getAbsolutePath))

    // assert(locations==400L) TODO : This seems to vary (325,400,)
    assert(stores == 10L)
    assert(customers == 1000L)
    assert(products == 55L)
    //assert(transactions==45349L)

    // Phase 3: run the analytics over the ETL output.
    PetStoreStatistics.run(etlDir.getAbsolutePath, sc)

    // NOTE(review): do not stop sc here — afterAll() owns the context's
    // lifecycle, and stopping it mid-suite would break any later test.
  }
}

http://git-wip-us.apache.org/repos/asf/bigtop/blob/1472a0e1/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala
 
b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala
index 9864a8c..7b1e1f5 100644
--- 
a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala
+++ 
b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala
@@ -26,7 +26,7 @@ import java.util.Locale
 
 import org.apache.spark.{SparkContext, SparkConf}
 
-import org.scalatest.FunSuite
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
 import org.scalatest.junit.JUnitRunner
 import org.junit.runner.RunWith
 
@@ -34,15 +34,19 @@ import org.apache.bigtop.bigpetstore.spark.datamodel._
 
 // hack for running tests with Gradle
 @RunWith(classOf[JUnitRunner])
-class IOUtilsSuite extends FunSuite {
+class IOUtilsSuite extends FunSuite with BeforeAndAfterAll {
+
+  val conf = new SparkConf().setAppName("BPS Data Generator Test 
Suite").setMaster("local[2]")
+  val sc = new SparkContext(conf)
+
+  override def afterAll() {
+    sc.stop();
+  }
 
   test("Saving & Loading data") {
 
     val tmpDir = Files.createTempDirectory("ioUtilsSuite").toFile().toString()
 
-    val conf = new SparkConf().setAppName("BPS Data Generator Test 
Suite").setMaster("local[2]")
-    val sc = new SparkContext(conf)
-
     val locations = Array(Location("11111", "Sunnyvale", "CA"),
       Location("22222", "Margate", "FL"))
     val customers = Array(Customer(1L, "James", "Madison", "11111"),
@@ -89,6 +93,5 @@ class IOUtilsSuite extends FunSuite {
     assert(productRDD.collect().toSet === readProductRDD.collect().toSet)
     assert(transactionRDD.collect().toSet === 
readTransactionRDD.collect().toSet)
 
-    sc.stop()
   }
 }

http://git-wip-us.apache.org/repos/asf/bigtop/blob/1472a0e1/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala
 
b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala
index ca1cfdf..510e5f3 100644
--- 
a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala
+++ 
b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.bigtop.bigpetstore.spark.etl
 
+import org.apache.spark.rdd.RDD
+
 import Array._
 
 import java.util.Calendar
@@ -42,13 +44,15 @@ import org.apache.bigtop.bigpetstore.spark.datamodel._
  * RunWith annotation is just a hack for running tests with Gradle
  */
 @RunWith(classOf[JUnitRunner])
-class IOUtilsSuite extends FunSuite with BeforeAndAfterAll {
+class ETLSuite extends FunSuite with BeforeAndAfterAll {
 
   /**
    * TODO : We are using Option monads as a replacement for nulls.
    * Lets move towards immutable spark context instead, if possible ?
    */
-  var sc: Option[SparkContext] = None
+  val conf = new SparkConf().setAppName("BPS Data Generator Test 
Suite").setMaster("local[2]")
+  val sc = new SparkContext(conf)
+
   var rawRecords: Option[Array[(Store, Location, Customer, Location, 
TransactionProduct)]] = None
   var transactions: Option[Array[Transaction]] = None
 
@@ -72,8 +76,6 @@ class IOUtilsSuite extends FunSuite with BeforeAndAfterAll {
     "6,66067,Ottawa,KS,999,Cesareo,Lamplough,20152,Chantilly,VA,30,Mon Oct 12 
04:29:46 EDT 2015,category=dry cat food;brand=Feisty Feline;flavor=Chicken & 
Rice;size=14.0;per_unit_cost=2.14;")
 
   override def beforeAll() {
-    val conf = new SparkConf().setAppName("BPS Data Generator Test 
Suite").setMaster("local[2]")
-    sc = Some(new SparkContext(conf))
 
     val cal1 = Calendar.getInstance(TimeZone.getTimeZone("America/New_York"), 
Locale.US)
     val cal2 = Calendar.getInstance(TimeZone.getTimeZone("America/New_York"), 
Locale.US)
@@ -101,12 +103,13 @@ class IOUtilsSuite extends FunSuite with 
BeforeAndAfterAll {
       Transaction(999L, 32L, 5L, cal1, 1L)))
   }
 
+
   override def afterAll() {
-    sc.get.stop()
+    sc.stop()
   }
 
   test("Parsing Generated Strings into Transaction Objects") {
-    val rawRDD = sc.get.parallelize(rawLines)
+    val rawRDD = sc.parallelize(rawLines)
     val expectedRecords = rawRecords.get
 
     //Goal: Confirm that these RDD's are identical to the expected ones.
@@ -144,7 +147,7 @@ class IOUtilsSuite extends FunSuite with BeforeAndAfterAll {
   }
 
   test("Generation of unique sets of transaction attributes") {
-    val rawRDD = sc.get.parallelize(rawRecords.get)
+    val rawRDD = sc.parallelize(rawRecords.get)
     val rdds = SparkETL.normalizeData(rawRDD)
     val locationRDD = rdds._1
     val storeRDD = rdds._2

http://git-wip-us.apache.org/repos/asf/bigtop/blob/1472a0e1/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/generator/SparkDriverSuite.scala
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/generator/SparkDriverSuite.scala
 
b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/generator/SparkDriverSuite.scala
index 8fbfb71..a3c6e1c 100644
--- 
a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/generator/SparkDriverSuite.scala
+++ 
b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/generator/SparkDriverSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.bigtop.bigpetstore.spark.generator
 
+import org.apache.bigtop.bigpetstore.spark.etl.{ETLParameters, SparkETL}
+
 import Array._
 
 import java.io.File
@@ -24,37 +26,47 @@ import java.nio.file.Files
 
 import org.apache.spark.{SparkContext, SparkConf}
 
-import org.scalatest.FunSuite
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
 import org.scalatest.junit.JUnitRunner
 import org.junit.runner.RunWith
 
 
 // hack for running tests with Gradle
 @RunWith(classOf[JUnitRunner])
-class SparkDriverSuite extends FunSuite {
+class SparkDriverSuite extends FunSuite  with BeforeAndAfterAll {
 
-  test("Generating data") {
+  val conf = new SparkConf().setAppName("BPS Data Generator Test 
Suite").setMaster("local[2]")
+  val sc = new SparkContext(conf);
 
+  override def afterAll() {
+      sc.stop();
+  }
+
+  /**
+   * Run the test, return outputdir of the raw data.
+   */
+  def runGenerator(sc:SparkContext) : File = {
     val tmpDir:File = 
Files.createTempDirectory("sparkDriverSuiteGeneratedData").toFile()
     // 10 stores, 1000 customers, 365.0 days
     val parameters:Array[String] = Array(tmpDir.toString(), "10", "1000", 
"365.0")
 
     SparkDriver.parseArgs(parameters)
 
-    val conf = new SparkConf().setAppName("BPS Data Generator Test 
Suite").setMaster("local[2]")
-    val sc = new SparkContext(conf)
-
     val transactionRDD = SparkDriver.generateData(sc)
     val transactionCount = transactionRDD.count()
     assert(transactionCount > 0)
 
     SparkDriver.writeData(transactionRDD)
+    tmpDir;
 
-    // check that generator wrote out the  data
+  }
+
+  test("Generating data") {
+
+    val tmpDir:File =runGenerator(sc);
     val transactionDir:File = new File(tmpDir, "transactions")
     assert(transactionDir.exists())
     assert(transactionDir.isDirectory())
-
-    sc.stop()
+    //TODO : Assert format is TextFile
   }
 }

Reply via email to