http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/RandomTreeGenerator.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/RandomTreeGenerator.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/RandomTreeGenerator.java
index a7f982e..aa713cc 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/RandomTreeGenerator.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/RandomTreeGenerator.java
@@ -41,225 +41,227 @@ import com.yahoo.labs.samoa.moa.tasks.TaskMonitor;
 
 /**
  * Stream generator for a stream based on a randomly generated tree..
- *
+ * 
  * @author Richard Kirkby ([email protected])
  * @version $Revision: 7 $
  */
 public class RandomTreeGenerator extends AbstractOptionHandler implements 
InstanceStream {
 
-    @Override
-    public String getPurposeString() {
-        return "Generates a stream based on a randomly generated tree.";
-    }
-
-    private static final long serialVersionUID = 1L;
-
-    public IntOption treeRandomSeedOption = new IntOption("treeRandomSeed",
-            'r', "Seed for random generation of tree.", 1);
-
-    public IntOption instanceRandomSeedOption = new IntOption(
-            "instanceRandomSeed", 'i',
-            "Seed for random generation of instances.", 1);
+  @Override
+  public String getPurposeString() {
+    return "Generates a stream based on a randomly generated tree.";
+  }
 
-    public IntOption numClassesOption = new IntOption("numClasses", 'c',
-            "The number of classes to generate.", 2, 2, Integer.MAX_VALUE);
+  private static final long serialVersionUID = 1L;
 
-    public IntOption numNominalsOption = new IntOption("numNominals", 'o',
-            "The number of nominal attributes to generate.", 5, 0,
-            Integer.MAX_VALUE);
+  public IntOption treeRandomSeedOption = new IntOption("treeRandomSeed",
+      'r', "Seed for random generation of tree.", 1);
 
-    public IntOption numNumericsOption = new IntOption("numNumerics", 'u',
-            "The number of numeric attributes to generate.", 5, 0,
-            Integer.MAX_VALUE);
+  public IntOption instanceRandomSeedOption = new IntOption(
+      "instanceRandomSeed", 'i',
+      "Seed for random generation of instances.", 1);
 
-    public IntOption numValsPerNominalOption = new IntOption(
-            "numValsPerNominal", 'v',
-            "The number of values to generate per nominal attribute.", 5, 2,
-            Integer.MAX_VALUE);
+  public IntOption numClassesOption = new IntOption("numClasses", 'c',
+      "The number of classes to generate.", 2, 2, Integer.MAX_VALUE);
 
-    public IntOption maxTreeDepthOption = new IntOption("maxTreeDepth", 'd',
-            "The maximum depth of the tree concept.", 5, 0, Integer.MAX_VALUE);
+  public IntOption numNominalsOption = new IntOption("numNominals", 'o',
+      "The number of nominal attributes to generate.", 5, 0,
+      Integer.MAX_VALUE);
 
-    public IntOption firstLeafLevelOption = new IntOption(
-            "firstLeafLevel",
-            'l',
-            "The first level of the tree above maxTreeDepth that can have 
leaves.",
-            3, 0, Integer.MAX_VALUE);
+  public IntOption numNumericsOption = new IntOption("numNumerics", 'u',
+      "The number of numeric attributes to generate.", 5, 0,
+      Integer.MAX_VALUE);
 
-    public FloatOption leafFractionOption = new FloatOption("leafFraction",
-            'f',
-            "The fraction of leaves per level from firstLeafLevel onwards.",
-            0.15, 0.0, 1.0);
+  public IntOption numValsPerNominalOption = new IntOption(
+      "numValsPerNominal", 'v',
+      "The number of values to generate per nominal attribute.", 5, 2,
+      Integer.MAX_VALUE);
 
-    protected static class Node implements Serializable {
+  public IntOption maxTreeDepthOption = new IntOption("maxTreeDepth", 'd',
+      "The maximum depth of the tree concept.", 5, 0, Integer.MAX_VALUE);
 
-        private static final long serialVersionUID = 1L;
+  public IntOption firstLeafLevelOption = new IntOption(
+      "firstLeafLevel",
+      'l',
+      "The first level of the tree above maxTreeDepth that can have leaves.",
+      3, 0, Integer.MAX_VALUE);
 
-        public int classLabel;
+  public FloatOption leafFractionOption = new FloatOption("leafFraction",
+      'f',
+      "The fraction of leaves per level from firstLeafLevel onwards.",
+      0.15, 0.0, 1.0);
 
-        public int splitAttIndex;
+  protected static class Node implements Serializable {
 
-        public double splitAttValue;
+    private static final long serialVersionUID = 1L;
 
-        public Node[] children;
+    public int classLabel;
+
+    public int splitAttIndex;
+
+    public double splitAttValue;
+
+    public Node[] children;
+  }
+
+  protected Node treeRoot;
+
+  protected InstancesHeader streamHeader;
+
+  protected Random instanceRandom;
+
+  @Override
+  public void prepareForUseImpl(TaskMonitor monitor,
+      ObjectRepository repository) {
+    monitor.setCurrentActivity("Preparing random tree...", -1.0);
+    generateHeader();
+    generateRandomTree();
+    restart();
+  }
+
+  @Override
+  public long estimatedRemainingInstances() {
+    return -1;
+  }
+
+  @Override
+  public boolean isRestartable() {
+    return true;
+  }
+
+  @Override
+  public void restart() {
+    this.instanceRandom = new Random(this.instanceRandomSeedOption.getValue());
+  }
+
+  @Override
+  public InstancesHeader getHeader() {
+    return this.streamHeader;
+  }
+
+  @Override
+  public boolean hasMoreInstances() {
+    return true;
+  }
+
+  @Override
+  public InstanceExample nextInstance() {
+    double[] attVals = new double[this.numNominalsOption.getValue()
+        + this.numNumericsOption.getValue()];
+    InstancesHeader header = getHeader();
+    Instance inst = new DenseInstance(header.numAttributes());
+    for (int i = 0; i < attVals.length; i++) {
+      attVals[i] = i < this.numNominalsOption.getValue() ? 
this.instanceRandom.nextInt(this.numValsPerNominalOption
+          .getValue())
+          : this.instanceRandom.nextDouble();
+      inst.setValue(i, attVals[i]);
     }
-
-    protected Node treeRoot;
-
-    protected InstancesHeader streamHeader;
-
-    protected Random instanceRandom;
-
-    @Override
-    public void prepareForUseImpl(TaskMonitor monitor,
-            ObjectRepository repository) {
-        monitor.setCurrentActivity("Preparing random tree...", -1.0);
-        generateHeader();
-        generateRandomTree();
-        restart();
+    inst.setDataset(header);
+    inst.setClassValue(classifyInstance(this.treeRoot, attVals));
+    return new InstanceExample(inst);
+  }
+
+  protected int classifyInstance(Node node, double[] attVals) {
+    if (node.children == null) {
+      return node.classLabel;
     }
-
-    @Override
-    public long estimatedRemainingInstances() {
-        return -1;
+    if (node.splitAttIndex < this.numNominalsOption.getValue()) {
+      return classifyInstance(
+          node.children[(int) attVals[node.splitAttIndex]], attVals);
     }
-
-    @Override
-    public boolean isRestartable() {
-        return true;
+    return classifyInstance(
+        node.children[attVals[node.splitAttIndex] < node.splitAttValue ? 0
+            : 1], attVals);
+  }
+
+  protected void generateHeader() {
+    FastVector<Attribute> attributes = new FastVector<>();
+    FastVector<String> nominalAttVals = new FastVector<>();
+    for (int i = 0; i < this.numValsPerNominalOption.getValue(); i++) {
+      nominalAttVals.addElement("value" + (i + 1));
     }
-
-    @Override
-    public void restart() {
-        this.instanceRandom = new 
Random(this.instanceRandomSeedOption.getValue());
+    for (int i = 0; i < this.numNominalsOption.getValue(); i++) {
+      attributes.addElement(new Attribute("nominal" + (i + 1),
+          nominalAttVals));
     }
-
-    @Override
-    public InstancesHeader getHeader() {
-        return this.streamHeader;
+    for (int i = 0; i < this.numNumericsOption.getValue(); i++) {
+      attributes.addElement(new Attribute("numeric" + (i + 1)));
     }
-
-    @Override
-    public boolean hasMoreInstances() {
-        return true;
+    FastVector<String> classLabels = new FastVector<>();
+    for (int i = 0; i < this.numClassesOption.getValue(); i++) {
+      classLabels.addElement("class" + (i + 1));
     }
-
-    @Override
-    public InstanceExample nextInstance() {
-        double[] attVals = new double[this.numNominalsOption.getValue()
-                + this.numNumericsOption.getValue()];
-        InstancesHeader header = getHeader();
-        Instance inst = new DenseInstance(header.numAttributes());
-        for (int i = 0; i < attVals.length; i++) {
-            attVals[i] = i < this.numNominalsOption.getValue() ? 
this.instanceRandom.nextInt(this.numValsPerNominalOption.getValue())
-                    : this.instanceRandom.nextDouble();
-            inst.setValue(i, attVals[i]);
-        }
-        inst.setDataset(header);
-        inst.setClassValue(classifyInstance(this.treeRoot, attVals));
-        return new InstanceExample(inst);
+    attributes.addElement(new Attribute("class", classLabels));
+    this.streamHeader = new InstancesHeader(new Instances(
+        getCLICreationString(InstanceStream.class), attributes, 0));
+    this.streamHeader.setClassIndex(this.streamHeader.numAttributes() - 1);
+  }
+
+  protected void generateRandomTree() {
+    Random treeRand = new Random(this.treeRandomSeedOption.getValue());
+    ArrayList<Integer> nominalAttCandidates = new ArrayList<>(
+        this.numNominalsOption.getValue());
+    for (int i = 0; i < this.numNominalsOption.getValue(); i++) {
+      nominalAttCandidates.add(i);
     }
-
-    protected int classifyInstance(Node node, double[] attVals) {
-        if (node.children == null) {
-            return node.classLabel;
-        }
-        if (node.splitAttIndex < this.numNominalsOption.getValue()) {
-            return classifyInstance(
-                    node.children[(int) attVals[node.splitAttIndex]], attVals);
-        }
-        return classifyInstance(
-                node.children[attVals[node.splitAttIndex] < node.splitAttValue 
? 0
-                : 1], attVals);
+    double[] minNumericVals = new double[this.numNumericsOption.getValue()];
+    double[] maxNumericVals = new double[this.numNumericsOption.getValue()];
+    for (int i = 0; i < this.numNumericsOption.getValue(); i++) {
+      minNumericVals[i] = 0.0;
+      maxNumericVals[i] = 1.0;
     }
-
-    protected void generateHeader() {
-        FastVector<Attribute> attributes = new FastVector<>();
-        FastVector<String> nominalAttVals = new FastVector<>();
-        for (int i = 0; i < this.numValsPerNominalOption.getValue(); i++) {
-            nominalAttVals.addElement("value" + (i + 1));
-        }
-        for (int i = 0; i < this.numNominalsOption.getValue(); i++) {
-            attributes.addElement(new Attribute("nominal" + (i + 1),
-                    nominalAttVals));
-        }
-        for (int i = 0; i < this.numNumericsOption.getValue(); i++) {
-            attributes.addElement(new Attribute("numeric" + (i + 1)));
-        }
-        FastVector<String> classLabels = new FastVector<>();
-        for (int i = 0; i < this.numClassesOption.getValue(); i++) {
-            classLabels.addElement("class" + (i + 1));
-        }
-        attributes.addElement(new Attribute("class", classLabels));
-        this.streamHeader = new InstancesHeader(new Instances(
-                getCLICreationString(InstanceStream.class), attributes, 0));
-        this.streamHeader.setClassIndex(this.streamHeader.numAttributes() - 1);
+    this.treeRoot = generateRandomTreeNode(0, nominalAttCandidates,
+        minNumericVals, maxNumericVals, treeRand);
+  }
+
+  protected Node generateRandomTreeNode(int currentDepth,
+      ArrayList<Integer> nominalAttCandidates, double[] minNumericVals,
+      double[] maxNumericVals, Random treeRand) {
+    if ((currentDepth >= this.maxTreeDepthOption.getValue())
+        || ((currentDepth >= this.firstLeafLevelOption.getValue()) && 
(this.leafFractionOption.getValue() >= (1.0 - treeRand
+            .nextDouble())))) {
+      Node leaf = new Node();
+      leaf.classLabel = treeRand.nextInt(this.numClassesOption.getValue());
+      return leaf;
     }
-
-    protected void generateRandomTree() {
-        Random treeRand = new Random(this.treeRandomSeedOption.getValue());
-        ArrayList<Integer> nominalAttCandidates = new ArrayList<>(
-                this.numNominalsOption.getValue());
-        for (int i = 0; i < this.numNominalsOption.getValue(); i++) {
-            nominalAttCandidates.add(i);
-        }
-        double[] minNumericVals = new 
double[this.numNumericsOption.getValue()];
-        double[] maxNumericVals = new 
double[this.numNumericsOption.getValue()];
-        for (int i = 0; i < this.numNumericsOption.getValue(); i++) {
-            minNumericVals[i] = 0.0;
-            maxNumericVals[i] = 1.0;
-        }
-        this.treeRoot = generateRandomTreeNode(0, nominalAttCandidates,
-                minNumericVals, maxNumericVals, treeRand);
+    Node node = new Node();
+    int chosenAtt = treeRand.nextInt(nominalAttCandidates.size()
+        + this.numNumericsOption.getValue());
+    if (chosenAtt < nominalAttCandidates.size()) {
+      node.splitAttIndex = nominalAttCandidates.get(chosenAtt);
+      node.children = new Node[this.numValsPerNominalOption.getValue()];
+      ArrayList<Integer> newNominalCandidates = new ArrayList<>(
+          nominalAttCandidates);
+      newNominalCandidates.remove(new Integer(node.splitAttIndex));
+      newNominalCandidates.trimToSize();
+      for (int i = 0; i < node.children.length; i++) {
+        node.children[i] = generateRandomTreeNode(currentDepth + 1,
+            newNominalCandidates, minNumericVals, maxNumericVals,
+            treeRand);
+      }
+    } else {
+      int numericIndex = chosenAtt - nominalAttCandidates.size();
+      node.splitAttIndex = this.numNominalsOption.getValue()
+          + numericIndex;
+      double minVal = minNumericVals[numericIndex];
+      double maxVal = maxNumericVals[numericIndex];
+      node.splitAttValue = ((maxVal - minVal) * treeRand.nextDouble())
+          + minVal;
+      node.children = new Node[2];
+      double[] newMaxVals = maxNumericVals.clone();
+      newMaxVals[numericIndex] = node.splitAttValue;
+      node.children[0] = generateRandomTreeNode(currentDepth + 1,
+          nominalAttCandidates, minNumericVals, newMaxVals, treeRand);
+      double[] newMinVals = minNumericVals.clone();
+      newMinVals[numericIndex] = node.splitAttValue;
+      node.children[1] = generateRandomTreeNode(currentDepth + 1,
+          nominalAttCandidates, newMinVals, maxNumericVals, treeRand);
     }
+    return node;
+  }
 
-    protected Node generateRandomTreeNode(int currentDepth,
-            ArrayList<Integer> nominalAttCandidates, double[] minNumericVals,
-            double[] maxNumericVals, Random treeRand) {
-        if ((currentDepth >= this.maxTreeDepthOption.getValue())
-                || ((currentDepth >= this.firstLeafLevelOption.getValue()) && 
(this.leafFractionOption.getValue() >= (1.0 - treeRand.nextDouble())))) {
-            Node leaf = new Node();
-            leaf.classLabel = 
treeRand.nextInt(this.numClassesOption.getValue());
-            return leaf;
-        }
-        Node node = new Node();
-        int chosenAtt = treeRand.nextInt(nominalAttCandidates.size()
-                + this.numNumericsOption.getValue());
-        if (chosenAtt < nominalAttCandidates.size()) {
-            node.splitAttIndex = nominalAttCandidates.get(chosenAtt);
-            node.children = new Node[this.numValsPerNominalOption.getValue()];
-            ArrayList<Integer> newNominalCandidates = new ArrayList<>(
-                    nominalAttCandidates);
-            newNominalCandidates.remove(new Integer(node.splitAttIndex));
-            newNominalCandidates.trimToSize();
-            for (int i = 0; i < node.children.length; i++) {
-                node.children[i] = generateRandomTreeNode(currentDepth + 1,
-                        newNominalCandidates, minNumericVals, maxNumericVals,
-                        treeRand);
-            }
-        } else {
-            int numericIndex = chosenAtt - nominalAttCandidates.size();
-            node.splitAttIndex = this.numNominalsOption.getValue()
-                    + numericIndex;
-            double minVal = minNumericVals[numericIndex];
-            double maxVal = maxNumericVals[numericIndex];
-            node.splitAttValue = ((maxVal - minVal) * treeRand.nextDouble())
-                    + minVal;
-            node.children = new Node[2];
-            double[] newMaxVals = maxNumericVals.clone();
-            newMaxVals[numericIndex] = node.splitAttValue;
-            node.children[0] = generateRandomTreeNode(currentDepth + 1,
-                    nominalAttCandidates, minNumericVals, newMaxVals, 
treeRand);
-            double[] newMinVals = minNumericVals.clone();
-            newMinVals[numericIndex] = node.splitAttValue;
-            node.children[1] = generateRandomTreeNode(currentDepth + 1,
-                    nominalAttCandidates, newMinVals, maxNumericVals, 
treeRand);
-        }
-        return node;
-    }
-
-    @Override
-    public void getDescription(StringBuilder sb, int indent) {
-        // TODO Auto-generated method stub
-    }
+  @Override
+  public void getDescription(StringBuilder sb, int indent) {
+    // TODO Auto-generated method stub
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/NullMonitor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/NullMonitor.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/NullMonitor.java
index 977897f..da40835 100644
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/NullMonitor.java
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/NullMonitor.java
@@ -22,81 +22,81 @@ package com.yahoo.labs.samoa.moa.tasks;
 
 /**
  * Class that represents a null monitor.
- *
+ * 
  * @author Richard Kirkby ([email protected])
  * @version $Revision: 7 $
  */
 public class NullMonitor implements TaskMonitor {
 
-    @Override
-    public void setCurrentActivity(String activityDescription,
-            double fracComplete) {
-    }
-
-    @Override
-    public void setCurrentActivityDescription(String activity) {
-    }
-
-    @Override
-    public void setCurrentActivityFractionComplete(double fracComplete) {
-    }
-
-    @Override
-    public boolean taskShouldAbort() {
-        return false;
-    }
-
-    @Override
-    public String getCurrentActivityDescription() {
-        return null;
-    }
-
-    @Override
-    public double getCurrentActivityFractionComplete() {
-        return -1.0;
-    }
-
-    @Override
-    public boolean isPaused() {
-        return false;
-    }
-
-    @Override
-    public boolean isCancelled() {
-        return false;
-    }
-
-    @Override
-    public void requestCancel() {
-    }
-
-    @Override
-    public void requestPause() {
-    }
-
-    @Override
-    public void requestResume() {
-    }
-
-    @Override
-    public Object getLatestResultPreview() {
-        return null;
-    }
-
-    @Override
-    public void requestResultPreview() {
-    }
-
-    @Override
-    public boolean resultPreviewRequested() {
-        return false;
-    }
-
-    @Override
-    public void setLatestResultPreview(Object latestPreview) {
-    }
-
-    @Override
-    public void requestResultPreview(ResultPreviewListener toInform) {
-    }
+  @Override
+  public void setCurrentActivity(String activityDescription,
+      double fracComplete) {
+  }
+
+  @Override
+  public void setCurrentActivityDescription(String activity) {
+  }
+
+  @Override
+  public void setCurrentActivityFractionComplete(double fracComplete) {
+  }
+
+  @Override
+  public boolean taskShouldAbort() {
+    return false;
+  }
+
+  @Override
+  public String getCurrentActivityDescription() {
+    return null;
+  }
+
+  @Override
+  public double getCurrentActivityFractionComplete() {
+    return -1.0;
+  }
+
+  @Override
+  public boolean isPaused() {
+    return false;
+  }
+
+  @Override
+  public boolean isCancelled() {
+    return false;
+  }
+
+  @Override
+  public void requestCancel() {
+  }
+
+  @Override
+  public void requestPause() {
+  }
+
+  @Override
+  public void requestResume() {
+  }
+
+  @Override
+  public Object getLatestResultPreview() {
+    return null;
+  }
+
+  @Override
+  public void requestResultPreview() {
+  }
+
+  @Override
+  public boolean resultPreviewRequested() {
+    return false;
+  }
+
+  @Override
+  public void setLatestResultPreview(Object latestPreview) {
+  }
+
+  @Override
+  public void requestResultPreview(ResultPreviewListener toInform) {
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/ResultPreviewListener.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/ResultPreviewListener.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/ResultPreviewListener.java
index 3fece0b..63d1236 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/ResultPreviewListener.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/ResultPreviewListener.java
@@ -21,20 +21,20 @@ package com.yahoo.labs.samoa.moa.tasks;
  */
 
 /**
- * Interface implemented by classes that preview results 
- * on the Graphical User Interface 
- *
+ * Interface implemented by classes that preview results on the Graphical User
+ * Interface
+ * 
  * @author Richard Kirkby ([email protected])
  * @version $Revision: 7 $
  */
 public interface ResultPreviewListener {
 
-    /**
-     * This method is used to receive a signal from
-     * <code>TaskMonitor</code> that the lastest preview has
-     * changed. This method is implemented in <code>PreviewPanel</code>
-     * to change the results that are shown in its panel.
-     *
-     */
-    public void latestPreviewChanged();
+  /**
+   * This method is used to receive a signal from <code>TaskMonitor</code> that
+   * the lastest preview has changed. This method is implemented in
+   * <code>PreviewPanel</code> to change the results that are shown in its
+   * panel.
+   * 
+   */
+  public void latestPreviewChanged();
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/Task.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/Task.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/Task.java
index d2c96a8..d3dcc4f 100644
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/Task.java
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/Task.java
@@ -24,38 +24,38 @@ import com.yahoo.labs.samoa.moa.MOAObject;
 import com.yahoo.labs.samoa.moa.core.ObjectRepository;
 
 /**
- * Interface representing a task. 
- *
+ * Interface representing a task.
+ * 
  * @author Richard Kirkby ([email protected])
- * @version $Revision: 7 $ 
+ * @version $Revision: 7 $
  */
 public interface Task extends MOAObject {
 
-    /**
-     * Gets the result type of this task.
-     * Tasks can return LearningCurve, LearningEvaluation,
-     * Classifier, String, Instances..
-     *
-     * @return a class object of the result of this task
-     */
-    public Class<?> getTaskResultType();
+  /**
+   * Gets the result type of this task. Tasks can return LearningCurve,
+   * LearningEvaluation, Classifier, String, Instances..
+   * 
+   * @return a class object of the result of this task
+   */
+  public Class<?> getTaskResultType();
 
-    /**
-     * This method performs this task,
-     * when TaskMonitor and ObjectRepository are no needed.
-     *
-     * @return an object with the result of this task
-     */
-    public Object doTask();
+  /**
+   * This method performs this task, when TaskMonitor and ObjectRepository are
+   * no needed.
+   * 
+   * @return an object with the result of this task
+   */
+  public Object doTask();
 
-    /**
-     * This method performs this task.
-     * <code>AbstractTask</code> implements this method so all
-     * its extensions only need to implement <code>doTaskImpl</code>
-     *
-     * @param monitor the TaskMonitor to use
-     * @param repository  the ObjectRepository to use
-     * @return an object with the result of this task
-     */
-    public Object doTask(TaskMonitor monitor, ObjectRepository repository);
+  /**
+   * This method performs this task. <code>AbstractTask</code> implements this
+   * method so all its extensions only need to implement 
<code>doTaskImpl</code>
+   * 
+   * @param monitor
+   *          the TaskMonitor to use
+   * @param repository
+   *          the ObjectRepository to use
+   * @return an object with the result of this task
+   */
+  public Object doTask(TaskMonitor monitor, ObjectRepository repository);
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/TaskMonitor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/TaskMonitor.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/TaskMonitor.java
index 4918cd8..7b37f05 100644
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/TaskMonitor.java
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/TaskMonitor.java
@@ -21,120 +21,126 @@ package com.yahoo.labs.samoa.moa.tasks;
  */
 
 /**
- * Interface representing a task monitor. 
- *
+ * Interface representing a task monitor.
+ * 
  * @author Richard Kirkby ([email protected])
- * @version $Revision: 7 $ 
+ * @version $Revision: 7 $
  */
 public interface TaskMonitor {
 
-    /**
-     * Sets the description and the percentage done of the current activity.
-     *
-     * @param activity the description of the current activity
-     * @param fracComplete the percentage done of the current activity
-     */
-    public void setCurrentActivity(String activityDescription,
-            double fracComplete);
-
-    /**
-     * Sets the description of the current activity.
-     *
-     * @param activity the description of the current activity
-     */
-    public void setCurrentActivityDescription(String activity);
-
-    /**
-     * Sets the percentage done of the current activity
-     *
-     * @param fracComplete the percentage done of the current activity
-     */
-    public void setCurrentActivityFractionComplete(double fracComplete);
-
-    /**
-     * Gets whether the task should abort.
-     *
-     * @return true if the task should abort
-     */
-    public boolean taskShouldAbort();
-
-    /**
-     * Gets whether there is a request for preview the task result.
-     *
-     * @return true if there is a request for preview the task result
-     */
-    public boolean resultPreviewRequested();
-
-    /**
-     * Sets the current result to preview
-     *
-     * @param latestPreview the result to preview
-     */
-    public void setLatestResultPreview(Object latestPreview);
-
-    /**
-     * Gets the description of the current activity.
-     *
-     * @return the description of the current activity
-     */
-    public String getCurrentActivityDescription();
-
-    /**
-     * Gets the percentage done of the current activity
-     *
-     * @return the percentage done of the current activity
-     */
-    public double getCurrentActivityFractionComplete();
-
-    /**
-     * Requests the task monitored to pause.
-     *
-     */
-    public void requestPause();
-
-    /**
-     * Requests the task monitored to resume.
-     *
-     */
-    public void requestResume();
-
-    /**
-     * Requests the task monitored to cancel.
-     *
-     */
-    public void requestCancel();
-
-    /**
-     * Gets whether the task monitored is paused.
-     *
-     * @return true if the task is paused
-     */
-    public boolean isPaused();
-
-    /**
-     * Gets whether the task monitored is cancelled.
-     *
-     * @return true if the task is cancelled
-     */
-    public boolean isCancelled();
-
-    /**
-     * Requests to preview the task result.
-     *
-     */
-    public void requestResultPreview();
-
-    /**
-     * Requests to preview the task result.
-     *
-     * @param toInform the listener of the changes in the preview of the result
-     */
-    public void requestResultPreview(ResultPreviewListener toInform);
-
-    /**
-     * Gets the current result to preview
-     *
-     * @return the result to preview
-     */
-    public Object getLatestResultPreview();
+  /**
+   * Sets the description and the percentage done of the current activity.
+   * 
+   * @param activity
+   *          the description of the current activity
+   * @param fracComplete
+   *          the percentage done of the current activity
+   */
+  public void setCurrentActivity(String activityDescription,
+      double fracComplete);
+
+  /**
+   * Sets the description of the current activity.
+   * 
+   * @param activity
+   *          the description of the current activity
+   */
+  public void setCurrentActivityDescription(String activity);
+
+  /**
+   * Sets the percentage done of the current activity
+   * 
+   * @param fracComplete
+   *          the percentage done of the current activity
+   */
+  public void setCurrentActivityFractionComplete(double fracComplete);
+
+  /**
+   * Gets whether the task should abort.
+   * 
+   * @return true if the task should abort
+   */
+  public boolean taskShouldAbort();
+
+  /**
+   * Gets whether there is a request for preview the task result.
+   * 
+   * @return true if there is a request for preview the task result
+   */
+  public boolean resultPreviewRequested();
+
+  /**
+   * Sets the current result to preview
+   * 
+   * @param latestPreview
+   *          the result to preview
+   */
+  public void setLatestResultPreview(Object latestPreview);
+
+  /**
+   * Gets the description of the current activity.
+   * 
+   * @return the description of the current activity
+   */
+  public String getCurrentActivityDescription();
+
+  /**
+   * Gets the percentage done of the current activity
+   * 
+   * @return the percentage done of the current activity
+   */
+  public double getCurrentActivityFractionComplete();
+
+  /**
+   * Requests the task monitored to pause.
+   * 
+   */
+  public void requestPause();
+
+  /**
+   * Requests the task monitored to resume.
+   * 
+   */
+  public void requestResume();
+
+  /**
+   * Requests the task monitored to cancel.
+   * 
+   */
+  public void requestCancel();
+
+  /**
+   * Gets whether the task monitored is paused.
+   * 
+   * @return true if the task is paused
+   */
+  public boolean isPaused();
+
+  /**
+   * Gets whether the task monitored is cancelled.
+   * 
+   * @return true if the task is cancelled
+   */
+  public boolean isCancelled();
+
+  /**
+   * Requests to preview the task result.
+   * 
+   */
+  public void requestResultPreview();
+
+  /**
+   * Requests to preview the task result.
+   * 
+   * @param toInform
+   *          the listener of the changes in the preview of the result
+   */
+  public void requestResultPreview(ResultPreviewListener toInform);
+
+  /**
+   * Gets the current result to preview
+   * 
+   * @return the result to preview
+   */
+  public Object getLatestResultPreview();
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ArffFileStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ArffFileStream.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ArffFileStream.java
index 93fc7c4..80a4910 100644
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ArffFileStream.java
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ArffFileStream.java
@@ -31,89 +31,89 @@ import com.yahoo.labs.samoa.moa.tasks.TaskMonitor;
 
 /**
  * InstanceStream for ARFF file
+ * 
  * @author Casey
  */
 public class ArffFileStream extends FileStream {
 
-       public FileOption arffFileOption = new FileOption("arffFile", 'f',
-                      "ARFF File(s) to load.", null, null, false);
-
-       public IntOption classIndexOption = new IntOption("classIndex", 'c',
-                      "Class index of data. 0 for none or -1 for last 
attribute in file.",
-                      -1, -1, Integer.MAX_VALUE);
-                  
-       protected InstanceExample lastInstanceRead;
-
-       @Override
-       public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository 
repository) {
-               super.prepareForUseImpl(monitor, repository);
-               String filePath = 
this.arffFileOption.getFile().getAbsolutePath();
-               this.fileSource.init(filePath, "arff");
-               this.lastInstanceRead = null;
-       }
-       
-       @Override
-       protected void reset() {
-               try {
-                       if (this.fileReader != null)
-                               this.fileReader.close();
-
-                       fileSource.reset();
-               }
-               catch (IOException ioe) {
-                       throw new RuntimeException("FileStream restart 
failed.", ioe);
-               }
-
-               if (!getNextFileReader()) {
-                       hitEndOfStream = true;
-                       throw new RuntimeException("FileStream is empty.");
-               }
-       }
-       
-       @Override
-       protected boolean getNextFileReader() {
-               boolean ret = super.getNextFileReader();
-               if (ret) {
-                       this.instances = new Instances(this.fileReader, 1, -1);
-                       if (this.classIndexOption.getValue() < 0) {
-                               
this.instances.setClassIndex(this.instances.numAttributes() - 1);
-                       } else if (this.classIndexOption.getValue() > 0) {
-                               
this.instances.setClassIndex(this.classIndexOption.getValue() - 1);
-                       }
-               }
-               return ret;
-       }
-       
-       @Override
-       protected boolean readNextInstanceFromFile() {
-               try {
-            if (this.instances.readInstance(this.fileReader)) {
-                this.lastInstanceRead = new 
InstanceExample(this.instances.instance(0));
-                this.instances.delete(); // keep instances clean
-                                                       return true;
-            }
-            if (this.fileReader != null) {
-                this.fileReader.close();
-                this.fileReader = null;
-            }
-            return false;
-        } catch (IOException ioe) {
-            throw new RuntimeException(
-                    "ArffFileStream failed to read instance from stream.", 
ioe);
-        }
-
-       }
-       
-       @Override
-       protected InstanceExample getLastInstanceRead() {
-               return this.lastInstanceRead;
-       }
-       
-       /*
-     * extend com.yahoo.labs.samoa.moa.MOAObject
-     */
-    @Override
-    public void getDescription(StringBuilder sb, int indent) {
-       // TODO Auto-generated method stub
+  public FileOption arffFileOption = new FileOption("arffFile", 'f',
+      "ARFF File(s) to load.", null, null, false);
+
+  public IntOption classIndexOption = new IntOption("classIndex", 'c',
+      "Class index of data. 0 for none or -1 for last attribute in file.",
+      -1, -1, Integer.MAX_VALUE);
+
+  protected InstanceExample lastInstanceRead;
+
+  @Override
+  public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository 
repository) {
+    super.prepareForUseImpl(monitor, repository);
+    String filePath = this.arffFileOption.getFile().getAbsolutePath();
+    this.fileSource.init(filePath, "arff");
+    this.lastInstanceRead = null;
+  }
+
+  @Override
+  protected void reset() {
+    try {
+      if (this.fileReader != null)
+        this.fileReader.close();
+
+      fileSource.reset();
+    } catch (IOException ioe) {
+      throw new RuntimeException("FileStream restart failed.", ioe);
+    }
+
+    if (!getNextFileReader()) {
+      hitEndOfStream = true;
+      throw new RuntimeException("FileStream is empty.");
     }
+  }
+
+  @Override
+  protected boolean getNextFileReader() {
+    boolean ret = super.getNextFileReader();
+    if (ret) {
+      this.instances = new Instances(this.fileReader, 1, -1);
+      if (this.classIndexOption.getValue() < 0) {
+        this.instances.setClassIndex(this.instances.numAttributes() - 1);
+      } else if (this.classIndexOption.getValue() > 0) {
+        this.instances.setClassIndex(this.classIndexOption.getValue() - 1);
+      }
+    }
+    return ret;
+  }
+
+  @Override
+  protected boolean readNextInstanceFromFile() {
+    try {
+      if (this.instances.readInstance(this.fileReader)) {
+        this.lastInstanceRead = new 
InstanceExample(this.instances.instance(0));
+        this.instances.delete(); // keep instances clean
+        return true;
+      }
+      if (this.fileReader != null) {
+        this.fileReader.close();
+        this.fileReader = null;
+      }
+      return false;
+    } catch (IOException ioe) {
+      throw new RuntimeException(
+          "ArffFileStream failed to read instance from stream.", ioe);
+    }
+
+  }
+
+  @Override
+  protected InstanceExample getLastInstanceRead() {
+    return this.lastInstanceRead;
+  }
+
+  /*
+   * extend com.yahoo.labs.samoa.moa.MOAObject
+   */
+  @Override
+  public void getDescription(StringBuilder sb, int indent) {
+    // TODO Auto-generated method stub
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ClusteringEntranceProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ClusteringEntranceProcessor.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ClusteringEntranceProcessor.java
index 20f3feb..ddb047e 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ClusteringEntranceProcessor.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ClusteringEntranceProcessor.java
@@ -45,127 +45,167 @@ import 
com.yahoo.labs.samoa.moa.streams.clustering.RandomRBFGeneratorEvents;
  */
 public final class ClusteringEntranceProcessor implements EntranceProcessor {
 
-    private static final long serialVersionUID = 4169053337917578558L;
+  private static final long serialVersionUID = 4169053337917578558L;
 
-    private static final Logger logger = 
LoggerFactory.getLogger(ClusteringEntranceProcessor.class);
+  private static final Logger logger = 
LoggerFactory.getLogger(ClusteringEntranceProcessor.class);
 
-    private StreamSource streamSource;
-    private Instance firstInstance;
-    private boolean isInited = false;
-    private Random random = new Random();
-    private double samplingThreshold;
-    private int numberInstances;
-    private int numInstanceSent = 0;
+  private StreamSource streamSource;
+  private Instance firstInstance;
+  private boolean isInited = false;
+  private Random random = new Random();
+  private double samplingThreshold;
+  private int numberInstances;
+  private int numInstanceSent = 0;
 
-    private int groundTruthSamplingFrequency;
+  private int groundTruthSamplingFrequency;
 
-    @Override
-    public boolean process(ContentEvent event) {
-        // TODO: possible refactor of the super-interface implementation
-        // of source processor does not need this method
-        return false;
-    }
+  @Override
+  public boolean process(ContentEvent event) {
+    // TODO: possible refactor of the super-interface implementation
+    // of source processor does not need this method
+    return false;
+  }
 
-    @Override
-    public void onCreate(int id) {
-        logger.debug("Creating ClusteringSourceProcessor with id {}", id);
-    }
+  @Override
+  public void onCreate(int id) {
+    logger.debug("Creating ClusteringSourceProcessor with id {}", id);
+  }
 
-    @Override
-    public Processor newProcessor(Processor p) {
-        ClusteringEntranceProcessor newProcessor = new 
ClusteringEntranceProcessor();
-        ClusteringEntranceProcessor originProcessor = 
(ClusteringEntranceProcessor) p;
-        if (originProcessor.getStreamSource() != null) {
-            
newProcessor.setStreamSource(originProcessor.getStreamSource().getStream());
-        }
-        return newProcessor;
+  @Override
+  public Processor newProcessor(Processor p) {
+    ClusteringEntranceProcessor newProcessor = new 
ClusteringEntranceProcessor();
+    ClusteringEntranceProcessor originProcessor = 
(ClusteringEntranceProcessor) p;
+    if (originProcessor.getStreamSource() != null) {
+      
newProcessor.setStreamSource(originProcessor.getStreamSource().getStream());
     }
+    return newProcessor;
+  }
 
-    @Override
-    public boolean hasNext() {
-        return (!isFinished());
-    }
+  @Override
+  public boolean hasNext() {
+    return (!isFinished());
+  }
 
-    @Override
-    public boolean isFinished() {
-        return (!streamSource.hasMoreInstances() || (numberInstances >= 0 && 
numInstanceSent >= numberInstances));
-    }
+  @Override
+  public boolean isFinished() {
+    return (!streamSource.hasMoreInstances() || (numberInstances >= 0 && 
numInstanceSent >= numberInstances));
+  }
 
-    // /**
-    // * Method to send instances via input stream
-    // *
-    // * @param inputStream
-    // * @param numberInstances
-    // */
-    // public void sendInstances(Stream inputStream, Stream evaluationStream, 
int numberInstances, double samplingThreshold) {
-    // int numInstanceSent = 0;
-    // this.samplingThreshold = samplingThreshold;
-    // while (streamSource.hasMoreInstances() && numInstanceSent < 
numberInstances) {
-    // numInstanceSent++;
-    // DataPoint nextDataPoint = new DataPoint(nextInstance(), 
numInstanceSent);
-    // ClusteringContentEvent contentEvent = new 
ClusteringContentEvent(numInstanceSent, nextDataPoint);
-    // inputStream.put(contentEvent);
-    // sendPointsAndGroundTruth(streamSource, evaluationStream, 
numInstanceSent, nextDataPoint);
-    // }
-    //
-    // sendEndEvaluationInstance(inputStream);
-    // }
+  // /**
+  // * Method to send instances via input stream
+  // *
+  // * @param inputStream
+  // * @param numberInstances
+  // */
+  // public void sendInstances(Stream inputStream, Stream evaluationStream, int
+  // numberInstances, double samplingThreshold) {
+  // int numInstanceSent = 0;
+  // this.samplingThreshold = samplingThreshold;
+  // while (streamSource.hasMoreInstances() && numInstanceSent <
+  // numberInstances) {
+  // numInstanceSent++;
+  // DataPoint nextDataPoint = new DataPoint(nextInstance(), numInstanceSent);
+  // ClusteringContentEvent contentEvent = new
+  // ClusteringContentEvent(numInstanceSent, nextDataPoint);
+  // inputStream.put(contentEvent);
+  // sendPointsAndGroundTruth(streamSource, evaluationStream, numInstanceSent,
+  // nextDataPoint);
+  // }
+  //
+  // sendEndEvaluationInstance(inputStream);
+  // }
 
-    public double getSamplingThreshold() {
-        return samplingThreshold;
-    }
+  public double getSamplingThreshold() {
+    return samplingThreshold;
+  }
 
-    public void setSamplingThreshold(double samplingThreshold) {
-        this.samplingThreshold = samplingThreshold;
-    }
-    
-    
+  public void setSamplingThreshold(double samplingThreshold) {
+    this.samplingThreshold = samplingThreshold;
+  }
 
-    public int getGroundTruthSamplingFrequency() {
-        return groundTruthSamplingFrequency;
-    }
+  public int getGroundTruthSamplingFrequency() {
+    return groundTruthSamplingFrequency;
+  }
 
-    public void setGroundTruthSamplingFrequency(int 
groundTruthSamplingFrequency) {
-        this.groundTruthSamplingFrequency = groundTruthSamplingFrequency;
-    }
+  public void setGroundTruthSamplingFrequency(int 
groundTruthSamplingFrequency) {
+    this.groundTruthSamplingFrequency = groundTruthSamplingFrequency;
+  }
+
+  public StreamSource getStreamSource() {
+    return streamSource;
+  }
 
-    public StreamSource getStreamSource() {
-        return streamSource;
+  public void setStreamSource(InstanceStream stream) {
+    if (stream instanceof AbstractOptionHandler) {
+      ((AbstractOptionHandler) (stream)).prepareForUse();
     }
 
-    public void setStreamSource(InstanceStream stream) {
-        if (stream instanceof AbstractOptionHandler) {
-            ((AbstractOptionHandler) (stream)).prepareForUse();
-        }
+    this.streamSource = new StreamSource(stream);
+    firstInstance = streamSource.nextInstance().getData();
+  }
 
-        this.streamSource = new StreamSource(stream);
-        firstInstance = streamSource.nextInstance().getData();
-    }
+  public Instances getDataset() {
+    return firstInstance.dataset();
+  }
 
-    public Instances getDataset() {
-        return firstInstance.dataset();
+  private Instance nextInstance() {
+    if (this.isInited) {
+      return streamSource.nextInstance().getData();
+    } else {
+      this.isInited = true;
+      return firstInstance;
     }
+  }
 
-    private Instance nextInstance() {
-        if (this.isInited) {
-            return streamSource.nextInstance().getData();
-        } else {
-            this.isInited = true;
-            return firstInstance;
-        }
-    }
+  // private void sendEndEvaluationInstance(Stream inputStream) {
+  // ClusteringContentEvent contentEvent = new ClusteringContentEvent(-1,
+  // firstInstance);
+  // contentEvent.setLast(true);
+  // inputStream.put(contentEvent);
+  // }
 
-    // private void sendEndEvaluationInstance(Stream inputStream) {
-    // ClusteringContentEvent contentEvent = new ClusteringContentEvent(-1, 
firstInstance);
-    // contentEvent.setLast(true);
-    // inputStream.put(contentEvent);
-    // }
+  // private void sendPointsAndGroundTruth(StreamSource sourceStream, Stream
+  // evaluationStream, int numInstanceSent, DataPoint nextDataPoint) {
+  // boolean sendEvent = false;
+  // DataPoint instance = null;
+  // Clustering gtClustering = null;
+  // int samplingFrequency = ((ClusteringStream)
+  // sourceStream.getStream()).getDecayHorizon();
+  // if (random.nextDouble() < samplingThreshold) {
+  // // Add instance
+  // sendEvent = true;
+  // instance = nextDataPoint;
+  // }
+  // if (numInstanceSent > 0 && numInstanceSent % samplingFrequency == 0) {
+  // // Add GroundTruth
+  // sendEvent = true;
+  // gtClustering = ((RandomRBFGeneratorEvents)
+  // sourceStream.getStream()).getGeneratingClusters();
+  // }
+  // if (sendEvent == true) {
+  // ClusteringEvaluationContentEvent evalEvent;
+  // evalEvent = new ClusteringEvaluationContentEvent(gtClustering, instance,
+  // false);
+  // evaluationStream.put(evalEvent);
+  // }
+  // }
+
+  public void setMaxNumInstances(int value) {
+    numberInstances = value;
+  }
+
+  public int getMaxNumInstances() {
+    return this.numberInstances;
+  }
+
+  @Override
+  public ContentEvent nextEvent() {
 
-    // private void sendPointsAndGroundTruth(StreamSource sourceStream, Stream 
evaluationStream, int numInstanceSent, DataPoint nextDataPoint) {
     // boolean sendEvent = false;
     // DataPoint instance = null;
     // Clustering gtClustering = null;
-    // int samplingFrequency = ((ClusteringStream) 
sourceStream.getStream()).getDecayHorizon();
+    // int samplingFrequency = ((ClusteringStream)
+    // sourceStream.getStream()).getDecayHorizon();
     // if (random.nextDouble() < samplingThreshold) {
     // // Add instance
     // sendEvent = true;
@@ -174,68 +214,52 @@ public final class ClusteringEntranceProcessor implements 
EntranceProcessor {
     // if (numInstanceSent > 0 && numInstanceSent % samplingFrequency == 0) {
     // // Add GroundTruth
     // sendEvent = true;
-    // gtClustering = ((RandomRBFGeneratorEvents) 
sourceStream.getStream()).getGeneratingClusters();
+    // gtClustering = ((RandomRBFGeneratorEvents)
+    // sourceStream.getStream()).getGeneratingClusters();
     // }
     // if (sendEvent == true) {
     // ClusteringEvaluationContentEvent evalEvent;
-    // evalEvent = new ClusteringEvaluationContentEvent(gtClustering, 
instance, false);
+    // evalEvent = new ClusteringEvaluationContentEvent(gtClustering, instance,
+    // false);
     // evaluationStream.put(evalEvent);
     // }
-    // }
-
-    public void setMaxNumInstances(int value) {
-        numberInstances = value;
-    }
-
-    public int getMaxNumInstances() {
-        return this.numberInstances;
-    }
 
-    @Override
-    public ContentEvent nextEvent() {
-
-        // boolean sendEvent = false;
-        // DataPoint instance = null;
-        // Clustering gtClustering = null;
-        // int samplingFrequency = ((ClusteringStream) 
sourceStream.getStream()).getDecayHorizon();
-        // if (random.nextDouble() < samplingThreshold) {
-        // // Add instance
-        // sendEvent = true;
-        // instance = nextDataPoint;
-        // }
-        // if (numInstanceSent > 0 && numInstanceSent % samplingFrequency == 
0) {
-        // // Add GroundTruth
-        // sendEvent = true;
-        // gtClustering = ((RandomRBFGeneratorEvents) 
sourceStream.getStream()).getGeneratingClusters();
-        // }
-        // if (sendEvent == true) {
-        // ClusteringEvaluationContentEvent evalEvent;
-        // evalEvent = new ClusteringEvaluationContentEvent(gtClustering, 
instance, false);
-        // evaluationStream.put(evalEvent);
-        // }
-
-        groundTruthSamplingFrequency = ((ClusteringStream) 
streamSource.getStream()).getDecayHorizon(); // FIXME should it be taken from 
the ClusteringEvaluation -f option instead?
-        if (isFinished()) {
-            // send ending event
-            ClusteringContentEvent contentEvent = new 
ClusteringContentEvent(-1, firstInstance);
-            contentEvent.setLast(true);
-            return contentEvent;
-        } else {
-            DataPoint nextDataPoint = new DataPoint(nextInstance(), 
numInstanceSent);
-            numInstanceSent++;
-            if (numInstanceSent % groundTruthSamplingFrequency == 0) {
-                // TODO implement an interface ClusteringGroundTruth with a 
getGeneratingClusters() method, check if the source implements the interface
-                // send a clustering evaluation event for external measures 
(distance from the gt clusters)
-                Clustering gtClustering = ((RandomRBFGeneratorEvents) 
streamSource.getStream()).getGeneratingClusters();
-                return new ClusteringEvaluationContentEvent(gtClustering, 
nextDataPoint, false);
-            } else {
-                ClusteringContentEvent contentEvent = new 
ClusteringContentEvent(numInstanceSent, nextDataPoint);
-                if (random.nextDouble() < samplingThreshold) {
-                    // send a clustering content event for internal measures 
(cohesion, separation)
-                    contentEvent.setSample(true);
-                }
-                return contentEvent;
-            }
+    groundTruthSamplingFrequency = ((ClusteringStream) 
streamSource.getStream()).getDecayHorizon(); // FIXME
+                                                                               
                     // should
+                                                                               
                     // it
+                                                                               
                     // be
+                                                                               
                     // taken
+                                                                               
                     // from
+                                                                               
                     // the
+                                                                               
                     // ClusteringEvaluation
+                                                                               
                     // -f
+                                                                               
                     // option
+                                                                               
                     // instead?
+    if (isFinished()) {
+      // send ending event
+      ClusteringContentEvent contentEvent = new ClusteringContentEvent(-1, 
firstInstance);
+      contentEvent.setLast(true);
+      return contentEvent;
+    } else {
+      DataPoint nextDataPoint = new DataPoint(nextInstance(), numInstanceSent);
+      numInstanceSent++;
+      if (numInstanceSent % groundTruthSamplingFrequency == 0) {
+        // TODO implement an interface ClusteringGroundTruth with a
+        // getGeneratingClusters() method, check if the source implements the
+        // interface
+        // send a clustering evaluation event for external measures (distance
+        // from the gt clusters)
+        Clustering gtClustering = ((RandomRBFGeneratorEvents) 
streamSource.getStream()).getGeneratingClusters();
+        return new ClusteringEvaluationContentEvent(gtClustering, 
nextDataPoint, false);
+      } else {
+        ClusteringContentEvent contentEvent = new 
ClusteringContentEvent(numInstanceSent, nextDataPoint);
+        if (random.nextDouble() < samplingThreshold) {
+          // send a clustering content event for internal measures (cohesion,
+          // separation)
+          contentEvent.setSample(true);
         }
+        return contentEvent;
+      }
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/FileStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/FileStream.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/FileStream.java
index c8004f5..548c0da 100644
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/FileStream.java
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/FileStream.java
@@ -37,138 +37,139 @@ import com.yahoo.labs.samoa.moa.tasks.TaskMonitor;
 import com.yahoo.labs.samoa.streams.fs.FileStreamSource;
 
 /**
- * InstanceStream for files 
- * (Abstract class: subclass this class for different file formats)
+ * InstanceStream for files (Abstract class: subclass this class for different
+ * file formats)
+ * 
  * @author Casey
  */
 public abstract class FileStream extends AbstractOptionHandler implements 
InstanceStream {
-       /**
+  /**
     *
     */
-   private static final long serialVersionUID = 3028905554604259130L;
-
-   public ClassOption sourceTypeOption = new ClassOption("sourceType",
-           's', "Source Type (HDFS, local FS)", FileStreamSource.class,
-           "LocalFileStreamSource");
-   
-   protected transient FileStreamSource fileSource;
-   protected transient Reader fileReader;
-   protected Instances instances;
-   
-   protected boolean hitEndOfStream;
-   private boolean hasStarted;
-
-   /*
-    * Constructors
-    */
-   public FileStream() {
-          this.hitEndOfStream = false;
-   }
-   
-   /*
-    * implement InstanceStream
-    */
-   @Override
-   public InstancesHeader getHeader() {
-          return new InstancesHeader(this.instances);
-   }
-
-   @Override
-   public long estimatedRemainingInstances() {
-          return -1;
-   }
-
-   @Override
-   public boolean hasMoreInstances() {
-          return !this.hitEndOfStream;
-   }
-   
-   @Override
-   public InstanceExample nextInstance() {
-          if (this.getLastInstanceRead() == null) {
-                  readNextInstanceFromStream();
-          }
-          InstanceExample prevInstance = this.getLastInstanceRead();
-          readNextInstanceFromStream();
-          return prevInstance;
-   }
-
-   @Override
-   public boolean isRestartable() {
-           return true;
-   }
-
-   @Override
-   public void restart() {
-          reset();
-          hasStarted = false;
-   }
-
-   protected void reset() {
-          try {
-                  if (this.fileReader != null)
-                          this.fileReader.close();
-                  
-                  fileSource.reset();
-          }
-          catch (IOException ioe) {
-                  throw new RuntimeException("FileStream restart failed.", 
ioe);
-          }
-          
-          if (!getNextFileReader()) {
-                  hitEndOfStream = true;
-                  throw new RuntimeException("FileStream is empty.");
-          }
-          
-       this.instances = new Instances(this.fileReader, 1, -1);
-       this.instances.setClassIndex(this.instances.numAttributes() - 1);
-   }
-   
-   protected boolean getNextFileReader() {
-          if (this.fileReader != null) 
-                  try {
-                          this.fileReader.close();
-                  } catch (IOException ioe) {
-                          ioe.printStackTrace();
-                  }
-          
-          InputStream inputStream = this.fileSource.getNextInputStream();
-          if (inputStream == null)
-                  return false;
-
-          this.fileReader = new BufferedReader(new 
InputStreamReader(inputStream));
-          return true;
-   }
-   
-   protected boolean readNextInstanceFromStream() {
-          if (!hasStarted) {
-                  this.reset();  
-                  hasStarted = true;
-          }
-          
-          while (true) {
-                  if (readNextInstanceFromFile()) return true;
-
-                  if (!getNextFileReader()) {
-                          this.hitEndOfStream = true;
-                          return false;
-                  }
-          }
-   }
-   
-   /**
-    * Read next instance from the current file and assign it to
-    * lastInstanceRead.
-    * @return true if it was able to read next instance and
-    *            false if it was at the end of the file
-    */
-   protected abstract boolean readNextInstanceFromFile();
-   
-   protected abstract InstanceExample getLastInstanceRead();
-   
-   @Override
-   public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository 
repository) {
-          this.fileSource = sourceTypeOption.getValue();
-          this.hasStarted = false;
-   }
+  private static final long serialVersionUID = 3028905554604259130L;
+
+  public ClassOption sourceTypeOption = new ClassOption("sourceType",
+      's', "Source Type (HDFS, local FS)", FileStreamSource.class,
+      "LocalFileStreamSource");
+
+  protected transient FileStreamSource fileSource;
+  protected transient Reader fileReader;
+  protected Instances instances;
+
+  protected boolean hitEndOfStream;
+  private boolean hasStarted;
+
+  /*
+   * Constructors
+   */
+  public FileStream() {
+    this.hitEndOfStream = false;
+  }
+
+  /*
+   * implement InstanceStream
+   */
+  @Override
+  public InstancesHeader getHeader() {
+    return new InstancesHeader(this.instances);
+  }
+
+  @Override
+  public long estimatedRemainingInstances() {
+    return -1;
+  }
+
+  @Override
+  public boolean hasMoreInstances() {
+    return !this.hitEndOfStream;
+  }
+
+  @Override
+  public InstanceExample nextInstance() {
+    if (this.getLastInstanceRead() == null) {
+      readNextInstanceFromStream();
+    }
+    InstanceExample prevInstance = this.getLastInstanceRead();
+    readNextInstanceFromStream();
+    return prevInstance;
+  }
+
+  @Override
+  public boolean isRestartable() {
+    return true;
+  }
+
+  @Override
+  public void restart() {
+    reset();
+    hasStarted = false;
+  }
+
+  protected void reset() {
+    try {
+      if (this.fileReader != null)
+        this.fileReader.close();
+
+      fileSource.reset();
+    } catch (IOException ioe) {
+      throw new RuntimeException("FileStream restart failed.", ioe);
+    }
+
+    if (!getNextFileReader()) {
+      hitEndOfStream = true;
+      throw new RuntimeException("FileStream is empty.");
+    }
+
+    this.instances = new Instances(this.fileReader, 1, -1);
+    this.instances.setClassIndex(this.instances.numAttributes() - 1);
+  }
+
+  protected boolean getNextFileReader() {
+    if (this.fileReader != null)
+      try {
+        this.fileReader.close();
+      } catch (IOException ioe) {
+        ioe.printStackTrace();
+      }
+
+    InputStream inputStream = this.fileSource.getNextInputStream();
+    if (inputStream == null)
+      return false;
+
+    this.fileReader = new BufferedReader(new InputStreamReader(inputStream));
+    return true;
+  }
+
+  protected boolean readNextInstanceFromStream() {
+    if (!hasStarted) {
+      this.reset();
+      hasStarted = true;
+    }
+
+    while (true) {
+      if (readNextInstanceFromFile())
+        return true;
+
+      if (!getNextFileReader()) {
+        this.hitEndOfStream = true;
+        return false;
+      }
+    }
+  }
+
+  /**
+   * Read next instance from the current file and assign it to 
lastInstanceRead.
+   * 
+   * @return true if it was able to read next instance and false if it was at
+   *         the end of the file
+   */
+  protected abstract boolean readNextInstanceFromFile();
+
+  protected abstract InstanceExample getLastInstanceRead();
+
+  @Override
+  public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository 
repository) {
+    this.fileSource = sourceTypeOption.getValue();
+    this.hasStarted = false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/PrequentialSourceProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/PrequentialSourceProcessor.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/PrequentialSourceProcessor.java
index e9a5aa1..f9884e9 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/PrequentialSourceProcessor.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/PrequentialSourceProcessor.java
@@ -38,192 +38,196 @@ import 
com.yahoo.labs.samoa.moa.options.AbstractOptionHandler;
 import com.yahoo.labs.samoa.moa.streams.InstanceStream;
 
 /**
- * Prequential Source Processor is the processor for Prequential Evaluation 
Task.
+ * Prequential Source Processor is the processor for Prequential Evaluation
+ * Task.
  * 
  * @author Arinto Murdopo
  * 
  */
 public final class PrequentialSourceProcessor implements EntranceProcessor {
 
-    private static final long serialVersionUID = 4169053337917578558L;
-
-    private static final Logger logger = 
LoggerFactory.getLogger(PrequentialSourceProcessor.class);
-    private boolean isInited = false;
-    private StreamSource streamSource;
-    private Instance firstInstance;
-    private int numberInstances;
-    private int numInstanceSent = 0;
-
-    protected InstanceStream sourceStream;
-    
-    /*
-        * ScheduledExecutorService to schedule sending events after each delay 
interval.
-        * It is expected to have only one event in the queue at a time, so we 
need only 
-        * one thread in the pool.
-        */
-       private transient ScheduledExecutorService timer;
-       private transient ScheduledFuture<?> schedule = null;
-       private int readyEventIndex = 1; // No waiting for the first event
-       private int delay = 0;
-       private int batchSize = 1;
-    private boolean finished = false;
-
-    @Override
-    public boolean process(ContentEvent event) {
-        // TODO: possible refactor of the super-interface implementation
-        // of source processor does not need this method
-        return false;
+  private static final long serialVersionUID = 4169053337917578558L;
+
+  private static final Logger logger = 
LoggerFactory.getLogger(PrequentialSourceProcessor.class);
+  private boolean isInited = false;
+  private StreamSource streamSource;
+  private Instance firstInstance;
+  private int numberInstances;
+  private int numInstanceSent = 0;
+
+  protected InstanceStream sourceStream;
+
+  /*
+   * ScheduledExecutorService to schedule sending events after each delay
+   * interval. It is expected to have only one event in the queue at a time, so
+   * we need only one thread in the pool.
+   */
+  private transient ScheduledExecutorService timer;
+  private transient ScheduledFuture<?> schedule = null;
+  private int readyEventIndex = 1; // No waiting for the first event
+  private int delay = 0;
+  private int batchSize = 1;
+  private boolean finished = false;
+
+  @Override
+  public boolean process(ContentEvent event) {
+    // TODO: possible refactor of the super-interface implementation
+    // of source processor does not need this method
+    return false;
+  }
+
+  @Override
+  public boolean isFinished() {
+    return finished;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return !isFinished() && (delay <= 0 || numInstanceSent < readyEventIndex);
+  }
+
+  private boolean hasReachedEndOfStream() {
+    return (!streamSource.hasMoreInstances() || (numberInstances >= 0 && 
numInstanceSent >= numberInstances));
+  }
+
+  @Override
+  public ContentEvent nextEvent() {
+    InstanceContentEvent contentEvent = null;
+    if (hasReachedEndOfStream()) {
+      contentEvent = new InstanceContentEvent(-1, firstInstance, false, true);
+      contentEvent.setLast(true);
+      // set finished status _after_ tagging last event
+      finished = true;
     }
-    
-    @Override
-    public boolean isFinished() {
-       return finished;
+    else if (hasNext()) {
+      numInstanceSent++;
+      contentEvent = new InstanceContentEvent(numInstanceSent, nextInstance(), 
true, true);
+
+      // first call to this method will trigger the timer
+      if (schedule == null && delay > 0) {
+        schedule = timer.scheduleWithFixedDelay(new DelayTimeoutHandler(this), 
delay, delay,
+            TimeUnit.MICROSECONDS);
+      }
     }
-
-    @Override
-    public boolean hasNext() {
-        return !isFinished() && (delay <= 0 || numInstanceSent < 
readyEventIndex);
+    return contentEvent;
+  }
+
+  private void increaseReadyEventIndex() {
+    readyEventIndex += batchSize;
+    // if we exceed the max, cancel the timer
+    if (schedule != null && isFinished()) {
+      schedule.cancel(false);
     }
-
-    private boolean hasReachedEndOfStream() {
-        return (!streamSource.hasMoreInstances() || (numberInstances >= 0 && 
numInstanceSent >= numberInstances));
+  }
+
+  @Override
+  public void onCreate(int id) {
+    initStreamSource(sourceStream);
+    timer = Executors.newScheduledThreadPool(1);
+    logger.debug("Creating PrequentialSourceProcessor with id {}", id);
+  }
+
+  @Override
+  public Processor newProcessor(Processor p) {
+    PrequentialSourceProcessor newProcessor = new PrequentialSourceProcessor();
+    PrequentialSourceProcessor originProcessor = (PrequentialSourceProcessor) 
p;
+    if (originProcessor.getStreamSource() != null) {
+      
newProcessor.setStreamSource(originProcessor.getStreamSource().getStream());
     }
-
-    @Override
-    public ContentEvent nextEvent() {
-        InstanceContentEvent contentEvent = null;
-        if (hasReachedEndOfStream()) {
-               contentEvent = new InstanceContentEvent(-1, firstInstance, 
false, true);
-            contentEvent.setLast(true);
-            // set finished status _after_ tagging last event
-            finished = true;
-        }
-        else if (hasNext()) {
-            numInstanceSent++;
-            contentEvent = new InstanceContentEvent(numInstanceSent, 
nextInstance(), true, true);
-            
-            // first call to this method will trigger the timer
-            if (schedule == null && delay > 0) {
-                schedule = timer.scheduleWithFixedDelay(new 
DelayTimeoutHandler(this), delay, delay,
-                        TimeUnit.MICROSECONDS);
-            }
-        }
-        return contentEvent;
+    return newProcessor;
+  }
+
+  // /**
+  // * Method to send instances via input stream
+  // *
+  // * @param inputStream
+  // * @param numberInstances
+  // */
+  // public void sendInstances(Stream inputStream, int numberInstances) {
+  // int numInstanceSent = 0;
+  // initStreamSource(sourceStream);
+  //
+  // while (streamSource.hasMoreInstances() && numInstanceSent <
+  // numberInstances) {
+  // numInstanceSent++;
+  // InstanceContentEvent contentEvent = new
+  // InstanceContentEvent(numInstanceSent, nextInstance(), true, true);
+  // inputStream.put(contentEvent);
+  // }
+  //
+  // sendEndEvaluationInstance(inputStream);
+  // }
+
+  public StreamSource getStreamSource() {
+    return streamSource;
+  }
+
+  public void setStreamSource(InstanceStream stream) {
+    this.sourceStream = stream;
+  }
+
+  public Instances getDataset() {
+    if (firstInstance == null) {
+      initStreamSource(sourceStream);
     }
-    
-       private void increaseReadyEventIndex() {
-               readyEventIndex+=batchSize;
-               // if we exceed the max, cancel the timer
-               if (schedule != null && isFinished()) {
-                       schedule.cancel(false);
-               }
-       }
-
-    @Override
-    public void onCreate(int id) {
-        initStreamSource(sourceStream);
-        timer = Executors.newScheduledThreadPool(1);
-        logger.debug("Creating PrequentialSourceProcessor with id {}", id);
+    return firstInstance.dataset();
+  }
+
+  private Instance nextInstance() {
+    if (this.isInited) {
+      return streamSource.nextInstance().getData();
+    } else {
+      this.isInited = true;
+      return firstInstance;
     }
-
-    @Override
-    public Processor newProcessor(Processor p) {
-        PrequentialSourceProcessor newProcessor = new 
PrequentialSourceProcessor();
-        PrequentialSourceProcessor originProcessor = 
(PrequentialSourceProcessor) p;
-        if (originProcessor.getStreamSource() != null) {
-            
newProcessor.setStreamSource(originProcessor.getStreamSource().getStream());
-        }
-        return newProcessor;
+  }
+
+  // private void sendEndEvaluationInstance(Stream inputStream) {
+  // InstanceContentEvent contentEvent = new InstanceContentEvent(-1,
+  // firstInstance, false, true);
+  // contentEvent.setLast(true);
+  // inputStream.put(contentEvent);
+  // }
+
+  private void initStreamSource(InstanceStream stream) {
+    if (stream instanceof AbstractOptionHandler) {
+      ((AbstractOptionHandler) (stream)).prepareForUse();
     }
 
-//    /**
-//     * Method to send instances via input stream
-//     * 
-//     * @param inputStream
-//     * @param numberInstances
-//     */
-//    public void sendInstances(Stream inputStream, int numberInstances) {
-//        int numInstanceSent = 0;
-//        initStreamSource(sourceStream);
-//
-//        while (streamSource.hasMoreInstances() && numInstanceSent < 
numberInstances) {
-//            numInstanceSent++;
-//            InstanceContentEvent contentEvent = new 
InstanceContentEvent(numInstanceSent, nextInstance(), true, true);
-//            inputStream.put(contentEvent);
-//        }
-//
-//        sendEndEvaluationInstance(inputStream);
-//    }
-
-    public StreamSource getStreamSource() {
-        return streamSource;
-    }
+    this.streamSource = new StreamSource(stream);
+    firstInstance = streamSource.nextInstance().getData();
+  }
 
-    public void setStreamSource(InstanceStream stream) {
-        this.sourceStream = stream;
-    }
+  public void setMaxNumInstances(int value) {
+    numberInstances = value;
+  }
 
-    public Instances getDataset() {
-        if (firstInstance == null) {
-            initStreamSource(sourceStream);
-        }
-        return firstInstance.dataset();
-    }
+  public int getMaxNumInstances() {
+    return this.numberInstances;
+  }
 
-    private Instance nextInstance() {
-        if (this.isInited) {
-            return streamSource.nextInstance().getData();
-        } else {
-            this.isInited = true;
-            return firstInstance;
-        }
-    }
+  public void setSourceDelay(int delay) {
+    this.delay = delay;
+  }
 
-//    private void sendEndEvaluationInstance(Stream inputStream) {
-//        InstanceContentEvent contentEvent = new InstanceContentEvent(-1, 
firstInstance, false, true);
-//        contentEvent.setLast(true);
-//        inputStream.put(contentEvent);
-//    }
+  public int getSourceDelay() {
+    return this.delay;
+  }
 
-    private void initStreamSource(InstanceStream stream) {
-        if (stream instanceof AbstractOptionHandler) {
-            ((AbstractOptionHandler) (stream)).prepareForUse();
-        }
+  public void setDelayBatchSize(int batch) {
+    this.batchSize = batch;
+  }
 
-        this.streamSource = new StreamSource(stream);
-        firstInstance = streamSource.nextInstance().getData();
-    }
+  private class DelayTimeoutHandler implements Runnable {
 
-    public void setMaxNumInstances(int value) {
-        numberInstances = value;
-    }
-    
-    public int getMaxNumInstances() {
-       return this.numberInstances;
+    private PrequentialSourceProcessor processor;
+
+    public DelayTimeoutHandler(PrequentialSourceProcessor processor) {
+      this.processor = processor;
     }
-    
-       public void setSourceDelay(int delay) {
-               this.delay = delay;
-       }
-
-       public int getSourceDelay() {
-               return this.delay;
-       }
-       
-       public void setDelayBatchSize(int batch) {
-               this.batchSize = batch;
-       }
-       
-       private class DelayTimeoutHandler implements Runnable {
-       
-       private PrequentialSourceProcessor processor;
-       
-       public DelayTimeoutHandler(PrequentialSourceProcessor processor) {
-               this.processor = processor;
-       }
-       
-       public void run() {
-               processor.increaseReadyEventIndex();
-       }
+
+    public void run() {
+      processor.increaseReadyEventIndex();
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSource.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSource.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSource.java
index 453d02d..4eca28c 100644
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSource.java
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSource.java
@@ -31,60 +31,62 @@ import com.yahoo.labs.samoa.instances.Instance;
 /**
  * The Class StreamSource.
  */
-public class StreamSource implements java.io.Serializable{
+public class StreamSource implements java.io.Serializable {
 
-       /**
+  /**
         * 
         */
-       private static final long serialVersionUID = 3974668694861231236L;
+  private static final long serialVersionUID = 3974668694861231236L;
 
-       /**
-        * Instantiates a new stream source.
-        *
-        * @param stream the stream
-        */
-       public StreamSource(InstanceStream stream) {
-               super();
-               this.stream = stream;
-       }
+  /**
+   * Instantiates a new stream source.
+   * 
+   * @param stream
+   *          the stream
+   */
+  public StreamSource(InstanceStream stream) {
+    super();
+    this.stream = stream;
+  }
 
-       /** The stream. */
-       protected InstanceStream stream;
+  /** The stream. */
+  protected InstanceStream stream;
 
-       /**
-        * Gets the stream.
-        *
-        * @return the stream
-        */
-       public InstanceStream getStream() {
-               return stream;
-       }
+  /**
+   * Gets the stream.
+   * 
+   * @return the stream
+   */
+  public InstanceStream getStream() {
+    return stream;
+  }
 
-       /**
-        * Next instance.
-        *
-        * @return the instance
-        */
-       public Example<Instance> nextInstance() {
-               return stream.nextInstance();
-       }
+  /**
+   * Next instance.
+   * 
+   * @return the instance
+   */
+  public Example<Instance> nextInstance() {
+    return stream.nextInstance();
+  }
 
-       /**
-        * Sets the stream.
-        *
-        * @param stream the new stream
-        */
-       public void setStream(InstanceStream stream) {
-               this.stream = stream;
-       }
+  /**
+   * Sets the stream.
+   * 
+   * @param stream
+   *          the new stream
+   */
+  public void setStream(InstanceStream stream) {
+    this.stream = stream;
+  }
 
-       /**
-        * Checks for more instances.
-        *
-        * @return true, if successful
-        */
-       public boolean hasMoreInstances() {
-               return this.stream.hasMoreInstances();
-       }
+  /**
+   * Checks for more instances.
+   * 
+   * @return true, if successful
+   */
+  public boolean hasMoreInstances() {
+    return this.stream.hasMoreInstances();
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSourceProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSourceProcessor.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSourceProcessor.java
index 2e66e4b..980802f 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSourceProcessor.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSourceProcessor.java
@@ -39,147 +39,157 @@ import com.yahoo.labs.samoa.topology.Stream;
  * The Class StreamSourceProcessor.
  */
 public class StreamSourceProcessor implements Processor {
-       
-       /** The Constant logger. */
-       private static final Logger logger = LoggerFactory
-                       .getLogger(StreamSourceProcessor.class);
 
-       /**
-        * 
-        */
-       private static final long serialVersionUID = -204182279475432739L;
+  /** The Constant logger. */
+  private static final Logger logger = LoggerFactory
+      .getLogger(StreamSourceProcessor.class);
 
-       /** The stream source. */
-       private StreamSource streamSource;
-
-       /**
-        * Gets the stream source.
-        *
-        * @return the stream source
-        */
-       public StreamSource getStreamSource() {
-               return streamSource;
-       }
-
-       /**
-        * Sets the stream source.
-        *
-        * @param stream the new stream source
-        */
-       public void setStreamSource(InstanceStream stream) {
-               this.streamSource = new StreamSource(stream);
-               firstInstance = streamSource.nextInstance().getData();
-       }
-
-       /** The number instances sent. */
-       private long numberInstancesSent = 0;
-
-       /**
-        * Send instances.
-        *  @param inputStream the input stream
-        * @param numberInstances the number instances
-        * @param isTraining the is training
-        */
-       public void sendInstances(Stream inputStream,
-                                                                               
                                int numberInstances, boolean isTraining, 
boolean isTesting) {
-               int numberSamples = 0;
-
-               while (streamSource.hasMoreInstances()
-                               && numberSamples < numberInstances) {
-                       
-                       numberSamples++;
-                       numberInstancesSent++;
-                       InstanceContentEvent instanceContentEvent = new 
InstanceContentEvent(
-                                       numberInstancesSent, nextInstance(), 
isTraining, isTesting);
-               
-                       
-                       inputStream.put(instanceContentEvent);
-               }
-
-               InstanceContentEvent instanceContentEvent = new 
InstanceContentEvent(
-                               numberInstancesSent, null, isTraining, 
isTesting);
-               instanceContentEvent.setLast(true);
-               inputStream.put(instanceContentEvent);
-       }
-
-       /**
-        * Send end evaluation instance.
-        *
-        * @param inputStream the input stream
-        */
-       public void sendEndEvaluationInstance(Stream inputStream) {
-               InstanceContentEvent instanceContentEvent = new 
InstanceContentEvent(-1, firstInstance,false, true);
-               inputStream.put(instanceContentEvent);
-       }
-
-       /**
-        * Next instance.
-        *
-        * @return the instance
-        */
-       protected Instance nextInstance() {
-               if (this.isInited) {
-                       return streamSource.nextInstance().getData();
-               } else {
-                       this.isInited = true;
-                       return firstInstance;
-               }
-       }
-
-       /** The is inited. */
-       protected boolean isInited = false;
-       
-       /** The first instance. */
-       protected Instance firstInstance;
-
-       //@Override
-       /**
-        * On remove.
-        */
-       protected void onRemove() {
-       }
-
-       /* (non-Javadoc)
-        * @see samoa.core.Processor#onCreate(int)
-        */
-       @Override
-       public void onCreate(int id) {
-               // TODO Auto-generated method stub
-       }
-
-       /* (non-Javadoc)
-        * @see samoa.core.Processor#newProcessor(samoa.core.Processor)
-        */
-       @Override
-       public Processor newProcessor(Processor sourceProcessor) {
-//             StreamSourceProcessor newProcessor = new 
StreamSourceProcessor();
-//             StreamSourceProcessor originProcessor = (StreamSourceProcessor) 
sourceProcessor;
-//             if (originProcessor.getStreamSource() != null){
-//                     
newProcessor.setStreamSource(originProcessor.getStreamSource().getStream());
-//             }
-               //return newProcessor;
-               return null;
-       }
-
-       /**
-        * On event.
-        *
-        * @param event the event
-        * @return true, if successful
-        */
-       @Override
-       public boolean process(ContentEvent event) {
-               return false;
-       }
-       
-       
-       /**
-        * Gets the dataset.
-        *
-        * @return the dataset
+  /**
+        * 
         */
-       public Instances getDataset() {
-               return firstInstance.dataset();
-       }
+  private static final long serialVersionUID = -204182279475432739L;
+
+  /** The stream source. */
+  private StreamSource streamSource;
+
+  /**
+   * Gets the stream source.
+   * 
+   * @return the stream source
+   */
+  public StreamSource getStreamSource() {
+    return streamSource;
+  }
+
+  /**
+   * Sets the stream source.
+   * 
+   * @param stream
+   *          the new stream source
+   */
+  public void setStreamSource(InstanceStream stream) {
+    this.streamSource = new StreamSource(stream);
+    firstInstance = streamSource.nextInstance().getData();
+  }
+
+  /** The number instances sent. */
+  private long numberInstancesSent = 0;
+
+  /**
+   * Send instances.
+   * 
+   * @param inputStream
+   *          the input stream
+   * @param numberInstances
+   *          the number instances
+   * @param isTraining
+   *          the is training
+   */
+  public void sendInstances(Stream inputStream,
+      int numberInstances, boolean isTraining, boolean isTesting) {
+    int numberSamples = 0;
+
+    while (streamSource.hasMoreInstances()
+        && numberSamples < numberInstances) {
+
+      numberSamples++;
+      numberInstancesSent++;
+      InstanceContentEvent instanceContentEvent = new InstanceContentEvent(
+          numberInstancesSent, nextInstance(), isTraining, isTesting);
+
+      inputStream.put(instanceContentEvent);
+    }
+
+    InstanceContentEvent instanceContentEvent = new InstanceContentEvent(
+        numberInstancesSent, null, isTraining, isTesting);
+    instanceContentEvent.setLast(true);
+    inputStream.put(instanceContentEvent);
+  }
+
+  /**
+   * Send end evaluation instance.
+   * 
+   * @param inputStream
+   *          the input stream
+   */
+  public void sendEndEvaluationInstance(Stream inputStream) {
+    InstanceContentEvent instanceContentEvent = new InstanceContentEvent(-1, 
firstInstance, false, true);
+    inputStream.put(instanceContentEvent);
+  }
+
+  /**
+   * Next instance.
+   * 
+   * @return the instance
+   */
+  protected Instance nextInstance() {
+    if (this.isInited) {
+      return streamSource.nextInstance().getData();
+    } else {
+      this.isInited = true;
+      return firstInstance;
+    }
+  }
+
+  /** The is inited. */
+  protected boolean isInited = false;
+
+  /** The first instance. */
+  protected Instance firstInstance;
+
+  // @Override
+  /**
+   * On remove.
+   */
+  protected void onRemove() {
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see samoa.core.Processor#onCreate(int)
+   */
+  @Override
+  public void onCreate(int id) {
+    // TODO Auto-generated method stub
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see samoa.core.Processor#newProcessor(samoa.core.Processor)
+   */
+  @Override
+  public Processor newProcessor(Processor sourceProcessor) {
+    // StreamSourceProcessor newProcessor = new StreamSourceProcessor();
+    // StreamSourceProcessor originProcessor = (StreamSourceProcessor)
+    // sourceProcessor;
+    // if (originProcessor.getStreamSource() != null){
+    // 
newProcessor.setStreamSource(originProcessor.getStreamSource().getStream());
+    // }
+    // return newProcessor;
+    return null;
+  }
+
+  /**
+   * On event.
+   * 
+   * @param event
+   *          the event
+   * @return true, if successful
+   */
+  @Override
+  public boolean process(ContentEvent event) {
+    return false;
+  }
+
+  /**
+   * Gets the dataset.
+   * 
+   * @return the dataset
+   */
+  public Instances getDataset() {
+    return firstInstance.dataset();
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/FileStreamSource.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/FileStreamSource.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/FileStreamSource.java
index 25541e2..6d741f9 100644
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/FileStreamSource.java
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/FileStreamSource.java
@@ -26,42 +26,41 @@ import java.io.Serializable;
 
 /**
  * An interface for FileStream's source (Local FS, HDFS,...)
+ * 
  * @author Casey
  */
 public interface FileStreamSource extends Serializable {
 
-       /**
-        * Init the source with file/directory path and file extension
-        * @param path
-        *            File or directory path
-        * @param ext
-        *            File extension to be used to filter files in a directory. 
-        *            If null, all files in the directory are accepted.
-        */
-       public void init(String path, String ext);
-       
-       /**
-        * Reset the source
-        */
-       public void reset() throws IOException;
-       
-       /**
-        * Retrieve InputStream for next file.
-        * This method will return null if we are at the last file 
-        * in the list.
-        * 
-        * @return InputStream for next file in the list
-        */
-       public InputStream getNextInputStream();
-       
-       /**
-        * Retrieve InputStream for current file.
-        * The "current pointer" is moved forward
-        * with getNextInputStream method. So if there was no
-        * invocation of getNextInputStream, this method will
-        * return null.
-        * 
-        * @return InputStream for current file in the list
-        */
-       public InputStream getCurrentInputStream();
+  /**
+   * Init the source with file/directory path and file extension
+   * 
+   * @param path
+   *          File or directory path
+   * @param ext
+   *          File extension to be used to filter files in a directory. If 
null,
+   *          all files in the directory are accepted.
+   */
+  public void init(String path, String ext);
+
+  /**
+   * Reset the source
+   */
+  public void reset() throws IOException;
+
+  /**
+   * Retrieve InputStream for next file. This method will return null if we are
+   * at the last file in the list.
+   * 
+   * @return InputStream for next file in the list
+   */
+  public InputStream getNextInputStream();
+
+  /**
+   * Retrieve InputStream for current file. The "current pointer" is moved
+   * forward with getNextInputStream method. So if there was no invocation of
+   * getNextInputStream, this method will return null.
+   * 
+   * @return InputStream for current file in the list
+   */
+  public InputStream getCurrentInputStream();
 }

Reply via email to