http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/ControlContentEvent.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/ControlContentEvent.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/ControlContentEvent.java index 201ef88..e7b8653 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/ControlContentEvent.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/ControlContentEvent.java @@ -23,49 +23,51 @@ package com.yahoo.labs.samoa.learners.classifiers.trees; import com.yahoo.labs.samoa.core.ContentEvent; /** - * Abstract class to represent ContentEvent to control Local Statistic Processor. + * Abstract class to represent ContentEvent to control Local Statistic + * Processor. + * * @author Arinto Murdopo - * + * */ abstract class ControlContentEvent implements ContentEvent { - /** + /** * */ - private static final long serialVersionUID = 5837375639629708363L; + private static final long serialVersionUID = 5837375639629708363L; + + protected final long learningNodeId; + + public ControlContentEvent() { + this.learningNodeId = -1; + } + + ControlContentEvent(long id) { + this.learningNodeId = id; + } + + @Override + public final String getKey() { + return null; + } + + @Override + public void setKey(String str) { + // Do nothing + } + + @Override + public boolean isLastEvent() { + return false; + } + + final long getLearningNodeId() { + return this.learningNodeId; + } + + abstract LocStatControl getType(); - protected final long learningNodeId; - - public ControlContentEvent(){ - this.learningNodeId = -1; - } - - ControlContentEvent(long id){ - this.learningNodeId = id; - } - - @Override - public final String getKey() { - return null; - } - - @Override - public void setKey(String str){ - //Do nothing - } - - @Override - public boolean isLastEvent(){ - return false; - } - - final long getLearningNodeId(){ - return this.learningNodeId; - } - - abstract LocStatControl getType(); - - static enum LocStatControl { - COMPUTE, DELETE - } + static enum LocStatControl { + COMPUTE, DELETE + } }
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/DeleteContentEvent.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/DeleteContentEvent.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/DeleteContentEvent.java index c721255..52631bd 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/DeleteContentEvent.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/DeleteContentEvent.java @@ -21,25 +21,27 @@ package com.yahoo.labs.samoa.learners.classifiers.trees; */ /** - * Delete Content Event is the content event that is sent by Model Aggregator Processor - * to delete unnecessary statistic in Local Statistic Processor. + * Delete Content Event is the content event that is sent by Model Aggregator + * Processor to delete unnecessary statistic in Local Statistic Processor. + * * @author Arinto Murdopo - * + * */ final class DeleteContentEvent extends ControlContentEvent { - private static final long serialVersionUID = -2105250722560863633L; + private static final long serialVersionUID = -2105250722560863633L; + + public DeleteContentEvent() { + super(-1); + } - public DeleteContentEvent(){ - super(-1); - } - - DeleteContentEvent(long id) { - super(id); } + DeleteContentEvent(long id) { + super(id); + } - @Override - LocStatControl getType() { - return LocStatControl.DELETE; - } + @Override + LocStatControl getType() { + return LocStatControl.DELETE; + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/FilterProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/FilterProcessor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/FilterProcessor.java index b6a73c6..d7f2cae 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/FilterProcessor.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/FilterProcessor.java @@ -20,7 +20,6 @@ package com.yahoo.labs.samoa.learners.classifiers.trees; * #L% */ - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,155 +36,159 @@ import java.util.LinkedList; import java.util.List; /** - * Filter Processor that stores and filters the instances before - * sending them to the Model Aggregator Processor. - + * Filter Processor that stores and filters the instances before sending them to + * the Model Aggregator Processor. + * * @author Arinto Murdopo - * + * */ final class FilterProcessor implements Processor { - private static final long serialVersionUID = -1685875718300564885L; - private static final Logger logger = LoggerFactory.getLogger(FilterProcessor.class); - - private int processorId; - - private final Instances dataset; - private InstancesHeader modelContext; - - //available streams - private Stream outputStream; - - //private constructor based on Builder pattern - private FilterProcessor(Builder builder){ - this.dataset = builder.dataset; - this.batchSize = builder.batchSize; - this.delay = builder.delay; - } - - private int waitingInstances = 0; - - private int delay = 0; - - private int batchSize = 200; - - private List<InstanceContentEvent> contentEventList = new LinkedList<InstanceContentEvent>(); - - @Override - public boolean process(ContentEvent event) { - //Receive a new instance from source - if(event instanceof InstanceContentEvent){ - InstanceContentEvent instanceContentEvent = (InstanceContentEvent) event; - this.contentEventList.add(instanceContentEvent); - this.waitingInstances++; - if (this.waitingInstances == this.batchSize || instanceContentEvent.isLastEvent()){ - //Send Instances - InstancesContentEvent outputEvent = new InstancesContentEvent(instanceContentEvent); - boolean isLastEvent = false; - while (!this.contentEventList.isEmpty()){ - InstanceContentEvent ice = this.contentEventList.remove(0); - Instance inst = ice.getInstance(); - outputEvent.add(inst); - if (!isLastEvent) { - isLastEvent = ice.isLastEvent(); - } - } - outputEvent.setLast(isLastEvent); - this.waitingInstances = 0; - this.outputStream.put(outputEvent); - if (this.delay > 0) { - try { - Thread.sleep(this.delay); - } catch(InterruptedException ex) { - Thread.currentThread().interrupt(); - } - } - } - } - return false; + private static final long serialVersionUID = -1685875718300564885L; + private static final Logger logger = LoggerFactory.getLogger(FilterProcessor.class); + + private int processorId; + + private final Instances dataset; + private InstancesHeader modelContext; + + // available streams + private Stream outputStream; + + // private constructor based on Builder pattern + private FilterProcessor(Builder builder) { + this.dataset = builder.dataset; + this.batchSize = builder.batchSize; + this.delay = builder.delay; + } + + private int waitingInstances = 0; + + private int delay = 0; + + private int batchSize = 200; + + private List<InstanceContentEvent> contentEventList = new LinkedList<InstanceContentEvent>(); + + @Override + public boolean process(ContentEvent event) { + // Receive a new instance from source + if (event instanceof InstanceContentEvent) { + InstanceContentEvent instanceContentEvent = (InstanceContentEvent) event; + this.contentEventList.add(instanceContentEvent); + this.waitingInstances++; + if (this.waitingInstances == this.batchSize || instanceContentEvent.isLastEvent()) { + // Send Instances + InstancesContentEvent outputEvent = new InstancesContentEvent(instanceContentEvent); + boolean isLastEvent = false; + while (!this.contentEventList.isEmpty()) { + InstanceContentEvent ice = this.contentEventList.remove(0); + Instance inst = ice.getInstance(); + outputEvent.add(inst); + if (!isLastEvent) { + isLastEvent = ice.isLastEvent(); + } } - - @Override - public void onCreate(int id) { - this.processorId = id; - this.waitingInstances = 0; - + outputEvent.setLast(isLastEvent); + this.waitingInstances = 0; + this.outputStream.put(outputEvent); + if (this.delay > 0) { + try { + Thread.sleep(this.delay); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } } + } + } + return false; + } + + @Override + public void onCreate(int id) { + this.processorId = id; + this.waitingInstances = 0; + + } + + @Override + public Processor newProcessor(Processor p) { + FilterProcessor oldProcessor = (FilterProcessor) p; + FilterProcessor newProcessor = + new FilterProcessor.Builder(oldProcessor).build(); + + newProcessor.setOutputStream(oldProcessor.outputStream); + return newProcessor; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(super.toString()); + return sb.toString(); + } + + void setOutputStream(Stream outputStream) { + this.outputStream = outputStream; + } + + /** + * Helper method to generate new ResultContentEvent based on an instance and + * its prediction result. + * + * @param prediction + * The predicted class label from the decision tree model. + * @param inEvent + * The associated instance content event + * @return ResultContentEvent to be sent into Evaluator PI or other + * destination PI. + */ + private ResultContentEvent newResultContentEvent(double[] prediction, InstanceContentEvent inEvent) { + ResultContentEvent rce = new ResultContentEvent(inEvent.getInstanceIndex(), inEvent.getInstance(), + inEvent.getClassId(), prediction, inEvent.isLastEvent()); + rce.setClassifierIndex(this.processorId); + rce.setEvaluationIndex(inEvent.getEvaluationIndex()); + return rce; + } + + /** + * Builder class to replace constructors with many parameters + * + * @author Arinto Murdopo + * + */ + static class Builder { + + // required parameters + private final Instances dataset; + + private int delay = 0; - @Override - public Processor newProcessor(Processor p) { - FilterProcessor oldProcessor = (FilterProcessor)p; - FilterProcessor newProcessor = - new FilterProcessor.Builder(oldProcessor).build(); - - newProcessor.setOutputStream(oldProcessor.outputStream); - return newProcessor; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append(super.toString()); - return sb.toString(); - } - - void setOutputStream(Stream outputStream){ - this.outputStream = outputStream; - } - - - /** - * Helper method to generate new ResultContentEvent based on an instance and - * its prediction result. - * @param prediction The predicted class label from the decision tree model. - * @param inEvent The associated instance content event - * @return ResultContentEvent to be sent into Evaluator PI or other destination PI. - */ - private ResultContentEvent newResultContentEvent(double[] prediction, InstanceContentEvent inEvent){ - ResultContentEvent rce = new ResultContentEvent(inEvent.getInstanceIndex(), inEvent.getInstance(), inEvent.getClassId(), prediction, inEvent.isLastEvent()); - rce.setClassifierIndex(this.processorId); - rce.setEvaluationIndex(inEvent.getEvaluationIndex()); - return rce; - } - - - /** - * Builder class to replace constructors with many parameters - * @author Arinto Murdopo - * - */ - static class Builder{ - - //required parameters - private final Instances dataset; - - private int delay = 0; - private int batchSize = 200; - Builder(Instances dataset){ - this.dataset = dataset; - } - - Builder(FilterProcessor oldProcessor){ - this.dataset = oldProcessor.dataset; - this.delay = oldProcessor.delay; - this.batchSize = oldProcessor.batchSize; - } - - public Builder delay(int delay){ - this.delay = delay; - return this; - } - - public Builder batchSize(int val){ - this.batchSize = val; - return this; - } - - FilterProcessor build(){ - return new FilterProcessor(this); - } - } - + Builder(Instances dataset) { + this.dataset = dataset; + } + + Builder(FilterProcessor oldProcessor) { + this.dataset = oldProcessor.dataset; + this.delay = oldProcessor.delay; + this.batchSize = oldProcessor.batchSize; + } + + public Builder delay(int delay) { + this.delay = delay; + return this; + } + + public Builder batchSize(int val) { + this.batchSize = val; + return this; + } + + FilterProcessor build() { + return new FilterProcessor(this); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/FoundNode.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/FoundNode.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/FoundNode.java index 4123ea5..ba1c602 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/FoundNode.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/FoundNode.java @@ -21,57 +21,57 @@ package com.yahoo.labs.samoa.learners.classifiers.trees; */ /** - * Class that represents the necessary data structure of the node where an instance - * is routed/filtered through the decision tree model. + * Class that represents the necessary data structure of the node where an + * instance is routed/filtered through the decision tree model. * * @author Arinto Murdopo - * + * */ -final class FoundNode implements java.io.Serializable{ - - /** - * - */ - private static final long serialVersionUID = -637695387934143293L; - - private final Node node; - private final SplitNode parent; - private final int parentBranch; - - FoundNode(Node node, SplitNode splitNode, int parentBranch){ - this.node = node; - this.parent = splitNode; - this.parentBranch = parentBranch; - } - - /** - * Method to get the node where an instance is routed/filtered through the decision tree - * model for testing and training. - * - * @return The node where the instance is routed/filtered - */ - Node getNode(){ - return this.node; - } - - /** - * Method to get the parent of the node where an instance is routed/filtered through the decision tree - * model for testing and training - * - * @return The parent of the node - */ - SplitNode getParent(){ - return this.parent; - } - - /** - * Method to get the index of the node (where an instance is routed/filtered through the decision tree - * model for testing and training) in its parent. +final class FoundNode implements java.io.Serializable { + + /** * - * @return The index of the node in its parent node. */ - int getParentBranch(){ - return this.parentBranch; - } - + private static final long serialVersionUID = -637695387934143293L; + + private final Node node; + private final SplitNode parent; + private final int parentBranch; + + FoundNode(Node node, SplitNode splitNode, int parentBranch) { + this.node = node; + this.parent = splitNode; + this.parentBranch = parentBranch; + } + + /** + * Method to get the node where an instance is routed/filtered through the + * decision tree model for testing and training. + * + * @return The node where the instance is routed/filtered + */ + Node getNode() { + return this.node; + } + + /** + * Method to get the parent of the node where an instance is routed/filtered + * through the decision tree model for testing and training + * + * @return The parent of the node + */ + SplitNode getParent() { + return this.parent; + } + + /** + * Method to get the index of the node (where an instance is routed/filtered + * through the decision tree model for testing and training) in its parent. + * + * @return The index of the node in its parent node. + */ + int getParentBranch() { + return this.parentBranch; + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/InactiveLearningNode.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/InactiveLearningNode.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/InactiveLearningNode.java index 82a05de..d979d1b 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/InactiveLearningNode.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/InactiveLearningNode.java @@ -23,34 +23,33 @@ package com.yahoo.labs.samoa.learners.classifiers.trees; import com.yahoo.labs.samoa.instances.Instance; /** - * Class that represents inactive learning node. Inactive learning node is - * a node which only keeps track of the observed class distribution. It does - * not store the statistic for splitting the node. + * Class that represents inactive learning node. Inactive learning node is a + * node which only keeps track of the observed class distribution. It does not + * store the statistic for splitting the node. * * @author Arinto Murdopo - * + * */ final class InactiveLearningNode extends LearningNode { - /** + /** * */ - private static final long serialVersionUID = -814552382883472302L; - - - InactiveLearningNode(double[] initialClassObservation) { - super(initialClassObservation); - } - - @Override - void learnFromInstance(Instance inst, ModelAggregatorProcessor proc) { - this.observedClassDistribution.addToValue( - (int)inst.classValue(), inst.weight()); - } - - @Override - double[] getClassVotes(Instance inst, ModelAggregatorProcessor map) { - return this.observedClassDistribution.getArrayCopy(); - } + private static final long serialVersionUID = -814552382883472302L; + + InactiveLearningNode(double[] initialClassObservation) { + super(initialClassObservation); + } + + @Override + void learnFromInstance(Instance inst, ModelAggregatorProcessor proc) { + this.observedClassDistribution.addToValue( + (int) inst.classValue(), inst.weight()); + } + + @Override + double[] getClassVotes(Instance inst, ModelAggregatorProcessor map) { + return this.observedClassDistribution.getArrayCopy(); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/LearningNode.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/LearningNode.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/LearningNode.java index 58de671..8014ce7 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/LearningNode.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/LearningNode.java @@ -24,32 +24,36 @@ import com.yahoo.labs.samoa.instances.Instance; /** * Abstract class that represents a learning node + * * @author Arinto Murdopo - * + * */ abstract class LearningNode extends Node { - private static final long serialVersionUID = 7157319356146764960L; - - protected LearningNode(double[] classObservation) { - super(classObservation); - } - - /** - * Method to process the instance for learning - * @param inst The processed instance - * @param proc The model aggregator processor where this learning node exists - */ - abstract void learnFromInstance(Instance inst, ModelAggregatorProcessor proc); - - @Override - protected boolean isLeaf(){ - return true; - } - - @Override - protected FoundNode filterInstanceToLeaf(Instance inst, SplitNode parent, - int parentBranch) { - return new FoundNode(this, parent, parentBranch); - } + private static final long serialVersionUID = 7157319356146764960L; + + protected LearningNode(double[] classObservation) { + super(classObservation); + } + + /** + * Method to process the instance for learning + * + * @param inst + * The processed instance + * @param proc + * The model aggregator processor where this learning node exists + */ + abstract void learnFromInstance(Instance inst, ModelAggregatorProcessor proc); + + @Override + protected boolean isLeaf() { + return true; + } + + @Override + protected FoundNode filterInstanceToLeaf(Instance inst, SplitNode parent, + int parentBranch) { + return new FoundNode(this, parent, parentBranch); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/LocalResultContentEvent.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/LocalResultContentEvent.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/LocalResultContentEvent.java index 142d28a..10ea055 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/LocalResultContentEvent.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/LocalResultContentEvent.java @@ -24,69 +24,74 @@ import com.yahoo.labs.samoa.moa.classifiers.core.AttributeSplitSuggestion; import com.yahoo.labs.samoa.core.ContentEvent; /** - * Local Result Content Event is the content event that represents local + * Local Result Content Event is the content event that represents local * calculation of statistic in Local Statistic Processor. * * @author Arinto Murdopo - * + * */ -final class LocalResultContentEvent implements ContentEvent{ - - private static final long serialVersionUID = -4206620993777418571L; - - private final AttributeSplitSuggestion bestSuggestion; - private final AttributeSplitSuggestion secondBestSuggestion; - private final long splitId; - - public LocalResultContentEvent(){ - bestSuggestion = null; - secondBestSuggestion = null; - splitId = -1; - } - - LocalResultContentEvent(long splitId, AttributeSplitSuggestion best, AttributeSplitSuggestion secondBest){ - this.splitId = splitId; - this.bestSuggestion = best; - this.secondBestSuggestion = secondBest; - } - - @Override - public String getKey() { - return null; - } - - /** - * Method to return the best attribute split suggestion from this local statistic calculation. - * @return The best attribute split suggestion. - */ - AttributeSplitSuggestion getBestSuggestion(){ - return this.bestSuggestion; - } - - /** - * Method to return the second best attribute split suggestion from this local statistic calculation. - * @return The second best attribute split suggestion. - */ - AttributeSplitSuggestion getSecondBestSuggestion(){ - return this.secondBestSuggestion; - } - - /** - * Method to get the split ID of this local statistic calculation result - * @return The split id of this local calculation result - */ - long getSplitId(){ - return this.splitId; - } +final class LocalResultContentEvent implements ContentEvent { + + private static final long serialVersionUID = -4206620993777418571L; + + private final AttributeSplitSuggestion bestSuggestion; + private final AttributeSplitSuggestion secondBestSuggestion; + private final long splitId; + + public LocalResultContentEvent() { + bestSuggestion = null; + secondBestSuggestion = null; + splitId = -1; + } + + LocalResultContentEvent(long splitId, AttributeSplitSuggestion best, AttributeSplitSuggestion secondBest) { + this.splitId = splitId; + this.bestSuggestion = best; + this.secondBestSuggestion = secondBest; + } + + @Override + public String getKey() { + return null; + } + + /** + * Method to return the best attribute split suggestion from this local + * statistic calculation. + * + * @return The best attribute split suggestion. + */ + AttributeSplitSuggestion getBestSuggestion() { + return this.bestSuggestion; + } + + /** + * Method to return the second best attribute split suggestion from this local + * statistic calculation. + * + * @return The second best attribute split suggestion. + */ + AttributeSplitSuggestion getSecondBestSuggestion() { + return this.secondBestSuggestion; + } + + /** + * Method to get the split ID of this local statistic calculation result + * + * @return The split id of this local calculation result + */ + long getSplitId() { + return this.splitId; + } + + @Override + public void setKey(String str) { + // do nothing - @Override - public void setKey(String str) { - //do nothing - - } + } - @Override - public boolean isLastEvent() { - return false; - } + @Override + public boolean isLastEvent() { + return false; + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/LocalStatisticsProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/LocalStatisticsProcessor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/LocalStatisticsProcessor.java index 25e5592..a6335e7 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/LocalStatisticsProcessor.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/LocalStatisticsProcessor.java @@ -44,203 +44,203 @@ import com.yahoo.labs.samoa.core.Processor; import com.yahoo.labs.samoa.topology.Stream; /** - * Local Statistic Processor contains the local statistic of a subset of the attributes. + * Local Statistic Processor contains the local statistic of a subset of the + * attributes. + * * @author Arinto Murdopo - * + * */ final class LocalStatisticsProcessor implements Processor { - /** + /** * */ - private static final long serialVersionUID = -3967695130634517631L; - private static Logger logger = LoggerFactory.getLogger(LocalStatisticsProcessor.class); - - //Collection of AttributeObservers, for each ActiveLearningNode and AttributeId - private Table<Long, Integer, AttributeClassObserver> localStats; - - private Stream computationResultStream; - - private final SplitCriterion splitCriterion; - private final boolean binarySplit; - private final AttributeClassObserver nominalClassObserver; - private final AttributeClassObserver numericClassObserver; - - //the two observer classes below are also needed to be setup from the Tree - private LocalStatisticsProcessor(Builder builder){ - this.splitCriterion = builder.splitCriterion; - this.binarySplit = builder.binarySplit; - this.nominalClassObserver = builder.nominalClassObserver; - this.numericClassObserver = builder.numericClassObserver; + private static final long serialVersionUID = -3967695130634517631L; + private static Logger logger = LoggerFactory.getLogger(LocalStatisticsProcessor.class); + + // Collection of AttributeObservers, for each ActiveLearningNode and + // AttributeId + private Table<Long, Integer, AttributeClassObserver> localStats; + + private Stream computationResultStream; + + private final SplitCriterion splitCriterion; + private final boolean binarySplit; + private final AttributeClassObserver nominalClassObserver; + private final AttributeClassObserver numericClassObserver; + + // the two observer classes below are also needed to be setup from the Tree + private LocalStatisticsProcessor(Builder builder) { + this.splitCriterion = builder.splitCriterion; + this.binarySplit = builder.binarySplit; + this.nominalClassObserver = builder.nominalClassObserver; + this.numericClassObserver = builder.numericClassObserver; + } + + @Override + public boolean process(ContentEvent event) { + // process AttributeContentEvent by updating the subset of local statistics + if (event instanceof AttributeBatchContentEvent) { + AttributeBatchContentEvent abce = (AttributeBatchContentEvent) event; + List<ContentEvent> contentEventList = abce.getContentEventList(); + for (ContentEvent contentEvent : contentEventList) { + AttributeContentEvent ace = (AttributeContentEvent) contentEvent; + Long learningNodeId = ace.getLearningNodeId(); + Integer obsIndex = ace.getObsIndex(); + + AttributeClassObserver obs = localStats.get( + learningNodeId, obsIndex); + + if (obs == null) { + obs = ace.isNominal() ? newNominalClassObserver() + : newNumericClassObserver(); + localStats.put(ace.getLearningNodeId(), obsIndex, obs); + } + obs.observeAttributeClass(ace.getAttrVal(), ace.getClassVal(), + ace.getWeight()); + } + + /* + * if (event instanceof AttributeContentEvent) { AttributeContentEvent ace + * = (AttributeContentEvent) event; Long learningNodeId = + * Long.valueOf(ace.getLearningNodeId()); Integer obsIndex = + * Integer.valueOf(ace.getObsIndex()); + * + * AttributeClassObserver obs = localStats.get( learningNodeId, obsIndex); + * + * if (obs == null) { obs = ace.isNominal() ? newNominalClassObserver() : + * newNumericClassObserver(); localStats.put(ace.getLearningNodeId(), + * obsIndex, obs); } obs.observeAttributeClass(ace.getAttrVal(), + * ace.getClassVal(), ace.getWeight()); + */ + } else if (event instanceof ComputeContentEvent) { + // process ComputeContentEvent by calculating the local statistic + // and send back the calculation results via computation result stream. + ComputeContentEvent cce = (ComputeContentEvent) event; + Long learningNodeId = cce.getLearningNodeId(); + double[] preSplitDist = cce.getPreSplitDist(); + + Map<Integer, AttributeClassObserver> learningNodeRowMap = localStats + .row(learningNodeId); + List<AttributeSplitSuggestion> suggestions = new Vector<>(); + + for (Entry<Integer, AttributeClassObserver> entry : learningNodeRowMap.entrySet()) { + AttributeClassObserver obs = entry.getValue(); + AttributeSplitSuggestion suggestion = obs + .getBestEvaluatedSplitSuggestion(splitCriterion, + preSplitDist, entry.getKey(), binarySplit); + if (suggestion != null) { + suggestions.add(suggestion); + } + } + + AttributeSplitSuggestion[] bestSuggestions = suggestions + .toArray(new AttributeSplitSuggestion[suggestions.size()]); + + Arrays.sort(bestSuggestions); + + AttributeSplitSuggestion bestSuggestion = null; + AttributeSplitSuggestion secondBestSuggestion = null; + + if (bestSuggestions.length >= 1) { + bestSuggestion = bestSuggestions[bestSuggestions.length - 1]; + + if (bestSuggestions.length >= 2) { + secondBestSuggestion = bestSuggestions[bestSuggestions.length - 2]; + } + } + + // create the local result content event + LocalResultContentEvent lcre = + new LocalResultContentEvent(cce.getSplitId(), bestSuggestion, secondBestSuggestion); + computationResultStream.put(lcre); + logger.debug("Finish compute event"); + } else if (event instanceof DeleteContentEvent) { + DeleteContentEvent dce = (DeleteContentEvent) event; + Long learningNodeId = dce.getLearningNodeId(); + localStats.rowMap().remove(learningNodeId); } - - @Override - public boolean process(ContentEvent event) { - //process AttributeContentEvent by updating the subset of local statistics - if (event instanceof AttributeBatchContentEvent) { - AttributeBatchContentEvent abce = (AttributeBatchContentEvent) event; - List<ContentEvent> contentEventList = abce.getContentEventList(); - for (ContentEvent contentEvent: contentEventList ){ - AttributeContentEvent ace = (AttributeContentEvent) contentEvent; - Long learningNodeId = ace.getLearningNodeId(); - Integer obsIndex = ace.getObsIndex(); - - AttributeClassObserver obs = localStats.get( - learningNodeId, obsIndex); - - if (obs == null) { - obs = ace.isNominal() ? newNominalClassObserver() - : newNumericClassObserver(); - localStats.put(ace.getLearningNodeId(), obsIndex, obs); - } - obs.observeAttributeClass(ace.getAttrVal(), ace.getClassVal(), - ace.getWeight()); - } - - - /*if (event instanceof AttributeContentEvent) { - AttributeContentEvent ace = (AttributeContentEvent) event; - Long learningNodeId = Long.valueOf(ace.getLearningNodeId()); - Integer obsIndex = Integer.valueOf(ace.getObsIndex()); - - AttributeClassObserver obs = localStats.get( - learningNodeId, obsIndex); - - if (obs == null) { - obs = ace.isNominal() ? newNominalClassObserver() - : newNumericClassObserver(); - localStats.put(ace.getLearningNodeId(), obsIndex, obs); - } - obs.observeAttributeClass(ace.getAttrVal(), ace.getClassVal(), - ace.getWeight()); - */ - } else if (event instanceof ComputeContentEvent) { - //process ComputeContentEvent by calculating the local statistic - //and send back the calculation results via computation result stream. - ComputeContentEvent cce = (ComputeContentEvent) event; - Long learningNodeId = cce.getLearningNodeId(); - double[] preSplitDist = cce.getPreSplitDist(); - - Map<Integer, AttributeClassObserver> learningNodeRowMap = localStats - .row(learningNodeId); - List<AttributeSplitSuggestion> suggestions = new Vector<>(); - - for (Entry<Integer, AttributeClassObserver> entry : learningNodeRowMap.entrySet()) { - AttributeClassObserver obs = entry.getValue(); - AttributeSplitSuggestion suggestion = obs - .getBestEvaluatedSplitSuggestion(splitCriterion, - preSplitDist, entry.getKey(), binarySplit); - if(suggestion != null){ - suggestions.add(suggestion); - } - } - - AttributeSplitSuggestion[] bestSuggestions = suggestions - .toArray(new AttributeSplitSuggestion[suggestions.size()]); - - Arrays.sort(bestSuggestions); - - AttributeSplitSuggestion bestSuggestion = null; - AttributeSplitSuggestion secondBestSuggestion = null; - - if (bestSuggestions.length >= 1){ - bestSuggestion = bestSuggestions[bestSuggestions.length - 1]; - - if(bestSuggestions.length >= 2){ - secondBestSuggestion = bestSuggestions[bestSuggestions.length - 2]; - } - } - - //create the local result content event - LocalResultContentEvent lcre = - new LocalResultContentEvent(cce.getSplitId(), bestSuggestion, secondBestSuggestion); - computationResultStream.put(lcre); - logger.debug("Finish compute event"); - } else if (event instanceof DeleteContentEvent) { - DeleteContentEvent dce = (DeleteContentEvent) event; - Long learningNodeId = dce.getLearningNodeId(); - localStats.rowMap().remove(learningNodeId); - } - return false; - } - - @Override - public void onCreate(int id) { - this.localStats = HashBasedTable.create(); - } - - @Override - public Processor newProcessor(Processor p) { - LocalStatisticsProcessor oldProcessor = (LocalStatisticsProcessor) p; - LocalStatisticsProcessor newProcessor - = new LocalStatisticsProcessor.Builder(oldProcessor).build(); - - newProcessor.setComputationResultStream(oldProcessor.computationResultStream); - - return newProcessor; - } - - /** - * Method to set the computation result when using this processor to build - * a topology. - * @param computeStream - */ - void setComputationResultStream(Stream computeStream){ - this.computationResultStream = computeStream; - } - - private AttributeClassObserver newNominalClassObserver() { - return (AttributeClassObserver)this.nominalClassObserver.copy(); + return false; + } + + @Override + public void onCreate(int id) { + this.localStats = HashBasedTable.create(); + } + + @Override + public Processor newProcessor(Processor p) { + LocalStatisticsProcessor oldProcessor = (LocalStatisticsProcessor) p; + LocalStatisticsProcessor newProcessor = new LocalStatisticsProcessor.Builder(oldProcessor).build(); + + newProcessor.setComputationResultStream(oldProcessor.computationResultStream); + + return newProcessor; + } + + /** + * Method to set the computation result when using this processor to build a + * topology. + * + * @param computeStream + */ + void setComputationResultStream(Stream computeStream) { + this.computationResultStream = computeStream; + } + + private AttributeClassObserver newNominalClassObserver() { + return (AttributeClassObserver) this.nominalClassObserver.copy(); + } + + private AttributeClassObserver newNumericClassObserver() { + return (AttributeClassObserver) this.numericClassObserver.copy(); + } + + /** + * Builder class to replace constructors with many parameters + * + * @author Arinto Murdopo + * + */ + static class Builder { + + private SplitCriterion splitCriterion = new InfoGainSplitCriterion(); + private boolean binarySplit = false; + private AttributeClassObserver nominalClassObserver = new NominalAttributeClassObserver(); + private AttributeClassObserver numericClassObserver = new GaussianNumericAttributeClassObserver(); + + Builder() { + } - private AttributeClassObserver newNumericClassObserver() { - return (AttributeClassObserver)this.numericClassObserver.copy(); + Builder(LocalStatisticsProcessor oldProcessor) { + this.splitCriterion = oldProcessor.splitCriterion; + this.binarySplit = oldProcessor.binarySplit; } - - /** - * Builder class to replace constructors with many parameters - * @author Arinto Murdopo - * - */ - static class Builder{ - - private SplitCriterion splitCriterion = new InfoGainSplitCriterion(); - private boolean binarySplit = false; - private AttributeClassObserver nominalClassObserver = new NominalAttributeClassObserver(); - private AttributeClassObserver numericClassObserver = new GaussianNumericAttributeClassObserver(); - - Builder(){ - - } - - Builder(LocalStatisticsProcessor oldProcessor){ - this.splitCriterion = oldProcessor.splitCriterion; - this.binarySplit = oldProcessor.binarySplit; - } - - Builder splitCriterion(SplitCriterion splitCriterion){ - this.splitCriterion = splitCriterion; - return this; - } - - Builder binarySplit(boolean binarySplit){ - this.binarySplit = binarySplit; - return this; - } - - Builder nominalClassObserver(AttributeClassObserver nominalClassObserver){ - this.nominalClassObserver = nominalClassObserver; - return this; - } - - Builder numericClassObserver(AttributeClassObserver numericClassObserver){ - this.numericClassObserver = numericClassObserver; - return this; - } - - LocalStatisticsProcessor build(){ - return new LocalStatisticsProcessor(this); - } + + Builder splitCriterion(SplitCriterion splitCriterion) { + this.splitCriterion = splitCriterion; + return this; + } + + Builder binarySplit(boolean binarySplit) { + this.binarySplit = binarySplit; + return this; + } + + Builder nominalClassObserver(AttributeClassObserver nominalClassObserver) { + this.nominalClassObserver = nominalClassObserver; + return this; + } + + Builder numericClassObserver(AttributeClassObserver numericClassObserver) { + this.numericClassObserver = numericClassObserver; + return this; + } + + LocalStatisticsProcessor build() { + return new LocalStatisticsProcessor(this); } + } }
