http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/NaiveBayes.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/NaiveBayes.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/NaiveBayes.java index 7e9cb4a..915d09b 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/NaiveBayes.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/NaiveBayes.java @@ -41,229 +41,229 @@ import com.yahoo.labs.samoa.moa.core.GaussianEstimator; */ public class NaiveBayes implements LocalLearner { - /** - * Default smoothing factor. For now fixed to 1E-20. - */ - private static final double ADDITIVE_SMOOTHING_FACTOR = 1e-20; + /** + * Default smoothing factor. For now fixed to 1E-20. + */ + private static final double ADDITIVE_SMOOTHING_FACTOR = 1e-20; - /** - * serialVersionUID for serialization - */ - private static final long serialVersionUID = 1325775209672996822L; + /** + * serialVersionUID for serialization + */ + private static final long serialVersionUID = 1325775209672996822L; - /** - * Instance of a logger for use in this class. - */ - private static final Logger logger = LoggerFactory.getLogger(NaiveBayes.class); + /** + * Instance of a logger for use in this class. + */ + private static final Logger logger = LoggerFactory.getLogger(NaiveBayes.class); - /** - * The actual model. - */ - protected Map<Integer, GaussianNumericAttributeClassObserver> attributeObservers; + /** + * The actual model. + */ + protected Map<Integer, GaussianNumericAttributeClassObserver> attributeObservers; - /** - * Class statistics - */ - protected Map<Integer, Double> classInstances; + /** + * Class statistics + */ + protected Map<Integer, Double> classInstances; - /** - * Class zero-prototypes. - */ - protected Map<Integer, Double> classPrototypes; - - /** - * Retrieve the number of classes currently known to this local model - * - * @return the number of classes currently known to this local model - */ - protected int getNumberOfClasses() { - return this.classInstances.size(); - } + /** + * Class zero-prototypes. + */ + protected Map<Integer, Double> classPrototypes; - /** - * Track training instances seen. - */ - protected long instancesSeen = 0L; + /** + * Retrieve the number of classes currently known to this local model + * + * @return the number of classes currently known to this local model + */ + protected int getNumberOfClasses() { + return this.classInstances.size(); + } - /** - * Explicit no-arg constructor. - */ - public NaiveBayes() { - // Init the model - resetLearning(); - } + /** + * Track training instances seen. + */ + protected long instancesSeen = 0L; - /** - * Create an instance of this LocalLearner implementation. - */ - @Override - public LocalLearner create() { - return new NaiveBayes(); - } + /** + * Explicit no-arg constructor. + */ + public NaiveBayes() { + // Init the model + resetLearning(); + } - /** - * Predicts the class memberships for a given instance. If an instance is - * unclassified, the returned array elements will be all zero. - * - * Smoothing is being implemented by the AttributeClassObserver classes. At - * the moment, the GaussianNumericProbabilityAttributeClassObserver needs no - * smoothing as it processes continuous variables. - * - * Please note that we transform the scores to log space to avoid underflow, - * and we replace the multiplication with addition. - * - * The resulting scores are no longer probabilities, as a mixture of - * probability densities and probabilities can be used in the computation. - * - * @param inst - * the instance to be classified - * @return an array containing the estimated membership scores of the test - * instance in each class, in log space. - */ - @Override - public double[] getVotesForInstance(Instance inst) { - // Prepare the results array - double[] votes = new double[getNumberOfClasses()]; - // Over all classes - for (int classIndex = 0; classIndex < votes.length; classIndex++) { - // Get the prior for this class - votes[classIndex] = Math.log(getPrior(classIndex)); - // Iterate over the instance attributes - for (int index = 0; index < inst.numAttributes(); index++) { - int attributeID = inst.index(index); - // Skip class attribute - if (attributeID == inst.classIndex()) - continue; - Double value = inst.value(attributeID); - // Get the observer for the given attribute - GaussianNumericAttributeClassObserver obs = attributeObservers.get(attributeID); - // Init the estimator to null by default - GaussianEstimator estimator = null; - if (obs != null && obs.getEstimator(classIndex) != null) { - // Get the estimator - estimator = obs.getEstimator(classIndex); - } - double valueNonZero; - // The null case should be handled by smoothing! - if (estimator != null) { - // Get the score for a NON-ZERO attribute value - valueNonZero = estimator.probabilityDensity(value); - } - // We don't have an estimator - else { - // Assign a very small probability that we do see this value - valueNonZero = ADDITIVE_SMOOTHING_FACTOR; - } - votes[classIndex] += Math.log(valueNonZero); // - Math.log(valueZero); - } - // Check for null in the case of prequential evaluation - if (this.classPrototypes.get(classIndex) != null) { - // Add the prototype for the class, already in log space - votes[classIndex] += Math.log(this.classPrototypes.get(classIndex)); - } - } - return votes; - } - - /** - * Compute the prior for the given classIndex. - * - * Implemented by maximum likelihood at the moment. - * - * @param classIndex - * Id of the class for which we want to compute the prior. - * @return Prior probability for the requested class - */ - private double getPrior(int classIndex) { - // Maximum likelihood - Double currentCount = this.classInstances.get(classIndex); - if (currentCount == null || currentCount == 0) - return 0; - else - return currentCount * 1. / this.instancesSeen; - } + /** + * Create an instance of this LocalLearner implementation. + */ + @Override + public LocalLearner create() { + return new NaiveBayes(); + } - /** - * Resets this classifier. It must be similar to starting a new classifier - * from scratch. - */ - @Override - public void resetLearning() { - // Reset priors - this.instancesSeen = 0L; - this.classInstances = new HashMap<>(); - this.classPrototypes = new HashMap<>(); - // Init the attribute observers - this.attributeObservers = new HashMap<>(); - } + /** + * Predicts the class memberships for a given instance. If an instance is + * unclassified, the returned array elements will be all zero. + * + * Smoothing is being implemented by the AttributeClassObserver classes. At + * the moment, the GaussianNumericProbabilityAttributeClassObserver needs no + * smoothing as it processes continuous variables. + * + * Please note that we transform the scores to log space to avoid underflow, + * and we replace the multiplication with addition. + * + * The resulting scores are no longer probabilities, as a mixture of + * probability densities and probabilities can be used in the computation. + * + * @param inst + * the instance to be classified + * @return an array containing the estimated membership scores of the test + * instance in each class, in log space. + */ + @Override + public double[] getVotesForInstance(Instance inst) { + // Prepare the results array + double[] votes = new double[getNumberOfClasses()]; + // Over all classes + for (int classIndex = 0; classIndex < votes.length; classIndex++) { + // Get the prior for this class + votes[classIndex] = Math.log(getPrior(classIndex)); + // Iterate over the instance attributes + for (int index = 0; index < inst.numAttributes(); index++) { + int attributeID = inst.index(index); + // Skip class attribute + if (attributeID == inst.classIndex()) + continue; + Double value = inst.value(attributeID); + // Get the observer for the given attribute + GaussianNumericAttributeClassObserver obs = attributeObservers.get(attributeID); + // Init the estimator to null by default + GaussianEstimator estimator = null; + if (obs != null && obs.getEstimator(classIndex) != null) { + // Get the estimator + estimator = obs.getEstimator(classIndex); + } + double valueNonZero; + // The null case should be handled by smoothing! + if (estimator != null) { + // Get the score for a NON-ZERO attribute value + valueNonZero = estimator.probabilityDensity(value); + } + // We don't have an estimator + else { + // Assign a very small probability that we do see this value + valueNonZero = ADDITIVE_SMOOTHING_FACTOR; + } + votes[classIndex] += Math.log(valueNonZero); // - Math.log(valueZero); + } + // Check for null in the case of prequential evaluation + if (this.classPrototypes.get(classIndex) != null) { + // Add the prototype for the class, already in log space + votes[classIndex] += Math.log(this.classPrototypes.get(classIndex)); + } + } + return votes; + } - /** - * Trains this classifier incrementally using the given instance. - * - * @param inst - * the instance to be used for training - */ - @Override - public void trainOnInstance(Instance inst) { - // Update class statistics with weights - int classIndex = (int) inst.classValue(); - Double weight = this.classInstances.get(classIndex); - if (weight == null) - weight = 0.; - this.classInstances.put(classIndex, weight + inst.weight()); - - // Get the class prototype - Double classPrototype = this.classPrototypes.get(classIndex); - if (classPrototype == null) - classPrototype = 1.; - - // Iterate over the attributes of the given instance - for (int attributePosition = 0; attributePosition < inst - .numAttributes(); attributePosition++) { - // Get the attribute index - Dense -> 1:1, Sparse is remapped - int attributeID = inst.index(attributePosition); - // Skip class attribute - if (attributeID == inst.classIndex()) - continue; - // Get the attribute observer for the current attribute - GaussianNumericAttributeClassObserver obs = this.attributeObservers - .get(attributeID); - // Lazy init of observers, if null, instantiate a new one - if (obs == null) { - // FIXME: At this point, we model everything as a numeric - // attribute - obs = new GaussianNumericAttributeClassObserver(); - this.attributeObservers.put(attributeID, obs); - } - - // Get the probability density function under the current model - GaussianEstimator obs_estimator = obs.getEstimator(classIndex); - if (obs_estimator != null) { - // Fetch the probability that the feature value is zero - double probDens_zero_current = obs_estimator.probabilityDensity(0); - classPrototype -= probDens_zero_current; - } - - // FIXME: Sanity check on data values, for now just learn - // Learn attribute value for given class - obs.observeAttributeClass(inst.valueSparse(attributePosition), - (int) inst.classValue(), inst.weight()); - - // Update obs_estimator to fetch the pdf from the updated model - obs_estimator = obs.getEstimator(classIndex); - // Fetch the probability that the feature value is zero - double probDens_zero_updated = obs_estimator.probabilityDensity(0); - // Update the class prototype - classPrototype += probDens_zero_updated; - } - // Store the class prototype - this.classPrototypes.put(classIndex, classPrototype); - // Count another training instance - this.instancesSeen++; - } + /** + * Compute the prior for the given classIndex. + * + * Implemented by maximum likelihood at the moment. + * + * @param classIndex + * Id of the class for which we want to compute the prior. + * @return Prior probability for the requested class + */ + private double getPrior(int classIndex) { + // Maximum likelihood + Double currentCount = this.classInstances.get(classIndex); + if (currentCount == null || currentCount == 0) + return 0; + else + return currentCount * 1. / this.instancesSeen; + } - @Override - public void setDataset(Instances dataset) { - // Do nothing - } + /** + * Resets this classifier. It must be similar to starting a new classifier + * from scratch. + */ + @Override + public void resetLearning() { + // Reset priors + this.instancesSeen = 0L; + this.classInstances = new HashMap<>(); + this.classPrototypes = new HashMap<>(); + // Init the attribute observers + this.attributeObservers = new HashMap<>(); + } + + /** + * Trains this classifier incrementally using the given instance. + * + * @param inst + * the instance to be used for training + */ + @Override + public void trainOnInstance(Instance inst) { + // Update class statistics with weights + int classIndex = (int) inst.classValue(); + Double weight = this.classInstances.get(classIndex); + if (weight == null) + weight = 0.; + this.classInstances.put(classIndex, weight + inst.weight()); + + // Get the class prototype + Double classPrototype = this.classPrototypes.get(classIndex); + if (classPrototype == null) + classPrototype = 1.; + + // Iterate over the attributes of the given instance + for (int attributePosition = 0; attributePosition < inst + .numAttributes(); attributePosition++) { + // Get the attribute index - Dense -> 1:1, Sparse is remapped + int attributeID = inst.index(attributePosition); + // Skip class attribute + if (attributeID == inst.classIndex()) + continue; + // Get the attribute observer for the current attribute + GaussianNumericAttributeClassObserver obs = this.attributeObservers + .get(attributeID); + // Lazy init of observers, if null, instantiate a new one + if (obs == null) { + // FIXME: At this point, we model everything as a numeric + // attribute + obs = new GaussianNumericAttributeClassObserver(); + this.attributeObservers.put(attributeID, obs); + } + + // Get the probability density function under the current model + GaussianEstimator obs_estimator = obs.getEstimator(classIndex); + if (obs_estimator != null) { + // Fetch the probability that the feature value is zero + double probDens_zero_current = obs_estimator.probabilityDensity(0); + classPrototype -= probDens_zero_current; + } + + // FIXME: Sanity check on data values, for now just learn + // Learn attribute value for given class + obs.observeAttributeClass(inst.valueSparse(attributePosition), + (int) inst.classValue(), inst.weight()); + + // Update obs_estimator to fetch the pdf from the updated model + obs_estimator = obs.getEstimator(classIndex); + // Fetch the probability that the feature value is zero + double probDens_zero_updated = obs_estimator.probabilityDensity(0); + // Update the class prototype + classPrototype += probDens_zero_updated; + } + // Store the class prototype + this.classPrototypes.put(classIndex, classPrototype); + // Count another training instance + this.instancesSeen++; + } + + @Override + public void setDataset(Instances dataset) { + // Do nothing + } }
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SimpleClassifierAdapter.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SimpleClassifierAdapter.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SimpleClassifierAdapter.java index a3fb89f..75c5284 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SimpleClassifierAdapter.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SimpleClassifierAdapter.java @@ -30,121 +30,125 @@ import com.yahoo.labs.samoa.instances.InstancesHeader; import com.yahoo.labs.samoa.moa.classifiers.functions.MajorityClass; /** - * + * * Base class for adapting external classifiers. - * + * */ public class SimpleClassifierAdapter implements LocalLearner, Configurable { - /** + /** * */ - private static final long serialVersionUID = 4372366401338704353L; - - public ClassOption learnerOption = new ClassOption("learner", 'l', - "Classifier to train.", com.yahoo.labs.samoa.moa.classifiers.Classifier.class, MajorityClass.class.getName()); - /** - * The learner. - */ - protected com.yahoo.labs.samoa.moa.classifiers.Classifier learner; - - /** - * The is init. - */ - protected Boolean isInit; - - /** - * The dataset. - */ - protected Instances dataset; + private static final long serialVersionUID = 4372366401338704353L; - @Override - public void setDataset(Instances dataset) { - this.dataset = dataset; - } + public ClassOption learnerOption = new ClassOption("learner", 'l', + "Classifier to train.", com.yahoo.labs.samoa.moa.classifiers.Classifier.class, MajorityClass.class.getName()); + /** + * The learner. + */ + protected com.yahoo.labs.samoa.moa.classifiers.Classifier learner; - /** - * Instantiates a new learner. - * - * @param learner the learner - * @param dataset the dataset - */ - public SimpleClassifierAdapter(com.yahoo.labs.samoa.moa.classifiers.Classifier learner, Instances dataset) { - this.learner = learner.copy(); - this.isInit = false; - this.dataset = dataset; - } + /** + * The is init. + */ + protected Boolean isInit; - /** - * Instantiates a new learner. - * - */ - public SimpleClassifierAdapter() { - this.learner = ((com.yahoo.labs.samoa.moa.classifiers.Classifier) this.learnerOption.getValue()).copy(); - this.isInit = false; - } + /** + * The dataset. + */ + protected Instances dataset; - /** - * Creates a new learner object. - * - * @return the learner - */ - @Override - public SimpleClassifierAdapter create() { - SimpleClassifierAdapter l = new SimpleClassifierAdapter(learner, dataset); - if (dataset == null) { - System.out.println("dataset null while creating"); - } - return l; - } + @Override + public void setDataset(Instances dataset) { + this.dataset = dataset; + } - /** - * Trains this classifier incrementally using the given instance. - * - * @param inst the instance to be used for training - */ - @Override - public void trainOnInstance(Instance inst) { - if (!this.isInit) { - this.isInit = true; - InstancesHeader instances = new InstancesHeader(dataset); - this.learner.setModelContext(instances); - this.learner.prepareForUse(); - } - if (inst.weight() > 0) { - inst.setDataset(dataset); - learner.trainOnInstance(inst); - } + /** + * Instantiates a new learner. + * + * @param learner + * the learner + * @param dataset + * the dataset + */ + public SimpleClassifierAdapter(com.yahoo.labs.samoa.moa.classifiers.Classifier learner, Instances dataset) { + this.learner = learner.copy(); + this.isInit = false; + this.dataset = dataset; + } + + /** + * Instantiates a new learner. + * + */ + public SimpleClassifierAdapter() { + this.learner = ((com.yahoo.labs.samoa.moa.classifiers.Classifier) this.learnerOption.getValue()).copy(); + this.isInit = false; + } + + /** + * Creates a new learner object. + * + * @return the learner + */ + @Override + public SimpleClassifierAdapter create() { + SimpleClassifierAdapter l = new SimpleClassifierAdapter(learner, dataset); + if (dataset == null) { + System.out.println("dataset null while creating"); } + return l; + } - /** - * Predicts the class memberships for a given instance. If an instance is - * unclassified, the returned array elements must be all zero. - * - * @param inst the instance to be classified - * @return an array containing the estimated membership probabilities of the - * test instance in each class - */ - @Override - public double[] getVotesForInstance(Instance inst) { - double[] ret; - inst.setDataset(dataset); - if (!this.isInit) { - ret = new double[dataset.numClasses()]; - } else { - ret = learner.getVotesForInstance(inst); - } - return ret; + /** + * Trains this classifier incrementally using the given instance. + * + * @param inst + * the instance to be used for training + */ + @Override + public void trainOnInstance(Instance inst) { + if (!this.isInit) { + this.isInit = true; + InstancesHeader instances = new InstancesHeader(dataset); + this.learner.setModelContext(instances); + this.learner.prepareForUse(); + } + if (inst.weight() > 0) { + inst.setDataset(dataset); + learner.trainOnInstance(inst); } + } - /** - * Resets this classifier. It must be similar to starting a new classifier - * from scratch. - * - */ - @Override - public void resetLearning() { - learner.resetLearning(); + /** + * Predicts the class memberships for a given instance. If an instance is + * unclassified, the returned array elements must be all zero. + * + * @param inst + * the instance to be classified + * @return an array containing the estimated membership probabilities of the + * test instance in each class + */ + @Override + public double[] getVotesForInstance(Instance inst) { + double[] ret; + inst.setDataset(dataset); + if (!this.isInit) { + ret = new double[dataset.numClasses()]; + } else { + ret = learner.getVotesForInstance(inst); } + return ret; + } + + /** + * Resets this classifier. It must be similar to starting a new classifier + * from scratch. + * + */ + @Override + public void resetLearning() { + learner.resetLearning(); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SingleClassifier.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SingleClassifier.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SingleClassifier.java index affc935..46352e0 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SingleClassifier.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SingleClassifier.java @@ -36,6 +36,7 @@ import com.yahoo.labs.samoa.learners.Learner; import com.yahoo.labs.samoa.moa.classifiers.core.driftdetection.ChangeDetector; import com.yahoo.labs.samoa.topology.Stream; import com.yahoo.labs.samoa.topology.TopologyBuilder; + /** * * Classifier that contain a single classifier. @@ -43,67 +44,67 @@ import com.yahoo.labs.samoa.topology.TopologyBuilder; */ public final class SingleClassifier implements Learner, AdaptiveLearner, Configurable { - private static final long serialVersionUID = 684111382631697031L; - - private LocalLearnerProcessor learnerP; - - private Stream resultStream; - - private Instances dataset; - - public ClassOption learnerOption = new ClassOption("learner", 'l', - "Classifier to train.", LocalLearner.class, SimpleClassifierAdapter.class.getName()); - - private TopologyBuilder builder; - - private int parallelism; - - - @Override - public void init(TopologyBuilder builder, Instances dataset, int parallelism){ - this.builder = builder; - this.dataset = dataset; - this.parallelism = parallelism; - this.setLayout(); - } - - - protected void setLayout() { - learnerP = new LocalLearnerProcessor(); - learnerP.setChangeDetector(this.getChangeDetector()); - LocalLearner learner = this.learnerOption.getValue(); - learner.setDataset(this.dataset); - learnerP.setLearner(learner); - - //learnerPI = this.builder.createPi(learnerP, 1); - this.builder.addProcessor(learnerP, parallelism); - resultStream = this.builder.createStream(learnerP); - - learnerP.setOutputStream(resultStream); - } - - @Override - public Processor getInputProcessor() { - return learnerP; - } - - /* (non-Javadoc) - * @see samoa.learners.Learner#getResultStreams() - */ - @Override - public Set<Stream> getResultStreams() { - return ImmutableSet.of(this.resultStream); - } - - protected ChangeDetector changeDetector; - - @Override - public ChangeDetector getChangeDetector() { - return this.changeDetector; - } - - @Override - public void setChangeDetector(ChangeDetector cd) { - this.changeDetector = cd; - } + private static final long serialVersionUID = 684111382631697031L; + + private LocalLearnerProcessor learnerP; + + private Stream resultStream; + + private Instances dataset; + + public ClassOption learnerOption = new ClassOption("learner", 'l', + "Classifier to train.", LocalLearner.class, SimpleClassifierAdapter.class.getName()); + + private TopologyBuilder builder; + + private int parallelism; + + @Override + public void init(TopologyBuilder builder, Instances dataset, int parallelism) { + this.builder = builder; + this.dataset = dataset; + this.parallelism = parallelism; + this.setLayout(); + } + + protected void setLayout() { + learnerP = new LocalLearnerProcessor(); + learnerP.setChangeDetector(this.getChangeDetector()); + LocalLearner learner = this.learnerOption.getValue(); + learner.setDataset(this.dataset); + learnerP.setLearner(learner); + + // learnerPI = this.builder.createPi(learnerP, 1); + this.builder.addProcessor(learnerP, parallelism); + resultStream = this.builder.createStream(learnerP); + + learnerP.setOutputStream(resultStream); + } + + @Override + public Processor getInputProcessor() { + return learnerP; + } + + /* + * (non-Javadoc) + * + * @see samoa.learners.Learner#getResultStreams() + */ + @Override + public Set<Stream> getResultStreams() { + return ImmutableSet.of(this.resultStream); + } + + protected ChangeDetector changeDetector; + + @Override + public ChangeDetector getChangeDetector() { + return this.changeDetector; + } + + @Override + public void setChangeDetector(ChangeDetector cd) { + this.changeDetector = cd; + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/AdaptiveBagging.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/AdaptiveBagging.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/AdaptiveBagging.java index aba3d1d..3bdea57 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/AdaptiveBagging.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/AdaptiveBagging.java @@ -47,103 +47,105 @@ import com.yahoo.labs.samoa.topology.TopologyBuilder; * The Bagging Classifier by Oza and Russell. */ public class AdaptiveBagging implements Learner, Configurable { - - /** Logger */ + + /** Logger */ private static final Logger logger = LoggerFactory.getLogger(AdaptiveBagging.class); - /** The Constant serialVersionUID. */ - private static final long serialVersionUID = -2971850264864952099L; - - /** The base learner option. */ - public ClassOption baseLearnerOption = new ClassOption("baseLearner", 'l', - "Classifier to train.", Learner.class, VerticalHoeffdingTree.class.getName()); + /** The Constant serialVersionUID. */ + private static final long serialVersionUID = -2971850264864952099L; + + /** The base learner option. */ + public ClassOption baseLearnerOption = new ClassOption("baseLearner", 'l', + "Classifier to train.", Learner.class, VerticalHoeffdingTree.class.getName()); - /** The ensemble size option. */ - public IntOption ensembleSizeOption = new IntOption("ensembleSize", 's', - "The number of models in the bag.", 10, 1, Integer.MAX_VALUE); + /** The ensemble size option. */ + public IntOption ensembleSizeOption = new IntOption("ensembleSize", 's', + "The number of models in the bag.", 10, 1, Integer.MAX_VALUE); - public ClassOption driftDetectionMethodOption = new ClassOption("driftDetectionMethod", 'd', + public ClassOption driftDetectionMethodOption = new ClassOption("driftDetectionMethod", 'd', "Drift detection method to use.", ChangeDetector.class, ADWINChangeDetector.class.getName()); - /** The distributor processor. */ - private BaggingDistributorProcessor distributorP; - - /** The result stream. */ - protected Stream resultStream; - - /** The dataset. */ - private Instances dataset; - - protected Learner classifier; - + /** The distributor processor. */ + private BaggingDistributorProcessor distributorP; + + /** The result stream. */ + protected Stream resultStream; + + /** The dataset. */ + private Instances dataset; + + protected Learner classifier; + protected int parallelism; - /** - * Sets the layout. - */ - protected void setLayout() { - - int sizeEnsemble = this.ensembleSizeOption.getValue(); - - distributorP = new BaggingDistributorProcessor(); - distributorP.setSizeEnsemble(sizeEnsemble); - this.builder.addProcessor(distributorP, 1); - - //instantiate classifier - classifier = this.baseLearnerOption.getValue(); - if (classifier instanceof AdaptiveLearner) { - // logger.info("Building an AdaptiveLearner {}", classifier.getClass().getName()); - AdaptiveLearner ada = (AdaptiveLearner) classifier; - ada.setChangeDetector((ChangeDetector) this.driftDetectionMethodOption.getValue()); - } - classifier.init(builder, this.dataset, sizeEnsemble); - - PredictionCombinerProcessor predictionCombinerP= new PredictionCombinerProcessor(); - predictionCombinerP.setSizeEnsemble(sizeEnsemble); - this.builder.addProcessor(predictionCombinerP, 1); - - //Streams - resultStream = this.builder.createStream(predictionCombinerP); - predictionCombinerP.setOutputStream(resultStream); - - for (Stream subResultStream:classifier.getResultStreams()) { - this.builder.connectInputKeyStream(subResultStream, predictionCombinerP); - } - - /* The training stream. */ - Stream testingStream = this.builder.createStream(distributorP); - this.builder.connectInputKeyStream(testingStream, classifier.getInputProcessor()); - - /* The prediction stream. */ - Stream predictionStream = this.builder.createStream(distributorP); - this.builder.connectInputKeyStream(predictionStream, classifier.getInputProcessor()); - - distributorP.setOutputStream(testingStream); - distributorP.setPredictionStream(predictionStream); - } - - /** The builder. */ - private TopologyBuilder builder; - - - @Override - public void init(TopologyBuilder builder, Instances dataset, int parallelism) { - this.builder = builder; - this.dataset = dataset; - this.parallelism = parallelism; - this.setLayout(); - } - - @Override - public Processor getInputProcessor() { - return distributorP; - } - - /* (non-Javadoc) - * @see samoa.learners.Learner#getResultStreams() - */ - @Override - public Set<Stream> getResultStreams() { - return ImmutableSet.of(this.resultStream); - } + /** + * Sets the layout. + */ + protected void setLayout() { + + int sizeEnsemble = this.ensembleSizeOption.getValue(); + + distributorP = new BaggingDistributorProcessor(); + distributorP.setSizeEnsemble(sizeEnsemble); + this.builder.addProcessor(distributorP, 1); + + // instantiate classifier + classifier = this.baseLearnerOption.getValue(); + if (classifier instanceof AdaptiveLearner) { + // logger.info("Building an AdaptiveLearner {}", + // classifier.getClass().getName()); + AdaptiveLearner ada = (AdaptiveLearner) classifier; + ada.setChangeDetector((ChangeDetector) this.driftDetectionMethodOption.getValue()); + } + classifier.init(builder, this.dataset, sizeEnsemble); + + PredictionCombinerProcessor predictionCombinerP = new PredictionCombinerProcessor(); + predictionCombinerP.setSizeEnsemble(sizeEnsemble); + this.builder.addProcessor(predictionCombinerP, 1); + + // Streams + resultStream = this.builder.createStream(predictionCombinerP); + predictionCombinerP.setOutputStream(resultStream); + + for (Stream subResultStream : classifier.getResultStreams()) { + this.builder.connectInputKeyStream(subResultStream, predictionCombinerP); + } + + /* The training stream. */ + Stream testingStream = this.builder.createStream(distributorP); + this.builder.connectInputKeyStream(testingStream, classifier.getInputProcessor()); + + /* The prediction stream. */ + Stream predictionStream = this.builder.createStream(distributorP); + this.builder.connectInputKeyStream(predictionStream, classifier.getInputProcessor()); + + distributorP.setOutputStream(testingStream); + distributorP.setPredictionStream(predictionStream); + } + + /** The builder. */ + private TopologyBuilder builder; + + @Override + public void init(TopologyBuilder builder, Instances dataset, int parallelism) { + this.builder = builder; + this.dataset = dataset; + this.parallelism = parallelism; + this.setLayout(); + } + + @Override + public Processor getInputProcessor() { + return distributorP; + } + + /* + * (non-Javadoc) + * + * @see samoa.learners.Learner#getResultStreams() + */ + @Override + public Set<Stream> getResultStreams() { + return ImmutableSet.of(this.resultStream); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Bagging.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Bagging.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Bagging.java index 9f99ff1..3a78933 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Bagging.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Bagging.java @@ -40,99 +40,99 @@ import com.yahoo.labs.samoa.learners.classifiers.trees.VerticalHoeffdingTree; /** * The Bagging Classifier by Oza and Russell. */ -public class Bagging implements Learner , Configurable { - - /** The Constant serialVersionUID. */ - private static final long serialVersionUID = -2971850264864952099L; - - /** The base learner option. */ - public ClassOption baseLearnerOption = new ClassOption("baseLearner", 'l', - "Classifier to train.", Learner.class, VerticalHoeffdingTree.class.getName()); - - - /** The ensemble size option. */ - public IntOption ensembleSizeOption = new IntOption("ensembleSize", 's', - "The number of models in the bag.", 10, 1, Integer.MAX_VALUE); - - /** The distributor processor. */ - private BaggingDistributorProcessor distributorP; - - /** The training stream. */ - private Stream testingStream; - - /** The prediction stream. */ - private Stream predictionStream; - - /** The result stream. */ - protected Stream resultStream; - - /** The dataset. */ - private Instances dataset; - - protected Learner classifier; - - protected int parallelism; - - /** - * Sets the layout. - */ - protected void setLayout() { - - int sizeEnsemble = this.ensembleSizeOption.getValue(); - - distributorP = new BaggingDistributorProcessor(); - distributorP.setSizeEnsemble(sizeEnsemble); - this.builder.addProcessor(distributorP, 1); - - //instantiate classifier - classifier = (Learner) this.baseLearnerOption.getValue(); - classifier.init(builder, this.dataset, sizeEnsemble); - - PredictionCombinerProcessor predictionCombinerP= new PredictionCombinerProcessor(); - predictionCombinerP.setSizeEnsemble(sizeEnsemble); - this.builder.addProcessor(predictionCombinerP, 1); - - //Streams - resultStream = this.builder.createStream(predictionCombinerP); - predictionCombinerP.setOutputStream(resultStream); - - for (Stream subResultStream:classifier.getResultStreams()) { - this.builder.connectInputKeyStream(subResultStream, predictionCombinerP); - } - - testingStream = this.builder.createStream(distributorP); - this.builder.connectInputKeyStream(testingStream, classifier.getInputProcessor()); - - predictionStream = this.builder.createStream(distributorP); - this.builder.connectInputKeyStream(predictionStream, classifier.getInputProcessor()); - - distributorP.setOutputStream(testingStream); - distributorP.setPredictionStream(predictionStream); - } - - /** The builder. */ - private TopologyBuilder builder; - - - @Override - public void init(TopologyBuilder builder, Instances dataset, int parallelism) { - this.builder = builder; - this.dataset = dataset; - this.parallelism = parallelism; - this.setLayout(); - } - - @Override - public Processor getInputProcessor() { - return distributorP; - } - - /* (non-Javadoc) - * @see samoa.learners.Learner#getResultStreams() - */ - @Override - public Set<Stream> getResultStreams() { - Set<Stream> streams = ImmutableSet.of(this.resultStream); - return streams; +public class Bagging implements Learner, Configurable { + + /** The Constant serialVersionUID. */ + private static final long serialVersionUID = -2971850264864952099L; + + /** The base learner option. */ + public ClassOption baseLearnerOption = new ClassOption("baseLearner", 'l', + "Classifier to train.", Learner.class, VerticalHoeffdingTree.class.getName()); + + /** The ensemble size option. */ + public IntOption ensembleSizeOption = new IntOption("ensembleSize", 's', + "The number of models in the bag.", 10, 1, Integer.MAX_VALUE); + + /** The distributor processor. */ + private BaggingDistributorProcessor distributorP; + + /** The training stream. */ + private Stream testingStream; + + /** The prediction stream. */ + private Stream predictionStream; + + /** The result stream. */ + protected Stream resultStream; + + /** The dataset. */ + private Instances dataset; + + protected Learner classifier; + + protected int parallelism; + + /** + * Sets the layout. + */ + protected void setLayout() { + + int sizeEnsemble = this.ensembleSizeOption.getValue(); + + distributorP = new BaggingDistributorProcessor(); + distributorP.setSizeEnsemble(sizeEnsemble); + this.builder.addProcessor(distributorP, 1); + + // instantiate classifier + classifier = (Learner) this.baseLearnerOption.getValue(); + classifier.init(builder, this.dataset, sizeEnsemble); + + PredictionCombinerProcessor predictionCombinerP = new PredictionCombinerProcessor(); + predictionCombinerP.setSizeEnsemble(sizeEnsemble); + this.builder.addProcessor(predictionCombinerP, 1); + + // Streams + resultStream = this.builder.createStream(predictionCombinerP); + predictionCombinerP.setOutputStream(resultStream); + + for (Stream subResultStream : classifier.getResultStreams()) { + this.builder.connectInputKeyStream(subResultStream, predictionCombinerP); } + + testingStream = this.builder.createStream(distributorP); + this.builder.connectInputKeyStream(testingStream, classifier.getInputProcessor()); + + predictionStream = this.builder.createStream(distributorP); + this.builder.connectInputKeyStream(predictionStream, classifier.getInputProcessor()); + + distributorP.setOutputStream(testingStream); + distributorP.setPredictionStream(predictionStream); + } + + /** The builder. */ + private TopologyBuilder builder; + + @Override + public void init(TopologyBuilder builder, Instances dataset, int parallelism) { + this.builder = builder; + this.dataset = dataset; + this.parallelism = parallelism; + this.setLayout(); + } + + @Override + public Processor getInputProcessor() { + return distributorP; + } + + /* + * (non-Javadoc) + * + * @see samoa.learners.Learner#getResultStreams() + */ + @Override + public Set<Stream> getResultStreams() { + Set<Stream> streams = ImmutableSet.of(this.resultStream); + return streams; + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BaggingDistributorProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BaggingDistributorProcessor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BaggingDistributorProcessor.java index 65c782b..44264ac 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BaggingDistributorProcessor.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BaggingDistributorProcessor.java @@ -35,167 +35,174 @@ import java.util.Random; /** * The Class BaggingDistributorPE. */ -public class BaggingDistributorProcessor implements Processor{ +public class BaggingDistributorProcessor implements Processor { - /** + /** * */ - private static final long serialVersionUID = -1550901409625192730L; - - /** The size ensemble. */ - private int sizeEnsemble; - - /** The training stream. */ - private Stream trainingStream; - - /** The prediction stream. */ - private Stream predictionStream; - - /** - * On event. - * - * @param event the event - * @return true, if successful - */ - public boolean process(ContentEvent event) { - InstanceContentEvent inEvent = (InstanceContentEvent) event; //((s4Event) event).getContentEvent(); - //InstanceEvent inEvent = (InstanceEvent) event; - - if (inEvent.getInstanceIndex() < 0) { - // End learning - predictionStream.put(event); - return false; - } - - - if (inEvent.isTesting()){ - Instance trainInst = inEvent.getInstance(); - for (int i = 0; i < sizeEnsemble; i++) { - Instance weightedInst = trainInst.copy(); - //weightedInst.setWeight(trainInst.weight() * k); - InstanceContentEvent instanceContentEvent = new InstanceContentEvent( - inEvent.getInstanceIndex(), weightedInst, false, true); - instanceContentEvent.setClassifierIndex(i); - instanceContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex()); - predictionStream.put(instanceContentEvent); - } - } - - /* Estimate model parameters using the training data. */ - if (inEvent.isTraining()) { - train(inEvent); - } - return false; - } - - /** The random. */ - protected Random random = new Random(); - - /** - * Train. - * - * @param inEvent the in event - */ - protected void train(InstanceContentEvent inEvent) { - Instance trainInst = inEvent.getInstance(); - for (int i = 0; i < sizeEnsemble; i++) { - int k = MiscUtils.poisson(1.0, this.random); - if (k > 0) { - Instance weightedInst = trainInst.copy(); - weightedInst.setWeight(trainInst.weight() * k); - InstanceContentEvent instanceContentEvent = new InstanceContentEvent( - inEvent.getInstanceIndex(), weightedInst, true, false); - instanceContentEvent.setClassifierIndex(i); - instanceContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex()); - trainingStream.put(instanceContentEvent); - } - } - } - - /* - * (non-Javadoc) - * - * @see org.apache.s4.core.ProcessingElement#onCreate() - */ - @Override - public void onCreate(int id) { - //do nothing - } - - - /** - * Gets the training stream. - * - * @return the training stream - */ - public Stream getTrainingStream() { - return trainingStream; - } - - /** - * Sets the training stream. - * - * @param trainingStream the new training stream - */ - public void setOutputStream(Stream trainingStream) { - this.trainingStream = trainingStream; - } - - /** - * Gets the prediction stream. - * - * @return the prediction stream - */ - public Stream getPredictionStream() { - return predictionStream; - } - - /** - * Sets the prediction stream. - * - * @param predictionStream the new prediction stream - */ - public void setPredictionStream(Stream predictionStream) { - this.predictionStream = predictionStream; - } - - /** - * Gets the size ensemble. - * - * @return the size ensemble - */ - public int getSizeEnsemble() { - return sizeEnsemble; - } - - /** - * Sets the size ensemble. - * - * @param sizeEnsemble the new size ensemble - */ - public void setSizeEnsemble(int sizeEnsemble) { - this.sizeEnsemble = sizeEnsemble; - } - - - /* (non-Javadoc) - * @see samoa.core.Processor#newProcessor(samoa.core.Processor) - */ - @Override - public Processor newProcessor(Processor sourceProcessor) { - BaggingDistributorProcessor newProcessor = new BaggingDistributorProcessor(); - BaggingDistributorProcessor originProcessor = (BaggingDistributorProcessor) sourceProcessor; - if (originProcessor.getPredictionStream() != null){ - newProcessor.setPredictionStream(originProcessor.getPredictionStream()); - } - if (originProcessor.getTrainingStream() != null){ - newProcessor.setOutputStream(originProcessor.getTrainingStream()); - } - newProcessor.setSizeEnsemble(originProcessor.getSizeEnsemble()); - /*if (originProcessor.getLearningCurve() != null){ - newProcessor.setLearningCurve((LearningCurve) originProcessor.getLearningCurve().copy()); - }*/ - return newProcessor; - } + private static final long serialVersionUID = -1550901409625192730L; + + /** The size ensemble. */ + private int sizeEnsemble; + + /** The training stream. */ + private Stream trainingStream; + + /** The prediction stream. */ + private Stream predictionStream; + + /** + * On event. + * + * @param event + * the event + * @return true, if successful + */ + public boolean process(ContentEvent event) { + InstanceContentEvent inEvent = (InstanceContentEvent) event; // ((s4Event) + // event).getContentEvent(); + // InstanceEvent inEvent = (InstanceEvent) event; + + if (inEvent.getInstanceIndex() < 0) { + // End learning + predictionStream.put(event); + return false; + } + + if (inEvent.isTesting()) { + Instance trainInst = inEvent.getInstance(); + for (int i = 0; i < sizeEnsemble; i++) { + Instance weightedInst = trainInst.copy(); + // weightedInst.setWeight(trainInst.weight() * k); + InstanceContentEvent instanceContentEvent = new InstanceContentEvent( + inEvent.getInstanceIndex(), weightedInst, false, true); + instanceContentEvent.setClassifierIndex(i); + instanceContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex()); + predictionStream.put(instanceContentEvent); + } + } + + /* Estimate model parameters using the training data. */ + if (inEvent.isTraining()) { + train(inEvent); + } + return false; + } + + /** The random. */ + protected Random random = new Random(); + + /** + * Train. + * + * @param inEvent + * the in event + */ + protected void train(InstanceContentEvent inEvent) { + Instance trainInst = inEvent.getInstance(); + for (int i = 0; i < sizeEnsemble; i++) { + int k = MiscUtils.poisson(1.0, this.random); + if (k > 0) { + Instance weightedInst = trainInst.copy(); + weightedInst.setWeight(trainInst.weight() * k); + InstanceContentEvent instanceContentEvent = new InstanceContentEvent( + inEvent.getInstanceIndex(), weightedInst, true, false); + instanceContentEvent.setClassifierIndex(i); + instanceContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex()); + trainingStream.put(instanceContentEvent); + } + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.s4.core.ProcessingElement#onCreate() + */ + @Override + public void onCreate(int id) { + // do nothing + } + + /** + * Gets the training stream. + * + * @return the training stream + */ + public Stream getTrainingStream() { + return trainingStream; + } + + /** + * Sets the training stream. + * + * @param trainingStream + * the new training stream + */ + public void setOutputStream(Stream trainingStream) { + this.trainingStream = trainingStream; + } + + /** + * Gets the prediction stream. + * + * @return the prediction stream + */ + public Stream getPredictionStream() { + return predictionStream; + } + + /** + * Sets the prediction stream. + * + * @param predictionStream + * the new prediction stream + */ + public void setPredictionStream(Stream predictionStream) { + this.predictionStream = predictionStream; + } + + /** + * Gets the size ensemble. + * + * @return the size ensemble + */ + public int getSizeEnsemble() { + return sizeEnsemble; + } + + /** + * Sets the size ensemble. + * + * @param sizeEnsemble + * the new size ensemble + */ + public void setSizeEnsemble(int sizeEnsemble) { + this.sizeEnsemble = sizeEnsemble; + } + + /* + * (non-Javadoc) + * + * @see samoa.core.Processor#newProcessor(samoa.core.Processor) + */ + @Override + public Processor newProcessor(Processor sourceProcessor) { + BaggingDistributorProcessor newProcessor = new BaggingDistributorProcessor(); + BaggingDistributorProcessor originProcessor = (BaggingDistributorProcessor) sourceProcessor; + if (originProcessor.getPredictionStream() != null) { + newProcessor.setPredictionStream(originProcessor.getPredictionStream()); + } + if (originProcessor.getTrainingStream() != null) { + newProcessor.setOutputStream(originProcessor.getTrainingStream()); + } + newProcessor.setSizeEnsemble(originProcessor.getSizeEnsemble()); + /* + * if (originProcessor.getLearningCurve() != null){ + * newProcessor.setLearningCurve((LearningCurve) + * originProcessor.getLearningCurve().copy()); } + */ + return newProcessor; + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Boosting.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Boosting.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Boosting.java index 06723e2..e81c490 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Boosting.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Boosting.java @@ -40,103 +40,108 @@ import com.yahoo.labs.samoa.topology.TopologyBuilder; /** * The Bagging Classifier by Oza and Russell. */ -public class Boosting implements Learner , Configurable { - - /** The Constant serialVersionUID. */ - private static final long serialVersionUID = -2971850264864952099L; - - /** The base learner option. */ - public ClassOption baseLearnerOption = new ClassOption("baseLearner", 'l', - "Classifier to train.", Learner.class, SingleClassifier.class.getName()); - - /** The ensemble size option. */ - public IntOption ensembleSizeOption = new IntOption("ensembleSize", 's', - "The number of models in the bag.", 10, 1, Integer.MAX_VALUE); - - /** The distributor processor. */ - private BoostingDistributorProcessor distributorP; - - /** The result stream. */ - protected Stream resultStream; - - /** The dataset. */ - private Instances dataset; - - protected Learner classifier; - - protected int parallelism; - - /** - * Sets the layout. - */ - protected void setLayout() { - - int sizeEnsemble = this.ensembleSizeOption.getValue(); - - distributorP = new BoostingDistributorProcessor(); - distributorP.setSizeEnsemble(sizeEnsemble); - this.builder.addProcessor(distributorP, 1); - - //instantiate classifier - classifier = this.baseLearnerOption.getValue(); - classifier.init(builder, this.dataset, sizeEnsemble); - - BoostingPredictionCombinerProcessor predictionCombinerP= new BoostingPredictionCombinerProcessor(); - predictionCombinerP.setSizeEnsemble(sizeEnsemble); - this.builder.addProcessor(predictionCombinerP, 1); - - //Streams - resultStream = this.builder.createStream(predictionCombinerP); - predictionCombinerP.setOutputStream(resultStream); - - for (Stream subResultStream:classifier.getResultStreams()) { - this.builder.connectInputKeyStream(subResultStream, predictionCombinerP); - } - - /* The testing stream. */ - Stream testingStream = this.builder.createStream(distributorP); - this.builder.connectInputKeyStream(testingStream, classifier.getInputProcessor()); - - /* The prediction stream. */ - Stream predictionStream = this.builder.createStream(distributorP); - this.builder.connectInputKeyStream(predictionStream, classifier.getInputProcessor()); - - distributorP.setOutputStream(testingStream); - distributorP.setPredictionStream(predictionStream); - +public class Boosting implements Learner, Configurable { + + /** The Constant serialVersionUID. */ + private static final long serialVersionUID = -2971850264864952099L; + + /** The base learner option. */ + public ClassOption baseLearnerOption = new ClassOption("baseLearner", 'l', + "Classifier to train.", Learner.class, SingleClassifier.class.getName()); + + /** The ensemble size option. */ + public IntOption ensembleSizeOption = new IntOption("ensembleSize", 's', + "The number of models in the bag.", 10, 1, Integer.MAX_VALUE); + + /** The distributor processor. */ + private BoostingDistributorProcessor distributorP; + + /** The result stream. */ + protected Stream resultStream; + + /** The dataset. */ + private Instances dataset; + + protected Learner classifier; + + protected int parallelism; + + /** + * Sets the layout. + */ + protected void setLayout() { + + int sizeEnsemble = this.ensembleSizeOption.getValue(); + + distributorP = new BoostingDistributorProcessor(); + distributorP.setSizeEnsemble(sizeEnsemble); + this.builder.addProcessor(distributorP, 1); + + // instantiate classifier + classifier = this.baseLearnerOption.getValue(); + classifier.init(builder, this.dataset, sizeEnsemble); + + BoostingPredictionCombinerProcessor predictionCombinerP = new BoostingPredictionCombinerProcessor(); + predictionCombinerP.setSizeEnsemble(sizeEnsemble); + this.builder.addProcessor(predictionCombinerP, 1); + + // Streams + resultStream = this.builder.createStream(predictionCombinerP); + predictionCombinerP.setOutputStream(resultStream); + + for (Stream subResultStream : classifier.getResultStreams()) { + this.builder.connectInputKeyStream(subResultStream, predictionCombinerP); + } + + /* The testing stream. */ + Stream testingStream = this.builder.createStream(distributorP); + this.builder.connectInputKeyStream(testingStream, classifier.getInputProcessor()); + + /* The prediction stream. */ + Stream predictionStream = this.builder.createStream(distributorP); + this.builder.connectInputKeyStream(predictionStream, classifier.getInputProcessor()); + + distributorP.setOutputStream(testingStream); + distributorP.setPredictionStream(predictionStream); + // Addition to Bagging: stream to train /* The training stream. */ - Stream trainingStream = this.builder.createStream(predictionCombinerP); - predictionCombinerP.setTrainingStream(trainingStream); - this.builder.connectInputKeyStream(trainingStream, classifier.getInputProcessor()); - - } - - /** The builder. */ - private TopologyBuilder builder; - - /* (non-Javadoc) - * @see samoa.classifiers.Classifier#init(samoa.engines.Engine, samoa.core.Stream, weka.core.Instances) - */ - - @Override - public void init(TopologyBuilder builder, Instances dataset, int parallelism) { - this.builder = builder; - this.dataset = dataset; - this.parallelism = parallelism; - this.setLayout(); - } - - @Override - public Processor getInputProcessor() { - return distributorP; - } - - /* (non-Javadoc) - * @see samoa.learners.Learner#getResultStreams() - */ - @Override - public Set<Stream> getResultStreams() { - return ImmutableSet.of(this.resultStream); - } + Stream trainingStream = this.builder.createStream(predictionCombinerP); + predictionCombinerP.setTrainingStream(trainingStream); + this.builder.connectInputKeyStream(trainingStream, classifier.getInputProcessor()); + + } + + /** The builder. */ + private TopologyBuilder builder; + + /* + * (non-Javadoc) + * + * @see samoa.classifiers.Classifier#init(samoa.engines.Engine, + * samoa.core.Stream, weka.core.Instances) + */ + + @Override + public void init(TopologyBuilder builder, Instances dataset, int parallelism) { + this.builder = builder; + this.dataset = dataset; + this.parallelism = parallelism; + this.setLayout(); + } + + @Override + public Processor getInputProcessor() { + return distributorP; + } + + /* + * (non-Javadoc) + * + * @see samoa.learners.Learner#getResultStreams() + */ + @Override + public Set<Stream> getResultStreams() { + return ImmutableSet.of(this.resultStream); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BoostingDistributorProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BoostingDistributorProcessor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BoostingDistributorProcessor.java index 7100e7e..78c2bd0 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BoostingDistributorProcessor.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BoostingDistributorProcessor.java @@ -22,15 +22,14 @@ import com.yahoo.labs.samoa.learners.InstanceContentEvent; * #L% */ - /** * The Class BoostingDistributorProcessor. */ -public class BoostingDistributorProcessor extends BaggingDistributorProcessor{ - - @Override - protected void train(InstanceContentEvent inEvent) { - // Boosting is trained from the prediction combiner, not from the input - } - +public class BoostingDistributorProcessor extends BaggingDistributorProcessor { + + @Override + protected void train(InstanceContentEvent inEvent) { + // Boosting is trained from the prediction combiner, not from the input + } + } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BoostingPredictionCombinerProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BoostingPredictionCombinerProcessor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BoostingPredictionCombinerProcessor.java index 1d8db50..8acbdc8 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BoostingPredictionCombinerProcessor.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BoostingPredictionCombinerProcessor.java @@ -40,137 +40,139 @@ import com.yahoo.labs.samoa.topology.Stream; */ public class BoostingPredictionCombinerProcessor extends PredictionCombinerProcessor { - private static final long serialVersionUID = -1606045723451191232L; - - //Weigths classifier - protected double[] scms; - - //Weights instance - protected double[] swms; - - /** - * On event. - * - * @param event the event - * @return true, if successful - */ - @Override - public boolean process(ContentEvent event) { - - ResultContentEvent inEvent = (ResultContentEvent) event; - double[] prediction = inEvent.getClassVotes(); - int instanceIndex = (int) inEvent.getInstanceIndex(); - - addStatisticsForInstanceReceived(instanceIndex, inEvent.getClassifierIndex(), prediction, 1); - //Boosting - addPredictions(instanceIndex, inEvent, prediction); - - if (inEvent.isLastEvent() || hasAllVotesArrivedInstance(instanceIndex)) { - DoubleVector combinedVote = this.mapVotesforInstanceReceived.get(instanceIndex); - if (combinedVote == null){ - combinedVote = new DoubleVector(); - } - ResultContentEvent outContentEvent = new ResultContentEvent(inEvent.getInstanceIndex(), - inEvent.getInstance(), inEvent.getClassId(), - combinedVote.getArrayCopy(), inEvent.isLastEvent()); - outContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex()); - outputStream.put(outContentEvent); - clearStatisticsInstance(instanceIndex); - //Boosting - computeBoosting(inEvent, instanceIndex); - return true; - } - return false; + private static final long serialVersionUID = -1606045723451191232L; + // Weigths classifier + protected double[] scms; + + // Weights instance + protected double[] swms; + + /** + * On event. + * + * @param event + * the event + * @return true, if successful + */ + @Override + public boolean process(ContentEvent event) { + + ResultContentEvent inEvent = (ResultContentEvent) event; + double[] prediction = inEvent.getClassVotes(); + int instanceIndex = (int) inEvent.getInstanceIndex(); + + addStatisticsForInstanceReceived(instanceIndex, inEvent.getClassifierIndex(), prediction, 1); + // Boosting + addPredictions(instanceIndex, inEvent, prediction); + + if (inEvent.isLastEvent() || hasAllVotesArrivedInstance(instanceIndex)) { + DoubleVector combinedVote = this.mapVotesforInstanceReceived.get(instanceIndex); + if (combinedVote == null) { + combinedVote = new DoubleVector(); + } + ResultContentEvent outContentEvent = new ResultContentEvent(inEvent.getInstanceIndex(), + inEvent.getInstance(), inEvent.getClassId(), + combinedVote.getArrayCopy(), inEvent.isLastEvent()); + outContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex()); + outputStream.put(outContentEvent); + clearStatisticsInstance(instanceIndex); + // Boosting + computeBoosting(inEvent, instanceIndex); + return true; } - - protected Random random; - - protected int trainingWeightSeenByModel; - - @Override - protected double getEnsembleMemberWeight(int i) { - double em = this.swms[i] / (this.scms[i] + this.swms[i]); - if ((em == 0.0) || (em > 0.5)) { - return 0.0; - } - double Bm = em / (1.0 - em); - return Math.log(1.0 / Bm); - } - - @Override - public void reset() { - this.random = new Random(); - this.trainingWeightSeenByModel = 0; - this.scms = new double[this.ensembleSize]; - this.swms = new double[this.ensembleSize]; + return false; + + } + + protected Random random; + + protected int trainingWeightSeenByModel; + + @Override + protected double getEnsembleMemberWeight(int i) { + double em = this.swms[i] / (this.scms[i] + this.swms[i]); + if ((em == 0.0) || (em > 0.5)) { + return 0.0; } + double Bm = em / (1.0 - em); + return Math.log(1.0 / Bm); + } + + @Override + public void reset() { + this.random = new Random(); + this.trainingWeightSeenByModel = 0; + this.scms = new double[this.ensembleSize]; + this.swms = new double[this.ensembleSize]; + } + + private boolean correctlyClassifies(int i, Instance inst, int instanceIndex) { + int predictedClass = (int) mapPredictions.get(instanceIndex).getValue(i); + return predictedClass == (int) inst.classValue(); + } + + protected Map<Integer, DoubleVector> mapPredictions; - private boolean correctlyClassifies(int i, Instance inst, int instanceIndex) { - int predictedClass = (int) mapPredictions.get(instanceIndex).getValue(i); - return predictedClass == (int) inst.classValue(); + private void addPredictions(int instanceIndex, ResultContentEvent inEvent, double[] prediction) { + if (this.mapPredictions == null) { + this.mapPredictions = new HashMap<>(); } - - protected Map<Integer, DoubleVector> mapPredictions; - - private void addPredictions(int instanceIndex, ResultContentEvent inEvent, double[] prediction) { - if (this.mapPredictions == null) { - this.mapPredictions = new HashMap<>(); - } - DoubleVector predictions = this.mapPredictions.get(instanceIndex); - if (predictions == null){ - predictions = new DoubleVector(); - } - predictions.setValue(inEvent.getClassifierIndex(), Utils.maxIndex(prediction)); - this.mapPredictions.put(instanceIndex, predictions); + DoubleVector predictions = this.mapPredictions.get(instanceIndex); + if (predictions == null) { + predictions = new DoubleVector(); } + predictions.setValue(inEvent.getClassifierIndex(), Utils.maxIndex(prediction)); + this.mapPredictions.put(instanceIndex, predictions); + } - private void computeBoosting(ResultContentEvent inEvent, int instanceIndex) { - // Starts code for Boosting - //Send instances to train - double lambda_d = 1.0; - for (int i = 0; i < this.ensembleSize; i++) { - double k = lambda_d; - Instance inst = inEvent.getInstance(); - if (k > 0.0) { - Instance weightedInst = inst.copy(); - weightedInst.setWeight(inst.weight() * k); - //this.ensemble[i].trainOnInstance(weightedInst); - InstanceContentEvent instanceContentEvent = new InstanceContentEvent( - inEvent.getInstanceIndex(), weightedInst, true, false); - instanceContentEvent.setClassifierIndex(i); - instanceContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex()); - trainingStream.put(instanceContentEvent); - } - if (this.correctlyClassifies(i, inst, instanceIndex)){ - this.scms[i] += lambda_d; - lambda_d *= this.trainingWeightSeenByModel / (2 * this.scms[i]); - } else { - this.swms[i] += lambda_d; - lambda_d *= this.trainingWeightSeenByModel / (2 * this.swms[i]); - } - } + private void computeBoosting(ResultContentEvent inEvent, int instanceIndex) { + // Starts code for Boosting + // Send instances to train + double lambda_d = 1.0; + for (int i = 0; i < this.ensembleSize; i++) { + double k = lambda_d; + Instance inst = inEvent.getInstance(); + if (k > 0.0) { + Instance weightedInst = inst.copy(); + weightedInst.setWeight(inst.weight() * k); + // this.ensemble[i].trainOnInstance(weightedInst); + InstanceContentEvent instanceContentEvent = new InstanceContentEvent( + inEvent.getInstanceIndex(), weightedInst, true, false); + instanceContentEvent.setClassifierIndex(i); + instanceContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex()); + trainingStream.put(instanceContentEvent); + } + if (this.correctlyClassifies(i, inst, instanceIndex)) { + this.scms[i] += lambda_d; + lambda_d *= this.trainingWeightSeenByModel / (2 * this.scms[i]); + } else { + this.swms[i] += lambda_d; + lambda_d *= this.trainingWeightSeenByModel / (2 * this.swms[i]); + } } - - /** - * Gets the training stream. - * - * @return the training stream - */ - public Stream getTrainingStream() { - return trainingStream; - } - - /** - * Sets the training stream. - * - * @param trainingStream the new training stream - */ - public void setTrainingStream(Stream trainingStream) { - this.trainingStream = trainingStream; - } - - /** The training stream. */ - private Stream trainingStream; - + } + + /** + * Gets the training stream. + * + * @return the training stream + */ + public Stream getTrainingStream() { + return trainingStream; + } + + /** + * Sets the training stream. + * + * @param trainingStream + * the new training stream + */ + public void setTrainingStream(Stream trainingStream) { + this.trainingStream = trainingStream; + } + + /** The training stream. */ + private Stream trainingStream; + } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/PredictionCombinerProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/PredictionCombinerProcessor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/PredictionCombinerProcessor.java index e4228d8..fff801f 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/PredictionCombinerProcessor.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/PredictionCombinerProcessor.java @@ -37,148 +37,151 @@ import com.yahoo.labs.samoa.topology.Stream; */ public class PredictionCombinerProcessor implements Processor { - private static final long serialVersionUID = -1606045723451191132L; - - /** - * The size ensemble. - */ - protected int ensembleSize; - - /** - * The output stream. - */ - protected Stream outputStream; - - /** - * Sets the output stream. - * - * @param stream the new output stream - */ - public void setOutputStream(Stream stream) { - outputStream = stream; + private static final long serialVersionUID = -1606045723451191132L; + + /** + * The size ensemble. + */ + protected int ensembleSize; + + /** + * The output stream. + */ + protected Stream outputStream; + + /** + * Sets the output stream. + * + * @param stream + * the new output stream + */ + public void setOutputStream(Stream stream) { + outputStream = stream; + } + + /** + * Gets the output stream. + * + * @return the output stream + */ + public Stream getOutputStream() { + return outputStream; + } + + /** + * Gets the size ensemble. + * + * @return the ensembleSize + */ + public int getSizeEnsemble() { + return ensembleSize; + } + + /** + * Sets the size ensemble. + * + * @param ensembleSize + * the new size ensemble + */ + public void setSizeEnsemble(int ensembleSize) { + this.ensembleSize = ensembleSize; + } + + protected Map<Integer, Integer> mapCountsforInstanceReceived; + + protected Map<Integer, DoubleVector> mapVotesforInstanceReceived; + + /** + * On event. + * + * @param event + * the event + * @return true, if successful + */ + public boolean process(ContentEvent event) { + + ResultContentEvent inEvent = (ResultContentEvent) event; + double[] prediction = inEvent.getClassVotes(); + int instanceIndex = (int) inEvent.getInstanceIndex(); + + addStatisticsForInstanceReceived(instanceIndex, inEvent.getClassifierIndex(), prediction, 1); + + if (inEvent.isLastEvent() || hasAllVotesArrivedInstance(instanceIndex)) { + DoubleVector combinedVote = this.mapVotesforInstanceReceived.get(instanceIndex); + if (combinedVote == null) { + combinedVote = new DoubleVector(new double[inEvent.getInstance().numClasses()]); + } + ResultContentEvent outContentEvent = new ResultContentEvent(inEvent.getInstanceIndex(), + inEvent.getInstance(), inEvent.getClassId(), + combinedVote.getArrayCopy(), inEvent.isLastEvent()); + outContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex()); + outputStream.put(outContentEvent); + clearStatisticsInstance(instanceIndex); + return true; } - - /** - * Gets the output stream. - * - * @return the output stream - */ - public Stream getOutputStream() { - return outputStream; - } - - /** - * Gets the size ensemble. - * - * @return the ensembleSize - */ - public int getSizeEnsemble() { - return ensembleSize; - } - - /** - * Sets the size ensemble. - * - * @param ensembleSize the new size ensemble - */ - public void setSizeEnsemble(int ensembleSize) { - this.ensembleSize = ensembleSize; - } - - protected Map<Integer, Integer> mapCountsforInstanceReceived; - - protected Map<Integer, DoubleVector> mapVotesforInstanceReceived; - - /** - * On event. - * - * @param event the event - * @return true, if successful - */ - public boolean process(ContentEvent event) { - - ResultContentEvent inEvent = (ResultContentEvent) event; - double[] prediction = inEvent.getClassVotes(); - int instanceIndex = (int) inEvent.getInstanceIndex(); - - addStatisticsForInstanceReceived(instanceIndex, inEvent.getClassifierIndex(), prediction, 1); - - if (inEvent.isLastEvent() || hasAllVotesArrivedInstance(instanceIndex)) { - DoubleVector combinedVote = this.mapVotesforInstanceReceived.get(instanceIndex); - if (combinedVote == null){ - combinedVote = new DoubleVector(new double[inEvent.getInstance().numClasses()]); - } - ResultContentEvent outContentEvent = new ResultContentEvent(inEvent.getInstanceIndex(), - inEvent.getInstance(), inEvent.getClassId(), - combinedVote.getArrayCopy(), inEvent.isLastEvent()); - outContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex()); - outputStream.put(outContentEvent); - clearStatisticsInstance(instanceIndex); - return true; - } - return false; - + return false; + + } + + @Override + public void onCreate(int id) { + this.reset(); + } + + public void reset() { + } + + /* + * (non-Javadoc) + * + * @see samoa.core.Processor#newProcessor(samoa.core.Processor) + */ + @Override + public Processor newProcessor(Processor sourceProcessor) { + PredictionCombinerProcessor newProcessor = new PredictionCombinerProcessor(); + PredictionCombinerProcessor originProcessor = (PredictionCombinerProcessor) sourceProcessor; + if (originProcessor.getOutputStream() != null) { + newProcessor.setOutputStream(originProcessor.getOutputStream()); } - - @Override - public void onCreate(int id) { - this.reset(); + newProcessor.setSizeEnsemble(originProcessor.getSizeEnsemble()); + return newProcessor; + } + + protected void addStatisticsForInstanceReceived(int instanceIndex, int classifierIndex, double[] prediction, int add) { + if (this.mapCountsforInstanceReceived == null) { + this.mapCountsforInstanceReceived = new HashMap<>(); + this.mapVotesforInstanceReceived = new HashMap<>(); } - - public void reset() { + DoubleVector vote = new DoubleVector(prediction); + if (vote.sumOfValues() > 0.0) { + vote.normalize(); + DoubleVector combinedVote = this.mapVotesforInstanceReceived.get(instanceIndex); + if (combinedVote == null) { + combinedVote = new DoubleVector(); + } + vote.scaleValues(getEnsembleMemberWeight(classifierIndex)); + combinedVote.addValues(vote); + + this.mapVotesforInstanceReceived.put(instanceIndex, combinedVote); } - - - /* (non-Javadoc) - * @see samoa.core.Processor#newProcessor(samoa.core.Processor) - */ - @Override - public Processor newProcessor(Processor sourceProcessor) { - PredictionCombinerProcessor newProcessor = new PredictionCombinerProcessor(); - PredictionCombinerProcessor originProcessor = (PredictionCombinerProcessor) sourceProcessor; - if (originProcessor.getOutputStream() != null) { - newProcessor.setOutputStream(originProcessor.getOutputStream()); - } - newProcessor.setSizeEnsemble(originProcessor.getSizeEnsemble()); - return newProcessor; + Integer count = this.mapCountsforInstanceReceived.get(instanceIndex); + if (count == null) { + count = 0; } + this.mapCountsforInstanceReceived.put(instanceIndex, count + add); + } - protected void addStatisticsForInstanceReceived(int instanceIndex, int classifierIndex, double[] prediction, int add) { - if (this.mapCountsforInstanceReceived == null) { - this.mapCountsforInstanceReceived = new HashMap<>(); - this.mapVotesforInstanceReceived = new HashMap<>(); - } - DoubleVector vote = new DoubleVector(prediction); - if (vote.sumOfValues() > 0.0) { - vote.normalize(); - DoubleVector combinedVote = this.mapVotesforInstanceReceived.get(instanceIndex); - if (combinedVote == null){ - combinedVote = new DoubleVector(); - } - vote.scaleValues(getEnsembleMemberWeight(classifierIndex)); - combinedVote.addValues(vote); - - this.mapVotesforInstanceReceived.put(instanceIndex, combinedVote); - } - Integer count = this.mapCountsforInstanceReceived.get(instanceIndex); - if (count == null) { - count = 0; - } - this.mapCountsforInstanceReceived.put(instanceIndex, count + add); - } + protected boolean hasAllVotesArrivedInstance(int instanceIndex) { + return (this.mapCountsforInstanceReceived.get(instanceIndex) == this.ensembleSize); + } - protected boolean hasAllVotesArrivedInstance(int instanceIndex) { - return (this.mapCountsforInstanceReceived.get(instanceIndex) == this.ensembleSize); - } + protected void clearStatisticsInstance(int instanceIndex) { + this.mapCountsforInstanceReceived.remove(instanceIndex); + this.mapVotesforInstanceReceived.remove(instanceIndex); + } - protected void clearStatisticsInstance(int instanceIndex) { - this.mapCountsforInstanceReceived.remove(instanceIndex); - this.mapVotesforInstanceReceived.remove(instanceIndex); - } - - protected double getEnsembleMemberWeight(int i) { - return 1.0; - } + protected double getEnsembleMemberWeight(int i) { + return 1.0; + } - }
