http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/NaiveBayes.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/NaiveBayes.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/NaiveBayes.java
index 7e9cb4a..915d09b 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/NaiveBayes.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/NaiveBayes.java
@@ -41,229 +41,229 @@ import com.yahoo.labs.samoa.moa.core.GaussianEstimator;
  */
 public class NaiveBayes implements LocalLearner {
 
-       /**
-        * Default smoothing factor. For now fixed to 1E-20.
-        */
-       private static final double ADDITIVE_SMOOTHING_FACTOR = 1e-20;
+  /**
+   * Default smoothing factor. For now fixed to 1E-20.
+   */
+  private static final double ADDITIVE_SMOOTHING_FACTOR = 1e-20;
 
-       /**
-        * serialVersionUID for serialization
-        */
-       private static final long serialVersionUID = 1325775209672996822L;
+  /**
+   * serialVersionUID for serialization
+   */
+  private static final long serialVersionUID = 1325775209672996822L;
 
-       /**
-        * Instance of a logger for use in this class.
-        */
-       private static final Logger logger = 
LoggerFactory.getLogger(NaiveBayes.class);
+  /**
+   * Instance of a logger for use in this class.
+   */
+  private static final Logger logger = 
LoggerFactory.getLogger(NaiveBayes.class);
 
-       /**
-        * The actual model.
-        */
-       protected Map<Integer, GaussianNumericAttributeClassObserver> 
attributeObservers;
+  /**
+   * The actual model.
+   */
+  protected Map<Integer, GaussianNumericAttributeClassObserver> 
attributeObservers;
 
-       /**
-        * Class statistics
-        */
-       protected Map<Integer, Double> classInstances;
+  /**
+   * Class statistics
+   */
+  protected Map<Integer, Double> classInstances;
 
-       /**
-        * Class zero-prototypes.
-        */
-       protected Map<Integer, Double> classPrototypes;
-       
-       /**
-        * Retrieve the number of classes currently known to this local model
-        * 
-        * @return the number of classes currently known to this local model
-        */
-       protected int getNumberOfClasses() {
-               return this.classInstances.size();
-       }
+  /**
+   * Class zero-prototypes.
+   */
+  protected Map<Integer, Double> classPrototypes;
 
-       /**
-        * Track training instances seen.
-        */
-       protected long instancesSeen = 0L;
+  /**
+   * Retrieve the number of classes currently known to this local model
+   * 
+   * @return the number of classes currently known to this local model
+   */
+  protected int getNumberOfClasses() {
+    return this.classInstances.size();
+  }
 
-       /**
-        * Explicit no-arg constructor.
-        */
-       public NaiveBayes() {
-               // Init the model
-               resetLearning();
-       }
+  /**
+   * Track training instances seen.
+   */
+  protected long instancesSeen = 0L;
 
-       /**
-        * Create an instance of this LocalLearner implementation.
-        */
-       @Override
-       public LocalLearner create() {
-               return new NaiveBayes();
-       }
+  /**
+   * Explicit no-arg constructor.
+   */
+  public NaiveBayes() {
+    // Init the model
+    resetLearning();
+  }
 
-       /**
-        * Predicts the class memberships for a given instance. If an instance 
is
-        * unclassified, the returned array elements will be all zero.
-        * 
-        * Smoothing is being implemented by the AttributeClassObserver 
classes. At
-        * the moment, the GaussianNumericProbabilityAttributeClassObserver 
needs no
-        * smoothing as it processes continuous variables.
-        * 
-        * Please note that we transform the scores to log space to avoid 
underflow,
-        * and we replace the multiplication with addition.
-        * 
-        * The resulting scores are no longer probabilities, as a mixture of
-        * probability densities and probabilities can be used in the 
computation.
-        * 
-        * @param inst
-        *            the instance to be classified
-        * @return an array containing the estimated membership scores of the 
test
-        *         instance in each class, in log space.
-        */
-       @Override
-       public double[] getVotesForInstance(Instance inst) {
-               // Prepare the results array
-               double[] votes = new double[getNumberOfClasses()];
-               // Over all classes
-               for (int classIndex = 0; classIndex < votes.length; 
classIndex++) {
-                       // Get the prior for this class
-                       votes[classIndex] = Math.log(getPrior(classIndex));     
        
-                       // Iterate over the instance attributes
-                       for (int index = 0; index < inst.numAttributes(); 
index++) {
-                               int attributeID = inst.index(index);
-                               // Skip class attribute
-                               if (attributeID == inst.classIndex())
-                                       continue;
-                               Double value = inst.value(attributeID);
-                               // Get the observer for the given attribute
-                               GaussianNumericAttributeClassObserver obs = 
attributeObservers.get(attributeID);
-                               // Init the estimator to null by default
-                               GaussianEstimator estimator = null;
-                               if (obs != null && obs.getEstimator(classIndex) 
!= null) {
-                                       // Get the estimator
-                                       estimator = 
obs.getEstimator(classIndex);
-                               }
-                               double valueNonZero;
-                               // The null case should be handled by smoothing!
-                               if (estimator != null) {
-                                       // Get the score for a NON-ZERO 
attribute value
-                                       valueNonZero = 
estimator.probabilityDensity(value);
-                               }
-                               // We don't have an estimator
-                               else {
-                                       // Assign a very small probability that 
we do see this value
-                                       valueNonZero = 
ADDITIVE_SMOOTHING_FACTOR;
-                               }
-                               votes[classIndex] += Math.log(valueNonZero); // 
 - Math.log(valueZero);
-                       }
-                       // Check for null in the case of prequential evaluation
-                       if (this.classPrototypes.get(classIndex) != null) {
-                               // Add the prototype for the class, already in 
log space
-                               votes[classIndex] += 
Math.log(this.classPrototypes.get(classIndex));
-                       }
-               }
-               return votes;
-       }
-               
-       /**
-        * Compute the prior for the given classIndex.
-        * 
-        * Implemented by maximum likelihood at the moment.
-        * 
-        * @param classIndex
-        *            Id of the class for which we want to compute the prior.
-        * @return Prior probability for the requested class
-        */
-       private double getPrior(int classIndex) {
-               // Maximum likelihood
-               Double currentCount = this.classInstances.get(classIndex);
-               if (currentCount == null || currentCount == 0)
-                       return 0;
-               else
-                       return currentCount * 1. / this.instancesSeen;
-       }
+  /**
+   * Create an instance of this LocalLearner implementation.
+   */
+  @Override
+  public LocalLearner create() {
+    return new NaiveBayes();
+  }
 
-       /**
-        * Resets this classifier. It must be similar to starting a new 
classifier
-        * from scratch.
-        */
-       @Override
-       public void resetLearning() {
-               // Reset priors
-               this.instancesSeen = 0L;
-               this.classInstances = new HashMap<>();
-               this.classPrototypes = new HashMap<>();
-               // Init the attribute observers
-               this.attributeObservers = new HashMap<>();
-       }
+  /**
+   * Predicts the class memberships for a given instance. If an instance is
+   * unclassified, the returned array elements will be all zero.
+   * 
+   * Smoothing is being implemented by the AttributeClassObserver classes. At
+   * the moment, the GaussianNumericProbabilityAttributeClassObserver needs no
+   * smoothing as it processes continuous variables.
+   * 
+   * Please note that we transform the scores to log space to avoid underflow,
+   * and we replace the multiplication with addition.
+   * 
+   * The resulting scores are no longer probabilities, as a mixture of
+   * probability densities and probabilities can be used in the computation.
+   * 
+   * @param inst
+   *          the instance to be classified
+   * @return an array containing the estimated membership scores of the test
+   *         instance in each class, in log space.
+   */
+  @Override
+  public double[] getVotesForInstance(Instance inst) {
+    // Prepare the results array
+    double[] votes = new double[getNumberOfClasses()];
+    // Over all classes
+    for (int classIndex = 0; classIndex < votes.length; classIndex++) {
+      // Get the prior for this class
+      votes[classIndex] = Math.log(getPrior(classIndex));
+      // Iterate over the instance attributes
+      for (int index = 0; index < inst.numAttributes(); index++) {
+        int attributeID = inst.index(index);
+        // Skip class attribute
+        if (attributeID == inst.classIndex())
+          continue;
+        Double value = inst.value(attributeID);
+        // Get the observer for the given attribute
+        GaussianNumericAttributeClassObserver obs = 
attributeObservers.get(attributeID);
+        // Init the estimator to null by default
+        GaussianEstimator estimator = null;
+        if (obs != null && obs.getEstimator(classIndex) != null) {
+          // Get the estimator
+          estimator = obs.getEstimator(classIndex);
+        }
+        double valueNonZero;
+        // The null case should be handled by smoothing!
+        if (estimator != null) {
+          // Get the score for a NON-ZERO attribute value
+          valueNonZero = estimator.probabilityDensity(value);
+        }
+        // We don't have an estimator
+        else {
+          // Assign a very small probability that we do see this value
+          valueNonZero = ADDITIVE_SMOOTHING_FACTOR;
+        }
+        votes[classIndex] += Math.log(valueNonZero); // - Math.log(valueZero);
+      }
+      // Check for null in the case of prequential evaluation
+      if (this.classPrototypes.get(classIndex) != null) {
+        // Add the prototype for the class, already in log space
+        votes[classIndex] += Math.log(this.classPrototypes.get(classIndex));
+      }
+    }
+    return votes;
+  }
 
-       /**
-        * Trains this classifier incrementally using the given instance.
-        * 
-        * @param inst
-        *            the instance to be used for training
-        */
-       @Override
-       public void trainOnInstance(Instance inst) {
-               // Update class statistics with weights
-               int classIndex = (int) inst.classValue();
-               Double weight = this.classInstances.get(classIndex);
-               if (weight == null)
-                       weight = 0.;
-               this.classInstances.put(classIndex, weight + inst.weight());
-               
-               // Get the class prototype
-               Double classPrototype = this.classPrototypes.get(classIndex);
-               if (classPrototype == null)
-                       classPrototype = 1.;
-               
-               // Iterate over the attributes of the given instance
-               for (int attributePosition = 0; attributePosition < inst
-                               .numAttributes(); attributePosition++) {
-                       // Get the attribute index - Dense -> 1:1, Sparse is 
remapped
-                       int attributeID = inst.index(attributePosition);
-                       // Skip class attribute
-                       if (attributeID == inst.classIndex())
-                               continue;
-                       // Get the attribute observer for the current attribute
-                       GaussianNumericAttributeClassObserver obs = 
this.attributeObservers
-                                       .get(attributeID);
-                       // Lazy init of observers, if null, instantiate a new 
one
-                       if (obs == null) {
-                               // FIXME: At this point, we model everything as 
a numeric
-                               // attribute
-                               obs = new 
GaussianNumericAttributeClassObserver();
-                               this.attributeObservers.put(attributeID, obs);
-                       }
-                       
-                       // Get the probability density function under the 
current model
-                       GaussianEstimator obs_estimator = 
obs.getEstimator(classIndex);
-                       if (obs_estimator != null) {
-                               // Fetch the probability that the feature value 
is zero
-                               double probDens_zero_current = 
obs_estimator.probabilityDensity(0);
-                               classPrototype -= probDens_zero_current;
-                       }
-                       
-                       // FIXME: Sanity check on data values, for now just 
learn
-                       // Learn attribute value for given class
-                       
obs.observeAttributeClass(inst.valueSparse(attributePosition),
-                                       (int) inst.classValue(), inst.weight());
-                       
-                       // Update obs_estimator to fetch the pdf from the 
updated model
-                       obs_estimator = obs.getEstimator(classIndex);
-                       // Fetch the probability that the feature value is zero
-                       double probDens_zero_updated = 
obs_estimator.probabilityDensity(0);
-                       // Update the class prototype
-                       classPrototype += probDens_zero_updated;
-               }
-               // Store the class prototype
-               this.classPrototypes.put(classIndex, classPrototype);
-               // Count another training instance
-               this.instancesSeen++;
-       }
+  /**
+   * Compute the prior for the given classIndex.
+   * 
+   * Implemented by maximum likelihood at the moment.
+   * 
+   * @param classIndex
+   *          Id of the class for which we want to compute the prior.
+   * @return Prior probability for the requested class
+   */
+  private double getPrior(int classIndex) {
+    // Maximum likelihood
+    Double currentCount = this.classInstances.get(classIndex);
+    if (currentCount == null || currentCount == 0)
+      return 0;
+    else
+      return currentCount * 1. / this.instancesSeen;
+  }
 
-       @Override
-       public void setDataset(Instances dataset) {
-               // Do nothing
-       }
+  /**
+   * Resets this classifier. It must be similar to starting a new classifier
+   * from scratch.
+   */
+  @Override
+  public void resetLearning() {
+    // Reset priors
+    this.instancesSeen = 0L;
+    this.classInstances = new HashMap<>();
+    this.classPrototypes = new HashMap<>();
+    // Init the attribute observers
+    this.attributeObservers = new HashMap<>();
+  }
+
+  /**
+   * Trains this classifier incrementally using the given instance.
+   * 
+   * @param inst
+   *          the instance to be used for training
+   */
+  @Override
+  public void trainOnInstance(Instance inst) {
+    // Update class statistics with weights
+    int classIndex = (int) inst.classValue();
+    Double weight = this.classInstances.get(classIndex);
+    if (weight == null)
+      weight = 0.;
+    this.classInstances.put(classIndex, weight + inst.weight());
+
+    // Get the class prototype
+    Double classPrototype = this.classPrototypes.get(classIndex);
+    if (classPrototype == null)
+      classPrototype = 1.;
+
+    // Iterate over the attributes of the given instance
+    for (int attributePosition = 0; attributePosition < inst
+        .numAttributes(); attributePosition++) {
+      // Get the attribute index - Dense -> 1:1, Sparse is remapped
+      int attributeID = inst.index(attributePosition);
+      // Skip class attribute
+      if (attributeID == inst.classIndex())
+        continue;
+      // Get the attribute observer for the current attribute
+      GaussianNumericAttributeClassObserver obs = this.attributeObservers
+          .get(attributeID);
+      // Lazy init of observers, if null, instantiate a new one
+      if (obs == null) {
+        // FIXME: At this point, we model everything as a numeric
+        // attribute
+        obs = new GaussianNumericAttributeClassObserver();
+        this.attributeObservers.put(attributeID, obs);
+      }
+
+      // Get the probability density function under the current model
+      GaussianEstimator obs_estimator = obs.getEstimator(classIndex);
+      if (obs_estimator != null) {
+        // Fetch the probability that the feature value is zero
+        double probDens_zero_current = obs_estimator.probabilityDensity(0);
+        classPrototype -= probDens_zero_current;
+      }
+
+      // FIXME: Sanity check on data values, for now just learn
+      // Learn attribute value for given class
+      obs.observeAttributeClass(inst.valueSparse(attributePosition),
+          (int) inst.classValue(), inst.weight());
+
+      // Update obs_estimator to fetch the pdf from the updated model
+      obs_estimator = obs.getEstimator(classIndex);
+      // Fetch the probability that the feature value is zero
+      double probDens_zero_updated = obs_estimator.probabilityDensity(0);
+      // Update the class prototype
+      classPrototype += probDens_zero_updated;
+    }
+    // Store the class prototype
+    this.classPrototypes.put(classIndex, classPrototype);
+    // Count another training instance
+    this.instancesSeen++;
+  }
+
+  @Override
+  public void setDataset(Instances dataset) {
+    // Do nothing
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SimpleClassifierAdapter.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SimpleClassifierAdapter.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SimpleClassifierAdapter.java
index a3fb89f..75c5284 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SimpleClassifierAdapter.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SimpleClassifierAdapter.java
@@ -30,121 +30,125 @@ import com.yahoo.labs.samoa.instances.InstancesHeader;
 import com.yahoo.labs.samoa.moa.classifiers.functions.MajorityClass;
 
 /**
- *
+ * 
  * Base class for adapting external classifiers.
- *
+ * 
  */
 public class SimpleClassifierAdapter implements LocalLearner, Configurable {
 
-    /**
+  /**
      *
      */
-    private static final long serialVersionUID = 4372366401338704353L;
-    
-    public ClassOption learnerOption = new ClassOption("learner", 'l',
-            "Classifier to train.", 
com.yahoo.labs.samoa.moa.classifiers.Classifier.class, 
MajorityClass.class.getName());
-    /**
-     * The learner.
-     */
-    protected com.yahoo.labs.samoa.moa.classifiers.Classifier learner;
-    
-    /**
-     * The is init.
-     */
-    protected Boolean isInit;
-    
-    /**
-     * The dataset.
-     */
-    protected Instances dataset;
+  private static final long serialVersionUID = 4372366401338704353L;
 
-    @Override
-    public void setDataset(Instances dataset) {
-        this.dataset = dataset;
-    }
+  public ClassOption learnerOption = new ClassOption("learner", 'l',
+      "Classifier to train.", 
com.yahoo.labs.samoa.moa.classifiers.Classifier.class, 
MajorityClass.class.getName());
+  /**
+   * The learner.
+   */
+  protected com.yahoo.labs.samoa.moa.classifiers.Classifier learner;
 
-    /**
-     * Instantiates a new learner.
-     *
-     * @param learner the learner
-     * @param dataset the dataset
-     */
-    public 
SimpleClassifierAdapter(com.yahoo.labs.samoa.moa.classifiers.Classifier 
learner, Instances dataset) {
-        this.learner = learner.copy();
-        this.isInit = false;
-        this.dataset = dataset;
-    }
+  /**
+   * The is init.
+   */
+  protected Boolean isInit;
 
-    /**
-     * Instantiates a new learner.
-     *
-     */
-    public SimpleClassifierAdapter() {
-        this.learner = ((com.yahoo.labs.samoa.moa.classifiers.Classifier) 
this.learnerOption.getValue()).copy();
-        this.isInit = false;
-    }
+  /**
+   * The dataset.
+   */
+  protected Instances dataset;
 
-    /**
-     * Creates a new learner object.
-     *
-     * @return the learner
-     */
-    @Override
-    public SimpleClassifierAdapter create() {
-        SimpleClassifierAdapter l = new SimpleClassifierAdapter(learner, 
dataset);
-        if (dataset == null) {
-            System.out.println("dataset null while creating");
-        }
-        return l;
-    }
+  @Override
+  public void setDataset(Instances dataset) {
+    this.dataset = dataset;
+  }
 
-    /**
-     * Trains this classifier incrementally using the given instance.
-     *
-     * @param inst the instance to be used for training
-     */
-    @Override
-    public void trainOnInstance(Instance inst) {
-        if (!this.isInit) {
-            this.isInit = true;
-            InstancesHeader instances = new InstancesHeader(dataset);
-            this.learner.setModelContext(instances);
-            this.learner.prepareForUse();
-        }
-        if (inst.weight() > 0) {
-            inst.setDataset(dataset);
-            learner.trainOnInstance(inst);
-        }
+  /**
+   * Instantiates a new learner.
+   * 
+   * @param learner
+   *          the learner
+   * @param dataset
+   *          the dataset
+   */
+  public 
SimpleClassifierAdapter(com.yahoo.labs.samoa.moa.classifiers.Classifier 
learner, Instances dataset) {
+    this.learner = learner.copy();
+    this.isInit = false;
+    this.dataset = dataset;
+  }
+
+  /**
+   * Instantiates a new learner.
+   * 
+   */
+  public SimpleClassifierAdapter() {
+    this.learner = ((com.yahoo.labs.samoa.moa.classifiers.Classifier) 
this.learnerOption.getValue()).copy();
+    this.isInit = false;
+  }
+
+  /**
+   * Creates a new learner object.
+   * 
+   * @return the learner
+   */
+  @Override
+  public SimpleClassifierAdapter create() {
+    SimpleClassifierAdapter l = new SimpleClassifierAdapter(learner, dataset);
+    if (dataset == null) {
+      System.out.println("dataset null while creating");
     }
+    return l;
+  }
 
-    /**
-     * Predicts the class memberships for a given instance. If an instance is
-     * unclassified, the returned array elements must be all zero.
-     *
-     * @param inst the instance to be classified
-     * @return an array containing the estimated membership probabilities of 
the
-     * test instance in each class
-     */
-    @Override
-    public double[] getVotesForInstance(Instance inst) {
-        double[] ret;
-        inst.setDataset(dataset);
-        if (!this.isInit) {
-           ret = new double[dataset.numClasses()];
-        } else {
-            ret = learner.getVotesForInstance(inst);
-        }
-        return ret;
+  /**
+   * Trains this classifier incrementally using the given instance.
+   * 
+   * @param inst
+   *          the instance to be used for training
+   */
+  @Override
+  public void trainOnInstance(Instance inst) {
+    if (!this.isInit) {
+      this.isInit = true;
+      InstancesHeader instances = new InstancesHeader(dataset);
+      this.learner.setModelContext(instances);
+      this.learner.prepareForUse();
+    }
+    if (inst.weight() > 0) {
+      inst.setDataset(dataset);
+      learner.trainOnInstance(inst);
     }
+  }
 
-    /**
-     * Resets this classifier. It must be similar to starting a new classifier
-     * from scratch.
-     *
-     */
-    @Override
-    public void resetLearning() {
-        learner.resetLearning();
+  /**
+   * Predicts the class memberships for a given instance. If an instance is
+   * unclassified, the returned array elements must be all zero.
+   * 
+   * @param inst
+   *          the instance to be classified
+   * @return an array containing the estimated membership probabilities of the
+   *         test instance in each class
+   */
+  @Override
+  public double[] getVotesForInstance(Instance inst) {
+    double[] ret;
+    inst.setDataset(dataset);
+    if (!this.isInit) {
+      ret = new double[dataset.numClasses()];
+    } else {
+      ret = learner.getVotesForInstance(inst);
     }
+    return ret;
+  }
+
+  /**
+   * Resets this classifier. It must be similar to starting a new classifier
+   * from scratch.
+   * 
+   */
+  @Override
+  public void resetLearning() {
+    learner.resetLearning();
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SingleClassifier.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SingleClassifier.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SingleClassifier.java
index affc935..46352e0 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SingleClassifier.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/SingleClassifier.java
@@ -36,6 +36,7 @@ import com.yahoo.labs.samoa.learners.Learner;
 import com.yahoo.labs.samoa.moa.classifiers.core.driftdetection.ChangeDetector;
 import com.yahoo.labs.samoa.topology.Stream;
 import com.yahoo.labs.samoa.topology.TopologyBuilder;
+
 /**
  * 
  * Classifier that contain a single classifier.
@@ -43,67 +44,67 @@ import com.yahoo.labs.samoa.topology.TopologyBuilder;
  */
 public final class SingleClassifier implements Learner, AdaptiveLearner, 
Configurable {
 
-       private static final long serialVersionUID = 684111382631697031L;
-       
-       private LocalLearnerProcessor learnerP;
-               
-       private Stream resultStream;
-
-       private Instances dataset;
-
-       public ClassOption learnerOption = new ClassOption("learner", 'l',
-                       "Classifier to train.", LocalLearner.class, 
SimpleClassifierAdapter.class.getName());
-       
-       private TopologyBuilder builder;
-
-       private int parallelism;
-
-
-       @Override
-       public void init(TopologyBuilder builder, Instances dataset, int 
parallelism){
-               this.builder = builder;
-               this.dataset = dataset;
-               this.parallelism = parallelism;
-               this.setLayout();
-       }
-
-
-       protected void setLayout() {            
-               learnerP = new LocalLearnerProcessor();
-               learnerP.setChangeDetector(this.getChangeDetector());
-               LocalLearner learner = this.learnerOption.getValue();
-               learner.setDataset(this.dataset);
-               learnerP.setLearner(learner);
-                
-               //learnerPI = this.builder.createPi(learnerP, 1);
-               this.builder.addProcessor(learnerP, parallelism);
-               resultStream = this.builder.createStream(learnerP);
-
-               learnerP.setOutputStream(resultStream);
-       }
-
-       @Override
-       public Processor getInputProcessor() {
-               return learnerP;
-       }
-
-       /* (non-Javadoc)
-        * @see samoa.learners.Learner#getResultStreams()
-        */
-       @Override
-       public Set<Stream> getResultStreams() {
-               return ImmutableSet.of(this.resultStream);
-       }
-
-       protected ChangeDetector changeDetector;    
-
-       @Override
-       public ChangeDetector getChangeDetector() {
-               return this.changeDetector;
-       }
-
-       @Override
-       public void setChangeDetector(ChangeDetector cd) {
-               this.changeDetector = cd;
-       }
+  private static final long serialVersionUID = 684111382631697031L;
+
+  private LocalLearnerProcessor learnerP;
+
+  private Stream resultStream;
+
+  private Instances dataset;
+
+  public ClassOption learnerOption = new ClassOption("learner", 'l',
+      "Classifier to train.", LocalLearner.class, 
SimpleClassifierAdapter.class.getName());
+
+  private TopologyBuilder builder;
+
+  private int parallelism;
+
+  @Override
+  public void init(TopologyBuilder builder, Instances dataset, int 
parallelism) {
+    this.builder = builder;
+    this.dataset = dataset;
+    this.parallelism = parallelism;
+    this.setLayout();
+  }
+
+  protected void setLayout() {
+    learnerP = new LocalLearnerProcessor();
+    learnerP.setChangeDetector(this.getChangeDetector());
+    LocalLearner learner = this.learnerOption.getValue();
+    learner.setDataset(this.dataset);
+    learnerP.setLearner(learner);
+
+    // learnerPI = this.builder.createPi(learnerP, 1);
+    this.builder.addProcessor(learnerP, parallelism);
+    resultStream = this.builder.createStream(learnerP);
+
+    learnerP.setOutputStream(resultStream);
+  }
+
+  @Override
+  public Processor getInputProcessor() {
+    return learnerP;
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see samoa.learners.Learner#getResultStreams()
+   */
+  @Override
+  public Set<Stream> getResultStreams() {
+    return ImmutableSet.of(this.resultStream);
+  }
+
+  protected ChangeDetector changeDetector;
+
+  @Override
+  public ChangeDetector getChangeDetector() {
+    return this.changeDetector;
+  }
+
+  @Override
+  public void setChangeDetector(ChangeDetector cd) {
+    this.changeDetector = cd;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/AdaptiveBagging.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/AdaptiveBagging.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/AdaptiveBagging.java
index aba3d1d..3bdea57 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/AdaptiveBagging.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/AdaptiveBagging.java
@@ -47,103 +47,105 @@ import com.yahoo.labs.samoa.topology.TopologyBuilder;
  * The Bagging Classifier by Oza and Russell.
  */
 public class AdaptiveBagging implements Learner, Configurable {
-    
-       /** Logger */
+
+  /** Logger */
   private static final Logger logger = 
LoggerFactory.getLogger(AdaptiveBagging.class);
 
-       /** The Constant serialVersionUID. */
-       private static final long serialVersionUID = -2971850264864952099L;
-       
-       /** The base learner option. */
-       public ClassOption baseLearnerOption = new ClassOption("baseLearner", 
'l',
-                       "Classifier to train.", Learner.class, 
VerticalHoeffdingTree.class.getName());
+  /** The Constant serialVersionUID. */
+  private static final long serialVersionUID = -2971850264864952099L;
+
+  /** The base learner option. */
+  public ClassOption baseLearnerOption = new ClassOption("baseLearner", 'l',
+      "Classifier to train.", Learner.class, 
VerticalHoeffdingTree.class.getName());
 
-       /** The ensemble size option. */
-       public IntOption ensembleSizeOption = new IntOption("ensembleSize", 's',
-                       "The number of models in the bag.", 10, 1, 
Integer.MAX_VALUE);
+  /** The ensemble size option. */
+  public IntOption ensembleSizeOption = new IntOption("ensembleSize", 's',
+      "The number of models in the bag.", 10, 1, Integer.MAX_VALUE);
 
-       public ClassOption driftDetectionMethodOption = new 
ClassOption("driftDetectionMethod", 'd',
+  public ClassOption driftDetectionMethodOption = new 
ClassOption("driftDetectionMethod", 'd',
       "Drift detection method to use.", ChangeDetector.class, 
ADWINChangeDetector.class.getName());
 
-       /** The distributor processor. */
-       private BaggingDistributorProcessor distributorP;
-
-       /** The result stream. */
-       protected Stream resultStream;
-       
-       /** The dataset. */
-       private Instances dataset;
-        
-       protected Learner classifier;
-        
+  /** The distributor processor. */
+  private BaggingDistributorProcessor distributorP;
+
+  /** The result stream. */
+  protected Stream resultStream;
+
+  /** The dataset. */
+  private Instances dataset;
+
+  protected Learner classifier;
+
   protected int parallelism;
 
-       /**
-        * Sets the layout.
-        */
-       protected void setLayout() {
-
-               int sizeEnsemble = this.ensembleSizeOption.getValue();
-
-               distributorP = new BaggingDistributorProcessor();
-               distributorP.setSizeEnsemble(sizeEnsemble);
-                this.builder.addProcessor(distributorP, 1);
-                       
-               //instantiate classifier
-               classifier = this.baseLearnerOption.getValue();
-               if (classifier instanceof AdaptiveLearner) {
-                               // logger.info("Building an AdaptiveLearner 
{}", classifier.getClass().getName());
-                               AdaptiveLearner ada = (AdaptiveLearner) 
classifier;
-                               ada.setChangeDetector((ChangeDetector) 
this.driftDetectionMethodOption.getValue());
-               }
-               classifier.init(builder, this.dataset, sizeEnsemble);
-        
-               PredictionCombinerProcessor predictionCombinerP= new 
PredictionCombinerProcessor();
-               predictionCombinerP.setSizeEnsemble(sizeEnsemble);
-               this.builder.addProcessor(predictionCombinerP, 1);
-               
-               //Streams
-               resultStream = this.builder.createStream(predictionCombinerP);
-               predictionCombinerP.setOutputStream(resultStream);
-
-               for (Stream subResultStream:classifier.getResultStreams()) {
-                       this.builder.connectInputKeyStream(subResultStream, 
predictionCombinerP);
-               }
-               
-               /* The training stream. */
-               Stream testingStream = this.builder.createStream(distributorP);
-                this.builder.connectInputKeyStream(testingStream, 
classifier.getInputProcessor());
-       
-               /* The prediction stream. */
-               Stream predictionStream = 
this.builder.createStream(distributorP);
-                this.builder.connectInputKeyStream(predictionStream, 
classifier.getInputProcessor());
-               
-               distributorP.setOutputStream(testingStream);
-               distributorP.setPredictionStream(predictionStream);
-       }
-
-       /** The builder. */
-       private TopologyBuilder builder;
-               
-       
-       @Override
-       public void init(TopologyBuilder builder, Instances dataset, int 
parallelism) {
-               this.builder = builder;
-               this.dataset = dataset;
-                this.parallelism = parallelism;
-               this.setLayout();
-       }
-
-        @Override
-       public Processor getInputProcessor() {
-               return distributorP;
-       }
-        
-       /* (non-Javadoc)
-        * @see samoa.learners.Learner#getResultStreams()
-        */
-       @Override
-       public Set<Stream> getResultStreams() {
-               return ImmutableSet.of(this.resultStream);
-       }
+  /**
+   * Sets the layout.
+   */
+  protected void setLayout() {
+
+    int sizeEnsemble = this.ensembleSizeOption.getValue();
+
+    distributorP = new BaggingDistributorProcessor();
+    distributorP.setSizeEnsemble(sizeEnsemble);
+    this.builder.addProcessor(distributorP, 1);
+
+    // instantiate classifier
+    classifier = this.baseLearnerOption.getValue();
+    if (classifier instanceof AdaptiveLearner) {
+      // logger.info("Building an AdaptiveLearner {}",
+      // classifier.getClass().getName());
+      AdaptiveLearner ada = (AdaptiveLearner) classifier;
+      ada.setChangeDetector((ChangeDetector) 
this.driftDetectionMethodOption.getValue());
+    }
+    classifier.init(builder, this.dataset, sizeEnsemble);
+
+    PredictionCombinerProcessor predictionCombinerP = new 
PredictionCombinerProcessor();
+    predictionCombinerP.setSizeEnsemble(sizeEnsemble);
+    this.builder.addProcessor(predictionCombinerP, 1);
+
+    // Streams
+    resultStream = this.builder.createStream(predictionCombinerP);
+    predictionCombinerP.setOutputStream(resultStream);
+
+    for (Stream subResultStream : classifier.getResultStreams()) {
+      this.builder.connectInputKeyStream(subResultStream, predictionCombinerP);
+    }
+
+    /* The training stream. */
+    Stream testingStream = this.builder.createStream(distributorP);
+    this.builder.connectInputKeyStream(testingStream, 
classifier.getInputProcessor());
+
+    /* The prediction stream. */
+    Stream predictionStream = this.builder.createStream(distributorP);
+    this.builder.connectInputKeyStream(predictionStream, 
classifier.getInputProcessor());
+
+    distributorP.setOutputStream(testingStream);
+    distributorP.setPredictionStream(predictionStream);
+  }
+
+  /** The builder. */
+  private TopologyBuilder builder;
+
+  @Override
+  public void init(TopologyBuilder builder, Instances dataset, int 
parallelism) {
+    this.builder = builder;
+    this.dataset = dataset;
+    this.parallelism = parallelism;
+    this.setLayout();
+  }
+
+  @Override
+  public Processor getInputProcessor() {
+    return distributorP;
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see samoa.learners.Learner#getResultStreams()
+   */
+  @Override
+  public Set<Stream> getResultStreams() {
+    return ImmutableSet.of(this.resultStream);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Bagging.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Bagging.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Bagging.java
index 9f99ff1..3a78933 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Bagging.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Bagging.java
@@ -40,99 +40,99 @@ import 
com.yahoo.labs.samoa.learners.classifiers.trees.VerticalHoeffdingTree;
 /**
  * The Bagging Classifier by Oza and Russell.
  */
-public class Bagging implements Learner , Configurable {
-
-       /** The Constant serialVersionUID. */
-       private static final long serialVersionUID = -2971850264864952099L;
-       
-       /** The base learner option. */
-       public ClassOption baseLearnerOption = new ClassOption("baseLearner", 
'l',
-                       "Classifier to train.", Learner.class, 
VerticalHoeffdingTree.class.getName());
-
-        
-       /** The ensemble size option. */
-       public IntOption ensembleSizeOption = new IntOption("ensembleSize", 's',
-                       "The number of models in the bag.", 10, 1, 
Integer.MAX_VALUE);
-
-       /** The distributor processor. */
-       private BaggingDistributorProcessor distributorP;
-       
-       /** The training stream. */
-       private Stream testingStream;
-       
-       /** The prediction stream. */
-       private Stream predictionStream;
-       
-       /** The result stream. */
-       protected Stream resultStream;
-       
-       /** The dataset. */
-       private Instances dataset;
-        
-        protected Learner classifier;
-        
-        protected int parallelism;
-
-       /**
-        * Sets the layout.
-        */
-       protected void setLayout() {
-
-               int sizeEnsemble = this.ensembleSizeOption.getValue();
-
-               distributorP = new BaggingDistributorProcessor();
-               distributorP.setSizeEnsemble(sizeEnsemble);
-                this.builder.addProcessor(distributorP, 1);
-                       
-                //instantiate classifier 
-                classifier = (Learner) this.baseLearnerOption.getValue();
-                classifier.init(builder, this.dataset, sizeEnsemble);
-        
-               PredictionCombinerProcessor predictionCombinerP= new 
PredictionCombinerProcessor();
-               predictionCombinerP.setSizeEnsemble(sizeEnsemble);
-               this.builder.addProcessor(predictionCombinerP, 1);
-               
-               //Streams
-               resultStream = this.builder.createStream(predictionCombinerP);
-               predictionCombinerP.setOutputStream(resultStream);
-
-               for (Stream subResultStream:classifier.getResultStreams()) {
-                       this.builder.connectInputKeyStream(subResultStream, 
predictionCombinerP);
-               }
-               
-               testingStream = this.builder.createStream(distributorP);
-                this.builder.connectInputKeyStream(testingStream, 
classifier.getInputProcessor());
-       
-               predictionStream = this.builder.createStream(distributorP);     
        
-                this.builder.connectInputKeyStream(predictionStream, 
classifier.getInputProcessor());
-               
-               distributorP.setOutputStream(testingStream);
-               distributorP.setPredictionStream(predictionStream);
-       }
-
-       /** The builder. */
-       private TopologyBuilder builder;
-               
-       
-       @Override
-       public void init(TopologyBuilder builder, Instances dataset, int 
parallelism) {
-               this.builder = builder;
-               this.dataset = dataset;
-                this.parallelism = parallelism;
-               this.setLayout();
-       }
-
-        @Override
-       public Processor getInputProcessor() {
-               return distributorP;
-       }
-        
-    /* (non-Javadoc)
-     * @see samoa.learners.Learner#getResultStreams()
-     */
-    @Override
-    public Set<Stream> getResultStreams() {
-       Set<Stream> streams = ImmutableSet.of(this.resultStream);
-               return streams;
+public class Bagging implements Learner, Configurable {
+
+  /** The Constant serialVersionUID. */
+  private static final long serialVersionUID = -2971850264864952099L;
+
+  /** The base learner option. */
+  public ClassOption baseLearnerOption = new ClassOption("baseLearner", 'l',
+      "Classifier to train.", Learner.class, 
VerticalHoeffdingTree.class.getName());
+
+  /** The ensemble size option. */
+  public IntOption ensembleSizeOption = new IntOption("ensembleSize", 's',
+      "The number of models in the bag.", 10, 1, Integer.MAX_VALUE);
+
+  /** The distributor processor. */
+  private BaggingDistributorProcessor distributorP;
+
+  /** The training stream. */
+  private Stream testingStream;
+
+  /** The prediction stream. */
+  private Stream predictionStream;
+
+  /** The result stream. */
+  protected Stream resultStream;
+
+  /** The dataset. */
+  private Instances dataset;
+
+  protected Learner classifier;
+
+  protected int parallelism;
+
+  /**
+   * Sets the layout.
+   */
+  protected void setLayout() {
+
+    int sizeEnsemble = this.ensembleSizeOption.getValue();
+
+    distributorP = new BaggingDistributorProcessor();
+    distributorP.setSizeEnsemble(sizeEnsemble);
+    this.builder.addProcessor(distributorP, 1);
+
+    // instantiate classifier
+    classifier = (Learner) this.baseLearnerOption.getValue();
+    classifier.init(builder, this.dataset, sizeEnsemble);
+
+    PredictionCombinerProcessor predictionCombinerP = new 
PredictionCombinerProcessor();
+    predictionCombinerP.setSizeEnsemble(sizeEnsemble);
+    this.builder.addProcessor(predictionCombinerP, 1);
+
+    // Streams
+    resultStream = this.builder.createStream(predictionCombinerP);
+    predictionCombinerP.setOutputStream(resultStream);
+
+    for (Stream subResultStream : classifier.getResultStreams()) {
+      this.builder.connectInputKeyStream(subResultStream, predictionCombinerP);
     }
+
+    testingStream = this.builder.createStream(distributorP);
+    this.builder.connectInputKeyStream(testingStream, 
classifier.getInputProcessor());
+
+    predictionStream = this.builder.createStream(distributorP);
+    this.builder.connectInputKeyStream(predictionStream, 
classifier.getInputProcessor());
+
+    distributorP.setOutputStream(testingStream);
+    distributorP.setPredictionStream(predictionStream);
+  }
+
+  /** The builder. */
+  private TopologyBuilder builder;
+
+  @Override
+  public void init(TopologyBuilder builder, Instances dataset, int 
parallelism) {
+    this.builder = builder;
+    this.dataset = dataset;
+    this.parallelism = parallelism;
+    this.setLayout();
+  }
+
+  @Override
+  public Processor getInputProcessor() {
+    return distributorP;
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see samoa.learners.Learner#getResultStreams()
+   */
+  @Override
+  public Set<Stream> getResultStreams() {
+    Set<Stream> streams = ImmutableSet.of(this.resultStream);
+    return streams;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BaggingDistributorProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BaggingDistributorProcessor.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BaggingDistributorProcessor.java
index 65c782b..44264ac 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BaggingDistributorProcessor.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BaggingDistributorProcessor.java
@@ -35,167 +35,174 @@ import java.util.Random;
 /**
  * The Class BaggingDistributorPE.
  */
-public class BaggingDistributorProcessor implements Processor{
+public class BaggingDistributorProcessor implements Processor {
 
-       /**
+  /**
         * 
         */
-       private static final long serialVersionUID = -1550901409625192730L;
-
-       /** The size ensemble. */
-       private int sizeEnsemble;
-       
-       /** The training stream. */
-       private Stream trainingStream;
-       
-       /** The prediction stream. */
-       private Stream predictionStream;
-
-       /**
-        * On event.
-        *
-        * @param event the event
-        * @return true, if successful
-        */
-       public boolean process(ContentEvent event) {
-               InstanceContentEvent inEvent = (InstanceContentEvent) event; 
//((s4Event) event).getContentEvent();
-               //InstanceEvent inEvent = (InstanceEvent) event;
-
-               if (inEvent.getInstanceIndex() < 0) {
-                       // End learning
-                       predictionStream.put(event);
-                       return false;
-               }
-
-
-                if (inEvent.isTesting()){ 
-                       Instance trainInst = inEvent.getInstance();
-                       for (int i = 0; i < sizeEnsemble; i++) {
-                               Instance weightedInst = trainInst.copy();
-                               //weightedInst.setWeight(trainInst.weight() * 
k);
-                               InstanceContentEvent instanceContentEvent = new 
InstanceContentEvent(
-                                               inEvent.getInstanceIndex(), 
weightedInst, false, true);
-                               instanceContentEvent.setClassifierIndex(i);
-                               
instanceContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex());  
-                               predictionStream.put(instanceContentEvent);
-                       }
-               }
-                
-                               /* Estimate model parameters using the training 
data. */
-               if (inEvent.isTraining()) {
-                       train(inEvent);
-               } 
-               return false;
-       }
-
-       /** The random. */
-       protected Random random = new Random();
-
-       /**
-        * Train.
-        *
-        * @param inEvent the in event
-        */
-       protected void train(InstanceContentEvent inEvent) {
-               Instance trainInst = inEvent.getInstance();
-               for (int i = 0; i < sizeEnsemble; i++) {
-                       int k = MiscUtils.poisson(1.0, this.random);
-                       if (k > 0) {
-                               Instance weightedInst = trainInst.copy();
-                               weightedInst.setWeight(trainInst.weight() * k);
-                               InstanceContentEvent instanceContentEvent = new 
InstanceContentEvent(
-                                               inEvent.getInstanceIndex(), 
weightedInst, true, false);
-                               instanceContentEvent.setClassifierIndex(i);
-                               
instanceContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex());  
-                               trainingStream.put(instanceContentEvent);
-                       }
-               }
-       }
-
-       /*
-        * (non-Javadoc)
-        * 
-        * @see org.apache.s4.core.ProcessingElement#onCreate()
-        */
-       @Override
-       public void onCreate(int id) {
-               //do nothing
-       }
-
-
-       /**
-        * Gets the training stream.
-        *
-        * @return the training stream
-        */
-       public Stream getTrainingStream() {
-               return trainingStream;
-       }
-
-       /**
-        * Sets the training stream.
-        *
-        * @param trainingStream the new training stream
-        */
-       public void setOutputStream(Stream trainingStream) {
-               this.trainingStream = trainingStream;
-       }
-
-       /**
-        * Gets the prediction stream.
-        *
-        * @return the prediction stream
-        */
-       public Stream getPredictionStream() {
-               return predictionStream;
-       }
-
-       /**
-        * Sets the prediction stream.
-        *
-        * @param predictionStream the new prediction stream
-        */
-       public void setPredictionStream(Stream predictionStream) {
-               this.predictionStream = predictionStream;
-       }
-
-       /**
-        * Gets the size ensemble.
-        *
-        * @return the size ensemble
-        */
-       public int getSizeEnsemble() {
-               return sizeEnsemble;
-       }
-
-       /**
-        * Sets the size ensemble.
-        *
-        * @param sizeEnsemble the new size ensemble
-        */
-       public void setSizeEnsemble(int sizeEnsemble) {
-               this.sizeEnsemble = sizeEnsemble;
-       }
-       
-       
-       /* (non-Javadoc)
-        * @see samoa.core.Processor#newProcessor(samoa.core.Processor)
-        */
-       @Override
-       public Processor newProcessor(Processor sourceProcessor) {
-               BaggingDistributorProcessor newProcessor = new 
BaggingDistributorProcessor();
-               BaggingDistributorProcessor originProcessor = 
(BaggingDistributorProcessor) sourceProcessor;
-               if (originProcessor.getPredictionStream() != null){
-                       
newProcessor.setPredictionStream(originProcessor.getPredictionStream());
-               }
-               if (originProcessor.getTrainingStream() != null){
-                       
newProcessor.setOutputStream(originProcessor.getTrainingStream());
-               }
-               newProcessor.setSizeEnsemble(originProcessor.getSizeEnsemble());
-               /*if (originProcessor.getLearningCurve() != null){
-                       newProcessor.setLearningCurve((LearningCurve) 
originProcessor.getLearningCurve().copy());
-               }*/
-               return newProcessor;
-       }
+  private static final long serialVersionUID = -1550901409625192730L;
+
+  /** The size ensemble. */
+  private int sizeEnsemble;
+
+  /** The training stream. */
+  private Stream trainingStream;
+
+  /** The prediction stream. */
+  private Stream predictionStream;
+
+  /**
+   * On event.
+   * 
+   * @param event
+   *          the event
+   * @return true, if successful
+   */
+  public boolean process(ContentEvent event) {
+    InstanceContentEvent inEvent = (InstanceContentEvent) event; // ((s4Event)
+                                                                 // 
event).getContentEvent();
+    // InstanceEvent inEvent = (InstanceEvent) event;
+
+    if (inEvent.getInstanceIndex() < 0) {
+      // End learning
+      predictionStream.put(event);
+      return false;
+    }
+
+    if (inEvent.isTesting()) {
+      Instance trainInst = inEvent.getInstance();
+      for (int i = 0; i < sizeEnsemble; i++) {
+        Instance weightedInst = trainInst.copy();
+        // weightedInst.setWeight(trainInst.weight() * k);
+        InstanceContentEvent instanceContentEvent = new InstanceContentEvent(
+            inEvent.getInstanceIndex(), weightedInst, false, true);
+        instanceContentEvent.setClassifierIndex(i);
+        instanceContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex());
+        predictionStream.put(instanceContentEvent);
+      }
+    }
+
+    /* Estimate model parameters using the training data. */
+    if (inEvent.isTraining()) {
+      train(inEvent);
+    }
+    return false;
+  }
+
+  /** The random. */
+  protected Random random = new Random();
+
+  /**
+   * Train.
+   * 
+   * @param inEvent
+   *          the in event
+   */
+  protected void train(InstanceContentEvent inEvent) {
+    Instance trainInst = inEvent.getInstance();
+    for (int i = 0; i < sizeEnsemble; i++) {
+      int k = MiscUtils.poisson(1.0, this.random);
+      if (k > 0) {
+        Instance weightedInst = trainInst.copy();
+        weightedInst.setWeight(trainInst.weight() * k);
+        InstanceContentEvent instanceContentEvent = new InstanceContentEvent(
+            inEvent.getInstanceIndex(), weightedInst, true, false);
+        instanceContentEvent.setClassifierIndex(i);
+        instanceContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex());
+        trainingStream.put(instanceContentEvent);
+      }
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see org.apache.s4.core.ProcessingElement#onCreate()
+   */
+  @Override
+  public void onCreate(int id) {
+    // do nothing
+  }
+
+  /**
+   * Gets the training stream.
+   * 
+   * @return the training stream
+   */
+  public Stream getTrainingStream() {
+    return trainingStream;
+  }
+
+  /**
+   * Sets the training stream.
+   * 
+   * @param trainingStream
+   *          the new training stream
+   */
+  public void setOutputStream(Stream trainingStream) {
+    this.trainingStream = trainingStream;
+  }
+
+  /**
+   * Gets the prediction stream.
+   * 
+   * @return the prediction stream
+   */
+  public Stream getPredictionStream() {
+    return predictionStream;
+  }
+
+  /**
+   * Sets the prediction stream.
+   * 
+   * @param predictionStream
+   *          the new prediction stream
+   */
+  public void setPredictionStream(Stream predictionStream) {
+    this.predictionStream = predictionStream;
+  }
+
+  /**
+   * Gets the size ensemble.
+   * 
+   * @return the size ensemble
+   */
+  public int getSizeEnsemble() {
+    return sizeEnsemble;
+  }
+
+  /**
+   * Sets the size ensemble.
+   * 
+   * @param sizeEnsemble
+   *          the new size ensemble
+   */
+  public void setSizeEnsemble(int sizeEnsemble) {
+    this.sizeEnsemble = sizeEnsemble;
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see samoa.core.Processor#newProcessor(samoa.core.Processor)
+   */
+  @Override
+  public Processor newProcessor(Processor sourceProcessor) {
+    BaggingDistributorProcessor newProcessor = new 
BaggingDistributorProcessor();
+    BaggingDistributorProcessor originProcessor = 
(BaggingDistributorProcessor) sourceProcessor;
+    if (originProcessor.getPredictionStream() != null) {
+      newProcessor.setPredictionStream(originProcessor.getPredictionStream());
+    }
+    if (originProcessor.getTrainingStream() != null) {
+      newProcessor.setOutputStream(originProcessor.getTrainingStream());
+    }
+    newProcessor.setSizeEnsemble(originProcessor.getSizeEnsemble());
+    /*
+     * if (originProcessor.getLearningCurve() != null){
+     * newProcessor.setLearningCurve((LearningCurve)
+     * originProcessor.getLearningCurve().copy()); }
+     */
+    return newProcessor;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Boosting.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Boosting.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Boosting.java
index 06723e2..e81c490 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Boosting.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/Boosting.java
@@ -40,103 +40,108 @@ import com.yahoo.labs.samoa.topology.TopologyBuilder;
 /**
  * The Bagging Classifier by Oza and Russell.
  */
-public class Boosting implements Learner , Configurable {
-
-       /** The Constant serialVersionUID. */
-       private static final long serialVersionUID = -2971850264864952099L;
-       
-       /** The base learner option. */
-       public ClassOption baseLearnerOption = new ClassOption("baseLearner", 
'l',
-                       "Classifier to train.", Learner.class, 
SingleClassifier.class.getName());
-
-       /** The ensemble size option. */
-       public IntOption ensembleSizeOption = new IntOption("ensembleSize", 's',
-                       "The number of models in the bag.", 10, 1, 
Integer.MAX_VALUE);
-
-       /** The distributor processor. */
-       private BoostingDistributorProcessor distributorP;
-
-       /** The result stream. */
-       protected Stream resultStream;
-       
-       /** The dataset. */
-       private Instances dataset;
-        
-       protected Learner classifier;
-        
-       protected int parallelism;
-
-       /**
-        * Sets the layout.
-        */
-       protected void setLayout() {
-
-               int sizeEnsemble = this.ensembleSizeOption.getValue();
-
-               distributorP = new BoostingDistributorProcessor();
-               distributorP.setSizeEnsemble(sizeEnsemble);
-               this.builder.addProcessor(distributorP, 1);
-       
-               //instantiate classifier
-               classifier = this.baseLearnerOption.getValue();
-               classifier.init(builder, this.dataset, sizeEnsemble);
-               
-               BoostingPredictionCombinerProcessor predictionCombinerP= new 
BoostingPredictionCombinerProcessor();
-               predictionCombinerP.setSizeEnsemble(sizeEnsemble);
-               this.builder.addProcessor(predictionCombinerP, 1);
-               
-               //Streams
-               resultStream = this.builder.createStream(predictionCombinerP);
-               predictionCombinerP.setOutputStream(resultStream);
-
-               for (Stream subResultStream:classifier.getResultStreams()) {
-                       this.builder.connectInputKeyStream(subResultStream, 
predictionCombinerP);
-               }
-               
-               /* The testing stream. */
-               Stream testingStream = this.builder.createStream(distributorP);
-               this.builder.connectInputKeyStream(testingStream, 
classifier.getInputProcessor());
-       
-               /* The prediction stream. */
-               Stream predictionStream = 
this.builder.createStream(distributorP);
-               this.builder.connectInputKeyStream(predictionStream, 
classifier.getInputProcessor());
-               
-               distributorP.setOutputStream(testingStream);
-               distributorP.setPredictionStream(predictionStream);
-                
+public class Boosting implements Learner, Configurable {
+
+  /** The Constant serialVersionUID. */
+  private static final long serialVersionUID = -2971850264864952099L;
+
+  /** The base learner option. */
+  public ClassOption baseLearnerOption = new ClassOption("baseLearner", 'l',
+      "Classifier to train.", Learner.class, SingleClassifier.class.getName());
+
+  /** The ensemble size option. */
+  public IntOption ensembleSizeOption = new IntOption("ensembleSize", 's',
+      "The number of models in the bag.", 10, 1, Integer.MAX_VALUE);
+
+  /** The distributor processor. */
+  private BoostingDistributorProcessor distributorP;
+
+  /** The result stream. */
+  protected Stream resultStream;
+
+  /** The dataset. */
+  private Instances dataset;
+
+  protected Learner classifier;
+
+  protected int parallelism;
+
+  /**
+   * Sets the layout.
+   */
+  protected void setLayout() {
+
+    int sizeEnsemble = this.ensembleSizeOption.getValue();
+
+    distributorP = new BoostingDistributorProcessor();
+    distributorP.setSizeEnsemble(sizeEnsemble);
+    this.builder.addProcessor(distributorP, 1);
+
+    // instantiate classifier
+    classifier = this.baseLearnerOption.getValue();
+    classifier.init(builder, this.dataset, sizeEnsemble);
+
+    BoostingPredictionCombinerProcessor predictionCombinerP = new 
BoostingPredictionCombinerProcessor();
+    predictionCombinerP.setSizeEnsemble(sizeEnsemble);
+    this.builder.addProcessor(predictionCombinerP, 1);
+
+    // Streams
+    resultStream = this.builder.createStream(predictionCombinerP);
+    predictionCombinerP.setOutputStream(resultStream);
+
+    for (Stream subResultStream : classifier.getResultStreams()) {
+      this.builder.connectInputKeyStream(subResultStream, predictionCombinerP);
+    }
+
+    /* The testing stream. */
+    Stream testingStream = this.builder.createStream(distributorP);
+    this.builder.connectInputKeyStream(testingStream, 
classifier.getInputProcessor());
+
+    /* The prediction stream. */
+    Stream predictionStream = this.builder.createStream(distributorP);
+    this.builder.connectInputKeyStream(predictionStream, 
classifier.getInputProcessor());
+
+    distributorP.setOutputStream(testingStream);
+    distributorP.setPredictionStream(predictionStream);
+
     // Addition to Bagging: stream to train
     /* The training stream. */
-               Stream trainingStream = 
this.builder.createStream(predictionCombinerP);
-               predictionCombinerP.setTrainingStream(trainingStream);
-               this.builder.connectInputKeyStream(trainingStream, 
classifier.getInputProcessor());
-                
-       }
-
-       /** The builder. */
-       private TopologyBuilder builder;
-
-       /* (non-Javadoc)
-        * @see samoa.classifiers.Classifier#init(samoa.engines.Engine, 
samoa.core.Stream, weka.core.Instances)
-        */                     
-       
-       @Override
-       public void init(TopologyBuilder builder, Instances dataset, int 
parallelism) {
-               this.builder = builder;
-               this.dataset = dataset;
-                this.parallelism = parallelism;
-               this.setLayout();
-       }
-
-        @Override
-       public Processor getInputProcessor() {
-               return distributorP;
-       }
-        
-    /* (non-Javadoc)
-     * @see samoa.learners.Learner#getResultStreams()
-     */
-    @Override
-    public Set<Stream> getResultStreams() {
-                       return ImmutableSet.of(this.resultStream);
-    }
+    Stream trainingStream = this.builder.createStream(predictionCombinerP);
+    predictionCombinerP.setTrainingStream(trainingStream);
+    this.builder.connectInputKeyStream(trainingStream, 
classifier.getInputProcessor());
+
+  }
+
+  /** The builder. */
+  private TopologyBuilder builder;
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see samoa.classifiers.Classifier#init(samoa.engines.Engine,
+   * samoa.core.Stream, weka.core.Instances)
+   */
+
+  @Override
+  public void init(TopologyBuilder builder, Instances dataset, int 
parallelism) {
+    this.builder = builder;
+    this.dataset = dataset;
+    this.parallelism = parallelism;
+    this.setLayout();
+  }
+
+  @Override
+  public Processor getInputProcessor() {
+    return distributorP;
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see samoa.learners.Learner#getResultStreams()
+   */
+  @Override
+  public Set<Stream> getResultStreams() {
+    return ImmutableSet.of(this.resultStream);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BoostingDistributorProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BoostingDistributorProcessor.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BoostingDistributorProcessor.java
index 7100e7e..78c2bd0 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BoostingDistributorProcessor.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BoostingDistributorProcessor.java
@@ -22,15 +22,14 @@ import com.yahoo.labs.samoa.learners.InstanceContentEvent;
  * #L%
  */
 
-
 /**
  * The Class BoostingDistributorProcessor.
  */
-public class BoostingDistributorProcessor extends BaggingDistributorProcessor{
-    
-        @Override
-       protected void train(InstanceContentEvent inEvent) {
-            // Boosting is trained from the prediction combiner, not from the 
input
-        }
-    
+public class BoostingDistributorProcessor extends BaggingDistributorProcessor {
+
+  @Override
+  protected void train(InstanceContentEvent inEvent) {
+    // Boosting is trained from the prediction combiner, not from the input
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BoostingPredictionCombinerProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BoostingPredictionCombinerProcessor.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BoostingPredictionCombinerProcessor.java
index 1d8db50..8acbdc8 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BoostingPredictionCombinerProcessor.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/BoostingPredictionCombinerProcessor.java
@@ -40,137 +40,139 @@ import com.yahoo.labs.samoa.topology.Stream;
  */
 public class BoostingPredictionCombinerProcessor extends 
PredictionCombinerProcessor {
 
-    private static final long serialVersionUID = -1606045723451191232L;
-    
-    //Weigths classifier
-    protected double[] scms;
-
-    //Weights instance
-    protected double[] swms;
-
-    /**
-     * On event.
-     *
-     * @param event the event
-     * @return true, if successful
-     */
-    @Override
-    public boolean process(ContentEvent event) {
-
-        ResultContentEvent inEvent = (ResultContentEvent) event;
-        double[] prediction = inEvent.getClassVotes();
-        int instanceIndex = (int) inEvent.getInstanceIndex();
-        
-        addStatisticsForInstanceReceived(instanceIndex, 
inEvent.getClassifierIndex(), prediction, 1);
-        //Boosting
-        addPredictions(instanceIndex, inEvent, prediction);              
-               
-        if (inEvent.isLastEvent() || 
hasAllVotesArrivedInstance(instanceIndex)) {
-            DoubleVector combinedVote = 
this.mapVotesforInstanceReceived.get(instanceIndex);
-            if (combinedVote == null){
-                combinedVote = new DoubleVector();
-            }
-            ResultContentEvent outContentEvent = new 
ResultContentEvent(inEvent.getInstanceIndex(),
-                    inEvent.getInstance(), inEvent.getClassId(),
-                    combinedVote.getArrayCopy(), inEvent.isLastEvent());
-            outContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex());
-            outputStream.put(outContentEvent);
-            clearStatisticsInstance(instanceIndex);
-            //Boosting
-            computeBoosting(inEvent, instanceIndex);
-            return true;
-        }
-        return false;
+  private static final long serialVersionUID = -1606045723451191232L;
 
+  // Weigths classifier
+  protected double[] scms;
+
+  // Weights instance
+  protected double[] swms;
+
+  /**
+   * On event.
+   * 
+   * @param event
+   *          the event
+   * @return true, if successful
+   */
+  @Override
+  public boolean process(ContentEvent event) {
+
+    ResultContentEvent inEvent = (ResultContentEvent) event;
+    double[] prediction = inEvent.getClassVotes();
+    int instanceIndex = (int) inEvent.getInstanceIndex();
+
+    addStatisticsForInstanceReceived(instanceIndex, 
inEvent.getClassifierIndex(), prediction, 1);
+    // Boosting
+    addPredictions(instanceIndex, inEvent, prediction);
+
+    if (inEvent.isLastEvent() || hasAllVotesArrivedInstance(instanceIndex)) {
+      DoubleVector combinedVote = 
this.mapVotesforInstanceReceived.get(instanceIndex);
+      if (combinedVote == null) {
+        combinedVote = new DoubleVector();
+      }
+      ResultContentEvent outContentEvent = new 
ResultContentEvent(inEvent.getInstanceIndex(),
+          inEvent.getInstance(), inEvent.getClassId(),
+          combinedVote.getArrayCopy(), inEvent.isLastEvent());
+      outContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex());
+      outputStream.put(outContentEvent);
+      clearStatisticsInstance(instanceIndex);
+      // Boosting
+      computeBoosting(inEvent, instanceIndex);
+      return true;
     }
-    
-    protected Random random;
-    
-    protected int trainingWeightSeenByModel;
-    
-    @Override
-    protected double getEnsembleMemberWeight(int i) {
-        double em = this.swms[i] / (this.scms[i] + this.swms[i]);
-        if ((em == 0.0) || (em > 0.5)) {
-            return 0.0;
-        }
-        double Bm = em / (1.0 - em);
-        return Math.log(1.0 / Bm);
-    }
-    
-    @Override
-    public void reset() {
-        this.random = new Random();
-        this.trainingWeightSeenByModel = 0;
-        this.scms = new double[this.ensembleSize];
-        this.swms = new double[this.ensembleSize];
+    return false;
+
+  }
+
+  protected Random random;
+
+  protected int trainingWeightSeenByModel;
+
+  @Override
+  protected double getEnsembleMemberWeight(int i) {
+    double em = this.swms[i] / (this.scms[i] + this.swms[i]);
+    if ((em == 0.0) || (em > 0.5)) {
+      return 0.0;
     }
+    double Bm = em / (1.0 - em);
+    return Math.log(1.0 / Bm);
+  }
+
+  @Override
+  public void reset() {
+    this.random = new Random();
+    this.trainingWeightSeenByModel = 0;
+    this.scms = new double[this.ensembleSize];
+    this.swms = new double[this.ensembleSize];
+  }
+
+  private boolean correctlyClassifies(int i, Instance inst, int instanceIndex) 
{
+    int predictedClass = (int) mapPredictions.get(instanceIndex).getValue(i);
+    return predictedClass == (int) inst.classValue();
+  }
+
+  protected Map<Integer, DoubleVector> mapPredictions;
 
-    private boolean correctlyClassifies(int i, Instance inst, int 
instanceIndex) {
-        int predictedClass = (int) 
mapPredictions.get(instanceIndex).getValue(i);
-        return predictedClass == (int) inst.classValue();
+  private void addPredictions(int instanceIndex, ResultContentEvent inEvent, 
double[] prediction) {
+    if (this.mapPredictions == null) {
+      this.mapPredictions = new HashMap<>();
     }
-    
-    protected Map<Integer, DoubleVector> mapPredictions;
-
-    private void addPredictions(int instanceIndex, ResultContentEvent inEvent, 
double[] prediction) {
-        if (this.mapPredictions == null) {
-            this.mapPredictions = new HashMap<>();
-        }
-        DoubleVector predictions = this.mapPredictions.get(instanceIndex);
-        if (predictions == null){
-            predictions = new DoubleVector();
-        }
-        predictions.setValue(inEvent.getClassifierIndex(), 
Utils.maxIndex(prediction));
-        this.mapPredictions.put(instanceIndex, predictions);
+    DoubleVector predictions = this.mapPredictions.get(instanceIndex);
+    if (predictions == null) {
+      predictions = new DoubleVector();
     }
+    predictions.setValue(inEvent.getClassifierIndex(), 
Utils.maxIndex(prediction));
+    this.mapPredictions.put(instanceIndex, predictions);
+  }
 
-    private void computeBoosting(ResultContentEvent inEvent, int 
instanceIndex) {
-        // Starts code for Boosting
-        //Send instances to train
-        double lambda_d = 1.0;
-        for (int i = 0; i < this.ensembleSize; i++) {
-            double k = lambda_d;
-            Instance inst = inEvent.getInstance();
-            if (k > 0.0) {
-                Instance weightedInst = inst.copy();
-                weightedInst.setWeight(inst.weight() * k);
-                //this.ensemble[i].trainOnInstance(weightedInst);
-                InstanceContentEvent instanceContentEvent = new 
InstanceContentEvent(
-                                inEvent.getInstanceIndex(), weightedInst, 
true, false);
-                instanceContentEvent.setClassifierIndex(i);
-                
instanceContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex()); 
-                trainingStream.put(instanceContentEvent);
-            }
-            if (this.correctlyClassifies(i, inst, instanceIndex)){
-                this.scms[i] += lambda_d;
-                lambda_d *= this.trainingWeightSeenByModel / (2 * 
this.scms[i]);
-            } else {
-                this.swms[i] += lambda_d;
-                lambda_d *= this.trainingWeightSeenByModel / (2 * 
this.swms[i]);
-            }
-        }
+  private void computeBoosting(ResultContentEvent inEvent, int instanceIndex) {
+    // Starts code for Boosting
+    // Send instances to train
+    double lambda_d = 1.0;
+    for (int i = 0; i < this.ensembleSize; i++) {
+      double k = lambda_d;
+      Instance inst = inEvent.getInstance();
+      if (k > 0.0) {
+        Instance weightedInst = inst.copy();
+        weightedInst.setWeight(inst.weight() * k);
+        // this.ensemble[i].trainOnInstance(weightedInst);
+        InstanceContentEvent instanceContentEvent = new InstanceContentEvent(
+            inEvent.getInstanceIndex(), weightedInst, true, false);
+        instanceContentEvent.setClassifierIndex(i);
+        instanceContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex());
+        trainingStream.put(instanceContentEvent);
+      }
+      if (this.correctlyClassifies(i, inst, instanceIndex)) {
+        this.scms[i] += lambda_d;
+        lambda_d *= this.trainingWeightSeenByModel / (2 * this.scms[i]);
+      } else {
+        this.swms[i] += lambda_d;
+        lambda_d *= this.trainingWeightSeenByModel / (2 * this.swms[i]);
+      }
     }
-    
-       /**
-        * Gets the training stream.
-        *
-        * @return the training stream
-        */
-       public Stream getTrainingStream() {
-               return trainingStream;
-       }
-
-       /**
-        * Sets the training stream.
-        *
-        * @param trainingStream the new training stream
-        */
-       public void setTrainingStream(Stream trainingStream) {
-               this.trainingStream = trainingStream;
-       }
-        
-        /** The training stream. */
-       private Stream trainingStream;
-    
+  }
+
+  /**
+   * Gets the training stream.
+   * 
+   * @return the training stream
+   */
+  public Stream getTrainingStream() {
+    return trainingStream;
+  }
+
+  /**
+   * Sets the training stream.
+   * 
+   * @param trainingStream
+   *          the new training stream
+   */
+  public void setTrainingStream(Stream trainingStream) {
+    this.trainingStream = trainingStream;
+  }
+
+  /** The training stream. */
+  private Stream trainingStream;
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/PredictionCombinerProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/PredictionCombinerProcessor.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/PredictionCombinerProcessor.java
index e4228d8..fff801f 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/PredictionCombinerProcessor.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/ensemble/PredictionCombinerProcessor.java
@@ -37,148 +37,151 @@ import com.yahoo.labs.samoa.topology.Stream;
  */
 public class PredictionCombinerProcessor implements Processor {
 
-    private static final long serialVersionUID = -1606045723451191132L;
-
-    /**
-     * The size ensemble.
-     */
-    protected int ensembleSize;
-
-    /**
-     * The output stream.
-     */
-    protected Stream outputStream;
-
-    /**
-     * Sets the output stream.
-     *
-     * @param stream the new output stream
-     */
-    public void setOutputStream(Stream stream) {
-        outputStream = stream;
+  private static final long serialVersionUID = -1606045723451191132L;
+
+  /**
+   * The size ensemble.
+   */
+  protected int ensembleSize;
+
+  /**
+   * The output stream.
+   */
+  protected Stream outputStream;
+
+  /**
+   * Sets the output stream.
+   * 
+   * @param stream
+   *          the new output stream
+   */
+  public void setOutputStream(Stream stream) {
+    outputStream = stream;
+  }
+
+  /**
+   * Gets the output stream.
+   * 
+   * @return the output stream
+   */
+  public Stream getOutputStream() {
+    return outputStream;
+  }
+
+  /**
+   * Gets the size ensemble.
+   * 
+   * @return the ensembleSize
+   */
+  public int getSizeEnsemble() {
+    return ensembleSize;
+  }
+
+  /**
+   * Sets the size ensemble.
+   * 
+   * @param ensembleSize
+   *          the new size ensemble
+   */
+  public void setSizeEnsemble(int ensembleSize) {
+    this.ensembleSize = ensembleSize;
+  }
+
+  protected Map<Integer, Integer> mapCountsforInstanceReceived;
+
+  protected Map<Integer, DoubleVector> mapVotesforInstanceReceived;
+
+  /**
+   * On event.
+   * 
+   * @param event
+   *          the event
+   * @return true, if successful
+   */
+  public boolean process(ContentEvent event) {
+
+    ResultContentEvent inEvent = (ResultContentEvent) event;
+    double[] prediction = inEvent.getClassVotes();
+    int instanceIndex = (int) inEvent.getInstanceIndex();
+
+    addStatisticsForInstanceReceived(instanceIndex, 
inEvent.getClassifierIndex(), prediction, 1);
+
+    if (inEvent.isLastEvent() || hasAllVotesArrivedInstance(instanceIndex)) {
+      DoubleVector combinedVote = 
this.mapVotesforInstanceReceived.get(instanceIndex);
+      if (combinedVote == null) {
+        combinedVote = new DoubleVector(new 
double[inEvent.getInstance().numClasses()]);
+      }
+      ResultContentEvent outContentEvent = new 
ResultContentEvent(inEvent.getInstanceIndex(),
+          inEvent.getInstance(), inEvent.getClassId(),
+          combinedVote.getArrayCopy(), inEvent.isLastEvent());
+      outContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex());
+      outputStream.put(outContentEvent);
+      clearStatisticsInstance(instanceIndex);
+      return true;
     }
-
-    /**
-     * Gets the output stream.
-     *
-     * @return the output stream
-     */
-    public Stream getOutputStream() {
-        return outputStream;
-    }
-
-    /**
-     * Gets the size ensemble.
-     *
-     * @return the ensembleSize
-     */
-    public int getSizeEnsemble() {
-        return ensembleSize;
-    }
-
-    /**
-     * Sets the size ensemble.
-     *
-     * @param ensembleSize the new size ensemble
-     */
-    public void setSizeEnsemble(int ensembleSize) {
-        this.ensembleSize = ensembleSize;
-    }
-
-    protected Map<Integer, Integer> mapCountsforInstanceReceived;
-
-    protected Map<Integer, DoubleVector> mapVotesforInstanceReceived;
-    
-    /**
-     * On event.
-     *
-     * @param event the event
-     * @return true, if successful
-     */
-    public boolean process(ContentEvent event) {
-
-        ResultContentEvent inEvent = (ResultContentEvent) event;
-        double[] prediction = inEvent.getClassVotes();
-        int instanceIndex = (int) inEvent.getInstanceIndex();
-        
-        addStatisticsForInstanceReceived(instanceIndex, 
inEvent.getClassifierIndex(), prediction, 1);
-
-        if (inEvent.isLastEvent() || 
hasAllVotesArrivedInstance(instanceIndex)) {
-            DoubleVector combinedVote = 
this.mapVotesforInstanceReceived.get(instanceIndex);
-            if (combinedVote == null){
-                combinedVote = new DoubleVector(new 
double[inEvent.getInstance().numClasses()]);
-            }
-            ResultContentEvent outContentEvent = new 
ResultContentEvent(inEvent.getInstanceIndex(),
-                    inEvent.getInstance(), inEvent.getClassId(),
-                    combinedVote.getArrayCopy(), inEvent.isLastEvent());
-            outContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex());
-            outputStream.put(outContentEvent);
-            clearStatisticsInstance(instanceIndex);
-            return true;
-        }
-        return false;
-
+    return false;
+
+  }
+
+  @Override
+  public void onCreate(int id) {
+    this.reset();
+  }
+
+  public void reset() {
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see samoa.core.Processor#newProcessor(samoa.core.Processor)
+   */
+  @Override
+  public Processor newProcessor(Processor sourceProcessor) {
+    PredictionCombinerProcessor newProcessor = new 
PredictionCombinerProcessor();
+    PredictionCombinerProcessor originProcessor = 
(PredictionCombinerProcessor) sourceProcessor;
+    if (originProcessor.getOutputStream() != null) {
+      newProcessor.setOutputStream(originProcessor.getOutputStream());
     }
-
-    @Override
-    public void onCreate(int id) {
-        this.reset();
+    newProcessor.setSizeEnsemble(originProcessor.getSizeEnsemble());
+    return newProcessor;
+  }
+
+  protected void addStatisticsForInstanceReceived(int instanceIndex, int 
classifierIndex, double[] prediction, int add) {
+    if (this.mapCountsforInstanceReceived == null) {
+      this.mapCountsforInstanceReceived = new HashMap<>();
+      this.mapVotesforInstanceReceived = new HashMap<>();
     }
-
-    public void reset() {
+    DoubleVector vote = new DoubleVector(prediction);
+    if (vote.sumOfValues() > 0.0) {
+      vote.normalize();
+      DoubleVector combinedVote = 
this.mapVotesforInstanceReceived.get(instanceIndex);
+      if (combinedVote == null) {
+        combinedVote = new DoubleVector();
+      }
+      vote.scaleValues(getEnsembleMemberWeight(classifierIndex));
+      combinedVote.addValues(vote);
+
+      this.mapVotesforInstanceReceived.put(instanceIndex, combinedVote);
     }
-
-
-    /* (non-Javadoc)
-     * @see samoa.core.Processor#newProcessor(samoa.core.Processor)
-     */
-    @Override
-    public Processor newProcessor(Processor sourceProcessor) {
-        PredictionCombinerProcessor newProcessor = new 
PredictionCombinerProcessor();
-        PredictionCombinerProcessor originProcessor = 
(PredictionCombinerProcessor) sourceProcessor;
-        if (originProcessor.getOutputStream() != null) {
-            newProcessor.setOutputStream(originProcessor.getOutputStream());
-        }
-        newProcessor.setSizeEnsemble(originProcessor.getSizeEnsemble());
-        return newProcessor;
+    Integer count = this.mapCountsforInstanceReceived.get(instanceIndex);
+    if (count == null) {
+      count = 0;
     }
+    this.mapCountsforInstanceReceived.put(instanceIndex, count + add);
+  }
 
-    protected void addStatisticsForInstanceReceived(int instanceIndex, int 
classifierIndex, double[] prediction, int add) {
-        if (this.mapCountsforInstanceReceived == null) {
-            this.mapCountsforInstanceReceived = new HashMap<>();
-            this.mapVotesforInstanceReceived = new HashMap<>();
-        }
-        DoubleVector vote = new DoubleVector(prediction);
-        if (vote.sumOfValues() > 0.0) {
-            vote.normalize();
-            DoubleVector combinedVote = 
this.mapVotesforInstanceReceived.get(instanceIndex);
-            if (combinedVote == null){
-                combinedVote = new DoubleVector();
-            }
-            vote.scaleValues(getEnsembleMemberWeight(classifierIndex));
-            combinedVote.addValues(vote);
-                    
-            this.mapVotesforInstanceReceived.put(instanceIndex, combinedVote);
-        }
-        Integer count = this.mapCountsforInstanceReceived.get(instanceIndex);
-        if (count == null) {
-            count = 0;
-        }
-        this.mapCountsforInstanceReceived.put(instanceIndex, count + add);
-    }
+  protected boolean hasAllVotesArrivedInstance(int instanceIndex) {
+    return (this.mapCountsforInstanceReceived.get(instanceIndex) == 
this.ensembleSize);
+  }
 
-    protected boolean hasAllVotesArrivedInstance(int instanceIndex) {
-        return (this.mapCountsforInstanceReceived.get(instanceIndex) == 
this.ensembleSize);
-    }
+  protected void clearStatisticsInstance(int instanceIndex) {
+    this.mapCountsforInstanceReceived.remove(instanceIndex);
+    this.mapVotesforInstanceReceived.remove(instanceIndex);
+  }
 
-    protected void clearStatisticsInstance(int instanceIndex) {
-        this.mapCountsforInstanceReceived.remove(instanceIndex);
-        this.mapVotesforInstanceReceived.remove(instanceIndex);
-    }
-    
-     protected double getEnsembleMemberWeight(int i) {
-        return 1.0;
-    }
+  protected double getEnsembleMemberWeight(int i) {
+    return 1.0;
+  }
 
-    
 }

Reply via email to