http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/LocalResultContentEvent.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/LocalResultContentEvent.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/LocalResultContentEvent.java
deleted file mode 100644
index c0cc78f..0000000
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/LocalResultContentEvent.java
+++ /dev/null
@@ -1,95 +0,0 @@
-package com.yahoo.labs.samoa.learners.classifiers.trees;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.yahoo.labs.samoa.moa.classifiers.core.AttributeSplitSuggestion;
-import com.yahoo.labs.samoa.core.ContentEvent;
-
-/**
- * Local Result Content Event is the content event that represents the result of the local 
calculation of statistics in the Local Statistics
- * Processor.
- * 
- * @author Arinto Murdopo
- * 
- */
-final class LocalResultContentEvent implements ContentEvent {
-
-  private static final long serialVersionUID = -4206620993777418571L;
-
-  private final AttributeSplitSuggestion bestSuggestion;
-  private final AttributeSplitSuggestion secondBestSuggestion;
-  private final long splitId;
-
-  public LocalResultContentEvent() {
-    bestSuggestion = null;
-    secondBestSuggestion = null;
-    splitId = -1;
-  }
-
-  LocalResultContentEvent(long splitId, AttributeSplitSuggestion best, 
AttributeSplitSuggestion secondBest) {
-    this.splitId = splitId;
-    this.bestSuggestion = best;
-    this.secondBestSuggestion = secondBest;
-  }
-
-  @Override
-  public String getKey() {
-    return null;
-  }
-
-  /**
-   * Method to return the best attribute split suggestion from this local 
statistic calculation.
-   * 
-   * @return The best attribute split suggestion.
-   */
-  AttributeSplitSuggestion getBestSuggestion() {
-    return this.bestSuggestion;
-  }
-
-  /**
-   * Method to return the second best attribute split suggestion from this 
local statistic calculation.
-   * 
-   * @return The second best attribute split suggestion.
-   */
-  AttributeSplitSuggestion getSecondBestSuggestion() {
-    return this.secondBestSuggestion;
-  }
-
-  /**
-   * Method to get the split ID of this local statistic calculation result
-   * 
-   * @return The split id of this local calculation result
-   */
-  long getSplitId() {
-    return this.splitId;
-  }
-
-  @Override
-  public void setKey(String str) {
-    // do nothing
-
-  }
-
-  @Override
-  public boolean isLastEvent() {
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/LocalStatisticsProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/LocalStatisticsProcessor.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/LocalStatisticsProcessor.java
deleted file mode 100644
index 9d3ccbf..0000000
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/LocalStatisticsProcessor.java
+++ /dev/null
@@ -1,244 +0,0 @@
-package com.yahoo.labs.samoa.learners.classifiers.trees;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Vector;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.yahoo.labs.samoa.moa.classifiers.core.AttributeSplitSuggestion;
-import 
com.yahoo.labs.samoa.moa.classifiers.core.attributeclassobservers.AttributeClassObserver;
-import 
com.yahoo.labs.samoa.moa.classifiers.core.attributeclassobservers.GaussianNumericAttributeClassObserver;
-import 
com.yahoo.labs.samoa.moa.classifiers.core.attributeclassobservers.NominalAttributeClassObserver;
-import 
com.yahoo.labs.samoa.moa.classifiers.core.splitcriteria.InfoGainSplitCriterion;
-import com.yahoo.labs.samoa.moa.classifiers.core.splitcriteria.SplitCriterion;
-
-import com.google.common.collect.HashBasedTable;
-import com.google.common.collect.Table;
-
-import com.yahoo.labs.samoa.core.ContentEvent;
-import com.yahoo.labs.samoa.core.Processor;
-import com.yahoo.labs.samoa.topology.Stream;
-
-/**
- * Local Statistics Processor contains the local statistics of a subset of the 
attributes.
- * 
- * @author Arinto Murdopo
- * 
- */
-final class LocalStatisticsProcessor implements Processor {
-
-  /**
-        * 
-        */
-  private static final long serialVersionUID = -3967695130634517631L;
-  private static Logger logger = 
LoggerFactory.getLogger(LocalStatisticsProcessor.class);
-
-  // Collection of AttributeObservers, for each ActiveLearningNode and
-  // AttributeId
-  private Table<Long, Integer, AttributeClassObserver> localStats;
-
-  private Stream computationResultStream;
-
-  private final SplitCriterion splitCriterion;
-  private final boolean binarySplit;
-  private final AttributeClassObserver nominalClassObserver;
-  private final AttributeClassObserver numericClassObserver;
-
-  // the two observer classes below also need to be set up from the Tree
-  private LocalStatisticsProcessor(Builder builder) {
-    this.splitCriterion = builder.splitCriterion;
-    this.binarySplit = builder.binarySplit;
-    this.nominalClassObserver = builder.nominalClassObserver;
-    this.numericClassObserver = builder.numericClassObserver;
-  }
-
-  @Override
-  public boolean process(ContentEvent event) {
-    // process AttributeBatchContentEvent by updating the subset of local statistics
-    if (event instanceof AttributeBatchContentEvent) {
-      AttributeBatchContentEvent abce = (AttributeBatchContentEvent) event;
-      List<ContentEvent> contentEventList = abce.getContentEventList();
-      for (ContentEvent contentEvent : contentEventList) {
-        AttributeContentEvent ace = (AttributeContentEvent) contentEvent;
-        Long learningNodeId = ace.getLearningNodeId();
-        Integer obsIndex = ace.getObsIndex();
-
-        AttributeClassObserver obs = localStats.get(
-            learningNodeId, obsIndex);
-
-        if (obs == null) {
-          obs = ace.isNominal() ? newNominalClassObserver()
-              : newNumericClassObserver();
-          localStats.put(ace.getLearningNodeId(), obsIndex, obs);
-        }
-        obs.observeAttributeClass(ace.getAttrVal(), ace.getClassVal(),
-            ace.getWeight());
-      }
-
-      /*
-       * if (event instanceof AttributeContentEvent) { AttributeContentEvent 
ace
-       * = (AttributeContentEvent) event; Long learningNodeId =
-       * Long.valueOf(ace.getLearningNodeId()); Integer obsIndex =
-       * Integer.valueOf(ace.getObsIndex());
-       * 
-       * AttributeClassObserver obs = localStats.get( learningNodeId, 
obsIndex);
-       * 
-       * if (obs == null) { obs = ace.isNominal() ? newNominalClassObserver() :
-       * newNumericClassObserver(); localStats.put(ace.getLearningNodeId(),
-       * obsIndex, obs); } obs.observeAttributeClass(ace.getAttrVal(),
-       * ace.getClassVal(), ace.getWeight());
-       */
-    } else if (event instanceof ComputeContentEvent) {
-      // process ComputeContentEvent by calculating the local statistic
-      // and send back the calculation results via computation result stream.
-      ComputeContentEvent cce = (ComputeContentEvent) event;
-      Long learningNodeId = cce.getLearningNodeId();
-      double[] preSplitDist = cce.getPreSplitDist();
-
-      Map<Integer, AttributeClassObserver> learningNodeRowMap = localStats
-          .row(learningNodeId);
-      List<AttributeSplitSuggestion> suggestions = new Vector<>();
-
-      for (Entry<Integer, AttributeClassObserver> entry : 
learningNodeRowMap.entrySet()) {
-        AttributeClassObserver obs = entry.getValue();
-        AttributeSplitSuggestion suggestion = obs
-            .getBestEvaluatedSplitSuggestion(splitCriterion,
-                preSplitDist, entry.getKey(), binarySplit);
-        if (suggestion != null) {
-          suggestions.add(suggestion);
-        }
-      }
-
-      AttributeSplitSuggestion[] bestSuggestions = suggestions
-          .toArray(new AttributeSplitSuggestion[suggestions.size()]);
-
-      Arrays.sort(bestSuggestions);
-
-      AttributeSplitSuggestion bestSuggestion = null;
-      AttributeSplitSuggestion secondBestSuggestion = null;
-
-      if (bestSuggestions.length >= 1) {
-        bestSuggestion = bestSuggestions[bestSuggestions.length - 1];
-
-        if (bestSuggestions.length >= 2) {
-          secondBestSuggestion = bestSuggestions[bestSuggestions.length - 2];
-        }
-      }
-
-      // create the local result content event
-      LocalResultContentEvent lcre =
-          new LocalResultContentEvent(cce.getSplitId(), bestSuggestion, 
secondBestSuggestion);
-      computationResultStream.put(lcre);
-      logger.debug("Finish compute event");
-    } else if (event instanceof DeleteContentEvent) {
-      DeleteContentEvent dce = (DeleteContentEvent) event;
-      Long learningNodeId = dce.getLearningNodeId();
-      localStats.rowMap().remove(learningNodeId);
-    }
-    return false;
-  }
-
-  @Override
-  public void onCreate(int id) {
-    this.localStats = HashBasedTable.create();
-  }
-
-  @Override
-  public Processor newProcessor(Processor p) {
-    LocalStatisticsProcessor oldProcessor = (LocalStatisticsProcessor) p;
-    LocalStatisticsProcessor newProcessor = new 
LocalStatisticsProcessor.Builder(oldProcessor).build();
-
-    
newProcessor.setComputationResultStream(oldProcessor.computationResultStream);
-
-    return newProcessor;
-  }
-
-  /**
-   * Method to set the computation result stream when using this processor to build a 
topology.
-   * 
-   * @param computeStream
-   */
-  void setComputationResultStream(Stream computeStream) {
-    this.computationResultStream = computeStream;
-  }
-
-  private AttributeClassObserver newNominalClassObserver() {
-    return (AttributeClassObserver) this.nominalClassObserver.copy();
-  }
-
-  private AttributeClassObserver newNumericClassObserver() {
-    return (AttributeClassObserver) this.numericClassObserver.copy();
-  }
-
-  /**
-   * Builder class to replace constructors with many parameters
-   * 
-   * @author Arinto Murdopo
-   * 
-   */
-  static class Builder {
-
-    private SplitCriterion splitCriterion = new InfoGainSplitCriterion();
-    private boolean binarySplit = false;
-    private AttributeClassObserver nominalClassObserver = new 
NominalAttributeClassObserver();
-    private AttributeClassObserver numericClassObserver = new 
GaussianNumericAttributeClassObserver();
-
-    Builder() {
-
-    }
-
-    Builder(LocalStatisticsProcessor oldProcessor) {
-      this.splitCriterion = oldProcessor.splitCriterion;
-      this.binarySplit = oldProcessor.binarySplit;
-    }
-
-    Builder splitCriterion(SplitCriterion splitCriterion) {
-      this.splitCriterion = splitCriterion;
-      return this;
-    }
-
-    Builder binarySplit(boolean binarySplit) {
-      this.binarySplit = binarySplit;
-      return this;
-    }
-
-    Builder nominalClassObserver(AttributeClassObserver nominalClassObserver) {
-      this.nominalClassObserver = nominalClassObserver;
-      return this;
-    }
-
-    Builder numericClassObserver(AttributeClassObserver numericClassObserver) {
-      this.numericClassObserver = numericClassObserver;
-      return this;
-    }
-
-    LocalStatisticsProcessor build() {
-      return new LocalStatisticsProcessor(this);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java
deleted file mode 100644
index e5bdc0d..0000000
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java
+++ /dev/null
@@ -1,747 +0,0 @@
-package com.yahoo.labs.samoa.learners.classifiers.trees;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.yahoo.labs.samoa.core.ContentEvent;
-import com.yahoo.labs.samoa.core.Processor;
-import com.yahoo.labs.samoa.instances.Instance;
-import com.yahoo.labs.samoa.instances.Instances;
-import com.yahoo.labs.samoa.instances.InstancesHeader;
-import com.yahoo.labs.samoa.learners.InstanceContentEvent;
-import com.yahoo.labs.samoa.learners.InstancesContentEvent;
-import com.yahoo.labs.samoa.learners.ResultContentEvent;
-import com.yahoo.labs.samoa.moa.classifiers.core.AttributeSplitSuggestion;
-import com.yahoo.labs.samoa.moa.classifiers.core.driftdetection.ChangeDetector;
-import 
com.yahoo.labs.samoa.moa.classifiers.core.splitcriteria.InfoGainSplitCriterion;
-import com.yahoo.labs.samoa.moa.classifiers.core.splitcriteria.SplitCriterion;
-import com.yahoo.labs.samoa.topology.Stream;
-
-import static com.yahoo.labs.samoa.moa.core.Utils.maxIndex;
-
-/**
- * Model Aggregator Processor consists of the decision tree model. It connects 
to local-statistic PI via attribute stream
- * and control stream. Model-aggregator PI sends the split instances via 
attribute stream and it sends control messages
- * to ask local-statistic PI to perform computation via control stream.
- * 
- * Model-aggregator PI sends the classification result via result stream to an 
evaluator PI for classifier or other
- * destination PI. The calculation results from local statistic arrive to the 
model-aggregator PI via computation-result
- * stream.
- * 
- * @author Arinto Murdopo
- * 
- */
-final class ModelAggregatorProcessor implements Processor {
-
-  private static final long serialVersionUID = -1685875718300564886L;
-  private static final Logger logger = 
LoggerFactory.getLogger(ModelAggregatorProcessor.class);
-
-  private int processorId;
-
-  private Node treeRoot;
-
-  private int activeLeafNodeCount;
-  private int inactiveLeafNodeCount;
-  private int decisionNodeCount;
-  private boolean growthAllowed;
-
-  private final Instances dataset;
-
-  // to support concurrent split
-  private long splitId;
-  private ConcurrentMap<Long, SplittingNodeInfo> splittingNodes;
-  private BlockingQueue<Long> timedOutSplittingNodes;
-
-  // available streams
-  private Stream resultStream;
-  private Stream attributeStream;
-  private Stream controlStream;
-
-  private transient ScheduledExecutorService executor;
-
-  private final SplitCriterion splitCriterion;
-  private final double splitConfidence;
-  private final double tieThreshold;
-  private final int gracePeriod;
-  private final int parallelismHint;
-  private final long timeOut;
-
-  // private constructor based on Builder pattern
-  private ModelAggregatorProcessor(Builder builder) {
-    this.dataset = builder.dataset;
-    this.splitCriterion = builder.splitCriterion;
-    this.splitConfidence = builder.splitConfidence;
-    this.tieThreshold = builder.tieThreshold;
-    this.gracePeriod = builder.gracePeriod;
-    this.parallelismHint = builder.parallelismHint;
-    this.timeOut = builder.timeOut;
-    this.changeDetector = builder.changeDetector;
-
-    InstancesHeader ih = new InstancesHeader(dataset);
-    this.setModelContext(ih);
-  }
-
-  @Override
-  public boolean process(ContentEvent event) {
-
-    // Poll the blocking queue shared between ModelAggregator and the time-out
-    // threads
-    Long timedOutSplitId = timedOutSplittingNodes.poll();
-    if (timedOutSplitId != null) { // time out has been reached!
-      SplittingNodeInfo splittingNode = splittingNodes.get(timedOutSplitId);
-      if (splittingNode != null) {
-        this.splittingNodes.remove(timedOutSplitId);
-        this.continueAttemptToSplit(splittingNode.activeLearningNode,
-            splittingNode.foundNode);
-
-      }
-
-    }
-
-    // Receive a new instance from source
-    if (event instanceof InstancesContentEvent) {
-      InstancesContentEvent instancesEvent = (InstancesContentEvent) event;
-      this.processInstanceContentEvent(instancesEvent);
-      // Send information to local-statistic PI
-      // for each of the nodes
-      if (this.foundNodeSet != null) {
-        for (FoundNode foundNode : this.foundNodeSet) {
-          ActiveLearningNode leafNode = (ActiveLearningNode) 
foundNode.getNode();
-          AttributeBatchContentEvent[] abce = 
leafNode.getAttributeBatchContentEvent();
-          if (abce != null) {
-            for (int i = 0; i < this.dataset.numAttributes() - 1; i++) {
-              this.sendToAttributeStream(abce[i]);
-            }
-          }
-          leafNode.setAttributeBatchContentEvent(null);
-          // this.sendToControlStream(event); //split information
-          // See if we can ask for splits
-          if (!leafNode.isSplitting()) {
-            double weightSeen = leafNode.getWeightSeen();
-            // check whether it is the time for splitting
-            if (weightSeen - leafNode.getWeightSeenAtLastSplitEvaluation() >= 
this.gracePeriod) {
-              attemptToSplit(leafNode, foundNode);
-            }
-          }
-        }
-      }
-      this.foundNodeSet = null;
-    } else if (event instanceof LocalResultContentEvent) {
-      LocalResultContentEvent lrce = (LocalResultContentEvent) event;
-      Long lrceSplitId = lrce.getSplitId();
-      SplittingNodeInfo splittingNodeInfo = splittingNodes.get(lrceSplitId);
-
-      if (splittingNodeInfo != null) { // if null, that means
-        // activeLearningNode has been
-        // removed by timeout thread
-        ActiveLearningNode activeLearningNode = 
splittingNodeInfo.activeLearningNode;
-
-        activeLearningNode.addDistributedSuggestions(
-            lrce.getBestSuggestion(),
-            lrce.getSecondBestSuggestion());
-
-        if (activeLearningNode.isAllSuggestionsCollected()) {
-          splittingNodeInfo.scheduledFuture.cancel(false);
-          this.splittingNodes.remove(lrceSplitId);
-          this.continueAttemptToSplit(activeLearningNode,
-              splittingNodeInfo.foundNode);
-        }
-      }
-    }
-    return false;
-  }
-
-  protected Set<FoundNode> foundNodeSet;
-
-  @Override
-  public void onCreate(int id) {
-    this.processorId = id;
-
-    this.activeLeafNodeCount = 0;
-    this.inactiveLeafNodeCount = 0;
-    this.decisionNodeCount = 0;
-    this.growthAllowed = true;
-
-    this.splittingNodes = new ConcurrentHashMap<>();
-    this.timedOutSplittingNodes = new LinkedBlockingQueue<>();
-    this.splitId = 0;
-
-    // Executor for scheduling time-out threads
-    this.executor = Executors.newScheduledThreadPool(8);
-  }
-
-  @Override
-  public Processor newProcessor(Processor p) {
-    ModelAggregatorProcessor oldProcessor = (ModelAggregatorProcessor) p;
-    ModelAggregatorProcessor newProcessor =
-        new ModelAggregatorProcessor.Builder(oldProcessor).build();
-
-    newProcessor.setResultStream(oldProcessor.resultStream);
-    newProcessor.setAttributeStream(oldProcessor.attributeStream);
-    newProcessor.setControlStream(oldProcessor.controlStream);
-    return newProcessor;
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append(super.toString());
-
-    sb.append("ActiveLeafNodeCount: ").append(activeLeafNodeCount);
-    sb.append("InactiveLeafNodeCount: ").append(inactiveLeafNodeCount);
-    sb.append("DecisionNodeCount: ").append(decisionNodeCount);
-    sb.append("Growth allowed: ").append(growthAllowed);
-    return sb.toString();
-  }
-
-  void setResultStream(Stream resultStream) {
-    this.resultStream = resultStream;
-  }
-
-  void setAttributeStream(Stream attributeStream) {
-    this.attributeStream = attributeStream;
-  }
-
-  void setControlStream(Stream controlStream) {
-    this.controlStream = controlStream;
-  }
-
-  void sendToAttributeStream(ContentEvent event) {
-    this.attributeStream.put(event);
-  }
-
-  void sendToControlStream(ContentEvent event) {
-    this.controlStream.put(event);
-  }
-
-  /**
-   * Helper method to generate new ResultContentEvent based on an instance and 
its prediction result.
-   * 
-   * @param prediction
-   *          The predicted class label from the decision tree model.
-   * @param inEvent
-   *          The associated instance content event
-   * @return ResultContentEvent to be sent into Evaluator PI or other 
destination PI.
-   */
-  private ResultContentEvent newResultContentEvent(double[] prediction, 
InstanceContentEvent inEvent) {
-    ResultContentEvent rce = new 
ResultContentEvent(inEvent.getInstanceIndex(), inEvent.getInstance(),
-        inEvent.getClassId(), prediction, inEvent.isLastEvent());
-    rce.setClassifierIndex(this.processorId);
-    rce.setEvaluationIndex(inEvent.getEvaluationIndex());
-    return rce;
-  }
-
-  private ResultContentEvent newResultContentEvent(double[] prediction, 
Instance inst, InstancesContentEvent inEvent) {
-    ResultContentEvent rce = new 
ResultContentEvent(inEvent.getInstanceIndex(), inst, (int) inst.classValue(),
-        prediction, inEvent.isLastEvent());
-    rce.setClassifierIndex(this.processorId);
-    rce.setEvaluationIndex(inEvent.getEvaluationIndex());
-    return rce;
-  }
-
-  private List<InstancesContentEvent> contentEventList = new LinkedList<>();
-
-  /**
-   * Helper method to process the InstancesContentEvent
-   * 
-   * @param instContentEvent
-   */
-  private void processInstanceContentEvent(InstancesContentEvent 
instContentEvent) {
-    this.numBatches++;
-    this.contentEventList.add(instContentEvent);
-    if (this.numBatches == 1 || this.numBatches > 4) {
-      this.processInstances(this.contentEventList.remove(0));
-    }
-
-    if (instContentEvent.isLastEvent()) {
-      // drain remaining instances
-      while (!contentEventList.isEmpty()) {
-        processInstances(contentEventList.remove(0));
-      }
-    }
-
-  }
-
-  private int numBatches = 0;
-
-  private void processInstances(InstancesContentEvent instContentEvent) {
-
-    Instance[] instances = instContentEvent.getInstances();
-    boolean isTesting = instContentEvent.isTesting();
-    boolean isTraining = instContentEvent.isTraining();
-    for (Instance inst : instances) {
-      this.processInstance(inst, instContentEvent, isTesting, isTraining);
-    }
-  }
-
-  private void processInstance(Instance inst, InstancesContentEvent 
instContentEvent, boolean isTesting,
-      boolean isTraining) {
-    inst.setDataset(this.dataset);
-    // Check the instance whether it is used for testing or training
-    // boolean testAndTrain = isTraining; //Train after testing
-    double[] prediction = null;
-    if (isTesting) {
-      prediction = getVotesForInstance(inst, false);
-      this.resultStream.put(newResultContentEvent(prediction, inst,
-          instContentEvent));
-    }
-
-    if (isTraining) {
-      trainOnInstanceImpl(inst);
-      if (this.changeDetector != null) {
-        if (prediction == null) {
-          prediction = getVotesForInstance(inst);
-        }
-        boolean correctlyClassifies = this.correctlyClassifies(inst, 
prediction);
-        double oldEstimation = this.changeDetector.getEstimation();
-        this.changeDetector.input(correctlyClassifies ? 0 : 1);
-        if (this.changeDetector.getEstimation() > oldEstimation) {
-          // Start a new classifier
-          logger.info("Change detected, resetting the classifier");
-          this.resetLearning();
-          this.changeDetector.resetLearning();
-        }
-      }
-    }
-  }
-
-  private boolean correctlyClassifies(Instance inst, double[] prediction) {
-    return maxIndex(prediction) == (int) inst.classValue();
-  }
-
-  private void resetLearning() {
-    this.treeRoot = null;
-    // Remove nodes
-    FoundNode[] learningNodes = findNodes();
-    for (FoundNode learningNode : learningNodes) {
-      Node node = learningNode.getNode();
-      if (node instanceof SplitNode) {
-        SplitNode splitNode;
-        splitNode = (SplitNode) node;
-        for (int i = 0; i < splitNode.numChildren(); i++) {
-          splitNode.setChild(i, null);
-        }
-      }
-    }
-  }
-
-  protected FoundNode[] findNodes() {
-    List<FoundNode> foundList = new LinkedList<>();
-    findNodes(this.treeRoot, null, -1, foundList);
-    return foundList.toArray(new FoundNode[foundList.size()]);
-  }
-
-  protected void findNodes(Node node, SplitNode parent,
-      int parentBranch, List<FoundNode> found) {
-    if (node != null) {
-      found.add(new FoundNode(node, parent, parentBranch));
-      if (node instanceof SplitNode) {
-        SplitNode splitNode = (SplitNode) node;
-        for (int i = 0; i < splitNode.numChildren(); i++) {
-          findNodes(splitNode.getChild(i), splitNode, i,
-              found);
-        }
-      }
-    }
-  }
-
-  /**
-   * Helper method to get the prediction result. The actual prediction result 
is delegated to the leaf node.
-   * 
-   * @param inst
-   * @return
-   */
-  private double[] getVotesForInstance(Instance inst) {
-    return getVotesForInstance(inst, false);
-  }
-
-  private double[] getVotesForInstance(Instance inst, boolean isTraining) {
-    double[] ret;
-    FoundNode foundNode = null;
-    if (this.treeRoot != null) {
-      foundNode = this.treeRoot.filterInstanceToLeaf(inst, null, -1);
-      Node leafNode = foundNode.getNode();
-      if (leafNode == null) {
-        leafNode = foundNode.getParent();
-      }
-
-      ret = leafNode.getClassVotes(inst, this);
-    } else {
-      int numClasses = this.dataset.numClasses();
-      ret = new double[numClasses];
-
-    }
-
-    // Training after testing to speed up the process
-    if (isTraining) {
-      if (this.treeRoot == null) {
-        this.treeRoot = newLearningNode(this.parallelismHint);
-        this.activeLeafNodeCount = 1;
-        foundNode = this.treeRoot.filterInstanceToLeaf(inst, null, -1);
-      }
-      trainOnInstanceImpl(foundNode, inst);
-    }
-    return ret;
-  }
-
-  /**
-   * Helper method that represents training on an instance. Since this is a 
decision tree, this method routes the incoming
-   * instance into the correct leaf and then updates the statistics on the found 
leaf.
-   * 
-   * @param inst
-   */
-  private void trainOnInstanceImpl(Instance inst) {
-    if (this.treeRoot == null) {
-      this.treeRoot = newLearningNode(this.parallelismHint);
-      this.activeLeafNodeCount = 1;
-
-    }
-    FoundNode foundNode = this.treeRoot.filterInstanceToLeaf(inst, null, -1);
-    trainOnInstanceImpl(foundNode, inst);
-  }
-
-  private void trainOnInstanceImpl(FoundNode foundNode, Instance inst) {
-
-    Node leafNode = foundNode.getNode();
-
-    if (leafNode == null) {
-      leafNode = newLearningNode(this.parallelismHint);
-      foundNode.getParent().setChild(foundNode.getParentBranch(), leafNode);
-      activeLeafNodeCount++;
-    }
-
-    if (leafNode instanceof LearningNode) {
-      LearningNode learningNode = (LearningNode) leafNode;
-      learningNode.learnFromInstance(inst, this);
-    }
-    if (this.foundNodeSet == null) {
-      this.foundNodeSet = new HashSet<>();
-    }
-    this.foundNodeSet.add(foundNode);
-  }
-
-  /**
-   * Helper method to represent a split attempt
-   * 
-   * @param activeLearningNode
-   *          The corresponding active learning node which will be split
-   * @param foundNode
-   *          The data structure that represents the filtering of the instance 
using the tree model.
-   */
-  private void attemptToSplit(ActiveLearningNode activeLearningNode, FoundNode 
foundNode) {
-    // Increment the split ID
-    this.splitId++;
-
-    // Schedule time-out thread
-    ScheduledFuture<?> timeOutHandler = this.executor.schedule(new 
AggregationTimeOutHandler(this.splitId,
-        this.timedOutSplittingNodes),
-        this.timeOut, TimeUnit.SECONDS);
-
-    // Keep track of the splitting node information, so that we can continue 
the
-    // split
-    // once we receive all local statistic calculation from Local Statistic PI
-    // this.splittingNodes.put(Long.valueOf(this.splitId), new
-    // SplittingNodeInfo(activeLearningNode, foundNode, null));
-    this.splittingNodes.put(this.splitId, new 
SplittingNodeInfo(activeLearningNode, foundNode, timeOutHandler));
-
-    // Inform Local Statistic PI to perform local statistic calculation
-    activeLearningNode.requestDistributedSuggestions(this.splitId, this);
-  }
-
-  /**
-   * Helper method to continue the attempt to split once all local calculation 
results are received.
-   * 
-   * @param activeLearningNode
-   *          The corresponding active learning node which will be split
-   * @param foundNode
-   *          The data structure that represents the filtering of the instance 
using the tree model.
-   */
-  private void continueAttemptToSplit(ActiveLearningNode activeLearningNode, 
FoundNode foundNode) {
-    AttributeSplitSuggestion bestSuggestion = 
activeLearningNode.getDistributedBestSuggestion();
-    AttributeSplitSuggestion secondBestSuggestion = 
activeLearningNode.getDistributedSecondBestSuggestion();
-
-    // compare with null split
-    double[] preSplitDist = activeLearningNode.getObservedClassDistribution();
-    AttributeSplitSuggestion nullSplit = new AttributeSplitSuggestion(null,
-        new double[0][], this.splitCriterion.getMeritOfSplit(
-            preSplitDist,
-            new double[][] { preSplitDist }));
-
-    if ((bestSuggestion == null) || (nullSplit.compareTo(bestSuggestion) > 0)) 
{
-      secondBestSuggestion = bestSuggestion;
-      bestSuggestion = nullSplit;
-    } else {
-      if ((secondBestSuggestion == null) || 
(nullSplit.compareTo(secondBestSuggestion) > 0)) {
-        secondBestSuggestion = nullSplit;
-      }
-    }
-
-    boolean shouldSplit = false;
-
-    if (secondBestSuggestion == null) {
-      shouldSplit = (bestSuggestion != null);
-    } else {
-      double hoeffdingBound = computeHoeffdingBound(
-          
this.splitCriterion.getRangeOfMerit(activeLearningNode.getObservedClassDistribution()),
-          this.splitConfidence,
-          activeLearningNode.getWeightSeen());
-
-      if ((bestSuggestion.merit - secondBestSuggestion.merit > hoeffdingBound)
-          || (hoeffdingBound < tieThreshold)) {
-        shouldSplit = true;
-      }
-      // TODO: add poor attributes removal
-    }
-
-    SplitNode parent = foundNode.getParent();
-    int parentBranch = foundNode.getParentBranch();
-
-    // split if the Hoeffding bound condition is satisfied
-    if (shouldSplit) {
-
-      if (bestSuggestion.splitTest != null) {
-        SplitNode newSplit = new SplitNode(bestSuggestion.splitTest, 
activeLearningNode.getObservedClassDistribution());
-
-        for (int i = 0; i < bestSuggestion.numSplits(); i++) {
-          Node newChild = 
newLearningNode(bestSuggestion.resultingClassDistributionFromSplit(i), 
this.parallelismHint);
-          newSplit.setChild(i, newChild);
-        }
-
-        this.activeLeafNodeCount--;
-        this.decisionNodeCount++;
-        this.activeLeafNodeCount += bestSuggestion.numSplits();
-
-        if (parent == null) {
-          this.treeRoot = newSplit;
-        } else {
-          parent.setChild(parentBranch, newSplit);
-        }
-      }
-      // TODO: add check on the model's memory size
-    }
-
-    // housekeeping
-    activeLearningNode.endSplitting();
-    
activeLearningNode.setWeightSeenAtLastSplitEvaluation(activeLearningNode.getWeightSeen());
-  }
-
-  /**
-   * Helper method that swaps an active learning node for an inactive one built from the same observed class
-   * distribution, and updates the active/inactive leaf counters.
-   * 
-   * @param toDeactivate
-   *          Active learning node to be replaced
-   * @param parent
-   *          Parent of the node to be replaced; {@code null} means the replacement becomes the tree root
-   * @param parentBranch
-   *          The branch index of the node in the parent node
-   */
-  private void deactivateLearningNode(ActiveLearningNode toDeactivate, SplitNode parent, int parentBranch) {
-    Node inactiveLeaf = new InactiveLearningNode(toDeactivate.getObservedClassDistribution());
-
-    if (parent != null) {
-      parent.setChild(parentBranch, inactiveLeaf);
-    } else {
-      this.treeRoot = inactiveLeaf;
-    }
-
-    this.inactiveLeafNodeCount++;
-    this.activeLeafNodeCount--;
-  }
-
-  private LearningNode newLearningNode(int parallelismHint) {
-    return newLearningNode(new double[0], parallelismHint);
-  }
-
-  /**
-   * Creates a learning node seeded with the given class observations.
-   * 
-   * @param initialClassObservations
-   *          class distribution the node starts with
-   * @param parallelismHint
-   *          parallelism hint handed on to the created node
-   * @return the new learning node, currently always an {@code ActiveLearningNode}
-   */
-  private LearningNode newLearningNode(double[] initialClassObservations, int parallelismHint) {
-    // for VHT optimization, we need to dynamically instantiate the appropriate
-    // ActiveLearningNode
-    LearningNode created = new ActiveLearningNode(initialClassObservations, parallelismHint);
-    return created;
-  }
-
-  /**
-   * Helper method to validate and trace the model context, i.e. the header that describes the attributes and the
-   * class index.
-   * 
-   * @param ih
-   *          header of the instances the model is built on; may be {@code null}, in which case nothing is validated
-   * @throws IllegalArgumentException
-   *           if a header is given but it has no class index
-   */
-  private void setModelContext(InstancesHeader ih) {
-    // TODO possibly refactored
-    if ((ih != null) && (ih.classIndex() < 0)) {
-      throw new IllegalArgumentException(
-          "Context for a classifier must include a class to learn");
-    }
-    // TODO: check flag for checking whether training has started or not
-
-    // model context is used to describe the model. The header itself is handed to the logger: a null header (which
-    // the check above explicitly allows) is rendered as "null" instead of raising a NullPointerException, and
-    // toString() is only evaluated when trace logging is enabled.
-    logger.trace("Model context: {}", ih);
-  }
-
-  /**
-   * Computes the Hoeffding bound {@code sqrt(range^2 * ln(1 / confidence) / (2 * n))}.
-   * 
-   * @param range
-   *          range of the merit values
-   * @param confidence
-   *          allowed error of the decision
-   * @param n
-   *          weight (number of observations) seen so far
-   * @return the Hoeffding bound for the given arguments
-   */
-  private static double computeHoeffdingBound(double range, double confidence, double n) {
-    double numerator = Math.pow(range, 2.0) * Math.log(1.0 / confidence);
-    double denominator = 2.0 * n;
-    return Math.sqrt(numerator / denominator);
-  }
-
-  /**
-   * AggregationTimeOutHandler is a class to support the time-out feature while waiting for local computation
-   * results from the local statistic PIs. When run, it puts the ID of the split it was created for into the queue
-   * it was given.
-   * 
-   * @author Arinto Murdopo
-   * 
-   */
-  static class AggregationTimeOutHandler implements Runnable {
-
-    private static final Logger logger = LoggerFactory.getLogger(AggregationTimeOutHandler.class);
-    private final Long splitId;
-    private final BlockingQueue<Long> toBeSplittedNodes;
-
-    /**
-     * @param splitId
-     *          ID of the split attempt this handler belongs to
-     * @param toBeSplittedNodes
-     *          queue that receives the split ID when the handler runs
-     */
-    AggregationTimeOutHandler(Long splitId, BlockingQueue<Long> toBeSplittedNodes) {
-      this.splitId = splitId;
-      this.toBeSplittedNodes = toBeSplittedNodes;
-    }
-
-    /**
-     * Puts the split ID into the queue, blocking while the queue has no room. If the thread is interrupted while
-     * waiting, the ID is not enqueued and the interrupt status of the thread is restored.
-     */
-    @Override
-    public void run() {
-      logger.debug("Time out is reached. AggregationTimeOutHandler is started.");
-      try {
-        toBeSplittedNodes.put(splitId);
-      } catch (InterruptedException e) {
-        logger.warn("Interrupted while trying to put the ID into the queue");
-        // put() clears the interrupt flag when it throws; set it again so the
-        // executor thread running this task can still observe the interruption
-        Thread.currentThread().interrupt();
-      }
-      logger.debug("AggregationTimeOutHandler is finished.");
-    }
-  }
-
-  /**
-   * SplittingNodeInfo bundles what is remembered about an ActiveLearningNode while its split is pending: the node
-   * itself, the FoundNode it was reached through and the future of the scheduled time-out task.
-   * 
-   * @author Arinto Murdopo
-   * 
-   */
-  static class SplittingNodeInfo {
-
-    private final ActiveLearningNode activeLearningNode;
-    private final FoundNode foundNode;
-    private final ScheduledFuture<?> scheduledFuture;
-
-    /**
-     * @param splittingNode
-     *          the active learning node that is splitting
-     * @param location
-     *          the FoundNode describing where the splitting node sits
-     * @param timeOutFuture
-     *          future of the time-out task scheduled for this split
-     */
-    SplittingNodeInfo(ActiveLearningNode splittingNode, FoundNode location, ScheduledFuture<?> timeOutFuture) {
-      activeLearningNode = splittingNode;
-      foundNode = location;
-      scheduledFuture = timeOutFuture;
-    }
-  }
-
-  /** Change detector attached to this processor; {@code null} until one is supplied. */
-  protected ChangeDetector changeDetector;
-
-  /**
-   * @return the change detector currently attached, possibly {@code null}
-   */
-  public ChangeDetector getChangeDetector() {
-    return changeDetector;
-  }
-
-  /**
-   * @param cd
-   *          the change detector to attach
-   */
-  public void setChangeDetector(ChangeDetector cd) {
-    changeDetector = cd;
-  }
-
-  /**
-   * Builder class to replace constructors with many parameters. Only the dataset is mandatory; every other setting
-   * has a default and can be overridden through the fluent setters before calling {@link #build()}.
-   * 
-   * @author Arinto Murdopo
-   * 
-   */
-  static class Builder {
-
-    // default values
-    private static final double DEFAULT_SPLIT_CONFIDENCE = 0.0000001;
-    private static final double DEFAULT_TIE_THRESHOLD = 0.05;
-    private static final int DEFAULT_GRACE_PERIOD = 200;
-    private static final int DEFAULT_PARALLELISM_HINT = 1;
-    private static final long DEFAULT_TIME_OUT = 30;
-
-    // required parameters
-    private final Instances dataset;
-
-    // optional parameters
-    private SplitCriterion splitCriterion = new InfoGainSplitCriterion();
-    private double splitConfidence = DEFAULT_SPLIT_CONFIDENCE;
-    private double tieThreshold = DEFAULT_TIE_THRESHOLD;
-    private int gracePeriod = DEFAULT_GRACE_PERIOD;
-    private int parallelismHint = DEFAULT_PARALLELISM_HINT;
-    private long timeOut = DEFAULT_TIME_OUT;
-    private ChangeDetector changeDetector;
-
-    /**
-     * Starts a builder with default settings.
-     * 
-     * @param dataset
-     *          the dataset the processor works on
-     */
-    Builder(Instances dataset) {
-      this.dataset = dataset;
-    }
-
-    /**
-     * Starts a builder pre-populated with the settings of an existing processor.
-     * 
-     * @param oldProcessor
-     *          processor whose settings are copied
-     */
-    Builder(ModelAggregatorProcessor oldProcessor) {
-      // NOTE(review): the change detector of oldProcessor is not carried over, so the built processor
-      // starts without one - confirm this is intended
-      this.dataset = oldProcessor.dataset;
-      this.splitCriterion = oldProcessor.splitCriterion;
-      this.splitConfidence = oldProcessor.splitConfidence;
-      this.tieThreshold = oldProcessor.tieThreshold;
-      this.gracePeriod = oldProcessor.gracePeriod;
-      this.parallelismHint = oldProcessor.parallelismHint;
-      this.timeOut = oldProcessor.timeOut;
-    }
-
-    /** Sets the split criterion and returns this builder. */
-    Builder splitCriterion(SplitCriterion splitCriterion) {
-      this.splitCriterion = splitCriterion;
-      return this;
-    }
-
-    /** Sets the split confidence and returns this builder. */
-    Builder splitConfidence(double splitConfidence) {
-      this.splitConfidence = splitConfidence;
-      return this;
-    }
-
-    /** Sets the tie threshold and returns this builder. */
-    Builder tieThreshold(double tieThreshold) {
-      this.tieThreshold = tieThreshold;
-      return this;
-    }
-
-    /** Sets the grace period and returns this builder. */
-    Builder gracePeriod(int gracePeriod) {
-      this.gracePeriod = gracePeriod;
-      return this;
-    }
-
-    /** Sets the parallelism hint and returns this builder. */
-    Builder parallelismHint(int parallelismHint) {
-      this.parallelismHint = parallelismHint;
-      return this;
-    }
-
-    /** Sets the time-out and returns this builder. */
-    Builder timeOut(long timeOut) {
-      this.timeOut = timeOut;
-      return this;
-    }
-
-    /** Sets the change detector and returns this builder. */
-    Builder changeDetector(ChangeDetector changeDetector) {
-      this.changeDetector = changeDetector;
-      return this;
-    }
-
-    /**
-     * @return a new ModelAggregatorProcessor constructed from this builder
-     */
-    ModelAggregatorProcessor build() {
-      return new ModelAggregatorProcessor(this);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/Node.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/Node.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/Node.java
deleted file mode 100644
index f41c21d..0000000
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/Node.java
+++ /dev/null
@@ -1,103 +0,0 @@
-package com.yahoo.labs.samoa.learners.classifiers.trees;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.yahoo.labs.samoa.core.DoubleVector;
-import com.yahoo.labs.samoa.instances.Instance;
-
-/**
- * Abstract class that represents a node in the tree model.
- * 
- * @author Arinto Murdopo
- * 
- */
-abstract class Node implements java.io.Serializable {
-
-  private static final long serialVersionUID = 4008521239214180548L;
-
-  protected final DoubleVector observedClassDistribution;
-
-  /**
-   * Method to route/filter an instance into its corresponding leaf. This 
method will be invoked recursively.
-   * 
-   * @param inst
-   *          Instance to be routed
-   * @param parent
-   *          Parent of the current node
-   * @param parentBranch
-   *          The index of the current node in the parent
-   * @return FoundNode which is the data structure to represent the resulting 
leaf.
-   */
-  abstract FoundNode filterInstanceToLeaf(Instance inst, SplitNode parent, int 
parentBranch);
-
-  /**
-   * Method to return the predicted class of the instance based on the 
statistic inside the node.
-   * 
-   * @param inst
-   *          To-be-predicted instance
-   * @param map
-   *          ModelAggregatorProcessor
-   * @return The prediction result in the form of class distribution
-   */
-  abstract double[] getClassVotes(Instance inst, ModelAggregatorProcessor map);
-
-  /**
-   * Method to check whether the node is a leaf node or not.
-   * 
-   * @return Boolean flag to indicate whether the node is a leaf or not
-   */
-  abstract boolean isLeaf();
-
-  /**
-   * Constructor of the tree node
-   * 
-   * @param classObservation
-   *          distribution of the observed classes.
-   */
-  protected Node(double[] classObservation) {
-    this.observedClassDistribution = new DoubleVector(classObservation);
-  }
-
-  /**
-   * Getter method for the class distribution
-   * 
-   * @return Observed class distribution
-   */
-  protected double[] getObservedClassDistribution() {
-    return this.observedClassDistribution.getArrayCopy();
-  }
-
-  /**
-   * A method to check whether the class distribution only consists of one 
class or not.
-   * 
-   * @return Flag whether class distribution is pure or not.
-   */
-  protected boolean observedClassDistributionIsPure() {
-    return (observedClassDistribution.numNonZeroEntries() < 2);
-  }
-
-  protected void describeSubtree(ModelAggregatorProcessor modelAggrProc, 
StringBuilder out, int indent) {
-    // TODO: implement method to gracefully define the tree
-  }
-
-  // TODO: calculate promise for limiting the model based on the memory size
-  // double calculatePromise();
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/SplitNode.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/SplitNode.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/SplitNode.java
deleted file mode 100644
index c513dad..0000000
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/SplitNode.java
+++ /dev/null
@@ -1,117 +0,0 @@
-package com.yahoo.labs.samoa.learners.classifiers.trees;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import 
com.yahoo.labs.samoa.moa.classifiers.core.conditionaltests.InstanceConditionalTest;
-import com.yahoo.labs.samoa.moa.core.AutoExpandVector;
-import com.yahoo.labs.samoa.instances.Instance;
-
-/**
- * SplitNode is an inner node of the decision tree model: it holds a conditional test and the children the test
- * routes instances to, in order to bring each instance to the correct leaf.
- * 
- * @author Arinto Murdopo
- * 
- */
-public class SplitNode extends Node {
-
-  private static final long serialVersionUID = -7380795529928485792L;
-
-  private final AutoExpandVector<Node> children;
-  protected final InstanceConditionalTest splitTest;
-
-  /**
-   * Creates a split node without children.
-   * 
-   * @param splitTest
-   *          the test used to pick a branch for an instance
-   * @param classObservation
-   *          distribution of the observed classes at this node
-   */
-  public SplitNode(InstanceConditionalTest splitTest,
-      double[] classObservation) {
-    super(classObservation);
-    this.splitTest = splitTest;
-    this.children = new AutoExpandVector<>();
-  }
-
-  /**
-   * Routes the instance one level down. When the test yields no branch (negative index) this node itself is
-   * reported; when the selected branch has no child yet, a FoundNode with a {@code null} node under this split
-   * node is reported; otherwise routing continues in the child.
-   */
-  @Override
-  FoundNode filterInstanceToLeaf(Instance inst, SplitNode parent, int parentBranch) {
-    int branch = instanceChildIndex(inst);
-    if (branch < 0) {
-      return new FoundNode(this, parent, parentBranch);
-    }
-
-    Node next = getChild(branch);
-    if (next == null) {
-      return new FoundNode(null, this, branch);
-    }
-    return next.filterInstanceToLeaf(inst, this, branch);
-  }
-
-  @Override
-  boolean isLeaf() {
-    return false;
-  }
-
-  /**
-   * Votes with a copy of the class distribution observed at this node; the instance is not inspected.
-   */
-  @Override
-  double[] getClassVotes(Instance inst, ModelAggregatorProcessor vht) {
-    return observedClassDistribution.getArrayCopy();
-  }
-
-  /**
-   * Returns the number of children of this split node.
-   * 
-   * @return number of children
-   */
-  int numChildren() {
-    return children.size();
-  }
-
-  /**
-   * Puts a child at the given index of the SplitNode.
-   * 
-   * @param index
-   *          Index of the child in the SplitNode
-   * @param child
-   *          The child node
-   * @throws IndexOutOfBoundsException
-   *           if the split test reports a non-negative maximum number of branches and the index is not below it
-   */
-  void setChild(int index, Node child) {
-    boolean branchesAreBounded = this.splitTest.maxBranches() >= 0;
-    if (branchesAreBounded && (index >= this.splitTest.maxBranches())) {
-      throw new IndexOutOfBoundsException();
-    }
-    children.set(index, child);
-  }
-
-  /**
-   * Returns the child node at the given index.
-   * 
-   * @param index
-   *          The child node index
-   * @return The child node in the given index
-   */
-  Node getChild(int index) {
-    return children.get(index);
-  }
-
-  /**
-   * Asks the split test which branch the instance belongs to.
-   * 
-   * @param inst
-   *          The routed instance
-   * @return The index of the branch where the instance is routed
-   */
-  int instanceChildIndex(Instance inst) {
-    return splitTest.branchForInstance(inst);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/VerticalHoeffdingTree.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/VerticalHoeffdingTree.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/VerticalHoeffdingTree.java
deleted file mode 100644
index 6131ec7..0000000
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/classifiers/trees/VerticalHoeffdingTree.java
+++ /dev/null
@@ -1,183 +0,0 @@
-package com.yahoo.labs.samoa.learners.classifiers.trees;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.google.common.collect.ImmutableSet;
-import java.util.Set;
-
-import com.github.javacliparser.ClassOption;
-import com.github.javacliparser.Configurable;
-import com.github.javacliparser.FlagOption;
-import com.github.javacliparser.FloatOption;
-import com.github.javacliparser.IntOption;
-import com.yahoo.labs.samoa.core.Processor;
-import com.yahoo.labs.samoa.instances.Instances;
-import com.yahoo.labs.samoa.learners.AdaptiveLearner;
-import com.yahoo.labs.samoa.learners.ClassificationLearner;
-import 
com.yahoo.labs.samoa.moa.classifiers.core.attributeclassobservers.AttributeClassObserver;
-import 
com.yahoo.labs.samoa.moa.classifiers.core.attributeclassobservers.DiscreteAttributeClassObserver;
-import 
com.yahoo.labs.samoa.moa.classifiers.core.attributeclassobservers.NumericAttributeClassObserver;
-import com.yahoo.labs.samoa.moa.classifiers.core.driftdetection.ChangeDetector;
-import com.yahoo.labs.samoa.moa.classifiers.core.splitcriteria.SplitCriterion;
-import com.yahoo.labs.samoa.topology.Stream;
-import com.yahoo.labs.samoa.topology.TopologyBuilder;
-
-/**
- * Vertical Hoeffding Tree.
- * <p>
- * Vertical Hoeffding Tree (VHT) classifier is a distributed classifier that utilizes vertical parallelism on top of
- * Very Fast Decision Tree (VFDT) classifier.
- * <p>
- * {@code init} assembles the topology from three processors: a {@code FilterProcessor} (returned as the input
- * processor), a {@code ModelAggregatorProcessor} configured from the options of this class, and a
- * {@code LocalStatisticsProcessor} added with the parallelism given by {@code parallelismHintOption}. The filter
- * output is connected to the model aggregator through {@code connectInputShuffleStream}; the aggregator's
- * attribute stream is connected to the local statistics processor through {@code connectInputKeyStream} and its
- * control stream through {@code connectInputAllStream}; the computation results of the local statistics processor
- * are connected back to the aggregator through {@code connectInputAllStream}. The aggregator's result stream is
- * the single stream returned by {@code getResultStreams()}.
- * 
- * @author Arinto Murdopo
- */
-public final class VerticalHoeffdingTree implements ClassificationLearner, 
AdaptiveLearner, Configurable {
-
-  private static final long serialVersionUID = -4937416312929984057L;
-
-  public ClassOption numericEstimatorOption = new 
ClassOption("numericEstimator",
-      'n', "Numeric estimator to use.", NumericAttributeClassObserver.class,
-      "GaussianNumericAttributeClassObserver");
-
-  public ClassOption nominalEstimatorOption = new 
ClassOption("nominalEstimator",
-      'd', "Nominal estimator to use.", DiscreteAttributeClassObserver.class,
-      "NominalAttributeClassObserver");
-
-  public ClassOption splitCriterionOption = new ClassOption("splitCriterion",
-      's', "Split criterion to use.", SplitCriterion.class,
-      "InfoGainSplitCriterion");
-
-  public FloatOption splitConfidenceOption = new FloatOption(
-      "splitConfidence",
-      'c',
-      "The allowable error in split decision, values closer to 0 will take 
longer to decide.",
-      0.0000001, 0.0, 1.0);
-
-  public FloatOption tieThresholdOption = new FloatOption("tieThreshold",
-      't', "Threshold below which a split will be forced to break ties.",
-      0.05, 0.0, 1.0);
-
-  public IntOption gracePeriodOption = new IntOption(
-      "gracePeriod",
-      'g',
-      "The number of instances a leaf should observe between split attempts.",
-      200, 0, Integer.MAX_VALUE);
-
-  public IntOption parallelismHintOption = new IntOption(
-      "parallelismHint",
-      'p',
-      "The number of local statistics PI to do distributed computation",
-      1, 1, Integer.MAX_VALUE);
-
-  public IntOption timeOutOption = new IntOption(
-      "timeOut",
-      'o',
-      "The duration to wait all distributed computation results from local 
statistics PI",
-      30, 1, Integer.MAX_VALUE);
-
-  public FlagOption binarySplitsOption = new FlagOption("binarySplits", 'b',
-      "Only allow binary splits.");
-
-  private Stream resultStream;
-
-  private FilterProcessor filterProc;
-
-  @Override
-  public void init(TopologyBuilder topologyBuilder, Instances dataset, int 
parallelism) {
-
-    this.filterProc = new FilterProcessor.Builder(dataset)
-        .build();
-    topologyBuilder.addProcessor(filterProc, parallelism);
-
-    Stream filterStream = topologyBuilder.createStream(filterProc);
-    this.filterProc.setOutputStream(filterStream);
-
-    ModelAggregatorProcessor modelAggrProc = new 
ModelAggregatorProcessor.Builder(dataset)
-        .splitCriterion((SplitCriterion) this.splitCriterionOption.getValue())
-        .splitConfidence(splitConfidenceOption.getValue())
-        .tieThreshold(tieThresholdOption.getValue())
-        .gracePeriod(gracePeriodOption.getValue())
-        .parallelismHint(parallelismHintOption.getValue())
-        .timeOut(timeOutOption.getValue())
-        .changeDetector(this.getChangeDetector())
-        .build();
-
-    topologyBuilder.addProcessor(modelAggrProc, parallelism);
-
-    topologyBuilder.connectInputShuffleStream(filterStream, modelAggrProc);
-
-    this.resultStream = topologyBuilder.createStream(modelAggrProc);
-    modelAggrProc.setResultStream(resultStream);
-
-    Stream attributeStream = topologyBuilder.createStream(modelAggrProc);
-    modelAggrProc.setAttributeStream(attributeStream);
-
-    Stream controlStream = topologyBuilder.createStream(modelAggrProc);
-    modelAggrProc.setControlStream(controlStream);
-
-    LocalStatisticsProcessor locStatProc = new 
LocalStatisticsProcessor.Builder()
-        .splitCriterion((SplitCriterion) this.splitCriterionOption.getValue())
-        .binarySplit(binarySplitsOption.isSet())
-        .nominalClassObserver((AttributeClassObserver) 
this.nominalEstimatorOption.getValue())
-        .numericClassObserver((AttributeClassObserver) 
this.numericEstimatorOption.getValue())
-        .build();
-
-    topologyBuilder.addProcessor(locStatProc, 
parallelismHintOption.getValue());
-    topologyBuilder.connectInputKeyStream(attributeStream, locStatProc);
-    topologyBuilder.connectInputAllStream(controlStream, locStatProc);
-
-    Stream computeStream = topologyBuilder.createStream(locStatProc);
-
-    locStatProc.setComputationResultStream(computeStream);
-    topologyBuilder.connectInputAllStream(computeStream, modelAggrProc);
-  }
-
-  @Override
-  public Processor getInputProcessor() {
-    return this.filterProc;
-  }
-
-  @Override
-  public Set<Stream> getResultStreams() {
-    return ImmutableSet.of(this.resultStream);
-  }
-
-  protected ChangeDetector changeDetector;
-
-  @Override
-  public ChangeDetector getChangeDetector() {
-    return this.changeDetector;
-  }
-
-  @Override
-  public void setChangeDetector(ChangeDetector cd) {
-    this.changeDetector = cd;
-  }
-
-  static class LearningNodeIdGenerator {
-
-    // TODO: add code to warn user of when value reaches Long.MAX_VALUES
-    private static long id = 0;
-
-    static synchronized long generate() {
-      return id++;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/ClusteringContentEvent.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/ClusteringContentEvent.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/ClusteringContentEvent.java
deleted file mode 100644
index a7792e6..0000000
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/ClusteringContentEvent.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package com.yahoo.labs.samoa.learners.clusterers;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-import net.jcip.annotations.Immutable;
-
-import com.yahoo.labs.samoa.core.ContentEvent;
-import com.yahoo.labs.samoa.instances.Instance;
-
-/**
- * Content event that carries one instance through a clustering topology, together with its key, a "last event"
- * marker and a "sample" marker.
- * <p>
- * NOTE(review): the class is annotated {@code @Immutable} although it exposes setters for the key and both
- * markers - confirm which of the two is intended.
- */
-@Immutable
-public final class ClusteringContentEvent implements ContentEvent {
-
-  private static final long serialVersionUID = -7746983521296618922L;
-  private Instance instance;
-  private boolean isLast;
-  private String key;
-  private boolean isSample;
-
-  /**
-   * No-argument constructor, necessary for the kryo serializer.
-   */
-  public ClusteringContentEvent() {
-  }
-
-  /**
-   * Instantiates a new clustering event.
-   * 
-   * @param index
-   *          the index; its decimal string form becomes the key of the event
-   * @param instance
-   *          the instance
-   */
-  public ClusteringContentEvent(long index, Instance instance) {
-    this.instance = instance;
-    this.key = Long.toString(index);
-  }
-
-  @Override
-  public String getKey() {
-    return key;
-  }
-
-  @Override
-  public void setKey(String str) {
-    key = str;
-  }
-
-  @Override
-  public boolean isLastEvent() {
-    return isLast;
-  }
-
-  /**
-   * @param isLast
-   *          whether this event is the last one
-   */
-  public void setLast(boolean isLast) {
-    this.isLast = isLast;
-  }
-
-  /**
-   * @return the instance carried by this event
-   */
-  public Instance getInstance() {
-    return instance;
-  }
-
-  /**
-   * @return the value of the sample marker
-   */
-  public boolean isSample() {
-    return this.isSample;
-  }
-
-  /**
-   * @param b
-   *          new value of the sample marker
-   */
-  public void setSample(boolean b) {
-    isSample = b;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/ClustreamClustererAdapter.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/ClustreamClustererAdapter.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/ClustreamClustererAdapter.java
deleted file mode 100644
index 7c0f32b..0000000
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/ClustreamClustererAdapter.java
+++ /dev/null
@@ -1,170 +0,0 @@
-package com.yahoo.labs.samoa.learners.clusterers;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-/**
- * License
- */
-import com.github.javacliparser.ClassOption;
-import com.github.javacliparser.Configurable;
-import com.yahoo.labs.samoa.instances.Instance;
-import com.yahoo.labs.samoa.instances.Instances;
-import com.yahoo.labs.samoa.instances.InstancesHeader;
-import com.yahoo.labs.samoa.moa.cluster.Clustering;
-import com.yahoo.labs.samoa.moa.clusterers.clustream.Clustream;
-
-/**
- * 
- * Base class for adapting Clustream clusterer.
- * 
- */
-public class ClustreamClustererAdapter implements LocalClustererAdapter, 
Configurable {
-
-  /**
-     *
-     */
-  private static final long serialVersionUID = 4372366401338704353L;
-
-  public ClassOption learnerOption = new ClassOption("learner", 'l',
-      "Clusterer to train.", 
com.yahoo.labs.samoa.moa.clusterers.Clusterer.class, Clustream.class.getName());
-  /**
-   * The learner.
-   */
-  protected com.yahoo.labs.samoa.moa.clusterers.Clusterer learner;
-
-  /**
-   * The is init.
-   */
-  protected Boolean isInit;
-
-  /**
-   * The dataset.
-   */
-  protected Instances dataset;
-
-  @Override
-  public void setDataset(Instances dataset) {
-    this.dataset = dataset;
-  }
-
-  /**
-   * Instantiates a new learner.
-   * 
-   * @param learner
-   *          the learner
-   * @param dataset
-   *          the dataset
-   */
-  public 
ClustreamClustererAdapter(com.yahoo.labs.samoa.moa.clusterers.Clusterer 
learner, Instances dataset) {
-    this.learner = learner.copy();
-    this.isInit = false;
-    this.dataset = dataset;
-  }
-
-  /**
-   * Instantiates a new learner.
-   * 
-   * @param learner
-   *          the learner
-   * @param dataset
-   *          the dataset
-   */
-  public ClustreamClustererAdapter() {
-    this.learner = ((com.yahoo.labs.samoa.moa.clusterers.Clusterer) 
this.learnerOption.getValue()).copy();
-    this.isInit = false;
-    // this.dataset = dataset;
-  }
-
-  /**
-   * Creates a new learner object.
-   * 
-   * @return the learner
-   */
-  @Override
-  public ClustreamClustererAdapter create() {
-    ClustreamClustererAdapter l = new ClustreamClustererAdapter(learner, 
dataset);
-    if (dataset == null) {
-      System.out.println("dataset null while creating");
-    }
-    return l;
-  }
-
-  /**
-   * Trains this classifier incrementally using the given instance.
-   * 
-   * @param inst
-   *          the instance to be used for training
-   */
-  @Override
-  public void trainOnInstance(Instance inst) {
-    if (this.isInit == false) {
-      this.isInit = true;
-      InstancesHeader instances = new InstancesHeader(dataset);
-      this.learner.setModelContext(instances);
-      this.learner.prepareForUse();
-    }
-    if (inst.weight() > 0) {
-      inst.setDataset(dataset);
-      learner.trainOnInstance(inst);
-    }
-  }
-
-  /**
-   * Predicts the class memberships for a given instance. If an instance is 
unclassified, the returned array elements
-   * must be all zero.
-   * 
-   * @param inst
-   *          the instance to be classified
-   * @return an array containing the estimated membership probabilities of the 
test instance in each class
-   */
-  @Override
-  public double[] getVotesForInstance(Instance inst) {
-    double[] ret;
-    inst.setDataset(dataset);
-    if (this.isInit == false) {
-      ret = new double[dataset.numClasses()];
-    } else {
-      ret = learner.getVotesForInstance(inst);
-    }
-    return ret;
-  }
-
-  /**
-   * Resets this learner by delegating to the wrapped learner's {@code resetLearning()}. It must be similar to
-   * starting a new learner from scratch.
-   * 
-   * NOTE(review): only the wrapped learner is reset here; the {@code isInit} flag of this adapter is not touched.
-   */
-  @Override
-  public void resetLearning() {
-    learner.resetLearning();
-  }
-
-  /**
-   * Forwards the question to the wrapped learner.
-   * 
-   * @return whatever the wrapped learner's {@code implementsMicroClusterer()} reports
-   */
-  public boolean implementsMicroClusterer() {
-    return learner.implementsMicroClusterer();
-  }
-
-  /**
-   * Forwards the request to the wrapped learner.
-   * 
-   * @return the micro-clustering result produced by the wrapped learner
-   */
-  public Clustering getMicroClusteringResult() {
-    return learner.getMicroClusteringResult();
-  }
-
-  /**
-   * Gets the dataset held by this adapter.
-   * 
-   * @return the dataset, as currently stored in the {@code dataset} field
-   */
-  public Instances getDataset() {
-    return dataset;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/LocalClustererAdapter.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/LocalClustererAdapter.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/LocalClustererAdapter.java
deleted file mode 100644
index 2cb017e..0000000
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/LocalClustererAdapter.java
+++ /dev/null
@@ -1,80 +0,0 @@
-package com.yahoo.labs.samoa.learners.clusterers;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.yahoo.labs.samoa.instances.Instance;
-import com.yahoo.labs.samoa.instances.Instances;
-import com.yahoo.labs.samoa.moa.cluster.Clustering;
-import java.io.Serializable;
-
-/**
- * Learner interface for non-distributed (local) clusterers.
- * 
- * @author abifet
- */
-public interface LocalClustererAdapter extends Serializable {
-
-  /**
-   * Creates a new learner object.
-   * 
-   * @return the learner
-   */
-  LocalClustererAdapter create();
-
-  /**
-   * Predicts the class memberships for a given instance. If an instance is unclassified, the returned array
-   * elements must be all zero.
-   * 
-   * @param inst
-   *          the instance to be classified
-   * @return an array containing the estimated membership probabilities of the test instance in each class
-   */
-  double[] getVotesForInstance(Instance inst);
-
-  /**
-   * Resets this learner. It must be similar to starting a new learner from scratch.
-   */
-  void resetLearning();
-
-  /**
-   * Trains this learner incrementally using the given instance.
-   * 
-   * @param inst
-   *          the instance to be used for training
-   */
-  void trainOnInstance(Instance inst);
-
-  /**
-   * Sets where to obtain the information of attributes of Instances.
-   * 
-   * @param dataset
-   *          the dataset that contains the information
-   */
-  void setDataset(Instances dataset);
-
-  /**
-   * Gets the dataset that supplies the attribute information of Instances.
-   * 
-   * @return the dataset
-   */
-  Instances getDataset();
-
-  /**
-   * Tells whether this learner provides a micro-clustering. Callers are expected to check this before asking for
-   * {@link #getMicroClusteringResult()}.
-   * 
-   * @return true if a micro-clustering result can be requested
-   */
-  boolean implementsMicroClusterer();
-
-  /**
-   * Gets the micro-clustering of this learner.
-   * 
-   * @return the micro-clustering result
-   */
-  Clustering getMicroClusteringResult();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/LocalClustererProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/LocalClustererProcessor.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/LocalClustererProcessor.java
deleted file mode 100644
index cf2b979..0000000
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/LocalClustererProcessor.java
+++ /dev/null
@@ -1,200 +0,0 @@
-package com.yahoo.labs.samoa.learners.clusterers;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-/**
- * License
- */
-import com.yahoo.labs.samoa.evaluation.ClusteringEvaluationContentEvent;
-import com.yahoo.labs.samoa.evaluation.ClusteringResultContentEvent;
-import com.yahoo.labs.samoa.core.ContentEvent;
-import com.yahoo.labs.samoa.core.Processor;
-import com.yahoo.labs.samoa.instances.DenseInstance;
-import com.yahoo.labs.samoa.instances.Instance;
-import com.yahoo.labs.samoa.moa.cluster.Clustering;
-import com.yahoo.labs.samoa.moa.core.DataPoint;
-import com.yahoo.labs.samoa.topology.Stream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-//import weka.core.Instance;
-
-/**
- * Processor that trains a {@link LocalClustererAdapter} on the events it receives and, on the last event or every
- * {@code sampleFrequency} trained instances, publishes the model's micro-clustering on its output stream.
- */
-public final class LocalClustererProcessor implements Processor {
-
-  private static final long serialVersionUID = -1577910988699148691L;
-  private static final Logger logger = LoggerFactory
-      .getLogger(LocalClustererProcessor.class);
-  private LocalClustererAdapter model;
-  private Stream outputStream;
-  private int modelId;
-  private long instancesCount = 0;
-  private long sampleFrequency = 1000;
-
-  /**
-   * Gets the sample frequency.
-   * 
-   * @return the number of trained instances between two published results / log lines
-   */
-  public long getSampleFrequency() {
-    return sampleFrequency;
-  }
-
-  /**
-   * Sets the sample frequency. A value of 0 disables the periodic publishing and logging (results are then only
-   * published on the last event).
-   * 
-   * @param sampleFrequency
-   *          the number of trained instances between two published results / log lines
-   */
-  public void setSampleFrequency(long sampleFrequency) {
-    this.sampleFrequency = sampleFrequency;
-  }
-
-  /**
-   * Sets the learner.
-   * 
-   * @param model
-   *          the model to set
-   */
-  public void setLearner(LocalClustererAdapter model) {
-    this.model = model;
-  }
-
-  /**
-   * Gets the learner.
-   * 
-   * @return the model
-   */
-  public LocalClustererAdapter getLearner() {
-    return model;
-  }
-
-  /**
-   * Sets the output stream.
-   * 
-   * @param outputStream
-   *          the stream on which clustering results are published
-   */
-  public void setOutputStream(Stream outputStream) {
-    this.outputStream = outputStream;
-  }
-
-  /**
-   * Gets the output stream.
-   * 
-   * @return the output stream
-   */
-  public Stream getOutputStream() {
-    return outputStream;
-  }
-
-  /**
-   * Gets the instances count.
-   * 
-   * @return number of observation vectors used in training iteration.
-   */
-  public long getInstancesCount() {
-    return instancesCount;
-  }
-
-  /**
-   * Tells whether the current instance count is a multiple of the sample frequency. A frequency of 0 never
-   * matches, which avoids the division by zero the bare modulo would raise.
-   */
-  private boolean atSampleBoundary() {
-    return this.sampleFrequency != 0 && instancesCount % this.sampleFrequency == 0;
-  }
-
-  /**
-   * Trains the model on one instance, wrapped in a {@link DataPoint} stamped with the numeric event key, and
-   * counts it.
-   */
-  private void train(Instance instance, String key) {
-    DataPoint point = new DataPoint(instance, Integer.parseInt(key));
-    model.trainOnInstance(point);
-    instancesCount++;
-  }
-
-  /**
-   * Trains the model with the content of the event. A {@link ClusteringContentEvent} contributes its instance; a
-   * {@link ClusteringResultContentEvent} contributes one instance per cluster, built from the cluster center with
-   * weight 1.0. Events of any other type do not train the model.
-   * 
-   * @param event
-   *          the event
-   */
-  private void updateStats(ContentEvent event) {
-    if (event instanceof ClusteringContentEvent) {
-      // Local Clustering
-      ClusteringContentEvent ev = (ClusteringContentEvent) event;
-      train(ev.getInstance(), event.getKey());
-    }
-
-    if (event instanceof ClusteringResultContentEvent) {
-      // Global Clustering
-      ClusteringResultContentEvent ev = (ClusteringResultContentEvent) event;
-      Clustering clustering = ev.getClustering();
-
-      for (int i = 0; i < clustering.size(); i++) {
-        Instance instance = new DenseInstance(1.0, clustering.get(i).getCenter());
-        instance.setDataset(model.getDataset());
-        train(instance, event.getKey());
-      }
-    }
-
-    if (atSampleBoundary()) {
-      logger.info("Trained model using {} events with classifier id {}", instancesCount, this.modelId);
-    }
-  }
-
-  /**
-   * Handles one event. If the event is the last one, or at least one instance has been trained and the count sits
-   * on a sample boundary, the current micro-clustering is published first (only when the model implements a
-   * micro-clusterer). The event is then used for training.
-   * 
-   * @param event
-   *          the event
-   * @return always false
-   */
-  @Override
-  public boolean process(ContentEvent event) {
-    boolean publish = event.isLastEvent() || (instancesCount > 0 && atSampleBoundary());
-    if (publish && model.implementsMicroClusterer()) {
-      Clustering clustering = model.getMicroClusteringResult();
-      ClusteringResultContentEvent resultEvent = new ClusteringResultContentEvent(clustering, event.isLastEvent());
-      this.outputStream.put(resultEvent);
-    }
-
-    updateStats(event);
-    return false;
-  }
-
-  /**
-   * Stores the processor id and replaces the model by a fresh one obtained from {@code model.create()}.
-   * 
-   * @param id
-   *          the id assigned to this processor
-   */
-  @Override
-  public void onCreate(int id) {
-    this.modelId = id;
-    model = model.create();
-  }
-
-  /**
-   * Creates a new processor configured like the given one: it receives a fresh learner created from the source's
-   * learner (when one is set), the same output stream and the same sample frequency. The instance count and
-   * processor id are not carried over.
-   * 
-   * @param sourceProcessor
-   *          the processor to copy the configuration from; must be a {@code LocalClustererProcessor}
-   * @return the new processor
-   */
-  @Override
-  public Processor newProcessor(Processor sourceProcessor) {
-    LocalClustererProcessor newProcessor = new LocalClustererProcessor();
-    LocalClustererProcessor originProcessor = (LocalClustererProcessor) sourceProcessor;
-    if (originProcessor.getLearner() != null) {
-      newProcessor.setLearner(originProcessor.getLearner().create());
-    }
-    newProcessor.setOutputStream(originProcessor.getOutputStream());
-    // Configuration must survive replication, otherwise replicas silently fall back to the default of 1000.
-    newProcessor.setSampleFrequency(originProcessor.getSampleFrequency());
-    return newProcessor;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/SingleLearner.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/SingleLearner.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/SingleLearner.java
deleted file mode 100644
index cbb706c..0000000
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/SingleLearner.java
+++ /dev/null
@@ -1,100 +0,0 @@
-package com.yahoo.labs.samoa.learners.clusterers;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-/**
- * License
- */
-
-import com.google.common.collect.ImmutableSet;
-import java.util.Set;
-
-import com.github.javacliparser.ClassOption;
-import com.github.javacliparser.Configurable;
-import com.yahoo.labs.samoa.core.Processor;
-import com.yahoo.labs.samoa.instances.Instances;
-import com.yahoo.labs.samoa.learners.Learner;
-import com.yahoo.labs.samoa.topology.Stream;
-import com.yahoo.labs.samoa.topology.TopologyBuilder;
-
-/**
- * Learner whose topology consists of one {@link LocalClustererProcessor} wrapping the clusterer chosen through
- * {@code learnerOption}.
- */
-public final class SingleLearner implements Learner, Configurable {
-
-  private static final long serialVersionUID = 684111382631697031L;
-
-  private LocalClustererProcessor learnerP;
-
-  private Stream resultStream;
-
-  private Instances dataset;
-
-  public ClassOption learnerOption = new ClassOption("learner", 'l',
-      "Learner to train.", LocalClustererAdapter.class, ClustreamClustererAdapter.class.getName());
-
-  private TopologyBuilder builder;
-
-  private int parallelism;
-
-  /**
-   * Remembers the builder, dataset and parallelism, then lays out the topology.
-   */
-  @Override
-  public void init(TopologyBuilder builder, Instances dataset, int parallelism) {
-    this.builder = builder;
-    this.dataset = dataset;
-    this.parallelism = parallelism;
-    setLayout();
-  }
-
-  /**
-   * Creates the processor, hands it the configured learner (with the dataset attached), registers it with the
-   * builder at the requested parallelism and wires its output to a newly created result stream.
-   */
-  protected void setLayout() {
-    this.learnerP = new LocalClustererProcessor();
-    final LocalClustererAdapter configured = (LocalClustererAdapter) this.learnerOption.getValue();
-    configured.setDataset(this.dataset);
-    this.learnerP.setLearner(configured);
-
-    this.builder.addProcessor(this.learnerP, this.parallelism);
-    this.resultStream = this.builder.createStream(this.learnerP);
-
-    this.learnerP.setOutputStream(this.resultStream);
-  }
-
-  /**
-   * Gets the processor that receives the input of this learner.
-   * 
-   * @return the single clusterer processor
-   */
-  @Override
-  public Processor getInputProcessor() {
-    return this.learnerP;
-  }
-
-  /**
-   * Gets the result streams of this learner.
-   * 
-   * @return an immutable set holding the one result stream
-   */
-  @Override
-  public Set<Stream> getResultStreams() {
-    return ImmutableSet.of(this.resultStream);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/simple/ClusteringDistributorProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/simple/ClusteringDistributorProcessor.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/simple/ClusteringDistributorProcessor.java
deleted file mode 100644
index 44109e2..0000000
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/simple/ClusteringDistributorProcessor.java
+++ /dev/null
@@ -1,100 +0,0 @@
-package com.yahoo.labs.samoa.learners.clusterers.simple;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-/**
- * License
- */
-import com.yahoo.labs.samoa.core.ContentEvent;
-import com.yahoo.labs.samoa.core.Processor;
-import com.yahoo.labs.samoa.evaluation.ClusteringEvaluationContentEvent;
-import com.yahoo.labs.samoa.learners.clusterers.ClusteringContentEvent;
-import com.yahoo.labs.samoa.moa.core.DataPoint;
-import com.yahoo.labs.samoa.topology.Stream;
-
-/**
- * Processor that forwards clustering events to an output stream and routes evaluation events (and sampled
- * clustering events) to an evaluation stream.
- */
-public class ClusteringDistributorProcessor implements Processor {
-
-  private static final long serialVersionUID = -1550901409625192730L;
-
-  private Stream outputStream;
-  private Stream evaluationStream;
-  private int numInstances;
-
-  /**
-   * Gets the output stream.
-   * 
-   * @return the stream that receives every clustering event
-   */
-  public Stream getOutputStream() {
-    return outputStream;
-  }
-
-  /**
-   * Sets the output stream.
-   * 
-   * @param outputStream
-   *          the stream that receives every clustering event
-   */
-  public void setOutputStream(Stream outputStream) {
-    this.outputStream = outputStream;
-  }
-
-  /**
-   * Gets the evaluation stream.
-   * 
-   * @return the stream that receives evaluation events
-   */
-  public Stream getEvaluationStream() {
-    return evaluationStream;
-  }
-
-  /**
-   * Sets the evaluation stream.
-   * 
-   * @param evaluationStream
-   *          the stream that receives evaluation events
-   */
-  public void setEvaluationStream(Stream evaluationStream) {
-    this.evaluationStream = evaluationStream;
-  }
-
-  /**
-   * Processes one event. A {@link ClusteringContentEvent} is put on the output stream; if it is flagged as a
-   * sample, an evaluation event wrapping its instance (numbered with a running counter) is additionally put on the
-   * evaluation stream. A {@link ClusteringEvaluationContentEvent} is put on the evaluation stream unchanged. Other
-   * events are ignored.
-   * 
-   * @param event
-   *          the event
-   * @return always true
-   */
-  @Override
-  public boolean process(ContentEvent event) {
-    if (event instanceof ClusteringContentEvent) {
-      ClusteringContentEvent cce = (ClusteringContentEvent) event;
-      outputStream.put(event);
-      if (cce.isSample()) {
-        evaluationStream.put(new ClusteringEvaluationContentEvent(null,
-            new DataPoint(cce.getInstance(), numInstances++), cce.isLastEvent()));
-      }
-    } else if (event instanceof ClusteringEvaluationContentEvent) {
-      evaluationStream.put(event);
-    }
-    return true;
-  }
-
-  /**
-   * Creates a new processor that uses the same output and evaluation streams as the given one. The instance
-   * counter is not carried over.
-   * 
-   * @param sourceProcessor
-   *          the processor to copy the streams from; must be a {@code ClusteringDistributorProcessor}
-   * @return the new processor
-   */
-  @Override
-  public Processor newProcessor(Processor sourceProcessor) {
-    ClusteringDistributorProcessor newProcessor = new ClusteringDistributorProcessor();
-    ClusteringDistributorProcessor originProcessor = (ClusteringDistributorProcessor) sourceProcessor;
-    if (originProcessor.getOutputStream() != null) {
-      newProcessor.setOutputStream(originProcessor.getOutputStream());
-    }
-    if (originProcessor.getEvaluationStream() != null) {
-      newProcessor.setEvaluationStream(originProcessor.getEvaluationStream());
-    }
-    return newProcessor;
-  }
-
-  /**
-   * Does nothing; this processor needs no per-instance set-up.
-   * 
-   * @param id
-   *          the id assigned to this processor (unused)
-   */
-  @Override
-  public void onCreate(int id) {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/simple/DistributedClusterer.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/simple/DistributedClusterer.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/simple/DistributedClusterer.java
deleted file mode 100644
index dab039c..0000000
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/learners/clusterers/simple/DistributedClusterer.java
+++ /dev/null
@@ -1,119 +0,0 @@
-package com.yahoo.labs.samoa.learners.clusterers.simple;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-/**
- * License
- */
-
-import com.google.common.collect.ImmutableSet;
-import java.util.Set;
-
-import com.github.javacliparser.ClassOption;
-import com.github.javacliparser.Configurable;
-import com.github.javacliparser.IntOption;
-import com.yahoo.labs.samoa.core.Processor;
-import com.yahoo.labs.samoa.instances.Instances;
-import com.yahoo.labs.samoa.learners.Learner;
-import com.yahoo.labs.samoa.learners.clusterers.*;
-import com.yahoo.labs.samoa.topology.ProcessingItem;
-import com.yahoo.labs.samoa.topology.Stream;
-import com.yahoo.labs.samoa.topology.TopologyBuilder;
-
-/**
- * Learner that lays out a two-stage clustering topology: a stage of local clusterer processors, run at the
- * parallelism given by {@code paralellismOption}, whose output stream is connected to a single global clusterer
- * processor. The result stream of this learner is the output of the global stage.
- */
-public final class DistributedClusterer implements Learner, Configurable {
-
-  private static final long serialVersionUID = 684111382631697031L;
-
-  private Stream resultStream;
-
-  private Instances dataset;
-
-  public ClassOption learnerOption = new ClassOption("learner", 'l', "Clusterer to use.", LocalClustererAdapter.class,
-      ClustreamClustererAdapter.class.getName());
-
-  public IntOption paralellismOption = new IntOption("paralellismOption", 'P',
-      "The paralellism level for concurrent processes", 2, 1, Integer.MAX_VALUE);
-
-  private TopologyBuilder builder;
-
-  private LocalClustererProcessor learnerP;
-
-  private Stream localToGlobalStream;
-
-  /**
-   * Remembers the builder and dataset, then lays out the topology.
-   * 
-   * @param builder
-   *          the topology builder to add processors and streams to
-   * @param dataset
-   *          the dataset handed to the clusterers
-   * @param parallelism
-   *          ignored; the parallelism of the local stage is taken from {@code paralellismOption}
-   */
-  @Override
-  public void init(TopologyBuilder builder, Instances dataset, int parallelism) {
-    this.builder = builder;
-    this.dataset = dataset;
-    this.setLayout();
-  }
-
-  /**
-   * Builds the local stage, the global stage and the streams between and after them.
-   */
-  protected void setLayout() {
-    // Local Clustering
-    learnerP = new LocalClustererProcessor();
-    LocalClustererAdapter learner = (LocalClustererAdapter) this.learnerOption.getValue();
-    learner.setDataset(this.dataset);
-    learnerP.setLearner(learner);
-    builder.addProcessor(learnerP, this.paralellismOption.getValue());
-    localToGlobalStream = this.builder.createStream(learnerP);
-    learnerP.setOutputStream(localToGlobalStream);
-
-    // Global Clustering
-    LocalClustererProcessor globalClusteringCombinerP = new LocalClustererProcessor();
-    LocalClustererAdapter globalLearner = (LocalClustererAdapter) this.learnerOption.getValue();
-    globalLearner.setDataset(this.dataset);
-    // Use the learner configured for the global stage (the local-stage variable was passed here before).
-    globalClusteringCombinerP.setLearner(globalLearner);
-    builder.addProcessor(globalClusteringCombinerP, 1);
-    builder.connectInputAllStream(localToGlobalStream, globalClusteringCombinerP);
-
-    // Output Stream
-    resultStream = this.builder.createStream(globalClusteringCombinerP);
-    globalClusteringCombinerP.setOutputStream(resultStream);
-  }
-
-  /**
-   * Gets the processor that receives the input of this learner.
-   * 
-   * @return the local clustering processor
-   */
-  @Override
-  public Processor getInputProcessor() {
-    return learnerP;
-  }
-
-  /**
-   * Gets the result streams of this learner.
-   * 
-   * @return an immutable set holding the output stream of the global stage
-   */
-  @Override
-  public Set<Stream> getResultStreams() {
-    Set<Stream> streams = ImmutableSet.of(this.resultStream);
-    return streams;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/AbstractMOAObject.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/AbstractMOAObject.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/AbstractMOAObject.java
deleted file mode 100644
index f951bf3..0000000
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/AbstractMOAObject.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package com.yahoo.labs.samoa.moa;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.yahoo.labs.samoa.moa.core.SerializeUtils;
-
-//import moa.core.SizeOf;
-
-/**
- * Abstract MOA Object. Classes that are serializable, copiable, able to report their size and able to describe
- * themselves extend this class.
- * 
- * @author Richard Kirkby ([email protected])
- * @version $Revision: 7 $
- */
-public abstract class AbstractMOAObject implements MOAObject {
-
-  /**
-   * Produces a copy of this object through {@link #copy(MOAObject)}.
-   * 
-   * @return a copy of this object
-   */
-  @Override
-  public MOAObject copy() {
-    return AbstractMOAObject.copy(this);
-  }
-
-  /**
-   * Reports the size of this object through {@link #measureByteSize(MOAObject)}.
-   * 
-   * @return the memory size of this object
-   */
-  @Override
-  public int measureByteSize() {
-    return AbstractMOAObject.measureByteSize(this);
-  }
-
-  /**
-   * Returns the description of this object, as written by {@code getDescription} with an indent of 0.
-   * 
-   * @return a description of the object
-   */
-  @Override
-  public String toString() {
-    final StringBuilder description = new StringBuilder();
-    getDescription(description, 0);
-    return description.toString();
-  }
-
-  /**
-   * Produces a copy of an object by means of {@code SerializeUtils.copyObject}.
-   * 
-   * @param obj
-   *          object to copy
-   * @return a copy of the object
-   * @throws RuntimeException
-   *           wrapping any exception raised while copying
-   */
-  public static MOAObject copy(MOAObject obj) {
-    try {
-      final Object duplicate = SerializeUtils.copyObject(obj);
-      return (MOAObject) duplicate;
-    } catch (Exception e) {
-      throw new RuntimeException("Object copy failed.", e);
-    }
-  }
-
-  /**
-   * Gets the memory size of an object. Size measurement is currently switched off, so the result is always 0.
-   * 
-   * @param obj
-   *          object to measure the memory size
-   * @return 0
-   */
-  public static int measureByteSize(MOAObject obj) {
-    return 0; // (int) SizeOf.fullSizeOf(obj);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/MOAObject.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/MOAObject.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/MOAObject.java
deleted file mode 100644
index 3a714c4..0000000
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/MOAObject.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package com.yahoo.labs.samoa.moa;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.io.Serializable;
-
-/**
- * Interface implemented by classes in MOA, so that all of them are serializable, can produce copies of their
- * objects, and can measure their memory size. They also give a string description.
- * 
- * @author Richard Kirkby ([email protected])
- * @version $Revision: 7 $
- */
-public interface MOAObject extends Serializable {
-
-  /**
-   * Reports how much memory this object occupies.
-   * 
-   * @return the memory size of this object
-   */
-  int measureByteSize();
-
-  /**
-   * Produces a copy of this object.
-   * 
-   * @return a copy of this object
-   */
-  MOAObject copy();
-
-  /**
-   * Appends a string representation of this object to the given builder. Used in
-   * <code>AbstractMOAObject.toString</code> to give a string representation of the object.
-   * 
-   * @param sb
-   *          the stringbuilder to add the description
-   * @param indent
-   *          the number of characters to indent
-   */
-  void getDescription(StringBuilder sb, int indent);
-}


Reply via email to