This is an automated email from the ASF dual-hosted git repository.

csterling pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-samoa.git


The following commit(s) were added to refs/heads/master by this push:
     new 368346b  SAMOA-72: Squashed commits for pull request SAMOA-72 (#70). 
Fixed errors after rebase with master due to changes in intermediate commits.
368346b is described below

commit 368346b2e9fb9234ea7efd4c1cd66f4f33a26d7d
Author: fobeligi <[email protected]>
AuthorDate: Tue Jan 17 20:19:06 2017 +0200

    SAMOA-72: Squashed commits for pull request SAMOA-72 (#70). Fixed errors 
after rebase with master due to changes in intermediate commits.
    
    [SAMOA-72] Initial topology and logic for BoostVHT algorithm.
    
    [SAMOA-72] Optimize BoostVHT code with buffering and by introducing the 
AttributeSliceEvent.
    
    We bunch up all the attributes one local stats processor is responsible for 
in an AttributeSliceEvent and send one message instead of creating one message 
per single attribute.
    
    [SAMOA-72] Rebase on master branch and refactor BoostVHT code for pull 
request.
    
    Fix #70
---
 .../classifiers/ensemble/BoostMAProcessor.java     | 294 ++++++++++++++
 .../learners/classifiers/ensemble/BoostVHT.java    | 226 +++++++++++
 .../classifiers/ensemble/BoostVHTProcessor.java    | 444 +++++++++++++++++++++
 .../classifiers/trees/ActiveLearningNode.java      |  48 +--
 .../classifiers/trees/AttributeSliceEvent.java     |  90 +++++
 .../trees/BoostVHTActiveLearningNode.java          | 145 +++++++
 .../classifiers/trees/ComputeContentEvent.java     |   9 +
 .../learners/classifiers/trees/FoundNode.java      |   6 +-
 .../classifiers/trees/InactiveLearningNode.java    |   4 +-
 .../learners/classifiers/trees/LearningNode.java   |   6 +-
 .../classifiers/trees/LocalResultContentEvent.java |  15 +-
 .../trees/LocalStatisticsProcessor.java            | 163 +++++---
 .../trees/ModelAggregatorProcessor.java            | 191 ++++++---
 .../samoa/learners/classifiers/trees/Node.java     |   8 +-
 .../learners/classifiers/trees/SplitNode.java      |   4 +-
 .../classifiers/core/AttributeSplitSuggestion.java |   1 +
 .../moa/classifiers/functions/DecisionStump.java   | 157 ++++++++
 .../main/java/org/apache/samoa/moa/core/Vote.java  |  20 +
 18 files changed, 1670 insertions(+), 161 deletions(-)

diff --git 
a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/ensemble/BoostMAProcessor.java
 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/ensemble/BoostMAProcessor.java
new file mode 100644
index 0000000..1a9d80f
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/ensemble/BoostMAProcessor.java
@@ -0,0 +1,294 @@
+package org.apache.samoa.learners.classifiers.ensemble;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.instances.Instance;
+import org.apache.samoa.instances.Instances;
+import org.apache.samoa.learners.classifiers.trees.*;
+import 
org.apache.samoa.learners.classifiers.trees.BoostVHTActiveLearningNode.SplittingOption;
+import org.apache.samoa.moa.classifiers.core.AttributeSplitSuggestion;
+import 
org.apache.samoa.moa.classifiers.core.splitcriteria.InfoGainSplitCriterion;
+import org.apache.samoa.moa.classifiers.core.splitcriteria.SplitCriterion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+
+/**
+ * Boost Model Aggregator Processor consists of the decision tree model. It connects to local-statistic PI via attribute stream
+ * and control stream. Model-aggregator PI sends the split instances via 
attribute stream and it sends control messages
+ * to ask local-statistic PI to perform computation via control stream.
+ * <p>
+ * The calculation results from local statistic arrive to the model-aggregator 
PI via computation-result
+ * stream.
+ *
+ * @author Arinto Murdopo
+ */
+public final class BoostMAProcessor extends ModelAggregatorProcessor {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(BoostMAProcessor.class);
+
+  private final SplittingOption splittingOption;
+  private final int maxBufferSize;
+
+  /**
+   * Creates a processor from a populated builder. The constructor is private;
+   * instances are obtained through {@link BoostMABuilder#build()}.
+   *
+   * @param builder the builder carrying the configuration
+   */
+  private BoostMAProcessor(BoostMABuilder builder) {
+    super(builder);
+
+    // Boosting-specific settings carried by the builder.
+    this.maxBufferSize = builder.maxBufferSize;
+    this.splittingOption = builder.splittingOption;
+
+    // Tree bookkeeping is initialised here instead of in onCreate.
+    this.growthAllowed = true;
+    this.decisionNodeCount = 0;
+    this.inactiveLeafNodeCount = 0;
+    this.activeLeafNodeCount = 0;
+
+    // No split request is pending yet.
+    this.splitId = 0;
+    this.splittingNodes = new ConcurrentHashMap<>();
+
+    // Executor for scheduling time-out threads
+    this.executor = Executors.newScheduledThreadPool(8);
+  }
+
+  /**
+   * Applies a local-statistics result to the leaf whose split request it answers.
+   * Once that leaf reports that all suggestions have been collected, the pending
+   * entry is removed and the split attempt is resumed. Results whose split id has
+   * no pending entry are ignored.
+   *
+   * @param lrce the local result event to apply
+   */
+  public void updateModel(LocalResultContentEvent lrce) {
+    Long pendingSplitId = lrce.getSplitId();
+    SplittingNodeInfo pending = splittingNodes.get(pendingSplitId);
+    if (pending == null) {
+      return;
+    }
+
+    BoostVHTActiveLearningNode leaf =
+        (BoostVHTActiveLearningNode) pending.getActiveLearningNode();
+    leaf.addDistributedSuggestions(lrce.getBestSuggestion(), lrce.getSecondBestSuggestion());
+
+    if (!leaf.isAllSuggestionsCollected()) {
+      return;
+    }
+    this.splittingNodes.remove(pendingSplitId);
+    this.continueAttemptToSplit(leaf, pending.getFoundNode());
+  }
+
+  /**
+   * Not supported by this processor: within this class the model is driven through
+   * {@link #updateModel(LocalResultContentEvent)} and
+   * {@link #trainOnInstanceImpl(FoundNode, Instance)} rather than through events.
+   *
+   * @param event ignored
+   * @return never returns normally
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public boolean process(ContentEvent event) {
+    // Use the public java.lang exception instead of the JDK-internal sun.* class.
+    throw new UnsupportedOperationException("BoostMAProcessor does not process events directly");
+  }
+
+  /**
+   * Trains on one instance at the position described by {@code foundNode}. An empty
+   * position first receives a fresh learning node. After learning, a
+   * {@link BoostVHTActiveLearningNode} that is not already splitting triggers a split
+   * attempt once the weight seen since its last split evaluation reaches the grace period.
+   *
+   * @param foundNode the position in the tree the instance was sorted to
+   * @param inst      the training instance
+   */
+  @Override
+  public void trainOnInstanceImpl(FoundNode foundNode, Instance inst) {
+    Node target = foundNode.getNode();
+
+    // An empty branch gets a new learning node attached to its parent.
+    if (target == null) {
+      target = newLearningNode(parallelismHint);
+      foundNode.getParent().setChild(foundNode.getParentBranch(), target);
+      activeLeafNodeCount++;
+    }
+
+    if (target instanceof LearningNode) {
+      ((LearningNode) target).learnFromInstance(inst, this);
+    }
+
+    if (!(target instanceof BoostVHTActiveLearningNode)) {
+      return;
+    }
+    BoostVHTActiveLearningNode active = (BoostVHTActiveLearningNode) target;
+    if (active.isSplitting()) {
+      return;
+    }
+
+    // Ask for a split only after enough new weight has accumulated.
+    double weightSeen = active.getWeightSeen();
+    boolean graceElapsed = weightSeen - active.getWeightSeenAtLastSplitEvaluation() >= gracePeriod;
+    if (graceElapsed) {
+      attemptToSplit(active, foundNode);
+    }
+  }
+
+  /**
+   * Resets the learning state when the processor is created.
+   *
+   * @param id the processor id; not used here
+   */
+  @Override
+  public void onCreate(int id) {
+    resetLearning();
+  }
+
+  /**
+   * Creates a new processor configured like {@code p} and sharing its attribute and
+   * control streams.
+   *
+   * @param p the processor to copy; must be a {@link BoostMAProcessor}
+   * @return the newly built processor
+   */
+  @Override
+  public Processor newProcessor(Processor p) {
+    BoostMAProcessor source = (BoostMAProcessor) p;
+    BoostMAProcessor copy = new BoostMABuilder(source).build();
+
+    copy.setAttributeStream(source.getAttributeStream());
+    copy.setControlStream(source.getControlStream());
+    return copy;
+  }
+
+  /**
+   * Helper method to represent a split attempt
+   *
+   * @param bVHTactiveLearningNode The corresponding boostVHT active learning 
node which will be split
+   * @param foundNode          The data structure to represents the filtering 
of the instance using the tree model.
+   */
+  @Override
+  public void attemptToSplit(ActiveLearningNode bVHTactiveLearningNode, 
FoundNode foundNode) {
+    if (!bVHTactiveLearningNode.observedClassDistributionIsPure()) {
+      // Increment the split ID
+      this.splitId++;
+
+      this.splittingNodes.put(this.splitId, new 
SplittingNodeInfo((BoostVHTActiveLearningNode)bVHTactiveLearningNode, 
foundNode));
+
+      // Inform Local Statistic PI to perform local statistic calculation
+      bVHTactiveLearningNode.requestDistributedSuggestions(this.splitId, this);
+    }
+  }
+
+  /**
+   * Finishes a split attempt once the local calculation results have been received.
+   * The best and second-best distributed suggestions are ranked against a "null split"
+   * (no split at all); the leaf is replaced by a split node when the merit gap exceeds
+   * the Hoeffding bound or the bound falls below the tie threshold, provided the
+   * winning suggestion carries a split test.
+   *
+   * @param bvhtActiveLearningNode The corresponding active learning node which will be split
+   * @param foundNode          The data structure to represents the filtering of the instance using the tree model.
+   */
+  @Override
+  public void continueAttemptToSplit(ActiveLearningNode bvhtActiveLearningNode, FoundNode foundNode) {
+    BoostVHTActiveLearningNode node = (BoostVHTActiveLearningNode) bvhtActiveLearningNode;
+    AttributeSplitSuggestion best = node.getDistributedBestSuggestion();
+    AttributeSplitSuggestion runnerUp = node.getDistributedSecondBestSuggestion();
+
+    // Candidate representing "do not split": merit of the current distribution as-is.
+    double[] preSplitDist = node.getObservedClassDistribution();
+    AttributeSplitSuggestion nullSplit = new AttributeSplitSuggestion(null, new double[0][],
+        this.splitCriterion.getMeritOfSplit(preSplitDist, new double[][] { preSplitDist }));
+
+    // Slot the null split into the ranking.
+    if (best == null || nullSplit.compareTo(best) > 0) {
+      runnerUp = best;
+      best = nullSplit;
+    } else if (runnerUp == null || nullSplit.compareTo(runnerUp) > 0) {
+      runnerUp = nullSplit;
+    }
+
+    boolean shouldSplit;
+    if (runnerUp == null) {
+      shouldSplit = true;
+    } else {
+      double hoeffdingBound = computeHoeffdingBound(
+          this.splitCriterion.getRangeOfMerit(node.getObservedClassDistribution()),
+          this.splitConfidence,
+          node.getWeightSeen());
+      shouldSplit = (best.merit - runnerUp.merit > hoeffdingBound) || (hoeffdingBound < tieThreshold);
+      // TODO: add poor attributes removal
+    }
+
+    SplitNode parent = foundNode.getParent();
+    int parentBranch = foundNode.getParentBranch();
+
+    // TODO: What happens when bestSuggestion is null? -> Deactivate node?
+    if (shouldSplit && best.splitTest != null) {
+      SplitNode newSplit = new SplitNode(best.splitTest, node.getObservedClassDistribution());
+      for (int branch = 0; branch < best.numSplits(); branch++) {
+        Node child = newLearningNode(best.resultingClassDistributionFromSplit(branch), this.parallelismHint);
+        newSplit.setChild(branch, child);
+      }
+
+      // One active leaf becomes a decision node with numSplits() new active leaves.
+      this.decisionNodeCount++;
+      this.activeLeafNodeCount += best.numSplits() - 1;
+
+      if (parent != null) {
+        parent.setChild(parentBranch, newSplit);
+      } else {
+        this.treeRoot = newSplit;
+      }
+
+      // Replay the instances buffered while the split was pending.
+      boolean keepBuffered = splittingOption == SplittingOption.KEEP && this.maxBufferSize > 0;
+      if (keepBuffered) {
+        Queue<Instance> buffer = node.getBuffer();
+        while (!buffer.isEmpty()) {
+          this.trainOnInstanceImpl(buffer.poll());
+        }
+      }
+    }
+    // TODO: add check on the model's memory size
+
+    // housekeeping
+    node.endSplitting();
+    node.setWeightSeenAtLastSplitEvaluation(node.getWeightSeen());
+  }
+
+  /**
+   * Creates a {@link BoostVHTActiveLearningNode} configured with this processor's
+   * splitting option and buffer size, and tagged with this processor's id as its
+   * ensemble id.
+   *
+   * @param initialClassObservations the initial class distribution of the node
+   * @param parallelismHint          the parallelism hint passed on to the node
+   * @return the new learning node
+   */
+  @Override
+  protected LearningNode newLearningNode(double[] initialClassObservations, int parallelismHint) {
+    BoostVHTActiveLearningNode leaf = new BoostVHTActiveLearningNode(
+        initialClassObservations, parallelismHint, splittingOption, maxBufferSize);
+    leaf.setEnsembleId(processorId);
+    return leaf;
+  }
+
+  /**
+   * Builder class to replace constructors with many parameters.
+   * <p>
+   * Only the boosting-specific settings (splitting option and buffer size) are held
+   * here; everything else is configured through the builder methods inherited from
+   * {@link ModelAggregatorProcessor.Builder}.
+   *
+   * @author Arinto Murdopo
+   */
+  public static class BoostMABuilder extends ModelAggregatorProcessor.Builder<BoostMABuilder> {
+
+    /** How instances arriving during a split are treated; {@code null} unless set. */
+    private SplittingOption splittingOption;
+
+    /** Maximum number of instances buffered while splitting; 0 by default. */
+    private int maxBufferSize = 0;
+
+    /**
+     * Creates a builder for the given dataset.
+     *
+     * @param dataset the dataset handed to the parent builder
+     */
+    public BoostMABuilder(Instances dataset) {
+      super(dataset);
+    }
+
+    /**
+     * Creates a builder pre-populated from an existing processor.
+     *
+     * @param oldProcessor the processor whose configuration is copied
+     */
+    public BoostMABuilder(BoostMAProcessor oldProcessor) {
+      super(oldProcessor);
+      this.splittingOption = oldProcessor.splittingOption;
+      this.maxBufferSize = oldProcessor.maxBufferSize;
+    }
+
+    /**
+     * Sets how instances arriving during a split are treated.
+     *
+     * @param splittingOption the splitting option
+     * @return this builder
+     */
+    public BoostMABuilder splittingOption(SplittingOption splittingOption) {
+      this.splittingOption = splittingOption;
+      return this;
+    }
+
+    /**
+     * Sets the maximum buffer size used while splitting.
+     *
+     * @param maxBufferSize the maximum number of buffered instances
+     * @return this builder
+     */
+    public BoostMABuilder maxBufferSize(int maxBufferSize) {
+      this.maxBufferSize = maxBufferSize;
+      return this;
+    }
+
+    /**
+     * Builds the processor from the current configuration.
+     *
+     * @return a new {@link BoostMAProcessor}
+     */
+    public BoostMAProcessor build() {
+      return new BoostMAProcessor(this);
+    }
+
+  }
+}
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/ensemble/BoostVHT.java
 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/ensemble/BoostVHT.java
new file mode 100644
index 0000000..7e85e0d
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/ensemble/BoostVHT.java
@@ -0,0 +1,226 @@
+package org.apache.samoa.learners.classifiers.ensemble;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * License
+ */
+
+import com.github.javacliparser.ClassOption;
+import com.github.javacliparser.Configurable;
+import com.github.javacliparser.FlagOption;
+import com.github.javacliparser.FloatOption;
+import com.github.javacliparser.IntOption;
+import com.google.common.collect.ImmutableSet;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.instances.Instances;
+import org.apache.samoa.learners.ClassificationLearner;
+import org.apache.samoa.learners.Learner;
+import 
org.apache.samoa.learners.classifiers.trees.BoostVHTActiveLearningNode.SplittingOption;
+import org.apache.samoa.learners.classifiers.trees.LocalStatisticsProcessor;
+import org.apache.samoa.learners.classifiers.trees.VerticalHoeffdingTree;
+import 
org.apache.samoa.moa.classifiers.core.attributeclassobservers.AttributeClassObserver;
+import 
org.apache.samoa.moa.classifiers.core.attributeclassobservers.DiscreteAttributeClassObserver;
+import 
org.apache.samoa.moa.classifiers.core.attributeclassobservers.NumericAttributeClassObserver;
+import org.apache.samoa.moa.classifiers.core.splitcriteria.SplitCriterion;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.topology.TopologyBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+
+/**
+ * BoostVHT: an online boosting ensemble whose members are Vertical Hoeffding Trees.
+ */
+public class BoostVHT implements ClassificationLearner, Configurable {
+
+  /** The Constant serialVersionUID. */
+  private static final long serialVersionUID = -7523211543185584536L;
+  
+  private static final Logger logger = LoggerFactory.getLogger(BoostVHT.class);
+  
+  public ClassOption numericEstimatorOption = new 
ClassOption("numericEstimator",
+          'n', "Numeric estimator to use.", 
NumericAttributeClassObserver.class,
+          "GaussianNumericAttributeClassObserver");
+
+  public ClassOption nominalEstimatorOption = new 
ClassOption("nominalEstimator",
+          'd', "Nominal estimator to use.", 
DiscreteAttributeClassObserver.class,
+          "NominalAttributeClassObserver");
+
+  public ClassOption splitCriterionOption = new ClassOption("splitCriterion",
+          'r', "Split criterion to use.", SplitCriterion.class,
+          "InfoGainSplitCriterion");
+
+  public FloatOption splitConfidenceOption = new 
FloatOption("splitConfidence", 'c',
+          "The allowable error in split decision, values closer to 0 will take 
longer to decide.",
+          0.0000001, 0.0, 1.0);
+
+  public FloatOption tieThresholdOption = new FloatOption("tieThreshold",
+          't', "Threshold below which a split will be forced to break ties.",
+          0.05, 0.0, 1.0);
+
+  public IntOption gracePeriodOption = new IntOption("gracePeriod", 'g',
+          "The number of instances a leaf should observe between split 
attempts.",
+          200, 0, Integer.MAX_VALUE);
+
+  public IntOption timeOutOption = new IntOption("timeOut", 'o',
+          "The duration to wait all distributed computation results from local 
statistics PI, in miliseconds",
+          Integer.MAX_VALUE, 1, Integer.MAX_VALUE);
+
+  public FlagOption binarySplitsOption = new FlagOption("binarySplits", 'b',
+          "Only allow binary splits.");
+
+  public FlagOption splittingOption = new 
FlagOption("keepInstanceWhileSplitting",'q',
+      "Keep instances in a buffer while splitting");
+
+  public IntOption maxBufferSizeOption = new 
IntOption("maxBufferSizeWhileSplitting",'z',
+      "Maximum buffer size while splitting, use in conjunction with 'q' 
option. Size 0 means we don't use buffer while splitting",
+      0, 0, Integer.MAX_VALUE);
+  
+    /** The ensemble size option. */
+  public IntOption ensembleSizeOption = new IntOption("ensembleSize", 's',
+      "The number of models in the bag.", 10, 1, Integer.MAX_VALUE);
+
+  public IntOption seedOption = new IntOption("seed", 'u',
+      "the seed for the rng.", (int) System.currentTimeMillis());
+
+  /** The Model Aggregator boosting processor. */
+  private BoostVHTProcessor boostVHTProcessor;
+
+  /** The result stream. */
+  protected Stream resultStream;
+  
+  /** The attribute stream. */
+  protected Stream attributeStream;
+  
+  /** The control stream. */
+  protected Stream controlStream;
+  
+  /** The compute stream. */
+  protected Stream computeStream;
+  
+  /** The dataset. */
+  private Instances dataset;
+
+  protected int parallelism;
+  
+  // for SAMME
+  public IntOption numberOfClassesOption = new IntOption("numberOfClasses", 
'k',
+          "The number of classes.", 2, 2, Integer.MAX_VALUE);
+
+  /**
+   * Sets the layout.
+   * <p>
+   * Builds the {@link BoostVHTProcessor} from the configured options, adds it and a
+   * {@link LocalStatisticsProcessor} to the topology, and wires the streams between
+   * them: the attribute stream (key-grouped) and control stream (sent to all) go from
+   * the boosting processor to the local statistics processors, the compute stream
+   * carries their results back, and the result stream carries the predictions out.
+   *
+   * @throws IllegalStateException if the BoostVHT processor cannot be built from the
+   *         configured options
+   */
+  protected void setLayout() {
+
+    int ensembleSize = this.ensembleSizeOption.getValue();
+
+    // Set parameters for BoostVHT processor, and the BoostMA processors within.
+    try {
+      boostVHTProcessor = new BoostVHTProcessor.Builder(dataset)
+          .ensembleSize(ensembleSize)
+          .numberOfClasses(this.numberOfClassesOption.getValue())
+          .splitCriterion(
+              (SplitCriterion) ClassOption.createObject(this.splitCriterionOption.getValueAsCLIString(),
+              this.splitCriterionOption.getRequiredType()))
+          .splitConfidence(this.splitConfidenceOption.getValue())
+          .tieThreshold(this.tieThresholdOption.getValue())
+          .gracePeriod(this.gracePeriodOption.getValue())
+          .parallelismHint(ensembleSize)
+          .timeOut(this.timeOutOption.getValue())
+          .splittingOption(this.splittingOption.isSet() ? SplittingOption.KEEP : SplittingOption.THROW_AWAY)
+          .maxBufferSize(this.maxBufferSizeOption.getValue())
+          .seed(this.seedOption.getValue())
+          .build();
+    } catch (Exception e) {
+      // Fail fast with the cause attached: carrying on with a null processor would
+      // only surface later as an unrelated NullPointerException.
+      throw new IllegalStateException("Failed to build the BoostVHT processor", e);
+    }
+
+    //add Boosting Model Aggregator Processor to the topology
+    this.topologyBuilder.addProcessor(boostVHTProcessor, 1);
+
+    // Streams
+    attributeStream = this.topologyBuilder.createStream(boostVHTProcessor);
+    controlStream = this.topologyBuilder.createStream(boostVHTProcessor);
+
+    //local statistics processor.
+    LocalStatisticsProcessor locStatProcessor = new LocalStatisticsProcessor.Builder()
+        .splitCriterion((SplitCriterion) this.splitCriterionOption.getValue())
+        .binarySplit(this.binarySplitsOption.isSet())
+        .nominalClassObserver((AttributeClassObserver) this.nominalEstimatorOption.getValue())
+        .numericClassObserver((AttributeClassObserver) this.numericEstimatorOption.getValue())
+        .build();
+
+    this.topologyBuilder.addProcessor(locStatProcessor, ensembleSize);
+
+    this.topologyBuilder.connectInputKeyStream(attributeStream, locStatProcessor);
+    this.topologyBuilder.connectInputAllStream(controlStream, locStatProcessor);
+
+    //local statistics result stream
+    computeStream = this.topologyBuilder.createStream(locStatProcessor);
+    locStatProcessor.setComputationResultStream(computeStream);
+    this.topologyBuilder.connectInputAllStream(computeStream, boostVHTProcessor);
+
+    //prediction is computed in boostVHTProcessor
+    resultStream = this.topologyBuilder.createStream(boostVHTProcessor);
+
+    //set the out streams of the BoostVHTProcessor
+    boostVHTProcessor.setResultStream(resultStream);
+    boostVHTProcessor.setAttributeStream(attributeStream);
+    boostVHTProcessor.setControlStream(controlStream);
+  }
+
+  /** The topologyBuilder. */
+  private TopologyBuilder topologyBuilder;
+
+  /**
+   * Stores the topology builder, dataset and parallelism, then builds the layout.
+   *
+   * @param builder     the topology builder the learner adds its processors to
+   * @param dataset     the dataset passed on to the processors
+   * @param parallelism the requested parallelism; only stored here
+   */
+  @Override
+  public void init(TopologyBuilder builder, Instances dataset, int parallelism) {
+    this.parallelism = parallelism;
+    this.dataset = dataset;
+    this.topologyBuilder = builder;
+    setLayout();
+  }
+
+  /**
+   * Returns the processor that receives the learner's input, which is the BoostVHT
+   * processor created in {@link #setLayout()}.
+   */
+  @Override
+  public Processor getInputProcessor() {
+    return this.boostVHTProcessor;
+  }
+
+  /**
+   * Returns the learner's result streams: an immutable set holding only the result
+   * stream.
+   */
+  @Override
+  public Set<Stream> getResultStreams() {
+    return ImmutableSet.of(resultStream);
+  }
+}
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/ensemble/BoostVHTProcessor.java
 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/ensemble/BoostVHTProcessor.java
new file mode 100644
index 0000000..b71ec36
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/ensemble/BoostVHTProcessor.java
@@ -0,0 +1,444 @@
+package org.apache.samoa.learners.classifiers.ensemble;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.instances.Instance;
+import org.apache.samoa.instances.Instances;
+import org.apache.samoa.learners.InstanceContentEvent;
+import org.apache.samoa.learners.ResultContentEvent;
+import 
org.apache.samoa.learners.classifiers.trees.BoostVHTActiveLearningNode.SplittingOption;
+import org.apache.samoa.learners.classifiers.trees.LocalResultContentEvent;
+import 
org.apache.samoa.moa.classifiers.core.splitcriteria.InfoGainSplitCriterion;
+import org.apache.samoa.moa.classifiers.core.splitcriteria.SplitCriterion;
+import org.apache.samoa.moa.core.DoubleVector;
+import org.apache.samoa.moa.core.MiscUtils;
+import org.apache.samoa.topology.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+
+/**
+ * The Class BoostVHTProcessor.
+ */
+public class BoostVHTProcessor implements Processor {
+
+  private static final long serialVersionUID = -1550901409625192730L;
+  private static final Logger logger = 
LoggerFactory.getLogger(BoostVHTProcessor.class);
+  
+  //The following are configured from the user in BoostVHT
+  private SplitCriterion splitCriterion;
+  
+  private Double splitConfidence;
+  
+  private Double tieThreshold;
+  
+  private int gracePeriod;
+  
+  private int parallelismHint;
+  
+  private int timeOut;
+
+  private SplittingOption splittingOption;
+
+  /** The input dataset to BoostVHT. */
+  private Instances dataset;
+  
+  /** The ensemble size. */
+  private int ensembleSize;
+  
+  /** The result stream. */
+  private Stream resultStream;
+  
+  /** The control stream. */
+  private Stream controlStream;
+  
+  /** The attribute stream. */
+  private Stream attributeStream;
+  
+  protected BoostMAProcessor[] mAPEnsemble;
+
+  /** Random number generator. */
+  protected Random random;
+
+  private int seed;
+
+  // lambda_m correct
+  protected double[] scms;
+  // lambda_m wrong
+  protected double[] swms;
+  private double[] e_m;
+
+  private double trainingWeightSeenByModel;
+
+  private int numberOfClasses;
+
+  private int maxBufferSize;
+
+  /**
+   * Creates a processor from a populated builder.
+   *
+   * @param builder the builder carrying the configuration
+   */
+  private BoostVHTProcessor(Builder builder) {
+    // Ensemble-level settings.
+    this.dataset = builder.dataset;
+    this.ensembleSize = builder.ensembleSize;
+    this.numberOfClasses = builder.numberOfClasses;
+    this.seed = builder.seed;
+
+    // Settings passed on to every BoostMAProcessor in onCreate.
+    this.splitCriterion = builder.splitCriterion;
+    this.splitConfidence = builder.splitConfidence;
+    this.tieThreshold = builder.tieThreshold;
+    this.gracePeriod = builder.gracePeriod;
+    this.parallelismHint = builder.parallelismHint;
+    this.timeOut = builder.timeOut;
+    this.maxBufferSize = builder.maxBufferSize;
+    this.splittingOption = builder.splittingOption;
+  }
+
+  /**
+   * Handles an incoming event.
+   * <p>
+   * An {@link InstanceContentEvent} flagged for testing is scored by the ensemble and
+   * the combined prediction is put on the result stream; if it is flagged for training
+   * it is then used to train the ensemble. A {@link LocalResultContentEvent} is
+   * forwarded to the ensemble member identified by its ensemble id. Any other event is
+   * ignored.
+   *
+   * @param event the event
+   * @return always {@code true}
+   */
+  @Override
+  public boolean process(ContentEvent event) {
+
+    if (event instanceof InstanceContentEvent) {
+      InstanceContentEvent inEvent = (InstanceContentEvent) event;
+      //todo:: check if any precondition is needed
+
+      // Predict before training so the instance is scored by the model as it stood.
+      if (inEvent.isTesting()) {
+        double[] combinedPrediction = computeBoosting(inEvent);
+        this.resultStream.put(newResultContentEvent(combinedPrediction, inEvent));
+      }
+
+      // estimate model parameters using the training data
+      if (inEvent.isTraining()) {
+        train(inEvent);
+      }
+    } else if (event instanceof LocalResultContentEvent) {
+      LocalResultContentEvent lrce = (LocalResultContentEvent) event;
+      mAPEnsemble[lrce.getEnsembleId()].updateModel(lrce);
+    }
+
+    return true;
+  }
+
+  /**
+   * Initialises the random number generator, the per-member boosting statistics and
+   * the ensemble of {@link BoostMAProcessor}s, each wired to this processor's
+   * attribute and control streams.
+   *
+   * @param id the processor id; not used here
+   */
+  @Override
+  public void onCreate(int id) {
+    this.random = new Random(seed);
+
+    // Per-member boosting statistics.
+    this.e_m = new double[ensembleSize];
+    this.swms = new double[ensembleSize];
+    this.scms = new double[ensembleSize];
+
+    //instantiate the MAs
+    this.mAPEnsemble = new BoostMAProcessor[ensembleSize];
+    for (int member = 0; member < ensembleSize; member++) {
+      BoostMAProcessor memberProcessor = new BoostMAProcessor.BoostMABuilder(dataset)
+          .splitCriterion(splitCriterion)
+          .splitConfidence(splitConfidence)
+          .tieThreshold(tieThreshold)
+          .gracePeriod(gracePeriod)
+          .parallelismHint(parallelismHint)
+          .timeOut(timeOut)
+          .processorID(member) // The BoostMA processors get incremental ids
+          .maxBufferSize(maxBufferSize)
+          .splittingOption(splittingOption)
+          .build();
+      memberProcessor.setAttributeStream(this.attributeStream);
+      memberProcessor.setControlStream(this.controlStream);
+      this.mAPEnsemble[member] = memberProcessor;
+    }
+  }
+  
+  /**
+   * Combines the votes of the ensemble members for a test instance. Each member's
+   * votes are normalized and scaled by the member's weight before being summed;
+   * members are visited in order and the loop stops at the first member whose weight
+   * is not positive.
+   *
+   * @param inEvent the event carrying the instance to score
+   * @return the combined (weighted) votes
+   */
+  private double[] computeBoosting(InstanceContentEvent inEvent) {
+    Instance testInstance = inEvent.getInstance();
+    DoubleVector combined = new DoubleVector();
+
+    for (int member = 0; member < ensembleSize; member++) {
+      double memberWeight = getEnsembleMemberWeight(member);
+      // Written as a negated '>' (not '<=') so that a NaN weight also ends the loop.
+      if (!(memberWeight > 0.0)) {
+        break;
+      }
+
+      DoubleVector vote = new DoubleVector(mAPEnsemble[member].getVotesForInstance(testInstance));
+      if (vote.sumOfValues() > 0.0) {
+        vote.normalize();
+        vote.scaleValues(memberWeight);
+        combined.addValues(vote);
+      }
+    }
+    return combined.getArrayRef();
+  }
+  
+  /**
+   * Trains the ensemble on one instance. For each member in turn, a Poisson draw with
+   * mean {@code lambda_d} decides how strongly (if at all) the member is trained on the
+   * instance; the member then predicts the unweighted instance and {@code lambda_d} is
+   * rescaled from the member's running correct/wrong sums before moving on to the next
+   * member.
+   *
+   * @param inEvent
+   *          the in event
+   */
+  protected void train(InstanceContentEvent inEvent) {
+    Instance trainInstance = inEvent.getInstance();
+    this.trainingWeightSeenByModel += trainInstance.weight();
+
+    double lambda_d = 1.0;
+    for (int member = 0; member < ensembleSize; member++) { //for each base model
+      BoostMAProcessor learner = mAPEnsemble[member];
+
+      int k = MiscUtils.poisson(lambda_d, this.random); //set k according to poisson
+      if (k > 0) {
+        Instance weightedInstance = trainInstance.copy();
+        weightedInstance.setWeight(trainInstance.weight() * k);
+        learner.trainOnInstanceImpl(weightedInstance);
+      }
+
+      //get prediction for the instance from the specific learner of the ensemble
+      double[] prediction = learner.getVotesForInstance(trainInstance);
+      boolean correct = learner.correctlyClassifies(trainInstance, prediction);
+
+      // Correct predictions feed scms, wrong ones swms; lambda_d is rescaled by the
+      // sum that was just updated.
+      double[] lambdaSums = correct ? this.scms : this.swms;
+      lambdaSums[member] += lambda_d;
+      lambda_d *= this.trainingWeightSeenByModel / (2 * lambdaSums[member]);
+    }
+  }
+  
+  /**
+   * Computes the voting weight of ensemble member {@code i} from its accumulated
+   * correct ({@code scms}) and wrong ({@code swms}) sums, using the SAMME multi-class
+   * variant.
+   *
+   * @param i index of the ensemble member
+   * @return {@code log((1 - em) / em) + log(numberOfClasses - 1)} where {@code em} is
+   *         the member's error rate, or {@code 0.0} when the member has no recorded
+   *         weight yet, has no recorded error, or has an error rate above
+   *         {@code 1 - 1/numberOfClasses}
+   */
+  private double getEnsembleMemberWeight(int i) {
+    double totalWeight = this.scms[i] + this.swms[i];
+    if (totalWeight == 0.0) {
+      // Nothing recorded for this member yet: 0/0 would yield NaN, so report
+      // "no vote" explicitly.
+      return 0.0;
+    }
+
+    double em = this.swms[i] / totalWeight;
+    if ((em == 0.0) || (em > (1.0 - 1.0 / this.numberOfClasses))) { //for SAMME
+      return 0.0;
+    }
+    double Bm = em / (1.0 - em);
+    return Math.log(1.0 / Bm) + Math.log(this.numberOfClasses - 1); //for SAMME
+  }
+  
+  /**
+   * Builds the {@link ResultContentEvent} for a scored instance, copying the instance
+   * index, instance, class id, last-event flag, arrival timestamp and evaluation index
+   * from the originating event.
+   *
+   * @param combinedPrediction
+   *          The combined votes produced by the Boost-VHT ensemble.
+   * @param inEvent
+   *          The associated instance content event
+   * @return ResultContentEvent to be sent into Evaluator PI or other destination PI.
+   */
+  private ResultContentEvent newResultContentEvent(double[] combinedPrediction, InstanceContentEvent inEvent) {
+    ResultContentEvent result = new ResultContentEvent(
+        inEvent.getInstanceIndex(),
+        inEvent.getInstance(),
+        inEvent.getClassId(),
+        combinedPrediction,
+        inEvent.isLastEvent(),
+        inEvent.getArrivalTimestamp());
+    result.setEvaluationIndex(inEvent.getEvaluationIndex());
+    return result;
+  }
+
+  /**
+   * Fluent builder for {@link BoostVHTProcessor}. Every setter returns the
+   * builder itself so calls can be chained before {@link #build()}.
+   */
+  public static class Builder {
+    // BoostVHT processor parameters
+    private final Instances dataset;
+    private int ensembleSize;
+    private int numberOfClasses;
+
+    // BoostMAProcessor parameters
+    private SplitCriterion splitCriterion = new InfoGainSplitCriterion();
+    private double splitConfidence;
+    private double tieThreshold;
+    private int gracePeriod;
+    private int parallelismHint;
+    private int timeOut = Integer.MAX_VALUE;
+    private SplittingOption splittingOption;
+    private int maxBufferSize;
+    private int seed;
+
+    /**
+     * Starts a builder with default settings.
+     *
+     * @param dataset
+     *          the dataset handed to the processor
+     */
+    public Builder(Instances dataset) {
+      this.dataset = dataset;
+    }
+
+    /**
+     * Starts a builder pre-populated from an existing processor.
+     *
+     * @param oldProcessor
+     *          the processor whose settings are copied
+     */
+    public Builder(BoostVHTProcessor oldProcessor) {
+      this.dataset = oldProcessor.getDataset();
+      this.ensembleSize = oldProcessor.getEnsembleSize();
+      this.numberOfClasses = oldProcessor.getNumberOfClasses();
+      this.splitCriterion = oldProcessor.getSplitCriterion();
+      this.splitConfidence = oldProcessor.getSplitConfidence();
+      this.tieThreshold = oldProcessor.getTieThreshold();
+      this.gracePeriod = oldProcessor.getGracePeriod();
+      this.parallelismHint = oldProcessor.getParallelismHint();
+      this.timeOut = oldProcessor.getTimeOut();
+      this.splittingOption = oldProcessor.splittingOption;
+      this.seed = oldProcessor.getSeed();
+      // NOTE(review): maxBufferSize is not copied here, so it stays at the
+      // int default of 0 - confirm whether that is intended.
+    }
+
+    /** Sets the ensemble size. */
+    public Builder ensembleSize(int ensembleSize) {
+      this.ensembleSize = ensembleSize;
+      return this;
+    }
+
+    /** Sets the number of classes. */
+    public Builder numberOfClasses(int numberOfClasses) {
+      this.numberOfClasses = numberOfClasses;
+      return this;
+    }
+
+    /** Sets the split criterion (defaults to InfoGainSplitCriterion). */
+    public Builder splitCriterion(SplitCriterion splitCriterion) {
+      this.splitCriterion = splitCriterion;
+      return this;
+    }
+
+    /** Sets the split confidence. */
+    public Builder splitConfidence(double splitConfidence) {
+      this.splitConfidence = splitConfidence;
+      return this;
+    }
+
+    /** Sets the tie threshold. */
+    public Builder tieThreshold(double tieThreshold) {
+      this.tieThreshold = tieThreshold;
+      return this;
+    }
+
+    /** Sets the grace period. */
+    public Builder gracePeriod(int gracePeriod) {
+      this.gracePeriod = gracePeriod;
+      return this;
+    }
+
+    /** Sets the parallelism hint. */
+    public Builder parallelismHint(int parallelismHint) {
+      this.parallelismHint = parallelismHint;
+      return this;
+    }
+
+    /** Sets the time-out (defaults to Integer.MAX_VALUE). */
+    public Builder timeOut(int timeOut) {
+      this.timeOut = timeOut;
+      return this;
+    }
+
+    /** Sets the splitting option. */
+    public Builder splittingOption(SplittingOption splittingOption) {
+      this.splittingOption = splittingOption;
+      return this;
+    }
+
+    /** Sets the maximum buffer size. */
+    public Builder maxBufferSize(int maxBufferSize) {
+      this.maxBufferSize = maxBufferSize;
+      return this;
+    }
+
+    /** Sets the seed. */
+    public Builder seed(int seed) {
+      this.seed = seed;
+      return this;
+    }
+
+    /**
+     * Creates the processor from the current builder state.
+     *
+     * @return a new BoostVHTProcessor
+     */
+    public BoostVHTProcessor build() {
+      return new BoostVHTProcessor(this);
+    }
+  }
+  
+  /** @return the {@code dataset} field (same value as {@link #getDataset()}) */
+  public Instances getInputInstances() {
+    return this.dataset;
+  }
+
+  /** Replaces the {@code dataset} field. */
+  public void setInputInstances(Instances dataset) {
+    this.dataset = dataset;
+  }
+
+  /** @return the result stream, or null if none has been set */
+  public Stream getResultStream() {
+    return this.resultStream;
+  }
+
+  /** Sets the result stream. */
+  public void setResultStream(Stream resultStream) {
+    this.resultStream = resultStream;
+  }
+
+  /** @return the ensemble size */
+  public int getEnsembleSize() {
+    return this.ensembleSize;
+  }
+
+  /** @return the control stream */
+  public Stream getControlStream() {
+    return this.controlStream;
+  }
+
+  /** Sets the control stream. */
+  public void setControlStream(Stream controlStream) {
+    this.controlStream = controlStream;
+  }
+
+  /** @return the attribute stream */
+  public Stream getAttributeStream() {
+    return this.attributeStream;
+  }
+
+  /** Sets the attribute stream. */
+  public void setAttributeStream(Stream attributeStream) {
+    this.attributeStream = attributeStream;
+  }
+
+  /** @return the split criterion */
+  public SplitCriterion getSplitCriterion() {
+    return this.splitCriterion;
+  }
+
+  /** @return the split confidence */
+  public Double getSplitConfidence() {
+    return this.splitConfidence;
+  }
+
+  /** @return the tie threshold */
+  public Double getTieThreshold() {
+    return this.tieThreshold;
+  }
+
+  /** @return the seed */
+  public int getSeed() {
+    return this.seed;
+  }
+
+  /** @return the grace period */
+  public int getGracePeriod() {
+    return this.gracePeriod;
+  }
+
+  /** @return the parallelism hint */
+  public int getParallelismHint() {
+    return this.parallelismHint;
+  }
+
+  /** @return the time-out */
+  public int getTimeOut() {
+    return this.timeOut;
+  }
+
+  /** Sets the time-out. */
+  public void setTimeOut(int timeOut) {
+    this.timeOut = timeOut;
+  }
+
+  /** @return the number of classes */
+  public int getNumberOfClasses() {
+    return this.numberOfClasses;
+  }
+
+  /** Sets the number of classes. */
+  public void setNumberOfClasses(int numberOfClasses) {
+    this.numberOfClasses = numberOfClasses;
+  }
+
+  /** @return the {@code dataset} field */
+  public Instances getDataset() {
+    return this.dataset;
+  }
+  
+  /**
+   * Creates a new BoostVHTProcessor configured like {@code sourceProcessor}.
+   *
+   * The result, control and attribute streams are carried over only when the
+   * source processor has a result stream set.
+   *
+   * @param sourceProcessor
+   *          the processor to copy; must be a BoostVHTProcessor
+   * @return the newly built processor
+   */
+  @Override
+  public Processor newProcessor(Processor sourceProcessor) {
+    final BoostVHTProcessor originProcessor = (BoostVHTProcessor) sourceProcessor;
+    final BoostVHTProcessor newProcessor = new BoostVHTProcessor.Builder(originProcessor).build();
+
+    final Stream originResultStream = originProcessor.getResultStream();
+    if (originResultStream == null) {
+      return newProcessor;
+    }
+    newProcessor.setResultStream(originResultStream);
+    newProcessor.setControlStream(originProcessor.getControlStream());
+    newProcessor.setAttributeStream(originProcessor.getAttributeStream());
+    return newProcessor;
+  }
+}
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ActiveLearningNode.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ActiveLearningNode.java
index a437719..1e27acc 100644
--- a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ActiveLearningNode.java
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ActiveLearningNode.java
@@ -28,26 +28,26 @@ import 
org.apache.samoa.moa.classifiers.core.AttributeSplitSuggestion;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-final class ActiveLearningNode extends LearningNode {
+public class ActiveLearningNode extends LearningNode {
   /**
-        * 
+        *
         */
   private static final long serialVersionUID = -2892102872646338908L;
   private static final Logger logger = 
LoggerFactory.getLogger(ActiveLearningNode.class);
 
-  private double weightSeenAtLastSplitEvaluation;
+  protected double weightSeenAtLastSplitEvaluation;
 
-  private final Map<Integer, String> attributeContentEventKeys;
+  protected Map<Integer, String> attributeContentEventKeys;
 
-  private AttributeSplitSuggestion bestSuggestion;
-  private AttributeSplitSuggestion secondBestSuggestion;
+  protected AttributeSplitSuggestion bestSuggestion;
+  protected AttributeSplitSuggestion secondBestSuggestion;
 
-  private final long id;
-  private final int parallelismHint;
-  private int suggestionCtr;
-  private int thrownAwayInstance;
+  protected long id;
+  protected int parallelismHint;
+  protected int suggestionCtr;
+  protected int thrownAwayInstance;
 
-  private boolean isSplitting;
+  protected boolean isSplitting;
 
   ActiveLearningNode(double[] classObservation, int parallelismHint) {
     super(classObservation);
@@ -58,7 +58,7 @@ final class ActiveLearningNode extends LearningNode {
     this.parallelismHint = parallelismHint;
   }
 
-  long getId() {
+  protected long getId() {
     return id;
   }
 
@@ -73,7 +73,7 @@ final class ActiveLearningNode extends LearningNode {
   }
 
   @Override
-  void learnFromInstance(Instance inst, ModelAggregatorProcessor proc) {
+  public void learnFromInstance(Instance inst, ModelAggregatorProcessor proc) {
     // TODO: what statistics should we keep for unused instance?
     if (isSplitting) { // currently throw all instance will splitting
       this.thrownAwayInstance++;
@@ -120,23 +120,23 @@ final class ActiveLearningNode extends LearningNode {
   }
 
   @Override
-  double[] getClassVotes(Instance inst, ModelAggregatorProcessor map) {
+  public double[] getClassVotes(Instance inst, ModelAggregatorProcessor map) {
     return this.observedClassDistribution.getArrayCopy();
   }
 
-  double getWeightSeen() {
+  public double getWeightSeen() {
     return this.observedClassDistribution.sumOfValues();
   }
 
-  void setWeightSeenAtLastSplitEvaluation(double weight) {
+  public void setWeightSeenAtLastSplitEvaluation(double weight) {
     this.weightSeenAtLastSplitEvaluation = weight;
   }
 
-  double getWeightSeenAtLastSplitEvaluation() {
+  public double getWeightSeenAtLastSplitEvaluation() {
     return this.weightSeenAtLastSplitEvaluation;
   }
 
-  void requestDistributedSuggestions(long splitId, ModelAggregatorProcessor 
modelAggrProc) {
+  public void requestDistributedSuggestions(long splitId, 
ModelAggregatorProcessor modelAggrProc) {
     this.isSplitting = true;
     this.suggestionCtr = 0;
     this.thrownAwayInstance = 0;
@@ -146,7 +146,7 @@ final class ActiveLearningNode extends LearningNode {
     modelAggrProc.sendToControlStream(cce);
   }
 
-  void addDistributedSuggestions(AttributeSplitSuggestion bestSuggestion, 
AttributeSplitSuggestion secondBestSuggestion) {
+  public void addDistributedSuggestions(AttributeSplitSuggestion 
bestSuggestion, AttributeSplitSuggestion secondBestSuggestion) {
     // starts comparing from the best suggestion
     if (bestSuggestion != null) {
       if ((this.bestSuggestion == null) || 
(bestSuggestion.compareTo(this.bestSuggestion) > 0)) {
@@ -170,7 +170,7 @@ final class ActiveLearningNode extends LearningNode {
     this.suggestionCtr++;
   }
 
-  boolean isSplitting() {
+  public boolean isSplitting() {
     return this.isSplitting;
   }
 
@@ -182,15 +182,15 @@ final class ActiveLearningNode extends LearningNode {
     this.secondBestSuggestion = null;
   }
 
-  AttributeSplitSuggestion getDistributedBestSuggestion() {
+  public AttributeSplitSuggestion getDistributedBestSuggestion() {
     return this.bestSuggestion;
   }
 
-  AttributeSplitSuggestion getDistributedSecondBestSuggestion() {
+  public AttributeSplitSuggestion getDistributedSecondBestSuggestion() {
     return this.secondBestSuggestion;
   }
 
-  boolean isAllSuggestionsCollected() {
+  public boolean isAllSuggestionsCollected() {
     return (this.suggestionCtr == this.parallelismHint);
   }
 
@@ -198,7 +198,7 @@ final class ActiveLearningNode extends LearningNode {
     return inst.classIndex() > index ? index : index + 1;
   }
 
-  private String generateKey(int obsIndex) {
+  protected String generateKey(int obsIndex) {
     final int prime = 31;
     int result = 1;
     result = prime * result + (int) (this.id ^ (this.id >>> 32));
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/AttributeSliceEvent.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/AttributeSliceEvent.java
new file mode 100644
index 0000000..9e3f245
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/AttributeSliceEvent.java
@@ -0,0 +1,90 @@
+package org.apache.samoa.learners.classifiers.trees;
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.ContentEvent;
+
+/**
+ * Event used by the BoostVHT algorithm to ship one slice of an instance's
+ * attributes to a local statistics processor (LSP). The attributes of an
+ * instance are split into parallelismHint slices (the number of LSPs) and one
+ * message is sent per LSP. Each message carries the slice together with the
+ * information needed to update the class observers.
+ */
+public class AttributeSliceEvent implements ContentEvent {
+  private static final long serialVersionUID = 6752449086753238767L;
+
+  private final long learningNodeId;
+  private final int attributeStartingIndex;
+  // Declared transient, so the key is not part of the serialized form.
+  private final transient String key;
+  private final boolean[] isNominalSlice;
+  private final double[] attributeSlice;
+  private final int classValue;
+  private final double weight;
+
+  /**
+   * Creates a slice event. The arrays are stored as given, without copying.
+   *
+   * @param learningNodeId
+   *          id of the learning node the slice belongs to
+   * @param attributeStartingIndex
+   *          index of the first attribute in the slice
+   * @param key
+   *          key of this event
+   * @param isNominalSlice
+   *          per-attribute flags, true where the attribute is nominal
+   * @param attributeSlice
+   *          the attribute values of the slice
+   * @param classValue
+   *          class value of the instance
+   * @param weight
+   *          weight of the instance
+   */
+  public AttributeSliceEvent(
+      long learningNodeId, int attributeStartingIndex, String key,
+      boolean[] isNominalSlice, double[] attributeSlice,
+      int classValue, double weight) {
+    this.learningNodeId = learningNodeId;
+    this.attributeStartingIndex = attributeStartingIndex;
+    this.key = key;
+    this.isNominalSlice = isNominalSlice;
+    this.attributeSlice = attributeSlice;
+    this.classValue = classValue;
+    this.weight = weight;
+  }
+
+  /** @return id of the learning node the slice belongs to */
+  public long getLearningNodeId() {
+    return this.learningNodeId;
+  }
+
+  /** @return index of the first attribute in the slice */
+  public int getAttributeStartingIndex() {
+    return this.attributeStartingIndex;
+  }
+
+  /** @return the nominal flags array itself (not a copy) */
+  public boolean[] getIsNominalSlice() {
+    return this.isNominalSlice;
+  }
+
+  /** @return the attribute values array itself (not a copy) */
+  public double[] getAttributeSlice() {
+    return this.attributeSlice;
+  }
+
+  /** @return class value of the instance */
+  public int getClassValue() {
+    return this.classValue;
+  }
+
+  /** @return weight of the instance */
+  public double getWeight() {
+    return this.weight;
+  }
+
+  @Override
+  public String getKey() {
+    return this.key;
+  }
+
+  /** Does nothing: the key is final and assigned in the constructor. */
+  @Override
+  public void setKey(String key) {
+  }
+
+  /** Always returns false. */
+  @Override
+  public boolean isLastEvent() { // TODO
+    return false;
+  }
+}
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/BoostVHTActiveLearningNode.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/BoostVHTActiveLearningNode.java
new file mode 100644
index 0000000..65d89d3
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/BoostVHTActiveLearningNode.java
@@ -0,0 +1,145 @@
+package org.apache.samoa.learners.classifiers.trees;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.google.common.collect.EvictingQueue;
+import org.apache.samoa.instances.Attribute;
+import org.apache.samoa.instances.Instance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Queue;
+
+/**
+ * Active learning node used by BoostVHT. It sends the attributes of each
+ * instance to the local statistics processors (LSPs) as one
+ * AttributeSliceEvent per LSP, and can buffer instances that arrive while a
+ * split decision is pending.
+ */
+public final class BoostVHTActiveLearningNode extends ActiveLearningNode {
+
+  /** What to do with an instance that arrives while the node is splitting. */
+  public enum SplittingOption {
+    THROW_AWAY, KEEP
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(BoostVHTActiveLearningNode.class);
+
+  private final SplittingOption splittingOption;
+  private final int maxBufferSize;
+  private final Queue<Instance> buffer;
+  private int ensembleId;
+
+  /**
+   * Creates the node.
+   *
+   * @param classObservation
+   *          initial class observation passed to the superclass
+   * @param parallelism_hint
+   *          number of local statistics processors
+   * @param splitOption
+   *          how to treat instances that arrive during a split
+   * @param maxBufferSize
+   *          capacity of the evicting buffer used with KEEP
+   */
+  public BoostVHTActiveLearningNode(double[] classObservation, int parallelism_hint,
+      SplittingOption splitOption, int maxBufferSize) {
+    super(classObservation, parallelism_hint);
+    // NOTE(review): the five assignments below repeat state that the
+    // superclass constructor presumably sets already (and draw a second id
+    // from the generator) - confirm whether they are needed.
+    weightSeenAtLastSplitEvaluation = this.getWeightSeen();
+    id = VerticalHoeffdingTree.LearningNodeIdGenerator.generate();
+    attributeContentEventKeys = new HashMap<>();
+    isSplitting = false;
+    parallelismHint = parallelism_hint;
+    this.splittingOption = splitOption;
+    this.maxBufferSize = maxBufferSize;
+    this.buffer = EvictingQueue.create(maxBufferSize);
+  }
+
+  /**
+   * Updates the class distribution with the instance and sends its attributes
+   * to the attribute stream, one AttributeSliceEvent per LSP.
+   *
+   * While the node is splitting, THROW_AWAY counts and drops the instance.
+   * KEEP adds it to the buffer and still sends it to the local statistics.
+   *
+   * @param inst
+   *          the instance to learn from
+   * @param proc
+   *          the model aggregator processor used to send the slice events
+   */
+  @Override
+  public void learnFromInstance(Instance inst, ModelAggregatorProcessor proc) {
+    if (isSplitting) {
+      switch (this.splittingOption) {
+        case THROW_AWAY:
+          // Splitting in progress: count the instance and drop it.
+          thrownAwayInstance++;
+          return;
+        case KEEP:
+          // Remember the instance, then fall out of the switch and keep
+          // sending it to the local statistics as usual.
+          buffer.add(inst);
+          break;
+        default:
+          logger.error("node {}: invalid splittingOption option: {}", id, this.splittingOption);
+          break;
+      }
+    }
+
+    this.observedClassDistribution.addToValue((int) inst.classValue(), inst.weight());
+
+    // The attribute array is cut into parallelismHint slices and one message
+    // is sent per LSP. There is probably room for optimization at the LSP
+    // level, e.g. in how the observers are updated.
+    // TODO: the class is assumed to be the last entry of the attribute
+    // array, hence the "- 1"; proper handling can come later.
+    final double[] attributeArray = inst.toDoubleArray();
+    final int attributeCount = attributeArray.length - 1;
+    final int sliceSize = attributeCount / parallelismHint;
+
+    final boolean[] isNominalAll = new boolean[inst.numAttributes() - 1];
+    for (int i = 0; i < isNominalAll.length; i++) {
+      Attribute att = inst.attribute(i);
+      isNominalAll[i] = att.isNominal();
+    }
+
+    final int lastSlice = parallelismHint - 1;
+    for (int localStatsIndex = 0; localStatsIndex < parallelismHint; localStatsIndex++) {
+      // Each slice starts where the previous one ended. The last slice runs
+      // to the end of the attributes and absorbs any remainder.
+      final int sliceStart = localStatsIndex * sliceSize;
+      final int sliceEnd = (localStatsIndex == lastSlice) ? attributeCount : sliceStart + sliceSize;
+
+      final double[] attributeSlice = Arrays.copyOfRange(attributeArray, sliceStart, sliceEnd);
+      final boolean[] isNominalSlice = Arrays.copyOfRange(isNominalAll, sliceStart, sliceEnd);
+
+      proc.sendToAttributeStream(new AttributeSliceEvent(
+          this.id, sliceStart, Integer.toString(localStatsIndex), isNominalSlice, attributeSlice,
+          (int) inst.classValue(), inst.weight()));
+    }
+  }
+
+  /**
+   * Marks the node as splitting, resets the suggestion and thrown-away
+   * counters, and sends a ComputeContentEvent tagged with this node's
+   * ensemble id to the control stream.
+   *
+   * @param splitId
+   *          id of the split being evaluated
+   * @param modelAggrProc
+   *          the processor used to send the compute event
+   */
+  @Override
+  public void requestDistributedSuggestions(long splitId, ModelAggregatorProcessor modelAggrProc) {
+    this.isSplitting = true;
+    this.suggestionCtr = 0;
+    this.thrownAwayInstance = 0;
+
+    ComputeContentEvent computeEvent =
+        new ComputeContentEvent(splitId, this.id, this.getObservedClassDistribution());
+    computeEvent.setEnsembleId(this.ensembleId);
+    modelAggrProc.sendToControlStream(computeEvent);
+  }
+
+  /** Ends splitting as the superclass does, then empties the buffer. */
+  @Override
+  public void endSplitting() {
+    super.endSplitting();
+    this.buffer.clear();
+  }
+
+  /**
+   * @param obsIndex
+   *          observer (attribute) index
+   * @return {@code obsIndex % parallelismHint} as a decimal string
+   */
+  @Override
+  protected String generateKey(int obsIndex) {
+    return Integer.toString(obsIndex % parallelismHint);
+  }
+
+  /** @return the buffer of instances kept during splitting (not a copy) */
+  public Queue<Instance> getBuffer() {
+    return this.buffer;
+  }
+
+  /** @return the ensemble id attached to outgoing compute events */
+  public int getEnsembleId() {
+    return this.ensembleId;
+  }
+
+  /** Sets the ensemble id attached to outgoing compute events. */
+  public void setEnsembleId(int ensembleId) {
+    this.ensembleId = ensembleId;
+  }
+}
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ComputeContentEvent.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ComputeContentEvent.java
index fe56cc1..fe5ece8 100644
--- a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ComputeContentEvent.java
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ComputeContentEvent.java
@@ -38,6 +38,8 @@ public final class ComputeContentEvent extends 
ControlContentEvent {
 
   private final double[] preSplitDist;
   private final long splitId;
+  
+  private int ensembleId; //the id of the ensemble that send the event
 
   public ComputeContentEvent() {
     super(-1);
@@ -142,4 +144,11 @@ public final class ComputeContentEvent extends 
ControlContentEvent {
 
   }
 
+  public int getEnsembleId() {
+    return ensembleId;
+  }
+  
+  public void setEnsembleId(int ensembleId) {
+    this.ensembleId = ensembleId;
+  }
 }
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/FoundNode.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/FoundNode.java
index 61d9b19..b3607cc 100644
--- a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/FoundNode.java
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/FoundNode.java
@@ -50,7 +50,7 @@ public final class FoundNode implements java.io.Serializable {
    * 
    * @return The node where the instance is routed/filtered
    */
-  Node getNode() {
+  public Node getNode() {
     return this.node;
   }
 
@@ -71,7 +71,7 @@ public final class FoundNode implements java.io.Serializable {
    * 
    * @return The parent of the node
    */
-  SplitNode getParent() {
+  public SplitNode getParent() {
     return this.parent;
   }
 
@@ -81,7 +81,7 @@ public final class FoundNode implements java.io.Serializable {
    * 
    * @return The index of the node in its parent node.
    */
-  int getParentBranch() {
+  public int getParentBranch() {
     return this.parentBranch;
   }
 
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/InactiveLearningNode.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/InactiveLearningNode.java
index 1e38377..beb8aa6 100644
--- a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/InactiveLearningNode.java
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/InactiveLearningNode.java
@@ -36,12 +36,12 @@ public final class InactiveLearningNode extends 
LearningNode {
         */
   private static final long serialVersionUID = -814552382883472302L;
 
-  InactiveLearningNode(double[] initialClassObservation) {
+  public InactiveLearningNode(double[] initialClassObservation) {
     super(initialClassObservation);
   }
 
   @Override
-  void learnFromInstance(Instance inst, ModelAggregatorProcessor proc) {
+  public void learnFromInstance(Instance inst, ModelAggregatorProcessor proc) {
     this.observedClassDistribution.addToValue(
         (int) inst.classValue(), inst.weight());
   }
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LearningNode.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LearningNode.java
index f7f7826..ad41ee7 100644
--- a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LearningNode.java
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LearningNode.java
@@ -38,13 +38,13 @@ public abstract class LearningNode extends Node {
 
   /**
    * Method to process the instance for learning
-   * 
+   *
    * @param inst
    *          The processed instance
    * @param proc
    *          The model aggregator processor where this learning node exists
    */
-  abstract void learnFromInstance(Instance inst, ModelAggregatorProcessor 
proc);
+  public abstract void learnFromInstance(Instance inst, 
ModelAggregatorProcessor proc);
 
   @Override
   protected boolean isLeaf() {
@@ -52,7 +52,7 @@ public abstract class LearningNode extends Node {
   }
 
   @Override
-  protected FoundNode filterInstanceToLeaf(Instance inst, SplitNode parent,
+  public FoundNode filterInstanceToLeaf(Instance inst, SplitNode parent,
       int parentBranch) {
     return new FoundNode(this, parent, parentBranch);
   }
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LocalResultContentEvent.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LocalResultContentEvent.java
index 485ac75..ddc3121 100644
--- a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LocalResultContentEvent.java
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LocalResultContentEvent.java
@@ -37,6 +37,7 @@ public final class LocalResultContentEvent implements 
ContentEvent {
   private final AttributeSplitSuggestion bestSuggestion;
   private final AttributeSplitSuggestion secondBestSuggestion;
   private final long splitId;
+  private int ensembleId; //the id of the ensemble that asked for the local 
statistics
 
   public LocalResultContentEvent() {
     bestSuggestion = null;
@@ -60,7 +61,7 @@ public final class LocalResultContentEvent implements 
ContentEvent {
    * 
    * @return The best attribute split suggestion.
    */
-  AttributeSplitSuggestion getBestSuggestion() {
+  public AttributeSplitSuggestion getBestSuggestion() {
     return this.bestSuggestion;
   }
 
@@ -69,7 +70,7 @@ public final class LocalResultContentEvent implements 
ContentEvent {
    * 
    * @return The second best attribute split suggestion.
    */
-  AttributeSplitSuggestion getSecondBestSuggestion() {
+  public AttributeSplitSuggestion getSecondBestSuggestion() {
     return this.secondBestSuggestion;
   }
 
@@ -78,7 +79,7 @@ public final class LocalResultContentEvent implements 
ContentEvent {
    * 
    * @return The split id of this local calculation result
    */
-  long getSplitId() {
+  public long getSplitId() {
     return this.splitId;
   }
 
@@ -92,4 +93,12 @@ public final class LocalResultContentEvent implements 
ContentEvent {
   public boolean isLastEvent() {
     return false;
   }
+
+  public int getEnsembleId() {
+    return ensembleId;
+  }
+  
+  public void setEnsembleId(int ensembleId) {
+    this.ensembleId = ensembleId;
+  }
 }
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LocalStatisticsProcessor.java b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LocalStatisticsProcessor.java
index 6e7c174..fde2300 100644
--- a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LocalStatisticsProcessor.java
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LocalStatisticsProcessor.java
@@ -24,7 +24,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Vector;
 
 import org.apache.samoa.core.ContentEvent;
 import org.apache.samoa.core.Processor;
@@ -65,6 +64,7 @@ public final class LocalStatisticsProcessor implements 
Processor {
   private final boolean binarySplit;
   private final AttributeClassObserver nominalClassObserver;
   private final AttributeClassObserver numericClassObserver;
+  private int id;
 
   // the two observer classes below are also needed to be setup from the Tree
   private LocalStatisticsProcessor(Builder builder) {
@@ -102,66 +102,94 @@ public final class LocalStatisticsProcessor implements 
Processor {
        * = (AttributeContentEvent) event; Long learningNodeId =
        * Long.valueOf(ace.getLearningNodeId()); Integer obsIndex =
        * Integer.valueOf(ace.getObsIndex());
-       * 
+       *
        * AttributeClassObserver obs = localStats.get( learningNodeId, 
obsIndex);
-       * 
+       *
        * if (obs == null) { obs = ace.isNominal() ? newNominalClassObserver() :
        * newNumericClassObserver(); localStats.put(ace.getLearningNodeId(),
        * obsIndex, obs); } obs.observeAttributeClass(ace.getAttrVal(),
        * ace.getClassVal(), ace.getWeight());
        */
-    } else if (event instanceof ComputeContentEvent) {
-      // process ComputeContentEvent by calculating the local statistic
-      // and send back the calculation results via computation result stream.
+    } else if (event instanceof AttributeSliceEvent) {
+      AttributeSliceEvent ase = (AttributeSliceEvent) event;
+      processAttributeSlice(ase);
+
+    } else if (event instanceof ComputeContentEvent){
       ComputeContentEvent cce = (ComputeContentEvent) event;
-      Long learningNodeId = cce.getLearningNodeId();
-      double[] preSplitDist = cce.getPreSplitDist();
-
-      Map<Integer, AttributeClassObserver> learningNodeRowMap = localStats
-          .row(learningNodeId);
-      List<AttributeSplitSuggestion> suggestions = new Vector<>();
-
-      for (Entry<Integer, AttributeClassObserver> entry : 
learningNodeRowMap.entrySet()) {
-        AttributeClassObserver obs = entry.getValue();
-        AttributeSplitSuggestion suggestion = obs
-            .getBestEvaluatedSplitSuggestion(splitCriterion,
-                preSplitDist, entry.getKey(), binarySplit);
-        if (suggestion != null) {
-          suggestions.add(suggestion);
-        }
-      }
+      processComputeEvent(cce);
+
+    } else if (event instanceof DeleteContentEvent) {
+      DeleteContentEvent dce = (DeleteContentEvent) event;
+      Long learningNodeId = dce.getLearningNodeId();
+      localStats.rowMap().remove(learningNodeId);
+    }
+    return true;
+  }
 
-      AttributeSplitSuggestion[] bestSuggestions = suggestions
-          .toArray(new AttributeSplitSuggestion[suggestions.size()]);
+  private void processComputeEvent(ComputeContentEvent cce) {
+    Long learningNodeId = cce.getLearningNodeId();
+    double[] preSplitDist = cce.getPreSplitDist();
+
+    Map<Integer, AttributeClassObserver> learningNodeRowMap = 
localStats.row(learningNodeId);
+    AttributeSplitSuggestion[] suggestions = new 
AttributeSplitSuggestion[learningNodeRowMap.size()];
+
+    int curIndex = 0;
+    for (Entry<Integer, AttributeClassObserver> entry : 
learningNodeRowMap.entrySet()) {
+      AttributeClassObserver obs = entry.getValue();
+      AttributeSplitSuggestion suggestion = obs
+          .getBestEvaluatedSplitSuggestion(splitCriterion,
+              preSplitDist, entry.getKey(), binarySplit);
+      if (suggestion == null) {
+        suggestion = new AttributeSplitSuggestion();
+      }
+      suggestions[curIndex] = suggestion;
+      curIndex++;
+    }
 
-      Arrays.sort(bestSuggestions);
+    // Doing this sort instead of keeping the max and second max seems faster 
for some reason
+    Arrays.sort(suggestions);
 
-      AttributeSplitSuggestion bestSuggestion = null;
-      AttributeSplitSuggestion secondBestSuggestion = null;
+    AttributeSplitSuggestion bestSuggestion = null;
+    AttributeSplitSuggestion secondBestSuggestion = null;
 
-      if (bestSuggestions.length >= 1) {
-        bestSuggestion = bestSuggestions[bestSuggestions.length - 1];
+    if (suggestions.length >= 1) {
+    bestSuggestion = suggestions[suggestions.length - 1];
 
-        if (bestSuggestions.length >= 2) {
-          secondBestSuggestion = bestSuggestions[bestSuggestions.length - 2];
-        }
+      if (suggestions.length >= 2) {
+        secondBestSuggestion = suggestions[suggestions.length - 2];
       }
+    }
 
-      // create the local result content event
-      LocalResultContentEvent lcre =
-          new LocalResultContentEvent(cce.getSplitId(), bestSuggestion, 
secondBestSuggestion);
-      computationResultStream.put(lcre);
-      logger.debug("Finish compute event");
-    } else if (event instanceof DeleteContentEvent) {
-      DeleteContentEvent dce = (DeleteContentEvent) event;
-      Long learningNodeId = dce.getLearningNodeId();
-      localStats.rowMap().remove(learningNodeId);
+    // create the local result content event
+    LocalResultContentEvent lcre =
+        new LocalResultContentEvent(cce.getSplitId(), bestSuggestion, 
secondBestSuggestion);
+    lcre.setEnsembleId(cce.getEnsembleId());
+    computationResultStream.put(lcre);
+  }
+
+  private void processAttributeSlice(AttributeSliceEvent ase) {
+    //      System.out.printf("Event with key: %s processed by LSP: %d%n", 
ase.getKey(), id);
+    double[] attributeSlice = ase.getAttributeSlice();
+    boolean[] isNominal = ase.getIsNominalSlice();
+    int startingIndex = ase.getAttributeStartingIndex();
+    Long learningNodeId = ase.getLearningNodeId();
+    int classValue = ase.getClassValue();
+    double weight = ase.getWeight();
+
+    for (int i = 0; i < attributeSlice.length; i++) {
+      Integer obsIndex = i + startingIndex;
+      AttributeClassObserver obs = localStats.get(learningNodeId, obsIndex);
+      if (obs == null) {
+        obs = isNominal[i] ? newNominalClassObserver() : 
newNumericClassObserver();
+        localStats.put(learningNodeId, obsIndex, obs);
+      }
+      obs.observeAttributeClass(attributeSlice[i], classValue, weight);
     }
-    return false;
   }
 
   @Override
   public void onCreate(int id) {
+    this.id = id;
     this.localStats = HashBasedTable.create();
   }
 
@@ -170,7 +198,7 @@ public final class LocalStatisticsProcessor implements 
Processor {
     LocalStatisticsProcessor oldProcessor = (LocalStatisticsProcessor) p;
     LocalStatisticsProcessor newProcessor = new 
LocalStatisticsProcessor.Builder(oldProcessor).build();
 
-    
newProcessor.setComputationResultStream(oldProcessor.computationResultStream);
+    
newProcessor.setComputationResultStream(oldProcessor.getComputationResultStream());
 
     return newProcessor;
   }
@@ -180,16 +208,16 @@ public final class LocalStatisticsProcessor implements 
Processor {
    * 
    * @param computeStream
    */
-  void setComputationResultStream(Stream computeStream) {
+  public void setComputationResultStream(Stream computeStream) {
     this.computationResultStream = computeStream;
   }
 
   private AttributeClassObserver newNominalClassObserver() {
-    return (AttributeClassObserver) this.nominalClassObserver.copy();
+    return new NominalAttributeClassObserver(); // TODO: further investigate this change; the configured nominalClassObserver is no longer copied
   }
 
   private AttributeClassObserver newNumericClassObserver() {
-    return (AttributeClassObserver) this.numericClassObserver.copy();
+    return new GaussianNumericAttributeClassObserver(); // TODO: further investigate this change; the configured numericClassObserver is no longer copied
   }
 
   /**
@@ -198,45 +226,66 @@ public final class LocalStatisticsProcessor implements 
Processor {
    * @author Arinto Murdopo
    * 
    */
-  static class Builder {
+  public static class Builder {
 
     private SplitCriterion splitCriterion = new InfoGainSplitCriterion();
     private boolean binarySplit = false;
     private AttributeClassObserver nominalClassObserver = new 
NominalAttributeClassObserver();
     private AttributeClassObserver numericClassObserver = new 
GaussianNumericAttributeClassObserver();
 
-    Builder() {
+    public Builder() {
 
     }
 
-    Builder(LocalStatisticsProcessor oldProcessor) {
-      this.splitCriterion = oldProcessor.splitCriterion;
-      this.binarySplit = oldProcessor.binarySplit;
+    public Builder(LocalStatisticsProcessor oldProcessor) {
+      this.splitCriterion = oldProcessor.getSplitCriterion();
+      this.binarySplit = oldProcessor.isBinarySplit();
+      this.nominalClassObserver = oldProcessor.getNominalClassObserver();
+      this.numericClassObserver = oldProcessor.getNumericClassObserver();
     }
 
-    Builder splitCriterion(SplitCriterion splitCriterion) {
+    public Builder splitCriterion(SplitCriterion splitCriterion) {
       this.splitCriterion = splitCriterion;
       return this;
     }
 
-    Builder binarySplit(boolean binarySplit) {
+    public Builder binarySplit(boolean binarySplit) {
       this.binarySplit = binarySplit;
       return this;
     }
 
-    Builder nominalClassObserver(AttributeClassObserver nominalClassObserver) {
+    public Builder nominalClassObserver(AttributeClassObserver 
nominalClassObserver) {
       this.nominalClassObserver = nominalClassObserver;
       return this;
     }
 
-    Builder numericClassObserver(AttributeClassObserver numericClassObserver) {
+    public Builder numericClassObserver(AttributeClassObserver 
numericClassObserver) {
       this.numericClassObserver = numericClassObserver;
       return this;
     }
 
-    LocalStatisticsProcessor build() {
+    public LocalStatisticsProcessor build() {
       return new LocalStatisticsProcessor(this);
     }
   }
-
+  
+  public SplitCriterion getSplitCriterion() {
+    return splitCriterion;
+  }
+  
+  public boolean isBinarySplit() {
+    return binarySplit;
+  }
+  
+  public AttributeClassObserver getNominalClassObserver() {
+    return nominalClassObserver;
+  }
+  
+  public AttributeClassObserver getNumericClassObserver() {
+    return numericClassObserver;
+  }
+  
+  public Stream getComputationResultStream() {
+    return computationResultStream;
+  }
 }
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java
 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java
index a4f6fe1..d389102 100644
--- 
a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java
+++ 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java
@@ -64,44 +64,45 @@ import org.slf4j.LoggerFactory;
  * @author Arinto Murdopo
  * 
  */
-public final class ModelAggregatorProcessor implements Processor {
+public class ModelAggregatorProcessor implements Processor {
 
   private static final long serialVersionUID = -1685875718300564886L;
   private static final Logger logger = 
LoggerFactory.getLogger(ModelAggregatorProcessor.class);
 
-  private int processorId;
+  protected int processorId;
 
-  private Node treeRoot;
+  protected Node treeRoot;
 
-  private int activeLeafNodeCount;
-  private int inactiveLeafNodeCount;
-  private int decisionNodeCount;
-  private boolean growthAllowed;
+  protected int activeLeafNodeCount;
+  protected int inactiveLeafNodeCount;
+  protected int decisionNodeCount;
+  protected boolean growthAllowed;
 
-  private final Instances dataset;
+  protected final Instances dataset;
 
   // to support concurrent split
-  private long splitId;
-  private ConcurrentMap<Long, SplittingNodeInfo> splittingNodes;
-  private BlockingQueue<Long> timedOutSplittingNodes;
+  protected long splitId;
+  protected ConcurrentMap<Long, SplittingNodeInfo> splittingNodes;
+  public BlockingQueue<Long> timedOutSplittingNodes;
 
   // available streams
-  private Stream resultStream;
-  private Stream attributeStream;
-  private Stream controlStream;
+  public Stream resultStream;
+  public Stream attributeStream;
+  public Stream controlStream;
 
-  private transient ScheduledExecutorService executor;
+  protected transient ScheduledExecutorService executor;
 
-  private final SplitCriterion splitCriterion;
-  private final double splitConfidence;
-  private final double tieThreshold;
-  private final int gracePeriod;
-  private final int parallelismHint;
-  private final long timeOut;
+  public final SplitCriterion splitCriterion;
+  public final double splitConfidence;
+  public final double tieThreshold;
+  public final int gracePeriod;
+  public final int parallelismHint;
+  public final long timeOut;
 
   // private constructor based on Builder pattern
-  private ModelAggregatorProcessor(Builder builder) {
+  protected ModelAggregatorProcessor(Builder<?> builder) {
     this.dataset = builder.dataset;
+    this.processorId = builder.processorID;
     this.splitCriterion = builder.splitCriterion;
     this.splitConfidence = builder.splitConfidence;
     this.tieThreshold = builder.tieThreshold;
@@ -226,19 +227,19 @@ public final class ModelAggregatorProcessor implements 
Processor {
     this.resultStream = resultStream;
   }
 
-  void setAttributeStream(Stream attributeStream) {
+  public void setAttributeStream(Stream attributeStream) {
     this.attributeStream = attributeStream;
   }
 
-  void setControlStream(Stream controlStream) {
+  public void setControlStream(Stream controlStream) {
     this.controlStream = controlStream;
   }
 
-  void sendToAttributeStream(ContentEvent event) {
+  public void sendToAttributeStream(ContentEvent event) {
     this.attributeStream.put(event);
   }
 
-  void sendToControlStream(ContentEvent event) {
+  public void sendToControlStream(ContentEvent event) {
     this.controlStream.put(event);
   }
 
@@ -318,11 +319,11 @@ public final class ModelAggregatorProcessor implements 
Processor {
     }
   }
 
-  private boolean correctlyClassifies(Instance inst, double[] prediction) {
+  public boolean correctlyClassifies(Instance inst, double[] prediction) {
     return maxIndex(prediction) == (int) inst.classValue();
   }
 
-  private void resetLearning() {
+  public void resetLearning() {
     this.treeRoot = null;
     // Remove nodes
     FoundNode[] learningNodes = findNodes();
@@ -338,13 +339,13 @@ public final class ModelAggregatorProcessor implements 
Processor {
     }
   }
 
-  protected FoundNode[] findNodes() {
+  public FoundNode[] findNodes() {
     List<FoundNode> foundList = new LinkedList<>();
     findNodes(this.treeRoot, null, -1, foundList);
     return foundList.toArray(new FoundNode[foundList.size()]);
   }
 
-  protected void findNodes(Node node, SplitNode parent, int parentBranch, 
List<FoundNode> found) {
+  public void findNodes(Node node, SplitNode parent, int parentBranch, 
List<FoundNode> found) {
     if (node != null) {
       found.add(new FoundNode(node, parent, parentBranch));
       if (node instanceof SplitNode) {
@@ -362,7 +363,7 @@ public final class ModelAggregatorProcessor implements 
Processor {
    * @param inst
    * @return
    */
-  private double[] getVotesForInstance(Instance inst) {
+  public double[] getVotesForInstance(Instance inst) {
     return getVotesForInstance(inst, false);
   }
 
@@ -383,7 +384,7 @@ public final class ModelAggregatorProcessor implements 
Processor {
 
     }
 
-    // Training after testing to speed up the process
+    // Training after testing to speed up the process. Never called for BoostMAProcessor.
     if (isTraining) {
       if (this.treeRoot == null) {
         this.treeRoot = newLearningNode(this.parallelismHint);
@@ -401,7 +402,7 @@ public final class ModelAggregatorProcessor implements 
Processor {
    * 
    * @param inst
    */
-  private void trainOnInstanceImpl(Instance inst) {
+  public void trainOnInstanceImpl(Instance inst) {
     if (this.treeRoot == null) {
       this.treeRoot = newLearningNode(this.parallelismHint);
       this.activeLeafNodeCount = 1;
@@ -411,7 +412,7 @@ public final class ModelAggregatorProcessor implements 
Processor {
     trainOnInstanceImpl(foundNode, inst);
   }
 
-  private void trainOnInstanceImpl(FoundNode foundNode, Instance inst) {
+  public void trainOnInstanceImpl(FoundNode foundNode, Instance inst) {
 
     Node leafNode = foundNode.getNode();
 
@@ -440,7 +441,7 @@ public final class ModelAggregatorProcessor implements 
Processor {
    * @param foundNode
    *          The data structure to represents the filtering of the instance 
using the tree model.
    */
-  private void attemptToSplit(ActiveLearningNode activeLearningNode, FoundNode 
foundNode) {
+  public void attemptToSplit(ActiveLearningNode activeLearningNode, FoundNode 
foundNode) {
     if (!activeLearningNode.observedClassDistributionIsPure()) {
       // Increment the split ID
       this.splitId++;
@@ -469,7 +470,7 @@ public final class ModelAggregatorProcessor implements 
Processor {
    * @param foundNode
    *          The data structure to represents the filtering of the instance 
using the tree model.
    */
-  private void continueAttemptToSplit(ActiveLearningNode activeLearningNode, 
FoundNode foundNode) {
+  protected void continueAttemptToSplit(ActiveLearningNode activeLearningNode, 
FoundNode foundNode) {
     AttributeSplitSuggestion bestSuggestion = 
activeLearningNode.getDistributedBestSuggestion();
     AttributeSplitSuggestion secondBestSuggestion = 
activeLearningNode.getDistributedSecondBestSuggestion();
 
@@ -490,7 +491,7 @@ public final class ModelAggregatorProcessor implements 
Processor {
     boolean shouldSplit = false;
 
     if (secondBestSuggestion == null) {
-      shouldSplit = (bestSuggestion != null);
+      shouldSplit = true;
     } else {
       double hoeffdingBound = computeHoeffdingBound(
           
this.splitCriterion.getRangeOfMerit(activeLearningNode.getObservedClassDistribution()),
 this.splitConfidence,
@@ -544,7 +545,7 @@ public final class ModelAggregatorProcessor implements 
Processor {
    * @param parentBranch
    *          the branch index of the node in the parent node
    */
-  private void deactivateLearningNode(ActiveLearningNode toDeactivate, 
SplitNode parent, int parentBranch) {
+  public void deactivateLearningNode(ActiveLearningNode toDeactivate, 
SplitNode parent, int parentBranch) {
     Node newLeaf = new 
InactiveLearningNode(toDeactivate.getObservedClassDistribution());
     if (parent == null) {
       this.treeRoot = newLeaf;
@@ -556,11 +557,11 @@ public final class ModelAggregatorProcessor implements 
Processor {
     this.inactiveLeafNodeCount++;
   }
 
-  private LearningNode newLearningNode(int parallelismHint) {
+  protected LearningNode newLearningNode(int parallelismHint) {
     return newLearningNode(new double[0], parallelismHint);
   }
 
-  private LearningNode newLearningNode(double[] initialClassObservations, int 
parallelismHint) {
+  protected LearningNode newLearningNode(double[] initialClassObservations, 
int parallelismHint) {
     // for VHT optimization, we need to dynamically instantiate the appropriate
     // ActiveLearningNode
     return new ActiveLearningNode(initialClassObservations, parallelismHint);
@@ -571,7 +572,7 @@ public final class ModelAggregatorProcessor implements 
Processor {
    * 
    * @param ih
    */
-  private void setModelContext(InstancesHeader ih) {
+  public void setModelContext(InstancesHeader ih) {
     // TODO possibly refactored
     if ((ih != null) && (ih.classIndex() < 0)) {
       throw new IllegalArgumentException("Context for a classifier must 
include a class to learn");
@@ -582,7 +583,7 @@ public final class ModelAggregatorProcessor implements 
Processor {
     logger.trace("Model context: {}", ih.toString());
   }
 
-  private static double computeHoeffdingBound(double range, double confidence, 
double n) {
+  public static double computeHoeffdingBound(double range, double confidence, 
double n) {
     return Math.sqrt((Math.pow(range, 2.0) * Math.log(1.0 / confidence)) / 
(2.0 * n));
   }
 
@@ -593,7 +594,7 @@ public final class ModelAggregatorProcessor implements 
Processor {
    * @author Arinto Murdopo
    * 
    */
-  static class AggregationTimeOutHandler implements Runnable {
+  protected static class AggregationTimeOutHandler implements Runnable {
 
     private static final Logger logger = 
LoggerFactory.getLogger(AggregationTimeOutHandler.class);
     private final Long splitId;
@@ -622,17 +623,30 @@ public final class ModelAggregatorProcessor implements 
Processor {
    * @author Arinto Murdopo
    * 
    */
-  static class SplittingNodeInfo implements Serializable {
+  protected static class SplittingNodeInfo implements Serializable {
 
     private final ActiveLearningNode activeLearningNode;
     private final FoundNode foundNode;
-    private final transient ScheduledFuture<?> scheduledFuture;
+    private transient ScheduledFuture<?> scheduledFuture;
 
     SplittingNodeInfo(ActiveLearningNode activeLearningNode, FoundNode 
foundNode, ScheduledFuture<?> scheduledFuture) {
       this.activeLearningNode = activeLearningNode;
       this.foundNode = foundNode;
       this.scheduledFuture = scheduledFuture;
     }
+
+    public SplittingNodeInfo(BoostVHTActiveLearningNode activeLearningNode, 
FoundNode foundNode) {
+      this.activeLearningNode = activeLearningNode;
+      this.foundNode = foundNode;
+    }
+
+    public ActiveLearningNode getActiveLearningNode() {
+      return activeLearningNode;
+    }
+
+    public FoundNode getFoundNode() {
+      return foundNode;
+    }
   }
 
   protected ChangeDetector changeDetector;
@@ -651,10 +665,11 @@ public final class ModelAggregatorProcessor implements 
Processor {
    * @author Arinto Murdopo
    * 
    */
-  static class Builder {
+  public static class Builder<T extends Builder<T>>{
 
     // required parameters
     private final Instances dataset;
+    private int processorID;
 
     // default values
     private SplitCriterion splitCriterion = new InfoGainSplitCriterion();
@@ -665,11 +680,11 @@ public final class ModelAggregatorProcessor implements 
Processor {
     private long timeOut = 30;
     private ChangeDetector changeDetector = null;
 
-    Builder(Instances dataset) {
+    public Builder(Instances dataset) {
       this.dataset = dataset;
     }
 
-    Builder(ModelAggregatorProcessor oldProcessor) {
+    public Builder(ModelAggregatorProcessor oldProcessor) {
       this.dataset = oldProcessor.dataset;
       this.splitCriterion = oldProcessor.splitCriterion;
       this.splitConfidence = oldProcessor.splitConfidence;
@@ -677,46 +692,96 @@ public final class ModelAggregatorProcessor implements 
Processor {
       this.gracePeriod = oldProcessor.gracePeriod;
       this.parallelismHint = oldProcessor.parallelismHint;
       this.timeOut = oldProcessor.timeOut;
+      this.processorID = oldProcessor.getProcessorId();
     }
 
-    Builder splitCriterion(SplitCriterion splitCriterion) {
+    public T splitCriterion(SplitCriterion splitCriterion) {
       this.splitCriterion = splitCriterion;
-      return this;
+      return getThis();
     }
 
-    Builder splitConfidence(double splitConfidence) {
+    public T splitConfidence(double splitConfidence) {
       this.splitConfidence = splitConfidence;
-      return this;
+      return getThis();
     }
 
-    Builder tieThreshold(double tieThreshold) {
+    public T tieThreshold(double tieThreshold) {
       this.tieThreshold = tieThreshold;
-      return this;
+      return getThis();
     }
 
-    Builder gracePeriod(int gracePeriod) {
+    public T gracePeriod(int gracePeriod) {
       this.gracePeriod = gracePeriod;
-      return this;
+      return getThis();
     }
 
-    Builder parallelismHint(int parallelismHint) {
+    public T parallelismHint(int parallelismHint) {
       this.parallelismHint = parallelismHint;
-      return this;
+      return getThis();
     }
 
-    Builder timeOut(long timeOut) {
+    public T timeOut(long timeOut) {
       this.timeOut = timeOut;
-      return this;
+      return getThis();
     }
 
-    Builder changeDetector(ChangeDetector changeDetector) {
+    public T processorID(int processorID) {
+      this.processorID = processorID;
+      return getThis();
+    }
+
+    T changeDetector(ChangeDetector changeDetector) {
       this.changeDetector = changeDetector;
-      return this;
+      return getThis();
+    }
+
+    public T getThis(){
+      return (T) this;
     }
 
-    ModelAggregatorProcessor build() {
+    public ModelAggregatorProcessor build() {
       return new ModelAggregatorProcessor(this);
     }
+
+  }
+
+  public Instances getDataset() {
+    return dataset;
+  }
+
+  public Stream getAttributeStream() {
+    return attributeStream;
+  }
+
+  public Stream getControlStream() {
+    return controlStream;
+  }
+
+  public SplitCriterion getSplitCriterion() {
+    return splitCriterion;
+  }
+
+  public double getSplitConfidence() {
+    return splitConfidence;
+  }
+
+  public double getTieThreshold() {
+    return tieThreshold;
+  }
+
+  public int getGracePeriod() {
+    return gracePeriod;
+  }
+
+  public int getParallelismHint() {
+    return parallelismHint;
+  }
+
+  public long getTimeOut() {
+    return timeOut;
   }
 
+  public int getProcessorId() {
+    return processorId;
+  }
 }
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/Node.java 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/Node.java
index 898a433..8ac6150 100644
--- 
a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/Node.java
+++ 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/Node.java
@@ -29,7 +29,7 @@ import org.apache.samoa.instances.Instance;
  * @author Arinto Murdopo
  * 
  */
-abstract class Node implements java.io.Serializable {
+public abstract class Node implements java.io.Serializable {
 
   private static final long serialVersionUID = 4008521239214180548L;
 
@@ -46,7 +46,7 @@ abstract class Node implements java.io.Serializable {
    *          The index of the current node in the parent
    * @return FoundNode which is the data structure to represent the resulting 
leaf.
    */
-  abstract FoundNode filterInstanceToLeaf(Instance inst, SplitNode parent, int 
parentBranch);
+  public abstract FoundNode filterInstanceToLeaf(Instance inst, SplitNode 
parent, int parentBranch);
 
   /**
    * Method to return the predicted class of the instance based on the 
statistic inside the node.
@@ -81,7 +81,7 @@ abstract class Node implements java.io.Serializable {
    * 
    * @return Observed class distribution
    */
-  protected double[] getObservedClassDistribution() {
+  public double[] getObservedClassDistribution() {
     return this.observedClassDistribution.getArrayCopy();
   }
 
@@ -90,7 +90,7 @@ abstract class Node implements java.io.Serializable {
    * 
    * @return Flag whether class distribution is pure or not.
    */
-  protected boolean observedClassDistributionIsPure() {
+  public boolean observedClassDistributionIsPure() {
     return (observedClassDistribution.numNonZeroEntries() < 2);
   }
 
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/SplitNode.java
 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/SplitNode.java
index c2b1a47..d05caa0 100644
--- 
a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/SplitNode.java
+++ 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/SplitNode.java
@@ -46,7 +46,7 @@ public class SplitNode extends Node {
   }
 
   @Override
-  FoundNode filterInstanceToLeaf(Instance inst, SplitNode parent, int 
parentBranch) {
+  public FoundNode filterInstanceToLeaf(Instance inst, SplitNode parent, int 
parentBranch) {
     int childIndex = instanceChildIndex(inst);
     if (childIndex >= 0) {
       Node child = getChild(childIndex);
@@ -85,7 +85,7 @@ public class SplitNode extends Node {
    * @param child
    *          The child node
    */
-  void setChild(int index, Node child) {
+  public void setChild(int index, Node child) {
     if ((this.splitTest.maxBranches() >= 0)
         && (index >= this.splitTest.maxBranches())) {
       throw new IndexOutOfBoundsException();
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/moa/classifiers/core/AttributeSplitSuggestion.java
 
b/samoa-api/src/main/java/org/apache/samoa/moa/classifiers/core/AttributeSplitSuggestion.java
index 2eeade5..e75622e 100644
--- 
a/samoa-api/src/main/java/org/apache/samoa/moa/classifiers/core/AttributeSplitSuggestion.java
+++ 
b/samoa-api/src/main/java/org/apache/samoa/moa/classifiers/core/AttributeSplitSuggestion.java
@@ -40,6 +40,7 @@ public class AttributeSplitSuggestion extends 
AbstractMOAObject implements Compa
   public double merit;
 
   public AttributeSplitSuggestion() {
+    merit = Double.NEGATIVE_INFINITY;
   }
 
   public AttributeSplitSuggestion(InstanceConditionalTest splitTest,
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/moa/classifiers/functions/DecisionStump.java
 
b/samoa-api/src/main/java/org/apache/samoa/moa/classifiers/functions/DecisionStump.java
new file mode 100644
index 0000000..730b69a
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/moa/classifiers/functions/DecisionStump.java
@@ -0,0 +1,157 @@
+package org.apache.samoa.moa.classifiers.functions;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.github.javacliparser.FlagOption;
+import com.github.javacliparser.IntOption;
+import org.apache.samoa.moa.classifiers.AbstractClassifier;
+import org.apache.samoa.moa.classifiers.core.AttributeSplitSuggestion;
+import 
org.apache.samoa.moa.classifiers.core.attributeclassobservers.AttributeClassObserver;
+import 
org.apache.samoa.moa.classifiers.core.attributeclassobservers.GaussianNumericAttributeClassObserver;
+import 
org.apache.samoa.moa.classifiers.core.attributeclassobservers.NominalAttributeClassObserver;
+import org.apache.samoa.moa.classifiers.core.splitcriteria.SplitCriterion;
+import org.apache.samoa.moa.core.AutoExpandVector;
+import org.apache.samoa.moa.core.DoubleVector;
+import org.apache.samoa.moa.core.Measurement;
+import org.apache.samoa.moa.options.ClassOption;
+import org.apache.samoa.instances.Instance;
+
+/**
+ * Decision trees of one level.<br />
+ *
+ * <p>Parameters:</p>
+ * <ul>
+ * <li>-g : The number of instances to observe between model changes</li>
+ * <li>-b : Only allow binary splits</li>
+ * <li>-c : Split criterion to use. Example : InfoGainSplitCriterion</li>
+ * <li>-r : Seed for random behaviour of the classifier</li>
+ * </ul>
+ *
+ * @author Richard Kirkby ([email protected])
+ * @version $Revision: 7 $
+ */
+public class DecisionStump extends AbstractClassifier {
+
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public String getPurposeString() {
+    return "Decision trees of one level.";
+  }
+
+  public IntOption gracePeriodOption = new IntOption("gracePeriod", 'g',
+      "The number of instances to observe between model changes.", 1000,
+      0, Integer.MAX_VALUE);
+
+  public FlagOption binarySplitsOption = new FlagOption("binarySplits", 'b',
+      "Only allow binary splits.");
+
+  public ClassOption splitCriterionOption = new ClassOption("splitCriterion",
+      'c', "Split criterion to use.", SplitCriterion.class,
+      "InfoGainSplitCriterion");
+
+  protected AttributeSplitSuggestion bestSplit;
+
+  protected DoubleVector observedClassDistribution;
+
+  protected AutoExpandVector<AttributeClassObserver> attributeObservers;
+
+  protected double weightSeenAtLastSplit;
+
+  @Override
+  public void resetLearningImpl() {
+    this.bestSplit = null;
+    this.observedClassDistribution = new DoubleVector();
+    this.attributeObservers = new AutoExpandVector<AttributeClassObserver>();
+    this.weightSeenAtLastSplit = 0.0;
+  }
+
+  @Override
+  protected Measurement[] getModelMeasurementsImpl() {
+    return null;
+  }
+
+  @Override
+  public void getModelDescription(StringBuilder out, int indent) {
+    // TODO Auto-generated method stub
+  }
+
+  @Override
+  public void trainOnInstanceImpl(Instance inst) {
+    this.observedClassDistribution.addToValue((int) inst.classValue(), 
inst.weight());
+    for (int i = 0; i < inst.numAttributes() - 1; i++) {
+      int instAttIndex = modelAttIndexToInstanceAttIndex(i);
+      AttributeClassObserver obs = this.attributeObservers.get(i);
+      if (obs == null) {
+        obs = inst.attribute(instAttIndex).isNominal() ? 
newNominalClassObserver()
+            : newNumericClassObserver();
+        this.attributeObservers.set(i, obs);
+      }
+      obs.observeAttributeClass(inst.value(instAttIndex), (int) 
inst.classValue(), inst.weight());
+    }
+    if (this.trainingWeightSeenByModel - this.weightSeenAtLastSplit >= 
this.gracePeriodOption.getValue()) {
+      this.bestSplit = findBestSplit((SplitCriterion) 
getPreparedClassOption(this.splitCriterionOption));
+      this.weightSeenAtLastSplit = this.trainingWeightSeenByModel;
+    }
+  }
+
+  @Override
+  public double[] getVotesForInstance(Instance inst) {
+    if (this.bestSplit != null) {
+      int branch = this.bestSplit.splitTest.branchForInstance(inst);
+      if (branch >= 0) {
+        return this.bestSplit.resultingClassDistributionFromSplit(branch);
+      }
+    }
+    return this.observedClassDistribution.getArrayCopy();
+  }
+
+  @Override
+  public boolean isRandomizable() {
+    return false;
+  }
+
+  protected AttributeClassObserver newNominalClassObserver() {
+    return new NominalAttributeClassObserver();
+  }
+
+  protected AttributeClassObserver newNumericClassObserver() {
+    return new GaussianNumericAttributeClassObserver();
+  }
+
+  protected AttributeSplitSuggestion findBestSplit(SplitCriterion criterion) {
+    AttributeSplitSuggestion bestFound = null;
+    double bestMerit = Double.NEGATIVE_INFINITY;
+    double[] preSplitDist = this.observedClassDistribution.getArrayCopy();
+    for (int i = 0; i < this.attributeObservers.size(); i++) {
+      AttributeClassObserver obs = this.attributeObservers.get(i);
+      if (obs != null) {
+        AttributeSplitSuggestion suggestion = 
obs.getBestEvaluatedSplitSuggestion(criterion,
+            preSplitDist, i, this.binarySplitsOption.isSet());
+        if (suggestion.merit > bestMerit) {
+          bestMerit = suggestion.merit;
+          bestFound = suggestion;
+        }
+      }
+    }
+    return bestFound;
+  }
+}
diff --git a/samoa-api/src/main/java/org/apache/samoa/moa/core/Vote.java 
b/samoa-api/src/main/java/org/apache/samoa/moa/core/Vote.java
index 24ea3f3..2fda158 100644
--- a/samoa-api/src/main/java/org/apache/samoa/moa/core/Vote.java
+++ b/samoa-api/src/main/java/org/apache/samoa/moa/core/Vote.java
@@ -1,5 +1,25 @@
 package org.apache.samoa.moa.core;
 
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
 import java.io.Serializable;
 
 /*

Reply via email to