Repository: bigtop
Updated Branches:
  refs/heads/master a2d38959f -> 194fb619f


BIGTOP-1535.  Add Spark ETL script to BigPetStore.


Project: http://git-wip-us.apache.org/repos/asf/bigtop/repo
Commit: http://git-wip-us.apache.org/repos/asf/bigtop/commit/194fb619
Tree: http://git-wip-us.apache.org/repos/asf/bigtop/tree/194fb619
Diff: http://git-wip-us.apache.org/repos/asf/bigtop/diff/194fb619

Branch: refs/heads/master
Commit: 194fb619fae4788d4d193f9ad67c706d2f4d0515
Parents: a2d3895
Author: RJ Nowling <[email protected]>
Authored: Thu Nov 20 12:52:00 2014 -0500
Committer: jayunit100 <[email protected]>
Committed: Fri Nov 21 20:29:37 2014 -0500

----------------------------------------------------------------------
 bigtop-bigpetstore/bigpetstore-spark/README.md  |  66 +++++-
 bigtop-bigpetstore/bigpetstore-spark/arch.dot   |  36 ++++
 .../bigpetstore/spark/datamodel/DataModel.scala |  32 +++
 .../bigpetstore/spark/datamodel/IOUtils.scala   |  86 ++++++++
 .../org/apache/bigpetstore/spark/etl/ETL.scala  | 207 +++++++++++++++++++
 .../spark/datamodel/IOUtilsSuite.scala          |  94 +++++++++
 .../apache/bigpetstore/spark/etl/ETLSuite.scala | 128 ++++++++++++
 7 files changed, 644 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bigtop/blob/194fb619/bigtop-bigpetstore/bigpetstore-spark/README.md
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/bigpetstore-spark/README.md b/bigtop-bigpetstore/bigpetstore-spark/README.md
index 4533366..0ed1a05 100644
--- a/bigtop-bigpetstore/bigpetstore-spark/README.md
+++ b/bigtop-bigpetstore/bigpetstore-spark/README.md
@@ -14,6 +14,50 @@ Architecture
 The Spark application consists of the following modules so far:
 
 * generator: generates raw data on the dfs
+* datamodel: data model used as input for analytics components
+* etl: normalizes and transforms the raw data to the data model
+
+Data Model
+----------
+
+The data generator creates a dirty CSV file containing the following fields:
+
+* Store ID: Int
+* Store Zipcode: String
+* Store City: String
+* Store State: String
+* Customer ID: Int
+* Customer First Name: String
+* Customer Last Name: String
+* Customer Zipcode: String
+* Customer City: String
+* Customer State: String
+* Transaction ID: Int
+* Transaction Date Time: String (e.g., "Tue Nov 03 01:08:11 EST 2014")
+* Transaction Product: String (e.g., "category=dry cat food;brand=Feisty Feline;flavor=Chicken & Rice;size=14.0;per_unit_cost=2.14;")
+
+Note that the transaction ID is unique only per customer -- the customer and transaction IDs form a unique composite key.
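
In analytics code this means transactions must be keyed on the pair of IDs, not the transaction ID alone. A minimal sketch, assuming an `RDD[Transaction]` named `transactionRDD`:

```scala
// Key each transaction by its composite (customerId, transactionId) key,
// e.g. before joins, lookups, or de-duplication.
val byCompositeKey = transactionRDD.map { tx =>
  ((tx.customerId, tx.transactionId), tx)
}
```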
+
+Since the dirty CSV data contains repetitive information and requires massaging to use for analytics, an
+internal structured data model is defined as input for the analytics components:
+
+* Location(zipcode: String, city: String, state: String)
+* Customer(customerId: Long, firstName: String, lastName: String, zipcode: String)
+* Store(storeId: Long, zipcode: String)
+* Product(productId: Long, category: String, attributes: Map[String, String])
+* Transaction(customerId: Long, transactionId: Long, storeId: Long, dateTime: java.util.Calendar, productId: Long)
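
For example, a single cleaned record maps onto the model like this (values are illustrative, drawn from the test fixtures in this patch):

```scala
import java.util.{Calendar, Locale}
import org.apache.bigtop.bigpetstore.spark.datamodel._

// One cleaned transaction expressed in the structured data model.
val when     = Calendar.getInstance(Locale.US)
val store    = Store(5L, "11553")
val customer = Customer(999L, "Cesareo", "Lamplough", "20152")
val product  = Product(1L, "dry dog food",
  Map("category" -> "dry dog food", "brand" -> "Happy Pup"))
val tx = Transaction(customer.customerId, 32L, store.storeId, when, product.productId)
```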
+
+The ETL stage parses and cleans up the dirty CSV and writes out RDDs for each data type in the data model,
+serialized using the `saveAsObjectFile()` method.  The analytics components can use the `IOUtils.load()` method
+to de-serialize the structured data.
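
The round trip can be sketched as follows (a minimal sketch; it assumes the ETL has already written its output under a `transformed_data` directory):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.bigtop.bigpetstore.spark.datamodel._

val sc = new SparkContext(
  new SparkConf().setAppName("analytics-sketch").setMaster("local[2]"))

// load() returns one typed RDD per data model class.
val (locations, stores, customers, products, transactions) =
  IOUtils.load(sc, "transformed_data")

transactions.take(5).foreach(println)
sc.stop()
```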
+
+Running Tests
+-------------
+BigPetStore Spark includes unit tests that you can run with the following command:
+
+```
+gradle clean test
+```
 
 Building and Running with Spark
 -------------------------------
@@ -28,16 +72,28 @@ This will produce a jar file under `build/libs` (referred to as `bigpetstore-spa
 use this jar to run a Spark job as follows:
 
 ```
-spark-submit --master local[2] --class org.apache.bigtop.bigpetstore.generator.SparkDriver bigpetstore-spark-X.jar generated_data/ 10 1000 365.0 345
+spark-submit --master local[2] --class org.apache.bigtop.bigpetstore.spark.generator.SparkDriver bigpetstore-spark-X.jar generated_data/ 10 1000 365.0 345
 ```
 
 You will need to change the master if you want to run on a cluster.  The last five parameters control the output directory,
 the number of stores, the number of customers, the simulation length (in days), and the random seed (which is optional).
 
-Running Tests
--------------
-BigPetStore Spark includes unit tests that you can run with the following 
command:
+
+Running the ETL component
+-------------------------
+The data produced by the generator is in a raw text format, similar to what users will see in production environments.
+The raw data isn't normalized (e.g., repeated customer, store, location, and product information) and needs to be parsed
+(e.g., dates) before it can be easily used.  The ETL component does this for us.
+
+The ETL component:
+
+* Reads the raw data
+* Parses the dates and product strings
+* Normalizes the data
+* Writes out RDDs for each class (Store, Customer, Location, Product, Transaction) in the data model
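
The product normalization in the last step turns each attribute string into a `Product` with a key/value map; the parsing can be sketched like so (`productString` is an illustrative value copied from the data model section above, and `0L` stands in for the unique id the real ETL assigns with `zipWithUniqueId()`):

```scala
import org.apache.bigtop.bigpetstore.spark.datamodel.Product

// Sketch: turn one "key=value;key=value;" product string into a Product.
val productString =
  "category=dry cat food;brand=Feisty Feline;flavor=Chicken & Rice;size=14.0;per_unit_cost=2.14;"

val attributes: Map[String, String] = productString
  .split(";")
  .filter(_.trim.nonEmpty)
  .map { pair =>
    val Array(key, value) = pair.split("=", 2)
    key -> value
  }
  .toMap

val product = Product(0L, attributes("category"), attributes)
```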
+
+After building the jar (see above), you can run the ETL component like so:
 
 ```
-gradle test
+spark-submit --master local[2] --class org.apache.bigtop.bigpetstore.spark.etl.SparkETL bigpetstore-spark-X.jar generated_data transformed_data
 ```

http://git-wip-us.apache.org/repos/asf/bigtop/blob/194fb619/bigtop-bigpetstore/bigpetstore-spark/arch.dot
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/bigpetstore-spark/arch.dot b/bigtop-bigpetstore/bigpetstore-spark/arch.dot
new file mode 100644
index 0000000..5761036
--- /dev/null
+++ b/bigtop-bigpetstore/bigpetstore-spark/arch.dot
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+digraph bigpetstore {
+
+   node [shape=record];
+
+
+   DIRTY_CSV [label="Raw, dirty data in CSV format"];
+   STRUCTURED_DATA [label="Data model serialized as sequence files"];
+   generator [label="Data Generator (generator.SparkDriver)"];
+   ETL [label="Extract-Transform-Load (etl.SparkETL)"];
+   SalesAnalytics [label="Sales Analytics (analytics.Sales)"];
+   SalesTables [label="Sales Trends Tables (CSV)"];
+   CustomerAnalytics [label="Customer Analytics (analytics.Customers)"];
+   CustomerTables [label="Customer Trends Tables (CSV)"];
+   ItemRecommender [label="Item Recommender (analytics.ItemRecommender)"];
+   ItemRecommendations [label="Customer Product Recommendations (CSV)"];
+
+   generator -> DIRTY_CSV -> ETL -> STRUCTURED_DATA;
+   STRUCTURED_DATA -> SalesAnalytics -> SalesTables;
+   STRUCTURED_DATA -> CustomerAnalytics -> CustomerTables;
+   STRUCTURED_DATA -> ItemRecommender -> ItemRecommendations;
+}

http://git-wip-us.apache.org/repos/asf/bigtop/blob/194fb619/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
new file mode 100644
index 0000000..b385bb9
--- /dev/null
+++ b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/DataModel.scala
@@ -0,0 +1,32 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.bigtop.bigpetstore.spark.datamodel
+
+import java.util.Calendar
+
+case class Customer(customerId: Long, firstName: String,
+  lastName: String, zipcode: String)
+
+case class Location(zipcode: String, city: String, state: String)
+
+case class Product(productId: Long, category: String, attributes: Map[String, String])
+
+case class Store(storeId: Long, zipcode: String)
+
+case class Transaction(customerId: Long, transactionId: Long,
+  storeId: Long, dateTime: Calendar, productId: Long)

http://git-wip-us.apache.org/repos/asf/bigtop/blob/194fb619/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
new file mode 100644
index 0000000..8899abd
--- /dev/null
+++ b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/datamodel/IOUtils.scala
@@ -0,0 +1,86 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.bigtop.bigpetstore.spark.datamodel
+
+import java.util.Date
+
+import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd._
+
+import org.apache.bigtop.bigpetstore.spark.datamodel._
+
+/**
+  * Utility functions for loading and saving data model RDDs.
+  */
+object IOUtils {
+  private val LOCATION_DIR = "locations"
+  private val STORE_DIR = "stores"
+  private val CUSTOMER_DIR = "customers"
+  private val PRODUCT_DIR = "products"
+  private val TRANSACTION_DIR = "transactions"
+
+  /**
+    * Save RDDs of the data model as Sequence files.
+    *
+    * @param outputDir Output directory
+    * @param locationRDD RDD of Location objects
+    * @param storeRDD RDD of Store objects
+    * @param customerRDD RDD of Customer objects
+    * @param productRDD RDD of Product objects
+    * @param transactionRDD RDD of Transaction objects
+    */
+  def save(outputDir: String, locationRDD: RDD[Location],
+    storeRDD: RDD[Store], customerRDD: RDD[Customer],
+    productRDD: RDD[Product], transactionRDD: RDD[Transaction]) {
+
+    locationRDD.saveAsObjectFile(outputDir + "/" + LOCATION_DIR)
+    storeRDD.saveAsObjectFile(outputDir + "/" + STORE_DIR)
+    customerRDD.saveAsObjectFile(outputDir + "/" + CUSTOMER_DIR)
+    productRDD.saveAsObjectFile(outputDir + "/" + PRODUCT_DIR)
+    transactionRDD.saveAsObjectFile(outputDir + "/" + TRANSACTION_DIR)
+  }
+
+  /**
+    * Load RDDs of the data model from Sequence files.
+    *
+    * @param sc SparkContext
+    * @param inputDir Directory containing Sequence files
+    */
+  def load(sc: SparkContext, inputDir: String): (RDD[Location], RDD[Store],
+    RDD[Customer], RDD[Product], RDD[Transaction]) = {
+
+    val locationRDD: RDD[Location] =
+      sc.objectFile(inputDir + "/" + LOCATION_DIR)
+
+    val storeRDD: RDD[Store] =
+      sc.objectFile(inputDir + "/" + STORE_DIR)
+
+    val customerRDD: RDD[Customer] =
+      sc.objectFile(inputDir + "/" + CUSTOMER_DIR)
+
+    val productRDD: RDD[Product] =
+      sc.objectFile(inputDir + "/" + PRODUCT_DIR)
+
+    val transactionRDD: RDD[Transaction] =
+      sc.objectFile(inputDir + "/" + TRANSACTION_DIR)
+
+    (locationRDD, storeRDD, customerRDD, productRDD, transactionRDD)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/bigtop/blob/194fb619/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/etl/ETL.scala
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/etl/ETL.scala b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/etl/ETL.scala
new file mode 100644
index 0000000..5f9aa8a
--- /dev/null
+++ b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/etl/ETL.scala
@@ -0,0 +1,207 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.bigtop.bigpetstore.spark.etl
+
+import org.apache.bigtop.bigpetstore.spark.datamodel._
+
+import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd._
+
+import java.io.File
+import java.text.DateFormat
+import java.text.SimpleDateFormat
+import java.util._
+
+case class TransactionProduct(customerId: Long, transactionId: Long,
+  storeId: Long, dateTime: Calendar, product: String)
+
+case class Parameters(inputDir: String, outputDir: String)
+
+object SparkETL {
+
+  private val NPARAMS = 2
+
+  private def printUsage() {
+    val usage: String = "BigPetStore Spark ETL\n" +
+      "\n" +
+      "Usage: spark-submit ... inputDir outputDir\n" +
+      "\n" +
      "inputDir - (string) directory of raw transaction records from data generator\n" +
+      "outputDir - (string) directory to write normalized records\n"
+
+    println(usage)
+  }
+
+  def parseArgs(args: Array[String]): Parameters = {
+    if(args.length != NPARAMS) {
+      printUsage()
+      System.exit(1)
+    }
+
+    Parameters(args(0), args(1))
+  }
+
+  def readRawData(sc: SparkContext, inputDir: String): RDD[String] = {
+    val rawRecords = sc.textFile(inputDir + "/transactions")
+      .flatMap(_.split("\n"))
+
+    rawRecords
+  }
+
+  def parseRawData(rawRecords: RDD[String]):
+      RDD[(Store, Location, Customer, Location, TransactionProduct)] = {
+    val splitRecords = rawRecords.map { r =>
+      val cols = r.split(",")
+
+      val storeId = cols(0).toInt
+      val storeZipcode = cols(1)
+      val storeCity = cols(2)
+      val storeState = cols(3)
+
+      val storeLocation = Location(storeZipcode, storeCity, storeState)
+      val store = Store(storeId, storeZipcode)
+
+      val customerId = cols(4).toInt
+      val firstName = cols(5)
+      val lastName = cols(6)
+      val customerZipcode = cols(7)
+      val customerCity = cols(8)
+      val customerState = cols(9)
+
+      val customerLocation = Location(customerZipcode, customerCity,
+        customerState)
+      val customer = Customer(customerId, firstName, lastName,
+        customerZipcode)
+
+      val txId = cols(10).toInt
+      val df = new SimpleDateFormat("EEE MMM dd kk:mm:ss z yyyy", Locale.US)
+      val txDate = df.parse(cols(11))
+      val txCal = Calendar.getInstance(Locale.US)
+      txCal.setTime(txDate)
+      txCal.set(Calendar.MILLISECOND, 0)
+      val txProduct = cols(12)
+
+      val transaction = TransactionProduct(customerId, txId,
+        storeId, txCal, txProduct)
+
+      (store, storeLocation, customer, customerLocation, transaction)
+    }
+
+    splitRecords
+  }
+
+  def normalizeData(rawRecords: RDD[(Store, Location, Customer,
+    Location, TransactionProduct)]): (RDD[Location], RDD[Store],
+      RDD[Customer], RDD[Product], RDD[Transaction]) = {
+    // extract stores
+    val storeRDD = rawRecords.map {
+      case (store, _, _, _, _) =>
+        store
+    }.distinct()
+
+    // extract store locations
+    val storeLocationRDD = rawRecords.map {
+      case (_, location, _, _, _) =>
+        location
+    }.distinct()
+
+    // extract customers
+    val customerRDD = rawRecords.map {
+      case (_, _, customer, _, _) =>
+        customer
+    }.distinct()
+
+    // extract customer locations
+    val customerLocationRDD = rawRecords.map {
+      case (_, _, _, location, _) =>
+        location
+    }.distinct()
+
+    // extract and normalize products
+    val productStringRDD = rawRecords.map {
+      case (_, _, _, _, tx) =>
+        tx.product
+    }
+    .distinct()
+    .zipWithUniqueId()
+
+    val productRDD = productStringRDD.map {
+      case (productString, id) =>
+        // products are key-value pairs of the form:
+        // key=value;key=value;
+        val prodKV = productString
+          .split(";")
+          .filter(_.trim().length > 0)
+          .map { pair =>
+            val pairString = pair.split("=")
+            (pairString(0), pairString(1))
+           }
+          .toMap
+
+        Product(id, prodKV("category"), prodKV)
+    }
+
+    // extract transactions, map products to productIds
+    val productTransactionRDD = rawRecords.map {
+      case (_, _, _, _, tx) =>
+       (tx.product, tx)
+    }
+
+    val joinedRDD: RDD[(String, (TransactionProduct, Long))]
+      = productTransactionRDD.join(productStringRDD)
+
+    val transactionRDD = joinedRDD.map {
+      case (productString, (tx, productId)) =>
+        Transaction(tx.customerId, tx.transactionId,
+          tx.storeId, tx.dateTime, productId)
+    }
+
+    val locationRDD = storeLocationRDD.
+      union(customerLocationRDD).
+      distinct()
+
+    (locationRDD, storeRDD, customerRDD, productRDD, transactionRDD)
+  }
+
+
+  def main(args: Array[String]) {
+    val parameters = parseArgs(args)
+
+    println("Creating SparkConf")
+    val conf = new SparkConf().setAppName("BPS Spark ETL")
+
+    println("Creating SparkContext")
+    val sc = new SparkContext(conf)
+
+    val rawStringRDD = readRawData(sc, parameters.inputDir)
+    val rawRecordRDD = parseRawData(rawStringRDD)
+    val normalizedRDDs = normalizeData(rawRecordRDD)
+
+    val locationRDD = normalizedRDDs._1
+    val storeRDD = normalizedRDDs._2
+    val customerRDD = normalizedRDDs._3
+    val productRDD = normalizedRDDs._4
+    val transactionRDD = normalizedRDDs._5
+
+    IOUtils.save(parameters.outputDir, locationRDD, storeRDD,
+      customerRDD, productRDD, transactionRDD)
+
+    sc.stop()
+  }
+}

http://git-wip-us.apache.org/repos/asf/bigtop/blob/194fb619/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala
new file mode 100644
index 0000000..9864a8c
--- /dev/null
+++ b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/datamodel/IOUtilsSuite.scala
@@ -0,0 +1,94 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.bigtop.bigpetstore.spark.datamodel
+
+import Array._
+
+import java.io.File
+import java.nio.file.Files
+import java.util.Calendar
+import java.util.Locale
+
+import org.apache.spark.{SparkContext, SparkConf}
+
+import org.scalatest.FunSuite
+import org.scalatest.junit.JUnitRunner
+import org.junit.runner.RunWith
+
+import org.apache.bigtop.bigpetstore.spark.datamodel._
+
+// hack for running tests with Gradle
+@RunWith(classOf[JUnitRunner])
+class IOUtilsSuite extends FunSuite {
+
+  test("Saving & Loading data") {
+
+    val tmpDir = Files.createTempDirectory("ioUtilsSuite").toFile().toString()
+
+    val conf = new SparkConf().setAppName("BPS Data Generator Test Suite").setMaster("local[2]")
+    val sc = new SparkContext(conf)
+
+    val locations = Array(Location("11111", "Sunnyvale", "CA"),
+      Location("22222", "Margate", "FL"))
+    val customers = Array(Customer(1L, "James", "Madison", "11111"),
+      Customer(2L, "George", "Washington", "11111"),
+      Customer(3L, "Matt", "Steele", "22222"),
+      Customer(4L, "George", "Foreman", "22222"))
+    val products = Array(
+      Product(1L, "dog food", Map("brand" -> "Dog Days", "flavor" -> "Chicken & Rice")),
+      Product(2L, "dog food", Map("brand" -> "Wellfed", "flavor" -> "Pork & Beans")),
+      Product(3L, "cat food", Map("brand" -> "Fatty Catty", "flavor" -> "Squirrel")))
+
+    val stores = Array(Store(1L, "11111"), Store(2L, "22222"), Store(3L, "11111"))
+
+    val txDate1 = Calendar.getInstance(Locale.US)
+    val txDate2 = Calendar.getInstance(Locale.US)
+    val txDate3 = Calendar.getInstance(Locale.US)
+
+    val transactions = Array(
+      Transaction(1L, 1L, 1L, txDate1, 1L),
+      Transaction(1L, 1L, 1L, txDate1, 2L),
+      Transaction(2L, 1L, 2L, txDate2, 3L),
+      Transaction(2L, 2L, 1L, txDate3, 1L))
+
+    val locationRDD = sc.parallelize(locations)
+    val storeRDD = sc.parallelize(stores)
+    val customerRDD = sc.parallelize(customers)
+    val productRDD = sc.parallelize(products)
+    val transactionRDD = sc.parallelize(transactions)
+
+    IOUtils.save(tmpDir, locationRDD, storeRDD, customerRDD, productRDD,
+      transactionRDD)
+
+    val rdds = IOUtils.load(sc, tmpDir)
+
+    val readLocationRDD = rdds._1
+    val readStoreRDD = rdds._2
+    val readCustomerRDD = rdds._3
+    val readProductRDD = rdds._4
+    val readTransactionRDD = rdds._5
+
+    assert(locationRDD.collect().toSet === readLocationRDD.collect().toSet)
+    assert(storeRDD.collect().toSet === readStoreRDD.collect().toSet)
+    assert(customerRDD.collect().toSet === readCustomerRDD.collect().toSet)
+    assert(productRDD.collect().toSet === readProductRDD.collect().toSet)
+    assert(transactionRDD.collect().toSet === readTransactionRDD.collect().toSet)
+
+    sc.stop()
+  }
+}

http://git-wip-us.apache.org/repos/asf/bigtop/blob/194fb619/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala
new file mode 100644
index 0000000..7ccece3
--- /dev/null
+++ b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/etl/ETLSuite.scala
@@ -0,0 +1,128 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.bigtop.bigpetstore.spark.etl
+
+import Array._
+
+import java.util.Calendar
+import java.util.Locale
+import java.util.TimeZone
+
+import org.apache.spark.{SparkContext, SparkConf}
+
+import org.scalatest._
+import org.scalatest.junit.JUnitRunner
+import org.junit.runner.RunWith
+
+import org.apache.bigtop.bigpetstore.spark.datamodel._
+
+// hack for running tests with Gradle
+@RunWith(classOf[JUnitRunner])
+class ETLSuite extends FunSuite with BeforeAndAfterAll {
+
+  var sc: Option[SparkContext] = None
+  var rawRecords: Option[Array[(Store, Location, Customer, Location, TransactionProduct)]] = None
+  var transactions: Option[Array[Transaction]] = None
+
+  val stores = Array(Store(5L, "11553"), Store(1L, "98110"), Store(6L, "66067"))
+  val locations = Array(Location("11553", "Uniondale", "NY"),
+    Location("98110", "Bainbridge Islan", "WA"),
+    Location("66067", "Ottawa", "KS"),
+    Location("20152", "Chantilly", "VA"))
+  val customers = Array(Customer(999L, "Cesareo", "Lamplough", "20152"))
+  val products = Array(
+    Product(1L, "dry dog food", Map("category" -> "dry dog food", "brand" -> "Happy Pup", "flavor" -> "Fish & Potato", "size" -> "30.0", "per_unit_cost" -> "2.67")),
+    Product(0L, "poop bags", Map("category" -> "poop bags", "brand" -> "Dog Days", "color" -> "Blue", "size" -> "60.0", "per_unit_cost" -> "0.21")),
+    Product(2L, "dry cat food", Map("category" -> "dry cat food", "brand" -> "Feisty Feline", "flavor" -> "Chicken & Rice", "size" -> "14.0", "per_unit_cost" -> "2.14")))
+
+  val rawLines = Array(
+    "5,11553,Uniondale,NY,999,Cesareo,Lamplough,20152,Chantilly,VA,32,Tue Nov 03 01:08:11 EST 2015,category=dry dog food;brand=Happy Pup;flavor=Fish & Potato;size=30.0;per_unit_cost=2.67;",
+
+    "1,98110,Bainbridge Islan,WA,999,Cesareo,Lamplough,20152,Chantilly,VA,31,Mon Nov 02 17:51:37 EST 2015,category=poop bags;brand=Dog Days;color=Blue;size=60.0;per_unit_cost=0.21;",
+
+    "6,66067,Ottawa,KS,999,Cesareo,Lamplough,20152,Chantilly,VA,30,Mon Oct 12 04:29:46 EDT 2015,category=dry cat food;brand=Feisty Feline;flavor=Chicken & Rice;size=14.0;per_unit_cost=2.14;")
+
+
+  override def beforeAll() {
+    val conf = new SparkConf().setAppName("BPS Data Generator Test Suite").setMaster("local[2]")
+    sc = Some(new SparkContext(conf))
+
+    val cal1 = Calendar.getInstance(TimeZone.getTimeZone("America/New_York"),
+      Locale.US)
+    val cal2 = Calendar.getInstance(TimeZone.getTimeZone("America/New_York"),
+      Locale.US)
+    val cal3 = Calendar.getInstance(TimeZone.getTimeZone("America/New_York"),
+      Locale.US)
+
+    // Calendar interprets months as 0-11.
+    // Milliseconds are not present in the output we parse,
+    // so set ms to 0; otherwise Calendar will use the
+    // current system time's milliseconds.
+    cal1.set(2015, 10, 3, 1, 8, 11)
+    cal1.set(Calendar.MILLISECOND, 0)
+
+    cal2.set(2015, 10, 2, 17, 51, 37)
+    cal2.set(Calendar.MILLISECOND, 0)
+
+    cal3.set(2015, 9, 12, 4, 29, 46)
+    cal3.set(Calendar.MILLISECOND, 0)
+
+    rawRecords = Some(Array(
+      (stores(0), locations(0), customers(0), locations(3),
+        TransactionProduct(999L, 32L, 5L, cal1, "category=dry dog food;brand=Happy Pup;flavor=Fish & Potato;size=30.0;per_unit_cost=2.67;")),
+
+      (stores(1), locations(1), customers(0), locations(3),
+      TransactionProduct(999L, 31L, 1L, cal2, "category=poop bags;brand=Dog Days;color=Blue;size=60.0;per_unit_cost=0.21;")),
+
+      (stores(2), locations(2), customers(0), locations(3),
+      TransactionProduct(999L, 30L, 6L, cal3, "category=dry cat food;brand=Feisty Feline;flavor=Chicken & Rice;size=14.0;per_unit_cost=2.14;"))))
+
+    transactions = Some(Array(
+      Transaction(999L, 31L, 1L, cal2, 0L),
+      Transaction(999L, 30L, 6L, cal3, 2L),
+      Transaction(999L, 32L, 5L, cal1, 1L)))
+  }
+
+  override def afterAll() {
+    sc.get.stop()
+  }
+
+  test("Parse Raw Data") {
+    val rawRDD = sc.get.parallelize(rawLines)
+    val rdds = SparkETL.parseRawData(rawRDD)
+
+    assert(rdds.collect().toSet === rawRecords.get.toSet)
+  }
+
+  test("Normalize Data") {
+    val rawRDD = sc.get.parallelize(rawRecords.get)
+    val rdds = SparkETL.normalizeData(rawRDD)
+
+    val locationRDD = rdds._1
+    val storeRDD = rdds._2
+    val customerRDD = rdds._3
+    val productRDD = rdds._4
+    val transactionRDD = rdds._5
+
+    assert(storeRDD.collect().toSet === stores.toSet)
+    assert(locationRDD.collect().toSet === locations.toSet)
+    assert(customerRDD.collect().toSet === customers.toSet)
+    assert(productRDD.collect().toSet === products.toSet)
+    assert(transactionRDD.collect().toSet === transactions.get.toSet)
+  }
+}
