http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java index cf7a1b3..b526a40 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java @@ -53,679 +53,706 @@ import com.yahoo.labs.samoa.topology.Stream; import static com.yahoo.labs.samoa.moa.core.Utils.maxIndex; /** - * Model Aggegator Processor consists of the decision tree model. It connects - * to local-statistic PI via attribute stream and control stream. - * Model-aggregator PI sends the split instances via attribute stream and - * it sends control messages to ask local-statistic PI to perform computation - * via control stream. + * Model Aggegator Processor consists of the decision tree model. It connects to + * local-statistic PI via attribute stream and control stream. Model-aggregator + * PI sends the split instances via attribute stream and it sends control + * messages to ask local-statistic PI to perform computation via control stream. + * + * Model-aggregator PI sends the classification result via result stream to an + * evaluator PI for classifier or other destination PI. The calculation results + * from local statistic arrive to the model-aggregator PI via computation-result + * stream. * - * Model-aggregator PI sends the classification result via result stream to - * an evaluator PI for classifier or other destination PI. The calculation - * results from local statistic arrive to the model-aggregator PI via - * computation-result stream. - * @author Arinto Murdopo - * + * */ final class ModelAggregatorProcessor implements Processor { - private static final long serialVersionUID = -1685875718300564886L; - private static final Logger logger = LoggerFactory.getLogger(ModelAggregatorProcessor.class); - - private int processorId; - - private Node treeRoot; - - private int activeLeafNodeCount; - private int inactiveLeafNodeCount; - private int decisionNodeCount; - private boolean growthAllowed; - - private final Instances dataset; - - //to support concurrent split - private long splitId; - private ConcurrentMap<Long, SplittingNodeInfo> splittingNodes; - private BlockingQueue<Long> timedOutSplittingNodes; - - //available streams - private Stream resultStream; - private Stream attributeStream; - private Stream controlStream; - - private transient ScheduledExecutorService executor; - - private final SplitCriterion splitCriterion; - private final double splitConfidence; - private final double tieThreshold; - private final int gracePeriod; - private final int parallelismHint; - private final long timeOut; - - //private constructor based on Builder pattern - private ModelAggregatorProcessor(Builder builder){ - this.dataset = builder.dataset; - this.splitCriterion = builder.splitCriterion; - this.splitConfidence = builder.splitConfidence; - this.tieThreshold = builder.tieThreshold; - this.gracePeriod = builder.gracePeriod; - this.parallelismHint = builder.parallelismHint; - this.timeOut = builder.timeOut; - this.changeDetector = builder.changeDetector; - - InstancesHeader ih = new InstancesHeader(dataset); - this.setModelContext(ih); - } - - @Override - public boolean process(ContentEvent event) { - - //Poll the blocking queue shared between ModelAggregator and the time-out threads - Long timedOutSplitId = timedOutSplittingNodes.poll(); - if(timedOutSplitId != null){ //time out has been reached! - SplittingNodeInfo splittingNode = splittingNodes.get(timedOutSplitId); - if (splittingNode != null) { - this.splittingNodes.remove(timedOutSplitId); - this.continueAttemptToSplit(splittingNode.activeLearningNode, - splittingNode.foundNode); - - } - - } - - //Receive a new instance from source - if(event instanceof InstancesContentEvent){ - InstancesContentEvent instancesEvent = (InstancesContentEvent) event; - this.processInstanceContentEvent(instancesEvent); - //Send information to local-statistic PI - //for each of the nodes - if (this.foundNodeSet != null){ - for (FoundNode foundNode: this.foundNodeSet ){ - ActiveLearningNode leafNode = (ActiveLearningNode) foundNode.getNode(); - AttributeBatchContentEvent[] abce = leafNode.getAttributeBatchContentEvent(); - if (abce != null) { - for (int i = 0; i< this.dataset.numAttributes() - 1; i++) { - this.sendToAttributeStream(abce[i]); - } - } - leafNode.setAttributeBatchContentEvent(null); - //this.sendToControlStream(event); //split information - //See if we can ask for splits - if(!leafNode.isSplitting()){ - double weightSeen = leafNode.getWeightSeen(); - //check whether it is the time for splitting - if(weightSeen - leafNode.getWeightSeenAtLastSplitEvaluation() >= this.gracePeriod){ - attemptToSplit(leafNode, foundNode); - } - } - } - } - this.foundNodeSet = null; - } else if(event instanceof LocalResultContentEvent){ - LocalResultContentEvent lrce = (LocalResultContentEvent) event; - Long lrceSplitId = lrce.getSplitId(); - SplittingNodeInfo splittingNodeInfo = splittingNodes.get(lrceSplitId); - - if (splittingNodeInfo != null) { // if null, that means - // activeLearningNode has been - // removed by timeout thread - ActiveLearningNode activeLearningNode = splittingNodeInfo.activeLearningNode; - - activeLearningNode.addDistributedSuggestions( - lrce.getBestSuggestion(), - lrce.getSecondBestSuggestion()); - - if (activeLearningNode.isAllSuggestionsCollected()) { - splittingNodeInfo.scheduledFuture.cancel(false); - this.splittingNodes.remove(lrceSplitId); - this.continueAttemptToSplit(activeLearningNode, - splittingNodeInfo.foundNode); - } - } - } - return false; - } - - protected Set<FoundNode> foundNodeSet; - - @Override - public void onCreate(int id) { - this.processorId = id; - - this.activeLeafNodeCount = 0; - this.inactiveLeafNodeCount = 0; - this.decisionNodeCount = 0; - this.growthAllowed = true; - - this.splittingNodes = new ConcurrentHashMap<>(); - this.timedOutSplittingNodes = new LinkedBlockingQueue<>(); - this.splitId = 0; - - //Executor for scheduling time-out threads - this.executor = Executors.newScheduledThreadPool(8); - } - - @Override - public Processor newProcessor(Processor p) { - ModelAggregatorProcessor oldProcessor = (ModelAggregatorProcessor)p; - ModelAggregatorProcessor newProcessor = - new ModelAggregatorProcessor.Builder(oldProcessor).build(); - - newProcessor.setResultStream(oldProcessor.resultStream); - newProcessor.setAttributeStream(oldProcessor.attributeStream); - newProcessor.setControlStream(oldProcessor.controlStream); - return newProcessor; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append(super.toString()); - - sb.append("ActiveLeafNodeCount: ").append(activeLeafNodeCount); - sb.append("InactiveLeafNodeCount: ").append(inactiveLeafNodeCount); - sb.append("DecisionNodeCount: ").append(decisionNodeCount); - sb.append("Growth allowed: ").append(growthAllowed); - return sb.toString(); - } - - void setResultStream(Stream resultStream){ - this.resultStream = resultStream; - } - - void setAttributeStream(Stream attributeStream){ - this.attributeStream = attributeStream; - } - - void setControlStream(Stream controlStream){ - this.controlStream = controlStream; - } - - void sendToAttributeStream(ContentEvent event){ - this.attributeStream.put(event); - } - - void sendToControlStream(ContentEvent event){ - this.controlStream.put(event); - } - - /** - * Helper method to generate new ResultContentEvent based on an instance and - * its prediction result. - * @param prediction The predicted class label from the decision tree model. - * @param inEvent The associated instance content event - * @return ResultContentEvent to be sent into Evaluator PI or other destination PI. - */ - private ResultContentEvent newResultContentEvent(double[] prediction, InstanceContentEvent inEvent){ - ResultContentEvent rce = new ResultContentEvent(inEvent.getInstanceIndex(), inEvent.getInstance(), - inEvent.getClassId(), prediction, inEvent.isLastEvent()); - rce.setClassifierIndex(this.processorId); - rce.setEvaluationIndex(inEvent.getEvaluationIndex()); - return rce; - } - - private ResultContentEvent newResultContentEvent(double[] prediction, Instance inst, InstancesContentEvent inEvent){ - ResultContentEvent rce = new ResultContentEvent(inEvent.getInstanceIndex(), inst, (int) inst.classValue(), prediction, inEvent.isLastEvent()); - rce.setClassifierIndex(this.processorId); - rce.setEvaluationIndex(inEvent.getEvaluationIndex()); - return rce; - } - - private List<InstancesContentEvent> contentEventList = new LinkedList<>(); - - - /** - * Helper method to process the InstanceContentEvent - * @param instContentEvent - */ - private void processInstanceContentEvent(InstancesContentEvent instContentEvent){ - this.numBatches++; - this.contentEventList.add(instContentEvent); - if (this.numBatches == 1 || this.numBatches > 4){ - this.processInstances(this.contentEventList.remove(0)); - } + private static final long serialVersionUID = -1685875718300564886L; + private static final Logger logger = LoggerFactory.getLogger(ModelAggregatorProcessor.class); + + private int processorId; + + private Node treeRoot; + + private int activeLeafNodeCount; + private int inactiveLeafNodeCount; + private int decisionNodeCount; + private boolean growthAllowed; + + private final Instances dataset; + + // to support concurrent split + private long splitId; + private ConcurrentMap<Long, SplittingNodeInfo> splittingNodes; + private BlockingQueue<Long> timedOutSplittingNodes; + + // available streams + private Stream resultStream; + private Stream attributeStream; + private Stream controlStream; + + private transient ScheduledExecutorService executor; + + private final SplitCriterion splitCriterion; + private final double splitConfidence; + private final double tieThreshold; + private final int gracePeriod; + private final int parallelismHint; + private final long timeOut; + + // private constructor based on Builder pattern + private ModelAggregatorProcessor(Builder builder) { + this.dataset = builder.dataset; + this.splitCriterion = builder.splitCriterion; + this.splitConfidence = builder.splitConfidence; + this.tieThreshold = builder.tieThreshold; + this.gracePeriod = builder.gracePeriod; + this.parallelismHint = builder.parallelismHint; + this.timeOut = builder.timeOut; + this.changeDetector = builder.changeDetector; + + InstancesHeader ih = new InstancesHeader(dataset); + this.setModelContext(ih); + } + + @Override + public boolean process(ContentEvent event) { + + // Poll the blocking queue shared between ModelAggregator and the time-out + // threads + Long timedOutSplitId = timedOutSplittingNodes.poll(); + if (timedOutSplitId != null) { // time out has been reached! + SplittingNodeInfo splittingNode = splittingNodes.get(timedOutSplitId); + if (splittingNode != null) { + this.splittingNodes.remove(timedOutSplitId); + this.continueAttemptToSplit(splittingNode.activeLearningNode, + splittingNode.foundNode); + + } + + } - if (instContentEvent.isLastEvent()) { - // drain remaining instances - while (!contentEventList.isEmpty()) { - processInstances(contentEventList.remove(0)); - } + // Receive a new instance from source + if (event instanceof InstancesContentEvent) { + InstancesContentEvent instancesEvent = (InstancesContentEvent) event; + this.processInstanceContentEvent(instancesEvent); + // Send information to local-statistic PI + // for each of the nodes + if (this.foundNodeSet != null) { + for (FoundNode foundNode : this.foundNodeSet) { + ActiveLearningNode leafNode = (ActiveLearningNode) foundNode.getNode(); + AttributeBatchContentEvent[] abce = leafNode.getAttributeBatchContentEvent(); + if (abce != null) { + for (int i = 0; i < this.dataset.numAttributes() - 1; i++) { + this.sendToAttributeStream(abce[i]); } - - } - - private int numBatches = 0; - - private void processInstances(InstancesContentEvent instContentEvent){ - - Instance[] instances = instContentEvent.getInstances(); - boolean isTesting = instContentEvent.isTesting(); - boolean isTraining= instContentEvent.isTraining(); - for (Instance inst: instances){ - this.processInstance(inst,instContentEvent, isTesting, isTraining); + } + leafNode.setAttributeBatchContentEvent(null); + // this.sendToControlStream(event); //split information + // See if we can ask for splits + if (!leafNode.isSplitting()) { + double weightSeen = leafNode.getWeightSeen(); + // check whether it is the time for splitting + if (weightSeen - leafNode.getWeightSeenAtLastSplitEvaluation() >= this.gracePeriod) { + attemptToSplit(leafNode, foundNode); } + } } - - private void processInstance(Instance inst, InstancesContentEvent instContentEvent, boolean isTesting, boolean isTraining){ - inst.setDataset(this.dataset); - //Check the instance whether it is used for testing or training - //boolean testAndTrain = isTraining; //Train after testing - double[] prediction = null; - if (isTesting) { - prediction = getVotesForInstance(inst, false); - this.resultStream.put(newResultContentEvent(prediction, inst, - instContentEvent)); - } - - if (isTraining) { - trainOnInstanceImpl(inst); - if (this.changeDetector != null) { - if (prediction == null) { - prediction = getVotesForInstance(inst); - } - boolean correctlyClassifies = this.correctlyClassifies(inst,prediction); - double oldEstimation = this.changeDetector.getEstimation(); - this.changeDetector.input(correctlyClassifies ? 0 : 1); - if (this.changeDetector.getEstimation() > oldEstimation) { - //Start a new classifier - logger.info("Change detected, resetting the classifier"); - this.resetLearning(); - this.changeDetector.resetLearning(); - } - } - } - } - - private boolean correctlyClassifies(Instance inst, double[] prediction) { - return maxIndex(prediction) == (int) inst.classValue(); + } + this.foundNodeSet = null; + } else if (event instanceof LocalResultContentEvent) { + LocalResultContentEvent lrce = (LocalResultContentEvent) event; + Long lrceSplitId = lrce.getSplitId(); + SplittingNodeInfo splittingNodeInfo = splittingNodes.get(lrceSplitId); + + if (splittingNodeInfo != null) { // if null, that means + // activeLearningNode has been + // removed by timeout thread + ActiveLearningNode activeLearningNode = splittingNodeInfo.activeLearningNode; + + activeLearningNode.addDistributedSuggestions( + lrce.getBestSuggestion(), + lrce.getSecondBestSuggestion()); + + if (activeLearningNode.isAllSuggestionsCollected()) { + splittingNodeInfo.scheduledFuture.cancel(false); + this.splittingNodes.remove(lrceSplitId); + this.continueAttemptToSplit(activeLearningNode, + splittingNodeInfo.foundNode); } - - private void resetLearning() { - this.treeRoot = null; - //Remove nodes - FoundNode[] learningNodes = findNodes(); - for (FoundNode learningNode : learningNodes) { - Node node = learningNode.getNode(); - if (node instanceof SplitNode) { - SplitNode splitNode; - splitNode = (SplitNode) node; - for (int i = 0; i < splitNode.numChildren(); i++) { - splitNode.setChild(i, null); - } - } - } + } + } + return false; + } + + protected Set<FoundNode> foundNodeSet; + + @Override + public void onCreate(int id) { + this.processorId = id; + + this.activeLeafNodeCount = 0; + this.inactiveLeafNodeCount = 0; + this.decisionNodeCount = 0; + this.growthAllowed = true; + + this.splittingNodes = new ConcurrentHashMap<>(); + this.timedOutSplittingNodes = new LinkedBlockingQueue<>(); + this.splitId = 0; + + // Executor for scheduling time-out threads + this.executor = Executors.newScheduledThreadPool(8); + } + + @Override + public Processor newProcessor(Processor p) { + ModelAggregatorProcessor oldProcessor = (ModelAggregatorProcessor) p; + ModelAggregatorProcessor newProcessor = + new ModelAggregatorProcessor.Builder(oldProcessor).build(); + + newProcessor.setResultStream(oldProcessor.resultStream); + newProcessor.setAttributeStream(oldProcessor.attributeStream); + newProcessor.setControlStream(oldProcessor.controlStream); + return newProcessor; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(super.toString()); + + sb.append("ActiveLeafNodeCount: ").append(activeLeafNodeCount); + sb.append("InactiveLeafNodeCount: ").append(inactiveLeafNodeCount); + sb.append("DecisionNodeCount: ").append(decisionNodeCount); + sb.append("Growth allowed: ").append(growthAllowed); + return sb.toString(); + } + + void setResultStream(Stream resultStream) { + this.resultStream = resultStream; + } + + void setAttributeStream(Stream attributeStream) { + this.attributeStream = attributeStream; + } + + void setControlStream(Stream controlStream) { + this.controlStream = controlStream; + } + + void sendToAttributeStream(ContentEvent event) { + this.attributeStream.put(event); + } + + void sendToControlStream(ContentEvent event) { + this.controlStream.put(event); + } + + /** + * Helper method to generate new ResultContentEvent based on an instance and + * its prediction result. + * + * @param prediction + * The predicted class label from the decision tree model. + * @param inEvent + * The associated instance content event + * @return ResultContentEvent to be sent into Evaluator PI or other + * destination PI. + */ + private ResultContentEvent newResultContentEvent(double[] prediction, InstanceContentEvent inEvent) { + ResultContentEvent rce = new ResultContentEvent(inEvent.getInstanceIndex(), inEvent.getInstance(), + inEvent.getClassId(), prediction, inEvent.isLastEvent()); + rce.setClassifierIndex(this.processorId); + rce.setEvaluationIndex(inEvent.getEvaluationIndex()); + return rce; + } + + private ResultContentEvent newResultContentEvent(double[] prediction, Instance inst, InstancesContentEvent inEvent) { + ResultContentEvent rce = new ResultContentEvent(inEvent.getInstanceIndex(), inst, (int) inst.classValue(), + prediction, inEvent.isLastEvent()); + rce.setClassifierIndex(this.processorId); + rce.setEvaluationIndex(inEvent.getEvaluationIndex()); + return rce; + } + + private List<InstancesContentEvent> contentEventList = new LinkedList<>(); + + /** + * Helper method to process the InstanceContentEvent + * + * @param instContentEvent + */ + private void processInstanceContentEvent(InstancesContentEvent instContentEvent) { + this.numBatches++; + this.contentEventList.add(instContentEvent); + if (this.numBatches == 1 || this.numBatches > 4) { + this.processInstances(this.contentEventList.remove(0)); + } + + if (instContentEvent.isLastEvent()) { + // drain remaining instances + while (!contentEventList.isEmpty()) { + processInstances(contentEventList.remove(0)); + } + } + + } + + private int numBatches = 0; + + private void processInstances(InstancesContentEvent instContentEvent) { + + Instance[] instances = instContentEvent.getInstances(); + boolean isTesting = instContentEvent.isTesting(); + boolean isTraining = instContentEvent.isTraining(); + for (Instance inst : instances) { + this.processInstance(inst, instContentEvent, isTesting, isTraining); + } + } + + private void processInstance(Instance inst, InstancesContentEvent instContentEvent, boolean isTesting, + boolean isTraining) { + inst.setDataset(this.dataset); + // Check the instance whether it is used for testing or training + // boolean testAndTrain = isTraining; //Train after testing + double[] prediction = null; + if (isTesting) { + prediction = getVotesForInstance(inst, false); + this.resultStream.put(newResultContentEvent(prediction, inst, + instContentEvent)); + } + + if (isTraining) { + trainOnInstanceImpl(inst); + if (this.changeDetector != null) { + if (prediction == null) { + prediction = getVotesForInstance(inst); } - - protected FoundNode[] findNodes() { - List<FoundNode> foundList = new LinkedList<>(); - findNodes(this.treeRoot, null, -1, foundList); - return foundList.toArray(new FoundNode[foundList.size()]); - } - - protected void findNodes(Node node, SplitNode parent, - int parentBranch, List<FoundNode> found) { - if (node != null) { - found.add(new FoundNode(node, parent, parentBranch)); - if (node instanceof SplitNode) { - SplitNode splitNode = (SplitNode) node; - for (int i = 0; i < splitNode.numChildren(); i++) { - findNodes(splitNode.getChild(i), splitNode, i, - found); - } - } + boolean correctlyClassifies = this.correctlyClassifies(inst, prediction); + double oldEstimation = this.changeDetector.getEstimation(); + this.changeDetector.input(correctlyClassifies ? 0 : 1); + if (this.changeDetector.getEstimation() > oldEstimation) { + // Start a new classifier + logger.info("Change detected, resetting the classifier"); + this.resetLearning(); + this.changeDetector.resetLearning(); } + } } - - - /** - * Helper method to get the prediction result. - * The actual prediction result is delegated to the leaf node. - * @param inst - * @return - */ - private double[] getVotesForInstance(Instance inst){ - return getVotesForInstance(inst, false); + } + + private boolean correctlyClassifies(Instance inst, double[] prediction) { + return maxIndex(prediction) == (int) inst.classValue(); + } + + private void resetLearning() { + this.treeRoot = null; + // Remove nodes + FoundNode[] learningNodes = findNodes(); + for (FoundNode learningNode : learningNodes) { + Node node = learningNode.getNode(); + if (node instanceof SplitNode) { + SplitNode splitNode; + splitNode = (SplitNode) node; + for (int i = 0; i < splitNode.numChildren(); i++) { + splitNode.setChild(i, null); } - - private double[] getVotesForInstance(Instance inst, boolean isTraining){ - double[] ret; - FoundNode foundNode = null; - if(this.treeRoot != null){ - foundNode = this.treeRoot.filterInstanceToLeaf(inst, null, -1); - Node leafNode = foundNode.getNode(); - if(leafNode == null){ - leafNode = foundNode.getParent(); - } - - ret = leafNode.getClassVotes(inst, this); - } else { - int numClasses = this.dataset.numClasses(); - ret = new double[numClasses]; - - } - - //Training after testing to speed up the process - if (isTraining){ - if(this.treeRoot == null){ - this.treeRoot = newLearningNode(this.parallelismHint); + } + } + } + + protected FoundNode[] findNodes() { + List<FoundNode> foundList = new LinkedList<>(); + findNodes(this.treeRoot, null, -1, foundList); + return foundList.toArray(new FoundNode[foundList.size()]); + } + + protected void findNodes(Node node, SplitNode parent, + int parentBranch, List<FoundNode> found) { + if (node != null) { + found.add(new FoundNode(node, parent, parentBranch)); + if (node instanceof SplitNode) { + SplitNode splitNode = (SplitNode) node; + for (int i = 0; i < splitNode.numChildren(); i++) { + findNodes(splitNode.getChild(i), splitNode, i, + found); + } + } + } + } + + /** + * Helper method to get the prediction result. The actual prediction result is + * delegated to the leaf node. + * + * @param inst + * @return + */ + private double[] getVotesForInstance(Instance inst) { + return getVotesForInstance(inst, false); + } + + private double[] getVotesForInstance(Instance inst, boolean isTraining) { + double[] ret; + FoundNode foundNode = null; + if (this.treeRoot != null) { + foundNode = this.treeRoot.filterInstanceToLeaf(inst, null, -1); + Node leafNode = foundNode.getNode(); + if (leafNode == null) { + leafNode = foundNode.getParent(); + } + + ret = leafNode.getClassVotes(inst, this); + } else { + int numClasses = this.dataset.numClasses(); + ret = new double[numClasses]; + + } + + // Training after testing to speed up the process + if (isTraining) { + if (this.treeRoot == null) { + this.treeRoot = newLearningNode(this.parallelismHint); this.activeLeafNodeCount = 1; foundNode = this.treeRoot.filterInstanceToLeaf(inst, null, -1); - } - trainOnInstanceImpl(foundNode, inst); - } - return ret; - } - - /** - * Helper method that represent training of an instance. Since it is decision tree, - * this method routes the incoming instance into the correct leaf and then update the - * statistic on the found leaf. - * @param inst - */ - private void trainOnInstanceImpl(Instance inst) { - if(this.treeRoot == null){ - this.treeRoot = newLearningNode(this.parallelismHint); - this.activeLeafNodeCount = 1; - - } - FoundNode foundNode = this.treeRoot.filterInstanceToLeaf(inst, null, -1); - trainOnInstanceImpl(foundNode, inst); + } + trainOnInstanceImpl(foundNode, inst); + } + return ret; + } + + /** + * Helper method that represent training of an instance. Since it is decision + * tree, this method routes the incoming instance into the correct leaf and + * then update the statistic on the found leaf. + * + * @param inst + */ + private void trainOnInstanceImpl(Instance inst) { + if (this.treeRoot == null) { + this.treeRoot = newLearningNode(this.parallelismHint); + this.activeLeafNodeCount = 1; + + } + FoundNode foundNode = this.treeRoot.filterInstanceToLeaf(inst, null, -1); + trainOnInstanceImpl(foundNode, inst); + } + + private void trainOnInstanceImpl(FoundNode foundNode, Instance inst) { + + Node leafNode = foundNode.getNode(); + + if (leafNode == null) { + leafNode = newLearningNode(this.parallelismHint); + foundNode.getParent().setChild(foundNode.getParentBranch(), leafNode); + activeLeafNodeCount++; + } + + if (leafNode instanceof LearningNode) { + LearningNode learningNode = (LearningNode) leafNode; + learningNode.learnFromInstance(inst, this); + } + if (this.foundNodeSet == null) { + this.foundNodeSet = new HashSet<>(); + } + this.foundNodeSet.add(foundNode); + } + + /** + * Helper method to represent a split attempt + * + * @param activeLearningNode + * The corresponding active learning node which will be split + * @param foundNode + * The data structure to represents the filtering of the instance + * using the tree model. + */ + private void attemptToSplit(ActiveLearningNode activeLearningNode, FoundNode foundNode) { + // Increment the split ID + this.splitId++; + + // Schedule time-out thread + ScheduledFuture<?> timeOutHandler = this.executor.schedule(new AggregationTimeOutHandler(this.splitId, + this.timedOutSplittingNodes), + this.timeOut, TimeUnit.SECONDS); + + // Keep track of the splitting node information, so that we can continue the + // split + // once we receive all local statistic calculation from Local Statistic PI + // this.splittingNodes.put(Long.valueOf(this.splitId), new + // SplittingNodeInfo(activeLearningNode, foundNode, null)); + this.splittingNodes.put(this.splitId, new SplittingNodeInfo(activeLearningNode, foundNode, timeOutHandler)); + + // Inform Local Statistic PI to perform local statistic calculation + activeLearningNode.requestDistributedSuggestions(this.splitId, this); + } + + /** + * Helper method to continue the attempt to split once all local calculation + * results are received. + * + * @param activeLearningNode + * The corresponding active learning node which will be split + * @param foundNode + * The data structure to represents the filtering of the instance + * using the tree model. + */ + private void continueAttemptToSplit(ActiveLearningNode activeLearningNode, FoundNode foundNode) { + AttributeSplitSuggestion bestSuggestion = activeLearningNode.getDistributedBestSuggestion(); + AttributeSplitSuggestion secondBestSuggestion = activeLearningNode.getDistributedSecondBestSuggestion(); + + // compare with null split + double[] preSplitDist = activeLearningNode.getObservedClassDistribution(); + AttributeSplitSuggestion nullSplit = new AttributeSplitSuggestion(null, + new double[0][], this.splitCriterion.getMeritOfSplit( + preSplitDist, + new double[][] { preSplitDist })); + + if ((bestSuggestion == null) || (nullSplit.compareTo(bestSuggestion) > 0)) { + secondBestSuggestion = bestSuggestion; + bestSuggestion = nullSplit; + } else { + if ((secondBestSuggestion == null) || (nullSplit.compareTo(secondBestSuggestion) > 0)) { + secondBestSuggestion = nullSplit; + } + } + + boolean shouldSplit = false; + + if (secondBestSuggestion == null) { + shouldSplit = (bestSuggestion != null); + } else { + double hoeffdingBound = computeHoeffdingBound( + this.splitCriterion.getRangeOfMerit(activeLearningNode.getObservedClassDistribution()), + this.splitConfidence, + activeLearningNode.getWeightSeen()); + + if ((bestSuggestion.merit - secondBestSuggestion.merit > hoeffdingBound) + || (hoeffdingBound < tieThreshold)) { + shouldSplit = true; + } + // TODO: add poor attributes removal + } + + SplitNode parent = foundNode.getParent(); + int parentBranch = foundNode.getParentBranch(); + + // split if the Hoeffding bound condition is satisfied + if (shouldSplit) { + + if (bestSuggestion.splitTest != null) { + SplitNode newSplit = new SplitNode(bestSuggestion.splitTest, activeLearningNode.getObservedClassDistribution()); + + for (int i = 0; i < bestSuggestion.numSplits(); i++) { + Node newChild = newLearningNode(bestSuggestion.resultingClassDistributionFromSplit(i), this.parallelismHint); + newSplit.setChild(i, newChild); } - - private void trainOnInstanceImpl(FoundNode foundNode, Instance inst) { - - Node leafNode = foundNode.getNode(); - - if(leafNode == null){ - leafNode = newLearningNode(this.parallelismHint); - foundNode.getParent().setChild(foundNode.getParentBranch(), leafNode); - activeLeafNodeCount++; - } - - if(leafNode instanceof LearningNode){ - LearningNode learningNode = (LearningNode) leafNode; - learningNode.learnFromInstance(inst, this); - } - if (this.foundNodeSet == null){ - this.foundNodeSet = new HashSet<>(); - } - this.foundNodeSet.add(foundNode); - } - - /** - * Helper method to represent a split attempt - * @param activeLearningNode The corresponding active learning node which will be split - * @param foundNode The data structure to represents the filtering of the instance using the - * tree model. - */ - private void attemptToSplit(ActiveLearningNode activeLearningNode, FoundNode foundNode){ - //Increment the split ID - this.splitId++; - - //Schedule time-out thread - ScheduledFuture<?> timeOutHandler = this.executor.schedule(new AggregationTimeOutHandler(this.splitId, this.timedOutSplittingNodes), - this.timeOut, TimeUnit.SECONDS); - - //Keep track of the splitting node information, so that we can continue the split - //once we receive all local statistic calculation from Local Statistic PI - //this.splittingNodes.put(Long.valueOf(this.splitId), new SplittingNodeInfo(activeLearningNode, foundNode, null)); - this.splittingNodes.put(this.splitId, new SplittingNodeInfo(activeLearningNode, foundNode, timeOutHandler)); - - //Inform Local Statistic PI to perform local statistic calculation - activeLearningNode.requestDistributedSuggestions(this.splitId, this); - } - - - /** - * Helper method to continue the attempt to split once all local calculation results are received. - * @param activeLearningNode The corresponding active learning node which will be split - * @param foundNode The data structure to represents the filtering of the instance using the - * tree model. - */ - private void continueAttemptToSplit(ActiveLearningNode activeLearningNode, FoundNode foundNode){ - AttributeSplitSuggestion bestSuggestion = activeLearningNode.getDistributedBestSuggestion(); - AttributeSplitSuggestion secondBestSuggestion = activeLearningNode.getDistributedSecondBestSuggestion(); - - //compare with null split - double[] preSplitDist = activeLearningNode.getObservedClassDistribution(); - AttributeSplitSuggestion nullSplit = new AttributeSplitSuggestion(null, - new double[0][], this.splitCriterion.getMeritOfSplit( - preSplitDist, - new double[][]{preSplitDist})); - - if((bestSuggestion == null) || (nullSplit.compareTo(bestSuggestion) > 0)){ - secondBestSuggestion = bestSuggestion; - bestSuggestion = nullSplit; - }else{ - if((secondBestSuggestion == null) || (nullSplit.compareTo(secondBestSuggestion) > 0)){ - secondBestSuggestion = nullSplit; - } - } - - boolean shouldSplit = false; - - if(secondBestSuggestion == null){ - shouldSplit = (bestSuggestion != null); - }else{ - double hoeffdingBound = computeHoeffdingBound( - this.splitCriterion.getRangeOfMerit(activeLearningNode.getObservedClassDistribution()), - this.splitConfidence, - activeLearningNode.getWeightSeen()); - - if((bestSuggestion.merit - secondBestSuggestion.merit > hoeffdingBound) - || (hoeffdingBound < tieThreshold)) { - shouldSplit = true; - } - //TODO: add poor attributes removal - } - - SplitNode parent = foundNode.getParent(); - int parentBranch = foundNode.getParentBranch(); - - //split if the Hoeffding bound condition is satisfied - if(shouldSplit){ - - if (bestSuggestion.splitTest != null) { - SplitNode newSplit = new SplitNode(bestSuggestion.splitTest, activeLearningNode.getObservedClassDistribution()); - - for(int i = 0; i < bestSuggestion.numSplits(); i++){ - Node newChild = newLearningNode(bestSuggestion.resultingClassDistributionFromSplit(i), this.parallelismHint); - newSplit.setChild(i, newChild); - } - - this.activeLeafNodeCount--; - this.decisionNodeCount++; - this.activeLeafNodeCount += bestSuggestion.numSplits(); - - if(parent == null){ - this.treeRoot = newSplit; - }else{ - parent.setChild(parentBranch, newSplit); - } - } - //TODO: add check on the model's memory size - } - - //housekeeping - activeLearningNode.endSplitting(); - activeLearningNode.setWeightSeenAtLastSplitEvaluation(activeLearningNode.getWeightSeen()); - } - - /** - * Helper method to deactivate learning node - * @param toDeactivate Active Learning Node that will be deactivated - * @param parent Parent of the soon-to-be-deactivated Active LearningNode - * @param parentBranch the branch index of the node in the parent node - */ - private void deactivateLearningNode(ActiveLearningNode toDeactivate, SplitNode parent, int parentBranch){ - Node newLeaf = new InactiveLearningNode(toDeactivate.getObservedClassDistribution()); - if(parent == null){ - this.treeRoot = newLeaf; - }else{ - parent.setChild(parentBranch, newLeaf); - } - - this.activeLeafNodeCount--; - this.inactiveLeafNodeCount++; - } - - - private LearningNode newLearningNode(int parallelismHint){ - return newLearningNode(new double[0], parallelismHint); - } - - private LearningNode newLearningNode(double[] initialClassObservations, int parallelismHint){ - //for VHT optimization, we need to dynamically instantiate the appropriate ActiveLearningNode - return new ActiveLearningNode(initialClassObservations, parallelismHint); - } - - /** - * Helper method to set the model context, i.e. how many attributes they are and what is the class index - * @param ih - */ - private void setModelContext(InstancesHeader ih){ - //TODO possibly refactored - if ((ih != null) && (ih.classIndex() < 0)) { - throw new IllegalArgumentException( - "Context for a classifier must include a class to learn"); + + this.activeLeafNodeCount--; + this.decisionNodeCount++; + this.activeLeafNodeCount += bestSuggestion.numSplits(); + + if (parent == null) { + this.treeRoot = newSplit; + } else { + parent.setChild(parentBranch, newSplit); } - //TODO: check flag for checking whether training has started or not - - //model context is used to describe the model - logger.trace("Model context: {}", ih.toString()); - } - - private static double computeHoeffdingBound(double range, double confidence, double n){ - return Math.sqrt((Math.pow(range, 2.0) * Math.log(1.0/confidence)) / (2.0*n)); - } - - /** - * AggregationTimeOutHandler is a class to support time-out feature while waiting for local computation results - * from the local statistic PIs. - * @author Arinto Murdopo - * - */ - static class AggregationTimeOutHandler implements Runnable{ - - private static final Logger logger = LoggerFactory.getLogger(AggregationTimeOutHandler.class); - private final Long splitId; - private final BlockingQueue<Long> toBeSplittedNodes; - - AggregationTimeOutHandler(Long splitId, BlockingQueue<Long> toBeSplittedNodes){ - this.splitId = splitId; - this.toBeSplittedNodes = toBeSplittedNodes; - } - - @Override - public void run() { - logger.debug("Time out is reached. AggregationTimeOutHandler is started."); - try { - toBeSplittedNodes.put(splitId); - } catch (InterruptedException e) { - logger.warn("Interrupted while trying to put the ID into the queue"); - } - logger.debug("AggregationTimeOutHandler is finished."); - } - } - - /** - * SplittingNodeInfo is a class to represents the ActiveLearningNode that is splitting - * @author Arinto Murdopo - * - */ - static class SplittingNodeInfo{ - - private final ActiveLearningNode activeLearningNode; - private final FoundNode foundNode; - private final ScheduledFuture<?> scheduledFuture; - - SplittingNodeInfo(ActiveLearningNode activeLearningNode, FoundNode foundNode, ScheduledFuture<?> scheduledFuture){ - this.activeLearningNode = activeLearningNode; - this.foundNode = foundNode; - this.scheduledFuture = scheduledFuture; - } - } - - protected ChangeDetector changeDetector; - - public ChangeDetector getChangeDetector() { - return this.changeDetector; - } - - public void setChangeDetector(ChangeDetector cd) { - this.changeDetector = cd; - } - - /** - * Builder class to replace constructors with many parameters - * @author Arinto Murdopo - * - */ - static class Builder{ - - //required parameters - private final Instances dataset; - - //default values - private SplitCriterion splitCriterion = new InfoGainSplitCriterion(); - private double splitConfidence = 0.0000001; - private double tieThreshold = 0.05; - private int gracePeriod = 200; - private int parallelismHint = 1; - private long timeOut = 30; - private ChangeDetector changeDetector = null; - - Builder(Instances dataset){ - this.dataset = dataset; - } - - Builder(ModelAggregatorProcessor oldProcessor){ - this.dataset = oldProcessor.dataset; - this.splitCriterion = oldProcessor.splitCriterion; - this.splitConfidence = oldProcessor.splitConfidence; - this.tieThreshold = oldProcessor.tieThreshold; - this.gracePeriod = oldProcessor.gracePeriod; - this.parallelismHint = oldProcessor.parallelismHint; - this.timeOut = oldProcessor.timeOut; - } - - Builder splitCriterion(SplitCriterion splitCriterion){ - this.splitCriterion = splitCriterion; - return this; - } - - Builder splitConfidence(double splitConfidence){ - this.splitConfidence = splitConfidence; - return this; - } - - Builder tieThreshold(double tieThreshold){ - this.tieThreshold = tieThreshold; - return this; - } - - Builder gracePeriod(int gracePeriod){ - this.gracePeriod = gracePeriod; - return this; - } - - Builder parallelismHint(int parallelismHint){ - this.parallelismHint = parallelismHint; - return this; - } - - Builder timeOut(long timeOut){ - this.timeOut = timeOut; - return this; - } - - Builder changeDetector(ChangeDetector changeDetector){ - this.changeDetector = changeDetector; - return this; - } - ModelAggregatorProcessor build(){ - return new ModelAggregatorProcessor(this); - } - } - + } + // TODO: add check on the model's memory size + } + + // housekeeping + activeLearningNode.endSplitting(); + activeLearningNode.setWeightSeenAtLastSplitEvaluation(activeLearningNode.getWeightSeen()); + } + + /** + * Helper method to deactivate learning node + * + * @param toDeactivate + * Active Learning Node that will be deactivated + * @param parent + * Parent of the soon-to-be-deactivated Active LearningNode + * @param parentBranch + * the branch index of the node in the parent node + */ + private void deactivateLearningNode(ActiveLearningNode toDeactivate, SplitNode parent, int parentBranch) { + Node newLeaf = new InactiveLearningNode(toDeactivate.getObservedClassDistribution()); + if (parent == null) { + this.treeRoot = newLeaf; + } else { + parent.setChild(parentBranch, newLeaf); + } + + this.activeLeafNodeCount--; + this.inactiveLeafNodeCount++; + } + + private LearningNode newLearningNode(int parallelismHint) { + return newLearningNode(new double[0], parallelismHint); + } + + private LearningNode newLearningNode(double[] initialClassObservations, int parallelismHint) { + // for VHT optimization, we need to dynamically instantiate the appropriate + // ActiveLearningNode + return new ActiveLearningNode(initialClassObservations, parallelismHint); + } + + /** + * Helper method to set the model context, i.e. how many attributes they are + * and what is the class index + * + * @param ih + */ + private void setModelContext(InstancesHeader ih) { + // TODO possibly refactored + if ((ih != null) && (ih.classIndex() < 0)) { + throw new IllegalArgumentException( + "Context for a classifier must include a class to learn"); + } + // TODO: check flag for checking whether training has started or not + + // model context is used to describe the model + logger.trace("Model context: {}", ih.toString()); + } + + private static double computeHoeffdingBound(double range, double confidence, double n) { + return Math.sqrt((Math.pow(range, 2.0) * Math.log(1.0 / confidence)) / (2.0 * n)); + } + + /** + * AggregationTimeOutHandler is a class to support time-out feature while + * waiting for local computation results from the local statistic PIs. + * + * @author Arinto Murdopo + * + */ + static class AggregationTimeOutHandler implements Runnable { + + private static final Logger logger = LoggerFactory.getLogger(AggregationTimeOutHandler.class); + private final Long splitId; + private final BlockingQueue<Long> toBeSplittedNodes; + + AggregationTimeOutHandler(Long splitId, BlockingQueue<Long> toBeSplittedNodes) { + this.splitId = splitId; + this.toBeSplittedNodes = toBeSplittedNodes; + } + + @Override + public void run() { + logger.debug("Time out is reached. AggregationTimeOutHandler is started."); + try { + toBeSplittedNodes.put(splitId); + } catch (InterruptedException e) { + logger.warn("Interrupted while trying to put the ID into the queue"); + } + logger.debug("AggregationTimeOutHandler is finished."); + } + } + + /** + * SplittingNodeInfo is a class to represents the ActiveLearningNode that is + * splitting + * + * @author Arinto Murdopo + * + */ + static class SplittingNodeInfo { + + private final ActiveLearningNode activeLearningNode; + private final FoundNode foundNode; + private final ScheduledFuture<?> scheduledFuture; + + SplittingNodeInfo(ActiveLearningNode activeLearningNode, FoundNode foundNode, ScheduledFuture<?> scheduledFuture) { + this.activeLearningNode = activeLearningNode; + this.foundNode = foundNode; + this.scheduledFuture = scheduledFuture; + } + } + + protected ChangeDetector changeDetector; + + public ChangeDetector getChangeDetector() { + return this.changeDetector; + } + + public void setChangeDetector(ChangeDetector cd) { + this.changeDetector = cd; + } + + /** + * Builder class to replace constructors with many parameters + * + * @author Arinto Murdopo + * + */ + static class Builder { + + // required parameters + private final Instances dataset; + + // default values + private SplitCriterion splitCriterion = new InfoGainSplitCriterion(); + private double splitConfidence = 0.0000001; + private double tieThreshold = 0.05; + private int gracePeriod = 200; + private int parallelismHint = 1; + private long timeOut = 30; + private ChangeDetector changeDetector = null; + + Builder(Instances dataset) { + this.dataset = dataset; + } + + Builder(ModelAggregatorProcessor oldProcessor) { + this.dataset = oldProcessor.dataset; + this.splitCriterion = oldProcessor.splitCriterion; + this.splitConfidence = oldProcessor.splitConfidence; + this.tieThreshold = oldProcessor.tieThreshold; + this.gracePeriod = oldProcessor.gracePeriod; + this.parallelismHint = oldProcessor.parallelismHint; + this.timeOut = oldProcessor.timeOut; + } + + Builder splitCriterion(SplitCriterion splitCriterion) { + this.splitCriterion = splitCriterion; + return this; + } + + Builder splitConfidence(double splitConfidence) { + this.splitConfidence = splitConfidence; + return this; + } + + Builder tieThreshold(double tieThreshold) { + this.tieThreshold = tieThreshold; + return this; + } + + Builder gracePeriod(int gracePeriod) { + this.gracePeriod = gracePeriod; + return this; + } + + Builder parallelismHint(int parallelismHint) { + this.parallelismHint = parallelismHint; + return this; + } + + Builder timeOut(long timeOut) { + this.timeOut = timeOut; + return this; + } + + Builder changeDetector(ChangeDetector changeDetector) { + this.changeDetector = changeDetector; + return this; + } + + ModelAggregatorProcessor build() { + return new ModelAggregatorProcessor(this); + } + } + }
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/Node.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/Node.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/Node.java index ff9bf5f..22e551f 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/Node.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/Node.java @@ -25,70 +25,83 @@ import com.yahoo.labs.samoa.instances.Instance; /** * Abstract class that represents a node in the tree model. + * * @author Arinto Murdopo - * + * */ -abstract class Node implements java.io.Serializable{ - - private static final long serialVersionUID = 4008521239214180548L; - - protected final DoubleVector observedClassDistribution; - - /** - * Method to route/filter an instance into its corresponding leaf. This method will be - * invoked recursively. - * @param inst Instance to be routed - * @param parent Parent of the current node - * @param parentBranch The index of the current node in the parent - * @return FoundNode which is the data structure to represent the resulting leaf. - */ - abstract FoundNode filterInstanceToLeaf(Instance inst, SplitNode parent, int parentBranch); - - /** - * Method to return the predicted class of the instance based on the statistic - * inside the node. - * - * @param inst To-be-predicted instance - * @param map ModelAggregatorProcessor - * @return The prediction result in the form of class distribution - */ - abstract double[] getClassVotes(Instance inst, ModelAggregatorProcessor map); - - /** - * Method to check whether the node is a leaf node or not. - * @return Boolean flag to indicate whether the node is a leaf or not - */ - abstract boolean isLeaf(); - - - /** - * Constructor of the tree node - * @param classObservation distribution of the observed classes. - */ - protected Node(double[] classObservation){ - this.observedClassDistribution = new DoubleVector(classObservation); - } - - /** - * Getter method for the class distribution - * @return Observed class distribution - */ - protected double[] getObservedClassDistribution() { - return this.observedClassDistribution.getArrayCopy(); - } - - /** - * A method to check whether the class distribution only consists of one class or not. - * @return Flag whether class distribution is pure or not. - */ - protected boolean observedClassDistributionIsPure(){ - return (observedClassDistribution.numNonZeroEntries() < 2); - } - - protected void describeSubtree(ModelAggregatorProcessor modelAggrProc, StringBuilder out, int indent){ - //TODO: implement method to gracefully define the tree - } - - //TODO: calculate promise for limiting the model based on the memory size - //double calculatePromise(); +abstract class Node implements java.io.Serializable { + + private static final long serialVersionUID = 4008521239214180548L; + + protected final DoubleVector observedClassDistribution; + + /** + * Method to route/filter an instance into its corresponding leaf. This method + * will be invoked recursively. + * + * @param inst + * Instance to be routed + * @param parent + * Parent of the current node + * @param parentBranch + * The index of the current node in the parent + * @return FoundNode which is the data structure to represent the resulting + * leaf. + */ + abstract FoundNode filterInstanceToLeaf(Instance inst, SplitNode parent, int parentBranch); + + /** + * Method to return the predicted class of the instance based on the statistic + * inside the node. + * + * @param inst + * To-be-predicted instance + * @param map + * ModelAggregatorProcessor + * @return The prediction result in the form of class distribution + */ + abstract double[] getClassVotes(Instance inst, ModelAggregatorProcessor map); + + /** + * Method to check whether the node is a leaf node or not. + * + * @return Boolean flag to indicate whether the node is a leaf or not + */ + abstract boolean isLeaf(); + + /** + * Constructor of the tree node + * + * @param classObservation + * distribution of the observed classes. + */ + protected Node(double[] classObservation) { + this.observedClassDistribution = new DoubleVector(classObservation); + } + + /** + * Getter method for the class distribution + * + * @return Observed class distribution + */ + protected double[] getObservedClassDistribution() { + return this.observedClassDistribution.getArrayCopy(); + } + + /** + * A method to check whether the class distribution only consists of one class + * or not. + * + * @return Flag whether class distribution is pure or not. + */ + protected boolean observedClassDistributionIsPure() { + return (observedClassDistribution.numNonZeroEntries() < 2); + } + + protected void describeSubtree(ModelAggregatorProcessor modelAggrProc, StringBuilder out, int indent) { + // TODO: implement method to gracefully define the tree + } + + // TODO: calculate promise for limiting the model based on the memory size + // double calculatePromise(); } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/SplitNode.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/SplitNode.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/SplitNode.java index fd93db1..7c6b434 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/SplitNode.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/SplitNode.java @@ -25,84 +25,94 @@ import com.yahoo.labs.samoa.moa.core.AutoExpandVector; import com.yahoo.labs.samoa.instances.Instance; /** - * SplitNode represents the node that contains one or more questions in the decision tree model, - * in order to route the instances into the correct leaf. + * SplitNode represents the node that contains one or more questions in the + * decision tree model, in order to route the instances into the correct leaf. + * * @author Arinto Murdopo - * + * */ public class SplitNode extends Node { - - private static final long serialVersionUID = -7380795529928485792L; - - private final AutoExpandVector<Node> children; - protected final InstanceConditionalTest splitTest; - - public SplitNode(InstanceConditionalTest splitTest, - double[] classObservation) { - super(classObservation); - this.children = new AutoExpandVector<>(); - this.splitTest = splitTest; - } - @Override - FoundNode filterInstanceToLeaf(Instance inst, SplitNode parent, int parentBranch) { - int childIndex = instanceChildIndex(inst); - if(childIndex >= 0){ - Node child = getChild(childIndex); - if(child != null){ - return child.filterInstanceToLeaf(inst, this, childIndex); - } - return new FoundNode(null, this, childIndex); - } - return new FoundNode(this, parent, parentBranch); - } + private static final long serialVersionUID = -7380795529928485792L; + + private final AutoExpandVector<Node> children; + protected final InstanceConditionalTest splitTest; + + public SplitNode(InstanceConditionalTest splitTest, + double[] classObservation) { + super(classObservation); + this.children = new AutoExpandVector<>(); + this.splitTest = splitTest; + } + + @Override + FoundNode filterInstanceToLeaf(Instance inst, SplitNode parent, int parentBranch) { + int childIndex = instanceChildIndex(inst); + if (childIndex >= 0) { + Node child = getChild(childIndex); + if (child != null) { + return child.filterInstanceToLeaf(inst, this, childIndex); + } + return new FoundNode(null, this, childIndex); + } + return new FoundNode(this, parent, parentBranch); + } + + @Override + boolean isLeaf() { + return false; + } + + @Override + double[] getClassVotes(Instance inst, ModelAggregatorProcessor vht) { + return this.observedClassDistribution.getArrayCopy(); + } + + /** + * Method to return the number of children of this split node + * + * @return number of children + */ + int numChildren() { + return this.children.size(); + } + + /** + * Method to set the children in a specific index of the SplitNode with the + * appropriate child + * + * @param index + * Index of the child in the SplitNode + * @param child + * The child node + */ + void setChild(int index, Node child) { + if ((this.splitTest.maxBranches() >= 0) + && (index >= this.splitTest.maxBranches())) { + throw new IndexOutOfBoundsException(); + } + this.children.set(index, child); + } + + /** + * Method to get the child node given the index + * + * @param index + * The child node index + * @return The child node in the given index + */ + Node getChild(int index) { + return this.children.get(index); + } - @Override - boolean isLeaf() { - return false; - } - - @Override - double[] getClassVotes(Instance inst, ModelAggregatorProcessor vht) { - return this.observedClassDistribution.getArrayCopy(); - } - - /** - * Method to return the number of children of this split node - * @return number of children - */ - int numChildren(){ - return this.children.size(); - } - - /** - * Method to set the children in a specific index of the SplitNode with the appropriate child - * @param index Index of the child in the SplitNode - * @param child The child node - */ - void setChild(int index, Node child){ - if ((this.splitTest.maxBranches() >= 0) - && (index >= this.splitTest.maxBranches())) { - throw new IndexOutOfBoundsException(); - } - this.children.set(index, child); - } - - /** - * Method to get the child node given the index - * @param index The child node index - * @return The child node in the given index - */ - Node getChild(int index){ - return this.children.get(index); - } - - /** - * Method to route the instance using this split node - * @param inst The routed instance - * @return The index of the branch where the instance is routed - */ - int instanceChildIndex(Instance inst){ - return this.splitTest.branchForInstance(inst); - } + /** + * Method to route the instance using this split node + * + * @param inst + * The routed instance + * @return The index of the branch where the instance is routed + */ + int instanceChildIndex(Instance inst) { + return this.splitTest.branchForInstance(inst); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/VerticalHoeffdingTree.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/VerticalHoeffdingTree.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/VerticalHoeffdingTree.java index e8ccce7..990a15b 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/VerticalHoeffdingTree.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/VerticalHoeffdingTree.java @@ -46,7 +46,7 @@ import com.yahoo.labs.samoa.topology.TopologyBuilder; * Vertical Hoeffding Tree (VHT) classifier is a distributed classifier that * utilizes vertical parallelism on top of Very Fast Decision Tree (VFDT) * classifier. - * + * * @author Arinto Murdopo */ public final class VerticalHoeffdingTree implements ClassificationLearner, AdaptiveLearner, Configurable { @@ -110,7 +110,6 @@ public final class VerticalHoeffdingTree implements ClassificationLearner, Adapt Stream filterStream = topologyBuilder.createStream(filterProc); this.filterProc.setOutputStream(filterStream); - ModelAggregatorProcessor modelAggrProc = new ModelAggregatorProcessor.Builder(dataset) .splitCriterion((SplitCriterion) this.splitCriterionOption.getValue()) .splitConfidence(splitConfidenceOption.getValue()) @@ -175,7 +174,7 @@ public final class VerticalHoeffdingTree implements ClassificationLearner, Adapt static class LearningNodeIdGenerator { - //TODO: add code to warn user of when value reaches Long.MAX_VALUES + // TODO: add code to warn user of when value reaches Long.MAX_VALUES private static long id = 0; static synchronized long generate() { http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/ClusteringContentEvent.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/ClusteringContentEvent.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/ClusteringContentEvent.java index a0d950b..4b7b1f6 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/ClusteringContentEvent.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/ClusteringContentEvent.java @@ -30,60 +30,61 @@ import com.yahoo.labs.samoa.instances.Instance; @Immutable final public class ClusteringContentEvent implements ContentEvent { - private static final long serialVersionUID = -7746983521296618922L; - private Instance instance; - private boolean isLast = false; - private String key; - private boolean isSample; + private static final long serialVersionUID = -7746983521296618922L; + private Instance instance; + private boolean isLast = false; + private String key; + private boolean isSample; - public ClusteringContentEvent() { - // Necessary for kryo serializer - } + public ClusteringContentEvent() { + // Necessary for kryo serializer + } - /** - * Instantiates a new clustering event. - * - * @param index - * the index - * @param instance - * the instance + /** + * Instantiates a new clustering event. + * + * @param index + * the index + * @param instance + * the instance + */ + public ClusteringContentEvent(long index, Instance instance) { + /* + * if (instance != null) { this.instance = new + * SerializableInstance(instance); } */ - public ClusteringContentEvent(long index, Instance instance) { - /* - * if (instance != null) { this.instance = new SerializableInstance(instance); } - */ - this.instance = instance; - this.setKey(Long.toString(index)); - } + this.instance = instance; + this.setKey(Long.toString(index)); + } - @Override - public String getKey() { - return this.key; - } + @Override + public String getKey() { + return this.key; + } - @Override - public void setKey(String str) { - this.key = str; - } + @Override + public void setKey(String str) { + this.key = str; + } - @Override - public boolean isLastEvent() { - return this.isLast; - } + @Override + public boolean isLastEvent() { + return this.isLast; + } - public void setLast(boolean isLast) { - this.isLast = isLast; - } + public void setLast(boolean isLast) { + this.isLast = isLast; + } - public Instance getInstance() { - return this.instance; - } + public Instance getInstance() { + return this.instance; + } - public boolean isSample() { - return isSample; - } + public boolean isSample() { + return isSample; + } - public void setSample(boolean b) { - this.isSample = b; - } + public void setSample(boolean b) { + this.isSample = b; + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/ClustreamClustererAdapter.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/ClustreamClustererAdapter.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/ClustreamClustererAdapter.java index 057e37b..829d448 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/ClustreamClustererAdapter.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/ClustreamClustererAdapter.java @@ -31,136 +31,142 @@ import com.yahoo.labs.samoa.moa.cluster.Clustering; import com.yahoo.labs.samoa.moa.clusterers.clustream.Clustream; /** - * - * Base class for adapting Clustream clusterer. - * + * + * Base class for adapting Clustream clusterer. + * */ public class ClustreamClustererAdapter implements LocalClustererAdapter, Configurable { - /** + /** * */ - private static final long serialVersionUID = 4372366401338704353L; - - public ClassOption learnerOption = new ClassOption("learner", 'l', - "Clusterer to train.", com.yahoo.labs.samoa.moa.clusterers.Clusterer.class, Clustream.class.getName()); - /** - * The learner. - */ - protected com.yahoo.labs.samoa.moa.clusterers.Clusterer learner; - - /** - * The is init. - */ - protected Boolean isInit; - - /** - * The dataset. - */ - protected Instances dataset; + private static final long serialVersionUID = 4372366401338704353L; - @Override - public void setDataset(Instances dataset) { - this.dataset = dataset; - } + public ClassOption learnerOption = new ClassOption("learner", 'l', + "Clusterer to train.", com.yahoo.labs.samoa.moa.clusterers.Clusterer.class, Clustream.class.getName()); + /** + * The learner. + */ + protected com.yahoo.labs.samoa.moa.clusterers.Clusterer learner; - /** - * Instantiates a new learner. - * - * @param learner the learner - * @param dataset the dataset - */ - public ClustreamClustererAdapter(com.yahoo.labs.samoa.moa.clusterers.Clusterer learner, Instances dataset) { - this.learner = learner.copy(); - this.isInit = false; - this.dataset = dataset; - } + /** + * The is init. + */ + protected Boolean isInit; - /** - * Instantiates a new learner. - * - * @param learner the learner - * @param dataset the dataset - */ - public ClustreamClustererAdapter() { - this.learner = ((com.yahoo.labs.samoa.moa.clusterers.Clusterer) this.learnerOption.getValue()).copy(); - this.isInit = false; - //this.dataset = dataset; - } + /** + * The dataset. + */ + protected Instances dataset; - /** - * Creates a new learner object. - * - * @return the learner - */ - @Override - public ClustreamClustererAdapter create() { - ClustreamClustererAdapter l = new ClustreamClustererAdapter(learner, dataset); - if (dataset == null) { - System.out.println("dataset null while creating"); - } - return l; - } + @Override + public void setDataset(Instances dataset) { + this.dataset = dataset; + } - /** - * Trains this classifier incrementally using the given instance. - * - * @param inst the instance to be used for training - */ - @Override - public void trainOnInstance(Instance inst) { - if (this.isInit == false) { - this.isInit = true; - InstancesHeader instances = new InstancesHeader(dataset); - this.learner.setModelContext(instances); - this.learner.prepareForUse(); - } - if (inst.weight() > 0) { - inst.setDataset(dataset); - learner.trainOnInstance(inst); - } - } + /** + * Instantiates a new learner. + * + * @param learner + * the learner + * @param dataset + * the dataset + */ + public ClustreamClustererAdapter(com.yahoo.labs.samoa.moa.clusterers.Clusterer learner, Instances dataset) { + this.learner = learner.copy(); + this.isInit = false; + this.dataset = dataset; + } - /** - * Predicts the class memberships for a given instance. If an instance is - * unclassified, the returned array elements must be all zero. - * - * @param inst the instance to be classified - * @return an array containing the estimated membership probabilities of the - * test instance in each class - */ - @Override - public double[] getVotesForInstance(Instance inst) { - double[] ret; - inst.setDataset(dataset); - if (this.isInit == false) { - ret = new double[dataset.numClasses()]; - } else { - ret = learner.getVotesForInstance(inst); - } - return ret; - } + /** + * Instantiates a new learner. + * + * @param learner + * the learner + * @param dataset + * the dataset + */ + public ClustreamClustererAdapter() { + this.learner = ((com.yahoo.labs.samoa.moa.clusterers.Clusterer) this.learnerOption.getValue()).copy(); + this.isInit = false; + // this.dataset = dataset; + } - /** - * Resets this classifier. It must be similar to starting a new classifier - * from scratch. - * - */ - @Override - public void resetLearning() { - learner.resetLearning(); + /** + * Creates a new learner object. + * + * @return the learner + */ + @Override + public ClustreamClustererAdapter create() { + ClustreamClustererAdapter l = new ClustreamClustererAdapter(learner, dataset); + if (dataset == null) { + System.out.println("dataset null while creating"); } + return l; + } - public boolean implementsMicroClusterer() { - return this.learner.implementsMicroClusterer(); + /** + * Trains this classifier incrementally using the given instance. + * + * @param inst + * the instance to be used for training + */ + @Override + public void trainOnInstance(Instance inst) { + if (this.isInit == false) { + this.isInit = true; + InstancesHeader instances = new InstancesHeader(dataset); + this.learner.setModelContext(instances); + this.learner.prepareForUse(); } - - public Clustering getMicroClusteringResult() { - return this.learner.getMicroClusteringResult(); + if (inst.weight() > 0) { + inst.setDataset(dataset); + learner.trainOnInstance(inst); } + } - public Instances getDataset() { - return this.dataset; + /** + * Predicts the class memberships for a given instance. If an instance is + * unclassified, the returned array elements must be all zero. + * + * @param inst + * the instance to be classified + * @return an array containing the estimated membership probabilities of the + * test instance in each class + */ + @Override + public double[] getVotesForInstance(Instance inst) { + double[] ret; + inst.setDataset(dataset); + if (this.isInit == false) { + ret = new double[dataset.numClasses()]; + } else { + ret = learner.getVotesForInstance(inst); } + return ret; + } + + /** + * Resets this classifier. It must be similar to starting a new classifier + * from scratch. + * + */ + @Override + public void resetLearning() { + learner.resetLearning(); + } + + public boolean implementsMicroClusterer() { + return this.learner.implementsMicroClusterer(); + } + + public Clustering getMicroClusteringResult() { + return this.learner.getMicroClusteringResult(); + } + + public Instances getDataset() { + return this.dataset; + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/LocalClustererAdapter.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/LocalClustererAdapter.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/LocalClustererAdapter.java index fedbcfe..4e4cd6e 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/LocalClustererAdapter.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/LocalClustererAdapter.java @@ -31,52 +31,52 @@ import java.io.Serializable; * @author abifet */ public interface LocalClustererAdapter extends Serializable { - - /** - * Creates a new learner object. - * - * @return the learner - */ - LocalClustererAdapter create(); - /** - * Predicts the class memberships for a given instance. If an instance is - * unclassified, the returned array elements must be all zero. - * - * @param inst - * the instance to be classified - * @return an array containing the estimated membership probabilities of the - * test instance in each class - */ - double[] getVotesForInstance(Instance inst); + /** + * Creates a new learner object. + * + * @return the learner + */ + LocalClustererAdapter create(); - /** - * Resets this classifier. It must be similar to starting a new classifier - * from scratch. - * - */ - void resetLearning(); + /** + * Predicts the class memberships for a given instance. If an instance is + * unclassified, the returned array elements must be all zero. + * + * @param inst + * the instance to be classified + * @return an array containing the estimated membership probabilities of the + * test instance in each class + */ + double[] getVotesForInstance(Instance inst); - /** - * Trains this classifier incrementally using the given instance. - * - * @param inst - * the instance to be used for training - */ - void trainOnInstance(Instance inst); - - /** - * Sets where to obtain the information of attributes of Instances - * - * @param dataset - * the dataset that contains the information - */ - public void setDataset(Instances dataset); - - public Instances getDataset(); + /** + * Resets this classifier. It must be similar to starting a new classifier + * from scratch. + * + */ + void resetLearning(); - public boolean implementsMicroClusterer(); + /** + * Trains this classifier incrementally using the given instance. + * + * @param inst + * the instance to be used for training + */ + void trainOnInstance(Instance inst); + + /** + * Sets where to obtain the information of attributes of Instances + * + * @param dataset + * the dataset that contains the information + */ + public void setDataset(Instances dataset); + + public Instances getDataset(); + + public boolean implementsMicroClusterer(); + + public Clustering getMicroClusteringResult(); - public Clustering getMicroClusteringResult(); - } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/LocalClustererProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/LocalClustererProcessor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/LocalClustererProcessor.java index a397539..4688ba2 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/LocalClustererProcessor.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/LocalClustererProcessor.java @@ -33,6 +33,7 @@ import com.yahoo.labs.samoa.moa.core.DataPoint; import com.yahoo.labs.samoa.topology.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + //import weka.core.Instance; /** @@ -40,152 +41,160 @@ import org.slf4j.LoggerFactory; */ final public class LocalClustererProcessor implements Processor { - /** - * - */ - private static final long serialVersionUID = -1577910988699148691L; - private static final Logger logger = LoggerFactory - .getLogger(LocalClustererProcessor.class); - private LocalClustererAdapter model; - private Stream outputStream; - private int modelId; - private long instancesCount = 0; - private long sampleFrequency = 1000; - - public long getSampleFrequency() { - return sampleFrequency; - } - - public void setSampleFrequency(long sampleFrequency) { - this.sampleFrequency = sampleFrequency; - } - - /** - * Sets the learner. - * - * @param model the model to set - */ - public void setLearner(LocalClustererAdapter model) { - this.model = model; - } - - /** - * Gets the learner. - * - * @return the model - */ - public LocalClustererAdapter getLearner() { - return model; - } - - /** - * Set the output streams. + /** * - * @param outputStream the new output stream {@link PredictionCombinerPE}. */ - public void setOutputStream(Stream outputStream) { - - this.outputStream = outputStream; + private static final long serialVersionUID = -1577910988699148691L; + private static final Logger logger = LoggerFactory + .getLogger(LocalClustererProcessor.class); + private LocalClustererAdapter model; + private Stream outputStream; + private int modelId; + private long instancesCount = 0; + private long sampleFrequency = 1000; + + public long getSampleFrequency() { + return sampleFrequency; + } + + public void setSampleFrequency(long sampleFrequency) { + this.sampleFrequency = sampleFrequency; + } + + /** + * Sets the learner. + * + * @param model + * the model to set + */ + public void setLearner(LocalClustererAdapter model) { + this.model = model; + } + + /** + * Gets the learner. + * + * @return the model + */ + public LocalClustererAdapter getLearner() { + return model; + } + + /** + * Set the output streams. + * + * @param outputStream + * the new output stream {@link PredictionCombinerPE}. + */ + public void setOutputStream(Stream outputStream) { + + this.outputStream = outputStream; + } + + /** + * Gets the output stream. + * + * @return the output stream + */ + public Stream getOutputStream() { + return outputStream; + } + + /** + * Gets the instances count. + * + * @return number of observation vectors used in training iteration. + */ + public long getInstancesCount() { + return instancesCount; + } + + /** + * Update stats. + * + * @param event + * the event + */ + private void updateStats(ContentEvent event) { + Instance instance; + if (event instanceof ClusteringContentEvent) { + // Local Clustering + ClusteringContentEvent ev = (ClusteringContentEvent) event; + instance = ev.getInstance(); + DataPoint point = new DataPoint(instance, Integer.parseInt(event.getKey())); + model.trainOnInstance(point); + instancesCount++; } - /** - * Gets the output stream. - * - * @return the output stream - */ - public Stream getOutputStream() { - return outputStream; + if (event instanceof ClusteringResultContentEvent) { + // Global Clustering + ClusteringResultContentEvent ev = (ClusteringResultContentEvent) event; + Clustering clustering = ev.getClustering(); + + for (int i = 0; i < clustering.size(); i++) { + instance = new DenseInstance(1.0, clustering.get(i).getCenter()); + instance.setDataset(model.getDataset()); + DataPoint point = new DataPoint(instance, Integer.parseInt(event.getKey())); + model.trainOnInstance(point); + instancesCount++; + } } - /** - * Gets the instances count. - * - * @return number of observation vectors used in training iteration. - */ - public long getInstancesCount() { - return instancesCount; + if (instancesCount % this.sampleFrequency == 0) { + logger.info("Trained model using {} events with classifier id {}", instancesCount, this.modelId); // getId()); } + } - /** - * Update stats. - * - * @param event the event - */ - private void updateStats(ContentEvent event) { - Instance instance; - if (event instanceof ClusteringContentEvent){ - //Local Clustering - ClusteringContentEvent ev = (ClusteringContentEvent) event; - instance = ev.getInstance(); - DataPoint point = new DataPoint(instance, Integer.parseInt(event.getKey())); - model.trainOnInstance(point); - instancesCount++; - } - - if (event instanceof ClusteringResultContentEvent){ - //Global Clustering - ClusteringResultContentEvent ev = (ClusteringResultContentEvent) event; - Clustering clustering = ev.getClustering(); - - for (int i=0; i<clustering.size(); i++) { - instance = new DenseInstance(1.0,clustering.get(i).getCenter()); - instance.setDataset(model.getDataset()); - DataPoint point = new DataPoint(instance, Integer.parseInt(event.getKey())); - model.trainOnInstance(point); - instancesCount++; - } - } - - if (instancesCount % this.sampleFrequency == 0) { - logger.info("Trained model using {} events with classifier id {}", instancesCount, this.modelId); // getId()); - } - } + /** + * On event. + * + * @param event + * the event + * @return true, if successful + */ + @Override + public boolean process(ContentEvent event) { - /** - * On event. - * - * @param event the event - * @return true, if successful - */ - @Override - public boolean process(ContentEvent event) { - - if (event.isLastEvent() || - (instancesCount > 0 && instancesCount% this.sampleFrequency == 0)) { - if (model.implementsMicroClusterer()) { + if (event.isLastEvent() || + (instancesCount > 0 && instancesCount % this.sampleFrequency == 0)) { + if (model.implementsMicroClusterer()) { - Clustering clustering = model.getMicroClusteringResult(); + Clustering clustering = model.getMicroClusteringResult(); - ClusteringResultContentEvent resultEvent = new ClusteringResultContentEvent(clustering, event.isLastEvent()); + ClusteringResultContentEvent resultEvent = new ClusteringResultContentEvent(clustering, event.isLastEvent()); - this.outputStream.put(resultEvent); - } - } - - updateStats(event); - return false; + this.outputStream.put(resultEvent); + } } - /* (non-Javadoc) - * @see samoa.core.Processor#onCreate(int) - */ - @Override - public void onCreate(int id) { - this.modelId = id; - model = model.create(); - } - - /* (non-Javadoc) - * @see samoa.core.Processor#newProcessor(samoa.core.Processor) - */ - @Override - public Processor newProcessor(Processor sourceProcessor) { - LocalClustererProcessor newProcessor = new LocalClustererProcessor(); - LocalClustererProcessor originProcessor = (LocalClustererProcessor) sourceProcessor; - if (originProcessor.getLearner() != null) { - newProcessor.setLearner(originProcessor.getLearner().create()); - } - newProcessor.setOutputStream(originProcessor.getOutputStream()); - return newProcessor; + updateStats(event); + return false; + } + + /* + * (non-Javadoc) + * + * @see samoa.core.Processor#onCreate(int) + */ + @Override + public void onCreate(int id) { + this.modelId = id; + model = model.create(); + } + + /* + * (non-Javadoc) + * + * @see samoa.core.Processor#newProcessor(samoa.core.Processor) + */ + @Override + public Processor newProcessor(Processor sourceProcessor) { + LocalClustererProcessor newProcessor = new LocalClustererProcessor(); + LocalClustererProcessor originProcessor = (LocalClustererProcessor) sourceProcessor; + if (originProcessor.getLearner() != null) { + newProcessor.setLearner(originProcessor.getLearner().create()); } + newProcessor.setOutputStream(originProcessor.getOutputStream()); + return newProcessor; + } }
