http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/moa/streams/generators/RandomTreeGenerator.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/moa/streams/generators/RandomTreeGenerator.java
 
b/samoa-api/src/main/java/org/apache/samoa/moa/streams/generators/RandomTreeGenerator.java
new file mode 100644
index 0000000..69100aa
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/moa/streams/generators/RandomTreeGenerator.java
@@ -0,0 +1,267 @@
+package org.apache.samoa.moa.streams.generators;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Random;
+
+import org.apache.samoa.instances.Attribute;
+import org.apache.samoa.instances.DenseInstance;
+import org.apache.samoa.instances.Instance;
+import org.apache.samoa.instances.Instances;
+import org.apache.samoa.instances.InstancesHeader;
+import org.apache.samoa.moa.core.FastVector;
+import org.apache.samoa.moa.core.InstanceExample;
+import org.apache.samoa.moa.core.ObjectRepository;
+import org.apache.samoa.moa.options.AbstractOptionHandler;
+import org.apache.samoa.moa.streams.InstanceStream;
+import org.apache.samoa.moa.tasks.TaskMonitor;
+
+import com.github.javacliparser.FloatOption;
+import com.github.javacliparser.IntOption;
+
+/**
+ * Stream generator for a stream based on a randomly generated tree.
+ * 
+ * @author Richard Kirkby ([email protected])
+ * @version $Revision: 7 $
+ */
+public class RandomTreeGenerator extends AbstractOptionHandler implements 
InstanceStream {
+
+  @Override
+  public String getPurposeString() {
+    return "Generates a stream based on a randomly generated tree.";
+  }
+
+  private static final long serialVersionUID = 1L;
+
+  public IntOption treeRandomSeedOption = new IntOption("treeRandomSeed",
+      'r', "Seed for random generation of tree.", 1);
+
+  public IntOption instanceRandomSeedOption = new IntOption(
+      "instanceRandomSeed", 'i',
+      "Seed for random generation of instances.", 1);
+
+  public IntOption numClassesOption = new IntOption("numClasses", 'c',
+      "The number of classes to generate.", 2, 2, Integer.MAX_VALUE);
+
+  public IntOption numNominalsOption = new IntOption("numNominals", 'o',
+      "The number of nominal attributes to generate.", 5, 0,
+      Integer.MAX_VALUE);
+
+  public IntOption numNumericsOption = new IntOption("numNumerics", 'u',
+      "The number of numeric attributes to generate.", 5, 0,
+      Integer.MAX_VALUE);
+
+  public IntOption numValsPerNominalOption = new IntOption(
+      "numValsPerNominal", 'v',
+      "The number of values to generate per nominal attribute.", 5, 2,
+      Integer.MAX_VALUE);
+
+  public IntOption maxTreeDepthOption = new IntOption("maxTreeDepth", 'd',
+      "The maximum depth of the tree concept.", 5, 0, Integer.MAX_VALUE);
+
+  public IntOption firstLeafLevelOption = new IntOption(
+      "firstLeafLevel",
+      'l',
+      "The first level of the tree above maxTreeDepth that can have leaves.",
+      3, 0, Integer.MAX_VALUE);
+
+  public FloatOption leafFractionOption = new FloatOption("leafFraction",
+      'f',
+      "The fraction of leaves per level from firstLeafLevel onwards.",
+      0.15, 0.0, 1.0);
+
+  protected static class Node implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    public int classLabel;
+
+    public int splitAttIndex;
+
+    public double splitAttValue;
+
+    public Node[] children;
+  }
+
+  protected Node treeRoot;
+
+  protected InstancesHeader streamHeader;
+
+  protected Random instanceRandom;
+
+  @Override
+  public void prepareForUseImpl(TaskMonitor monitor,
+      ObjectRepository repository) {
+    monitor.setCurrentActivity("Preparing random tree...", -1.0);
+    generateHeader();
+    generateRandomTree();
+    restart();
+  }
+
+  @Override
+  public long estimatedRemainingInstances() {
+    return -1;
+  }
+
+  @Override
+  public boolean isRestartable() {
+    return true;
+  }
+
+  @Override
+  public void restart() {
+    this.instanceRandom = new Random(this.instanceRandomSeedOption.getValue());
+  }
+
+  @Override
+  public InstancesHeader getHeader() {
+    return this.streamHeader;
+  }
+
+  @Override
+  public boolean hasMoreInstances() {
+    return true;
+  }
+
+  @Override
+  public InstanceExample nextInstance() {
+    double[] attVals = new double[this.numNominalsOption.getValue()
+        + this.numNumericsOption.getValue()];
+    InstancesHeader header = getHeader();
+    Instance inst = new DenseInstance(header.numAttributes());
+    for (int i = 0; i < attVals.length; i++) {
+      attVals[i] = i < this.numNominalsOption.getValue() ? 
this.instanceRandom.nextInt(this.numValsPerNominalOption
+          .getValue())
+          : this.instanceRandom.nextDouble();
+      inst.setValue(i, attVals[i]);
+    }
+    inst.setDataset(header);
+    inst.setClassValue(classifyInstance(this.treeRoot, attVals));
+    return new InstanceExample(inst);
+  }
+
+  protected int classifyInstance(Node node, double[] attVals) {
+    if (node.children == null) {
+      return node.classLabel;
+    }
+    if (node.splitAttIndex < this.numNominalsOption.getValue()) {
+      return classifyInstance(
+          node.children[(int) attVals[node.splitAttIndex]], attVals);
+    }
+    return classifyInstance(
+        node.children[attVals[node.splitAttIndex] < node.splitAttValue ? 0
+            : 1], attVals);
+  }
+
+  protected void generateHeader() {
+    FastVector<Attribute> attributes = new FastVector<>();
+    FastVector<String> nominalAttVals = new FastVector<>();
+    for (int i = 0; i < this.numValsPerNominalOption.getValue(); i++) {
+      nominalAttVals.addElement("value" + (i + 1));
+    }
+    for (int i = 0; i < this.numNominalsOption.getValue(); i++) {
+      attributes.addElement(new Attribute("nominal" + (i + 1),
+          nominalAttVals));
+    }
+    for (int i = 0; i < this.numNumericsOption.getValue(); i++) {
+      attributes.addElement(new Attribute("numeric" + (i + 1)));
+    }
+    FastVector<String> classLabels = new FastVector<>();
+    for (int i = 0; i < this.numClassesOption.getValue(); i++) {
+      classLabels.addElement("class" + (i + 1));
+    }
+    attributes.addElement(new Attribute("class", classLabels));
+    this.streamHeader = new InstancesHeader(new Instances(
+        getCLICreationString(InstanceStream.class), attributes, 0));
+    this.streamHeader.setClassIndex(this.streamHeader.numAttributes() - 1);
+  }
+
+  protected void generateRandomTree() {
+    Random treeRand = new Random(this.treeRandomSeedOption.getValue());
+    ArrayList<Integer> nominalAttCandidates = new ArrayList<>(
+        this.numNominalsOption.getValue());
+    for (int i = 0; i < this.numNominalsOption.getValue(); i++) {
+      nominalAttCandidates.add(i);
+    }
+    double[] minNumericVals = new double[this.numNumericsOption.getValue()];
+    double[] maxNumericVals = new double[this.numNumericsOption.getValue()];
+    for (int i = 0; i < this.numNumericsOption.getValue(); i++) {
+      minNumericVals[i] = 0.0;
+      maxNumericVals[i] = 1.0;
+    }
+    this.treeRoot = generateRandomTreeNode(0, nominalAttCandidates,
+        minNumericVals, maxNumericVals, treeRand);
+  }
+
+  protected Node generateRandomTreeNode(int currentDepth,
+      ArrayList<Integer> nominalAttCandidates, double[] minNumericVals,
+      double[] maxNumericVals, Random treeRand) {
+    if ((currentDepth >= this.maxTreeDepthOption.getValue())
+        || ((currentDepth >= this.firstLeafLevelOption.getValue()) && 
(this.leafFractionOption.getValue() >= (1.0 - treeRand
+            .nextDouble())))) {
+      Node leaf = new Node();
+      leaf.classLabel = treeRand.nextInt(this.numClassesOption.getValue());
+      return leaf;
+    }
+    Node node = new Node();
+    int chosenAtt = treeRand.nextInt(nominalAttCandidates.size()
+        + this.numNumericsOption.getValue());
+    if (chosenAtt < nominalAttCandidates.size()) {
+      node.splitAttIndex = nominalAttCandidates.get(chosenAtt);
+      node.children = new Node[this.numValsPerNominalOption.getValue()];
+      ArrayList<Integer> newNominalCandidates = new ArrayList<>(
+          nominalAttCandidates);
+      newNominalCandidates.remove(new Integer(node.splitAttIndex));
+      newNominalCandidates.trimToSize();
+      for (int i = 0; i < node.children.length; i++) {
+        node.children[i] = generateRandomTreeNode(currentDepth + 1,
+            newNominalCandidates, minNumericVals, maxNumericVals,
+            treeRand);
+      }
+    } else {
+      int numericIndex = chosenAtt - nominalAttCandidates.size();
+      node.splitAttIndex = this.numNominalsOption.getValue()
+          + numericIndex;
+      double minVal = minNumericVals[numericIndex];
+      double maxVal = maxNumericVals[numericIndex];
+      node.splitAttValue = ((maxVal - minVal) * treeRand.nextDouble())
+          + minVal;
+      node.children = new Node[2];
+      double[] newMaxVals = maxNumericVals.clone();
+      newMaxVals[numericIndex] = node.splitAttValue;
+      node.children[0] = generateRandomTreeNode(currentDepth + 1,
+          nominalAttCandidates, minNumericVals, newMaxVals, treeRand);
+      double[] newMinVals = minNumericVals.clone();
+      newMinVals[numericIndex] = node.splitAttValue;
+      node.children[1] = generateRandomTreeNode(currentDepth + 1,
+          nominalAttCandidates, newMinVals, maxNumericVals, treeRand);
+    }
+    return node;
+  }
+
+  @Override
+  public void getDescription(StringBuilder sb, int indent) {
+    // TODO Auto-generated method stub
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/moa/tasks/NullMonitor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/moa/tasks/NullMonitor.java 
b/samoa-api/src/main/java/org/apache/samoa/moa/tasks/NullMonitor.java
new file mode 100644
index 0000000..a4fcd33
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/moa/tasks/NullMonitor.java
@@ -0,0 +1,102 @@
+package org.apache.samoa.moa.tasks;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * Class that represents a null monitor.
+ * 
+ * @author Richard Kirkby ([email protected])
+ * @version $Revision: 7 $
+ */
+public class NullMonitor implements TaskMonitor {
+
+  @Override
+  public void setCurrentActivity(String activityDescription,
+      double fracComplete) {
+  }
+
+  @Override
+  public void setCurrentActivityDescription(String activity) {
+  }
+
+  @Override
+  public void setCurrentActivityFractionComplete(double fracComplete) {
+  }
+
+  @Override
+  public boolean taskShouldAbort() {
+    return false;
+  }
+
+  @Override
+  public String getCurrentActivityDescription() {
+    return null;
+  }
+
+  @Override
+  public double getCurrentActivityFractionComplete() {
+    return -1.0;
+  }
+
+  @Override
+  public boolean isPaused() {
+    return false;
+  }
+
+  @Override
+  public boolean isCancelled() {
+    return false;
+  }
+
+  @Override
+  public void requestCancel() {
+  }
+
+  @Override
+  public void requestPause() {
+  }
+
+  @Override
+  public void requestResume() {
+  }
+
+  @Override
+  public Object getLatestResultPreview() {
+    return null;
+  }
+
+  @Override
+  public void requestResultPreview() {
+  }
+
+  @Override
+  public boolean resultPreviewRequested() {
+    return false;
+  }
+
+  @Override
+  public void setLatestResultPreview(Object latestPreview) {
+  }
+
+  @Override
+  public void requestResultPreview(ResultPreviewListener toInform) {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/moa/tasks/ResultPreviewListener.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/moa/tasks/ResultPreviewListener.java 
b/samoa-api/src/main/java/org/apache/samoa/moa/tasks/ResultPreviewListener.java
new file mode 100644
index 0000000..f8dfadf
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/moa/tasks/ResultPreviewListener.java
@@ -0,0 +1,37 @@
+package org.apache.samoa.moa.tasks;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * Interface implemented by classes that preview results on the Graphical User 
Interface
+ * 
+ * @author Richard Kirkby ([email protected])
+ * @version $Revision: 7 $
+ */
+public interface ResultPreviewListener {
+
+  /**
+   * This method is used to receive a signal from <code>TaskMonitor</code> 
that the latest preview has changed. This
+   * method is implemented in <code>PreviewPanel</code> to change the results 
that are shown in its panel.
+   * 
+   */
+  public void latestPreviewChanged();
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/moa/tasks/Task.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/moa/tasks/Task.java 
b/samoa-api/src/main/java/org/apache/samoa/moa/tasks/Task.java
new file mode 100644
index 0000000..6776a05
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/moa/tasks/Task.java
@@ -0,0 +1,60 @@
+package org.apache.samoa.moa.tasks;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.moa.MOAObject;
+import org.apache.samoa.moa.core.ObjectRepository;
+
+/**
+ * Interface representing a task.
+ * 
+ * @author Richard Kirkby ([email protected])
+ * @version $Revision: 7 $
+ */
+public interface Task extends MOAObject {
+
+  /**
+   * Gets the result type of this task. Tasks can return LearningCurve, 
LearningEvaluation, Classifier, String,
+   * Instances.
+   * 
+   * @return a class object of the result of this task
+   */
+  public Class<?> getTaskResultType();
+
+  /**
+   * This method performs this task, when TaskMonitor and ObjectRepository are 
not needed.
+   * 
+   * @return an object with the result of this task
+   */
+  public Object doTask();
+
+  /**
+   * This method performs this task. <code>AbstractTask</code> implements this 
method so all its extensions only need to
+   * implement <code>doTaskImpl</code>
+   * 
+   * @param monitor
+   *          the TaskMonitor to use
+   * @param repository
+   *          the ObjectRepository to use
+   * @return an object with the result of this task
+   */
+  public Object doTask(TaskMonitor monitor, ObjectRepository repository);
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/moa/tasks/TaskMonitor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/moa/tasks/TaskMonitor.java 
b/samoa-api/src/main/java/org/apache/samoa/moa/tasks/TaskMonitor.java
new file mode 100644
index 0000000..4eaffaf
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/moa/tasks/TaskMonitor.java
@@ -0,0 +1,146 @@
+package org.apache.samoa.moa.tasks;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * Interface representing a task monitor.
+ * 
+ * @author Richard Kirkby ([email protected])
+ * @version $Revision: 7 $
+ */
+public interface TaskMonitor {
+
+  /**
+   * Sets the description and the percentage done of the current activity.
+   * 
+   * @param activityDescription
+   *          the description of the current activity
+   * @param fracComplete
+   *          the percentage done of the current activity
+   */
+  public void setCurrentActivity(String activityDescription,
+      double fracComplete);
+
+  /**
+   * Sets the description of the current activity.
+   * 
+   * @param activity
+   *          the description of the current activity
+   */
+  public void setCurrentActivityDescription(String activity);
+
+  /**
+   * Sets the percentage done of the current activity
+   * 
+   * @param fracComplete
+   *          the percentage done of the current activity
+   */
+  public void setCurrentActivityFractionComplete(double fracComplete);
+
+  /**
+   * Gets whether the task should abort.
+   * 
+   * @return true if the task should abort
+   */
+  public boolean taskShouldAbort();
+
+  /**
+   * Gets whether there is a request to preview the task result.
+   * 
+   * @return true if there is a request to preview the task result
+   */
+  public boolean resultPreviewRequested();
+
+  /**
+   * Sets the current result to preview
+   * 
+   * @param latestPreview
+   *          the result to preview
+   */
+  public void setLatestResultPreview(Object latestPreview);
+
+  /**
+   * Gets the description of the current activity.
+   * 
+   * @return the description of the current activity
+   */
+  public String getCurrentActivityDescription();
+
+  /**
+   * Gets the percentage done of the current activity
+   * 
+   * @return the percentage done of the current activity
+   */
+  public double getCurrentActivityFractionComplete();
+
+  /**
+   * Requests the task monitored to pause.
+   * 
+   */
+  public void requestPause();
+
+  /**
+   * Requests the task monitored to resume.
+   * 
+   */
+  public void requestResume();
+
+  /**
+   * Requests the task monitored to cancel.
+   * 
+   */
+  public void requestCancel();
+
+  /**
+   * Gets whether the task monitored is paused.
+   * 
+   * @return true if the task is paused
+   */
+  public boolean isPaused();
+
+  /**
+   * Gets whether the task monitored is cancelled.
+   * 
+   * @return true if the task is cancelled
+   */
+  public boolean isCancelled();
+
+  /**
+   * Requests to preview the task result.
+   * 
+   */
+  public void requestResultPreview();
+
+  /**
+   * Requests to preview the task result.
+   * 
+   * @param toInform
+   *          the listener of the changes in the preview of the result
+   */
+  public void requestResultPreview(ResultPreviewListener toInform);
+
+  /**
+   * Gets the current result to preview
+   * 
+   * @return the result to preview
+   */
+  public Object getLatestResultPreview();
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/streams/ArffFileStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/ArffFileStream.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/ArffFileStream.java
new file mode 100644
index 0000000..099f639
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/ArffFileStream.java
@@ -0,0 +1,120 @@
+package org.apache.samoa.streams;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.IOException;
+
+import org.apache.samoa.instances.Instances;
+import org.apache.samoa.moa.core.InstanceExample;
+import org.apache.samoa.moa.core.ObjectRepository;
+import org.apache.samoa.moa.tasks.TaskMonitor;
+
+import com.github.javacliparser.FileOption;
+import com.github.javacliparser.IntOption;
+
+/**
+ * InstanceStream for ARFF file
+ * 
+ * @author Casey
+ */
+public class ArffFileStream extends FileStream {
+
+  public FileOption arffFileOption = new FileOption("arffFile", 'f',
+      "ARFF File(s) to load.", null, null, false);
+
+  public IntOption classIndexOption = new IntOption("classIndex", 'c',
+      "Class index of data. 0 for none or -1 for last attribute in file.",
+      -1, -1, Integer.MAX_VALUE);
+
+  protected InstanceExample lastInstanceRead;
+
+  @Override
+  public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository 
repository) {
+    super.prepareForUseImpl(monitor, repository);
+    String filePath = this.arffFileOption.getFile().getAbsolutePath();
+    this.fileSource.init(filePath, "arff");
+    this.lastInstanceRead = null;
+  }
+
+  @Override
+  protected void reset() {
+    try {
+      if (this.fileReader != null)
+        this.fileReader.close();
+
+      fileSource.reset();
+    } catch (IOException ioe) {
+      throw new RuntimeException("FileStream restart failed.", ioe);
+    }
+
+    if (!getNextFileReader()) {
+      hitEndOfStream = true;
+      throw new RuntimeException("FileStream is empty.");
+    }
+  }
+
+  @Override
+  protected boolean getNextFileReader() {
+    boolean ret = super.getNextFileReader();
+    if (ret) {
+      this.instances = new Instances(this.fileReader, 1, -1);
+      if (this.classIndexOption.getValue() < 0) {
+        this.instances.setClassIndex(this.instances.numAttributes() - 1);
+      } else if (this.classIndexOption.getValue() > 0) {
+        this.instances.setClassIndex(this.classIndexOption.getValue() - 1);
+      }
+    }
+    return ret;
+  }
+
+  @Override
+  protected boolean readNextInstanceFromFile() {
+    try {
+      if (this.instances.readInstance(this.fileReader)) {
+        this.lastInstanceRead = new 
InstanceExample(this.instances.instance(0));
+        this.instances.delete(); // keep instances clean
+        return true;
+      }
+      if (this.fileReader != null) {
+        this.fileReader.close();
+        this.fileReader = null;
+      }
+      return false;
+    } catch (IOException ioe) {
+      throw new RuntimeException(
+          "ArffFileStream failed to read instance from stream.", ioe);
+    }
+
+  }
+
+  @Override
+  protected InstanceExample getLastInstanceRead() {
+    return this.lastInstanceRead;
+  }
+
+  /*
+   * extend org.apache.samoa.moa.MOAObject
+   */
+  @Override
+  public void getDescription(StringBuilder sb, int indent) {
+    // TODO Auto-generated method stub
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/streams/ClusteringEntranceProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/ClusteringEntranceProcessor.java
 
b/samoa-api/src/main/java/org/apache/samoa/streams/ClusteringEntranceProcessor.java
new file mode 100644
index 0000000..db6f698
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/ClusteringEntranceProcessor.java
@@ -0,0 +1,252 @@
+package org.apache.samoa.streams;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.Random;
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.evaluation.ClusteringEvaluationContentEvent;
+import org.apache.samoa.instances.Instance;
+import org.apache.samoa.instances.Instances;
+import org.apache.samoa.learners.clusterers.ClusteringContentEvent;
+import org.apache.samoa.moa.cluster.Clustering;
+import org.apache.samoa.moa.core.DataPoint;
+import org.apache.samoa.moa.options.AbstractOptionHandler;
+import org.apache.samoa.moa.streams.InstanceStream;
+import org.apache.samoa.moa.streams.clustering.ClusteringStream;
+import org.apache.samoa.moa.streams.clustering.RandomRBFGeneratorEvents;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * EntranceProcessor for Clustering Evaluation Task.
+ * 
+ */
+public final class ClusteringEntranceProcessor implements EntranceProcessor {
+
+  private static final long serialVersionUID = 4169053337917578558L;
+
+  private static final Logger logger = 
LoggerFactory.getLogger(ClusteringEntranceProcessor.class);
+
+  private StreamSource streamSource;
+  private Instance firstInstance;
+  private boolean isInited = false;
+  private Random random = new Random();
+  private double samplingThreshold;
+  private int numberInstances;
+  private int numInstanceSent = 0;
+
+  private int groundTruthSamplingFrequency;
+
+  @Override
+  public boolean process(ContentEvent event) {
+    // TODO: consider refactoring the super-interface, since a source
+    // processor does not need this method
+    return false;
+  }
+
+  @Override
+  public void onCreate(int id) {
+    logger.debug("Creating ClusteringSourceProcessor with id {}", id);
+  }
+
+  @Override
+  public Processor newProcessor(Processor p) {
+    ClusteringEntranceProcessor newProcessor = new 
ClusteringEntranceProcessor();
+    ClusteringEntranceProcessor originProcessor = 
(ClusteringEntranceProcessor) p;
+    if (originProcessor.getStreamSource() != null) {
+      
newProcessor.setStreamSource(originProcessor.getStreamSource().getStream());
+    }
+    return newProcessor;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return (!isFinished());
+  }
+
+  @Override
+  public boolean isFinished() {
+    return (!streamSource.hasMoreInstances() || (numberInstances >= 0 && 
numInstanceSent >= numberInstances));
+  }
+
+  // /**
+  // * Method to send instances via input stream
+  // *
+  // * @param inputStream
+  // * @param numberInstances
+  // */
+  // public void sendInstances(Stream inputStream, Stream evaluationStream, int
+  // numberInstances, double samplingThreshold) {
+  // int numInstanceSent = 0;
+  // this.samplingThreshold = samplingThreshold;
+  // while (streamSource.hasMoreInstances() && numInstanceSent <
+  // numberInstances) {
+  // numInstanceSent++;
+  // DataPoint nextDataPoint = new DataPoint(nextInstance(), numInstanceSent);
+  // ClusteringContentEvent contentEvent = new
+  // ClusteringContentEvent(numInstanceSent, nextDataPoint);
+  // inputStream.put(contentEvent);
+  // sendPointsAndGroundTruth(streamSource, evaluationStream, numInstanceSent,
+  // nextDataPoint);
+  // }
+  //
+  // sendEndEvaluationInstance(inputStream);
+  // }
+
+  public double getSamplingThreshold() {
+    return samplingThreshold;
+  }
+
+  public void setSamplingThreshold(double samplingThreshold) {
+    this.samplingThreshold = samplingThreshold;
+  }
+
+  public int getGroundTruthSamplingFrequency() {
+    return groundTruthSamplingFrequency;
+  }
+
+  public void setGroundTruthSamplingFrequency(int 
groundTruthSamplingFrequency) {
+    this.groundTruthSamplingFrequency = groundTruthSamplingFrequency;
+  }
+
+  public StreamSource getStreamSource() {
+    return streamSource;
+  }
+
+  public void setStreamSource(InstanceStream stream) {
+    if (stream instanceof AbstractOptionHandler) {
+      ((AbstractOptionHandler) (stream)).prepareForUse();
+    }
+
+    this.streamSource = new StreamSource(stream);
+    firstInstance = streamSource.nextInstance().getData();
+  }
+
+  public Instances getDataset() {
+    return firstInstance.dataset();
+  }
+
+  private Instance nextInstance() {
+    if (this.isInited) {
+      return streamSource.nextInstance().getData();
+    } else {
+      this.isInited = true;
+      return firstInstance;
+    }
+  }
+
+  // private void sendEndEvaluationInstance(Stream inputStream) {
+  // ClusteringContentEvent contentEvent = new ClusteringContentEvent(-1,
+  // firstInstance);
+  // contentEvent.setLast(true);
+  // inputStream.put(contentEvent);
+  // }
+
+  // private void sendPointsAndGroundTruth(StreamSource sourceStream, Stream
+  // evaluationStream, int numInstanceSent, DataPoint nextDataPoint) {
+  // boolean sendEvent = false;
+  // DataPoint instance = null;
+  // Clustering gtClustering = null;
+  // int samplingFrequency = ((ClusteringStream)
+  // sourceStream.getStream()).getDecayHorizon();
+  // if (random.nextDouble() < samplingThreshold) {
+  // // Add instance
+  // sendEvent = true;
+  // instance = nextDataPoint;
+  // }
+  // if (numInstanceSent > 0 && numInstanceSent % samplingFrequency == 0) {
+  // // Add GroundTruth
+  // sendEvent = true;
+  // gtClustering = ((RandomRBFGeneratorEvents)
+  // sourceStream.getStream()).getGeneratingClusters();
+  // }
+  // if (sendEvent == true) {
+  // ClusteringEvaluationContentEvent evalEvent;
+  // evalEvent = new ClusteringEvaluationContentEvent(gtClustering, instance,
+  // false);
+  // evaluationStream.put(evalEvent);
+  // }
+  // }
+
+  public void setMaxNumInstances(int value) {
+    numberInstances = value;
+  }
+
+  public int getMaxNumInstances() {
+    return this.numberInstances;
+  }
+
+  @Override
+  public ContentEvent nextEvent() {
+
+    // boolean sendEvent = false;
+    // DataPoint instance = null;
+    // Clustering gtClustering = null;
+    // int samplingFrequency = ((ClusteringStream)
+    // sourceStream.getStream()).getDecayHorizon();
+    // if (random.nextDouble() < samplingThreshold) {
+    // // Add instance
+    // sendEvent = true;
+    // instance = nextDataPoint;
+    // }
+    // if (numInstanceSent > 0 && numInstanceSent % samplingFrequency == 0) {
+    // // Add GroundTruth
+    // sendEvent = true;
+    // gtClustering = ((RandomRBFGeneratorEvents)
+    // sourceStream.getStream()).getGeneratingClusters();
+    // }
+    // if (sendEvent == true) {
+    // ClusteringEvaluationContentEvent evalEvent;
+    // evalEvent = new ClusteringEvaluationContentEvent(gtClustering, instance,
+    // false);
+    // evaluationStream.put(evalEvent);
+    // }
+
+    groundTruthSamplingFrequency = ((ClusteringStream) 
streamSource.getStream()).getDecayHorizon(); // FIXME should it be taken from 
the ClusteringEvaluation -f option instead?
+    if (isFinished()) {
+      // send ending event
+      ClusteringContentEvent contentEvent = new ClusteringContentEvent(-1, 
firstInstance);
+      contentEvent.setLast(true);
+      return contentEvent;
+    } else {
+      DataPoint nextDataPoint = new DataPoint(nextInstance(), numInstanceSent);
+      numInstanceSent++;
+      if (numInstanceSent % groundTruthSamplingFrequency == 0) {
+        // TODO implement an interface ClusteringGroundTruth with a
+        // getGeneratingClusters() method, check if the source implements the 
interface
+        // send a clustering evaluation event for external measures (distance 
from the gt clusters)
+        Clustering gtClustering = ((RandomRBFGeneratorEvents) 
streamSource.getStream()).getGeneratingClusters();
+        return new ClusteringEvaluationContentEvent(gtClustering, 
nextDataPoint, false);
+      } else {
+        ClusteringContentEvent contentEvent = new 
ClusteringContentEvent(numInstanceSent, nextDataPoint);
+        if (random.nextDouble() < samplingThreshold) {
+          // send a clustering content event for internal measures (cohesion,
+          // separation)
+          contentEvent.setSample(true);
+        }
+        return contentEvent;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/streams/FileStream.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/FileStream.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/FileStream.java
new file mode 100644
index 0000000..4f07ed2
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/FileStream.java
@@ -0,0 +1,174 @@
+package org.apache.samoa.streams;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+
+import org.apache.samoa.instances.Instances;
+import org.apache.samoa.instances.InstancesHeader;
+import org.apache.samoa.moa.core.InstanceExample;
+import org.apache.samoa.moa.core.ObjectRepository;
+import org.apache.samoa.moa.options.AbstractOptionHandler;
+import org.apache.samoa.moa.streams.InstanceStream;
+import org.apache.samoa.moa.tasks.TaskMonitor;
+import org.apache.samoa.streams.fs.FileStreamSource;
+
+import com.github.javacliparser.ClassOption;
+
+/**
+ * InstanceStream for files (abstract class: subclass this class for different file formats).
+ * <p>
+ * The stream walks through the input streams handed out by a {@link FileStreamSource}, one file after the other.
+ * Subclasses supply the format-specific parsing through {@link #readNextInstanceFromFile()} and
+ * {@link #getLastInstanceRead()}.
+ * 
+ * @author Casey
+ */
+public abstract class FileStream extends AbstractOptionHandler implements InstanceStream {
+
+  private static final long serialVersionUID = 3028905554604259130L;
+
+  public ClassOption sourceTypeOption = new ClassOption("sourceType",
+      's', "Source Type (HDFS, local FS)", FileStreamSource.class,
+      "LocalFileStreamSource");
+
+  /** Provider of the input streams; assigned in {@link #prepareForUseImpl(TaskMonitor, ObjectRepository)}. */
+  protected transient FileStreamSource fileSource;
+  /** Reader over the file that is currently being consumed. */
+  protected transient Reader fileReader;
+  /** Built from the current reader in {@link #reset()}; backs {@link #getHeader()}. */
+  protected Instances instances;
+
+  /** Set once no further file reader can be obtained from the source. */
+  protected boolean hitEndOfStream;
+  /** Whether {@link #reset()} has already been run for the current pass over the files. */
+  private boolean hasStarted;
+
+  /*
+   * Constructors
+   */
+  public FileStream() {
+    this.hitEndOfStream = false;
+  }
+
+  /*
+   * implement InstanceStream
+   */
+  @Override
+  public InstancesHeader getHeader() {
+    return new InstancesHeader(this.instances);
+  }
+
+  /**
+   * @return always -1
+   */
+  @Override
+  public long estimatedRemainingInstances() {
+    return -1;
+  }
+
+  @Override
+  public boolean hasMoreInstances() {
+    return !this.hitEndOfStream;
+  }
+
+  /**
+   * Returns the instance that was read ahead and immediately reads the following one, so that
+   * {@link #hasMoreInstances()} is up to date once this method returns.
+   * 
+   * @return the instance read before this call advanced the stream
+   */
+  @Override
+  public InstanceExample nextInstance() {
+    if (this.getLastInstanceRead() == null) {
+      // nothing has been read ahead yet
+      readNextInstanceFromStream();
+    }
+    InstanceExample prevInstance = this.getLastInstanceRead();
+    readNextInstanceFromStream();
+    return prevInstance;
+  }
+
+  @Override
+  public boolean isRestartable() {
+    return true;
+  }
+
+  @Override
+  public void restart() {
+    reset();
+    hasStarted = false;
+  }
+
+  /**
+   * Closes the current reader, resets the file source, opens the first file again and rebuilds {@link #instances}
+   * from it, using the last attribute as class attribute.
+   * 
+   * @throws RuntimeException
+   *           if closing/resetting fails or the source does not provide any file
+   */
+  protected void reset() {
+    try {
+      if (this.fileReader != null) {
+        this.fileReader.close();
+      }
+
+      fileSource.reset();
+    } catch (IOException ioe) {
+      throw new RuntimeException("FileStream restart failed.", ioe);
+    }
+
+    if (!getNextFileReader()) {
+      hitEndOfStream = true;
+      throw new RuntimeException("FileStream is empty.");
+    }
+    // A previous pass may have run to the end of the last file; without
+    // clearing the flag a restarted stream would keep reporting that it has no
+    // more instances.
+    hitEndOfStream = false;
+
+    this.instances = new Instances(this.fileReader, 1, -1);
+    this.instances.setClassIndex(this.instances.numAttributes() - 1);
+  }
+
+  /**
+   * Closes the current reader (if any) and opens a reader on the next input stream of the file source.
+   * 
+   * @return true if a new reader was opened, false if the source has no further input stream
+   */
+  protected boolean getNextFileReader() {
+    if (this.fileReader != null) {
+      try {
+        this.fileReader.close();
+      } catch (IOException ioe) {
+        // best effort: a failure to close the old reader must not prevent
+        // moving on to the next file
+        ioe.printStackTrace();
+      }
+    }
+
+    InputStream inputStream = this.fileSource.getNextInputStream();
+    if (inputStream == null) {
+      return false;
+    }
+
+    this.fileReader = new BufferedReader(new InputStreamReader(inputStream));
+    return true;
+  }
+
+  /**
+   * Reads the next instance, moving on to the following file whenever the current one is exhausted. The first call
+   * after construction, {@link #restart()} or
+   * {@link #prepareForUseImpl(TaskMonitor, ObjectRepository)} triggers {@link #reset()}.
+   * 
+   * @return true if an instance was read, false if the end of the last file has been reached
+   */
+  protected boolean readNextInstanceFromStream() {
+    if (!hasStarted) {
+      this.reset();
+      hasStarted = true;
+    }
+
+    while (true) {
+      if (readNextInstanceFromFile()) {
+        return true;
+      }
+
+      if (!getNextFileReader()) {
+        this.hitEndOfStream = true;
+        return false;
+      }
+    }
+  }
+
+  /**
+   * Read next instance from the current file and assign it to lastInstanceRead.
+   * 
+   * @return true if it was able to read next instance and false if it was at the end of the file
+   */
+  protected abstract boolean readNextInstanceFromFile();
+
+  /**
+   * @return the instance most recently read by {@link #readNextInstanceFromFile()}
+   */
+  protected abstract InstanceExample getLastInstanceRead();
+
+  @Override
+  public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository repository) {
+    this.fileSource = sourceTypeOption.getValue();
+    this.hasStarted = false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/streams/PrequentialSourceProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/PrequentialSourceProcessor.java
 
b/samoa-api/src/main/java/org/apache/samoa/streams/PrequentialSourceProcessor.java
new file mode 100644
index 0000000..b947b2f
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/PrequentialSourceProcessor.java
@@ -0,0 +1,231 @@
+package org.apache.samoa.streams;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.instances.Instance;
+import org.apache.samoa.instances.Instances;
+import org.apache.samoa.learners.InstanceContentEvent;
+import org.apache.samoa.moa.options.AbstractOptionHandler;
+import org.apache.samoa.moa.streams.InstanceStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Prequential Source Processor is the processor for Prequential Evaluation Task.
+ * <p>
+ * It pulls instances from an {@link InstanceStream} and wraps them in {@link InstanceContentEvent}s. When a source
+ * delay is configured, a timer thread periodically raises the number of events that may be emitted, which throttles
+ * {@link #hasNext()}.
+ * 
+ * @author Arinto Murdopo
+ * 
+ */
+public final class PrequentialSourceProcessor implements EntranceProcessor {
+
+  private static final long serialVersionUID = 4169053337917578558L;
+
+  private static final Logger logger = LoggerFactory.getLogger(PrequentialSourceProcessor.class);
+  private boolean isInited = false;
+  private StreamSource streamSource;
+  private Instance firstInstance;
+  private int numberInstances;
+  private int numInstanceSent = 0;
+
+  protected InstanceStream sourceStream;
+
+  /*
+   * ScheduledExecutorService to schedule sending events after each delay
+   * interval. It is expected to have only one event in the queue at a time, so
+   * we need only one thread in the pool.
+   */
+  private transient ScheduledExecutorService timer;
+  // written by the caller thread in nextEvent(), read by the timer thread
+  private transient volatile ScheduledFuture<?> schedule = null;
+  // written by the timer thread only, read by the caller thread in hasNext()
+  private volatile int readyEventIndex = 1; // No waiting for the first event
+  private int delay = 0;
+  private int batchSize = 1;
+  // written by the caller thread in nextEvent(), read by the timer thread
+  private volatile boolean finished = false;
+
+  @Override
+  public boolean process(ContentEvent event) {
+    // TODO: possible refactor of the super-interface implementation
+    // of source processor does not need this method
+    return false;
+  }
+
+  @Override
+  public boolean isFinished() {
+    return finished;
+  }
+
+  /**
+   * @return true while the processor is not finished and either no delay is configured or the timer has already
+   *         released more events than have been sent
+   */
+  @Override
+  public boolean hasNext() {
+    return !isFinished() && (delay <= 0 || numInstanceSent < readyEventIndex);
+  }
+
+  /**
+   * @return true if the stream has no more instances, or if {@code numberInstances} is non-negative and that many
+   *         instances have already been sent
+   */
+  private boolean hasReachedEndOfStream() {
+    return (!streamSource.hasMoreInstances() || (numberInstances >= 0 && numInstanceSent >= numberInstances));
+  }
+
+  /**
+   * Produces the next event.
+   * 
+   * @return the terminating event (index -1, flagged as last) once the end of the stream is reached; otherwise the
+   *         next instance event if {@link #hasNext()} is true; otherwise {@code null}
+   */
+  @Override
+  public ContentEvent nextEvent() {
+    InstanceContentEvent contentEvent = null;
+    if (hasReachedEndOfStream()) {
+      contentEvent = new InstanceContentEvent(-1, firstInstance, false, true);
+      contentEvent.setLast(true);
+      // set finished status _after_ tagging last event
+      finished = true;
+    }
+    else if (hasNext()) {
+      numInstanceSent++;
+      contentEvent = new InstanceContentEvent(numInstanceSent, nextInstance(), true, true);
+
+      // first call to this method will trigger the timer
+      if (schedule == null && delay > 0) {
+        schedule = timer.scheduleWithFixedDelay(new DelayTimeoutHandler(this), delay, delay,
+            TimeUnit.MICROSECONDS);
+      }
+    }
+    return contentEvent;
+  }
+
+  /**
+   * Called from the timer thread after each delay interval: releases another batch of events and, once the processor
+   * is finished, stops the timer.
+   */
+  private void increaseReadyEventIndex() {
+    // single writer (the timer thread), so the non-atomic read-modify-write
+    // on the volatile field is safe
+    readyEventIndex += batchSize;
+    // if we exceed the max, cancel the timer
+    if (schedule != null && isFinished()) {
+      schedule.cancel(false);
+      // release the worker thread as well; otherwise the pool's non-daemon
+      // thread stays alive after the stream has ended
+      timer.shutdown();
+    }
+  }
+
+  @Override
+  public void onCreate(int id) {
+    initStreamSource(sourceStream);
+    timer = Executors.newScheduledThreadPool(1);
+    logger.debug("Creating PrequentialSourceProcessor with id {}", id);
+  }
+
+  /**
+   * Creates a new processor that reads from the same stream as the given one. Only the stream is carried over.
+   * 
+   * @param p
+   *          the processor to copy from; must be a {@link PrequentialSourceProcessor}
+   * @return the new processor
+   */
+  @Override
+  public Processor newProcessor(Processor p) {
+    PrequentialSourceProcessor newProcessor = new PrequentialSourceProcessor();
+    PrequentialSourceProcessor originProcessor = (PrequentialSourceProcessor) p;
+    if (originProcessor.getStreamSource() != null) {
+      newProcessor.setStreamSource(originProcessor.getStreamSource().getStream());
+    }
+    return newProcessor;
+  }
+
+  public StreamSource getStreamSource() {
+    return streamSource;
+  }
+
+  public void setStreamSource(InstanceStream stream) {
+    this.sourceStream = stream;
+  }
+
+  /**
+   * @return the dataset of the first instance; the stream source is initialised first if that has not happened yet
+   */
+  public Instances getDataset() {
+    if (firstInstance == null) {
+      initStreamSource(sourceStream);
+    }
+    return firstInstance.dataset();
+  }
+
+  /**
+   * @return the instance that was pre-fetched during initialisation on the first call, the next instance of the
+   *         stream on every later call
+   */
+  private Instance nextInstance() {
+    if (this.isInited) {
+      return streamSource.nextInstance().getData();
+    } else {
+      this.isInited = true;
+      return firstInstance;
+    }
+  }
+
+  /**
+   * Prepares the stream for use (when it is an {@link AbstractOptionHandler}), wraps it in a {@link StreamSource} and
+   * pre-fetches the first instance.
+   */
+  private void initStreamSource(InstanceStream stream) {
+    if (stream instanceof AbstractOptionHandler) {
+      ((AbstractOptionHandler) (stream)).prepareForUse();
+    }
+
+    this.streamSource = new StreamSource(stream);
+    firstInstance = streamSource.nextInstance().getData();
+  }
+
+  /**
+   * @param value
+   *          number of instances after which the stream is considered finished; a negative value disables the limit
+   */
+  public void setMaxNumInstances(int value) {
+    numberInstances = value;
+  }
+
+  public int getMaxNumInstances() {
+    return this.numberInstances;
+  }
+
+  /**
+   * @param delay
+   *          interval between two releases of events, in microseconds; values &lt;= 0 disable throttling
+   */
+  public void setSourceDelay(int delay) {
+    this.delay = delay;
+  }
+
+  public int getSourceDelay() {
+    return this.delay;
+  }
+
+  /**
+   * @param batch
+   *          number of events released after each delay interval
+   */
+  public void setDelayBatchSize(int batch) {
+    this.batchSize = batch;
+  }
+
+  /** Timer task that releases the next batch of events on the given processor. */
+  private static final class DelayTimeoutHandler implements Runnable {
+
+    private final PrequentialSourceProcessor processor;
+
+    public DelayTimeoutHandler(PrequentialSourceProcessor processor) {
+      this.processor = processor;
+    }
+
+    @Override
+    public void run() {
+      processor.increaseReadyEventIndex();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/streams/StreamSource.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/StreamSource.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/StreamSource.java
new file mode 100644
index 0000000..f4dba63
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/StreamSource.java
@@ -0,0 +1,92 @@
+package org.apache.samoa.streams;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * License
+ */
+
+import org.apache.samoa.instances.Instance;
+import org.apache.samoa.moa.core.Example;
+import org.apache.samoa.moa.streams.InstanceStream;
+
+/**
+ * Serializable holder that delegates to an {@link InstanceStream}.
+ */
+public class StreamSource implements java.io.Serializable {
+
+  private static final long serialVersionUID = 3974668694861231236L;
+
+  /** The stream every call is forwarded to. */
+  protected InstanceStream stream;
+
+  /**
+   * Creates a source backed by the given stream.
+   * 
+   * @param stream
+   *          the stream to delegate to
+   */
+  public StreamSource(InstanceStream stream) {
+    this.stream = stream;
+  }
+
+  /**
+   * Returns the backing stream.
+   * 
+   * @return the stream this source delegates to
+   */
+  public InstanceStream getStream() {
+    return this.stream;
+  }
+
+  /**
+   * Replaces the backing stream.
+   * 
+   * @param stream
+   *          the stream to delegate to from now on
+   */
+  public void setStream(InstanceStream stream) {
+    this.stream = stream;
+  }
+
+  /**
+   * Asks the backing stream whether it can deliver further instances.
+   * 
+   * @return the result of {@link InstanceStream#hasMoreInstances()} on the backing stream
+   */
+  public boolean hasMoreInstances() {
+    return stream.hasMoreInstances();
+  }
+
+  /**
+   * Fetches the next example from the backing stream.
+   * 
+   * @return the result of {@link InstanceStream#nextInstance()} on the backing stream
+   */
+  public Example<Instance> nextInstance() {
+    return this.stream.nextInstance();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/streams/StreamSourceProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/StreamSourceProcessor.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/StreamSourceProcessor.java
new file mode 100644
index 0000000..d34f701
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/StreamSourceProcessor.java
@@ -0,0 +1,194 @@
+package org.apache.samoa.streams;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * License
+ */
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.instances.Instance;
+import org.apache.samoa.instances.Instances;
+import org.apache.samoa.learners.InstanceContentEvent;
+import org.apache.samoa.moa.streams.InstanceStream;
+import org.apache.samoa.topology.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Class StreamSourceProcessor.
+ * <p>
+ * Reads instances from a {@link StreamSource} and puts them, wrapped in {@link InstanceContentEvent}s, on a
+ * {@link Stream}.
+ */
+public class StreamSourceProcessor implements Processor {
+
+  /** The Constant logger. */
+  private static final Logger logger = LoggerFactory
+      .getLogger(StreamSourceProcessor.class);
+
+  private static final long serialVersionUID = -204182279475432739L;
+
+  /** The stream source. */
+  private StreamSource streamSource;
+
+  /** The number instances sent. */
+  private long numberInstancesSent = 0;
+
+  /** Whether {@link #firstInstance} has already been handed out by {@link #nextInstance()}. */
+  protected boolean isInited = false;
+
+  /** The first instance, pre-fetched in {@link #setStreamSource(InstanceStream)}. */
+  protected Instance firstInstance;
+
+  /**
+   * Gets the stream source.
+   * 
+   * @return the stream source
+   */
+  public StreamSource getStreamSource() {
+    return streamSource;
+  }
+
+  /**
+   * Sets the stream source and pre-fetches its first instance.
+   * 
+   * @param stream
+   *          the new stream source
+   */
+  public void setStreamSource(InstanceStream stream) {
+    this.streamSource = new StreamSource(stream);
+    firstInstance = streamSource.nextInstance().getData();
+    // the freshly fetched first instance has not been handed out yet; without
+    // this reset a second call to this method would make nextInstance() skip it
+    isInited = false;
+  }
+
+  /**
+   * Sends up to {@code numberInstances} instances to the given stream, followed by one event that carries no
+   * instance and is flagged as last.
+   * 
+   * @param inputStream
+   *          the stream the events are put on
+   * @param numberInstances
+   *          the maximum number of instances to send in this call
+   * @param isTraining
+   *          training flag passed to every event
+   * @param isTesting
+   *          testing flag passed to every event
+   */
+  public void sendInstances(Stream inputStream,
+      int numberInstances, boolean isTraining, boolean isTesting) {
+    int numberSamples = 0;
+
+    while (streamSource.hasMoreInstances()
+        && numberSamples < numberInstances) {
+
+      numberSamples++;
+      numberInstancesSent++;
+      InstanceContentEvent instanceContentEvent = new InstanceContentEvent(
+          numberInstancesSent, nextInstance(), isTraining, isTesting);
+
+      inputStream.put(instanceContentEvent);
+    }
+
+    InstanceContentEvent instanceContentEvent = new InstanceContentEvent(
+        numberInstancesSent, null, isTraining, isTesting);
+    instanceContentEvent.setLast(true);
+    inputStream.put(instanceContentEvent);
+  }
+
+  /**
+   * Send end evaluation instance: an event with index -1 that wraps the first instance.
+   * 
+   * @param inputStream
+   *          the stream the event is put on
+   */
+  public void sendEndEvaluationInstance(Stream inputStream) {
+    InstanceContentEvent instanceContentEvent = new InstanceContentEvent(-1, firstInstance, false, true);
+    inputStream.put(instanceContentEvent);
+  }
+
+  /**
+   * Next instance.
+   * 
+   * @return the pre-fetched first instance on the first call, the next instance of the stream source afterwards
+   */
+  protected Instance nextInstance() {
+    if (this.isInited) {
+      return streamSource.nextInstance().getData();
+    } else {
+      this.isInited = true;
+      return firstInstance;
+    }
+  }
+
+  // @Override
+  /**
+   * On remove.
+   */
+  protected void onRemove() {
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see samoa.core.Processor#onCreate(int)
+   */
+  @Override
+  public void onCreate(int id) {
+    // TODO Auto-generated method stub
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see samoa.core.Processor#newProcessor(samoa.core.Processor)
+   */
+  @Override
+  public Processor newProcessor(Processor sourceProcessor) {
+    // StreamSourceProcessor newProcessor = new StreamSourceProcessor();
+    // StreamSourceProcessor originProcessor = (StreamSourceProcessor)
+    // sourceProcessor;
+    // if (originProcessor.getStreamSource() != null){
+    // newProcessor.setStreamSource(originProcessor.getStreamSource().getStream());
+    // }
+    // return newProcessor;
+    return null;
+  }
+
+  /**
+   * On event.
+   * 
+   * @param event
+   *          the event
+   * @return always false
+   */
+  @Override
+  public boolean process(ContentEvent event) {
+    return false;
+  }
+
+  /**
+   * Gets the dataset.
+   * 
+   * @return the dataset of the first instance
+   */
+  public Instances getDataset() {
+    return firstInstance.dataset();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/streams/fs/FileStreamSource.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/fs/FileStreamSource.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/fs/FileStreamSource.java
new file mode 100644
index 0000000..32552a5
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/fs/FileStreamSource.java
@@ -0,0 +1,64 @@
+package org.apache.samoa.streams.fs;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.InputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Contract for the sources a FileStream can read from (local FS, HDFS, ...).
+ * 
+ * @author Casey
+ */
+public interface FileStreamSource extends Serializable {
+
+  /**
+   * Initialises the source from a file or directory path and an optional file extension.
+   * 
+   * @param path
+   *          path of a single file or of a directory
+   * @param ext
+   *          extension used to filter the files of a directory; when null, every file in the directory is accepted
+   */
+  void init(String path, String ext);
+
+  /**
+   * Resets the source.
+   * 
+   * @throws IOException
+   *           if the source cannot be reset
+   */
+  void reset() throws IOException;
+
+  /**
+   * Moves on to the next file and returns an InputStream for it.
+   * 
+   * @return the InputStream for the next file in the list, or null if we are already at the last file
+   */
+  InputStream getNextInputStream();
+
+  /**
+   * Returns the InputStream of the current file. The "current pointer" only moves forward through
+   * {@link #getNextInputStream()}, so this method returns null as long as that method has not been invoked.
+   * 
+   * @return the InputStream for the current file in the list
+   */
+  InputStream getCurrentInputStream();
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/streams/fs/HDFSFileStreamSource.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/fs/HDFSFileStreamSource.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/fs/HDFSFileStreamSource.java
new file mode 100644
index 0000000..00abd1a
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/fs/HDFSFileStreamSource.java
@@ -0,0 +1,150 @@
+package org.apache.samoa.streams.fs;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.FileSystems;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+
+/**
+ * Source for FileStream for HDFS files
+ * 
+ * @author Casey
+ */
+public class HDFSFileStreamSource implements FileStreamSource {
+
+  private static final long serialVersionUID = -3887354805787167400L;
+
+  /** Stream of the file at {@link #currentIndex}; not serialized. */
+  private transient InputStream fileStream;
+  /** Hadoop configuration used to obtain the file system; not serialized. */
+  private transient Configuration config;
+  /** Paths of the files to read, in reading order. */
+  private List<String> filePaths;
+  /** Index into {@link #filePaths} of the current file, -1 before the first file has been opened. */
+  private int currentIndex;
+
+  public HDFSFileStreamSource() {
+    this.currentIndex = -1;
+  }
+
+  /**
+   * Initialises the source using the default configuration (see {@link #getDefaultConfig()}).
+   * 
+   * @param path
+   *          file or directory path
+   * @param ext
+   *          file extension used to filter the files of a directory, or null to accept all files
+   */
+  public void init(String path, String ext) {
+    this.init(this.getDefaultConfig(), path, ext);
+  }
+
+  /**
+   * Initialises the source: collects the files below {@code path} when it is a directory (filtered by
+   * {@code "*." + ext} when an extension is given), or {@code path} itself otherwise.
+   * 
+   * @param config
+   *          Hadoop configuration used to obtain the file system
+   * @param path
+   *          file or directory path
+   * @param ext
+   *          file extension used to filter the files of a directory, or null to accept all files
+   * @throws RuntimeException
+   *           if the file status or directory listing cannot be obtained
+   */
+  public void init(Configuration config, String path, String ext) {
+    this.config = config;
+    this.filePaths = new ArrayList<String>();
+    Path hdfsPath = new Path(path);
+    try {
+      FileSystem fs = FileSystem.get(config);
+      FileStatus fileStat = fs.getFileStatus(hdfsPath);
+      if (fileStat.isDirectory()) {
+        String pattern = (ext != null) ? "*." + ext : "*";
+        FileStatus[] filesInDir = fs.globStatus(new Path(path, pattern));
+        for (FileStatus status : filesInDir) {
+          if (status.isFile()) {
+            filePaths.add(status.getPath().toString());
+          }
+        }
+      }
+      else {
+        this.filePaths.add(path);
+      }
+    } catch (IOException ioe) {
+      throw new RuntimeException("Failed getting list of files at:" + path, ioe);
+    }
+
+    this.currentIndex = -1;
+  }
+
+  /**
+   * Builds a configuration that, when the HADOOP_HOME environment variable is set, additionally loads
+   * {@code etc/hadoop/core-site.xml} and {@code etc/hadoop/hdfs-site.xml} from below that directory.
+   */
+  private Configuration getDefaultConfig() {
+    String hadoopHome = System.getenv("HADOOP_HOME");
+    Configuration conf = new Configuration();
+    if (hadoopHome != null) {
+      java.nio.file.Path coreSitePath = FileSystems.getDefault().getPath(hadoopHome, "etc/hadoop/core-site.xml");
+      java.nio.file.Path hdfsSitePath = FileSystems.getDefault().getPath(hadoopHome, "etc/hadoop/hdfs-site.xml");
+      conf.addResource(new Path(coreSitePath.toAbsolutePath().toString()));
+      conf.addResource(new Path(hdfsSitePath.toAbsolutePath().toString()));
+    }
+    return conf;
+  }
+
+  public void reset() throws IOException {
+    this.currentIndex = -1;
+    this.closeFileStream();
+  }
+
+  private void closeFileStream() {
+    IOUtils.closeStream(fileStream);
+  }
+
+  /**
+   * Closes the current stream and opens the next file of the list.
+   * 
+   * @return the stream of the next file, or null if the current file is the last one
+   * @throws RuntimeException
+   *           if the next file cannot be opened
+   */
+  public InputStream getNextInputStream() {
+    this.closeFileStream();
+    if (this.currentIndex >= (this.filePaths.size() - 1))
+      return null;
+
+    this.currentIndex++;
+    String filePath = this.filePaths.get(currentIndex);
+
+    // config is transient: on an instance restored through Java serialization
+    // it is null, so fall back to the default configuration
+    if (this.config == null) {
+      this.config = this.getDefaultConfig();
+    }
+
+    Path hdfsPath = new Path(filePath);
+    try {
+      FileSystem fs = FileSystem.get(config);
+      fileStream = fs.open(hdfsPath);
+    } catch (IOException ioe) {
+      this.closeFileStream();
+      throw new RuntimeException("Failed opening file:" + filePath, ioe);
+    }
+
+    return fileStream;
+  }
+
+  public InputStream getCurrentInputStream() {
+    return fileStream;
+  }
+
+  /**
+   * @return the number of collected file paths, 0 if the source has not been initialised
+   */
+  protected int getFilePathListSize() {
+    if (filePaths != null)
+      return filePaths.size();
+    return 0;
+  }
+
+  /**
+   * @param index
+   *          position in the list of file paths
+   * @return the path at that position, or null if the source has not been initialised or the index is out of range
+   */
+  protected String getFilePathAt(int index) {
+    if (filePaths != null && index >= 0 && index < filePaths.size())
+      return filePaths.get(index);
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/streams/fs/LocalFileStreamSource.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/fs/LocalFileStreamSource.java
 
b/samoa-api/src/main/java/org/apache/samoa/streams/fs/LocalFileStreamSource.java
new file mode 100644
index 0000000..129d247
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/fs/LocalFileStreamSource.java
@@ -0,0 +1,133 @@
+package org.apache.samoa.streams.fs;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.FileSystems;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Source for FileStream for local files
+ * 
+ * @author Casey
+ */
+public class LocalFileStreamSource implements FileStreamSource {
+
+  private static final long serialVersionUID = 3986511547525870698L;
+
+  /** Stream of the file at {@link #currentIndex}; not serialized. */
+  private transient InputStream fileStream;
+  /** Paths of the files to read, in reading order. */
+  private List<String> filePaths;
+  /** Index into {@link #filePaths} of the current file, -1 before the first file has been opened. */
+  private int currentIndex;
+
+  public LocalFileStreamSource() {
+    this.currentIndex = -1;
+  }
+
+  /**
+   * Initialises the source: collects the regular files directly inside {@code path} when it is a directory
+   * (filtered by extension, ignoring case, and sorted by path), or {@code path} itself otherwise.
+   * 
+   * @param path
+   *          file or directory path
+   * @param ext
+   *          file extension used to filter the files of a directory, or null to accept all files
+   * @throws RuntimeException
+   *           if the directory cannot be listed
+   */
+  public void init(String path, String ext) {
+    this.filePaths = new ArrayList<String>();
+    File fileAtPath = new File(path);
+    if (fileAtPath.isDirectory()) {
+      File[] filesInDir = fileAtPath.listFiles(new FileExtensionFilter(ext));
+      if (filesInDir == null) {
+        // listFiles signals an I/O error with null instead of an exception
+        throw new RuntimeException("Failed getting list of files at:" + path);
+      }
+      for (File file : filesInDir) {
+        filePaths.add(file.getAbsolutePath());
+      }
+      // listFiles gives no ordering guarantee; sort so that the reading order
+      // is the same on every run and platform
+      java.util.Collections.sort(this.filePaths);
+    }
+    else {
+      this.filePaths.add(path);
+    }
+    this.currentIndex = -1;
+  }
+
+  public void reset() throws IOException {
+    this.currentIndex = -1;
+    this.closeFileStream();
+  }
+
+  private void closeFileStream() {
+    if (fileStream != null) {
+      try {
+        fileStream.close();
+      } catch (IOException ioe) {
+        // best effort: a failure to close must not abort the iteration
+        ioe.printStackTrace();
+      }
+    }
+  }
+
+  /**
+   * Closes the current stream and opens the next file of the list.
+   * 
+   * @return the stream of the next file, or null if the current file is the last one
+   * @throws RuntimeException
+   *           if the next file cannot be opened
+   */
+  public InputStream getNextInputStream() {
+    this.closeFileStream();
+
+    if (this.currentIndex >= (this.filePaths.size() - 1))
+      return null;
+
+    this.currentIndex++;
+    String filePath = this.filePaths.get(currentIndex);
+
+    File file = new File(filePath);
+    try {
+      fileStream = new FileInputStream(file);
+    } catch (IOException ioe) {
+      this.closeFileStream();
+      throw new RuntimeException("Failed opening file:" + filePath, ioe);
+    }
+
+    return fileStream;
+  }
+
+  public InputStream getCurrentInputStream() {
+    return fileStream;
+  }
+
+  /**
+   * @return the number of collected file paths, 0 if the source has not been initialised
+   */
+  protected int getFilePathListSize() {
+    if (filePaths != null)
+      return filePaths.size();
+    return 0;
+  }
+
+  /**
+   * @param index
+   *          position in the list of file paths
+   * @return the path at that position, or null if the source has not been initialised or the index is out of range
+   */
+  protected String getFilePathAt(int index) {
+    if (filePaths != null && index >= 0 && index < filePaths.size())
+      return filePaths.get(index);
+    return null;
+  }
+
+  /** Accepts regular files whose name ends, ignoring case, with "." followed by the given extension. */
+  private static final class FileExtensionFilter implements FilenameFilter {
+    /** "." + extension, or null when every regular file is accepted. */
+    private final String suffix;
+
+    FileExtensionFilter(String ext) {
+      suffix = (ext == null) ? null : "." + ext;
+    }
+
+    @Override
+    public boolean accept(File dir, String name) {
+      File f = new File(dir, name);
+      if (!f.isFile())
+        return false;
+      if (suffix == null)
+        return true;
+      // case-insensitive endsWith; regionMatches returns false when the name
+      // is shorter than the suffix (negative offset)
+      int offset = name.length() - suffix.length();
+      return name.regionMatches(true, offset, suffix, 0, suffix.length());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/tasks/ClusteringEvaluation.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/tasks/ClusteringEvaluation.java 
b/samoa-api/src/main/java/org/apache/samoa/tasks/ClusteringEvaluation.java
new file mode 100644
index 0000000..a5c0ef1
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/tasks/ClusteringEvaluation.java
@@ -0,0 +1,186 @@
+package org.apache.samoa.tasks;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.apache.samoa.evaluation.ClusteringEvaluatorProcessor;
+import org.apache.samoa.learners.Learner;
+import 
org.apache.samoa.learners.clusterers.simple.ClusteringDistributorProcessor;
+import org.apache.samoa.learners.clusterers.simple.DistributedClusterer;
+import org.apache.samoa.moa.streams.InstanceStream;
+import org.apache.samoa.moa.streams.clustering.ClusteringStream;
+import org.apache.samoa.moa.streams.clustering.RandomRBFGeneratorEvents;
+import org.apache.samoa.streams.ClusteringEntranceProcessor;
+import org.apache.samoa.topology.ComponentFactory;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.topology.Topology;
+import org.apache.samoa.topology.TopologyBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.javacliparser.ClassOption;
+import com.github.javacliparser.Configurable;
+import com.github.javacliparser.FileOption;
+import com.github.javacliparser.FloatOption;
+import com.github.javacliparser.IntOption;
+import com.github.javacliparser.StringOption;
+
+/**
+ * A task that runs and evaluates a distributed clustering algorithm.
+ * <p>
+ * {@link #init()} wires the topology: an entrance processor that reads the training stream, a distributor with one
+ * output stream for the learner and one evaluation stream, the learner itself, and a
+ * {@link ClusteringEvaluatorProcessor} that is connected to the learner's result streams and to the evaluation stream.
+ */
+public class ClusteringEvaluation implements Task, Configurable {
+
+  private static final long serialVersionUID = -8246537378371580550L;
+
+  private static final int DISTRIBUTOR_PARALLELISM = 1;
+
+  private static final Logger logger = LoggerFactory.getLogger(ClusteringEvaluation.class);
+
+  public ClassOption learnerOption = new ClassOption("learner", 'l', "Clustering to run.", Learner.class,
+      DistributedClusterer.class.getName());
+
+  public ClassOption streamTrainOption = new ClassOption("streamTrain", 's', "Input stream.", InstanceStream.class,
+      RandomRBFGeneratorEvents.class.getName());
+
+  public IntOption instanceLimitOption = new IntOption("instanceLimit", 'i',
+      "Maximum number of instances to test/train on  (-1 = no limit).", 100000, -1,
+      Integer.MAX_VALUE);
+
+  public IntOption measureCollectionTypeOption = new IntOption("measureCollectionType", 'm',
+      "Type of measure collection", 0, 0, Integer.MAX_VALUE);
+
+  public IntOption timeLimitOption = new IntOption("timeLimit", 't',
+      "Maximum number of seconds to test/train for (-1 = no limit).", -1, -1,
+      Integer.MAX_VALUE);
+
+  public IntOption sampleFrequencyOption = new IntOption("sampleFrequency", 'f',
+      "How many instances between samples of the learning performance.", 1000, 0,
+      Integer.MAX_VALUE);
+
+  public StringOption evaluationNameOption = new StringOption("evaluationName", 'n', "Identifier of the evaluation",
+      "Clustering__"
+          + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()));
+
+  public FileOption dumpFileOption = new FileOption("dumpFile", 'd', "File to append intermediate csv results to",
+      null, "csv", true);
+
+  public FloatOption samplingThresholdOption = new FloatOption("samplingThreshold", 'a',
+      "Ratio of instances sampled that will be used for evaluation.", 0.5,
+      0.0, 1.0);
+
+  private ClusteringEntranceProcessor source;
+  private InstanceStream streamTrain;
+  private ClusteringDistributorProcessor distributor;
+  private Stream distributorStream;
+  private Stream evaluationStream;
+
+  // Default=0: no delay/waiting
+  public IntOption sourceDelayOption = new IntOption("sourceDelay", 'w',
+      "How many milliseconds between injections of two instances.", 0, 0, Integer.MAX_VALUE);
+
+  private Learner learner;
+  private ClusteringEvaluatorProcessor evaluator;
+
+  private Topology topology;
+  private TopologyBuilder builder;
+
+  /**
+   * Appends a short, human-readable description of this task.
+   *
+   * @param sb
+   *          buffer the description is appended to
+   */
+  public void getDescription(StringBuilder sb) {
+    sb.append("Clustering evaluation");
+  }
+
+  /**
+   * Builds the clustering evaluation topology from the configured options. If no builder was supplied through
+   * {@link #setFactory(ComponentFactory)}, a default {@link TopologyBuilder} is created first.
+   *
+   * @throws IllegalArgumentException
+   *           if the configured training stream is not a {@link ClusteringStream}
+   */
+  @Override
+  public void init() {
+    // TODO remove the if statement theoretically, dynamic binding will work
+    // here! for now, the if statement is used by Storm
+
+    if (builder == null) {
+      logger.warn("Builder was not initialized, initializing it from the Task");
+
+      builder = new TopologyBuilder();
+      logger.debug("Successfully instantiating TopologyBuilder");
+
+      builder.initTopology(evaluationNameOption.getValue(), sourceDelayOption.getValue());
+      logger.debug("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue());
+    }
+
+    // instantiate ClusteringEntranceProcessor and its output stream
+    // (sourceStream)
+    source = new ClusteringEntranceProcessor();
+    streamTrain = this.streamTrainOption.getValue();
+    // The evaluator built further down needs the stream's decay horizon, which is read through ClusteringStream.
+    // The option accepts any InstanceStream, so fail fast with a readable message instead of a ClassCastException
+    // after most of the topology has already been wired.
+    if (!(streamTrain instanceof ClusteringStream)) {
+      throw new IllegalArgumentException("streamTrain must be a " + ClusteringStream.class.getName() + " but was "
+          + (streamTrain == null ? "null" : streamTrain.getClass().getName()));
+    }
+    source.setStreamSource(streamTrain);
+    builder.addEntranceProcessor(source);
+    source.setSamplingThreshold(samplingThresholdOption.getValue());
+    source.setMaxNumInstances(instanceLimitOption.getValue());
+    logger.debug("Successfully instantiated ClusteringEntranceProcessor");
+
+    Stream sourceStream = builder.createStream(source);
+    // starter.setInputStream(sourcePiOutputStream); // FIXME set stream in the
+    // EntrancePI
+
+    // distribution of instances and sampling for evaluation
+    distributor = new ClusteringDistributorProcessor();
+    builder.addProcessor(distributor, DISTRIBUTOR_PARALLELISM);
+    builder.connectInputShuffleStream(sourceStream, distributor);
+    distributorStream = builder.createStream(distributor);
+    distributor.setOutputStream(distributorStream);
+    evaluationStream = builder.createStream(distributor);
+    distributor.setEvaluationStream(evaluationStream); // passes evaluation events along
+    logger.debug("Successfully instantiated Distributor");
+
+    // instantiate learner and connect it to distributorStream
+    learner = this.learnerOption.getValue();
+    learner.init(builder, source.getDataset(), 1);
+    builder.connectInputShuffleStream(distributorStream, learner.getInputProcessor());
+    logger.debug("Successfully instantiated Learner");
+
+    evaluator = new ClusteringEvaluatorProcessor.Builder(
+        sampleFrequencyOption.getValue()).dumpFile(dumpFileOption.getFile())
+        .decayHorizon(((ClusteringStream) streamTrain).getDecayHorizon()).build();
+
+    // the evaluator consumes every learner result stream plus the distributor's evaluation stream
+    builder.addProcessor(evaluator);
+    for (Stream evaluatorPiInputStream : learner.getResultStreams()) {
+      builder.connectInputShuffleStream(evaluatorPiInputStream, evaluator);
+    }
+    builder.connectInputAllStream(evaluationStream, evaluator);
+    logger.debug("Successfully instantiated EvaluatorProcessor");
+
+    topology = builder.build();
+    logger.debug("Successfully built the topology");
+  }
+
+  /**
+   * Creates the {@link TopologyBuilder} from the given factory and initializes the topology with the evaluation name.
+   *
+   * @param factory
+   *          component factory handed to the new builder
+   */
+  @Override
+  public void setFactory(ComponentFactory factory) {
+    // TODO unify this code with init() for now, it's used by S4 App
+    // dynamic binding theoretically will solve this problem
+    builder = new TopologyBuilder(factory);
+    logger.debug("Successfully instantiated TopologyBuilder");
+
+    // NOTE(review): init() passes sourceDelayOption to initTopology when it creates the builder itself, this path
+    // does not - confirm whether that difference is intentional.
+    builder.initTopology(evaluationNameOption.getValue());
+    logger.debug("Successfully initialized SAMOA topology with name {}", evaluationNameOption.getValue());
+
+  }
+
+  /**
+   * @return the topology built by {@link #init()}, or {@code null} if {@code init()} has not completed
+   */
+  public Topology getTopology() {
+    return topology;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/tasks/PrequentialEvaluation.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/tasks/PrequentialEvaluation.java 
b/samoa-api/src/main/java/org/apache/samoa/tasks/PrequentialEvaluation.java
new file mode 100644
index 0000000..575adad
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/tasks/PrequentialEvaluation.java
@@ -0,0 +1,220 @@
+package org.apache.samoa.tasks;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.apache.samoa.evaluation.BasicClassificationPerformanceEvaluator;
+import org.apache.samoa.evaluation.BasicRegressionPerformanceEvaluator;
+import org.apache.samoa.evaluation.ClassificationPerformanceEvaluator;
+import org.apache.samoa.evaluation.EvaluatorProcessor;
+import org.apache.samoa.evaluation.PerformanceEvaluator;
+import org.apache.samoa.evaluation.RegressionPerformanceEvaluator;
+import org.apache.samoa.learners.ClassificationLearner;
+import org.apache.samoa.learners.Learner;
+import org.apache.samoa.learners.RegressionLearner;
+import org.apache.samoa.learners.classifiers.trees.VerticalHoeffdingTree;
+import org.apache.samoa.moa.streams.InstanceStream;
+import org.apache.samoa.moa.streams.generators.RandomTreeGenerator;
+import org.apache.samoa.streams.PrequentialSourceProcessor;
+import org.apache.samoa.topology.ComponentFactory;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.topology.Topology;
+import org.apache.samoa.topology.TopologyBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.javacliparser.ClassOption;
+import com.github.javacliparser.Configurable;
+import com.github.javacliparser.FileOption;
+import com.github.javacliparser.IntOption;
+import com.github.javacliparser.StringOption;
+
+/**
+ * Prequential Evaluation task is a scheme in evaluating performance of online classifiers which uses each instance for
+ * testing online classifiers model and then it further uses the same instance for training the model
+ * (Test-then-train).
+ * <p>
+ * {@link #init()} wires the topology: a {@link PrequentialSourceProcessor} reading the training stream, the configured
+ * learner, and an {@link EvaluatorProcessor} connected to every result stream of the learner.
+ * 
+ * @author Arinto Murdopo
+ * 
+ */
+public class PrequentialEvaluation implements Task, Configurable {
+
+  private static final long serialVersionUID = -8246537378371580550L;
+
+  private static final Logger logger = LoggerFactory.getLogger(PrequentialEvaluation.class);
+
+  public ClassOption learnerOption = new ClassOption("learner", 'l', "Classifier to train.", Learner.class,
+      VerticalHoeffdingTree.class.getName());
+
+  public ClassOption streamTrainOption = new ClassOption("trainStream", 's', "Stream to learn from.",
+      InstanceStream.class,
+      RandomTreeGenerator.class.getName());
+
+  public ClassOption evaluatorOption = new ClassOption("evaluator", 'e',
+      "Classification performance evaluation method.",
+      PerformanceEvaluator.class, BasicClassificationPerformanceEvaluator.class.getName());
+
+  public IntOption instanceLimitOption = new IntOption("instanceLimit", 'i',
+      "Maximum number of instances to test/train on  (-1 = no limit).", 1000000, -1,
+      Integer.MAX_VALUE);
+
+  public IntOption timeLimitOption = new IntOption("timeLimit", 't',
+      "Maximum number of seconds to test/train for (-1 = no limit).", -1, -1,
+      Integer.MAX_VALUE);
+
+  public IntOption sampleFrequencyOption = new IntOption("sampleFrequency", 'f',
+      "How many instances between samples of the learning performance.", 100000,
+      0, Integer.MAX_VALUE);
+
+  public StringOption evaluationNameOption = new StringOption("evaluationName", 'n', "Identifier of the evaluation",
+      "Prequential_"
+          + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()));
+
+  public FileOption dumpFileOption = new FileOption("dumpFile", 'd', "File to append intermediate csv results to",
+      null, "csv", true);
+
+  // Default=0: no delay/waiting
+  public IntOption sourceDelayOption = new IntOption("sourceDelay", 'w',
+      "How many microseconds between injections of two instances.", 0, 0, Integer.MAX_VALUE);
+  // Batch size to delay the incoming stream: delay of x milliseconds after each
+  // batch
+  public IntOption batchDelayOption = new IntOption("delayBatchSize", 'b',
+      "The delay batch size: delay of x milliseconds after each batch ", 1, 1, Integer.MAX_VALUE);
+
+  private PrequentialSourceProcessor preqSource;
+
+  private Stream sourcePiOutputStream;
+
+  private Learner classifier;
+
+  private EvaluatorProcessor evaluator;
+
+  private Topology prequentialTopology;
+
+  private TopologyBuilder builder;
+
+  /**
+   * Appends a short, human-readable description of this task.
+   *
+   * @param sb
+   *          buffer the description is appended to
+   * @param indent
+   *          not used by this implementation
+   */
+  public void getDescription(StringBuilder sb, int indent) {
+    sb.append("Prequential evaluation");
+  }
+
+  /**
+   * Builds the prequential evaluation topology from the configured options. If no builder was supplied through
+   * {@link #setFactory(ComponentFactory)}, a default {@link TopologyBuilder} is created first. When the configured
+   * evaluator does not match the kind of learner (regression vs. classification), the learner's default evaluator is
+   * used instead and a warning is logged.
+   */
+  @Override
+  public void init() {
+    // TODO remove the if statement
+    // theoretically, dynamic binding will work here!
+    // test later!
+    // for now, the if statement is used by Storm
+
+    if (builder == null) {
+      builder = new TopologyBuilder();
+      logger.debug("Successfully instantiating TopologyBuilder");
+
+      builder.initTopology(evaluationNameOption.getValue());
+      logger.debug("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue());
+    }
+
+    // instantiate PrequentialSourceProcessor and its output stream
+    // (sourcePiOutputStream)
+    preqSource = new PrequentialSourceProcessor();
+    preqSource.setStreamSource((InstanceStream) this.streamTrainOption.getValue());
+    preqSource.setMaxNumInstances(instanceLimitOption.getValue());
+    preqSource.setSourceDelay(sourceDelayOption.getValue());
+    preqSource.setDelayBatchSize(batchDelayOption.getValue());
+    builder.addEntranceProcessor(preqSource);
+    logger.debug("Successfully instantiating PrequentialSourceProcessor");
+
+    sourcePiOutputStream = builder.createStream(preqSource);
+
+    // instantiate classifier and connect it to sourcePiOutputStream
+    classifier = this.learnerOption.getValue();
+    classifier.init(builder, preqSource.getDataset(), 1);
+    builder.connectInputShuffleStream(sourcePiOutputStream, classifier.getInputProcessor());
+    logger.debug("Successfully instantiating Classifier");
+
+    PerformanceEvaluator evaluatorOptionValue = this.evaluatorOption.getValue();
+    if (!PrequentialEvaluation.isLearnerAndEvaluatorCompatible(classifier, evaluatorOptionValue)) {
+      // Do not replace the user's choice silently: say which evaluator was rejected and for which learner.
+      logger.warn("Evaluator {} is not compatible with learner {}; falling back to the learner's default evaluator",
+          classNameOf(evaluatorOptionValue), classNameOf(classifier));
+      evaluatorOptionValue = getDefaultPerformanceEvaluatorForLearner(classifier);
+    }
+    evaluator = new EvaluatorProcessor.Builder(evaluatorOptionValue)
+        .samplingFrequency(sampleFrequencyOption.getValue()).dumpFile(dumpFileOption.getFile()).build();
+
+    // the evaluator consumes every result stream of the learner
+    builder.addProcessor(evaluator);
+    for (Stream evaluatorPiInputStream : classifier.getResultStreams()) {
+      builder.connectInputShuffleStream(evaluatorPiInputStream, evaluator);
+    }
+
+    logger.debug("Successfully instantiating EvaluatorProcessor");
+
+    prequentialTopology = builder.build();
+    logger.debug("Successfully building the topology");
+  }
+
+  /**
+   * Creates the {@link TopologyBuilder} from the given factory and initializes the topology with the evaluation name.
+   *
+   * @param factory
+   *          component factory handed to the new builder
+   */
+  @Override
+  public void setFactory(ComponentFactory factory) {
+    // TODO unify this code with init()
+    // for now, it's used by S4 App
+    // dynamic binding theoretically will solve this problem
+    builder = new TopologyBuilder(factory);
+    logger.debug("Successfully instantiating TopologyBuilder");
+
+    builder.initTopology(evaluationNameOption.getValue());
+    logger.debug("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue());
+
+  }
+
+  /**
+   * @return the topology built by {@link #init()}, or {@code null} if {@code init()} has not completed
+   */
+  public Topology getTopology() {
+    return prequentialTopology;
+  }
+
+  /**
+   * Checks that learner and evaluator are of the same kind: both regression or both classification.
+   */
+  private static boolean isLearnerAndEvaluatorCompatible(Learner learner, PerformanceEvaluator evaluator) {
+    return (learner instanceof RegressionLearner && evaluator instanceof RegressionPerformanceEvaluator) ||
+        (learner instanceof ClassificationLearner && evaluator instanceof ClassificationPerformanceEvaluator);
+  }
+
+  /**
+   * Picks the basic evaluator matching the learner: regression for a {@link RegressionLearner}, classification
+   * otherwise.
+   */
+  private static PerformanceEvaluator getDefaultPerformanceEvaluatorForLearner(Learner learner) {
+    if (learner instanceof RegressionLearner) {
+      return new BasicRegressionPerformanceEvaluator();
+    }
+    // Default to BasicClassificationPerformanceEvaluator for all other cases
+    return new BasicClassificationPerformanceEvaluator();
+  }
+
+  /** Returns the class name of {@code o} for log messages, or {@code "null"} when there is no object. */
+  private static String classNameOf(Object o) {
+    return (o == null) ? "null" : o.getClass().getName();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/tasks/Task.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/tasks/Task.java 
b/samoa-api/src/main/java/org/apache/samoa/tasks/Task.java
new file mode 100644
index 0000000..73d9be8
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/tasks/Task.java
@@ -0,0 +1,59 @@
+package org.apache.samoa.tasks;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.topology.ComponentFactory;
+import org.apache.samoa.topology.Topology;
+
+/**
+ * Task interface, the mother of all SAMOA tasks!
+ */
+public interface Task {
+
+  /**
+   * Initializes this SAMOA task: creates and connects the ProcessingItems and Streams and initializes the topology.
+   */
+  void init();
+
+  /**
+   * Returns the final topology object to be executed in the cluster.
+   * 
+   * @return topology object to be submitted to be executed in the cluster
+   */
+  Topology getTopology();
+
+  // /**
+  // * Return the entrance processor to start SAMOA topology
+  // * The logic to start the topology should be implemented here
+  // * @return entrance processor to start the topology
+  // */
+  // public TopologyStarter getTopologyStarter();
+
+  /**
+   * Sets the factory.
+   * <p>
+   * TODO: propose to hide factory from task, i.e. Task will only see TopologyBuilder, and factory creation will be
+   * handled by TopologyBuilder
+   * 
+   * @param factory
+   *          the new factory
+   */
+  void setFactory(ComponentFactory factory);
+
+}

Reply via email to