Repository: bigtop Updated Branches: refs/heads/master a2d38959f -> 194fb619f
BIGTOP-1535. Add Spark ETL script to BigPetStore. Project: http://git-wip-us.apache.org/repos/asf/bigtop/repo Commit: http://git-wip-us.apache.org/repos/asf/bigtop/commit/194fb619 Tree: http://git-wip-us.apache.org/repos/asf/bigtop/tree/194fb619 Diff: http://git-wip-us.apache.org/repos/asf/bigtop/diff/194fb619 Branch: refs/heads/master Commit: 194fb619fae4788d4d193f9ad67c706d2f4d0515 Parents: a2d3895 Author: RJ Nowling <[email protected]> Authored: Thu Nov 20 12:52:00 2014 -0500 Committer: jayunit100 <[email protected]> Committed: Fri Nov 21 20:29:37 2014 -0500 ---------------------------------------------------------------------- bigtop-bigpetstore/bigpetstore-spark/README.md | 66 +++++- bigtop-bigpetstore/bigpetstore-spark/arch.dot | 36 ++++ .../bigpetstore/spark/datamodel/DataModel.scala | 32 +++ .../bigpetstore/spark/datamodel/IOUtils.scala | 86 ++++++++ .../org/apache/bigpetstore/spark/etl/ETL.scala | 207 +++++++++++++++++++ .../spark/datamodel/IOUtilsSuite.scala | 94 +++++++++ .../apache/bigpetstore/spark/etl/ETLSuite.scala | 128 ++++++++++++ 7 files changed, 644 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bigtop/blob/194fb619/bigtop-bigpetstore/bigpetstore-spark/README.md ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/bigpetstore-spark/README.md b/bigtop-bigpetstore/bigpetstore-spark/README.md index 4533366..0ed1a05 100644 --- a/bigtop-bigpetstore/bigpetstore-spark/README.md +++ b/bigtop-bigpetstore/bigpetstore-spark/README.md @@ -14,6 +14,50 @@ Architecture The Spark application consists of the following modules so far: * generator: generates raw data on the dfs +* datamodel: data model used as input for analytics components +* etl: normalizes and transforms the raw data to the data model + +Data Model +---------- + +The data generator creates a dirty CSV file containing the following fields: + +* Store ID: 
Int
+* Store Zipcode: String
+* Store City: String
+* Store State: String
+* Customer ID: Int
+* Customer First Name: String
+* Customer Last Name: String
+* Customer Zipcode: String
+* Customer City: String
+* Customer State: String
+* Transaction ID: Int
+* Transaction Date Time: String (e.g., "Tue Nov 03 01:08:11 EST 2014")
+* Transaction Product: String (e.g., "category=dry cat food;brand=Feisty Feline;flavor=Chicken & Rice;size=14.0;per_unit_cost=2.14;")
+
+Note that the transaction ID is unique only per customer -- the customer and transaction IDs form a unique composite key.
+
+Since the dirty CSV data contains repetitive information and requires massaging to use for analytics, an
+internal structured data model is defined as input for the analytics components:
+
+* Location(zipcode: String, city: String, state: String)
+* Customer(customerId: Long, firstName: String, lastName: String, zipcode: String)
+* Store(storeId: Long, zipcode: String)
+* Product(productId: Long, category: String, attributes: Map[String, String])
+* Transaction(customerId: Long, transactionId: Long, storeId: Long, dateTime: java.util.Calendar, productId: Long)
+
+The ETL stage parses and cleans up the dirty CSV and writes out RDDs for each data type in the data model, serialized using
+the `saveAsObjectFile()` method. The analytics components can use the `IOUtils.load()` method to de-serialize the structured
+data.
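[Editorial aside, not part of the commit.] The product field is the one piece of the CSV that needs real parsing; a short standalone Scala sketch of the `key=value;` splitting described above may help (the `parseProduct` name is illustrative, not from the commit):

```scala
// Parse a "key=value;key=value;" product string, e.g.
// "category=dry cat food;brand=Feisty Feline;flavor=Chicken & Rice;size=14.0;per_unit_cost=2.14;"
// into the attributes Map carried by the Product case class.
def parseProduct(productString: String): Map[String, String] =
  productString
    .split(";")
    .filter(_.trim.nonEmpty)       // drop the empty fragment after the trailing ';'
    .map { pair =>
      val Array(key, value) = pair.split("=", 2)
      (key, value)
    }
    .toMap

val attrs = parseProduct(
  "category=dry cat food;brand=Feisty Feline;flavor=Chicken & Rice;size=14.0;per_unit_cost=2.14;")
```

The ETL performs essentially this split before assigning each distinct product string a numeric `productId`.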
+ +Running Tests +------------- +BigPetStore Spark includes unit tests that you can run with the following command: + +``` +gradle clean test +``` Building and Running with Spark ------------------------------- @@ -28,16 +72,28 @@ This will produce a jar file under `build/libs` (referred to as `bigpetstore-spa use this jar to run a Spark job as follows: ``` -spark-submit --master local[2] --class org.apache.bigtop.bigpetstore.generator.SparkDriver bigpetstore-spark-X.jar generated_data/ 10 1000 365.0 345 +spark-submit --master local[2] --class org.apache.bigtop.bigpetstore.spark.generator.SparkDriver bigpetstore-spark-X.jar generated_data/ 10 1000 365.0 345 ``` You will need to change the master if you want to run on a cluster. The last five parameters control the output directory, the number of stores, the number of customers, simulation length (in days), and the random seed (which is optional). -Running Tests -------------- -BigPetStore Spark includes unit tests that you can run with the following command: + +Running the ETL component +------------------------- +The data produced by the generator is in a raw text format, similar to what users will see in production environments. +The raw data isn't normalized (e.g., repeated customer, store, location, and product information) and needs to be parsed +(e.g., dates) before it can be easily used. The ETL component does this for us. 
+
+The ETL component:
+
+* Reads the raw data
+* Parses the date times and products
+* Normalizes the data
+* Writes out RDDs for each type of class (Store, Customer, Location, Product, Transaction) in the data model
+
+After building the jar (see above), you can run the ETL component like so:
 
```
-gradle test
+spark-submit --master local[2] --class org.apache.bigtop.bigpetstore.spark.etl.SparkETL bigpetstore-spark-X.jar generated_data transformed_data
```
http://git-wip-us.apache.org/repos/asf/bigtop/blob/194fb619/bigtop-bigpetstore/bigpetstore-spark/arch.dot
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/bigpetstore-spark/arch.dot b/bigtop-bigpetstore/bigpetstore-spark/arch.dot
new file mode 100644
index 0000000..5761036
--- /dev/null
+++ b/bigtop-bigpetstore/bigpetstore-spark/arch.dot
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/ +digraph bigpetstore { + + node [shape=record]; + + + DIRTY_CSV [label="Raw, dirty data in CSV format"]; + STRUCTURED_DATA [label="Data model serialized as sequence files"]; + generator [label="Data Generator (generator.SparkDriver)"]; + ETL [label="Extract-Transform-Load (etl.SparkETL)"]; + SalesAnalytics [label="Sales Analytics (analytics.Sales)"]; + SalesTables [label="Sales Trends Tables (CSV)"]; + CustomerAnalytics [label="Customer Analytics (analytics.Customers)"]; + CustomerTables [label="Customer Trends Tables (CSV)"]; + ItemRecommender [label="Item Recommendor (analytics.ItemRecommendor)"]; + ItemRecommendations [label="Customer Product Recommendations (CSV)"]; + + generator -> DIRTY_CSV -> ETL -> STRUCTURED_DATA; + STRUCTURED_DATA -> SalesAnalytics -> SalesTables; + STRUCTURED_DATA -> CustomerAnalytics -> CustomerTables; + STRUCTURED_DATA -> ItemRecommender -> ItemRecommendations; +} http://git-wip-us.apache.org/repos/asf/bigtop/blob/194fb619/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala new file mode 100644 index 0000000..b385bb9 --- /dev/null +++ b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.bigtop.bigpetstore.spark.datamodel + +import java.util.Calendar + +case class Customer(customerId: Long, firstName: String, + lastName: String, zipcode: String) + +case class Location(zipcode: String, city: String, state: String) + +case class Product(productId: Long, category: String, attributes: Map[String, String]) + +case class Store(storeId: Long, zipcode: String) + +case class Transaction(customerId: Long, transactionId: Long, + storeId: Long, dateTime: Calendar, productId: Long) http://git-wip-us.apache.org/repos/asf/bigtop/blob/194fb619/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala new file mode 100644 index 0000000..8899abd --- /dev/null +++ b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.bigtop.bigpetstore.spark.datamodel + +import java.util.Date + +import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd._ + +import org.apache.bigtop.bigpetstore.spark.datamodel._ + +/** + * Utility functions for loading and saving data model RDDs. + */ +object IOUtils { + private val LOCATION_DIR = "locations" + private val STORE_DIR = "stores" + private val CUSTOMER_DIR = "customers" + private val PRODUCT_DIR = "products" + private val TRANSACTION_DIR = "transactions" + + /** + * Save RDDs of the data model as Sequence files. + * + * @param outputDir Output directory + * @param locationRDD RDD of Location objects + * @param storeRDD RDD of Store objects + * @param customerRDD RDD of Customer objects + * @param productRDD RDD of Product objects + * @param transactionRDD RDD of Transaction objects + */ + def save(outputDir: String, locationRDD: RDD[Location], + storeRDD: RDD[Store], customerRDD: RDD[Customer], + productRDD: RDD[Product], transactionRDD: RDD[Transaction]) { + + locationRDD.saveAsObjectFile(outputDir + "/" + LOCATION_DIR) + storeRDD.saveAsObjectFile(outputDir + "/" + STORE_DIR) + customerRDD.saveAsObjectFile(outputDir + "/" + CUSTOMER_DIR) + productRDD.saveAsObjectFile(outputDir + "/" + PRODUCT_DIR) + transactionRDD.saveAsObjectFile(outputDir + "/" + TRANSACTION_DIR) + } + + /** + * Load RDDs of the data model from Sequence files. 
+ * + * @param sc SparkContext + * @param inputDir Directory containing Sequence files + */ + def load(sc: SparkContext, inputDir: String): (RDD[Location], RDD[Store], + RDD[Customer], RDD[Product], RDD[Transaction]) = { + + val locationRDD: RDD[Location] = + sc.objectFile(inputDir + "/" + LOCATION_DIR) + + val storeRDD: RDD[Store] = + sc.objectFile(inputDir + "/" + STORE_DIR) + + val customerRDD: RDD[Customer] = + sc.objectFile(inputDir + "/" + CUSTOMER_DIR) + + val productRDD: RDD[Product] = + sc.objectFile(inputDir + "/" + PRODUCT_DIR) + + val transactionRDD: RDD[Transaction] = + sc.objectFile(inputDir + "/" + TRANSACTION_DIR) + + (locationRDD, storeRDD, customerRDD, productRDD, transactionRDD) + } + +} http://git-wip-us.apache.org/repos/asf/bigtop/blob/194fb619/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/etl/ETL.scala ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/etl/ETL.scala b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/etl/ETL.scala new file mode 100644 index 0000000..5f9aa8a --- /dev/null +++ b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/etl/ETL.scala @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.bigtop.bigpetstore.spark.etl + +import org.apache.bigtop.bigpetstore.spark.datamodel._ + +import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd._ + +import java.io.File +import java.text.DateFormat +import java.text.SimpleDateFormat +import java.util._ + +case class TransactionProduct(customerId: Long, transactionId: Long, + storeId: Long, dateTime: Calendar, product: String) + +case class Parameters(inputDir: String, outputDir: String) + +object SparkETL { + + private val NPARAMS = 2 + + private def printUsage() { + val usage: String = "BigPetStore Spark ETL\n" + + "\n" + + "Usage: spark-submit ... 
inputDir outputDir\n" + + "\n" + + "inputDir - (string) directory of raw transaction records from data generator\n" + + "outputDir - (string) directory to write normalized records\n" + + println(usage) + } + + def parseArgs(args: Array[String]): Parameters = { + if(args.length != NPARAMS) { + printUsage() + System.exit(1) + } + + Parameters(args(0), args(1)) + } + + def readRawData(sc: SparkContext, inputDir: String): RDD[String] = { + val rawRecords = sc.textFile(inputDir + "/transactions") + .flatMap(_.split("\n")) + + rawRecords + } + + def parseRawData(rawRecords: RDD[String]): + RDD[(Store, Location, Customer, Location, TransactionProduct)] = { + val splitRecords = rawRecords.map { r => + val cols = r.split(",") + + val storeId = cols(0).toInt + val storeZipcode = cols(1) + val storeCity = cols(2) + val storeState = cols(3) + + val storeLocation = Location(storeZipcode, storeCity, storeState) + val store = Store(storeId, storeZipcode) + + val customerId = cols(4).toInt + val firstName = cols(5) + val lastName = cols(6) + val customerZipcode = cols(7) + val customerCity = cols(8) + val customerState = cols(9) + + val customerLocation = Location(customerZipcode, customerCity, + customerState) + val customer = Customer(customerId, firstName, lastName, + customerZipcode) + + val txId = cols(10).toInt + val df = new SimpleDateFormat("EEE MMM dd kk:mm:ss z yyyy", Locale.US) + val txDate = df.parse(cols(11)) + val txCal = Calendar.getInstance(Locale.US) + txCal.setTime(txDate) + txCal.set(Calendar.MILLISECOND, 0) + val txProduct = cols(12) + + val transaction = TransactionProduct(customerId, txId, + storeId, txCal, txProduct) + + (store, storeLocation, customer, customerLocation, transaction) + } + + splitRecords + } + + def normalizeData(rawRecords: RDD[(Store, Location, Customer, + Location, TransactionProduct)]): (RDD[Location], RDD[Store], + RDD[Customer], RDD[Product], RDD[Transaction]) = { + // extract stores + val storeRDD = rawRecords.map { + case (store, _, 
_, _, _) => + store + }.distinct() + + // extract store locations + val storeLocationRDD = rawRecords.map { + case (_, location, _, _, _) => + location + }.distinct() + + // extract customers + val customerRDD = rawRecords.map { + case (_, _, customer, _, _) => + customer + }.distinct() + + // extract customer locations + val customerLocationRDD = rawRecords.map { + case (_, _, _, location, _) => + location + }.distinct() + + // extract and normalize products + val productStringRDD = rawRecords.map { + case (_, _, _, _, tx) => + tx.product + } + .distinct() + .zipWithUniqueId() + + val productRDD = productStringRDD.map { + case (productString, id) => + // products are key-value pairs of the form: + // key=value;key=value; + val prodKV = productString + .split(";") + .filter(_.trim().length > 0) + .map { pair => + val pairString = pair.split("=") + (pairString(0), pairString(1)) + } + .toMap + + Product(id, prodKV("category"), prodKV) + } + + // extract transactions, map products to productIds + val productTransactionRDD = rawRecords.map { + case (_, _, _, _, tx) => + (tx.product, tx) + } + + val joinedRDD: RDD[(String, (TransactionProduct, Long))] + = productTransactionRDD.join(productStringRDD) + + val transactionRDD = joinedRDD.map { + case (productString, (tx, productId)) => + Transaction(tx.customerId, tx.transactionId, + tx.storeId, tx.dateTime, productId) + } + + val locationRDD = storeLocationRDD. + union(customerLocationRDD). 
+ distinct() + + (locationRDD, storeRDD, customerRDD, productRDD, transactionRDD) + } + + + def main(args: Array[String]) { + val parameters = parseArgs(args) + + println("Creating SparkConf") + val conf = new SparkConf().setAppName("BPS Data Generator") + + println("Creating SparkContext") + val sc = new SparkContext(conf) + + val rawStringRDD = readRawData(sc, parameters.inputDir) + val rawRecordRDD = parseRawData(rawStringRDD) + val normalizedRDDs = normalizeData(rawRecordRDD) + + val locationRDD = normalizedRDDs._1 + val storeRDD = normalizedRDDs._2 + val customerRDD = normalizedRDDs._3 + val productRDD = normalizedRDDs._4 + val transactionRDD = normalizedRDDs._5 + + IOUtils.save(parameters.outputDir, locationRDD, storeRDD, + customerRDD, productRDD, transactionRDD) + + sc.stop() + } +} http://git-wip-us.apache.org/repos/asf/bigtop/blob/194fb619/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala new file mode 100644 index 0000000..9864a8c --- /dev/null +++ b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.bigtop.bigpetstore.spark.datamodel + +import Array._ + +import java.io.File +import java.nio.file.Files +import java.util.Calendar +import java.util.Locale + +import org.apache.spark.{SparkContext, SparkConf} + +import org.scalatest.FunSuite +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith + +import org.apache.bigtop.bigpetstore.spark.datamodel._ + +// hack for running tests with Gradle +@RunWith(classOf[JUnitRunner]) +class IOUtilsSuite extends FunSuite { + + test("Saving & Loading data") { + + val tmpDir = Files.createTempDirectory("ioUtilsSuite").toFile().toString() + + val conf = new SparkConf().setAppName("BPS Data Generator Test Suite").setMaster("local[2]") + val sc = new SparkContext(conf) + + val locations = Array(Location("11111", "Sunnyvale", "CA"), + Location("22222", "Margate", "FL")) + val customers = Array(Customer(1L, "James", "Madison", "11111"), + Customer(2L, "George", "Washington", "11111"), + Customer(3L, "Matt", "Steele", "22222"), + Customer(4L, "George", "Foreman", "22222")) + val products = Array( + Product(1L, "dog food", Map("brand" -> "Dog Days", "flavor" -> "Chicken & Rice")), + Product(2L, "dog food", Map("brand" -> "Wellfed", "flavor" -> "Pork & Beans")), + Product(3L, "cat food", Map("brand" -> "Fatty Catty", "flavor" -> "Squirrel"))) + + val stores = Array(Store(1L, "11111"), Store(2L, "22222"), Store(3L, "11111")) + + val txDate1 = Calendar.getInstance(Locale.US) + val txDate2 = Calendar.getInstance(Locale.US) + val txDate3 = Calendar.getInstance(Locale.US) + + val 
transactions = Array( + Transaction(1L, 1L, 1L, txDate1, 1L), + Transaction(1L, 1L, 1L, txDate1, 2L), + Transaction(2L, 1L, 2L, txDate2, 3L), + Transaction(2L, 2L, 1L, txDate3, 1L)) + + val locationRDD = sc.parallelize(locations) + val storeRDD = sc.parallelize(stores) + val customerRDD = sc.parallelize(customers) + val productRDD = sc.parallelize(products) + val transactionRDD = sc.parallelize(transactions) + + IOUtils.save(tmpDir, locationRDD, storeRDD, customerRDD, productRDD, + transactionRDD) + + val rdds = IOUtils.load(sc, tmpDir) + + val readLocationRDD = rdds._1 + val readStoreRDD = rdds._2 + val readCustomerRDD = rdds._3 + val readProductRDD = rdds._4 + val readTransactionRDD = rdds._5 + + assert(locationRDD.collect().toSet === readLocationRDD.collect().toSet) + assert(storeRDD.collect().toSet === readStoreRDD.collect().toSet) + assert(customerRDD.collect().toSet === readCustomerRDD.collect().toSet) + assert(productRDD.collect().toSet === readProductRDD.collect().toSet) + assert(transactionRDD.collect().toSet === readTransactionRDD.collect().toSet) + + sc.stop() + } +} http://git-wip-us.apache.org/repos/asf/bigtop/blob/194fb619/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala new file mode 100644 index 0000000..7ccece3 --- /dev/null +++ b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bigtop.bigpetstore.spark.etl
+
+import Array._
+
+import java.util.Calendar
+import java.util.Locale
+import java.util.TimeZone
+
+import org.apache.spark.{SparkContext, SparkConf}
+
+import org.scalatest._
+import org.scalatest.junit.JUnitRunner
+import org.junit.runner.RunWith
+
+import org.apache.bigtop.bigpetstore.spark.datamodel._
+
+// hack for running tests with Gradle
+@RunWith(classOf[JUnitRunner])
+class ETLSuite extends FunSuite with BeforeAndAfterAll {
+
+  var sc: Option[SparkContext] = None
+  var rawRecords: Option[Array[(Store, Location, Customer, Location, TransactionProduct)]] = None
+  var transactions: Option[Array[Transaction]] = None
+
+  val stores = Array(Store(5L, "11553"), Store(1L, "98110"), Store(6L, "66067"))
+  val locations = Array(Location("11553", "Uniondale", "NY"),
+    Location("98110", "Bainbridge Islan", "WA"),
+    Location("66067", "Ottawa", "KS"),
+    Location("20152", "Chantilly", "VA"))
+  val customers = Array(Customer(999L, "Cesareo", "Lamplough", "20152"))
+  val products = Array(
+    Product(1L, "dry dog food", Map("category" -> "dry dog food", "brand" -> "Happy Pup", "flavor" -> "Fish & Potato", "size" -> "30.0", "per_unit_cost" -> "2.67")),
+    Product(0L, "poop bags", Map("category" -> "poop bags", "brand" -> "Dog Days", "color" -> "Blue", "size" -> "60.0", "per_unit_cost" -> "0.21")),
+    Product(2L, "dry cat food",
Map("category" -> "dry cat food", "brand" -> "Feisty Feline", "flavor" -> "Chicken & Rice", "size" -> "14.0", "per_unit_cost" -> "2.14")))
+
+  val rawLines = Array(
+    "5,11553,Uniondale,NY,999,Cesareo,Lamplough,20152,Chantilly,VA,32,Tue Nov 03 01:08:11 EST 2015,category=dry dog food;brand=Happy Pup;flavor=Fish & Potato;size=30.0;per_unit_cost=2.67;",
+
+    "1,98110,Bainbridge Islan,WA,999,Cesareo,Lamplough,20152,Chantilly,VA,31,Mon Nov 02 17:51:37 EST 2015,category=poop bags;brand=Dog Days;color=Blue;size=60.0;per_unit_cost=0.21;",
+
+    "6,66067,Ottawa,KS,999,Cesareo,Lamplough,20152,Chantilly,VA,30,Mon Oct 12 04:29:46 EDT 2015,category=dry cat food;brand=Feisty Feline;flavor=Chicken & Rice;size=14.0;per_unit_cost=2.14;")
+
+
+  override def beforeAll() {
+    val conf = new SparkConf().setAppName("BPS Data Generator Test Suite").setMaster("local[2]")
+    sc = Some(new SparkContext(conf))
+
+    val cal1 = Calendar.getInstance(TimeZone.getTimeZone("America/New_York"),
+      Locale.US)
+    val cal2 = Calendar.getInstance(TimeZone.getTimeZone("America/New_York"),
+      Locale.US)
+    val cal3 = Calendar.getInstance(TimeZone.getTimeZone("America/New_York"),
+      Locale.US)
+
+    // Calendar interprets months as 0-11.
+    // Milliseconds are not in the output we parse; we have to set
+    // ms to 0, otherwise Calendar will use the current system's ms.
+ cal1.set(2015, 10, 3, 1, 8, 11) + cal1.set(Calendar.MILLISECOND, 0) + + cal2.set(2015, 10, 2, 17, 51, 37) + cal2.set(Calendar.MILLISECOND, 0) + + cal3.set(2015, 9, 12, 4, 29, 46) + cal3.set(Calendar.MILLISECOND, 0) + + rawRecords = Some(Array( + (stores(0), locations(0), customers(0), locations(3), + TransactionProduct(999L, 32L, 5L, cal1, "category=dry dog food;brand=Happy Pup;flavor=Fish & Potato;size=30.0;per_unit_cost=2.67;")), + + (stores(1), locations(1), customers(0), locations(3), + TransactionProduct(999L, 31L, 1L, cal2, "category=poop bags;brand=Dog Days;color=Blue;size=60.0;per_unit_cost=0.21;")), + + (stores(2), locations(2), customers(0), locations(3), + TransactionProduct(999L, 30L, 6L, cal3, "category=dry cat food;brand=Feisty Feline;flavor=Chicken & Rice;size=14.0;per_unit_cost=2.14;")))) + + transactions = Some(Array( + Transaction(999L, 31L, 1L, cal2, 0L), + Transaction(999L, 30L, 6L, cal3, 2L), + Transaction(999L, 32L, 5L, cal1, 1L))) + } + + override def afterAll() { + sc.get.stop() + } + + test("Parse Raw Data") { + val rawRDD = sc.get.parallelize(rawLines) + val rdds = SparkETL.parseRawData(rawRDD) + + assert(rdds.collect().toSet === rawRecords.get.toSet) + } + + test("Normalize Data") { + val rawRDD = sc.get.parallelize(rawRecords.get) + val rdds = SparkETL.normalizeData(rawRDD) + + val locationRDD = rdds._1 + val storeRDD = rdds._2 + val customerRDD = rdds._3 + val productRDD = rdds._4 + val transactionRDD = rdds._5 + + assert(storeRDD.collect().toSet === stores.toSet) + assert(locationRDD.collect().toSet === locations.toSet) + assert(customerRDD.collect().toSet === customers.toSet) + assert(productRDD.collect().toSet === products.toSet) + assert(transactionRDD.collect().toSet === transactions.get.toSet) + } +}
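[Editorial aside, not part of the commit.] The fixtures above hinge on the timestamp format emitted by the generator. A self-contained sketch of that parsing, using the same `SimpleDateFormat` pattern as `SparkETL.parseRawData` and an example value taken from the test data, looks like:

```scala
import java.text.SimpleDateFormat
import java.util.{Calendar, Locale, TimeZone}

// Parse a generator timestamp such as "Tue Nov 03 01:08:11 EST 2015"
// with the pattern the ETL uses.
val df = new SimpleDateFormat("EEE MMM dd kk:mm:ss z yyyy", Locale.US)
val cal = Calendar.getInstance(TimeZone.getTimeZone("America/New_York"), Locale.US)
cal.setTime(df.parse("Tue Nov 03 01:08:11 EST 2015"))
// Milliseconds are absent from the input; zero them so equality checks are stable.
cal.set(Calendar.MILLISECOND, 0)
// Note that Calendar months are 0-based, so November is month 10.
```

This is why the suite calls `cal.set(Calendar.MILLISECOND, 0)` on every fixture: without it, `Calendar.getInstance` keeps the current system's milliseconds and the equality asserts would be flaky.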
