http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java
index cf7a1b3..b526a40 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java
@@ -53,679 +53,706 @@ import com.yahoo.labs.samoa.topology.Stream;
 import static com.yahoo.labs.samoa.moa.core.Utils.maxIndex;
 
 /**
- * Model Aggegator Processor consists of the decision tree model. It connects 
- * to local-statistic PI via attribute stream and control stream. 
- * Model-aggregator PI sends the split instances via attribute stream and 
- * it sends control messages to ask local-statistic PI to perform computation 
- * via control stream. 
+ * Model Aggegator Processor consists of the decision tree model. It connects 
to
+ * local-statistic PI via attribute stream and control stream. Model-aggregator
+ * PI sends the split instances via attribute stream and it sends control
+ * messages to ask local-statistic PI to perform computation via control 
stream.
+ * 
+ * Model-aggregator PI sends the classification result via result stream to an
+ * evaluator PI for classifier or other destination PI. The calculation results
+ * from local statistic arrive to the model-aggregator PI via 
computation-result
+ * stream.
  * 
- * Model-aggregator PI sends the classification result via result stream to 
- * an evaluator PI for classifier or other destination PI. The calculation 
- * results from local statistic arrive to the model-aggregator PI via 
- * computation-result stream.
-
  * @author Arinto Murdopo
- *
+ * 
  */
 final class ModelAggregatorProcessor implements Processor {
 
-       private static final long serialVersionUID = -1685875718300564886L;
-       private static final Logger logger = 
LoggerFactory.getLogger(ModelAggregatorProcessor.class);
-
-       private int processorId;
-       
-       private Node treeRoot;
-       
-       private int activeLeafNodeCount;
-       private int inactiveLeafNodeCount;
-       private int decisionNodeCount;
-       private boolean growthAllowed;
-       
-       private final Instances dataset;
-
-       //to support concurrent split
-       private long splitId;
-       private ConcurrentMap<Long, SplittingNodeInfo> splittingNodes;
-       private BlockingQueue<Long> timedOutSplittingNodes;
-       
-       //available streams
-       private Stream resultStream;
-       private Stream attributeStream;
-       private Stream controlStream;
-       
-       private transient ScheduledExecutorService executor;
-       
-       private final SplitCriterion splitCriterion;
-       private final double splitConfidence; 
-       private final double tieThreshold;
-       private final int gracePeriod;
-       private final int parallelismHint;
-       private final long timeOut;
-       
-       //private constructor based on Builder pattern
-       private ModelAggregatorProcessor(Builder builder){
-               this.dataset = builder.dataset;
-               this.splitCriterion = builder.splitCriterion;
-               this.splitConfidence = builder.splitConfidence;
-               this.tieThreshold = builder.tieThreshold;
-               this.gracePeriod = builder.gracePeriod;
-               this.parallelismHint = builder.parallelismHint;
-               this.timeOut = builder.timeOut;
-               this.changeDetector = builder.changeDetector;
-
-               InstancesHeader ih = new InstancesHeader(dataset);
-               this.setModelContext(ih);
-       }
-
-       @Override
-       public boolean process(ContentEvent event) {
-               
-               //Poll the blocking queue shared between ModelAggregator and 
the time-out threads
-               Long timedOutSplitId = timedOutSplittingNodes.poll();
-               if(timedOutSplitId != null){ //time out has been reached!
-                       SplittingNodeInfo splittingNode = 
splittingNodes.get(timedOutSplitId);
-                       if (splittingNode != null) {
-                               this.splittingNodes.remove(timedOutSplitId);
-                               
this.continueAttemptToSplit(splittingNode.activeLearningNode,
-                                               splittingNode.foundNode);
-                       
-                       }
-
-               }
-               
-               //Receive a new instance from source
-               if(event instanceof InstancesContentEvent){
-                       InstancesContentEvent instancesEvent = 
(InstancesContentEvent) event;
-                       this.processInstanceContentEvent(instancesEvent);
-                        //Send information to local-statistic PI
-                        //for each of the nodes
-                        if (this.foundNodeSet != null){
-                            for (FoundNode foundNode: this.foundNodeSet ){
-                                ActiveLearningNode leafNode = 
(ActiveLearningNode) foundNode.getNode();
-                                AttributeBatchContentEvent[] abce = 
leafNode.getAttributeBatchContentEvent();
-                                if (abce != null) {
-                                    for (int i = 0; i< 
this.dataset.numAttributes() - 1; i++) {
-                                        this.sendToAttributeStream(abce[i]);
-                                    }
-                                }
-                                leafNode.setAttributeBatchContentEvent(null);
-                            //this.sendToControlStream(event); //split 
information
-                            //See if we can ask for splits
-                                if(!leafNode.isSplitting()){
-                                    double weightSeen = 
leafNode.getWeightSeen();
-                                    //check whether it is the time for 
splitting
-                                    if(weightSeen - 
leafNode.getWeightSeenAtLastSplitEvaluation() >= this.gracePeriod){
-                                            attemptToSplit(leafNode, 
foundNode);
-                                    } 
-                                }
-                            }
-                        }
-                        this.foundNodeSet = null;
-               } else if(event instanceof LocalResultContentEvent){
-                       LocalResultContentEvent lrce = 
(LocalResultContentEvent) event;
-                       Long lrceSplitId = lrce.getSplitId();
-                       SplittingNodeInfo splittingNodeInfo = 
splittingNodes.get(lrceSplitId);
-                       
-                       if (splittingNodeInfo != null) { // if null, that means
-                                                                               
                // activeLearningNode has been
-                                                                               
                // removed by timeout thread
-                               ActiveLearningNode activeLearningNode = 
splittingNodeInfo.activeLearningNode;
-
-                               activeLearningNode.addDistributedSuggestions(
-                                               lrce.getBestSuggestion(),
-                                               lrce.getSecondBestSuggestion());
-
-                               if 
(activeLearningNode.isAllSuggestionsCollected()) {
-                                       
splittingNodeInfo.scheduledFuture.cancel(false);
-                                       this.splittingNodes.remove(lrceSplitId);
-                                       
this.continueAttemptToSplit(activeLearningNode,
-                                                       
splittingNodeInfo.foundNode);
-                               }
-                       }
-               }
-               return false;
-       }
-
-        protected Set<FoundNode> foundNodeSet;
-        
-       @Override
-       public void onCreate(int id) {
-               this.processorId = id;
-               
-               this.activeLeafNodeCount = 0;
-               this.inactiveLeafNodeCount = 0;
-               this.decisionNodeCount = 0; 
-               this.growthAllowed = true;
-               
-               this.splittingNodes = new ConcurrentHashMap<>();
-               this.timedOutSplittingNodes = new LinkedBlockingQueue<>();
-               this.splitId = 0;
-               
-               //Executor for scheduling time-out threads
-               this.executor = Executors.newScheduledThreadPool(8);
-       }
-
-       @Override
-       public Processor newProcessor(Processor p) {
-               ModelAggregatorProcessor oldProcessor = 
(ModelAggregatorProcessor)p;
-               ModelAggregatorProcessor newProcessor = 
-                               new 
ModelAggregatorProcessor.Builder(oldProcessor).build();
-               
-               newProcessor.setResultStream(oldProcessor.resultStream);
-               newProcessor.setAttributeStream(oldProcessor.attributeStream);
-               newProcessor.setControlStream(oldProcessor.controlStream);
-               return newProcessor;
-       }
-       
-       @Override
-       public String toString() {
-               StringBuilder sb = new StringBuilder();
-               sb.append(super.toString());
-
-               sb.append("ActiveLeafNodeCount: ").append(activeLeafNodeCount);
-               sb.append("InactiveLeafNodeCount: 
").append(inactiveLeafNodeCount);
-               sb.append("DecisionNodeCount: ").append(decisionNodeCount);
-               sb.append("Growth allowed: ").append(growthAllowed);
-               return sb.toString();
-       }
-       
-       void setResultStream(Stream resultStream){
-               this.resultStream = resultStream;
-       }
-       
-       void setAttributeStream(Stream attributeStream){
-               this.attributeStream = attributeStream;
-       }
-       
-       void setControlStream(Stream controlStream){
-               this.controlStream = controlStream;
-       }
-       
-       void sendToAttributeStream(ContentEvent event){
-               this.attributeStream.put(event);
-       }
-       
-       void sendToControlStream(ContentEvent event){
-               this.controlStream.put(event);
-       }
-       
-       /**
-        * Helper method to generate new ResultContentEvent based on an 
instance and
-        * its prediction result.
-        * @param prediction The predicted class label from the decision tree 
model.
-        * @param inEvent The associated instance content event
-        * @return ResultContentEvent to be sent into Evaluator PI or other 
destination PI.
-        */
-       private ResultContentEvent newResultContentEvent(double[] prediction, 
InstanceContentEvent inEvent){
-               ResultContentEvent rce = new 
ResultContentEvent(inEvent.getInstanceIndex(), inEvent.getInstance(),
-                               inEvent.getClassId(), prediction, 
inEvent.isLastEvent());
-               rce.setClassifierIndex(this.processorId);
-               rce.setEvaluationIndex(inEvent.getEvaluationIndex());
-               return rce;
-       }
-                       
-       private ResultContentEvent newResultContentEvent(double[] prediction, 
Instance inst, InstancesContentEvent inEvent){
-               ResultContentEvent rce = new 
ResultContentEvent(inEvent.getInstanceIndex(), inst, (int) inst.classValue(), 
prediction, inEvent.isLastEvent());
-               rce.setClassifierIndex(this.processorId);
-               rce.setEvaluationIndex(inEvent.getEvaluationIndex());
-               return rce;
-       }
-                       
-       private List<InstancesContentEvent> contentEventList = new 
LinkedList<>();
-
-        
-       /**
-        * Helper method to process the InstanceContentEvent
-        * @param instContentEvent
-        */
-       private void processInstanceContentEvent(InstancesContentEvent 
instContentEvent){
-            this.numBatches++;
-            this.contentEventList.add(instContentEvent);
-            if (this.numBatches == 1 || this.numBatches > 4){
-                this.processInstances(this.contentEventList.remove(0));
-            }
+  private static final long serialVersionUID = -1685875718300564886L;
+  private static final Logger logger = 
LoggerFactory.getLogger(ModelAggregatorProcessor.class);
+
+  private int processorId;
+
+  private Node treeRoot;
+
+  private int activeLeafNodeCount;
+  private int inactiveLeafNodeCount;
+  private int decisionNodeCount;
+  private boolean growthAllowed;
+
+  private final Instances dataset;
+
+  // to support concurrent split
+  private long splitId;
+  private ConcurrentMap<Long, SplittingNodeInfo> splittingNodes;
+  private BlockingQueue<Long> timedOutSplittingNodes;
+
+  // available streams
+  private Stream resultStream;
+  private Stream attributeStream;
+  private Stream controlStream;
+
+  private transient ScheduledExecutorService executor;
+
+  private final SplitCriterion splitCriterion;
+  private final double splitConfidence;
+  private final double tieThreshold;
+  private final int gracePeriod;
+  private final int parallelismHint;
+  private final long timeOut;
+
+  // private constructor based on Builder pattern
+  private ModelAggregatorProcessor(Builder builder) {
+    this.dataset = builder.dataset;
+    this.splitCriterion = builder.splitCriterion;
+    this.splitConfidence = builder.splitConfidence;
+    this.tieThreshold = builder.tieThreshold;
+    this.gracePeriod = builder.gracePeriod;
+    this.parallelismHint = builder.parallelismHint;
+    this.timeOut = builder.timeOut;
+    this.changeDetector = builder.changeDetector;
+
+    InstancesHeader ih = new InstancesHeader(dataset);
+    this.setModelContext(ih);
+  }
+
+  @Override
+  public boolean process(ContentEvent event) {
+
+    // Poll the blocking queue shared between ModelAggregator and the time-out
+    // threads
+    Long timedOutSplitId = timedOutSplittingNodes.poll();
+    if (timedOutSplitId != null) { // time out has been reached!
+      SplittingNodeInfo splittingNode = splittingNodes.get(timedOutSplitId);
+      if (splittingNode != null) {
+        this.splittingNodes.remove(timedOutSplitId);
+        this.continueAttemptToSplit(splittingNode.activeLearningNode,
+            splittingNode.foundNode);
+
+      }
+
+    }
 
-            if (instContentEvent.isLastEvent()) {
-                // drain remaining instances
-                while (!contentEventList.isEmpty()) {
-                    processInstances(contentEventList.remove(0));
-                }
+    // Receive a new instance from source
+    if (event instanceof InstancesContentEvent) {
+      InstancesContentEvent instancesEvent = (InstancesContentEvent) event;
+      this.processInstanceContentEvent(instancesEvent);
+      // Send information to local-statistic PI
+      // for each of the nodes
+      if (this.foundNodeSet != null) {
+        for (FoundNode foundNode : this.foundNodeSet) {
+          ActiveLearningNode leafNode = (ActiveLearningNode) 
foundNode.getNode();
+          AttributeBatchContentEvent[] abce = 
leafNode.getAttributeBatchContentEvent();
+          if (abce != null) {
+            for (int i = 0; i < this.dataset.numAttributes() - 1; i++) {
+              this.sendToAttributeStream(abce[i]);
             }
-                
-        }
-        
-        private int numBatches = 0;
-                    
-        private void processInstances(InstancesContentEvent instContentEvent){
-         
-            Instance[] instances = instContentEvent.getInstances();
-            boolean isTesting = instContentEvent.isTesting();
-            boolean isTraining= instContentEvent.isTraining();
-            for (Instance inst: instances){
-                this.processInstance(inst,instContentEvent, isTesting, 
isTraining);
+          }
+          leafNode.setAttributeBatchContentEvent(null);
+          // this.sendToControlStream(event); //split information
+          // See if we can ask for splits
+          if (!leafNode.isSplitting()) {
+            double weightSeen = leafNode.getWeightSeen();
+            // check whether it is the time for splitting
+            if (weightSeen - leafNode.getWeightSeenAtLastSplitEvaluation() >= 
this.gracePeriod) {
+              attemptToSplit(leafNode, foundNode);
             }
+          }
         }
-        
-         private void processInstance(Instance inst, InstancesContentEvent 
instContentEvent, boolean isTesting, boolean isTraining){
-                inst.setDataset(this.dataset);
-               //Check the instance whether it is used for testing or training
-                //boolean testAndTrain = isTraining; //Train after testing
-               double[] prediction = null;
-               if (isTesting) {
-                        prediction = getVotesForInstance(inst, false);
-                       this.resultStream.put(newResultContentEvent(prediction, 
inst,
-                                       instContentEvent));
-               }
-
-               if (isTraining) {
-                       trainOnInstanceImpl(inst);
-                        if (this.changeDetector != null) {
-                            if (prediction == null) {
-                                prediction = getVotesForInstance(inst);
-                            }
-                            boolean correctlyClassifies = 
this.correctlyClassifies(inst,prediction);
-                            double oldEstimation = 
this.changeDetector.getEstimation();
-                            this.changeDetector.input(correctlyClassifies ? 0 
: 1);
-                            if (this.changeDetector.getEstimation() > 
oldEstimation) {
-                                //Start a new classifier
-                                logger.info("Change detected, resetting the 
classifier");
-                                this.resetLearning();
-                                this.changeDetector.resetLearning();
-                            }
-                    }
-               }
-       }
-       
-        private boolean correctlyClassifies(Instance inst, double[] 
prediction) {
-            return maxIndex(prediction) == (int) inst.classValue();
+      }
+      this.foundNodeSet = null;
+    } else if (event instanceof LocalResultContentEvent) {
+      LocalResultContentEvent lrce = (LocalResultContentEvent) event;
+      Long lrceSplitId = lrce.getSplitId();
+      SplittingNodeInfo splittingNodeInfo = splittingNodes.get(lrceSplitId);
+
+      if (splittingNodeInfo != null) { // if null, that means
+        // activeLearningNode has been
+        // removed by timeout thread
+        ActiveLearningNode activeLearningNode = 
splittingNodeInfo.activeLearningNode;
+
+        activeLearningNode.addDistributedSuggestions(
+            lrce.getBestSuggestion(),
+            lrce.getSecondBestSuggestion());
+
+        if (activeLearningNode.isAllSuggestionsCollected()) {
+          splittingNodeInfo.scheduledFuture.cancel(false);
+          this.splittingNodes.remove(lrceSplitId);
+          this.continueAttemptToSplit(activeLearningNode,
+              splittingNodeInfo.foundNode);
         }
-        
-        private void resetLearning() {
-            this.treeRoot = null;
-            //Remove nodes
-            FoundNode[] learningNodes = findNodes();
-            for (FoundNode learningNode : learningNodes) {
-                Node node = learningNode.getNode();
-                if (node instanceof SplitNode) {
-                    SplitNode splitNode;
-                    splitNode = (SplitNode) node;
-                    for (int i = 0; i < splitNode.numChildren(); i++) {
-                        splitNode.setChild(i, null);
-                    }
-                }
-            }
+      }
+    }
+    return false;
+  }
+
+  protected Set<FoundNode> foundNodeSet;
+
+  @Override
+  public void onCreate(int id) {
+    this.processorId = id;
+
+    this.activeLeafNodeCount = 0;
+    this.inactiveLeafNodeCount = 0;
+    this.decisionNodeCount = 0;
+    this.growthAllowed = true;
+
+    this.splittingNodes = new ConcurrentHashMap<>();
+    this.timedOutSplittingNodes = new LinkedBlockingQueue<>();
+    this.splitId = 0;
+
+    // Executor for scheduling time-out threads
+    this.executor = Executors.newScheduledThreadPool(8);
+  }
+
+  @Override
+  public Processor newProcessor(Processor p) {
+    ModelAggregatorProcessor oldProcessor = (ModelAggregatorProcessor) p;
+    ModelAggregatorProcessor newProcessor =
+        new ModelAggregatorProcessor.Builder(oldProcessor).build();
+
+    newProcessor.setResultStream(oldProcessor.resultStream);
+    newProcessor.setAttributeStream(oldProcessor.attributeStream);
+    newProcessor.setControlStream(oldProcessor.controlStream);
+    return newProcessor;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(super.toString());
+
+    sb.append("ActiveLeafNodeCount: ").append(activeLeafNodeCount);
+    sb.append("InactiveLeafNodeCount: ").append(inactiveLeafNodeCount);
+    sb.append("DecisionNodeCount: ").append(decisionNodeCount);
+    sb.append("Growth allowed: ").append(growthAllowed);
+    return sb.toString();
+  }
+
+  void setResultStream(Stream resultStream) {
+    this.resultStream = resultStream;
+  }
+
+  void setAttributeStream(Stream attributeStream) {
+    this.attributeStream = attributeStream;
+  }
+
+  void setControlStream(Stream controlStream) {
+    this.controlStream = controlStream;
+  }
+
+  void sendToAttributeStream(ContentEvent event) {
+    this.attributeStream.put(event);
+  }
+
+  void sendToControlStream(ContentEvent event) {
+    this.controlStream.put(event);
+  }
+
+  /**
+   * Helper method to generate new ResultContentEvent based on an instance and
+   * its prediction result.
+   * 
+   * @param prediction
+   *          The predicted class label from the decision tree model.
+   * @param inEvent
+   *          The associated instance content event
+   * @return ResultContentEvent to be sent into Evaluator PI or other
+   *         destination PI.
+   */
+  private ResultContentEvent newResultContentEvent(double[] prediction, 
InstanceContentEvent inEvent) {
+    ResultContentEvent rce = new 
ResultContentEvent(inEvent.getInstanceIndex(), inEvent.getInstance(),
+        inEvent.getClassId(), prediction, inEvent.isLastEvent());
+    rce.setClassifierIndex(this.processorId);
+    rce.setEvaluationIndex(inEvent.getEvaluationIndex());
+    return rce;
+  }
+
+  private ResultContentEvent newResultContentEvent(double[] prediction, 
Instance inst, InstancesContentEvent inEvent) {
+    ResultContentEvent rce = new 
ResultContentEvent(inEvent.getInstanceIndex(), inst, (int) inst.classValue(),
+        prediction, inEvent.isLastEvent());
+    rce.setClassifierIndex(this.processorId);
+    rce.setEvaluationIndex(inEvent.getEvaluationIndex());
+    return rce;
+  }
+
+  private List<InstancesContentEvent> contentEventList = new LinkedList<>();
+
+  /**
+   * Helper method to process the InstanceContentEvent
+   * 
+   * @param instContentEvent
+   */
+  private void processInstanceContentEvent(InstancesContentEvent 
instContentEvent) {
+    this.numBatches++;
+    this.contentEventList.add(instContentEvent);
+    if (this.numBatches == 1 || this.numBatches > 4) {
+      this.processInstances(this.contentEventList.remove(0));
+    }
+
+    if (instContentEvent.isLastEvent()) {
+      // drain remaining instances
+      while (!contentEventList.isEmpty()) {
+        processInstances(contentEventList.remove(0));
+      }
+    }
+
+  }
+
+  private int numBatches = 0;
+
+  private void processInstances(InstancesContentEvent instContentEvent) {
+
+    Instance[] instances = instContentEvent.getInstances();
+    boolean isTesting = instContentEvent.isTesting();
+    boolean isTraining = instContentEvent.isTraining();
+    for (Instance inst : instances) {
+      this.processInstance(inst, instContentEvent, isTesting, isTraining);
+    }
+  }
+
+  private void processInstance(Instance inst, InstancesContentEvent 
instContentEvent, boolean isTesting,
+      boolean isTraining) {
+    inst.setDataset(this.dataset);
+    // Check the instance whether it is used for testing or training
+    // boolean testAndTrain = isTraining; //Train after testing
+    double[] prediction = null;
+    if (isTesting) {
+      prediction = getVotesForInstance(inst, false);
+      this.resultStream.put(newResultContentEvent(prediction, inst,
+          instContentEvent));
+    }
+
+    if (isTraining) {
+      trainOnInstanceImpl(inst);
+      if (this.changeDetector != null) {
+        if (prediction == null) {
+          prediction = getVotesForInstance(inst);
         }
-       
-        protected FoundNode[] findNodes() {
-        List<FoundNode> foundList = new LinkedList<>();
-        findNodes(this.treeRoot, null, -1, foundList);
-        return foundList.toArray(new FoundNode[foundList.size()]);
-    }
-
-    protected void findNodes(Node node, SplitNode parent,
-            int parentBranch, List<FoundNode> found) {
-        if (node != null) {
-            found.add(new FoundNode(node, parent, parentBranch));
-            if (node instanceof SplitNode) {
-                SplitNode splitNode = (SplitNode) node;
-                for (int i = 0; i < splitNode.numChildren(); i++) {
-                    findNodes(splitNode.getChild(i), splitNode, i,
-                            found);
-                }
-            }
+        boolean correctlyClassifies = this.correctlyClassifies(inst, 
prediction);
+        double oldEstimation = this.changeDetector.getEstimation();
+        this.changeDetector.input(correctlyClassifies ? 0 : 1);
+        if (this.changeDetector.getEstimation() > oldEstimation) {
+          // Start a new classifier
+          logger.info("Change detected, resetting the classifier");
+          this.resetLearning();
+          this.changeDetector.resetLearning();
         }
+      }
     }
-
-        
-       /**
-        * Helper method to get the prediction result. 
-        * The actual prediction result is delegated to the leaf node.
-        * @param inst
-        * @return
-        */
-       private double[] getVotesForInstance(Instance inst){
-            return getVotesForInstance(inst, false);
+  }
+
+  private boolean correctlyClassifies(Instance inst, double[] prediction) {
+    return maxIndex(prediction) == (int) inst.classValue();
+  }
+
+  private void resetLearning() {
+    this.treeRoot = null;
+    // Remove nodes
+    FoundNode[] learningNodes = findNodes();
+    for (FoundNode learningNode : learningNodes) {
+      Node node = learningNode.getNode();
+      if (node instanceof SplitNode) {
+        SplitNode splitNode;
+        splitNode = (SplitNode) node;
+        for (int i = 0; i < splitNode.numChildren(); i++) {
+          splitNode.setChild(i, null);
         }
-            
-       private double[] getVotesForInstance(Instance inst, boolean isTraining){
-               double[] ret;
-               FoundNode foundNode = null;
-               if(this.treeRoot != null){
-                       foundNode = this.treeRoot.filterInstanceToLeaf(inst, 
null, -1);
-                       Node leafNode = foundNode.getNode();
-                       if(leafNode == null){
-                               leafNode = foundNode.getParent();
-                       }
-                       
-                       ret = leafNode.getClassVotes(inst, this);
-               } else {
-                       int numClasses = this.dataset.numClasses();
-                       ret = new double[numClasses];
-                        
-               }
-                
-    //Training after testing to speed up the process
-               if (isTraining){
-                       if(this.treeRoot == null){
-                               this.treeRoot = 
newLearningNode(this.parallelismHint);
+      }
+    }
+  }
+
+  protected FoundNode[] findNodes() {
+    List<FoundNode> foundList = new LinkedList<>();
+    findNodes(this.treeRoot, null, -1, foundList);
+    return foundList.toArray(new FoundNode[foundList.size()]);
+  }
+
+  protected void findNodes(Node node, SplitNode parent,
+      int parentBranch, List<FoundNode> found) {
+    if (node != null) {
+      found.add(new FoundNode(node, parent, parentBranch));
+      if (node instanceof SplitNode) {
+        SplitNode splitNode = (SplitNode) node;
+        for (int i = 0; i < splitNode.numChildren(); i++) {
+          findNodes(splitNode.getChild(i), splitNode, i,
+              found);
+        }
+      }
+    }
+  }
+
+  /**
+   * Helper method to get the prediction result. The actual prediction result 
is
+   * delegated to the leaf node.
+   * 
+   * @param inst
+   * @return
+   */
+  private double[] getVotesForInstance(Instance inst) {
+    return getVotesForInstance(inst, false);
+  }
+
+  private double[] getVotesForInstance(Instance inst, boolean isTraining) {
+    double[] ret;
+    FoundNode foundNode = null;
+    if (this.treeRoot != null) {
+      foundNode = this.treeRoot.filterInstanceToLeaf(inst, null, -1);
+      Node leafNode = foundNode.getNode();
+      if (leafNode == null) {
+        leafNode = foundNode.getParent();
+      }
+
+      ret = leafNode.getClassVotes(inst, this);
+    } else {
+      int numClasses = this.dataset.numClasses();
+      ret = new double[numClasses];
+
+    }
+
+    // Training after testing to speed up the process
+    if (isTraining) {
+      if (this.treeRoot == null) {
+        this.treeRoot = newLearningNode(this.parallelismHint);
         this.activeLeafNodeCount = 1;
         foundNode = this.treeRoot.filterInstanceToLeaf(inst, null, -1);
-                       }
-                       trainOnInstanceImpl(foundNode, inst);
-               }
-               return ret;
-       }
-       
-       /**
-        * Helper method that represent training of an instance. Since it is 
decision tree, 
-        * this method routes the incoming instance into the correct leaf and 
then update the 
-        * statistic on the found leaf. 
-        * @param inst
-        */
-       private void trainOnInstanceImpl(Instance inst) {
-               if(this.treeRoot == null){
-                       this.treeRoot = newLearningNode(this.parallelismHint);
-                       this.activeLeafNodeCount = 1;
-                        
-               }
-               FoundNode foundNode = this.treeRoot.filterInstanceToLeaf(inst, 
null, -1);
-                trainOnInstanceImpl(foundNode, inst);
+      }
+      trainOnInstanceImpl(foundNode, inst);
+    }
+    return ret;
+  }
+
+  /**
+   * Helper method that represent training of an instance. Since it is decision
+   * tree, this method routes the incoming instance into the correct leaf and
+   * then update the statistic on the found leaf.
+   * 
+   * @param inst
+   */
+  private void trainOnInstanceImpl(Instance inst) {
+    if (this.treeRoot == null) {
+      this.treeRoot = newLearningNode(this.parallelismHint);
+      this.activeLeafNodeCount = 1;
+
+    }
+    FoundNode foundNode = this.treeRoot.filterInstanceToLeaf(inst, null, -1);
+    trainOnInstanceImpl(foundNode, inst);
+  }
+
+  private void trainOnInstanceImpl(FoundNode foundNode, Instance inst) {
+
+    Node leafNode = foundNode.getNode();
+
+    if (leafNode == null) {
+      leafNode = newLearningNode(this.parallelismHint);
+      foundNode.getParent().setChild(foundNode.getParentBranch(), leafNode);
+      activeLeafNodeCount++;
+    }
+
+    if (leafNode instanceof LearningNode) {
+      LearningNode learningNode = (LearningNode) leafNode;
+      learningNode.learnFromInstance(inst, this);
+    }
+    if (this.foundNodeSet == null) {
+      this.foundNodeSet = new HashSet<>();
+    }
+    this.foundNodeSet.add(foundNode);
+  }
+
+  /**
+   * Helper method to represent a split attempt
+   * 
+   * @param activeLearningNode
+   *          The corresponding active learning node which will be split
+   * @param foundNode
+   *          The data structure to represents the filtering of the instance
+   *          using the tree model.
+   */
+  private void attemptToSplit(ActiveLearningNode activeLearningNode, FoundNode 
foundNode) {
+    // Increment the split ID
+    this.splitId++;
+
+    // Schedule time-out thread
+    ScheduledFuture<?> timeOutHandler = this.executor.schedule(new 
AggregationTimeOutHandler(this.splitId,
+        this.timedOutSplittingNodes),
+        this.timeOut, TimeUnit.SECONDS);
+
+    // Keep track of the splitting node information, so that we can continue 
the
+    // split
+    // once we receive all local statistic calculation from Local Statistic PI
+    // this.splittingNodes.put(Long.valueOf(this.splitId), new
+    // SplittingNodeInfo(activeLearningNode, foundNode, null));
+    this.splittingNodes.put(this.splitId, new 
SplittingNodeInfo(activeLearningNode, foundNode, timeOutHandler));
+
+    // Inform Local Statistic PI to perform local statistic calculation
+    activeLearningNode.requestDistributedSuggestions(this.splitId, this);
+  }
+
+  /**
+   * Helper method to continue the attempt to split once all local calculation
+   * results are received.
+   * 
+   * @param activeLearningNode
+   *          The corresponding active learning node which will be split
+   * @param foundNode
+   *          The data structure to represents the filtering of the instance
+   *          using the tree model.
+   */
+  private void continueAttemptToSplit(ActiveLearningNode activeLearningNode, 
FoundNode foundNode) {
+    AttributeSplitSuggestion bestSuggestion = 
activeLearningNode.getDistributedBestSuggestion();
+    AttributeSplitSuggestion secondBestSuggestion = 
activeLearningNode.getDistributedSecondBestSuggestion();
+
+    // compare with null split
+    double[] preSplitDist = activeLearningNode.getObservedClassDistribution();
+    AttributeSplitSuggestion nullSplit = new AttributeSplitSuggestion(null,
+        new double[0][], this.splitCriterion.getMeritOfSplit(
+            preSplitDist,
+            new double[][] { preSplitDist }));
+
+    if ((bestSuggestion == null) || (nullSplit.compareTo(bestSuggestion) > 0)) 
{
+      secondBestSuggestion = bestSuggestion;
+      bestSuggestion = nullSplit;
+    } else {
+      if ((secondBestSuggestion == null) || 
(nullSplit.compareTo(secondBestSuggestion) > 0)) {
+        secondBestSuggestion = nullSplit;
+      }
+    }
+
+    boolean shouldSplit = false;
+
+    if (secondBestSuggestion == null) {
+      shouldSplit = (bestSuggestion != null);
+    } else {
+      double hoeffdingBound = computeHoeffdingBound(
+          
this.splitCriterion.getRangeOfMerit(activeLearningNode.getObservedClassDistribution()),
+          this.splitConfidence,
+          activeLearningNode.getWeightSeen());
+
+      if ((bestSuggestion.merit - secondBestSuggestion.merit > hoeffdingBound)
+          || (hoeffdingBound < tieThreshold)) {
+        shouldSplit = true;
+      }
+      // TODO: add poor attributes removal
+    }
+
+    SplitNode parent = foundNode.getParent();
+    int parentBranch = foundNode.getParentBranch();
+
+    // split if the Hoeffding bound condition is satisfied
+    if (shouldSplit) {
+
+      if (bestSuggestion.splitTest != null) {
+        SplitNode newSplit = new SplitNode(bestSuggestion.splitTest, 
activeLearningNode.getObservedClassDistribution());
+
+        for (int i = 0; i < bestSuggestion.numSplits(); i++) {
+          Node newChild = 
newLearningNode(bestSuggestion.resultingClassDistributionFromSplit(i), 
this.parallelismHint);
+          newSplit.setChild(i, newChild);
         }
-               
-         private void trainOnInstanceImpl(FoundNode foundNode, Instance inst) {
-                
-               Node leafNode = foundNode.getNode();
-               
-               if(leafNode == null){
-                       leafNode = newLearningNode(this.parallelismHint);
-                       
foundNode.getParent().setChild(foundNode.getParentBranch(), leafNode);
-                       activeLeafNodeCount++;
-               }
-               
-               if(leafNode instanceof LearningNode){
-                       LearningNode learningNode = (LearningNode) leafNode;
-                       learningNode.learnFromInstance(inst, this);
-               }
-                                        if (this.foundNodeSet == null){
-                    this.foundNodeSet = new HashSet<>();
-               }
-                this.foundNodeSet.add(foundNode);
-       }
-       
-       /**
-        * Helper method to represent a split attempt
-        * @param activeLearningNode The corresponding active learning node 
which will be split
-        * @param foundNode The data structure to represents the filtering of 
the instance using the
-        * tree model.
-        */
-       private void attemptToSplit(ActiveLearningNode activeLearningNode, 
FoundNode foundNode){
-               //Increment the split ID
-               this.splitId++;
-               
-               //Schedule time-out thread
-               ScheduledFuture<?> timeOutHandler = this.executor.schedule(new 
AggregationTimeOutHandler(this.splitId, this.timedOutSplittingNodes), 
-                               this.timeOut, TimeUnit.SECONDS);
-               
-               //Keep track of the splitting node information, so that we can 
continue the split
-               //once we receive all local statistic calculation from Local 
Statistic PI
-               //this.splittingNodes.put(Long.valueOf(this.splitId), new 
SplittingNodeInfo(activeLearningNode, foundNode, null)); 
-               this.splittingNodes.put(this.splitId, new 
SplittingNodeInfo(activeLearningNode, foundNode, timeOutHandler));
-               
-               //Inform Local Statistic PI to perform local statistic 
calculation
-               activeLearningNode.requestDistributedSuggestions(this.splitId, 
this);
-       }
-       
-       
-       /**
-        * Helper method to continue the attempt to split once all local 
calculation results are received.
-        * @param activeLearningNode The corresponding active learning node 
which will be split
-        * @param foundNode The data structure to represents the filtering of 
the instance using the
-        * tree model.
-        */
-       private void continueAttemptToSplit(ActiveLearningNode 
activeLearningNode, FoundNode foundNode){
-               AttributeSplitSuggestion bestSuggestion = 
activeLearningNode.getDistributedBestSuggestion();
-               AttributeSplitSuggestion secondBestSuggestion = 
activeLearningNode.getDistributedSecondBestSuggestion();
-               
-               //compare with null split
-               double[] preSplitDist = 
activeLearningNode.getObservedClassDistribution();
-               AttributeSplitSuggestion nullSplit = new 
AttributeSplitSuggestion(null,
-                new double[0][], this.splitCriterion.getMeritOfSplit(
-                preSplitDist,
-                new double[][]{preSplitDist}));
-               
-               if((bestSuggestion == null) || 
(nullSplit.compareTo(bestSuggestion) > 0)){
-                       secondBestSuggestion = bestSuggestion;
-                       bestSuggestion = nullSplit;
-               }else{
-                       if((secondBestSuggestion == null) || 
(nullSplit.compareTo(secondBestSuggestion) > 0)){
-                               secondBestSuggestion = nullSplit;
-                       }
-               }
-               
-               boolean shouldSplit = false;
-               
-               if(secondBestSuggestion == null){
-                       shouldSplit = (bestSuggestion != null);
-               }else{
-                       double hoeffdingBound = computeHoeffdingBound(
-                                       
this.splitCriterion.getRangeOfMerit(activeLearningNode.getObservedClassDistribution()),
-                                       this.splitConfidence, 
-                                       activeLearningNode.getWeightSeen());
-                       
-                       if((bestSuggestion.merit - secondBestSuggestion.merit > 
hoeffdingBound) 
-                                       || (hoeffdingBound < tieThreshold)) {
-                               shouldSplit = true;
-                       }
-                       //TODO: add poor attributes removal 
-               }
-
-               SplitNode parent = foundNode.getParent();
-               int parentBranch = foundNode.getParentBranch();
-               
-               //split if the Hoeffding bound condition is satisfied
-               if(shouldSplit){
-
-                       if (bestSuggestion.splitTest != null) {
-                               SplitNode newSplit = new 
SplitNode(bestSuggestion.splitTest, 
activeLearningNode.getObservedClassDistribution());
-
-                               for(int i = 0; i < bestSuggestion.numSplits(); 
i++){
-                                       Node newChild = 
newLearningNode(bestSuggestion.resultingClassDistributionFromSplit(i), 
this.parallelismHint);
-                                       newSplit.setChild(i, newChild);
-                               }
-
-                               this.activeLeafNodeCount--;
-                               this.decisionNodeCount++;
-                               this.activeLeafNodeCount += 
bestSuggestion.numSplits();
-
-                               if(parent == null){
-                                       this.treeRoot = newSplit;
-                               }else{
-                                       parent.setChild(parentBranch, newSplit);
-                               }
-                       }
-                       //TODO: add check on the model's memory size 
-               }
-               
-               //housekeeping
-               activeLearningNode.endSplitting();
-               
activeLearningNode.setWeightSeenAtLastSplitEvaluation(activeLearningNode.getWeightSeen());
-       }
-
-       /**
-        * Helper method to deactivate learning node
-        * @param toDeactivate Active Learning Node that will be deactivated
-        * @param parent Parent of the soon-to-be-deactivated Active 
LearningNode
-        * @param parentBranch the branch index of the node in the parent node
-        */
-       private void deactivateLearningNode(ActiveLearningNode toDeactivate, 
SplitNode parent, int parentBranch){
-               Node newLeaf = new 
InactiveLearningNode(toDeactivate.getObservedClassDistribution());
-               if(parent == null){
-                       this.treeRoot = newLeaf;
-               }else{
-                       parent.setChild(parentBranch, newLeaf);
-               }
-               
-               this.activeLeafNodeCount--;
-               this.inactiveLeafNodeCount++;
-       }
-       
-       
-       private LearningNode newLearningNode(int parallelismHint){
-               return newLearningNode(new double[0], parallelismHint);
-       }
-       
-       private LearningNode newLearningNode(double[] initialClassObservations, 
int parallelismHint){
-               //for VHT optimization, we need to dynamically instantiate the 
appropriate ActiveLearningNode
-               return new ActiveLearningNode(initialClassObservations, 
parallelismHint);
-       }
-               
-       /**
-        * Helper method to set the model context, i.e. how many attributes 
they are and what is the class index
-        * @param ih
-        */
-       private void setModelContext(InstancesHeader ih){
-               //TODO possibly refactored
-        if ((ih != null) && (ih.classIndex() < 0)) {
-            throw new IllegalArgumentException(
-                    "Context for a classifier must include a class to learn");
+
+        this.activeLeafNodeCount--;
+        this.decisionNodeCount++;
+        this.activeLeafNodeCount += bestSuggestion.numSplits();
+
+        if (parent == null) {
+          this.treeRoot = newSplit;
+        } else {
+          parent.setChild(parentBranch, newSplit);
         }
-        //TODO: check flag for checking whether training has started or not
-        
-        //model context is used to describe the model
-               logger.trace("Model context: {}", ih.toString());
-       }
-
-       private static double computeHoeffdingBound(double range, double 
confidence, double n){
-               return Math.sqrt((Math.pow(range, 2.0) * 
Math.log(1.0/confidence)) / (2.0*n));
-       }
-
-       /**
-        * AggregationTimeOutHandler is a class to support time-out feature 
while waiting for local computation results
-        * from the local statistic PIs.
-        * @author Arinto Murdopo
-        *
-        */
-       static class AggregationTimeOutHandler implements Runnable{
-               
-               private static final Logger logger = 
LoggerFactory.getLogger(AggregationTimeOutHandler.class);
-               private final Long splitId;
-               private final BlockingQueue<Long> toBeSplittedNodes;
-               
-               AggregationTimeOutHandler(Long splitId, BlockingQueue<Long> 
toBeSplittedNodes){
-                       this.splitId = splitId;
-                       this.toBeSplittedNodes = toBeSplittedNodes;
-               }
-
-               @Override
-               public void run() {
-                       logger.debug("Time out is reached. 
AggregationTimeOutHandler is started.");
-                       try {
-                               toBeSplittedNodes.put(splitId);
-                       } catch (InterruptedException e) {
-                               logger.warn("Interrupted while trying to put 
the ID into the queue");
-                       }
-                       logger.debug("AggregationTimeOutHandler is finished.");
-               }
-       }
-       
-       /**
-        * SplittingNodeInfo is a class to represents the ActiveLearningNode 
that is splitting
-        * @author Arinto Murdopo
-        *
-        */
-       static class SplittingNodeInfo{
-               
-               private final ActiveLearningNode activeLearningNode;
-               private final FoundNode foundNode;
-               private final ScheduledFuture<?> scheduledFuture;
-               
-               SplittingNodeInfo(ActiveLearningNode activeLearningNode, 
FoundNode foundNode, ScheduledFuture<?> scheduledFuture){
-                       this.activeLearningNode = activeLearningNode;
-                       this.foundNode = foundNode;
-                       this.scheduledFuture = scheduledFuture;
-               }
-       }
-        
-               protected ChangeDetector changeDetector;
-
-               public ChangeDetector getChangeDetector() {
-                               return this.changeDetector;
-               }
-
-               public void setChangeDetector(ChangeDetector cd) {
-                               this.changeDetector = cd;
-               }
-       
-       /**
-        * Builder class to replace constructors with many parameters
-        * @author Arinto Murdopo
-        *
-        */
-       static class Builder{
-               
-               //required parameters
-               private final Instances dataset;
-               
-               //default values
-               private SplitCriterion splitCriterion = new 
InfoGainSplitCriterion();
-               private double splitConfidence = 0.0000001;
-               private double tieThreshold = 0.05;
-               private int gracePeriod = 200;
-               private int parallelismHint = 1;
-               private long timeOut = 30;
-               private ChangeDetector changeDetector = null;
-
-               Builder(Instances dataset){
-                       this.dataset = dataset;
-               }
-               
-               Builder(ModelAggregatorProcessor oldProcessor){
-                       this.dataset = oldProcessor.dataset;
-                       this.splitCriterion = oldProcessor.splitCriterion;
-                       this.splitConfidence = oldProcessor.splitConfidence;
-                       this.tieThreshold = oldProcessor.tieThreshold;
-                       this.gracePeriod = oldProcessor.gracePeriod;
-                       this.parallelismHint = oldProcessor.parallelismHint;
-                       this.timeOut = oldProcessor.timeOut;
-               }
-               
-               Builder splitCriterion(SplitCriterion splitCriterion){
-                       this.splitCriterion = splitCriterion;
-                       return this;
-               }
-               
-               Builder splitConfidence(double splitConfidence){
-                       this.splitConfidence = splitConfidence;
-                       return this;
-               }
-               
-               Builder tieThreshold(double tieThreshold){
-                       this.tieThreshold = tieThreshold;
-                       return this;
-               }
-               
-               Builder gracePeriod(int gracePeriod){
-                       this.gracePeriod = gracePeriod;
-                       return this;
-               }
-               
-               Builder parallelismHint(int parallelismHint){
-                       this.parallelismHint = parallelismHint;
-                       return this;
-               }
-               
-               Builder timeOut(long timeOut){
-                       this.timeOut = timeOut;
-                       return this;
-               }
-               
-               Builder changeDetector(ChangeDetector changeDetector){
-                       this.changeDetector = changeDetector;
-                       return this;
-               }
-               ModelAggregatorProcessor build(){
-                       return new ModelAggregatorProcessor(this);
-               }
-       }
-       
+      }
+      // TODO: add check on the model's memory size
+    }
+
+    // housekeeping
+    activeLearningNode.endSplitting();
+    
activeLearningNode.setWeightSeenAtLastSplitEvaluation(activeLearningNode.getWeightSeen());
+  }
+
+  /**
+   * Helper method to deactivate learning node
+   * 
+   * @param toDeactivate
+   *          Active Learning Node that will be deactivated
+   * @param parent
+   *          Parent of the soon-to-be-deactivated Active LearningNode
+   * @param parentBranch
+   *          the branch index of the node in the parent node
+   */
+  private void deactivateLearningNode(ActiveLearningNode toDeactivate, 
SplitNode parent, int parentBranch) {
+    Node newLeaf = new 
InactiveLearningNode(toDeactivate.getObservedClassDistribution());
+    if (parent == null) {
+      this.treeRoot = newLeaf;
+    } else {
+      parent.setChild(parentBranch, newLeaf);
+    }
+
+    this.activeLeafNodeCount--;
+    this.inactiveLeafNodeCount++;
+  }
+
+  private LearningNode newLearningNode(int parallelismHint) {
+    return newLearningNode(new double[0], parallelismHint);
+  }
+
+  private LearningNode newLearningNode(double[] initialClassObservations, int 
parallelismHint) {
+    // for VHT optimization, we need to dynamically instantiate the appropriate
+    // ActiveLearningNode
+    return new ActiveLearningNode(initialClassObservations, parallelismHint);
+  }
+
+  /**
+   * Helper method to set the model context, i.e. how many attributes they are
+   * and what is the class index
+   * 
+   * @param ih
+   */
+  private void setModelContext(InstancesHeader ih) {
+    // TODO possibly refactored
+    if ((ih != null) && (ih.classIndex() < 0)) {
+      throw new IllegalArgumentException(
+          "Context for a classifier must include a class to learn");
+    }
+    // TODO: check flag for checking whether training has started or not
+
+    // model context is used to describe the model
+    logger.trace("Model context: {}", ih.toString());
+  }
+
+  private static double computeHoeffdingBound(double range, double confidence, 
double n) {
+    return Math.sqrt((Math.pow(range, 2.0) * Math.log(1.0 / confidence)) / 
(2.0 * n));
+  }
+
+  /**
+   * AggregationTimeOutHandler is a class to support time-out feature while
+   * waiting for local computation results from the local statistic PIs.
+   * 
+   * @author Arinto Murdopo
+   * 
+   */
+  static class AggregationTimeOutHandler implements Runnable {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(AggregationTimeOutHandler.class);
+    private final Long splitId;
+    private final BlockingQueue<Long> toBeSplittedNodes;
+
+    AggregationTimeOutHandler(Long splitId, BlockingQueue<Long> 
toBeSplittedNodes) {
+      this.splitId = splitId;
+      this.toBeSplittedNodes = toBeSplittedNodes;
+    }
+
+    @Override
+    public void run() {
+      logger.debug("Time out is reached. AggregationTimeOutHandler is 
started.");
+      try {
+        toBeSplittedNodes.put(splitId);
+      } catch (InterruptedException e) {
+        logger.warn("Interrupted while trying to put the ID into the queue");
+      }
+      logger.debug("AggregationTimeOutHandler is finished.");
+    }
+  }
+
+  /**
+   * SplittingNodeInfo is a class to represents the ActiveLearningNode that is
+   * splitting
+   * 
+   * @author Arinto Murdopo
+   * 
+   */
+  static class SplittingNodeInfo {
+
+    private final ActiveLearningNode activeLearningNode;
+    private final FoundNode foundNode;
+    private final ScheduledFuture<?> scheduledFuture;
+
+    SplittingNodeInfo(ActiveLearningNode activeLearningNode, FoundNode 
foundNode, ScheduledFuture<?> scheduledFuture) {
+      this.activeLearningNode = activeLearningNode;
+      this.foundNode = foundNode;
+      this.scheduledFuture = scheduledFuture;
+    }
+  }
+
+  protected ChangeDetector changeDetector;
+
+  public ChangeDetector getChangeDetector() {
+    return this.changeDetector;
+  }
+
+  public void setChangeDetector(ChangeDetector cd) {
+    this.changeDetector = cd;
+  }
+
+  /**
+   * Builder class to replace constructors with many parameters
+   * 
+   * @author Arinto Murdopo
+   * 
+   */
+  static class Builder {
+
+    // required parameters
+    private final Instances dataset;
+
+    // default values
+    private SplitCriterion splitCriterion = new InfoGainSplitCriterion();
+    private double splitConfidence = 0.0000001;
+    private double tieThreshold = 0.05;
+    private int gracePeriod = 200;
+    private int parallelismHint = 1;
+    private long timeOut = 30;
+    private ChangeDetector changeDetector = null;
+
+    Builder(Instances dataset) {
+      this.dataset = dataset;
+    }
+
+    Builder(ModelAggregatorProcessor oldProcessor) {
+      this.dataset = oldProcessor.dataset;
+      this.splitCriterion = oldProcessor.splitCriterion;
+      this.splitConfidence = oldProcessor.splitConfidence;
+      this.tieThreshold = oldProcessor.tieThreshold;
+      this.gracePeriod = oldProcessor.gracePeriod;
+      this.parallelismHint = oldProcessor.parallelismHint;
+      this.timeOut = oldProcessor.timeOut;
+    }
+
+    Builder splitCriterion(SplitCriterion splitCriterion) {
+      this.splitCriterion = splitCriterion;
+      return this;
+    }
+
+    Builder splitConfidence(double splitConfidence) {
+      this.splitConfidence = splitConfidence;
+      return this;
+    }
+
+    Builder tieThreshold(double tieThreshold) {
+      this.tieThreshold = tieThreshold;
+      return this;
+    }
+
+    Builder gracePeriod(int gracePeriod) {
+      this.gracePeriod = gracePeriod;
+      return this;
+    }
+
+    Builder parallelismHint(int parallelismHint) {
+      this.parallelismHint = parallelismHint;
+      return this;
+    }
+
+    Builder timeOut(long timeOut) {
+      this.timeOut = timeOut;
+      return this;
+    }
+
+    Builder changeDetector(ChangeDetector changeDetector) {
+      this.changeDetector = changeDetector;
+      return this;
+    }
+
+    ModelAggregatorProcessor build() {
+      return new ModelAggregatorProcessor(this);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/Node.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/Node.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/Node.java
index ff9bf5f..22e551f 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/Node.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/Node.java
@@ -25,70 +25,83 @@ import com.yahoo.labs.samoa.instances.Instance;
 
 /**
  * Abstract class that represents a node in the tree model.
+ * 
  * @author Arinto Murdopo
- *
+ * 
  */
-abstract class Node implements java.io.Serializable{
-       
-       private static final long serialVersionUID = 4008521239214180548L;
-       
-       protected final DoubleVector observedClassDistribution; 
-       
-       /**
-        * Method to route/filter an instance into its corresponding leaf. This 
method will be
-        * invoked recursively. 
-        * @param inst Instance to be routed
-        * @param parent Parent of the current node
-        * @param parentBranch The index of the current node in the parent
-        * @return FoundNode which is the data structure to represent the 
resulting leaf.
-        */
-       abstract FoundNode filterInstanceToLeaf(Instance inst, SplitNode 
parent, int parentBranch);
-       
-       /**
-        * Method to return the predicted class of the instance based on the 
statistic 
-        * inside the node.
-        *  
-        * @param inst To-be-predicted instance
-        * @param map ModelAggregatorProcessor
-        * @return The prediction result in the form of class distribution
-        */
-       abstract double[] getClassVotes(Instance inst, ModelAggregatorProcessor 
map);
-       
-       /**
-        * Method to check whether the node is a leaf node or not.
-        * @return Boolean flag to indicate whether the node is a leaf or not
-        */
-       abstract boolean isLeaf();
-       
-       
-       /**
-        * Constructor of the tree node
-        * @param classObservation distribution of the observed classes.
-        */
-       protected Node(double[] classObservation){
-               this.observedClassDistribution = new 
DoubleVector(classObservation);
-       }
-       
-       /**
-        * Getter method for the class distribution
-        * @return Observed class distribution
-        */
-       protected double[] getObservedClassDistribution() {
-               return this.observedClassDistribution.getArrayCopy();
-       }
-               
-       /**
-        * A method to check whether the class distribution only consists of 
one class or not.
-        * @return Flag whether class distribution is pure or not.
-        */
-       protected boolean observedClassDistributionIsPure(){
-               return (observedClassDistribution.numNonZeroEntries() < 2);
-       }
-       
-       protected void describeSubtree(ModelAggregatorProcessor modelAggrProc, 
StringBuilder out, int indent){
-               //TODO: implement method to gracefully define the tree
-       }
-       
-       //TODO: calculate promise for limiting the model based on the memory 
size
-       //double calculatePromise();
+abstract class Node implements java.io.Serializable {
+
+  private static final long serialVersionUID = 4008521239214180548L;
+
+  protected final DoubleVector observedClassDistribution;
+
+  /**
+   * Method to route/filter an instance into its corresponding leaf. This 
method
+   * will be invoked recursively.
+   * 
+   * @param inst
+   *          Instance to be routed
+   * @param parent
+   *          Parent of the current node
+   * @param parentBranch
+   *          The index of the current node in the parent
+   * @return FoundNode which is the data structure to represent the resulting
+   *         leaf.
+   */
+  abstract FoundNode filterInstanceToLeaf(Instance inst, SplitNode parent, int 
parentBranch);
+
+  /**
+   * Method to return the predicted class of the instance based on the 
statistic
+   * inside the node.
+   * 
+   * @param inst
+   *          To-be-predicted instance
+   * @param map
+   *          ModelAggregatorProcessor
+   * @return The prediction result in the form of class distribution
+   */
+  abstract double[] getClassVotes(Instance inst, ModelAggregatorProcessor map);
+
+  /**
+   * Method to check whether the node is a leaf node or not.
+   * 
+   * @return Boolean flag to indicate whether the node is a leaf or not
+   */
+  abstract boolean isLeaf();
+
+  /**
+   * Constructor of the tree node
+   * 
+   * @param classObservation
+   *          distribution of the observed classes.
+   */
+  protected Node(double[] classObservation) {
+    this.observedClassDistribution = new DoubleVector(classObservation);
+  }
+
+  /**
+   * Getter method for the class distribution
+   * 
+   * @return Observed class distribution
+   */
+  protected double[] getObservedClassDistribution() {
+    return this.observedClassDistribution.getArrayCopy();
+  }
+
+  /**
+   * A method to check whether the class distribution only consists of one 
class
+   * or not.
+   * 
+   * @return Flag whether class distribution is pure or not.
+   */
+  protected boolean observedClassDistributionIsPure() {
+    return (observedClassDistribution.numNonZeroEntries() < 2);
+  }
+
+  protected void describeSubtree(ModelAggregatorProcessor modelAggrProc, 
StringBuilder out, int indent) {
+    // TODO: implement method to gracefully define the tree
+  }
+
+  // TODO: calculate promise for limiting the model based on the memory size
+  // double calculatePromise();
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/SplitNode.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/SplitNode.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/SplitNode.java
index fd93db1..7c6b434 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/SplitNode.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/SplitNode.java
@@ -25,84 +25,94 @@ import com.yahoo.labs.samoa.moa.core.AutoExpandVector;
 import com.yahoo.labs.samoa.instances.Instance;
 
 /**
- * SplitNode represents the node that contains one or more questions in the 
decision tree model,
- * in order to route the instances into the correct leaf.
+ * SplitNode represents the node that contains one or more questions in the
+ * decision tree model, in order to route the instances into the correct leaf.
+ * 
  * @author Arinto Murdopo
- *
+ * 
  */
 public class SplitNode extends Node {
-       
-       private static final long serialVersionUID = -7380795529928485792L;
-       
-       private final AutoExpandVector<Node> children;
-       protected final InstanceConditionalTest splitTest;
-       
-       public SplitNode(InstanceConditionalTest splitTest, 
-                       double[] classObservation) {
-               super(classObservation);
-               this.children = new AutoExpandVector<>();
-               this.splitTest = splitTest;
-       }
 
-       @Override
-       FoundNode filterInstanceToLeaf(Instance inst, SplitNode parent, int 
parentBranch) {
-               int childIndex = instanceChildIndex(inst);
-               if(childIndex >= 0){
-                       Node child = getChild(childIndex);
-                       if(child != null){
-                               return child.filterInstanceToLeaf(inst, this, 
childIndex);
-                       }
-                       return new FoundNode(null, this, childIndex);
-               }
-               return new FoundNode(this, parent, parentBranch);
-       }
+  private static final long serialVersionUID = -7380795529928485792L;
+
+  private final AutoExpandVector<Node> children;
+  protected final InstanceConditionalTest splitTest;
+
+  public SplitNode(InstanceConditionalTest splitTest,
+      double[] classObservation) {
+    super(classObservation);
+    this.children = new AutoExpandVector<>();
+    this.splitTest = splitTest;
+  }
+
+  @Override
+  FoundNode filterInstanceToLeaf(Instance inst, SplitNode parent, int 
parentBranch) {
+    int childIndex = instanceChildIndex(inst);
+    if (childIndex >= 0) {
+      Node child = getChild(childIndex);
+      if (child != null) {
+        return child.filterInstanceToLeaf(inst, this, childIndex);
+      }
+      return new FoundNode(null, this, childIndex);
+    }
+    return new FoundNode(this, parent, parentBranch);
+  }
+
+  @Override
+  boolean isLeaf() {
+    return false;
+  }
+
+  @Override
+  double[] getClassVotes(Instance inst, ModelAggregatorProcessor vht) {
+    return this.observedClassDistribution.getArrayCopy();
+  }
+
+  /**
+   * Method to return the number of children of this split node
+   * 
+   * @return number of children
+   */
+  int numChildren() {
+    return this.children.size();
+  }
+
+  /**
+   * Method to set the children in a specific index of the SplitNode with the
+   * appropriate child
+   * 
+   * @param index
+   *          Index of the child in the SplitNode
+   * @param child
+   *          The child node
+   */
+  void setChild(int index, Node child) {
+    if ((this.splitTest.maxBranches() >= 0)
+        && (index >= this.splitTest.maxBranches())) {
+      throw new IndexOutOfBoundsException();
+    }
+    this.children.set(index, child);
+  }
+
+  /**
+   * Method to get the child node given the index
+   * 
+   * @param index
+   *          The child node index
+   * @return The child node in the given index
+   */
+  Node getChild(int index) {
+    return this.children.get(index);
+  }
 
-       @Override
-       boolean isLeaf() {
-               return false;
-       }
-       
-       @Override
-       double[] getClassVotes(Instance inst, ModelAggregatorProcessor vht) {
-               return this.observedClassDistribution.getArrayCopy();
-       }               
-       
-       /**
-        * Method to return the number of children of this split node
-        * @return number of children
-        */
-       int numChildren(){
-               return this.children.size();
-       }
-       
-       /**
-        * Method to set the children in a specific index of the SplitNode with 
the appropriate child
-        * @param index Index of the child in the SplitNode
-        * @param child The child node
-        */
-       void setChild(int index, Node child){
-        if ((this.splitTest.maxBranches() >= 0)
-                && (index >= this.splitTest.maxBranches())) {
-            throw new IndexOutOfBoundsException();
-        }
-        this.children.set(index, child);
-       }
-       
-       /**
-        * Method to get the child node given the index
-        * @param index The child node index
-        * @return The child node in the given index
-        */
-       Node getChild(int index){
-               return this.children.get(index);
-       }
-       
-       /**
-        * Method to route the instance using this split node
-        * @param inst The routed instance
-        * @return The index of the branch where the instance is routed
-        */
-       int instanceChildIndex(Instance inst){
-               return this.splitTest.branchForInstance(inst);
-       }
+  /**
+   * Method to route the instance using this split node
+   * 
+   * @param inst
+   *          The routed instance
+   * @return The index of the branch where the instance is routed
+   */
+  int instanceChildIndex(Instance inst) {
+    return this.splitTest.branchForInstance(inst);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/VerticalHoeffdingTree.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/VerticalHoeffdingTree.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/VerticalHoeffdingTree.java
index e8ccce7..990a15b 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/VerticalHoeffdingTree.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/VerticalHoeffdingTree.java
@@ -46,7 +46,7 @@ import com.yahoo.labs.samoa.topology.TopologyBuilder;
  * Vertical Hoeffding Tree (VHT) classifier is a distributed classifier that
  * utilizes vertical parallelism on top of Very Fast Decision Tree (VFDT)
  * classifier.
- *
+ * 
  * @author Arinto Murdopo
  */
 public final class VerticalHoeffdingTree implements ClassificationLearner, 
AdaptiveLearner, Configurable {
@@ -110,7 +110,6 @@ public final class VerticalHoeffdingTree implements 
ClassificationLearner, Adapt
     Stream filterStream = topologyBuilder.createStream(filterProc);
     this.filterProc.setOutputStream(filterStream);
 
-
     ModelAggregatorProcessor modelAggrProc = new 
ModelAggregatorProcessor.Builder(dataset)
         .splitCriterion((SplitCriterion) this.splitCriterionOption.getValue())
         .splitConfidence(splitConfidenceOption.getValue())
@@ -175,7 +174,7 @@ public final class VerticalHoeffdingTree implements 
ClassificationLearner, Adapt
 
   static class LearningNodeIdGenerator {
 
-    //TODO: add code to warn user of when value reaches Long.MAX_VALUES
+    // TODO: add code to warn user of when value reaches Long.MAX_VALUES
     private static long id = 0;
 
     static synchronized long generate() {

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/ClusteringContentEvent.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/ClusteringContentEvent.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/ClusteringContentEvent.java
index a0d950b..4b7b1f6 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/ClusteringContentEvent.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/ClusteringContentEvent.java
@@ -30,60 +30,61 @@ import com.yahoo.labs.samoa.instances.Instance;
 @Immutable
 final public class ClusteringContentEvent implements ContentEvent {
 
-    private static final long serialVersionUID = -7746983521296618922L;
-    private Instance instance;
-    private boolean isLast = false;
-    private String key;
-    private boolean isSample;
+  private static final long serialVersionUID = -7746983521296618922L;
+  private Instance instance;
+  private boolean isLast = false;
+  private String key;
+  private boolean isSample;
 
-    public ClusteringContentEvent() {
-        // Necessary for kryo serializer
-    }
+  public ClusteringContentEvent() {
+    // Necessary for kryo serializer
+  }
 
-    /**
-     * Instantiates a new clustering event.
-     * 
-     * @param index
-     *            the index
-     * @param instance
-     *            the instance
+  /**
+   * Instantiates a new clustering event.
+   * 
+   * @param index
+   *          the index
+   * @param instance
+   *          the instance
+   */
+  public ClusteringContentEvent(long index, Instance instance) {
+    /*
+     * if (instance != null) { this.instance = new
+     * SerializableInstance(instance); }
      */
-    public ClusteringContentEvent(long index, Instance instance) {
-        /*
-         * if (instance != null) { this.instance = new 
SerializableInstance(instance); }
-         */
-        this.instance = instance;
-        this.setKey(Long.toString(index));
-    }
+    this.instance = instance;
+    this.setKey(Long.toString(index));
+  }
 
-    @Override
-    public String getKey() {
-        return this.key;
-    }
+  @Override
+  public String getKey() {
+    return this.key;
+  }
 
-    @Override
-    public void setKey(String str) {
-        this.key = str;
-    }
+  @Override
+  public void setKey(String str) {
+    this.key = str;
+  }
 
-    @Override
-    public boolean isLastEvent() {
-        return this.isLast;
-    }
+  @Override
+  public boolean isLastEvent() {
+    return this.isLast;
+  }
 
-    public void setLast(boolean isLast) {
-        this.isLast = isLast;
-    }
+  public void setLast(boolean isLast) {
+    this.isLast = isLast;
+  }
 
-    public Instance getInstance() {
-        return this.instance;
-    }
+  public Instance getInstance() {
+    return this.instance;
+  }
 
-    public boolean isSample() {
-        return isSample;
-    }
+  public boolean isSample() {
+    return isSample;
+  }
 
-    public void setSample(boolean b) {
-        this.isSample = b;
-    }
+  public void setSample(boolean b) {
+    this.isSample = b;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/ClustreamClustererAdapter.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/ClustreamClustererAdapter.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/ClustreamClustererAdapter.java
index 057e37b..829d448 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/ClustreamClustererAdapter.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/ClustreamClustererAdapter.java
@@ -31,136 +31,142 @@ import com.yahoo.labs.samoa.moa.cluster.Clustering;
 import com.yahoo.labs.samoa.moa.clusterers.clustream.Clustream;
 
 /**
- *
- *  Base class for adapting Clustream clusterer.
- *
+ * 
+ * Base class for adapting Clustream clusterer.
+ * 
  */
 public class ClustreamClustererAdapter implements LocalClustererAdapter, 
Configurable {
 
-    /**
+  /**
      *
      */
-    private static final long serialVersionUID = 4372366401338704353L;
-    
-    public ClassOption learnerOption = new ClassOption("learner", 'l',
-            "Clusterer to train.", 
com.yahoo.labs.samoa.moa.clusterers.Clusterer.class, Clustream.class.getName());
-    /**
-     * The learner.
-     */
-    protected com.yahoo.labs.samoa.moa.clusterers.Clusterer learner;
-    
-    /**
-     * The is init.
-     */
-    protected Boolean isInit;
-    
-    /**
-     * The dataset.
-     */
-    protected Instances dataset;
+  private static final long serialVersionUID = 4372366401338704353L;
 
-    @Override
-    public void setDataset(Instances dataset) {
-        this.dataset = dataset;
-    }
+  public ClassOption learnerOption = new ClassOption("learner", 'l',
+      "Clusterer to train.", 
com.yahoo.labs.samoa.moa.clusterers.Clusterer.class, Clustream.class.getName());
+  /**
+   * The learner.
+   */
+  protected com.yahoo.labs.samoa.moa.clusterers.Clusterer learner;
 
-    /**
-     * Instantiates a new learner.
-     *
-     * @param learner the learner
-     * @param dataset the dataset
-     */
-    public 
ClustreamClustererAdapter(com.yahoo.labs.samoa.moa.clusterers.Clusterer 
learner, Instances dataset) {
-        this.learner = learner.copy();
-        this.isInit = false;
-        this.dataset = dataset;
-    }
+  /**
+   * The is init.
+   */
+  protected Boolean isInit;
 
-    /**
-     * Instantiates a new learner.
-     *
-     * @param learner the learner
-     * @param dataset the dataset
-     */
-    public ClustreamClustererAdapter() {
-        this.learner = ((com.yahoo.labs.samoa.moa.clusterers.Clusterer) 
this.learnerOption.getValue()).copy();
-        this.isInit = false;
-        //this.dataset = dataset;
-    }
+  /**
+   * The dataset.
+   */
+  protected Instances dataset;
 
-    /**
-     * Creates a new learner object.
-     *
-     * @return the learner
-     */
-    @Override
-    public ClustreamClustererAdapter create() {
-        ClustreamClustererAdapter l = new ClustreamClustererAdapter(learner, 
dataset);
-        if (dataset == null) {
-            System.out.println("dataset null while creating");
-        }
-        return l;
-    }
+  @Override
+  public void setDataset(Instances dataset) {
+    this.dataset = dataset;
+  }
 
-    /**
-     * Trains this classifier incrementally using the given instance.
-     *
-     * @param inst the instance to be used for training
-     */
-    @Override
-    public void trainOnInstance(Instance inst) {
-        if (this.isInit == false) {
-            this.isInit = true;
-            InstancesHeader instances = new InstancesHeader(dataset);
-            this.learner.setModelContext(instances);
-            this.learner.prepareForUse();
-        }
-        if (inst.weight() > 0) {
-            inst.setDataset(dataset);
-            learner.trainOnInstance(inst);
-        }
-    }
+  /**
+   * Instantiates a new learner.
+   * 
+   * @param learner
+   *          the learner
+   * @param dataset
+   *          the dataset
+   */
+  public 
ClustreamClustererAdapter(com.yahoo.labs.samoa.moa.clusterers.Clusterer 
learner, Instances dataset) {
+    this.learner = learner.copy();
+    this.isInit = false;
+    this.dataset = dataset;
+  }
 
-    /**
-     * Predicts the class memberships for a given instance. If an instance is
-     * unclassified, the returned array elements must be all zero.
-     *
-     * @param inst the instance to be classified
-     * @return an array containing the estimated membership probabilities of 
the
-     * test instance in each class
-     */
-    @Override
-    public double[] getVotesForInstance(Instance inst) {
-        double[] ret;
-        inst.setDataset(dataset);
-        if (this.isInit == false) {
-           ret = new double[dataset.numClasses()];
-        } else {
-            ret = learner.getVotesForInstance(inst);
-        }
-        return ret;
-    }
+  /**
+   * Instantiates a new learner.
+   * 
+   * @param learner
+   *          the learner
+   * @param dataset
+   *          the dataset
+   */
+  public ClustreamClustererAdapter() {
+    this.learner = ((com.yahoo.labs.samoa.moa.clusterers.Clusterer) 
this.learnerOption.getValue()).copy();
+    this.isInit = false;
+    // this.dataset = dataset;
+  }
 
-    /**
-     * Resets this classifier. It must be similar to starting a new classifier
-     * from scratch.
-     *
-     */
-    @Override
-    public void resetLearning() {
-        learner.resetLearning();
+  /**
+   * Creates a new learner object.
+   * 
+   * @return the learner
+   */
+  @Override
+  public ClustreamClustererAdapter create() {
+    ClustreamClustererAdapter l = new ClustreamClustererAdapter(learner, 
dataset);
+    if (dataset == null) {
+      System.out.println("dataset null while creating");
     }
+    return l;
+  }
 
-    public boolean implementsMicroClusterer() {
-        return this.learner.implementsMicroClusterer();
+  /**
+   * Trains this classifier incrementally using the given instance.
+   * 
+   * @param inst
+   *          the instance to be used for training
+   */
+  @Override
+  public void trainOnInstance(Instance inst) {
+    if (this.isInit == false) {
+      this.isInit = true;
+      InstancesHeader instances = new InstancesHeader(dataset);
+      this.learner.setModelContext(instances);
+      this.learner.prepareForUse();
     }
-
-    public Clustering getMicroClusteringResult() {
-        return this.learner.getMicroClusteringResult();
+    if (inst.weight() > 0) {
+      inst.setDataset(dataset);
+      learner.trainOnInstance(inst);
     }
+  }
 
-    public Instances getDataset() {
-        return this.dataset;
+  /**
+   * Predicts the class memberships for a given instance. If an instance is
+   * unclassified, the returned array elements must be all zero.
+   * 
+   * @param inst
+   *          the instance to be classified
+   * @return an array containing the estimated membership probabilities of the
+   *         test instance in each class
+   */
+  @Override
+  public double[] getVotesForInstance(Instance inst) {
+    double[] ret;
+    inst.setDataset(dataset);
+    if (this.isInit == false) {
+      ret = new double[dataset.numClasses()];
+    } else {
+      ret = learner.getVotesForInstance(inst);
     }
+    return ret;
+  }
+
+  /**
+   * Resets this classifier. It must be similar to starting a new classifier
+   * from scratch.
+   * 
+   */
+  @Override
+  public void resetLearning() {
+    learner.resetLearning();
+  }
+
+  public boolean implementsMicroClusterer() {
+    return this.learner.implementsMicroClusterer();
+  }
+
+  public Clustering getMicroClusteringResult() {
+    return this.learner.getMicroClusteringResult();
+  }
+
+  public Instances getDataset() {
+    return this.dataset;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/LocalClustererAdapter.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/LocalClustererAdapter.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/LocalClustererAdapter.java
index fedbcfe..4e4cd6e 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/LocalClustererAdapter.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/LocalClustererAdapter.java
@@ -31,52 +31,52 @@ import java.io.Serializable;
  * @author abifet
  */
 public interface LocalClustererAdapter extends Serializable {
-    
-    /**
-     * Creates a new learner object.
-     *
-     * @return the learner
-     */
-    LocalClustererAdapter create();
 
-    /**
-     * Predicts the class memberships for a given instance. If an instance is
-     * unclassified, the returned array elements must be all zero.
-     *
-     * @param inst
-     *            the instance to be classified
-     * @return an array containing the estimated membership probabilities of 
the
-     *         test instance in each class
-     */
-    double[] getVotesForInstance(Instance inst);
+  /**
+   * Creates a new learner object.
+   * 
+   * @return the learner
+   */
+  LocalClustererAdapter create();
 
-    /**
-     * Resets this classifier. It must be similar to starting a new classifier
-     * from scratch.
-     *
-     */
-    void resetLearning();
+  /**
+   * Predicts the class memberships for a given instance. If an instance is
+   * unclassified, the returned array elements must be all zero.
+   * 
+   * @param inst
+   *          the instance to be classified
+   * @return an array containing the estimated membership probabilities of the
+   *         test instance in each class
+   */
+  double[] getVotesForInstance(Instance inst);
 
-    /**
-     * Trains this classifier incrementally using the given instance.
-     *
-     * @param inst
-     *            the instance to be used for training
-     */
-    void trainOnInstance(Instance inst);
-    
-     /**
-     * Sets where to obtain the information of attributes of Instances
-     *
-     * @param dataset
-     *            the dataset that contains the information
-     */
-    public void setDataset(Instances dataset);
-    
-    public Instances getDataset();
+  /**
+   * Resets this classifier. It must be similar to starting a new classifier
+   * from scratch.
+   * 
+   */
+  void resetLearning();
 
-    public boolean implementsMicroClusterer();    
+  /**
+   * Trains this classifier incrementally using the given instance.
+   * 
+   * @param inst
+   *          the instance to be used for training
+   */
+  void trainOnInstance(Instance inst);
+
+  /**
+   * Sets where to obtain the information of attributes of Instances
+   * 
+   * @param dataset
+   *          the dataset that contains the information
+   */
+  public void setDataset(Instances dataset);
+
+  public Instances getDataset();
+
+  public boolean implementsMicroClusterer();
+
+  public Clustering getMicroClusteringResult();
 
-    public Clustering getMicroClusteringResult();
-    
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/LocalClustererProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/LocalClustererProcessor.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/LocalClustererProcessor.java
index a397539..4688ba2 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/LocalClustererProcessor.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/LocalClustererProcessor.java
@@ -33,6 +33,7 @@ import com.yahoo.labs.samoa.moa.core.DataPoint;
 import com.yahoo.labs.samoa.topology.Stream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 //import weka.core.Instance;
 
 /**
@@ -40,152 +41,160 @@ import org.slf4j.LoggerFactory;
  */
 final public class LocalClustererProcessor implements Processor {
 
-    /**
-     *
-     */
-    private static final long serialVersionUID = -1577910988699148691L;
-    private static final Logger logger = LoggerFactory
-            .getLogger(LocalClustererProcessor.class);
-    private LocalClustererAdapter model;
-    private Stream outputStream;
-    private int modelId;
-    private long instancesCount = 0;
-    private long sampleFrequency = 1000;
-
-    public long getSampleFrequency() {
-        return sampleFrequency;
-    }
-
-    public void setSampleFrequency(long sampleFrequency) {
-        this.sampleFrequency = sampleFrequency;
-    }
-
-    /**
-     * Sets the learner.
-     *
-     * @param model the model to set
-     */
-    public void setLearner(LocalClustererAdapter model) {
-        this.model = model;
-    }
-
-    /**
-     * Gets the learner.
-     *
-     * @return the model
-     */
-    public LocalClustererAdapter getLearner() {
-        return model;
-    }
-
-    /**
-     * Set the output streams.
+  /**
      *
-     * @param outputStream the new output stream {@link PredictionCombinerPE}.
      */
-    public void setOutputStream(Stream outputStream) {
-
-        this.outputStream = outputStream;
+  private static final long serialVersionUID = -1577910988699148691L;
+  private static final Logger logger = LoggerFactory
+      .getLogger(LocalClustererProcessor.class);
+  private LocalClustererAdapter model;
+  private Stream outputStream;
+  private int modelId;
+  private long instancesCount = 0;
+  private long sampleFrequency = 1000;
+
+  public long getSampleFrequency() {
+    return sampleFrequency;
+  }
+
+  public void setSampleFrequency(long sampleFrequency) {
+    this.sampleFrequency = sampleFrequency;
+  }
+
+  /**
+   * Sets the learner.
+   * 
+   * @param model
+   *          the model to set
+   */
+  public void setLearner(LocalClustererAdapter model) {
+    this.model = model;
+  }
+
+  /**
+   * Gets the learner.
+   * 
+   * @return the model
+   */
+  public LocalClustererAdapter getLearner() {
+    return model;
+  }
+
+  /**
+   * Set the output streams.
+   * 
+   * @param outputStream
+   *          the new output stream {@link PredictionCombinerPE}.
+   */
+  public void setOutputStream(Stream outputStream) {
+
+    this.outputStream = outputStream;
+  }
+
+  /**
+   * Gets the output stream.
+   * 
+   * @return the output stream
+   */
+  public Stream getOutputStream() {
+    return outputStream;
+  }
+
+  /**
+   * Gets the instances count.
+   * 
+   * @return number of observation vectors used in training iteration.
+   */
+  public long getInstancesCount() {
+    return instancesCount;
+  }
+
+  /**
+   * Update stats.
+   * 
+   * @param event
+   *          the event
+   */
+  private void updateStats(ContentEvent event) {
+    Instance instance;
+    if (event instanceof ClusteringContentEvent) {
+      // Local Clustering
+      ClusteringContentEvent ev = (ClusteringContentEvent) event;
+      instance = ev.getInstance();
+      DataPoint point = new DataPoint(instance, 
Integer.parseInt(event.getKey()));
+      model.trainOnInstance(point);
+      instancesCount++;
     }
 
-    /**
-     * Gets the output stream.
-     *
-     * @return the output stream
-     */
-    public Stream getOutputStream() {
-        return outputStream;
+    if (event instanceof ClusteringResultContentEvent) {
+      // Global Clustering
+      ClusteringResultContentEvent ev = (ClusteringResultContentEvent) event;
+      Clustering clustering = ev.getClustering();
+
+      for (int i = 0; i < clustering.size(); i++) {
+        instance = new DenseInstance(1.0, clustering.get(i).getCenter());
+        instance.setDataset(model.getDataset());
+        DataPoint point = new DataPoint(instance, 
Integer.parseInt(event.getKey()));
+        model.trainOnInstance(point);
+        instancesCount++;
+      }
     }
 
-    /**
-     * Gets the instances count.
-     *
-     * @return number of observation vectors used in training iteration.
-     */
-    public long getInstancesCount() {
-        return instancesCount;
+    if (instancesCount % this.sampleFrequency == 0) {
+      logger.info("Trained model using {} events with classifier id {}", 
instancesCount, this.modelId); // getId());
     }
+  }
 
-    /**
-     * Update stats.
-     *
-     * @param event the event
-     */
-    private void updateStats(ContentEvent event) {
-        Instance instance;
-        if (event instanceof ClusteringContentEvent){
-            //Local Clustering
-            ClusteringContentEvent ev = (ClusteringContentEvent) event;
-            instance = ev.getInstance();
-            DataPoint point = new DataPoint(instance, 
Integer.parseInt(event.getKey()));
-            model.trainOnInstance(point);
-            instancesCount++;
-        }
-        
-        if (event instanceof ClusteringResultContentEvent){
-            //Global Clustering
-            ClusteringResultContentEvent ev = (ClusteringResultContentEvent) 
event;
-            Clustering clustering = ev.getClustering(); 
-            
-            for (int i=0; i<clustering.size(); i++) { 
-                instance = new 
DenseInstance(1.0,clustering.get(i).getCenter());
-                instance.setDataset(model.getDataset());
-                DataPoint point = new DataPoint(instance, 
Integer.parseInt(event.getKey()));
-                model.trainOnInstance(point);
-                instancesCount++;
-            }
-        }    
-
-        if (instancesCount % this.sampleFrequency == 0) {
-            logger.info("Trained model using {} events with classifier id {}", 
instancesCount, this.modelId); // getId());
-        }
-    }
+  /**
+   * On event.
+   * 
+   * @param event
+   *          the event
+   * @return true, if successful
+   */
+  @Override
+  public boolean process(ContentEvent event) {
 
-    /**
-     * On event.
-     *
-     * @param event the event
-     * @return true, if successful
-     */
-    @Override
-    public boolean process(ContentEvent event) {
-       
-        if (event.isLastEvent() || 
-                (instancesCount > 0 && instancesCount% this.sampleFrequency == 
0)) {
-            if (model.implementsMicroClusterer()) {
+    if (event.isLastEvent() ||
+        (instancesCount > 0 && instancesCount % this.sampleFrequency == 0)) {
+      if (model.implementsMicroClusterer()) {
 
-                Clustering clustering = model.getMicroClusteringResult();
+        Clustering clustering = model.getMicroClusteringResult();
 
-                ClusteringResultContentEvent resultEvent = new 
ClusteringResultContentEvent(clustering, event.isLastEvent());
+        ClusteringResultContentEvent resultEvent = new 
ClusteringResultContentEvent(clustering, event.isLastEvent());
 
-                this.outputStream.put(resultEvent);
-            }
-        }
-
-        updateStats(event);
-        return false;
+        this.outputStream.put(resultEvent);
+      }
     }
 
-    /* (non-Javadoc)
-     * @see samoa.core.Processor#onCreate(int)
-     */
-    @Override
-    public void onCreate(int id) {
-        this.modelId = id;
-        model = model.create();
-    }
-
-    /* (non-Javadoc)
-     * @see samoa.core.Processor#newProcessor(samoa.core.Processor)
-     */
-    @Override
-    public Processor newProcessor(Processor sourceProcessor) {
-        LocalClustererProcessor newProcessor = new LocalClustererProcessor();
-        LocalClustererProcessor originProcessor = (LocalClustererProcessor) 
sourceProcessor;
-        if (originProcessor.getLearner() != null) {
-            newProcessor.setLearner(originProcessor.getLearner().create());
-        }
-        newProcessor.setOutputStream(originProcessor.getOutputStream());
-        return newProcessor;
+    updateStats(event);
+    return false;
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see samoa.core.Processor#onCreate(int)
+   */
+  @Override
+  public void onCreate(int id) {
+    this.modelId = id;
+    model = model.create();
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see samoa.core.Processor#newProcessor(samoa.core.Processor)
+   */
+  @Override
+  public Processor newProcessor(Processor sourceProcessor) {
+    LocalClustererProcessor newProcessor = new LocalClustererProcessor();
+    LocalClustererProcessor originProcessor = (LocalClustererProcessor) 
sourceProcessor;
+    if (originProcessor.getLearner() != null) {
+      newProcessor.setLearner(originProcessor.getLearner().create());
     }
+    newProcessor.setOutputStream(originProcessor.getOutputStream());
+    return newProcessor;
+  }
 }

Reply via email to