BIGTOP-1272: Productionize the mahout recommender

Signed-off-by: [email protected] <jayunit100>
Project: http://git-wip-us.apache.org/repos/asf/bigtop/repo
Commit: http://git-wip-us.apache.org/repos/asf/bigtop/commit/4fca4573
Tree: http://git-wip-us.apache.org/repos/asf/bigtop/tree/4fca4573
Diff: http://git-wip-us.apache.org/repos/asf/bigtop/diff/4fca4573

Branch: refs/heads/master
Commit: 4fca4573b388714e305cd365f4a9bed4f4c17e8c
Parents: e9771e6
Author: bhashit parikh <[email protected]>
Authored: Thu May 29 11:07:15 2014 +0530
Committer: [email protected] <jayunit100>
Committed: Tue Aug 19 23:09:35 2014 -0400

----------------------------------------------------------------------
 bigtop-bigpetstore/BPS_analytics.pig             |  10 +-
 bigtop-bigpetstore/README.md                     |  68 ++-
 bigtop-bigpetstore/arch.dot                      |  27 +-
 bigtop-bigpetstore/build.gradle                  | 327 +++++++------
 .../bigtop/bigpetstore/BigPetStoreMahoutIT.java  |  73 +++
 .../bigtop/bigpetstore/BigPetStorePigIT.java     |  68 +--
 .../org/apache/bigtop/bigpetstore/ITUtils.java   |  92 ++--
 .../bigtop/bigpetstore/etl/PigCSVCleaner.java    |  71 ++-
 .../bigpetstore/generator/BPSGenerator.java      | 110 ++---
 .../generator/CustomerGenerator.scala            |  80 ++++
 ...GeneratePetStoreTransactionsInputFormat.java  | 134 ------
 .../PetStoreTransactionInputSplit.java           |  28 +-
 .../PetStoreTransactionsInputFormat.java         | 139 ++++++
 .../generator/TransactionIteratorFactory.java    | 468 -------
 .../bigpetstore/generator/util/Product.java      |  63 +++
 .../bigpetstore/generator/util/ProductType.java  |  29 ++
 .../bigpetstore/generator/util/State.java        |  26 ++
 .../bigpetstore/recommend/ItemRecommender.scala  | 103 ++++
 .../bigpetstore/util/BigPetStoreConstants.java   |  17 +-
 .../bigpetstore/util/NumericalIdUtils.java       |  10 +-
 .../apache/bigtop/bigpetstore/util/Pair.java     | 125 -----
 .../bigpetstore/generator/DataForger.scala       | 263 +++++++++++
 .../generator/TransactionIteratorFactory.scala   | 104 +++++
 .../generator/TestNumericalIdUtils.java          |   8 +-
 .../TestPetStoreTransactionGeneratorJob.java     |  10 +-
 25 files changed, 1347 insertions(+), 1106 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/BPS_analytics.pig
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/BPS_analytics.pig b/bigtop-bigpetstore/BPS_analytics.pig
index 44ed541..23e3749 100755
--- a/bigtop-bigpetstore/BPS_analytics.pig
+++ b/bigtop-bigpetstore/BPS_analytics.pig
@@ -38,14 +38,16 @@ csvdata =
       dump:chararray,
       state:chararray,
       transaction:int,
+      custId:long,
       fname:chararray,
       lname:chararray,
-      date:chararray,
+      productId:int,
+      product:chararray,
       price:float,
-      product:chararray);
+      date:chararray);

 -- RESULT:
--- (BigPetStore,storeCode_AK,1,jay,guy,Thu Dec 18 12:17:10 EST 1969,10.5,dog-food)
+-- (BigPetStore,storeCode_AK,1,11,jay,guy,3,dog-food,10.5,Thu Dec 18 12:17:10 EST 1969)
 -- ...

 -- Okay! Now lets group our data so we can do some stats.
@@ -55,7 +57,7 @@ csvdata =
 state_product = group csvdata by ( state, product ) ;

 -- RESULT
--- ((storeCode_AK,dog-food) , {(BigPetStore,storeCode_AK,1,jay,guy,Thu Dec 18 12:17:10 EST 1969,10.5,dog-food)}) --
+-- ((storeCode_AK,dog-food) , {(BigPetStore,storeCode_AK,1,11,jay,guy,3,dog-food,10.5,Thu Dec 18 12:17:10 EST 1969)}) --
 -- ...
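[Editor's note: the hunk above reorders the cleaned-record schema so that custId precedes the names and productId/product precede price and date. As a minimal sketch (not part of this commit), the same load and group-by can be driven from Java with the embedded PigServer API that PigCSVCleaner in this commit already uses; the class name and the "bps/..." paths below are illustrative assumptions.]

```java
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;

// Hypothetical driver, for illustration only; paths are assumptions.
public class StateProductCounts {
    public static void main(String[] args) throws Exception {
        PigServer pig = new PigServer(ExecType.LOCAL);
        // Load cleaned records using the new column order from the diff:
        // custId precedes the names; productId/product precede price and date.
        pig.registerQuery("csvdata = LOAD 'bps/cleaned' AS (dump:chararray,"
            + " state:chararray, transaction:int, custId:long, fname:chararray,"
            + " lname:chararray, productId:int, product:chararray,"
            + " price:float, date:chararray);");
        // Same grouping as BPS_analytics.pig, followed by a simple count.
        pig.registerQuery("state_product = GROUP csvdata BY (state, product);");
        pig.registerQuery("counts = FOREACH state_product"
            + " GENERATE FLATTEN(group) AS (state, product), COUNT(csvdata);");
        pig.store("counts", "bps/state_product_counts");
    }
}
```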
http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/README.md
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/README.md b/bigtop-bigpetstore/README.md
index 40a9088..e58d8f3 100644
--- a/bigtop-bigpetstore/README.md
+++ b/bigtop-bigpetstore/README.md
@@ -13,7 +13,7 @@ Architecture
 The application consists of the following modules

 * generator: generates raw data on the dfs
-* clustering: Apache Mahout demo code for processing the data using Item based Collaborative Filtering. This feature is not supported yet. You can track its progress using this [`JIRA` issue](https://issues.apache.org/jira/browse/BIGTOP-1272)
+* recommendations: Apache Mahout demo code for generating recommendations by analyzing the transaction records. Progress on this feature can be tracked using this [`JIRA` issue](https://issues.apache.org/jira/browse/BIGTOP-1272)
 * Pig: demo code for processing the data using Apache Pig
 * Hive: demo code for processing the data using Apache Hive. This part is not complete yet. We are working on it. You can track it using this [`JIRA` issue](https://issues.apache.org/jira/browse/BIGTOP-1270)
 * Crunch: demo code for processing the data using Apache Crunch

@@ -21,22 +21,22 @@ The application consists of the following modules
 Build Instructions
 ------------------

-You'll need to have [`gradle`](http://www.gradle.org/downloads) installed and set up correctly to follow these instructions.
+You'll need to have version 2.0 of [`gradle`](http://www.gradle.org/downloads) installed and set up correctly to follow these instructions.
 We could have used the [`gradle-wrapper`](http://www.gradle.org/docs/current/userguide/gradle_wrapper.html) to avoid having to install `gradle`, but the `bigtop` project includes all `gradle*` directories in `.gitignore`. So, that's not going to work.

 ### Build the JAR

- `gradle clean build` will build the bigpetstore `jar`. The `jar` will be located in the `build/libs` directory.
+`gradle clean build` will build the bigpetstore `jar`. The `jar` will be located in the `build/libs` directory.

 ### Run Integration Tests With
  * Pig profile: `gradle clean integrationTest -P ITProfile=pig`
- * Crunch profile: `gradle clean integrationTest -P ITProfile=crunch`
+ * Mahout profile: `gradle clean integrationTest -P ITProfile=mahout`
+ * Crunch profile: Not implemented yet.
  * Hive profile: Not implemented yet.
- * Mahout profile: Not implemented yet.

 If you don't specify any profile name, or if you specify an invalid name for the `integrationTest` task, no integration tests will be run.

-*Note:* At this stage, only the `Pig` profile is working. We will continue to update this area as further work is completed.
+*Note:* At this stage, only the `Pig` and `Mahout` profiles are working. We will continue to update this area as further work is completed.

 For Eclipse Users
 -----------------
@@ -87,14 +87,61 @@ The next phase of the application processes the data to create basic aggregations
 - try it [on the gh-pages branch](http://jayunit100.github.io/bigpetstore/)

+
 Running on a hadoop cluster
 ---------------------------

-wget s3://bigpetstore/bigpetstore.jar
+*Note:* For running the code using the `hadoop jar` command instead of the `gradle` tasks, you will need to set the classpath appropriately. The discussion after [this comment][jira-mahout] in JIRA may also be useful in addition to these instructions.
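[Editor's note: the `-libjars` flag used in the commands below is consumed by hadoop's GenericOptionsParser, which a driver gets for free when it is launched through `ToolRunner`; this is the same pattern `PigCSVCleaner`'s `main` uses later in this commit. A minimal sketch of that pattern follows; the driver name is hypothetical.]

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical driver name, for illustration only.
public class LibJarsAwareDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // By the time run() is called, ToolRunner has already applied
        // GenericOptionsParser: -libjars (and -D key=value pairs) have been
        // stripped from args and folded into getConf(); only the
        // application-level arguments remain.
        Configuration conf = getConf();
        System.out.println("application args remaining: " + args.length);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        // e.g. hadoop jar bps.jar LibJarsAwareDriver -libjars $JARS in/ out/
        System.exit(ToolRunner.run(new Configuration(), new LibJarsAwareDriver(), args));
    }
}
```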
+
+### Build the fat-jar
+
+We are going to use a fat-jar to avoid specifying the entire classpath ourselves.
+
+The fat-jar is required when we are running the application on a hadoop cluster. The alternative would be to specify all the dependencies (including the transitive ones) manually while running the hadoop job. A fat-jar makes it easier to bundle almost all of the dependencies inside the distribution jar itself.
+
+```
+gradle clean shadowJar -Pfor-cluster
+```
+
+This command will build the fat-jar with all the dependencies bundled in, except the hadoop, mahout and pig dependencies, which we'll specify using the `-libjars` option while running the hadoop job. These dependencies are excluded to avoid conflicts with the jars provided by hadoop itself.
+
+The generated jar will be inside the `build/libs` dir, with a name like `BigPetStore-x.x.x-SNAPSHOT-all.jar`. For the remaining discussion, I'll refer to this jar as `bps.jar`.
+
+### Get the mahout and pig jars
+
+You'll need both the mahout and pig jars with the hadoop classes excluded. Commonly, you can find both of these in their respective distros. The required pig jar is generally named like `pig-x.x.x-withouthadoop.jar`, and the mahout jar like `mahout-core-job.jar`. If you want, you can build them yourself by following the instructions in [this JIRA comment][jira-mahout]. For the remaining discussion, I am going to refer to these two jars as `pig-withouthadoop.jar` and `mahout-core-job.jar`.
+
+### Setup the classpath for hadoop nodes in the cluster
+
+```
+export JARS="/usr/lib/pig/pig-withouthadoop.jar,/usr/lib/mahout/mahout-core-job.jar"
+```
+
+We also need these jars to be present on the client side to kick off the jobs. We reuse the `JARS` variable to put the same jars on the client classpath.
+
+```
+export HADOOP_CLASSPATH=`echo $JARS | sed s/,/:/g`
+```
+
+### Generate the data
+
+```
+hadoop jar bps.jar org.apache.bigtop.bigpetstore.generator.BPSGenerator 1000000 bigpetstore/gen
+```
+
+### Clean with pig
+
+```
+hadoop jar bps.jar org.apache.bigtop.bigpetstore.etl.PigCSVCleaner -libjars $JARS bigpetstore/gen/ bigpetstore/ custom_pigscript.pig
+```
+
+### Analyze and generate recommendations with mahout
+
+```
+hadoop jar bps.jar org.apache.bigtop.bigpetstore.recommend.ItemRecommender -libjars $JARS bigpetstore/pig/Mahout bigpetstore/Mahout/AlsFactorization bigpetstore/Mahout/AlsRecommendations
+```

-hadoop jar bigpetstore.jar org.apache.bigtop.bigpetstore.generator.BPSGenerator 1000000 bigpetstore/gen
-hadoop jar bigpetstore.jar org.apache.bigtop.bigpetstore.etl.PigCSVCleaner bigpetstore/gen/ bigpetstore/pig/ custom_pigscript.pig
 ...
 (will add more steps as we add more phases to the workflow)
 ...
@@ -134,3 +181,6 @@ of EMR setup w/ a custom script).
 ...
 And so on.
+
+
+[jira-mahout]: https://issues.apache.org/jira/browse/BIGTOP-1272?focusedCommentId=14076023&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-1407602

http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/arch.dot
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/arch.dot b/bigtop-bigpetstore/arch.dot
index 0f3f404..7d17c5a 100644
--- a/bigtop-bigpetstore/arch.dot
+++ b/bigtop-bigpetstore/arch.dot
@@ -6,7 +6,7 @@
 * (the "License"); you may not use this file except in compliance with
 * the License.
You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 -* +* * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,25 +18,24 @@ digraph bigpetstore { node [shape=record]; - PROD_And_USER_HASH_FUNC [label="python or datafu udf" ,style="rounded,filled", shape=diamond]; + BPSAnalytics [label="BPSAnalytics.pig" ,style="rounded, filled", shape=diamond]; CUSTOMER_PAGE [label="CUSTOMER_PAGE|json|CUSTOMER_PAGE/part*"]; DIRTY_CSV [label="DIRTY_CSV|fname lname -prod , price ,prod,..|generated/part*"]; CSV [label="CSV|fname,lname,prod,price,date,xcoord,ycoord,...|cleaned/part*"]; - MAHOUT_VIEW_INPUT [label="MAHOUT_VIEW | (hashed name) 10001, (hashed purchases) 203 | <hive_warehouse>/mahout_cf_in/part*" ]; - MAHOUT_CF [label="MAHOUT collaborative filter output | (hashed name) 10001, (hashed product) 201, .6 | mahout_cf_out/part*" ]; + MAHOUT_VIEW_INPUT [label="MAHOUT_VIEW | (user-id) 10001 (product-id) 203 (implicit-rating) 1 | cleaned/Mahout/part*" ]; + MAHOUT_ALS [label="Parallel ALS Recommender output | (user-id) 10001 [(product-id) 201: (recommendation-strength 0-1)0.546] | Mahout/AlsRecommendations/part*" ]; Generate -> DIRTY_CSV [label="hadoop jar bigpetstore.jar org.bigtop.bigpetstore.generator.BPSGenerator 100 bps/generated/"] ; - DIRTY_CSV -> pig [label=""]; + DIRTY_CSV -> pig [label="hadoop jar bigpetstore.jar org.bigtop.bigpetstore.etl.PigCSVCleaner bps/generated/ bps/cleaned/ "]; - pig -> CSV [label="hadoop jar bigpetstore.jar org.bigtop.bigpetstore.etl.PigCSVCleaner bps/generated/ bps/cleaned/ "]; - CSV -> MAHOUT_VIEW_INPUT [label="BPS_Mahout_Viewbuilder.pig"]; - PROD_And_USER_HASH_FUNC -> MAHOUT_VIEW_INPUT [label="used in BPS_MAHOUT_Viewbuilder.pig script"] ; + pig -> CSV [label="pig query to clean up generated transaction records"]; + pig -> MAHOUT_VIEW_INPUT [label="pig query to produce mahout input format"]; - MAHOUT_VIEW_INPUT -> mahout; - mahout -> MAHOUT_CF [label="hadoop jar bigpetstore.jar org.bigtop.bigpetstore.analytics.BPSRecommender bps/mahout_cf_in/part* bps/mahout_cf_out/"]; + MAHOUT_VIEW_INPUT -> ParallelALSFactorizationJob [label="hadoop jar bigpetstore.jar org.apache.bigtop.bigpetstore.recommend.ItemRecommender cleaned/Mahout Mahout/AlsFactorization Mahout/AlsRecommendations"]; + ParallelALSFactorizationJob -> "Mahout RecommenderJob" + "Mahout RecommenderJob" -> MAHOUT_ALS - CSV -> pig_job2; - MAHOUT_CF -> pig_job2 ; - PROD_And_USER_HASH_FUNC -> pig_job2; - pig_job2 -> CUSTOMER_PAGE [label="hadoop jar bigpetstore.jar org.bigtop.bigpetstore.analytics.BPSRecommender bpg/cleaned/ bps/mahout_cf_out/"]; + CSV -> BPSAnalytics; + BPSAnalytics -> pig_job2; + pig_job2 -> CUSTOMER_PAGE [label=""]; } http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/build.gradle ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/build.gradle b/bigtop-bigpetstore/build.gradle index efb69b3..a6a8c1a 100644 --- a/bigtop-bigpetstore/build.gradle +++ b/bigtop-bigpetstore/build.gradle @@ -3,6 +3,14 @@ apply plugin: "eclipse" // TODO add idea module config. 
apply plugin: "idea" apply plugin: "scala" +apply plugin: 'com.github.johnrengelman.shadow' + +buildscript { + repositories { jcenter() } + dependencies { + classpath 'com.github.jengelman.gradle.plugins:shadow:1.0.2' + } +} // Read the groupId and version properties from the "parent" bigtop project. // It would be better if there was some better way of doing this. Howvever, @@ -10,9 +18,9 @@ apply plugin: "scala" // projects can't have maven projects as parents (AFAIK. If there is a way to do it, // it doesn't seem to be well-documented). def setProjectProperties() { - Node xml = new XmlParser().parse("../pom.xml") - group = xml.groupId.first().value().first() - version = xml.version.first().value().first() + Node xml = new XmlParser().parse("../pom.xml") + group = xml.groupId.first().value().first() + version = xml.version.first().value().first() } setProjectProperties() @@ -27,40 +35,49 @@ targetCompatibility = 1.7 // Specify any additional project properties. ext { - slf4jVersion = "1.7.5" - guavaVersion = "15.0" - hadoopVersion = "2.2.0" - datanucleusVersion = "3.2.2" - datanucleusJpaVersion = "3.2.1" - bonecpVersion = "0.8.0.RELEASE" - derbyVersion = "10.10.1.1" + slf4jVersion = "1.7.5" + guavaVersion = "15.0" + datanucleusVersion = "3.2.2" + datanucleusJpaVersion = "3.2.1" + bonecpVersion = "0.8.0.RELEASE" + derbyVersion = "10.10.1.1" + + // from horton-works repo. They compile mahout-core against hadoop2.x. These + // mahout is compiled against 2.4.0 + hadoopVersion = "2.4.0.2.1.2.0-402" + mahoutVersion = "0.9.0.2.1.2.0-402" } repositories { - mavenCentral() + mavenCentral() + maven { + url "http://repo.hortonworks.com/content/repositories/releases/" + } } -tasks.withType(Compile) { - options.encoding = 'UTF-8' - options.compilerArgs << "-Xlint:all" +tasks.withType(AbstractCompile) { + options.encoding = 'UTF-8' + options.compilerArgs << "-Xlint:all" } tasks.withType(ScalaCompile) { - // Enables incremental compilation. - // http://www.gradle.org/docs/current/userguide/userguide_single.html#N12F78 - scalaCompileOptions.useAnt = false + // Enables incremental compilation. + // http://www.gradle.org/docs/current/userguide/userguide_single.html#N12F78 + scalaCompileOptions.useAnt = false } tasks.withType(Test) { - testLogging { - // Uncomment this if you want to see the console output from the tests. - // showStandardStreams = true - events "passed", "skipped", "failed" - } + testLogging { + // Uncomment this if you want to see the console output from the tests. + // showStandardStreams = true + events "passed", "skipped", "failed" + // show standard out and standard error of the test JVM(s) on the console + //showStandardStreams = true + } } test { - exclude "**/*TestPig.java", "**/*TestHiveEmbedded.java", "**/*TestCrunch.java", "**/*TestPetStoreTransactionGeneratorJob.java" + exclude "**/*TestPig.java", "**/*TestHiveEmbedded.java", "**/*TestCrunch.java", "**/*TestPetStoreTransactionGeneratorJob.java" } // Create a separate source-set for the src/integrationTest set of classes. The convention here @@ -68,19 +85,23 @@ test { // under the 'src' directory. So, in this case, it will look for a directory named 'src/integrationTest' // since the name of the source-set is 'integrationTest' sourceSets { - // The main and test source-sets are configured by both java and scala plugins. They contain - // all the src/main and src/test classes. The following statements make all of those classes - // available on the classpath for the integration-tests, for both java and scala. 
- integrationTest { - java { - compileClasspath += main.output + test.output - runtimeClasspath += main.output + test.output - } - scala { - compileClasspath += main.output + test.output - runtimeClasspath += main.output + test.output - } - } + main { + java.srcDirs = []; + scala.srcDirs = ["src/main/scala", "src/main/java"] + } + // The main and test source-sets are configured by both java and scala plugins. They contain + // all the src/main and src/test classes. The following statements make all of those classes + // available on the classpath for the integration-tests, for both java and scala. + integrationTest { + java { + compileClasspath += main.output + test.output + runtimeClasspath += main.output + test.output + } + scala { + compileClasspath += main.output + test.output + runtimeClasspath += main.output + test.output + } + } } // Creating a source-set automatically add a couple of corresponding configurations (when java/scala @@ -91,120 +112,164 @@ sourceSets { // available for integrationTestRuntime. For ex. the testCompile configuration has a dependency on // jUnit and scalatest. This makes them available for the integration tests as well. configurations { - integrationTestCompile { - extendsFrom testCompile - } + integrationTestCompile { + extendsFrom testCompile + } - integrationTestRuntime { - extendsFrom integrationTestCompile, testRuntime - } + integrationTestRuntime { + extendsFrom integrationTestCompile, testRuntime + } } // To see the API that is being used here, consult the following docs // http://www.gradle.org/docs/current/dsl/org.gradle.api.artifacts.ResolutionStrategy.html def updateDependencyVersion(dependencyDetails, dependencyString) { - def parts = dependencyString.split(':') - def group = parts[0] - def name = parts[1] - def version = parts[2] - if (dependencyDetails.requested.group == group - && dependencyDetails.requested.name == name) { - dependencyDetails.useVersion version - } + def parts = dependencyString.split(':') + def group = parts[0] + def name = parts[1] + def version = parts[2] + if (dependencyDetails.requested.group == group + && dependencyDetails.requested.name == name) { + dependencyDetails.useVersion version + } } def setupPigIntegrationTestDependencyVersions(dependencyResolveDetails) { - // This is the way we override the dependencies. - updateDependencyVersion dependencyResolveDetails, "joda-time:joda-time:2.2" + // This is the way we override the dependencies. + updateDependencyVersion dependencyResolveDetails, "joda-time:joda-time:2.2" } def setupCrunchIntegrationTestDependencyVersions(dependencyResolveDetails) { - // Specify any dependencies that you want to override for crunch integration tests. + // Specify any dependencies that you want to override for crunch integration tests. +} + +def setupMahoutIntegrationTestDependencyVersions(dependencyResolveDetails) { + // Specify any dependencies that you want to override for mahout integration tests. } + task integrationTest(type: Test, dependsOn: test) { - testClassesDir = sourceSets.integrationTest.output.classesDir - classpath = sourceSets.integrationTest.runtimeClasspath - - if(!project.hasProperty('ITProfile')) { - // skip integration-tests if no profile has been specified. 
- integrationTest.onlyIf { false } - return; - } - - def patternsToInclude - def dependencyConfigClosure - def skipDependencyUpdates = false - // Select the pattern for test classes that should be executed, and the dependency - // configuration function to be called based on the profile name specified at the command line. - switch (project.ITProfile) { - case "pig": - patternsToInclude = "*PigIT*" - dependencyConfigClosure = { setupPigIntegrationTestDependencyVersions(it) } - break - case "crunch": - patternsToInclude = "*CrunchIT*" - dependencyConfigClosure = { setupCrunchIntegrationTestDependencyVersions(it) } - break - // skip integration-tests if the passed in profile-name is not valid - default: integrationTest.onlyIf { false }; return - } - - - filter { includeTestsMatching patternsToInclude } - - // This is the standard way gradle allows overriding each specific dependency. - // see: http://www.gradle.org/docs/current/dsl/org.gradle.api.artifacts.ResolutionStrategy.html - project.configurations.all { - resolutionStrategy { - eachDependency { - dependencyConfigClosure(it) - } - } - } + testClassesDir = sourceSets.integrationTest.output.classesDir + classpath = sourceSets.integrationTest.runtimeClasspath + + if(!project.hasProperty('ITProfile')) { + // skip integration-tests if no profile has been specified. + integrationTest.onlyIf { false } + return; + } + + def patternsToInclude + def dependencyConfigClosure + def skipDependencyUpdates = false + // Select the pattern for test classes that should be executed, and the dependency + // configuration function to be called based on the profile name specified at the command line. + switch (project.ITProfile) { + case "pig": + patternsToInclude = "*PigIT*" + dependencyConfigClosure = { setupPigIntegrationTestDependencyVersions(it) } + break + case "crunch": + patternsToInclude = "*CrunchIT*" + dependencyConfigClosure = { setupCrunchIntegrationTestDependencyVersions(it) } + break + case "mahout": + patternsToInclude = "*MahoutIT*" + dependencyConfigClosure = { setupMahoutIntegrationTestDependencyVersions(it) } + break + // skip integration-tests if the passed in profile-name is not valid + default: integrationTest.onlyIf { false }; return + } + + + filter { includeTestsMatching patternsToInclude } + + // This is the standard way gradle allows overriding each specific dependency. 
+ // see: http://www.gradle.org/docs/current/dsl/org.gradle.api.artifacts.ResolutionStrategy.html + project.configurations.all { + resolutionStrategy { + eachDependency { + dependencyConfigClosure(it) + } + } + } } dependencies { - compile "org.kohsuke:graphviz-api:1.0" - compile "org.apache.crunch:crunch-core:0.9.0-hadoop2" - compile "com.jolbox:bonecp:${project.bonecpVersion}" - compile "org.apache.derby:derby:${project.derbyVersion}" - compile "com.google.guava:guava:${project.guavaVersion}" - compile "commons-lang:commons-lang:2.6" - compile "joda-time:joda-time:2.3" - compile "org.apache.commons:commons-lang3:3.1" - compile "com.google.protobuf:protobuf-java:2.5.0" - compile "commons-logging:commons-logging:1.1.3" - compile "com.thoughtworks.xstream:xstream:+" - compile "org.apache.lucene:lucene-core:+" - compile "org.apache.lucene:lucene-analyzers-common:+" - compile "org.apache.solr:solr-commons-csv:3.5.0" - compile "org.apache.hadoop:hadoop-client:${project.hadoopVersion}" - compile group: "org.apache.pig", name: "pig", version: "0.12.0", classifier:"h2" - compile "org.slf4j:slf4j-api:${project.slf4jVersion}" - compile "log4j:log4j:1.2.12" - compile "org.slf4j:slf4j-log4j12:${project.slf4jVersion}" - compile "org.datanucleus:datanucleus-core:${project.datanucleusVersion}" - compile "org.datanucleus:datanucleus-rdbms:${project.datanucleusJpaVersion}" - compile "org.datanucleus:datanucleus-api-jdo:${project.datanucleusJpaVersion}" - compile "org.datanucleus:datanucleus-accessplatform-jdo-rdbms:${project.datanucleusJpaVersion}" - compile group: "org.apache.mrunit", name: "mrunit", version: "1.0.0", classifier:"hadoop2" - - compile 'org.scala-lang:scala-library:2.10.0' - - testCompile "junit:junit:4.11" - testCompile "org.hamcrest:hamcrest-all:1.3" - testCompile "org.scalatest:scalatest_2.10:2.1.7" + compile "org.kohsuke:graphviz-api:1.0" + compile "org.apache.crunch:crunch-core:0.9.0-hadoop2" + compile "com.jolbox:bonecp:${project.bonecpVersion}" + compile "org.apache.derby:derby:${project.derbyVersion}" + compile "com.google.guava:guava:${project.guavaVersion}" + compile "commons-lang:commons-lang:2.6" + compile "joda-time:joda-time:2.3" + compile "org.apache.commons:commons-lang3:3.1" + compile "com.google.protobuf:protobuf-java:2.5.0" + compile "commons-logging:commons-logging:1.1.3" + compile "com.thoughtworks.xstream:xstream:+" + compile "org.apache.lucene:lucene-core:+" + compile "org.apache.lucene:lucene-analyzers-common:+" + compile "org.apache.solr:solr-commons-csv:3.5.0" + + compile group: "org.apache.pig", name: "pig", version: "0.12.0", classifier:"h2" + compile "org.slf4j:slf4j-api:${project.slf4jVersion}" + compile "log4j:log4j:1.2.12" + compile "org.slf4j:slf4j-log4j12:${project.slf4jVersion}" + compile "org.datanucleus:datanucleus-core:${project.datanucleusVersion}" + compile "org.datanucleus:datanucleus-rdbms:${project.datanucleusJpaVersion}" + compile "org.datanucleus:datanucleus-api-jdo:${project.datanucleusJpaVersion}" + compile "org.datanucleus:datanucleus-accessplatform-jdo-rdbms:${project.datanucleusJpaVersion}" + compile group: "org.apache.mrunit", name: "mrunit", version: "1.0.0", classifier:"hadoop2" + + compile "org.jfairy:jfairy:0.2.4" + + // from horton-works repo. 
They compile mahout-core against hadoop2.x + compile "org.apache.hadoop:hadoop-client:${hadoopVersion}" + compile "org.apache.mahout:mahout-core:${mahoutVersion}" + + compile 'org.scala-lang:scala-library:2.11.0' + + testCompile "junit:junit:4.11" + testCompile "org.hamcrest:hamcrest-all:1.3" + testCompile "org.scalatest:scalatest_2.11:2.1.7" } -eclipse { - classpath { - // Add the sependencies and the src dirs for the integrationTest source-set to the - // .classpath file that will be generated by the eclipse plugin. - plusConfigurations += configurations.integrationTestCompile - // Uncomment the following two lines if you want to generate an eclipse project quickly. - downloadSources = false - downloadJavadoc = false - } +configurations { + /* hadoopClusterRuntime */ runtime { + // extendsFrom integrationTestRuntime + if(project.hasProperty('for-cluster')) { + excludeRules += [getGroup: { 'org.apache.crunch' }, getModule: { 'crunch-core' } ] as ExcludeRule + excludeRules += [getGroup: { 'org.apache.pig' }, getModule: { 'pig' } ] as ExcludeRule + excludeRules += [getGroup: { 'org.apache.mahout' }, getModule: { 'mahout-core' } ] as ExcludeRule + excludeRules += [getGroup: { 'org.apache.hadoop' }, getModule: { 'hadoop-client' } ] as ExcludeRule + } + } } + +task listJars << { + configurations.shadow.each { println it.name } +} + +def copyDependencyJarsForHadoopCluster() { + copy { + from configurations.hadoopClusterRuntime + into 'build/libs' + } +} + +build { + doLast { + copyDependencyJarsForHadoopCluster() + } +} + +eclipse { + classpath { + // Add the dependencies and the src dirs for the integrationTest source-set to the + // .classpath file that will be generated by the eclipse plugin. + plusConfigurations += [configurations.integrationTestCompile] + // Comment out the following two lines if you want to generate an eclipse project quickly. + downloadSources = true + downloadJavadoc = false + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStoreMahoutIT.java ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStoreMahoutIT.java b/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStoreMahoutIT.java new file mode 100644 index 0000000..b07c5a0 --- /dev/null +++ b/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStoreMahoutIT.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.bigtop.bigpetstore; + +import static org.apache.bigtop.bigpetstore.ITUtils.createTestOutputPath; +import static org.apache.bigtop.bigpetstore.ITUtils.setup; + +import java.util.regex.Pattern; + +import org.apache.bigtop.bigpetstore.recommend.ItemRecommender; +import org.apache.bigtop.bigpetstore.util.BigPetStoreConstants.OUTPUTS.MahoutPaths; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.base.Predicate; + +public class BigPetStoreMahoutIT { + + public static final Path INPUT_DIR_PATH = + new Path(ITUtils.BPS_TEST_PIG_CLEANED, MahoutPaths.Mahout.name()); + public static final String INPUT_DIR_PATH_STR = INPUT_DIR_PATH.toString(); + private static final Path MAHOUT_OUTPUT_DIR = createTestOutputPath(MahoutPaths.Mahout.name()); + private static final Path ALS_FACTORIZATION_OUTPUT_DIR = + createTestOutputPath(MahoutPaths.Mahout.name(), MahoutPaths.AlsFactorization.name()); + private static final Path ALS_RECOMMENDATIONS_DIR = + createTestOutputPath(MahoutPaths.Mahout.name(), MahoutPaths.AlsRecommendations.name()); + + private ItemRecommender itemRecommender; + + @Before + public void setupTest() throws Throwable { + setup(); + try { + FileSystem fs = FileSystem.get(new Configuration()); + fs.delete(MAHOUT_OUTPUT_DIR, true); + itemRecommender = new ItemRecommender(INPUT_DIR_PATH_STR, ALS_FACTORIZATION_OUTPUT_DIR.toString(), + ALS_RECOMMENDATIONS_DIR.toString()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private static final Predicate<String> TEST_OUTPUT_FORMAT = new Predicate<String>() { + private final Pattern p = Pattern.compile("^\\d+\\s\\[\\d+:\\d+\\.\\d+\\]$"); + @Override + public boolean apply(String input) { + return p.matcher(input).matches(); + } + }; + + @Test + public void testPetStorePipeline() throws Exception { + itemRecommender.recommend(); + ITUtils.assertOutput(ALS_RECOMMENDATIONS_DIR, TEST_OUTPUT_FORMAT); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStorePigIT.java ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStorePigIT.java b/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStorePigIT.java index 045a9cf..78d5c6b 100644 --- a/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStorePigIT.java +++ b/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/BigPetStorePigIT.java @@ -19,26 +19,22 @@ import static org.apache.bigtop.bigpetstore.ITUtils.BPS_TEST_GENERATED; import static org.apache.bigtop.bigpetstore.ITUtils.BPS_TEST_PIG_CLEANED; import static org.apache.bigtop.bigpetstore.ITUtils.fs; -import java.io.BufferedReader; import java.io.File; -import java.io.InputStreamReader; import java.util.Map; import java.util.Map.Entry; import org.apache.bigtop.bigpetstore.etl.PigCSVCleaner; import org.apache.bigtop.bigpetstore.util.BigPetStoreConstants; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.pig.ExecType; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; -import com.google.common.base.Function; +import com.google.common.base.Predicate; import com.google.common.collect.ImmutableMap; /** @@ -76,68 +72,24 @@ public class BigPetStorePigIT { FileSystem.get(new Configuration()).delete(BPS_TEST_PIG_CLEANED, true); FileSystem.get(new Configuration()).delete(BPS_TEST_PIG_COUNT_PRODUCTS, true); } catch (Exception e) { - System.out.println("didnt need to delete pig output."); - // not necessarily an error + throw new RuntimeException(e); } } - static Map<Path, Function<String, Boolean>> TESTS = ImmutableMap.of( + static Map<Path, Predicate<String>> TESTS = ImmutableMap.of( /** Test of the main output */ - BPS_TEST_PIG_CLEANED, new Function<String, Boolean>() { - public Boolean apply(String x) { - // System.out.println("Verified..."); - return true; - } - }, - // Example of how to count products - // after doing basic pig data cleanup - BPS_TEST_PIG_COUNT_PRODUCTS, new Function<String, Boolean>() { - // Jeff' - public Boolean apply(String x) { - return true; - } - } + BPS_TEST_PIG_CLEANED, ITUtils.VERIFICATION_PERDICATE, + // Example of how to count products after doing basic pig data cleanup + BPS_TEST_PIG_COUNT_PRODUCTS, ITUtils.VERIFICATION_PERDICATE, + // Test the output that is to be used as an input for Mahout. + BigPetStoreMahoutIT.INPUT_DIR_PATH, ITUtils.VERIFICATION_PERDICATE ); - /** - * The "core" task reformats data to TSV. lets test that first. - */ @Test public void testPetStoreCorePipeline() throws Exception { runPig(BPS_TEST_GENERATED, BPS_TEST_PIG_CLEANED, PIG_SCRIPT); - for (Entry<Path, Function<String, Boolean>> e : TESTS.entrySet()) { - assertOutput(e.getKey(), e.getValue()); - } - } - - public static void assertOutput(Path base, - Function<String, Boolean> validator) throws Exception { - FileSystem fs = FileSystem.getLocal(new Configuration()); - - FileStatus[] files = fs.listStatus(base); - // print out all the files. - for (FileStatus stat : files) { - System.out.println(stat.getPath() + " " + stat.getLen()); - } - - /** - * Support map OR reduce outputs - */ - Path partm = new Path(base, "part-m-00000"); - Path partr = new Path(base, "part-r-00000"); - Path p = fs.exists(partm) ? partm : partr; - - /** - * Now we read through the file and validate its contents. 
- */ - BufferedReader r = new BufferedReader(new InputStreamReader(fs.open(p))); - - // line:{"product":"big chew toy","count":3} - while (r.ready()) { - String line = r.readLine(); - log.info("line:" + line); - // System.out.println("line:"+line); - Assert.assertTrue("validationg line : " + line, validator.apply(line)); + for (Entry<Path, Predicate<String>> e : TESTS.entrySet()) { + ITUtils.assertOutput(e.getKey(), e.getValue()); } } http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/ITUtils.java ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/ITUtils.java b/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/ITUtils.java index df3b948..fd53dc1 100644 --- a/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/ITUtils.java +++ b/bigtop-bigpetstore/src/integrationTest/java/org/apache/bigtop/bigpetstore/ITUtils.java @@ -15,6 +15,8 @@ */ package org.apache.bigtop.bigpetstore; +import java.io.BufferedReader; +import java.io.InputStreamReader; import java.net.InetAddress; import java.nio.charset.Charset; import java.util.List; @@ -22,15 +24,26 @@ import java.util.List; import org.apache.bigtop.bigpetstore.generator.BPSGenerator; import org.apache.bigtop.bigpetstore.util.BigPetStoreConstants; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; +import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Predicate; import com.google.common.io.Files; public class ITUtils { + public static final Path TEST_OUTPUT_DIR = new Path("bps_integration_"); + + public static Predicate<String> VERIFICATION_PERDICATE = new Predicate<String>() { + @Override + public boolean apply(String input) { + return true; + } + }; static final Logger log = LoggerFactory.getLogger(ITUtils.class); @@ -46,26 +59,25 @@ public class ITUtils { msg += cp.replaceAll("hadoop", "**HADOOP**") + "\n"; } } - throw new RuntimeException("Major error: Probably issue. " + "Check hadoop version? " + e.getMessage() - + " .... check these classpath elements:" + msg); + throw new RuntimeException("Major error: Probably issue. " + + "Check hadoop version? " + e.getMessage() + + " .... check these classpath elements:" + msg); } } - public static final Path BPS_TEST_GENERATED = fs.makeQualified(new Path("bps_integration_", - BigPetStoreConstants.OUTPUTS.generated.name())); - public static final Path BPS_TEST_PIG_CLEANED = fs.makeQualified(new Path("bps_integration_", - BigPetStoreConstants.OUTPUTS.cleaned.name())); - public static final Path BPS_TEST_MAHOUT_IN = fs.makeQualified(new Path("bps_integration_", - BigPetStoreConstants.OUTPUTS.MAHOUT_CF_IN.name())); - public static final Path BPS_TEST_MAHOUT_OUT = fs.makeQualified(new Path("bps_integration_", - BigPetStoreConstants.OUTPUTS.MAHOUT_CF_OUT.name())); - - public static void main(String[] args) { + public static final Path BPS_TEST_GENERATED = + createTestOutputPath(BigPetStoreConstants.OUTPUTS.generated.name()); + public static final Path BPS_TEST_PIG_CLEANED = + createTestOutputPath (BigPetStoreConstants.OUTPUTS.cleaned.name()); + + public static Path createTestOutputPath(String... 
pathParts) { + Path path = TEST_OUTPUT_DIR; + for(String pathPart: pathParts) { + path = new Path(path, pathPart); + } + return path; } - // public static final Path CRUNCH_OUT = new - // Path("bps_integration_",BigPetStoreConstants.OUTPUT_3).makeQualified(fs); - /** * Some simple checks to make sure that unit tests in local FS. these arent * designed to be run against a distribtued system. @@ -99,29 +111,18 @@ public class ITUtils { * test_data_directory/generated/part-r-00000 */ public static void setup() throws Throwable { - int records = 10; - /** - * Setup configuration with prop. - */ Configuration conf = new Configuration(); - // debugging for jeff and others in local fs - // that wont build + // debugging for Jeff and others in local fs that won't build checkConf(conf); - conf.setInt(BPSGenerator.props.bigpetstore_records.name(), records); + conf.setInt(BPSGenerator.props.bigpetstore_records.name(), BPSGenerator.DEFAULT_NUM_RECORDS); - /** - * Only create if doesnt exist already..... - */ if (FileSystem.getLocal(conf).exists(BPS_TEST_GENERATED)) { return; } - /** - * Create the data set. - */ - Job createInput = BPSGenerator.createJob(BPS_TEST_GENERATED, conf); + Job createInput = BPSGenerator.getCreateTransactionRecordsJob(BPS_TEST_GENERATED, conf); createInput.waitForCompletion(true); Path outputfile = new Path(BPS_TEST_GENERATED, "part-r-00000"); @@ -131,4 +132,37 @@ public class ITUtils { System.out.println(l); } } + + + // A functions that logs the output file as a verification test + public static void assertOutput(Path base, Predicate<String> validator) throws Exception { + FileSystem fs = FileSystem.getLocal(new Configuration()); + + FileStatus[] files = fs.listStatus(base); + // print out all the files. + for (FileStatus stat : files) { + System.out.println(stat.getPath() + " " + stat.getLen()); + } + + /** + * Support map OR reduce outputs + */ + Path partm = new Path(base, "part-m-00000"); + Path partr = new Path(base, "part-r-00000"); + Path p = fs.exists(partm) ? partm : partr; + + /** + * Now we read through the file and validate its contents. + */ + BufferedReader r = new BufferedReader(new InputStreamReader(fs.open(p))); + + // line:{"product":"big chew toy","count":3} + while (r.ready()) { + String line = r.readLine(); + log.info("line:" + line); + // System.out.println("line:"+line); + Assert.assertTrue("validationg line : " + line, validator.apply(line)); + } + } + } http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/etl/PigCSVCleaner.java ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/etl/PigCSVCleaner.java b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/etl/PigCSVCleaner.java index 01ddd6e..0ca7444 100644 --- a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/etl/PigCSVCleaner.java +++ b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/etl/PigCSVCleaner.java @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -21,9 +21,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.bigtop.bigpetstore.util.BigPetStoreConstants; +import org.apache.bigtop.bigpetstore.util.BigPetStoreConstants.OUTPUTS; import org.apache.bigtop.bigpetstore.util.DeveloperTools; -import org.apache.bigtop.bigpetstore.util.NumericalIdUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -33,7 +32,7 @@ import org.apache.pig.ExecType; import org.apache.pig.PigServer; /** - * This class operates by ETL'ing the dataset into pig. + * This class operates by ETL'ing the data-set into pig. * The pigServer is persisted through the life of the class, so that the * intermediate data sets created in the constructor can be reused. */ @@ -41,11 +40,12 @@ public class PigCSVCleaner { PigServer pigServer; + private static Path getCleanedTsvPath(Path outputPath) { + return new Path(outputPath, OUTPUTS.tsv.name()); + } + public PigCSVCleaner(Path inputPath, Path outputPath, ExecType ex, File... scripts) throws Exception { - - - FileSystem fs = FileSystem.get(inputPath.toUri(), new Configuration()); if(! fs.exists(inputPath)){ @@ -61,36 +61,29 @@ public class PigCSVCleaner { /** * First, split the tabs up. * - * BigPetStore,storeCode_OK,2 yang,jay,Mon Dec 15 23:33:49 EST - * 1969,69.56,flea collar - * - * ("BigPetStore,storeCode_OK,2", - * "yang,jay,Mon Dec 15 23:33:49 EST 1969,69.56,flea collar") + * BigPetStore,storeCode_OK,2 1,yang,jay,3,flea collar,69.56,Mon Dec 15 23:33:49 EST 1969 * - * BigPetStore,storeCode_AK,1 amanda,fitzgerald,Sat Dec 20 09:44:25 EET - * 1969,7.5,cat-food + * ("BigPetStore,storeCode_OK,2", "1,yang,jay,3,flea collar,69.56,Mon Dec 15 23:33:49 EST 1969") */ - pigServer.registerQuery("csvdata = LOAD '<i>' AS (ID,DETAILS);" - .replaceAll("<i>", inputPath.toString())); + pigServer.registerQuery("csvdata = LOAD '<i>' AS (ID,DETAILS);".replaceAll("<i>", inputPath.toString())); + // currentCustomerId, firstName, lastName, product.id, product.name.toLowerCase, product.price, date /** - * Now, we want to split the two tab delimited feidls into uniform + * Now, we want to split the two tab delimited fields into uniform * fields of comma separated values. To do this, we 1) Internally split * the FIRST and SECOND fields by commas "a,b,c" --> (a,b,c) 2) FLATTEN * the FIRST and SECOND fields. (d,e) (a,b,c) -> d e a b c */ - pigServer - .registerQuery( - "id_details = FOREACH csvdata GENERATE " - + "FLATTEN" + "(STRSPLIT(ID,',',3)) AS " + - "(drop, code, transaction) ," - - + "FLATTEN" + "(STRSPLIT(DETAILS,',',5)) AS " + - "(lname, fname, date, price," + - "product:chararray);"); - - pigServer.store("id_details", outputPath.toString()); - + pigServer.registerQuery( + "id_details = FOREACH csvdata GENERATE " + + "FLATTEN(STRSPLIT(ID, ',', 3)) AS " + + "(drop, code, transaction) ," + + + "FLATTEN(STRSPLIT(DETAILS, ',', 7)) AS " + + "(custId, fname, lname, productId, product:chararray, price, date);"); + pigServer.registerQuery("mahout_records = FOREACH id_details GENERATE custId, productId, 1;"); + pigServer.store("id_details", getCleanedTsvPath(outputPath).toString()); + pigServer.store("mahout_records", new Path(outputPath, OUTPUTS.MahoutPaths.Mahout.name()).toString()); /** * Now we run scripts... this is where you can add some * arbitrary analytics. 
@@ -102,18 +95,13 @@ public class PigCSVCleaner { */ int i = 0; for(File script : scripts) { - Map<String,String> parameters = new HashMap<String,String>(); - parameters.put("input", - outputPath.toString()); + Map<String,String> parameters = new HashMap<>(); + parameters.put("input", getCleanedTsvPath(outputPath).toString()); Path dir = outputPath.getParent(); - Path adHocOut= - new Path( - dir, - BigPetStoreConstants.OUTPUTS.pig_ad_hoc_script.name()+(i++)); + Path adHocOut = new Path(dir, OUTPUTS.pig_ad_hoc_script.name() + (i++)); System.out.println("Setting default output to " + adHocOut); parameters.put("output", adHocOut.toString()); - pigServer.registerScript(script.getAbsolutePath(), parameters); } } @@ -123,7 +111,7 @@ public class PigCSVCleaner { for(int i = startIndex ; i < args.length ; i++) { File f = new File(args[i]); if(! f.exists()) { - throw new RuntimeException("Pig script arg " + i+ " " + f.getAbsolutePath() + " not found. "); + throw new RuntimeException("Pig script arg " + i + " " + f.getAbsolutePath() + " not found. "); } files.add(f); } @@ -133,14 +121,11 @@ public class PigCSVCleaner { "Each one will be given $input and $output arguments."); return files.toArray(new File[]{}); } + public static void main(final String[] args) throws Exception { System.out.println("Starting pig etl " + args.length); - Configuration c = new Configuration(); - int res = ToolRunner.run( - c, - - new Tool() { + int res = ToolRunner.run(c, new Tool() { Configuration conf; @Override public void setConf(Configuration conf) { http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/BPSGenerator.java ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/BPSGenerator.java b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/BPSGenerator.java index 3319064..6c8beef 100755 --- a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/BPSGenerator.java +++ b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/BPSGenerator.java @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.bigtop.bigpetstore.generator.PetStoreTransactionsInputFormat.props; /** * This is a mapreduce implementation of a generator of a large sentiment @@ -46,71 +47,62 @@ import org.slf4j.LoggerFactory; */ public class BPSGenerator { - final static Logger log = LoggerFactory.getLogger(BPSGenerator.class); - - public enum props { - // bigpetstore_splits, - bigpetstore_records - } + public static final int DEFAULT_NUM_RECORDS = 100; - public static Job createJob(Path output, int records) throws IOException { - Configuration c = new Configuration(); - c.setInt(props.bigpetstore_records.name(), 10); - return createJob(output, c); - } + final static Logger log = LoggerFactory.getLogger(BPSGenerator.class); - public static Job createJob(Path output, Configuration conf) - throws IOException { - Job job = new Job(conf, "PetStoreTransaction_ETL_" - + System.currentTimeMillis()); - // recursively delete the data set if it exists. - FileSystem.get(output.toUri(),conf).delete(output, true); - job.setJarByClass(BPSGenerator.class); - job.setMapperClass(MyMapper.class); - // use the default reducer - // job.setReducerClass(PetStoreTransactionGeneratorJob.Red.class); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(Text.class); - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(Text.class); - job.setInputFormatClass(GeneratePetStoreTransactionsInputFormat.class); - job.setOutputFormatClass(TextOutputFormat.class); - FileOutputFormat.setOutputPath(job, output); - return job; - } + public enum props { + bigpetstore_records + } - public static class MyMapper extends Mapper<Text, Text, Text, Text> { + public static Job createJob(Path output, int records) throws IOException { + Configuration c = new Configuration(); + c.setInt(props.bigpetstore_records.name(), DEFAULT_NUM_RECORDS); + return getCreateTransactionRecordsJob(output, c); + } - @Override - protected void setup(Context context) throws IOException, - InterruptedException { - super.setup(context); - } + public static Job getCreateTransactionRecordsJob(Path outputDir, Configuration conf) + throws IOException { + Job job = new Job(conf, "PetStoreTransaction_ETL_" + System.currentTimeMillis()); + // recursively delete the data set if it exists. + FileSystem.get(outputDir.toUri(), conf).delete(outputDir, true); + job.setJarByClass(BPSGenerator.class); + job.setMapperClass(MyMapper.class); + // use the default reducer + // job.setReducerClass(PetStoreTransactionGeneratorJob.Red.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(Text.class); + job.setInputFormatClass(PetStoreTransactionsInputFormat.class); + job.setOutputFormatClass(TextOutputFormat.class); + FileOutputFormat.setOutputPath(job, outputDir); + return job; + } - protected void map(Text key, Text value, Context context) - throws java.io.IOException, InterruptedException { - context.write(key, value); - // TODO: Add multiple outputs here which writes mock addresses for - // generated users - // to a corresponding data file. 
- }; + public static class MyMapper extends Mapper<Text, Text, Text, Text> { + @Override + protected void setup(Context context) throws IOException, + InterruptedException { + super.setup(context); } - public static void main(String args[]) throws Exception { - if (args.length != 2) { - System.err.println("USAGE : [number of records] [output path]"); - System.exit(0); - } else { - Configuration conf = new Configuration(); - DeveloperTools.validate( - args, - "# of records", - "output path"); + protected void map(Text key, Text value, Context context) + throws java.io.IOException, InterruptedException { + context.write(key, value); + } + } - conf.setInt( - GeneratePetStoreTransactionsInputFormat.props.bigpetstore_records.name(), - Integer.parseInt(args[0])); - createJob(new Path(args[1]), conf).waitForCompletion(true); - } + public static void main(String args[]) throws Exception { + if (args.length != 2) { + System.err.println("USAGE : [number of records] [output path]"); + System.exit(0); + } else { + Configuration conf = new Configuration(); + DeveloperTools.validate(args, "# of records", "output path"); + conf.setInt(PetStoreTransactionsInputFormat.props.bigpetstore_records.name(), + Integer.parseInt(args[0])); + getCreateTransactionRecordsJob(new Path(args[1]), conf).waitForCompletion(true); } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/CustomerGenerator.scala ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/CustomerGenerator.scala b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/CustomerGenerator.scala new file mode 100644 index 0000000..ef4ffb7 --- /dev/null +++ b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/CustomerGenerator.scala @@ -0,0 +1,80 @@ +package org.apache.bigtop.bigpetstore.generator + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileSystem +import org.apache.bigtop.bigpetstore.generator.util.State +import org.apache.hadoop.fs.Path +import parquet.org.codehaus.jackson.format.DataFormatDetector +import org.slf4j.LoggerFactory +import java.util.{Collection => JavaCollection} +import scala.collection.JavaConversions.asJavaCollection +import java.util.Random +import scala.collection.mutable.{HashMap, Set, MultiMap} +import scala.collection.immutable.NumericRange + +/** + * This class generates random customer data. The generated customer + * ids will be consecutive. The client code that generates the transactions + * records needs to know the available customer ids. If we keep the customer + * ids consecutive here. we don't have to store those ids in memory, or perform + * costly lookups. Once we introduce something that allows efficient lookup + * of data, we can do something else as well. + * + * The generated customer ids will start from 1. So, if we have 100 customers, + * the ids will be [1, 100]. 
+ */ +class CustomerGenerator(val desiredCustomerCount: Int, val outputPath: Path) { + private val logger = LoggerFactory.getLogger(getClass) + private val random = new Random; + private val assertion = "The generateCustomerRecords() hasn't been called yet"; + private var customerFileGenerated = false + private val _stateToCustomerIds = new HashMap[State, NumericRange[Long]] + + def isCustomerFileGenrated = customerFileGenerated + + def customerIds(state: State) = { + assert(customerFileGenerated, assertion) + _stateToCustomerIds(state) + } + + def generateCustomerRecords() = { + val config = new Configuration + val fs = FileSystem.getLocal(config) + + assert(!fs.exists(outputPath)) + + val outputStream = fs.create(outputPath) + + var currentId: Long = 1 + logger.info("Generating customer records at: {}", fs.pathToFile(outputPath)) + for (state <- State.values(); + stateCustomerCount = (state.probability * desiredCustomerCount) toLong; + random = new Random(state.hashCode); + i <- 1L to stateCustomerCount) { + val customerRecord = CustomerGenerator.createRecord(currentId, state, random); + logger.info("generated customer: {}", customerRecord) + outputStream.writeBytes(customerRecord) + + if(i == 1) { + val stateCustomerIdRange = currentId until (currentId + stateCustomerCount); + _stateToCustomerIds += (state -> stateCustomerIdRange) + } + currentId += 1 + } + + println(_stateToCustomerIds) + outputStream.flush + outputStream.close + customerFileGenerated = true + } +} + +object CustomerGenerator { + val OUTPUT_FILE_NAME = "customers" + + private def createRecord(id: Long, state: State, r: Random) = { + val firstName = DataForger.firstName + val lastName = DataForger.lastName + s"$id\t${DataForger.firstName(r)}\t${DataForger.lastName(r)}\t${state.name}\n" + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/GeneratePetStoreTransactionsInputFormat.java ---------------------------------------------------------------------- diff --git a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/GeneratePetStoreTransactionsInputFormat.java b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/GeneratePetStoreTransactionsInputFormat.java deleted file mode 100755 index a779428..0000000 --- a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/GeneratePetStoreTransactionsInputFormat.java +++ /dev/null @@ -1,134 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/GeneratePetStoreTransactionsInputFormat.java
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/GeneratePetStoreTransactionsInputFormat.java b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/GeneratePetStoreTransactionsInputFormat.java
deleted file mode 100755
index a779428..0000000
--- a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/GeneratePetStoreTransactionsInputFormat.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.bigtop.bigpetstore.generator;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.bigtop.bigpetstore.generator.TransactionIteratorFactory.KeyVal;
-import org.apache.bigtop.bigpetstore.generator.TransactionIteratorFactory.STATE;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-/**
- * A simple input split that fakes input.
- */
-public class GeneratePetStoreTransactionsInputFormat extends
-    FileInputFormat<Text, Text> {
-
-  @Override
-  public RecordReader<Text, Text> createRecordReader(
-      final InputSplit inputSplit, TaskAttemptContext arg1)
-      throws IOException, InterruptedException {
-    return new RecordReader<Text, Text>() {
-
-      @Override
-      public void close() throws IOException {
-
-      }
-
-      /**
-       * We need the "state" information to generate records. - Each state
-       * has a probability associated with it, so that our data set can be
-       * realistic (i.e. Colorado should have more transactions than rhode
-       * island).
-       *
-       * - Each state also will its name as part of the key.
-       *
-       * - This task would be distributed, for example, into 50 nodes on a
-       * real cluster, each creating the data for a given state.
-       */
-
-      // String storeCode = ((Split) inputSplit).storeCode;
-      int records = ((PetStoreTransactionInputSplit) inputSplit).records;
-      Iterator<KeyVal<String, String>> data = (new TransactionIteratorFactory(
-          records, ((PetStoreTransactionInputSplit) inputSplit).state))
-          .getData();
-      KeyVal<String, String> currentRecord;
-
-      @Override
-      public Text getCurrentKey() throws IOException,
-          InterruptedException {
-        return new Text(currentRecord.key);
-      }
-
-      @Override
-      public Text getCurrentValue() throws IOException,
-          InterruptedException {
-        return new Text(currentRecord.val);
-      }
-
-      @Override
-      public void initialize(InputSplit arg0, TaskAttemptContext arg1)
-          throws IOException, InterruptedException {
-      }
-
-      @Override
-      public boolean nextKeyValue() throws IOException,
-          InterruptedException {
-        if (data.hasNext()) {
-          currentRecord = data.next();
-          return true;
-        }
-        return false;
-      }
-
-      @Override
-      public float getProgress() throws IOException, InterruptedException {
-        return 0f;
-      }
-
-    };
-  }
-
-  public enum props {
-    // bigpetstore_splits,
-    bigpetstore_records
-  }
-
-  @Override
-  public List<InputSplit> getSplits(JobContext arg) throws IOException {
-    int num_records_desired = arg
-        .getConfiguration()
-        .getInt(GeneratePetStoreTransactionsInputFormat.props.bigpetstore_records
-            .name(), -1);
-    if (num_records_desired == -1) {
-      throw new RuntimeException(
-          "# of total records not set in configuration object: "
-              + arg.getConfiguration());
-    }
-
-    ArrayList<InputSplit> list = new ArrayList<InputSplit>();
-
-    /**
-     * Generator class will take a state as input and generate all the data
-     * for that state.
-     */
-    for (TransactionIteratorFactory.STATE s : STATE.values()) {
-      PetStoreTransactionInputSplit split = new PetStoreTransactionInputSplit(
-          (int) (Math.ceil(num_records_desired * s.probability)), s);
-      System.out.println(s + " _ " + split.records);
-      list.add(split);
-    }
-    return list;
-  }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionInputSplit.java
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionInputSplit.java b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionInputSplit.java
index 9b32344..d350cc8 100755
--- a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionInputSplit.java
+++ b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionInputSplit.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License. You may obtain a copy of the License at
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,7 +19,8 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.bigtop.bigpetstore.generator.TransactionIteratorFactory.STATE;
+import org.apache.bigtop.bigpetstore.generator.util.State;
+import org.apache.commons.lang3.Range;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -38,21 +39,26 @@
   }
 
   public int records;
-  public STATE state;
+  public State state;
+  public Range<Long> customerIdRange;
 
-  public PetStoreTransactionInputSplit(int records, STATE state) {
+  public PetStoreTransactionInputSplit(int records, Range<Long> customerIdRange, State state) {
     this.records = records;
     this.state = state;
+    this.customerIdRange = customerIdRange;
   }
 
-  public void readFields(DataInput arg0) throws IOException {
-    records = arg0.readInt();
-    state = STATE.valueOf(arg0.readUTF());
+  public void readFields(DataInput dataInputStream) throws IOException {
+    records = dataInputStream.readInt();
+    state = State.valueOf(dataInputStream.readUTF());
+    customerIdRange = Range.between(dataInputStream.readLong(), dataInputStream.readLong());
   }
 
-  public void write(DataOutput arg0) throws IOException {
-    arg0.writeInt(records);
-    arg0.writeUTF(state.name());
+  public void write(DataOutput dataOutputStream) throws IOException {
+    dataOutputStream.writeInt(records);
+    dataOutputStream.writeUTF(state.name());
+    dataOutputStream.writeLong(customerIdRange.getMinimum());
+    dataOutputStream.writeLong(customerIdRange.getMaximum());
   }
 
   @Override
@@ -62,6 +68,6 @@
 
   @Override
  public long getLength() throws IOException, InterruptedException {
-    return 100;
+    return records;
   }
 }
\ No newline at end of file
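To make the split's wire format concrete, here is a hedged round-trip sketch; it assumes the no-argument constructor that Hadoop requires of Writable implementations, and picks an arbitrary State value rather than naming a real constant:

    import org.apache.bigtop.bigpetstore.generator.PetStoreTransactionInputSplit
    import org.apache.bigtop.bigpetstore.generator.util.State
    import org.apache.commons.lang3.Range
    import org.apache.hadoop.io.{DataInputBuffer, DataOutputBuffer}

    val original = new PetStoreTransactionInputSplit(
        10, Range.between(java.lang.Long.valueOf(1), java.lang.Long.valueOf(10)), State.values()(0))

    // write() emits an int record count, the state name via writeUTF, then the range as two longs.
    val out = new DataOutputBuffer()
    original.write(out)

    // readFields() must consume exactly the same sequence to rebuild the split.
    val in = new DataInputBuffer()
    in.reset(out.getData, out.getLength)
    val copy = new PetStoreTransactionInputSplit()
    copy.readFields(in)
    assert(copy.records == original.records && copy.state == original.state)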
http://git-wip-us.apache.org/repos/asf/bigtop/blob/4fca4573/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionsInputFormat.java
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionsInputFormat.java b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionsInputFormat.java
new file mode 100755
index 0000000..4c22e36
--- /dev/null
+++ b/bigtop-bigpetstore/src/main/java/org/apache/bigtop/bigpetstore/generator/PetStoreTransactionsInputFormat.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bigtop.bigpetstore.generator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.bigtop.bigpetstore.generator.TransactionIteratorFactory.KeyVal;
+import org.apache.bigtop.bigpetstore.generator.util.State;
+import org.apache.commons.lang3.Range;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * A simple input format that fakes its input instead of reading it from files.
+ */
+public class PetStoreTransactionsInputFormat extends
+    FileInputFormat<Text, Text> {
+
+  @Override
+  public RecordReader<Text, Text> createRecordReader(
+      final InputSplit inputSplit, TaskAttemptContext arg1)
+      throws IOException, InterruptedException {
+    return new RecordReader<Text, Text>() {
+
+      @Override
+      public void close() throws IOException {
+
+      }
+
+      /**
+       * We need the "state" information to generate records.
+       *
+       * - Each state has a probability associated with it, so that our
+       * data set can be realistic (e.g. Colorado should have more
+       * transactions than Rhode Island).
+       *
+       * - Each state will also include its name as part of the key.
+       *
+       * - This task could be distributed, for example, onto 50 nodes on a
+       * real cluster, each creating the data for a given state.
+       */
+
+      PetStoreTransactionInputSplit bpsInputSplit = (PetStoreTransactionInputSplit) inputSplit;
+      int records = bpsInputSplit.records;
+      // TODO why not send the whole InputSplit there?
+      Iterator<KeyVal<String, String>> data =
+          (new TransactionIteratorFactory(records, bpsInputSplit.customerIdRange, bpsInputSplit.state)).data();
+      KeyVal<String, String> currentRecord;
+
+      @Override
+      public Text getCurrentKey() throws IOException,
+          InterruptedException {
+        return new Text(currentRecord.key());
+      }
+
+      @Override
+      public Text getCurrentValue() throws IOException,
+          InterruptedException {
+        return new Text(currentRecord.value());
+      }
+
+      @Override
+      public void initialize(InputSplit arg0, TaskAttemptContext arg1)
+          throws IOException, InterruptedException {
+      }
+
+      @Override
+      public boolean nextKeyValue() throws IOException,
+          InterruptedException {
+        if (data.hasNext()) {
+          currentRecord = data.next();
+          return true;
+        }
+        return false;
+      }
+
+      @Override
+      public float getProgress() throws IOException, InterruptedException {
+        return 0f;
+      }
+
+    };
+  }
+
+  public enum props {
+    bigpetstore_records
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext arg) throws IOException {
+    int numRecordsDesired = arg
+        .getConfiguration()
+        .getInt(PetStoreTransactionsInputFormat.props.bigpetstore_records
+            .name(), -1);
+    if (numRecordsDesired == -1) {
+      throw new RuntimeException(
+          "# of total records not set in configuration object: "
+              + arg.getConfiguration());
+    }
+
+    List<InputSplit> list = new ArrayList<InputSplit>();
+    long customerIdStart = 1;
+    for (State s : State.values()) {
+      int numRecords = numRecords(numRecordsDesired, s.probability);
+      // Each state is assigned a range of customer-ids from which it can choose.
+      // The number of customers can be as many as the number of transactions.
+      Range<Long> customerIdRange = Range.between(customerIdStart, customerIdStart + numRecords - 1);
+      PetStoreTransactionInputSplit split =
+          new PetStoreTransactionInputSplit(numRecords, customerIdRange, s);
+      System.out.println(s + " _ " + split.records);
+      list.add(split);
+      customerIdStart += numRecords;
+    }
+    return list;
+  }
+
+  private int numRecords(int numRecordsDesired, float probability) {
+    return (int) (Math.ceil(numRecordsDesired * probability));
+  }
+}
\ No newline at end of file
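The split math in getSplits() is easiest to see with concrete numbers. A sketch in Scala; the 1000-record total and the per-state probabilities are invented (the real weights live in the State enum), using binary-friendly fractions so the arithmetic below is exact:

    // ceil() rounds fractional shares up, so every state with a non-zero
    // probability gets at least one record and the total may overshoot slightly.
    val numRecordsDesired = 1000
    val probabilities = Seq("CO" -> 0.125f, "RI" -> 0.0625f, "CA" -> 0.25f)

    var customerIdStart = 1L
    for ((state, p) <- probabilities) {
      val numRecords = math.ceil(numRecordsDesired * p).toInt
      val customerIdEnd = customerIdStart + numRecords - 1
      println(s"$state: $numRecords records, customer ids [$customerIdStart, $customerIdEnd]")
      customerIdStart += numRecords
    }
    // CO: 125 records, customer ids [1, 125]
    // RI: 63 records, customer ids [126, 188]
    // CA: 250 records, customer ids [189, 438]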
