// From incubator-samoa (blob 9b178f63):
// samoa-api/src/main/java/org/apache/samoa/moa/streams/generators/RandomTreeGenerator.java
package org.apache.samoa.moa.streams.generators;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2014 - 2015 Apache Software Foundation
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Random;

import org.apache.samoa.instances.Attribute;
import org.apache.samoa.instances.DenseInstance;
import org.apache.samoa.instances.Instance;
import org.apache.samoa.instances.Instances;
import org.apache.samoa.instances.InstancesHeader;
import org.apache.samoa.moa.core.FastVector;
import org.apache.samoa.moa.core.InstanceExample;
import org.apache.samoa.moa.core.ObjectRepository;
import org.apache.samoa.moa.options.AbstractOptionHandler;
import org.apache.samoa.moa.streams.InstanceStream;
import org.apache.samoa.moa.tasks.TaskMonitor;

import com.github.javacliparser.FloatOption;
import com.github.javacliparser.IntOption;

/**
 * Stream generator for a stream based on a randomly generated tree.
 * <p>
 * A random decision tree is built once from {@code treeRandomSeed}; every generated instance draws random attribute
 * values (seeded by {@code instanceRandomSeed}) and is labelled with the class stored in the leaf it reaches.
 * 
 * @author Richard Kirkby ([email protected])
 * @version $Revision: 7 $
 */
public class RandomTreeGenerator extends AbstractOptionHandler implements InstanceStream {

  @Override
  public String getPurposeString() {
    return "Generates a stream based on a randomly generated tree.";
  }

  private static final long serialVersionUID = 1L;

  public IntOption treeRandomSeedOption = new IntOption("treeRandomSeed",
      'r', "Seed for random generation of tree.", 1);

  public IntOption instanceRandomSeedOption = new IntOption(
      "instanceRandomSeed", 'i',
      "Seed for random generation of instances.", 1);

  public IntOption numClassesOption = new IntOption("numClasses", 'c',
      "The number of classes to generate.", 2, 2, Integer.MAX_VALUE);

  public IntOption numNominalsOption = new IntOption("numNominals", 'o',
      "The number of nominal attributes to generate.", 5, 0,
      Integer.MAX_VALUE);

  public IntOption numNumericsOption = new IntOption("numNumerics", 'u',
      "The number of numeric attributes to generate.", 5, 0,
      Integer.MAX_VALUE);

  public IntOption numValsPerNominalOption = new IntOption(
      "numValsPerNominal", 'v',
      "The number of values to generate per nominal attribute.", 5, 2,
      Integer.MAX_VALUE);

  public IntOption maxTreeDepthOption = new IntOption("maxTreeDepth", 'd',
      "The maximum depth of the tree concept.", 5, 0, Integer.MAX_VALUE);

  public IntOption firstLeafLevelOption = new IntOption(
      "firstLeafLevel",
      'l',
      "The first level of the tree above maxTreeDepth that can have leaves.",
      3, 0, Integer.MAX_VALUE);

  public FloatOption leafFractionOption = new FloatOption("leafFraction",
      'f',
      "The fraction of leaves per level from firstLeafLevel onwards.",
      0.15, 0.0, 1.0);

  /**
   * Node of the generated concept tree. A node whose {@code children} array is {@code null} is a leaf and only its
   * {@code classLabel} is meaningful; otherwise the node splits on {@code splitAttIndex}.
   */
  protected static class Node implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Class assigned to instances reaching this node (set for leaves). */
    public int classLabel;

    /** Index of the attribute this node splits on. */
    public int splitAttIndex;

    /** Threshold of a numeric split: values below it go to child 0, the rest to child 1. */
    public double splitAttValue;

    /** Child nodes, or {@code null} for a leaf. */
    public Node[] children;
  }

  protected Node treeRoot;

  protected InstancesHeader streamHeader;

  protected Random instanceRandom;

  /**
   * Builds the stream header and the random concept tree, then resets the instance random generator.
   */
  @Override
  public void prepareForUseImpl(TaskMonitor monitor,
      ObjectRepository repository) {
    monitor.setCurrentActivity("Preparing random tree...", -1.0);
    generateHeader();
    generateRandomTree();
    restart();
  }

  @Override
  public long estimatedRemainingInstances() {
    return -1;
  }

  @Override
  public boolean isRestartable() {
    return true;
  }

  /**
   * Re-seeds the instance generator so the same sequence of instances is produced again.
   */
  @Override
  public void restart() {
    this.instanceRandom = new Random(this.instanceRandomSeedOption.getValue());
  }

  @Override
  public InstancesHeader getHeader() {
    return this.streamHeader;
  }

  @Override
  public boolean hasMoreInstances() {
    return true;
  }

  /**
   * Generates the next instance: nominal attributes get a random value index, numeric attributes a random double from
   * {@code Random.nextDouble()}, and the class value is obtained by routing the values through the concept tree.
   * 
   * @return the generated instance wrapped in an {@link InstanceExample}
   */
  @Override
  public InstanceExample nextInstance() {
    int numNominals = this.numNominalsOption.getValue();
    int numValsPerNominal = this.numValsPerNominalOption.getValue();
    double[] attVals = new double[numNominals + this.numNumericsOption.getValue()];
    InstancesHeader header = getHeader();
    Instance inst = new DenseInstance(header.numAttributes());
    for (int i = 0; i < attVals.length; i++) {
      // Nominal attributes come first in the header, numeric attributes after them.
      attVals[i] = i < numNominals
          ? this.instanceRandom.nextInt(numValsPerNominal)
          : this.instanceRandom.nextDouble();
      inst.setValue(i, attVals[i]);
    }
    inst.setDataset(header);
    inst.setClassValue(classifyInstance(this.treeRoot, attVals));
    return new InstanceExample(inst);
  }

  /**
   * Routes the attribute values down the tree starting at {@code node} and returns the class label of the leaf
   * reached.
   * 
   * @param node
   *          the node to start from
   * @param attVals
   *          the attribute values of the instance (nominals first, then numerics)
   * @return the class label stored in the reached leaf
   */
  protected int classifyInstance(Node node, double[] attVals) {
    if (node.children == null) {
      return node.classLabel;
    }
    if (node.splitAttIndex < this.numNominalsOption.getValue()) {
      // Nominal split: the value index selects the child directly.
      return classifyInstance(
          node.children[(int) attVals[node.splitAttIndex]], attVals);
    }
    // Numeric split: binary test against the threshold.
    return classifyInstance(
        node.children[attVals[node.splitAttIndex] < node.splitAttValue ? 0
            : 1], attVals);
  }

  /**
   * Creates the stream header: the nominal attributes, then the numeric attributes, then a nominal class attribute
   * that is set as the class index.
   */
  protected void generateHeader() {
    FastVector<Attribute> attributes = new FastVector<>();
    FastVector<String> nominalAttVals = new FastVector<>();
    for (int i = 0; i < this.numValsPerNominalOption.getValue(); i++) {
      nominalAttVals.addElement("value" + (i + 1));
    }
    for (int i = 0; i < this.numNominalsOption.getValue(); i++) {
      attributes.addElement(new Attribute("nominal" + (i + 1),
          nominalAttVals));
    }
    for (int i = 0; i < this.numNumericsOption.getValue(); i++) {
      attributes.addElement(new Attribute("numeric" + (i + 1)));
    }
    FastVector<String> classLabels = new FastVector<>();
    for (int i = 0; i < this.numClassesOption.getValue(); i++) {
      classLabels.addElement("class" + (i + 1));
    }
    attributes.addElement(new Attribute("class", classLabels));
    this.streamHeader = new InstancesHeader(new Instances(
        getCLICreationString(InstanceStream.class), attributes, 0));
    this.streamHeader.setClassIndex(this.streamHeader.numAttributes() - 1);
  }

  /**
   * Builds the concept tree from {@code treeRandomSeed}. Every nominal attribute starts as a split candidate and every
   * numeric attribute starts with the split range [0.0, 1.0].
   */
  protected void generateRandomTree() {
    Random treeRand = new Random(this.treeRandomSeedOption.getValue());
    int numNominals = this.numNominalsOption.getValue();
    int numNumerics = this.numNumericsOption.getValue();
    ArrayList<Integer> nominalAttCandidates = new ArrayList<>(numNominals);
    for (int i = 0; i < numNominals; i++) {
      nominalAttCandidates.add(i);
    }
    double[] minNumericVals = new double[numNumerics];
    double[] maxNumericVals = new double[numNumerics];
    for (int i = 0; i < numNumerics; i++) {
      minNumericVals[i] = 0.0;
      maxNumericVals[i] = 1.0;
    }
    this.treeRoot = generateRandomTreeNode(0, nominalAttCandidates,
        minNumericVals, maxNumericVals, treeRand);
  }

  /**
   * Recursively generates a random subtree.
   * <p>
   * A leaf is produced when the maximum depth is reached, when the random leaf test succeeds at or below
   * {@code firstLeafLevel}, or when no attribute is left to split on. Otherwise a split attribute is chosen uniformly
   * among the remaining nominal candidates and all numeric attributes.
   * 
   * @param currentDepth
   *          depth of the node being generated (root is 0)
   * @param nominalAttCandidates
   *          nominal attribute indices not yet used on the path from the root; not modified
   * @param minNumericVals
   *          lower bound of the remaining range per numeric attribute; not modified
   * @param maxNumericVals
   *          upper bound of the remaining range per numeric attribute; not modified
   * @param treeRand
   *          random generator driving the tree construction
   * @return the root of the generated subtree
   */
  protected Node generateRandomTreeNode(int currentDepth,
      ArrayList<Integer> nominalAttCandidates, double[] minNumericVals,
      double[] maxNumericVals, Random treeRand) {
    // The original stopping rule is evaluated first (with its short-circuiting) so that the sequence of random draws
    // is unchanged for every configuration that worked before.
    boolean stopHere = (currentDepth >= this.maxTreeDepthOption.getValue())
        || ((currentDepth >= this.firstLeafLevelOption.getValue())
            && (this.leafFractionOption.getValue() >= (1.0 - treeRand.nextDouble())));
    int numSplitCandidates = nominalAttCandidates.size()
        + this.numNumericsOption.getValue();
    // Without any split candidate Random.nextInt(0) would throw IllegalArgumentException, so end the branch with a
    // leaf instead.
    if (stopHere || numSplitCandidates == 0) {
      Node leaf = new Node();
      leaf.classLabel = treeRand.nextInt(this.numClassesOption.getValue());
      return leaf;
    }
    Node node = new Node();
    int chosenAtt = treeRand.nextInt(numSplitCandidates);
    if (chosenAtt < nominalAttCandidates.size()) {
      // Nominal split: one child per value; the attribute is not reused further down this path.
      node.splitAttIndex = nominalAttCandidates.get(chosenAtt);
      node.children = new Node[this.numValsPerNominalOption.getValue()];
      ArrayList<Integer> newNominalCandidates = new ArrayList<>(
          nominalAttCandidates);
      // Boxed explicitly so that remove(Object) is selected, not remove(int index).
      newNominalCandidates.remove(Integer.valueOf(node.splitAttIndex));
      newNominalCandidates.trimToSize();
      for (int i = 0; i < node.children.length; i++) {
        node.children[i] = generateRandomTreeNode(currentDepth + 1,
            newNominalCandidates, minNumericVals, maxNumericVals,
            treeRand);
      }
    } else {
      // Numeric split: pick a threshold inside the range still reachable on this path.
      int numericIndex = chosenAtt - nominalAttCandidates.size();
      node.splitAttIndex = this.numNominalsOption.getValue()
          + numericIndex;
      double minVal = minNumericVals[numericIndex];
      double maxVal = maxNumericVals[numericIndex];
      node.splitAttValue = ((maxVal - minVal) * treeRand.nextDouble())
          + minVal;
      node.children = new Node[2];
      // Left child: values below the threshold, so the upper bound shrinks.
      double[] newMaxVals = maxNumericVals.clone();
      newMaxVals[numericIndex] = node.splitAttValue;
      node.children[0] = generateRandomTreeNode(currentDepth + 1,
          nominalAttCandidates, minNumericVals, newMaxVals, treeRand);
      // Right child: values at or above the threshold, so the lower bound grows.
      double[] newMinVals = minNumericVals.clone();
      newMinVals[numericIndex] = node.splitAttValue;
      node.children[1] = generateRandomTreeNode(currentDepth + 1,
          nominalAttCandidates, newMinVals, maxNumericVals, treeRand);
    }
    return node;
  }

  @Override
  public void getDescription(StringBuilder sb, int indent) {
    // TODO Auto-generated method stub
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/moa/tasks/NullMonitor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/moa/tasks/NullMonitor.java b/samoa-api/src/main/java/org/apache/samoa/moa/tasks/NullMonitor.java new file mode 100644 index 0000000..a4fcd33 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/moa/tasks/NullMonitor.java @@ -0,0 +1,102 @@ +package org.apache.samoa.moa.tasks; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +/** + * Class that represents a null monitor: every setter and request is a no-op, boolean queries return false, object queries return null and the fraction complete is reported as -1.0. 
+ * + * @author Richard Kirkby ([email protected]) + * @version $Revision: 7 $ + */ +public class NullMonitor implements TaskMonitor { + + @Override + public void setCurrentActivity(String activityDescription, + double fracComplete) { + } + + @Override + public void setCurrentActivityDescription(String activity) { + } + + @Override + public void setCurrentActivityFractionComplete(double fracComplete) { + } + + @Override + public boolean taskShouldAbort() { + return false; + } + + @Override + public String getCurrentActivityDescription() { + return null; + } + + @Override + public double getCurrentActivityFractionComplete() { + return -1.0; + } + + @Override + public boolean isPaused() { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public void requestCancel() { + } + + @Override + public void requestPause() { + } + + @Override + public void requestResume() { + } + + @Override + public Object getLatestResultPreview() { + return null; + } + + @Override + public void requestResultPreview() { + } + + @Override + public boolean resultPreviewRequested() { + return false; + } + + @Override + public void setLatestResultPreview(Object latestPreview) { + } + + @Override + public void requestResultPreview(ResultPreviewListener toInform) { + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/moa/tasks/ResultPreviewListener.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/moa/tasks/ResultPreviewListener.java b/samoa-api/src/main/java/org/apache/samoa/moa/tasks/ResultPreviewListener.java new file mode 100644 index 0000000..f8dfadf --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/moa/tasks/ResultPreviewListener.java @@ -0,0 +1,37 @@ +package org.apache.samoa.moa.tasks; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * 
Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +/** + * Interface implemented by classes that preview results on the Graphical User Interface + * + * @author Richard Kirkby ([email protected]) + * @version $Revision: 7 $ + */ +public interface ResultPreviewListener { + + /** + * This method is used to receive a signal from <code>TaskMonitor</code> that the latest preview has changed. This + * method is implemented in <code>PreviewPanel</code> to change the results that are shown in its panel. + * + */ + public void latestPreviewChanged(); +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/moa/tasks/Task.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/moa/tasks/Task.java b/samoa-api/src/main/java/org/apache/samoa/moa/tasks/Task.java new file mode 100644 index 0000000..6776a05 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/moa/tasks/Task.java @@ -0,0 +1,60 @@ +package org.apache.samoa.moa.tasks; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.moa.MOAObject; +import org.apache.samoa.moa.core.ObjectRepository; + +/** + * Interface representing a task. + * + * @author Richard Kirkby ([email protected]) + * @version $Revision: 7 $ + */ +public interface Task extends MOAObject { + + /** + * Gets the result type of this task. Tasks can return LearningCurve, LearningEvaluation, Classifier, String, + * Instances.. + * + * @return a class object of the result of this task + */ + public Class<?> getTaskResultType(); + + /** + * This method performs this task, when TaskMonitor and ObjectRepository are not needed. + * + * @return an object with the result of this task + */ + public Object doTask(); + + /** + * This method performs this task. 
<code>AbstractTask</code> implements this method so all its extensions only need to + * implement <code>doTaskImpl</code> + * + * @param monitor + * the TaskMonitor to use + * @param repository + * the ObjectRepository to use + * @return an object with the result of this task + */ + public Object doTask(TaskMonitor monitor, ObjectRepository repository); +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/moa/tasks/TaskMonitor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/moa/tasks/TaskMonitor.java b/samoa-api/src/main/java/org/apache/samoa/moa/tasks/TaskMonitor.java new file mode 100644 index 0000000..4eaffaf --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/moa/tasks/TaskMonitor.java @@ -0,0 +1,146 @@ +package org.apache.samoa.moa.tasks; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +/** + * Interface representing a task monitor. + * + * @author Richard Kirkby ([email protected]) + * @version $Revision: 7 $ + */ +public interface TaskMonitor { + + /** + * Sets the description and the percentage done of the current activity. 
+ * + * @param activity + * the description of the current activity + * @param fracComplete + * the percentage done of the current activity + */ + public void setCurrentActivity(String activityDescription, + double fracComplete); + + /** + * Sets the description of the current activity. + * + * @param activity + * the description of the current activity + */ + public void setCurrentActivityDescription(String activity); + + /** + * Sets the percentage done of the current activity + * + * @param fracComplete + * the percentage done of the current activity + */ + public void setCurrentActivityFractionComplete(double fracComplete); + + /** + * Gets whether the task should abort. + * + * @return true if the task should abort + */ + public boolean taskShouldAbort(); + + /** + * Gets whether there is a request for preview the task result. + * + * @return true if there is a request for preview the task result + */ + public boolean resultPreviewRequested(); + + /** + * Sets the current result to preview + * + * @param latestPreview + * the result to preview + */ + public void setLatestResultPreview(Object latestPreview); + + /** + * Gets the description of the current activity. + * + * @return the description of the current activity + */ + public String getCurrentActivityDescription(); + + /** + * Gets the percentage done of the current activity + * + * @return the percentage done of the current activity + */ + public double getCurrentActivityFractionComplete(); + + /** + * Requests the task monitored to pause. + * + */ + public void requestPause(); + + /** + * Requests the task monitored to resume. + * + */ + public void requestResume(); + + /** + * Requests the task monitored to cancel. + * + */ + public void requestCancel(); + + /** + * Gets whether the task monitored is paused. + * + * @return true if the task is paused + */ + public boolean isPaused(); + + /** + * Gets whether the task monitored is cancelled. 
+ * + * @return true if the task is cancelled + */ + public boolean isCancelled(); + + /** + * Requests to preview the task result. + * + */ + public void requestResultPreview(); + + /** + * Requests to preview the task result. + * + * @param toInform + * the listener of the changes in the preview of the result + */ + public void requestResultPreview(ResultPreviewListener toInform); + + /** + * Gets the current result to preview + * + * @return the result to preview + */ + public Object getLatestResultPreview(); +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/streams/ArffFileStream.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/ArffFileStream.java b/samoa-api/src/main/java/org/apache/samoa/streams/ArffFileStream.java new file mode 100644 index 0000000..099f639 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/ArffFileStream.java @@ -0,0 +1,120 @@ +package org.apache.samoa.streams; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import java.io.IOException; + +import org.apache.samoa.instances.Instances; +import org.apache.samoa.moa.core.InstanceExample; +import org.apache.samoa.moa.core.ObjectRepository; +import org.apache.samoa.moa.tasks.TaskMonitor; + +import com.github.javacliparser.FileOption; +import com.github.javacliparser.IntOption; + +/** + * InstanceStream for ARFF file + * + * @author Casey + */ +public class ArffFileStream extends FileStream { + + public FileOption arffFileOption = new FileOption("arffFile", 'f', + "ARFF File(s) to load.", null, null, false); + + public IntOption classIndexOption = new IntOption("classIndex", 'c', + "Class index of data. 0 for none or -1 for last attribute in file.", + -1, -1, Integer.MAX_VALUE); + + protected InstanceExample lastInstanceRead; + + @Override + public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository repository) { + super.prepareForUseImpl(monitor, repository); + String filePath = this.arffFileOption.getFile().getAbsolutePath(); + this.fileSource.init(filePath, "arff"); + this.lastInstanceRead = null; + } + + @Override + protected void reset() { + try { + if (this.fileReader != null) + this.fileReader.close(); + + fileSource.reset(); + } catch (IOException ioe) { + throw new RuntimeException("FileStream restart failed.", ioe); + } + + if (!getNextFileReader()) { + hitEndOfStream = true; + throw new RuntimeException("FileStream is empty."); + } + } + + @Override + protected boolean getNextFileReader() { + boolean ret = super.getNextFileReader(); + if (ret) { + this.instances = new Instances(this.fileReader, 1, -1); + if (this.classIndexOption.getValue() < 0) { + this.instances.setClassIndex(this.instances.numAttributes() - 1); + } else if (this.classIndexOption.getValue() > 0) { + this.instances.setClassIndex(this.classIndexOption.getValue() - 1); + } + } + return ret; + } + + @Override + protected boolean readNextInstanceFromFile() { + try { + if (this.instances.readInstance(this.fileReader)) { + 
this.lastInstanceRead = new InstanceExample(this.instances.instance(0)); + this.instances.delete(); // keep instances clean + return true; + } + if (this.fileReader != null) { + this.fileReader.close(); + this.fileReader = null; + } + return false; + } catch (IOException ioe) { + throw new RuntimeException( + "ArffFileStream failed to read instance from stream.", ioe); + } + + } + + @Override + protected InstanceExample getLastInstanceRead() { + return this.lastInstanceRead; + } + + /* + * extend org.apache.samoa.moa.MOAObject + */ + @Override + public void getDescription(StringBuilder sb, int indent) { + // TODO Auto-generated method stub + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/streams/ClusteringEntranceProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/ClusteringEntranceProcessor.java b/samoa-api/src/main/java/org/apache/samoa/streams/ClusteringEntranceProcessor.java new file mode 100644 index 0000000..db6f698 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/ClusteringEntranceProcessor.java @@ -0,0 +1,252 @@ +package org.apache.samoa.streams; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import java.util.Random; + +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.core.Processor; +import org.apache.samoa.evaluation.ClusteringEvaluationContentEvent; +import org.apache.samoa.instances.Instance; +import org.apache.samoa.instances.Instances; +import org.apache.samoa.learners.clusterers.ClusteringContentEvent; +import org.apache.samoa.moa.cluster.Clustering; +import org.apache.samoa.moa.core.DataPoint; +import org.apache.samoa.moa.options.AbstractOptionHandler; +import org.apache.samoa.moa.streams.InstanceStream; +import org.apache.samoa.moa.streams.clustering.ClusteringStream; +import org.apache.samoa.moa.streams.clustering.RandomRBFGeneratorEvents; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * EntranceProcessor for Clustering Evaluation Task. + * + */ +public final class ClusteringEntranceProcessor implements EntranceProcessor { + + private static final long serialVersionUID = 4169053337917578558L; + + private static final Logger logger = LoggerFactory.getLogger(ClusteringEntranceProcessor.class); + + private StreamSource streamSource; + private Instance firstInstance; + private boolean isInited = false; + private Random random = new Random(); + private double samplingThreshold; + private int numberInstances; + private int numInstanceSent = 0; + + private int groundTruthSamplingFrequency; + + @Override + public boolean process(ContentEvent event) { + // TODO: possible refactor of the super-interface implementation + // of source processor does not need this method + return false; + } + + @Override + public void onCreate(int id) { + logger.debug("Creating ClusteringSourceProcessor with id {}", id); + } + + @Override + public Processor newProcessor(Processor p) { + ClusteringEntranceProcessor newProcessor = new ClusteringEntranceProcessor(); + ClusteringEntranceProcessor originProcessor = (ClusteringEntranceProcessor) p; + if 
(originProcessor.getStreamSource() != null) { + newProcessor.setStreamSource(originProcessor.getStreamSource().getStream()); + } + return newProcessor; + } + + @Override + public boolean hasNext() { + return (!isFinished()); + } + + @Override + public boolean isFinished() { + return (!streamSource.hasMoreInstances() || (numberInstances >= 0 && numInstanceSent >= numberInstances)); + } + + // /** + // * Method to send instances via input stream + // * + // * @param inputStream + // * @param numberInstances + // */ + // public void sendInstances(Stream inputStream, Stream evaluationStream, int + // numberInstances, double samplingThreshold) { + // int numInstanceSent = 0; + // this.samplingThreshold = samplingThreshold; + // while (streamSource.hasMoreInstances() && numInstanceSent < + // numberInstances) { + // numInstanceSent++; + // DataPoint nextDataPoint = new DataPoint(nextInstance(), numInstanceSent); + // ClusteringContentEvent contentEvent = new + // ClusteringContentEvent(numInstanceSent, nextDataPoint); + // inputStream.put(contentEvent); + // sendPointsAndGroundTruth(streamSource, evaluationStream, numInstanceSent, + // nextDataPoint); + // } + // + // sendEndEvaluationInstance(inputStream); + // } + + public double getSamplingThreshold() { + return samplingThreshold; + } + + public void setSamplingThreshold(double samplingThreshold) { + this.samplingThreshold = samplingThreshold; + } + + public int getGroundTruthSamplingFrequency() { + return groundTruthSamplingFrequency; + } + + public void setGroundTruthSamplingFrequency(int groundTruthSamplingFrequency) { + this.groundTruthSamplingFrequency = groundTruthSamplingFrequency; + } + + public StreamSource getStreamSource() { + return streamSource; + } + + public void setStreamSource(InstanceStream stream) { + if (stream instanceof AbstractOptionHandler) { + ((AbstractOptionHandler) (stream)).prepareForUse(); + } + + this.streamSource = new StreamSource(stream); + firstInstance = 
streamSource.nextInstance().getData(); + } + + public Instances getDataset() { + return firstInstance.dataset(); + } + + private Instance nextInstance() { + if (this.isInited) { + return streamSource.nextInstance().getData(); + } else { + this.isInited = true; + return firstInstance; + } + } + + // private void sendEndEvaluationInstance(Stream inputStream) { + // ClusteringContentEvent contentEvent = new ClusteringContentEvent(-1, + // firstInstance); + // contentEvent.setLast(true); + // inputStream.put(contentEvent); + // } + + // private void sendPointsAndGroundTruth(StreamSource sourceStream, Stream + // evaluationStream, int numInstanceSent, DataPoint nextDataPoint) { + // boolean sendEvent = false; + // DataPoint instance = null; + // Clustering gtClustering = null; + // int samplingFrequency = ((ClusteringStream) + // sourceStream.getStream()).getDecayHorizon(); + // if (random.nextDouble() < samplingThreshold) { + // // Add instance + // sendEvent = true; + // instance = nextDataPoint; + // } + // if (numInstanceSent > 0 && numInstanceSent % samplingFrequency == 0) { + // // Add GroundTruth + // sendEvent = true; + // gtClustering = ((RandomRBFGeneratorEvents) + // sourceStream.getStream()).getGeneratingClusters(); + // } + // if (sendEvent == true) { + // ClusteringEvaluationContentEvent evalEvent; + // evalEvent = new ClusteringEvaluationContentEvent(gtClustering, instance, + // false); + // evaluationStream.put(evalEvent); + // } + // } + + public void setMaxNumInstances(int value) { + numberInstances = value; + } + + public int getMaxNumInstances() { + return this.numberInstances; + } + + @Override + public ContentEvent nextEvent() { + + // boolean sendEvent = false; + // DataPoint instance = null; + // Clustering gtClustering = null; + // int samplingFrequency = ((ClusteringStream) + // sourceStream.getStream()).getDecayHorizon(); + // if (random.nextDouble() < samplingThreshold) { + // // Add instance + // sendEvent = true; + // instance = 
nextDataPoint; + // } + // if (numInstanceSent > 0 && numInstanceSent % samplingFrequency == 0) { + // // Add GroundTruth + // sendEvent = true; + // gtClustering = ((RandomRBFGeneratorEvents) + // sourceStream.getStream()).getGeneratingClusters(); + // } + // if (sendEvent == true) { + // ClusteringEvaluationContentEvent evalEvent; + // evalEvent = new ClusteringEvaluationContentEvent(gtClustering, instance, + // false); + // evaluationStream.put(evalEvent); + // } + + groundTruthSamplingFrequency = ((ClusteringStream) streamSource.getStream()).getDecayHorizon(); // FIXME should it be taken from the ClusteringEvaluation -f option instead? + if (isFinished()) { + // send ending event + ClusteringContentEvent contentEvent = new ClusteringContentEvent(-1, firstInstance); + contentEvent.setLast(true); + return contentEvent; + } else { + DataPoint nextDataPoint = new DataPoint(nextInstance(), numInstanceSent); + numInstanceSent++; + if (numInstanceSent % groundTruthSamplingFrequency == 0) { + // TODO implement an interface ClusteringGroundTruth with a + // getGeneratingClusters() method, check if the source implements the interface + // send a clustering evaluation event for external measures (distance from the gt clusters) + Clustering gtClustering = ((RandomRBFGeneratorEvents) streamSource.getStream()).getGeneratingClusters(); + return new ClusteringEvaluationContentEvent(gtClustering, nextDataPoint, false); + } else { + ClusteringContentEvent contentEvent = new ClusteringContentEvent(numInstanceSent, nextDataPoint); + if (random.nextDouble() < samplingThreshold) { + // send a clustering content event for internal measures (cohesion, + // separation) + contentEvent.setSample(true); + } + return contentEvent; + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/streams/FileStream.java ---------------------------------------------------------------------- diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/FileStream.java b/samoa-api/src/main/java/org/apache/samoa/streams/FileStream.java new file mode 100644 index 0000000..4f07ed2 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/FileStream.java @@ -0,0 +1,174 @@ +package org.apache.samoa.streams; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; + +import org.apache.samoa.instances.Instances; +import org.apache.samoa.instances.InstancesHeader; +import org.apache.samoa.moa.core.InstanceExample; +import org.apache.samoa.moa.core.ObjectRepository; +import org.apache.samoa.moa.options.AbstractOptionHandler; +import org.apache.samoa.moa.streams.InstanceStream; +import org.apache.samoa.moa.tasks.TaskMonitor; +import org.apache.samoa.streams.fs.FileStreamSource; + +import com.github.javacliparser.ClassOption; + +/** + * InstanceStream for files (Abstract class: subclass this class for different file formats) + * + * @author Casey + */ +public abstract class FileStream extends AbstractOptionHandler implements InstanceStream { + /** + * + */ + private static final long serialVersionUID = 3028905554604259130L; + + public ClassOption sourceTypeOption = new ClassOption("sourceType", + 's', "Source Type 
(HDFS, local FS)", FileStreamSource.class, + "LocalFileStreamSource"); + + protected transient FileStreamSource fileSource; + protected transient Reader fileReader; + protected Instances instances; + + protected boolean hitEndOfStream; + private boolean hasStarted; + + /* + * Constructors + */ + public FileStream() { + this.hitEndOfStream = false; + } + + /* + * implement InstanceStream + */ + @Override + public InstancesHeader getHeader() { + return new InstancesHeader(this.instances); + } + + @Override + public long estimatedRemainingInstances() { + return -1; + } + + @Override + public boolean hasMoreInstances() { + return !this.hitEndOfStream; + } + + @Override + public InstanceExample nextInstance() { + if (this.getLastInstanceRead() == null) { + readNextInstanceFromStream(); + } + InstanceExample prevInstance = this.getLastInstanceRead(); + readNextInstanceFromStream(); + return prevInstance; + } + + @Override + public boolean isRestartable() { + return true; + } + + @Override + public void restart() { + reset(); + hasStarted = false; + } + + protected void reset() { + try { + if (this.fileReader != null) + this.fileReader.close(); + + fileSource.reset(); + } catch (IOException ioe) { + throw new RuntimeException("FileStream restart failed.", ioe); + } + + if (!getNextFileReader()) { + hitEndOfStream = true; + throw new RuntimeException("FileStream is empty."); + } + + this.instances = new Instances(this.fileReader, 1, -1); + this.instances.setClassIndex(this.instances.numAttributes() - 1); + } + + protected boolean getNextFileReader() { + if (this.fileReader != null) + try { + this.fileReader.close(); + } catch (IOException ioe) { + ioe.printStackTrace(); + } + + InputStream inputStream = this.fileSource.getNextInputStream(); + if (inputStream == null) + return false; + + this.fileReader = new BufferedReader(new InputStreamReader(inputStream)); + return true; + } + + protected boolean readNextInstanceFromStream() { + if (!hasStarted) { + this.reset(); + 
hasStarted = true; + } + + while (true) { + if (readNextInstanceFromFile()) + return true; + + if (!getNextFileReader()) { + this.hitEndOfStream = true; + return false; + } + } + } + + /** + * Read next instance from the current file and assign it to lastInstanceRead. + * + * @return true if it was able to read next instance and false if it was at the end of the file + */ + protected abstract boolean readNextInstanceFromFile(); + + protected abstract InstanceExample getLastInstanceRead(); + + @Override + public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository repository) { + this.fileSource = sourceTypeOption.getValue(); + this.hasStarted = false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/streams/PrequentialSourceProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/PrequentialSourceProcessor.java b/samoa-api/src/main/java/org/apache/samoa/streams/PrequentialSourceProcessor.java new file mode 100644 index 0000000..b947b2f --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/PrequentialSourceProcessor.java @@ -0,0 +1,231 @@ +package org.apache.samoa.streams; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.core.Processor; +import org.apache.samoa.instances.Instance; +import org.apache.samoa.instances.Instances; +import org.apache.samoa.learners.InstanceContentEvent; +import org.apache.samoa.moa.options.AbstractOptionHandler; +import org.apache.samoa.moa.streams.InstanceStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Prequential Source Processor is the processor for Prequential Evaluation Task. + * + * @author Arinto Murdopo + * + */ +public final class PrequentialSourceProcessor implements EntranceProcessor { + + private static final long serialVersionUID = 4169053337917578558L; + + private static final Logger logger = LoggerFactory.getLogger(PrequentialSourceProcessor.class); + private boolean isInited = false; + private StreamSource streamSource; + private Instance firstInstance; + private int numberInstances; + private int numInstanceSent = 0; + + protected InstanceStream sourceStream; + + /* + * ScheduledExecutorService to schedule sending events after each delay + * interval. It is expected to have only one event in the queue at a time, so + * we need only one thread in the pool. 
+ */ + private transient ScheduledExecutorService timer; + private transient ScheduledFuture<?> schedule = null; + private int readyEventIndex = 1; // No waiting for the first event + private int delay = 0; + private int batchSize = 1; + private boolean finished = false; + + @Override + public boolean process(ContentEvent event) { + // TODO: possible refactor of the super-interface implementation + // of source processor does not need this method + return false; + } + + @Override + public boolean isFinished() { + return finished; + } + + @Override + public boolean hasNext() { + return !isFinished() && (delay <= 0 || numInstanceSent < readyEventIndex); + } + + private boolean hasReachedEndOfStream() { + return (!streamSource.hasMoreInstances() || (numberInstances >= 0 && numInstanceSent >= numberInstances)); + } + + @Override + public ContentEvent nextEvent() { + InstanceContentEvent contentEvent = null; + if (hasReachedEndOfStream()) { + contentEvent = new InstanceContentEvent(-1, firstInstance, false, true); + contentEvent.setLast(true); + // set finished status _after_ tagging last event + finished = true; + } + else if (hasNext()) { + numInstanceSent++; + contentEvent = new InstanceContentEvent(numInstanceSent, nextInstance(), true, true); + + // first call to this method will trigger the timer + if (schedule == null && delay > 0) { + schedule = timer.scheduleWithFixedDelay(new DelayTimeoutHandler(this), delay, delay, + TimeUnit.MICROSECONDS); + } + } + return contentEvent; + } + + private void increaseReadyEventIndex() { + readyEventIndex += batchSize; + // if we exceed the max, cancel the timer + if (schedule != null && isFinished()) { + schedule.cancel(false); + } + } + + @Override + public void onCreate(int id) { + initStreamSource(sourceStream); + timer = Executors.newScheduledThreadPool(1); + logger.debug("Creating PrequentialSourceProcessor with id {}", id); + } + + @Override + public Processor newProcessor(Processor p) { + PrequentialSourceProcessor 
newProcessor = new PrequentialSourceProcessor(); + PrequentialSourceProcessor originProcessor = (PrequentialSourceProcessor) p; + if (originProcessor.getStreamSource() != null) { + newProcessor.setStreamSource(originProcessor.getStreamSource().getStream()); + } + return newProcessor; + } + + // /** + // * Method to send instances via input stream + // * + // * @param inputStream + // * @param numberInstances + // */ + // public void sendInstances(Stream inputStream, int numberInstances) { + // int numInstanceSent = 0; + // initStreamSource(sourceStream); + // + // while (streamSource.hasMoreInstances() && numInstanceSent < + // numberInstances) { + // numInstanceSent++; + // InstanceContentEvent contentEvent = new + // InstanceContentEvent(numInstanceSent, nextInstance(), true, true); + // inputStream.put(contentEvent); + // } + // + // sendEndEvaluationInstance(inputStream); + // } + + public StreamSource getStreamSource() { + return streamSource; + } + + public void setStreamSource(InstanceStream stream) { + this.sourceStream = stream; + } + + public Instances getDataset() { + if (firstInstance == null) { + initStreamSource(sourceStream); + } + return firstInstance.dataset(); + } + + private Instance nextInstance() { + if (this.isInited) { + return streamSource.nextInstance().getData(); + } else { + this.isInited = true; + return firstInstance; + } + } + + // private void sendEndEvaluationInstance(Stream inputStream) { + // InstanceContentEvent contentEvent = new InstanceContentEvent(-1, + // firstInstance, false, true); + // contentEvent.setLast(true); + // inputStream.put(contentEvent); + // } + + private void initStreamSource(InstanceStream stream) { + if (stream instanceof AbstractOptionHandler) { + ((AbstractOptionHandler) (stream)).prepareForUse(); + } + + this.streamSource = new StreamSource(stream); + firstInstance = streamSource.nextInstance().getData(); + } + + public void setMaxNumInstances(int value) { + numberInstances = value; + } + + public int 
getMaxNumInstances() { + return this.numberInstances; + } + + public void setSourceDelay(int delay) { + this.delay = delay; + } + + public int getSourceDelay() { + return this.delay; + } + + public void setDelayBatchSize(int batch) { + this.batchSize = batch; + } + + private class DelayTimeoutHandler implements Runnable { + + private PrequentialSourceProcessor processor; + + public DelayTimeoutHandler(PrequentialSourceProcessor processor) { + this.processor = processor; + } + + public void run() { + processor.increaseReadyEventIndex(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/streams/StreamSource.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/StreamSource.java b/samoa-api/src/main/java/org/apache/samoa/streams/StreamSource.java new file mode 100644 index 0000000..f4dba63 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/StreamSource.java @@ -0,0 +1,92 @@ +package org.apache.samoa.streams; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +/** + * License + */ + +import org.apache.samoa.instances.Instance; +import org.apache.samoa.moa.core.Example; +import org.apache.samoa.moa.streams.InstanceStream; + +/** + * The Class StreamSource. 
+ */ +public class StreamSource implements java.io.Serializable { + + /** + * + */ + private static final long serialVersionUID = 3974668694861231236L; + + /** + * Instantiates a new stream source. + * + * @param stream + * the stream + */ + public StreamSource(InstanceStream stream) { + super(); + this.stream = stream; + } + + /** The stream. */ + protected InstanceStream stream; + + /** + * Gets the stream. + * + * @return the stream + */ + public InstanceStream getStream() { + return stream; + } + + /** + * Next instance. + * + * @return the instance + */ + public Example<Instance> nextInstance() { + return stream.nextInstance(); + } + + /** + * Sets the stream. + * + * @param stream + * the new stream + */ + public void setStream(InstanceStream stream) { + this.stream = stream; + } + + /** + * Checks for more instances. + * + * @return true, if successful + */ + public boolean hasMoreInstances() { + return this.stream.hasMoreInstances(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/streams/StreamSourceProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/StreamSourceProcessor.java b/samoa-api/src/main/java/org/apache/samoa/streams/StreamSourceProcessor.java new file mode 100644 index 0000000..d34f701 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/StreamSourceProcessor.java @@ -0,0 +1,194 @@ +package org.apache.samoa.streams; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +/** + * License + */ + +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.Processor; +import org.apache.samoa.instances.Instance; +import org.apache.samoa.instances.Instances; +import org.apache.samoa.learners.InstanceContentEvent; +import org.apache.samoa.moa.streams.InstanceStream; +import org.apache.samoa.topology.Stream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The Class StreamSourceProcessor. + */ +public class StreamSourceProcessor implements Processor { + + /** The Constant logger. */ + private static final Logger logger = LoggerFactory + .getLogger(StreamSourceProcessor.class); + + /** + * + */ + private static final long serialVersionUID = -204182279475432739L; + + /** The stream source. */ + private StreamSource streamSource; + + /** + * Gets the stream source. + * + * @return the stream source + */ + public StreamSource getStreamSource() { + return streamSource; + } + + /** + * Sets the stream source. + * + * @param stream + * the new stream source + */ + public void setStreamSource(InstanceStream stream) { + this.streamSource = new StreamSource(stream); + firstInstance = streamSource.nextInstance().getData(); + } + + /** The number instances sent. */ + private long numberInstancesSent = 0; + + /** + * Send instances. 
+ * + * @param inputStream + * the input stream + * @param numberInstances + * the number instances + * @param isTraining + * the is training + */ + public void sendInstances(Stream inputStream, + int numberInstances, boolean isTraining, boolean isTesting) { + int numberSamples = 0; + + while (streamSource.hasMoreInstances() + && numberSamples < numberInstances) { + + numberSamples++; + numberInstancesSent++; + InstanceContentEvent instanceContentEvent = new InstanceContentEvent( + numberInstancesSent, nextInstance(), isTraining, isTesting); + + inputStream.put(instanceContentEvent); + } + + InstanceContentEvent instanceContentEvent = new InstanceContentEvent( + numberInstancesSent, null, isTraining, isTesting); + instanceContentEvent.setLast(true); + inputStream.put(instanceContentEvent); + } + + /** + * Send end evaluation instance. + * + * @param inputStream + * the input stream + */ + public void sendEndEvaluationInstance(Stream inputStream) { + InstanceContentEvent instanceContentEvent = new InstanceContentEvent(-1, firstInstance, false, true); + inputStream.put(instanceContentEvent); + } + + /** + * Next instance. + * + * @return the instance + */ + protected Instance nextInstance() { + if (this.isInited) { + return streamSource.nextInstance().getData(); + } else { + this.isInited = true; + return firstInstance; + } + } + + /** The is inited. */ + protected boolean isInited = false; + + /** The first instance. */ + protected Instance firstInstance; + + // @Override + /** + * On remove. 
+ */ + protected void onRemove() { + } + + /* + * (non-Javadoc) + * + * @see samoa.core.Processor#onCreate(int) + */ + @Override + public void onCreate(int id) { + // TODO Auto-generated method stub + } + + /* + * (non-Javadoc) + * + * @see samoa.core.Processor#newProcessor(samoa.core.Processor) + */ + @Override + public Processor newProcessor(Processor sourceProcessor) { + // StreamSourceProcessor newProcessor = new StreamSourceProcessor(); + // StreamSourceProcessor originProcessor = (StreamSourceProcessor) + // sourceProcessor; + // if (originProcessor.getStreamSource() != null){ + // newProcessor.setStreamSource(originProcessor.getStreamSource().getStream()); + // } + // return newProcessor; + return null; + } + + /** + * On event. + * + * @param event + * the event + * @return true, if successful + */ + @Override + public boolean process(ContentEvent event) { + return false; + } + + /** + * Gets the dataset. + * + * @return the dataset + */ + public Instances getDataset() { + return firstInstance.dataset(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/streams/fs/FileStreamSource.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/fs/FileStreamSource.java b/samoa-api/src/main/java/org/apache/samoa/streams/fs/FileStreamSource.java new file mode 100644 index 0000000..32552a5 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/fs/FileStreamSource.java @@ -0,0 +1,64 @@ +package org.apache.samoa.streams.fs; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
/**
 * Abstraction over the place a FileStream reads its files from (local FS, HDFS, ...). A source
 * holds a list of files and hands out one {@link InputStream} per file, in order.
 *
 * @author Casey
 */
public interface FileStreamSource extends Serializable {

  /**
   * Initialises the source from a file or directory path.
   *
   * @param path
   *          file or directory path
   * @param ext
   *          extension used to filter the files of a directory; {@code null} accepts every file
   */
  void init(String path, String ext);

  /**
   * Resets the source.
   *
   * @throws IOException
   *           if the source cannot be reset
   */
  void reset() throws IOException;

  /**
   * Advances to the next file in the list and opens it.
   *
   * @return the stream for the next file, or {@code null} when the last file has already been
   *         handed out
   */
  InputStream getNextInputStream();

  /**
   * Returns the stream of the current file without advancing. The current position only moves
   * through {@link #getNextInputStream()}, so before its first invocation this returns
   * {@code null}.
   *
   * @return the stream for the current file
   */
  InputStream getCurrentInputStream();
}
+ * #L% + */ + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.FileSystems; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; + +/** + * Source for FileStream for HDFS files + * + * @author Casey + */ +public class HDFSFileStreamSource implements FileStreamSource { + + /** + * + */ + private static final long serialVersionUID = -3887354805787167400L; + + private transient InputStream fileStream; + private transient Configuration config; + private List<String> filePaths; + private int currentIndex; + + public HDFSFileStreamSource() { + this.currentIndex = -1; + } + + public void init(String path, String ext) { + this.init(this.getDefaultConfig(), path, ext); + } + + public void init(Configuration config, String path, String ext) { + this.config = config; + this.filePaths = new ArrayList<String>(); + Path hdfsPath = new Path(path); + FileSystem fs; + try { + fs = FileSystem.get(config); + FileStatus fileStat = fs.getFileStatus(hdfsPath); + if (fileStat.isDirectory()) { + Path filterPath = hdfsPath; + if (ext != null) { + filterPath = new Path(path.toString(), "*." 
+ ext); + } + else { + filterPath = new Path(path.toString(), "*"); + } + FileStatus[] filesInDir = fs.globStatus(filterPath); + for (int i = 0; i < filesInDir.length; i++) { + if (filesInDir[i].isFile()) { + filePaths.add(filesInDir[i].getPath().toString()); + } + } + } + else { + this.filePaths.add(path); + } + } catch (IOException ioe) { + throw new RuntimeException("Failed getting list of files at:" + path, ioe); + } + + this.currentIndex = -1; + } + + private Configuration getDefaultConfig() { + String hadoopHome = System.getenv("HADOOP_HOME"); + Configuration conf = new Configuration(); + if (hadoopHome != null) { + java.nio.file.Path coreSitePath = FileSystems.getDefault().getPath(hadoopHome, "etc/hadoop/core-site.xml"); + java.nio.file.Path hdfsSitePath = FileSystems.getDefault().getPath(hadoopHome, "etc/hadoop/hdfs-site.xml"); + conf.addResource(new Path(coreSitePath.toAbsolutePath().toString())); + conf.addResource(new Path(hdfsSitePath.toAbsolutePath().toString())); + } + return conf; + } + + public void reset() throws IOException { + this.currentIndex = -1; + this.closeFileStream(); + } + + private void closeFileStream() { + IOUtils.closeStream(fileStream); + } + + public InputStream getNextInputStream() { + this.closeFileStream(); + if (this.currentIndex >= (this.filePaths.size() - 1)) + return null; + + this.currentIndex++; + String filePath = this.filePaths.get(currentIndex); + + Path hdfsPath = new Path(filePath); + FileSystem fs; + try { + fs = FileSystem.get(config); + fileStream = fs.open(hdfsPath); + } catch (IOException ioe) { + this.closeFileStream(); + throw new RuntimeException("Failed opening file:" + filePath, ioe); + } + + return fileStream; + } + + public InputStream getCurrentInputStream() { + return fileStream; + } + + protected int getFilePathListSize() { + if (filePaths != null) + return filePaths.size(); + return 0; + } + + protected String getFilePathAt(int index) { + if (filePaths != null && filePaths.size() > index) + return 
filePaths.get(index); + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/streams/fs/LocalFileStreamSource.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/fs/LocalFileStreamSource.java b/samoa-api/src/main/java/org/apache/samoa/streams/fs/LocalFileStreamSource.java new file mode 100644 index 0000000..129d247 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/fs/LocalFileStreamSource.java @@ -0,0 +1,133 @@ +package org.apache.samoa.streams.fs; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import java.io.File; +import java.io.FileInputStream; +import java.io.FilenameFilter; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.FileSystems; +import java.util.ArrayList; +import java.util.List; + +/** + * Source for FileStream for local files + * + * @author Casey + */ +public class LocalFileStreamSource implements FileStreamSource { + /** + * + */ + private static final long serialVersionUID = 3986511547525870698L; + + private transient InputStream fileStream; + private List<String> filePaths; + private int currentIndex; + + public LocalFileStreamSource() { + this.currentIndex = -1; + } + + public void init(String path, String ext) { + this.filePaths = new ArrayList<String>(); + File fileAtPath = new File(path); + if (fileAtPath.isDirectory()) { + File[] filesInDir = fileAtPath.listFiles(new FileExtensionFilter(ext)); + for (int i = 0; i < filesInDir.length; i++) { + filePaths.add(filesInDir[i].getAbsolutePath()); + } + } + else { + this.filePaths.add(path); + } + this.currentIndex = -1; + } + + public void reset() throws IOException { + this.currentIndex = -1; + this.closeFileStream(); + } + + private void closeFileStream() { + if (fileStream != null) { + try { + fileStream.close(); + } catch (IOException ioe) { + ioe.printStackTrace(); + } + } + } + + public InputStream getNextInputStream() { + this.closeFileStream(); + + if (this.currentIndex >= (this.filePaths.size() - 1)) + return null; + + this.currentIndex++; + String filePath = this.filePaths.get(currentIndex); + + File file = new File(filePath); + try { + fileStream = new FileInputStream(file); + } catch (IOException ioe) { + this.closeFileStream(); + throw new RuntimeException("Failed opening file:" + filePath, ioe); + } + + return fileStream; + } + + public InputStream getCurrentInputStream() { + return fileStream; + } + + protected int getFilePathListSize() { + if (filePaths != null) + return filePaths.size(); + return 0; + } + + protected String 
getFilePathAt(int index) { + if (filePaths != null && filePaths.size() > index) + return filePaths.get(index); + return null; + } + + private class FileExtensionFilter implements FilenameFilter { + private String extension; + + FileExtensionFilter(String ext) { + extension = ext; + } + + @Override + public boolean accept(File dir, String name) { + File f = new File(dir, name); + if (extension == null) + return f.isFile(); + else + return f.isFile() && name.toLowerCase().endsWith("." + extension); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/tasks/ClusteringEvaluation.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/tasks/ClusteringEvaluation.java b/samoa-api/src/main/java/org/apache/samoa/tasks/ClusteringEvaluation.java new file mode 100644 index 0000000..a5c0ef1 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/tasks/ClusteringEvaluation.java @@ -0,0 +1,186 @@ +package org.apache.samoa.tasks; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ +import java.text.SimpleDateFormat; +import java.util.Date; + +import org.apache.samoa.evaluation.ClusteringEvaluatorProcessor; +import org.apache.samoa.learners.Learner; +import org.apache.samoa.learners.clusterers.simple.ClusteringDistributorProcessor; +import org.apache.samoa.learners.clusterers.simple.DistributedClusterer; +import org.apache.samoa.moa.streams.InstanceStream; +import org.apache.samoa.moa.streams.clustering.ClusteringStream; +import org.apache.samoa.moa.streams.clustering.RandomRBFGeneratorEvents; +import org.apache.samoa.streams.ClusteringEntranceProcessor; +import org.apache.samoa.topology.ComponentFactory; +import org.apache.samoa.topology.Stream; +import org.apache.samoa.topology.Topology; +import org.apache.samoa.topology.TopologyBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.javacliparser.ClassOption; +import com.github.javacliparser.Configurable; +import com.github.javacliparser.FileOption; +import com.github.javacliparser.FloatOption; +import com.github.javacliparser.IntOption; +import com.github.javacliparser.StringOption; + +/** + * A task that runs and evaluates a distributed clustering algorithm. 
+ * + */ +public class ClusteringEvaluation implements Task, Configurable { + + private static final long serialVersionUID = -8246537378371580550L; + + private static final int DISTRIBUTOR_PARALLELISM = 1; + + private static final Logger logger = LoggerFactory.getLogger(ClusteringEvaluation.class); + + public ClassOption learnerOption = new ClassOption("learner", 'l', "Clustering to run.", Learner.class, + DistributedClusterer.class.getName()); + + public ClassOption streamTrainOption = new ClassOption("streamTrain", 's', "Input stream.", InstanceStream.class, + RandomRBFGeneratorEvents.class.getName()); + + public IntOption instanceLimitOption = new IntOption("instanceLimit", 'i', + "Maximum number of instances to test/train on (-1 = no limit).", 100000, -1, + Integer.MAX_VALUE); + + public IntOption measureCollectionTypeOption = new IntOption("measureCollectionType", 'm', + "Type of measure collection", 0, 0, Integer.MAX_VALUE); + + public IntOption timeLimitOption = new IntOption("timeLimit", 't', + "Maximum number of seconds to test/train for (-1 = no limit).", -1, -1, + Integer.MAX_VALUE); + + public IntOption sampleFrequencyOption = new IntOption("sampleFrequency", 'f', + "How many instances between samples of the learning performance.", 1000, 0, + Integer.MAX_VALUE); + + public StringOption evaluationNameOption = new StringOption("evaluationName", 'n', "Identifier of the evaluation", + "Clustering__" + + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())); + + public FileOption dumpFileOption = new FileOption("dumpFile", 'd', "File to append intermediate csv results to", + null, "csv", true); + + public FloatOption samplingThresholdOption = new FloatOption("samplingThreshold", 'a', + "Ratio of instances sampled that will be used for evaluation.", 0.5, + 0.0, 1.0); + + private ClusteringEntranceProcessor source; + private InstanceStream streamTrain; + private ClusteringDistributorProcessor distributor; + private Stream distributorStream; + private 
Stream evaluationStream; + + // Default=0: no delay/waiting + public IntOption sourceDelayOption = new IntOption("sourceDelay", 'w', + "How many miliseconds between injections of two instances.", 0, 0, Integer.MAX_VALUE); + + private Learner learner; + private ClusteringEvaluatorProcessor evaluator; + + private Topology topology; + private TopologyBuilder builder; + + public void getDescription(StringBuilder sb) { + sb.append("Clustering evaluation"); + } + + @Override + public void init() { + // TODO remove the if statement theoretically, dynamic binding will work + // here! for now, the if statement is used by Storm + + if (builder == null) { + logger.warn("Builder was not initialized, initializing it from the Task"); + + builder = new TopologyBuilder(); + logger.debug("Successfully instantiating TopologyBuilder"); + + builder.initTopology(evaluationNameOption.getValue(), sourceDelayOption.getValue()); + logger.debug("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue()); + } + + // instantiate ClusteringEntranceProcessor and its output stream + // (sourceStream) + source = new ClusteringEntranceProcessor(); + streamTrain = this.streamTrainOption.getValue(); + source.setStreamSource(streamTrain); + builder.addEntranceProcessor(source); + source.setSamplingThreshold(samplingThresholdOption.getValue()); + source.setMaxNumInstances(instanceLimitOption.getValue()); + logger.debug("Successfully instantiated ClusteringEntranceProcessor"); + + Stream sourceStream = builder.createStream(source); + // starter.setInputStream(sourcePiOutputStream); // FIXME set stream in the + // EntrancePI + + // distribution of instances and sampling for evaluation + distributor = new ClusteringDistributorProcessor(); + builder.addProcessor(distributor, DISTRIBUTOR_PARALLELISM); + builder.connectInputShuffleStream(sourceStream, distributor); + distributorStream = builder.createStream(distributor); + distributor.setOutputStream(distributorStream); + 
evaluationStream = builder.createStream(distributor); + distributor.setEvaluationStream(evaluationStream); // passes evaluation events along + logger.debug("Successfully instantiated Distributor"); + + // instantiate learner and connect it to distributorStream + learner = this.learnerOption.getValue(); + learner.init(builder, source.getDataset(), 1); + builder.connectInputShuffleStream(distributorStream, learner.getInputProcessor()); + logger.debug("Successfully instantiated Learner"); + + evaluator = new ClusteringEvaluatorProcessor.Builder( + sampleFrequencyOption.getValue()).dumpFile(dumpFileOption.getFile()) + .decayHorizon(((ClusteringStream) streamTrain).getDecayHorizon()).build(); + + builder.addProcessor(evaluator); + for (Stream evaluatorPiInputStream : learner.getResultStreams()) { + builder.connectInputShuffleStream(evaluatorPiInputStream, evaluator); + } + builder.connectInputAllStream(evaluationStream, evaluator); + logger.debug("Successfully instantiated EvaluatorProcessor"); + + topology = builder.build(); + logger.debug("Successfully built the topology"); + } + + @Override + public void setFactory(ComponentFactory factory) { + // TODO unify this code with init() for now, it's used by S4 App + // dynamic binding theoretically will solve this problem + builder = new TopologyBuilder(factory); + logger.debug("Successfully instantiated TopologyBuilder"); + + builder.initTopology(evaluationNameOption.getValue()); + logger.debug("Successfully initialized SAMOA topology with name {}", evaluationNameOption.getValue()); + + } + + public Topology getTopology() { + return topology; + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/tasks/PrequentialEvaluation.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/tasks/PrequentialEvaluation.java b/samoa-api/src/main/java/org/apache/samoa/tasks/PrequentialEvaluation.java 
new file mode 100644 index 0000000..575adad --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/tasks/PrequentialEvaluation.java @@ -0,0 +1,220 @@ +package org.apache.samoa.tasks; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +import java.text.SimpleDateFormat; +import java.util.Date; + +import org.apache.samoa.evaluation.BasicClassificationPerformanceEvaluator; +import org.apache.samoa.evaluation.BasicRegressionPerformanceEvaluator; +import org.apache.samoa.evaluation.ClassificationPerformanceEvaluator; +import org.apache.samoa.evaluation.EvaluatorProcessor; +import org.apache.samoa.evaluation.PerformanceEvaluator; +import org.apache.samoa.evaluation.RegressionPerformanceEvaluator; +import org.apache.samoa.learners.ClassificationLearner; +import org.apache.samoa.learners.Learner; +import org.apache.samoa.learners.RegressionLearner; +import org.apache.samoa.learners.classifiers.trees.VerticalHoeffdingTree; +import org.apache.samoa.moa.streams.InstanceStream; +import org.apache.samoa.moa.streams.generators.RandomTreeGenerator; +import org.apache.samoa.streams.PrequentialSourceProcessor; +import org.apache.samoa.topology.ComponentFactory; +import org.apache.samoa.topology.Stream; +import org.apache.samoa.topology.Topology; +import org.apache.samoa.topology.TopologyBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import 
com.github.javacliparser.ClassOption; +import com.github.javacliparser.Configurable; +import com.github.javacliparser.FileOption; +import com.github.javacliparser.IntOption; +import com.github.javacliparser.StringOption; + +/** + * Prequential Evaluation task is a scheme in evaluating performance of online classifiers which uses each instance for + * testing online classifiers model and then it further uses the same instance for training the model(Test-then-train) + * + * @author Arinto Murdopo + * + */ +public class PrequentialEvaluation implements Task, Configurable { + + private static final long serialVersionUID = -8246537378371580550L; + + private static Logger logger = LoggerFactory.getLogger(PrequentialEvaluation.class); + + public ClassOption learnerOption = new ClassOption("learner", 'l', "Classifier to train.", Learner.class, + VerticalHoeffdingTree.class.getName()); + + public ClassOption streamTrainOption = new ClassOption("trainStream", 's', "Stream to learn from.", + InstanceStream.class, + RandomTreeGenerator.class.getName()); + + public ClassOption evaluatorOption = new ClassOption("evaluator", 'e', + "Classification performance evaluation method.", + PerformanceEvaluator.class, BasicClassificationPerformanceEvaluator.class.getName()); + + public IntOption instanceLimitOption = new IntOption("instanceLimit", 'i', + "Maximum number of instances to test/train on (-1 = no limit).", 1000000, -1, + Integer.MAX_VALUE); + + public IntOption timeLimitOption = new IntOption("timeLimit", 't', + "Maximum number of seconds to test/train for (-1 = no limit).", -1, -1, + Integer.MAX_VALUE); + + public IntOption sampleFrequencyOption = new IntOption("sampleFrequency", 'f', + "How many instances between samples of the learning performance.", 100000, + 0, Integer.MAX_VALUE); + + public StringOption evaluationNameOption = new StringOption("evaluationName", 'n', "Identifier of the evaluation", + "Prequential_" + + new SimpleDateFormat("yyyyMMddHHmmss").format(new 
Date())); + + public FileOption dumpFileOption = new FileOption("dumpFile", 'd', "File to append intermediate csv results to", + null, "csv", true); + + // Default=0: no delay/waiting + public IntOption sourceDelayOption = new IntOption("sourceDelay", 'w', + "How many microseconds between injections of two instances.", 0, 0, Integer.MAX_VALUE); + // Batch size to delay the incoming stream: delay of x milliseconds after each + // batch + public IntOption batchDelayOption = new IntOption("delayBatchSize", 'b', + "The delay batch size: delay of x milliseconds after each batch ", 1, 1, Integer.MAX_VALUE); + + private PrequentialSourceProcessor preqSource; + + // private PrequentialSourceTopologyStarter preqStarter; + + // private EntranceProcessingItem sourcePi; + + private Stream sourcePiOutputStream; + + private Learner classifier; + + private EvaluatorProcessor evaluator; + + // private ProcessingItem evaluatorPi; + + // private Stream evaluatorPiInputStream; + + private Topology prequentialTopology; + + private TopologyBuilder builder; + + public void getDescription(StringBuilder sb, int indent) { + sb.append("Prequential evaluation"); + } + + @Override + public void init() { + // TODO remove the if statement + // theoretically, dynamic binding will work here! + // test later! 
+ // for now, the if statement is used by Storm + + if (builder == null) { + builder = new TopologyBuilder(); + logger.debug("Successfully instantiating TopologyBuilder"); + + builder.initTopology(evaluationNameOption.getValue()); + logger.debug("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue()); + } + + // instantiate PrequentialSourceProcessor and its output stream + // (sourcePiOutputStream) + preqSource = new PrequentialSourceProcessor(); + preqSource.setStreamSource((InstanceStream) this.streamTrainOption.getValue()); + preqSource.setMaxNumInstances(instanceLimitOption.getValue()); + preqSource.setSourceDelay(sourceDelayOption.getValue()); + preqSource.setDelayBatchSize(batchDelayOption.getValue()); + builder.addEntranceProcessor(preqSource); + logger.debug("Successfully instantiating PrequentialSourceProcessor"); + + // preqStarter = new PrequentialSourceTopologyStarter(preqSource, + // instanceLimitOption.getValue()); + // sourcePi = builder.createEntrancePi(preqSource, preqStarter); + // sourcePiOutputStream = builder.createStream(sourcePi); + + sourcePiOutputStream = builder.createStream(preqSource); + // preqStarter.setInputStream(sourcePiOutputStream); + + // instantiate classifier and connect it to sourcePiOutputStream + classifier = this.learnerOption.getValue(); + classifier.init(builder, preqSource.getDataset(), 1); + builder.connectInputShuffleStream(sourcePiOutputStream, classifier.getInputProcessor()); + logger.debug("Successfully instantiating Classifier"); + + PerformanceEvaluator evaluatorOptionValue = this.evaluatorOption.getValue(); + if (!PrequentialEvaluation.isLearnerAndEvaluatorCompatible(classifier, evaluatorOptionValue)) { + evaluatorOptionValue = getDefaultPerformanceEvaluatorForLearner(classifier); + } + evaluator = new EvaluatorProcessor.Builder(evaluatorOptionValue) + .samplingFrequency(sampleFrequencyOption.getValue()).dumpFile(dumpFileOption.getFile()).build(); + + // evaluatorPi = 
builder.createPi(evaluator); + // evaluatorPi.connectInputShuffleStream(evaluatorPiInputStream); + builder.addProcessor(evaluator); + for (Stream evaluatorPiInputStream : classifier.getResultStreams()) { + builder.connectInputShuffleStream(evaluatorPiInputStream, evaluator); + } + + logger.debug("Successfully instantiating EvaluatorProcessor"); + + prequentialTopology = builder.build(); + logger.debug("Successfully building the topology"); + } + + @Override + public void setFactory(ComponentFactory factory) { + // TODO unify this code with init() + // for now, it's used by S4 App + // dynamic binding theoretically will solve this problem + builder = new TopologyBuilder(factory); + logger.debug("Successfully instantiating TopologyBuilder"); + + builder.initTopology(evaluationNameOption.getValue()); + logger.debug("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue()); + + } + + public Topology getTopology() { + return prequentialTopology; + } + + // + // @Override + // public TopologyStarter getTopologyStarter() { + // return this.preqStarter; + // } + + private static boolean isLearnerAndEvaluatorCompatible(Learner learner, PerformanceEvaluator evaluator) { + return (learner instanceof RegressionLearner && evaluator instanceof RegressionPerformanceEvaluator) || + (learner instanceof ClassificationLearner && evaluator instanceof ClassificationPerformanceEvaluator); + } + + private static PerformanceEvaluator getDefaultPerformanceEvaluatorForLearner(Learner learner) { + if (learner instanceof RegressionLearner) { + return new BasicRegressionPerformanceEvaluator(); + } + // Default to BasicClassificationPerformanceEvaluator for all other cases + return new BasicClassificationPerformanceEvaluator(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/tasks/Task.java ---------------------------------------------------------------------- diff --git 
a/samoa-api/src/main/java/org/apache/samoa/tasks/Task.java b/samoa-api/src/main/java/org/apache/samoa/tasks/Task.java new file mode 100644 index 0000000..73d9be8 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/tasks/Task.java @@ -0,0 +1,59 @@ +package org.apache.samoa.tasks; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.topology.ComponentFactory; +import org.apache.samoa.topology.Topology; + +/** + * Task interface, the mother of all SAMOA tasks! + */ +public interface Task { + + /** + * Initialize this SAMOA task, i.e. create and connect ProcessingItems and Streams and initialize the topology + */ + public void init(); + + /** + * Return the final topology object to be executed in the cluster + * + * @return topology object to be submitted to be executed in the cluster + */ + public Topology getTopology(); + + // /** + // * Return the entrance processor to start SAMOA topology + // * The logic to start the topology should be implemented here + // * @return entrance processor to start the topology + // */ + // public TopologyStarter getTopologyStarter(); + + /** + * Sets the factory. TODO: propose to hide factory from task, i.e. 
Task will only see TopologyBuilder, and factory + * creation will be handled by TopologyBuilder + * + * @param factory + * the new factory + */ + public void setFactory(ComponentFactory factory); + +}
