http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/SparseInstanceData.java ---------------------------------------------------------------------- diff --git a/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/SparseInstanceData.java b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/SparseInstanceData.java index e917844..1db95d0 100644 --- a/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/SparseInstanceData.java +++ b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/SparseInstanceData.java @@ -25,118 +25,118 @@ package com.yahoo.labs.samoa.instances; */ /** - * + * * @author abifet */ -public class SparseInstanceData implements InstanceData{ - - public SparseInstanceData(double[] attributeValues, int[] indexValues, int numberAttributes) { - this.attributeValues = attributeValues; - this.indexValues = indexValues; - this.numberAttributes = numberAttributes; - } - - public SparseInstanceData(int length) { - this.attributeValues = new double[length]; - this.indexValues = new int[length]; - } - - - protected double[] attributeValues; +public class SparseInstanceData implements InstanceData { - public double[] getAttributeValues() { - return attributeValues; - } + public SparseInstanceData(double[] attributeValues, int[] indexValues, int numberAttributes) { + this.attributeValues = attributeValues; + this.indexValues = indexValues; + this.numberAttributes = numberAttributes; + } - public void setAttributeValues(double[] attributeValues) { - this.attributeValues = attributeValues; - } + public SparseInstanceData(int length) { + this.attributeValues = new double[length]; + this.indexValues = new int[length]; + } - public int[] getIndexValues() { - return indexValues; - } + protected double[] attributeValues; - public void setIndexValues(int[] indexValues) { - this.indexValues = indexValues; - } + public double[] getAttributeValues() { + return attributeValues; + } - public int getNumberAttributes() { - return numberAttributes; - } + public void setAttributeValues(double[] attributeValues) { + this.attributeValues = attributeValues; + } - public void setNumberAttributes(int numberAttributes) { - this.numberAttributes = numberAttributes; - } - protected int[] indexValues; - protected int numberAttributes; + public int[] getIndexValues() { + return indexValues; + } - @Override - public int numAttributes() { - return this.numberAttributes; - } + public void setIndexValues(int[] indexValues) { + this.indexValues = indexValues; + } + + public int getNumberAttributes() { + return numberAttributes; + } + + public void setNumberAttributes(int numberAttributes) { + this.numberAttributes = numberAttributes; + } + + protected int[] indexValues; + protected int numberAttributes; - @Override - public double value(int indexAttribute) { - int location = locateIndex(indexAttribute); - //return location == -1 ? 0 : this.attributeValues[location]; - // int index = locateIndex(attIndex); + @Override + public int numAttributes() { + return this.numberAttributes; + } + + @Override + public double value(int indexAttribute) { + int location = locateIndex(indexAttribute); + // return location == -1 ? 0 : this.attributeValues[location]; + // int index = locateIndex(attIndex); if ((location >= 0) && (indexValues[location] == indexAttribute)) { return attributeValues[location]; } else { return 0.0; } - } + } - @Override - public boolean isMissing(int indexAttribute) { - return Double.isNaN(this.value(indexAttribute)); - } + @Override + public boolean isMissing(int indexAttribute) { + return Double.isNaN(this.value(indexAttribute)); + } - @Override - public int numValues() { - return this.attributeValues.length; - } + @Override + public int numValues() { + return this.attributeValues.length; + } - @Override - public int index(int indexAttribute) { - return this.indexValues[indexAttribute]; - } + @Override + public int index(int indexAttribute) { + return this.indexValues[indexAttribute]; + } - @Override - public double valueSparse(int indexAttribute) { - return this.attributeValues[indexAttribute]; - } + @Override + public double valueSparse(int indexAttribute) { + return this.attributeValues[indexAttribute]; + } - @Override - public boolean isMissingSparse(int indexAttribute) { - return Double.isNaN(this.valueSparse(indexAttribute)); - } + @Override + public boolean isMissingSparse(int indexAttribute) { + return Double.isNaN(this.valueSparse(indexAttribute)); + } + + /* + * @Override public double value(Attribute attribute) { return + * value(attribute.index()); } + */ - /*@Override - public double value(Attribute attribute) { - return value(attribute.index()); - }*/ - - @Override - public double[] toDoubleArray() { - double[] array = new double[numAttributes()]; - for (int i=0; i<numValues() ; i++) { - array[index(i)] = valueSparse(i); - } - return array; + @Override + public double[] toDoubleArray() { + double[] array = new double[numAttributes()]; + for (int i = 0; i < numValues(); i++) { + array[index(i)] = valueSparse(i); } + return array; + } - @Override - public void setValue(int attributeIndex, double d) { - int index = locateIndex(attributeIndex); - if (index(index) == attributeIndex) { - this.attributeValues[index] = d; - } else { - // We need to add the value - } + @Override + public void setValue(int attributeIndex, double d) { + int index = locateIndex(attributeIndex); + if (index(index) == attributeIndex) { + this.attributeValues[index] = d; + } else { + // We need to add the value } - - /** + } + + /** * Locates the greatest index that is not greater than the given index. * * @return the internal index of the attribute index. Returns -1 if no index @@ -168,5 +168,5 @@ public class SparseInstanceData implements InstanceData{ return min - 1; } } - + }
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/Utils.java ---------------------------------------------------------------------- diff --git a/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/Utils.java b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/Utils.java index f3dc1b9..dd9df6d 100644 --- a/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/Utils.java +++ b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/Utils.java @@ -21,68 +21,71 @@ package com.yahoo.labs.samoa.instances; */ public class Utils { - public static int maxIndex(double[] doubles) { + public static int maxIndex(double[] doubles) { - double maximum = 0; - int maxIndex = 0; + double maximum = 0; + int maxIndex = 0; - for (int i = 0; i < doubles.length; i++) { - if ((i == 0) || (doubles[i] > maximum)) { - maxIndex = i; - maximum = doubles[i]; - } - } - - return maxIndex; + for (int i = 0; i < doubles.length; i++) { + if ((i == 0) || (doubles[i] > maximum)) { + maxIndex = i; + maximum = doubles[i]; + } } - public static String quote(String string) { - boolean quote = false; + return maxIndex; + } - // backquote the following characters - if ((string.indexOf('\n') != -1) || (string.indexOf('\r') != -1) || (string.indexOf('\'') != -1) || (string.indexOf('"') != -1) - || (string.indexOf('\\') != -1) || (string.indexOf('\t') != -1) || (string.indexOf('%') != -1) || (string.indexOf('\u001E') != -1)) { - string = backQuoteChars(string); - quote = true; - } + public static String quote(String string) { + boolean quote = false; - // Enclose the string in 's if the string contains a recently added - // backquote or contains one of the following characters. - if ((quote == true) || (string.indexOf('{') != -1) || (string.indexOf('}') != -1) || (string.indexOf(',') != -1) || (string.equals("?")) - || (string.indexOf(' ') != -1) || (string.equals(""))) { - string = ("'".concat(string)).concat("'"); - } + // backquote the following characters + if ((string.indexOf('\n') != -1) || (string.indexOf('\r') != -1) || (string.indexOf('\'') != -1) + || (string.indexOf('"') != -1) + || (string.indexOf('\\') != -1) || (string.indexOf('\t') != -1) || (string.indexOf('%') != -1) + || (string.indexOf('\u001E') != -1)) { + string = backQuoteChars(string); + quote = true; + } - return string; + // Enclose the string in 's if the string contains a recently added + // backquote or contains one of the following characters. + if ((quote == true) || (string.indexOf('{') != -1) || (string.indexOf('}') != -1) || (string.indexOf(',') != -1) + || (string.equals("?")) + || (string.indexOf(' ') != -1) || (string.equals(""))) { + string = ("'".concat(string)).concat("'"); } - public static String backQuoteChars(String string) { + return string; + } - int index; - StringBuffer newStringBuffer; + public static String backQuoteChars(String string) { - // replace each of the following characters with the backquoted version - char charsFind[] = { '\\', '\'', '\t', '\n', '\r', '"', '%', '\u001E' }; - String charsReplace[] = { "\\\\", "\\'", "\\t", "\\n", "\\r", "\\\"", "\\%", "\\u001E" }; - for (int i = 0; i < charsFind.length; i++) { - if (string.indexOf(charsFind[i]) != -1) { - newStringBuffer = new StringBuffer(); - while ((index = string.indexOf(charsFind[i])) != -1) { - if (index > 0) { - newStringBuffer.append(string.substring(0, index)); - } - newStringBuffer.append(charsReplace[i]); - if ((index + 1) < string.length()) { - string = string.substring(index + 1); - } else { - string = ""; - } - } - newStringBuffer.append(string); - string = newStringBuffer.toString(); - } - } + int index; + StringBuffer newStringBuffer; - return string; + // replace each of the following characters with the backquoted version + char charsFind[] = { '\\', '\'', '\t', '\n', '\r', '"', '%', '\u001E' }; + String charsReplace[] = { "\\\\", "\\'", "\\t", "\\n", "\\r", "\\\"", "\\%", "\\u001E" }; + for (int i = 0; i < charsFind.length; i++) { + if (string.indexOf(charsFind[i]) != -1) { + newStringBuffer = new StringBuffer(); + while ((index = string.indexOf(charsFind[i])) != -1) { + if (index > 0) { + newStringBuffer.append(string.substring(0, index)); + } + newStringBuffer.append(charsReplace[i]); + if ((index + 1) < string.length()) { + string = string.substring(index + 1); + } else { + string = ""; + } + } + newStringBuffer.append(string); + string = newStringBuffer.toString(); + } } + + return string; + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/main/java/com/yahoo/labs/samoa/LocalDoTask.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/main/java/com/yahoo/labs/samoa/LocalDoTask.java b/samoa-local/src/main/java/com/yahoo/labs/samoa/LocalDoTask.java index 05ee1e1..27ca7a1 100644 --- a/samoa-local/src/main/java/com/yahoo/labs/samoa/LocalDoTask.java +++ b/samoa-local/src/main/java/com/yahoo/labs/samoa/LocalDoTask.java @@ -36,54 +36,55 @@ import com.yahoo.labs.samoa.topology.impl.SimpleEngine; */ public class LocalDoTask { - // TODO: clean up this class for helping ML Developer in SAMOA - // TODO: clean up code from storm-impl - - // It seems that the 3 extra options are not used. - // Probably should remove them - private static final String SUPPRESS_STATUS_OUT_MSG = "Suppress the task status output. Normally it is sent to stderr."; - private static final String SUPPRESS_RESULT_OUT_MSG = "Suppress the task result output. Normally it is sent to stdout."; - private static final String STATUS_UPDATE_FREQ_MSG = "Wait time in milliseconds between status updates."; - private static final Logger logger = LoggerFactory.getLogger(LocalDoTask.class); + // TODO: clean up this class for helping ML Developer in SAMOA + // TODO: clean up code from storm-impl - /** - * The main method. - * - * @param args - * the arguments - */ - public static void main(String[] args) { + // It seems that the 3 extra options are not used. + // Probably should remove them + private static final String SUPPRESS_STATUS_OUT_MSG = "Suppress the task status output. Normally it is sent to stderr."; + private static final String SUPPRESS_RESULT_OUT_MSG = "Suppress the task result output. Normally it is sent to stdout."; + private static final String STATUS_UPDATE_FREQ_MSG = "Wait time in milliseconds between status updates."; + private static final Logger logger = LoggerFactory.getLogger(LocalDoTask.class); - // ArrayList<String> tmpArgs = new ArrayList<String>(Arrays.asList(args)); + /** + * The main method. + * + * @param args + * the arguments + */ + public static void main(String[] args) { - // args = tmpArgs.toArray(new String[0]); + // ArrayList<String> tmpArgs = new ArrayList<String>(Arrays.asList(args)); - FlagOption suppressStatusOutOpt = new FlagOption("suppressStatusOut", 'S', SUPPRESS_STATUS_OUT_MSG); + // args = tmpArgs.toArray(new String[0]); - FlagOption suppressResultOutOpt = new FlagOption("suppressResultOut", 'R', SUPPRESS_RESULT_OUT_MSG); + FlagOption suppressStatusOutOpt = new FlagOption("suppressStatusOut", 'S', SUPPRESS_STATUS_OUT_MSG); - IntOption statusUpdateFreqOpt = new IntOption("statusUpdateFrequency", 'F', STATUS_UPDATE_FREQ_MSG, 1000, 0, Integer.MAX_VALUE); + FlagOption suppressResultOutOpt = new FlagOption("suppressResultOut", 'R', SUPPRESS_RESULT_OUT_MSG); - Option[] extraOptions = new Option[] { suppressStatusOutOpt, suppressResultOutOpt, statusUpdateFreqOpt }; + IntOption statusUpdateFreqOpt = new IntOption("statusUpdateFrequency", 'F', STATUS_UPDATE_FREQ_MSG, 1000, 0, + Integer.MAX_VALUE); - StringBuilder cliString = new StringBuilder(); - for (String arg : args) { - cliString.append(" ").append(arg); - } - logger.debug("Command line string = {}", cliString.toString()); - System.out.println("Command line string = " + cliString.toString()); + Option[] extraOptions = new Option[] { suppressStatusOutOpt, suppressResultOutOpt, statusUpdateFreqOpt }; - Task task; - try { - task = ClassOption.cliStringToObject(cliString.toString(), Task.class, extraOptions); - logger.info("Successfully instantiating {}", task.getClass().getCanonicalName()); - } catch (Exception e) { - logger.error("Fail to initialize the task", e); - System.out.println("Fail to initialize the task" + e); - return; - } - task.setFactory(new SimpleComponentFactory()); - task.init(); - SimpleEngine.submitTopology(task.getTopology()); + StringBuilder cliString = new StringBuilder(); + for (String arg : args) { + cliString.append(" ").append(arg); } + logger.debug("Command line string = {}", cliString.toString()); + System.out.println("Command line string = " + cliString.toString()); + + Task task; + try { + task = ClassOption.cliStringToObject(cliString.toString(), Task.class, extraOptions); + logger.info("Successfully instantiating {}", task.getClass().getCanonicalName()); + } catch (Exception e) { + logger.error("Fail to initialize the task", e); + System.out.println("Fail to initialize the task" + e); + return; + } + task.setFactory(new SimpleComponentFactory()); + task.init(); + SimpleEngine.submitTopology(task.getTopology()); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactory.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactory.java b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactory.java index b289dbe..0c2f301 100644 --- a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactory.java +++ b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactory.java @@ -31,23 +31,23 @@ import com.yahoo.labs.samoa.topology.Topology; public class SimpleComponentFactory implements ComponentFactory { - public ProcessingItem createPi(Processor processor, int paralellism) { - return new SimpleProcessingItem(processor, paralellism); - } + public ProcessingItem createPi(Processor processor, int paralellism) { + return new SimpleProcessingItem(processor, paralellism); + } - public ProcessingItem createPi(Processor processor) { - return this.createPi(processor, 1); - } + public ProcessingItem createPi(Processor processor) { + return this.createPi(processor, 1); + } - public EntranceProcessingItem createEntrancePi(EntranceProcessor processor) { - return new SimpleEntranceProcessingItem(processor); - } + public EntranceProcessingItem createEntrancePi(EntranceProcessor processor) { + return new SimpleEntranceProcessingItem(processor); + } - public Stream createStream(IProcessingItem sourcePi) { - return new SimpleStream(sourcePi); - } + public Stream createStream(IProcessingItem sourcePi) { + return new SimpleStream(sourcePi); + } - public Topology createTopology(String topoName) { - return new SimpleTopology(topoName); - } + public Topology createTopology(String topoName) { + return new SimpleTopology(topoName); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEngine.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEngine.java b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEngine.java index 9d131e1..5ca5837 100644 --- a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEngine.java +++ b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEngine.java @@ -28,10 +28,10 @@ import com.yahoo.labs.samoa.topology.Topology; public class SimpleEngine { - public static void submitTopology(Topology topology) { - SimpleTopology simpleTopology = (SimpleTopology) topology; - simpleTopology.run(); - // runs until completion - } + public static void submitTopology(Topology topology) { + SimpleTopology simpleTopology = (SimpleTopology) topology; + simpleTopology.run(); + // runs until completion + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItem.java b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItem.java index 4652ebb..c9cc601 100644 --- a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItem.java +++ b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItem.java @@ -24,10 +24,10 @@ import com.yahoo.labs.samoa.core.EntranceProcessor; import com.yahoo.labs.samoa.topology.LocalEntranceProcessingItem; class SimpleEntranceProcessingItem extends LocalEntranceProcessingItem { - public SimpleEntranceProcessingItem(EntranceProcessor processor) { - super(processor); - } - - // The default waiting time when there is no available events is 100ms - // Override waitForNewEvents() to change it + public SimpleEntranceProcessingItem(EntranceProcessor processor) { + super(processor); + } + + // The default waiting time when there is no available events is 100ms + // Override waitForNewEvents() to change it } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItem.java b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItem.java index e3cc765..77361b1 100644 --- a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItem.java +++ b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItem.java @@ -34,54 +34,54 @@ import com.yahoo.labs.samoa.utils.PartitioningScheme; import com.yahoo.labs.samoa.utils.StreamDestination; /** - * + * * @author abifet */ class SimpleProcessingItem extends AbstractProcessingItem { - private IProcessingItem[] arrayProcessingItem; + private IProcessingItem[] arrayProcessingItem; - SimpleProcessingItem(Processor processor) { - super(processor); - } - - SimpleProcessingItem(Processor processor, int parallelism) { - super(processor); - this.setParallelism(parallelism); - } - - public IProcessingItem getProcessingItem(int i) { - return arrayProcessingItem[i]; - } - - @Override - protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) { - StreamDestination destination = new StreamDestination(this, this.getParallelism(), scheme); - ((SimpleStream)inputStream).addDestination(destination); - return this; - } + SimpleProcessingItem(Processor processor) { + super(processor); + } - public SimpleProcessingItem copy() { - Processor processor = this.getProcessor(); - return new SimpleProcessingItem(processor.newProcessor(processor)); - } + SimpleProcessingItem(Processor processor, int parallelism) { + super(processor); + this.setParallelism(parallelism); + } - public void processEvent(ContentEvent event, int counter) { - - int parallelism = this.getParallelism(); - //System.out.println("Process event "+event+" (isLast="+event.isLastEvent()+") with counter="+counter+" while parallelism="+parallelism); - if (this.arrayProcessingItem == null && parallelism > 0) { - //Init processing elements, the first time they are needed - this.arrayProcessingItem = new IProcessingItem[parallelism]; - for (int j = 0; j < parallelism; j++) { - arrayProcessingItem[j] = this.copy(); - arrayProcessingItem[j].getProcessor().onCreate(j); - } - } - if (this.arrayProcessingItem != null) { - IProcessingItem pi = this.getProcessingItem(counter); - Processor p = pi.getProcessor(); - //System.out.println("PI="+pi+", p="+p); - this.getProcessingItem(counter).getProcessor().process(event); - } + public IProcessingItem getProcessingItem(int i) { + return arrayProcessingItem[i]; + } + + @Override + protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) { + StreamDestination destination = new StreamDestination(this, this.getParallelism(), scheme); + ((SimpleStream) inputStream).addDestination(destination); + return this; + } + + public SimpleProcessingItem copy() { + Processor processor = this.getProcessor(); + return new SimpleProcessingItem(processor.newProcessor(processor)); + } + + public void processEvent(ContentEvent event, int counter) { + + int parallelism = this.getParallelism(); + // System.out.println("Process event "+event+" (isLast="+event.isLastEvent()+") with counter="+counter+" while parallelism="+parallelism); + if (this.arrayProcessingItem == null && parallelism > 0) { + // Init processing elements, the first time they are needed + this.arrayProcessingItem = new IProcessingItem[parallelism]; + for (int j = 0; j < parallelism; j++) { + arrayProcessingItem[j] = this.copy(); + arrayProcessingItem[j].getProcessor().onCreate(j); + } + } + if (this.arrayProcessingItem != null) { + IProcessingItem pi = this.getProcessingItem(counter); + Processor p = pi.getProcessor(); + // System.out.println("PI="+pi+", p="+p); + this.getProcessingItem(counter).getProcessor().process(event); } + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleStream.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleStream.java b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleStream.java index 74684a7..09dc555 100644 --- a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleStream.java +++ b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleStream.java @@ -38,56 +38,58 @@ import com.yahoo.labs.samoa.utils.StreamDestination; * @author abifet */ class SimpleStream extends AbstractStream { - private List<StreamDestination> destinations; - private int maxCounter; - private int eventCounter; + private List<StreamDestination> destinations; + private int maxCounter; + private int eventCounter; - SimpleStream(IProcessingItem sourcePi) { - super(sourcePi); - this.destinations = new LinkedList<>(); - this.eventCounter = 0; - this.maxCounter = 1; - } + SimpleStream(IProcessingItem sourcePi) { + super(sourcePi); + this.destinations = new LinkedList<>(); + this.eventCounter = 0; + this.maxCounter = 1; + } - private int getNextCounter() { - if (maxCounter > 0 && eventCounter >= maxCounter) eventCounter = 0; - this.eventCounter++; - return this.eventCounter; - } + private int getNextCounter() { + if (maxCounter > 0 && eventCounter >= maxCounter) + eventCounter = 0; + this.eventCounter++; + return this.eventCounter; + } - @Override - public void put(ContentEvent event) { - this.put(event, this.getNextCounter()); - } - - private void put(ContentEvent event, int counter) { - SimpleProcessingItem pi; - int parallelism; - for (StreamDestination destination:destinations) { - pi = (SimpleProcessingItem) destination.getProcessingItem(); - parallelism = destination.getParallelism(); - switch (destination.getPartitioningScheme()) { - case SHUFFLE: - pi.processEvent(event, counter % parallelism); - break; - case GROUP_BY_KEY: - HashCodeBuilder hb = new HashCodeBuilder(); - hb.append(event.getKey()); - int key = hb.build() % parallelism; - pi.processEvent(event, key); - break; - case BROADCAST: - for (int p = 0; p < parallelism; p++) { - pi.processEvent(event, p); - } - break; - } + @Override + public void put(ContentEvent event) { + this.put(event, this.getNextCounter()); + } + + private void put(ContentEvent event, int counter) { + SimpleProcessingItem pi; + int parallelism; + for (StreamDestination destination : destinations) { + pi = (SimpleProcessingItem) destination.getProcessingItem(); + parallelism = destination.getParallelism(); + switch (destination.getPartitioningScheme()) { + case SHUFFLE: + pi.processEvent(event, counter % parallelism); + break; + case GROUP_BY_KEY: + HashCodeBuilder hb = new HashCodeBuilder(); + hb.append(event.getKey()); + int key = hb.build() % parallelism; + pi.processEvent(event, key); + break; + case BROADCAST: + for (int p = 0; p < parallelism; p++) { + pi.processEvent(event, p); } + break; + } } + } - public void addDestination(StreamDestination destination) { - this.destinations.add(destination); - if (maxCounter <= 0) maxCounter = 1; - maxCounter *= destination.getParallelism(); - } + public void addDestination(StreamDestination destination) { + this.destinations.add(destination); + if (maxCounter <= 0) + maxCounter = 1; + maxCounter *= destination.getParallelism(); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleTopology.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleTopology.java b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleTopology.java index 675b4ac..e7fddbd 100644 --- a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleTopology.java +++ b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleTopology.java @@ -27,18 +27,21 @@ package com.yahoo.labs.samoa.topology.impl; import com.yahoo.labs.samoa.topology.AbstractTopology; public class SimpleTopology extends AbstractTopology { - SimpleTopology(String name) { - super(name); - } + SimpleTopology(String name) { + super(name); + } - public void run() { - if (this.getEntranceProcessingItems() == null) - throw new IllegalStateException("You need to set entrance PI before running the topology."); - if (this.getEntranceProcessingItems().size() != 1) - throw new IllegalStateException("SimpleTopology supports 1 entrance PI only. Number of entrance PIs is "+this.getEntranceProcessingItems().size()); - - SimpleEntranceProcessingItem entrancePi = (SimpleEntranceProcessingItem) this.getEntranceProcessingItems().toArray()[0]; - entrancePi.getProcessor().onCreate(0); // id=0 as it is not used in simple mode - entrancePi.startSendingEvents(); - } + public void run() { + if (this.getEntranceProcessingItems() == null) + throw new IllegalStateException("You need to set entrance PI before running the topology."); + if (this.getEntranceProcessingItems().size() != 1) + throw new IllegalStateException("SimpleTopology supports 1 entrance PI only. Number of entrance PIs is " + + this.getEntranceProcessingItems().size()); + + SimpleEntranceProcessingItem entrancePi = (SimpleEntranceProcessingItem) this.getEntranceProcessingItems() + .toArray()[0]; + entrancePi.getProcessor().onCreate(0); // id=0 as it is not used in simple + // mode + entrancePi.startSendingEvents(); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/test/java/com/yahoo/labs/samoa/AlgosTest.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/test/java/com/yahoo/labs/samoa/AlgosTest.java b/samoa-local/src/test/java/com/yahoo/labs/samoa/AlgosTest.java index 9bf1c2d..d3e54a8 100644 --- a/samoa-local/src/test/java/com/yahoo/labs/samoa/AlgosTest.java +++ b/samoa-local/src/test/java/com/yahoo/labs/samoa/AlgosTest.java @@ -24,64 +24,63 @@ import org.junit.Test; public class AlgosTest { + @Test + public void testVHTLocal() throws Exception { - @Test - public void testVHTLocal() throws Exception { + TestParams vhtConfig = new TestParams.Builder() + .inputInstances(200_000) + .samplingSize(20_000) + .evaluationInstances(200_000) + .classifiedInstances(200_000) + .classificationsCorrect(75f) + .kappaStat(0f) + .kappaTempStat(0f) + .cliStringTemplate(TestParams.Templates.PREQEVAL_VHT_RANDOMTREE) + .resultFilePollTimeout(10) + .prePollWait(10) + .taskClassName(LocalDoTask.class.getName()) + .build(); + TestUtils.test(vhtConfig); - TestParams vhtConfig = new TestParams.Builder() - .inputInstances(200_000) - .samplingSize(20_000) - .evaluationInstances(200_000) - .classifiedInstances(200_000) - .classificationsCorrect(75f) - .kappaStat(0f) - .kappaTempStat(0f) - .cliStringTemplate(TestParams.Templates.PREQEVAL_VHT_RANDOMTREE) - .resultFilePollTimeout(10) - .prePollWait(10) - .taskClassName(LocalDoTask.class.getName()) - .build(); - TestUtils.test(vhtConfig); + } - } + @Test + public void testBaggingLocal() throws Exception { + TestParams baggingConfig = new TestParams.Builder() + .inputInstances(200_000) + .samplingSize(20_000) + .evaluationInstances(180_000) + .classifiedInstances(210_000) + .classificationsCorrect(60f) + .kappaStat(0f) + .kappaTempStat(0f) + .cliStringTemplate(TestParams.Templates.PREQEVAL_BAGGING_RANDOMTREE) + .prePollWait(10) + .resultFilePollTimeout(10) + .taskClassName(LocalDoTask.class.getName()) + .build(); + TestUtils.test(baggingConfig); - @Test - public void testBaggingLocal() throws Exception { - TestParams baggingConfig = new TestParams.Builder() - .inputInstances(200_000) - .samplingSize(20_000) - .evaluationInstances(180_000) - .classifiedInstances(210_000) - .classificationsCorrect(60f) - .kappaStat(0f) - .kappaTempStat(0f) - .cliStringTemplate(TestParams.Templates.PREQEVAL_BAGGING_RANDOMTREE) - .prePollWait(10) - .resultFilePollTimeout(10) - .taskClassName(LocalDoTask.class.getName()) - .build(); - TestUtils.test(baggingConfig); + } - } + @Test + public void testNaiveBayesLocal() throws Exception { - @Test - public void testNaiveBayesLocal() throws Exception { + TestParams vhtConfig = new TestParams.Builder() + .inputInstances(200_000) + .samplingSize(20_000) + .evaluationInstances(200_000) + .classifiedInstances(200_000) + .classificationsCorrect(65f) + .kappaStat(0f) + .kappaTempStat(0f) + .cliStringTemplate(TestParams.Templates.PREQEVAL_NAIVEBAYES_HYPERPLANE) + .resultFilePollTimeout(10) + .prePollWait(10) + .taskClassName(LocalDoTask.class.getName()) + .build(); + TestUtils.test(vhtConfig); - TestParams vhtConfig = new TestParams.Builder() - .inputInstances(200_000) - .samplingSize(20_000) - .evaluationInstances(200_000) - .classifiedInstances(200_000) - .classificationsCorrect(65f) - .kappaStat(0f) - .kappaTempStat(0f) - .cliStringTemplate(TestParams.Templates.PREQEVAL_NAIVEBAYES_HYPERPLANE) - .resultFilePollTimeout(10) - .prePollWait(10) - .taskClassName(LocalDoTask.class.getName()) - .build(); - TestUtils.test(vhtConfig); - - } + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactoryTest.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactoryTest.java b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactoryTest.java index 02a9295..bfd6fe1 100644 --- a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactoryTest.java +++ b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactoryTest.java @@ -36,61 +36,64 @@ import com.yahoo.labs.samoa.topology.Topology; /** * @author Anh Thu Vu - * + * */ public class SimpleComponentFactoryTest { - @Tested private SimpleComponentFactory factory; - @Mocked private Processor processor, processorReplica; - @Mocked private EntranceProcessor entranceProcessor; - - private final int parallelism = 3; - private final String topoName = "TestTopology"; - + @Tested + private SimpleComponentFactory factory; + @Mocked + private Processor processor, processorReplica; + @Mocked + private EntranceProcessor entranceProcessor; + + private final int parallelism = 3; + private final String topoName = "TestTopology"; + + @Before + public void setUp() throws Exception { + factory = new SimpleComponentFactory(); + } + + @Test + public void testCreatePiNoParallelism() { + ProcessingItem pi = factory.createPi(processor); + assertNotNull("ProcessingItem created is null.", pi); + assertEquals("ProcessingItem created is not a SimpleProcessingItem.", SimpleProcessingItem.class, pi.getClass()); + assertEquals("Parallelism of PI is not 1", 1, pi.getParallelism(), 0); + } + + @Test + public void testCreatePiWithParallelism() { + ProcessingItem pi = factory.createPi(processor, parallelism); + assertNotNull("ProcessingItem created is null.", pi); + assertEquals("ProcessingItem created is not a SimpleProcessingItem.", SimpleProcessingItem.class, pi.getClass()); + assertEquals("Parallelism of PI is not ", parallelism, pi.getParallelism(), 0); + } + + @Test + public void testCreateStream() { + ProcessingItem pi = factory.createPi(processor); + + Stream stream = factory.createStream(pi); + assertNotNull("Stream created is null", stream); + assertEquals("Stream created is not a SimpleStream.", SimpleStream.class, stream.getClass()); + } - @Before - public void setUp() throws Exception { - factory = new SimpleComponentFactory(); - } + @Test + public void testCreateTopology() { + Topology topology = factory.createTopology(topoName); + assertNotNull("Topology created is null.", topology); + assertEquals("Topology created is not a SimpleTopology.", SimpleTopology.class, topology.getClass()); + } - @Test - public void testCreatePiNoParallelism() { - ProcessingItem pi = factory.createPi(processor); - assertNotNull("ProcessingItem created is null.",pi); - assertEquals("ProcessingItem created is not a SimpleProcessingItem.",SimpleProcessingItem.class,pi.getClass()); - assertEquals("Parallelism of PI is not 1",1,pi.getParallelism(),0); - } - - @Test - public void testCreatePiWithParallelism() { - ProcessingItem pi = factory.createPi(processor,parallelism); - assertNotNull("ProcessingItem created is null.",pi); - assertEquals("ProcessingItem created is not a SimpleProcessingItem.",SimpleProcessingItem.class,pi.getClass()); - assertEquals("Parallelism of PI is not ",parallelism,pi.getParallelism(),0); - } - - @Test - public void testCreateStream() { - ProcessingItem pi = factory.createPi(processor); - - Stream stream = factory.createStream(pi); - assertNotNull("Stream created is null",stream); - assertEquals("Stream created is not a SimpleStream.",SimpleStream.class,stream.getClass()); - } - - @Test - public void testCreateTopology() { - Topology topology = factory.createTopology(topoName); - assertNotNull("Topology created is null.",topology); - assertEquals("Topology created is not a SimpleTopology.",SimpleTopology.class,topology.getClass()); - } - - @Test - public void testCreateEntrancePi() { - EntranceProcessingItem entrancePi = factory.createEntrancePi(entranceProcessor); - assertNotNull("EntranceProcessingItem created is null.",entrancePi); - assertEquals("EntranceProcessingItem created is not a SimpleEntranceProcessingItem.",SimpleEntranceProcessingItem.class,entrancePi.getClass()); - assertSame("EntranceProcessor is not set correctly.",entranceProcessor, entrancePi.getProcessor()); - } + @Test + public void testCreateEntrancePi() { + EntranceProcessingItem entrancePi = factory.createEntrancePi(entranceProcessor); + assertNotNull("EntranceProcessingItem created is null.", entrancePi); + assertEquals("EntranceProcessingItem created is not a SimpleEntranceProcessingItem.", + SimpleEntranceProcessingItem.class, entrancePi.getClass()); + assertSame("EntranceProcessor is not set correctly.", entranceProcessor, entrancePi.getProcessor()); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEngineTest.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEngineTest.java b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEngineTest.java index c4649ed..23b38b4 100644 --- a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEngineTest.java +++ b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEngineTest.java @@ -29,29 +29,32 @@ import org.junit.Test; /** * @author Anh Thu Vu - * + * */ public class SimpleEngineTest { - @Tested private SimpleEngine unused; - @Mocked private SimpleTopology topology; - @Mocked private Runtime mockedRuntime; - - @Test - public void testSubmitTopology() { - new NonStrictExpectations() { - { - Runtime.getRuntime(); - result=mockedRuntime; - mockedRuntime.exit(0); - } - }; - SimpleEngine.submitTopology(topology); - new Verifications() { - { - topology.run(); - } - }; - } + @Tested + private SimpleEngine unused; + @Mocked + private SimpleTopology topology; + @Mocked + private Runtime mockedRuntime; + + @Test + public void testSubmitTopology() { + new NonStrictExpectations() { + { + Runtime.getRuntime(); + result = mockedRuntime; + mockedRuntime.exit(0); + } + }; + SimpleEngine.submitTopology(topology); + new Verifications() { + { + topology.run(); + } + }; + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItemTest.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItemTest.java b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItemTest.java index 41ae22b..0c1e475 100644 --- a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItemTest.java +++ b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItemTest.java @@ -36,118 +36,137 @@ import com.yahoo.labs.samoa.topology.Stream; /** * @author Anh Thu Vu - * + * */ public class SimpleEntranceProcessingItemTest { - @Tested private SimpleEntranceProcessingItem entrancePi; - - @Mocked private EntranceProcessor entranceProcessor; - @Mocked private Stream outputStream, anotherStream; - @Mocked private ContentEvent event; - - @Mocked private Thread unused; - - /** - * @throws java.lang.Exception - */ - @Before - public void setUp() throws Exception { - entrancePi = new SimpleEntranceProcessingItem(entranceProcessor); - } - - @Test - public void testContructor() { - assertSame("EntranceProcessor is not set correctly.",entranceProcessor,entrancePi.getProcessor()); - } - - @Test - public void testSetOutputStream() { - entrancePi.setOutputStream(outputStream); - assertSame("OutputStream is not set correctly.",outputStream,entrancePi.getOutputStream()); - } - - @Test - public void testSetOutputStreamRepeate() { - entrancePi.setOutputStream(outputStream); - entrancePi.setOutputStream(outputStream); - assertSame("OutputStream is not set correctly.",outputStream,entrancePi.getOutputStream()); - } - - @Test(expected=IllegalStateException.class) - public void testSetOutputStreamError() { - entrancePi.setOutputStream(outputStream); - entrancePi.setOutputStream(anotherStream); - } - - @Test - public void testInjectNextEventSuccess() { - entrancePi.setOutputStream(outputStream); - new StrictExpectations() { - { - entranceProcessor.hasNext(); - result=true; - - entranceProcessor.nextEvent(); - result=event; - } - }; - entrancePi.injectNextEvent(); - new Verifications() { - { - outputStream.put(event); - } - }; - } - - @Test - public void testStartSendingEvents() { - entrancePi.setOutputStream(outputStream); - new StrictExpectations() { - { - for (int i=0; i<1; i++) { - entranceProcessor.isFinished(); result=false; - entranceProcessor.hasNext(); result=false; - } - - for (int i=0; i<5; i++) { - entranceProcessor.isFinished(); result=false; - entranceProcessor.hasNext(); result=true; - entranceProcessor.nextEvent(); result=event; - outputStream.put(event); - } - - for (int i=0; i<2; i++) { - entranceProcessor.isFinished(); result=false; - entranceProcessor.hasNext(); result=false; - } - - for (int i=0; i<5; i++) { - entranceProcessor.isFinished(); result=false; - entranceProcessor.hasNext(); result=true; - entranceProcessor.nextEvent(); result=event; - outputStream.put(event); - } - - entranceProcessor.isFinished(); result=true; times=1; - entranceProcessor.hasNext(); times=0; - } - }; - entrancePi.startSendingEvents(); - new Verifications() { - { - try { - Thread.sleep(anyInt); times=3; - } catch (InterruptedException e) { - - } - } - }; - } - - @Test(expected=IllegalStateException.class) - public void testStartSendingEventsError() { - entrancePi.startSendingEvents(); - } + @Tested + private SimpleEntranceProcessingItem entrancePi; + + @Mocked + private EntranceProcessor entranceProcessor; + @Mocked + private Stream outputStream, anotherStream; + @Mocked + private ContentEvent event; + + @Mocked + private Thread unused; + + /** + * @throws java.lang.Exception + */ + @Before + public void setUp() throws Exception { + entrancePi = new SimpleEntranceProcessingItem(entranceProcessor); + } + + @Test + public void testContructor() { + assertSame("EntranceProcessor is not set correctly.", entranceProcessor, entrancePi.getProcessor()); + } + + @Test + public void testSetOutputStream() { + entrancePi.setOutputStream(outputStream); + assertSame("OutputStream is not set correctly.", outputStream, entrancePi.getOutputStream()); + } + + @Test + public void testSetOutputStreamRepeate() { + entrancePi.setOutputStream(outputStream); + entrancePi.setOutputStream(outputStream); + assertSame("OutputStream is not set correctly.", outputStream, entrancePi.getOutputStream()); + } + + @Test(expected = IllegalStateException.class) + public void testSetOutputStreamError() { + entrancePi.setOutputStream(outputStream); + entrancePi.setOutputStream(anotherStream); + } + + @Test + public void testInjectNextEventSuccess() { + entrancePi.setOutputStream(outputStream); + new StrictExpectations() { + { + entranceProcessor.hasNext(); + result = true; + + entranceProcessor.nextEvent(); + result = event; + } + }; + entrancePi.injectNextEvent(); + new Verifications() { + { + outputStream.put(event); + } + }; + } + + @Test + public void testStartSendingEvents() { + entrancePi.setOutputStream(outputStream); + new StrictExpectations() { + { + for (int i = 0; i < 1; i++) { + entranceProcessor.isFinished(); + result = false; + entranceProcessor.hasNext(); + result = false; + } + + for (int i = 0; i < 5; i++) { + entranceProcessor.isFinished(); + result = false; + entranceProcessor.hasNext(); + result = true; + entranceProcessor.nextEvent(); + result = event; + outputStream.put(event); + } + + for (int i = 0; i < 2; i++) { + entranceProcessor.isFinished(); + result = false; + entranceProcessor.hasNext(); + result = false; + } + + for (int i = 0; i < 5; i++) { + entranceProcessor.isFinished(); + result = false; + entranceProcessor.hasNext(); + result = true; + entranceProcessor.nextEvent(); + result = event; + outputStream.put(event); + } + + entranceProcessor.isFinished(); + result = true; + times = 1; + entranceProcessor.hasNext(); + times = 0; + } + }; + entrancePi.startSendingEvents(); + new Verifications() { + { + try { + Thread.sleep(anyInt); + times = 3; + } catch (InterruptedException e) { + + } + } + }; + } + + @Test(expected = IllegalStateException.class) + public void testStartSendingEventsError() { + entrancePi.startSendingEvents(); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItemTest.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItemTest.java b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItemTest.java index a4a288a..caa82bf 100644 --- a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItemTest.java +++ b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItemTest.java @@ -40,81 +40,85 @@ import com.yahoo.labs.samoa.utils.StreamDestination; /** * @author Anh Thu Vu - * + * */ public class SimpleProcessingItemTest { - @Tested private SimpleProcessingItem pi; - - @Mocked private Processor processor; - @Mocked private SimpleStream stream; - @Mocked private StreamDestination destination; - @Mocked private ContentEvent event; - - private final int parallelism = 4; - private final int counter = 2; - - - @Before - public void setUp() throws Exception { - pi = new SimpleProcessingItem(processor, parallelism); - } - - @Test - public void testConstructor() { - assertSame("Processor was not set correctly.",processor,pi.getProcessor()); - assertEquals("Parallelism was not set correctly.",parallelism,pi.getParallelism(),0); - } - - @Test - public void testConnectInputShuffleStream() { - new Expectations() { - { - destination = new StreamDestination(pi, parallelism, PartitioningScheme.SHUFFLE); - stream.addDestination(destination); - } - }; - pi.connectInputShuffleStream(stream); - } - - @Test - public void testConnectInputKeyStream() { - new Expectations() { - { - destination = new StreamDestination(pi, parallelism, PartitioningScheme.GROUP_BY_KEY); - stream.addDestination(destination); - } - }; - pi.connectInputKeyStream(stream); - } - - @Test - public void testConnectInputAllStream() { - new Expectations() { - { - destination = new StreamDestination(pi, parallelism, PartitioningScheme.BROADCAST); - stream.addDestination(destination); - } - }; - pi.connectInputAllStream(stream); - } - - @Test - public void testProcessEvent() { - new Expectations() { - { - for (int i=0; i<parallelism; i++) { - processor.newProcessor(processor); - result=processor; - - processor.onCreate(anyInt); - } - - processor.process(event); - } - }; - pi.processEvent(event, counter); - - } + @Tested + private SimpleProcessingItem pi; + + @Mocked + private Processor processor; + @Mocked + private SimpleStream stream; + @Mocked + private StreamDestination destination; + @Mocked + private ContentEvent event; + + private final int parallelism = 4; + private final int counter = 2; + + @Before + public void setUp() throws Exception { + pi = new SimpleProcessingItem(processor, parallelism); + } + + @Test + public void testConstructor() { + assertSame("Processor was not set correctly.", processor, pi.getProcessor()); + assertEquals("Parallelism was not set correctly.", parallelism, pi.getParallelism(), 0); + } + + @Test + public void testConnectInputShuffleStream() { + new Expectations() { + { + destination = new StreamDestination(pi, parallelism, PartitioningScheme.SHUFFLE); + stream.addDestination(destination); + } + }; + pi.connectInputShuffleStream(stream); + } + + @Test + public void testConnectInputKeyStream() { + new Expectations() { + { + destination = new StreamDestination(pi, parallelism, PartitioningScheme.GROUP_BY_KEY); + stream.addDestination(destination); + } + }; + pi.connectInputKeyStream(stream); + } + + @Test + public void testConnectInputAllStream() { + new Expectations() { + { + destination = new StreamDestination(pi, parallelism, PartitioningScheme.BROADCAST); + stream.addDestination(destination); + } + }; + pi.connectInputAllStream(stream); + } + + @Test + public void testProcessEvent() { + new Expectations() { + { + for (int i = 0; i < parallelism; i++) { + processor.newProcessor(processor); + result = processor; + + processor.onCreate(anyInt); + } + + processor.process(event); + } + }; + pi.processEvent(event, counter); + + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleStreamTest.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleStreamTest.java b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleStreamTest.java index 2a625b5..c8f6c5d 100644 --- a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleStreamTest.java +++ b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleStreamTest.java @@ -40,72 +40,82 @@ import com.yahoo.labs.samoa.utils.StreamDestination; /** * @author Anh Thu Vu - * + * */ @RunWith(Parameterized.class) public class SimpleStreamTest { - @Tested private SimpleStream stream; - - @Mocked private SimpleProcessingItem sourcePi, destPi; - @Mocked private ContentEvent event; - @Mocked private StreamDestination destination; + @Tested + private SimpleStream stream; + + @Mocked + private SimpleProcessingItem sourcePi, destPi; + @Mocked + private ContentEvent event; + @Mocked + private StreamDestination destination; + + private final String eventKey = "eventkey"; + private final int parallelism; + private final PartitioningScheme scheme; + + @Parameters + public static Collection<Object[]> generateParameters() { + return Arrays.asList(new Object[][] { + { 2, PartitioningScheme.SHUFFLE }, + { 3, PartitioningScheme.GROUP_BY_KEY }, + { 4, PartitioningScheme.BROADCAST } + }); + } + + public SimpleStreamTest(int parallelism, PartitioningScheme scheme) { + this.parallelism = parallelism; + this.scheme = scheme; + } + + @Before + public void setUp() throws Exception { + stream = new SimpleStream(sourcePi); + stream.addDestination(destination); + } - private final String eventKey = "eventkey"; - private final int parallelism; - private final PartitioningScheme scheme; - - - @Parameters - public static Collection<Object[]> generateParameters() { - return Arrays.asList(new Object[][] { - { 2, PartitioningScheme.SHUFFLE }, - { 3, PartitioningScheme.GROUP_BY_KEY }, - { 4, PartitioningScheme.BROADCAST } - }); - } - - public SimpleStreamTest(int parallelism, PartitioningScheme scheme) { - this.parallelism = parallelism; - this.scheme = scheme; - } - - @Before - public void setUp() throws Exception { - stream = new SimpleStream(sourcePi); - stream.addDestination(destination); - } + @Test + public void testPut() { + new NonStrictExpectations() { + { + event.getKey(); + result = eventKey; + destination.getProcessingItem(); + result = destPi; + destination.getPartitioningScheme(); + result = scheme; + destination.getParallelism(); + result = parallelism; - @Test - public void testPut() { - new NonStrictExpectations() { - { - event.getKey(); result=eventKey; - destination.getProcessingItem(); result=destPi; - destination.getPartitioningScheme(); result=scheme; - destination.getParallelism(); result=parallelism; - - } - }; - switch(this.scheme) { - case SHUFFLE: case GROUP_BY_KEY: - new Expectations() { - { - // TODO: restrict the range of counter value - destPi.processEvent(event, anyInt); times=1; - } - }; - break; - case BROADCAST: - new Expectations() { - { - // TODO: restrict the range of counter value - destPi.processEvent(event, anyInt); times=parallelism; - } - }; - break; - } - stream.put(event); - } + } + }; + switch (this.scheme) { + case SHUFFLE: + case GROUP_BY_KEY: + new Expectations() { + { + // TODO: restrict the range of counter value + destPi.processEvent(event, anyInt); + times = 1; + } + }; + break; + case BROADCAST: + new Expectations() { + { + // TODO: restrict the range of counter value + destPi.processEvent(event, anyInt); + times = parallelism; + } + }; + break; + } + stream.put(event); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleTopologyTest.java ---------------------------------------------------------------------- diff --git a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleTopologyTest.java b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleTopologyTest.java index 2423778..418ad14 100644 --- a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleTopologyTest.java +++ b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleTopologyTest.java @@ -31,63 +31,64 @@ import org.junit.Test; * #L% */ - - import com.yahoo.labs.samoa.core.EntranceProcessor; import com.yahoo.labs.samoa.topology.EntranceProcessingItem; /** * @author Anh Thu Vu - * + * */ public class SimpleTopologyTest { - @Tested private SimpleTopology topology; - - @Mocked private SimpleEntranceProcessingItem entrancePi; - @Mocked private EntranceProcessor entranceProcessor; - - @Before - public void setUp() throws Exception { - topology = new SimpleTopology("TestTopology"); - } - - @Test - public void testAddEntrancePi() { - topology.addEntranceProcessingItem(entrancePi); - - Set<EntranceProcessingItem> entrancePIs = topology.getEntranceProcessingItems(); - assertNotNull("Set of entrance PIs is null.",entrancePIs); - assertEquals("Number of entrance PI in SimpleTopology must be 1",1,entrancePIs.size()); - assertSame("Entrance PI was not set correctly.",entrancePi,entrancePIs.toArray()[0]); - // TODO: verify that entrance PI is in the set of ProcessingItems - // Need to access topology's set of PIs (getProcessingItems() method) - } - - @Test - public void testRun() { - topology.addEntranceProcessingItem(entrancePi); - - new NonStrictExpectations() { - { - entrancePi.getProcessor(); - result=entranceProcessor; - - } - }; - - new Expectations() { - { - entranceProcessor.onCreate(anyInt); - entrancePi.startSendingEvents(); - } - }; - topology.run(); - } - - @Test(expected=IllegalStateException.class) - public void testRunWithoutEntrancePI() { - topology.run(); - } + @Tested + private SimpleTopology topology; + + @Mocked + private SimpleEntranceProcessingItem entrancePi; + @Mocked + private EntranceProcessor entranceProcessor; + + @Before + public void setUp() throws Exception { + topology = new SimpleTopology("TestTopology"); + } + + @Test + public void testAddEntrancePi() { + topology.addEntranceProcessingItem(entrancePi); + + Set<EntranceProcessingItem> entrancePIs = topology.getEntranceProcessingItems(); + assertNotNull("Set of entrance PIs is null.", entrancePIs); + assertEquals("Number of entrance PI in SimpleTopology must be 1", 1, entrancePIs.size()); + assertSame("Entrance PI was not set correctly.", entrancePi, entrancePIs.toArray()[0]); + // TODO: verify that entrance PI is in the set of ProcessingItems + // Need to access topology's set of PIs (getProcessingItems() method) + } + + @Test + public void testRun() { + topology.addEntranceProcessingItem(entrancePi); + + new NonStrictExpectations() { + { + entrancePi.getProcessor(); + result = entranceProcessor; + + } + }; + + new Expectations() { + { + entranceProcessor.onCreate(anyInt); + entrancePi.startSendingEvents(); + } + }; + topology.run(); + } + + @Test(expected = IllegalStateException.class) + public void testRunWithoutEntrancePI() { + topology.run(); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ComponentFactory.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ComponentFactory.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ComponentFactory.java index 33299ac..b627416 100644 --- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ComponentFactory.java +++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ComponentFactory.java @@ -40,58 +40,59 @@ import com.yahoo.labs.samoa.topology.Topology; */ public class S4ComponentFactory implements ComponentFactory { - public static final Logger logger = LoggerFactory.getLogger(S4ComponentFactory.class); - protected S4DoTask app; + public static final Logger logger = LoggerFactory.getLogger(S4ComponentFactory.class); + protected S4DoTask app; - @Override - public ProcessingItem createPi(Processor processor, int paralellism) { - S4ProcessingItem processingItem = new S4ProcessingItem(app); - // TODO refactor how to set the paralellism level - processingItem.setParalellismLevel(paralellism); - processingItem.setProcessor(processor); + @Override + public ProcessingItem createPi(Processor processor, int paralellism) { + S4ProcessingItem processingItem = new S4ProcessingItem(app); + // TODO refactor how to set the paralellism level + processingItem.setParalellismLevel(paralellism); + processingItem.setProcessor(processor); - return processingItem; - } + return processingItem; + } - @Override - public ProcessingItem createPi(Processor processor) { - return this.createPi(processor, 1); - } + @Override + public ProcessingItem createPi(Processor processor) { + return this.createPi(processor, 1); + } - @Override - public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor) { - // TODO Create source Entry processing item that connects to an external stream - S4EntranceProcessingItem entrancePi = new S4EntranceProcessingItem(entranceProcessor, app); - entrancePi.setParallelism(1); // FIXME should not be set to 1 statically - return entrancePi; - } + @Override + public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor) { + // TODO Create source Entry processing item that connects to an external + // stream + S4EntranceProcessingItem entrancePi = new S4EntranceProcessingItem(entranceProcessor, app); + entrancePi.setParallelism(1); // FIXME should not be set to 1 statically + return entrancePi; + } - @Override - public Stream createStream(IProcessingItem sourcePi) { - S4Stream aStream = new S4Stream(app); - return aStream; - } + @Override + public Stream createStream(IProcessingItem sourcePi) { + S4Stream aStream = new S4Stream(app); + return aStream; + } - @Override - public Topology createTopology(String topoName) { - return new S4Topology(topoName); - } + @Override + public Topology createTopology(String topoName) { + return new S4Topology(topoName); + } - /** - * Initialization method. - * - * @param evalTask - */ - public void init(String evalTask) { - // Task is initiated in the DoTaskApp - } + /** + * Initialization method. + * + * @param evalTask + */ + public void init(String evalTask) { + // Task is initiated in the DoTaskApp + } - /** - * Sets S4 application. - * - * @param app - */ - public void setApp(S4DoTask app) { - this.app = app; - } + /** + * Sets S4 application. + * + * @param app + */ + public void setApp(S4DoTask app) { + this.app = app; + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4DoTask.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4DoTask.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4DoTask.java index 0f474a4..3691a82 100644 --- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4DoTask.java +++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4DoTask.java @@ -56,208 +56,213 @@ import com.google.inject.name.Named; */ final public class S4DoTask extends App { - private final Logger logger = LoggerFactory.getLogger(S4DoTask.class); - Task task; + private final Logger logger = LoggerFactory.getLogger(S4DoTask.class); + Task task; - @Inject @Named("evalTask") public String evalTask; + @Inject + @Named("evalTask") + public String evalTask; - public S4DoTask() { - super(); - } - - /** The engine. */ - protected ComponentFactory componentFactory; - - /** - * Gets the factory. - * - * @return the factory - */ - public ComponentFactory getFactory() { - return componentFactory; - } + public S4DoTask() { + super(); + } - /** - * Sets the factory. - * - * @param factory - * the new factory - */ - public void setFactory(ComponentFactory factory) { - this.componentFactory = factory; - } + /** The engine. */ + protected ComponentFactory componentFactory; - /* - * Build the application - * - * @see org.apache.s4.core.App#onInit() - */ - /* - * (non-Javadoc) - * - * @see org.apache.s4.core.App#onInit() - */ - @Override - protected void onInit() { - logger.info("DoTaskApp onInit"); - // ConsoleReporters prints S4 metrics - // MetricsRegistry mr = new MetricsRegistry(); - // - // CsvReporter.enable(new File(System.getProperty("user.home") - // + "/monitor/"), 10, TimeUnit.SECONDS); - // ConsoleReporter.enable(10, TimeUnit.SECONDS); - try { - System.err.println(); - System.err.println(Globals.getWorkbenchInfoString()); - System.err.println(); + /** + * Gets the factory. + * + * @return the factory + */ + public ComponentFactory getFactory() { + return componentFactory; + } - } catch (Exception ex) { - ex.printStackTrace(); - } - S4ComponentFactory factory = new S4ComponentFactory(); - factory.setApp(this); + /** + * Sets the factory. + * + * @param factory + * the new factory + */ + public void setFactory(ComponentFactory factory) { + this.componentFactory = factory; + } - // logger.debug("LC {}", lc); + /* + * Build the application + * + * @see org.apache.s4.core.App#onInit() + */ + /* + * (non-Javadoc) + * + * @see org.apache.s4.core.App#onInit() + */ + @Override + protected void onInit() { + logger.info("DoTaskApp onInit"); + // ConsoleReporters prints S4 metrics + // MetricsRegistry mr = new MetricsRegistry(); + // + // CsvReporter.enable(new File(System.getProperty("user.home") + // + "/monitor/"), 10, TimeUnit.SECONDS); + // ConsoleReporter.enable(10, TimeUnit.SECONDS); + try { + System.err.println(); + System.err.println(Globals.getWorkbenchInfoString()); + System.err.println(); - // task = TaskProvider.getTask(evalTask); + } catch (Exception ex) { + ex.printStackTrace(); + } + S4ComponentFactory factory = new S4ComponentFactory(); + factory.setApp(this); - // EXAMPLE OPTIONS - // -l Clustream -g Clustream -i 100000 -s (RandomRBFGeneratorEvents -K - // 5 -N 0.0) - // String[] args = new String[] {evalTask,"-l", "Clustream","-g", - // "Clustream", "-i", "100000", "-s", "(RamdomRBFGeneratorsEvents", - // "-K", "5", "-N", "0.0)"}; - // String[] args = new String[] { evalTask, "-l", "clustream.Clustream", - // "-g", "clustream.Clustream", "-i", "100000", "-s", - // "(RandomRBFGeneratorEvents", "-K", "5", "-N", "0.0)" }; - logger.debug("PARAMETERS {}", evalTask); - // params = params.replace(":", " "); - List<String> parameters = new ArrayList<String>(); - // parameters.add(evalTask); - try { - parameters.addAll(Arrays.asList(URLDecoder.decode(evalTask, "UTF-8").split(" "))); - } catch (UnsupportedEncodingException ex) { - ex.printStackTrace(); - } - String[] args = parameters.toArray(new String[0]); - Option[] extraOptions = new Option[] {}; - // build a single string by concatenating cli options - StringBuilder cliString = new StringBuilder(); - for (int i = 0; i < args.length; i++) { - cliString.append(" ").append(args[i]); - } + // logger.debug("LC {}", lc); - // parse options - try { - task = (Task) ClassOption.cliStringToObject(cliString.toString(), Task.class, extraOptions); - task.setFactory(factory); - task.init(); - } catch (Exception e) { - e.printStackTrace(); - } + // task = TaskProvider.getTask(evalTask); + // EXAMPLE OPTIONS + // -l Clustream -g Clustream -i 100000 -s (RandomRBFGeneratorEvents -K + // 5 -N 0.0) + // String[] args = new String[] {evalTask,"-l", "Clustream","-g", + // "Clustream", "-i", "100000", "-s", "(RamdomRBFGeneratorsEvents", + // "-K", "5", "-N", "0.0)"}; + // String[] args = new String[] { evalTask, "-l", "clustream.Clustream", + // "-g", "clustream.Clustream", "-i", "100000", "-s", + // "(RandomRBFGeneratorEvents", "-K", "5", "-N", "0.0)" }; + logger.debug("PARAMETERS {}", evalTask); + // params = params.replace(":", " "); + List<String> parameters = new ArrayList<String>(); + // parameters.add(evalTask); + try { + parameters.addAll(Arrays.asList(URLDecoder.decode(evalTask, "UTF-8").split(" "))); + } catch (UnsupportedEncodingException ex) { + ex.printStackTrace(); + } + String[] args = parameters.toArray(new String[0]); + Option[] extraOptions = new Option[] {}; + // build a single string by concatenating cli options + StringBuilder cliString = new StringBuilder(); + for (int i = 0; i < args.length; i++) { + cliString.append(" ").append(args[i]); } - /* - * (non-Javadoc) - * - * @see org.apache.s4.core.App#onStart() - */ - @Override - protected void onStart() { - logger.info("Starting DoTaskApp... App Partition [{}]", this.getPartitionId()); - // <<<<<<< HEAD Task doesn't have start in latest storm-impl - // TODO change the way the app starts - // if (this.getPartitionId() == 0) - S4Topology s4topology = (S4Topology) getTask().getTopology(); - S4EntranceProcessingItem epi = (S4EntranceProcessingItem) s4topology.getEntranceProcessingItem(); - while (epi.injectNextEvent()) - // inject events from the EntrancePI - ; + // parse options + try { + task = (Task) ClassOption.cliStringToObject(cliString.toString(), Task.class, extraOptions); + task.setFactory(factory); + task.init(); + } catch (Exception e) { + e.printStackTrace(); } - /* - * (non-Javadoc) - * - * @see org.apache.s4.core.App#onClose() - */ - @Override - protected void onClose() { - System.out.println("Closing DoTaskApp..."); + } - } + /* + * (non-Javadoc) + * + * @see org.apache.s4.core.App#onStart() + */ + @Override + protected void onStart() { + logger.info("Starting DoTaskApp... App Partition [{}]", this.getPartitionId()); + // <<<<<<< HEAD Task doesn't have start in latest storm-impl + // TODO change the way the app starts + // if (this.getPartitionId() == 0) + S4Topology s4topology = (S4Topology) getTask().getTopology(); + S4EntranceProcessingItem epi = (S4EntranceProcessingItem) s4topology.getEntranceProcessingItem(); + while (epi.injectNextEvent()) + // inject events from the EntrancePI + ; + } - /** - * Gets the task. - * - * @return the task - */ - public Task getTask() { - return task; - } + /* + * (non-Javadoc) + * + * @see org.apache.s4.core.App#onClose() + */ + @Override + protected void onClose() { + System.out.println("Closing DoTaskApp..."); - // These methods are protected in App and can not be accessed from outside. - // They are - // called from parallel classifiers and evaluations. Is there a better way - // to do that? + } - /* - * (non-Javadoc) - * - * @see org.apache.s4.core.App#createPE(java.lang.Class) - */ - @Override - public <T extends ProcessingElement> T createPE(Class<T> type) { - return super.createPE(type); - } + /** + * Gets the task. + * + * @return the task + */ + public Task getTask() { + return task; + } - /* - * (non-Javadoc) - * - * @see org.apache.s4.core.App#createStream(java.lang.String, org.apache.s4.base.KeyFinder, org.apache.s4.core.ProcessingElement[]) - */ - @Override - public <T extends Event> Stream<T> createStream(String name, KeyFinder<T> finder, ProcessingElement... processingElements) { - return super.createStream(name, finder, processingElements); - } + // These methods are protected in App and can not be accessed from outside. + // They are + // called from parallel classifiers and evaluations. Is there a better way + // to do that? - /* - * (non-Javadoc) - * - * @see org.apache.s4.core.App#createStream(java.lang.String, org.apache.s4.core.ProcessingElement[]) - */ - @Override - public <T extends Event> Stream<T> createStream(String name, ProcessingElement... processingElements) { - return super.createStream(name, processingElements); - } + /* + * (non-Javadoc) + * + * @see org.apache.s4.core.App#createPE(java.lang.Class) + */ + @Override + public <T extends ProcessingElement> T createPE(Class<T> type) { + return super.createPE(type); + } - // @com.beust.jcommander.Parameters(separators = "=") - // class Parameters { - // - // @Parameter(names={"-lc","-local"}, description="Local clustering method") - // private String localClustering; - // - // @Parameter(names={"-gc","-global"}, - // description="Global clustering method") - // private String globalClustering; - // - // } - // - // class ParametersConverter {// implements IStringConverter<String[]> { - // - // - // public String[] convertToArgs(String value) { - // - // String[] params = value.split(","); - // String[] args = new String[params.length*2]; - // for(int i=0; i<params.length ; i++) { - // args[i] = params[i].split("=")[0]; - // args[i+1] = params[i].split("=")[1]; - // i++; - // } - // return args; - // } - // - // } + /* + * (non-Javadoc) + * + * @see org.apache.s4.core.App#createStream(java.lang.String, + * org.apache.s4.base.KeyFinder, org.apache.s4.core.ProcessingElement[]) + */ + @Override + public <T extends Event> Stream<T> createStream(String name, KeyFinder<T> finder, + ProcessingElement... processingElements) { + return super.createStream(name, finder, processingElements); + } + + /* + * (non-Javadoc) + * + * @see org.apache.s4.core.App#createStream(java.lang.String, + * org.apache.s4.core.ProcessingElement[]) + */ + @Override + public <T extends Event> Stream<T> createStream(String name, ProcessingElement... processingElements) { + return super.createStream(name, processingElements); + } + + // @com.beust.jcommander.Parameters(separators = "=") + // class Parameters { + // + // @Parameter(names={"-lc","-local"}, description="Local clustering method") + // private String localClustering; + // + // @Parameter(names={"-gc","-global"}, + // description="Global clustering method") + // private String globalClustering; + // + // } + // + // class ParametersConverter {// implements IStringConverter<String[]> { + // + // + // public String[] convertToArgs(String value) { + // + // String[] params = value.split(","); + // String[] args = new String[params.length*2]; + // for(int i=0; i<params.length ; i++) { + // args[i] = params[i].split("=")[0]; + // args[i+1] = params[i].split("=")[1]; + // i++; + // } + // return args; + // } + // + // } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4EntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4EntranceProcessingItem.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4EntranceProcessingItem.java index 2b0c595..6f374fa 100644 --- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4EntranceProcessingItem.java +++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4EntranceProcessingItem.java @@ -32,89 +32,89 @@ import com.yahoo.labs.samoa.topology.Stream; public class S4EntranceProcessingItem extends ProcessingElement implements EntranceProcessingItem { - private EntranceProcessor entranceProcessor; - // private S4DoTask app; - private int parallelism; - protected Stream outputStream; - - /** - * Constructor of an S4 entrance processing item. - * - * @param app - * : S4 application - */ - public S4EntranceProcessingItem(EntranceProcessor entranceProcessor, App app) { - super(app); - this.entranceProcessor = entranceProcessor; - // this.app = (S4DoTask) app; - // this.setSingleton(true); - } - - public void setParallelism(int parallelism) { - this.parallelism = parallelism; - } - - public int getParallelism() { - return this.parallelism; - } - - @Override - public EntranceProcessor getProcessor() { - return this.entranceProcessor; - } - - // - // @Override - // public void put(Instance inst) { - // // do nothing - // // may not needed - // } - - @Override - protected void onCreate() { - // was commented - if (this.entranceProcessor != null) { - // TODO revisit if we need to change it to a clone() call - this.entranceProcessor = (EntranceProcessor) this.entranceProcessor.newProcessor(this.entranceProcessor); - this.entranceProcessor.onCreate(Integer.parseInt(getId())); - } - } - - @Override - protected void onRemove() { - // do nothing - } - - // - // /** - // * Sets the entrance processing item processor. - // * - // * @param processor - // */ - // public void setProcessor(Processor processor) { - // this.entranceProcessor = processor; - // } - - @Override - public void setName(String name) { - super.setName(name); - } - - @Override - public EntranceProcessingItem setOutputStream(Stream stream) { - if (this.outputStream != null) - throw new IllegalStateException("Output stream for an EntrancePI sohuld be initialized only once"); - this.outputStream = stream; - return this; - } - - public boolean injectNextEvent() { - if (entranceProcessor.hasNext()) { - ContentEvent nextEvent = this.entranceProcessor.nextEvent(); - outputStream.put(nextEvent); - return entranceProcessor.hasNext(); - } else - return false; - // return !nextEvent.isLastEvent(); + private EntranceProcessor entranceProcessor; + // private S4DoTask app; + private int parallelism; + protected Stream outputStream; + + /** + * Constructor of an S4 entrance processing item. + * + * @param app + * : S4 application + */ + public S4EntranceProcessingItem(EntranceProcessor entranceProcessor, App app) { + super(app); + this.entranceProcessor = entranceProcessor; + // this.app = (S4DoTask) app; + // this.setSingleton(true); + } + + public void setParallelism(int parallelism) { + this.parallelism = parallelism; + } + + public int getParallelism() { + return this.parallelism; + } + + @Override + public EntranceProcessor getProcessor() { + return this.entranceProcessor; + } + + // + // @Override + // public void put(Instance inst) { + // // do nothing + // // may not needed + // } + + @Override + protected void onCreate() { + // was commented + if (this.entranceProcessor != null) { + // TODO revisit if we need to change it to a clone() call + this.entranceProcessor = (EntranceProcessor) this.entranceProcessor.newProcessor(this.entranceProcessor); + this.entranceProcessor.onCreate(Integer.parseInt(getId())); } + } + + @Override + protected void onRemove() { + // do nothing + } + + // + // /** + // * Sets the entrance processing item processor. + // * + // * @param processor + // */ + // public void setProcessor(Processor processor) { + // this.entranceProcessor = processor; + // } + + @Override + public void setName(String name) { + super.setName(name); + } + + @Override + public EntranceProcessingItem setOutputStream(Stream stream) { + if (this.outputStream != null) + throw new IllegalStateException("Output stream for an EntrancePI sohuld be initialized only once"); + this.outputStream = stream; + return this; + } + + public boolean injectNextEvent() { + if (entranceProcessor.hasNext()) { + ContentEvent nextEvent = this.entranceProcessor.nextEvent(); + outputStream.put(nextEvent); + return entranceProcessor.hasNext(); + } else + return false; + // return !nextEvent.isLastEvent(); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Event.java ---------------------------------------------------------------------- diff --git a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Event.java b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Event.java index 8f8ad9f..62c623c 100644 --- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Event.java +++ b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4Event.java @@ -36,55 +36,57 @@ import com.yahoo.labs.samoa.core.ContentEvent; @Immutable final public class S4Event extends Event { - private String key; - - public String getKey() { - return key; - } - - public void setKey(String key) { - this.key = key; - } - - /** The content event. */ - private ContentEvent contentEvent; - - /** - * Instantiates a new instance event. - */ - public S4Event() { - // Needed for serialization of kryo - } - - /** - * Instantiates a new instance event. - * - * @param contentEvent the content event - */ - public S4Event(ContentEvent contentEvent) { - if (contentEvent != null) { - this.contentEvent = contentEvent; - this.key = contentEvent.getKey(); - - } - } - - /** - * Gets the content event. - * - * @return the content event - */ - public ContentEvent getContentEvent() { - return contentEvent; - } - - /** - * Sets the content event. - * - * @param contentEvent the new content event - */ - public void setContentEvent(ContentEvent contentEvent) { - this.contentEvent = contentEvent; - } + private String key; + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + /** The content event. */ + private ContentEvent contentEvent; + + /** + * Instantiates a new instance event. + */ + public S4Event() { + // Needed for serialization of kryo + } + + /** + * Instantiates a new instance event. + * + * @param contentEvent + * the content event + */ + public S4Event(ContentEvent contentEvent) { + if (contentEvent != null) { + this.contentEvent = contentEvent; + this.key = contentEvent.getKey(); + + } + } + + /** + * Gets the content event. + * + * @return the content event + */ + public ContentEvent getContentEvent() { + return contentEvent; + } + + /** + * Sets the content event. + * + * @param contentEvent + * the new content event + */ + public void setContentEvent(ContentEvent contentEvent) { + this.contentEvent = contentEvent; + } }
