http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/HyperplaneGenerator.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/HyperplaneGenerator.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/HyperplaneGenerator.java
deleted file mode 100644
index 866e47f..0000000
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/HyperplaneGenerator.java
+++ /dev/null
@@ -1,185 +0,0 @@
-package com.yahoo.labs.samoa.moa.streams.generators;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.util.Random;
-
-import com.github.javacliparser.FloatOption;
-import com.github.javacliparser.IntOption;
-import com.yahoo.labs.samoa.instances.Attribute;
-import com.yahoo.labs.samoa.instances.DenseInstance;
-import com.yahoo.labs.samoa.instances.Instance;
-import com.yahoo.labs.samoa.instances.Instances;
-import com.yahoo.labs.samoa.instances.InstancesHeader;
-import com.yahoo.labs.samoa.moa.core.Example;
-import com.yahoo.labs.samoa.moa.core.FastVector;
-import com.yahoo.labs.samoa.moa.core.InstanceExample;
-import com.yahoo.labs.samoa.moa.core.ObjectRepository;
-import com.yahoo.labs.samoa.moa.options.AbstractOptionHandler;
-import com.yahoo.labs.samoa.moa.streams.InstanceStream;
-import com.yahoo.labs.samoa.moa.tasks.TaskMonitor;
-
-/**
- * Stream generator for Hyperplane data stream.
- * 
- * @author Albert Bifet (abifet at cs dot waikato dot ac dot nz)
- * @version $Revision: 7 $
- */
-public class HyperplaneGenerator extends AbstractOptionHandler implements 
InstanceStream {
-
-  @Override
-  public String getPurposeString() {
-    return "Generates a problem of predicting class of a rotating hyperplane.";
-  }
-
-  private static final long serialVersionUID = 1L;
-
-  public IntOption instanceRandomSeedOption = new 
IntOption("instanceRandomSeed", 'i',
-      "Seed for random generation of instances.", 1);
-
-  public IntOption numClassesOption = new IntOption("numClasses", 'c', "The 
number of classes to generate.", 2, 2,
-      Integer.MAX_VALUE);
-
-  public IntOption numAttsOption = new IntOption("numAtts", 'a', "The number 
of attributes to generate.", 10, 0,
-      Integer.MAX_VALUE);
-
-  public IntOption numDriftAttsOption = new IntOption("numDriftAtts", 'k', 
"The number of attributes with drift.", 2,
-      0, Integer.MAX_VALUE);
-
-  public FloatOption magChangeOption = new FloatOption("magChange", 't', 
"Magnitude of the change for every example",
-      0.0, 0.0, 1.0);
-
-  public IntOption noisePercentageOption = new IntOption("noisePercentage", 
'n',
-      "Percentage of noise to add to the data.", 5, 0, 100);
-
-  public IntOption sigmaPercentageOption = new IntOption("sigmaPercentage", 
's',
-      "Percentage of probability that the direction of change is reversed.", 
10,
-      0, 100);
-
-  protected InstancesHeader streamHeader;
-
-  protected Random instanceRandom;
-
-  protected double[] weights;
-
-  protected int[] sigma;
-
-  public int numberInstance;
-
-  @Override
-  protected void prepareForUseImpl(TaskMonitor monitor, ObjectRepository 
repository) {
-    monitor.setCurrentActivity("Preparing hyperplane...", -1.0);
-    generateHeader();
-    restart();
-  }
-
-  @SuppressWarnings({ "rawtypes", "unchecked" })
-  protected void generateHeader() {
-    FastVector attributes = new FastVector();
-    for (int i = 0; i < this.numAttsOption.getValue(); i++) {
-      attributes.addElement(new Attribute("att" + (i + 1)));
-    }
-
-    FastVector classLabels = new FastVector();
-    for (int i = 0; i < this.numClassesOption.getValue(); i++) {
-      classLabels.addElement("class" + (i + 1));
-    }
-    attributes.addElement(new Attribute("class", classLabels));
-    this.streamHeader = new InstancesHeader(new 
Instances(getCLICreationString(InstanceStream.class), attributes, 0));
-    this.streamHeader.setClassIndex(this.streamHeader.numAttributes() - 1);
-  }
-
-  @Override
-  public long estimatedRemainingInstances() {
-    return -1;
-  }
-
-  @Override
-  public InstancesHeader getHeader() {
-    return this.streamHeader;
-  }
-
-  @Override
-  public boolean hasMoreInstances() {
-    return true;
-  }
-
-  @Override
-  public boolean isRestartable() {
-    return true;
-  }
-
-  @Override
-  public Example<Instance> nextInstance() {
-
-    int numAtts = this.numAttsOption.getValue();
-    double[] attVals = new double[numAtts + 1];
-    double sum = 0.0;
-    double sumWeights = 0.0;
-    for (int i = 0; i < numAtts; i++) {
-      attVals[i] = this.instanceRandom.nextDouble();
-      sum += this.weights[i] * attVals[i];
-      sumWeights += this.weights[i];
-    }
-    int classLabel;
-    if (sum >= sumWeights * 0.5) {
-      classLabel = 1;
-    } else {
-      classLabel = 0;
-    }
-    // Add Noise
-    if ((1 + (this.instanceRandom.nextInt(100))) <= 
this.noisePercentageOption.getValue()) {
-      classLabel = (classLabel == 0 ? 1 : 0);
-    }
-
-    Instance inst = new DenseInstance(1.0, attVals);
-    inst.setDataset(getHeader());
-    inst.setClassValue(classLabel);
-    addDrift();
-    return new InstanceExample(inst);
-  }
-
-  private void addDrift() {
-    for (int i = 0; i < this.numDriftAttsOption.getValue(); i++) {
-      this.weights[i] += (double) ((double) sigma[i]) * ((double) 
this.magChangeOption.getValue());
-      if (// this.weights[i] >= 1.0 || this.weights[i] <= 0.0 ||
-      (1 + (this.instanceRandom.nextInt(100))) <= 
this.sigmaPercentageOption.getValue()) {
-        this.sigma[i] *= -1;
-      }
-    }
-  }
-
-  @Override
-  public void restart() {
-    this.instanceRandom = new Random(this.instanceRandomSeedOption.getValue());
-    this.weights = new double[this.numAttsOption.getValue()];
-    this.sigma = new int[this.numAttsOption.getValue()];
-    for (int i = 0; i < this.numAttsOption.getValue(); i++) {
-      this.weights[i] = this.instanceRandom.nextDouble();
-      this.sigma[i] = (i < this.numDriftAttsOption.getValue() ? 1 : 0);
-    }
-  }
-
-  @Override
-  public void getDescription(StringBuilder sb, int indent) {
-    // TODO Auto-generated method stub
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/RandomTreeGenerator.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/RandomTreeGenerator.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/RandomTreeGenerator.java
deleted file mode 100644
index cc8384f..0000000
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/RandomTreeGenerator.java
+++ /dev/null
@@ -1,267 +0,0 @@
-package com.yahoo.labs.samoa.moa.streams.generators;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.yahoo.labs.samoa.instances.Attribute;
-import com.yahoo.labs.samoa.instances.DenseInstance;
-import com.yahoo.labs.samoa.moa.core.FastVector;
-import com.yahoo.labs.samoa.instances.Instance;
-import com.yahoo.labs.samoa.instances.Instances;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Random;
-import com.yahoo.labs.samoa.moa.core.InstanceExample;
-
-import com.yahoo.labs.samoa.instances.InstancesHeader;
-import com.yahoo.labs.samoa.moa.core.ObjectRepository;
-import com.yahoo.labs.samoa.moa.options.AbstractOptionHandler;
-import com.github.javacliparser.FloatOption;
-import com.github.javacliparser.IntOption;
-import com.yahoo.labs.samoa.moa.streams.InstanceStream;
-import com.yahoo.labs.samoa.moa.tasks.TaskMonitor;
-
-/**
- * Stream generator for a stream based on a randomly generated tree..
- * 
- * @author Richard Kirkby ([email protected])
- * @version $Revision: 7 $
- */
-public class RandomTreeGenerator extends AbstractOptionHandler implements 
InstanceStream {
-
-  @Override
-  public String getPurposeString() {
-    return "Generates a stream based on a randomly generated tree.";
-  }
-
-  private static final long serialVersionUID = 1L;
-
-  public IntOption treeRandomSeedOption = new IntOption("treeRandomSeed",
-      'r', "Seed for random generation of tree.", 1);
-
-  public IntOption instanceRandomSeedOption = new IntOption(
-      "instanceRandomSeed", 'i',
-      "Seed for random generation of instances.", 1);
-
-  public IntOption numClassesOption = new IntOption("numClasses", 'c',
-      "The number of classes to generate.", 2, 2, Integer.MAX_VALUE);
-
-  public IntOption numNominalsOption = new IntOption("numNominals", 'o',
-      "The number of nominal attributes to generate.", 5, 0,
-      Integer.MAX_VALUE);
-
-  public IntOption numNumericsOption = new IntOption("numNumerics", 'u',
-      "The number of numeric attributes to generate.", 5, 0,
-      Integer.MAX_VALUE);
-
-  public IntOption numValsPerNominalOption = new IntOption(
-      "numValsPerNominal", 'v',
-      "The number of values to generate per nominal attribute.", 5, 2,
-      Integer.MAX_VALUE);
-
-  public IntOption maxTreeDepthOption = new IntOption("maxTreeDepth", 'd',
-      "The maximum depth of the tree concept.", 5, 0, Integer.MAX_VALUE);
-
-  public IntOption firstLeafLevelOption = new IntOption(
-      "firstLeafLevel",
-      'l',
-      "The first level of the tree above maxTreeDepth that can have leaves.",
-      3, 0, Integer.MAX_VALUE);
-
-  public FloatOption leafFractionOption = new FloatOption("leafFraction",
-      'f',
-      "The fraction of leaves per level from firstLeafLevel onwards.",
-      0.15, 0.0, 1.0);
-
-  protected static class Node implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    public int classLabel;
-
-    public int splitAttIndex;
-
-    public double splitAttValue;
-
-    public Node[] children;
-  }
-
-  protected Node treeRoot;
-
-  protected InstancesHeader streamHeader;
-
-  protected Random instanceRandom;
-
-  @Override
-  public void prepareForUseImpl(TaskMonitor monitor,
-      ObjectRepository repository) {
-    monitor.setCurrentActivity("Preparing random tree...", -1.0);
-    generateHeader();
-    generateRandomTree();
-    restart();
-  }
-
-  @Override
-  public long estimatedRemainingInstances() {
-    return -1;
-  }
-
-  @Override
-  public boolean isRestartable() {
-    return true;
-  }
-
-  @Override
-  public void restart() {
-    this.instanceRandom = new Random(this.instanceRandomSeedOption.getValue());
-  }
-
-  @Override
-  public InstancesHeader getHeader() {
-    return this.streamHeader;
-  }
-
-  @Override
-  public boolean hasMoreInstances() {
-    return true;
-  }
-
-  @Override
-  public InstanceExample nextInstance() {
-    double[] attVals = new double[this.numNominalsOption.getValue()
-        + this.numNumericsOption.getValue()];
-    InstancesHeader header = getHeader();
-    Instance inst = new DenseInstance(header.numAttributes());
-    for (int i = 0; i < attVals.length; i++) {
-      attVals[i] = i < this.numNominalsOption.getValue() ? 
this.instanceRandom.nextInt(this.numValsPerNominalOption
-          .getValue())
-          : this.instanceRandom.nextDouble();
-      inst.setValue(i, attVals[i]);
-    }
-    inst.setDataset(header);
-    inst.setClassValue(classifyInstance(this.treeRoot, attVals));
-    return new InstanceExample(inst);
-  }
-
-  protected int classifyInstance(Node node, double[] attVals) {
-    if (node.children == null) {
-      return node.classLabel;
-    }
-    if (node.splitAttIndex < this.numNominalsOption.getValue()) {
-      return classifyInstance(
-          node.children[(int) attVals[node.splitAttIndex]], attVals);
-    }
-    return classifyInstance(
-        node.children[attVals[node.splitAttIndex] < node.splitAttValue ? 0
-            : 1], attVals);
-  }
-
-  protected void generateHeader() {
-    FastVector<Attribute> attributes = new FastVector<>();
-    FastVector<String> nominalAttVals = new FastVector<>();
-    for (int i = 0; i < this.numValsPerNominalOption.getValue(); i++) {
-      nominalAttVals.addElement("value" + (i + 1));
-    }
-    for (int i = 0; i < this.numNominalsOption.getValue(); i++) {
-      attributes.addElement(new Attribute("nominal" + (i + 1),
-          nominalAttVals));
-    }
-    for (int i = 0; i < this.numNumericsOption.getValue(); i++) {
-      attributes.addElement(new Attribute("numeric" + (i + 1)));
-    }
-    FastVector<String> classLabels = new FastVector<>();
-    for (int i = 0; i < this.numClassesOption.getValue(); i++) {
-      classLabels.addElement("class" + (i + 1));
-    }
-    attributes.addElement(new Attribute("class", classLabels));
-    this.streamHeader = new InstancesHeader(new Instances(
-        getCLICreationString(InstanceStream.class), attributes, 0));
-    this.streamHeader.setClassIndex(this.streamHeader.numAttributes() - 1);
-  }
-
-  protected void generateRandomTree() {
-    Random treeRand = new Random(this.treeRandomSeedOption.getValue());
-    ArrayList<Integer> nominalAttCandidates = new ArrayList<>(
-        this.numNominalsOption.getValue());
-    for (int i = 0; i < this.numNominalsOption.getValue(); i++) {
-      nominalAttCandidates.add(i);
-    }
-    double[] minNumericVals = new double[this.numNumericsOption.getValue()];
-    double[] maxNumericVals = new double[this.numNumericsOption.getValue()];
-    for (int i = 0; i < this.numNumericsOption.getValue(); i++) {
-      minNumericVals[i] = 0.0;
-      maxNumericVals[i] = 1.0;
-    }
-    this.treeRoot = generateRandomTreeNode(0, nominalAttCandidates,
-        minNumericVals, maxNumericVals, treeRand);
-  }
-
-  protected Node generateRandomTreeNode(int currentDepth,
-      ArrayList<Integer> nominalAttCandidates, double[] minNumericVals,
-      double[] maxNumericVals, Random treeRand) {
-    if ((currentDepth >= this.maxTreeDepthOption.getValue())
-        || ((currentDepth >= this.firstLeafLevelOption.getValue()) && 
(this.leafFractionOption.getValue() >= (1.0 - treeRand
-            .nextDouble())))) {
-      Node leaf = new Node();
-      leaf.classLabel = treeRand.nextInt(this.numClassesOption.getValue());
-      return leaf;
-    }
-    Node node = new Node();
-    int chosenAtt = treeRand.nextInt(nominalAttCandidates.size()
-        + this.numNumericsOption.getValue());
-    if (chosenAtt < nominalAttCandidates.size()) {
-      node.splitAttIndex = nominalAttCandidates.get(chosenAtt);
-      node.children = new Node[this.numValsPerNominalOption.getValue()];
-      ArrayList<Integer> newNominalCandidates = new ArrayList<>(
-          nominalAttCandidates);
-      newNominalCandidates.remove(new Integer(node.splitAttIndex));
-      newNominalCandidates.trimToSize();
-      for (int i = 0; i < node.children.length; i++) {
-        node.children[i] = generateRandomTreeNode(currentDepth + 1,
-            newNominalCandidates, minNumericVals, maxNumericVals,
-            treeRand);
-      }
-    } else {
-      int numericIndex = chosenAtt - nominalAttCandidates.size();
-      node.splitAttIndex = this.numNominalsOption.getValue()
-          + numericIndex;
-      double minVal = minNumericVals[numericIndex];
-      double maxVal = maxNumericVals[numericIndex];
-      node.splitAttValue = ((maxVal - minVal) * treeRand.nextDouble())
-          + minVal;
-      node.children = new Node[2];
-      double[] newMaxVals = maxNumericVals.clone();
-      newMaxVals[numericIndex] = node.splitAttValue;
-      node.children[0] = generateRandomTreeNode(currentDepth + 1,
-          nominalAttCandidates, minNumericVals, newMaxVals, treeRand);
-      double[] newMinVals = minNumericVals.clone();
-      newMinVals[numericIndex] = node.splitAttValue;
-      node.children[1] = generateRandomTreeNode(currentDepth + 1,
-          nominalAttCandidates, newMinVals, maxNumericVals, treeRand);
-    }
-    return node;
-  }
-
-  @Override
-  public void getDescription(StringBuilder sb, int indent) {
-    // TODO Auto-generated method stub
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/NullMonitor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/NullMonitor.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/NullMonitor.java
deleted file mode 100644
index 4c51219..0000000
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/NullMonitor.java
+++ /dev/null
@@ -1,102 +0,0 @@
-package com.yahoo.labs.samoa.moa.tasks;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-/**
- * Class that represents a null monitor.
- * 
- * @author Richard Kirkby ([email protected])
- * @version $Revision: 7 $
- */
-public class NullMonitor implements TaskMonitor {
-
-  @Override
-  public void setCurrentActivity(String activityDescription,
-      double fracComplete) {
-  }
-
-  @Override
-  public void setCurrentActivityDescription(String activity) {
-  }
-
-  @Override
-  public void setCurrentActivityFractionComplete(double fracComplete) {
-  }
-
-  @Override
-  public boolean taskShouldAbort() {
-    return false;
-  }
-
-  @Override
-  public String getCurrentActivityDescription() {
-    return null;
-  }
-
-  @Override
-  public double getCurrentActivityFractionComplete() {
-    return -1.0;
-  }
-
-  @Override
-  public boolean isPaused() {
-    return false;
-  }
-
-  @Override
-  public boolean isCancelled() {
-    return false;
-  }
-
-  @Override
-  public void requestCancel() {
-  }
-
-  @Override
-  public void requestPause() {
-  }
-
-  @Override
-  public void requestResume() {
-  }
-
-  @Override
-  public Object getLatestResultPreview() {
-    return null;
-  }
-
-  @Override
-  public void requestResultPreview() {
-  }
-
-  @Override
-  public boolean resultPreviewRequested() {
-    return false;
-  }
-
-  @Override
-  public void setLatestResultPreview(Object latestPreview) {
-  }
-
-  @Override
-  public void requestResultPreview(ResultPreviewListener toInform) {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/ResultPreviewListener.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/ResultPreviewListener.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/ResultPreviewListener.java
deleted file mode 100644
index 124008d..0000000
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/ResultPreviewListener.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package com.yahoo.labs.samoa.moa.tasks;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-/**
- * Interface implemented by classes that preview results on the Graphical User 
Interface
- * 
- * @author Richard Kirkby ([email protected])
- * @version $Revision: 7 $
- */
-public interface ResultPreviewListener {
-
-  /**
-   * This method is used to receive a signal from <code>TaskMonitor</code> 
that the lastest preview has changed. This
-   * method is implemented in <code>PreviewPanel</code> to change the results 
that are shown in its panel.
-   * 
-   */
-  public void latestPreviewChanged();
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/Task.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/Task.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/Task.java
deleted file mode 100644
index cd56914..0000000
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/Task.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package com.yahoo.labs.samoa.moa.tasks;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.yahoo.labs.samoa.moa.MOAObject;
-import com.yahoo.labs.samoa.moa.core.ObjectRepository;
-
-/**
- * Interface representing a task.
- * 
- * @author Richard Kirkby ([email protected])
- * @version $Revision: 7 $
- */
-public interface Task extends MOAObject {
-
-  /**
-   * Gets the result type of this task. Tasks can return LearningCurve, 
LearningEvaluation, Classifier, String,
-   * Instances..
-   * 
-   * @return a class object of the result of this task
-   */
-  public Class<?> getTaskResultType();
-
-  /**
-   * This method performs this task, when TaskMonitor and ObjectRepository are 
no needed.
-   * 
-   * @return an object with the result of this task
-   */
-  public Object doTask();
-
-  /**
-   * This method performs this task. <code>AbstractTask</code> implements this 
method so all its extensions only need to
-   * implement <code>doTaskImpl</code>
-   * 
-   * @param monitor
-   *          the TaskMonitor to use
-   * @param repository
-   *          the ObjectRepository to use
-   * @return an object with the result of this task
-   */
-  public Object doTask(TaskMonitor monitor, ObjectRepository repository);
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/TaskMonitor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/TaskMonitor.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/TaskMonitor.java
deleted file mode 100644
index cce3ebe..0000000
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/TaskMonitor.java
+++ /dev/null
@@ -1,146 +0,0 @@
-package com.yahoo.labs.samoa.moa.tasks;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-/**
- * Interface representing a task monitor.
- * 
- * @author Richard Kirkby ([email protected])
- * @version $Revision: 7 $
- */
-public interface TaskMonitor {
-
-  /**
-   * Sets the description and the percentage done of the current activity.
-   * 
-   * @param activity
-   *          the description of the current activity
-   * @param fracComplete
-   *          the percentage done of the current activity
-   */
-  public void setCurrentActivity(String activityDescription,
-      double fracComplete);
-
-  /**
-   * Sets the description of the current activity.
-   * 
-   * @param activity
-   *          the description of the current activity
-   */
-  public void setCurrentActivityDescription(String activity);
-
-  /**
-   * Sets the percentage done of the current activity
-   * 
-   * @param fracComplete
-   *          the percentage done of the current activity
-   */
-  public void setCurrentActivityFractionComplete(double fracComplete);
-
-  /**
-   * Gets whether the task should abort.
-   * 
-   * @return true if the task should abort
-   */
-  public boolean taskShouldAbort();
-
-  /**
-   * Gets whether there is a request for preview the task result.
-   * 
-   * @return true if there is a request for preview the task result
-   */
-  public boolean resultPreviewRequested();
-
-  /**
-   * Sets the current result to preview
-   * 
-   * @param latestPreview
-   *          the result to preview
-   */
-  public void setLatestResultPreview(Object latestPreview);
-
-  /**
-   * Gets the description of the current activity.
-   * 
-   * @return the description of the current activity
-   */
-  public String getCurrentActivityDescription();
-
-  /**
-   * Gets the percentage done of the current activity
-   * 
-   * @return the percentage done of the current activity
-   */
-  public double getCurrentActivityFractionComplete();
-
-  /**
-   * Requests the task monitored to pause.
-   * 
-   */
-  public void requestPause();
-
-  /**
-   * Requests the task monitored to resume.
-   * 
-   */
-  public void requestResume();
-
-  /**
-   * Requests the task monitored to cancel.
-   * 
-   */
-  public void requestCancel();
-
-  /**
-   * Gets whether the task monitored is paused.
-   * 
-   * @return true if the task is paused
-   */
-  public boolean isPaused();
-
-  /**
-   * Gets whether the task monitored is cancelled.
-   * 
-   * @return true if the task is cancelled
-   */
-  public boolean isCancelled();
-
-  /**
-   * Requests to preview the task result.
-   * 
-   */
-  public void requestResultPreview();
-
-  /**
-   * Requests to preview the task result.
-   * 
-   * @param toInform
-   *          the listener of the changes in the preview of the result
-   */
-  public void requestResultPreview(ResultPreviewListener toInform);
-
-  /**
-   * Gets the current result to preview
-   * 
-   * @return the result to preview
-   */
-  public Object getLatestResultPreview();
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ArffFileStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ArffFileStream.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ArffFileStream.java
deleted file mode 100644
index 2053339..0000000
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ArffFileStream.java
+++ /dev/null
@@ -1,119 +0,0 @@
-package com.yahoo.labs.samoa.streams;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.io.IOException;
-
-import com.github.javacliparser.FileOption;
-import com.github.javacliparser.IntOption;
-import com.yahoo.labs.samoa.instances.Instances;
-import com.yahoo.labs.samoa.moa.core.InstanceExample;
-import com.yahoo.labs.samoa.moa.core.ObjectRepository;
-import com.yahoo.labs.samoa.moa.tasks.TaskMonitor;
-
-/**
- * InstanceStream for ARFF file
- * 
- * @author Casey
- */
-public class ArffFileStream extends FileStream {
-
-  public FileOption arffFileOption = new FileOption("arffFile", 'f',
-      "ARFF File(s) to load.", null, null, false);
-
-  public IntOption classIndexOption = new IntOption("classIndex", 'c',
-      "Class index of data. 0 for none or -1 for last attribute in file.",
-      -1, -1, Integer.MAX_VALUE);
-
-  protected InstanceExample lastInstanceRead;
-
-  @Override
-  public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository 
repository) {
-    super.prepareForUseImpl(monitor, repository);
-    String filePath = this.arffFileOption.getFile().getAbsolutePath();
-    this.fileSource.init(filePath, "arff");
-    this.lastInstanceRead = null;
-  }
-
-  @Override
-  protected void reset() {
-    try {
-      if (this.fileReader != null)
-        this.fileReader.close();
-
-      fileSource.reset();
-    } catch (IOException ioe) {
-      throw new RuntimeException("FileStream restart failed.", ioe);
-    }
-
-    if (!getNextFileReader()) {
-      hitEndOfStream = true;
-      throw new RuntimeException("FileStream is empty.");
-    }
-  }
-
-  @Override
-  protected boolean getNextFileReader() {
-    boolean ret = super.getNextFileReader();
-    if (ret) {
-      this.instances = new Instances(this.fileReader, 1, -1);
-      if (this.classIndexOption.getValue() < 0) {
-        this.instances.setClassIndex(this.instances.numAttributes() - 1);
-      } else if (this.classIndexOption.getValue() > 0) {
-        this.instances.setClassIndex(this.classIndexOption.getValue() - 1);
-      }
-    }
-    return ret;
-  }
-
-  @Override
-  protected boolean readNextInstanceFromFile() {
-    try {
-      if (this.instances.readInstance(this.fileReader)) {
-        this.lastInstanceRead = new 
InstanceExample(this.instances.instance(0));
-        this.instances.delete(); // keep instances clean
-        return true;
-      }
-      if (this.fileReader != null) {
-        this.fileReader.close();
-        this.fileReader = null;
-      }
-      return false;
-    } catch (IOException ioe) {
-      throw new RuntimeException(
-          "ArffFileStream failed to read instance from stream.", ioe);
-    }
-
-  }
-
-  @Override
-  protected InstanceExample getLastInstanceRead() {
-    return this.lastInstanceRead;
-  }
-
-  /*
-   * extend com.yahoo.labs.samoa.moa.MOAObject
-   */
-  @Override
-  public void getDescription(StringBuilder sb, int indent) {
-    // TODO Auto-generated method stub
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ClusteringEntranceProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ClusteringEntranceProcessor.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ClusteringEntranceProcessor.java
deleted file mode 100644
index 70403ca..0000000
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ClusteringEntranceProcessor.java
+++ /dev/null
@@ -1,253 +0,0 @@
-package com.yahoo.labs.samoa.streams;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.util.Random;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.yahoo.labs.samoa.core.ContentEvent;
-import com.yahoo.labs.samoa.core.EntranceProcessor;
-import com.yahoo.labs.samoa.core.Processor;
-import com.yahoo.labs.samoa.evaluation.ClusteringEvaluationContentEvent;
-import com.yahoo.labs.samoa.instances.Instance;
-import com.yahoo.labs.samoa.instances.Instances;
-import com.yahoo.labs.samoa.learners.clusterers.ClusteringContentEvent;
-import com.yahoo.labs.samoa.moa.cluster.Clustering;
-import com.yahoo.labs.samoa.moa.core.DataPoint;
-import com.yahoo.labs.samoa.moa.options.AbstractOptionHandler;
-import com.yahoo.labs.samoa.moa.streams.InstanceStream;
-import com.yahoo.labs.samoa.moa.streams.clustering.ClusteringStream;
-import com.yahoo.labs.samoa.moa.streams.clustering.RandomRBFGeneratorEvents;
-
-/**
- * EntranceProcessor for Clustering Evaluation Task.
- * 
- */
-public final class ClusteringEntranceProcessor implements EntranceProcessor {
-
-  private static final long serialVersionUID = 4169053337917578558L;
-
-  private static final Logger logger = 
LoggerFactory.getLogger(ClusteringEntranceProcessor.class);
-
-  private StreamSource streamSource;
-  private Instance firstInstance;
-  private boolean isInited = false;
-  private Random random = new Random();
-  private double samplingThreshold;
-  private int numberInstances;
-  private int numInstanceSent = 0;
-
-  private int groundTruthSamplingFrequency;
-
-  @Override
-  public boolean process(ContentEvent event) {
-    // TODO: possible refactor of the super-interface implementation
-    // of source processor does not need this method
-    return false;
-  }
-
-  @Override
-  public void onCreate(int id) {
-    logger.debug("Creating ClusteringSourceProcessor with id {}", id);
-  }
-
-  @Override
-  public Processor newProcessor(Processor p) {
-    ClusteringEntranceProcessor newProcessor = new 
ClusteringEntranceProcessor();
-    ClusteringEntranceProcessor originProcessor = 
(ClusteringEntranceProcessor) p;
-    if (originProcessor.getStreamSource() != null) {
-      
newProcessor.setStreamSource(originProcessor.getStreamSource().getStream());
-    }
-    return newProcessor;
-  }
-
-  @Override
-  public boolean hasNext() {
-    return (!isFinished());
-  }
-
-  @Override
-  public boolean isFinished() {
-    return (!streamSource.hasMoreInstances() || (numberInstances >= 0 && 
numInstanceSent >= numberInstances));
-  }
-
-  // /**
-  // * Method to send instances via input stream
-  // *
-  // * @param inputStream
-  // * @param numberInstances
-  // */
-  // public void sendInstances(Stream inputStream, Stream evaluationStream, int
-  // numberInstances, double samplingThreshold) {
-  // int numInstanceSent = 0;
-  // this.samplingThreshold = samplingThreshold;
-  // while (streamSource.hasMoreInstances() && numInstanceSent <
-  // numberInstances) {
-  // numInstanceSent++;
-  // DataPoint nextDataPoint = new DataPoint(nextInstance(), numInstanceSent);
-  // ClusteringContentEvent contentEvent = new
-  // ClusteringContentEvent(numInstanceSent, nextDataPoint);
-  // inputStream.put(contentEvent);
-  // sendPointsAndGroundTruth(streamSource, evaluationStream, numInstanceSent,
-  // nextDataPoint);
-  // }
-  //
-  // sendEndEvaluationInstance(inputStream);
-  // }
-
-  public double getSamplingThreshold() {
-    return samplingThreshold;
-  }
-
-  public void setSamplingThreshold(double samplingThreshold) {
-    this.samplingThreshold = samplingThreshold;
-  }
-
-  public int getGroundTruthSamplingFrequency() {
-    return groundTruthSamplingFrequency;
-  }
-
-  public void setGroundTruthSamplingFrequency(int 
groundTruthSamplingFrequency) {
-    this.groundTruthSamplingFrequency = groundTruthSamplingFrequency;
-  }
-
-  public StreamSource getStreamSource() {
-    return streamSource;
-  }
-
-  public void setStreamSource(InstanceStream stream) {
-    if (stream instanceof AbstractOptionHandler) {
-      ((AbstractOptionHandler) (stream)).prepareForUse();
-    }
-
-    this.streamSource = new StreamSource(stream);
-    firstInstance = streamSource.nextInstance().getData();
-  }
-
-  public Instances getDataset() {
-    return firstInstance.dataset();
-  }
-
-  private Instance nextInstance() {
-    if (this.isInited) {
-      return streamSource.nextInstance().getData();
-    } else {
-      this.isInited = true;
-      return firstInstance;
-    }
-  }
-
-  // private void sendEndEvaluationInstance(Stream inputStream) {
-  // ClusteringContentEvent contentEvent = new ClusteringContentEvent(-1,
-  // firstInstance);
-  // contentEvent.setLast(true);
-  // inputStream.put(contentEvent);
-  // }
-
-  // private void sendPointsAndGroundTruth(StreamSource sourceStream, Stream
-  // evaluationStream, int numInstanceSent, DataPoint nextDataPoint) {
-  // boolean sendEvent = false;
-  // DataPoint instance = null;
-  // Clustering gtClustering = null;
-  // int samplingFrequency = ((ClusteringStream)
-  // sourceStream.getStream()).getDecayHorizon();
-  // if (random.nextDouble() < samplingThreshold) {
-  // // Add instance
-  // sendEvent = true;
-  // instance = nextDataPoint;
-  // }
-  // if (numInstanceSent > 0 && numInstanceSent % samplingFrequency == 0) {
-  // // Add GroundTruth
-  // sendEvent = true;
-  // gtClustering = ((RandomRBFGeneratorEvents)
-  // sourceStream.getStream()).getGeneratingClusters();
-  // }
-  // if (sendEvent == true) {
-  // ClusteringEvaluationContentEvent evalEvent;
-  // evalEvent = new ClusteringEvaluationContentEvent(gtClustering, instance,
-  // false);
-  // evaluationStream.put(evalEvent);
-  // }
-  // }
-
-  public void setMaxNumInstances(int value) {
-    numberInstances = value;
-  }
-
-  public int getMaxNumInstances() {
-    return this.numberInstances;
-  }
-
-  @Override
-  public ContentEvent nextEvent() {
-
-    // boolean sendEvent = false;
-    // DataPoint instance = null;
-    // Clustering gtClustering = null;
-    // int samplingFrequency = ((ClusteringStream)
-    // sourceStream.getStream()).getDecayHorizon();
-    // if (random.nextDouble() < samplingThreshold) {
-    // // Add instance
-    // sendEvent = true;
-    // instance = nextDataPoint;
-    // }
-    // if (numInstanceSent > 0 && numInstanceSent % samplingFrequency == 0) {
-    // // Add GroundTruth
-    // sendEvent = true;
-    // gtClustering = ((RandomRBFGeneratorEvents)
-    // sourceStream.getStream()).getGeneratingClusters();
-    // }
-    // if (sendEvent == true) {
-    // ClusteringEvaluationContentEvent evalEvent;
-    // evalEvent = new ClusteringEvaluationContentEvent(gtClustering, instance,
-    // false);
-    // evaluationStream.put(evalEvent);
-    // }
-
-    groundTruthSamplingFrequency = ((ClusteringStream) 
streamSource.getStream()).getDecayHorizon(); // FIXME should it be takend from 
the ClusteringEvaluation -f option instead?
-    if (isFinished()) {
-      // send ending event
-      ClusteringContentEvent contentEvent = new ClusteringContentEvent(-1, 
firstInstance);
-      contentEvent.setLast(true);
-      return contentEvent;
-    } else {
-      DataPoint nextDataPoint = new DataPoint(nextInstance(), numInstanceSent);
-      numInstanceSent++;
-      if (numInstanceSent % groundTruthSamplingFrequency == 0) {
-        // TODO implement an interface ClusteringGroundTruth with a
-        // getGeneratingClusters() method, check if the source implements the 
interface
-        // send a clustering evaluation event for external measures (distance 
from the gt clusters)
-        Clustering gtClustering = ((RandomRBFGeneratorEvents) 
streamSource.getStream()).getGeneratingClusters();
-        return new ClusteringEvaluationContentEvent(gtClustering, 
nextDataPoint, false);
-      } else {
-        ClusteringContentEvent contentEvent = new 
ClusteringContentEvent(numInstanceSent, nextDataPoint);
-        if (random.nextDouble() < samplingThreshold) {
-          // send a clustering content event for internal measures (cohesion,
-          // separation)
-          contentEvent.setSample(true);
-        }
-        return contentEvent;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/FileStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/FileStream.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/FileStream.java
deleted file mode 100644
index 4784ae0..0000000
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/FileStream.java
+++ /dev/null
@@ -1,173 +0,0 @@
-package com.yahoo.labs.samoa.streams;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
-
-import com.github.javacliparser.ClassOption;
-import com.yahoo.labs.samoa.instances.Instances;
-import com.yahoo.labs.samoa.instances.InstancesHeader;
-import com.yahoo.labs.samoa.moa.core.InstanceExample;
-import com.yahoo.labs.samoa.moa.core.ObjectRepository;
-import com.yahoo.labs.samoa.moa.options.AbstractOptionHandler;
-import com.yahoo.labs.samoa.moa.streams.InstanceStream;
-import com.yahoo.labs.samoa.moa.tasks.TaskMonitor;
-import com.yahoo.labs.samoa.streams.fs.FileStreamSource;
-
-/**
- * InstanceStream for files (Abstract class: subclass this class for different 
file formats)
- * 
- * @author Casey
- */
-public abstract class FileStream extends AbstractOptionHandler implements 
InstanceStream {
-  /**
-    *
-    */
-  private static final long serialVersionUID = 3028905554604259130L;
-
-  public ClassOption sourceTypeOption = new ClassOption("sourceType",
-      's', "Source Type (HDFS, local FS)", FileStreamSource.class,
-      "LocalFileStreamSource");
-
-  protected transient FileStreamSource fileSource;
-  protected transient Reader fileReader;
-  protected Instances instances;
-
-  protected boolean hitEndOfStream;
-  private boolean hasStarted;
-
-  /*
-   * Constructors
-   */
-  public FileStream() {
-    this.hitEndOfStream = false;
-  }
-
-  /*
-   * implement InstanceStream
-   */
-  @Override
-  public InstancesHeader getHeader() {
-    return new InstancesHeader(this.instances);
-  }
-
-  @Override
-  public long estimatedRemainingInstances() {
-    return -1;
-  }
-
-  @Override
-  public boolean hasMoreInstances() {
-    return !this.hitEndOfStream;
-  }
-
-  @Override
-  public InstanceExample nextInstance() {
-    if (this.getLastInstanceRead() == null) {
-      readNextInstanceFromStream();
-    }
-    InstanceExample prevInstance = this.getLastInstanceRead();
-    readNextInstanceFromStream();
-    return prevInstance;
-  }
-
-  @Override
-  public boolean isRestartable() {
-    return true;
-  }
-
-  @Override
-  public void restart() {
-    reset();
-    hasStarted = false;
-  }
-
-  protected void reset() {
-    try {
-      if (this.fileReader != null)
-        this.fileReader.close();
-
-      fileSource.reset();
-    } catch (IOException ioe) {
-      throw new RuntimeException("FileStream restart failed.", ioe);
-    }
-
-    if (!getNextFileReader()) {
-      hitEndOfStream = true;
-      throw new RuntimeException("FileStream is empty.");
-    }
-
-    this.instances = new Instances(this.fileReader, 1, -1);
-    this.instances.setClassIndex(this.instances.numAttributes() - 1);
-  }
-
-  protected boolean getNextFileReader() {
-    if (this.fileReader != null)
-      try {
-        this.fileReader.close();
-      } catch (IOException ioe) {
-        ioe.printStackTrace();
-      }
-
-    InputStream inputStream = this.fileSource.getNextInputStream();
-    if (inputStream == null)
-      return false;
-
-    this.fileReader = new BufferedReader(new InputStreamReader(inputStream));
-    return true;
-  }
-
-  protected boolean readNextInstanceFromStream() {
-    if (!hasStarted) {
-      this.reset();
-      hasStarted = true;
-    }
-
-    while (true) {
-      if (readNextInstanceFromFile())
-        return true;
-
-      if (!getNextFileReader()) {
-        this.hitEndOfStream = true;
-        return false;
-      }
-    }
-  }
-
-  /**
-   * Read next instance from the current file and assign it to 
lastInstanceRead.
-   * 
-   * @return true if it was able to read next instance and false if it was at 
the end of the file
-   */
-  protected abstract boolean readNextInstanceFromFile();
-
-  protected abstract InstanceExample getLastInstanceRead();
-
-  @Override
-  public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository 
repository) {
-    this.fileSource = sourceTypeOption.getValue();
-    this.hasStarted = false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/PrequentialSourceProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/PrequentialSourceProcessor.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/PrequentialSourceProcessor.java
deleted file mode 100644
index a16a9d3..0000000
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/PrequentialSourceProcessor.java
+++ /dev/null
@@ -1,232 +0,0 @@
-package com.yahoo.labs.samoa.streams;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.yahoo.labs.samoa.core.ContentEvent;
-import com.yahoo.labs.samoa.core.EntranceProcessor;
-import com.yahoo.labs.samoa.core.Processor;
-import com.yahoo.labs.samoa.instances.Instance;
-import com.yahoo.labs.samoa.instances.Instances;
-import com.yahoo.labs.samoa.learners.InstanceContentEvent;
-import com.yahoo.labs.samoa.moa.options.AbstractOptionHandler;
-import com.yahoo.labs.samoa.moa.streams.InstanceStream;
-
-/**
- * Prequential Source Processor is the processor for Prequential Evaluation 
Task.
- * 
- * @author Arinto Murdopo
- * 
- */
-public final class PrequentialSourceProcessor implements EntranceProcessor {
-
-  private static final long serialVersionUID = 4169053337917578558L;
-
-  private static final Logger logger = 
LoggerFactory.getLogger(PrequentialSourceProcessor.class);
-  private boolean isInited = false;
-  private StreamSource streamSource;
-  private Instance firstInstance;
-  private int numberInstances;
-  private int numInstanceSent = 0;
-
-  protected InstanceStream sourceStream;
-
-  /*
-   * ScheduledExecutorService to schedule sending events after each delay
-   * interval. It is expected to have only one event in the queue at a time, so
-   * we need only one thread in the pool.
-   */
-  private transient ScheduledExecutorService timer;
-  private transient ScheduledFuture<?> schedule = null;
-  private int readyEventIndex = 1; // No waiting for the first event
-  private int delay = 0;
-  private int batchSize = 1;
-  private boolean finished = false;
-
-  @Override
-  public boolean process(ContentEvent event) {
-    // TODO: possible refactor of the super-interface implementation
-    // of source processor does not need this method
-    return false;
-  }
-
-  @Override
-  public boolean isFinished() {
-    return finished;
-  }
-
-  @Override
-  public boolean hasNext() {
-    return !isFinished() && (delay <= 0 || numInstanceSent < readyEventIndex);
-  }
-
-  private boolean hasReachedEndOfStream() {
-    return (!streamSource.hasMoreInstances() || (numberInstances >= 0 && 
numInstanceSent >= numberInstances));
-  }
-
-  @Override
-  public ContentEvent nextEvent() {
-    InstanceContentEvent contentEvent = null;
-    if (hasReachedEndOfStream()) {
-      contentEvent = new InstanceContentEvent(-1, firstInstance, false, true);
-      contentEvent.setLast(true);
-      // set finished status _after_ tagging last event
-      finished = true;
-    }
-    else if (hasNext()) {
-      numInstanceSent++;
-      contentEvent = new InstanceContentEvent(numInstanceSent, nextInstance(), 
true, true);
-
-      // first call to this method will trigger the timer
-      if (schedule == null && delay > 0) {
-        schedule = timer.scheduleWithFixedDelay(new DelayTimeoutHandler(this), 
delay, delay,
-            TimeUnit.MICROSECONDS);
-      }
-    }
-    return contentEvent;
-  }
-
-  private void increaseReadyEventIndex() {
-    readyEventIndex += batchSize;
-    // if we exceed the max, cancel the timer
-    if (schedule != null && isFinished()) {
-      schedule.cancel(false);
-    }
-  }
-
-  @Override
-  public void onCreate(int id) {
-    initStreamSource(sourceStream);
-    timer = Executors.newScheduledThreadPool(1);
-    logger.debug("Creating PrequentialSourceProcessor with id {}", id);
-  }
-
-  @Override
-  public Processor newProcessor(Processor p) {
-    PrequentialSourceProcessor newProcessor = new PrequentialSourceProcessor();
-    PrequentialSourceProcessor originProcessor = (PrequentialSourceProcessor) 
p;
-    if (originProcessor.getStreamSource() != null) {
-      
newProcessor.setStreamSource(originProcessor.getStreamSource().getStream());
-    }
-    return newProcessor;
-  }
-
-  // /**
-  // * Method to send instances via input stream
-  // *
-  // * @param inputStream
-  // * @param numberInstances
-  // */
-  // public void sendInstances(Stream inputStream, int numberInstances) {
-  // int numInstanceSent = 0;
-  // initStreamSource(sourceStream);
-  //
-  // while (streamSource.hasMoreInstances() && numInstanceSent <
-  // numberInstances) {
-  // numInstanceSent++;
-  // InstanceContentEvent contentEvent = new
-  // InstanceContentEvent(numInstanceSent, nextInstance(), true, true);
-  // inputStream.put(contentEvent);
-  // }
-  //
-  // sendEndEvaluationInstance(inputStream);
-  // }
-
-  public StreamSource getStreamSource() {
-    return streamSource;
-  }
-
-  public void setStreamSource(InstanceStream stream) {
-    this.sourceStream = stream;
-  }
-
-  public Instances getDataset() {
-    if (firstInstance == null) {
-      initStreamSource(sourceStream);
-    }
-    return firstInstance.dataset();
-  }
-
-  private Instance nextInstance() {
-    if (this.isInited) {
-      return streamSource.nextInstance().getData();
-    } else {
-      this.isInited = true;
-      return firstInstance;
-    }
-  }
-
-  // private void sendEndEvaluationInstance(Stream inputStream) {
-  // InstanceContentEvent contentEvent = new InstanceContentEvent(-1,
-  // firstInstance, false, true);
-  // contentEvent.setLast(true);
-  // inputStream.put(contentEvent);
-  // }
-
-  private void initStreamSource(InstanceStream stream) {
-    if (stream instanceof AbstractOptionHandler) {
-      ((AbstractOptionHandler) (stream)).prepareForUse();
-    }
-
-    this.streamSource = new StreamSource(stream);
-    firstInstance = streamSource.nextInstance().getData();
-  }
-
-  public void setMaxNumInstances(int value) {
-    numberInstances = value;
-  }
-
-  public int getMaxNumInstances() {
-    return this.numberInstances;
-  }
-
-  public void setSourceDelay(int delay) {
-    this.delay = delay;
-  }
-
-  public int getSourceDelay() {
-    return this.delay;
-  }
-
-  public void setDelayBatchSize(int batch) {
-    this.batchSize = batch;
-  }
-
-  private class DelayTimeoutHandler implements Runnable {
-
-    private PrequentialSourceProcessor processor;
-
-    public DelayTimeoutHandler(PrequentialSourceProcessor processor) {
-      this.processor = processor;
-    }
-
-    public void run() {
-      processor.increaseReadyEventIndex();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSource.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSource.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSource.java
deleted file mode 100644
index 93dfee8..0000000
--- a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSource.java
+++ /dev/null
@@ -1,92 +0,0 @@
-package com.yahoo.labs.samoa.streams;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-/**
- * License
- */
-
-import com.yahoo.labs.samoa.moa.core.Example;
-import com.yahoo.labs.samoa.moa.streams.InstanceStream;
-import com.yahoo.labs.samoa.instances.Instance;
-
-/**
- * The Class StreamSource.
- */
-public class StreamSource implements java.io.Serializable {
-
-  /**
-        * 
-        */
-  private static final long serialVersionUID = 3974668694861231236L;
-
-  /**
-   * Instantiates a new stream source.
-   * 
-   * @param stream
-   *          the stream
-   */
-  public StreamSource(InstanceStream stream) {
-    super();
-    this.stream = stream;
-  }
-
-  /** The stream. */
-  protected InstanceStream stream;
-
-  /**
-   * Gets the stream.
-   * 
-   * @return the stream
-   */
-  public InstanceStream getStream() {
-    return stream;
-  }
-
-  /**
-   * Next instance.
-   * 
-   * @return the instance
-   */
-  public Example<Instance> nextInstance() {
-    return stream.nextInstance();
-  }
-
-  /**
-   * Sets the stream.
-   * 
-   * @param stream
-   *          the new stream
-   */
-  public void setStream(InstanceStream stream) {
-    this.stream = stream;
-  }
-
-  /**
-   * Checks for more instances.
-   * 
-   * @return true, if successful
-   */
-  public boolean hasMoreInstances() {
-    return this.stream.hasMoreInstances();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSourceProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSourceProcessor.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSourceProcessor.java
deleted file mode 100644
index e40c843..0000000
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSourceProcessor.java
+++ /dev/null
@@ -1,195 +0,0 @@
-package com.yahoo.labs.samoa.streams;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-/**
- * License
- */
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.yahoo.labs.samoa.core.ContentEvent;
-import com.yahoo.labs.samoa.core.Processor;
-import com.yahoo.labs.samoa.instances.Instance;
-import com.yahoo.labs.samoa.instances.Instances;
-import com.yahoo.labs.samoa.learners.InstanceContentEvent;
-import com.yahoo.labs.samoa.moa.streams.InstanceStream;
-import com.yahoo.labs.samoa.topology.Stream;
-
-/**
- * The Class StreamSourceProcessor.
- */
-public class StreamSourceProcessor implements Processor {
-
-  /** The Constant logger. */
-  private static final Logger logger = LoggerFactory
-      .getLogger(StreamSourceProcessor.class);
-
-  /**
-        * 
-        */
-  private static final long serialVersionUID = -204182279475432739L;
-
-  /** The stream source. */
-  private StreamSource streamSource;
-
-  /**
-   * Gets the stream source.
-   * 
-   * @return the stream source
-   */
-  public StreamSource getStreamSource() {
-    return streamSource;
-  }
-
-  /**
-   * Sets the stream source.
-   * 
-   * @param stream
-   *          the new stream source
-   */
-  public void setStreamSource(InstanceStream stream) {
-    this.streamSource = new StreamSource(stream);
-    firstInstance = streamSource.nextInstance().getData();
-  }
-
-  /** The number instances sent. */
-  private long numberInstancesSent = 0;
-
-  /**
-   * Send instances.
-   * 
-   * @param inputStream
-   *          the input stream
-   * @param numberInstances
-   *          the number instances
-   * @param isTraining
-   *          the is training
-   */
-  public void sendInstances(Stream inputStream,
-      int numberInstances, boolean isTraining, boolean isTesting) {
-    int numberSamples = 0;
-
-    while (streamSource.hasMoreInstances()
-        && numberSamples < numberInstances) {
-
-      numberSamples++;
-      numberInstancesSent++;
-      InstanceContentEvent instanceContentEvent = new InstanceContentEvent(
-          numberInstancesSent, nextInstance(), isTraining, isTesting);
-
-      inputStream.put(instanceContentEvent);
-    }
-
-    InstanceContentEvent instanceContentEvent = new InstanceContentEvent(
-        numberInstancesSent, null, isTraining, isTesting);
-    instanceContentEvent.setLast(true);
-    inputStream.put(instanceContentEvent);
-  }
-
-  /**
-   * Send end evaluation instance.
-   * 
-   * @param inputStream
-   *          the input stream
-   */
-  public void sendEndEvaluationInstance(Stream inputStream) {
-    InstanceContentEvent instanceContentEvent = new InstanceContentEvent(-1, 
firstInstance, false, true);
-    inputStream.put(instanceContentEvent);
-  }
-
-  /**
-   * Next instance.
-   * 
-   * @return the instance
-   */
-  protected Instance nextInstance() {
-    if (this.isInited) {
-      return streamSource.nextInstance().getData();
-    } else {
-      this.isInited = true;
-      return firstInstance;
-    }
-  }
-
-  /** The is inited. */
-  protected boolean isInited = false;
-
-  /** The first instance. */
-  protected Instance firstInstance;
-
-  // @Override
-  /**
-   * On remove.
-   */
-  protected void onRemove() {
-  }
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see samoa.core.Processor#onCreate(int)
-   */
-  @Override
-  public void onCreate(int id) {
-    // TODO Auto-generated method stub
-  }
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see samoa.core.Processor#newProcessor(samoa.core.Processor)
-   */
-  @Override
-  public Processor newProcessor(Processor sourceProcessor) {
-    // StreamSourceProcessor newProcessor = new StreamSourceProcessor();
-    // StreamSourceProcessor originProcessor = (StreamSourceProcessor)
-    // sourceProcessor;
-    // if (originProcessor.getStreamSource() != null){
-    // 
newProcessor.setStreamSource(originProcessor.getStreamSource().getStream());
-    // }
-    // return newProcessor;
-    return null;
-  }
-
-  /**
-   * On event.
-   * 
-   * @param event
-   *          the event
-   * @return true, if successful
-   */
-  @Override
-  public boolean process(ContentEvent event) {
-    return false;
-  }
-
-  /**
-   * Gets the dataset.
-   * 
-   * @return the dataset
-   */
-  public Instances getDataset() {
-    return firstInstance.dataset();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/FileStreamSource.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/FileStreamSource.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/FileStreamSource.java
deleted file mode 100644
index d14ebfc..0000000
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/FileStreamSource.java
+++ /dev/null
@@ -1,64 +0,0 @@
-package com.yahoo.labs.samoa.streams.fs;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.io.InputStream;
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * An interface for FileStream's source (Local FS, HDFS,...)
- * 
- * @author Casey
- */
-public interface FileStreamSource extends Serializable {
-
-  /**
-   * Init the source with file/directory path and file extension
-   * 
-   * @param path
-   *          File or directory path
-   * @param ext
-   *          File extension to be used to filter files in a directory. If 
null, all files in the directory are
-   *          accepted.
-   */
-  public void init(String path, String ext);
-
-  /**
-   * Reset the source
-   */
-  public void reset() throws IOException;
-
-  /**
-   * Retrieve InputStream for next file. This method will return null if we 
are at the last file in the list.
-   * 
-   * @return InputStream for next file in the list
-   */
-  public InputStream getNextInputStream();
-
-  /**
-   * Retrieve InputStream for current file. The "current pointer" is moved 
forward with getNextInputStream method. So if
-   * there was no invocation of getNextInputStream, this method will return 
null.
-   * 
-   * @return InputStream for current file in the list
-   */
-  public InputStream getCurrentInputStream();
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSource.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSource.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSource.java
deleted file mode 100644
index 3fabcc7..0000000
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSource.java
+++ /dev/null
@@ -1,150 +0,0 @@
-package com.yahoo.labs.samoa.streams.fs;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.FileSystems;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-
-/**
- * Source for FileStream for HDFS files
- * 
- * @author Casey
- */
-public class HDFSFileStreamSource implements FileStreamSource {
-
-  /**
-        * 
-        */
-  private static final long serialVersionUID = -3887354805787167400L;
-
-  private transient InputStream fileStream;
-  private transient Configuration config;
-  private List<String> filePaths;
-  private int currentIndex;
-
-  public HDFSFileStreamSource() {
-    this.currentIndex = -1;
-  }
-
-  public void init(String path, String ext) {
-    this.init(this.getDefaultConfig(), path, ext);
-  }
-
-  public void init(Configuration config, String path, String ext) {
-    this.config = config;
-    this.filePaths = new ArrayList<String>();
-    Path hdfsPath = new Path(path);
-    FileSystem fs;
-    try {
-      fs = FileSystem.get(config);
-      FileStatus fileStat = fs.getFileStatus(hdfsPath);
-      if (fileStat.isDirectory()) {
-        Path filterPath = hdfsPath;
-        if (ext != null) {
-          filterPath = new Path(path.toString(), "*." + ext);
-        }
-        else {
-          filterPath = new Path(path.toString(), "*");
-        }
-        FileStatus[] filesInDir = fs.globStatus(filterPath);
-        for (int i = 0; i < filesInDir.length; i++) {
-          if (filesInDir[i].isFile()) {
-            filePaths.add(filesInDir[i].getPath().toString());
-          }
-        }
-      }
-      else {
-        this.filePaths.add(path);
-      }
-    } catch (IOException ioe) {
-      throw new RuntimeException("Failed getting list of files at:" + path, 
ioe);
-    }
-
-    this.currentIndex = -1;
-  }
-
-  private Configuration getDefaultConfig() {
-    String hadoopHome = System.getenv("HADOOP_HOME");
-    Configuration conf = new Configuration();
-    if (hadoopHome != null) {
-      java.nio.file.Path coreSitePath = 
FileSystems.getDefault().getPath(hadoopHome, "etc/hadoop/core-site.xml");
-      java.nio.file.Path hdfsSitePath = 
FileSystems.getDefault().getPath(hadoopHome, "etc/hadoop/hdfs-site.xml");
-      conf.addResource(new Path(coreSitePath.toAbsolutePath().toString()));
-      conf.addResource(new Path(hdfsSitePath.toAbsolutePath().toString()));
-    }
-    return conf;
-  }
-
-  public void reset() throws IOException {
-    this.currentIndex = -1;
-    this.closeFileStream();
-  }
-
-  private void closeFileStream() {
-    IOUtils.closeStream(fileStream);
-  }
-
-  public InputStream getNextInputStream() {
-    this.closeFileStream();
-    if (this.currentIndex >= (this.filePaths.size() - 1))
-      return null;
-
-    this.currentIndex++;
-    String filePath = this.filePaths.get(currentIndex);
-
-    Path hdfsPath = new Path(filePath);
-    FileSystem fs;
-    try {
-      fs = FileSystem.get(config);
-      fileStream = fs.open(hdfsPath);
-    } catch (IOException ioe) {
-      this.closeFileStream();
-      throw new RuntimeException("Failed opening file:" + filePath, ioe);
-    }
-
-    return fileStream;
-  }
-
-  public InputStream getCurrentInputStream() {
-    return fileStream;
-  }
-
-  protected int getFilePathListSize() {
-    if (filePaths != null)
-      return filePaths.size();
-    return 0;
-  }
-
-  protected String getFilePathAt(int index) {
-    if (filePaths != null && filePaths.size() > index)
-      return filePaths.get(index);
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSource.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSource.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSource.java
deleted file mode 100644
index b9d9b9e..0000000
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSource.java
+++ /dev/null
@@ -1,133 +0,0 @@
-package com.yahoo.labs.samoa.streams.fs;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.FileSystems;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Source for FileStream for local files
- * 
- * @author Casey
- */
-public class LocalFileStreamSource implements FileStreamSource {
-  /**
-        * 
-        */
-  private static final long serialVersionUID = 3986511547525870698L;
-
-  private transient InputStream fileStream;
-  private List<String> filePaths;
-  private int currentIndex;
-
-  public LocalFileStreamSource() {
-    this.currentIndex = -1;
-  }
-
-  public void init(String path, String ext) {
-    this.filePaths = new ArrayList<String>();
-    File fileAtPath = new File(path);
-    if (fileAtPath.isDirectory()) {
-      File[] filesInDir = fileAtPath.listFiles(new FileExtensionFilter(ext));
-      for (int i = 0; i < filesInDir.length; i++) {
-        filePaths.add(filesInDir[i].getAbsolutePath());
-      }
-    }
-    else {
-      this.filePaths.add(path);
-    }
-    this.currentIndex = -1;
-  }
-
-  public void reset() throws IOException {
-    this.currentIndex = -1;
-    this.closeFileStream();
-  }
-
-  private void closeFileStream() {
-    if (fileStream != null) {
-      try {
-        fileStream.close();
-      } catch (IOException ioe) {
-        ioe.printStackTrace();
-      }
-    }
-  }
-
-  public InputStream getNextInputStream() {
-    this.closeFileStream();
-
-    if (this.currentIndex >= (this.filePaths.size() - 1))
-      return null;
-
-    this.currentIndex++;
-    String filePath = this.filePaths.get(currentIndex);
-
-    File file = new File(filePath);
-    try {
-      fileStream = new FileInputStream(file);
-    } catch (IOException ioe) {
-      this.closeFileStream();
-      throw new RuntimeException("Failed opening file:" + filePath, ioe);
-    }
-
-    return fileStream;
-  }
-
-  public InputStream getCurrentInputStream() {
-    return fileStream;
-  }
-
-  protected int getFilePathListSize() {
-    if (filePaths != null)
-      return filePaths.size();
-    return 0;
-  }
-
-  protected String getFilePathAt(int index) {
-    if (filePaths != null && filePaths.size() > index)
-      return filePaths.get(index);
-    return null;
-  }
-
-  private class FileExtensionFilter implements FilenameFilter {
-    private String extension;
-
-    FileExtensionFilter(String ext) {
-      extension = ext;
-    }
-
-    @Override
-    public boolean accept(File dir, String name) {
-      File f = new File(dir, name);
-      if (extension == null)
-        return f.isFile();
-      else
-        return f.isFile() && name.toLowerCase().endsWith("." + extension);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/ClusteringEvaluation.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/ClusteringEvaluation.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/ClusteringEvaluation.java
deleted file mode 100644
index d906b9f..0000000
--- 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/ClusteringEvaluation.java
+++ /dev/null
@@ -1,186 +0,0 @@
-package com.yahoo.labs.samoa.tasks;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.github.javacliparser.ClassOption;
-import com.github.javacliparser.Configurable;
-import com.github.javacliparser.FileOption;
-import com.github.javacliparser.FloatOption;
-import com.github.javacliparser.IntOption;
-import com.github.javacliparser.StringOption;
-import com.yahoo.labs.samoa.evaluation.ClusteringEvaluatorProcessor;
-import com.yahoo.labs.samoa.learners.Learner;
-import 
com.yahoo.labs.samoa.learners.clusterers.simple.ClusteringDistributorProcessor;
-import com.yahoo.labs.samoa.learners.clusterers.simple.DistributedClusterer;
-import com.yahoo.labs.samoa.moa.streams.InstanceStream;
-import com.yahoo.labs.samoa.moa.streams.clustering.ClusteringStream;
-import com.yahoo.labs.samoa.moa.streams.clustering.RandomRBFGeneratorEvents;
-import com.yahoo.labs.samoa.streams.ClusteringEntranceProcessor;
-import com.yahoo.labs.samoa.topology.ComponentFactory;
-import com.yahoo.labs.samoa.topology.Stream;
-import com.yahoo.labs.samoa.topology.Topology;
-import com.yahoo.labs.samoa.topology.TopologyBuilder;
-
-/**
- * A task that runs and evaluates a distributed clustering algorithm.
- * 
- */
-public class ClusteringEvaluation implements Task, Configurable {
-
-  private static final long serialVersionUID = -8246537378371580550L;
-
-  private static final int DISTRIBUTOR_PARALLELISM = 1;
-
-  private static final Logger logger = 
LoggerFactory.getLogger(ClusteringEvaluation.class);
-
-  public ClassOption learnerOption = new ClassOption("learner", 'l', 
"Clustering to run.", Learner.class,
-      DistributedClusterer.class.getName());
-
-  public ClassOption streamTrainOption = new ClassOption("streamTrain", 's', 
"Input stream.", InstanceStream.class,
-      RandomRBFGeneratorEvents.class.getName());
-
-  public IntOption instanceLimitOption = new IntOption("instanceLimit", 'i',
-      "Maximum number of instances to test/train on  (-1 = no limit).", 
100000, -1,
-      Integer.MAX_VALUE);
-
-  public IntOption measureCollectionTypeOption = new 
IntOption("measureCollectionType", 'm',
-      "Type of measure collection", 0, 0, Integer.MAX_VALUE);
-
-  public IntOption timeLimitOption = new IntOption("timeLimit", 't',
-      "Maximum number of seconds to test/train for (-1 = no limit).", -1, -1,
-      Integer.MAX_VALUE);
-
-  public IntOption sampleFrequencyOption = new IntOption("sampleFrequency", 
'f',
-      "How many instances between samples of the learning performance.", 1000, 
0,
-      Integer.MAX_VALUE);
-
-  public StringOption evaluationNameOption = new 
StringOption("evaluationName", 'n', "Identifier of the evaluation",
-      "Clustering__"
-          + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()));
-
-  public FileOption dumpFileOption = new FileOption("dumpFile", 'd', "File to 
append intermediate csv results to",
-      null, "csv", true);
-
-  public FloatOption samplingThresholdOption = new 
FloatOption("samplingThreshold", 'a',
-      "Ratio of instances sampled that will be used for evaluation.", 0.5,
-      0.0, 1.0);
-
-  private ClusteringEntranceProcessor source;
-  private InstanceStream streamTrain;
-  private ClusteringDistributorProcessor distributor;
-  private Stream distributorStream;
-  private Stream evaluationStream;
-
-  // Default=0: no delay/waiting
-  public IntOption sourceDelayOption = new IntOption("sourceDelay", 'w',
-      "How many miliseconds between injections of two instances.", 0, 0, 
Integer.MAX_VALUE);
-
-  private Learner learner;
-  private ClusteringEvaluatorProcessor evaluator;
-
-  private Topology topology;
-  private TopologyBuilder builder;
-
-  public void getDescription(StringBuilder sb) {
-    sb.append("Clustering evaluation");
-  }
-
-  @Override
-  public void init() {
-    // TODO remove the if statement theoretically, dynamic binding will work
-    // here! for now, the if statement is used by Storm
-
-    if (builder == null) {
-      logger.warn("Builder was not initialized, initializing it from the 
Task");
-
-      builder = new TopologyBuilder();
-      logger.debug("Successfully instantiating TopologyBuilder");
-
-      builder.initTopology(evaluationNameOption.getValue(), 
sourceDelayOption.getValue());
-      logger.debug("Successfully initializing SAMOA topology with name {}", 
evaluationNameOption.getValue());
-    }
-
-    // instantiate ClusteringEntranceProcessor and its output stream
-    // (sourceStream)
-    source = new ClusteringEntranceProcessor();
-    streamTrain = this.streamTrainOption.getValue();
-    source.setStreamSource(streamTrain);
-    builder.addEntranceProcessor(source);
-    source.setSamplingThreshold(samplingThresholdOption.getValue());
-    source.setMaxNumInstances(instanceLimitOption.getValue());
-    logger.debug("Successfully instantiated ClusteringEntranceProcessor");
-
-    Stream sourceStream = builder.createStream(source);
-    // starter.setInputStream(sourcePiOutputStream); // FIXME set stream in the
-    // EntrancePI
-
-    // distribution of instances and sampling for evaluation
-    distributor = new ClusteringDistributorProcessor();
-    builder.addProcessor(distributor, DISTRIBUTOR_PARALLELISM);
-    builder.connectInputShuffleStream(sourceStream, distributor);
-    distributorStream = builder.createStream(distributor);
-    distributor.setOutputStream(distributorStream);
-    evaluationStream = builder.createStream(distributor);
-    distributor.setEvaluationStream(evaluationStream); // passes evaluation 
events along
-    logger.debug("Successfully instantiated Distributor");
-
-    // instantiate learner and connect it to distributorStream
-    learner = this.learnerOption.getValue();
-    learner.init(builder, source.getDataset(), 1);
-    builder.connectInputShuffleStream(distributorStream, 
learner.getInputProcessor());
-    logger.debug("Successfully instantiated Learner");
-
-    evaluator = new ClusteringEvaluatorProcessor.Builder(
-        sampleFrequencyOption.getValue()).dumpFile(dumpFileOption.getFile())
-        .decayHorizon(((ClusteringStream) 
streamTrain).getDecayHorizon()).build();
-
-    builder.addProcessor(evaluator);
-    for (Stream evaluatorPiInputStream : learner.getResultStreams()) {
-      builder.connectInputShuffleStream(evaluatorPiInputStream, evaluator);
-    }
-    builder.connectInputAllStream(evaluationStream, evaluator);
-    logger.debug("Successfully instantiated EvaluatorProcessor");
-
-    topology = builder.build();
-    logger.debug("Successfully built the topology");
-  }
-
-  @Override
-  public void setFactory(ComponentFactory factory) {
-    // TODO unify this code with init() for now, it's used by S4 App
-    // dynamic binding theoretically will solve this problem
-    builder = new TopologyBuilder(factory);
-    logger.debug("Successfully instantiated TopologyBuilder");
-
-    builder.initTopology(evaluationNameOption.getValue());
-    logger.debug("Successfully initialized SAMOA topology with name {}", 
evaluationNameOption.getValue());
-
-  }
-
-  public Topology getTopology() {
-    return topology;
-  }
-}

Reply via email to