http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRRuleSetProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRRuleSetProcessor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRRuleSetProcessor.java deleted file mode 100644 index 5e83338..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRRuleSetProcessor.java +++ /dev/null @@ -1,372 +0,0 @@ -package com.yahoo.labs.samoa.learners.classifiers.rules.distributed; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * #L% - */ - -import com.yahoo.labs.samoa.core.ContentEvent; -import com.yahoo.labs.samoa.core.Processor; -import com.yahoo.labs.samoa.instances.Instance; -import com.yahoo.labs.samoa.instances.Instances; -import com.yahoo.labs.samoa.learners.InstanceContentEvent; -import com.yahoo.labs.samoa.learners.ResultContentEvent; -import com.yahoo.labs.samoa.learners.classifiers.rules.common.ActiveRule; -import com.yahoo.labs.samoa.learners.classifiers.rules.common.LearningRule; -import com.yahoo.labs.samoa.learners.classifiers.rules.common.PassiveRule; -import com.yahoo.labs.samoa.moa.classifiers.rules.core.voting.ErrorWeightedVote; -import com.yahoo.labs.samoa.moa.classifiers.rules.core.voting.InverseErrorWeightedVote; -import com.yahoo.labs.samoa.moa.classifiers.rules.core.voting.UniformWeightedVote; -import com.yahoo.labs.samoa.topology.Stream; -import java.util.LinkedList; -import java.util.List; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Model Aggregator Processor (HAMR). 
- * - * @author Anh Thu Vu - * - */ -public class AMRRuleSetProcessor implements Processor { - - /** - * - */ - private static final long serialVersionUID = -6544096255649379334L; - private static final Logger logger = LoggerFactory.getLogger(AMRRuleSetProcessor.class); - - private int processorId; - - // Rules & default rule - protected transient List<PassiveRule> ruleSet; - - // SAMOA Stream - private Stream statisticsStream; - private Stream resultStream; - private Stream defaultRuleStream; - - // Options - protected boolean noAnomalyDetection; - protected double multivariateAnomalyProbabilityThreshold; - protected double univariateAnomalyprobabilityThreshold; - protected int anomalyNumInstThreshold; - - protected boolean unorderedRules; - - protected int voteType; - - /* - * Constructor - */ - public AMRRuleSetProcessor(Builder builder) { - - this.noAnomalyDetection = builder.noAnomalyDetection; - this.multivariateAnomalyProbabilityThreshold = builder.multivariateAnomalyProbabilityThreshold; - this.univariateAnomalyprobabilityThreshold = builder.univariateAnomalyprobabilityThreshold; - this.anomalyNumInstThreshold = builder.anomalyNumInstThreshold; - this.unorderedRules = builder.unorderedRules; - - this.voteType = builder.voteType; - } - - /* - * (non-Javadoc) - * - * @see com.yahoo.labs.samoa.core.Processor#process(com.yahoo.labs.samoa.core. 
- * ContentEvent) - */ - @Override - public boolean process(ContentEvent event) { - if (event instanceof InstanceContentEvent) { - this.processInstanceEvent((InstanceContentEvent) event); - } - else if (event instanceof PredicateContentEvent) { - PredicateContentEvent pce = (PredicateContentEvent) event; - if (pce.getRuleSplitNode() == null) { - this.updateLearningNode(pce); - } - else { - this.updateRuleSplitNode(pce); - } - } - else if (event instanceof RuleContentEvent) { - RuleContentEvent rce = (RuleContentEvent) event; - if (rce.isRemoving()) { - this.removeRule(rce.getRuleNumberID()); - } - else { - addRule(rce.getRule()); - } - } - return true; - } - - private void processInstanceEvent(InstanceContentEvent instanceEvent) { - Instance instance = instanceEvent.getInstance(); - boolean predictionCovered = false; - boolean trainingCovered = false; - boolean continuePrediction = instanceEvent.isTesting(); - boolean continueTraining = instanceEvent.isTraining(); - - ErrorWeightedVote errorWeightedVote = newErrorWeightedVote(); - for (PassiveRule aRuleSet : this.ruleSet) { - if (!continuePrediction && !continueTraining) - break; - - if (aRuleSet.isCovering(instance)) { - predictionCovered = true; - - if (continuePrediction) { - double[] vote = aRuleSet.getPrediction(instance); - double error = aRuleSet.getCurrentError(); - errorWeightedVote.addVote(vote, error); - if (!this.unorderedRules) - continuePrediction = false; - } - - if (continueTraining) { - if (!isAnomaly(instance, aRuleSet)) { - trainingCovered = true; - aRuleSet.updateStatistics(instance); - - // Send instance to statistics PIs - sendInstanceToRule(instance, aRuleSet.getRuleNumberID()); - - if (!this.unorderedRules) - continueTraining = false; - } - } - } - } - - if (predictionCovered) { - // Combined prediction - ResultContentEvent rce = newResultContentEvent(errorWeightedVote.computeWeightedVote(), instanceEvent); - resultStream.put(rce); - } - - boolean defaultPrediction = 
instanceEvent.isTesting() && !predictionCovered; - boolean defaultTraining = instanceEvent.isTraining() && !trainingCovered; - if (defaultPrediction || defaultTraining) { - instanceEvent.setTesting(defaultPrediction); - instanceEvent.setTraining(defaultTraining); - this.defaultRuleStream.put(instanceEvent); - } - } - - private ResultContentEvent newResultContentEvent(double[] prediction, InstanceContentEvent inEvent) { - ResultContentEvent rce = new ResultContentEvent(inEvent.getInstanceIndex(), inEvent.getInstance(), - inEvent.getClassId(), prediction, inEvent.isLastEvent()); - rce.setClassifierIndex(this.processorId); - rce.setEvaluationIndex(inEvent.getEvaluationIndex()); - return rce; - } - - public ErrorWeightedVote newErrorWeightedVote() { - // TODO: do a reset instead of init a new object - if (voteType == 1) - return new UniformWeightedVote(); - return new InverseErrorWeightedVote(); - } - - /** - * Method to verify if the instance is an anomaly. - * - * @param instance - * @param rule - * @return - */ - private boolean isAnomaly(Instance instance, LearningRule rule) { - // AMRUles is equipped with anomaly detection. If on, compute the anomaly - // value. 
- boolean isAnomaly = false; - if (!this.noAnomalyDetection) { - if (rule.getInstancesSeen() >= this.anomalyNumInstThreshold) { - isAnomaly = rule.isAnomaly(instance, - this.univariateAnomalyprobabilityThreshold, - this.multivariateAnomalyProbabilityThreshold, - this.anomalyNumInstThreshold); - } - } - return isAnomaly; - } - - /* - * Add predicate/RuleSplitNode for a rule - */ - private void updateRuleSplitNode(PredicateContentEvent pce) { - int ruleID = pce.getRuleNumberID(); - for (PassiveRule rule : ruleSet) { - if (rule.getRuleNumberID() == ruleID) { - rule.nodeListAdd(pce.getRuleSplitNode()); - rule.setLearningNode(pce.getLearningNode()); - } - } - } - - private void updateLearningNode(PredicateContentEvent pce) { - int ruleID = pce.getRuleNumberID(); - for (PassiveRule rule : ruleSet) { - if (rule.getRuleNumberID() == ruleID) { - rule.setLearningNode(pce.getLearningNode()); - } - } - } - - /* - * Add new rule/Remove rule - */ - private boolean addRule(ActiveRule rule) { - this.ruleSet.add(new PassiveRule(rule)); - return true; - } - - private void removeRule(int ruleID) { - for (PassiveRule rule : ruleSet) { - if (rule.getRuleNumberID() == ruleID) { - ruleSet.remove(rule); - break; - } - } - } - - @Override - public void onCreate(int id) { - this.processorId = id; - this.ruleSet = new LinkedList<PassiveRule>(); - - } - - /* - * Clone processor - */ - @Override - public Processor newProcessor(Processor p) { - AMRRuleSetProcessor oldProcessor = (AMRRuleSetProcessor) p; - Builder builder = new Builder(oldProcessor); - AMRRuleSetProcessor newProcessor = builder.build(); - newProcessor.resultStream = oldProcessor.resultStream; - newProcessor.statisticsStream = oldProcessor.statisticsStream; - newProcessor.defaultRuleStream = oldProcessor.defaultRuleStream; - return newProcessor; - } - - /* - * Send events - */ - private void sendInstanceToRule(Instance instance, int ruleID) { - AssignmentContentEvent ace = new AssignmentContentEvent(ruleID, instance); - 
this.statisticsStream.put(ace); - } - - /* - * Output streams - */ - public void setStatisticsStream(Stream statisticsStream) { - this.statisticsStream = statisticsStream; - } - - public Stream getStatisticsStream() { - return this.statisticsStream; - } - - public void setResultStream(Stream resultStream) { - this.resultStream = resultStream; - } - - public Stream getResultStream() { - return this.resultStream; - } - - public Stream getDefaultRuleStream() { - return this.defaultRuleStream; - } - - public void setDefaultRuleStream(Stream defaultRuleStream) { - this.defaultRuleStream = defaultRuleStream; - } - - /* - * Builder - */ - public static class Builder { - private boolean noAnomalyDetection; - private double multivariateAnomalyProbabilityThreshold; - private double univariateAnomalyprobabilityThreshold; - private int anomalyNumInstThreshold; - - private boolean unorderedRules; - - // private FIMTDDNumericAttributeClassLimitObserver numericObserver; - private int voteType; - - private Instances dataset; - - public Builder(Instances dataset) { - this.dataset = dataset; - } - - public Builder(AMRRuleSetProcessor processor) { - - this.noAnomalyDetection = processor.noAnomalyDetection; - this.multivariateAnomalyProbabilityThreshold = processor.multivariateAnomalyProbabilityThreshold; - this.univariateAnomalyprobabilityThreshold = processor.univariateAnomalyprobabilityThreshold; - this.anomalyNumInstThreshold = processor.anomalyNumInstThreshold; - this.unorderedRules = processor.unorderedRules; - - this.voteType = processor.voteType; - } - - public Builder noAnomalyDetection(boolean noAnomalyDetection) { - this.noAnomalyDetection = noAnomalyDetection; - return this; - } - - public Builder multivariateAnomalyProbabilityThreshold(double mAnomalyThreshold) { - this.multivariateAnomalyProbabilityThreshold = mAnomalyThreshold; - return this; - } - - public Builder univariateAnomalyProbabilityThreshold(double uAnomalyThreshold) { - 
this.univariateAnomalyprobabilityThreshold = uAnomalyThreshold; - return this; - } - - public Builder anomalyNumberOfInstancesThreshold(int anomalyNumInstThreshold) { - this.anomalyNumInstThreshold = anomalyNumInstThreshold; - return this; - } - - public Builder unorderedRules(boolean unorderedRules) { - this.unorderedRules = unorderedRules; - return this; - } - - public Builder voteType(int voteType) { - this.voteType = voteType; - return this; - } - - public AMRRuleSetProcessor build() { - return new AMRRuleSetProcessor(this); - } - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRulesAggregatorProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRulesAggregatorProcessor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRulesAggregatorProcessor.java deleted file mode 100644 index 176afce..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AMRulesAggregatorProcessor.java +++ /dev/null @@ -1,531 +0,0 @@ -package com.yahoo.labs.samoa.learners.classifiers.rules.distributed; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * #L% - */ - -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.yahoo.labs.samoa.core.ContentEvent; -import com.yahoo.labs.samoa.core.Processor; -import com.yahoo.labs.samoa.instances.Instance; -import com.yahoo.labs.samoa.instances.Instances; -import com.yahoo.labs.samoa.learners.InstanceContentEvent; -import com.yahoo.labs.samoa.learners.ResultContentEvent; -import com.yahoo.labs.samoa.learners.classifiers.rules.common.ActiveRule; -import com.yahoo.labs.samoa.learners.classifiers.rules.common.LearningRule; -import com.yahoo.labs.samoa.learners.classifiers.rules.common.PassiveRule; -import com.yahoo.labs.samoa.learners.classifiers.rules.common.Perceptron; -import com.yahoo.labs.samoa.learners.classifiers.rules.common.RuleActiveRegressionNode; -import com.yahoo.labs.samoa.moa.classifiers.rules.core.attributeclassobservers.FIMTDDNumericAttributeClassLimitObserver; -import com.yahoo.labs.samoa.moa.classifiers.rules.core.voting.ErrorWeightedVote; -import com.yahoo.labs.samoa.moa.classifiers.rules.core.voting.InverseErrorWeightedVote; -import com.yahoo.labs.samoa.moa.classifiers.rules.core.voting.UniformWeightedVote; -import com.yahoo.labs.samoa.topology.Stream; - -/** - * Model Aggregator Processor (VAMR). 
- * - * @author Anh Thu Vu - * - */ -public class AMRulesAggregatorProcessor implements Processor { - - /** - * - */ - private static final long serialVersionUID = 6303385725332704251L; - - private static final Logger logger = - LoggerFactory.getLogger(AMRulesAggregatorProcessor.class); - - private int processorId; - - // Rules & default rule - protected transient List<PassiveRule> ruleSet; - protected transient ActiveRule defaultRule; - protected transient int ruleNumberID; - protected transient double[] statistics; - - // SAMOA Stream - private Stream statisticsStream; - private Stream resultStream; - - // Options - protected int pageHinckleyThreshold; - protected double pageHinckleyAlpha; - protected boolean driftDetection; - protected int predictionFunction; // Adaptive=0 Perceptron=1 TargetMean=2 - protected boolean constantLearningRatioDecay; - protected double learningRatio; - - protected double splitConfidence; - protected double tieThreshold; - protected int gracePeriod; - - protected boolean noAnomalyDetection; - protected double multivariateAnomalyProbabilityThreshold; - protected double univariateAnomalyprobabilityThreshold; - protected int anomalyNumInstThreshold; - - protected boolean unorderedRules; - - protected FIMTDDNumericAttributeClassLimitObserver numericObserver; - protected int voteType; - - /* - * Constructor - */ - public AMRulesAggregatorProcessor(Builder builder) { - this.pageHinckleyThreshold = builder.pageHinckleyThreshold; - this.pageHinckleyAlpha = builder.pageHinckleyAlpha; - this.driftDetection = builder.driftDetection; - this.predictionFunction = builder.predictionFunction; - this.constantLearningRatioDecay = builder.constantLearningRatioDecay; - this.learningRatio = builder.learningRatio; - this.splitConfidence = builder.splitConfidence; - this.tieThreshold = builder.tieThreshold; - this.gracePeriod = builder.gracePeriod; - - this.noAnomalyDetection = builder.noAnomalyDetection; - this.multivariateAnomalyProbabilityThreshold = 
builder.multivariateAnomalyProbabilityThreshold; - this.univariateAnomalyprobabilityThreshold = builder.univariateAnomalyprobabilityThreshold; - this.anomalyNumInstThreshold = builder.anomalyNumInstThreshold; - this.unorderedRules = builder.unorderedRules; - - this.numericObserver = builder.numericObserver; - this.voteType = builder.voteType; - } - - /* - * Process - */ - @Override - public boolean process(ContentEvent event) { - if (event instanceof InstanceContentEvent) { - InstanceContentEvent instanceEvent = (InstanceContentEvent) event; - this.processInstanceEvent(instanceEvent); - } - else if (event instanceof PredicateContentEvent) { - this.updateRuleSplitNode((PredicateContentEvent) event); - } - else if (event instanceof RuleContentEvent) { - RuleContentEvent rce = (RuleContentEvent) event; - if (rce.isRemoving()) { - this.removeRule(rce.getRuleNumberID()); - } - } - - return true; - } - - // Merge predict and train so we only check for covering rules one time - private void processInstanceEvent(InstanceContentEvent instanceEvent) { - Instance instance = instanceEvent.getInstance(); - boolean predictionCovered = false; - boolean trainingCovered = false; - boolean continuePrediction = instanceEvent.isTesting(); - boolean continueTraining = instanceEvent.isTraining(); - - ErrorWeightedVote errorWeightedVote = newErrorWeightedVote(); - Iterator<PassiveRule> ruleIterator = this.ruleSet.iterator(); - while (ruleIterator.hasNext()) { - if (!continuePrediction && !continueTraining) - break; - - PassiveRule rule = ruleIterator.next(); - - if (rule.isCovering(instance) == true) { - predictionCovered = true; - - if (continuePrediction) { - double[] vote = rule.getPrediction(instance); - double error = rule.getCurrentError(); - errorWeightedVote.addVote(vote, error); - if (!this.unorderedRules) - continuePrediction = false; - } - - if (continueTraining) { - if (!isAnomaly(instance, rule)) { - trainingCovered = true; - rule.updateStatistics(instance); - // Send 
instance to statistics PIs - sendInstanceToRule(instance, rule.getRuleNumberID()); - - if (!this.unorderedRules) - continueTraining = false; - } - } - } - } - - if (predictionCovered) { - // Combined prediction - ResultContentEvent rce = newResultContentEvent(errorWeightedVote.computeWeightedVote(), instanceEvent); - resultStream.put(rce); - } - else if (instanceEvent.isTesting()) { - // predict with default rule - double[] vote = defaultRule.getPrediction(instance); - ResultContentEvent rce = newResultContentEvent(vote, instanceEvent); - resultStream.put(rce); - } - - if (!trainingCovered && instanceEvent.isTraining()) { - // train default rule with this instance - defaultRule.updateStatistics(instance); - if (defaultRule.getInstancesSeen() % this.gracePeriod == 0.0) { - if (defaultRule.tryToExpand(this.splitConfidence, this.tieThreshold) == true) { - ActiveRule newDefaultRule = newRule(defaultRule.getRuleNumberID(), - (RuleActiveRegressionNode) defaultRule.getLearningNode(), - ((RuleActiveRegressionNode) defaultRule.getLearningNode()).getStatisticsOtherBranchSplit()); // other branch - defaultRule.split(); - defaultRule.setRuleNumberID(++ruleNumberID); - this.ruleSet.add(new PassiveRule(this.defaultRule)); - // send to statistics PI - sendAddRuleEvent(defaultRule.getRuleNumberID(), this.defaultRule); - defaultRule = newDefaultRule; - } - } - } - } - - /** - * Helper method to generate new ResultContentEvent based on an instance and its prediction result. - * - * @param prediction - * The predicted class label from the decision tree model. - * @param inEvent - * The associated instance content event - * @return ResultContentEvent to be sent into Evaluator PI or other destination PI. 
- */ - private ResultContentEvent newResultContentEvent(double[] prediction, InstanceContentEvent inEvent) { - ResultContentEvent rce = new ResultContentEvent(inEvent.getInstanceIndex(), inEvent.getInstance(), - inEvent.getClassId(), prediction, inEvent.isLastEvent()); - rce.setClassifierIndex(this.processorId); - rce.setEvaluationIndex(inEvent.getEvaluationIndex()); - return rce; - } - - public ErrorWeightedVote newErrorWeightedVote() { - if (voteType == 1) - return new UniformWeightedVote(); - return new InverseErrorWeightedVote(); - } - - /** - * Method to verify if the instance is an anomaly. - * - * @param instance - * @param rule - * @return - */ - private boolean isAnomaly(Instance instance, LearningRule rule) { - // AMRUles is equipped with anomaly detection. If on, compute the anomaly - // value. - boolean isAnomaly = false; - if (this.noAnomalyDetection == false) { - if (rule.getInstancesSeen() >= this.anomalyNumInstThreshold) { - isAnomaly = rule.isAnomaly(instance, - this.univariateAnomalyprobabilityThreshold, - this.multivariateAnomalyProbabilityThreshold, - this.anomalyNumInstThreshold); - } - } - return isAnomaly; - } - - /* - * Create new rules - */ - private ActiveRule newRule(int ID, RuleActiveRegressionNode node, double[] statistics) { - ActiveRule r = newRule(ID); - - if (node != null) - { - if (node.getPerceptron() != null) - { - r.getLearningNode().setPerceptron(new Perceptron(node.getPerceptron())); - r.getLearningNode().getPerceptron().setLearningRatio(this.learningRatio); - } - if (statistics == null) - { - double mean; - if (node.getNodeStatistics().getValue(0) > 0) { - mean = node.getNodeStatistics().getValue(1) / node.getNodeStatistics().getValue(0); - r.getLearningNode().getTargetMean().reset(mean, 1); - } - } - } - if (statistics != null && ((RuleActiveRegressionNode) r.getLearningNode()).getTargetMean() != null) - { - double mean; - if (statistics[0] > 0) { - mean = statistics[1] / statistics[0]; - ((RuleActiveRegressionNode) 
r.getLearningNode()).getTargetMean().reset(mean, (long) statistics[0]); - } - } - return r; - } - - private ActiveRule newRule(int ID) { - ActiveRule r = new ActiveRule.Builder(). - threshold(this.pageHinckleyThreshold). - alpha(this.pageHinckleyAlpha). - changeDetection(this.driftDetection). - predictionFunction(this.predictionFunction). - statistics(new double[3]). - learningRatio(this.learningRatio). - numericObserver(numericObserver). - id(ID).build(); - return r; - } - - /* - * Add predicate/RuleSplitNode for a rule - */ - private void updateRuleSplitNode(PredicateContentEvent pce) { - int ruleID = pce.getRuleNumberID(); - for (PassiveRule rule : ruleSet) { - if (rule.getRuleNumberID() == ruleID) { - if (pce.getRuleSplitNode() != null) - rule.nodeListAdd(pce.getRuleSplitNode()); - if (pce.getLearningNode() != null) - rule.setLearningNode(pce.getLearningNode()); - } - } - } - - /* - * Remove rule - */ - private void removeRule(int ruleID) { - for (PassiveRule rule : ruleSet) { - if (rule.getRuleNumberID() == ruleID) { - ruleSet.remove(rule); - break; - } - } - } - - @Override - public void onCreate(int id) { - this.processorId = id; - this.statistics = new double[] { 0.0, 0, 0 }; - this.ruleNumberID = 0; - this.defaultRule = newRule(++this.ruleNumberID); - - this.ruleSet = new LinkedList<PassiveRule>(); - } - - /* - * Clone processor - */ - @Override - public Processor newProcessor(Processor p) { - AMRulesAggregatorProcessor oldProcessor = (AMRulesAggregatorProcessor) p; - Builder builder = new Builder(oldProcessor); - AMRulesAggregatorProcessor newProcessor = builder.build(); - newProcessor.resultStream = oldProcessor.resultStream; - newProcessor.statisticsStream = oldProcessor.statisticsStream; - return newProcessor; - } - - /* - * Send events - */ - private void sendInstanceToRule(Instance instance, int ruleID) { - AssignmentContentEvent ace = new AssignmentContentEvent(ruleID, instance); - this.statisticsStream.put(ace); - } - - private void 
sendAddRuleEvent(int ruleID, ActiveRule rule) {
    // Third constructor argument is the event's "removing" flag; false marks this as an add.
    RuleContentEvent rce = new RuleContentEvent(ruleID, rule, false);
    this.statisticsStream.put(rce);
  }

  /*
   * Output streams
   */
  /**
   * Sets the stream on which rule events are put (see the add-rule sender above).
   *
   * @param statisticsStream
   *          stream to store in this processor
   */
  public void setStatisticsStream(Stream statisticsStream) {
    this.statisticsStream = statisticsStream;
  }

  /**
   * @return the stream previously stored with {@link #setStatisticsStream(Stream)}
   */
  public Stream getStatisticsStream() {
    return this.statisticsStream;
  }

  /**
   * Sets the result stream.
   *
   * @param resultStream
   *          stream to store in this processor
   */
  public void setResultStream(Stream resultStream) {
    this.resultStream = resultStream;
  }

  /**
   * @return the stream previously stored with {@link #setResultStream(Stream)}
   */
  public Stream getResultStream() {
    return this.resultStream;
  }

  /*
   * Others
   */
  /**
   * @return always {@code true}
   */
  public boolean isRandomizable() {
    return true;
  }

  /*
   * Builder
   */
  /**
   * Fluent builder for {@link AMRulesAggregatorProcessor}. Every setter stores its argument in the matching field and
   * returns this builder; {@link #build()} hands the builder to the processor constructor.
   */
  public static class Builder {
    private int pageHinckleyThreshold;
    private double pageHinckleyAlpha;
    private boolean driftDetection;
    private int predictionFunction; // Adaptive=0 Perceptron=1 TargetMean=2
    private boolean constantLearningRatioDecay;
    private double learningRatio;
    private double splitConfidence;
    private double tieThreshold;
    private int gracePeriod;

    private boolean noAnomalyDetection;
    private double multivariateAnomalyProbabilityThreshold;
    private double univariateAnomalyprobabilityThreshold;
    private int anomalyNumInstThreshold;

    private boolean unorderedRules;

    private FIMTDDNumericAttributeClassLimitObserver numericObserver;
    private int voteType;

    // Only assigned by the Instances constructor; not read anywhere in this builder.
    private Instances dataset;

    /**
     * Creates a builder holding the given dataset; all other settings keep their Java default values until set.
     *
     * @param dataset
     *          dataset reference to keep
     */
    public Builder(Instances dataset) {
      this.dataset = dataset;
    }

    /**
     * Creates a builder pre-filled with the settings of an existing processor. The {@code dataset} field is not
     * copied and stays {@code null}.
     *
     * @param processor
     *          processor whose settings are copied
     */
    public Builder(AMRulesAggregatorProcessor processor) {
      this.pageHinckleyThreshold = processor.pageHinckleyThreshold;
      this.pageHinckleyAlpha = processor.pageHinckleyAlpha;
      this.driftDetection = processor.driftDetection;
      this.predictionFunction = processor.predictionFunction;
      this.constantLearningRatioDecay = processor.constantLearningRatioDecay;
      this.learningRatio = processor.learningRatio;
      this.splitConfidence = processor.splitConfidence;
      this.tieThreshold = processor.tieThreshold;
      this.gracePeriod = processor.gracePeriod;

      this.noAnomalyDetection = processor.noAnomalyDetection;
      this.multivariateAnomalyProbabilityThreshold = processor.multivariateAnomalyProbabilityThreshold;
      this.univariateAnomalyprobabilityThreshold = processor.univariateAnomalyprobabilityThreshold;
      this.anomalyNumInstThreshold = processor.anomalyNumInstThreshold;
      this.unorderedRules = processor.unorderedRules;

      this.numericObserver = processor.numericObserver;
      this.voteType = processor.voteType;
    }

    /** Stores the Page-Hinckley threshold. */
    public Builder threshold(int threshold) {
      this.pageHinckleyThreshold = threshold;
      return this;
    }

    /** Stores the Page-Hinckley alpha. */
    public Builder alpha(double alpha) {
      this.pageHinckleyAlpha = alpha;
      return this;
    }

    /** Stores the flag in the {@code driftDetection} field (note the differing setter name). */
    public Builder changeDetection(boolean changeDetection) {
      this.driftDetection = changeDetection;
      return this;
    }

    /** Stores the prediction function code (Adaptive=0, Perceptron=1, TargetMean=2 per the field comment). */
    public Builder predictionFunction(int predictionFunction) {
      this.predictionFunction = predictionFunction;
      return this;
    }

    /** Stores the constant-learning-ratio-decay flag. */
    public Builder constantLearningRatioDecay(boolean constantDecay) {
      this.constantLearningRatioDecay = constantDecay;
      return this;
    }

    /** Stores the learning ratio. */
    public Builder learningRatio(double learningRatio) {
      this.learningRatio = learningRatio;
      return this;
    }

    /** Stores the split confidence. */
    public Builder splitConfidence(double splitConfidence) {
      this.splitConfidence = splitConfidence;
      return this;
    }

    /** Stores the tie threshold. */
    public Builder tieThreshold(double tieThreshold) {
      this.tieThreshold = tieThreshold;
      return this;
    }

    /** Stores the grace period. */
    public Builder gracePeriod(int gracePeriod) {
      this.gracePeriod = gracePeriod;
      return this;
    }

    /** Stores the no-anomaly-detection flag. */
    public Builder noAnomalyDetection(boolean noAnomalyDetection) {
      this.noAnomalyDetection = noAnomalyDetection;
      return this;
    }

    /** Stores the multivariate anomaly probability threshold. */
    public Builder multivariateAnomalyProbabilityThreshold(double mAnomalyThreshold) {
      this.multivariateAnomalyProbabilityThreshold = mAnomalyThreshold;
      return this;
    }

    /** Stores the univariate anomaly probability threshold. */
    public Builder univariateAnomalyProbabilityThreshold(double uAnomalyThreshold) {
      this.univariateAnomalyprobabilityThreshold = uAnomalyThreshold;
      return this;
    }

    /** Stores the anomaly number-of-instances threshold. */
    public Builder anomalyNumberOfInstancesThreshold(int anomalyNumInstThreshold) {
      this.anomalyNumInstThreshold = anomalyNumInstThreshold;
      return this;
    }

    /** Stores the unordered-rules flag. */
    public Builder unorderedRules(boolean unorderedRules) {
      this.unorderedRules = unorderedRules;
      return this;
    }

    /** Stores the numeric attribute observer. */
    public Builder numericObserver(FIMTDDNumericAttributeClassLimitObserver numericObserver) {
      this.numericObserver = numericObserver;
      return this;
    }

    /** Stores the vote type code. NOTE(review): the meaning of each code is not visible here - confirm in the processor. */
    public Builder voteType(int voteType) {
      this.voteType = voteType;
      return this;
    }

    /**
     * @return a new processor constructed from this builder
     */
    public AMRulesAggregatorProcessor build() {
      return new AMRulesAggregatorProcessor(this);
    }
  }

}
- * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.yahoo.labs.samoa.core.ContentEvent; -import com.yahoo.labs.samoa.core.Processor; -import com.yahoo.labs.samoa.instances.Instance; -import com.yahoo.labs.samoa.instances.Instances; -import com.yahoo.labs.samoa.learners.classifiers.rules.common.ActiveRule; -import com.yahoo.labs.samoa.learners.classifiers.rules.common.RuleActiveRegressionNode; -import com.yahoo.labs.samoa.learners.classifiers.rules.common.RulePassiveRegressionNode; -import com.yahoo.labs.samoa.learners.classifiers.rules.common.RuleSplitNode; -import com.yahoo.labs.samoa.topology.Stream; - -/** - * Learner Processor (VAMR). - * - * @author Anh Thu Vu - * - */ -public class AMRulesStatisticsProcessor implements Processor { - - /** - * - */ - private static final long serialVersionUID = 5268933189695395573L; - - private static final Logger logger = - LoggerFactory.getLogger(AMRulesStatisticsProcessor.class); - - private int processorId; - - private transient List<ActiveRule> ruleSet; - - private Stream outputStream; - - private double splitConfidence; - private double tieThreshold; - private int gracePeriod; - - private int frequency; - - public AMRulesStatisticsProcessor(Builder builder) { - this.splitConfidence = builder.splitConfidence; - this.tieThreshold = builder.tieThreshold; - this.gracePeriod = builder.gracePeriod; - this.frequency = builder.frequency; - } - - @Override - public boolean process(ContentEvent event) { - if (event instanceof AssignmentContentEvent) { - - AssignmentContentEvent attrContentEvent = (AssignmentContentEvent) event; - trainRuleOnInstance(attrContentEvent.getRuleNumberID(), attrContentEvent.getInstance()); - } - else if (event instanceof RuleContentEvent) { - RuleContentEvent ruleContentEvent = 
(RuleContentEvent) event; - if (!ruleContentEvent.isRemoving()) { - addRule(ruleContentEvent.getRule()); - } - } - - return false; - } - - /* - * Process input instances - */ - private void trainRuleOnInstance(int ruleID, Instance instance) { - Iterator<ActiveRule> ruleIterator = this.ruleSet.iterator(); - while (ruleIterator.hasNext()) { - ActiveRule rule = ruleIterator.next(); - if (rule.getRuleNumberID() == ruleID) { - // Check (again) for coverage - // Skip anomaly check as Aggregator's perceptron should be well-updated - if (rule.isCovering(instance) == true) { - double error = rule.computeError(instance); // Use adaptive mode error - boolean changeDetected = ((RuleActiveRegressionNode) rule.getLearningNode()).updateChangeDetection(error); - if (changeDetected == true) { - ruleIterator.remove(); - - this.sendRemoveRuleEvent(ruleID); - } else { - rule.updateStatistics(instance); - if (rule.getInstancesSeen() % this.gracePeriod == 0.0) { - if (rule.tryToExpand(this.splitConfidence, this.tieThreshold)) { - rule.split(); - - // expanded: update Aggregator with new/updated predicate - this.sendPredicate(rule.getRuleNumberID(), rule.getLastUpdatedRuleSplitNode(), - (RuleActiveRegressionNode) rule.getLearningNode()); - } - } - } - } - - return; - } - } - } - - private void sendRemoveRuleEvent(int ruleID) { - RuleContentEvent rce = new RuleContentEvent(ruleID, null, true); - this.outputStream.put(rce); - } - - private void sendPredicate(int ruleID, RuleSplitNode splitNode, RuleActiveRegressionNode learningNode) { - this.outputStream.put(new PredicateContentEvent(ruleID, splitNode, new RulePassiveRegressionNode(learningNode))); - } - - /* - * Process control message (regarding adding or removing rules) - */ - private boolean addRule(ActiveRule rule) { - this.ruleSet.add(rule); - return true; - } - - @Override - public void onCreate(int id) { - this.processorId = id; - this.ruleSet = new LinkedList<ActiveRule>(); - } - - @Override - public Processor 
newProcessor(Processor p) { - AMRulesStatisticsProcessor oldProcessor = (AMRulesStatisticsProcessor) p; - AMRulesStatisticsProcessor newProcessor = - new AMRulesStatisticsProcessor.Builder(oldProcessor).build(); - - newProcessor.setOutputStream(oldProcessor.outputStream); - return newProcessor; - } - - /* - * Builder - */ - public static class Builder { - private double splitConfidence; - private double tieThreshold; - private int gracePeriod; - - private int frequency; - - private Instances dataset; - - public Builder(Instances dataset) { - this.dataset = dataset; - } - - public Builder(AMRulesStatisticsProcessor processor) { - this.splitConfidence = processor.splitConfidence; - this.tieThreshold = processor.tieThreshold; - this.gracePeriod = processor.gracePeriod; - this.frequency = processor.frequency; - } - - public Builder splitConfidence(double splitConfidence) { - this.splitConfidence = splitConfidence; - return this; - } - - public Builder tieThreshold(double tieThreshold) { - this.tieThreshold = tieThreshold; - return this; - } - - public Builder gracePeriod(int gracePeriod) { - this.gracePeriod = gracePeriod; - return this; - } - - public Builder frequency(int frequency) { - this.frequency = frequency; - return this; - } - - public AMRulesStatisticsProcessor build() { - return new AMRulesStatisticsProcessor(this); - } - } - - /* - * Output stream - */ - public void setOutputStream(Stream stream) { - this.outputStream = stream; - } - - public Stream getOutputStream() { - return this.outputStream; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AssignmentContentEvent.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AssignmentContentEvent.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AssignmentContentEvent.java deleted file mode 100644 index 2a53f19..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/AssignmentContentEvent.java +++ /dev/null @@ -1,74 +0,0 @@ -package com.yahoo.labs.samoa.learners.classifiers.rules.distributed; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import com.yahoo.labs.samoa.core.ContentEvent; -import com.yahoo.labs.samoa.instances.Instance; - -/** - * Forwarded instances from Model Aggregator to Learners/Default Rule Learner. 
- * - * @author Anh Thu Vu - * - */ -public class AssignmentContentEvent implements ContentEvent { - - /** - * - */ - private static final long serialVersionUID = 1031695762172836629L; - - private int ruleNumberID; - private Instance instance; - - public AssignmentContentEvent() { - this(0, null); - } - - public AssignmentContentEvent(int ruleID, Instance instance) { - this.ruleNumberID = ruleID; - this.instance = instance; - } - - @Override - public String getKey() { - return Integer.toString(this.ruleNumberID); - } - - @Override - public void setKey(String key) { - // do nothing - } - - @Override - public boolean isLastEvent() { - return false; - } - - public Instance getInstance() { - return this.instance; - } - - public int getRuleNumberID() { - return this.ruleNumberID; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/PredicateContentEvent.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/PredicateContentEvent.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/PredicateContentEvent.java deleted file mode 100644 index 881a284..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/PredicateContentEvent.java +++ /dev/null @@ -1,84 +0,0 @@ -package com.yahoo.labs.samoa.learners.classifiers.rules.distributed; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import com.yahoo.labs.samoa.core.ContentEvent; -import com.yahoo.labs.samoa.learners.classifiers.rules.common.RulePassiveRegressionNode; -import com.yahoo.labs.samoa.learners.classifiers.rules.common.RuleSplitNode; - -/** - * New features (of newly expanded rules) from Learners to Model Aggregators. - * - * @author Anh Thu Vu - * - */ -public class PredicateContentEvent implements ContentEvent { - - /** - * - */ - private static final long serialVersionUID = 7909435830443732451L; - - private int ruleNumberID; - private RuleSplitNode ruleSplitNode; - private RulePassiveRegressionNode learningNode; - - /* - * Constructor - */ - public PredicateContentEvent() { - this(0, null, null); - } - - public PredicateContentEvent(int ruleID, RuleSplitNode ruleSplitNode, RulePassiveRegressionNode learningNode) { - this.ruleNumberID = ruleID; - this.ruleSplitNode = ruleSplitNode; // is this is null: this is for updating learningNode only - this.learningNode = learningNode; - } - - @Override - public String getKey() { - return Integer.toString(this.ruleNumberID); - } - - @Override - public void setKey(String key) { - // do nothing - } - - @Override - public boolean isLastEvent() { - return false; // N/A - } - - public int getRuleNumberID() { - return this.ruleNumberID; - } - - public RuleSplitNode getRuleSplitNode() { - return this.ruleSplitNode; - } - - public RulePassiveRegressionNode getLearningNode() { - return this.learningNode; - } - -} 
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/RuleContentEvent.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/RuleContentEvent.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/RuleContentEvent.java deleted file mode 100644 index dbe2c5b..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/rules/distributed/RuleContentEvent.java +++ /dev/null @@ -1,80 +0,0 @@ -package com.yahoo.labs.samoa.learners.classifiers.rules.distributed; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import com.yahoo.labs.samoa.core.ContentEvent; -import com.yahoo.labs.samoa.learners.classifiers.rules.common.ActiveRule; - -/** - * New rule from Model Aggregator/Default Rule Learner to Learners or removed rule from Learner to Model Aggregators. 
- * - * @author Anh Thu Vu - * - */ -public class RuleContentEvent implements ContentEvent { - - /** - * - */ - private static final long serialVersionUID = -9046390274402894461L; - - private final int ruleNumberID; - private final ActiveRule addingRule; // for removing rule, we only need the rule's ID - private final boolean isRemoving; - - public RuleContentEvent() { - this(0, null, false); - } - - public RuleContentEvent(int ruleID, ActiveRule rule, boolean isRemoving) { - this.ruleNumberID = ruleID; - this.isRemoving = isRemoving; - this.addingRule = rule; - } - - @Override - public String getKey() { - return Integer.toString(this.ruleNumberID); - } - - @Override - public void setKey(String key) { - // do nothing - } - - @Override - public boolean isLastEvent() { - return false; - } - - public int getRuleNumberID() { - return this.ruleNumberID; - } - - public ActiveRule getRule() { - return this.addingRule; - } - - public boolean isRemoving() { - return this.isRemoving; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/ActiveLearningNode.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/ActiveLearningNode.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/ActiveLearningNode.java deleted file mode 100644 index 90e1c67..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/ActiveLearningNode.java +++ /dev/null @@ -1,208 +0,0 @@ -package com.yahoo.labs.samoa.learners.classifiers.trees; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.util.HashMap; -import java.util.Map; - -import com.yahoo.labs.samoa.moa.classifiers.core.AttributeSplitSuggestion; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.yahoo.labs.samoa.instances.Instance; - -final class ActiveLearningNode extends LearningNode { - /** - * - */ - private static final long serialVersionUID = -2892102872646338908L; - private static final Logger logger = LoggerFactory.getLogger(ActiveLearningNode.class); - - private double weightSeenAtLastSplitEvaluation; - - private final Map<Integer, String> attributeContentEventKeys; - - private AttributeSplitSuggestion bestSuggestion; - private AttributeSplitSuggestion secondBestSuggestion; - - private final long id; - private final int parallelismHint; - private int suggestionCtr; - private int thrownAwayInstance; - - private boolean isSplitting; - - ActiveLearningNode(double[] classObservation, int parallelismHint) { - super(classObservation); - this.weightSeenAtLastSplitEvaluation = this.getWeightSeen(); - this.id = VerticalHoeffdingTree.LearningNodeIdGenerator.generate(); - this.attributeContentEventKeys = new HashMap<>(); - this.isSplitting = false; - this.parallelismHint = parallelismHint; - } - - long getId() { - return id; - } - - protected AttributeBatchContentEvent[] attributeBatchContentEvent; - - public AttributeBatchContentEvent[] getAttributeBatchContentEvent() { - return this.attributeBatchContentEvent; - } - - public void setAttributeBatchContentEvent(AttributeBatchContentEvent[] attributeBatchContentEvent) { - 
this.attributeBatchContentEvent = attributeBatchContentEvent; - } - - @Override - void learnFromInstance(Instance inst, ModelAggregatorProcessor proc) { - // TODO: what statistics should we keep for unused instance? - if (isSplitting) { // currently throw all instance will splitting - this.thrownAwayInstance++; - return; - } - this.observedClassDistribution.addToValue((int) inst.classValue(), - inst.weight()); - // done: parallelize by sending attributes one by one - // TODO: meanwhile, we can try to use the ThreadPool to execute it - // separately - // TODO: parallelize by sending in batch, i.e. split the attributes into - // chunk instead of send the attribute one by one - for (int i = 0; i < inst.numAttributes() - 1; i++) { - int instAttIndex = modelAttIndexToInstanceAttIndex(i, inst); - Integer obsIndex = i; - String key = attributeContentEventKeys.get(obsIndex); - - if (key == null) { - key = this.generateKey(i); - attributeContentEventKeys.put(obsIndex, key); - } - AttributeContentEvent ace = new AttributeContentEvent.Builder( - this.id, i, key) - .attrValue(inst.value(instAttIndex)) - .classValue((int) inst.classValue()) - .weight(inst.weight()) - .isNominal(inst.attribute(instAttIndex).isNominal()) - .build(); - if (this.attributeBatchContentEvent == null) { - this.attributeBatchContentEvent = new AttributeBatchContentEvent[inst.numAttributes() - 1]; - } - if (this.attributeBatchContentEvent[i] == null) { - this.attributeBatchContentEvent[i] = new AttributeBatchContentEvent.Builder( - this.id, i, key) - // .attrValue(inst.value(instAttIndex)) - // .classValue((int) inst.classValue()) - // .weight(inst.weight()] - .isNominal(inst.attribute(instAttIndex).isNominal()) - .build(); - } - this.attributeBatchContentEvent[i].add(ace); - // proc.sendToAttributeStream(ace); - } - } - - @Override - double[] getClassVotes(Instance inst, ModelAggregatorProcessor map) { - return this.observedClassDistribution.getArrayCopy(); - } - - double getWeightSeen() { - return 
this.observedClassDistribution.sumOfValues(); - } - - void setWeightSeenAtLastSplitEvaluation(double weight) { - this.weightSeenAtLastSplitEvaluation = weight; - } - - double getWeightSeenAtLastSplitEvaluation() { - return this.weightSeenAtLastSplitEvaluation; - } - - void requestDistributedSuggestions(long splitId, ModelAggregatorProcessor modelAggrProc) { - this.isSplitting = true; - this.suggestionCtr = 0; - this.thrownAwayInstance = 0; - - ComputeContentEvent cce = new ComputeContentEvent(splitId, this.id, - this.getObservedClassDistribution()); - modelAggrProc.sendToControlStream(cce); - } - - void addDistributedSuggestions(AttributeSplitSuggestion bestSuggestion, AttributeSplitSuggestion secondBestSuggestion) { - // starts comparing from the best suggestion - if (bestSuggestion != null) { - if ((this.bestSuggestion == null) || (bestSuggestion.compareTo(this.bestSuggestion) > 0)) { - this.secondBestSuggestion = this.bestSuggestion; - this.bestSuggestion = bestSuggestion; - - if (secondBestSuggestion != null) { - - if ((this.secondBestSuggestion == null) || (secondBestSuggestion.compareTo(this.secondBestSuggestion) > 0)) { - this.secondBestSuggestion = secondBestSuggestion; - } - } - } else { - if ((this.secondBestSuggestion == null) || (bestSuggestion.compareTo(this.secondBestSuggestion) > 0)) { - this.secondBestSuggestion = bestSuggestion; - } - } - } - - // TODO: optimize the code to use less memory - this.suggestionCtr++; - } - - boolean isSplitting() { - return this.isSplitting; - } - - void endSplitting() { - this.isSplitting = false; - logger.trace("wasted instance: {}", this.thrownAwayInstance); - this.thrownAwayInstance = 0; - } - - AttributeSplitSuggestion getDistributedBestSuggestion() { - return this.bestSuggestion; - } - - AttributeSplitSuggestion getDistributedSecondBestSuggestion() { - return this.secondBestSuggestion; - } - - boolean isAllSuggestionsCollected() { - return (this.suggestionCtr == this.parallelismHint); - } - - private static int 
modelAttIndexToInstanceAttIndex(int index, Instance inst) { - return inst.classIndex() > index ? index : index + 1; - } - - private String generateKey(int obsIndex) { - final int prime = 31; - int result = 1; - result = prime * result + (int) (this.id ^ (this.id >>> 32)); - result = prime * result + obsIndex; - return Integer.toString(result); - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/AttributeBatchContentEvent.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/AttributeBatchContentEvent.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/AttributeBatchContentEvent.java deleted file mode 100644 index 55aac58..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/AttributeBatchContentEvent.java +++ /dev/null @@ -1,135 +0,0 @@ -package com.yahoo.labs.samoa.learners.classifiers.trees; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * #L% - */ - -import com.yahoo.labs.samoa.core.ContentEvent; -import java.util.LinkedList; -import java.util.List; - -/** - * Attribute Content Event represents the instances that split vertically based on their attribute - * - * @author Arinto Murdopo - * - */ -final class AttributeBatchContentEvent implements ContentEvent { - - private static final long serialVersionUID = 6652815649846676832L; - - private final long learningNodeId; - private final int obsIndex; - private final List<ContentEvent> contentEventList; - private final transient String key; - private final boolean isNominal; - - public AttributeBatchContentEvent() { - learningNodeId = -1; - obsIndex = -1; - contentEventList = new LinkedList<>(); - key = ""; - isNominal = true; - } - - private AttributeBatchContentEvent(Builder builder) { - this.learningNodeId = builder.learningNodeId; - this.obsIndex = builder.obsIndex; - this.contentEventList = new LinkedList<>(); - if (builder.contentEvent != null) { - this.contentEventList.add(builder.contentEvent); - } - this.isNominal = builder.isNominal; - this.key = builder.key; - } - - public void add(ContentEvent contentEvent) { - this.contentEventList.add(contentEvent); - } - - @Override - public String getKey() { - return this.key; - } - - @Override - public void setKey(String str) { - // do nothing, maybe useful when we want to reuse the object for - // serialization/deserialization purpose - } - - @Override - public boolean isLastEvent() { - return false; - } - - long getLearningNodeId() { - return this.learningNodeId; - } - - int getObsIndex() { - return this.obsIndex; - } - - public List<ContentEvent> getContentEventList() { - return this.contentEventList; - } - - boolean isNominal() { - return this.isNominal; - } - - static final class Builder { - - // required parameters - private final long learningNodeId; - private final int obsIndex; - private final String key; - - private ContentEvent contentEvent; - private boolean isNominal = false; - - 
Builder(long id, int obsIndex, String key) { - this.learningNodeId = id; - this.obsIndex = obsIndex; - this.key = key; - } - - private Builder(long id, int obsIndex) { - this.learningNodeId = id; - this.obsIndex = obsIndex; - this.key = ""; - } - - Builder contentEvent(ContentEvent contentEvent) { - this.contentEvent = contentEvent; - return this; - } - - Builder isNominal(boolean val) { - this.isNominal = val; - return this; - } - - AttributeBatchContentEvent build() { - return new AttributeBatchContentEvent(this); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/AttributeContentEvent.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/AttributeContentEvent.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/AttributeContentEvent.java deleted file mode 100644 index 090de75..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/AttributeContentEvent.java +++ /dev/null @@ -1,224 +0,0 @@ -package com.yahoo.labs.samoa.learners.classifiers.trees; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * #L% - */ - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.Serializer; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; - -import com.yahoo.labs.samoa.core.ContentEvent; - -/** - * Attribute Content Event represents the instances that split vertically based on their attribute - * - * @author Arinto Murdopo - * - */ -public final class AttributeContentEvent implements ContentEvent { - - private static final long serialVersionUID = 6652815649846676832L; - - private final long learningNodeId; - private final int obsIndex; - private final double attrVal; - private final int classVal; - private final double weight; - private final transient String key; - private final boolean isNominal; - - public AttributeContentEvent() { - learningNodeId = -1; - obsIndex = -1; - attrVal = 0.0; - classVal = -1; - weight = 0.0; - key = ""; - isNominal = true; - } - - private AttributeContentEvent(Builder builder) { - this.learningNodeId = builder.learningNodeId; - this.obsIndex = builder.obsIndex; - this.attrVal = builder.attrVal; - this.classVal = builder.classVal; - this.weight = builder.weight; - this.isNominal = builder.isNominal; - this.key = builder.key; - } - - @Override - public String getKey() { - return this.key; - } - - @Override - public void setKey(String str) { - // do nothing, maybe useful when we want to reuse the object for - // serialization/deserialization purpose - } - - @Override - public boolean isLastEvent() { - return false; - } - - long getLearningNodeId() { - return this.learningNodeId; - } - - int getObsIndex() { - return this.obsIndex; - } - - int getClassVal() { - return this.classVal; - } - - double getAttrVal() { - return this.attrVal; - } - - double getWeight() { - return this.weight; - } - - boolean isNominal() { - return this.isNominal; - } - - static final class Builder { - - // required parameters - private final long learningNodeId; - private final int obsIndex; - private final 
String key; - - // optional parameters - private double attrVal = 0.0; - private int classVal = 0; - private double weight = 0.0; - private boolean isNominal = false; - - Builder(long id, int obsIndex, String key) { - this.learningNodeId = id; - this.obsIndex = obsIndex; - this.key = key; - } - - private Builder(long id, int obsIndex) { - this.learningNodeId = id; - this.obsIndex = obsIndex; - this.key = ""; - } - - Builder attrValue(double val) { - this.attrVal = val; - return this; - } - - Builder classValue(int val) { - this.classVal = val; - return this; - } - - Builder weight(double val) { - this.weight = val; - return this; - } - - Builder isNominal(boolean val) { - this.isNominal = val; - return this; - } - - AttributeContentEvent build() { - return new AttributeContentEvent(this); - } - } - - /** - * The Kryo serializer class for AttributeContentEvent when executing on top of Storm. This class allow us to change - * the precision of the statistics. - * - * @author Arinto Murdopo - * - */ - public static final class AttributeCESerializer extends Serializer<AttributeContentEvent> { - - private static double PRECISION = 1000000.0; - - @Override - public void write(Kryo kryo, Output output, AttributeContentEvent event) { - output.writeLong(event.learningNodeId, true); - output.writeInt(event.obsIndex, true); - output.writeDouble(event.attrVal, PRECISION, true); - output.writeInt(event.classVal, true); - output.writeDouble(event.weight, PRECISION, true); - output.writeBoolean(event.isNominal); - } - - @Override - public AttributeContentEvent read(Kryo kryo, Input input, - Class<AttributeContentEvent> type) { - AttributeContentEvent ace = new AttributeContentEvent.Builder(input.readLong(true), input.readInt(true)) - .attrValue(input.readDouble(PRECISION, true)) - .classValue(input.readInt(true)) - .weight(input.readDouble(PRECISION, true)) - .isNominal(input.readBoolean()) - .build(); - return ace; - } - } - - /** - * The Kryo serializer class for 
AttributeContentEvent when executing on top of Storm with full precision of the - * statistics. - * - * @author Arinto Murdopo - * - */ - public static final class AttributeCEFullPrecSerializer extends Serializer<AttributeContentEvent> { - - @Override - public void write(Kryo kryo, Output output, AttributeContentEvent event) { - output.writeLong(event.learningNodeId, true); - output.writeInt(event.obsIndex, true); - output.writeDouble(event.attrVal); - output.writeInt(event.classVal, true); - output.writeDouble(event.weight); - output.writeBoolean(event.isNominal); - } - - @Override - public AttributeContentEvent read(Kryo kryo, Input input, - Class<AttributeContentEvent> type) { - AttributeContentEvent ace = new AttributeContentEvent.Builder(input.readLong(true), input.readInt(true)) - .attrValue(input.readDouble()) - .classValue(input.readInt(true)) - .weight(input.readDouble()) - .isNominal(input.readBoolean()) - .build(); - return ace; - } - - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/ComputeContentEvent.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/ComputeContentEvent.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/ComputeContentEvent.java deleted file mode 100644 index 52591e1..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/ComputeContentEvent.java +++ /dev/null @@ -1,145 +0,0 @@ -package com.yahoo.labs.samoa.learners.classifiers.trees; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-/**
- * Compute content event is the message that is sent by the Model Aggregator Processor to request the Local Statistic
- * PI to start the local statistic calculation for splitting.
- *
- * @author Arinto Murdopo
- *
- */
-public final class ComputeContentEvent extends ControlContentEvent {
-
-  private static final long serialVersionUID = 5590798490073395190L;
-
-  /** Stand-in written by the serializers when an event carries no pre-split distribution. */
-  private static final double[] EMPTY_DIST = new double[0];
-
-  private final double[] preSplitDist;
-  private final long splitId;
-
-  /**
-   * Creates a placeholder event: the learning node id and the split id are both -1 and the pre-split distribution is
-   * {@code null}.
-   */
-  public ComputeContentEvent() {
-    super(-1);
-    preSplitDist = null;
-    splitId = -1;
-  }
-
-  /**
-   * @param splitId
-   *          The id of the split this request belongs to
-   * @param id
-   *          The id of the learning node
-   * @param preSplitDist
-   *          The distribution before the split; may be {@code null}
-   */
-  ComputeContentEvent(long splitId, long id, double[] preSplitDist) {
-    super(id);
-    // The array is kept by reference (no defensive copy), so later writes by the caller are visible through this event.
-    this.preSplitDist = preSplitDist;
-    this.splitId = splitId;
-  }
-
-  @Override
-  LocStatControl getType() {
-    return LocStatControl.COMPUTE;
-  }
-
-  /**
-   * @return The pre-split distribution held by this event (the stored array itself, not a copy); {@code null} for an
-   *         event made with the no-argument constructor
-   */
-  double[] getPreSplitDist() {
-    return this.preSplitDist;
-  }
-
-  long getSplitId() {
-    return this.splitId;
-  }
-
-  /**
-   * Returns the distribution the serializers should write for the given event. An event without a distribution
-   * (no-argument constructor) is written as a zero-length array instead of failing with a NullPointerException.
-   */
-  private static double[] distributionOf(ComputeContentEvent event) {
-    return event.preSplitDist == null ? EMPTY_DIST : event.preSplitDist;
-  }
-
-  /**
-   * The Kryo serializer class for ComputeContentEvent when executing on top of Storm. This class allows us to change
-   * the precision of the statistics.
-   * <p>
-   * Fields are written in the order split id, learning node id, array length, array elements; {@code read} consumes
-   * them in the same order. An event whose distribution is {@code null} is written with length 0 and is therefore
-   * read back with an empty array.
-   *
-   * @author Arinto Murdopo
-   *
-   */
-  public static final class ComputeCESerializer extends Serializer<ComputeContentEvent> {
-
-    private static final double PRECISION = 1000000.0;
-
-    @Override
-    public void write(Kryo kryo, Output output, ComputeContentEvent object) {
-      output.writeLong(object.splitId, true);
-      output.writeLong(object.learningNodeId, true);
-
-      double[] dist = distributionOf(object);
-      output.writeInt(dist.length, true);
-      for (double value : dist) {
-        output.writeDouble(value, PRECISION, true);
-      }
-    }
-
-    @Override
-    public ComputeContentEvent read(Kryo kryo, Input input,
-        Class<ComputeContentEvent> type) {
-      long splitId = input.readLong(true);
-      long learningNodeId = input.readLong(true);
-
-      double[] preSplitDist = new double[input.readInt(true)];
-      for (int i = 0; i < preSplitDist.length; i++) {
-        preSplitDist[i] = input.readDouble(PRECISION, true);
-      }
-
-      return new ComputeContentEvent(splitId, learningNodeId, preSplitDist);
-    }
-  }
-
-  /**
-   * The Kryo serializer class for ComputeContentEvent when executing on top of Storm with full precision of the
-   * statistics.
-   * <p>
-   * Uses the same field order as {@link ComputeCESerializer} but writes every array element as a plain double. An
-   * event whose distribution is {@code null} is written with length 0.
-   *
-   * @author Arinto Murdopo
-   *
-   */
-  public static final class ComputeCEFullPrecSerializer extends Serializer<ComputeContentEvent> {
-
-    @Override
-    public void write(Kryo kryo, Output output, ComputeContentEvent object) {
-      output.writeLong(object.splitId, true);
-      output.writeLong(object.learningNodeId, true);
-
-      double[] dist = distributionOf(object);
-      output.writeInt(dist.length, true);
-      for (double value : dist) {
-        output.writeDouble(value);
-      }
-    }
-
-    @Override
-    public ComputeContentEvent read(Kryo kryo, Input input,
-        Class<ComputeContentEvent> type) {
-      long splitId = input.readLong(true);
-      long learningNodeId = input.readLong(true);
-
-      double[] preSplitDist = new double[input.readInt(true)];
-      for (int i = 0; i < preSplitDist.length; i++) {
-        preSplitDist[i] = input.readDouble();
-      }
-
-      return new ComputeContentEvent(splitId, learningNodeId, preSplitDist);
-    }
-
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/ControlContentEvent.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/ControlContentEvent.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/ControlContentEvent.java
deleted file mode 100644
index a7f01ee..0000000
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/ControlContentEvent.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package com.yahoo.labs.samoa.learners.classifiers.trees;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.yahoo.labs.samoa.core.ContentEvent;
-
-/**
- * Base class of the content events used to control the Local Statistic Processor. Every control event names the
- * learning node it refers to and reports its command through {@link #getType()}.
- *
- * @author Arinto Murdopo
- *
- */
-abstract class ControlContentEvent implements ContentEvent {
-
-  private static final long serialVersionUID = 5837375639629708363L;
-
-  /** The commands a control event can carry. */
-  enum LocStatControl {
-    COMPUTE, DELETE
-  }
-
-  protected final long learningNodeId;
-
-  /** Creates a control event whose learning node id is -1. */
-  public ControlContentEvent() {
-    this(-1);
-  }
-
-  /**
-   * @param id
-   *          The id of the learning node this event refers to
-   */
-  ControlContentEvent(long id) {
-    learningNodeId = id;
-  }
-
-  /**
-   * @return The command carried by this event
-   */
-  abstract LocStatControl getType();
-
-  /**
-   * @return The id of the learning node this event refers to
-   */
-  final long getLearningNodeId() {
-    return learningNodeId;
-  }
-
-  /** Control events have no key: this always returns {@code null}. */
-  @Override
-  public final String getKey() {
-    return null;
-  }
-
-  /** Ignored; control events do not store a key. */
-  @Override
-  public void setKey(String str) {
-    // intentionally empty
-  }
-
-  /** A control event is never reported as the last event. */
-  @Override
-  public boolean isLastEvent() {
-    return false;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/DeleteContentEvent.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/DeleteContentEvent.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/DeleteContentEvent.java
deleted file mode 100644
index bbdc39d..0000000
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/DeleteContentEvent.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package com.yahoo.labs.samoa.learners.classifiers.trees;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-/**
- * Content event sent by the Model Aggregator Processor to make the Local Statistic Processor delete statistics it no
- * longer needs. Its control type is always {@code DELETE}.
- *
- * @author Arinto Murdopo
- *
- */
-final class DeleteContentEvent extends ControlContentEvent {
-
-  private static final long serialVersionUID = -2105250722560863633L;
-
-  /** Creates a delete event whose learning node id is -1. */
-  public DeleteContentEvent() {
-    this(-1);
-  }
-
-  /**
-   * @param id
-   *          The id of the learning node this event refers to
-   */
-  DeleteContentEvent(long id) {
-    super(id);
-  }
-
-  @Override
-  LocStatControl getType() {
-    return LocStatControl.DELETE;
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/FilterProcessor.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/FilterProcessor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/FilterProcessor.java
deleted file mode 100644
index 179ccb4..0000000
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/FilterProcessor.java
+++ /dev/null
@@ -1,191 +0,0 @@
-package com.yahoo.labs.samoa.learners.classifiers.trees;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version
2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.yahoo.labs.samoa.core.ContentEvent; -import com.yahoo.labs.samoa.learners.InstanceContentEvent; -import com.yahoo.labs.samoa.core.Processor; -import com.yahoo.labs.samoa.instances.Instance; -import com.yahoo.labs.samoa.learners.ResultContentEvent; -import com.yahoo.labs.samoa.instances.Instances; -import com.yahoo.labs.samoa.instances.InstancesHeader; -import com.yahoo.labs.samoa.learners.InstancesContentEvent; -import com.yahoo.labs.samoa.topology.Stream; -import java.util.LinkedList; -import java.util.List; - -/** - * Filter Processor that stores and filters the instances before sending them to the Model Aggregator Processor. 
- *
- * Incoming instance events are buffered; once the buffer reaches the batch size, or an event flagged as last arrives,
- * the buffered instances are put on the output stream as one InstancesContentEvent.
- *
- * @author Arinto Murdopo
- *
- */
-final class FilterProcessor implements Processor {
-
-  private static final long serialVersionUID = -1685875718300564885L;
-  private static final Logger logger = LoggerFactory.getLogger(FilterProcessor.class);
-
-  private int processorId;
-
-  private final Instances dataset;
-  private InstancesHeader modelContext;
-
-  // available streams
-  private Stream outputStream;
-
-  // number of instances buffered since the last batch was sent
-  private int waitingInstances = 0;
-
-  // pause in milliseconds after each sent batch; values <= 0 disable the pause
-  private int delay = 0;
-
-  // number of buffered instances that triggers sending a batch
-  private int batchSize = 200;
-
-  private List<InstanceContentEvent> contentEventList = new LinkedList<InstanceContentEvent>();
-
-  // private constructor based on Builder pattern
-  private FilterProcessor(Builder builder) {
-    this.dataset = builder.dataset;
-    this.batchSize = builder.batchSize;
-    this.delay = builder.delay;
-  }
-
-  /**
-   * Buffers an incoming instance event and sends the buffer downstream when it is full or when the event is flagged
-   * as the last one. Events of any other type are ignored.
-   *
-   * @param event
-   *          The incoming content event
-   * @return always {@code false}
-   */
-  @Override
-  public boolean process(ContentEvent event) {
-    if (!(event instanceof InstanceContentEvent)) {
-      return false;
-    }
-    // Receive a new instance from source
-    InstanceContentEvent instanceContentEvent = (InstanceContentEvent) event;
-    this.contentEventList.add(instanceContentEvent);
-    this.waitingInstances++;
-    // ">=" instead of "==": with a batch size of 0 or less the counter never equals the threshold, so the buffer
-    // would keep growing until the last event. Such a batch size now simply sends every instance on its own.
-    if (this.waitingInstances >= this.batchSize || instanceContentEvent.isLastEvent()) {
-      flushBatch(instanceContentEvent);
-    }
-    return false;
-  }
-
-  /**
-   * Moves every buffered instance into one InstancesContentEvent, puts it on the output stream and then pauses for
-   * the configured delay. The outgoing event is flagged as last if any buffered event was.
-   *
-   * @param trigger
-   *          The event whose arrival caused the flush; it is passed to the InstancesContentEvent constructor
-   */
-  private void flushBatch(InstanceContentEvent trigger) {
-    InstancesContentEvent outputEvent = new InstancesContentEvent(trigger);
-    boolean isLastEvent = false;
-    for (InstanceContentEvent ice : this.contentEventList) {
-      Instance inst = ice.getInstance();
-      outputEvent.add(inst);
-      isLastEvent = isLastEvent || ice.isLastEvent();
-    }
-    this.contentEventList.clear();
-    outputEvent.setLast(isLastEvent);
-    this.waitingInstances = 0;
-    this.outputStream.put(outputEvent);
-    pauseAfterBatch();
-  }
-
-  /** Sleeps for the configured delay, if any; an interruption ends the pause and restores the interrupt flag. */
-  private void pauseAfterBatch() {
-    if (this.delay <= 0) {
-      return;
-    }
-    try {
-      Thread.sleep(this.delay);
-    } catch (InterruptedException ex) {
-      Thread.currentThread().interrupt();
-    }
-  }
-
-  @Override
-  public void onCreate(int id) {
-    this.processorId = id;
-    this.waitingInstances = 0;
-  }
-
-  /**
-   * Creates a processor with the same dataset, delay and batch size as the given one and the same output stream.
-   * Buffered events are not carried over.
-   */
-  @Override
-  public Processor newProcessor(Processor p) {
-    FilterProcessor oldProcessor = (FilterProcessor) p;
-    FilterProcessor newProcessor =
-        new FilterProcessor.Builder(oldProcessor).build();
-
-    newProcessor.setOutputStream(oldProcessor.outputStream);
-    return newProcessor;
-  }
-
-  @Override
-  public String toString() {
-    return super.toString();
-  }
-
-  void setOutputStream(Stream outputStream) {
-    this.outputStream = outputStream;
-  }
-
-  /**
-   * Helper method to generate new ResultContentEvent based on an instance and its prediction result.
-   *
-   * @param prediction
-   *          The predicted class label from the decision tree model.
-   * @param inEvent
-   *          The associated instance content event
-   * @return ResultContentEvent to be sent into Evaluator PI or other destination PI.
-   */
-  private ResultContentEvent newResultContentEvent(double[] prediction, InstanceContentEvent inEvent) {
-    ResultContentEvent rce = new ResultContentEvent(inEvent.getInstanceIndex(), inEvent.getInstance(),
-        inEvent.getClassId(), prediction, inEvent.isLastEvent());
-    rce.setClassifierIndex(this.processorId);
-    rce.setEvaluationIndex(inEvent.getEvaluationIndex());
-    return rce;
-  }
-
-  /**
-   * Builder class to replace constructors with many parameters
-   *
-   * @author Arinto Murdopo
-   *
-   */
-  static class Builder {
-
-    // required parameters
-    private final Instances dataset;
-
-    // optional parameters, with the same defaults as the processor fields
-    private int delay = 0;
-
-    private int batchSize = 200;
-
-    Builder(Instances dataset) {
-      this.dataset = dataset;
-    }
-
-    Builder(FilterProcessor oldProcessor) {
-      this.dataset = oldProcessor.dataset;
-      this.delay = oldProcessor.delay;
-      this.batchSize = oldProcessor.batchSize;
-    }
-
-    public Builder delay(int delay) {
-      this.delay = delay;
-      return this;
-    }
-
-    public Builder batchSize(int val) {
-      this.batchSize = val;
-      return this;
-    }
-
-    FilterProcessor build() {
-      return new FilterProcessor(this);
-    }
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/FoundNode.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/FoundNode.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/FoundNode.java deleted file mode 100644 index 575aaaf..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/FoundNode.java +++ /dev/null @@ -1,77 +0,0 @@ -package com.yahoo.labs.samoa.learners.classifiers.trees; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -/** - * Class that represents the necessary data structure of the node where an instance is routed/filtered through the - * decision tree model. 
- *
- * @author Arinto Murdopo
- *
- */
-final class FoundNode implements java.io.Serializable {
-
-  private static final long serialVersionUID = -637695387934143293L;
-
-  private final Node node;
-  private final SplitNode parent;
-  private final int parentBranch;
-
-  /**
-   * @param foundNode
-   *          The node the instance was routed to
-   * @param foundParent
-   *          The parent of that node
-   * @param branchInParent
-   *          The index of that node in its parent
-   */
-  FoundNode(Node foundNode, SplitNode foundParent, int branchInParent) {
-    node = foundNode;
-    parent = foundParent;
-    parentBranch = branchInParent;
-  }
-
-  /**
-   * Returns the node that an instance reached when it was routed/filtered through the decision tree model for
-   * testing and training.
-   *
-   * @return The node where the instance is routed/filtered
-   */
-  Node getNode() {
-    return node;
-  }
-
-  /**
-   * Returns the parent of the node that the instance reached.
-   *
-   * @return The parent of the node
-   */
-  SplitNode getParent() {
-    return parent;
-  }
-
-  /**
-   * Returns the position that the reached node occupies in its parent.
-   *
-   * @return The index of the node in its parent node.
-   */
-  int getParentBranch() {
-    return parentBranch;
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/InactiveLearningNode.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/InactiveLearningNode.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/InactiveLearningNode.java
deleted file mode 100644
index f02383d..0000000
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/InactiveLearningNode.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package com.yahoo.labs.samoa.learners.classifiers.trees;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.yahoo.labs.samoa.instances.Instance;
-
-/**
- * Class that represents inactive learning node. Inactive learning node is a node which only keeps track of the observed
- * class distribution. It does not store the statistic for splitting the node.
- *
- * @author Arinto Murdopo
- *
- */
-final class InactiveLearningNode extends LearningNode {
-
-  private static final long serialVersionUID = -814552382883472302L;
-
-  /**
-   * @param initialClassObservation
-   *          The class observation passed on to the LearningNode constructor
-   */
-  InactiveLearningNode(double[] initialClassObservation) {
-    super(initialClassObservation);
-  }
-
-  /**
-   * Adds the weight of the instance to the observed count of the instance's class. The processor argument is not
-   * used.
-   */
-  @Override
-  void learnFromInstance(Instance inst, ModelAggregatorProcessor proc) {
-    int classIndex = (int) inst.classValue();
-    double instanceWeight = inst.weight();
-    observedClassDistribution.addToValue(classIndex, instanceWeight);
-  }
-
-  /**
-   * Returns a copy of the observed class distribution as the votes. Neither argument is used.
-   */
-  @Override
-  double[] getClassVotes(Instance inst, ModelAggregatorProcessor map) {
-    double[] votes = observedClassDistribution.getArrayCopy();
-    return votes;
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/LearningNode.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/LearningNode.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/LearningNode.java
deleted file mode 100644
index 54dd4f3..0000000
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/LearningNode.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package com.yahoo.labs.samoa.learners.classifiers.trees;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.yahoo.labs.samoa.instances.Instance;
-
-/**
- * Abstract base class of the learning nodes. A learning node always reports itself as a leaf, and filtering an
- * instance to a leaf stops at the learning node itself.
- *
- * @author Arinto Murdopo
- *
- */
-abstract class LearningNode extends Node {
-
-  private static final long serialVersionUID = 7157319356146764960L;
-
-  /**
-   * @param classObservation
-   *          The class observation passed on to the Node constructor
-   */
-  protected LearningNode(double[] classObservation) {
-    super(classObservation);
-  }
-
-  /** A learning node is always a leaf. */
-  @Override
-  protected boolean isLeaf() {
-    return true;
-  }
-
-  /**
-   * Wraps this node, together with the given parent and branch index, in a new FoundNode. The instance itself is not
-   * inspected.
-   */
-  @Override
-  protected FoundNode filterInstanceToLeaf(Instance inst, SplitNode parent,
-      int parentBranch) {
-    FoundNode found = new FoundNode(this, parent, parentBranch);
-    return found;
-  }
-
-  /**
-   * Method to process the instance for learning
-   *
-   * @param inst
-   *          The processed instance
-   * @param proc
-   *          The model aggregator processor where this learning node exists
-   */
-  abstract void learnFromInstance(Instance inst, ModelAggregatorProcessor proc);
-}
