Fix all checkstyle violations and delete maxAllowedViolations from the pom
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/5528a4c6 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/5528a4c6 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/5528a4c6 Branch: refs/heads/master Commit: 5528a4c639a87dbfaba4a2bae68ac99971c66224 Parents: 4cbbb75 Author: Apex Dev <[email protected]> Authored: Wed Jan 18 14:43:33 2017 -0800 Committer: Oliver W <[email protected]> Committed: Fri Jan 20 11:12:20 2017 -0800 ---------------------------------------------------------------------- benchmark/pom.xml | 8 ---- .../datatorrent/benchmark/ApplicationFixed.java | 15 +++--- .../com/datatorrent/benchmark/Benchmark.java | 17 +++---- .../benchmark/CouchBaseAppInput.java | 6 +-- .../benchmark/CouchBaseAppOutput.java | 8 ++-- .../benchmark/CouchBaseInputOperator.java | 17 ++++--- .../benchmark/FixedTuplesInputOperator.java | 11 +++-- .../datatorrent/benchmark/RandomMapOutput.java | 40 +++++++++------- .../benchmark/RandomWordInputModule.java | 11 +++-- .../benchmark/WordCountOperator.java | 10 ++-- .../AerospikeOutputBenchmarkApplication.java | 17 ++++--- .../aerospike/AerospikeOutputOperator.java | 13 ++++-- .../UniqueValueCountBenchmarkApplication.java | 21 +++++---- .../CassandraOutputBenchmarkApplication.java | 17 ++++--- .../cassandra/CassandraOutputOperator.java | 14 +++--- .../benchmark/fs/FSByteOutputOperator.java | 9 ++-- .../benchmark/fs/FSOutputOperatorBenchmark.java | 26 +++++++---- .../hive/HiveInsertBenchmarkingApp.java | 28 ++++++----- .../hive/HiveMapInsertBenchmarkingApp.java | 31 +++++++------ .../kafka/BenchmarkKafkaInputOperator.java | 6 +-- ...nchmarkPartitionableKafkaOutputOperator.java | 44 +++++++++--------- .../benchmark/kafka/KafkaInputBenchmark.java | 23 ++++----- .../benchmark/kafka/KafkaOutputBenchmark.java | 5 +- .../benchmark/kafka/KafkaTestPartitioner.java | 7 +-- .../RubyOperatorBenchmarkApplication.java | 11 +++-- 
.../spillable/SpillableTestOperator.java | 3 +- .../state/ManagedStateBenchmarkApp.java | 3 +- .../benchmark/state/StoreOperator.java | 9 ++-- .../stream/DevNullCounterBenchmark.java | 13 +++--- .../benchmark/stream/IntegerOperator.java | 17 ++++--- .../benchmark/stream/StreamDuplicaterApp.java | 21 +++++---- .../benchmark/stream/StreamMergeApp.java | 5 +- .../benchmark/testbench/EventClassifierApp.java | 11 +++-- .../EventClassifierNumberToHashDoubleApp.java | 18 ++++--- .../benchmark/testbench/EventGeneratorApp.java | 9 ++-- .../testbench/EventIncrementerApp.java | 22 +++++---- .../testbench/FilterClassifierApp.java | 15 +++--- .../testbench/FilteredEventClassifierApp.java | 8 ++-- .../benchmark/testbench/HashMapOperator.java | 37 +++++++++------ .../testbench/RandomEventGeneratorApp.java | 8 ++-- .../testbench/SeedEventGeneratorApp.java | 14 ++++-- .../testbench/ThroughputCounterApp.java | 13 ++++-- .../AbstractWindowedOperatorBenchmarkApp.java | 10 ++-- .../KeyedWindowedOperatorBenchmarkApp.java | 9 ++-- .../window/WindowedOperatorBenchmarkApp.java | 6 ++- .../benchmark/ApplicationFixedTest.java | 17 ++++--- .../datatorrent/benchmark/BenchmarkTest.java | 3 +- .../benchmark/CouchBaseBenchmarkTest.java | 13 +++--- .../benchmark/accumulo/AccumuloApp.java | 17 ++++--- .../benchmark/accumulo/AccumuloAppTest.java | 8 ++-- .../aerospike/AerospikeBenchmarkAppTest.java | 7 +-- .../algo/UniqueValueCountBenchmarkTest.java | 6 ++- .../cassandra/CassandraApplicatonTest.java | 8 ++-- .../benchmark/hbase/HBaseApplicationTest.java | 9 ++-- .../hbase/HBaseCsvMappingApplication.java | 7 +-- .../benchmark/hive/HiveInsertBenchmarkTest.java | 37 +++++++++------ .../benchmark/hive/HiveMapBenchmarkTest.java | 38 ++++++++------- .../kafka/KafkaInputBenchmarkTest.java | 3 +- .../kafka/KafkaOutputBenchmarkTest.java | 4 +- .../benchmark/memsql/MemsqlInputBenchmark.java | 21 +++++---- .../memsql/MemsqlInputBenchmarkTest.java | 49 ++++++++++++-------- 
.../benchmark/memsql/MemsqlOutputBenchmark.java | 26 ++++++----- .../memsql/MemsqlOutputBenchmarkTest.java | 21 +++++---- .../script/RubyOperatorBenchmarkAppTest.java | 7 ++- .../spillable/SpillableDSBenchmarkTest.java | 6 +-- .../state/ManagedStateBenchmarkAppTest.java | 3 -- .../testbench/EventClassifierAppTest.java | 12 +++-- ...ventClassifierNumberToHashDoubleAppTest.java | 12 +++-- .../testbench/EventGeneratorAppTest.java | 12 +++-- .../testbench/EventIncrementerAppTest.java | 12 +++-- .../testbench/FilterClassifierAppTest.java | 10 ++-- .../FilteredEventClassifierAppTest.java | 12 +++-- .../testbench/ThroughputCounterAppTest.java | 12 +++-- .../util/serde/GenericSerdePerformanceTest.java | 7 +-- 74 files changed, 608 insertions(+), 457 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/pom.xml ---------------------------------------------------------------------- diff --git a/benchmark/pom.xml b/benchmark/pom.xml index b2e0981..4bbd5ac 100644 --- a/benchmark/pom.xml +++ b/benchmark/pom.xml @@ -143,14 +143,6 @@ <skip>true</skip> </configuration> </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-checkstyle-plugin</artifactId> - <configuration> - <maxAllowedViolations>281</maxAllowedViolations> - <logViolationsToConsole>${checkstyle.console}</logViolationsToConsole> - </configuration> - </plugin> </plugins> </build> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/ApplicationFixed.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/ApplicationFixed.java b/benchmark/src/main/java/com/datatorrent/benchmark/ApplicationFixed.java index 53f01fc..aa10eea 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/ApplicationFixed.java +++ 
b/benchmark/src/main/java/com/datatorrent/benchmark/ApplicationFixed.java @@ -18,13 +18,15 @@ */ package com.datatorrent.benchmark; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.DAG; +import org.apache.hadoop.conf.Configuration; + import com.datatorrent.api.Context.PortContext; + +import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; -import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.api.StreamingApplication; -import org.apache.hadoop.conf.Configuration; +import com.datatorrent.api.annotation.ApplicationAnnotation; /** * Example of application configuration in Java. @@ -34,7 +36,7 @@ import org.apache.hadoop.conf.Configuration; * * @since 0.3.2 */ -@ApplicationAnnotation(name="PerformanceBenchmarkForFixedNumberOfTuples") +@ApplicationAnnotation(name = "PerformanceBenchmarkForFixedNumberOfTuples") public class ApplicationFixed implements StreamingApplication { private final Locality locality = null; @@ -44,7 +46,8 @@ public class ApplicationFixed implements StreamingApplication public void populateDAG(DAG dag, Configuration conf) { FixedTuplesInputOperator wordGenerator = dag.addOperator("WordGenerator", FixedTuplesInputOperator.class); - dag.getMeta(wordGenerator).getMeta(wordGenerator.output).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY); + dag.getMeta(wordGenerator).getMeta(wordGenerator.output).getAttributes() + .put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY); wordGenerator.setCount(500000); WordCountOperator<byte[]> counter = dag.addOperator("Counter", new WordCountOperator<byte[]>()); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/Benchmark.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/Benchmark.java b/benchmark/src/main/java/com/datatorrent/benchmark/Benchmark.java index 5649914..d8d51b8 100644 --- 
a/benchmark/src/main/java/com/datatorrent/benchmark/Benchmark.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/Benchmark.java @@ -52,10 +52,10 @@ import com.datatorrent.api.annotation.ApplicationAnnotation; * @since 0.9.0 */ -@ApplicationAnnotation(name="PerformanceBenchmarkingApp") +@ApplicationAnnotation(name = "PerformanceBenchmarkingApp") public abstract class Benchmark { - static abstract class AbstractApplication implements StreamingApplication + abstract static class AbstractApplication implements StreamingApplication { public static final int QUEUE_CAPACITY = 32 * 1024; @@ -63,7 +63,8 @@ public abstract class Benchmark public void populateDAG(DAG dag, Configuration conf) { RandomWordInputModule wordGenerator = dag.addOperator("wordGenerator", RandomWordInputModule.class); - dag.getMeta(wordGenerator).getMeta(wordGenerator.output).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY); + dag.getMeta(wordGenerator).getMeta(wordGenerator.output).getAttributes() + .put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY); WordCountOperator<byte[]> counter = dag.addOperator("counter", new WordCountOperator<byte[]>()); dag.getMeta(counter).getMeta(counter.input).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY); @@ -77,7 +78,7 @@ public abstract class Benchmark /** * Let the engine decide how to best place the 2 operators. */ - @ApplicationAnnotation(name="PerformanceBenchmarkNoLocality") + @ApplicationAnnotation(name = "PerformanceBenchmarkNoLocality") public static class NoLocality extends AbstractApplication { @Override @@ -92,7 +93,7 @@ public abstract class Benchmark * Place the 2 operators so that they are in the same Rack. * The operators are requested to be deployed on different machines. 
*/ - @ApplicationAnnotation(name="PerformanceBenchmarkRackLocal") + @ApplicationAnnotation(name = "PerformanceBenchmarkRackLocal") public static class RackLocal extends AbstractApplication { @Override @@ -107,7 +108,7 @@ public abstract class Benchmark * Place the 2 operators so that they are in the same node. * The operators are requested to be deployed as different processes within the same machine. */ - @ApplicationAnnotation(name="PerformanceBenchmarkNodeLocal") + @ApplicationAnnotation(name = "PerformanceBenchmarkNodeLocal") public static class NodeLocal extends AbstractApplication { @Override @@ -122,7 +123,7 @@ public abstract class Benchmark * Place the 2 operators so that they are in the same container. * The operators are deployed as different threads in the same process. */ - @ApplicationAnnotation(name="PerformanceBenchmarkContainerLocal") + @ApplicationAnnotation(name = "PerformanceBenchmarkContainerLocal") public static class ContainerLocal extends AbstractApplication { @Override @@ -136,7 +137,7 @@ public abstract class Benchmark /** * Place the 2 operators so that they are in the same thread. 
*/ - @ApplicationAnnotation(name="PerformanceBenchmarkThreadLocal") + @ApplicationAnnotation(name = "PerformanceBenchmarkThreadLocal") public static class ThreadLocal extends AbstractApplication { @Override http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppInput.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppInput.java b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppInput.java index 6096530..bf5b876 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppInput.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppInput.java @@ -18,14 +18,14 @@ */ package com.datatorrent.benchmark; -import com.datatorrent.api.DAG; -import com.datatorrent.api.StreamingApplication; import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; /** - * * Application to benchmark the performance of couchbase input operator. * The number of tuples processed per second were around 9000. 
* http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java index f789d08..4f12791 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java @@ -18,12 +18,14 @@ */ package com.datatorrent.benchmark; -import com.datatorrent.api.DAG; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.lib.testbench.RandomEventGenerator; import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; + +import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.testbench.RandomEventGenerator; /** * http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseInputOperator.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseInputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseInputOperator.java index 923b588..8ae0a94 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseInputOperator.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseInputOperator.java @@ -18,12 +18,14 @@ */ package com.datatorrent.benchmark; -import com.datatorrent.contrib.couchbase.AbstractCouchBaseInputOperator; -import com.datatorrent.contrib.couchbase.CouchBaseWindowStore; import java.util.ArrayList; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import 
com.datatorrent.contrib.couchbase.AbstractCouchBaseInputOperator; +import com.datatorrent.contrib.couchbase.CouchBaseWindowStore; + /** * <p>CouchBaseInputOperator class.</p> * @@ -32,14 +34,15 @@ import org.slf4j.LoggerFactory; public class CouchBaseInputOperator extends AbstractCouchBaseInputOperator<String> { private static final Logger logger = LoggerFactory.getLogger(CouchBaseWindowStore.class); + @Override public String getTuple(Object object) { - if(object!=null) - return object.toString(); - else{ - logger.info("Object returned is null"); - return "null"; + if (object != null) { + return object.toString(); + } else { + logger.info("Object returned is null"); + return "null"; } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/FixedTuplesInputOperator.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/FixedTuplesInputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/FixedTuplesInputOperator.java index ad7f8c1..f2582bd 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/FixedTuplesInputOperator.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/FixedTuplesInputOperator.java @@ -18,14 +18,15 @@ */ package com.datatorrent.benchmark; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; -import com.datatorrent.api.Context.OperatorContext; +import java.util.ArrayList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; +import com.datatorrent.api.Context.OperatorContext; + +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; /** * <p>FixedTuplesInputOperator class.</p> @@ -44,7 +45,7 @@ public class FixedTuplesInputOperator implements InputOperator { if (firstTime) { long start = System.currentTimeMillis(); - for (int i = count; i-- > 0;) { + for (int i = count; i-- > 
0; ) { output.emit(new byte[64]); } firstTime = false; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/RandomMapOutput.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/RandomMapOutput.java b/benchmark/src/main/java/com/datatorrent/benchmark/RandomMapOutput.java index 106bd79..3342771 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/RandomMapOutput.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/RandomMapOutput.java @@ -19,20 +19,23 @@ package com.datatorrent.benchmark; import java.util.HashMap; -import com.datatorrent.common.util.BaseOperator; + import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; /** * Operator that outputs random values in a map. * * @since 1.0.4 */ -public class RandomMapOutput extends BaseOperator { +public class RandomMapOutput extends BaseOperator +{ - public final transient DefaultOutputPort<HashMap<String, Object>> map_data = new DefaultOutputPort<HashMap<String, Object>>(); + public final transient DefaultOutputPort<HashMap<String, Object>> map_data = + new DefaultOutputPort<HashMap<String, Object>>(); public final transient DefaultInputPort<Integer> input = new DefaultInputPort<Integer>() - { + { @Override public void process(Integer tuple) { @@ -40,22 +43,25 @@ public class RandomMapOutput extends BaseOperator { map.put(key, tuple); RandomMapOutput.this.process(map); } - }; + }; - private String key; + private String key; - public String getKey() { - return key; - } + public String getKey() + { + return key; + } - public void setKey(String key) { - this.key = key; - } + public void setKey(String key) + { + this.key = key; + } - public void process(HashMap<String, Object> tuple) { + public void process(HashMap<String, Object> tuple) + { - if (map_data.isConnected()) { - 
map_data.emit(tuple); - } - } + if (map_data.isConnected()) { + map_data.emit(tuple); + } + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/RandomWordInputModule.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/RandomWordInputModule.java b/benchmark/src/main/java/com/datatorrent/benchmark/RandomWordInputModule.java index 11c7568..7d02de2 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/RandomWordInputModule.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/RandomWordInputModule.java @@ -18,10 +18,12 @@ */ package com.datatorrent.benchmark; +import javax.validation.constraints.Min; + +import com.datatorrent.api.Context.OperatorContext; + import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.InputOperator; -import com.datatorrent.api.Context.OperatorContext; -import javax.validation.constraints.Min; /** * <p> @@ -87,7 +89,6 @@ public class RandomWordInputModule implements InputOperator return emitSameTuple; } - /** * Emits byte array of specified size. 
* Emits either the same byte array or creates new byte array every time @@ -103,11 +104,11 @@ public class RandomWordInputModule implements InputOperator final boolean EMIT_SAME_TUPLE_COPY = emitSameTuple; if (firstTime) { if (EMIT_SAME_TUPLE_COPY) { - for (int i = count--; i-- > 0;) { + for (int i = count--; i-- > 0; ) { output.emit(sameTupleArray); } } else { - for (int i = count--; i-- > 0;) { + for (int i = count--; i-- > 0; ) { output.emit(new byte[TUPLE_SIZE_COPY]); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java index 098ed42..6e91482 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java @@ -21,14 +21,17 @@ package com.datatorrent.benchmark; /* * To change this template, choose Tools | Templates and open the template in the editor. 
*/ -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.Operator; -import com.datatorrent.api.Context.OperatorContext; import java.util.ArrayList; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.datatorrent.api.Context.OperatorContext; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.Operator; + /** * <p>WordCountOperator class.</p> * @@ -84,5 +87,6 @@ public class WordCountOperator<T> implements Operator counts = new ArrayList<Integer>(); millis = new ArrayList<Integer>(); } + private static final Logger logger = LoggerFactory.getLogger(WordCountOperator.class); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputBenchmarkApplication.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputBenchmarkApplication.java b/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputBenchmarkApplication.java index a70aae6..0a880fd 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputBenchmarkApplication.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputBenchmarkApplication.java @@ -18,15 +18,16 @@ */ package com.datatorrent.benchmark.aerospike; +import org.apache.hadoop.conf.Configuration; + import com.datatorrent.api.DAG; -import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.contrib.aerospike.AerospikeTransactionalStore; import com.datatorrent.lib.testbench.RandomEventGenerator; -import org.apache.hadoop.conf.Configuration; + /** - * * Application to benchmark the performance of aerospike output operator. 
* The operator was tested on DT cluster and the number of tuples processed * by the operator per second were around 12,000 @@ -34,16 +35,18 @@ import org.apache.hadoop.conf.Configuration; * @since 1.0.4 */ - -@ApplicationAnnotation(name="AerospikeOutputOperatorBenchmark") -public class AerospikeOutputBenchmarkApplication implements StreamingApplication { +@ApplicationAnnotation(name = "AerospikeOutputOperatorBenchmark") +public class AerospikeOutputBenchmarkApplication implements StreamingApplication +{ private final String NODE = "127.0.0.1"; private final int PORT = 3000; private final String NAMESPACE = "test"; private final Locality locality = null; + @Override - public void populateDAG(DAG dag, Configuration conf) { + public void populateDAG(DAG dag, Configuration conf) + { RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator()); rand.setMaxvalue(3000); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputOperator.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputOperator.java index 210e086..f9ee689 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputOperator.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/aerospike/AerospikeOutputOperator.java @@ -23,25 +23,28 @@ import java.util.List; import com.aerospike.client.AerospikeException; import com.aerospike.client.Bin; import com.aerospike.client.Key; -import com.datatorrent.contrib.aerospike.AbstractAerospikeTransactionalPutOperator; +import com.datatorrent.contrib.aerospike.AbstractAerospikeTransactionalPutOperator; /** * <p>AerospikeOutputOperator class.</p> * * @since 1.0.4 */ -public class AerospikeOutputOperator extends 
AbstractAerospikeTransactionalPutOperator<Integer>{ +public class AerospikeOutputOperator extends AbstractAerospikeTransactionalPutOperator<Integer> +{ private final String KEYSPACE = "test"; private final String SET_NAME = "Aerospike_Output"; private int id = 0; + @Override protected Key getUpdatedBins(Integer tuple, List<Bin> bins) - throws AerospikeException { + throws AerospikeException + { - Key key = new Key(KEYSPACE,SET_NAME,id++); - bins.add(new Bin("ID",tuple)); + Key key = new Key(KEYSPACE, SET_NAME, id++); + bins.add(new Bin("ID", tuple)); return key; } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java b/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java index f522396..f74311e 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkApplication.java @@ -18,23 +18,22 @@ */ package com.datatorrent.benchmark.algo; - import org.apache.hadoop.conf.Configuration; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.common.partitioner.StatelessPartitioner; + import com.datatorrent.lib.algo.UniqueCounter; import com.datatorrent.lib.converter.MapToKeyHashValuePairConverter; import com.datatorrent.lib.io.ConsoleOutputOperator; -import com.datatorrent.common.partitioner.StatelessPartitioner; import com.datatorrent.lib.stream.Counter; import com.datatorrent.lib.testbench.RandomEventGenerator; -import 
com.datatorrent.api.Context; -import com.datatorrent.api.DAG; -import com.datatorrent.api.DAG.Locality; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; - /** * Application to demonstrate PartitionableUniqueCount operator. <br> * The input operator generate random keys, which is sent to @@ -63,9 +62,11 @@ public class UniqueValueCountBenchmarkApplication implements StreamingApplicatio /* Initialize with three partition to start with */ UniqueCounter<Integer> uniqCount = dag.addOperator("uniqevalue", new UniqueCounter<Integer>()); - MapToKeyHashValuePairConverter<Integer, Integer> converter = dag.addOperator("converter", new MapToKeyHashValuePairConverter()); + MapToKeyHashValuePairConverter<Integer, Integer> converter = + dag.addOperator("converter", new MapToKeyHashValuePairConverter()); - dag.setAttribute(uniqCount, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<UniqueCounter<Integer>>(3)); + dag.setAttribute(uniqCount, Context.OperatorContext.PARTITIONER, + new StatelessPartitioner<UniqueCounter<Integer>>(3)); dag.setInputPortAttribute(uniqCount.data, Context.PortContext.PARTITION_PARALLEL, true); uniqCount.setCumulative(false); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputBenchmarkApplication.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputBenchmarkApplication.java b/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputBenchmarkApplication.java index dead2cd..46d503f 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputBenchmarkApplication.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputBenchmarkApplication.java @@ -19,24 +19,27 @@ package com.datatorrent.benchmark.cassandra; import 
org.apache.hadoop.conf.Configuration; -import com.datatorrent.lib.testbench.RandomEventGenerator; + import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.StreamingApplication; + import com.datatorrent.api.annotation.ApplicationAnnotation; + import com.datatorrent.contrib.cassandra.CassandraTransactionalStore; +import com.datatorrent.lib.testbench.RandomEventGenerator; + /** - * - *Application to benchmark the performance of cassandra output operator. - *The operator was tested on following configuration: - *Virtual Box with 10GB ram, 4 processor cores on an i7 machine with 16GB ram - *The number of tuples processed per second were around 20,000 + * Application to benchmark the performance of cassandra output operator. + * The operator was tested on following configuration: + * Virtual Box with 10GB ram, 4 processor cores on an i7 machine with 16GB ram + * The number of tuples processed per second were around 20,000 * * @since 1.0.3 */ -@ApplicationAnnotation(name="CassandraOperatorDemo") +@ApplicationAnnotation(name = "CassandraOperatorDemo") public class CassandraOutputBenchmarkApplication implements StreamingApplication { private final Locality locality = null; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java index 666746b..592d8a2 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/cassandra/CassandraOutputOperator.java @@ -18,34 +18,36 @@ */ package com.datatorrent.benchmark.cassandra; - import com.datastax.driver.core.BoundStatement; import 
com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Statement; import com.datastax.driver.core.exceptions.DriverException; -import com.datatorrent.contrib.cassandra.AbstractCassandraTransactionableOutputOperator; +import com.datatorrent.contrib.cassandra.AbstractCassandraTransactionableOutputOperator; /** * <p>CassandraOutputOperator class.</p> * * @since 1.0.3 */ -public class CassandraOutputOperator extends AbstractCassandraTransactionableOutputOperator<Integer>{ +public class CassandraOutputOperator extends AbstractCassandraTransactionableOutputOperator<Integer> +{ private int id = 0; @Override - protected PreparedStatement getUpdateCommand() { + protected PreparedStatement getUpdateCommand() + { String statement = "Insert into test.cassandra_operator(id, result) values (?,?);"; return store.getSession().prepare(statement); } @Override protected Statement setStatementParameters(PreparedStatement updateCommand, - Integer tuple) throws DriverException { + Integer tuple) throws DriverException + { BoundStatement boundStmnt = new BoundStatement(updateCommand); - return boundStmnt.bind(id++,tuple); + return boundStmnt.bind(id++, tuple); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSByteOutputOperator.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSByteOutputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSByteOutputOperator.java index 894cb75..ce0821c 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSByteOutputOperator.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSByteOutputOperator.java @@ -18,10 +18,12 @@ */ package com.datatorrent.benchmark.fs; -import com.datatorrent.lib.io.fs.AbstractFileOutputOperator; import java.util.Arrays; + import javax.validation.constraints.Min; +import 
com.datatorrent.lib.io.fs.AbstractFileOutputOperator; + /** * This output operator receives * @@ -38,19 +40,20 @@ public class FSByteOutputOperator extends AbstractFileOutputOperator<byte[]> /** * The file a tuple is written out to is determined by modding the hashcode of the * tuple by the outputFileCount. + * * @param tuple The input tuple to write out. * @return The name of the file to write the tuple to. */ @Override protected String getFileName(byte[] tuple) { - return ((Integer) (Arrays.hashCode(tuple) % outputFileCount)).toString(); + return ((Integer)(Arrays.hashCode(tuple) % outputFileCount)).toString(); } @Override protected byte[] getBytesForTuple(byte[] tuple) { - for(int counter = 0; + for (int counter = 0; counter < tuple.length; counter++) { tuple[counter] += 1; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSOutputOperatorBenchmark.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSOutputOperatorBenchmark.java b/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSOutputOperatorBenchmark.java index 8702ab8..7a63d18 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSOutputOperatorBenchmark.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/fs/FSOutputOperatorBenchmark.java @@ -18,17 +18,20 @@ */ package com.datatorrent.benchmark.fs; -import com.datatorrent.lib.testbench.RandomWordGenerator; +import org.apache.commons.lang.mutable.MutableLong; +import org.apache.hadoop.conf.Configuration; + import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.Context.PortContext; -import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.DAG; + +import com.datatorrent.api.StreamingApplication; + import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.lib.counters.BasicCounters; -import 
org.apache.commons.lang.mutable.MutableLong; +import com.datatorrent.lib.counters.BasicCounters; -import org.apache.hadoop.conf.Configuration; +import com.datatorrent.lib.testbench.RandomWordGenerator; /** * Application used to benchmark HDFS output operator @@ -38,25 +41,28 @@ import org.apache.hadoop.conf.Configuration; * @since 0.9.4 */ -@ApplicationAnnotation(name="HDFSOutputOperatorBenchmarkingApp") +@ApplicationAnnotation(name = "HDFSOutputOperatorBenchmarkingApp") public class FSOutputOperatorBenchmark implements StreamingApplication { @Override public void populateDAG(DAG dag, Configuration conf) { String filePath = "HDFSOutputOperatorBenchmarkingApp/" - + System.currentTimeMillis(); + + System.currentTimeMillis(); dag.setAttribute(DAG.STREAMING_WINDOW_SIZE_MILLIS, 1000); RandomWordGenerator wordGenerator = dag.addOperator("wordGenerator", RandomWordGenerator.class); - dag.getOperatorMeta("wordGenerator").getMeta(wordGenerator.output).getAttributes().put(PortContext.QUEUE_CAPACITY, 10000); - dag.getOperatorMeta("wordGenerator").getAttributes().put(OperatorContext.APPLICATION_WINDOW_COUNT, 1); + dag.getOperatorMeta("wordGenerator").getMeta(wordGenerator.output) + .getAttributes().put(PortContext.QUEUE_CAPACITY, 10000); + dag.getOperatorMeta("wordGenerator").getAttributes() + .put(OperatorContext.APPLICATION_WINDOW_COUNT, 1); FSByteOutputOperator hdfsOutputOperator = dag.addOperator("hdfsOutputOperator", new FSByteOutputOperator()); hdfsOutputOperator.setFilePath(filePath); - dag.getOperatorMeta("hdfsOutputOperator").getAttributes().put(OperatorContext.COUNTERS_AGGREGATOR, new BasicCounters.LongAggregator<MutableLong>()); + dag.getOperatorMeta("hdfsOutputOperator").getAttributes() + .put(OperatorContext.COUNTERS_AGGREGATOR, new BasicCounters.LongAggregator<MutableLong>()); dag.addStream("Generator2HDFSOutput", wordGenerator.output, hdfsOutputOperator.input); } 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkingApp.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkingApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkingApp.java index 60be57d..95fa961 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkingApp.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkingApp.java @@ -30,16 +30,18 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; -import com.datatorrent.contrib.hive.AbstractFSRollingOutputOperator; -import com.datatorrent.contrib.hive.HiveOperator; -import com.datatorrent.contrib.hive.HiveStore; - import com.datatorrent.api.Context.OperatorContext; + import com.datatorrent.api.DAG; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.InputOperator; + import com.datatorrent.api.StreamingApplication; + import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.contrib.hive.AbstractFSRollingOutputOperator; +import com.datatorrent.contrib.hive.HiveOperator; +import com.datatorrent.contrib.hive.HiveStore; /** * Application used to benchmark HIVE Insert operator @@ -79,13 +81,14 @@ public class HiveInsertBenchmarkingApp implements StreamingApplication { HiveStore store = new HiveStore(); store.setDatabaseUrl(conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.dbUrl")); - store.setConnectionProperties(conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties")); + store.setConnectionProperties(conf.get( + "dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties")); 
store.setFilepath(conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.filepath")); try { - hiveInitializeDatabase(store, conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.tablename")); - } - catch (SQLException ex) { + hiveInitializeDatabase(store, conf.get( + "dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.tablename")); + } catch (SQLException ex) { LOG.debug(ex.getMessage()); } @@ -109,8 +112,9 @@ public class HiveInsertBenchmarkingApp implements StreamingApplication { hiveStore.connect(); Statement stmt = hiveStore.getConnection().createStatement(); - stmt.execute("CREATE TABLE IF NOT EXISTS " + tablename + " (col1 string) PARTITIONED BY(dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\n' \n" - + "STORED AS TEXTFILE "); + stmt.execute("CREATE TABLE IF NOT EXISTS " + tablename + + " (col1 string) PARTITIONED BY(dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\n' \n" + + "STORED AS TEXTFILE "); hiveStore.disconnect(); } @@ -171,8 +175,8 @@ public class HiveInsertBenchmarkingApp implements StreamingApplication { for (; - tupleCounter < tuplesPerWindow; - tupleCounter++) { + tupleCounter < tuplesPerWindow; + tupleCounter++) { String output = "2014-12-1" + random.nextInt(10) + ""; outputString.emit(output); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveMapInsertBenchmarkingApp.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveMapInsertBenchmarkingApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveMapInsertBenchmarkingApp.java index cfbbfc5..98d9ce3 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveMapInsertBenchmarkingApp.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/hive/HiveMapInsertBenchmarkingApp.java @@ -24,23 +24,23 @@ import java.util.ArrayList; import 
java.util.Iterator; import java.util.Map; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; -import com.datatorrent.lib.testbench.RandomEventGenerator; - -import com.datatorrent.contrib.hive.*; - import com.datatorrent.api.Context.PortContext; import com.datatorrent.api.DAG; import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; - import com.datatorrent.benchmark.RandomMapOutput; +import com.datatorrent.contrib.hive.AbstractFSRollingOutputOperator; +import com.datatorrent.contrib.hive.HiveOperator; +import com.datatorrent.contrib.hive.HiveStore; +import com.datatorrent.lib.testbench.RandomEventGenerator; + + /** * Application used to benchmark HIVE Map Insert operator * The DAG consists of random Event generator operator that is @@ -61,12 +61,13 @@ public class HiveMapInsertBenchmarkingApp implements StreamingApplication { HiveStore store = new HiveStore(); store.setDatabaseUrl(conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.dbUrl")); - store.setConnectionProperties(conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties")); + store.setConnectionProperties(conf.get( + "dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties")); store.setFilepath(conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.filepath")); try { - hiveInitializeMapDatabase(store, conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.tablename"), ":"); - } - catch (SQLException ex) { + hiveInitializeMapDatabase(store, conf.get( + "dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.tablename"), ":"); + } catch (SQLException ex) { LOG.debug(ex.getMessage()); } dag.setAttribute(DAG.STREAMING_WINDOW_SIZE_MILLIS, 1000); @@ -90,13 +91,15 @@ public class HiveMapInsertBenchmarkingApp implements StreamingApplication 
/* * User can create table and specify data columns and partition columns in this function. */ - public static void hiveInitializeMapDatabase(HiveStore hiveStore, String tablename, String delimiterMap) throws SQLException + public static void hiveInitializeMapDatabase( + HiveStore hiveStore, String tablename, String delimiterMap) throws SQLException { hiveStore.connect(); Statement stmt = hiveStore.getConnection().createStatement(); - stmt.execute("CREATE TABLE IF NOT EXISTS " + tablename + " (col1 map<string,int>) PARTITIONED BY(dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\n' \n" - + "MAP KEYS TERMINATED BY '" + delimiterMap + "' \n" - + "STORED AS TEXTFILE "); + stmt.execute("CREATE TABLE IF NOT EXISTS " + tablename + + " (col1 map<string,int>) PARTITIONED BY(dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\n' \n" + + "MAP KEYS TERMINATED BY '" + delimiterMap + "' \n" + + "STORED AS TEXTFILE "); hiveStore.disconnect(); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkKafkaInputOperator.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkKafkaInputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkKafkaInputOperator.java index 8239ea7..e147ad7 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkKafkaInputOperator.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkKafkaInputOperator.java @@ -18,11 +18,11 @@ */ package com.datatorrent.benchmark.kafka; -import kafka.message.Message; - import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.contrib.kafka.AbstractKafkaInputOperator; +import kafka.message.Message; + /** * This operator emits one constant message for each kafka message received. * So we can track the throughput by messages emitted per second in the stram platform. 
@@ -38,7 +38,7 @@ public class BenchmarkKafkaInputOperator extends AbstractKafkaInputOperator /** * The output port on which messages are emitted. */ - public transient DefaultOutputPort<String> oport = new DefaultOutputPort<String>(); + public transient DefaultOutputPort<String> oport = new DefaultOutputPort<String>(); @Override protected void emitTuple(Message message) http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java index 1126ac1..6353c37 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java @@ -18,7 +18,6 @@ */ package com.datatorrent.benchmark.kafka; - import java.util.ArrayList; import java.util.Collection; import java.util.Map; @@ -26,9 +25,6 @@ import java.util.Properties; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import kafka.javaapi.producer.Producer; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; import javax.validation.constraints.Min; @@ -37,21 +33,27 @@ import org.slf4j.LoggerFactory; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultPartition; + import com.datatorrent.api.InputOperator; import com.datatorrent.api.Operator.ActivationListener; import com.datatorrent.api.Partitioner; +import kafka.javaapi.producer.Producer; +import kafka.producer.KeyedMessage; +import kafka.producer.ProducerConfig; + /** * This operator keep 
sending constant messages(1kb each) in {@link #threadNum} threads. * Messages are distributed evenly to partitions. * <p></p> + * * @displayName Benchmark Partitionable Kafka Output * @category Messaging * @tags output operator - * * @since 0.9.3 */ -public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<BenchmarkPartitionableKafkaOutputOperator>, InputOperator, ActivationListener<OperatorContext> +public class BenchmarkPartitionableKafkaOutputOperator implements + Partitioner<BenchmarkPartitionableKafkaOutputOperator>, InputOperator, ActivationListener<OperatorContext> { private static final Logger logger = LoggerFactory.getLogger(BenchmarkPartitionableKafkaOutputOperator.class); @@ -78,7 +80,8 @@ public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<Be private int stickyKey = 0; - private transient Runnable r = new Runnable() { + private transient Runnable r = new Runnable() + { Producer<String, String> producer = null; @@ -101,12 +104,12 @@ public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<Be } long k = 0; - while (k<msgsSecThread || !controlThroughput) { + while (k < msgsSecThread || !controlThroughput) { long key = (stickyKey >= 0 ? 
stickyKey : k); k++; producer.send(new KeyedMessage<String, String>(topic, "" + key, new String(constantMsg))); - if(k==Long.MAX_VALUE){ - k=0; + if (k == Long.MAX_VALUE) { + k = 0; } } } @@ -152,10 +155,12 @@ public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<Be * {@inheritDoc} */ @Override - public Collection<Partition<BenchmarkPartitionableKafkaOutputOperator>> definePartitions(Collection<Partition<BenchmarkPartitionableKafkaOutputOperator>> partitions, PartitioningContext context) + public Collection<Partition<BenchmarkPartitionableKafkaOutputOperator>> definePartitions( + Collection<Partition<BenchmarkPartitionableKafkaOutputOperator>> partitions, PartitioningContext context) { - ArrayList<Partition<BenchmarkPartitionableKafkaOutputOperator>> newPartitions = new ArrayList<Partitioner.Partition<BenchmarkPartitionableKafkaOutputOperator>>(partitionCount); + ArrayList<Partition<BenchmarkPartitionableKafkaOutputOperator>> newPartitions = + new ArrayList<Partitioner.Partition<BenchmarkPartitionableKafkaOutputOperator>>(partitionCount); for (int i = 0; i < partitionCount; i++) { BenchmarkPartitionableKafkaOutputOperator bpkoo = new BenchmarkPartitionableKafkaOutputOperator(); @@ -163,7 +168,8 @@ public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<Be bpkoo.setTopic(topic); bpkoo.setBrokerList(brokerList); bpkoo.setStickyKey(i); - Partition<BenchmarkPartitionableKafkaOutputOperator> p = new DefaultPartition<BenchmarkPartitionableKafkaOutputOperator>(bpkoo); + Partition<BenchmarkPartitionableKafkaOutputOperator> p = + new DefaultPartition<BenchmarkPartitionableKafkaOutputOperator>(bpkoo); newPartitions.add(p); } return newPartitions; @@ -176,20 +182,17 @@ public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<Be logger.info("Activate the benchmark kafka output operator .... 
"); constantMsg = new byte[msgSize]; for (int i = 0; i < constantMsg.length; i++) { - constantMsg[i] = (byte) ('a' + i%26); + constantMsg[i] = (byte)('a' + i % 26); } - for (int i = 0; i < threadNum; i++) { - if(controlThroughput){ + if (controlThroughput) { ses.scheduleAtFixedRate(r, 0, 1, TimeUnit.SECONDS); - } - else { + } else { ses.submit(r); } } - } @Override @@ -268,7 +271,4 @@ public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<Be this.stickyKey = stickyKey; } - - - } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmark.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmark.java b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmark.java index 159ee60..ead6c66 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmark.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmark.java @@ -18,24 +18,27 @@ */ package com.datatorrent.benchmark.kafka; -import com.google.common.collect.Sets; import java.util.Properties; + import org.apache.hadoop.conf.Configuration; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.api.DAG; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.Context.PortContext; + +import com.datatorrent.api.DAG; + import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.DefaultInputPort; + +import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; + +import com.datatorrent.common.util.BaseOperator; import com.datatorrent.contrib.kafka.HighlevelKafkaConsumer; import com.datatorrent.contrib.kafka.KafkaConsumer; import 
com.datatorrent.contrib.kafka.SimpleKafkaConsumer; - /** * The stream app to test the benckmark of kafka * You can set the property file to make it using either {@link SimpleKafkaConsumer} or {@link HighlevelKafkaConsumer} @@ -43,13 +46,14 @@ import com.datatorrent.contrib.kafka.SimpleKafkaConsumer; * * @since 0.9.3 */ -@ApplicationAnnotation(name="KafkaInputBenchmark") +@ApplicationAnnotation(name = "KafkaInputBenchmark") public class KafkaInputBenchmark implements StreamingApplication { public static class CollectorModule extends BaseOperator { - public final transient DefaultInputPort<String> inputPort = new DefaultInputPort<String>() { + public final transient DefaultInputPort<String> inputPort = new DefaultInputPort<String>() + { @Override public void process(String arg0) @@ -65,12 +69,10 @@ public class KafkaInputBenchmark implements StreamingApplication dag.setAttribute(DAG.APPLICATION_NAME, "KafkaInputOperatorPartitionDemo"); BenchmarkKafkaInputOperator bpkio = new BenchmarkKafkaInputOperator(); - String type = conf.get("kafka.consumertype", "simple"); KafkaConsumer consumer = null; - if (type.equals("highlevel")) { // Create template high-level consumer @@ -96,7 +98,6 @@ public class KafkaInputBenchmark implements StreamingApplication dag.setAttribute(bpkio, OperatorContext.COUNTERS_AGGREGATOR, new KafkaConsumer.KafkaMeterStatsAggregator()); // dag.setAttribute(bpkio, OperatorContext.STATS_LISTENER, KafkaMeterStatsListener.class); - } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmark.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmark.java b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmark.java index ca1de48..0dd4352 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmark.java +++ 
b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmark.java @@ -29,7 +29,7 @@ import com.datatorrent.api.annotation.ApplicationAnnotation; * * @since 0.9.3 */ -@ApplicationAnnotation(name="KafkaOutputBenchmark") +@ApplicationAnnotation(name = "KafkaOutputBenchmark") public class KafkaOutputBenchmark implements StreamingApplication { @@ -37,7 +37,8 @@ public class KafkaOutputBenchmark implements StreamingApplication public void populateDAG(DAG dag, Configuration conf) { dag.setAttribute(DAG.APPLICATION_NAME, "KafkaOutputBenchmark"); - BenchmarkPartitionableKafkaOutputOperator bpkoo = dag.addOperator("KafkaBenchmarkProducer", BenchmarkPartitionableKafkaOutputOperator.class); + BenchmarkPartitionableKafkaOutputOperator bpkoo = dag.addOperator( + "KafkaBenchmarkProducer", BenchmarkPartitionableKafkaOutputOperator.class); bpkoo.setBrokerList(conf.get("kafka.brokerlist")); bpkoo.setPartitionCount(2); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaTestPartitioner.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaTestPartitioner.java b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaTestPartitioner.java index 1d22613..65601d5 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaTestPartitioner.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaTestPartitioner.java @@ -21,7 +21,6 @@ package com.datatorrent.benchmark.kafka; import kafka.producer.Partitioner; import kafka.utils.VerifiableProperties; - /** * A simple partitioner class for test purpose * Key is a int string @@ -32,12 +31,14 @@ import kafka.utils.VerifiableProperties; */ public class KafkaTestPartitioner implements Partitioner { - public KafkaTestPartitioner (VerifiableProperties props) { + public KafkaTestPartitioner(VerifiableProperties props) + { } + 
@Override public int partition(Object key, int num_Partitions) { - return Integer.parseInt((String)key)%num_Partitions; + return Integer.parseInt((String)key) % num_Partitions; } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java b/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java index bc23404..b86cd01 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkApplication.java @@ -18,17 +18,20 @@ */ package com.datatorrent.benchmark.script; +import org.apache.hadoop.conf.Configuration; + import com.datatorrent.api.Context.PortContext; import com.datatorrent.api.DAG; -import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.DAG.Locality; + +import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; + import com.datatorrent.benchmark.RandomMapOutput; -import com.datatorrent.lib.io.ConsoleOutputOperator; import com.datatorrent.contrib.ruby.RubyOperator; -import com.datatorrent.lib.testbench.RandomEventGenerator; -import org.apache.hadoop.conf.Configuration; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.testbench.RandomEventGenerator; /** * http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java 
b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java index 07ab02e..7c45106 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java @@ -169,7 +169,8 @@ public class SpillableTestOperator extends BaseOperator implements Operator.Chec long countInPeriod = totalCount - lastTotalCount; long timeInPeriod = System.currentTimeMillis() - lastLogTime; long totalTime = System.currentTimeMillis() - beginTime; - logger.info("Statistics: total count: {}; period count: {}; total rate (per second): {}; period rate (per second): {}", + logger.info( + "Statistics: total count: {}; period count: {}; total rate (per second): {}; period rate (per second): {}", totalCount, countInPeriod, totalCount * 1000 / totalTime, countInPeriod * 1000 / timeInPeriod); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java index ae5ba40..2dc6f0d 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java @@ -98,7 +98,8 @@ public class ManagedStateBenchmarkApp implements StreamingApplication public static class TestGenerator extends BaseOperator implements InputOperator { - public final transient DefaultOutputPort<KeyValPair<byte[], byte[]>> data = new DefaultOutputPort<KeyValPair<byte[], byte[]>>(); + public final transient DefaultOutputPort<KeyValPair<byte[], byte[]>> data = + new DefaultOutputPort<KeyValPair<byte[], byte[]>>(); int emitBatchSize = 
1000; byte[] val = ByteBuffer.allocate(1000).putLong(1234).array(); int rate = 20000; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java index 74ba658..60a775c 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java @@ -69,7 +69,8 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo private ExecMode execMode = ExecMode.INSERT; private int timeRange = 1000 * 60; - public final transient DefaultInputPort<KeyValPair<byte[], byte[]>> input = new DefaultInputPort<KeyValPair<byte[], byte[]>>() + public final transient DefaultInputPort<KeyValPair<byte[], byte[]>> input = + new DefaultInputPort<KeyValPair<byte[], byte[]>>() { @Override public void process(KeyValPair<byte[], byte[]> tuple) @@ -172,7 +173,8 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo private final int taskBarrier = 100000; /** - * This method first send request of get to the state manager, then handle all the task(get) which already done and update the value. + * This method first send request of get to the state manager, + * then handle all the task(get) which already done and update the value. 
* @param tuple */ private void updateAsync(KeyValPair<byte[], byte[]> tuple) @@ -251,7 +253,8 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo long spentTime = now - statisticsBeginTime; long totalSpentTime = now - applicationBeginTime; totalTupleCount += tupleCount; - logger.info("Windows: {}; Time Spent: {}, Processed tuples: {}, rate per second: {}; total rate: {}", windowCountPerStatistics, spentTime, tupleCount, tupleCount * 1000 / spentTime, + logger.info("Windows: {}; Time Spent: {}, Processed tuples: {}, rate per second: {}; total rate: {}", + windowCountPerStatistics, spentTime, tupleCount, tupleCount * 1000 / spentTime, totalTupleCount * 1000 / totalSpentTime); statisticsBeginTime = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/stream/DevNullCounterBenchmark.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/stream/DevNullCounterBenchmark.java b/benchmark/src/main/java/com/datatorrent/benchmark/stream/DevNullCounterBenchmark.java index e0ee160..b0b7314 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/stream/DevNullCounterBenchmark.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/stream/DevNullCounterBenchmark.java @@ -18,13 +18,14 @@ */ package com.datatorrent.benchmark.stream; +import org.apache.hadoop.conf.Configuration; + import com.datatorrent.api.Context.PortContext; import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.lib.stream.DevNullCounter; -import org.apache.hadoop.conf.Configuration; /** * @@ -56,11 +57,11 @@ public class DevNullCounterBenchmark implements StreamingApplication @Override public void populateDAG(DAG dag, Configuration conf) { - // 
RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator()); - // rand.setMinvalue(0); - // rand.setMaxvalue(999999); - // rand.setTuplesBlastIntervalMillis(50); - // dag.getMeta(rand).getMeta(rand.integer_data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY); + // RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator()); + // rand.setMinvalue(0); + // rand.setMaxvalue(999999); + // rand.setTuplesBlastIntervalMillis(50); + // dag.getMeta(rand).getMeta(rand.integer_data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY); IntegerOperator intInput = dag.addOperator("intInput", new IntegerOperator()); DevNullCounter oper = dag.addOperator("oper", new DevNullCounter()); dag.getMeta(oper).getMeta(oper.data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/stream/IntegerOperator.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/stream/IntegerOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/stream/IntegerOperator.java index ff6ed76..c716206 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/stream/IntegerOperator.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/stream/IntegerOperator.java @@ -35,37 +35,42 @@ public class IntegerOperator implements InputOperator * Output port which emits integer. */ public final transient DefaultOutputPort<Integer> integer_data = new DefaultOutputPort<Integer>(); + @Override public void emitTuples() { Integer i = 21; - for(int j=0;j<1000;j++){ - integer_data.emit(i); + for (int j = 0; j < 1000; j++) { + integer_data.emit(i); } } @Override public void beginWindow(long windowId) { - //throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. 
+ //throw new UnsupportedOperationException("Not supported yet."); + // To change body of generated methods, choose Tools | Templates. } @Override public void endWindow() { - //throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + //throw new UnsupportedOperationException("Not supported yet."); + // To change body of generated methods, choose Tools | Templates. } @Override public void setup(OperatorContext context) { - //throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + //throw new UnsupportedOperationException("Not supported yet."); + // To change body of generated methods, choose Tools | Templates. } @Override public void teardown() { - //throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + //throw new UnsupportedOperationException("Not supported yet."); + // To change body of generated methods, choose Tools | Templates. 
} } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamDuplicaterApp.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamDuplicaterApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamDuplicaterApp.java index 951b44b..2e5bcf9 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamDuplicaterApp.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamDuplicaterApp.java @@ -18,6 +18,8 @@ */ package com.datatorrent.benchmark.stream; +import org.apache.hadoop.conf.Configuration; + import com.datatorrent.api.Context.PortContext; import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; @@ -25,7 +27,6 @@ import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.lib.stream.DevNull; import com.datatorrent.lib.stream.StreamDuplicater; -import org.apache.hadoop.conf.Configuration; /** * Benchmark App for StreamDuplicater Operator. 
@@ -36,25 +37,25 @@ import org.apache.hadoop.conf.Configuration; @ApplicationAnnotation(name = "StreamDuplicaterApp") public class StreamDuplicaterApp implements StreamingApplication { - private final Locality locality = null; - public static final int QUEUE_CAPACITY = 16 * 1024; + private final Locality locality = null; + public static final int QUEUE_CAPACITY = 16 * 1024; @Override public void populateDAG(DAG dag, Configuration conf) { - // RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator()); - // rand.setMinvalue(0); - // rand.setMaxvalue(999999); - // rand.setTuplesBlastIntervalMillis(50); - // dag.getMeta(rand).getMeta(rand.integer_data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY); + // RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator()); + // rand.setMinvalue(0); + // rand.setMaxvalue(999999); + // rand.setTuplesBlastIntervalMillis(50); + // dag.getMeta(rand).getMeta(rand.integer_data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY); IntegerOperator intInput = dag.addOperator("intInput", new IntegerOperator()); StreamDuplicater stream = dag.addOperator("stream", new StreamDuplicater()); dag.getMeta(stream).getMeta(stream.data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY); dag.addStream("streamdup1", intInput.integer_data, stream.data).setLocality(locality); DevNull<Integer> dev1 = dag.addOperator("dev1", new DevNull()); DevNull<Integer> dev2 = dag.addOperator("dev2", new DevNull()); - dag.addStream("streamdup2",stream.out1,dev1.data).setLocality(locality); - dag.addStream("streamdup3",stream.out2,dev2.data).setLocality(locality); + dag.addStream("streamdup2", stream.out1, dev1.data).setLocality(locality); + dag.addStream("streamdup3", stream.out2, dev2.data).setLocality(locality); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamMergeApp.java 
---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamMergeApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamMergeApp.java index d90320c..bb1d081 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamMergeApp.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/stream/StreamMergeApp.java @@ -18,6 +18,8 @@ */ package com.datatorrent.benchmark.stream; +import org.apache.hadoop.conf.Configuration; + import com.datatorrent.api.Context.PortContext; import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; @@ -25,7 +27,6 @@ import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.benchmark.WordCountOperator; import com.datatorrent.lib.stream.StreamMerger; -import org.apache.hadoop.conf.Configuration; /** * Benchmark App for StreamMerge Operator. @@ -46,7 +47,7 @@ public class StreamMergeApp implements StreamingApplication StreamMerger stream = dag.addOperator("stream", new StreamMerger()); dag.getMeta(stream).getMeta(stream.data1).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY); dag.getMeta(stream).getMeta(stream.data2).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY); - dag.addStream("streammerge1", intInput.integer_data, stream.data1,stream.data2).setLocality(locality); + dag.addStream("streammerge1", intInput.integer_data, stream.data1, stream.data2).setLocality(locality); WordCountOperator<Integer> counter = dag.addOperator("counter", new WordCountOperator<Integer>()); dag.getMeta(counter).getMeta(counter.input).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierApp.java ---------------------------------------------------------------------- diff --git 
a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierApp.java index 419de18..b1ddbee 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierApp.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierApp.java @@ -18,6 +18,11 @@ */ package com.datatorrent.benchmark.testbench; +import java.util.ArrayList; +import java.util.HashMap; + +import org.apache.hadoop.conf.Configuration; + import com.datatorrent.api.Context.PortContext; import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; @@ -25,9 +30,6 @@ import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.lib.stream.DevNull; import com.datatorrent.lib.testbench.EventClassifier; -import java.util.ArrayList; -import java.util.HashMap; -import org.apache.hadoop.conf.Configuration; /** * Benchmark App for EventClassifier Operator. 
@@ -75,7 +77,8 @@ public class EventClassifierApp implements StreamingApplication eventClassifier.setKeyMap(keymap); eventClassifier.setOperationReplace(); eventClassifier.setKeyWeights(wmap); - dag.getMeta(eventClassifier).getMeta(eventClassifier.data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY); + dag.getMeta(eventClassifier).getMeta(eventClassifier.data).getAttributes() + .put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY); dag.addStream("eventtest1", hmapOper.hmap_data, eventClassifier.event).setLocality(locality); DevNull<HashMap<String, Double>> dev = dag.addOperator("dev", new DevNull()); dag.addStream("eventtest2", eventClassifier.data, dev.data).setLocality(locality); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleApp.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleApp.java index a49b30e..5fe478b 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleApp.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleApp.java @@ -18,6 +18,10 @@ */ package com.datatorrent.benchmark.testbench; +import java.util.HashMap; + +import org.apache.hadoop.conf.Configuration; + import com.datatorrent.api.Context.PortContext; import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; @@ -26,8 +30,6 @@ import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.benchmark.WordCountOperator; import com.datatorrent.benchmark.stream.IntegerOperator; import com.datatorrent.lib.testbench.EventClassifierNumberToHashDouble; -import java.util.HashMap; -import 
org.apache.hadoop.conf.Configuration; /** * Benchmark App for EventClassifierNumberToHashDouble Operator. @@ -44,10 +46,14 @@ public class EventClassifierNumberToHashDoubleApp implements StreamingApplicatio @Override public void populateDAG(DAG dag, Configuration conf) { - WordCountOperator<HashMap<String, Double>> counterString = dag.addOperator("counterString", new WordCountOperator<HashMap<String, Double>>()); - dag.getMeta(counterString).getMeta(counterString.input).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY); - EventClassifierNumberToHashDouble eventClassify = dag.addOperator("eventClassify", new EventClassifierNumberToHashDouble()); - dag.getMeta(eventClassify).getMeta(eventClassify.data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY); + WordCountOperator<HashMap<String, Double>> counterString = + dag.addOperator("counterString", new WordCountOperator<HashMap<String, Double>>()); + dag.getMeta(counterString).getMeta(counterString.input).getAttributes() + .put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY); + EventClassifierNumberToHashDouble eventClassify = + dag.addOperator("eventClassify", new EventClassifierNumberToHashDouble()); + dag.getMeta(eventClassify).getMeta(eventClassify.data) + .getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY); IntegerOperator intInput = dag.addOperator("intInput", new IntegerOperator()); dag.addStream("eventclassifier2", intInput.integer_data, eventClassify.event).setLocality(locality); dag.addStream("eventclassifier1", eventClassify.data, counterString.input).setLocality(locality); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventGeneratorApp.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventGeneratorApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventGeneratorApp.java index 
3025c7e..8f28ae6 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventGeneratorApp.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventGeneratorApp.java @@ -18,6 +18,10 @@ */ package com.datatorrent.benchmark.testbench; +import java.util.HashMap; + +import org.apache.hadoop.conf.Configuration; + import com.datatorrent.api.Context.PortContext; import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; @@ -25,8 +29,6 @@ import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.lib.stream.DevNull; import com.datatorrent.lib.testbench.EventGenerator; -import java.util.HashMap; -import org.apache.hadoop.conf.Configuration; /** * Benchmark App for EventGenerator Operator. @@ -44,7 +46,8 @@ public class EventGeneratorApp implements StreamingApplication public void populateDAG(DAG dag, Configuration conf) { EventGenerator eventGenerator = dag.addOperator("eventGenerator", new EventGenerator()); - dag.getMeta(eventGenerator).getMeta(eventGenerator.count).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY); + dag.getMeta(eventGenerator).getMeta(eventGenerator.count).getAttributes() + .put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY); DevNull<String> devString = dag.addOperator("devString", new DevNull()); DevNull<HashMap<String, Double>> devMap = dag.addOperator("devMap", new DevNull()); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventIncrementerApp.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventIncrementerApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventIncrementerApp.java index ea05d07..e562224 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventIncrementerApp.java +++ 
b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/EventIncrementerApp.java @@ -18,15 +18,17 @@ */ package com.datatorrent.benchmark.testbench; +import java.util.ArrayList; +import java.util.HashMap; + +import org.apache.hadoop.conf.Configuration; + import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.lib.stream.DevNull; import com.datatorrent.lib.testbench.EventIncrementer; -import java.util.ArrayList; -import java.util.HashMap; -import org.apache.hadoop.conf.Configuration; /** * Benchmark App for EventIncrementer Operator. @@ -39,6 +41,7 @@ public class EventIncrementerApp implements StreamingApplication { private final Locality locality = null; public static final int QUEUE_CAPACITY = 16 * 1024; + @Override public void populateDAG(DAG dag, Configuration conf) { @@ -55,15 +58,14 @@ public class EventIncrementerApp implements StreamingApplication eventInc.setKeylimits(keys, low, high); eventInc.setDelta(1); HashMapOperator hmapOper = dag.addOperator("hmapOper", new HashMapOperator()); - dag.addStream("eventIncInput1",hmapOper.hmapList_data,eventInc.seed); - dag.addStream("eventIncInput2",hmapOper.hmapMap_data,eventInc.increment); - DevNull<HashMap<String,Integer>> dev1= dag.addOperator("dev1", new DevNull()); - DevNull<HashMap<String,String>> dev2= dag.addOperator("dev2", new DevNull()); - dag.addStream("eventIncOutput1",eventInc.count,dev1.data).setLocality(locality); - dag.addStream("eventIncOutput2",eventInc.data,dev2.data).setLocality(locality); + dag.addStream("eventIncInput1", hmapOper.hmapList_data, eventInc.seed); + dag.addStream("eventIncInput2", hmapOper.hmapMap_data, eventInc.increment); + DevNull<HashMap<String, Integer>> dev1 = dag.addOperator("dev1", new DevNull()); + DevNull<HashMap<String, String>> dev2 = dag.addOperator("dev2", new DevNull()); + dag.addStream("eventIncOutput1", 
eventInc.count, dev1.data).setLocality(locality); + dag.addStream("eventIncOutput2", eventInc.data, dev2.data).setLocality(locality); } - } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilterClassifierApp.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilterClassifierApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilterClassifierApp.java index 915e6f0..ea2943f 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilterClassifierApp.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilterClassifierApp.java @@ -18,15 +18,17 @@ */ package com.datatorrent.benchmark.testbench; +import java.util.ArrayList; +import java.util.HashMap; + +import org.apache.hadoop.conf.Configuration; + import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.lib.stream.DevNull; import com.datatorrent.lib.testbench.FilterClassifier; -import java.util.ArrayList; -import java.util.HashMap; -import org.apache.hadoop.conf.Configuration; /** * Benchmark App for FilterClassifier Operator. 
@@ -39,6 +41,7 @@ public class FilterClassifierApp implements StreamingApplication { private final Locality locality = null; public static final int QUEUE_CAPACITY = 16 * 1024; + @Override public void populateDAG(DAG dag, Configuration conf) { @@ -80,9 +83,9 @@ public class FilterClassifierApp implements StreamingApplication filter.setTotalFilter(100); HashMapOperator hmapOper = dag.addOperator("hmapOper", new HashMapOperator()); - DevNull<HashMap<String,Double>> dev = dag.addOperator("dev", new DevNull()); - dag.addStream("filter1",hmapOper.hmap_data,filter.data).setLocality(locality); - dag.addStream("filer2",filter.filter,dev.data).setLocality(locality); + DevNull<HashMap<String, Double>> dev = dag.addOperator("dev", new DevNull()); + dag.addStream("filter1", hmapOper.hmap_data, filter.data).setLocality(locality); + dag.addStream("filer2", filter.filter, dev.data).setLocality(locality); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierApp.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierApp.java index c3d996e..52c0bed 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierApp.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierApp.java @@ -18,15 +18,17 @@ */ package com.datatorrent.benchmark.testbench; +import java.util.ArrayList; +import java.util.HashMap; + +import org.apache.hadoop.conf.Configuration; + import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.lib.stream.DevNull; import 
com.datatorrent.lib.testbench.FilteredEventClassifier; -import java.util.ArrayList; -import java.util.HashMap; -import org.apache.hadoop.conf.Configuration; /** * Benchmark App for FilteredEventClassifier Operator.
