http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/bigpetstore-mapreduce/src/test/java/org/apache/bigtop/bigpetstore/docs/TestDocs.java ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/bigpetstore-mapreduce/src/test/java/org/apache/bigtop/bigpetstore/docs/TestDocs.java b/bigtop-bigpetstore/bigpetstore-mapreduce/src/test/java/org/apache/bigtop/bigpetstore/docs/TestDocs.java new file mode 100644 index 0000000..8d7bf99 --- /dev/null +++ b/bigtop-bigpetstore/bigpetstore-mapreduce/src/test/java/org/apache/bigtop/bigpetstore/docs/TestDocs.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bigtop.bigpetstore.docs; + +import static org.junit.Assert.assertTrue; + +import java.io.File; + +import org.apache.bigtop.bigpetstore.util.BigPetStoreConstants.OUTPUTS; +import org.apache.commons.io.FileUtils; +import org.junit.Test; + +public class TestDocs { + + @Test + public void testGraphViz() throws Exception { + // test the graphviz file by grepping out the constants. 
+ String graphviz = FileUtils.readFileToString(new File("arch.dot")); + System.out.println(graphviz); + + assertTrue(graphviz.contains(OUTPUTS.generated.name())); + assertTrue(graphviz.contains(OUTPUTS.cleaned.name())); + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/bigpetstore-mapreduce/src/test/java/org/apache/bigtop/bigpetstore/generator/TestNumericalIdUtils.java ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/bigpetstore-mapreduce/src/test/java/org/apache/bigtop/bigpetstore/generator/TestNumericalIdUtils.java b/bigtop-bigpetstore/bigpetstore-mapreduce/src/test/java/org/apache/bigtop/bigpetstore/generator/TestNumericalIdUtils.java new file mode 100644 index 0000000..e2f1f25 --- /dev/null +++ b/bigtop-bigpetstore/bigpetstore-mapreduce/src/test/java/org/apache/bigtop/bigpetstore/generator/TestNumericalIdUtils.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.bigtop.bigpetstore.generator; + +import static org.junit.Assert.assertFalse; + +import org.apache.bigtop.bigpetstore.generator.util.State; +import org.apache.bigtop.bigpetstore.util.NumericalIdUtils; +import org.junit.Test; + +public class TestNumericalIdUtils { + + @Test + public void testName() { + String strId= State.OK.name()+"_"+ "jay vyas"; + long id = NumericalIdUtils.toId(strId); + String strId2= State.CO.name()+"_"+ "jay vyas"; + long id2 = NumericalIdUtils.toId(strId2); + System.out.println(id + " " + id2); + assertFalse(id==id2); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/bigpetstore-mapreduce/src/test/java/org/apache/bigtop/bigpetstore/generator/TestPetStoreTransactionGeneratorJob.java ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/bigpetstore-mapreduce/src/test/java/org/apache/bigtop/bigpetstore/generator/TestPetStoreTransactionGeneratorJob.java b/bigtop-bigpetstore/bigpetstore-mapreduce/src/test/java/org/apache/bigtop/bigpetstore/generator/TestPetStoreTransactionGeneratorJob.java new file mode 100755 index 0000000..76de3d0 --- /dev/null +++ b/bigtop-bigpetstore/bigpetstore-mapreduce/src/test/java/org/apache/bigtop/bigpetstore/generator/TestPetStoreTransactionGeneratorJob.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bigtop.bigpetstore.generator; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.BufferedReader; +import java.io.DataInputStream; +import java.io.InputStreamReader; +import java.util.Date; + +import org.apache.bigtop.bigpetstore.generator.BPSGenerator.props; +import org.apache.bigtop.bigpetstore.generator.util.State; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * run this test with vm options -XX:MaxPermSize=256m -Xms512m -Xmx1024m + * + */ +public class TestPetStoreTransactionGeneratorJob { + + final static Logger log = LoggerFactory + .getLogger(TestPetStoreTransactionGeneratorJob.class); + + @Test + public void test() throws Exception { + System.out.println("memory : " + Runtime.getRuntime().freeMemory() + / 1000000); + if (Runtime.getRuntime().freeMemory() / 1000000 < 75) { + // throw new + // RuntimeException("need more memory to run this test !"); + } + int records = 20; + /** + * Setup configuration with prop. 
+ */ + Configuration c = new Configuration(); + c.setInt(props.bigpetstore_records.name(), records); + + /** + * Run the job + */ + Path output = new Path("petstoredata/" + (new Date()).toString()); + Job createInput = BPSGenerator.getCreateTransactionRecordsJob(output, c); + createInput.submit(); + System.out.println(createInput); + createInput.waitForCompletion(true); + + FileSystem fs = FileSystem.getLocal(new Configuration()); + + /** + * Read file output into string. + */ + DataInputStream f = fs.open(new Path(output, "part-r-00000")); + BufferedReader br = new BufferedReader(new InputStreamReader(f)); + String s; + int recordsSeen = 0; + boolean CTseen = false; + boolean AZseen = false; + + // confirm that both CT and AZ are seen in the outputs. + while (br.ready()) { + s = br.readLine(); + System.out.println("===>" + s); + recordsSeen++; + if (s.contains(State.CT.name())) { + CTseen = true; + } + if (s.contains(State.AZ.name())) { + AZseen = true; + } + } + + // records seen should = 20 + assertEquals(records, recordsSeen); + // Assert that a couple of the states are seen (todo make it + // comprehensive for all states). 
+ assertTrue(CTseen); + assertTrue(AZseen); + log.info("Created " + records + " , file was " + + fs.getFileStatus(new Path(output, "part-r-00000")).getLen() + + " bytes."); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/bigpetstore-mapreduce/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/bigpetstore-mapreduce/src/test/resources/log4j.properties b/bigtop-bigpetstore/bigpetstore-mapreduce/src/test/resources/log4j.properties new file mode 100644 index 0000000..1e33093 --- /dev/null +++ b/bigtop-bigpetstore/bigpetstore-mapreduce/src/test/resources/log4j.properties @@ -0,0 +1,47 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +hadoop.root.logger=INFO,console +hadoop.log.dir=. +hadoop.log.file=hadoop.log + +# +# Job Summary Appender +# +# Use following logger to send summary to separate file defined by +# hadoop.mapreduce.jobsummary.log.file rolled daily: +# hadoop.mapreduce.jobsummary.logger=INFO,JSA +# +hadoop.mapreduce.jobsummary.logger=${hadoop.root.logger} +hadoop.mapreduce.jobsummary.log.file=hadoop-mapreduce.jobsummary.log +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.EventCounter=org.apache.log4j.ConsoleAppender +log4j.appender.EventCounter.layout=org.apache.log4j.PatternLayout +# Define the root logger to the system property "hadoop.root.logger". 
+log4j.rootLogger=${hadoop.root.logger}, EventCounter +log4j.appender.console.layout=org.apache.log4j.PatternLayout +# Logging Threshold +log4j.threshold=ALL + +# +# Daily Rolling File Appender +# + +log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender +log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file} + +# Rollver at midnight +log4j.appender.DRFA.DatePattern=.yyyy-MM-dd + +# 30-day backup +#log4j.appender.DRFA.MaxBackupIndex=30 +log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/bigpetstore-mapreduce/src/test/scala/org/apache/bigtop/bigpetstore/ScalaTestSample.scala ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/bigpetstore-mapreduce/src/test/scala/org/apache/bigtop/bigpetstore/ScalaTestSample.scala b/bigtop-bigpetstore/bigpetstore-mapreduce/src/test/scala/org/apache/bigtop/bigpetstore/ScalaTestSample.scala new file mode 100644 index 0000000..a393b4b --- /dev/null +++ b/bigtop-bigpetstore/bigpetstore-mapreduce/src/test/scala/org/apache/bigtop/bigpetstore/ScalaTestSample.scala @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.bigtop.bigpetstore + +import org.junit.Test +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import org.scalatest._ +import scala.collection.mutable.Stack + +@RunWith(classOf[JUnitRunner]) +class ScalaTestSample extends FlatSpec with Matchers { + "This test" should "show an example of what we can do with the scala-test library" in { + val stack = new Stack[Int] + stack.push(1) + stack.push(2) + stack.pop() should be(2) + stack.pop() should be(1) + } +} http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/bigpetstore-spark/README.md ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/bigpetstore-spark/README.md b/bigtop-bigpetstore/bigpetstore-spark/README.md new file mode 100644 index 0000000..4533366 --- /dev/null +++ b/bigtop-bigpetstore/bigpetstore-spark/README.md @@ -0,0 +1,43 @@ +BigPetStore -- Spark +==================== + +BigPetStore is a family of example applications for the Hadoop and Spark +ecosystems. BigPetStore is built around a fictional chain of pet stores, +providing generators for synthetic transaction data and pipelines for +processing that data. Each ecosystem has its own version of the +application. + +The Spark application currently builds against Spark 1.1.0. + +Architecture +------------ +The Spark application consists of the following modules so far: + +* generator: generates raw data on the dfs + +Building and Running with Spark +------------------------------- +BigPetStore has a Spark driver for generating data with the new data generator. +Build a fat jar as follows: + +``` +gradle clean shadowJar +``` + +This will produce a jar file under `build/libs` (referred to as `bigpetstore-spark-X.jar`). 
You can then +use this jar to run a Spark job as follows: + +``` +spark-submit --master local[2] --class org.apache.bigtop.bigpetstore.spark.generator.SparkDriver bigpetstore-spark-X.jar generated_data/ 10 1000 365.0 345 +``` + +You will need to change the master if you want to run on a cluster. The last five parameters control the output directory, +the number of stores, the number of customers, simulation length (in days), and the random seed (which is optional). + +Running Tests +------------- +BigPetStore Spark includes unit tests that you can run with the following command: + +``` +gradle test +``` http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/bigpetstore-spark/build.gradle ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/bigpetstore-spark/build.gradle b/bigtop-bigpetstore/bigpetstore-spark/build.gradle new file mode 100644 index 0000000..726a0e5 --- /dev/null +++ b/bigtop-bigpetstore/bigpetstore-spark/build.gradle @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +apply plugin: "java" +apply plugin: "eclipse" +// TODO add idea module config. 
+apply plugin: "idea" +apply plugin: "scala" +apply plugin: 'com.github.johnrengelman.shadow' + +buildscript { + repositories { jcenter() } + dependencies { + classpath 'com.github.jengelman.gradle.plugins:shadow:1.0.2' + } +} + + +// Read the groupId and version properties from the "parent" bigtop project. +// It would be better if there was some better way of doing this. However, +// at this point, we have to do this (or some variation thereof) since gradle +// projects can't have maven projects as parents (AFAIK. If there is a way to do it, +// it doesn't seem to be well-documented). +def setProjectProperties() { + Node xml = new XmlParser().parse("../../pom.xml") + group = xml.groupId.first().value().first() + version = xml.version.first().value().first() +} + +setProjectProperties() +description = """""" + +// We are using 1.7 as gradle can't play well when java 8 and scala are combined. +// There is an open issue here: http://issues.gradle.org/browse/GRADLE-3023 +// There is talk of this being resolved in the next version of gradle. Till then, +// we are stuck with java 7. But we do have scala if we want more syntactic sugar. +sourceCompatibility = 1.7 +targetCompatibility = 1.7 + +// Specify any additional project properties. +ext { + sparkVersion = "1.1.0" +} + +shadowJar { + zip64 true +} + +repositories { + mavenCentral() + maven { + url "http://dl.bintray.com/rnowling/bigpetstore" + } +} + +tasks.withType(AbstractCompile) { + options.encoding = 'UTF-8' + options.compilerArgs << "-Xlint:all" +} + +tasks.withType(ScalaCompile) { + // Enables incremental compilation. + // http://www.gradle.org/docs/current/userguide/userguide_single.html#N12F78 + scalaCompileOptions.useAnt = false +} + +tasks.withType(Test) { + testLogging { + // Uncomment this if you want to see the console output from the tests. 
+ // showStandardStreams = true + events "passed", "skipped", "failed" + // show standard out and standard error of the test JVM(s) on the console + //showStandardStreams = true + } +} + +// Create a separate source-set for the src/integrationTest set of classes. The convention here +// is that gradle will look for a directory with the same name as that of the specified source-set +// under the 'src' directory. +sourceSets { + main { + java.srcDirs = []; + scala.srcDirs = ["src/main/scala", "src/main/java"] + } +} + + +// To see the API that is being used here, consult the following docs +// http://www.gradle.org/docs/current/dsl/org.gradle.api.artifacts.ResolutionStrategy.html +def updateDependencyVersion(dependencyDetails, dependencyString) { + def parts = dependencyString.split(':') + def group = parts[0] + def name = parts[1] + def version = parts[2] + if (dependencyDetails.requested.group == group + && dependencyDetails.requested.name == name) { + dependencyDetails.useVersion version + } +} + + +dependencies { + compile "org.apache.spark:spark-assembly_2.10:${sparkVersion}" + compile "com.github.rnowling.bigpetstore:bigpetstore-data-generator:0.2" + + testCompile "junit:junit:4.11" + testCompile "org.hamcrest:hamcrest-all:1.3" + testCompile "org.scalatest:scalatest_2.10:2.2.1" +} + +task listJars << { + configurations.shadow.each { println it.name } +} + + +eclipse { + classpath { + // Comment out the following two lines if you want to generate an eclipse project quickly. 
+ downloadSources = true + downloadJavadoc = false + } +} http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/generator/SparkDriver.scala ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/generator/SparkDriver.scala b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/generator/SparkDriver.scala new file mode 100644 index 0000000..1ab1057 --- /dev/null +++ b/bigtop-bigpetstore/bigpetstore-spark/src/main/scala/org/apache/bigpetstore/spark/generator/SparkDriver.scala @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.bigtop.bigpetstore.spark.generator + +import com.github.rnowling.bps.datagenerator.datamodels.{Store,Customer,PurchasingProfile,Transaction} +import com.github.rnowling.bps.datagenerator.{DataLoader,StoreGenerator,CustomerGenerator => CustGen, PurchasingProfileGenerator,TransactionGenerator} +import com.github.rnowling.bps.datagenerator.framework.SeedFactory + +import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd._ + +import java.util.ArrayList +import scala.util.Random +import java.io.File +import java.util.Date + +object SparkDriver { + private var nStores: Int = -1 + private var nCustomers: Int = -1 + private var simulationLength: Double = -1.0 + private var seed: Long = -1 + private var outputDir: File = new File(".") + + private val NPARAMS = 5 + + private def printUsage() { + val usage: String = "BigPetStore Data Generator\n" + + "\n" + + "Usage: spark-submit ... outputDir nStores nCustomers simulationLength [seed]\n" + + "\n" + + "outputDir - (string) directory to write files\n" + + "nStores - (int) number of stores to generate\n" + + "nCustomers - (int) number of customers to generate\n" + + "simulationLength - (float) number of days to simulate\n" + + "seed - (long) seed for RNG. If not given, one is reandomly generated.\n" + + println(usage) + } + + def parseArgs(args: Array[String]) { + if(args.length != NPARAMS && args.length != (NPARAMS - 1)) { + printUsage() + System.exit(1) + } + + var i = 0 + + outputDir = new File(args(i)) + if(! outputDir.exists()) { + System.err.println("Given path (" + args(i) + ") does not exist.\n") + printUsage() + System.exit(1) + } + + if(! 
outputDir.isDirectory()) { + System.err.println("Given path (" + args(i) + ") is not a directory.\n") + printUsage() + System.exit(1) + } + + i += 1 + try { + nStores = args(i).toInt + } + catch { + case _ : NumberFormatException => + System.err.println("Unable to parse '" + args(i) + "' as an integer for nStores.\n") + printUsage() + System.exit(1) + } + + i += 1 + try { + nCustomers = args(i).toInt + } + catch { + case _ : NumberFormatException => + System.err.println("Unable to parse '" + args(i) + "' as an integer for nCustomers.\n") + printUsage() + System.exit(1) + } + + i += 1 + try { + simulationLength = args(i).toDouble + } + catch { + case _ : NumberFormatException => + System.err.println("Unable to parse '" + args(i) + "' as a float for simulationLength.\n") + printUsage() + System.exit(1) + } + + if(args.length == NPARAMS) { + i += 1 + try { + seed = args(i).toLong + } + catch { + case _ : NumberFormatException => + System.err.println("Unable to parse '" + args(i) + "' as a long for seed.\n") + printUsage() + System.exit(1) + } + } + else { + seed = (new Random()).nextLong + } + } + + def generateData(sc: SparkContext): RDD[Transaction] = { + val inputData = new DataLoader().loadData() + val seedFactory = new SeedFactory(seed); + + println("Generating stores...") + val stores : ArrayList[Store] = new ArrayList() + val storeGenerator = new StoreGenerator(inputData, seedFactory); + for(i <- 1 to nStores) { + val store = storeGenerator.generate() + stores.add(store) + } + println("Done.") + + println("Generating customers...") + var customers: List[Customer] = List() + val custGen = new CustGen(inputData, stores, seedFactory) + for(i <- 1 to nCustomers) { + val customer = custGen.generate() + customers = customer :: customers + } + println("Done.") + + println("Broadcasting stores and products") + val storesBC = sc.broadcast(stores) + val productBC = sc.broadcast(inputData.getProductCategories()) + val customerRDD = sc.parallelize(customers) + val nextSeed 
= seedFactory.getNextSeed() + + println("Defining transaction DAG") + val transactionRDD = customerRDD.mapPartitionsWithIndex { (index, custIter) => + val seedFactory = new SeedFactory(nextSeed ^ index) + val transactionIter = custIter.map{ customer => + val products = productBC.value + + val profileGen = new PurchasingProfileGenerator(products, seedFactory) + val profile = profileGen.generate() + + val transGen = new TransactionGenerator(customer, profile, storesBC.value, products, + seedFactory) + + var transactions : List[Transaction] = List() + var transaction = transGen.generate() + while(transaction.getDateTime() < simulationLength) { + transactions = transaction :: transactions + + transaction = transGen.generate() + } + + transactions + } + transactionIter + }.flatMap( s => s) + + println("Generating transactions...") + val nTrans = transactionRDD.count() + println(s"Generated $nTrans transactions.") + + transactionRDD + } + + def writeData(transactionRDD : RDD[Transaction]) { + val initialDate : Long = new Date().getTime() + + val transactionStringsRDD = transactionRDD.map { t => + var records : List[String] = List() + val products = t.getProducts() + for(i <- 0 until products.size()) { + val p = products.get(i) + val name = t.getCustomer().getName() + val custLocation = t.getCustomer().getLocation() + val storeLocation = t.getStore().getLocation() + + // days -> milliseconds = days * 24 h / day * 60 min / hr * 60 sec / min * 1000 ms / sec + val dateMS = (t.getDateTime * 24.0 * 60.0 * 60.0 * 1000.0).toLong + val date = new Date(initialDate + dateMS) + + + var record = "" + record += t.getStore().getId() + "," + record += storeLocation.getZipcode() + "," + record += storeLocation.getCity() + "," + record += storeLocation.getState() + "," + + record += t.getCustomer().getId() + "," + record += name.getFirst() + "," + name.getSecond() + "," + record += custLocation.getZipcode() + "," + record += custLocation.getCity() + "," + record += custLocation.getState() 
+ "," + + record += t.getId() + "," + record += date + "," + record += p + + records = record :: records + } + + records + }.flatMap { s => s } + + transactionStringsRDD.saveAsTextFile(outputDir + "/transactions") + } + + def main(args: Array[String]) { + parseArgs(args) + + println("Creating SparkConf") + val conf = new SparkConf().setAppName("BPS Data Generator") + + println("Creating SparkContext") + val sc = new SparkContext(conf) + + val transactionRDD = generateData(sc) + + writeData(transactionRDD) + + sc.stop() + } +} http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/generator/SparkDriverSuite.scala ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/generator/SparkDriverSuite.scala b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/generator/SparkDriverSuite.scala new file mode 100644 index 0000000..8fbfb71 --- /dev/null +++ b/bigtop-bigpetstore/bigpetstore-spark/src/test/scala/org/apache/bigpetstore/spark/generator/SparkDriverSuite.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.bigtop.bigpetstore.spark.generator + +import Array._ + +import java.io.File +import java.nio.file.Files + +import org.apache.spark.{SparkContext, SparkConf} + +import org.scalatest.FunSuite +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith + + +// hack for running tests with Gradle +@RunWith(classOf[JUnitRunner]) +class SparkDriverSuite extends FunSuite { + + test("Generating data") { + + val tmpDir:File = Files.createTempDirectory("sparkDriverSuiteGeneratedData").toFile() + // 10 stores, 1000 customers, 365.0 days + val parameters:Array[String] = Array(tmpDir.toString(), "10", "1000", "365.0") + + SparkDriver.parseArgs(parameters) + + val conf = new SparkConf().setAppName("BPS Data Generator Test Suite").setMaster("local[2]") + val sc = new SparkContext(conf) + + val transactionRDD = SparkDriver.generateData(sc) + val transactionCount = transactionRDD.count() + assert(transactionCount > 0) + + SparkDriver.writeData(transactionRDD) + + // check that generator wrote out the data + val transactionDir:File = new File(tmpDir, "transactions") + assert(transactionDir.exists()) + assert(transactionDir.isDirectory()) + + sc.stop() + } +} http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/build.gradle ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/build.gradle b/bigtop-bigpetstore/build.gradle deleted file mode 100644 index 6d42e8d..0000000 --- a/bigtop-bigpetstore/build.gradle +++ /dev/null @@ -1,292 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
// Gradle build for the (pre-split) BigPetStore project: compiles mixed
// java/scala sources, runs unit tests, and exposes a profile-driven
// 'integrationTest' task (-PITProfile=pig|crunch|mahout).
apply plugin: "java"
apply plugin: "eclipse"
// TODO add idea module config.
apply plugin: "idea"
apply plugin: "scala"
apply plugin: 'com.github.johnrengelman.shadow'

buildscript {
  repositories { jcenter() }
  dependencies {
    classpath 'com.github.jengelman.gradle.plugins:shadow:1.0.2'
  }
}

// Read the groupId and version properties from the "parent" bigtop project.
// It would be better if there was some better way of doing this. However,
// at this point, we have to do this (or some variation thereof) since gradle
// projects can't have maven projects as parents (AFAIK. If there is a way to do it,
// it doesn't seem to be well-documented).
def setProjectProperties() {
  // file(...) resolves against the project directory. A bare relative string
  // would be resolved against the JVM working directory, which is not
  // guaranteed to be the project directory.
  Node xml = new XmlParser().parse(file("../pom.xml"))
  group = xml.groupId.first().value().first()
  version = xml.version.first().value().first()
}

setProjectProperties()
description = """"""

// We are using 1.7 as gradle can't play well when java 8 and scala are combined.
// There is an open issue here: http://issues.gradle.org/browse/GRADLE-3023
// There is talk of this being resolved in the next version of gradle. Till then,
// we are stuck with java 7. But we do have scala if we want more syntactic sugar.
sourceCompatibility = 1.7
targetCompatibility = 1.7

// Specify any additional project properties.
ext {
  slf4jVersion = "1.7.5"
  guavaVersion = "15.0"
  datanucleusVersion = "3.2.2"
  datanucleusJpaVersion = "3.2.1"
  bonecpVersion = "0.8.0.RELEASE"
  derbyVersion = "10.10.1.1"

  // from the horton-works repo. They compile mahout-core against hadoop2.x;
  // this mahout build is compiled against hadoop 2.4.0.
  hadoopVersion = "2.4.0.2.1.2.0-402"
  mahoutVersion = "0.9.0.2.1.2.0-402"
}

repositories {
  mavenCentral()
  maven {
    url "http://repo.hortonworks.com/content/repositories/releases/"
  }
}

tasks.withType(AbstractCompile) {
  options.encoding = 'UTF-8'
  options.compilerArgs << "-Xlint:all"
}

tasks.withType(ScalaCompile) {
  // Enables incremental compilation.
  // http://www.gradle.org/docs/current/userguide/userguide_single.html#N12F78
  scalaCompileOptions.useAnt = false
}

tasks.withType(Test) {
  testLogging {
    events "passed", "skipped", "failed"
    // Uncomment this if you want to see the standard out and standard error
    // of the test JVM(s) on the console.
    // showStandardStreams = true
  }
}

test {
  exclude "**/*TestPig.java", "**/*TestHiveEmbedded.java", "**/*TestCrunch.java", "**/*TestPetStoreTransactionGeneratorJob.java"
}

// Create a separate source-set for the src/integrationTest set of classes. The convention here
// is that gradle will look for a directory with the same name as that of the specified source-set
// under the 'src' directory. So, in this case, it will look for a directory named 'src/integrationTest'
// since the name of the source-set is 'integrationTest'
sourceSets {
  main {
    java.srcDirs = [];
    scala.srcDirs = ["src/main/scala", "src/main/java"]
  }
  // The main and test source-sets are configured by both java and scala plugins. They contain
  // all the src/main and src/test classes. The following statements make all of those classes
  // available on the classpath for the integration-tests, for both java and scala.
  integrationTest {
    java {
      compileClasspath += main.output + test.output
      runtimeClasspath += main.output + test.output
    }
    scala {
      compileClasspath += main.output + test.output
      runtimeClasspath += main.output + test.output
    }
  }
}

// Creating a source-set automatically adds a couple of corresponding configurations (when java/scala
// plugins are applied). The convention for these configurations is <sourceSetName>Compile and
// <sourceSetName>Runtime. The following statements declare that all the dependencies from the
// testCompile configuration will now be available for integrationTestCompile, and all the
// dependencies (and other configuration that we might have provided) for testRuntime will be
// available for integrationTestRuntime. For ex. the testCompile configuration has a dependency on
// jUnit and scalatest. This makes them available for the integration tests as well.
configurations {
  integrationTestCompile {
    extendsFrom testCompile
  }

  integrationTestRuntime {
    extendsFrom integrationTestCompile, testRuntime
  }
}

// Forces the version of one dependency during resolution.
//   dependencyDetails - the resolve details handed to eachDependency { }
//   dependencyString  - "group:name:version" of the dependency to pin
// To see the API that is being used here, consult the following docs
// http://www.gradle.org/docs/current/dsl/org.gradle.api.artifacts.ResolutionStrategy.html
def updateDependencyVersion(dependencyDetails, dependencyString) {
  def parts = dependencyString.split(':')
  def group = parts[0]
  def name = parts[1]
  def version = parts[2]
  if (dependencyDetails.requested.group == group
      && dependencyDetails.requested.name == name) {
    dependencyDetails.useVersion version
  }
}

def setupPigIntegrationTestDependencyVersions(dependencyResolveDetails) {
  // This is the way we override the dependencies.
  updateDependencyVersion dependencyResolveDetails, "joda-time:joda-time:2.2"
}

def setupCrunchIntegrationTestDependencyVersions(dependencyResolveDetails) {
  // Specify any dependencies that you want to override for crunch integration tests.
}

def setupMahoutIntegrationTestDependencyVersions(dependencyResolveDetails) {
  // Specify any dependencies that you want to override for mahout integration tests.
}


task integrationTest(type: Test, dependsOn: test) {

  testClassesDir = sourceSets.integrationTest.output.classesDir
  classpath = sourceSets.integrationTest.runtimeClasspath

  if(!project.hasProperty('ITProfile')) {
    // skip integration-tests if no profile has been specified.
    integrationTest.onlyIf { false }
    return;
  }

  def patternsToInclude
  def dependencyConfigClosure
  // Select the pattern for test classes that should be executed, and the dependency
  // configuration function to be called based on the profile name specified at the command line.
  switch (project.ITProfile) {
    case "pig":
      patternsToInclude = "*PigIT*"
      dependencyConfigClosure = { setupPigIntegrationTestDependencyVersions(it) }
      break
    case "crunch":
      patternsToInclude = "*CrunchIT*"
      dependencyConfigClosure = { setupCrunchIntegrationTestDependencyVersions(it) }
      break
    case "mahout":
      patternsToInclude = "*MahoutIT*"
      dependencyConfigClosure = { setupMahoutIntegrationTestDependencyVersions(it) }
      break
    // skip integration-tests if the passed in profile-name is not valid
    default: integrationTest.onlyIf { false }; return
  }


  filter { includeTestsMatching patternsToInclude }

  // This is the standard way gradle allows overriding each specific dependency.
  // see: http://www.gradle.org/docs/current/dsl/org.gradle.api.artifacts.ResolutionStrategy.html
  project.configurations.all {
    resolutionStrategy {
      eachDependency {
        dependencyConfigClosure(it)
      }
    }
  }
}

dependencies {
  compile "org.kohsuke:graphviz-api:1.0"
  compile "org.apache.crunch:crunch-core:0.9.0-hadoop2"
  compile "com.jolbox:bonecp:${project.bonecpVersion}"
  compile "org.apache.derby:derby:${project.derbyVersion}"
  compile "com.google.guava:guava:${project.guavaVersion}"
  compile "commons-lang:commons-lang:2.6"
  compile "joda-time:joda-time:2.3"
  compile "org.apache.commons:commons-lang3:3.1"
  compile "com.google.protobuf:protobuf-java:2.5.0"
  compile "commons-logging:commons-logging:1.1.3"
  // NOTE(review): the three '+' dynamic versions below make the build
  // non-reproducible; consider pinning them.
  compile "com.thoughtworks.xstream:xstream:+"
  compile "org.apache.lucene:lucene-core:+"
  compile "org.apache.lucene:lucene-analyzers-common:+"
  compile "org.apache.solr:solr-commons-csv:3.5.0"

  compile group: "org.apache.pig", name: "pig", version: "0.12.0", classifier:"h2"
  compile "org.slf4j:slf4j-api:${project.slf4jVersion}"
  compile "log4j:log4j:1.2.12"
  compile "org.slf4j:slf4j-log4j12:${project.slf4jVersion}"
  compile "org.datanucleus:datanucleus-core:${project.datanucleusVersion}"
  compile "org.datanucleus:datanucleus-rdbms:${project.datanucleusJpaVersion}"
  compile "org.datanucleus:datanucleus-api-jdo:${project.datanucleusJpaVersion}"
  compile "org.datanucleus:datanucleus-accessplatform-jdo-rdbms:${project.datanucleusJpaVersion}"
  compile group: "org.apache.mrunit", name: "mrunit", version: "1.0.0", classifier:"hadoop2"

  compile "org.jfairy:jfairy:0.2.4"

  // from horton-works repo. They compile mahout-core against hadoop2.x
  compile "org.apache.hadoop:hadoop-client:${hadoopVersion}"
  compile "org.apache.mahout:mahout-core:${mahoutVersion}"

  compile 'org.scala-lang:scala-library:2.11.0'

  testCompile "junit:junit:4.11"
  testCompile "org.hamcrest:hamcrest-all:1.3"
  testCompile "org.scalatest:scalatest_2.11:2.1.7"
}

configurations {
  hadoopClusterRuntime {
    // extendsFrom integrationTestRuntime
    if(project.hasProperty('for-cluster')) {
      excludeRules += [getGroup: { 'org.apache.crunch' }, getModule: { 'crunch-core' } ] as ExcludeRule
      excludeRules += [getGroup: { 'org.apache.pig' }, getModule: { 'pig' } ] as ExcludeRule
      excludeRules += [getGroup: { 'org.apache.mahout' }, getModule: { 'mahout-core' } ] as ExcludeRule
      excludeRules += [getGroup: { 'org.apache.hadoop' }, getModule: { 'hadoop-client' } ] as ExcludeRule
    }
  }
}

task listJars << {
  configurations.shadow.each { println it.name }
}

// Copies the jars of the hadoopClusterRuntime configuration into build/libs.
def copyDependencyJarsForHadoopCluster() {
  copy {
    from configurations.hadoopClusterRuntime
    into 'build/libs'
  }
}

build {
  doLast {
    copyDependencyJarsForHadoopCluster()
  }
}

eclipse {
  classpath {
    // Add the dependencies and the src dirs for the integrationTest source-set to the
    // .classpath file that will be generated by the eclipse plugin.
    plusConfigurations += [configurations.integrationTestCompile]
    // Comment out the following two lines if you want to generate an eclipse project quickly.
    downloadSources = true
    downloadJavadoc = false
  }
}
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.apache.bigtop</groupId>
  <artifactId>BigPetStore</artifactId>
  <version>0.9.0-SNAPSHOT</version>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <slf4j.version>1.7.5</slf4j.version>
    <guava.version>15.0</guava.version>
    <hadoop.version>2.2.0</hadoop.version>
    <hive.version>0.12.0</hive.version>
    <datanucleus.version>3.2.2</datanucleus.version>
    <datanucleus.jpa.version>3.2.1</datanucleus.jpa.version>
    <!-- Must be a real BoneCP release; kept in sync with bonecpVersion in build.gradle. -->
    <bonecp.version>0.8.0.RELEASE</bonecp.version>
    <derby.version>10.10.1.1</derby.version>
    <plugin.surefire.version>2.17</plugin.surefire.version>
    <!-- Referenced by the maven-compiler-plugin declaration below; this pom has
      no parent to inherit it from, so it has to be defined here. -->
    <maven-compiler-plugin.version>3.1</maven-compiler-plugin.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.kohsuke</groupId>
      <artifactId>graphviz-api</artifactId>
      <version>1.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.crunch</groupId>
      <artifactId>crunch-core</artifactId>
      <version>0.9.0-hadoop2</version>
    </dependency>

    <!-- misc deps -->
    <dependency>
      <groupId>com.jolbox</groupId>
      <artifactId>bonecp</artifactId>
      <version>${bonecp.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.derby</groupId>
      <artifactId>derby</artifactId>
      <version>${derby.version}</version>
    </dependency>

    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>${guava.version}</version>
    </dependency>

    <!-- From pig profile -->
    <dependency>
      <groupId>commons-lang</groupId>
      <artifactId>commons-lang</artifactId>
      <version>2.6</version>
    </dependency>

    <dependency>
      <groupId>joda-time</groupId>
      <artifactId>joda-time</artifactId>
      <version>2.3</version>
    </dependency>
    <!-- end pig profile -->
    <!-- From hive profile -->
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.1</version>
    </dependency>
    <!-- end hive profile -->
    <!-- From Crunch profile -->
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <!-- end crunch profile -->
    <!-- From Mahout profile -->
    <!-- NOTE(review): the LATEST versions below make the build non-reproducible;
      pin them to concrete releases once known-good versions are chosen. -->
    <dependency>
      <groupId>commons-logging</groupId>
      <artifactId>commons-logging</artifactId>
      <version>1.1.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.mahout</groupId>
      <artifactId>mahout-math</artifactId>
      <version>0.9</version>
    </dependency>
    <dependency>
      <groupId>com.thoughtworks.xstream</groupId>
      <artifactId>xstream</artifactId>
      <version>LATEST</version>
    </dependency>
    <dependency>
      <groupId>org.apache.lucene</groupId>
      <artifactId>lucene-core</artifactId>
      <version>LATEST</version>
    </dependency>
    <dependency>
      <groupId>org.apache.lucene</groupId>
      <artifactId>lucene-analyzers-common</artifactId>
      <version>LATEST</version>
    </dependency>
    <dependency>
      <groupId>org.apache.mahout.commons</groupId>
      <artifactId>commons-cli</artifactId>
      <version>LATEST</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-math3</artifactId>
      <version>LATEST</version>
    </dependency>
    <dependency>
      <groupId>org.apache.solr</groupId>
      <artifactId>solr-commons-csv</artifactId>
      <version>3.5.0</version>
    </dependency>
    <!-- end Mahout profile -->

    <!-- TODO ask question about this comment -->
    <!-- We keep this at top level so that mvn eclipse:eclipse creates a nice
      tidy project, but it's a little messy. Later we'll create a profile for eclipse
      and move this (and other deps) into profiles as needed. Important: Remove
      this dependency when running hive integration tests... -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <!-- TODO ask question about this comment -->
    <!-- mahout deps : may need to turn these on/off when testing mahout locally -->
    <!-- For testing on my machine, I created a bigpetstore mahout jar which
      is compiled for 2.2.0 . Or substitute this with the standard apache mahout-core
      but not sure if it will work. -->
    <dependency>
      <groupId>org.apache.mahout</groupId>
      <artifactId>mahout-core</artifactId>
      <version>0.8</version>
    </dependency>
    <!-- pig deps -->
    <dependency>
      <groupId>org.apache.pig</groupId>
      <artifactId>pig</artifactId>
      <classifier>h2</classifier>
      <version>0.12.0</version>
    </dependency>

    <!-- logging -->

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.12</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <!-- hive -->
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-common</artifactId>
      <version>${hive.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-serde</artifactId>
      <version>${hive.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>${hive.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-contrib</artifactId>
      <version>${hive.version}</version>
    </dependency>

    <!-- datanucleus -->
    <dependency>
      <groupId>org.datanucleus</groupId>
      <artifactId>datanucleus-core</artifactId>
      <version>${datanucleus.version}</version>
    </dependency>

    <dependency>
      <groupId>org.datanucleus</groupId>
      <artifactId>datanucleus-rdbms</artifactId>
      <version>${datanucleus.jpa.version}</version>
    </dependency>

    <dependency>
      <groupId>org.datanucleus</groupId>
      <artifactId>datanucleus-api-jdo</artifactId>
      <version>${datanucleus.jpa.version}</version>
    </dependency>

    <!-- TODO eliminate this pom dependency -->
    <dependency>
      <groupId>org.datanucleus</groupId>
      <artifactId>datanucleus-accessplatform-jdo-rdbms</artifactId>
      <version>${datanucleus.jpa.version}</version>
      <type>pom</type>
    </dependency>

    <!-- Unit test artifacts -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.hamcrest</groupId>
      <artifactId>hamcrest-all</artifactId>
      <version>1.3</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.mrunit</groupId>
      <artifactId>mrunit</artifactId>
      <version>1.0.0</version>
      <classifier>hadoop2</classifier>
    </dependency>
  </dependencies>

  <build>
    <extensions>
      <extension>
        <groupId>org.springframework.build.aws</groupId>
        <artifactId>org.springframework.build.aws.maven</artifactId>
        <version>3.0.0.RELEASE</version>
      </extension>
    </extensions>
    <finalName>bigpetstore-${project.version}</finalName>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-release-plugin</artifactId>
        <version>2.5</version>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <version>2.9</version>
        <configuration>
          <downloadSources>true</downloadSources>
          <downloadJavadocs>true</downloadJavadocs>
        </configuration>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>${maven-compiler-plugin.version}</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>2.4</version>
        <configuration>
          <outputDirectory>${basedir}/target</outputDirectory>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>${plugin.surefire.version}</version>
        <configuration>
          <excludes>
            <exclude>**/*TestPig.java</exclude>
            <exclude>**/*TestHiveEmbedded.java</exclude>
            <exclude>**/*TestCrunch.java</exclude>
          </excludes>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <!-- One profile per ecosystem component. Each profile adds the integration
    test sources and runs, through failsafe, only the *IT class of that
    component (the other components' IT classes are excluded). -->
  <profiles>
    <profile>
      <id>pig</id>
      <build>
        <plugins>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>${plugin.surefire.version}</version>
            <configuration>
              <excludes>
                <exclude>**/*TestPig.java</exclude>
                <exclude>**/*TestHiveEmbedded.java</exclude>
                <exclude>**/*TestCrunch.java</exclude>
                <exclude>**/*TestPetStoreTransactionGeneratorJob.java</exclude>
              </excludes>

            </configuration>
          </plugin>
          <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>build-helper-maven-plugin</artifactId>
            <version>1.5</version>
            <executions>
              <execution>
                <id>add-test-source</id>
                <phase>generate-test-sources</phase>
                <goals>
                  <goal>add-test-source</goal>
                </goals>
                <configuration>
                  <sources>
                    <!-- The integration tests live under src/integrationTest. -->
                    <source>src/integrationTest/java</source>
                  </sources>
                </configuration>
              </execution>
            </executions>
          </plugin>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-failsafe-plugin</artifactId>
            <version>2.12</version>

            <configuration>
              <argLine>-Xmx1g</argLine>
              <excludes>
                <exclude>**/*BigPetStoreMahoutIT.java</exclude>
                <exclude>**/*BigPetStoreHiveIT.java</exclude>
                <exclude>**/*BigPetStoreCrunchIT.java</exclude>
              </excludes>
            </configuration>
            <executions>
              <!-- States that both integration-test and verify goals of the Failsafe
                Maven plugin are executed. -->
              <execution>
                <id>integration-tests</id>
                <goals>
                  <goal>integration-test</goal>
                  <goal>verify</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    </profile>

    <profile>
      <id>hive</id>
      <build>
        <plugins>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>${plugin.surefire.version}</version>
            <configuration>
              <excludes>
                <exclude>**/*TestPig.java</exclude>
                <exclude>**/*TestHiveEmbedded.java</exclude>
                <exclude>**/*TestCrunch.java</exclude>
                <exclude>**/*TestPetStoreTransactionGeneratorJob.java</exclude>
              </excludes>
            </configuration>
          </plugin>
          <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>build-helper-maven-plugin</artifactId>
            <version>1.5</version>
            <executions>
              <execution>
                <id>add-test-source</id>
                <phase>generate-test-sources</phase>
                <goals>
                  <goal>add-test-source</goal>
                </goals>
                <configuration>
                  <sources>
                    <!-- The integration tests live under src/integrationTest. -->
                    <source>src/integrationTest/java</source>
                  </sources>
                </configuration>
              </execution>
            </executions>
          </plugin>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-failsafe-plugin</artifactId>
            <version>2.12</version>
            <configuration>
              <excludes>
                <exclude>**/*BigPetStoreMahoutIT.java</exclude>
                <exclude>**/*BigPetStorePigIT.java</exclude>
                <exclude>**/*BigPetStoreCrunchIT.java</exclude>
              </excludes>
            </configuration>
            <executions>
              <!-- States that both integration-test and verify goals of the Failsafe
                Maven plugin are executed. -->
              <execution>
                <id>integration-tests</id>
                <goals>
                  <goal>integration-test</goal>
                  <goal>verify</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
      <dependencies>
        <!-- hadoop -->
        <!-- TODO is this version change required? Version 2.2.0 is provided
          by hadoop-client dependency. Shouldn't we have the same versions for the
          related dependencies? -->
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-mapreduce-client-app</artifactId>
          <version>2.3.0</version>
        </dependency>
      </dependencies>
    </profile>
    <profile>
      <id>crunch</id>
      <build>
        <plugins>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>${plugin.surefire.version}</version>
            <configuration>
              <excludes>
                <exclude>**/*TestPig.java</exclude>
                <exclude>**/*TestHiveEmbedded.java</exclude>
                <exclude>**/*TestCrunch.java</exclude>
                <exclude>**/*TestPetStoreTransactionGeneratorJob.java</exclude>
              </excludes>
            </configuration>
          </plugin>
          <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>build-helper-maven-plugin</artifactId>
            <version>1.5</version>
            <executions>
              <execution>
                <id>add-test-source</id>
                <phase>generate-test-sources</phase>
                <goals>
                  <goal>add-test-source</goal>
                </goals>
                <configuration>
                  <sources>
                    <!-- The integration tests live under src/integrationTest. -->
                    <source>src/integrationTest/java</source>
                  </sources>
                </configuration>
              </execution>
            </executions>
          </plugin>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-failsafe-plugin</artifactId>
            <version>2.12</version>
            <configuration>
              <excludes>
                <exclude>**/*BigPetStorePigIT.java</exclude>
                <exclude>**/*BigPetStoreHiveIT.java</exclude>
                <exclude>**/*BigPetStoreMahoutIT.java</exclude>
              </excludes>
            </configuration>
            <executions>
              <!-- States that both integration-test and verify goals of the Failsafe
                Maven plugin are executed. -->
              <execution>
                <id>integration-tests</id>
                <goals>
                  <goal>integration-test</goal>
                  <goal>verify</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    </profile>
    <profile>
      <id>mahout</id>
      <!-- TODO this property is not being used anywhere. It's not even automatically
        detectable. Remove? Or do something that the name suggests? -->
      <properties>
        <skip.unit.tests>true</skip.unit.tests>
      </properties>
      <build>
        <plugins>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>${plugin.surefire.version}</version>
            <configuration>
              <excludes>
                <exclude>**/*TestPig.java</exclude>
                <exclude>**/*TestHiveEmbedded.java</exclude>
                <exclude>**/*TestCrunch.java</exclude>
                <exclude>**/*TestPetStoreTransactionGeneratorJob.java</exclude>
              </excludes>
            </configuration>
          </plugin>
          <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>build-helper-maven-plugin</artifactId>
            <version>1.5</version>
            <executions>
              <execution>
                <id>add-test-source</id>
                <phase>generate-test-sources</phase>
                <goals>
                  <goal>add-test-source</goal>
                </goals>
                <configuration>
                  <sources>
                    <!-- The integration tests live under src/integrationTest. -->
                    <source>src/integrationTest/java</source>
                  </sources>
                </configuration>
              </execution>
            </executions>
          </plugin>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-failsafe-plugin</artifactId>
            <version>2.12</version>
            <configuration>
              <excludes>
                <exclude>**/*BigPetStorePigIT.java</exclude>
                <exclude>**/*BigPetStoreCrunchIT.java</exclude>
                <exclude>**/*BigPetStoreHiveIT.java</exclude>
              </excludes>
            </configuration>
            <executions>
              <!-- States that both integration-test and verify goals of the Failsafe
                Maven plugin are executed. -->
              <execution>
                <id>integration-tests</id>
                <goals>
                  <goal>integration-test</goal>
                  <goal>verify</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    </profile>
  </profiles>
</project>
- */ -rootProject.name = 'BigPetStore' http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStoreMahoutIT.java ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStoreMahoutIT.java b/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStoreMahoutIT.java deleted file mode 100644 index b07c5a0..0000000 --- a/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStoreMahoutIT.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.bigtop.bigpetstore; - -import static org.apache.bigtop.bigpetstore.ITUtils.createTestOutputPath; -import static org.apache.bigtop.bigpetstore.ITUtils.setup; - -import java.util.regex.Pattern; - -import org.apache.bigtop.bigpetstore.recommend.ItemRecommender; -import org.apache.bigtop.bigpetstore.util.BigPetStoreConstants.OUTPUTS.MahoutPaths; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.junit.Before; -import org.junit.Test; - -import com.google.common.base.Predicate; - -public class BigPetStoreMahoutIT { - - public static final Path INPUT_DIR_PATH = - new Path(ITUtils.BPS_TEST_PIG_CLEANED, MahoutPaths.Mahout.name()); - public static final String INPUT_DIR_PATH_STR = INPUT_DIR_PATH.toString(); - private static final Path MAHOUT_OUTPUT_DIR = createTestOutputPath(MahoutPaths.Mahout.name()); - private static final Path ALS_FACTORIZATION_OUTPUT_DIR = - createTestOutputPath(MahoutPaths.Mahout.name(), MahoutPaths.AlsFactorization.name()); - private static final Path ALS_RECOMMENDATIONS_DIR = - createTestOutputPath(MahoutPaths.Mahout.name(), MahoutPaths.AlsRecommendations.name()); - - private ItemRecommender itemRecommender; - - @Before - public void setupTest() throws Throwable { - setup(); - try { - FileSystem fs = FileSystem.get(new Configuration()); - fs.delete(MAHOUT_OUTPUT_DIR, true); - itemRecommender = new ItemRecommender(INPUT_DIR_PATH_STR, ALS_FACTORIZATION_OUTPUT_DIR.toString(), - ALS_RECOMMENDATIONS_DIR.toString()); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - private static final Predicate<String> TEST_OUTPUT_FORMAT = new Predicate<String>() { - private final Pattern p = Pattern.compile("^\\d+\\s\\[\\d+:\\d+\\.\\d+\\]$"); - @Override - public boolean apply(String input) { - return p.matcher(input).matches(); - } - }; - - @Test - public void testPetStorePipeline() throws Exception { - itemRecommender.recommend(); - 
ITUtils.assertOutput(ALS_RECOMMENDATIONS_DIR, TEST_OUTPUT_FORMAT); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStorePigIT.java ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStorePigIT.java b/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStorePigIT.java deleted file mode 100644 index 78d5c6b..0000000 --- a/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStorePigIT.java +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.bigtop.bigpetstore; - -import static org.apache.bigtop.bigpetstore.ITUtils.BPS_TEST_GENERATED; -import static org.apache.bigtop.bigpetstore.ITUtils.BPS_TEST_PIG_CLEANED; -import static org.apache.bigtop.bigpetstore.ITUtils.fs; - -import java.io.File; -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.bigtop.bigpetstore.etl.PigCSVCleaner; -import org.apache.bigtop.bigpetstore.util.BigPetStoreConstants; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.pig.ExecType; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Predicate; -import com.google.common.collect.ImmutableMap; - -/** - * This is the main integration test for pig. Like all BPS integration tests, it - * is designed to simulate exactly what will happen on the actual cluster, - * except with a small amount of records. - * - * In addition to cleaning the dataset, it also runs the BPS_analytics.pig - * script which BigPetStore ships with. - */ -public class BigPetStorePigIT { - - final static Logger log = LoggerFactory.getLogger(BigPetStorePigIT.class); - - /** - * An extra unsupported code path that we have so people can do ad hoc - * analytics on pig data after it is cleaned. 
- */ - public static final Path BPS_TEST_PIG_COUNT_PRODUCTS = fs - .makeQualified(new Path("bps_integration_", - BigPetStoreConstants.OUTPUTS.pig_ad_hoc_script.name() + "0")); - - static final File PIG_SCRIPT = new File("BPS_analytics.pig"); - - static { - if (!PIG_SCRIPT.exists()) { - throw new RuntimeException("Couldnt find pig script at " + PIG_SCRIPT.getAbsolutePath()); - } - } - - @Before - public void setupTest() throws Throwable { - ITUtils.setup(); - try { - FileSystem.get(new Configuration()).delete(BPS_TEST_PIG_CLEANED, true); - FileSystem.get(new Configuration()).delete(BPS_TEST_PIG_COUNT_PRODUCTS, true); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - static Map<Path, Predicate<String>> TESTS = ImmutableMap.of( - /** Test of the main output */ - BPS_TEST_PIG_CLEANED, ITUtils.VERIFICATION_PERDICATE, - // Example of how to count products after doing basic pig data cleanup - BPS_TEST_PIG_COUNT_PRODUCTS, ITUtils.VERIFICATION_PERDICATE, - // Test the output that is to be used as an input for Mahout. 
- BigPetStoreMahoutIT.INPUT_DIR_PATH, ITUtils.VERIFICATION_PERDICATE - ); - - @Test - public void testPetStoreCorePipeline() throws Exception { - runPig(BPS_TEST_GENERATED, BPS_TEST_PIG_CLEANED, PIG_SCRIPT); - for (Entry<Path, Predicate<String>> e : TESTS.entrySet()) { - ITUtils.assertOutput(e.getKey(), e.getValue()); - } - } - - private void runPig(Path input, Path output, File pigscript) - throws Exception { - new PigCSVCleaner(input, output, ExecType.LOCAL, pigscript); - } -} http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/ITUtils.java ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/ITUtils.java b/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/ITUtils.java deleted file mode 100644 index fd53dc1..0000000 --- a/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/ITUtils.java +++ /dev/null @@ -1,168 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.bigtop.bigpetstore; - -import java.io.BufferedReader; -import java.io.InputStreamReader; -import java.net.InetAddress; -import java.nio.charset.Charset; -import java.util.List; - -import org.apache.bigtop.bigpetstore.generator.BPSGenerator; -import org.apache.bigtop.bigpetstore.util.BigPetStoreConstants; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.Job; -import org.junit.Assert; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Predicate; -import com.google.common.io.Files; - -public class ITUtils { - public static final Path TEST_OUTPUT_DIR = new Path("bps_integration_"); - - public static Predicate<String> VERIFICATION_PERDICATE = new Predicate<String>() { - @Override - public boolean apply(String input) { - return true; - } - }; - - static final Logger log = LoggerFactory.getLogger(ITUtils.class); - - static FileSystem fs; - static { - try { - fs = FileSystem.getLocal(new Configuration()); - } catch (Throwable e) { - String cpath = (String) System.getProperties().get("java.class.path"); - String msg = ""; - for (String cp : cpath.split(":")) { - if (cp.contains("hadoop")) { - msg += cp.replaceAll("hadoop", "**HADOOP**") + "\n"; - } - } - throw new RuntimeException("Major error: Probably issue. " - + "Check hadoop version? " + e.getMessage() - + " .... check these classpath elements:" + msg); - } - } - - public static final Path BPS_TEST_GENERATED = - createTestOutputPath(BigPetStoreConstants.OUTPUTS.generated.name()); - public static final Path BPS_TEST_PIG_CLEANED = - createTestOutputPath (BigPetStoreConstants.OUTPUTS.cleaned.name()); - - public static Path createTestOutputPath(String... 
pathParts) { - Path path = TEST_OUTPUT_DIR; - for(String pathPart: pathParts) { - path = new Path(path, pathPart); - } - return path; - } - - /** - * Some simple checks to make sure that unit tests in local FS. these arent - * designed to be run against a distribtued system. - */ - public static void checkConf(Configuration conf) throws Exception { - if (conf.get("mapreduce.jobtracker.address") == null) { - log.warn("Missing mapreduce.jobtracker.address???????!!!! " + "This can be the case in hive tests which use special " - + "configurations, but we should fix it sometime."); - return; - } - if (!conf.get("mapreduce.jobtracker.address").equals("local")) { - throw new RuntimeException("ERROR: bad conf : " + "mapreduce.jobtracker.address"); - } - if (!conf.get("fs.AbstractFileSystem.file.impl").contains("Local")) { - throw new RuntimeException("ERROR: bad conf : " + "mapreduce.jobtracker.address"); - } - try { - InetAddress addr = java.net.InetAddress.getLocalHost(); - System.out.println("Localhost = hn=" + addr.getHostName() + " / ha=" + addr.getHostAddress()); - } catch (Throwable e) { - throw new RuntimeException(" ERROR : Hadoop wont work at all on this machine yet" - + "...I can't get / resolve localhost ! Check java version/ " + "/etc/hosts / DNS or other networking related issues on your box" - + e.getMessage()); - } - } - - /** - * Creates a generated input data set in - * - * test_data_directory/generated. i.e. 
- * test_data_directory/generated/part-r-00000 - */ - public static void setup() throws Throwable { - Configuration conf = new Configuration(); - - // debugging for Jeff and others in local fs that won't build - checkConf(conf); - - conf.setInt(BPSGenerator.props.bigpetstore_records.name(), BPSGenerator.DEFAULT_NUM_RECORDS); - - if (FileSystem.getLocal(conf).exists(BPS_TEST_GENERATED)) { - return; - } - - Job createInput = BPSGenerator.getCreateTransactionRecordsJob(BPS_TEST_GENERATED, conf); - createInput.waitForCompletion(true); - - Path outputfile = new Path(BPS_TEST_GENERATED, "part-r-00000"); - List<String> lines = Files.readLines(FileSystem.getLocal(conf).pathToFile(outputfile), Charset.defaultCharset()); - log.info("output : " + FileSystem.getLocal(conf).pathToFile(outputfile)); - for (String l : lines) { - System.out.println(l); - } - } - - - // A functions that logs the output file as a verification test - public static void assertOutput(Path base, Predicate<String> validator) throws Exception { - FileSystem fs = FileSystem.getLocal(new Configuration()); - - FileStatus[] files = fs.listStatus(base); - // print out all the files. - for (FileStatus stat : files) { - System.out.println(stat.getPath() + " " + stat.getLen()); - } - - /** - * Support map OR reduce outputs - */ - Path partm = new Path(base, "part-m-00000"); - Path partr = new Path(base, "part-r-00000"); - Path p = fs.exists(partm) ? partm : partr; - - /** - * Now we read through the file and validate its contents. 
- */ - BufferedReader r = new BufferedReader(new InputStreamReader(fs.open(p))); - - // line:{"product":"big chew toy","count":3} - while (r.ready()) { - String line = r.readLine(); - log.info("line:" + line); - // System.out.println("line:"+line); - Assert.assertTrue("validationg line : " + line, validator.apply(line)); - } - } - -} http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/contract/PetStoreStatistics.java ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/contract/PetStoreStatistics.java b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/contract/PetStoreStatistics.java deleted file mode 100755 index ed618a8..0000000 --- a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/contract/PetStoreStatistics.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.bigtop.bigpetstore.contract; - -import java.util.Map; - -/** - * This is the contract for the web site. This object is created by each ETL - * tool : Summary stats. 
/**
 * Contract for the web site: the summary statistics that every ETL tool
 * implementation creates.
 */
public abstract class PetStoreStatistics {

    /**
     * Counts transactions grouped by state.
     *
     * @return a map from group key to its transaction count
     * @throws Exception if the statistic cannot be computed
     */
    public abstract Map<String, ? extends Number> numberOfTransactionsByState() throws Exception;

    /**
     * Counts purchased products grouped by product.
     *
     * @return a map from group key to its count
     * @throws Exception if the statistic cannot be computed
     */
    public abstract Map<String, ? extends Number> numberOfProductsByProduct() throws Exception;

}
- */ -package org.apache.bigtop.bigpetstore.etl; - -import java.util.Map; - -import org.apache.bigtop.bigpetstore.contract.PetStoreStatistics; -import org.apache.crunch.FilterFn; -import org.apache.crunch.MapFn; -import org.apache.crunch.PCollection; -import org.apache.crunch.PTable; -import org.apache.crunch.Pair; -import org.apache.crunch.Pipeline; -import org.apache.crunch.impl.mem.MemPipeline; -import org.apache.crunch.impl.mr.MRPipeline; -import org.apache.crunch.io.From; -import org.apache.crunch.types.avro.Avros; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -public class CrunchETL extends PetStoreStatistics { - - public static MapFn<LineItem, String> COUNT_BY_PRODUCT = new MapFn<LineItem, String>() { - public String map(LineItem lineItem) { - try { - return lineItem.getDescription(); - } catch (Throwable t) { - throw new RuntimeException(t); - } - } - }; - public static MapFn<LineItem, String> COUNT_BY_STATE = new MapFn<LineItem, String>() { - public String map(LineItem lineItem) { - try { - return lineItem.getDescription(); - } catch (Throwable t) { - throw new RuntimeException(t); - } - } - }; - - PCollection<LineItem> lineItems; - - public CrunchETL(Path input, Path output) throws Exception { - Pipeline pipeline = MemPipeline.getInstance(); - PCollection<String> lines = pipeline.read(From.textFile(new Path(input, - "part-r-00000"))); - System.out.println("crunch : " + lines.getName() + " " - + lines.getSize()); - lineItems = lines.parallelDo(ETL, Avros.reflects(LineItem.class)); - - } - - public static MapFn ETL = new MapFn<String, LineItem>() { - @Override - public LineItem map(String input) { - String[] fields = input.split(","); - LineItem li = new LineItem(); - li.setAppName(fields[1]); - li.setFirstName(fields[3]); - // ... - li.setDescription(fields[fields.length - 1]); - return li; - } - }; - - @Override - public Map<String, ? 
extends Number> numberOfTransactionsByState() - throws Exception { - PTable<String, Long> counts = lineItems.parallelDo(COUNT_BY_STATE, - Avros.strings()).count(); - Map m = counts.materializeToMap(); - - System.out.println("Crunch::: " + m); - return m; - } - - @Override - public Map<String, ? extends Number> numberOfProductsByProduct() - throws Exception { - PTable<String, Long> counts = lineItems.parallelDo(COUNT_BY_PRODUCT, - Avros.strings()).count(); - Map m = counts.materializeToMap(); - //CrunchETL. System.out.println("Crunch::: " + m); - return m; - } - - public static void main(String... args) throws Exception { - /** - * PCollection<String> lines = MemPipeline .collectionOf( - * "BigPetStore,storeCode_AK,1 lindsay,franco,Sat Jan 10 00:11:10 EST 1970,10.5,dog-food" - * "BigPetStore,storeCode_AZ,1 tom,giles,Sun Dec 28 23:08:45 EST 1969,10.5,dog-food" - * "BigPetStore,storeCode_CA,1 brandon,ewing,Mon Dec 08 20:23:57 EST 1969,16.5,organic-dog-food" - * "BigPetStore,storeCode_CA,2 angie,coleman,Thu Dec 11 07:00:31 EST 1969,10.5,dog-food" - * "BigPetStore,storeCode_CA,3 angie,coleman,Tue Jan 20 06:24:23 EST 1970,7.5,cat-food" - * "BigPetStore,storeCode_CO,1 sharon,trevino,Mon Jan 12 07:52:10 EST 1970,30.1,antelope snacks" - * "BigPetStore,storeCode_CT,1 kevin,fitzpatrick,Wed Dec 10 05:24:13 EST 1969,10.5,dog-food" - * "BigPetStore,storeCode_NY,1 dale,holden,Mon Jan 12 23:02:13 EST 1970,19.75,fish-food" - * "BigPetStore,storeCode_NY,2 dale,holden,Tue Dec 30 12:29:52 EST 1969,10.5,dog-food" - * "BigPetStore,storeCode_OK,1 donnie,tucker,Sun Jan 18 04:50:26 EST 1970,7.5,cat-food" - * ); - **/ - // FAILS - Pipeline pipeline = new MRPipeline(CrunchETL.class); - - PCollection<String> lines = pipeline.read(From.textFile(new Path( - "/tmp/BigPetStore1388719888255/generated/part-r-00000"))); - - - PCollection<LineItem> lineItems = lines.parallelDo( - new MapFn<String, LineItem>() { - @Override - public LineItem map(String input) { - - System.out.println("proc1 " + 
input); - String[] fields = input.split(","); - LineItem li = new LineItem(); - li.setAppName("" + fields[1]); - li.setFirstName("" + fields[3]); - li.setDescription("" + fields[fields.length - 1]); - return li; - } - }, Avros.reflects(LineItem.class)); - - for (LineItem i : lineItems.materialize()) - System.out.println(i); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/bigtop/blob/6ec6cebf/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/etl/LineItem.java ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/etl/LineItem.java b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/etl/LineItem.java deleted file mode 100755 index 87e5d0d..0000000 --- a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/etl/LineItem.java +++ /dev/null @@ -1,112 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/**
 * One purchased line of a BigPetStore transaction record.
 *
 * <p>A plain mutable bean: every property can be set through the full
 * constructor or its setter. Properties that were never set are
 * {@code null} for strings and read as zero for the numeric ones.
 */
public class LineItem implements Serializable {

    String appName;
    String storeCode;
    Integer lineId;
    String firstName;
    String lastName;
    String timestamp;
    Double price;
    String description;

    /** Creates an empty line item; populate it through the setters. */
    public LineItem() {
        super();
    }

    /**
     * Creates a fully populated line item.
     *
     * @param appName     name of the application that produced the record
     * @param storeCode   code of the store
     * @param lineId      id of the line
     * @param firstName   customer first name
     * @param lastName    customer last name
     * @param timestamp   purchase time, as text
     * @param price       price paid
     * @param description description of the purchased product
     */
    public LineItem(String appName, String storeCode, Integer lineId, String firstName,
            String lastName, String timestamp, Double price, String description) {
        super();
        this.appName = appName;
        this.storeCode = storeCode;
        this.lineId = lineId;
        this.firstName = firstName;
        this.lastName = lastName;
        this.timestamp = timestamp;
        this.price = price;
        this.description = description;
    }

    public String getAppName() {
        return appName;
    }

    public void setAppName(String appName) {
        this.appName = appName;
    }

    public String getStoreCode() {
        return storeCode;
    }

    public void setStoreCode(String storeCode) {
        this.storeCode = storeCode;
    }

    /**
     * @return the line id, or {@code 0} when it has not been set
     */
    public int getLineId() {
        // The field is a nullable Integer; unboxing it directly threw a
        // NullPointerException for items built with the no-arg constructor.
        return lineId == null ? 0 : lineId.intValue();
    }

    public void setLineId(int lineId) {
        this.lineId = lineId;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public String getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(String timestamp) {
        this.timestamp = timestamp;
    }

    /**
     * @return the price, or {@code 0.0} when it has not been set
     */
    public double getPrice() {
        // Same null-unboxing hazard as getLineId().
        return price == null ? 0.0 : price.doubleValue();
    }

    public void setPrice(double price) {
        this.price = price;
    }

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    // other constructors, parsers, etc.
}
