Repository: bigtop Updated Branches: refs/heads/master fe15ba18b -> 388beca8e
BIGTOP-1273. BigPetStore Cleanup formatting. Project: http://git-wip-us.apache.org/repos/asf/bigtop/repo Commit: http://git-wip-us.apache.org/repos/asf/bigtop/commit/388beca8 Tree: http://git-wip-us.apache.org/repos/asf/bigtop/tree/388beca8 Diff: http://git-wip-us.apache.org/repos/asf/bigtop/diff/388beca8 Branch: refs/heads/master Commit: 388beca8e1b30133024ce3c38ce8785b09f3ba10 Parents: fe15ba1 Author: jayunit100 <[email protected]> Authored: Mon Dec 15 22:24:02 2014 -0500 Committer: jayunit100 <[email protected]> Committed: Wed Dec 17 21:11:14 2014 -0500 ---------------------------------------------------------------------- .../spark/generator/SparkDriver.scala | 208 ++++++++++--------- .../apache/bigpetstore/spark/etl/ETLSuite.scala | 31 ++- 2 files changed, 125 insertions(+), 114 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bigtop/blob/388beca8/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/generator/SparkDriver.scala ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/generator/SparkDriver.scala b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/generator/SparkDriver.scala index 2d7ed17..19d1565 100644 --- a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/generator/SparkDriver.scala +++ b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/generator/SparkDriver.scala @@ -17,10 +17,11 @@ package org.apache.bigtop.bigpetstore.spark.generator -import com.github.rnowling.bps.datagenerator.datamodels.{Store,Customer,PurchasingProfile,Transaction} +import com.github.rnowling.bps.datagenerator.datamodels.inputs.ZipcodeRecord +import com.github.rnowling.bps.datagenerator.datamodels._ import com.github.rnowling.bps.datagenerator.{DataLoader,StoreGenerator,CustomerGenerator => CustGen, PurchasingProfileGenerator,TransactionGenerator} import com.github.rnowling.bps.datagenerator.framework.SeedFactory - +import scala.collection.JavaConversions._ import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.SparkContext._ import org.apache.spark.rdd._ @@ -30,27 +31,34 @@ import scala.util.Random import java.io.File import java.util.Date +/** + * This driver uses the data generator API to generate + * an arbitrarily large data set of petstore transactions. + * + * Each "transaction" consists of many "products", each of which + * is stringified into what is often called a "line item". + * + * Then, spark writes those line items out as a distributed hadoop file glob. + * + */ object SparkDriver { private var nStores: Int = -1 private var nCustomers: Int = -1 private var simulationLength: Double = -1.0 private var seed: Long = -1 private var outputDir: String = "" - private val NPARAMS = 5 private def printUsage() { - val usage: String = "BigPetStore Data Generator\n" + - "\n" + + val usage: String = + "BigPetStore Data Generator.\n" + "Usage: spark-submit ... outputDir nStores nCustomers simulationLength [seed]\n" + - "\n" + "outputDir - (string) directory to write files\n" + "nStores - (int) number of stores to generate\n" + "nCustomers - (int) number of customers to generate\n" + "simulationLength - (float) number of days to simulate\n" + "seed - (long) seed for RNG. If not given, one is reandomly generated.\n" - - println(usage) + System.err.println(usage) } def parseArgs(args: Array[String]) { @@ -58,52 +66,43 @@ object SparkDriver { printUsage() System.exit(1) } - - var i = 0 - - outputDir = args(i) - - i += 1 + outputDir = args(0) try { - nStores = args(i).toInt + nStores = args(1).toInt } catch { case _ : NumberFormatException => - System.err.println("Unable to parse '" + args(i) + "' as an integer for nStores.\n") + System.err.println("Unable to parse '" + args(1) + "' as an integer for nStores.\n") printUsage() System.exit(1) } - - i += 1 try { - nCustomers = args(i).toInt + nCustomers = args(2).toInt } catch { case _ : NumberFormatException => - System.err.println("Unable to parse '" + args(i) + "' as an integer for nCustomers.\n") + System.err.println("Unable to parse '" + args(2) + "' as an integer for nCustomers.\n") printUsage() System.exit(1) } - - i += 1 try { - simulationLength = args(i).toDouble + simulationLength = args(3).toDouble } catch { case _ : NumberFormatException => - System.err.println("Unable to parse '" + args(i) + "' as a float for simulationLength.\n") + System.err.println("Unable to parse '" + args(3) + "' as a float for simulationLength.\n") printUsage() System.exit(1) } + //If seed isnt present, then no is used seed. if(args.length == NPARAMS) { - i += 1 try { - seed = args(i).toLong + seed = args(4).toLong } catch { case _ : NumberFormatException => - System.err.println("Unable to parse '" + args(i) + "' as a long for seed.\n") + System.err.println("Unable to parse '" + args(4) + "' as a long for seed.\n") printUsage() System.exit(1) } @@ -113,13 +112,18 @@ object SparkDriver { } } + /** + * Here we generate an RDD of all the petstore transactions, + * by generating the static data first (stores, customers, ...) + * followed by running the simulation as a distributed spark task. + */ def generateData(sc: SparkContext): RDD[Transaction] = { val inputData = new DataLoader().loadData() - val seedFactory = new SeedFactory(seed); + val seedFactory = new SeedFactory(seed) println("Generating stores...") val stores : ArrayList[Store] = new ArrayList() - val storeGenerator = new StoreGenerator(inputData, seedFactory); + val storeGenerator = new StoreGenerator(inputData, seedFactory) for(i <- 1 to nStores) { val store = storeGenerator.generate() stores.add(store) @@ -133,101 +137,111 @@ object SparkDriver { val customer = custGen.generate() customers = customer :: customers } - println("Done.") + println("...Done generating customers.") - println("Broadcasting stores and products") + println("Broadcasting stores and products...") val storesBC = sc.broadcast(stores) val productBC = sc.broadcast(inputData.getProductCategories()) val customerRDD = sc.parallelize(customers) val nextSeed = seedFactory.getNextSeed() - - println("Defining transaction DAG") - val transactionRDD = customerRDD.mapPartitionsWithIndex { (index, custIter) => - val seedFactory = new SeedFactory(nextSeed ^ index) - val transactionIter = custIter.map{ customer => - val products = productBC.value - - val profileGen = new PurchasingProfileGenerator(products, seedFactory) - val profile = profileGen.generate() - - val transGen = new TransactionGenerator(customer, profile, storesBC.value, products, - seedFactory) - - var transactions : List[Transaction] = List() - var transaction = transGen.generate() - while(transaction.getDateTime() < simulationLength) { - transactions = transaction :: transactions - - transaction = transGen.generate() + println("...Done broadcasting stores and products.") + + println("Defining transaction DAG...") + + /** + * See inline comments below regarding how we + * generate TRANSACTION objects from CUSTOMERs. + */ + val transactionRDD = customerRDD.mapPartitionsWithIndex{ + (index, custIter) => + // Create a new RNG + val seedFactory = new SeedFactory(nextSeed ^ index) + val transactionIter = custIter.map{ + customer => + val products = productBC.value + //Create a new purchasing profile. + val profileGen = new PurchasingProfileGenerator(products, seedFactory) + val profile = profileGen.generate() + val transGen = new TransactionGenerator(customer, profile, storesBC.value, products, seedFactory) + var transactions : List[Transaction] = List() + var transaction = transGen.generate() + + //Create a list of this customer's transactions for the time period + while(transaction.getDateTime() < simulationLength) { + transactions = transaction :: transactions + transaction = transGen.generate() + } + //The final result, we return the list of transactions produced above. + transactions } - - transactions - } transactionIter - }.flatMap( s => s) + }.flatMap(s => s) + + println("...Done defining transaction DAG.") println("Generating transactions...") - val nTrans = transactionRDD.count() - println(s"Generated $nTrans transactions.") + // forces RDD materialization. + val nTrans = transactionRDD.count() + println(s"... Done Generating $nTrans transactions.") + + /** + * Return the RDD representing all the petstore transactions. + * This RDD contains a distributed collection of instances where + * a customer went to a pet store, and bought a variable number of items. + * We can then serialize all the contents to disk. + */ transactionRDD } + def lineItem(t: Transaction, date:Date, p:Product): String = { + t.getStore.getId + "," + + t.getStore.getLocation+ "," + + t.getStore.getLocation.getCity + "," + + t.getStore.getLocation.getState + "," + + t.getCustomer.getId + "," + + t.getCustomer.getName.getFirst + " " +t.getCustomer.getName.getSecond + "," + + t.getCustomer.getLocation.getZipcode + "," + + t.getCustomer.getLocation.getCity + "," + + t.getCustomer.getLocation.getState + "," + + t.getId + "," + + date + "," + p + } def writeData(transactionRDD : RDD[Transaction]) { val initialDate : Long = new Date().getTime() - val transactionStringsRDD = transactionRDD.map { t => - var records : List[String] = List() - val products = t.getProducts() - for(i <- 0 until products.size()) { - val p = products.get(i) - val name = t.getCustomer().getName() - val custLocation = t.getCustomer().getLocation() - val storeLocation = t.getStore().getLocation() - - // days -> milliseconds = days * 24 h / day * 60 min / hr * 60 sec / min * 1000 ms / sec - val dateMS = (t.getDateTime * 24.0 * 60.0 * 60.0 * 1000.0).toLong - val date = new Date(initialDate + dateMS) - - - var record = "" - record += t.getStore().getId() + "," - record += storeLocation.getZipcode() + "," - record += storeLocation.getCity() + "," - record += storeLocation.getState() + "," - - record += t.getCustomer().getId() + "," - record += name.getFirst() + "," + name.getSecond() + "," - record += custLocation.getZipcode() + "," - record += custLocation.getCity() + "," - record += custLocation.getState() + "," - - record += t.getId() + "," - record += date + "," - record += p - - records = record :: records - } + val transactionStringsRDD = transactionRDD.map { + transaction => + val products = transaction.getProducts() + + /********************************************************* + * we define a "records" RDD : Which is a + * mapping of products from each single transaction to strings. + * + * So we ultimately define an RDD of strings, where each string represents + * an instance where of a item purchase. + * ********************************************************/ + val records = products.map{ + product => + val storeLocation = transaction.getStore().getLocation() + // days -> milliseconds = days * 24 h / day * 60 min / hr * 60 sec / min * 1000 ms / sec + val dateMS = (transaction.getDateTime * 24.0 * 60.0 * 60.0 * 1000.0).toLong + // Return a stringified "line item", which represents a single item bought. + lineItem(transaction, new Date(initialDate + dateMS), product) + } records - }.flatMap { s => s } - + } + // Distributed serialization of the records to part-r-* files... transactionStringsRDD.saveAsTextFile(outputDir + "/transactions") } def main(args: Array[String]) { parseArgs(args) - - println("Creating SparkConf") val conf = new SparkConf().setAppName("BPS Data Generator") - - println("Creating SparkContext") val sc = new SparkContext(conf) - val transactionRDD = generateData(sc) - writeData(transactionRDD) - sc.stop() } } http://git-wip-us.apache.org/repos/asf/bigtop/blob/388beca8/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala index 7ccece3..a7699ac 100644 --- a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala +++ b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala @@ -40,34 +40,31 @@ class IOUtilsSuite extends FunSuite with BeforeAndAfterAll { var transactions: Option[Array[Transaction]] = None val stores = Array(Store(5L, "11553"), Store(1L, "98110"), Store(6L, "66067")) - val locations = Array(Location("11553", "Uniondale", "NY"), - Location("98110", "Bainbridge Islan", "WA"), - Location("66067", "Ottawa", "KS"), - Location("20152", "Chantilly", "VA")) + val locations = + Array( + Location("11553", "Uniondale", "NY"), + Location("98110", "Bainbridge Islan", "WA"), + Location("66067", "Ottawa", "KS"), + Location("20152", "Chantilly", "VA")) val customers = Array(Customer(999L, "Cesareo", "Lamplough", "20152")) - val products = Array( - Product(1L, "dry dog food", Map("category" -> "dry dog food", "brand" -> "Happy Pup", "flavor" -> "Fish & Potato", "size" -> "30.0", "per_unit_cost" -> "2.67")), - Product(0L, "poop bags", Map("category" -> "poop bags", "brand" -> "Dog Days", "color" -> "Blue", "size" -> "60.0", "per_unit_cost" -> "0.21")), - Product(2L, "dry cat food", Map("category" -> "dry cat food", "brand" -> "Feisty Feline", "flavor" -> "Chicken & Rice", "size" -> "14.0", "per_unit_cost" -> "2.14"))) + val products = + Array( + Product(1L, "dry dog food", Map("category" -> "dry dog food", "brand" -> "Happy Pup", "flavor" -> "Fish & Potato", "size" -> "30.0", "per_unit_cost" -> "2.67")), + Product(0L, "poop bags", Map("category" -> "poop bags", "brand" -> "Dog Days", "color" -> "Blue", "size" -> "60.0", "per_unit_cost" -> "0.21")), + Product(2L, "dry cat food", Map("category" -> "dry cat food", "brand" -> "Feisty Feline", "flavor" -> "Chicken & Rice", "size" -> "14.0", "per_unit_cost" -> "2.14"))) val rawLines = Array( "5,11553,Uniondale,NY,999,Cesareo,Lamplough,20152,Chantilly,VA,32,Tue Nov 03 01:08:11 EST 2015,category=dry dog food;brand=Happy Pup;flavor=Fish & Potato;size=30.0;per_unit_cost=2.67;", - "1,98110,Bainbridge Islan,WA,999,Cesareo,Lamplough,20152,Chantilly,VA,31,Mon Nov 02 17:51:37 EST 2015,category=poop bags;brand=Dog Days;color=Blue;size=60.0;per_unit_cost=0.21;", - "6,66067,Ottawa,KS,999,Cesareo,Lamplough,20152,Chantilly,VA,30,Mon Oct 12 04:29:46 EDT 2015,category=dry cat food;brand=Feisty Feline;flavor=Chicken & Rice;size=14.0;per_unit_cost=2.14;") - override def beforeAll() { val conf = new SparkConf().setAppName("BPS Data Generator Test Suite").setMaster("local[2]") sc = Some(new SparkContext(conf)) - val cal1 = Calendar.getInstance(TimeZone.getTimeZone("America/New_York"), - Locale.US) - val cal2 = Calendar.getInstance(TimeZone.getTimeZone("America/New_York"), - Locale.US) - val cal3 = Calendar.getInstance(TimeZone.getTimeZone("America/New_York"), - Locale.US) + val cal1 = Calendar.getInstance(TimeZone.getTimeZone("America/New_York"), Locale.US) + val cal2 = Calendar.getInstance(TimeZone.getTimeZone("America/New_York"), Locale.US) + val cal3 = Calendar.getInstance(TimeZone.getTimeZone("America/New_York"), Locale.US) // Calendar seems to interpet months as 0-11 // ms are not in output we parse.
