Repository: bigtop Updated Branches: refs/heads/master 2c3c598a6 -> 1472a0e11
BIGTOP-1536. Add Basic Sales Analytics Example to BPS Spark. Project: http://git-wip-us.apache.org/repos/asf/bigtop/repo Commit: http://git-wip-us.apache.org/repos/asf/bigtop/commit/1472a0e1 Tree: http://git-wip-us.apache.org/repos/asf/bigtop/tree/1472a0e1 Diff: http://git-wip-us.apache.org/repos/asf/bigtop/diff/1472a0e1 Branch: refs/heads/master Commit: 1472a0e11a9d557d12798b1e0c5a77784fc81910 Parents: 2c3c598 Author: jayunit100 <[email protected]> Authored: Wed Feb 4 04:41:08 2015 -0500 Committer: jayunit100 <[email protected]> Committed: Fri Feb 6 21:26:33 2015 -0500 ---------------------------------------------------------------------- .../spark/analytics/PetStoreStatistics.scala | 95 ++++++++++++++++++++ .../bigpetstore/spark/datamodel/DataModel.scala | 8 +- .../org/apache/bigpetstore/spark/etl/ETL.scala | 41 ++++++--- .../bigpetstore/spark/TestFullPipeline.scala | 64 +++++++++++++ .../spark/datamodel/IOUtilsSuite.scala | 15 ++-- .../apache/bigpetstore/spark/etl/ETLSuite.scala | 17 ++-- .../spark/generator/SparkDriverSuite.scala | 30 +++++-- 7 files changed, 233 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bigtop/blob/1472a0e1/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala new file mode 100644 index 0000000..03d2012 --- /dev/null +++ b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/analytics/PetStoreStatistics.scala @@ -0,0 +1,95 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. 
See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.bigtop.bigpetstore.spark.generator +// NOTE(review): this file lives under .../spark/analytics/ but declares the spark.generator package; TestFullPipeline imports it from spark.generator, so moving it to an analytics package also requires updating that import. + +import _root_.org.apache.spark.SparkConf +import org.apache.bigtop.bigpetstore.spark.datamodel.{IOUtils, Statistics,Transaction} +import scala.Nothing; +import com.github.rnowling.bps.datagenerator.datamodels.inputs.ZipcodeRecord +import com.github.rnowling.bps.datagenerator.datamodels._ +import com.github.rnowling.bps.datagenerator.{DataLoader,StoreGenerator,CustomerGenerator => CustGen, PurchasingProfileGenerator,TransactionGenerator} +import com.github.rnowling.bps.datagenerator.framework.SeedFactory +import org.apache.hadoop.fs.Path +import scala.collection.JavaConversions._ +import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd._ + +import java.util.ArrayList +import scala.util.Random +import java.io.File +import java.util.Date +// NOTE(review): of the imports above, only SparkConf, SparkContext, RDD, Path, IOUtils, Statistics and Transaction are referenced in this file; the rest (scala.Nothing, the datagenerator classes, JavaConversions, SparkContext._, ArrayList, Random, File, Date) appear unused, and SparkConf is imported twice. + +object PetStoreStatistics { + + private def printUsage() { + val usage: String = + "BigPetStore Analytics Module.\n" + + "Usage: inputDir\n" ; + + System.err.println(usage) + } + + /** + * Parses the command line, which must consist of exactly one argument: the input + * directory that run() loads via IOUtils.load. Prints usage and returns None otherwise. + * + * Scala details. Some or None are an idiomatic way, in scala, to + * return an optional value. This allows us to signify, to the caller, that the + * method may fail. The caller can decide how to deal with failure (e.g. using getOrElse).
+ * @param args + * @return + */ + def parseArgs(args: Array[String]):Option[Path] = { + if(args.length != 1) { + printUsage(); + return None; + } + //success, return path. + Some(new Path(args(0))); + } + + /** + * Counts the transactions in previously loaded BigPetStore data. + * + * @param r the tuple returned by IOUtils.load (see run); only its fifth element, + * the transaction RDD, is read, and count() on it runs a Spark job. + * @param sc not used by the current implementation. + * @return a Statistics holding the number of transactions. + */ + def totalTransactions(r:(_,_,_,_,RDD[Transaction]), sc: SparkContext): Statistics = { + return Statistics(r._5.count()); + } + + /** + * We keep a "run" method which can be called easily from tests and also is used by main. + * + * Loads the data under transactionsInputDir with IOUtils.load, prints the transaction + * statistics to stdout, and then stops sc. + * NOTE(review): stopping a SparkContext owned by the caller is surprising; TestFullPipeline + * calls sc.stop() again afterwards. Consider leaving the context lifecycle to the caller. + * + * @param transactionsInputDir presumably a directory written by IOUtils.save (TestFullPipeline passes the ETL output directory). + * @param sc Spark context used for loading; it is stopped before returning. + * @return always true. + */ + def run(transactionsInputDir:String, sc:SparkContext): Boolean = { + System.out.println("input : " + transactionsInputDir); + val t=totalTransactions(IOUtils.load(sc,transactionsInputDir), sc); + // NOTE(review): t is a Statistics case class, so this prints "Statistics(<count>)"; t.transactions would print the bare number. + System.out.println("Transaction count = " + t); + sc.stop() + true; + } + + def main(args: Array[String]) { + // getOrElse: when parseArgs returns None (usage already printed), exit with status 1. + // NOTE(review): the else branch has type Unit, so inputPath is inferred as Any rather than Path; + // this compiles only because the code below just calls toString on it. A match on the Option would keep the Path type. + val inputPath = parseArgs(args).getOrElse { + System.exit(1); + }; + + System.out.println("Running w/ input = " + inputPath); + // NOTE(review): the app name is reused from the data generator; an analytics-specific name would be clearer in the Spark UI. + val conf = new SparkConf().setAppName("BPS Data Generator") + run(inputPath.toString,new SparkContext(conf)); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/bigtop/blob/1472a0e1/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala index b385bb9..19a4260 100644 --- a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala +++ b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala @@
-28,5 +28,9 @@ case class Product(productId: Long, category: String, attributes: Map[String, St case class Store(storeId: Long, zipcode: String) -case class Transaction(customerId: Long, transactionId: Long, - storeId: Long, dateTime: Calendar, productId: Long) +case class Transaction(customerId: Long, transactionId: Long, storeId: Long, dateTime: Calendar, productId: Long) + +/** + * Summary statistics produced by the analytics phase (PetStoreStatistics). + * Currently holds only the total number of transactions; to be expanded. + * + * @param transactions number of transaction records counted. + * */ +case class Statistics(transactions:Long) http://git-wip-us.apache.org/repos/asf/bigtop/blob/1472a0e1/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/etl/ETL.scala ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/etl/ETL.scala b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/etl/ETL.scala index 5f9aa8a..66d2f52 100644 --- a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/etl/ETL.scala +++ b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/etl/ETL.scala @@ -31,7 +31,7 @@ import java.util._ case class TransactionProduct(customerId: Long, transactionId: Long, storeId: Long, dateTime: Calendar, product: String) -case class Parameters(inputDir: String, outputDir: String) +/** Directories for SparkETL.run: inputDir holds the raw generated data read by readRawData; outputDir receives the normalized RDDs written by IOUtils.save. Renamed from Parameters in this commit. */ +case class ETLParameters(inputDir: String, outputDir: String) object SparkETL { @@ -48,13 +48,13 @@ object SparkETL { println(usage) } - def parseArgs(args: Array[String]): Parameters = { + def parseArgs(args: Array[String]): ETLParameters = { if(args.length != NPARAMS) { printUsage() System.exit(1) } - Parameters(args(0), args(1)) + ETLParameters(args(0), args(1)) } def readRawData(sc: SparkContext, inputDir: String): RDD[String] = { @@ -179,16 +179,10 @@ object SparkETL { (locationRDD, storeRDD, customerRDD, productRDD, transactionRDD) } - - def main(args: Array[String]) { - val parameters = parseArgs(args) - - println("Creating
SparkConf") - val conf = new SparkConf().setAppName("BPS Data Generator") - - println("Creating SparkContext") - val sc = new SparkContext(conf) - + /** + * Runs the ETL and returns the total number of locations,stores,customers,products,transactions. + */ + def run(sc:SparkContext, parameters:ETLParameters) : (Long,Long,Long,Long,Long) = { val rawStringRDD = readRawData(sc, parameters.inputDir) val rawRecordRDD = parseRawData(rawStringRDD) val normalizedRDDs = normalizeData(rawRecordRDD) @@ -202,6 +196,27 @@ object SparkETL { IOUtils.save(parameters.outputDir, locationRDD, storeRDD, customerRDD, productRDD, transactionRDD) + return (locationRDD.count(), + storeRDD.count(), + customerRDD.count(), + productRDD.count(), + transactionRDD.count() + ); + } + + def main(args: Array[String]) { + val parameters = parseArgs(args) + + println("Creating SparkConf") + + val conf = new SparkConf().setAppName("BPS Data Generator") + + println("Creating SparkContext") + + val sc = new SparkContext(conf) + + run(sc, parameters) + sc.stop() } } http://git-wip-us.apache.org/repos/asf/bigtop/blob/1472a0e1/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala new file mode 100644 index 0000000..747a477 --- /dev/null +++ b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/TestFullPipeline.scala @@ -0,0 +1,64 @@ +package org.apache.bigpetstore.spark + +import org.apache.bigtop.bigpetstore.spark.etl.ETLParameters +import org.apache.bigtop.bigpetstore.spark.etl.SparkETL +import org.apache.bigtop.bigpetstore.spark.etl.{ETLParameters, SparkETL} +import org.apache.bigtop.bigpetstore.spark.generator.PetStoreStatistics
+import org.apache.bigtop.bigpetstore.spark.generator.SparkDriver +import org.apache.spark.SparkConf +import org.apache.spark.SparkContext +import org.junit.runner.RunWith +import org.scalatest.{BeforeAndAfterAll, FunSuite} +import org.scalatest.junit.JUnitRunner + +import Array._ + +import java.io.File +import java.nio.file.Files + +import org.apache.spark.{SparkContext, SparkConf} +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith + + +// hack for running tests with Gradle +// NOTE(review): the last three imports above duplicate earlier ones (ETLParameters and SparkETL are imported twice as well), and this suite's package, org.apache.bigpetstore.spark, lacks the bigtop segment used by the other suites and by the classes under test. +/** + * End-to-end test: generates raw data with SparkDriver, runs SparkETL on it, and then + * runs PetStoreStatistics on the ETL output, all on one shared local SparkContext. + */ +@RunWith(classOf[JUnitRunner]) +class TestFullPipeline extends FunSuite with BeforeAndAfterAll { + + val conf = new SparkConf().setAppName("BPS Data Generator Test Suite").setMaster("local[2]") + val sc = new SparkContext(conf) + + override def afterAll() { + sc.stop(); + } + + test("Full integration test.") { + + // First generate the data. + val tmpDir:File = Files.createTempDirectory("sparkDriverSuiteGeneratedData2").toFile() + // NOTE(review): neither this directory nor etlDir below is deleted when the test finishes. + + // outputDir, stores, customers, days, randomSeed + val parameters:Array[String] = Array(tmpDir.toString(), "10", "1000", "365.0","123456789") + SparkDriver.parseArgs(parameters); + + val transactionRDD = SparkDriver.generateData(sc) + SparkDriver.writeData(transactionRDD) + + //Now ETL the data + val etlDir:File = Files.createTempDirectory("BPSTest_ETL2").toFile() + System.out.println(etlDir.getAbsolutePath + "== "+etlDir.list()) + // NOTE(review): etlDir.list() is a Java String[], so the line above prints its identity (e.g. [Ljava.lang.String;@...) rather than file names, and the directory is still empty at this point anyway. + + val (locations,stores,customers,products,transactions) = SparkETL.run(sc, new ETLParameters(tmpDir.getAbsolutePath,etlDir.getAbsolutePath)); + + // assert(locations==400L) TODO : This seems to vary (325,400,) + assert(stores==10L) + assert(customers==1000L) + assert(products==55L) + //assert(transactions==45349L) + + //Now do the analytics. + // NOTE(review): PetStoreStatistics.run stops sc itself and its Boolean result is not asserted, so the sc.stop() below (and the one in afterAll) act on an already-stopped context.
+ PetStoreStatistics.run(etlDir.getAbsolutePath, sc); + + sc.stop() + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/bigtop/blob/1472a0e1/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala index 9864a8c..7b1e1f5 100644 --- a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala +++ b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala @@ -26,7 +26,7 @@ import java.util.Locale import org.apache.spark.{SparkContext, SparkConf} -import org.scalatest.FunSuite +import org.scalatest.{BeforeAndAfterAll, FunSuite} import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith @@ -34,15 +34,19 @@ import org.apache.bigtop.bigpetstore.spark.datamodel._ // hack for running tests with Gradle @RunWith(classOf[JUnitRunner]) -class IOUtilsSuite extends FunSuite { +class IOUtilsSuite extends FunSuite with BeforeAndAfterAll { + + val conf = new SparkConf().setAppName("BPS Data Generator Test Suite").setMaster("local[2]") + val sc = new SparkContext(conf) + + override def afterAll() { + sc.stop(); + } test("Saving & Loading data") { val tmpDir = Files.createTempDirectory("ioUtilsSuite").toFile().toString() - val conf = new SparkConf().setAppName("BPS Data Generator Test Suite").setMaster("local[2]") - val sc = new SparkContext(conf) - val locations = Array(Location("11111", "Sunnyvale", "CA"), Location("22222", "Margate", "FL")) val customers = Array(Customer(1L, "James", "Madison", "11111"), @@ -89,6 +93,5 @@ class IOUtilsSuite extends FunSuite { assert(productRDD.collect().toSet
=== readProductRDD.collect().toSet) assert(transactionRDD.collect().toSet === readTransactionRDD.collect().toSet) - sc.stop() } } http://git-wip-us.apache.org/repos/asf/bigtop/blob/1472a0e1/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala index ca1cfdf..510e5f3 100644 --- a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala +++ b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala @@ -17,6 +17,8 @@ package org.apache.bigtop.bigpetstore.spark.etl +import org.apache.spark.rdd.RDD + import Array._ import java.util.Calendar @@ -42,13 +44,15 @@ import org.apache.bigtop.bigpetstore.spark.datamodel._ * RunWith annotation is just a hack for running tests with Gradle */ @RunWith(classOf[JUnitRunner]) -class IOUtilsSuite extends FunSuite with BeforeAndAfterAll { +class ETLSuite extends FunSuite with BeforeAndAfterAll { + // NOTE(review): sc is now a plain val (see below), so the TODO that follows no longer applies to the SparkContext; only rawRecords and transactions still use Option. /** * TODO : We are using Option monads as a replacement for nulls. * Lets move towards immutable spark context instead, if possible ?
*/ - var sc: Option[SparkContext] = None + val conf = new SparkConf().setAppName("BPS Data Generator Test Suite").setMaster("local[2]") + val sc = new SparkContext(conf) + var rawRecords: Option[Array[(Store, Location, Customer, Location, TransactionProduct)]] = None var transactions: Option[Array[Transaction]] = None @@ -72,8 +76,6 @@ class IOUtilsSuite extends FunSuite with BeforeAndAfterAll { "6,66067,Ottawa,KS,999,Cesareo,Lamplough,20152,Chantilly,VA,30,Mon Oct 12 04:29:46 EDT 2015,category=dry cat food;brand=Feisty Feline;flavor=Chicken & Rice;size=14.0;per_unit_cost=2.14;") override def beforeAll() { - val conf = new SparkConf().setAppName("BPS Data Generator Test Suite").setMaster("local[2]") - sc = Some(new SparkContext(conf)) val cal1 = Calendar.getInstance(TimeZone.getTimeZone("America/New_York"), Locale.US) val cal2 = Calendar.getInstance(TimeZone.getTimeZone("America/New_York"), Locale.US) @@ -101,12 +103,13 @@ class IOUtilsSuite extends FunSuite with BeforeAndAfterAll { Transaction(999L, 32L, 5L, cal1, 1L))) } + override def afterAll() { - sc.get.stop() + sc.stop() } test("Parsing Generated Strings into Transaction Objects") { - val rawRDD = sc.get.parallelize(rawLines) + val rawRDD = sc.parallelize(rawLines) val expectedRecords = rawRecords.get //Goal: Confirm that these RDD's are identical to the expected ones.
@@ -144,7 +147,7 @@ class IOUtilsSuite extends FunSuite with BeforeAndAfterAll { } test("Generation of unique sets of transaction attributes") { - val rawRDD = sc.get.parallelize(rawRecords.get) + val rawRDD = sc.parallelize(rawRecords.get) val rdds = SparkETL.normalizeData(rawRDD) val locationRDD = rdds._1 val storeRDD = rdds._2 http://git-wip-us.apache.org/repos/asf/bigtop/blob/1472a0e1/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/generator/SparkDriverSuite.scala ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/generator/SparkDriverSuite.scala b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/generator/SparkDriverSuite.scala index 8fbfb71..a3c6e1c 100644 --- a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/generator/SparkDriverSuite.scala +++ b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/generator/SparkDriverSuite.scala @@ -17,6 +17,8 @@ package org.apache.bigtop.bigpetstore.spark.generator +import org.apache.bigtop.bigpetstore.spark.etl.{ETLParameters, SparkETL} // NOTE(review): neither ETLParameters nor SparkETL is referenced in the visible code of this suite. + import Array._ import java.io.File @@ -24,37 +26,47 @@ import java.nio.file.Files import org.apache.spark.{SparkContext, SparkConf} -import org.scalatest.FunSuite +import org.scalatest.{BeforeAndAfterAll, FunSuite} import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith // hack for running tests with Gradle @RunWith(classOf[JUnitRunner]) -class SparkDriverSuite extends FunSuite { +class SparkDriverSuite extends FunSuite with BeforeAndAfterAll { - test("Generating data") { + val conf = new SparkConf().setAppName("BPS Data Generator Test Suite").setMaster("local[2]") + val sc = new SparkContext(conf); + override def afterAll() { + sc.stop(); + } + + /** + * Runs the data generator on sc (10 stores, 1000 customers, 365.0 days), asserts that at + * least one transaction was generated, writes the data, and returns the generator's output + * directory (a fresh temporary directory).
+ */ + def runGenerator(sc:SparkContext) : File = { + val tmpDir:File = Files.createTempDirectory("sparkDriverSuiteGeneratedData").toFile() + // 10 stores, 1000 customers, 365.0 days + // NOTE(review): unlike TestFullPipeline, no fifth (random seed) argument is passed; presumably SparkDriver then chooses its own seed, so output may vary between runs — confirm. + val parameters:Array[String] = Array(tmpDir.toString(), "10", "1000", "365.0") + SparkDriver.parseArgs(parameters) - val conf = new SparkConf().setAppName("BPS Data Generator Test Suite").setMaster("local[2]") - val sc = new SparkContext(conf) - val transactionRDD = SparkDriver.generateData(sc) val transactionCount = transactionRDD.count() assert(transactionCount > 0) SparkDriver.writeData(transactionRDD) + tmpDir; - // check that generator wrote out the data + } + + test("Generating data") { + + val tmpDir:File =runGenerator(sc); val transactionDir:File = new File(tmpDir, "transactions") assert(transactionDir.exists()) assert(transactionDir.isDirectory()) - - sc.stop() + //TODO : Assert format is TextFile } }
