http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/RandomTreeGenerator.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/RandomTreeGenerator.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/RandomTreeGenerator.java index a7f982e..aa713cc 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/RandomTreeGenerator.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/RandomTreeGenerator.java @@ -41,225 +41,227 @@ import com.yahoo.labs.samoa.moa.tasks.TaskMonitor; /** * Stream generator for a stream based on a randomly generated tree.. - * + * * @author Richard Kirkby ([email protected]) * @version $Revision: 7 $ */ public class RandomTreeGenerator extends AbstractOptionHandler implements InstanceStream { - @Override - public String getPurposeString() { - return "Generates a stream based on a randomly generated tree."; - } - - private static final long serialVersionUID = 1L; - - public IntOption treeRandomSeedOption = new IntOption("treeRandomSeed", - 'r', "Seed for random generation of tree.", 1); - - public IntOption instanceRandomSeedOption = new IntOption( - "instanceRandomSeed", 'i', - "Seed for random generation of instances.", 1); + @Override + public String getPurposeString() { + return "Generates a stream based on a randomly generated tree."; + } - public IntOption numClassesOption = new IntOption("numClasses", 'c', - "The number of classes to generate.", 2, 2, Integer.MAX_VALUE); + private static final long serialVersionUID = 1L; - public IntOption numNominalsOption = new IntOption("numNominals", 'o', - "The number of nominal attributes to generate.", 5, 0, - Integer.MAX_VALUE); + public IntOption treeRandomSeedOption = new IntOption("treeRandomSeed", + 'r', "Seed for random generation of tree.", 1); - public IntOption numNumericsOption = new IntOption("numNumerics", 'u', - "The number of numeric attributes to generate.", 5, 0, - Integer.MAX_VALUE); + public IntOption instanceRandomSeedOption = new IntOption( + "instanceRandomSeed", 'i', + "Seed for random generation of instances.", 1); - public IntOption numValsPerNominalOption = new IntOption( - "numValsPerNominal", 'v', - "The number of values to generate per nominal attribute.", 5, 2, - Integer.MAX_VALUE); + public IntOption numClassesOption = new IntOption("numClasses", 'c', + "The number of classes to generate.", 2, 2, Integer.MAX_VALUE); - public IntOption maxTreeDepthOption = new IntOption("maxTreeDepth", 'd', - "The maximum depth of the tree concept.", 5, 0, Integer.MAX_VALUE); + public IntOption numNominalsOption = new IntOption("numNominals", 'o', + "The number of nominal attributes to generate.", 5, 0, + Integer.MAX_VALUE); - public IntOption firstLeafLevelOption = new IntOption( - "firstLeafLevel", - 'l', - "The first level of the tree above maxTreeDepth that can have leaves.", - 3, 0, Integer.MAX_VALUE); + public IntOption numNumericsOption = new IntOption("numNumerics", 'u', + "The number of numeric attributes to generate.", 5, 0, + Integer.MAX_VALUE); - public FloatOption leafFractionOption = new FloatOption("leafFraction", - 'f', - "The fraction of leaves per level from firstLeafLevel onwards.", - 0.15, 0.0, 1.0); + public IntOption numValsPerNominalOption = new IntOption( + "numValsPerNominal", 'v', + "The number of values to generate per nominal attribute.", 5, 2, + Integer.MAX_VALUE); - protected static class Node implements Serializable { + public IntOption maxTreeDepthOption = new IntOption("maxTreeDepth", 'd', + "The maximum depth of the tree concept.", 5, 0, Integer.MAX_VALUE); - private static final long serialVersionUID = 1L; + public IntOption firstLeafLevelOption = new IntOption( + "firstLeafLevel", + 'l', + "The first level of the tree above maxTreeDepth that can have leaves.", + 3, 0, Integer.MAX_VALUE); - public int classLabel; + public FloatOption leafFractionOption = new FloatOption("leafFraction", + 'f', + "The fraction of leaves per level from firstLeafLevel onwards.", + 0.15, 0.0, 1.0); - public int splitAttIndex; + protected static class Node implements Serializable { - public double splitAttValue; + private static final long serialVersionUID = 1L; - public Node[] children; + public int classLabel; + + public int splitAttIndex; + + public double splitAttValue; + + public Node[] children; + } + + protected Node treeRoot; + + protected InstancesHeader streamHeader; + + protected Random instanceRandom; + + @Override + public void prepareForUseImpl(TaskMonitor monitor, + ObjectRepository repository) { + monitor.setCurrentActivity("Preparing random tree...", -1.0); + generateHeader(); + generateRandomTree(); + restart(); + } + + @Override + public long estimatedRemainingInstances() { + return -1; + } + + @Override + public boolean isRestartable() { + return true; + } + + @Override + public void restart() { + this.instanceRandom = new Random(this.instanceRandomSeedOption.getValue()); + } + + @Override + public InstancesHeader getHeader() { + return this.streamHeader; + } + + @Override + public boolean hasMoreInstances() { + return true; + } + + @Override + public InstanceExample nextInstance() { + double[] attVals = new double[this.numNominalsOption.getValue() + + this.numNumericsOption.getValue()]; + InstancesHeader header = getHeader(); + Instance inst = new DenseInstance(header.numAttributes()); + for (int i = 0; i < attVals.length; i++) { + attVals[i] = i < this.numNominalsOption.getValue() ? this.instanceRandom.nextInt(this.numValsPerNominalOption + .getValue()) + : this.instanceRandom.nextDouble(); + inst.setValue(i, attVals[i]); } - - protected Node treeRoot; - - protected InstancesHeader streamHeader; - - protected Random instanceRandom; - - @Override - public void prepareForUseImpl(TaskMonitor monitor, - ObjectRepository repository) { - monitor.setCurrentActivity("Preparing random tree...", -1.0); - generateHeader(); - generateRandomTree(); - restart(); + inst.setDataset(header); + inst.setClassValue(classifyInstance(this.treeRoot, attVals)); + return new InstanceExample(inst); + } + + protected int classifyInstance(Node node, double[] attVals) { + if (node.children == null) { + return node.classLabel; } - - @Override - public long estimatedRemainingInstances() { - return -1; + if (node.splitAttIndex < this.numNominalsOption.getValue()) { + return classifyInstance( + node.children[(int) attVals[node.splitAttIndex]], attVals); } - - @Override - public boolean isRestartable() { - return true; + return classifyInstance( + node.children[attVals[node.splitAttIndex] < node.splitAttValue ? 0 + : 1], attVals); + } + + protected void generateHeader() { + FastVector<Attribute> attributes = new FastVector<>(); + FastVector<String> nominalAttVals = new FastVector<>(); + for (int i = 0; i < this.numValsPerNominalOption.getValue(); i++) { + nominalAttVals.addElement("value" + (i + 1)); } - - @Override - public void restart() { - this.instanceRandom = new Random(this.instanceRandomSeedOption.getValue()); + for (int i = 0; i < this.numNominalsOption.getValue(); i++) { + attributes.addElement(new Attribute("nominal" + (i + 1), + nominalAttVals)); } - - @Override - public InstancesHeader getHeader() { - return this.streamHeader; + for (int i = 0; i < this.numNumericsOption.getValue(); i++) { + attributes.addElement(new Attribute("numeric" + (i + 1))); } - - @Override - public boolean hasMoreInstances() { - return true; + FastVector<String> classLabels = new FastVector<>(); + for (int i = 0; i < this.numClassesOption.getValue(); i++) { + classLabels.addElement("class" + (i + 1)); } - - @Override - public InstanceExample nextInstance() { - double[] attVals = new double[this.numNominalsOption.getValue() - + this.numNumericsOption.getValue()]; - InstancesHeader header = getHeader(); - Instance inst = new DenseInstance(header.numAttributes()); - for (int i = 0; i < attVals.length; i++) { - attVals[i] = i < this.numNominalsOption.getValue() ? this.instanceRandom.nextInt(this.numValsPerNominalOption.getValue()) - : this.instanceRandom.nextDouble(); - inst.setValue(i, attVals[i]); - } - inst.setDataset(header); - inst.setClassValue(classifyInstance(this.treeRoot, attVals)); - return new InstanceExample(inst); + attributes.addElement(new Attribute("class", classLabels)); + this.streamHeader = new InstancesHeader(new Instances( + getCLICreationString(InstanceStream.class), attributes, 0)); + this.streamHeader.setClassIndex(this.streamHeader.numAttributes() - 1); + } + + protected void generateRandomTree() { + Random treeRand = new Random(this.treeRandomSeedOption.getValue()); + ArrayList<Integer> nominalAttCandidates = new ArrayList<>( + this.numNominalsOption.getValue()); + for (int i = 0; i < this.numNominalsOption.getValue(); i++) { + nominalAttCandidates.add(i); } - - protected int classifyInstance(Node node, double[] attVals) { - if (node.children == null) { - return node.classLabel; - } - if (node.splitAttIndex < this.numNominalsOption.getValue()) { - return classifyInstance( - node.children[(int) attVals[node.splitAttIndex]], attVals); - } - return classifyInstance( - node.children[attVals[node.splitAttIndex] < node.splitAttValue ? 0 - : 1], attVals); + double[] minNumericVals = new double[this.numNumericsOption.getValue()]; + double[] maxNumericVals = new double[this.numNumericsOption.getValue()]; + for (int i = 0; i < this.numNumericsOption.getValue(); i++) { + minNumericVals[i] = 0.0; + maxNumericVals[i] = 1.0; } - - protected void generateHeader() { - FastVector<Attribute> attributes = new FastVector<>(); - FastVector<String> nominalAttVals = new FastVector<>(); - for (int i = 0; i < this.numValsPerNominalOption.getValue(); i++) { - nominalAttVals.addElement("value" + (i + 1)); - } - for (int i = 0; i < this.numNominalsOption.getValue(); i++) { - attributes.addElement(new Attribute("nominal" + (i + 1), - nominalAttVals)); - } - for (int i = 0; i < this.numNumericsOption.getValue(); i++) { - attributes.addElement(new Attribute("numeric" + (i + 1))); - } - FastVector<String> classLabels = new FastVector<>(); - for (int i = 0; i < this.numClassesOption.getValue(); i++) { - classLabels.addElement("class" + (i + 1)); - } - attributes.addElement(new Attribute("class", classLabels)); - this.streamHeader = new InstancesHeader(new Instances( - getCLICreationString(InstanceStream.class), attributes, 0)); - this.streamHeader.setClassIndex(this.streamHeader.numAttributes() - 1); + this.treeRoot = generateRandomTreeNode(0, nominalAttCandidates, + minNumericVals, maxNumericVals, treeRand); + } + + protected Node generateRandomTreeNode(int currentDepth, + ArrayList<Integer> nominalAttCandidates, double[] minNumericVals, + double[] maxNumericVals, Random treeRand) { + if ((currentDepth >= this.maxTreeDepthOption.getValue()) + || ((currentDepth >= this.firstLeafLevelOption.getValue()) && (this.leafFractionOption.getValue() >= (1.0 - treeRand + .nextDouble())))) { + Node leaf = new Node(); + leaf.classLabel = treeRand.nextInt(this.numClassesOption.getValue()); + return leaf; } - - protected void generateRandomTree() { - Random treeRand = new Random(this.treeRandomSeedOption.getValue()); - ArrayList<Integer> nominalAttCandidates = new ArrayList<>( - this.numNominalsOption.getValue()); - for (int i = 0; i < this.numNominalsOption.getValue(); i++) { - nominalAttCandidates.add(i); - } - double[] minNumericVals = new double[this.numNumericsOption.getValue()]; - double[] maxNumericVals = new double[this.numNumericsOption.getValue()]; - for (int i = 0; i < this.numNumericsOption.getValue(); i++) { - minNumericVals[i] = 0.0; - maxNumericVals[i] = 1.0; - } - this.treeRoot = generateRandomTreeNode(0, nominalAttCandidates, - minNumericVals, maxNumericVals, treeRand); + Node node = new Node(); + int chosenAtt = treeRand.nextInt(nominalAttCandidates.size() + + this.numNumericsOption.getValue()); + if (chosenAtt < nominalAttCandidates.size()) { + node.splitAttIndex = nominalAttCandidates.get(chosenAtt); + node.children = new Node[this.numValsPerNominalOption.getValue()]; + ArrayList<Integer> newNominalCandidates = new ArrayList<>( + nominalAttCandidates); + newNominalCandidates.remove(new Integer(node.splitAttIndex)); + newNominalCandidates.trimToSize(); + for (int i = 0; i < node.children.length; i++) { + node.children[i] = generateRandomTreeNode(currentDepth + 1, + newNominalCandidates, minNumericVals, maxNumericVals, + treeRand); + } + } else { + int numericIndex = chosenAtt - nominalAttCandidates.size(); + node.splitAttIndex = this.numNominalsOption.getValue() + + numericIndex; + double minVal = minNumericVals[numericIndex]; + double maxVal = maxNumericVals[numericIndex]; + node.splitAttValue = ((maxVal - minVal) * treeRand.nextDouble()) + + minVal; + node.children = new Node[2]; + double[] newMaxVals = maxNumericVals.clone(); + newMaxVals[numericIndex] = node.splitAttValue; + node.children[0] = generateRandomTreeNode(currentDepth + 1, + nominalAttCandidates, minNumericVals, newMaxVals, treeRand); + double[] newMinVals = minNumericVals.clone(); + newMinVals[numericIndex] = node.splitAttValue; + node.children[1] = generateRandomTreeNode(currentDepth + 1, + nominalAttCandidates, newMinVals, maxNumericVals, treeRand); } + return node; + } - protected Node generateRandomTreeNode(int currentDepth, - ArrayList<Integer> nominalAttCandidates, double[] minNumericVals, - double[] maxNumericVals, Random treeRand) { - if ((currentDepth >= this.maxTreeDepthOption.getValue()) - || ((currentDepth >= this.firstLeafLevelOption.getValue()) && (this.leafFractionOption.getValue() >= (1.0 - treeRand.nextDouble())))) { - Node leaf = new Node(); - leaf.classLabel = treeRand.nextInt(this.numClassesOption.getValue()); - return leaf; - } - Node node = new Node(); - int chosenAtt = treeRand.nextInt(nominalAttCandidates.size() - + this.numNumericsOption.getValue()); - if (chosenAtt < nominalAttCandidates.size()) { - node.splitAttIndex = nominalAttCandidates.get(chosenAtt); - node.children = new Node[this.numValsPerNominalOption.getValue()]; - ArrayList<Integer> newNominalCandidates = new ArrayList<>( - nominalAttCandidates); - newNominalCandidates.remove(new Integer(node.splitAttIndex)); - newNominalCandidates.trimToSize(); - for (int i = 0; i < node.children.length; i++) { - node.children[i] = generateRandomTreeNode(currentDepth + 1, - newNominalCandidates, minNumericVals, maxNumericVals, - treeRand); - } - } else { - int numericIndex = chosenAtt - nominalAttCandidates.size(); - node.splitAttIndex = this.numNominalsOption.getValue() - + numericIndex; - double minVal = minNumericVals[numericIndex]; - double maxVal = maxNumericVals[numericIndex]; - node.splitAttValue = ((maxVal - minVal) * treeRand.nextDouble()) - + minVal; - node.children = new Node[2]; - double[] newMaxVals = maxNumericVals.clone(); - newMaxVals[numericIndex] = node.splitAttValue; - node.children[0] = generateRandomTreeNode(currentDepth + 1, - nominalAttCandidates, minNumericVals, newMaxVals, treeRand); - double[] newMinVals = minNumericVals.clone(); - newMinVals[numericIndex] = node.splitAttValue; - node.children[1] = generateRandomTreeNode(currentDepth + 1, - nominalAttCandidates, newMinVals, maxNumericVals, treeRand); - } - return node; - } - - @Override - public void getDescription(StringBuilder sb, int indent) { - // TODO Auto-generated method stub - } + @Override + public void getDescription(StringBuilder sb, int indent) { + // TODO Auto-generated method stub + } }
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/NullMonitor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/NullMonitor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/NullMonitor.java index 977897f..da40835 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/NullMonitor.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/NullMonitor.java @@ -22,81 +22,81 @@ package com.yahoo.labs.samoa.moa.tasks; /** * Class that represents a null monitor. - * + * * @author Richard Kirkby ([email protected]) * @version $Revision: 7 $ */ public class NullMonitor implements TaskMonitor { - @Override - public void setCurrentActivity(String activityDescription, - double fracComplete) { - } - - @Override - public void setCurrentActivityDescription(String activity) { - } - - @Override - public void setCurrentActivityFractionComplete(double fracComplete) { - } - - @Override - public boolean taskShouldAbort() { - return false; - } - - @Override - public String getCurrentActivityDescription() { - return null; - } - - @Override - public double getCurrentActivityFractionComplete() { - return -1.0; - } - - @Override - public boolean isPaused() { - return false; - } - - @Override - public boolean isCancelled() { - return false; - } - - @Override - public void requestCancel() { - } - - @Override - public void requestPause() { - } - - @Override - public void requestResume() { - } - - @Override - public Object getLatestResultPreview() { - return null; - } - - @Override - public void requestResultPreview() { - } - - @Override - public boolean resultPreviewRequested() { - return false; - } - - @Override - public void setLatestResultPreview(Object latestPreview) { - } - - @Override - public void requestResultPreview(ResultPreviewListener toInform) { - } + @Override + public void setCurrentActivity(String activityDescription, + double fracComplete) { + } + + @Override + public void setCurrentActivityDescription(String activity) { + } + + @Override + public void setCurrentActivityFractionComplete(double fracComplete) { + } + + @Override + public boolean taskShouldAbort() { + return false; + } + + @Override + public String getCurrentActivityDescription() { + return null; + } + + @Override + public double getCurrentActivityFractionComplete() { + return -1.0; + } + + @Override + public boolean isPaused() { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public void requestCancel() { + } + + @Override + public void requestPause() { + } + + @Override + public void requestResume() { + } + + @Override + public Object getLatestResultPreview() { + return null; + } + + @Override + public void requestResultPreview() { + } + + @Override + public boolean resultPreviewRequested() { + return false; + } + + @Override + public void setLatestResultPreview(Object latestPreview) { + } + + @Override + public void requestResultPreview(ResultPreviewListener toInform) { + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/ResultPreviewListener.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/ResultPreviewListener.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/ResultPreviewListener.java index 3fece0b..63d1236 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/ResultPreviewListener.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/ResultPreviewListener.java @@ -21,20 +21,20 @@ package com.yahoo.labs.samoa.moa.tasks; */ /** - * Interface implemented by classes that preview results - * on the Graphical User Interface - * + * Interface implemented by classes that preview results on the Graphical User + * Interface + * * @author Richard Kirkby ([email protected]) * @version $Revision: 7 $ */ public interface ResultPreviewListener { - /** - * This method is used to receive a signal from - * <code>TaskMonitor</code> that the lastest preview has - * changed. This method is implemented in <code>PreviewPanel</code> - * to change the results that are shown in its panel. - * - */ - public void latestPreviewChanged(); + /** + * This method is used to receive a signal from <code>TaskMonitor</code> that + * the lastest preview has changed. This method is implemented in + * <code>PreviewPanel</code> to change the results that are shown in its + * panel. + * + */ + public void latestPreviewChanged(); } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/Task.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/Task.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/Task.java index d2c96a8..d3dcc4f 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/Task.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/Task.java @@ -24,38 +24,38 @@ import com.yahoo.labs.samoa.moa.MOAObject; import com.yahoo.labs.samoa.moa.core.ObjectRepository; /** - * Interface representing a task. - * + * Interface representing a task. + * * @author Richard Kirkby ([email protected]) - * @version $Revision: 7 $ + * @version $Revision: 7 $ */ public interface Task extends MOAObject { - /** - * Gets the result type of this task. - * Tasks can return LearningCurve, LearningEvaluation, - * Classifier, String, Instances.. - * - * @return a class object of the result of this task - */ - public Class<?> getTaskResultType(); + /** + * Gets the result type of this task. Tasks can return LearningCurve, + * LearningEvaluation, Classifier, String, Instances.. + * + * @return a class object of the result of this task + */ + public Class<?> getTaskResultType(); - /** - * This method performs this task, - * when TaskMonitor and ObjectRepository are no needed. - * - * @return an object with the result of this task - */ - public Object doTask(); + /** + * This method performs this task, when TaskMonitor and ObjectRepository are + * no needed. + * + * @return an object with the result of this task + */ + public Object doTask(); - /** - * This method performs this task. - * <code>AbstractTask</code> implements this method so all - * its extensions only need to implement <code>doTaskImpl</code> - * - * @param monitor the TaskMonitor to use - * @param repository the ObjectRepository to use - * @return an object with the result of this task - */ - public Object doTask(TaskMonitor monitor, ObjectRepository repository); + /** + * This method performs this task. <code>AbstractTask</code> implements this + * method so all its extensions only need to implement <code>doTaskImpl</code> + * + * @param monitor + * the TaskMonitor to use + * @param repository + * the ObjectRepository to use + * @return an object with the result of this task + */ + public Object doTask(TaskMonitor monitor, ObjectRepository repository); } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/TaskMonitor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/TaskMonitor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/TaskMonitor.java index 4918cd8..7b37f05 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/TaskMonitor.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/TaskMonitor.java @@ -21,120 +21,126 @@ package com.yahoo.labs.samoa.moa.tasks; */ /** - * Interface representing a task monitor. - * + * Interface representing a task monitor. + * * @author Richard Kirkby ([email protected]) - * @version $Revision: 7 $ + * @version $Revision: 7 $ */ public interface TaskMonitor { - /** - * Sets the description and the percentage done of the current activity. - * - * @param activity the description of the current activity - * @param fracComplete the percentage done of the current activity - */ - public void setCurrentActivity(String activityDescription, - double fracComplete); - - /** - * Sets the description of the current activity. - * - * @param activity the description of the current activity - */ - public void setCurrentActivityDescription(String activity); - - /** - * Sets the percentage done of the current activity - * - * @param fracComplete the percentage done of the current activity - */ - public void setCurrentActivityFractionComplete(double fracComplete); - - /** - * Gets whether the task should abort. - * - * @return true if the task should abort - */ - public boolean taskShouldAbort(); - - /** - * Gets whether there is a request for preview the task result. - * - * @return true if there is a request for preview the task result - */ - public boolean resultPreviewRequested(); - - /** - * Sets the current result to preview - * - * @param latestPreview the result to preview - */ - public void setLatestResultPreview(Object latestPreview); - - /** - * Gets the description of the current activity. - * - * @return the description of the current activity - */ - public String getCurrentActivityDescription(); - - /** - * Gets the percentage done of the current activity - * - * @return the percentage done of the current activity - */ - public double getCurrentActivityFractionComplete(); - - /** - * Requests the task monitored to pause. - * - */ - public void requestPause(); - - /** - * Requests the task monitored to resume. - * - */ - public void requestResume(); - - /** - * Requests the task monitored to cancel. - * - */ - public void requestCancel(); - - /** - * Gets whether the task monitored is paused. - * - * @return true if the task is paused - */ - public boolean isPaused(); - - /** - * Gets whether the task monitored is cancelled. - * - * @return true if the task is cancelled - */ - public boolean isCancelled(); - - /** - * Requests to preview the task result. - * - */ - public void requestResultPreview(); - - /** - * Requests to preview the task result. - * - * @param toInform the listener of the changes in the preview of the result - */ - public void requestResultPreview(ResultPreviewListener toInform); - - /** - * Gets the current result to preview - * - * @return the result to preview - */ - public Object getLatestResultPreview(); + /** + * Sets the description and the percentage done of the current activity. + * + * @param activity + * the description of the current activity + * @param fracComplete + * the percentage done of the current activity + */ + public void setCurrentActivity(String activityDescription, + double fracComplete); + + /** + * Sets the description of the current activity. + * + * @param activity + * the description of the current activity + */ + public void setCurrentActivityDescription(String activity); + + /** + * Sets the percentage done of the current activity + * + * @param fracComplete + * the percentage done of the current activity + */ + public void setCurrentActivityFractionComplete(double fracComplete); + + /** + * Gets whether the task should abort. + * + * @return true if the task should abort + */ + public boolean taskShouldAbort(); + + /** + * Gets whether there is a request for preview the task result. + * + * @return true if there is a request for preview the task result + */ + public boolean resultPreviewRequested(); + + /** + * Sets the current result to preview + * + * @param latestPreview + * the result to preview + */ + public void setLatestResultPreview(Object latestPreview); + + /** + * Gets the description of the current activity. + * + * @return the description of the current activity + */ + public String getCurrentActivityDescription(); + + /** + * Gets the percentage done of the current activity + * + * @return the percentage done of the current activity + */ + public double getCurrentActivityFractionComplete(); + + /** + * Requests the task monitored to pause. + * + */ + public void requestPause(); + + /** + * Requests the task monitored to resume. + * + */ + public void requestResume(); + + /** + * Requests the task monitored to cancel. + * + */ + public void requestCancel(); + + /** + * Gets whether the task monitored is paused. + * + * @return true if the task is paused + */ + public boolean isPaused(); + + /** + * Gets whether the task monitored is cancelled. + * + * @return true if the task is cancelled + */ + public boolean isCancelled(); + + /** + * Requests to preview the task result. + * + */ + public void requestResultPreview(); + + /** + * Requests to preview the task result. + * + * @param toInform + * the listener of the changes in the preview of the result + */ + public void requestResultPreview(ResultPreviewListener toInform); + + /** + * Gets the current result to preview + * + * @return the result to preview + */ + public Object getLatestResultPreview(); } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ArffFileStream.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ArffFileStream.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ArffFileStream.java index 93fc7c4..80a4910 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ArffFileStream.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ArffFileStream.java @@ -31,89 +31,89 @@ import com.yahoo.labs.samoa.moa.tasks.TaskMonitor; /** * InstanceStream for ARFF file + * * @author Casey */ public class ArffFileStream extends FileStream { - public FileOption arffFileOption = new FileOption("arffFile", 'f', - "ARFF File(s) to load.", null, null, false); - - public IntOption classIndexOption = new IntOption("classIndex", 'c', - "Class index of data. 0 for none or -1 for last attribute in file.", - -1, -1, Integer.MAX_VALUE); - - protected InstanceExample lastInstanceRead; - - @Override - public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository repository) { - super.prepareForUseImpl(monitor, repository); - String filePath = this.arffFileOption.getFile().getAbsolutePath(); - this.fileSource.init(filePath, "arff"); - this.lastInstanceRead = null; - } - - @Override - protected void reset() { - try { - if (this.fileReader != null) - this.fileReader.close(); - - fileSource.reset(); - } - catch (IOException ioe) { - throw new RuntimeException("FileStream restart failed.", ioe); - } - - if (!getNextFileReader()) { - hitEndOfStream = true; - throw new RuntimeException("FileStream is empty."); - } - } - - @Override - protected boolean getNextFileReader() { - boolean ret = super.getNextFileReader(); - if (ret) { - this.instances = new Instances(this.fileReader, 1, -1); - if (this.classIndexOption.getValue() < 0) { - this.instances.setClassIndex(this.instances.numAttributes() - 1); - } else if (this.classIndexOption.getValue() > 0) { - this.instances.setClassIndex(this.classIndexOption.getValue() - 1); - } - } - return ret; - } - - @Override - protected boolean readNextInstanceFromFile() { - try { - if (this.instances.readInstance(this.fileReader)) { - this.lastInstanceRead = new InstanceExample(this.instances.instance(0)); - this.instances.delete(); // keep instances clean - return true; - } - if (this.fileReader != null) { - this.fileReader.close(); - this.fileReader = null; - } - return false; - } catch (IOException ioe) { - throw new RuntimeException( - "ArffFileStream failed to read instance from stream.", ioe); - } - - } - - @Override - protected InstanceExample getLastInstanceRead() { - return this.lastInstanceRead; - } - - /* - * extend com.yahoo.labs.samoa.moa.MOAObject - */ - @Override - public void getDescription(StringBuilder sb, int indent) { - // TODO Auto-generated method stub + public FileOption arffFileOption = new FileOption("arffFile", 'f', + "ARFF File(s) to load.", null, null, false); + + public IntOption classIndexOption = new IntOption("classIndex", 'c', + "Class index of data. 0 for none or -1 for last attribute in file.", + -1, -1, Integer.MAX_VALUE); + + protected InstanceExample lastInstanceRead; + + @Override + public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository repository) { + super.prepareForUseImpl(monitor, repository); + String filePath = this.arffFileOption.getFile().getAbsolutePath(); + this.fileSource.init(filePath, "arff"); + this.lastInstanceRead = null; + } + + @Override + protected void reset() { + try { + if (this.fileReader != null) + this.fileReader.close(); + + fileSource.reset(); + } catch (IOException ioe) { + throw new RuntimeException("FileStream restart failed.", ioe); + } + + if (!getNextFileReader()) { + hitEndOfStream = true; + throw new RuntimeException("FileStream is empty."); } + } + + @Override + protected boolean getNextFileReader() { + boolean ret = super.getNextFileReader(); + if (ret) { + this.instances = new Instances(this.fileReader, 1, -1); + if (this.classIndexOption.getValue() < 0) { + this.instances.setClassIndex(this.instances.numAttributes() - 1); + } else if (this.classIndexOption.getValue() > 0) { + this.instances.setClassIndex(this.classIndexOption.getValue() - 1); + } + } + return ret; + } + + @Override + protected boolean readNextInstanceFromFile() { + try { + if (this.instances.readInstance(this.fileReader)) { + this.lastInstanceRead = new InstanceExample(this.instances.instance(0)); + this.instances.delete(); // keep instances clean + return true; + } + if (this.fileReader != null) { + this.fileReader.close(); + this.fileReader = null; + } + return false; + } catch (IOException ioe) { + throw new RuntimeException( + "ArffFileStream failed to read instance from stream.", ioe); + } + + } + + @Override + protected InstanceExample getLastInstanceRead() { + return this.lastInstanceRead; + } + + /* + * extend com.yahoo.labs.samoa.moa.MOAObject + */ + @Override + public void getDescription(StringBuilder sb, int indent) { + // TODO Auto-generated method stub + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ClusteringEntranceProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ClusteringEntranceProcessor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ClusteringEntranceProcessor.java index 20f3feb..ddb047e 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ClusteringEntranceProcessor.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ClusteringEntranceProcessor.java @@ -45,127 +45,167 @@ import com.yahoo.labs.samoa.moa.streams.clustering.RandomRBFGeneratorEvents; */ public final class ClusteringEntranceProcessor implements EntranceProcessor { - private static final long serialVersionUID = 4169053337917578558L; + private static final long serialVersionUID = 4169053337917578558L; - private static final Logger logger = LoggerFactory.getLogger(ClusteringEntranceProcessor.class); + private static final Logger logger = LoggerFactory.getLogger(ClusteringEntranceProcessor.class); - private StreamSource streamSource; - private Instance firstInstance; - private boolean isInited = false; - private Random random = new Random(); - private double samplingThreshold; - private int numberInstances; - private int numInstanceSent = 0; + private StreamSource streamSource; + private Instance firstInstance; + private boolean isInited = false; + private Random random = new Random(); + private double samplingThreshold; + private int numberInstances; + private int numInstanceSent = 0; - private int groundTruthSamplingFrequency; + private int groundTruthSamplingFrequency; - @Override - public boolean process(ContentEvent event) { - // TODO: possible refactor of the super-interface implementation - // of source processor does not need this method - return false; - } + @Override + public boolean process(ContentEvent event) { + // TODO: possible refactor of the super-interface implementation + // of source processor does not need this method + return false; + } - @Override - public void onCreate(int id) { - logger.debug("Creating ClusteringSourceProcessor with id {}", id); - } + @Override + public void onCreate(int id) { + logger.debug("Creating ClusteringSourceProcessor with id {}", id); + } - @Override - public Processor newProcessor(Processor p) { - ClusteringEntranceProcessor newProcessor = new ClusteringEntranceProcessor(); - ClusteringEntranceProcessor originProcessor = (ClusteringEntranceProcessor) p; - if (originProcessor.getStreamSource() != null) { - newProcessor.setStreamSource(originProcessor.getStreamSource().getStream()); - } - return newProcessor; + @Override + public Processor newProcessor(Processor p) { + ClusteringEntranceProcessor newProcessor = new ClusteringEntranceProcessor(); + ClusteringEntranceProcessor originProcessor = (ClusteringEntranceProcessor) p; + if (originProcessor.getStreamSource() != null) { + newProcessor.setStreamSource(originProcessor.getStreamSource().getStream()); } + return newProcessor; + } - @Override - public boolean hasNext() { - return (!isFinished()); - } + @Override + public boolean hasNext() { + return (!isFinished()); + } - @Override - public boolean isFinished() { - return (!streamSource.hasMoreInstances() || (numberInstances >= 0 && numInstanceSent >= numberInstances)); - } + @Override + public boolean isFinished() { + return (!streamSource.hasMoreInstances() || (numberInstances >= 0 && numInstanceSent >= numberInstances)); + } - // /** - // * Method to send instances via input stream - // * - // * @param inputStream - // * @param numberInstances - // */ - // public void sendInstances(Stream inputStream, Stream evaluationStream, int numberInstances, double samplingThreshold) { - // int numInstanceSent = 0; - // this.samplingThreshold = samplingThreshold; - // while (streamSource.hasMoreInstances() && numInstanceSent < numberInstances) { - // numInstanceSent++; - // DataPoint nextDataPoint = new DataPoint(nextInstance(), numInstanceSent); - // ClusteringContentEvent contentEvent = new ClusteringContentEvent(numInstanceSent, nextDataPoint); - // inputStream.put(contentEvent); - // sendPointsAndGroundTruth(streamSource, evaluationStream, numInstanceSent, nextDataPoint); - // } - // - // sendEndEvaluationInstance(inputStream); - // } + // /** + // * Method to send instances via input stream + // * + // * @param inputStream + // * @param numberInstances + // */ + // public void sendInstances(Stream inputStream, Stream evaluationStream, int + // numberInstances, double samplingThreshold) { + // int numInstanceSent = 0; + // this.samplingThreshold = samplingThreshold; + // while (streamSource.hasMoreInstances() && numInstanceSent < + // numberInstances) { + // numInstanceSent++; + // DataPoint nextDataPoint = new DataPoint(nextInstance(), numInstanceSent); + // ClusteringContentEvent contentEvent = new + // ClusteringContentEvent(numInstanceSent, nextDataPoint); + // inputStream.put(contentEvent); + // sendPointsAndGroundTruth(streamSource, evaluationStream, numInstanceSent, + // nextDataPoint); + // } + // + // sendEndEvaluationInstance(inputStream); + // } - public double getSamplingThreshold() { - return samplingThreshold; - } + public double getSamplingThreshold() { + return samplingThreshold; + } - public void setSamplingThreshold(double samplingThreshold) { - this.samplingThreshold = samplingThreshold; - } - - + public void setSamplingThreshold(double samplingThreshold) { + this.samplingThreshold = samplingThreshold; + } - public int getGroundTruthSamplingFrequency() { - return groundTruthSamplingFrequency; - } + public int getGroundTruthSamplingFrequency() { + return groundTruthSamplingFrequency; + } - public void setGroundTruthSamplingFrequency(int groundTruthSamplingFrequency) { - this.groundTruthSamplingFrequency = groundTruthSamplingFrequency; - } + public void setGroundTruthSamplingFrequency(int groundTruthSamplingFrequency) { + this.groundTruthSamplingFrequency = groundTruthSamplingFrequency; + } + + public StreamSource getStreamSource() { + return streamSource; + } - public StreamSource getStreamSource() { - return streamSource; + public void setStreamSource(InstanceStream stream) { + if (stream instanceof AbstractOptionHandler) { + ((AbstractOptionHandler) (stream)).prepareForUse(); } - public void setStreamSource(InstanceStream stream) { - if (stream instanceof AbstractOptionHandler) { - ((AbstractOptionHandler) (stream)).prepareForUse(); - } + this.streamSource = new StreamSource(stream); + firstInstance = streamSource.nextInstance().getData(); + } - this.streamSource = new StreamSource(stream); - firstInstance = streamSource.nextInstance().getData(); - } + public Instances getDataset() { + return firstInstance.dataset(); + } - public Instances getDataset() { - return firstInstance.dataset(); + private Instance nextInstance() { + if (this.isInited) { + return streamSource.nextInstance().getData(); + } else { + this.isInited = true; + return firstInstance; } + } - private Instance nextInstance() { - if (this.isInited) { - return streamSource.nextInstance().getData(); - } else { - this.isInited = true; - return firstInstance; - } - } + // private void sendEndEvaluationInstance(Stream inputStream) { + // ClusteringContentEvent contentEvent = new ClusteringContentEvent(-1, + // firstInstance); + // contentEvent.setLast(true); + // inputStream.put(contentEvent); + // } - // private void sendEndEvaluationInstance(Stream inputStream) { - // ClusteringContentEvent contentEvent = new ClusteringContentEvent(-1, firstInstance); - // contentEvent.setLast(true); - // inputStream.put(contentEvent); - // } + // private void sendPointsAndGroundTruth(StreamSource sourceStream, Stream + // evaluationStream, int numInstanceSent, DataPoint nextDataPoint) { + // boolean sendEvent = false; + // DataPoint instance = null; + // Clustering gtClustering = null; + // int samplingFrequency = ((ClusteringStream) + // sourceStream.getStream()).getDecayHorizon(); + // if (random.nextDouble() < samplingThreshold) { + // // Add instance + // sendEvent = true; + // instance = nextDataPoint; + // } + // if (numInstanceSent > 0 && numInstanceSent % samplingFrequency == 0) { + // // Add GroundTruth + // sendEvent = true; + // gtClustering = ((RandomRBFGeneratorEvents) + // sourceStream.getStream()).getGeneratingClusters(); + // } + // if (sendEvent == true) { + // ClusteringEvaluationContentEvent evalEvent; + // evalEvent = new ClusteringEvaluationContentEvent(gtClustering, instance, + // false); + // evaluationStream.put(evalEvent); + // } + // } + + public void setMaxNumInstances(int value) { + numberInstances = value; + } + + public int getMaxNumInstances() { + return this.numberInstances; + } + + @Override + public ContentEvent nextEvent() { - // private void sendPointsAndGroundTruth(StreamSource sourceStream, Stream evaluationStream, int numInstanceSent, DataPoint nextDataPoint) { // boolean sendEvent = false; // DataPoint instance = null; // Clustering gtClustering = null; - // int samplingFrequency = ((ClusteringStream) sourceStream.getStream()).getDecayHorizon(); + // int samplingFrequency = ((ClusteringStream) + // sourceStream.getStream()).getDecayHorizon(); // if (random.nextDouble() < samplingThreshold) { // // Add instance // sendEvent = true; @@ -174,68 +214,52 @@ public final class ClusteringEntranceProcessor implements EntranceProcessor { // if (numInstanceSent > 0 && numInstanceSent % samplingFrequency == 0) { // // Add GroundTruth // sendEvent = true; - // gtClustering = ((RandomRBFGeneratorEvents) sourceStream.getStream()).getGeneratingClusters(); + // gtClustering = ((RandomRBFGeneratorEvents) + // sourceStream.getStream()).getGeneratingClusters(); // } // if (sendEvent == true) { // ClusteringEvaluationContentEvent evalEvent; - // evalEvent = new ClusteringEvaluationContentEvent(gtClustering, instance, false); + // evalEvent = new ClusteringEvaluationContentEvent(gtClustering, instance, + // false); // evaluationStream.put(evalEvent); // } - // } - - public void setMaxNumInstances(int value) { - numberInstances = value; - } - - public int getMaxNumInstances() { - return this.numberInstances; - } - @Override - public ContentEvent nextEvent() { - - // boolean sendEvent = false; - // DataPoint instance = null; - // Clustering gtClustering = null; - // int samplingFrequency = ((ClusteringStream) sourceStream.getStream()).getDecayHorizon(); - // if (random.nextDouble() < samplingThreshold) { - // // Add instance - // sendEvent = true; - // instance = nextDataPoint; - // } - // if (numInstanceSent > 0 && numInstanceSent % samplingFrequency == 0) { - // // Add GroundTruth - // sendEvent = true; - // gtClustering = ((RandomRBFGeneratorEvents) sourceStream.getStream()).getGeneratingClusters(); - // } - // if (sendEvent == true) { - // ClusteringEvaluationContentEvent evalEvent; - // evalEvent = new ClusteringEvaluationContentEvent(gtClustering, instance, false); - // evaluationStream.put(evalEvent); - // } - - groundTruthSamplingFrequency = ((ClusteringStream) streamSource.getStream()).getDecayHorizon(); // FIXME should it be taken from the ClusteringEvaluation -f option instead? - if (isFinished()) { - // send ending event - ClusteringContentEvent contentEvent = new ClusteringContentEvent(-1, firstInstance); - contentEvent.setLast(true); - return contentEvent; - } else { - DataPoint nextDataPoint = new DataPoint(nextInstance(), numInstanceSent); - numInstanceSent++; - if (numInstanceSent % groundTruthSamplingFrequency == 0) { - // TODO implement an interface ClusteringGroundTruth with a getGeneratingClusters() method, check if the source implements the interface - // send a clustering evaluation event for external measures (distance from the gt clusters) - Clustering gtClustering = ((RandomRBFGeneratorEvents) streamSource.getStream()).getGeneratingClusters(); - return new ClusteringEvaluationContentEvent(gtClustering, nextDataPoint, false); - } else { - ClusteringContentEvent contentEvent = new ClusteringContentEvent(numInstanceSent, nextDataPoint); - if (random.nextDouble() < samplingThreshold) { - // send a clustering content event for internal measures (cohesion, separation) - contentEvent.setSample(true); - } - return contentEvent; - } + groundTruthSamplingFrequency = ((ClusteringStream) streamSource.getStream()).getDecayHorizon(); // FIXME + // should + // it + // be + // taken + // from + // the + // ClusteringEvaluation + // -f + // option + // instead? + if (isFinished()) { + // send ending event + ClusteringContentEvent contentEvent = new ClusteringContentEvent(-1, firstInstance); + contentEvent.setLast(true); + return contentEvent; + } else { + DataPoint nextDataPoint = new DataPoint(nextInstance(), numInstanceSent); + numInstanceSent++; + if (numInstanceSent % groundTruthSamplingFrequency == 0) { + // TODO implement an interface ClusteringGroundTruth with a + // getGeneratingClusters() method, check if the source implements the + // interface + // send a clustering evaluation event for external measures (distance + // from the gt clusters) + Clustering gtClustering = ((RandomRBFGeneratorEvents) streamSource.getStream()).getGeneratingClusters(); + return new ClusteringEvaluationContentEvent(gtClustering, nextDataPoint, false); + } else { + ClusteringContentEvent contentEvent = new ClusteringContentEvent(numInstanceSent, nextDataPoint); + if (random.nextDouble() < samplingThreshold) { + // send a clustering content event for internal measures (cohesion, + // separation) + contentEvent.setSample(true); } + return contentEvent; + } } + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/FileStream.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/FileStream.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/FileStream.java index c8004f5..548c0da 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/FileStream.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/FileStream.java @@ -37,138 +37,139 @@ import com.yahoo.labs.samoa.moa.tasks.TaskMonitor; import com.yahoo.labs.samoa.streams.fs.FileStreamSource; /** - * InstanceStream for files - * (Abstract class: subclass this class for different file formats) + * InstanceStream for files (Abstract class: subclass this class for different + * file formats) + * * @author Casey */ public abstract class FileStream extends AbstractOptionHandler implements InstanceStream { - /** + /** * */ - private static final long serialVersionUID = 3028905554604259130L; - - public ClassOption sourceTypeOption = new ClassOption("sourceType", - 's', "Source Type (HDFS, local FS)", FileStreamSource.class, - "LocalFileStreamSource"); - - protected transient FileStreamSource fileSource; - protected transient Reader fileReader; - protected Instances instances; - - protected boolean hitEndOfStream; - private boolean hasStarted; - - /* - * Constructors - */ - public FileStream() { - this.hitEndOfStream = false; - } - - /* - * implement InstanceStream - */ - @Override - public InstancesHeader getHeader() { - return new InstancesHeader(this.instances); - } - - @Override - public long estimatedRemainingInstances() { - return -1; - } - - @Override - public boolean hasMoreInstances() { - return !this.hitEndOfStream; - } - - @Override - public InstanceExample nextInstance() { - if (this.getLastInstanceRead() == null) { - readNextInstanceFromStream(); - } - InstanceExample prevInstance = this.getLastInstanceRead(); - readNextInstanceFromStream(); - return prevInstance; - } - - @Override - public boolean isRestartable() { - return true; - } - - @Override - public void restart() { - reset(); - hasStarted = false; - } - - protected void reset() { - try { - if (this.fileReader != null) - this.fileReader.close(); - - fileSource.reset(); - } - catch (IOException ioe) { - throw new RuntimeException("FileStream restart failed.", ioe); - } - - if (!getNextFileReader()) { - hitEndOfStream = true; - throw new RuntimeException("FileStream is empty."); - } - - this.instances = new Instances(this.fileReader, 1, -1); - this.instances.setClassIndex(this.instances.numAttributes() - 1); - } - - protected boolean getNextFileReader() { - if (this.fileReader != null) - try { - this.fileReader.close(); - } catch (IOException ioe) { - ioe.printStackTrace(); - } - - InputStream inputStream = this.fileSource.getNextInputStream(); - if (inputStream == null) - return false; - - this.fileReader = new BufferedReader(new InputStreamReader(inputStream)); - return true; - } - - protected boolean readNextInstanceFromStream() { - if (!hasStarted) { - this.reset(); - hasStarted = true; - } - - while (true) { - if (readNextInstanceFromFile()) return true; - - if (!getNextFileReader()) { - this.hitEndOfStream = true; - return false; - } - } - } - - /** - * Read next instance from the current file and assign it to - * lastInstanceRead. - * @return true if it was able to read next instance and - * false if it was at the end of the file - */ - protected abstract boolean readNextInstanceFromFile(); - - protected abstract InstanceExample getLastInstanceRead(); - - @Override - public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository repository) { - this.fileSource = sourceTypeOption.getValue(); - this.hasStarted = false; - } + private static final long serialVersionUID = 3028905554604259130L; + + public ClassOption sourceTypeOption = new ClassOption("sourceType", + 's', "Source Type (HDFS, local FS)", FileStreamSource.class, + "LocalFileStreamSource"); + + protected transient FileStreamSource fileSource; + protected transient Reader fileReader; + protected Instances instances; + + protected boolean hitEndOfStream; + private boolean hasStarted; + + /* + * Constructors + */ + public FileStream() { + this.hitEndOfStream = false; + } + + /* + * implement InstanceStream + */ + @Override + public InstancesHeader getHeader() { + return new InstancesHeader(this.instances); + } + + @Override + public long estimatedRemainingInstances() { + return -1; + } + + @Override + public boolean hasMoreInstances() { + return !this.hitEndOfStream; + } + + @Override + public InstanceExample nextInstance() { + if (this.getLastInstanceRead() == null) { + readNextInstanceFromStream(); + } + InstanceExample prevInstance = this.getLastInstanceRead(); + readNextInstanceFromStream(); + return prevInstance; + } + + @Override + public boolean isRestartable() { + return true; + } + + @Override + public void restart() { + reset(); + hasStarted = false; + } + + protected void reset() { + try { + if (this.fileReader != null) + this.fileReader.close(); + + fileSource.reset(); + } catch (IOException ioe) { + throw new RuntimeException("FileStream restart failed.", ioe); + } + + if (!getNextFileReader()) { + hitEndOfStream = true; + throw new RuntimeException("FileStream is empty."); + } + + this.instances = new Instances(this.fileReader, 1, -1); + this.instances.setClassIndex(this.instances.numAttributes() - 1); + } + + protected boolean getNextFileReader() { + if (this.fileReader != null) + try { + this.fileReader.close(); + } catch (IOException ioe) { + ioe.printStackTrace(); + } + + InputStream inputStream = this.fileSource.getNextInputStream(); + if (inputStream == null) + return false; + + this.fileReader = new BufferedReader(new InputStreamReader(inputStream)); + return true; + } + + protected boolean readNextInstanceFromStream() { + if (!hasStarted) { + this.reset(); + hasStarted = true; + } + + while (true) { + if (readNextInstanceFromFile()) + return true; + + if (!getNextFileReader()) { + this.hitEndOfStream = true; + return false; + } + } + } + + /** + * Read next instance from the current file and assign it to lastInstanceRead. + * + * @return true if it was able to read next instance and false if it was at + * the end of the file + */ + protected abstract boolean readNextInstanceFromFile(); + + protected abstract InstanceExample getLastInstanceRead(); + + @Override + public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository repository) { + this.fileSource = sourceTypeOption.getValue(); + this.hasStarted = false; + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/PrequentialSourceProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/PrequentialSourceProcessor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/PrequentialSourceProcessor.java index e9a5aa1..f9884e9 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/PrequentialSourceProcessor.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/PrequentialSourceProcessor.java @@ -38,192 +38,196 @@ import com.yahoo.labs.samoa.moa.options.AbstractOptionHandler; import com.yahoo.labs.samoa.moa.streams.InstanceStream; /** - * Prequential Source Processor is the processor for Prequential Evaluation Task. + * Prequential Source Processor is the processor for Prequential Evaluation + * Task. * * @author Arinto Murdopo * */ public final class PrequentialSourceProcessor implements EntranceProcessor { - private static final long serialVersionUID = 4169053337917578558L; - - private static final Logger logger = LoggerFactory.getLogger(PrequentialSourceProcessor.class); - private boolean isInited = false; - private StreamSource streamSource; - private Instance firstInstance; - private int numberInstances; - private int numInstanceSent = 0; - - protected InstanceStream sourceStream; - - /* - * ScheduledExecutorService to schedule sending events after each delay interval. - * It is expected to have only one event in the queue at a time, so we need only - * one thread in the pool. - */ - private transient ScheduledExecutorService timer; - private transient ScheduledFuture<?> schedule = null; - private int readyEventIndex = 1; // No waiting for the first event - private int delay = 0; - private int batchSize = 1; - private boolean finished = false; - - @Override - public boolean process(ContentEvent event) { - // TODO: possible refactor of the super-interface implementation - // of source processor does not need this method - return false; + private static final long serialVersionUID = 4169053337917578558L; + + private static final Logger logger = LoggerFactory.getLogger(PrequentialSourceProcessor.class); + private boolean isInited = false; + private StreamSource streamSource; + private Instance firstInstance; + private int numberInstances; + private int numInstanceSent = 0; + + protected InstanceStream sourceStream; + + /* + * ScheduledExecutorService to schedule sending events after each delay + * interval. It is expected to have only one event in the queue at a time, so + * we need only one thread in the pool. + */ + private transient ScheduledExecutorService timer; + private transient ScheduledFuture<?> schedule = null; + private int readyEventIndex = 1; // No waiting for the first event + private int delay = 0; + private int batchSize = 1; + private boolean finished = false; + + @Override + public boolean process(ContentEvent event) { + // TODO: possible refactor of the super-interface implementation + // of source processor does not need this method + return false; + } + + @Override + public boolean isFinished() { + return finished; + } + + @Override + public boolean hasNext() { + return !isFinished() && (delay <= 0 || numInstanceSent < readyEventIndex); + } + + private boolean hasReachedEndOfStream() { + return (!streamSource.hasMoreInstances() || (numberInstances >= 0 && numInstanceSent >= numberInstances)); + } + + @Override + public ContentEvent nextEvent() { + InstanceContentEvent contentEvent = null; + if (hasReachedEndOfStream()) { + contentEvent = new InstanceContentEvent(-1, firstInstance, false, true); + contentEvent.setLast(true); + // set finished status _after_ tagging last event + finished = true; } - - @Override - public boolean isFinished() { - return finished; + else if (hasNext()) { + numInstanceSent++; + contentEvent = new InstanceContentEvent(numInstanceSent, nextInstance(), true, true); + + // first call to this method will trigger the timer + if (schedule == null && delay > 0) { + schedule = timer.scheduleWithFixedDelay(new DelayTimeoutHandler(this), delay, delay, + TimeUnit.MICROSECONDS); + } } - - @Override - public boolean hasNext() { - return !isFinished() && (delay <= 0 || numInstanceSent < readyEventIndex); + return contentEvent; + } + + private void increaseReadyEventIndex() { + readyEventIndex += batchSize; + // if we exceed the max, cancel the timer + if (schedule != null && isFinished()) { + schedule.cancel(false); } - - private boolean hasReachedEndOfStream() { - return (!streamSource.hasMoreInstances() || (numberInstances >= 0 && numInstanceSent >= numberInstances)); + } + + @Override + public void onCreate(int id) { + initStreamSource(sourceStream); + timer = Executors.newScheduledThreadPool(1); + logger.debug("Creating PrequentialSourceProcessor with id {}", id); + } + + @Override + public Processor newProcessor(Processor p) { + PrequentialSourceProcessor newProcessor = new PrequentialSourceProcessor(); + PrequentialSourceProcessor originProcessor = (PrequentialSourceProcessor) p; + if (originProcessor.getStreamSource() != null) { + newProcessor.setStreamSource(originProcessor.getStreamSource().getStream()); } - - @Override - public ContentEvent nextEvent() { - InstanceContentEvent contentEvent = null; - if (hasReachedEndOfStream()) { - contentEvent = new InstanceContentEvent(-1, firstInstance, false, true); - contentEvent.setLast(true); - // set finished status _after_ tagging last event - finished = true; - } - else if (hasNext()) { - numInstanceSent++; - contentEvent = new InstanceContentEvent(numInstanceSent, nextInstance(), true, true); - - // first call to this method will trigger the timer - if (schedule == null && delay > 0) { - schedule = timer.scheduleWithFixedDelay(new DelayTimeoutHandler(this), delay, delay, - TimeUnit.MICROSECONDS); - } - } - return contentEvent; + return newProcessor; + } + + // /** + // * Method to send instances via input stream + // * + // * @param inputStream + // * @param numberInstances + // */ + // public void sendInstances(Stream inputStream, int numberInstances) { + // int numInstanceSent = 0; + // initStreamSource(sourceStream); + // + // while (streamSource.hasMoreInstances() && numInstanceSent < + // numberInstances) { + // numInstanceSent++; + // InstanceContentEvent contentEvent = new + // InstanceContentEvent(numInstanceSent, nextInstance(), true, true); + // inputStream.put(contentEvent); + // } + // + // sendEndEvaluationInstance(inputStream); + // } + + public StreamSource getStreamSource() { + return streamSource; + } + + public void setStreamSource(InstanceStream stream) { + this.sourceStream = stream; + } + + public Instances getDataset() { + if (firstInstance == null) { + initStreamSource(sourceStream); } - - private void increaseReadyEventIndex() { - readyEventIndex+=batchSize; - // if we exceed the max, cancel the timer - if (schedule != null && isFinished()) { - schedule.cancel(false); - } - } - - @Override - public void onCreate(int id) { - initStreamSource(sourceStream); - timer = Executors.newScheduledThreadPool(1); - logger.debug("Creating PrequentialSourceProcessor with id {}", id); + return firstInstance.dataset(); + } + + private Instance nextInstance() { + if (this.isInited) { + return streamSource.nextInstance().getData(); + } else { + this.isInited = true; + return firstInstance; } - - @Override - public Processor newProcessor(Processor p) { - PrequentialSourceProcessor newProcessor = new PrequentialSourceProcessor(); - PrequentialSourceProcessor originProcessor = (PrequentialSourceProcessor) p; - if (originProcessor.getStreamSource() != null) { - newProcessor.setStreamSource(originProcessor.getStreamSource().getStream()); - } - return newProcessor; + } + + // private void sendEndEvaluationInstance(Stream inputStream) { + // InstanceContentEvent contentEvent = new InstanceContentEvent(-1, + // firstInstance, false, true); + // contentEvent.setLast(true); + // inputStream.put(contentEvent); + // } + + private void initStreamSource(InstanceStream stream) { + if (stream instanceof AbstractOptionHandler) { + ((AbstractOptionHandler) (stream)).prepareForUse(); } -// /** -// * Method to send instances via input stream -// * -// * @param inputStream -// * @param numberInstances -// */ -// public void sendInstances(Stream inputStream, int numberInstances) { -// int numInstanceSent = 0; -// initStreamSource(sourceStream); -// -// while (streamSource.hasMoreInstances() && numInstanceSent < numberInstances) { -// numInstanceSent++; -// InstanceContentEvent contentEvent = new InstanceContentEvent(numInstanceSent, nextInstance(), true, true); -// inputStream.put(contentEvent); -// } -// -// sendEndEvaluationInstance(inputStream); -// } - - public StreamSource getStreamSource() { - return streamSource; - } + this.streamSource = new StreamSource(stream); + firstInstance = streamSource.nextInstance().getData(); + } - public void setStreamSource(InstanceStream stream) { - this.sourceStream = stream; - } + public void setMaxNumInstances(int value) { + numberInstances = value; + } - public Instances getDataset() { - if (firstInstance == null) { - initStreamSource(sourceStream); - } - return firstInstance.dataset(); - } + public int getMaxNumInstances() { + return this.numberInstances; + } - private Instance nextInstance() { - if (this.isInited) { - return streamSource.nextInstance().getData(); - } else { - this.isInited = true; - return firstInstance; - } - } + public void setSourceDelay(int delay) { + this.delay = delay; + } -// private void sendEndEvaluationInstance(Stream inputStream) { -// InstanceContentEvent contentEvent = new InstanceContentEvent(-1, firstInstance, false, true); -// contentEvent.setLast(true); -// inputStream.put(contentEvent); -// } + public int getSourceDelay() { + return this.delay; + } - private void initStreamSource(InstanceStream stream) { - if (stream instanceof AbstractOptionHandler) { - ((AbstractOptionHandler) (stream)).prepareForUse(); - } + public void setDelayBatchSize(int batch) { + this.batchSize = batch; + } - this.streamSource = new StreamSource(stream); - firstInstance = streamSource.nextInstance().getData(); - } + private class DelayTimeoutHandler implements Runnable { - public void setMaxNumInstances(int value) { - numberInstances = value; - } - - public int getMaxNumInstances() { - return this.numberInstances; + private PrequentialSourceProcessor processor; + + public DelayTimeoutHandler(PrequentialSourceProcessor processor) { + this.processor = processor; } - - public void setSourceDelay(int delay) { - this.delay = delay; - } - - public int getSourceDelay() { - return this.delay; - } - - public void setDelayBatchSize(int batch) { - this.batchSize = batch; - } - - private class DelayTimeoutHandler implements Runnable { - - private PrequentialSourceProcessor processor; - - public DelayTimeoutHandler(PrequentialSourceProcessor processor) { - this.processor = processor; - } - - public void run() { - processor.increaseReadyEventIndex(); - } + + public void run() { + processor.increaseReadyEventIndex(); } + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSource.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSource.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSource.java index 453d02d..4eca28c 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSource.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSource.java @@ -31,60 +31,62 @@ import com.yahoo.labs.samoa.instances.Instance; /** * The Class StreamSource. */ -public class StreamSource implements java.io.Serializable{ +public class StreamSource implements java.io.Serializable { - /** + /** * */ - private static final long serialVersionUID = 3974668694861231236L; + private static final long serialVersionUID = 3974668694861231236L; - /** - * Instantiates a new stream source. - * - * @param stream the stream - */ - public StreamSource(InstanceStream stream) { - super(); - this.stream = stream; - } + /** + * Instantiates a new stream source. + * + * @param stream + * the stream + */ + public StreamSource(InstanceStream stream) { + super(); + this.stream = stream; + } - /** The stream. */ - protected InstanceStream stream; + /** The stream. */ + protected InstanceStream stream; - /** - * Gets the stream. - * - * @return the stream - */ - public InstanceStream getStream() { - return stream; - } + /** + * Gets the stream. + * + * @return the stream + */ + public InstanceStream getStream() { + return stream; + } - /** - * Next instance. - * - * @return the instance - */ - public Example<Instance> nextInstance() { - return stream.nextInstance(); - } + /** + * Next instance. + * + * @return the instance + */ + public Example<Instance> nextInstance() { + return stream.nextInstance(); + } - /** - * Sets the stream. - * - * @param stream the new stream - */ - public void setStream(InstanceStream stream) { - this.stream = stream; - } + /** + * Sets the stream. + * + * @param stream + * the new stream + */ + public void setStream(InstanceStream stream) { + this.stream = stream; + } - /** - * Checks for more instances. - * - * @return true, if successful - */ - public boolean hasMoreInstances() { - return this.stream.hasMoreInstances(); - } + /** + * Checks for more instances. + * + * @return true, if successful + */ + public boolean hasMoreInstances() { + return this.stream.hasMoreInstances(); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSourceProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSourceProcessor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSourceProcessor.java index 2e66e4b..980802f 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSourceProcessor.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSourceProcessor.java @@ -39,147 +39,157 @@ import com.yahoo.labs.samoa.topology.Stream; * The Class StreamSourceProcessor. */ public class StreamSourceProcessor implements Processor { - - /** The Constant logger. */ - private static final Logger logger = LoggerFactory - .getLogger(StreamSourceProcessor.class); - /** - * - */ - private static final long serialVersionUID = -204182279475432739L; + /** The Constant logger. */ + private static final Logger logger = LoggerFactory + .getLogger(StreamSourceProcessor.class); - /** The stream source. */ - private StreamSource streamSource; - - /** - * Gets the stream source. - * - * @return the stream source - */ - public StreamSource getStreamSource() { - return streamSource; - } - - /** - * Sets the stream source. - * - * @param stream the new stream source - */ - public void setStreamSource(InstanceStream stream) { - this.streamSource = new StreamSource(stream); - firstInstance = streamSource.nextInstance().getData(); - } - - /** The number instances sent. */ - private long numberInstancesSent = 0; - - /** - * Send instances. - * @param inputStream the input stream - * @param numberInstances the number instances - * @param isTraining the is training - */ - public void sendInstances(Stream inputStream, - int numberInstances, boolean isTraining, boolean isTesting) { - int numberSamples = 0; - - while (streamSource.hasMoreInstances() - && numberSamples < numberInstances) { - - numberSamples++; - numberInstancesSent++; - InstanceContentEvent instanceContentEvent = new InstanceContentEvent( - numberInstancesSent, nextInstance(), isTraining, isTesting); - - - inputStream.put(instanceContentEvent); - } - - InstanceContentEvent instanceContentEvent = new InstanceContentEvent( - numberInstancesSent, null, isTraining, isTesting); - instanceContentEvent.setLast(true); - inputStream.put(instanceContentEvent); - } - - /** - * Send end evaluation instance. - * - * @param inputStream the input stream - */ - public void sendEndEvaluationInstance(Stream inputStream) { - InstanceContentEvent instanceContentEvent = new InstanceContentEvent(-1, firstInstance,false, true); - inputStream.put(instanceContentEvent); - } - - /** - * Next instance. - * - * @return the instance - */ - protected Instance nextInstance() { - if (this.isInited) { - return streamSource.nextInstance().getData(); - } else { - this.isInited = true; - return firstInstance; - } - } - - /** The is inited. */ - protected boolean isInited = false; - - /** The first instance. */ - protected Instance firstInstance; - - //@Override - /** - * On remove. - */ - protected void onRemove() { - } - - /* (non-Javadoc) - * @see samoa.core.Processor#onCreate(int) - */ - @Override - public void onCreate(int id) { - // TODO Auto-generated method stub - } - - /* (non-Javadoc) - * @see samoa.core.Processor#newProcessor(samoa.core.Processor) - */ - @Override - public Processor newProcessor(Processor sourceProcessor) { -// StreamSourceProcessor newProcessor = new StreamSourceProcessor(); -// StreamSourceProcessor originProcessor = (StreamSourceProcessor) sourceProcessor; -// if (originProcessor.getStreamSource() != null){ -// newProcessor.setStreamSource(originProcessor.getStreamSource().getStream()); -// } - //return newProcessor; - return null; - } - - /** - * On event. - * - * @param event the event - * @return true, if successful - */ - @Override - public boolean process(ContentEvent event) { - return false; - } - - - /** - * Gets the dataset. - * - * @return the dataset + /** + * */ - public Instances getDataset() { - return firstInstance.dataset(); - } + private static final long serialVersionUID = -204182279475432739L; + + /** The stream source. */ + private StreamSource streamSource; + + /** + * Gets the stream source. + * + * @return the stream source + */ + public StreamSource getStreamSource() { + return streamSource; + } + + /** + * Sets the stream source. + * + * @param stream + * the new stream source + */ + public void setStreamSource(InstanceStream stream) { + this.streamSource = new StreamSource(stream); + firstInstance = streamSource.nextInstance().getData(); + } + + /** The number instances sent. */ + private long numberInstancesSent = 0; + + /** + * Send instances. + * + * @param inputStream + * the input stream + * @param numberInstances + * the number instances + * @param isTraining + * the is training + */ + public void sendInstances(Stream inputStream, + int numberInstances, boolean isTraining, boolean isTesting) { + int numberSamples = 0; + + while (streamSource.hasMoreInstances() + && numberSamples < numberInstances) { + + numberSamples++; + numberInstancesSent++; + InstanceContentEvent instanceContentEvent = new InstanceContentEvent( + numberInstancesSent, nextInstance(), isTraining, isTesting); + + inputStream.put(instanceContentEvent); + } + + InstanceContentEvent instanceContentEvent = new InstanceContentEvent( + numberInstancesSent, null, isTraining, isTesting); + instanceContentEvent.setLast(true); + inputStream.put(instanceContentEvent); + } + + /** + * Send end evaluation instance. + * + * @param inputStream + * the input stream + */ + public void sendEndEvaluationInstance(Stream inputStream) { + InstanceContentEvent instanceContentEvent = new InstanceContentEvent(-1, firstInstance, false, true); + inputStream.put(instanceContentEvent); + } + + /** + * Next instance. + * + * @return the instance + */ + protected Instance nextInstance() { + if (this.isInited) { + return streamSource.nextInstance().getData(); + } else { + this.isInited = true; + return firstInstance; + } + } + + /** The is inited. */ + protected boolean isInited = false; + + /** The first instance. */ + protected Instance firstInstance; + + // @Override + /** + * On remove. + */ + protected void onRemove() { + } + + /* + * (non-Javadoc) + * + * @see samoa.core.Processor#onCreate(int) + */ + @Override + public void onCreate(int id) { + // TODO Auto-generated method stub + } + + /* + * (non-Javadoc) + * + * @see samoa.core.Processor#newProcessor(samoa.core.Processor) + */ + @Override + public Processor newProcessor(Processor sourceProcessor) { + // StreamSourceProcessor newProcessor = new StreamSourceProcessor(); + // StreamSourceProcessor originProcessor = (StreamSourceProcessor) + // sourceProcessor; + // if (originProcessor.getStreamSource() != null){ + // newProcessor.setStreamSource(originProcessor.getStreamSource().getStream()); + // } + // return newProcessor; + return null; + } + + /** + * On event. + * + * @param event + * the event + * @return true, if successful + */ + @Override + public boolean process(ContentEvent event) { + return false; + } + + /** + * Gets the dataset. + * + * @return the dataset + */ + public Instances getDataset() { + return firstInstance.dataset(); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/FileStreamSource.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/FileStreamSource.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/FileStreamSource.java index 25541e2..6d741f9 100644 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/FileStreamSource.java +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/FileStreamSource.java @@ -26,42 +26,41 @@ import java.io.Serializable; /** * An interface for FileStream's source (Local FS, HDFS,...) + * * @author Casey */ public interface FileStreamSource extends Serializable { - /** - * Init the source with file/directory path and file extension - * @param path - * File or directory path - * @param ext - * File extension to be used to filter files in a directory. - * If null, all files in the directory are accepted. - */ - public void init(String path, String ext); - - /** - * Reset the source - */ - public void reset() throws IOException; - - /** - * Retrieve InputStream for next file. - * This method will return null if we are at the last file - * in the list. - * - * @return InputStream for next file in the list - */ - public InputStream getNextInputStream(); - - /** - * Retrieve InputStream for current file. - * The "current pointer" is moved forward - * with getNextInputStream method. So if there was no - * invocation of getNextInputStream, this method will - * return null. - * - * @return InputStream for current file in the list - */ - public InputStream getCurrentInputStream(); + /** + * Init the source with file/directory path and file extension + * + * @param path + * File or directory path + * @param ext + * File extension to be used to filter files in a directory. If null, + * all files in the directory are accepted. + */ + public void init(String path, String ext); + + /** + * Reset the source + */ + public void reset() throws IOException; + + /** + * Retrieve InputStream for next file. This method will return null if we are + * at the last file in the list. + * + * @return InputStream for next file in the list + */ + public InputStream getNextInputStream(); + + /** + * Retrieve InputStream for current file. The "current pointer" is moved + * forward with getNextInputStream method. So if there was no invocation of + * getNextInputStream, this method will return null. + * + * @return InputStream for current file in the list + */ + public InputStream getCurrentInputStream(); }
