Repository: bigtop
Updated Branches:
  refs/heads/master 28d4dd2b2 -> 2b2392bf1


BIGTOP-1744. Add BigPigPetstore File and REST Load Generation.

Signed-off-by: RJ Nowling <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/bigtop/repo
Commit: http://git-wip-us.apache.org/repos/asf/bigtop/commit/2b2392bf
Tree: http://git-wip-us.apache.org/repos/asf/bigtop/tree/2b2392bf
Diff: http://git-wip-us.apache.org/repos/asf/bigtop/diff/2b2392bf

Branch: refs/heads/master
Commit: 2b2392bf135e9f1256bd0b930f05ae5aef8bbdcb
Parents: 28d4dd2
Author: jayunit100 <[email protected]>
Authored: Tue Mar 10 13:01:03 2015 -0400
Committer: RJ Nowling <[email protected]>
Committed: Wed Mar 11 13:27:00 2015 -0500

----------------------------------------------------------------------
 .../bigpetstore-transaction-queue/Dockerfile    |   8 +
 .../bigpetstore-transaction-queue/README.md     | 165 +++++++++++++++++
 .../bigpetstore-transaction-queue/build.gradle  |  45 +++++
 .../bigtop/bigpetstore/qstream/FileLoadGen.java |  75 ++++++++
 .../bigtop/bigpetstore/qstream/HttpLoadGen.java |  86 +++++++++
 .../bigtop/bigpetstore/qstream/LoadGen.java     | 175 +++++++++++++++++++
 .../bigpetstore/qstream/LoadGenFactory.java     | 106 +++++++++++
 .../bigpetstore/qstream/SimpleHttpServer.java   | 128 ++++++++++++++
 .../bigtop/bigpetstore/qstream/Utils.java       |  96 ++++++++++
 .../bigtop/bigpetstore/qstream/TestLoadGen.java |  88 ++++++++++
 10 files changed, 972 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bigtop/blob/2b2392bf/bigtop-bigpetstore/bigpetstore-transaction-queue/Dockerfile
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/bigpetstore-transaction-queue/Dockerfile 
b/bigtop-bigpetstore/bigpetstore-transaction-queue/Dockerfile
new file mode 100644
index 0000000..c1e06cf
--- /dev/null
+++ b/bigtop-bigpetstore/bigpetstore-transaction-queue/Dockerfile
@@ -0,0 +1,8 @@
+FROM centos
+RUN yum update -y
+RUN yum install -y java-1.7.0-openjdk
+RUN yum install -y unzip
+ADD build/distributions/bigpetstore-transaction-queue-1.0.zip /opt/
+WORKDIR /opt/
+RUN unzip -o bigpetstore-transaction-queue-1.0.zip
+CMD /opt/bigpetstore-transaction-queue-1.0/bin/bigpetstore-transaction-queue

http://git-wip-us.apache.org/repos/asf/bigtop/blob/2b2392bf/bigtop-bigpetstore/bigpetstore-transaction-queue/README.md
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/bigpetstore-transaction-queue/README.md 
b/bigtop-bigpetstore/bigpetstore-transaction-queue/README.md
new file mode 100644
index 0000000..385a1d1
--- /dev/null
+++ b/bigtop-bigpetstore/bigpetstore-transaction-queue/README.md
@@ -0,0 +1,165 @@
+# Introduction to the BigPetStore transaction queue #
+
+This is a rotating queue for Apache BigTop which uses the BigPetStore 
libraries to generate realistic streaming transactional data.
+
+Data streams are consumed in many ways:
+
+- one is files (files written every n seconds)
+- another is the web (periodic HTTP requests)
+
+This library supports both.  It runs indefinitely as a single 
Java process, rerunning the simulation each time one finishes.
+
+It is a part of the BigPetStore ecosystem of projects which Apache BigTop 
leverages for real world validation of the stack.
+
+As a proof of concept, it has been used to test Google's Kubernetes framework 
by generating large load against Redis (an in-memory caching data layer).
+
+# I'm in a huge rush, how do I use it? #
+
+Just run the Docker image (eventually these images will be stored under 
bigtop/ on Docker Hub).
+```
+docker run -t -i jayunit100/bigpetstore-load-generator
+```
+
+After you run it, you can read below to customize it.
+
+# BigPetStore Data Generator #
+
+See related project on github, jayunit100/PetStoreBook, for example usage.
+
+An infiniterator for bigpetstore !
+
+If you're not familiar with bigpetstore, check out the apache bigtop project, 
which uses it to process data with spark, and the original whitepaper on the 
data generator 
(http://ieeexplore.ieee.org/xpl/articleDetails.jsp?arnumber=7034765). This 
repository writes bigpetstore transactions to
+
+- a rotating *file queue* OR
+- a *REST API* calling queue.
+- You had better have a consumer somewhere, otherwise it will fill up your disk 
very quickly!
+- It's a little raw: I can clean it up later if people start getting interested.
+
+# Get Started by building the jar #
+Clone this repository.  Then just run
+
+```
+gradle fatJar
+```
+
+## Arguments ##
+
+The command line arguments are:
+
+- path (a URL for REST loads, or a directory on local disk for file loads)
+- stores (total number)
+- customers (total number)
+- simulation length (in days)
+- seed (random seed for generator; optional)
+
+## Generating FileSystem load ##
+
+From docker
+
+```
+docker run -t -i jayunit100/bigpetstore-load-generator  
/opt/bigpetstore-transaction-queue-1.0/bin/bigpetstore-transaction-queue /tmp/ 
1 1 3.0
+```
+or Java
+```
+ gradle clean fatJar
+ java -jar ./build/libs/bigpetstore-transaction-queue-all-1.0.jar /tmp 1 5 
10000 123
+```
+
+## Generating REST load ##
+Alternatively, replace the file path with a REST API root (each transaction is 
serialized to JSON and sent as the final portion of the URL).
+
+From docker
+
+```
+docker run -t -i jayunit100/bigpetstore-load-generator 
/opt/bigpetstore-transaction-queue-1.0/bin/bigpetstore-transaction-queue
+ http://localhost:3000/restapi/rpush/ 1 5 10000 123
+```
+Or from java
+```
+gradle clean fatJar
+java -jar ./build/libs/bigpetstore-transaction-queue-all-1.0.jar 
http://localhost:3000/rpush/guestbook/ 4 4 1000 123
+
+```
+
+For details, just see the unit tests.
+
+With a directory argument such as /tmp, this will write transactions to /tmp/transactions[0..n].txt every 10 seconds or so, 
depending on the performance of your machine.
+
+For different parameter settings you can try the following.
+
+*Increase timescale, lower customers, for higher throughput, but less realistic data*
+
+```
+/tmp 1 5 1500000 13241234
+```
+
+*Increase customers, for more diversity*
+
+```
+/tmp 1 500000 15000 13241234
+```
+
+And so on.
+
+Have fun! In my own runs, I was able to generate 25k transactions per minute.
+
+# What's this for, and how do I get help? #
+
+To understand the BigPetStore application, ask on the Apache Bigtop 
user list.  This project is based on a data generator
+which was published recently: 
http://ieeexplore.ieee.org/xpls/abs_all.jsp?arnumber=7034765.
+
+We also have a parallel Spark based generator,
+which is here 
https://github.com/apache/bigtop/tree/master/bigtop-bigpetstore/bigpetstore-spark.
+
+# OUTPUT #
+
+```
+jayunit100smacbookpro:PetStoreLoadGenerator jayunit100$ java -cp 
./build/libs/PetStoreLoadGenerator-1.0.jar:libs/* 
org.apache.bigtop.qstream.LoadGen
+Running default simulation, which should result in 10 to 25K transactions per 
second on i7 chip w/ SSD
+Reading zipcode data
+Read 30891 zipcode entries
+Reading name data
+Read 86987 first names and 47819 last names
+Reading product data
+Read 4 product categories
+Generating stores...
+Generating customers...
+...Generated 100000
+Clearing 31179 elements
+WRITE FILE to 6964688bytes -> /tmp/transactions0.txt
+TRANSACTIONS SO FAR 31182.0 RATE 577.4629629629629
+Clearing 293666 elements
+WRITE FILE to 66377212bytes -> /tmp/transactions1.txt
+TRANSACTIONS SO FAR 324852.0 RATE 4922.015151515152
+Clearing 343637 elements
+WRITE FILE to 77805125bytes -> /tmp/transactions2.txt
+TRANSACTIONS SO FAR 668492.0 RATE 8681.727272727272
+```
+
+And the output files:
+
+```
+jayunit100smacbookpro:PetStoreLoadGenerator jayunit100$ cat 
/tmp/transactions5.txt  | head
+851,07936,East Hanover,NJ,17105,Lofton,Mattheeuw,07722,Colts 
Neck,NJ,31,534.6314963516645,category=kitty litter;brand=Fiesty 
Feline;size=7.0;per_unit_cost=1.5;,category=kitty litter;brand=Fiesty 
Feline;size=7.0;per_unit_cost=1.5;
+454,60447,Minooka,IL,17105,Lofton,Mattheeuw,07722,Colts 
Neck,NJ,30,495.0237724899403,category=dry cat food;brand=Pretty 
Cat;flavor=Tuna;size=15.0;per_unit_cost=2.86;,category=kitty 
litter;brand=Fiesty Feline;size=7.0;per_unit_cost=1.5;,category=dry dog 
food;brand=Happy Pup;flavor=Pork;size=30.0;per_unit_cost=2.67;,category=kitty 
litter;brand=Fiesty Feline;size=7.0;per_unit_cost=1.5;
+703,37849,Powell,TN,17105,Lofton,Mattheeuw,07722,Colts 
Neck,NJ,29,477.41342172147426,category=dry cat food;brand=Pretty 
Cat;flavor=Tuna;size=15.0;per_unit_cost=2.86;,category=poop bags;brand=Happy 
Pup;color=Blue;size=120.0;per_unit_cost=0.17;
+576,79938,El Paso,TX,17105,Lofton,Mattheeuw,07722,Colts 
Neck,NJ,28,473.62278493049723,category=kitty litter;brand=Fiesty 
Feline;size=7.0;per_unit_cost=1.5;,category=dry cat 
food;brand=Wellfed;flavor=Chicken & Rice;size=14.0;per_unit_cost=2.14;
+570,71730,El Dorado,AR,17105,Lofton,Mattheeuw,07722,Colts 
Neck,NJ,27,452.4051824447436,category=poop bags;brand=Happy 
Pup;color=multicolor;size=60.0;per_unit_cost=0.17;,category=kitty 
litter;brand=Fiesty Feline;size=7.0;per_unit_cost=1.5;,category=dry dog 
food;brand=Dog Days;flavor=Pork;size=30.0;per_unit_cost=3.0;,category=kitty 
litter;brand=Fiesty Feline;size=7.0;per_unit_cost=1.5;
+721,37801,Maryville,TN,17105,Lofton,Mattheeuw,07722,Colts 
Neck,NJ,26,443.25719128990784,category=dry cat 
food;brand=Wellfed;flavor=Chicken & Rice;size=14.0;per_unit_cost=2.14;
+73,77083,Houston,TX,17105,Lofton,Mattheeuw,07722,Colts 
Neck,NJ,25,429.55735733784735,category=poop bags;brand=Happy 
Pup;color=multicolor;size=60.0;per_unit_cost=0.17;,category=kitty 
litter;brand=Fiesty Feline;size=7.0;per_unit_cost=1.5;
+228,08053,Marlton,NJ,17105,Lofton,Mattheeuw,07722,Colts 
Neck,NJ,24,416.1376243980079,category=dry cat food;brand=Feisty 
Feline;flavor=Tuna;size=7.0;per_unit_cost=2.14;,category=kitty 
litter;brand=Fiesty Feline;size=7.0;per_unit_cost=1.5;
+346,91711,Claremont,CA,17105,Lofton,Mattheeuw,07722,Colts 
Neck,NJ,23,413.9219953662229,category=dry dog food;brand=Dog 
Dogs;flavor=Chicken;size=30.0;per_unit_cost=3.0;330,37138,Old 
Hickory,TN,17105,Lofton,Mattheeuw,07722,Colts 
Neck,NJ,22,392.11323640456675,category=dry cat food;brand=Feisty 
Feline;flavor=Tuna;size=7.0;per_unit_cost=2.14;
+
+```
+
+# Development and Distribution #
+
+Building the distribution is relatively simple, and you can easily develop it 
using Eclipse or IntelliJ.  For example, you can run
+```
+gradle idea
+```
+and then import it.
+
+To build a distribution, run ```gradle distZip``` followed by ```docker build 
-t yourname/bps-data-generator ./```.  This will bundle a distribution zip into 
a Docker image which can be pushed.
+
+Also, the distribution zip in and of itself is an artifact which we will 
release.  More notes to come on that.  For now, since this is quite new, 
contact the ASF BigTop mailing lists if interested in maintaining or 
contributing.
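
The README above warns that the file queue fills the disk unless something consumes it. A minimal sketch of such a consumer follows, assuming the `transactions<N>.txt` naming that FileLoadGen uses. The class name `TransactionFileConsumer` and the delete-after-read policy are illustrative assumptions, not part of this commit.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical consumer for the rotating file queue; not part of this commit.
final class TransactionFileConsumer {

    /**
     * Reads every transactions*.txt file in dir, counts its lines,
     * deletes the file, and returns the total number of lines seen.
     */
    static long consumeOnce(Path dir) throws IOException {
        long lines = 0;
        try (DirectoryStream<Path> files = Files.newDirectoryStream(dir, "transactions*.txt")) {
            for (Path file : files) {
                lines += Files.readAllLines(file, StandardCharsets.UTF_8).size();
                Files.delete(file); // free the disk space once the batch is consumed
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        // Defaults to /tmp, the directory used in the README examples.
        Path dir = Paths.get(args.length > 0 ? args[0] : "/tmp");
        System.out.println("Consumed " + consumeOnce(dir) + " transaction lines from " + dir);
    }
}
```

The generator writes each file with a plain `Files.write`, so a real consumer would probably need to skip files that are still being written. This sketch does not handle that case.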

http://git-wip-us.apache.org/repos/asf/bigtop/blob/2b2392bf/bigtop-bigpetstore/bigpetstore-transaction-queue/build.gradle
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/bigpetstore-transaction-queue/build.gradle 
b/bigtop-bigpetstore/bigpetstore-transaction-queue/build.gradle
new file mode 100644
index 0000000..a475078
--- /dev/null
+++ b/bigtop-bigpetstore/bigpetstore-transaction-queue/build.gradle
@@ -0,0 +1,45 @@
+apply plugin: 'java'
+apply plugin: 'idea'
+apply plugin:'application'
+
+mainClassName = "org.apache.bigtop.bigpetstore.qstream.LoadGen"
+
+sourceCompatibility = 1.7
+version = '1.0'
+
+manifest {
+   attributes 'Main-Class': 'org.apache.bigtop.bigpetstore.qstream.LoadGen'
+}
+//create a single Jar with all dependencies
+task fatJar(type: Jar) {
+   manifest {
+     attributes 'Implementation-Title': 'Gradle Jar File Example',
+                'Implementation-Version': version,
+                'Main-Class': mainClassName
+   }
+   baseName = project.name + '-all'
+   from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) 
} }
+   with jar
+}
+
+
+repositories {
+    mavenCentral()
+    maven {
+       url "http://dl.bintray.com/rnowling/bigpetstore"
+   }
+}
+
+dependencies {
+    compile('com.fasterxml:jackson-module-json-org:0.9.1') {
+        exclude group: 'org.apache.commons', module: 'commons-io'
+    }
+    compile('org.apache.commons:commons-io:1.3.2') {
+         exclude group: 'org.apache.commons', module: 'commons-io'
+         exclude group: 'commons-io', module: 'commons-io'
+    }
+    compile 'org.apache.httpcomponents:httpclient:4.4'
+    compile group:'org.apache.commons', name:'commons-lang3',version:'3.3.2'
+    compile "com.github.rnowling.bigpetstore:bigpetstore-data-generator:0.2.1"
+    testCompile group: 'junit', name: 'junit', version: '4.11'
+}

http://git-wip-us.apache.org/repos/asf/bigtop/blob/2b2392bf/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/FileLoadGen.java
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/FileLoadGen.java
 
b/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/FileLoadGen.java
new file mode 100644
index 0000000..f03827b
--- /dev/null
+++ 
b/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/FileLoadGen.java
@@ -0,0 +1,75 @@
+package org.apache.bigtop.bigpetstore.qstream;
+
+import com.github.rnowling.bps.datagenerator.datamodels.Transaction;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Stack;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/*
+*  Licensed to the Apache Software Foundation (ASF) under one or more
+*  contributor license agreements.  See the NOTICE file distributed with
+*  this work for additional information regarding copyright ownership.
+*  The ASF licenses this file to You under the Apache License, Version 2.0
+*  (the "License"); you may not use this file except in compliance with
+*  the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+*  Unless required by applicable law or agreed to in writing, software
+*  distributed under the License is distributed on an "AS IS" BASIS,
+*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+*  See the License for the specific language governing permissions and
+*  limitations under the License.
+*/
+public class FileLoadGen extends LoadGen{
+
+    Path path;
+
+    public FileLoadGen(int nStores, int nCustomers, double simulationLength, 
long seed, Path outputDir) throws Throwable {
+        super(nStores, nCustomers, simulationLength, seed);
+        path=outputDir;
+    }
+
+    public LinkedBlockingQueue<Transaction> startWriteQueue(final int 
milliseconds){
+        if(! path.toFile().isDirectory()) {
+            throw new RuntimeException("Output path for the queue should be a 
directory! Files will be transactions0.txt, transactions1.txt, and so on.");
+        }
+
+        /**
+         * Write queue.   Drained and written to a new file every `milliseconds` ms.
+         */
+        final LinkedBlockingQueue<Transaction> transactionQueue = new 
LinkedBlockingQueue<Transaction>(getQueueSize());
+        new Thread(){
+            @Override
+            public void run() {
+                int fileNumber=0;
+                while(true){
+                    waitFor(milliseconds, transactionQueue);
+                    System.out.println("Clearing " + transactionQueue.size() + 
" elements");
+                    Stack<Transaction> transactionsToWrite = new 
Stack<Transaction>();
+                    transactionQueue.drainTo(transactionsToWrite);
+                    StringBuffer lines = new StringBuffer();
+                    try{
+                        while(!transactionsToWrite.isEmpty()){
+                            
lines.append(Utils.toJson(transactionsToWrite.pop())+"\n");
+                            total++;
+                        }
+                        Path outputFile = 
Paths.get(path.toFile().getAbsolutePath(), "/transactions" + fileNumber++ + 
".txt");
+                        Files.write(outputFile, lines.toString().getBytes());
+                        System.out.println("WRITING FILE to " + 
outputFile.toFile().length() + "bytes -> " + 
outputFile.toFile().getAbsolutePath());
+                    }
+                    catch(Throwable t){
+                        t.printStackTrace();
+                    }
+                    System.out.println(
+                            "TRANSACTIONS SO FAR " + total +" RATE " + 
(total/((System.currentTimeMillis()-startTime)/1000) + " per second "));
+                }
+            }
+        }.start();
+        return transactionQueue;
+    }
+
+}
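
The rate printed by FileLoadGen divides by whole elapsed seconds, so during the first second the divisor is zero and the printed rate is not finite. A minimal sketch of a guarded calculation follows. The class and method names are hypothetical, and this is only one possible way to compute the rate, not the commit's code.

```java
// Hypothetical helper; not part of this commit.
final class ThroughputMeter {

    /**
     * Returns items per second, computed from elapsed milliseconds.
     * The elapsed time is clamped to at least 1 ms to avoid dividing by zero.
     */
    static double ratePerSecond(long count, long startMillis, long nowMillis) {
        long elapsedMillis = Math.max(1L, nowMillis - startMillis);
        return count * 1000.0 / elapsedMillis;
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        Thread.sleep(50L); // stand-in for real work
        System.out.println("RATE " + ratePerSecond(1000L, start, System.currentTimeMillis()) + " per second");
    }
}
```

Working in milliseconds also avoids truncating the elapsed time to whole seconds, which otherwise overstates the rate early in a run.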

http://git-wip-us.apache.org/repos/asf/bigtop/blob/2b2392bf/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/HttpLoadGen.java
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/HttpLoadGen.java
 
b/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/HttpLoadGen.java
new file mode 100644
index 0000000..84e574f
--- /dev/null
+++ 
b/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/HttpLoadGen.java
@@ -0,0 +1,86 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.bigtop.bigpetstore.qstream;
+
+import com.github.rnowling.bps.datagenerator.datamodels.Transaction;
+import org.apache.http.HttpResponse;
+
+import java.net.URL;
+import java.net.URLEncoder;
+import java.util.Stack;
+import java.util.concurrent.LinkedBlockingQueue;
+
+
+/**
+ * HTTP Consumer.  provides a write queue and empties it every so often.
+ * TODO, make the queue size configurable.
+ */
+public class HttpLoadGen extends LoadGen {
+
+    String path; // i.e.  "http://localhost:3000"
+
+    public HttpLoadGen(int nStores, int nCustomers, double simulationLength, 
long seed, URL u) throws Throwable{
+        super(nStores, nCustomers, simulationLength, seed);
+        path=u.toString();
+    }
+
+    /**
+     * Appends via REST calls.
+     */
+    public LinkedBlockingQueue<Transaction> startWriteQueue(final int 
milliseconds) {
+        /**
+         * Write queue.   Drained and sent over HTTP every `milliseconds` ms.
+         */
+        final LinkedBlockingQueue<Transaction> transactionQueue = new 
LinkedBlockingQueue<Transaction>(getQueueSize());
+        new Thread() {
+            @Override
+            public void run() {
+                int fileNumber = 0;
+                while (true) {
+                    waitFor(milliseconds, transactionQueue);
+                    System.out.println("CLEARING " + transactionQueue.size() + 
" elements from queue.");
+                    Stack<Transaction> transactionsToWrite = new 
Stack<Transaction>();
+
+                    transactionQueue.drainTo(transactionsToWrite);
+
+                    /**
+                     * pop transactions from the queue, and send them over 
http as json.
+                     */
+                    while (!transactionsToWrite.isEmpty()) {
+                        try {
+                            String trAsJson = 
URLEncoder.encode(Utils.toJson(transactionsToWrite.pop()), "UTF-8");
+
+                            /**
+                             * i.e. wget 
http://localhost:3000/rpush/guestbook/{"name":"cos boudnick", "state":"...",...}
+                             */
+                            HttpResponse resp=Utils.get(path + "/" + trAsJson);
+                            if(total%20==0) System.out.println("wrote customer 
" + trAsJson);
+                            total++;
+                        }
+                        catch (Throwable t) {
+                            System.err.println("transaction failed.... !");
+                            t.printStackTrace();
+                        }
+                        System.out.println("TRANSACTIONS SO FAR " + total + 
" RATE " + total / ((System.currentTimeMillis() - startTime) / 1000));
+                    }
+                }
+            }
+        }.start();
+
+        return transactionQueue;
+    }
+}
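
HttpLoadGen sends each transaction by appending its URL-encoded JSON to the REST root. A minimal sketch of that URL construction follows, with an explicit charset and a guard against a doubled slash when the root already ends in `/` (as the README examples do). The class name `TransactionUrls` and the slash handling are assumptions added for illustration.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper; not part of this commit.
final class TransactionUrls {

    /** Joins a REST root and a JSON payload into a single request URL. */
    static String build(String root, String json) throws UnsupportedEncodingException {
        // Drop one trailing slash so that root + "/" never yields "//".
        String base = root.endsWith("/") ? root.substring(0, root.length() - 1) : root;
        return base + "/" + URLEncoder.encode(json, "UTF-8");
    }

    public static void main(String[] args) throws UnsupportedEncodingException {
        System.out.println(build("http://localhost:3000/rpush/guestbook/", "{\"name\":\"cos boudnick\"}"));
    }
}
```

`URLEncoder` applies form encoding, so spaces become `+` rather than `%20`. Whether the receiving service decodes `+` in a path segment as a space depends on that service.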

http://git-wip-us.apache.org/repos/asf/bigtop/blob/2b2392bf/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/LoadGen.java
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/LoadGen.java
 
b/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/LoadGen.java
new file mode 100644
index 0000000..3e9115f
--- /dev/null
+++ 
b/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/LoadGen.java
@@ -0,0 +1,175 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.bigtop.bigpetstore.qstream;
+
+import com.github.rnowling.bps.datagenerator.datamodels.inputs.InputData;
+import com.github.rnowling.bps.datagenerator.datamodels.inputs.ProductCategory;
+import com.github.rnowling.bps.datagenerator.datamodels.*;
+//import 
com.github.rnowling.bps.datagenerator.*{DataLoader,StoreGenerator,CustomerGenerator
 => CustGen, PurchasingProfileGenerator,TransactionGenerator}
+import com.github.rnowling.bps.datagenerator.*;
+import com.github.rnowling.bps.datagenerator.framework.SeedFactory;
+import com.google.common.collect.Lists;
+
+import java.util.*;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * This driver uses the data generator API to generate
+ * an arbitrarily large data set of petstore transactions.
+ * <p/>
+ * Each "transaction" consists of many "products", each of which
+ * is stringified into what is often called a "line item".
+ * <p/>
+ * Subclasses then drain those transactions to files or to a REST endpoint.
+ */
+public abstract class LoadGen {
+
+    /**
+     * Overridable for now.
+     * @return
+     */
+    public int getQueueSize(){
+        return 100*1000;
+    }
+
+    public abstract LinkedBlockingQueue<Transaction> startWriteQueue(int 
milliseconds);
+
+    public static boolean TESTING=false;
+
+    int nStores = 1000;
+    int nCustomers = 1000;
+    double simulationLength = -1;
+    long seed = System.currentTimeMillis();
+    String outputDir = null;
+
+    protected LoadGen(int nStores, int nCustomers, double simulationLength, 
long seed) throws Throwable {
+        this.nStores = nStores;
+        this.nCustomers = nCustomers;
+        this.simulationLength = simulationLength;
+        this.seed = seed;
+    }
+
+
+    /**
+     * Helper function.  Makes sure we sleep for a while
+     * when queue is empty and also when we startup.
+     */
+    public void waitFor(long milliseconds, LinkedBlockingQueue<Transaction> q){
+        try{
+            Thread.sleep(milliseconds);
+        }
+        catch(Throwable t){
+        }
+        /**
+         * Sleep for 2 seconds at a time until the queue holds at least 100 transactions.
+         */
+        while(q.size()<100) {
+            try{
+                Thread.sleep(2000L);
+            }
+            catch(Throwable t){
+
+            }
+        }
+    }
+
+    static final long startTime = System.currentTimeMillis();
+    static double total = 0;
+
+
+
+    public static void main(String[] args){
+        try {
+            LoadGen lg = LoadGenFactory.parseArgs(args);
+            long start=System.currentTimeMillis();
+            int runs = 0;
+            //drain the queue to the configured target every 10 seconds.
+            LinkedBlockingQueue<Transaction> q = lg.startWriteQueue(10000);
+            while(true){
+                lg.iterateData(q, System.currentTimeMillis());
+                runs++;
+                /**
+                 * if testing , dont run forever.  TODO, make runtime 
configurable.
+                 */
+                if(TESTING && runs == 2){
+                    System.out.println("DONE...");
+                    return;
+                }
+            }
+        }
+        catch(Throwable t){
+            t.printStackTrace();
+            System.exit(1);
+        }
+    }
+
+    /**
+     * Thread-friendly data iterator, writes to a blocking queue.
+     */
+    public void iterateData(LinkedBlockingQueue<Transaction> queue,long rseed) 
throws Throwable {
+        long start = System.currentTimeMillis();
+        final InputData inputData = new DataLoader().loadData();
+        final SeedFactory seedFactory = new SeedFactory(rseed);
+
+        System.out.println("Generating stores...");
+        final ArrayList<Store> stores = new ArrayList<Store>();
+        final StoreGenerator storeGenerator = new StoreGenerator(inputData, 
seedFactory);
+        for (int i = 0; i < nStores; i++) {
+            Store store = storeGenerator.generate();
+            stores.add(store);
+        }
+
+        System.out.println("Generating customers...");
+
+        final List<Customer> customers = Lists.newArrayList();
+        final CustomerGenerator custGen = new CustomerGenerator(inputData, 
stores, seedFactory);
+        for (int i = 0; i < nCustomers; i++) {
+            Customer customer = custGen.generate();
+            customers.add(customer);
+        }
+
+        System.out.println("...Generated " + customers.size());
+
+        Long nextSeed = seedFactory.getNextSeed();
+
+        Collection<ProductCategory> products = 
inputData.getProductCategories();
+        Iterator<Customer> custIter = customers.iterator();
+
+        if(! custIter.hasNext())
+            throw new RuntimeException("No customer data ");
+        //Create a new purchasing profile.
+        PurchasingProfileGenerator profileGen = new 
PurchasingProfileGenerator(products, seedFactory);
+        PurchasingProfile profile = profileGen.generate();
+
+        /** Stop either if
+        * 1) the queue is full
+        * 2) run out of customers).
+        */
+        while(queue.remainingCapacity()>0 && custIter.hasNext()){
+            Customer cust = custIter.next();
+            int transactionsForThisCustomer = 0;
+            TransactionGenerator transGen = new TransactionGenerator(cust, 
profile, stores, products, seedFactory);
+            Transaction trC = transGen.generate();
+            while(trC.getDateTime()<simulationLength) {
+                queue.put(trC);
+                trC=transGen.generate();
+            }
+        }
+
+    }
+}
\ No newline at end of file
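
LoadGen fills a bounded `LinkedBlockingQueue`, and the subclasses periodically empty it with `drainTo`. A minimal single-threaded sketch of that batch-drain step follows. The class name `BatchDrain` is hypothetical, and it drains into an `ArrayList` (which keeps insertion order) rather than the `Stack` used in the commit, whose `pop()` returns elements in reverse order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration of the drain step; not part of this commit.
final class BatchDrain {

    /** Removes everything currently queued and returns it in FIFO order. */
    static <T> List<T> drainBatch(LinkedBlockingQueue<T> queue) {
        List<T> batch = new ArrayList<T>();
        queue.drainTo(batch);
        return batch;
    }

    public static void main(String[] args) {
        // A capacity of 3 stands in for getQueueSize(); offer() fails once the queue is full.
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(3);
        for (int i = 1; i <= 4; i++) {
            System.out.println("offer(" + i + ") accepted: " + queue.offer(i));
        }
        List<Integer> batch = drainBatch(queue);
        System.out.println("Drained " + batch + ", remaining capacity " + queue.remainingCapacity());
    }
}
```

The commit's producer uses `put`, which blocks when the queue is full instead of returning false. The sketch uses `offer` only so that it can run without a second thread.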

http://git-wip-us.apache.org/repos/asf/bigtop/blob/2b2392bf/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/LoadGenFactory.java
----------------------------------------------------------------------
diff --git 
a/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/LoadGenFactory.java
 
b/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/LoadGenFactory.java
new file mode 100644
index 0000000..1dc5893
--- /dev/null
+++ 
b/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/LoadGenFactory.java
@@ -0,0 +1,106 @@
+package org.apache.bigtop.bigpetstore.qstream;
+
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.Random;
+
+/*
+*  Licensed to the Apache Software Foundation (ASF) under one or more
+*  contributor license agreements.  See the NOTICE file distributed with
+*  this work for additional information regarding copyright ownership.
+*  The ASF licenses this file to You under the Apache License, Version 2.0
+*  (the "License"); you may not use this file except in compliance with
+*  the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+*  Unless required by applicable law or agreed to in writing, software
+*  distributed under the License is distributed on an "AS IS" BASIS,
+*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+*  See the License for the specific language governing permissions and
+*  limitations under the License.
+*/
+
+/**
+ * Build a load generator based on command line args.
+ * This class decides, e.g., whether to use HttpLoadGen or FileLoadGen.
+ *
+ * We avoided using reflection for this, for all the obvious reasons :).
+ */
+public class LoadGenFactory {
+
+    static final String[] DEFAULT = new 
String[]{"/tmp","1000","100000","1500","13241234"};
+    static final String DEFAULT_EXPECTED = "10 to 25K transactions per second 
on i7 chip w/ SSD";
+
+    public static void printUsage() {
+        String usage =
+                "BigPetStore Data Generator.\n" +
+                        "Usage: outputDir nStores nCustomers simulationLength 
[seed]\n" +
+                        "outputDir - (string) directory to write files\n" +
+                        "nStores - (int) number of stores to generate\n" +
+                        "nCustomers - (int) number of customers to generate\n" 
+
+                        "simulationLength - (float) number of days to 
simulate\n" +
+                        "seed - (long) seed for RNG. If not given, one is 
randomly generated.\n";
+        System.err.println(usage);
+    }
+
+    public static LoadGen parseArgs(String[] args) throws Throwable {
+        if(args.length==0){
+            System.out.println("Running default simulation, which should 
result in " + DEFAULT_EXPECTED);
+            return parseArgs(DEFAULT);
+        }
+        int nStores = 1000;
+        int nCustomers = 1000;
+        double simulationLength = -1;
+        long seed = System.currentTimeMillis();
+        String outputARG = "/shared";
+
+        int PARAMS=5;
+        if (args.length != PARAMS && args.length != (PARAMS - 1)) {
+            printUsage();
+            System.exit(1);
+        }
+        //Was Dir, now ARG, since we can support http or file paths.
+        outputARG = args[0];
+        try {
+            nStores = Integer.parseInt(args[1]);
+        } catch (Throwable t) {
+            System.err.println("Unable to parse '" + args[1] + "' as an integer for nStores.\n");
+            printUsage();
+            System.exit(1);
+        }
+        try {
+            nCustomers = Integer.parseInt(args[2]);
+        } catch (Throwable t) {
+            System.err.println("Unable to parse '" + args[2] + "' as an integer for nCustomers.\n");
+            printUsage();
+            System.exit(1);
+        }
+        try {
+            simulationLength = Double.parseDouble(args[3]);
+        } catch (Throwable t) {
+            System.err.println("Unable to parse '" + args[3] + "' as a float for simulationLength.\n");
+            printUsage();
+            System.exit(1);
+        }
+
+        //If no seed is given, a random one is generated.
+        if (args.length == PARAMS) {
+            try {
+                seed = Long.parseLong(args[4]);
+            } catch (Throwable t) {
+                System.err.println("Unable to parse '" + args[4] + "' as a long for seed.\n");
+                printUsage();
+                System.exit(1);
+            }
+        } else {
+            seed = new Random().nextLong();
+        }
+        if(outputARG.startsWith("http://")){
+            return new HttpLoadGen(nStores,nCustomers,simulationLength,seed, new URL(outputARG));
+        }
+        else{
+            return new FileLoadGen(nStores,nCustomers,simulationLength,seed, Paths.get(outputARG));
+        }
+    }
+}
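
Reviewer note: the last branch of parseArgs picks the generator from the prefix of the first argument. The sketch below isolates that decision, assuming a hypothetical OutputTarget holder that is not part of this commit. It stores a java.net.URI rather than the java.net.URL used in the patch, only so that the example avoids a checked exception.

```java
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical stand-in for the factory's final branch; not part of the commit.
final class OutputTarget {
    final URI uri;    // non-null when the argument is an HTTP endpoint
    final Path path;  // non-null when the argument is a local directory

    private OutputTarget(URI uri, Path path) {
        this.uri = uri;
        this.path = path;
    }

    // Same rule as parseArgs: an "http://" prefix selects HTTP, anything else is a path.
    static OutputTarget parse(String outputArg) {
        if (outputArg.startsWith("http://")) {
            return new OutputTarget(URI.create(outputArg), null);
        }
        return new OutputTarget(null, Paths.get(outputArg));
    }

    boolean isHttp() {
        return uri != null;
    }
}
```

Note that under this rule an https:// argument would be treated as a file path, which matches the behaviour of the patch as written.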

http://git-wip-us.apache.org/repos/asf/bigtop/blob/2b2392bf/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/SimpleHttpServer.java
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/SimpleHttpServer.java b/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/SimpleHttpServer.java
new file mode 100644
index 0000000..71070a5
--- /dev/null
+++ b/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/SimpleHttpServer.java
@@ -0,0 +1,128 @@
+/*
+*  Licensed to the Apache Software Foundation (ASF) under one or more
+*  contributor license agreements.  See the NOTICE file distributed with
+*  this work for additional information regarding copyright ownership.
+*  The ASF licenses this file to You under the Apache License, Version 2.0
+*  (the "License"); you may not use this file except in compliance with
+*  the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+*  Unless required by applicable law or agreed to in writing, software
+*  distributed under the License is distributed on an "AS IS" BASIS,
+*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+*  See the License for the specific language governing permissions and
+*  limitations under the License.
+*/
+
+package org.apache.bigtop.bigpetstore.qstream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+
+/**
+ * A simple http server for unit testing of HTTP connections.
+ */
+public class SimpleHttpServer {
+
+    /**
+     * Run a web server on port 8123 for args[0] milliseconds.
+     * @param args args[0] is the time to run, in milliseconds
+     */
+    public static void main(String[] args){
+        try{
+            SimpleHttpServer s=new SimpleHttpServer(8123);
+            Thread.sleep(Integer.parseInt(args[0]));
+            s.stop();
+        }
+        catch(Exception e){
+            e.printStackTrace();
+        }
+    }
+
+    HttpServer server = null;
+    public SimpleHttpServer(int port) throws Exception {
+        server = HttpServer.create(new InetSocketAddress("localhost",port), 1);
+        server.createContext("/", new GetHandler());
+        server.createContext("/info", new InfoHandler());
+        //server.createContext("/get", new GetHandler());
+        server.setExecutor(null); // creates a default executor
+        server.start();
+    }
+
+    public void stop(){
+        server.stop(1);
+    }
+
+
+    // e.g. http://localhost:8123/info (the port is whatever was passed to the constructor)
+    static class InfoHandler implements HttpHandler {
+        public void handle(HttpExchange httpExchange) throws IOException {
+            String response = "Add parameters to the request and they will be returned in response.";
+            SimpleHttpServer.writeResponse(httpExchange, response.toString());
+        }
+    }
+
+    /**
+     * Response lines that are embedded in the response.
+     * This can be used by external programs which need to confirm that
+     * their parameters are being correctly encoded.
+     */
+    public static String responseLine(String key, String value){
+        return "the value of " + key + " is " + value;
+    }
+
+    static class GetHandler implements HttpHandler {
+        public void handle(HttpExchange httpExchange) throws IOException {
+            StringBuilder response = new StringBuilder();
+
+            Map <String,String> parms = SimpleHttpServer.queryToMap(httpExchange.getRequestURI().toASCIIString());
+            response.append("<html><body>");
+            for(String k:parms.keySet())
+                response.append(" " + responseLine(k,parms.get(k)));
+            response.append("</body></html>");
+            SimpleHttpServer.writeResponse(httpExchange, response.toString());
+        }
+    }
+
+    public static void writeResponse(HttpExchange httpExchange, String response) throws IOException {
+        try {
+            httpExchange.sendResponseHeaders(200, response.getBytes().length);
+            OutputStream os = httpExchange.getResponseBody();
+            os.write(response.getBytes());
+            os.close();
+        }
+        catch(Throwable t){
+            t.printStackTrace();
+            throw new RuntimeException(t);
+        }
+    }
+
+    public static Map<String, String> queryToMap(String q){
+        //after the slash.
+        String query=q.substring(2);
+        Map<String, String> result = new HashMap<String, String>();
+        if(query==null){
+            System.out.println("QUERY IS NULL") ;
+            return result;
+        }
+        System.out.println(query);
+        for (String param : query.split("&")) {
+            System.out.println("reading param on server : " + param + " " + query);
+            String pair[] = param.split("=");
+            if (pair.length>1) {
+                result.put(pair[0], pair[1]);
+            }else{
+                result.put(pair[0], "");
+            }
+        }
+        return result;
+    }
+
+}
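
Reviewer note: queryToMap above strips the first two characters of the request string, which assumes the request always looks like "/?key=value", and it returns values still percent-encoded. A minimal alternative sketch follows, assuming a hypothetical QueryParams helper that is not part of this commit. It reads the query via URI.getRawQuery() and decodes each key and value. Because it decodes, the assertion in TestLoadGen that expects the encoded JSON value would need to change if something like this were adopted.

```java
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLDecoder;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of the commit.
final class QueryParams {

    // Returns the decoded query parameters of a request URI, in request order.
    static Map<String, String> parse(URI requestUri) {
        Map<String, String> result = new LinkedHashMap<String, String>();
        String query = requestUri.getRawQuery();  // null when there is no '?'
        if (query == null || query.isEmpty()) {
            return result;
        }
        for (String param : query.split("&")) {
            // Limit of 2 keeps any '=' inside the value intact.
            String[] pair = param.split("=", 2);
            String value = pair.length > 1 ? pair[1] : "";
            result.put(decode(pair[0]), decode(value));
        }
        return result;
    }

    private static String decode(String s) {
        try {
            return URLDecoder.decode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }
}
```

Inside a handler this could be called as QueryParams.parse(httpExchange.getRequestURI()), since getRequestURI() already returns a java.net.URI.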

http://git-wip-us.apache.org/repos/asf/bigtop/blob/2b2392bf/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/Utils.java
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/Utils.java b/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/Utils.java
new file mode 100644
index 0000000..c54fa04
--- /dev/null
+++ b/bigtop-bigpetstore/bigpetstore-transaction-queue/src/main/java/org/apache/bigtop/bigpetstore/qstream/Utils.java
@@ -0,0 +1,96 @@
+package org.apache.bigtop.bigpetstore.qstream;
+
+import com.github.rnowling.bps.datagenerator.datamodels.Transaction;
+import org.apache.http.HttpResponse;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIUtils;
+import org.apache.http.client.utils.URLEncodedUtils;
+import org.apache.http.impl.client.HttpClients;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.net.URI;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+/*
+*  Licensed to the Apache Software Foundation (ASF) under one or more
+*  contributor license agreements.  See the NOTICE file distributed with
+*  this work for additional information regarding copyright ownership.
+*  The ASF licenses this file to You under the Apache License, Version 2.0
+*  (the "License"); you may not use this file except in compliance with
+*  the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+*  Unless required by applicable law or agreed to in writing, software
+*  distributed under the License is distributed on an "AS IS" BASIS,
+*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+*  See the License for the specific language governing permissions and
+*  limitations under the License.
+*/
+
+/**
+ * Consolidated utilities class for dealing with HTTP, serializing transactions,
+ * and so on.  This makes the core logic in the implementation classes
+ * easier to read.
+ * */
+public class Utils {
+
+    public static HttpResponse get(String hostname) throws Exception {
+        URI uri = new URI(hostname);
+        return get(uri);
+    }
+
+    public static HttpResponse get(String hostname, String resource, List<? extends NameValuePair> params) throws Exception {
+        System.out.println("getting http "+hostname);
+        //URI uri = URIUtils.createURI("http", "www.google.com", -1, "/search",
+        URI uri =
+                URIUtils.createURI("http", hostname, -1, resource,
+                URLEncodedUtils.format(params, "UTF-8"), null);
+        System.out.println(uri.toASCIIString());
+        HttpResponse respo = get(uri);
+        return respo;
+    }
+
+    public static HttpResponse get(URI uri) throws Exception {
+        HttpGet httpget = new HttpGet(uri);
+        HttpClient httpclient = HttpClients.createDefault();
+        //Execute and get the response.
+        try {
+            HttpResponse response = httpclient.execute(httpget);
+            if(response.getStatusLine().getStatusCode()!=200)
+                System.err.println("FAILURE! " + response.getStatusLine().getStatusCode());
+            return response;
+        }
+        catch (Throwable t) {
+            System.out.println("failed, sleeping");
+            Thread.sleep(10000);
+        }
+        System.err.println("FAILURE getting URI " + uri.toASCIIString());
+        return null;
+    }
+
+    public static String toJson(Transaction t) throws Exception{
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.writeValueAsString(t) ;
+    }
+
+    /**
+     * Borrowed from apache StringUtils.
+     */
+    public static String join(Collection var0, Object var1) {
+        StringBuffer var2 = new StringBuffer();
+
+        for(Iterator var3 = var0.iterator(); var3.hasNext(); 
var2.append(var3.next())) {
+            if(var2.length() != 0) {
+                var2.append(var1);
+            }
+        }
+
+        return var2.toString();
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/bigtop/blob/2b2392bf/bigtop-bigpetstore/bigpetstore-transaction-queue/src/test/java/org/apache/bigtop/bigpetstore/qstream/TestLoadGen.java
----------------------------------------------------------------------
diff --git a/bigtop-bigpetstore/bigpetstore-transaction-queue/src/test/java/org/apache/bigtop/bigpetstore/qstream/TestLoadGen.java b/bigtop-bigpetstore/bigpetstore-transaction-queue/src/test/java/org/apache/bigtop/bigpetstore/qstream/TestLoadGen.java
new file mode 100644
index 0000000..08f359a
--- /dev/null
+++ b/bigtop-bigpetstore/bigpetstore-transaction-queue/src/test/java/org/apache/bigtop/bigpetstore/qstream/TestLoadGen.java
@@ -0,0 +1,88 @@
+/*
+*  Licensed to the Apache Software Foundation (ASF) under one or more
+*  contributor license agreements.  See the NOTICE file distributed with
+*  this work for additional information regarding copyright ownership.
+*  The ASF licenses this file to You under the Apache License, Version 2.0
+*  (the "License"); you may not use this file except in compliance with
+*  the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+*  Unless required by applicable law or agreed to in writing, software
+*  distributed under the License is distributed on an "AS IS" BASIS,
+*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+*  See the License for the specific language governing permissions and
+*  limitations under the License.
+*/
+
+package org.apache.bigtop.bigpetstore.qstream;
+
+import junit.framework.Assert;
+import org.apache.http.HttpResponse;
+import org.apache.http.NameValuePair;
+import org.apache.http.message.BasicNameValuePair;
+import org.junit.Ignore;
+
+import java.io.File;
+import java.util.List;
+
+/**
+ * For simplicity, the tests for this application are in this class.
+ */
+public class TestLoadGen {
+
+    @org.junit.Test
+    public void testFileLoadGen(){
+        new File("/tmp/transactions0.txt").delete();
+        LoadGen.TESTING=true;
+        LoadGen.main(new String[]{"/tmp","1","5","1500000","13241234"});
+        Assert.assertTrue(new File("/tmp/transactions0.txt").length()>0);
+    }
+
+    /**
+     * We don't run this test, except if we want to
+     * test a live app which has an existing REST
+     * API server running.
+     */
+    @Ignore
+    @org.junit.Test
+    public void testWebLoadGen(){
+        LoadGen.TESTING=true;
+        LoadGen.main(new String[]{"http://localhost:3000/rpush/guestbook", "1", "5", "1500000", "13241234"});
+    }
+
+    /**
+     * This is a generic test that our server wrappers etc are okay.
+     * The code paths it exercises aren't necessarily used by the
+     * primary purpose of the app as of March 1, 2015.
+     * @throws Exception
+     */
+    @org.junit.Test
+    public void testUtilsParameters() throws Exception {
+
+        final String bind="localhost";
+        final int port=8129;
+        final String ctx="/";
+        final String rsp = "This is the response...";
+
+        SimpleHttpServer server = new SimpleHttpServer(port);
+
+        List<NameValuePair> params = new java.util.ArrayList<NameValuePair>();
+        params.add(new BasicNameValuePair("json", "{\"a\":\"json\"}"));
+        params.add(new BasicNameValuePair("abc", "123"));
+
+        HttpResponse r = Utils.get("localhost:8129","", params);
+        System.out.println(r );
+        String contents = org.apache.commons.io.IOUtils.toString(r.getEntity().getContent());
+        System.out.println(contents);
+        /**
+         * Use responseLine() to test that the lines in the http response
+         * which originate from the above parameters are returned exactly.
+         */
+
+        Assert.assertTrue(contents.contains(SimpleHttpServer.responseLine("abc","123")));
+        Assert.assertTrue(contents.contains(SimpleHttpServer.responseLine("json", "%7B%22a%22%3A%22json%22%7D")));
+        server.stop();
+
+    }
+}
