http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/AttributeContentEvent.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/AttributeContentEvent.java
 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/AttributeContentEvent.java
new file mode 100644
index 0000000..9976599
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/AttributeContentEvent.java
@@ -0,0 +1,224 @@
+package org.apache.samoa.learners.classifiers.trees;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.ContentEvent;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Attribute Content Event represents the instances that split vertically 
based on their attribute
+ * 
+ * @author Arinto Murdopo
+ * 
+ */
+public final class AttributeContentEvent implements ContentEvent {
+
+  private static final long serialVersionUID = 6652815649846676832L;
+
+  private final long learningNodeId;
+  private final int obsIndex;
+  private final double attrVal;
+  private final int classVal;
+  private final double weight;
+  private final transient String key;
+  private final boolean isNominal;
+
+  public AttributeContentEvent() {
+    learningNodeId = -1;
+    obsIndex = -1;
+    attrVal = 0.0;
+    classVal = -1;
+    weight = 0.0;
+    key = "";
+    isNominal = true;
+  }
+
+  private AttributeContentEvent(Builder builder) {
+    this.learningNodeId = builder.learningNodeId;
+    this.obsIndex = builder.obsIndex;
+    this.attrVal = builder.attrVal;
+    this.classVal = builder.classVal;
+    this.weight = builder.weight;
+    this.isNominal = builder.isNominal;
+    this.key = builder.key;
+  }
+
+  @Override
+  public String getKey() {
+    return this.key;
+  }
+
+  @Override
+  public void setKey(String str) {
+    // do nothing, maybe useful when we want to reuse the object for
+    // serialization/deserialization purpose
+  }
+
+  @Override
+  public boolean isLastEvent() {
+    return false;
+  }
+
+  long getLearningNodeId() {
+    return this.learningNodeId;
+  }
+
+  int getObsIndex() {
+    return this.obsIndex;
+  }
+
+  int getClassVal() {
+    return this.classVal;
+  }
+
+  double getAttrVal() {
+    return this.attrVal;
+  }
+
+  double getWeight() {
+    return this.weight;
+  }
+
+  boolean isNominal() {
+    return this.isNominal;
+  }
+
+  static final class Builder {
+
+    // required parameters
+    private final long learningNodeId;
+    private final int obsIndex;
+    private final String key;
+
+    // optional parameters
+    private double attrVal = 0.0;
+    private int classVal = 0;
+    private double weight = 0.0;
+    private boolean isNominal = false;
+
+    Builder(long id, int obsIndex, String key) {
+      this.learningNodeId = id;
+      this.obsIndex = obsIndex;
+      this.key = key;
+    }
+
+    private Builder(long id, int obsIndex) {
+      this.learningNodeId = id;
+      this.obsIndex = obsIndex;
+      this.key = "";
+    }
+
+    Builder attrValue(double val) {
+      this.attrVal = val;
+      return this;
+    }
+
+    Builder classValue(int val) {
+      this.classVal = val;
+      return this;
+    }
+
+    Builder weight(double val) {
+      this.weight = val;
+      return this;
+    }
+
+    Builder isNominal(boolean val) {
+      this.isNominal = val;
+      return this;
+    }
+
+    AttributeContentEvent build() {
+      return new AttributeContentEvent(this);
+    }
+  }
+
+  /**
+   * The Kryo serializer class for AttributeContentEvent when executing on top 
of Storm. This class allow us to change
+   * the precision of the statistics.
+   * 
+   * @author Arinto Murdopo
+   * 
+   */
+  public static final class AttributeCESerializer extends 
Serializer<AttributeContentEvent> {
+
+    private static double PRECISION = 1000000.0;
+
+    @Override
+    public void write(Kryo kryo, Output output, AttributeContentEvent event) {
+      output.writeLong(event.learningNodeId, true);
+      output.writeInt(event.obsIndex, true);
+      output.writeDouble(event.attrVal, PRECISION, true);
+      output.writeInt(event.classVal, true);
+      output.writeDouble(event.weight, PRECISION, true);
+      output.writeBoolean(event.isNominal);
+    }
+
+    @Override
+    public AttributeContentEvent read(Kryo kryo, Input input,
+        Class<AttributeContentEvent> type) {
+      AttributeContentEvent ace = new 
AttributeContentEvent.Builder(input.readLong(true), input.readInt(true))
+          .attrValue(input.readDouble(PRECISION, true))
+          .classValue(input.readInt(true))
+          .weight(input.readDouble(PRECISION, true))
+          .isNominal(input.readBoolean())
+          .build();
+      return ace;
+    }
+  }
+
+  /**
+   * The Kryo serializer class for AttributeContentEvent when executing on top 
of Storm with full precision of the
+   * statistics.
+   * 
+   * @author Arinto Murdopo
+   * 
+   */
+  public static final class AttributeCEFullPrecSerializer extends 
Serializer<AttributeContentEvent> {
+
+    @Override
+    public void write(Kryo kryo, Output output, AttributeContentEvent event) {
+      output.writeLong(event.learningNodeId, true);
+      output.writeInt(event.obsIndex, true);
+      output.writeDouble(event.attrVal);
+      output.writeInt(event.classVal, true);
+      output.writeDouble(event.weight);
+      output.writeBoolean(event.isNominal);
+    }
+
+    @Override
+    public AttributeContentEvent read(Kryo kryo, Input input,
+        Class<AttributeContentEvent> type) {
+      AttributeContentEvent ace = new 
AttributeContentEvent.Builder(input.readLong(true), input.readInt(true))
+          .attrValue(input.readDouble())
+          .classValue(input.readInt(true))
+          .weight(input.readDouble())
+          .isNominal(input.readBoolean())
+          .build();
+      return ace;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ComputeContentEvent.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ComputeContentEvent.java
 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ComputeContentEvent.java
new file mode 100644
index 0000000..fe56cc1
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ComputeContentEvent.java
@@ -0,0 +1,145 @@
+package org.apache.samoa.learners.classifiers.trees;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Compute content event is the message that is sent by Model Aggregator 
Processor to request Local Statistic PI to
+ * start the local statistic calculation for splitting
+ * 
+ * @author Arinto Murdopo
+ * 
+ */
+public final class ComputeContentEvent extends ControlContentEvent {
+
+  private static final long serialVersionUID = 5590798490073395190L;
+
+  private final double[] preSplitDist;
+  private final long splitId;
+
+  public ComputeContentEvent() {
+    super(-1);
+    preSplitDist = null;
+    splitId = -1;
+  }
+
+  ComputeContentEvent(long splitId, long id, double[] preSplitDist) {
+    super(id);
+    // this.preSplitDist = Arrays.copyOf(preSplitDist, preSplitDist.length);
+    this.preSplitDist = preSplitDist;
+    this.splitId = splitId;
+  }
+
+  @Override
+  LocStatControl getType() {
+    return LocStatControl.COMPUTE;
+  }
+
+  double[] getPreSplitDist() {
+    return this.preSplitDist;
+  }
+
+  long getSplitId() {
+    return this.splitId;
+  }
+
+  /**
+   * The Kryo serializer class for ComputeContentEevent when executing on top 
of Storm. This class allow us to change
+   * the precision of the statistics.
+   * 
+   * @author Arinto Murdopo
+   * 
+   */
+  public static final class ComputeCESerializer extends 
Serializer<ComputeContentEvent> {
+
+    private static double PRECISION = 1000000.0;
+
+    @Override
+    public void write(Kryo kryo, Output output, ComputeContentEvent object) {
+      output.writeLong(object.splitId, true);
+      output.writeLong(object.learningNodeId, true);
+
+      output.writeInt(object.preSplitDist.length, true);
+      for (int i = 0; i < object.preSplitDist.length; i++) {
+        output.writeDouble(object.preSplitDist[i], PRECISION, true);
+      }
+    }
+
+    @Override
+    public ComputeContentEvent read(Kryo kryo, Input input,
+        Class<ComputeContentEvent> type) {
+      long splitId = input.readLong(true);
+      long learningNodeId = input.readLong(true);
+
+      int dataLength = input.readInt(true);
+      double[] preSplitDist = new double[dataLength];
+
+      for (int i = 0; i < dataLength; i++) {
+        preSplitDist[i] = input.readDouble(PRECISION, true);
+      }
+
+      return new ComputeContentEvent(splitId, learningNodeId, preSplitDist);
+    }
+  }
+
+  /**
+   * The Kryo serializer class for ComputeContentEevent when executing on top 
of Storm with full precision of the
+   * statistics.
+   * 
+   * @author Arinto Murdopo
+   * 
+   */
+  public static final class ComputeCEFullPrecSerializer extends 
Serializer<ComputeContentEvent> {
+
+    @Override
+    public void write(Kryo kryo, Output output, ComputeContentEvent object) {
+      output.writeLong(object.splitId, true);
+      output.writeLong(object.learningNodeId, true);
+
+      output.writeInt(object.preSplitDist.length, true);
+      for (int i = 0; i < object.preSplitDist.length; i++) {
+        output.writeDouble(object.preSplitDist[i]);
+      }
+    }
+
+    @Override
+    public ComputeContentEvent read(Kryo kryo, Input input,
+        Class<ComputeContentEvent> type) {
+      long splitId = input.readLong(true);
+      long learningNodeId = input.readLong(true);
+
+      int dataLength = input.readInt(true);
+      double[] preSplitDist = new double[dataLength];
+
+      for (int i = 0; i < dataLength; i++) {
+        preSplitDist[i] = input.readDouble();
+      }
+
+      return new ComputeContentEvent(splitId, learningNodeId, preSplitDist);
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ControlContentEvent.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ControlContentEvent.java
 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ControlContentEvent.java
new file mode 100644
index 0000000..18d5f06
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ControlContentEvent.java
@@ -0,0 +1,72 @@
+package org.apache.samoa.learners.classifiers.trees;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.ContentEvent;
+
+/**
+ * Abstract class to represent ContentEvent to control Local Statistic 
Processor.
+ * 
+ * @author Arinto Murdopo
+ * 
+ */
+abstract class ControlContentEvent implements ContentEvent {
+
+  /**
+        * 
+        */
+  private static final long serialVersionUID = 5837375639629708363L;
+
+  protected final long learningNodeId;
+
+  public ControlContentEvent() {
+    this.learningNodeId = -1;
+  }
+
+  ControlContentEvent(long id) {
+    this.learningNodeId = id;
+  }
+
+  @Override
+  public final String getKey() {
+    return null;
+  }
+
+  @Override
+  public void setKey(String str) {
+    // Do nothing
+  }
+
+  @Override
+  public boolean isLastEvent() {
+    return false;
+  }
+
+  final long getLearningNodeId() {
+    return this.learningNodeId;
+  }
+
+  abstract LocStatControl getType();
+
+  static enum LocStatControl {
+    COMPUTE, DELETE
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/DeleteContentEvent.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/DeleteContentEvent.java
 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/DeleteContentEvent.java
new file mode 100644
index 0000000..a834f2f
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/DeleteContentEvent.java
@@ -0,0 +1,47 @@
+package org.apache.samoa.learners.classifiers.trees;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * Delete Content Event is the content event that is sent by Model Aggregator 
Processor to delete unnecessary statistic
+ * in Local Statistic Processor.
+ * 
+ * @author Arinto Murdopo
+ * 
+ */
+final class DeleteContentEvent extends ControlContentEvent {
+
+  private static final long serialVersionUID = -2105250722560863633L;
+
+  public DeleteContentEvent() {
+    super(-1);
+  }
+
+  DeleteContentEvent(long id) {
+    super(id);
+  }
+
+  @Override
+  LocStatControl getType() {
+    return LocStatControl.DELETE;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/FilterProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/FilterProcessor.java
 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/FilterProcessor.java
new file mode 100644
index 0000000..0af8b93
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/FilterProcessor.java
@@ -0,0 +1,191 @@
+package org.apache.samoa.learners.classifiers.trees;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.instances.Instance;
+import org.apache.samoa.instances.Instances;
+import org.apache.samoa.instances.InstancesHeader;
+import org.apache.samoa.learners.InstanceContentEvent;
+import org.apache.samoa.learners.InstancesContentEvent;
+import org.apache.samoa.learners.ResultContentEvent;
+import org.apache.samoa.topology.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Filter Processor that stores and filters the instances before sending them 
to the Model Aggregator Processor.
+ * 
+ * @author Arinto Murdopo
+ * 
+ */
+final class FilterProcessor implements Processor {
+
+  private static final long serialVersionUID = -1685875718300564885L;
+  private static final Logger logger = 
LoggerFactory.getLogger(FilterProcessor.class);
+
+  private int processorId;
+
+  private final Instances dataset;
+  private InstancesHeader modelContext;
+
+  // available streams
+  private Stream outputStream;
+
+  // private constructor based on Builder pattern
+  private FilterProcessor(Builder builder) {
+    this.dataset = builder.dataset;
+    this.batchSize = builder.batchSize;
+    this.delay = builder.delay;
+  }
+
+  private int waitingInstances = 0;
+
+  private int delay = 0;
+
+  private int batchSize = 200;
+
+  private List<InstanceContentEvent> contentEventList = new 
LinkedList<InstanceContentEvent>();
+
+  @Override
+  public boolean process(ContentEvent event) {
+    // Receive a new instance from source
+    if (event instanceof InstanceContentEvent) {
+      InstanceContentEvent instanceContentEvent = (InstanceContentEvent) event;
+      this.contentEventList.add(instanceContentEvent);
+      this.waitingInstances++;
+      if (this.waitingInstances == this.batchSize || 
instanceContentEvent.isLastEvent()) {
+        // Send Instances
+        InstancesContentEvent outputEvent = new 
InstancesContentEvent(instanceContentEvent);
+        boolean isLastEvent = false;
+        while (!this.contentEventList.isEmpty()) {
+          InstanceContentEvent ice = this.contentEventList.remove(0);
+          Instance inst = ice.getInstance();
+          outputEvent.add(inst);
+          if (!isLastEvent) {
+            isLastEvent = ice.isLastEvent();
+          }
+        }
+        outputEvent.setLast(isLastEvent);
+        this.waitingInstances = 0;
+        this.outputStream.put(outputEvent);
+        if (this.delay > 0) {
+          try {
+            Thread.sleep(this.delay);
+          } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+          }
+        }
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public void onCreate(int id) {
+    this.processorId = id;
+    this.waitingInstances = 0;
+
+  }
+
+  @Override
+  public Processor newProcessor(Processor p) {
+    FilterProcessor oldProcessor = (FilterProcessor) p;
+    FilterProcessor newProcessor =
+        new FilterProcessor.Builder(oldProcessor).build();
+
+    newProcessor.setOutputStream(oldProcessor.outputStream);
+    return newProcessor;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(super.toString());
+    return sb.toString();
+  }
+
+  void setOutputStream(Stream outputStream) {
+    this.outputStream = outputStream;
+  }
+
+  /**
+   * Helper method to generate new ResultContentEvent based on an instance and 
its prediction result.
+   * 
+   * @param prediction
+   *          The predicted class label from the decision tree model.
+   * @param inEvent
+   *          The associated instance content event
+   * @return ResultContentEvent to be sent into Evaluator PI or other 
destination PI.
+   */
+  private ResultContentEvent newResultContentEvent(double[] prediction, 
InstanceContentEvent inEvent) {
+    ResultContentEvent rce = new 
ResultContentEvent(inEvent.getInstanceIndex(), inEvent.getInstance(),
+        inEvent.getClassId(), prediction, inEvent.isLastEvent());
+    rce.setClassifierIndex(this.processorId);
+    rce.setEvaluationIndex(inEvent.getEvaluationIndex());
+    return rce;
+  }
+
+  /**
+   * Builder class to replace constructors with many parameters
+   * 
+   * @author Arinto Murdopo
+   * 
+   */
+  static class Builder {
+
+    // required parameters
+    private final Instances dataset;
+
+    private int delay = 0;
+
+    private int batchSize = 200;
+
+    Builder(Instances dataset) {
+      this.dataset = dataset;
+    }
+
+    Builder(FilterProcessor oldProcessor) {
+      this.dataset = oldProcessor.dataset;
+      this.delay = oldProcessor.delay;
+      this.batchSize = oldProcessor.batchSize;
+    }
+
+    public Builder delay(int delay) {
+      this.delay = delay;
+      return this;
+    }
+
+    public Builder batchSize(int val) {
+      this.batchSize = val;
+      return this;
+    }
+
+    FilterProcessor build() {
+      return new FilterProcessor(this);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/FoundNode.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/FoundNode.java
 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/FoundNode.java
new file mode 100644
index 0000000..c8522c8
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/FoundNode.java
@@ -0,0 +1,77 @@
+package org.apache.samoa.learners.classifiers.trees;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * Class that represents the necessary data structure of the node where an 
instance is routed/filtered through the
+ * decision tree model.
+ * 
+ * @author Arinto Murdopo
+ * 
+ */
+final class FoundNode implements java.io.Serializable {
+
+  /**
+        * 
+        */
+  private static final long serialVersionUID = -637695387934143293L;
+
+  private final Node node;
+  private final SplitNode parent;
+  private final int parentBranch;
+
+  FoundNode(Node node, SplitNode splitNode, int parentBranch) {
+    this.node = node;
+    this.parent = splitNode;
+    this.parentBranch = parentBranch;
+  }
+
+  /**
+   * Method to get the node where an instance is routed/filtered through the 
decision tree model for testing and
+   * training.
+   * 
+   * @return The node where the instance is routed/filtered
+   */
+  Node getNode() {
+    return this.node;
+  }
+
+  /**
+   * Method to get the parent of the node where an instance is routed/filtered 
through the decision tree model for
+   * testing and training
+   * 
+   * @return The parent of the node
+   */
+  SplitNode getParent() {
+    return this.parent;
+  }
+
+  /**
+   * Method to get the index of the node (where an instance is routed/filtered 
through the decision tree model for
+   * testing and training) in its parent.
+   * 
+   * @return The index of the node in its parent node.
+   */
+  int getParentBranch() {
+    return this.parentBranch;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/InactiveLearningNode.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/InactiveLearningNode.java
 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/InactiveLearningNode.java
new file mode 100644
index 0000000..e4df577
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/InactiveLearningNode.java
@@ -0,0 +1,54 @@
+package org.apache.samoa.learners.classifiers.trees;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.instances.Instance;
+
+/**
+ * Class that represents inactive learning node. Inactive learning node is a 
node which only keeps track of the observed
+ * class distribution. It does not store the statistic for splitting the node.
+ * 
+ * @author Arinto Murdopo
+ * 
+ */
+final class InactiveLearningNode extends LearningNode {
+
+  /**
+        * 
+        */
+  private static final long serialVersionUID = -814552382883472302L;
+
+  InactiveLearningNode(double[] initialClassObservation) {
+    super(initialClassObservation);
+  }
+
+  @Override
+  void learnFromInstance(Instance inst, ModelAggregatorProcessor proc) {
+    this.observedClassDistribution.addToValue(
+        (int) inst.classValue(), inst.weight());
+  }
+
+  @Override
+  double[] getClassVotes(Instance inst, ModelAggregatorProcessor map) {
+    return this.observedClassDistribution.getArrayCopy();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LearningNode.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LearningNode.java
 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LearningNode.java
new file mode 100644
index 0000000..9b0480c
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LearningNode.java
@@ -0,0 +1,59 @@
+package org.apache.samoa.learners.classifiers.trees;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.instances.Instance;
+
+/**
+ * Abstract class that represents a learning node
+ * 
+ * @author Arinto Murdopo
+ * 
+ */
+abstract class LearningNode extends Node {
+
+  private static final long serialVersionUID = 7157319356146764960L;
+
+  protected LearningNode(double[] classObservation) {
+    super(classObservation);
+  }
+
+  /**
+   * Method to process the instance for learning
+   * 
+   * @param inst
+   *          The processed instance
+   * @param proc
+   *          The model aggregator processor where this learning node exists
+   */
+  abstract void learnFromInstance(Instance inst, ModelAggregatorProcessor 
proc);
+
+  @Override
+  protected boolean isLeaf() {
+    return true;
+  }
+
+  @Override
+  protected FoundNode filterInstanceToLeaf(Instance inst, SplitNode parent,
+      int parentBranch) {
+    return new FoundNode(this, parent, parentBranch);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LocalResultContentEvent.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LocalResultContentEvent.java
 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LocalResultContentEvent.java
new file mode 100644
index 0000000..fc9f39c
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LocalResultContentEvent.java
@@ -0,0 +1,95 @@
+package org.apache.samoa.learners.classifiers.trees;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.moa.classifiers.core.AttributeSplitSuggestion;
+
+/**
+ * Local Result Content Event is the content event that represents local 
calculation of statistic in Local Statistic
+ * Processor.
+ * 
+ * @author Arinto Murdopo
+ * 
+ */
+final class LocalResultContentEvent implements ContentEvent {
+
+  private static final long serialVersionUID = -4206620993777418571L;
+
+  private final AttributeSplitSuggestion bestSuggestion;
+  private final AttributeSplitSuggestion secondBestSuggestion;
+  private final long splitId;
+
+  public LocalResultContentEvent() {
+    bestSuggestion = null;
+    secondBestSuggestion = null;
+    splitId = -1;
+  }
+
+  LocalResultContentEvent(long splitId, AttributeSplitSuggestion best, 
AttributeSplitSuggestion secondBest) {
+    this.splitId = splitId;
+    this.bestSuggestion = best;
+    this.secondBestSuggestion = secondBest;
+  }
+
+  @Override
+  public String getKey() {
+    return null;
+  }
+
+  /**
+   * Method to return the best attribute split suggestion from this local 
statistic calculation.
+   * 
+   * @return The best attribute split suggestion.
+   */
+  AttributeSplitSuggestion getBestSuggestion() {
+    return this.bestSuggestion;
+  }
+
+  /**
+   * Method to return the second best attribute split suggestion from this 
local statistic calculation.
+   * 
+   * @return The second best attribute split suggestion.
+   */
+  AttributeSplitSuggestion getSecondBestSuggestion() {
+    return this.secondBestSuggestion;
+  }
+
+  /**
+   * Method to get the split ID of this local statistic calculation result
+   * 
+   * @return The split id of this local calculation result
+   */
+  long getSplitId() {
+    return this.splitId;
+  }
+
+  @Override
+  public void setKey(String str) {
+    // do nothing
+
+  }
+
+  @Override
+  public boolean isLastEvent() {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LocalStatisticsProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LocalStatisticsProcessor.java
 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LocalStatisticsProcessor.java
new file mode 100644
index 0000000..7ce46ec
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/LocalStatisticsProcessor.java
@@ -0,0 +1,242 @@
+package org.apache.samoa.learners.classifiers.trees;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Vector;
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.moa.classifiers.core.AttributeSplitSuggestion;
+import 
org.apache.samoa.moa.classifiers.core.attributeclassobservers.AttributeClassObserver;
+import 
org.apache.samoa.moa.classifiers.core.attributeclassobservers.GaussianNumericAttributeClassObserver;
+import 
org.apache.samoa.moa.classifiers.core.attributeclassobservers.NominalAttributeClassObserver;
+import 
org.apache.samoa.moa.classifiers.core.splitcriteria.InfoGainSplitCriterion;
+import org.apache.samoa.moa.classifiers.core.splitcriteria.SplitCriterion;
+import org.apache.samoa.topology.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Table;
+
+/**
+ * Local Statistic Processor contains the local statistic of a subset of the 
attributes.
+ * 
+ * @author Arinto Murdopo
+ * 
+ */
+final class LocalStatisticsProcessor implements Processor {
+
+  /**
+        * 
+        */
+  private static final long serialVersionUID = -3967695130634517631L;
+  private static Logger logger = 
LoggerFactory.getLogger(LocalStatisticsProcessor.class);
+
+  // Collection of AttributeObservers, for each ActiveLearningNode and
+  // AttributeId
+  private Table<Long, Integer, AttributeClassObserver> localStats;
+
+  private Stream computationResultStream;
+
+  private final SplitCriterion splitCriterion;
+  private final boolean binarySplit;
+  private final AttributeClassObserver nominalClassObserver;
+  private final AttributeClassObserver numericClassObserver;
+
+  // the two observer classes below are also needed to be setup from the Tree
+  private LocalStatisticsProcessor(Builder builder) {
+    this.splitCriterion = builder.splitCriterion;
+    this.binarySplit = builder.binarySplit;
+    this.nominalClassObserver = builder.nominalClassObserver;
+    this.numericClassObserver = builder.numericClassObserver;
+  }
+
+  @Override
+  public boolean process(ContentEvent event) {
+    // process AttributeContentEvent by updating the subset of local statistics
+    if (event instanceof AttributeBatchContentEvent) {
+      AttributeBatchContentEvent abce = (AttributeBatchContentEvent) event;
+      List<ContentEvent> contentEventList = abce.getContentEventList();
+      for (ContentEvent contentEvent : contentEventList) {
+        AttributeContentEvent ace = (AttributeContentEvent) contentEvent;
+        Long learningNodeId = ace.getLearningNodeId();
+        Integer obsIndex = ace.getObsIndex();
+
+        AttributeClassObserver obs = localStats.get(
+            learningNodeId, obsIndex);
+
+        if (obs == null) {
+          obs = ace.isNominal() ? newNominalClassObserver()
+              : newNumericClassObserver();
+          localStats.put(ace.getLearningNodeId(), obsIndex, obs);
+        }
+        obs.observeAttributeClass(ace.getAttrVal(), ace.getClassVal(),
+            ace.getWeight());
+      }
+
+      /*
+       * if (event instanceof AttributeContentEvent) { AttributeContentEvent 
ace
+       * = (AttributeContentEvent) event; Long learningNodeId =
+       * Long.valueOf(ace.getLearningNodeId()); Integer obsIndex =
+       * Integer.valueOf(ace.getObsIndex());
+       * 
+       * AttributeClassObserver obs = localStats.get( learningNodeId, 
obsIndex);
+       * 
+       * if (obs == null) { obs = ace.isNominal() ? newNominalClassObserver() :
+       * newNumericClassObserver(); localStats.put(ace.getLearningNodeId(),
+       * obsIndex, obs); } obs.observeAttributeClass(ace.getAttrVal(),
+       * ace.getClassVal(), ace.getWeight());
+       */
+    } else if (event instanceof ComputeContentEvent) {
+      // process ComputeContentEvent by calculating the local statistic
+      // and send back the calculation results via computation result stream.
+      ComputeContentEvent cce = (ComputeContentEvent) event;
+      Long learningNodeId = cce.getLearningNodeId();
+      double[] preSplitDist = cce.getPreSplitDist();
+
+      Map<Integer, AttributeClassObserver> learningNodeRowMap = localStats
+          .row(learningNodeId);
+      List<AttributeSplitSuggestion> suggestions = new Vector<>();
+
+      for (Entry<Integer, AttributeClassObserver> entry : 
learningNodeRowMap.entrySet()) {
+        AttributeClassObserver obs = entry.getValue();
+        AttributeSplitSuggestion suggestion = obs
+            .getBestEvaluatedSplitSuggestion(splitCriterion,
+                preSplitDist, entry.getKey(), binarySplit);
+        if (suggestion != null) {
+          suggestions.add(suggestion);
+        }
+      }
+
+      AttributeSplitSuggestion[] bestSuggestions = suggestions
+          .toArray(new AttributeSplitSuggestion[suggestions.size()]);
+
+      Arrays.sort(bestSuggestions);
+
+      AttributeSplitSuggestion bestSuggestion = null;
+      AttributeSplitSuggestion secondBestSuggestion = null;
+
+      if (bestSuggestions.length >= 1) {
+        bestSuggestion = bestSuggestions[bestSuggestions.length - 1];
+
+        if (bestSuggestions.length >= 2) {
+          secondBestSuggestion = bestSuggestions[bestSuggestions.length - 2];
+        }
+      }
+
+      // create the local result content event
+      LocalResultContentEvent lcre =
+          new LocalResultContentEvent(cce.getSplitId(), bestSuggestion, 
secondBestSuggestion);
+      computationResultStream.put(lcre);
+      logger.debug("Finish compute event");
+    } else if (event instanceof DeleteContentEvent) {
+      DeleteContentEvent dce = (DeleteContentEvent) event;
+      Long learningNodeId = dce.getLearningNodeId();
+      localStats.rowMap().remove(learningNodeId);
+    }
+    return false;
+  }
+
+  @Override
+  public void onCreate(int id) {
+    this.localStats = HashBasedTable.create();
+  }
+
+  @Override
+  public Processor newProcessor(Processor p) {
+    LocalStatisticsProcessor oldProcessor = (LocalStatisticsProcessor) p;
+    LocalStatisticsProcessor newProcessor = new 
LocalStatisticsProcessor.Builder(oldProcessor).build();
+
+    
newProcessor.setComputationResultStream(oldProcessor.computationResultStream);
+
+    return newProcessor;
+  }
+
+  /**
+   * Method to set the computation result when using this processor to build a 
topology.
+   * 
+   * @param computeStream
+   */
+  void setComputationResultStream(Stream computeStream) {
+    this.computationResultStream = computeStream;
+  }
+
+  private AttributeClassObserver newNominalClassObserver() {
+    return (AttributeClassObserver) this.nominalClassObserver.copy();
+  }
+
+  private AttributeClassObserver newNumericClassObserver() {
+    return (AttributeClassObserver) this.numericClassObserver.copy();
+  }
+
+  /**
+   * Builder class to replace constructors with many parameters
+   * 
+   * @author Arinto Murdopo
+   * 
+   */
+  static class Builder {
+
+    private SplitCriterion splitCriterion = new InfoGainSplitCriterion();
+    private boolean binarySplit = false;
+    private AttributeClassObserver nominalClassObserver = new 
NominalAttributeClassObserver();
+    private AttributeClassObserver numericClassObserver = new 
GaussianNumericAttributeClassObserver();
+
+    Builder() {
+
+    }
+
+    Builder(LocalStatisticsProcessor oldProcessor) {
+      this.splitCriterion = oldProcessor.splitCriterion;
+      this.binarySplit = oldProcessor.binarySplit;
+    }
+
+    Builder splitCriterion(SplitCriterion splitCriterion) {
+      this.splitCriterion = splitCriterion;
+      return this;
+    }
+
+    Builder binarySplit(boolean binarySplit) {
+      this.binarySplit = binarySplit;
+      return this;
+    }
+
+    Builder nominalClassObserver(AttributeClassObserver nominalClassObserver) {
+      this.nominalClassObserver = nominalClassObserver;
+      return this;
+    }
+
+    Builder numericClassObserver(AttributeClassObserver numericClassObserver) {
+      this.numericClassObserver = numericClassObserver;
+      return this;
+    }
+
+    LocalStatisticsProcessor build() {
+      return new LocalStatisticsProcessor(this);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java
 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java
new file mode 100644
index 0000000..1e79f48
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/ModelAggregatorProcessor.java
@@ -0,0 +1,746 @@
+package org.apache.samoa.learners.classifiers.trees;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.instances.Instance;
+import org.apache.samoa.instances.Instances;
+import org.apache.samoa.instances.InstancesHeader;
+import org.apache.samoa.learners.InstanceContentEvent;
+import org.apache.samoa.learners.InstancesContentEvent;
+import org.apache.samoa.learners.ResultContentEvent;
+import org.apache.samoa.moa.classifiers.core.AttributeSplitSuggestion;
+import org.apache.samoa.moa.classifiers.core.driftdetection.ChangeDetector;
+import 
org.apache.samoa.moa.classifiers.core.splitcriteria.InfoGainSplitCriterion;
+import org.apache.samoa.moa.classifiers.core.splitcriteria.SplitCriterion;
+import org.apache.samoa.topology.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.samoa.moa.core.Utils.maxIndex;
+
+/**
+ * Model Aggegator Processor consists of the decision tree model. It connects 
to local-statistic PI via attribute stream
+ * and control stream. Model-aggregator PI sends the split instances via 
attribute stream and it sends control messages
+ * to ask local-statistic PI to perform computation via control stream.
+ * 
+ * Model-aggregator PI sends the classification result via result stream to an 
evaluator PI for classifier or other
+ * destination PI. The calculation results from local statistic arrive to the 
model-aggregator PI via computation-result
+ * stream.
+ * 
+ * @author Arinto Murdopo
+ * 
+ */
+final class ModelAggregatorProcessor implements Processor {
+
+  private static final long serialVersionUID = -1685875718300564886L;
+  private static final Logger logger = 
LoggerFactory.getLogger(ModelAggregatorProcessor.class);
+
+  private int processorId;
+
+  private Node treeRoot;
+
+  private int activeLeafNodeCount;
+  private int inactiveLeafNodeCount;
+  private int decisionNodeCount;
+  private boolean growthAllowed;
+
+  private final Instances dataset;
+
+  // to support concurrent split
+  private long splitId;
+  private ConcurrentMap<Long, SplittingNodeInfo> splittingNodes;
+  private BlockingQueue<Long> timedOutSplittingNodes;
+
+  // available streams
+  private Stream resultStream;
+  private Stream attributeStream;
+  private Stream controlStream;
+
+  private transient ScheduledExecutorService executor;
+
+  private final SplitCriterion splitCriterion;
+  private final double splitConfidence;
+  private final double tieThreshold;
+  private final int gracePeriod;
+  private final int parallelismHint;
+  private final long timeOut;
+
+  // private constructor based on Builder pattern
+  private ModelAggregatorProcessor(Builder builder) {
+    this.dataset = builder.dataset;
+    this.splitCriterion = builder.splitCriterion;
+    this.splitConfidence = builder.splitConfidence;
+    this.tieThreshold = builder.tieThreshold;
+    this.gracePeriod = builder.gracePeriod;
+    this.parallelismHint = builder.parallelismHint;
+    this.timeOut = builder.timeOut;
+    this.changeDetector = builder.changeDetector;
+
+    InstancesHeader ih = new InstancesHeader(dataset);
+    this.setModelContext(ih);
+  }
+
+  @Override
+  public boolean process(ContentEvent event) {
+
+    // Poll the blocking queue shared between ModelAggregator and the time-out
+    // threads
+    Long timedOutSplitId = timedOutSplittingNodes.poll();
+    if (timedOutSplitId != null) { // time out has been reached!
+      SplittingNodeInfo splittingNode = splittingNodes.get(timedOutSplitId);
+      if (splittingNode != null) {
+        this.splittingNodes.remove(timedOutSplitId);
+        this.continueAttemptToSplit(splittingNode.activeLearningNode,
+            splittingNode.foundNode);
+
+      }
+
+    }
+
+    // Receive a new instance from source
+    if (event instanceof InstancesContentEvent) {
+      InstancesContentEvent instancesEvent = (InstancesContentEvent) event;
+      this.processInstanceContentEvent(instancesEvent);
+      // Send information to local-statistic PI
+      // for each of the nodes
+      if (this.foundNodeSet != null) {
+        for (FoundNode foundNode : this.foundNodeSet) {
+          ActiveLearningNode leafNode = (ActiveLearningNode) 
foundNode.getNode();
+          AttributeBatchContentEvent[] abce = 
leafNode.getAttributeBatchContentEvent();
+          if (abce != null) {
+            for (int i = 0; i < this.dataset.numAttributes() - 1; i++) {
+              this.sendToAttributeStream(abce[i]);
+            }
+          }
+          leafNode.setAttributeBatchContentEvent(null);
+          // this.sendToControlStream(event); //split information
+          // See if we can ask for splits
+          if (!leafNode.isSplitting()) {
+            double weightSeen = leafNode.getWeightSeen();
+            // check whether it is the time for splitting
+            if (weightSeen - leafNode.getWeightSeenAtLastSplitEvaluation() >= 
this.gracePeriod) {
+              attemptToSplit(leafNode, foundNode);
+            }
+          }
+        }
+      }
+      this.foundNodeSet = null;
+    } else if (event instanceof LocalResultContentEvent) {
+      LocalResultContentEvent lrce = (LocalResultContentEvent) event;
+      Long lrceSplitId = lrce.getSplitId();
+      SplittingNodeInfo splittingNodeInfo = splittingNodes.get(lrceSplitId);
+
+      if (splittingNodeInfo != null) { // if null, that means
+        // activeLearningNode has been
+        // removed by timeout thread
+        ActiveLearningNode activeLearningNode = 
splittingNodeInfo.activeLearningNode;
+
+        activeLearningNode.addDistributedSuggestions(
+            lrce.getBestSuggestion(),
+            lrce.getSecondBestSuggestion());
+
+        if (activeLearningNode.isAllSuggestionsCollected()) {
+          splittingNodeInfo.scheduledFuture.cancel(false);
+          this.splittingNodes.remove(lrceSplitId);
+          this.continueAttemptToSplit(activeLearningNode,
+              splittingNodeInfo.foundNode);
+        }
+      }
+    }
+    return false;
+  }
+
+  protected Set<FoundNode> foundNodeSet;
+
+  @Override
+  public void onCreate(int id) {
+    this.processorId = id;
+
+    this.activeLeafNodeCount = 0;
+    this.inactiveLeafNodeCount = 0;
+    this.decisionNodeCount = 0;
+    this.growthAllowed = true;
+
+    this.splittingNodes = new ConcurrentHashMap<>();
+    this.timedOutSplittingNodes = new LinkedBlockingQueue<>();
+    this.splitId = 0;
+
+    // Executor for scheduling time-out threads
+    this.executor = Executors.newScheduledThreadPool(8);
+  }
+
+  @Override
+  public Processor newProcessor(Processor p) {
+    ModelAggregatorProcessor oldProcessor = (ModelAggregatorProcessor) p;
+    ModelAggregatorProcessor newProcessor =
+        new ModelAggregatorProcessor.Builder(oldProcessor).build();
+
+    newProcessor.setResultStream(oldProcessor.resultStream);
+    newProcessor.setAttributeStream(oldProcessor.attributeStream);
+    newProcessor.setControlStream(oldProcessor.controlStream);
+    return newProcessor;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(super.toString());
+
+    sb.append("ActiveLeafNodeCount: ").append(activeLeafNodeCount);
+    sb.append("InactiveLeafNodeCount: ").append(inactiveLeafNodeCount);
+    sb.append("DecisionNodeCount: ").append(decisionNodeCount);
+    sb.append("Growth allowed: ").append(growthAllowed);
+    return sb.toString();
+  }
+
+  void setResultStream(Stream resultStream) {
+    this.resultStream = resultStream;
+  }
+
+  void setAttributeStream(Stream attributeStream) {
+    this.attributeStream = attributeStream;
+  }
+
+  void setControlStream(Stream controlStream) {
+    this.controlStream = controlStream;
+  }
+
+  void sendToAttributeStream(ContentEvent event) {
+    this.attributeStream.put(event);
+  }
+
+  void sendToControlStream(ContentEvent event) {
+    this.controlStream.put(event);
+  }
+
+  /**
+   * Helper method to generate new ResultContentEvent based on an instance and 
its prediction result.
+   * 
+   * @param prediction
+   *          The predicted class label from the decision tree model.
+   * @param inEvent
+   *          The associated instance content event
+   * @return ResultContentEvent to be sent into Evaluator PI or other 
destination PI.
+   */
+  private ResultContentEvent newResultContentEvent(double[] prediction, 
InstanceContentEvent inEvent) {
+    ResultContentEvent rce = new 
ResultContentEvent(inEvent.getInstanceIndex(), inEvent.getInstance(),
+        inEvent.getClassId(), prediction, inEvent.isLastEvent());
+    rce.setClassifierIndex(this.processorId);
+    rce.setEvaluationIndex(inEvent.getEvaluationIndex());
+    return rce;
+  }
+
+  private ResultContentEvent newResultContentEvent(double[] prediction, 
Instance inst, InstancesContentEvent inEvent) {
+    ResultContentEvent rce = new 
ResultContentEvent(inEvent.getInstanceIndex(), inst, (int) inst.classValue(),
+        prediction, inEvent.isLastEvent());
+    rce.setClassifierIndex(this.processorId);
+    rce.setEvaluationIndex(inEvent.getEvaluationIndex());
+    return rce;
+  }
+
+  private List<InstancesContentEvent> contentEventList = new LinkedList<>();
+
+  /**
+   * Helper method to process the InstanceContentEvent
+   * 
+   * @param instContentEvent
+   */
+  private void processInstanceContentEvent(InstancesContentEvent 
instContentEvent) {
+    this.numBatches++;
+    this.contentEventList.add(instContentEvent);
+    if (this.numBatches == 1 || this.numBatches > 4) {
+      this.processInstances(this.contentEventList.remove(0));
+    }
+
+    if (instContentEvent.isLastEvent()) {
+      // drain remaining instances
+      while (!contentEventList.isEmpty()) {
+        processInstances(contentEventList.remove(0));
+      }
+    }
+
+  }
+
+  private int numBatches = 0;
+
+  private void processInstances(InstancesContentEvent instContentEvent) {
+
+    Instance[] instances = instContentEvent.getInstances();
+    boolean isTesting = instContentEvent.isTesting();
+    boolean isTraining = instContentEvent.isTraining();
+    for (Instance inst : instances) {
+      this.processInstance(inst, instContentEvent, isTesting, isTraining);
+    }
+  }
+
+  private void processInstance(Instance inst, InstancesContentEvent 
instContentEvent, boolean isTesting,
+      boolean isTraining) {
+    inst.setDataset(this.dataset);
+    // Check the instance whether it is used for testing or training
+    // boolean testAndTrain = isTraining; //Train after testing
+    double[] prediction = null;
+    if (isTesting) {
+      prediction = getVotesForInstance(inst, false);
+      this.resultStream.put(newResultContentEvent(prediction, inst,
+          instContentEvent));
+    }
+
+    if (isTraining) {
+      trainOnInstanceImpl(inst);
+      if (this.changeDetector != null) {
+        if (prediction == null) {
+          prediction = getVotesForInstance(inst);
+        }
+        boolean correctlyClassifies = this.correctlyClassifies(inst, 
prediction);
+        double oldEstimation = this.changeDetector.getEstimation();
+        this.changeDetector.input(correctlyClassifies ? 0 : 1);
+        if (this.changeDetector.getEstimation() > oldEstimation) {
+          // Start a new classifier
+          logger.info("Change detected, resetting the classifier");
+          this.resetLearning();
+          this.changeDetector.resetLearning();
+        }
+      }
+    }
+  }
+
+  private boolean correctlyClassifies(Instance inst, double[] prediction) {
+    return maxIndex(prediction) == (int) inst.classValue();
+  }
+
+  private void resetLearning() {
+    this.treeRoot = null;
+    // Remove nodes
+    FoundNode[] learningNodes = findNodes();
+    for (FoundNode learningNode : learningNodes) {
+      Node node = learningNode.getNode();
+      if (node instanceof SplitNode) {
+        SplitNode splitNode;
+        splitNode = (SplitNode) node;
+        for (int i = 0; i < splitNode.numChildren(); i++) {
+          splitNode.setChild(i, null);
+        }
+      }
+    }
+  }
+
+  protected FoundNode[] findNodes() {
+    List<FoundNode> foundList = new LinkedList<>();
+    findNodes(this.treeRoot, null, -1, foundList);
+    return foundList.toArray(new FoundNode[foundList.size()]);
+  }
+
+  protected void findNodes(Node node, SplitNode parent,
+      int parentBranch, List<FoundNode> found) {
+    if (node != null) {
+      found.add(new FoundNode(node, parent, parentBranch));
+      if (node instanceof SplitNode) {
+        SplitNode splitNode = (SplitNode) node;
+        for (int i = 0; i < splitNode.numChildren(); i++) {
+          findNodes(splitNode.getChild(i), splitNode, i,
+              found);
+        }
+      }
+    }
+  }
+
+  /**
+   * Helper method to get the prediction result. The actual prediction result 
is delegated to the leaf node.
+   * 
+   * @param inst
+   * @return
+   */
+  private double[] getVotesForInstance(Instance inst) {
+    return getVotesForInstance(inst, false);
+  }
+
+  private double[] getVotesForInstance(Instance inst, boolean isTraining) {
+    double[] ret;
+    FoundNode foundNode = null;
+    if (this.treeRoot != null) {
+      foundNode = this.treeRoot.filterInstanceToLeaf(inst, null, -1);
+      Node leafNode = foundNode.getNode();
+      if (leafNode == null) {
+        leafNode = foundNode.getParent();
+      }
+
+      ret = leafNode.getClassVotes(inst, this);
+    } else {
+      int numClasses = this.dataset.numClasses();
+      ret = new double[numClasses];
+
+    }
+
+    // Training after testing to speed up the process
+    if (isTraining) {
+      if (this.treeRoot == null) {
+        this.treeRoot = newLearningNode(this.parallelismHint);
+        this.activeLeafNodeCount = 1;
+        foundNode = this.treeRoot.filterInstanceToLeaf(inst, null, -1);
+      }
+      trainOnInstanceImpl(foundNode, inst);
+    }
+    return ret;
+  }
+
+  /**
+   * Helper method that represent training of an instance. Since it is 
decision tree, this method routes the incoming
+   * instance into the correct leaf and then update the statistic on the found 
leaf.
+   * 
+   * @param inst
+   */
+  private void trainOnInstanceImpl(Instance inst) {
+    if (this.treeRoot == null) {
+      this.treeRoot = newLearningNode(this.parallelismHint);
+      this.activeLeafNodeCount = 1;
+
+    }
+    FoundNode foundNode = this.treeRoot.filterInstanceToLeaf(inst, null, -1);
+    trainOnInstanceImpl(foundNode, inst);
+  }
+
+  private void trainOnInstanceImpl(FoundNode foundNode, Instance inst) {
+
+    Node leafNode = foundNode.getNode();
+
+    if (leafNode == null) {
+      leafNode = newLearningNode(this.parallelismHint);
+      foundNode.getParent().setChild(foundNode.getParentBranch(), leafNode);
+      activeLeafNodeCount++;
+    }
+
+    if (leafNode instanceof LearningNode) {
+      LearningNode learningNode = (LearningNode) leafNode;
+      learningNode.learnFromInstance(inst, this);
+    }
+    if (this.foundNodeSet == null) {
+      this.foundNodeSet = new HashSet<>();
+    }
+    this.foundNodeSet.add(foundNode);
+  }
+
+  /**
+   * Helper method to represent a split attempt
+   * 
+   * @param activeLearningNode
+   *          The corresponding active learning node which will be split
+   * @param foundNode
+   *          The data structure to represents the filtering of the instance 
using the tree model.
+   */
+  private void attemptToSplit(ActiveLearningNode activeLearningNode, FoundNode 
foundNode) {
+    // Increment the split ID
+    this.splitId++;
+
+    // Schedule time-out thread
+    ScheduledFuture<?> timeOutHandler = this.executor.schedule(new 
AggregationTimeOutHandler(this.splitId,
+        this.timedOutSplittingNodes),
+        this.timeOut, TimeUnit.SECONDS);
+
+    // Keep track of the splitting node information, so that we can continue 
the
+    // split
+    // once we receive all local statistic calculation from Local Statistic PI
+    // this.splittingNodes.put(Long.valueOf(this.splitId), new
+    // SplittingNodeInfo(activeLearningNode, foundNode, null));
+    this.splittingNodes.put(this.splitId, new 
SplittingNodeInfo(activeLearningNode, foundNode, timeOutHandler));
+
+    // Inform Local Statistic PI to perform local statistic calculation
+    activeLearningNode.requestDistributedSuggestions(this.splitId, this);
+  }
+
+  /**
+   * Helper method to continue the attempt to split once all local calculation 
results are received.
+   * 
+   * @param activeLearningNode
+   *          The corresponding active learning node which will be split
+   * @param foundNode
+   *          The data structure to represents the filtering of the instance 
using the tree model.
+   */
+  private void continueAttemptToSplit(ActiveLearningNode activeLearningNode, 
FoundNode foundNode) {
+    AttributeSplitSuggestion bestSuggestion = 
activeLearningNode.getDistributedBestSuggestion();
+    AttributeSplitSuggestion secondBestSuggestion = 
activeLearningNode.getDistributedSecondBestSuggestion();
+
+    // compare with null split
+    double[] preSplitDist = activeLearningNode.getObservedClassDistribution();
+    AttributeSplitSuggestion nullSplit = new AttributeSplitSuggestion(null,
+        new double[0][], this.splitCriterion.getMeritOfSplit(
+            preSplitDist,
+            new double[][] { preSplitDist }));
+
+    if ((bestSuggestion == null) || (nullSplit.compareTo(bestSuggestion) > 0)) 
{
+      secondBestSuggestion = bestSuggestion;
+      bestSuggestion = nullSplit;
+    } else {
+      if ((secondBestSuggestion == null) || 
(nullSplit.compareTo(secondBestSuggestion) > 0)) {
+        secondBestSuggestion = nullSplit;
+      }
+    }
+
+    boolean shouldSplit = false;
+
+    if (secondBestSuggestion == null) {
+      shouldSplit = (bestSuggestion != null);
+    } else {
+      double hoeffdingBound = computeHoeffdingBound(
+          
this.splitCriterion.getRangeOfMerit(activeLearningNode.getObservedClassDistribution()),
+          this.splitConfidence,
+          activeLearningNode.getWeightSeen());
+
+      if ((bestSuggestion.merit - secondBestSuggestion.merit > hoeffdingBound)
+          || (hoeffdingBound < tieThreshold)) {
+        shouldSplit = true;
+      }
+      // TODO: add poor attributes removal
+    }
+
+    SplitNode parent = foundNode.getParent();
+    int parentBranch = foundNode.getParentBranch();
+
+    // split if the Hoeffding bound condition is satisfied
+    if (shouldSplit) {
+
+      if (bestSuggestion.splitTest != null) {
+        SplitNode newSplit = new SplitNode(bestSuggestion.splitTest, 
activeLearningNode.getObservedClassDistribution());
+
+        for (int i = 0; i < bestSuggestion.numSplits(); i++) {
+          Node newChild = 
newLearningNode(bestSuggestion.resultingClassDistributionFromSplit(i), 
this.parallelismHint);
+          newSplit.setChild(i, newChild);
+        }
+
+        this.activeLeafNodeCount--;
+        this.decisionNodeCount++;
+        this.activeLeafNodeCount += bestSuggestion.numSplits();
+
+        if (parent == null) {
+          this.treeRoot = newSplit;
+        } else {
+          parent.setChild(parentBranch, newSplit);
+        }
+      }
+      // TODO: add check on the model's memory size
+    }
+
+    // housekeeping
+    activeLearningNode.endSplitting();
+    
activeLearningNode.setWeightSeenAtLastSplitEvaluation(activeLearningNode.getWeightSeen());
+  }
+
+  /**
+   * Helper method to deactivate learning node
+   * 
+   * @param toDeactivate
+   *          Active Learning Node that will be deactivated
+   * @param parent
+   *          Parent of the soon-to-be-deactivated Active LearningNode
+   * @param parentBranch
+   *          the branch index of the node in the parent node
+   */
+  private void deactivateLearningNode(ActiveLearningNode toDeactivate, 
SplitNode parent, int parentBranch) {
+    Node newLeaf = new 
InactiveLearningNode(toDeactivate.getObservedClassDistribution());
+    if (parent == null) {
+      this.treeRoot = newLeaf;
+    } else {
+      parent.setChild(parentBranch, newLeaf);
+    }
+
+    this.activeLeafNodeCount--;
+    this.inactiveLeafNodeCount++;
+  }
+
+  private LearningNode newLearningNode(int parallelismHint) {
+    return newLearningNode(new double[0], parallelismHint);
+  }
+
+  private LearningNode newLearningNode(double[] initialClassObservations, int 
parallelismHint) {
+    // for VHT optimization, we need to dynamically instantiate the appropriate
+    // ActiveLearningNode
+    return new ActiveLearningNode(initialClassObservations, parallelismHint);
+  }
+
+  /**
+   * Helper method to set the model context, i.e. how many attributes they are 
and what is the class index
+   * 
+   * @param ih
+   */
+  private void setModelContext(InstancesHeader ih) {
+    // TODO possibly refactored
+    if ((ih != null) && (ih.classIndex() < 0)) {
+      throw new IllegalArgumentException(
+          "Context for a classifier must include a class to learn");
+    }
+    // TODO: check flag for checking whether training has started or not
+
+    // model context is used to describe the model
+    logger.trace("Model context: {}", ih.toString());
+  }
+
+  private static double computeHoeffdingBound(double range, double confidence, 
double n) {
+    return Math.sqrt((Math.pow(range, 2.0) * Math.log(1.0 / confidence)) / 
(2.0 * n));
+  }
+
+  /**
+   * AggregationTimeOutHandler is a class to support time-out feature while 
waiting for local computation results from
+   * the local statistic PIs.
+   * 
+   * @author Arinto Murdopo
+   * 
+   */
+  static class AggregationTimeOutHandler implements Runnable {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(AggregationTimeOutHandler.class);
+    private final Long splitId;
+    private final BlockingQueue<Long> toBeSplittedNodes;
+
+    AggregationTimeOutHandler(Long splitId, BlockingQueue<Long> 
toBeSplittedNodes) {
+      this.splitId = splitId;
+      this.toBeSplittedNodes = toBeSplittedNodes;
+    }
+
+    @Override
+    public void run() {
+      logger.debug("Time out is reached. AggregationTimeOutHandler is 
started.");
+      try {
+        toBeSplittedNodes.put(splitId);
+      } catch (InterruptedException e) {
+        logger.warn("Interrupted while trying to put the ID into the queue");
+      }
+      logger.debug("AggregationTimeOutHandler is finished.");
+    }
+  }
+
+  /**
+   * SplittingNodeInfo is a class to represents the ActiveLearningNode that is 
splitting
+   * 
+   * @author Arinto Murdopo
+   * 
+   */
+  static class SplittingNodeInfo {
+
+    private final ActiveLearningNode activeLearningNode;
+    private final FoundNode foundNode;
+    private final ScheduledFuture<?> scheduledFuture;
+
+    SplittingNodeInfo(ActiveLearningNode activeLearningNode, FoundNode 
foundNode, ScheduledFuture<?> scheduledFuture) {
+      this.activeLearningNode = activeLearningNode;
+      this.foundNode = foundNode;
+      this.scheduledFuture = scheduledFuture;
+    }
+  }
+
+  protected ChangeDetector changeDetector;
+
+  public ChangeDetector getChangeDetector() {
+    return this.changeDetector;
+  }
+
+  public void setChangeDetector(ChangeDetector cd) {
+    this.changeDetector = cd;
+  }
+
+  /**
+   * Builder class to replace constructors with many parameters
+   * 
+   * @author Arinto Murdopo
+   * 
+   */
+  static class Builder {
+
+    // required parameters
+    private final Instances dataset;
+
+    // default values
+    private SplitCriterion splitCriterion = new InfoGainSplitCriterion();
+    private double splitConfidence = 0.0000001;
+    private double tieThreshold = 0.05;
+    private int gracePeriod = 200;
+    private int parallelismHint = 1;
+    private long timeOut = 30;
+    private ChangeDetector changeDetector = null;
+
+    Builder(Instances dataset) {
+      this.dataset = dataset;
+    }
+
+    Builder(ModelAggregatorProcessor oldProcessor) {
+      this.dataset = oldProcessor.dataset;
+      this.splitCriterion = oldProcessor.splitCriterion;
+      this.splitConfidence = oldProcessor.splitConfidence;
+      this.tieThreshold = oldProcessor.tieThreshold;
+      this.gracePeriod = oldProcessor.gracePeriod;
+      this.parallelismHint = oldProcessor.parallelismHint;
+      this.timeOut = oldProcessor.timeOut;
+    }
+
+    Builder splitCriterion(SplitCriterion splitCriterion) {
+      this.splitCriterion = splitCriterion;
+      return this;
+    }
+
+    Builder splitConfidence(double splitConfidence) {
+      this.splitConfidence = splitConfidence;
+      return this;
+    }
+
+    Builder tieThreshold(double tieThreshold) {
+      this.tieThreshold = tieThreshold;
+      return this;
+    }
+
+    Builder gracePeriod(int gracePeriod) {
+      this.gracePeriod = gracePeriod;
+      return this;
+    }
+
+    Builder parallelismHint(int parallelismHint) {
+      this.parallelismHint = parallelismHint;
+      return this;
+    }
+
+    Builder timeOut(long timeOut) {
+      this.timeOut = timeOut;
+      return this;
+    }
+
+    Builder changeDetector(ChangeDetector changeDetector) {
+      this.changeDetector = changeDetector;
+      return this;
+    }
+
+    ModelAggregatorProcessor build() {
+      return new ModelAggregatorProcessor(this);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/Node.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/Node.java 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/Node.java
new file mode 100644
index 0000000..898a433
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/Node.java
@@ -0,0 +1,103 @@
+package org.apache.samoa.learners.classifiers.trees;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.DoubleVector;
+import org.apache.samoa.instances.Instance;
+
+/**
+ * Abstract class that represents a node in the tree model.
+ * 
+ * @author Arinto Murdopo
+ * 
+ */
+abstract class Node implements java.io.Serializable {
+
+  private static final long serialVersionUID = 4008521239214180548L;
+
+  protected final DoubleVector observedClassDistribution;
+
+  /**
+   * Method to route/filter an instance into its corresponding leaf. This 
method will be invoked recursively.
+   * 
+   * @param inst
+   *          Instance to be routed
+   * @param parent
+   *          Parent of the current node
+   * @param parentBranch
+   *          The index of the current node in the parent
+   * @return FoundNode which is the data structure to represent the resulting 
leaf.
+   */
+  abstract FoundNode filterInstanceToLeaf(Instance inst, SplitNode parent, int 
parentBranch);
+
+  /**
+   * Method to return the predicted class of the instance based on the 
statistic inside the node.
+   * 
+   * @param inst
+   *          To-be-predicted instance
+   * @param map
+   *          ModelAggregatorProcessor
+   * @return The prediction result in the form of class distribution
+   */
+  abstract double[] getClassVotes(Instance inst, ModelAggregatorProcessor map);
+
+  /**
+   * Method to check whether the node is a leaf node or not.
+   * 
+   * @return Boolean flag to indicate whether the node is a leaf or not
+   */
+  abstract boolean isLeaf();
+
+  /**
+   * Constructor of the tree node
+   * 
+   * @param classObservation
+   *          distribution of the observed classes.
+   */
+  protected Node(double[] classObservation) {
+    this.observedClassDistribution = new DoubleVector(classObservation);
+  }
+
+  /**
+   * Getter method for the class distribution
+   * 
+   * @return Observed class distribution
+   */
+  protected double[] getObservedClassDistribution() {
+    return this.observedClassDistribution.getArrayCopy();
+  }
+
+  /**
+   * A method to check whether the class distribution only consists of one 
class or not.
+   * 
+   * @return Flag whether class distribution is pure or not.
+   */
+  protected boolean observedClassDistributionIsPure() {
+    return (observedClassDistribution.numNonZeroEntries() < 2);
+  }
+
+  protected void describeSubtree(ModelAggregatorProcessor modelAggrProc, 
StringBuilder out, int indent) {
+    // TODO: implement method to gracefully define the tree
+  }
+
+  // TODO: calculate promise for limiting the model based on the memory size
+  // double calculatePromise();
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/SplitNode.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/SplitNode.java
 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/SplitNode.java
new file mode 100644
index 0000000..c2b1a47
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/SplitNode.java
@@ -0,0 +1,117 @@
+package org.apache.samoa.learners.classifiers.trees;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.instances.Instance;
+import 
org.apache.samoa.moa.classifiers.core.conditionaltests.InstanceConditionalTest;
+import org.apache.samoa.moa.core.AutoExpandVector;
+
+/**
+ * SplitNode represents the node that contains one or more questions in the 
decision tree model, in order to route the
+ * instances into the correct leaf.
+ * 
+ * @author Arinto Murdopo
+ * 
+ */
+public class SplitNode extends Node {
+
+  private static final long serialVersionUID = -7380795529928485792L;
+
+  private final AutoExpandVector<Node> children;
+  protected final InstanceConditionalTest splitTest;
+
+  public SplitNode(InstanceConditionalTest splitTest,
+      double[] classObservation) {
+    super(classObservation);
+    this.children = new AutoExpandVector<>();
+    this.splitTest = splitTest;
+  }
+
+  @Override
+  FoundNode filterInstanceToLeaf(Instance inst, SplitNode parent, int 
parentBranch) {
+    int childIndex = instanceChildIndex(inst);
+    if (childIndex >= 0) {
+      Node child = getChild(childIndex);
+      if (child != null) {
+        return child.filterInstanceToLeaf(inst, this, childIndex);
+      }
+      return new FoundNode(null, this, childIndex);
+    }
+    return new FoundNode(this, parent, parentBranch);
+  }
+
+  @Override
+  boolean isLeaf() {
+    return false;
+  }
+
+  @Override
+  double[] getClassVotes(Instance inst, ModelAggregatorProcessor vht) {
+    return this.observedClassDistribution.getArrayCopy();
+  }
+
+  /**
+   * Method to return the number of children of this split node
+   * 
+   * @return number of children
+   */
+  int numChildren() {
+    return this.children.size();
+  }
+
+  /**
+   * Method to set the children in a specific index of the SplitNode with the 
appropriate child
+   * 
+   * @param index
+   *          Index of the child in the SplitNode
+   * @param child
+   *          The child node
+   */
+  void setChild(int index, Node child) {
+    if ((this.splitTest.maxBranches() >= 0)
+        && (index >= this.splitTest.maxBranches())) {
+      throw new IndexOutOfBoundsException();
+    }
+    this.children.set(index, child);
+  }
+
+  /**
+   * Method to get the child node given the index
+   * 
+   * @param index
+   *          The child node index
+   * @return The child node in the given index
+   */
+  Node getChild(int index) {
+    return this.children.get(index);
+  }
+
+  /**
+   * Method to route the instance using this split node
+   * 
+   * @param inst
+   *          The routed instance
+   * @return The index of the branch where the instance is routed
+   */
+  int instanceChildIndex(Instance inst) {
+    return this.splitTest.branchForInstance(inst);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/VerticalHoeffdingTree.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/VerticalHoeffdingTree.java
 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/VerticalHoeffdingTree.java
new file mode 100644
index 0000000..ea7e53d
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/trees/VerticalHoeffdingTree.java
@@ -0,0 +1,185 @@
+package org.apache.samoa.learners.classifiers.trees;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.google.common.collect.ImmutableSet;
+
+import java.util.Set;
+
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.instances.Instances;
+import org.apache.samoa.learners.AdaptiveLearner;
+import org.apache.samoa.learners.ClassificationLearner;
+import 
org.apache.samoa.moa.classifiers.core.attributeclassobservers.AttributeClassObserver;
+import 
org.apache.samoa.moa.classifiers.core.attributeclassobservers.DiscreteAttributeClassObserver;
+import 
org.apache.samoa.moa.classifiers.core.attributeclassobservers.NumericAttributeClassObserver;
+import org.apache.samoa.moa.classifiers.core.driftdetection.ChangeDetector;
+import org.apache.samoa.moa.classifiers.core.splitcriteria.SplitCriterion;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.topology.TopologyBuilder;
+
+import com.github.javacliparser.ClassOption;
+import com.github.javacliparser.Configurable;
+import com.github.javacliparser.FlagOption;
+import com.github.javacliparser.FloatOption;
+import com.github.javacliparser.IntOption;
+
+/**
+ * Vertical Hoeffding Tree.
+ * <p/>
+ * Vertical Hoeffding Tree (VHT) classifier is a distributed classifier that 
utilizes vertical parallelism on top of
+ * Very Fast Decision Tree (VFDT) classifier.
+ * 
+ * @author Arinto Murdopo
+ */
+public final class VerticalHoeffdingTree implements ClassificationLearner, 
AdaptiveLearner, Configurable {
+
+  private static final long serialVersionUID = -4937416312929984057L;
+
+  public ClassOption numericEstimatorOption = new 
ClassOption("numericEstimator",
+      'n', "Numeric estimator to use.", NumericAttributeClassObserver.class,
+      "GaussianNumericAttributeClassObserver");
+
+  public ClassOption nominalEstimatorOption = new 
ClassOption("nominalEstimator",
+      'd', "Nominal estimator to use.", DiscreteAttributeClassObserver.class,
+      "NominalAttributeClassObserver");
+
+  public ClassOption splitCriterionOption = new ClassOption("splitCriterion",
+      's', "Split criterion to use.", SplitCriterion.class,
+      "InfoGainSplitCriterion");
+
+  public FloatOption splitConfidenceOption = new FloatOption(
+      "splitConfidence",
+      'c',
+      "The allowable error in split decision, values closer to 0 will take 
longer to decide.",
+      0.0000001, 0.0, 1.0);
+
+  public FloatOption tieThresholdOption = new FloatOption("tieThreshold",
+      't', "Threshold below which a split will be forced to break ties.",
+      0.05, 0.0, 1.0);
+
+  public IntOption gracePeriodOption = new IntOption(
+      "gracePeriod",
+      'g',
+      "The number of instances a leaf should observe between split attempts.",
+      200, 0, Integer.MAX_VALUE);
+
+  public IntOption parallelismHintOption = new IntOption(
+      "parallelismHint",
+      'p',
+      "The number of local statistics PI to do distributed computation",
+      1, 1, Integer.MAX_VALUE);
+
+  public IntOption timeOutOption = new IntOption(
+      "timeOut",
+      'o',
+      "The duration to wait all distributed computation results from local 
statistics PI",
+      30, 1, Integer.MAX_VALUE);
+
+  public FlagOption binarySplitsOption = new FlagOption("binarySplits", 'b',
+      "Only allow binary splits.");
+
+  private Stream resultStream;
+
+  private FilterProcessor filterProc;
+
+  @Override
+  public void init(TopologyBuilder topologyBuilder, Instances dataset, int 
parallelism) {
+
+    this.filterProc = new FilterProcessor.Builder(dataset)
+        .build();
+    topologyBuilder.addProcessor(filterProc, parallelism);
+
+    Stream filterStream = topologyBuilder.createStream(filterProc);
+    this.filterProc.setOutputStream(filterStream);
+
+    ModelAggregatorProcessor modelAggrProc = new 
ModelAggregatorProcessor.Builder(dataset)
+        .splitCriterion((SplitCriterion) this.splitCriterionOption.getValue())
+        .splitConfidence(splitConfidenceOption.getValue())
+        .tieThreshold(tieThresholdOption.getValue())
+        .gracePeriod(gracePeriodOption.getValue())
+        .parallelismHint(parallelismHintOption.getValue())
+        .timeOut(timeOutOption.getValue())
+        .changeDetector(this.getChangeDetector())
+        .build();
+
+    topologyBuilder.addProcessor(modelAggrProc, parallelism);
+
+    topologyBuilder.connectInputShuffleStream(filterStream, modelAggrProc);
+
+    this.resultStream = topologyBuilder.createStream(modelAggrProc);
+    modelAggrProc.setResultStream(resultStream);
+
+    Stream attributeStream = topologyBuilder.createStream(modelAggrProc);
+    modelAggrProc.setAttributeStream(attributeStream);
+
+    Stream controlStream = topologyBuilder.createStream(modelAggrProc);
+    modelAggrProc.setControlStream(controlStream);
+
+    LocalStatisticsProcessor locStatProc = new 
LocalStatisticsProcessor.Builder()
+        .splitCriterion((SplitCriterion) this.splitCriterionOption.getValue())
+        .binarySplit(binarySplitsOption.isSet())
+        .nominalClassObserver((AttributeClassObserver) 
this.nominalEstimatorOption.getValue())
+        .numericClassObserver((AttributeClassObserver) 
this.numericEstimatorOption.getValue())
+        .build();
+
+    topologyBuilder.addProcessor(locStatProc, 
parallelismHintOption.getValue());
+    topologyBuilder.connectInputKeyStream(attributeStream, locStatProc);
+    topologyBuilder.connectInputAllStream(controlStream, locStatProc);
+
+    Stream computeStream = topologyBuilder.createStream(locStatProc);
+
+    locStatProc.setComputationResultStream(computeStream);
+    topologyBuilder.connectInputAllStream(computeStream, modelAggrProc);
+  }
+
+  @Override
+  public Processor getInputProcessor() {
+    return this.filterProc;
+  }
+
+  @Override
+  public Set<Stream> getResultStreams() {
+    return ImmutableSet.of(this.resultStream);
+  }
+
+  protected ChangeDetector changeDetector;
+
+  @Override
+  public ChangeDetector getChangeDetector() {
+    return this.changeDetector;
+  }
+
+  @Override
+  public void setChangeDetector(ChangeDetector cd) {
+    this.changeDetector = cd;
+  }
+
+  static class LearningNodeIdGenerator {
+
+    // TODO: add code to warn user of when value reaches Long.MAX_VALUES
+    private static long id = 0;
+
+    static synchronized long generate() {
+      return id++;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/learners/clusterers/ClusteringContentEvent.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/learners/clusterers/ClusteringContentEvent.java
 
b/samoa-api/src/main/java/org/apache/samoa/learners/clusterers/ClusteringContentEvent.java
new file mode 100644
index 0000000..e7eb5b5
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/learners/clusterers/ClusteringContentEvent.java
@@ -0,0 +1,90 @@
+package org.apache.samoa.learners.clusterers;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.instances.Instance;
+
+import net.jcip.annotations.Immutable;
+
+/**
+ * The Class ClusteringContentEvent.
+ */
+@Immutable
+final public class ClusteringContentEvent implements ContentEvent {
+
+  private static final long serialVersionUID = -7746983521296618922L;
+  private Instance instance;
+  private boolean isLast = false;
+  private String key;
+  private boolean isSample;
+
+  public ClusteringContentEvent() {
+    // Necessary for kryo serializer
+  }
+
+  /**
+   * Instantiates a new clustering event.
+   * 
+   * @param index
+   *          the index
+   * @param instance
+   *          the instance
+   */
+  public ClusteringContentEvent(long index, Instance instance) {
+    /*
+     * if (instance != null) { this.instance = new
+     * SerializableInstance(instance); }
+     */
+    this.instance = instance;
+    this.setKey(Long.toString(index));
+  }
+
+  @Override
+  public String getKey() {
+    return this.key;
+  }
+
+  @Override
+  public void setKey(String str) {
+    this.key = str;
+  }
+
+  @Override
+  public boolean isLastEvent() {
+    return this.isLast;
+  }
+
+  public void setLast(boolean isLast) {
+    this.isLast = isLast;
+  }
+
+  public Instance getInstance() {
+    return this.instance;
+  }
+
+  public boolean isSample() {
+    return isSample;
+  }
+
+  public void setSample(boolean b) {
+    this.isSample = b;
+  }
+}

Reply via email to