http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReduceOperator.java ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReduceOperator.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReduceOperator.java index 32c7ccb..5df9b0d 100644 --- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReduceOperator.java +++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReduceOperator.java @@ -25,14 +25,15 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.Counters; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultInputPort; @@ -47,129 +48,142 @@ import com.datatorrent.lib.util.KeyHashValPair; * @since 0.9.0 */ @SuppressWarnings({ "deprecation", "unused" }) -public class ReduceOperator<K1, V1, K2, V2> implements Operator { - private static final Logger logger = LoggerFactory.getLogger(ReduceOperator.class); - - private Class<? extends Reducer<K1, V1, K2, V2>> reduceClass; - private transient Reducer<K1, V1, K2, V2> reduceObj; - private transient Reporter reporter; - private OutputCollector<K2, V2> outputCollector; - private String configFile; - - public Class<? extends Reducer<K1, V1, K2, V2>> getReduceClass() { - return reduceClass; - } - - public void setReduceClass(Class<? extends Reducer<K1, V1, K2, V2>> reduceClass) { - this.reduceClass = reduceClass; - } - - public String getConfigFile() { - return configFile; - } - - public void setConfigFile(String configFile) { - this.configFile = configFile; - } - - private int numberOfMappersRunning = -1; - private int operatorId; - - public final transient DefaultInputPort<KeyHashValPair<Integer, Integer>> inputCount = new DefaultInputPort<KeyHashValPair<Integer, Integer>>() { - @Override - public void process(KeyHashValPair<Integer, Integer> tuple) { - logger.info("processing {}", tuple); - if (numberOfMappersRunning == -1) - numberOfMappersRunning = tuple.getValue(); - else - numberOfMappersRunning += tuple.getValue(); - - } - - }; - - public final transient DefaultOutputPort<KeyHashValPair<K2, V2>> output = new DefaultOutputPort<KeyHashValPair<K2, V2>>(); - private Map<K1, List<V1>> cacheObject; - public final transient DefaultInputPort<KeyHashValPair<K1, V1>> input = new DefaultInputPort<KeyHashValPair<K1, V1>>() { - - @Override - public void process(KeyHashValPair<K1, V1> tuple) { - // logger.info("processing tupple {}",tuple); - List<V1> list = cacheObject.get(tuple.getKey()); - if (list == null) { - list = new ArrayList<V1>(); - list.add(tuple.getValue()); - cacheObject.put(tuple.getKey(), list); - } else { - list.add(tuple.getValue()); - } - } - - }; - - @Override - public void setup(OperatorContext context) { - reporter = new ReporterImpl(ReporterType.Reducer, new Counters()); - if(context != null){ - operatorId = context.getId(); - } - cacheObject = new HashMap<K1, List<V1>>(); - outputCollector = new OutputCollectorImpl<K2, V2>(); - if (reduceClass != null) { - try { - reduceObj = reduceClass.newInstance(); - } catch (Exception e) { - logger.info("can't instantiate object {}", e.getMessage()); - throw new RuntimeException(e); - } - Configuration conf = new Configuration(); - InputStream stream = null; - if (configFile != null && configFile.length() > 0) { - logger.info("system /{}", configFile); - stream = ClassLoader.getSystemResourceAsStream("/" + configFile); - if (stream == null) { - logger.info("system {}", configFile); - stream = ClassLoader.getSystemResourceAsStream(configFile); - } - } - if (stream != null) { - logger.info("found our stream... so adding it"); - conf.addResource(stream); - } - reduceObj.configure(new JobConf(conf)); - } - - } - - @Override - public void teardown() { - - } - - @Override - public void beginWindow(long windowId) { - - } - - @Override - public void endWindow() { - if (numberOfMappersRunning == 0) { - for (Map.Entry<K1, List<V1>> e : cacheObject.entrySet()) { - try { - reduceObj.reduce(e.getKey(), e.getValue().iterator(), outputCollector, reporter); - } catch (IOException e1) { - logger.info(e1.getMessage()); - throw new RuntimeException(e1); - } - } - List<KeyHashValPair<K2, V2>> list = ((OutputCollectorImpl<K2, V2>) outputCollector).getList(); - for (KeyHashValPair<K2, V2> e : list) { - output.emit(e); - } - list.clear(); - cacheObject.clear(); - numberOfMappersRunning = -1; - } - } +public class ReduceOperator<K1, V1, K2, V2> implements Operator +{ + private static final Logger logger = LoggerFactory.getLogger(ReduceOperator.class); + + private Class<? extends Reducer<K1, V1, K2, V2>> reduceClass; + private transient Reducer<K1, V1, K2, V2> reduceObj; + private transient Reporter reporter; + private OutputCollector<K2, V2> outputCollector; + private String configFile; + + public Class<? extends Reducer<K1, V1, K2, V2>> getReduceClass() + { + return reduceClass; + } + + public void setReduceClass(Class<? extends Reducer<K1, V1, K2, V2>> reduceClass) + { + this.reduceClass = reduceClass; + } + + public String getConfigFile() + { + return configFile; + } + + public void setConfigFile(String configFile) + { + this.configFile = configFile; + } + + private int numberOfMappersRunning = -1; + private int operatorId; + + public final transient DefaultInputPort<KeyHashValPair<Integer, Integer>> inputCount = new DefaultInputPort<KeyHashValPair<Integer, Integer>>() + { + @Override + public void process(KeyHashValPair<Integer, Integer> tuple) + { + logger.info("processing {}", tuple); + if (numberOfMappersRunning == -1) { + numberOfMappersRunning = tuple.getValue(); + } else { + numberOfMappersRunning += tuple.getValue(); + } + + } + + }; + + public final transient DefaultOutputPort<KeyHashValPair<K2, V2>> output = new DefaultOutputPort<KeyHashValPair<K2, V2>>(); + private Map<K1, List<V1>> cacheObject; + public final transient DefaultInputPort<KeyHashValPair<K1, V1>> input = new DefaultInputPort<KeyHashValPair<K1, V1>>() + { + @Override + public void process(KeyHashValPair<K1, V1> tuple) + { + // logger.info("processing tupple {}",tuple); + List<V1> list = cacheObject.get(tuple.getKey()); + if (list == null) { + list = new ArrayList<V1>(); + list.add(tuple.getValue()); + cacheObject.put(tuple.getKey(), list); + } else { + list.add(tuple.getValue()); + } + } + + }; + + @Override + public void setup(OperatorContext context) + { + reporter = new ReporterImpl(ReporterType.Reducer, new Counters()); + if (context != null) { + operatorId = context.getId(); + } + cacheObject = new HashMap<K1, List<V1>>(); + outputCollector = new OutputCollectorImpl<K2, V2>(); + if (reduceClass != null) { + try { + reduceObj = reduceClass.newInstance(); + } catch (Exception e) { + logger.info("can't instantiate object {}", e.getMessage()); + throw new RuntimeException(e); + } + Configuration conf = new Configuration(); + InputStream stream = null; + if (configFile != null && configFile.length() > 0) { + logger.info("system /{}", configFile); + stream = ClassLoader.getSystemResourceAsStream("/" + configFile); + if (stream == null) { + logger.info("system {}", configFile); + stream = ClassLoader.getSystemResourceAsStream(configFile); + } + } + if (stream != null) { + logger.info("found our stream... so adding it"); + conf.addResource(stream); + } + reduceObj.configure(new JobConf(conf)); + } + + } + + @Override + public void teardown() + { + + } + + @Override + public void beginWindow(long windowId) + { + + } + + @Override + public void endWindow() + { + if (numberOfMappersRunning == 0) { + for (Map.Entry<K1, List<V1>> e : cacheObject.entrySet()) { + try { + reduceObj.reduce(e.getKey(), e.getValue().iterator(), outputCollector, reporter); + } catch (IOException e1) { + logger.info(e1.getMessage()); + throw new RuntimeException(e1); + } + } + List<KeyHashValPair<K2, V2>> list = ((OutputCollectorImpl<K2, V2>)outputCollector).getList(); + for (KeyHashValPair<K2, V2> e : list) { + output.emit(e); + } + list.clear(); + cacheObject.clear(); + numberOfMappersRunning = -1; + } + } }
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReporterImpl.java ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReporterImpl.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReporterImpl.java index 1eb3bdd..d2d38da 100644 --- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReporterImpl.java +++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReporterImpl.java @@ -18,8 +18,8 @@ */ package com.datatorrent.demos.mroperator; -import org.apache.hadoop.mapred.Counters.Counter; import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.Counters.Counter; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.Reporter; @@ -28,81 +28,92 @@ import org.apache.hadoop.mapred.Reporter; * * @since 0.9.0 */ -public class ReporterImpl implements Reporter { - - private Counters counters; - InputSplit inputSplit; - - public enum ReporterType { - Mapper, Reducer - } - - private ReporterType typ; - - public ReporterImpl(final ReporterType kind, final Counters ctrs) { - this.typ = kind; - this.counters = ctrs; - } - - @Override - public InputSplit getInputSplit() { - if (typ == ReporterType.Reducer) { - throw new UnsupportedOperationException("Reducer cannot call getInputSplit()"); - } else { - return inputSplit; - } - } - - public void setInputSplit(InputSplit inputSplit) { - this.inputSplit = inputSplit; - } - - @Override - public void incrCounter(Enum<?> key, long amount) { - if (null != counters) { - counters.incrCounter(key, amount); - } - } - - @Override - public void incrCounter(String group, String counter, long amount) { - if (null != counters) { - counters.incrCounter(group, counter, amount); - } - } - - @Override - public void setStatus(String status) { - // do nothing. - } - - @Override - public void progress() { - // do nothing. - } - - @Override - public Counter getCounter(String group, String name) { - Counters.Counter counter = null; - if (counters != null) { - counter = counters.findCounter(group, name); - } - - return counter; - } - - @Override - public Counter getCounter(Enum<?> key) { - Counters.Counter counter = null; - if (counters != null) { - counter = counters.findCounter(key); - } - - return counter; - } - - public float getProgress() { - return 0; - } +public class ReporterImpl implements Reporter +{ + private Counters counters; + InputSplit inputSplit; + + public enum ReporterType + { + Mapper, Reducer + } + + private ReporterType typ; + + public ReporterImpl(final ReporterType kind, final Counters ctrs) + { + this.typ = kind; + this.counters = ctrs; + } + + @Override + public InputSplit getInputSplit() + { + if (typ == ReporterType.Reducer) { + throw new UnsupportedOperationException("Reducer cannot call getInputSplit()"); + } else { + return inputSplit; + } + } + + public void setInputSplit(InputSplit inputSplit) + { + this.inputSplit = inputSplit; + } + + @Override + public void incrCounter(Enum<?> key, long amount) + { + if (null != counters) { + counters.incrCounter(key, amount); + } + } + + @Override + public void incrCounter(String group, String counter, long amount) + { + if (null != counters) { + counters.incrCounter(group, counter, amount); + } + } + + @Override + public void setStatus(String status) + { + // do nothing. + } + + @Override + public void progress() + { + // do nothing. + } + + @Override + public Counter getCounter(String group, String name) + { + Counters.Counter counter = null; + if (counters != null) { + counter = counters.findCounter(group, name); + } + + return counter; + } + + @Override + public Counter getCounter(Enum<?> key) + { + Counters.Counter counter = null; + if (counters != null) { + counter = counters.findCounter(key); + } + + return counter; + } + + public float getProgress() + { + return 0; + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/WordCount.java ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/WordCount.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/WordCount.java index d5cbdb0..f78cf99 100644 --- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/WordCount.java +++ b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/WordCount.java @@ -19,13 +19,24 @@ package com.datatorrent.demos.mroperator; import java.io.IOException; -import java.util.*; +import java.util.Iterator; +import java.util.StringTokenizer; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.conf.*; -import org.apache.hadoop.io.*; -import org.apache.hadoop.mapred.*; -import org.apache.hadoop.util.*; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.hadoop.mapred.TextOutputFormat; /** * <p>WordCount class.</p> @@ -38,7 +49,7 @@ public class WordCount public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { - private final static IntWritable one = new IntWritable(1); + private static final IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/MapOperatorTest.java ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/MapOperatorTest.java b/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/MapOperatorTest.java index e8c71c3..0f330e8 100644 --- a/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/MapOperatorTest.java +++ b/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/MapOperatorTest.java @@ -23,6 +23,15 @@ import java.io.File; import java.io.FileWriter; import java.io.IOException; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -35,13 +44,6 @@ import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.TextInputFormat; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestWatcher; -import org.junit.runner.Description; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.datatorrent.lib.testbench.CollectorTestSink; @@ -118,8 +120,7 @@ public class MapOperatorTest testDir = baseDir + "/" + methodName; try { FileUtils.forceMkdir(new File(testDir)); - } - catch (IOException ex) { + } catch (IOException ex) { throw new RuntimeException(ex); } createFile(testDir + "/" + file1, "1\n2\n3\n1\n2\n3\n"); @@ -131,16 +132,13 @@ public class MapOperatorTest try { output = new BufferedWriter(new FileWriter(new File(fileName))); output.write(data); - } - catch (IOException ex) { + } catch (IOException ex) { throw new RuntimeException(ex); - } - finally { + } finally { if (output != null) { try { output.close(); - } - catch (IOException ex) { + } catch (IOException ex) { LOG.error("not able to close the output stream: ", ex); } } @@ -152,8 +150,7 @@ public class MapOperatorTest { try { FileUtils.deleteDirectory(new File(baseDir)); - } - catch (IOException ex) { + } catch (IOException ex) { throw new RuntimeException(ex); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/ReduceOperatorTest.java ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/ReduceOperatorTest.java b/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/ReduceOperatorTest.java index 9ad5637..b85f8ad 100644 --- a/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/ReduceOperatorTest.java +++ b/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/ReduceOperatorTest.java @@ -18,55 +18,57 @@ */ package com.datatorrent.demos.mroperator; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; + import com.datatorrent.lib.testbench.CollectorTestSink; import com.datatorrent.lib.util.KeyHashValPair; -public class ReduceOperatorTest { - - private static Logger logger = LoggerFactory.getLogger(ReduceOperatorTest.class); - - /** - * Test node logic emits correct results - */ - @Test - public void testNodeProcessing() throws Exception { - testNodeProcessingSchema(new ReduceOperator<Text, IntWritable,Text, IntWritable>()); - } +public class ReduceOperatorTest +{ + private static Logger logger = LoggerFactory.getLogger(ReduceOperatorTest.class); - @SuppressWarnings({ "rawtypes", "unchecked" }) - public void testNodeProcessingSchema(ReduceOperator<Text, IntWritable,Text, IntWritable> oper) { + /** + * Test node logic emits correct results + */ + @Test + public void testNodeProcessing() throws Exception + { + testNodeProcessingSchema(new ReduceOperator<Text, IntWritable,Text, IntWritable>()); + } - oper.setReduceClass(WordCount.Reduce.class); - oper.setConfigFile(null); - oper.setup(null); + @SuppressWarnings({ "rawtypes", "unchecked" }) + public void testNodeProcessingSchema(ReduceOperator<Text, IntWritable,Text, IntWritable> oper) + { + oper.setReduceClass(WordCount.Reduce.class); + oper.setConfigFile(null); + oper.setup(null); - CollectorTestSink sortSink = new CollectorTestSink(); + CollectorTestSink sortSink = new CollectorTestSink(); oper.output.setSink(sortSink); - oper.beginWindow(0); - oper.inputCount.process(new KeyHashValPair<Integer, Integer>(1, 1)); - oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("one"), new IntWritable(1))); - oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("one"), new IntWritable(1))); - oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("two"), new IntWritable(1))); - oper.endWindow(); + oper.beginWindow(0); + oper.inputCount.process(new KeyHashValPair<Integer, Integer>(1, 1)); + oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("one"), new IntWritable(1))); + oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("one"), new IntWritable(1))); + oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("two"), new IntWritable(1))); + oper.endWindow(); - oper.beginWindow(1); - oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("one"), new IntWritable(1))); - oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("two"), new IntWritable(1))); - oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("two"), new IntWritable(1))); - oper.inputCount.process(new KeyHashValPair<Integer, Integer>(1, -1)); - oper.endWindow(); - Assert.assertEquals("number emitted tuples", 2, sortSink.collectedTuples.size()); - for (Object o : sortSink.collectedTuples) { + oper.beginWindow(1); + oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("one"), new IntWritable(1))); + oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("two"), new IntWritable(1))); + oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("two"), new IntWritable(1))); + oper.inputCount.process(new KeyHashValPair<Integer, Integer>(1, -1)); + oper.endWindow(); + Assert.assertEquals("number emitted tuples", 2, sortSink.collectedTuples.size()); + for (Object o : sortSink.collectedTuples) { logger.debug(o.toString()); } logger.debug("Done testing round\n"); - } + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/WordCountMRApplicationTest.java ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/WordCountMRApplicationTest.java b/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/WordCountMRApplicationTest.java index cb1521a..bd732c1 100644 --- a/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/WordCountMRApplicationTest.java +++ b/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/WordCountMRApplicationTest.java @@ -23,14 +23,15 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; + import com.google.common.collect.Maps; import com.datatorrent.api.LocalMode; @@ -58,7 +59,7 @@ public class WordCountMRApplicationTest List<String> readLines = FileUtils.readLines(new File(testMeta.testDir + "/output.txt")); Map<String,Integer> readMap = Maps.newHashMap(); Iterator<String> itr = readLines.iterator(); - while(itr.hasNext()){ + while (itr.hasNext()) { String[] splits = itr.next().split("="); readMap.put(splits[0],Integer.valueOf(splits[1])); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java ---------------------------------------------------------------------- diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java b/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java index 8f4dd92..55ffe92 100644 --- a/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java +++ b/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java @@ -20,13 +20,12 @@ package com.datatorrent.demos.pi; import org.apache.hadoop.conf.Configuration; -import com.datatorrent.lib.io.ConsoleOutputOperator; -import com.datatorrent.lib.testbench.RandomEventGenerator; - import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.testbench.RandomEventGenerator; /** * Monte Carlo PI estimation demo : <br> @@ -75,7 +74,7 @@ import com.datatorrent.api.annotation.ApplicationAnnotation; * * @since 0.3.2 */ -@ApplicationAnnotation(name="PiDemo") +@ApplicationAnnotation(name = "PiDemo") public class Application implements StreamingApplication { private final Locality locality = null; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationAppData.java ---------------------------------------------------------------------- diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationAppData.java b/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationAppData.java index 57c5249..328bb10 100644 --- a/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationAppData.java +++ b/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationAppData.java @@ -84,7 +84,7 @@ import com.datatorrent.lib.testbench.RandomEventGenerator; * * @since 0.3.2 */ -@ApplicationAnnotation(name="PiDemoAppData") +@ApplicationAnnotation(name = "PiDemoAppData") public class ApplicationAppData implements StreamingApplication { public static final String SNAPSHOT_SCHEMA = "PiDemoDataSchema.json"; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationWithScript.java ---------------------------------------------------------------------- diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationWithScript.java b/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationWithScript.java index 3ed376f..0796608 100644 --- a/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationWithScript.java +++ b/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationWithScript.java @@ -18,16 +18,15 @@ */ package com.datatorrent.demos.pi; - import org.apache.hadoop.conf.Configuration; +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.lib.io.ConsoleOutputOperator; import com.datatorrent.lib.script.JavaScriptOperator; import com.datatorrent.lib.stream.RoundRobinHashMap; import com.datatorrent.lib.testbench.RandomEventGenerator; -import com.datatorrent.api.DAG; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; /** * Monte Carlo PI estimation demo : <br> @@ -78,7 +77,7 @@ import com.datatorrent.api.annotation.ApplicationAnnotation; * * @since 0.3.2 */ -@ApplicationAnnotation(name="PiJavaScriptDemo") +@ApplicationAnnotation(name = "PiJavaScriptDemo") public class ApplicationWithScript implements StreamingApplication { @@ -92,13 +91,13 @@ public class ApplicationWithScript implements StreamingApplication rand.setMaxvalue(maxValue); RoundRobinHashMap<String,Object> rrhm = dag.addOperator("rrhm", new RoundRobinHashMap<String, Object>()); - rrhm.setKeys(new String[] { "x", "y" }); + rrhm.setKeys(new String[]{"x", "y"}); JavaScriptOperator calc = dag.addOperator("picalc", new JavaScriptOperator()); calc.setPassThru(false); calc.put("i",0); calc.put("count",0); - calc.addSetupScript("function pi() { if (x*x+y*y <= "+maxValue*maxValue+") { i++; } count++; return i / count * 4; }"); + calc.addSetupScript("function pi() { if (x*x+y*y <= " + maxValue * maxValue + ") { i++; } count++; return i / count * 4; }"); calc.setInvoke("pi"); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/pi/src/main/java/com/datatorrent/demos/pi/Calculator.java ---------------------------------------------------------------------- diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/Calculator.java b/demos/pi/src/main/java/com/datatorrent/demos/pi/Calculator.java index 9363b88..221ecc0 100644 --- a/demos/pi/src/main/java/com/datatorrent/demos/pi/Calculator.java +++ b/demos/pi/src/main/java/com/datatorrent/demos/pi/Calculator.java @@ -40,7 +40,7 @@ import com.datatorrent.lib.testbench.RandomEventGenerator; * * @since 0.3.2 */ -@ApplicationAnnotation(name="PiLibraryDemo") +@ApplicationAnnotation(name = "PiLibraryDemo") public class Calculator implements StreamingApplication { @Override @@ -56,7 +56,7 @@ public class Calculator implements StreamingApplication AbstractAggregator<Integer> pairOperator = dag.addOperator("PairXY", new ArrayListAggregator<Integer>()); Sigma<Integer> sumOperator = dag.addOperator("SumXY", new Sigma<Integer>()); LogicalCompareToConstant<Integer> comparator = dag.addOperator("AnalyzeLocation", new LogicalCompareToConstant<Integer>()); - comparator.setConstant(30000 *30000); + comparator.setConstant(30000 * 30000); Counter inCircle = dag.addOperator("CountInCircle", Counter.class); Counter inSquare = dag.addOperator("CountInSquare", Counter.class); Division division = dag.addOperator("Ratio", Division.class); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/pi/src/main/java/com/datatorrent/demos/pi/NamedValueList.java ---------------------------------------------------------------------- diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/NamedValueList.java b/demos/pi/src/main/java/com/datatorrent/demos/pi/NamedValueList.java index ce5ef9d..c50e17e 100644 --- a/demos/pi/src/main/java/com/datatorrent/demos/pi/NamedValueList.java +++ b/demos/pi/src/main/java/com/datatorrent/demos/pi/NamedValueList.java @@ -25,10 +25,10 @@ import java.util.Map; import javax.validation.constraints.NotNull; -import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.common.util.BaseOperator; /** * <p>An operator which converts a raw value to a named value singleton list.</p> @@ -47,9 +47,11 @@ public class NamedValueList<T> extends BaseOperator private List<Map<String, T>> valueList; private Map<String, T> valueMap; - public final transient DefaultInputPort<T> inPort = new DefaultInputPort<T>() { + public final transient DefaultInputPort<T> inPort = new DefaultInputPort<T>() + { @Override - public void process(T val) { + public void process(T val) + { valueMap.put(valueName, val); outPort.emit(valueList); } @@ -80,11 +82,13 @@ public class NamedValueList<T> extends BaseOperator { } - public String getValueName() { + public String getValueName() + { return valueName; } - public void setValueName(String name) { + public void setValueName(String name) + { valueName = name; } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/pi/src/main/java/com/datatorrent/demos/pi/PiCalculateOperator.java ---------------------------------------------------------------------- diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/PiCalculateOperator.java b/demos/pi/src/main/java/com/datatorrent/demos/pi/PiCalculateOperator.java index 14edf19..8e61991 100644 --- a/demos/pi/src/main/java/com/datatorrent/demos/pi/PiCalculateOperator.java +++ b/demos/pi/src/main/java/com/datatorrent/demos/pi/PiCalculateOperator.java @@ -21,10 +21,10 @@ package com.datatorrent.demos.pi; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.datatorrent.common.util.BaseOperator; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; /** * This operator implements Monte Carlo estimation of pi. For points randomly distributed points on @@ -46,8 +46,7 @@ public class PiCalculateOperator extends BaseOperator { if (x == -1) { x = tuple; - } - else { + } else { y = tuple; if (x * x + y * y <= base) { inArea++; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/pi/src/test/java/com/datatorrent/demos/pi/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/demos/pi/src/test/java/com/datatorrent/demos/pi/ApplicationTest.java b/demos/pi/src/test/java/com/datatorrent/demos/pi/ApplicationTest.java index b61077a..d8881c2 100644 --- a/demos/pi/src/test/java/com/datatorrent/demos/pi/ApplicationTest.java +++ b/demos/pi/src/test/java/com/datatorrent/demos/pi/ApplicationTest.java @@ -18,13 +18,11 @@ */ package com.datatorrent.demos.pi; - -import org.apache.hadoop.conf.Configuration; import org.junit.Test; +import org.apache.hadoop.conf.Configuration; import com.datatorrent.api.LocalMode; - /** * */ @@ -33,12 +31,12 @@ public class ApplicationTest @Test public void testSomeMethod() throws Exception { - LocalMode lma = LocalMode.newInstance(); - Configuration conf =new Configuration(false); - conf.addResource("dt-site-pi.xml"); - lma.prepareDAG(new Application(), conf); - LocalMode.Controller lc = lma.getController(); - lc.run(10000); - + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource("dt-site-pi.xml"); + lma.prepareDAG(new Application(), conf); + LocalMode.Controller lc = lma.getController(); + lc.run(10000); + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java ---------------------------------------------------------------------- diff --git a/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java b/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java index cd52873..8e12fcc 100644 --- a/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java +++ b/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java @@ -18,9 +18,8 @@ */ package com.datatorrent.demos.pi; - -import org.apache.hadoop.conf.Configuration; import org.junit.Test; +import org.apache.hadoop.conf.Configuration; import com.datatorrent.api.LocalMode; @@ -33,7 +32,7 @@ public class CalculatorTest public void testSomeMethod() throws Exception { LocalMode lma = LocalMode.newInstance(); - Configuration conf =new Configuration(false); + Configuration conf = new Configuration(false); conf.addResource("dt-site-pilibrary.xml"); lma.prepareDAG(new Calculator(), conf); LocalMode.Controller lc = lma.getController(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/FaithfulRScript.java ---------------------------------------------------------------------- diff --git a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/FaithfulRScript.java b/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/FaithfulRScript.java index 8558554..cf49848 100755 --- a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/FaithfulRScript.java +++ b/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/FaithfulRScript.java @@ -25,11 +25,10 @@ import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.datatorrent.contrib.r.RScript; - import com.datatorrent.api.Context; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.contrib.r.RScript; /** * @since 2.1.0 @@ -52,7 +51,8 @@ public class FaithfulRScript extends RScript } @InputPortFieldAnnotation(optional = true) - public final transient DefaultInputPort<FaithfulKey> faithfulInput = new DefaultInputPort<FaithfulKey>() { + public final transient DefaultInputPort<FaithfulKey> faithfulInput = new DefaultInputPort<FaithfulKey>() + { @Override public void process(FaithfulKey tuple) { @@ -65,7 +65,8 @@ public class FaithfulRScript extends RScript }; @InputPortFieldAnnotation(optional = true) - public final transient DefaultInputPort<Integer> inputElapsedTime = new DefaultInputPort<Integer>() { + public final transient DefaultInputPort<Integer> inputElapsedTime = new DefaultInputPort<Integer>() + { @Override public void process(Integer eT) { @@ -82,9 +83,9 @@ public class FaithfulRScript extends RScript @Override public void endWindow() { - - if (readingsList.size() == 0) + if (readingsList.size() == 0) { return; + } LOG.info("Input data size: readingsList - " + readingsList.size()); double[] eruptionDuration = new double[readingsList.size()]; @@ -106,6 +107,5 @@ public class FaithfulRScript extends RScript super.process(map); readingsList.clear(); map.clear(); - - }; + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/InputGenerator.java ---------------------------------------------------------------------- diff --git a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/InputGenerator.java b/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/InputGenerator.java index 86abba7..c45cd50 100755 --- a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/InputGenerator.java +++ b/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/InputGenerator.java @@ -82,8 +82,9 @@ public class InputGenerator implements InputOperator { int id; do { - id = (int) Math.abs(Math.round(random.nextGaussian() * max)); - } while (id >= max); + id = (int)Math.abs(Math.round(random.nextGaussian() * max)); + } + while (id >= max); if (id < min) { id = min; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplication.java ---------------------------------------------------------------------- diff --git a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplication.java b/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplication.java index 400e80c..0483767 100755 --- a/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplication.java +++ b/demos/r/src/main/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplication.java @@ -23,11 +23,10 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; -import com.datatorrent.lib.io.ConsoleOutputOperator; - import com.datatorrent.api.DAG; import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.io.ConsoleOutputOperator; /** * The application attempts to simulate 'Old Faithful Geyser" eruption. @@ -38,7 +37,7 @@ import com.datatorrent.api.annotation.ApplicationAnnotation; * waiting times and eruption duration values. * For every application window, it generates only one 'elapsed time' input for which the * prediction would be made. - * Model in R is in file ruptionModel.R located at + * Model in R is in file ruptionModel.R located at * demos/r/src/main/resources/com/datatorrent/demos/oldfaithful/ directory * * @since 2.1.0 http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/r/src/test/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplicationTest.java ---------------------------------------------------------------------- diff --git a/demos/r/src/test/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplicationTest.java b/demos/r/src/test/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplicationTest.java index dc6a8cb..0bb1901 100755 --- a/demos/r/src/test/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplicationTest.java +++ b/demos/r/src/test/java/com/datatorrent/demos/r/oldfaithful/OldFaithfulApplicationTest.java @@ -29,7 +29,7 @@ import com.datatorrent.api.LocalMode; public class OldFaithfulApplicationTest { - + private static final Logger LOG = LoggerFactory.getLogger(OldFaithfulApplicationTest.class); @Test http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/KinesisHashtagsApplication.java ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/KinesisHashtagsApplication.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/KinesisHashtagsApplication.java index fd2a430..b9d32ab 100644 --- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/KinesisHashtagsApplication.java +++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/KinesisHashtagsApplication.java @@ -18,6 +18,9 @@ */ package com.datatorrent.demos.twitter; +import java.net.URI; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.Operator.InputPort; @@ -31,10 +34,7 @@ import com.datatorrent.contrib.twitter.TwitterSampleInput; import com.datatorrent.lib.algo.UniqueCounter; import com.datatorrent.lib.io.ConsoleOutputOperator; import com.datatorrent.lib.io.PubSubWebSocketOutputOperator; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import java.net.URI; /** * Twitter Demo Application: <br> * This demo application samples random public status from twitter, send to Hashtag @@ -167,7 +167,7 @@ import java.net.URI; * * @since 2.0.0 */ -@ApplicationAnnotation(name="TwitterKinesisDemo") +@ApplicationAnnotation(name = "TwitterKinesisDemo") public class KinesisHashtagsApplication implements StreamingApplication { private final Locality locality = null; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/SlidingContainer.java ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/SlidingContainer.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/SlidingContainer.java index 9bd81a4..8b9f447 100644 --- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/SlidingContainer.java +++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/SlidingContainer.java @@ -32,7 +32,7 @@ public class SlidingContainer<T> implements Serializable T identifier; int totalCount; int position; - int windowedCount[]; + int[] windowedCount; @SuppressWarnings("unused") private SlidingContainer() http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpApplication.java ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpApplication.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpApplication.java index f61f5be..9edce64 100644 --- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpApplication.java +++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpApplication.java @@ -24,8 +24,6 @@ import java.sql.SQLException; import javax.annotation.Nonnull; import org.apache.hadoop.conf.Configuration; -import twitter4j.Status; - import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.StreamingApplication; @@ -34,6 +32,8 @@ import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.contrib.twitter.TwitterSampleInput; import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator; +import twitter4j.Status; + /** * An application which connects to Twitter Sample Input and stores all the * tweets with their usernames in a mysql database. Please review the docs @@ -63,7 +63,7 @@ import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator; * * @since 0.9.4 */ -@ApplicationAnnotation(name="TwitterDumpDemo") +@ApplicationAnnotation(name = "TwitterDumpDemo") public class TwitterDumpApplication implements StreamingApplication { public static class Status2Database extends AbstractJdbcTransactionableOutputOperator<Status> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpHBaseApplication.java ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpHBaseApplication.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpHBaseApplication.java index ecc412f..3adbbe0 100644 --- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpHBaseApplication.java +++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterDumpHBaseApplication.java @@ -23,14 +23,14 @@ import java.nio.ByteBuffer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Put; -import com.datatorrent.contrib.hbase.AbstractHBasePutOutputOperator; -import com.datatorrent.contrib.twitter.TwitterSampleInput; - import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.contrib.hbase.AbstractHBasePutOutputOperator; +import com.datatorrent.contrib.twitter.TwitterSampleInput; + import twitter4j.Status; /** @@ -47,7 +47,7 @@ import twitter4j.Status; * * @since 1.0.2 */ -@ApplicationAnnotation(name="TwitterDumpHBaseDemo") +@ApplicationAnnotation(name = "TwitterDumpHBaseDemo") public class TwitterDumpHBaseApplication implements StreamingApplication { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusHashtagExtractor.java ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusHashtagExtractor.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusHashtagExtractor.java index 5ed6774..d22db40 100644 --- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusHashtagExtractor.java +++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusHashtagExtractor.java @@ -18,12 +18,12 @@ */ package com.datatorrent.demos.twitter; -import twitter4j.HashtagEntity; -import twitter4j.Status; - -import com.datatorrent.common.util.BaseOperator; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; + +import twitter4j.HashtagEntity; +import twitter4j.Status; /** * <p>TwitterStatusHashtagExtractor class.</p> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusURLExtractor.java ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusURLExtractor.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusURLExtractor.java index ed4e207..6dbc436 100644 --- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusURLExtractor.java +++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusURLExtractor.java @@ -18,12 +18,13 @@ */ package com.datatorrent.demos.twitter; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; + import twitter4j.Status; import twitter4j.URLEntity; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusWordExtractor.java ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusWordExtractor.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusWordExtractor.java index 1818dca..e05a37a 100644 --- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusWordExtractor.java +++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterStatusWordExtractor.java @@ -18,14 +18,14 @@ */ package com.datatorrent.demos.twitter; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Context.OperatorContext; - import java.util.Arrays; import java.util.HashSet; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; + /** * <p>TwitterStatusWordExtractor class.</p> * @@ -41,7 +41,7 @@ public class TwitterStatusWordExtractor extends BaseOperator @Override public void process(String text) { - String strs[] = text.split(" "); + String[] strs = text.split(" "); if (strs != null) { for (String str : strs) { if (str != null && !filterList.contains(str) ) { @@ -56,7 +56,7 @@ public class TwitterStatusWordExtractor extends BaseOperator public void setup(OperatorContext context) { this.filterList = new HashSet<String>(Arrays.asList(new String[]{"", " ","I","you","the","a","to","as","he","him","his","her","she","me","can","for","of","and","or","but", - "this","that","!",",",".",":","#","/","@","be","in","out","was","were","is","am","are","so","no","...","my","de","RT","on","que","la","i","your","it","have","with","?","when", - "up","just","do","at","&","-","+","*","\\","y","n","like","se","en","te","el","I'm"})); + "this","that","!",",",".",":","#","/","@","be","in","out","was","were","is","am","are","so","no","...","my","de","RT","on","que","la","i","your","it","have","with","?","when", + "up","just","do","at","&","-","+","*","\\","y","n","like","se","en","te","el","I'm"})); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java index c8d3b00..731a38f 100644 --- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java +++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopCounterApplication.java @@ -19,10 +19,14 @@ package com.datatorrent.demos.twitter; import java.net.URI; +import java.util.List; +import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; +import com.google.common.collect.Maps; + import com.datatorrent.api.Context; import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; @@ -35,15 +39,10 @@ import com.datatorrent.contrib.twitter.TwitterSampleInput; import com.datatorrent.lib.algo.UniqueCounter; import com.datatorrent.lib.appdata.schemas.SchemaUtils; import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap; +import com.datatorrent.lib.io.ConsoleOutputOperator; import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery; import com.datatorrent.lib.io.PubSubWebSocketAppDataResult; -import com.datatorrent.lib.io.ConsoleOutputOperator; -import com.datatorrent.lib.io.PubSubWebSocketOutputOperator; -import com.google.common.collect.Maps; - -import java.util.List; -import java.util.Map; /** * Twitter Demo Application: <br> * This demo application samples random public status from twitter, send to url @@ -147,7 +146,7 @@ import java.util.Map; * * @since 0.3.2 */ -@ApplicationAnnotation(name=TwitterTopCounterApplication.APP_NAME) +@ApplicationAnnotation(name = TwitterTopCounterApplication.APP_NAME) public class TwitterTopCounterApplication implements StreamingApplication { public static final String SNAPSHOT_SCHEMA = "twitterURLDataSchema.json"; @@ -188,11 +187,7 @@ public class TwitterTopCounterApplication implements StreamingApplication consoleOutput(dag, "topURLs", topCounts.output, SNAPSHOT_SCHEMA, "url"); } - public static void consoleOutput(DAG dag, - String operatorName, - OutputPort<List<Map<String, Object>>> topCount, - String schemaFile, - String alias) + public static void consoleOutput(DAG dag, String operatorName, OutputPort<List<Map<String, Object>>> topCount, String schemaFile, String alias) { String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS); if (!StringUtils.isEmpty(gatewayAddress)) { @@ -217,8 +212,7 @@ public class TwitterTopCounterApplication implements StreamingApplication dag.addStream("MapProvider", topCount, snapshotServer.input); dag.addStream("Result", snapshotServer.queryResult, queryResultPort); - } - else { + } else { ConsoleOutputOperator operator = dag.addOperator(operatorName, new ConsoleOutputOperator()); operator.setStringFormat(operatorName + ": %s"); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopWordsApplication.java ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopWordsApplication.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopWordsApplication.java index 8ed3678..3953ab7 100644 --- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopWordsApplication.java +++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTopWordsApplication.java @@ -18,6 +18,7 @@ */ package com.datatorrent.demos.twitter; +import org.apache.hadoop.conf.Configuration; import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; @@ -25,9 +26,6 @@ import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.contrib.twitter.TwitterSampleInput; import com.datatorrent.lib.algo.UniqueCounter; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; - /** * This application is same as other twitter demo @@ -43,7 +41,7 @@ import org.apache.hadoop.conf.Configuration; * * @since 0.3.2 */ -@ApplicationAnnotation(name=TwitterTopWordsApplication.APP_NAME) +@ApplicationAnnotation(name = TwitterTopWordsApplication.APP_NAME) public class TwitterTopWordsApplication implements StreamingApplication { public static final String SNAPSHOT_SCHEMA = "twitterWordDataSchema.json"; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTrendingHashtagsApplication.java ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTrendingHashtagsApplication.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTrendingHashtagsApplication.java index 5246060..3597a92 100644 --- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTrendingHashtagsApplication.java +++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/TwitterTrendingHashtagsApplication.java @@ -18,16 +18,13 @@ */ package com.datatorrent.demos.twitter; +import org.apache.hadoop.conf.Configuration; import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.contrib.twitter.TwitterSampleInput; import com.datatorrent.lib.algo.UniqueCounter; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; - - /** * Twitter Demo Application: <br> @@ -134,7 +131,7 @@ import org.apache.hadoop.conf.Configuration; * * @since 1.0.2 */ -@ApplicationAnnotation(name=TwitterTrendingHashtagsApplication.APP_NAME) +@ApplicationAnnotation(name = TwitterTrendingHashtagsApplication.APP_NAME) public class TwitterTrendingHashtagsApplication implements StreamingApplication { public static final String SNAPSHOT_SCHEMA = "twitterHashTagDataSchema.json"; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/URLSerDe.java ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/URLSerDe.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/URLSerDe.java index 7f6f399..43ed8f7 100644 --- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/URLSerDe.java +++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/URLSerDe.java @@ -18,11 +18,11 @@ */ package com.datatorrent.demos.twitter; -import com.datatorrent.api.StreamCodec; -import com.datatorrent.netlet.util.Slice; import java.nio.ByteBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.netlet.util.Slice; /** * <p>URLSerDe class.</p> @@ -42,11 +42,9 @@ public class URLSerDe implements StreamCodec<byte[]> { if (fragment == null || fragment.buffer == null) { return null; - } - else if (fragment.offset == 0 && fragment.length == fragment.buffer.length) { + } else if (fragment.offset == 0 && fragment.length == fragment.buffer.length) { return fragment.buffer; - } - else { + } else { byte[] buffer = new byte[fragment.buffer.length]; System.arraycopy(fragment.buffer, fragment.offset, buffer, 0, fragment.length); return buffer; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/main/java/com/datatorrent/demos/twitter/WindowedTopCounter.java ---------------------------------------------------------------------- diff --git a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/WindowedTopCounter.java b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/WindowedTopCounter.java index 449c903..20bb673 100644 --- a/demos/twitter/src/main/java/com/datatorrent/demos/twitter/WindowedTopCounter.java +++ b/demos/twitter/src/main/java/com/datatorrent/demos/twitter/WindowedTopCounter.java @@ -18,23 +18,25 @@ */ package com.datatorrent.demos.twitter; -import java.util.*; +import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.PriorityQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.datatorrent.api.*; -import com.datatorrent.api.Context.OperatorContext; - -import com.datatorrent.common.util.BaseOperator; - import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; + /** * * WindowedTopCounter is an operator which counts the most often occurring tuples in a sliding window of a specific size. @@ -114,8 +116,7 @@ public class WindowedTopCounter<T> extends BaseOperator if (holder.totalCount == 0) { iterator.remove(); - } - else { + } else { topCounter.add(holder); if (--i == 0) { break; @@ -138,8 +139,7 @@ public class WindowedTopCounter<T> extends BaseOperator topCounter.poll(); topCounter.add(holder); smallest = topCounter.peek().totalCount; - } - else if (holder.totalCount == 0) { + } else if (holder.totalCount == 0) { iterator.remove(); } } @@ -149,7 +149,7 @@ public class WindowedTopCounter<T> extends BaseOperator Iterator<SlidingContainer<T>> topIter = topCounter.iterator(); - while(topIter.hasNext()) { + while (topIter.hasNext()) { final SlidingContainer<T> wh = topIter.next(); Map<String, Object> tableRow = Maps.newHashMap(); @@ -254,8 +254,7 @@ public class WindowedTopCounter<T> extends BaseOperator { if (o1.totalCount > o2.totalCount) { return 1; - } - else if (o1.totalCount < o2.totalCount) { + } else if (o1.totalCount < o2.totalCount) { return -1; } @@ -274,8 +273,8 @@ public class WindowedTopCounter<T> extends BaseOperator @Override public int compare(Map<String, Object> o1, Map<String, Object> o2) { - Integer count1 = (Integer) o1.get(FIELD_COUNT); - Integer count2 = (Integer) o2.get(FIELD_COUNT); + Integer count1 = (Integer)o1.get(FIELD_COUNT); + Integer count2 = (Integer)o2.get(FIELD_COUNT); return count1.compareTo(count2); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterDumpApplicationTest.java ---------------------------------------------------------------------- diff --git a/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterDumpApplicationTest.java b/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterDumpApplicationTest.java index a4daf09..cd211ff 100644 --- a/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterDumpApplicationTest.java +++ b/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterDumpApplicationTest.java @@ -19,10 +19,11 @@ package com.datatorrent.demos.twitter; import org.junit.Test; -import static org.junit.Assert.*; import org.apache.hadoop.conf.Configuration; +import static org.junit.Assert.assertEquals; + import com.datatorrent.api.DAG; import com.datatorrent.api.LocalMode; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopCounterTest.java ---------------------------------------------------------------------- diff --git a/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopCounterTest.java b/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopCounterTest.java index 0ad4d18..91a4e20 100644 --- a/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopCounterTest.java +++ b/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopCounterTest.java @@ -18,11 +18,10 @@ */ package com.datatorrent.demos.twitter; +import org.junit.Test; +import org.apache.hadoop.conf.Configuration; import com.datatorrent.api.LocalMode; import com.datatorrent.contrib.twitter.TwitterSampleInput; -import com.datatorrent.demos.twitter.TwitterTopCounterApplication; -import org.apache.hadoop.conf.Configuration; -import org.junit.Test; /** * Test the DAG declaration in local mode. http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopWordsTest.java ---------------------------------------------------------------------- diff --git a/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopWordsTest.java b/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopWordsTest.java index a27c60f..4ac2e8d 100644 --- a/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopWordsTest.java +++ b/demos/twitter/src/test/java/com/datatorrent/demos/twitter/TwitterTopWordsTest.java @@ -18,12 +18,11 @@ */ package com.datatorrent.demos.twitter; +import org.junit.Test; +import org.apache.hadoop.conf.Configuration; import com.datatorrent.api.LocalMode; import com.datatorrent.contrib.twitter.TwitterSampleInput; -import org.apache.hadoop.conf.Configuration; -import org.junit.Test; - /** * Test the DAG declaration in local mode. */ @@ -38,9 +37,9 @@ public class TwitterTopWordsTest @Test public void testApplication() throws Exception { - TwitterTopWordsApplication app = new TwitterTopWordsApplication(); - Configuration conf =new Configuration(false); - conf.addResource("dt-site-rollingtopwords.xml"); + TwitterTopWordsApplication app = new TwitterTopWordsApplication(); + Configuration conf = new Configuration(false); + conf.addResource("dt-site-rollingtopwords.xml"); LocalMode lma = LocalMode.newInstance(); lma.prepareDAG(app, conf); LocalMode.Controller lc = lma.getController(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/Application.java ---------------------------------------------------------------------- diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/Application.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/Application.java index 1b27cea..57ef1a1 100644 --- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/Application.java +++ b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/Application.java @@ -18,7 +18,6 @@ */ package com.datatorrent.demos.uniquecount; - import org.apache.hadoop.conf.Configuration; import com.datatorrent.api.Context; @@ -27,6 +26,7 @@ import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.common.partitioner.StatelessPartitioner; import com.datatorrent.lib.algo.UniqueCounter; import com.datatorrent.lib.converter.MapToKeyHashValuePairConverter; import com.datatorrent.lib.io.ConsoleOutputOperator; @@ -34,8 +34,6 @@ import com.datatorrent.lib.stream.Counter; import com.datatorrent.lib.stream.StreamDuplicater; import com.datatorrent.lib.util.KeyHashValPair; -import com.datatorrent.common.partitioner.StatelessPartitioner; - /** * Application to demonstrate PartitionableUniqueCount operator. <br> * The input operator generate random keys, which is sent to @@ -45,7 +43,7 @@ import com.datatorrent.common.partitioner.StatelessPartitioner; * * @since 1.0.2 */ -@ApplicationAnnotation(name="UniqueValueCountDemo") +@ApplicationAnnotation(name = "UniqueValueCountDemo") public class Application implements StreamingApplication { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/CountVerifier.java ---------------------------------------------------------------------- diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/CountVerifier.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/CountVerifier.java index 3a5140d..d201037 100644 --- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/CountVerifier.java +++ b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/CountVerifier.java @@ -18,6 +18,9 @@ */ package com.datatorrent.demos.uniquecount; +import java.util.HashMap; +import java.util.Map; + import com.datatorrent.api.Context; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; @@ -25,9 +28,6 @@ import com.datatorrent.api.Operator; import com.datatorrent.api.annotation.OutputPortFieldAnnotation; import com.datatorrent.lib.util.KeyHashValPair; -import java.util.HashMap; -import java.util.Map; - /* Compare results and print non-matching values to console. */ @@ -41,35 +41,33 @@ public class CountVerifier<K> implements Operator HashMap<K, Integer> map1 = new HashMap<K, Integer>(); HashMap<K, Integer> map2 = new HashMap<K, Integer>(); - public transient final DefaultInputPort<KeyHashValPair<K, Integer>> in1 = - new DefaultInputPort<KeyHashValPair<K, Integer>>() - { - @Override - public void process(KeyHashValPair<K, Integer> tuple) - { - processTuple(tuple, map1); - } - }; + public final transient DefaultInputPort<KeyHashValPair<K, Integer>> in1 = new DefaultInputPort<KeyHashValPair<K, Integer>>() + { + @Override + public void process(KeyHashValPair<K, Integer> tuple) + { + processTuple(tuple, map1); + } + }; - public transient final DefaultInputPort<KeyHashValPair<K, Integer>> in2 = - new DefaultInputPort<KeyHashValPair<K, Integer>>() - { - @Override - public void process(KeyHashValPair<K, Integer> tuple) - { - processTuple(tuple, map2); - } - }; + public final transient DefaultInputPort<KeyHashValPair<K, Integer>> in2 = new DefaultInputPort<KeyHashValPair<K, Integer>>() + { + @Override + public void process(KeyHashValPair<K, Integer> tuple) + { + processTuple(tuple, map2); + } + }; void processTuple(KeyHashValPair<K, Integer> tuple, HashMap<K, Integer> map) { map.put(tuple.getKey(), tuple.getValue()); } - @OutputPortFieldAnnotation(optional=true) - public transient final DefaultOutputPort<Integer> successPort = new DefaultOutputPort<Integer>(); - @OutputPortFieldAnnotation(optional=true) - public transient final DefaultOutputPort<Integer> failurePort = new DefaultOutputPort<Integer>(); + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<Integer> successPort = new DefaultOutputPort<Integer>(); + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<Integer> failurePort = new DefaultOutputPort<Integer>(); @Override public void beginWindow(long l) http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomDataGenerator.java ---------------------------------------------------------------------- diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomDataGenerator.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomDataGenerator.java index 2742961..e806759 100644 --- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomDataGenerator.java +++ b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomDataGenerator.java @@ -18,14 +18,17 @@ */ package com.datatorrent.demos.uniquecount; +import java.util.HashMap; +import java.util.Random; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.datatorrent.api.Context; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.InputOperator; import com.datatorrent.lib.util.KeyValPair; -import java.util.HashMap; -import java.util.Random; - /** * Generate random Key value pairs. * key is string and value is int, it emits the pair as KeyValPair on outPort, @@ -36,6 +39,7 @@ public class RandomDataGenerator implements InputOperator { public final transient DefaultOutputPort<KeyValPair<String, Object>> outPort = new DefaultOutputPort<KeyValPair<String, Object>>(); private HashMap<String, Integer> dataInfo; + private final transient Logger LOG = LoggerFactory.getLogger(RandomDataGenerator.class); private int count; private int sleepMs = 10; private int keyRange = 100; @@ -51,15 +55,15 @@ public class RandomDataGenerator implements InputOperator @Override public void emitTuples() { - for(int i = 0 ; i < tupleBlast; i++) { + for (int i = 0; i < tupleBlast; i++) { String key = String.valueOf(random.nextInt(keyRange)); int val = random.nextInt(valueRange); outPort.emit(new KeyValPair<String, Object>(key, val)); } try { Thread.sleep(sleepMs); - } catch(Exception ex) { - System.out.println(ex.getMessage()); + } catch (Exception ex) { + LOG.error(ex.getMessage()); } count++; } @@ -93,7 +97,7 @@ public class RandomDataGenerator implements InputOperator @Override public void endWindow() { - System.out.println("emitTuples called " + count + " times in this window"); + LOG.debug("emitTuples called " + count + " times in this window"); count = 0; } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeyValues.java ---------------------------------------------------------------------- diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeyValues.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeyValues.java index feeb282..28f3bc0 100644 --- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeyValues.java +++ b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeyValues.java @@ -18,16 +18,16 @@ */ package com.datatorrent.demos.uniquecount; -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; -import com.datatorrent.lib.util.KeyValPair; - import java.util.BitSet; import java.util.HashMap; import java.util.Map; import java.util.Random; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.lib.util.KeyValPair; + /** * Input port operator for generating random values on keys. <br> * Key(s) : key + integer in range between 0 and numKeys <br> @@ -37,107 +37,117 @@ import java.util.Random; */ public class RandomKeyValues implements InputOperator { - public final transient DefaultOutputPort<KeyValPair<String, Object>> outport = new DefaultOutputPort<KeyValPair<String, Object>>(); - private Random random = new Random(11111); - private int numKeys; - private int numValuesPerKeys; - private int tuppleBlast = 1000; - private int emitDelay = 20; /* 20 ms */ - - /* For verification */ - private Map<Integer, BitSet> history = new HashMap<Integer, BitSet>(); - - public RandomKeyValues() { - this.numKeys = 100; - this.numValuesPerKeys = 100; - } - - public RandomKeyValues(int keys, int values) { - this.numKeys = keys; - this.numValuesPerKeys = values; - } - - @Override - public void beginWindow(long windowId) - { - } - - @Override - public void endWindow() - { - } - - @Override - public void setup(OperatorContext context) - { - } - - @Override - public void teardown() - { - } - - @Override - public void emitTuples() - { - /* generate tuples randomly, */ - for(int i = 0; i < tuppleBlast; i++) { - int intKey = random.nextInt(numKeys); - String key = "key" + String.valueOf(intKey); - int value = random.nextInt(numValuesPerKeys); - - // update history for verifying later. - BitSet bmap = history.get(intKey); - if (bmap == null) { - bmap = new BitSet(); - history.put(intKey, bmap); - } - bmap.set(value); - - // emit the key with value. - outport.emit(new KeyValPair<String, Object>(key, value)); - } - try - { - Thread.sleep(emitDelay); - } catch (Exception e) - { - } - } - - public int getNumKeys() { - return numKeys; - } - - public void setNumKeys(int numKeys) { - this.numKeys = numKeys; - } - - public int getNumValuesPerKeys() { - return numValuesPerKeys; - } - - public void setNumValuesPerKeys(int numValuesPerKeys) { - this.numValuesPerKeys = numValuesPerKeys; - } - - public int getTuppleBlast() { - return tuppleBlast; + public final transient DefaultOutputPort<KeyValPair<String, Object>> outport = new DefaultOutputPort<KeyValPair<String, Object>>(); + private Random random = new Random(11111); + private int numKeys; + private int numValuesPerKeys; + private int tuppleBlast = 1000; + private int emitDelay = 20; /* 20 ms */ + + /* For verification */ + private Map<Integer, BitSet> history = new HashMap<Integer, BitSet>(); + + public RandomKeyValues() + { + this.numKeys = 100; + this.numValuesPerKeys = 100; + } + + public RandomKeyValues(int keys, int values) + { + this.numKeys = keys; + this.numValuesPerKeys = values; + } + + @Override + public void beginWindow(long windowId) + { + } + + @Override + public void endWindow() + { + } + + @Override + public void setup(OperatorContext context) + { + } + + @Override + public void teardown() + { + } + + @Override + public void emitTuples() + { + /* generate tuples randomly, */ + for (int i = 0; i < tuppleBlast; i++) { + int intKey = random.nextInt(numKeys); + String key = "key" + String.valueOf(intKey); + int value = random.nextInt(numValuesPerKeys); + + // update history for verifying later. + BitSet bmap = history.get(intKey); + if (bmap == null) { + bmap = new BitSet(); + history.put(intKey, bmap); + } + bmap.set(value); + + // emit the key with value. + outport.emit(new KeyValPair<String, Object>(key, value)); } - - public void setTuppleBlast(int tuppleBlast) { - this.tuppleBlast = tuppleBlast; - } - - public int getEmitDelay() { - return emitDelay; - } - - public void setEmitDelay(int emitDelay) { - this.emitDelay = emitDelay; - } - - public void debug() { - + try { + Thread.sleep(emitDelay); + } catch (Exception e) { + // Ignore. } + } + + public int getNumKeys() + { + return numKeys; + } + + public void setNumKeys(int numKeys) + { + this.numKeys = numKeys; + } + + public int getNumValuesPerKeys() + { + return numValuesPerKeys; + } + + public void setNumValuesPerKeys(int numValuesPerKeys) + { + this.numValuesPerKeys = numValuesPerKeys; + } + + public int getTuppleBlast() + { + return tuppleBlast; + } + + public void setTuppleBlast(int tuppleBlast) + { + this.tuppleBlast = tuppleBlast; + } + + public int getEmitDelay() + { + return emitDelay; + } + + public void setEmitDelay(int emitDelay) + { + this.emitDelay = emitDelay; + } + + public void debug() + { + + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeysGenerator.java ---------------------------------------------------------------------- diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeysGenerator.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeysGenerator.java index 65b5c95..eb9d22c 100644 --- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeysGenerator.java +++ b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/RandomKeysGenerator.java @@ -18,14 +18,18 @@ */ package com.datatorrent.demos.uniquecount; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +import org.apache.commons.lang3.mutable.MutableInt; + import com.datatorrent.api.Context; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.InputOperator; import com.datatorrent.api.annotation.OutputPortFieldAnnotation; import com.datatorrent.lib.util.KeyHashValPair; -import org.apache.commons.lang3.mutable.MutableInt; - -import java.util.*; /* Generate random keys. @@ -61,8 +65,7 @@ public class RandomKeysGenerator implements InputOperator outPort.emit(key); - if (verificationPort.isConnected()) - { + if (verificationPort.isConnected()) { // maintain history for later verification. MutableInt count = history.get(key); if (count == null) { @@ -74,10 +77,11 @@ public class RandomKeysGenerator implements InputOperator } try { - if (sleepTime != 0) + if (sleepTime != 0) { Thread.sleep(sleepTime); + } } catch (Exception ex) { - + // Ignore. } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/UniqueKeyValCountDemo.java ---------------------------------------------------------------------- diff --git a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/UniqueKeyValCountDemo.java b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/UniqueKeyValCountDemo.java index 95323d5..eb9e392 100644 --- a/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/UniqueKeyValCountDemo.java +++ b/demos/uniquecount/src/main/java/com/datatorrent/demos/uniquecount/UniqueKeyValCountDemo.java @@ -18,7 +18,6 @@ */ package com.datatorrent.demos.uniquecount; - import org.apache.hadoop.conf.Configuration; import com.datatorrent.api.Context; @@ -27,19 +26,19 @@ import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.common.partitioner.StatelessPartitioner; + import com.datatorrent.lib.algo.UniqueCounter; import com.datatorrent.lib.converter.MapToKeyHashValuePairConverter; import com.datatorrent.lib.io.ConsoleOutputOperator; import com.datatorrent.lib.util.KeyValPair; -import com.datatorrent.common.partitioner.StatelessPartitioner; - /** * <p>UniqueKeyValCountDemo class.</p> * * @since 1.0.2 */ -@ApplicationAnnotation(name="UniqueKeyValueCountDemo") +@ApplicationAnnotation(name = "UniqueKeyValueCountDemo") public class UniqueKeyValCountDemo implements StreamingApplication { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/ApplicationTest.java b/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/ApplicationTest.java index 991a94d..66a0af1 100644 --- a/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/ApplicationTest.java +++ b/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/ApplicationTest.java @@ -18,9 +18,9 @@ */ package com.datatorrent.demos.uniquecount; -import com.datatorrent.api.LocalMode; -import org.apache.hadoop.conf.Configuration; import org.junit.Test; +import org.apache.hadoop.conf.Configuration; +import com.datatorrent.api.LocalMode; /** * Test the DAG declaration in local mode. http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/UniqueKeyValDemoTest.java ---------------------------------------------------------------------- diff --git a/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/UniqueKeyValDemoTest.java b/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/UniqueKeyValDemoTest.java index 01e790a..a198247 100644 --- a/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/UniqueKeyValDemoTest.java +++ b/demos/uniquecount/src/test/java/com/datatorrent/demos/uniquecount/UniqueKeyValDemoTest.java @@ -18,9 +18,9 @@ */ package com.datatorrent.demos.uniquecount; -import com.datatorrent.api.LocalMode; -import org.apache.hadoop.conf.Configuration; import org.junit.Test; +import org.apache.hadoop.conf.Configuration; +import com.datatorrent.api.LocalMode; /** * Test the DAG declaration in local mode. http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7d9386d2/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/Application.java ---------------------------------------------------------------------- diff --git a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/Application.java b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/Application.java index 1028080..d0512cf 100644 --- a/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/Application.java +++ b/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/Application.java @@ -18,14 +18,14 @@ */ package com.datatorrent.demos.wordcount; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.api.StreamingApplication; +import org.apache.hadoop.conf.Configuration; + import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.lib.algo.UniqueCounter; import com.datatorrent.lib.io.ConsoleOutputOperator; -import org.apache.hadoop.conf.Configuration; - /** * Simple Word Count Demo : <br> * This is application to count total occurrence of each word from file or any @@ -72,8 +72,8 @@ import org.apache.hadoop.conf.Configuration; * Streaming Window Size : 500ms * Operator Details : <br> * <ul> - * <li> - * <p><b> The operator wordinput : </b> This operator opens local file, reads each line and sends each word to application. + * <li> + * <p><b> The operator wordinput : </b> This operator opens local file, reads each line and sends each word to application. * This can replaced by any input stream by user. <br> * Class : {@link com.datatorrent.demos.wordcount.WordCountInputOperator} <br> * Operator Application Window Count : 1 <br> @@ -93,10 +93,10 @@ import org.apache.hadoop.conf.Configuration; * * @since 0.3.2 */ -@ApplicationAnnotation(name="WordCountDemo") +@ApplicationAnnotation(name = "WordCountDemo") public class Application implements StreamingApplication { - @Override + @Override public void populateDAG(DAG dag, Configuration conf) { WordCountInputOperator input = dag.addOperator("wordinput", new WordCountInputOperator()); @@ -104,8 +104,5 @@ public class Application implements StreamingApplication dag.addStream("wordinput-count", input.outputPort, wordCount.data); ConsoleOutputOperator consoleOperator = dag.addOperator("console", new ConsoleOutputOperator()); dag.addStream("count-console",wordCount.count, consoleOperator.input); - } - - }
