Repository: apex-malhar Updated Branches: refs/heads/master 623b803f5 -> e22ea0de1
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/HashMapOperator.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/HashMapOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/HashMapOperator.java index e2a94ef..29cd079 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/HashMapOperator.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/HashMapOperator.java @@ -18,15 +18,17 @@ */ package com.datatorrent.benchmark.testbench; -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; -import com.datatorrent.lib.testbench.EventGenerator; import java.util.ArrayList; import java.util.HashMap; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.lib.testbench.EventGenerator; + /** * HashMap Input Operator used as a helper in testbench benchmarking apps. * @@ -36,11 +38,15 @@ public class HashMapOperator implements InputOperator { private String keys = null; private static final Logger logger = LoggerFactory.getLogger(EventGenerator.class); - private String[] keysArray = {"a","b","c","d"}; - public final transient DefaultOutputPort<HashMap<String, Double>> hmap_data = new DefaultOutputPort<HashMap<String, Double>>(); - public final transient DefaultOutputPort<HashMap<String, ArrayList<Integer>>> hmapList_data = new DefaultOutputPort<HashMap<String, ArrayList<Integer>>>(); - public final transient DefaultOutputPort<HashMap<String, HashMap<String, Integer>>> hmapMap_data = new DefaultOutputPort<HashMap<String, HashMap<String, Integer>>>(); - public final transient DefaultOutputPort<HashMap<String, Integer>> hmapInt_data = new DefaultOutputPort<HashMap<String, Integer>>(); + private String[] keysArray = {"a", "b", "c", "d"}; + public final transient DefaultOutputPort<HashMap<String, Double>> hmap_data = + new DefaultOutputPort<HashMap<String, Double>>(); + public final transient DefaultOutputPort<HashMap<String, ArrayList<Integer>>> hmapList_data = + new DefaultOutputPort<HashMap<String, ArrayList<Integer>>>(); + public final transient DefaultOutputPort<HashMap<String, HashMap<String, Integer>>> hmapMap_data = + new DefaultOutputPort<HashMap<String, HashMap<String, Integer>>>(); + public final transient DefaultOutputPort<HashMap<String, Integer>> hmapInt_data = + new DefaultOutputPort<HashMap<String, Integer>>(); private int numTuples = 1000; private String seed = "a"; private int numKeys = 2; @@ -89,7 +95,7 @@ public class HashMapOperator implements InputOperator for (int j = 0; j < numKeys; j++) { hmapMapTemp.put(keysArray[j], 100 * j); } - for (int j = 0; j < numKeys; j++) { + for (int j = 0; j < numKeys; j++) { hmapMap.put(keysArray[j], hmapMapTemp); } hmapMap_data.emit(hmapMap); @@ -107,7 +113,7 @@ public class HashMapOperator implements InputOperator } if (hmapInt_data.isConnected()) { - for (int i = 0; i < numTuples; i++) { + for (int i = 0; i < numTuples; i++) { HashMap<String, Integer> hmapMapTemp = new HashMap<String, Integer>(); for (int j = 0; j < numKeys; j++) { hmapMapTemp.put(keysArray[j], 100 * j); @@ -120,7 +126,8 @@ public class HashMapOperator implements InputOperator @Override public void beginWindow(long windowId) { - // throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + // throw new UnsupportedOperationException("Not supported yet."); + // To change body of generated methods, choose Tools | Templates. } @Override @@ -132,13 +139,15 @@ public class HashMapOperator implements InputOperator @Override public void setup(OperatorContext context) { - // throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + // throw new UnsupportedOperationException("Not supported yet."); + // To change body of generated methods, choose Tools | Templates. } @Override public void teardown() { - // throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + // throw new UnsupportedOperationException("Not supported yet."); + // To change body of generated methods, choose Tools | Templates. } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/RandomEventGeneratorApp.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/RandomEventGeneratorApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/RandomEventGeneratorApp.java index da757d9..df5b11e 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/RandomEventGeneratorApp.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/RandomEventGeneratorApp.java @@ -18,13 +18,14 @@ */ package com.datatorrent.benchmark.testbench; +import org.apache.hadoop.conf.Configuration; + import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.lib.stream.DevNull; import com.datatorrent.lib.testbench.RandomEventGenerator; -import org.apache.hadoop.conf.Configuration; /** * Benchmark App for RandomEventGenerator Operator. @@ -37,14 +38,15 @@ public class RandomEventGeneratorApp implements StreamingApplication { private final Locality locality = null; public static final int QUEUE_CAPACITY = 16 * 1024; + @Override public void populateDAG(DAG dag, Configuration conf) { RandomEventGenerator random = dag.addOperator("random", new RandomEventGenerator()); DevNull<Integer> dev1 = dag.addOperator("dev1", new DevNull()); DevNull<String> dev2 = dag.addOperator("dev2", new DevNull()); - dag.addStream("random1",random.integer_data,dev1.data).setLocality(locality); - dag.addStream("random2",random.string_data,dev2.data).setLocality(locality); + dag.addStream("random1", random.integer_data, dev1.data).setLocality(locality); + dag.addStream("random2", random.string_data, dev2.data).setLocality(locality); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/SeedEventGeneratorApp.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/SeedEventGeneratorApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/SeedEventGeneratorApp.java index db18937..faafcbf 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/SeedEventGeneratorApp.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/SeedEventGeneratorApp.java @@ -18,6 +18,10 @@ */ package com.datatorrent.benchmark.testbench; +import java.util.ArrayList; +import java.util.HashMap; +import org.apache.hadoop.conf.Configuration; + import com.datatorrent.api.Context.PortContext; import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; @@ -26,9 +30,7 @@ import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.lib.stream.DevNull; import com.datatorrent.lib.testbench.SeedEventGenerator; import com.datatorrent.lib.util.KeyValPair; -import java.util.ArrayList; -import java.util.HashMap; -import org.apache.hadoop.conf.Configuration; + /** * Benchmark App for SeedEventGenerator Operator. @@ -55,10 +57,12 @@ public class SeedEventGeneratorApp implements StreamingApplication DevNull<HashMap<String, String>> devVal = dag.addOperator("devVal", new DevNull<HashMap<String, String>>()); DevNull<HashMap<String, ArrayList<Integer>>> devList = dag.addOperator("devList", new DevNull()); - dag.getMeta(seedEvent).getMeta(seedEvent.string_data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY); + dag.getMeta(seedEvent).getMeta(seedEvent.string_data) + .getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY); dag.addStream("SeedEventGeneratorString", seedEvent.string_data, devString.data).setLocality(locality); - dag.getMeta(seedEvent).getMeta(seedEvent.keyvalpair_list).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY); + dag.getMeta(seedEvent).getMeta(seedEvent.keyvalpair_list).getAttributes() + .put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY); dag.addStream("SeedEventGeneratorKeyVal", seedEvent.keyvalpair_list, devKeyVal.data).setLocality(locality); dag.getMeta(seedEvent).getMeta(seedEvent.val_data).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/testbench/ThroughputCounterApp.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/ThroughputCounterApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/ThroughputCounterApp.java index 5b66b54..d6e762e 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/testbench/ThroughputCounterApp.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/testbench/ThroughputCounterApp.java @@ -18,14 +18,16 @@ */ package com.datatorrent.benchmark.testbench; +import java.util.HashMap; + +import org.apache.hadoop.conf.Configuration; + import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.lib.stream.DevNull; import com.datatorrent.lib.testbench.ThroughputCounter; -import java.util.HashMap; -import org.apache.hadoop.conf.Configuration; /** * Benchmark App for ThroughputCounter Operator. @@ -38,14 +40,15 @@ public class ThroughputCounterApp implements StreamingApplication { public static final int QUEUE_CAPACITY = 16 * 1024; private final Locality locality = null; + @Override public void populateDAG(DAG dag, Configuration conf) { ThroughputCounter counter = dag.addOperator("counter", new ThroughputCounter()); HashMapOperator oper = dag.addOperator("oper", new HashMapOperator()); - DevNull<HashMap<String,Number>> dev = dag.addOperator("dev", new DevNull()); - dag.addStream("count1",oper.hmapInt_data,counter.data).setLocality(locality); - dag.addStream("count2",counter.count,dev.data).setLocality(locality); + DevNull<HashMap<String, Number>> dev = dag.addOperator("dev", new DevNull()); + dag.addStream("count1", oper.hmapInt_data, counter.data).setLocality(locality); + dag.addStream("count2", counter.count, dev.data).setLocality(locality); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java index 64af9f9..7250e74 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/window/AbstractWindowedOperatorBenchmarkApp.java @@ -50,7 +50,8 @@ import com.datatorrent.benchmark.window.WindowedOperatorBenchmarkApp.WindowedGen import com.datatorrent.lib.fileaccess.TFileImpl; import com.datatorrent.lib.stream.DevNull; -public abstract class AbstractWindowedOperatorBenchmarkApp<G extends Operator, O extends AbstractWindowedOperator> implements StreamingApplication +public abstract class AbstractWindowedOperatorBenchmarkApp<G extends Operator, O extends AbstractWindowedOperator> + implements StreamingApplication { protected static final String PROP_STORE_PATH = "dt.application.WindowedOperatorBenchmark.storeBasePath"; protected static final String DEFAULT_BASE_PATH = "WindowedOperatorBenchmark/Store"; @@ -80,7 +81,8 @@ public abstract class AbstractWindowedOperatorBenchmarkApp<G extends Operator, O // WatermarkGenerator watermarkGenerator = new WatermarkGenerator(); // dag.addOperator("WatermarkGenerator", watermarkGenerator); -// dag.addStream("Control", watermarkGenerator.control, windowedOperator.controlInput).setLocality(Locality.CONTAINER_LOCAL); +// dag.addStream("Control", watermarkGenerator.control, windowedOperator.controlInput) +// .setLocality(Locality.CONTAINER_LOCAL); DevNull output = dag.addOperator("output", new DevNull()); dag.addStream("output", windowedOperator.output, output.data).setLocality(Locality.CONTAINER_LOCAL); @@ -112,7 +114,8 @@ public abstract class AbstractWindowedOperatorBenchmarkApp<G extends Operator, O windowedOperator.setAllowedLateness(Duration.millis(ALLOWED_LATENESS)); windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.standardMinutes(1))); //accumulating mode - windowedOperator.setTriggerOption(TriggerOption.AtWatermark().withEarlyFiringsAtEvery(Duration.standardSeconds(1)).accumulatingFiredPanes().firingOnlyUpdatedPanes()); + windowedOperator.setTriggerOption(TriggerOption.AtWatermark() + .withEarlyFiringsAtEvery(Duration.standardSeconds(1)).accumulatingFiredPanes().firingOnlyUpdatedPanes()); windowedOperator.setFixedWatermark(30000); //windowedOperator.setTriggerOption(TriggerOption.AtWatermark()); @@ -153,7 +156,6 @@ public abstract class AbstractWindowedOperatorBenchmarkApp<G extends Operator, O return basePath; } - public static class TestStatsListener implements StatsListener, Serializable { private static final Logger LOG = LoggerFactory.getLogger(TestStatsListener.class); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java index 5a9c955..19df8fd 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/window/KeyedWindowedOperatorBenchmarkApp.java @@ -41,7 +41,8 @@ import com.datatorrent.api.DAG.Locality; import com.datatorrent.lib.fileaccess.TFileImpl; import com.datatorrent.lib.util.KeyValPair; -public class KeyedWindowedOperatorBenchmarkApp extends AbstractWindowedOperatorBenchmarkApp<KeyedWindowedOperatorBenchmarkApp.KeyedWindowedGenerator, KeyedWindowedOperatorBenchmarkApp.MyKeyedWindowedOperator> +public class KeyedWindowedOperatorBenchmarkApp extends AbstractWindowedOperatorBenchmarkApp< + KeyedWindowedOperatorBenchmarkApp.KeyedWindowedGenerator, KeyedWindowedOperatorBenchmarkApp.MyKeyedWindowedOperator> { public KeyedWindowedOperatorBenchmarkApp() { @@ -58,7 +59,8 @@ public class KeyedWindowedOperatorBenchmarkApp extends AbstractWindowedOperatorB } @Override - protected void setUpdatedKeyStorage(MyKeyedWindowedOperator windowedOperator, Configuration conf, SpillableComplexComponentImpl sccImpl) + protected void setUpdatedKeyStorage(MyKeyedWindowedOperator windowedOperator, + Configuration conf, SpillableComplexComponentImpl sccImpl) { windowedOperator.setUpdatedKeyStorage(createUpdatedDataStorage(conf, sccImpl)); } @@ -107,7 +109,8 @@ public class KeyedWindowedOperatorBenchmarkApp extends AbstractWindowedOperatorB } } - protected static class KeyedWindowedGenerator extends AbstractGenerator<Tuple.TimestampedTuple<KeyValPair<String, Long>>> + protected static class KeyedWindowedGenerator extends + AbstractGenerator<Tuple.TimestampedTuple<KeyValPair<String, Long>>> { @Override protected TimestampedTuple<KeyValPair<String, Long>> generateNextTuple() http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/main/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkApp.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkApp.java index 98275ce..d96b453 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkApp.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/window/WindowedOperatorBenchmarkApp.java @@ -35,7 +35,8 @@ import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.annotation.ApplicationAnnotation; @ApplicationAnnotation(name = "WindowedOperatorBenchmark") -public class WindowedOperatorBenchmarkApp extends AbstractWindowedOperatorBenchmarkApp<WindowedOperatorBenchmarkApp.WindowedGenerator, WindowedOperatorBenchmarkApp.MyWindowedOperator> +public class WindowedOperatorBenchmarkApp extends AbstractWindowedOperatorBenchmarkApp< + WindowedOperatorBenchmarkApp.WindowedGenerator, WindowedOperatorBenchmarkApp.MyWindowedOperator> { public WindowedOperatorBenchmarkApp() { @@ -49,7 +50,8 @@ public class WindowedOperatorBenchmarkApp extends AbstractWindowedOperatorBenchm @Override protected TimestampedTuple<Long> generateNextTuple() { - return new Tuple.TimestampedTuple<Long>(System.currentTimeMillis() - random.nextInt(120000), (long)random.nextInt(100)); + return new Tuple.TimestampedTuple<Long>(System.currentTimeMillis() - random.nextInt(120000), + (long)random.nextInt(100)); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/ApplicationFixedTest.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/ApplicationFixedTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/ApplicationFixedTest.java index 1d76855..cd8a3ec 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/ApplicationFixedTest.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/ApplicationFixedTest.java @@ -18,16 +18,17 @@ */ package com.datatorrent.benchmark; -import com.datatorrent.api.DAG; -import com.datatorrent.api.LocalMode; -import com.datatorrent.api.Context.PortContext; - import java.io.IOException; -import org.apache.hadoop.conf.Configuration; import org.junit.Assert; import org.junit.Test; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context.PortContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.LocalMode; + /** * Test the DAG declaration in local mode. */ @@ -40,8 +41,10 @@ public class ApplicationFixedTest new ApplicationFixed().populateDAG(lma.getDAG(), new Configuration(false)); DAG dag = lma.cloneDAG(); - FixedTuplesInputOperator wordGenerator = (FixedTuplesInputOperator)dag.getOperatorMeta("WordGenerator").getOperator(); - Assert.assertEquals("Queue Capacity", ApplicationFixed.QUEUE_CAPACITY, (int)dag.getMeta(wordGenerator).getMeta(wordGenerator.output).getValue(PortContext.QUEUE_CAPACITY)); + FixedTuplesInputOperator wordGenerator = (FixedTuplesInputOperator)dag + .getOperatorMeta("WordGenerator").getOperator(); + Assert.assertEquals("Queue Capacity", ApplicationFixed.QUEUE_CAPACITY, (int)dag.getMeta(wordGenerator) + .getMeta(wordGenerator.output).getValue(PortContext.QUEUE_CAPACITY)); LocalMode.Controller lc = lma.getController(); lc.run(60000); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/BenchmarkTest.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/BenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/BenchmarkTest.java index 9439243..0a21a7c 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/BenchmarkTest.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/BenchmarkTest.java @@ -26,7 +26,6 @@ import org.slf4j.LoggerFactory; import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.LocalMode; -import com.datatorrent.benchmark.Benchmark; /** * Test the DAG declaration in local mode. @@ -38,7 +37,7 @@ public class BenchmarkTest { for (final Locality l : Locality.values()) { logger.debug("Running the with {} locality", l); - LocalMode.runApp(new Benchmark.AbstractApplication () + LocalMode.runApp(new Benchmark.AbstractApplication() { @Override public Locality getLocality() http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/CouchBaseBenchmarkTest.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/CouchBaseBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/CouchBaseBenchmarkTest.java index 7c9f892..6a1c968 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/CouchBaseBenchmarkTest.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/CouchBaseBenchmarkTest.java @@ -18,14 +18,17 @@ */ package com.datatorrent.benchmark; -import com.datatorrent.api.LocalMode; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; + +import org.junit.Test; + import org.apache.hadoop.conf.Configuration; import org.apache.log4j.Logger; -import org.junit.Test; + +import com.datatorrent.api.LocalMode; public class CouchBaseBenchmarkTest { @@ -52,8 +55,7 @@ public class CouchBaseBenchmarkTest LocalMode.Controller lc = lm.getController(); //lc.setHeartbeatMonitoringEnabled(false); lc.run(20000); - } - catch (Exception ex) { + } catch (Exception ex) { logger.info(ex.getCause()); } is.close(); @@ -76,8 +78,7 @@ public class CouchBaseBenchmarkTest lm.prepareDAG(new CouchBaseAppInput(), conf); LocalMode.Controller lc = lm.getController(); lc.run(20000); - } - catch (Exception ex) { + } catch (Exception ex) { logger.info(ex.getCause()); } is.close(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloApp.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloApp.java b/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloApp.java index 5e407a0..e2936fe 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloApp.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloApp.java @@ -27,6 +27,7 @@ import com.datatorrent.contrib.accumulo.AbstractAccumuloOutputOperator; import com.datatorrent.contrib.accumulo.AccumuloRowTupleGenerator; import com.datatorrent.contrib.accumulo.AccumuloTestHelper; import com.datatorrent.contrib.accumulo.AccumuloTuple; + /** * BenchMark Results * ----------------- @@ -39,14 +40,16 @@ import com.datatorrent.contrib.accumulo.AccumuloTuple; * * @since 1.0.4 */ -public class AccumuloApp implements StreamingApplication { +public class AccumuloApp implements StreamingApplication +{ @Override - public void populateDAG(DAG dag, Configuration conf) { + public void populateDAG(DAG dag, Configuration conf) + { AccumuloTestHelper.getConnector(); AccumuloTestHelper.clearTable(); dag.setAttribute(DAG.APPLICATION_NAME, "AccumuloOutputTest"); - AccumuloRowTupleGenerator rtg = dag.addOperator("tuplegenerator",AccumuloRowTupleGenerator.class); + AccumuloRowTupleGenerator rtg = dag.addOperator("tuplegenerator", AccumuloRowTupleGenerator.class); TestAccumuloOutputOperator taop = dag.addOperator("testaccumulooperator", TestAccumuloOutputOperator.class); dag.addStream("ss", rtg.outputPort, taop.input); com.datatorrent.api.Attribute.AttributeMap attributes = dag.getAttributes(); @@ -58,12 +61,14 @@ public class AccumuloApp implements StreamingApplication { } - public static class TestAccumuloOutputOperator extends AbstractAccumuloOutputOperator<AccumuloTuple> { + public static class TestAccumuloOutputOperator extends AbstractAccumuloOutputOperator<AccumuloTuple> + { @Override - public Mutation operationMutation(AccumuloTuple t) { + public Mutation operationMutation(AccumuloTuple t) + { Mutation mutation = new Mutation(t.getRow().getBytes()); - mutation.put(t.getColFamily().getBytes(),t.getColName().getBytes(),t.getColValue().getBytes()); + mutation.put(t.getColFamily().getBytes(), t.getColName().getBytes(), t.getColValue().getBytes()); return mutation; } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloAppTest.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloAppTest.java index 8f0a1fd..8b47a9b 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloAppTest.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/accumulo/AccumuloAppTest.java @@ -22,9 +22,11 @@ import org.junit.Test; import com.datatorrent.api.LocalMode; -public class AccumuloAppTest { +public class AccumuloAppTest +{ @Test - public void testSomeMethod() throws Exception { + public void testSomeMethod() throws Exception + { LocalMode.runApp(new AccumuloApp(), 30000); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/aerospike/AerospikeBenchmarkAppTest.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/aerospike/AerospikeBenchmarkAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/aerospike/AerospikeBenchmarkAppTest.java index 8f2e19f..14fe441 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/aerospike/AerospikeBenchmarkAppTest.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/aerospike/AerospikeBenchmarkAppTest.java @@ -21,12 +21,13 @@ package com.datatorrent.benchmark.aerospike; import org.junit.Test; import com.datatorrent.api.LocalMode; -import com.datatorrent.benchmark.aerospike.AerospikeOutputBenchmarkApplication; -public class AerospikeBenchmarkAppTest { +public class AerospikeBenchmarkAppTest +{ @Test - public void test() throws Exception { + public void test() throws Exception + { LocalMode.runApp(new AerospikeOutputBenchmarkApplication(), 10000); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkTest.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkTest.java index c54cbdf..079d073 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkTest.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/algo/UniqueValueCountBenchmarkTest.java @@ -18,10 +18,12 @@ */ package com.datatorrent.benchmark.algo; -import com.datatorrent.api.LocalMode; -import org.apache.hadoop.conf.Configuration; import org.junit.Test; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; + /** * Test the DAG declaration in local mode. */ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/cassandra/CassandraApplicatonTest.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/cassandra/CassandraApplicatonTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/cassandra/CassandraApplicatonTest.java index e85e38c..ec4f308 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/cassandra/CassandraApplicatonTest.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/cassandra/CassandraApplicatonTest.java @@ -19,16 +19,18 @@ package com.datatorrent.benchmark.cassandra; import org.junit.Test; + import com.datatorrent.api.LocalMode; -import com.datatorrent.benchmark.cassandra.CassandraOutputBenchmarkApplication; /** * Test the DAG declaration in local mode. */ -public class CassandraApplicatonTest { +public class CassandraApplicatonTest +{ @Test - public void test() throws Exception { + public void test() throws Exception + { LocalMode.runApp(new CassandraOutputBenchmarkApplication(), 10000); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseApplicationTest.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseApplicationTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseApplicationTest.java index 1658ab1..32a4907 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseApplicationTest.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseApplicationTest.java @@ -19,16 +19,19 @@ package com.datatorrent.benchmark.hbase; import org.junit.Test; + import com.datatorrent.api.LocalMode; /** * Test the DAG declaration in local mode. */ -public class HBaseApplicationTest { +public class HBaseApplicationTest +{ @Test - public void test() throws Exception { - LocalMode.runApp(new HBaseCsvMappingApplication(), 20000); + public void test() throws Exception + { + LocalMode.runApp(new HBaseCsvMappingApplication(), 20000); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseCsvMappingApplication.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseCsvMappingApplication.java b/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseCsvMappingApplication.java index 562559f..b61f1d3 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseCsvMappingApplication.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/hbase/HBaseCsvMappingApplication.java @@ -37,7 +37,7 @@ import com.datatorrent.contrib.hbase.HBaseRowStringGenerator; * * @since 1.0.4 */ -@ApplicationAnnotation(name="HBaseBenchmarkApp") +@ApplicationAnnotation(name = "HBaseBenchmarkApp") public class HBaseCsvMappingApplication implements StreamingApplication { private final Locality locality = null; @@ -47,15 +47,12 @@ public class HBaseCsvMappingApplication implements StreamingApplication { HBaseRowStringGenerator row = dag.addOperator("rand", new HBaseRowStringGenerator()); - HBaseCsvMappingPutOperator csvMappingPutOperator = dag.addOperator("HBaseoper", new HBaseCsvMappingPutOperator()); csvMappingPutOperator.getStore().setTableName("table1"); csvMappingPutOperator.getStore().setZookeeperQuorum("127.0.0.1"); csvMappingPutOperator.getStore().setZookeeperClientPort(2181); csvMappingPutOperator.setMappingString("colfam0.street,colfam0.city,colfam0.state,row"); - dag.addStream("hbasestream",row.outputPort, csvMappingPutOperator.input).setLocality(locality); + dag.addStream("hbasestream", row.outputPort, csvMappingPutOperator.input).setLocality(locality); } - - } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkTest.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkTest.java index fdab095..653c6f6 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkTest.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveInsertBenchmarkTest.java @@ -18,18 +18,21 @@ */ package com.datatorrent.benchmark.hive; -import com.datatorrent.api.LocalMode; -import com.datatorrent.netlet.util.DTThrowable; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.InputStream; import java.sql.SQLException; -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.conf.Configuration; + import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; +import com.datatorrent.netlet.util.DTThrowable; + public class HiveInsertBenchmarkTest { private static final Logger LOG = LoggerFactory.getLogger(HiveInsertBenchmarkTest.class); @@ -41,26 +44,30 @@ public class HiveInsertBenchmarkTest InputStream inputStream = null; try { inputStream = new FileInputStream("src/site/conf/dt-site-hive.xml"); - } - catch (FileNotFoundException ex) { - LOG.debug("Exception caught",ex); + } catch (FileNotFoundException ex) { + LOG.debug("Exception caught", ex); } conf.addResource(inputStream); - LOG.debug("conf properties are {}" , conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties")); - LOG.debug("conf dburl is {}" , conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.dbUrl")); - LOG.debug("conf filepath is {}" , conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.filepath")); - LOG.debug("maximum length is {}" , conf.get("dt.application.HiveInsertBenchmarkingApp.operator.RollingFsWriter.maxLength")); - LOG.debug("tablename is {}" , conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.tablename")); - LOG.debug("permission is {}",conf.get("dt.application.HiveInsertBenchmarkingApp.operator.RollingFsWriter.filePermission")); + LOG.debug("conf properties are {}", + conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties")); + LOG.debug("conf dburl is {}", + conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.dbUrl")); + LOG.debug("conf filepath is {}", + conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.store.filepath")); + LOG.debug("maximum length is {}", + conf.get("dt.application.HiveInsertBenchmarkingApp.operator.RollingFsWriter.maxLength")); + LOG.debug("tablename is {}", + conf.get("dt.application.HiveInsertBenchmarkingApp.operator.HiveOperator.tablename")); + LOG.debug("permission is {}", + conf.get("dt.application.HiveInsertBenchmarkingApp.operator.RollingFsWriter.filePermission")); HiveInsertBenchmarkingApp app = new HiveInsertBenchmarkingApp(); LocalMode lm = LocalMode.newInstance(); try { lm.prepareDAG(app, conf); LocalMode.Controller lc = lm.getController(); lc.run(120000); - } - catch (Exception ex) { + } catch (Exception ex) { DTThrowable.rethrow(ex); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveMapBenchmarkTest.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveMapBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveMapBenchmarkTest.java index 3acba3e..e0097c6 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveMapBenchmarkTest.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/hive/HiveMapBenchmarkTest.java @@ -18,6 +18,8 @@ */ package com.datatorrent.benchmark.hive; +import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.InputStream; import java.sql.SQLException; @@ -28,16 +30,13 @@ import org.slf4j.LoggerFactory; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; - import com.datatorrent.api.LocalMode; - import com.datatorrent.netlet.util.DTThrowable; -import java.io.FileInputStream; -import java.io.FileNotFoundException; public class HiveMapBenchmarkTest { private static final Logger LOG = LoggerFactory.getLogger(HiveMapBenchmarkTest.class); + @Test public void testMethod() throws SQLException { @@ -45,18 +44,24 @@ public class HiveMapBenchmarkTest InputStream inputStream = null; try { inputStream = new FileInputStream("src/site/conf/dt-site-hive.xml"); - } - catch (FileNotFoundException ex) { - LOG.debug("Exception caught {}",ex); + } catch (FileNotFoundException ex) { + LOG.debug("Exception caught {}", ex); } conf.addResource(inputStream); - LOG.debug("conf properties are {}" , conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties")); - LOG.debug("conf dburl is {}" , conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.dbUrl")); - LOG.debug("conf filepath is {}" , conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.filepath")); - LOG.debug("maximum length is {}" , conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.RollingFsMapWriter.maxLength")); - LOG.debug("tablename is {}" , conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.tablename")); - LOG.debug("permission is {}",conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.RollingFsMapWriter.filePermission")); - LOG.debug("delimiter is {}",conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.RollingFsMapWriter.delimiter")); + LOG.debug("conf properties are {}", + conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.connectionProperties")); + LOG.debug("conf dburl is {}", + conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.dbUrl")); + LOG.debug("conf filepath is {}", + conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.store.filepath")); + LOG.debug("maximum length is {}", + conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.RollingFsMapWriter.maxLength")); + LOG.debug("tablename is {}", + conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.HiveOperator.tablename")); + LOG.debug("permission is {}", + conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.RollingFsMapWriter.filePermission")); + LOG.debug("delimiter is {}", + conf.get("dt.application.HiveMapInsertBenchmarkingApp.operator.RollingFsMapWriter.delimiter")); HiveMapInsertBenchmarkingApp app = new HiveMapInsertBenchmarkingApp(); LocalMode lm = LocalMode.newInstance(); @@ -64,14 +69,11 @@ public class HiveMapBenchmarkTest lm.prepareDAG(app, conf); LocalMode.Controller lc = lm.getController(); lc.run(30000); - } - catch (Exception ex) { + } catch (Exception ex) { DTThrowable.rethrow(ex); } IOUtils.closeQuietly(inputStream); } - - } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmarkTest.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmarkTest.java index e2d2f6a..6cb901a 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmarkTest.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmarkTest.java @@ -43,8 +43,7 @@ public class KafkaInputBenchmarkTest lma.prepareDAG(new KafkaInputBenchmark(), conf); LocalMode.Controller lc = lma.getController(); lc.run(30000); - } - catch (Exception ex) { + } catch (Exception ex) { throw new RuntimeException(ex); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmarkTest.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmarkTest.java index d85372f..4de7193 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmarkTest.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/kafka/KafkaOutputBenchmarkTest.java @@ -28,7 +28,6 @@ import org.apache.hadoop.conf.Configuration; import com.datatorrent.api.LocalMode; - public class KafkaOutputBenchmarkTest { @Test @@ -44,8 +43,7 @@ public class KafkaOutputBenchmarkTest lma.prepareDAG(new KafkaInputBenchmark(), conf); LocalMode.Controller lc = lma.getController(); lc.run(30000); - } - catch (Exception ex) { + } catch (Exception ex) { throw new RuntimeException(ex); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmark.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmark.java b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmark.java index 4c046fb..9201cd5 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmark.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmark.java @@ -18,13 +18,16 @@ */ package com.datatorrent.benchmark.memsql; -import com.datatorrent.api.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.contrib.memsql.MemsqlInputOperator; import com.datatorrent.lib.stream.DevNull; -import org.apache.hadoop.conf.Configuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * BenchMark Results @@ -39,7 +42,7 @@ import org.slf4j.LoggerFactory; * * @since 1.0.5 */ -@ApplicationAnnotation(name="MemsqlInputBenchmark") +@ApplicationAnnotation(name = "MemsqlInputBenchmark") public class MemsqlInputBenchmark implements StreamingApplication { private static final Logger LOG = LoggerFactory.getLogger(MemsqlInputBenchmark.class); @@ -50,13 +53,13 @@ public class MemsqlInputBenchmark implements StreamingApplication public void populateDAG(DAG dag, Configuration conf) { MemsqlInputOperator memsqlInputOperator = dag.addOperator("memsqlInputOperator", - new MemsqlInputOperator()); + new MemsqlInputOperator()); DevNull<Object> devNull = dag.addOperator("devnull", - new DevNull<Object>()); + new DevNull<Object>()); dag.addStream("memsqlconnector", - memsqlInputOperator.outputPort, - devNull.data); + memsqlInputOperator.outputPort, + devNull.data); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmarkTest.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmarkTest.java index 55fec7c..fa98a18 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmarkTest.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlInputBenchmarkTest.java @@ -18,26 +18,33 @@ */ package com.datatorrent.benchmark.memsql; -import com.datatorrent.api.DAG; -import com.datatorrent.api.LocalMode; -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.Operator.ProcessingMode; -import com.datatorrent.netlet.util.DTThrowable; -import com.datatorrent.contrib.memsql.*; -import static com.datatorrent.contrib.memsql.AbstractMemsqlOutputOperatorTest.BATCH_SIZE; -import static com.datatorrent.lib.db.jdbc.JdbcNonTransactionalOutputOperatorTest.*; -import com.datatorrent.lib.helper.OperatorContextTestHelper; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.sql.SQLException; import java.util.Random; -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.conf.Configuration; + import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.Operator.ProcessingMode; +import com.datatorrent.contrib.memsql.AbstractMemsqlOutputOperatorTest; +import com.datatorrent.contrib.memsql.MemsqlPOJOOutputOperator; +import com.datatorrent.contrib.memsql.MemsqlStore; +import com.datatorrent.lib.helper.OperatorContextTestHelper; +import com.datatorrent.netlet.util.DTThrowable; + +import static com.datatorrent.contrib.memsql.AbstractMemsqlOutputOperatorTest.BATCH_SIZE; +import static com.datatorrent.lib.db.jdbc.JdbcNonTransactionalOutputOperatorTest.APP_ID; +import static com.datatorrent.lib.db.jdbc.JdbcNonTransactionalOutputOperatorTest.OPERATOR_ID; + public class MemsqlInputBenchmarkTest { private static final Logger LOG = LoggerFactory.getLogger(MemsqlInputBenchmarkTest.class); @@ -52,28 +59,33 @@ public class MemsqlInputBenchmarkTest MemsqlStore memsqlStore = new MemsqlStore(); memsqlStore.setDatabaseUrl(conf.get("dt.rootDbUrl")); - memsqlStore.setConnectionProperties(conf.get("dt.application.MemsqlInputBenchmark.operator.memsqlInputOperator.store.connectionProperties")); + memsqlStore.setConnectionProperties( + conf.get("dt.application.MemsqlInputBenchmark.operator.memsqlInputOperator.store.connectionProperties")); AbstractMemsqlOutputOperatorTest.memsqlInitializeDatabase(memsqlStore); MemsqlPOJOOutputOperator outputOperator = new MemsqlPOJOOutputOperator(); - outputOperator.getStore().setDatabaseUrl(conf.get("dt.application.MemsqlInputBenchmark.operator.memsqlInputOperator.store.dbUrl")); - outputOperator.getStore().setConnectionProperties(conf.get("dt.application.MemsqlInputBenchmark.operator.memsqlInputOperator.store.connectionProperties")); + outputOperator.getStore().setDatabaseUrl( + conf.get("dt.application.MemsqlInputBenchmark.operator.memsqlInputOperator.store.dbUrl")); + outputOperator.getStore().setConnectionProperties( + conf.get("dt.application.MemsqlInputBenchmark.operator.memsqlInputOperator.store.connectionProperties")); outputOperator.setBatchSize(BATCH_SIZE); Random random = new Random(); - com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap(); + com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap = + new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap(); attributeMap.put(OperatorContext.PROCESSING_MODE, ProcessingMode.AT_LEAST_ONCE); attributeMap.put(OperatorContext.ACTIVATION_WINDOW_ID, -1L); attributeMap.put(DAG.APPLICATION_ID, APP_ID); - OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap); + OperatorContextTestHelper.TestIdOperatorContext context = + new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap); long seedSize = conf.getLong("dt.seedSize", SEED_SIZE); outputOperator.setup(context); outputOperator.beginWindow(0); - for(long valueCounter = 0; + for (long valueCounter = 0; valueCounter < seedSize; valueCounter++) { outputOperator.input.put(random.nextInt()); @@ -89,8 +101,7 @@ public class MemsqlInputBenchmarkTest lm.prepareDAG(app, conf); LocalMode.Controller lc = lm.getController(); lc.run(20000); - } - catch (Exception ex) { + } catch (Exception ex) { DTThrowable.rethrow(ex); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmark.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmark.java b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmark.java index 8534e20..297bc6d 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmark.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmark.java @@ -18,15 +18,17 @@ */ package com.datatorrent.benchmark.memsql; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; + import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.contrib.memsql.MemsqlPOJOOutputOperator; import com.datatorrent.lib.testbench.RandomEventGenerator; -import org.apache.hadoop.conf.Configuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * BenchMark Results @@ -41,10 +43,10 @@ import org.slf4j.LoggerFactory; * * @since 1.0.5 */ -@ApplicationAnnotation(name="MemsqlOutputBenchmark") +@ApplicationAnnotation(name = "MemsqlOutputBenchmark") public class MemsqlOutputBenchmark implements StreamingApplication { - private static transient final Logger LOG = LoggerFactory.getLogger(MemsqlOutputBenchmark.class); + private static final transient Logger LOG = LoggerFactory.getLogger(MemsqlOutputBenchmark.class); public static final int DEFAULT_BATCH_SIZE = 1000; public static final int MAX_WINDOW_COUNT = 10000; @@ -61,7 +63,7 @@ public class MemsqlOutputBenchmark implements StreamingApplication @Override public void emitTuples() { - if(done) { + if (done) { return; } @@ -73,8 +75,7 @@ public class MemsqlOutputBenchmark implements StreamingApplication { try { super.endWindow(); - } - catch(Exception e) { + } catch (Exception e) { done = true; } } @@ -83,20 +84,21 @@ public class MemsqlOutputBenchmark implements StreamingApplication @Override public void populateDAG(DAG dag, Configuration conf) { - CustomRandomEventGenerator randomEventGenerator = dag.addOperator("randomEventGenerator", new CustomRandomEventGenerator()); + CustomRandomEventGenerator randomEventGenerator = dag.addOperator( + "randomEventGenerator", new CustomRandomEventGenerator()); randomEventGenerator.setMaxCountOfWindows(MAX_WINDOW_COUNT); randomEventGenerator.setTuplesBlastIntervalMillis(TUPLE_BLAST_MILLIS); randomEventGenerator.setTuplesBlast(TUPLE_BLAST); LOG.debug("Before making output operator"); MemsqlPOJOOutputOperator memsqlOutputOperator = dag.addOperator("memsqlOutputOperator", - new MemsqlPOJOOutputOperator()); + new MemsqlPOJOOutputOperator()); LOG.debug("After making output operator"); memsqlOutputOperator.setBatchSize(DEFAULT_BATCH_SIZE); dag.addStream("memsqlConnector", - randomEventGenerator.integer_data, - memsqlOutputOperator.input); + randomEventGenerator.integer_data, + memsqlOutputOperator.input); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmarkTest.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmarkTest.java index bab0c9e..bf82ab3 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmarkTest.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/memsql/MemsqlOutputBenchmarkTest.java @@ -18,20 +18,23 @@ */ package com.datatorrent.benchmark.memsql; -import com.datatorrent.api.LocalMode; -import com.datatorrent.netlet.util.DTThrowable; -import com.datatorrent.contrib.memsql.AbstractMemsqlOutputOperatorTest; -import com.datatorrent.contrib.memsql.MemsqlStore; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.InputStream; import java.sql.SQLException; -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.conf.Configuration; + import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; +import com.datatorrent.contrib.memsql.AbstractMemsqlOutputOperatorTest; +import com.datatorrent.contrib.memsql.MemsqlStore; +import com.datatorrent.netlet.util.DTThrowable; + public class MemsqlOutputBenchmarkTest { private static final Logger LOG = LoggerFactory.getLogger(MemsqlOutputBenchmarkTest.class); @@ -45,7 +48,8 @@ public class MemsqlOutputBenchmarkTest MemsqlStore memsqlStore = new MemsqlStore(); memsqlStore.setDatabaseUrl(conf.get("dt.rootDbUrl")); - memsqlStore.setConnectionProperties(conf.get("dt.application.MemsqlOutputBenchmark.operator.memsqlOutputOperator.store.connectionProperties")); + memsqlStore.setConnectionProperties( + conf.get("dt.application.MemsqlOutputBenchmark.operator.memsqlOutputOperator.store.connectionProperties")); AbstractMemsqlOutputOperatorTest.memsqlInitializeDatabase(memsqlStore); @@ -56,8 +60,7 @@ public class MemsqlOutputBenchmarkTest lm.prepareDAG(app, conf); LocalMode.Controller lc = lm.getController(); lc.run(20000); - } - catch (Exception ex) { + } catch (Exception ex) { DTThrowable.rethrow(ex); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkAppTest.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkAppTest.java index 9f82e79..d270e7f 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkAppTest.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/script/RubyOperatorBenchmarkAppTest.java @@ -18,14 +18,17 @@ */ package com.datatorrent.benchmark.script; -import org.apache.hadoop.conf.Configuration; import org.junit.Test; +import org.apache.hadoop.conf.Configuration; + import com.datatorrent.api.LocalMode; + /** * Benchmark Test for Ruby Operator in local mode. */ -public class RubyOperatorBenchmarkAppTest { +public class RubyOperatorBenchmarkAppTest +{ @Test public void testApplication() throws Exception http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java index 7e64c5f..b87fec1 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java @@ -35,7 +35,6 @@ import org.apache.apex.malhar.lib.utils.serde.StringSerde; import com.datatorrent.lib.fileaccess.TFileImpl; - public class SpillableDSBenchmarkTest { private static final Logger logger = LoggerFactory.getLogger(SpillableDSBenchmarkTest.class); @@ -57,7 +56,6 @@ public class SpillableDSBenchmarkTest @Rule public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta(); - @Before public void setup() { @@ -116,7 +114,8 @@ public class SpillableDSBenchmarkTest long spentTime = System.currentTimeMillis() - startTime; if (spentTime > outputTimes * 5000) { ++outputTimes; - logger.info("Total Statistics: Spent {} mills for {} operation. average/second: {}", spentTime, i, i * 1000 / spentTime); + logger.info("Total Statistics: Spent {} mills for {} operation. average/second: {}", + spentTime, i, i * 1000 / spentTime); checkEnvironment(); } } @@ -126,7 +125,6 @@ public class SpillableDSBenchmarkTest loopCount / spentTime); } - public void putEntry(SpillableMapImpl<String, String> map) { map.put(keys[random.nextInt(keys.length)], values[random.nextInt(values.length)]); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java index 4f03a10..dc8f4b4 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java @@ -34,7 +34,6 @@ import com.datatorrent.benchmark.state.StoreOperator.ExecMode; /** * This is not a really unit test, but in fact a benchmark runner. * Provides this class to give developers the convenience to run in local IDE environment. - * */ public class ManagedStateBenchmarkAppTest extends ManagedStateBenchmarkApp { @@ -91,8 +90,6 @@ public class ManagedStateBenchmarkAppTest extends ManagedStateBenchmarkApp lc.shutdown(); } - - @Override public String getStoreBasePath(Configuration conf) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierAppTest.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierAppTest.java index 14fd7e3..99d8a1f 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierAppTest.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierAppTest.java @@ -18,16 +18,19 @@ */ package com.datatorrent.benchmark.testbench; -import com.datatorrent.api.LocalMode; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; -import org.apache.hadoop.conf.Configuration; + import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; + /** * Benchmark Test for EventClassifier Operator in local mode. */ @@ -47,9 +50,8 @@ public class EventClassifierAppTest lm.prepareDAG(new EventClassifierApp(), conf); LocalMode.Controller lc = lm.getController(); lc.run(20000); - } - catch (Exception ex) { - logger.info(ex.getMessage()); + } catch (Exception ex) { + logger.info(ex.getMessage()); } is.close(); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleAppTest.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleAppTest.java index b793ecd..929d8bc 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleAppTest.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventClassifierNumberToHashDoubleAppTest.java @@ -18,16 +18,19 @@ */ package com.datatorrent.benchmark.testbench; -import com.datatorrent.api.LocalMode; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; -import org.apache.hadoop.conf.Configuration; + import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; + /** * Benchmark Test for EventClassifierNumberToHashDouble Operator in local mode. */ @@ -48,9 +51,8 @@ public class EventClassifierNumberToHashDoubleAppTest lm.prepareDAG(new EventClassifierNumberToHashDoubleApp(), conf); LocalMode.Controller lc = lm.getController(); lc.run(20000); - } - catch (Exception ex) { - logger.info(ex.getMessage()); + } catch (Exception ex) { + logger.info(ex.getMessage()); } is.close(); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventGeneratorAppTest.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventGeneratorAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventGeneratorAppTest.java index 4808ff2..5a427a5 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventGeneratorAppTest.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventGeneratorAppTest.java @@ -18,16 +18,19 @@ */ package com.datatorrent.benchmark.testbench; -import com.datatorrent.api.LocalMode; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; -import org.apache.hadoop.conf.Configuration; + import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; + /** * Benchmark Test for EventGenerator Operator in local mode. */ @@ -50,9 +53,8 @@ public class EventGeneratorAppTest lm.prepareDAG(new EventGeneratorApp(), conf); LocalMode.Controller lc = lm.getController(); lc.run(20000); - } - catch (Exception ex) { - logger.info(ex.getMessage()); + } catch (Exception ex) { + logger.info(ex.getMessage()); } is.close(); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventIncrementerAppTest.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventIncrementerAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventIncrementerAppTest.java index 4384256..1a85a7b 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventIncrementerAppTest.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/EventIncrementerAppTest.java @@ -18,16 +18,19 @@ */ package com.datatorrent.benchmark.testbench; -import com.datatorrent.api.LocalMode; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; -import org.apache.hadoop.conf.Configuration; + import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; + /** * Benchmark Test for EventIncrementerApp Operator in local mode. */ @@ -48,9 +51,8 @@ public class EventIncrementerAppTest lm.prepareDAG(new EventIncrementerApp(), conf); LocalMode.Controller lc = lm.getController(); lc.run(20000); - } - catch (Exception ex) { - logger.info(ex.getMessage()); + } catch (Exception ex) { + logger.info(ex.getMessage()); } is.close(); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilterClassifierAppTest.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilterClassifierAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilterClassifierAppTest.java index e1368ba..9419022 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilterClassifierAppTest.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilterClassifierAppTest.java @@ -19,6 +19,8 @@ package com.datatorrent.benchmark.testbench; import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; import java.io.InputStream; import org.junit.Test; @@ -28,8 +30,7 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import com.datatorrent.api.LocalMode; -import java.io.FileNotFoundException; -import java.io.IOException; + /** * Benchmark Test for FilterClassifierApp Operator in local mode. */ @@ -49,9 +50,8 @@ public class FilterClassifierAppTest lm.prepareDAG(new FilterClassifierApp(), conf); LocalMode.Controller lc = lm.getController(); lc.run(20000); - } - catch (Exception ex) { - logger.info(ex.getMessage()); + } catch (Exception ex) { + logger.info(ex.getMessage()); } is.close(); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierAppTest.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierAppTest.java index 95453f2..977d6b7 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierAppTest.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/FilteredEventClassifierAppTest.java @@ -18,16 +18,19 @@ */ package com.datatorrent.benchmark.testbench; -import com.datatorrent.api.LocalMode; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; -import org.apache.hadoop.conf.Configuration; + import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; + /** * Benchmark Test for FilteredEventClassifierApp Operator in local mode. */ @@ -47,9 +50,8 @@ public class FilteredEventClassifierAppTest lm.prepareDAG(new FilteredEventClassifierApp(), conf); LocalMode.Controller lc = lm.getController(); lc.run(20000); - } - catch (Exception ex) { - logger.info(ex.getMessage()); + } catch (Exception ex) { + logger.info(ex.getMessage()); } is.close(); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/testbench/ThroughputCounterAppTest.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/ThroughputCounterAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/ThroughputCounterAppTest.java index cc180f0..92ca0fd 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/testbench/ThroughputCounterAppTest.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/testbench/ThroughputCounterAppTest.java @@ -18,16 +18,19 @@ */ package com.datatorrent.benchmark.testbench; -import com.datatorrent.api.LocalMode; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; -import org.apache.hadoop.conf.Configuration; + import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; + /** * Benchmark Test for ThroughputCounterApp Operator in local mode. */ @@ -47,9 +50,8 @@ public class ThroughputCounterAppTest lm.prepareDAG(new ThroughputCounterApp(), conf); LocalMode.Controller lc = lm.getController(); lc.run(20000); - } - catch (Exception ex) { - logger.info(ex.getMessage()); + } catch (Exception ex) { + logger.info(ex.getMessage()); } is.close(); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5528a4c6/benchmark/src/test/java/com/datatorrent/benchmark/util/serde/GenericSerdePerformanceTest.java ---------------------------------------------------------------------- diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/util/serde/GenericSerdePerformanceTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/util/serde/GenericSerdePerformanceTest.java index 98ecf67..157accc 100644 --- a/benchmark/src/test/java/com/datatorrent/benchmark/util/serde/GenericSerdePerformanceTest.java +++ b/benchmark/src/test/java/com/datatorrent/benchmark/util/serde/GenericSerdePerformanceTest.java @@ -40,7 +40,6 @@ public class GenericSerdePerformanceTest private Random random = new Random(); private int serdeDataSize = 1000000; - @Test public void testCompareSerdeForString() { @@ -74,7 +73,6 @@ public class GenericSerdePerformanceTest buffer.release(); } - @Test public void testCompareSerdeForRealCase() { @@ -88,7 +86,6 @@ public class GenericSerdePerformanceTest long genericSerdeCost = System.currentTimeMillis() - beginTime; logger.info("Generic Serde cost for ImmutablePair: {}", genericSerdeCost); - beginTime = System.currentTimeMillis(); Kryo kryo = new Kryo(); for (int i = 0; i < serdeDataSize; ++i) { @@ -99,7 +96,6 @@ public class GenericSerdePerformanceTest long kryoSerdeCost = System.currentTimeMillis() - beginTime; logger.info("Kryo Serde cost for ImmutablePair without class info: {}", kryoSerdeCost); - beginTime = System.currentTimeMillis(); Kryo kryo1 = new Kryo(); for (int i = 0; i < serdeDataSize; ++i) { @@ -113,6 +109,7 @@ public class GenericSerdePerformanceTest protected ImmutablePair generatePair(long now) { - return new ImmutablePair(new Window.TimeWindow(now + random.nextInt(100), random.nextInt(100)), "" + random.nextInt(1000)); + return new ImmutablePair(new Window.TimeWindow(now + random.nextInt(100), + random.nextInt(100)), "" + random.nextInt(1000)); } }
