// Recovered from the deletion diff of
// samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/HyperplaneGenerator.java
// (incubator-samoa blob 9b178f63, index 866e47f..0000000).
package com.yahoo.labs.samoa.moa.streams.generators;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2014 - 2015 Apache Software Foundation
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import java.util.Random;

import com.github.javacliparser.FloatOption;
import com.github.javacliparser.IntOption;
import com.yahoo.labs.samoa.instances.Attribute;
import com.yahoo.labs.samoa.instances.DenseInstance;
import com.yahoo.labs.samoa.instances.Instance;
import com.yahoo.labs.samoa.instances.Instances;
import com.yahoo.labs.samoa.instances.InstancesHeader;
import com.yahoo.labs.samoa.moa.core.Example;
import com.yahoo.labs.samoa.moa.core.FastVector;
import com.yahoo.labs.samoa.moa.core.InstanceExample;
import com.yahoo.labs.samoa.moa.core.ObjectRepository;
import com.yahoo.labs.samoa.moa.options.AbstractOptionHandler;
import com.yahoo.labs.samoa.moa.streams.InstanceStream;
import com.yahoo.labs.samoa.moa.tasks.TaskMonitor;

/**
 * Stream generator for Hyperplane data stream.
 * <p>
 * Each instance has {@code numAtts} attribute values drawn with {@link Random#nextDouble()}. The class is 1 when the
 * weighted sum of the values reaches half the sum of the weights, and 0 otherwise. After every instance the weights of
 * the first {@code numDriftAtts} attributes are shifted by {@code magChange} in their current direction.
 * 
 * @author Albert Bifet (abifet at cs dot waikato dot ac dot nz)
 * @version $Revision: 7 $
 */
public class HyperplaneGenerator extends AbstractOptionHandler implements InstanceStream {

  @Override
  public String getPurposeString() {
    return "Generates a problem of predicting class of a rotating hyperplane.";
  }

  private static final long serialVersionUID = 1L;

  public IntOption instanceRandomSeedOption = new IntOption("instanceRandomSeed", 'i',
      "Seed for random generation of instances.", 1);

  public IntOption numClassesOption = new IntOption("numClasses", 'c', "The number of classes to generate.", 2, 2,
      Integer.MAX_VALUE);

  public IntOption numAttsOption = new IntOption("numAtts", 'a', "The number of attributes to generate.", 10, 0,
      Integer.MAX_VALUE);

  public IntOption numDriftAttsOption = new IntOption("numDriftAtts", 'k', "The number of attributes with drift.", 2,
      0, Integer.MAX_VALUE);

  public FloatOption magChangeOption = new FloatOption("magChange", 't', "Magnitude of the change for every example",
      0.0, 0.0, 1.0);

  public IntOption noisePercentageOption = new IntOption("noisePercentage", 'n',
      "Percentage of noise to add to the data.", 5, 0, 100);

  public IntOption sigmaPercentageOption = new IntOption("sigmaPercentage", 's',
      "Percentage of probability that the direction of change is reversed.", 10,
      0, 100);

  /** Header describing the generated attributes plus the trailing class attribute. */
  protected InstancesHeader streamHeader;

  /** Random source for attribute values, noise and drift reversal; re-seeded by {@link #restart()}. */
  protected Random instanceRandom;

  /** Hyperplane weight per attribute; has {@code numAtts} entries. */
  protected double[] weights;

  /** Drift direction per attribute: +1 or -1 for drifting attributes, 0 for the others. */
  protected int[] sigma;

  public int numberInstance;

  @Override
  protected void prepareForUseImpl(TaskMonitor monitor, ObjectRepository repository) {
    monitor.setCurrentActivity("Preparing hyperplane...", -1.0);
    generateHeader();
    restart();
  }

  /**
   * Builds the stream header: {@code numAtts} numeric attributes named {@code att1..attN} followed by a nominal
   * {@code class} attribute with labels {@code class1..classM}, which is set as the class index.
   */
  protected void generateHeader() {
    FastVector<Attribute> attributes = new FastVector<>();
    int numAtts = this.numAttsOption.getValue();
    for (int i = 0; i < numAtts; i++) {
      attributes.addElement(new Attribute("att" + (i + 1)));
    }

    FastVector<String> classLabels = new FastVector<>();
    int numClasses = this.numClassesOption.getValue();
    for (int i = 0; i < numClasses; i++) {
      classLabels.addElement("class" + (i + 1));
    }
    attributes.addElement(new Attribute("class", classLabels));
    this.streamHeader = new InstancesHeader(new Instances(getCLICreationString(InstanceStream.class), attributes, 0));
    this.streamHeader.setClassIndex(this.streamHeader.numAttributes() - 1);
  }

  /** @return -1, since this generator never runs out of instances */
  @Override
  public long estimatedRemainingInstances() {
    return -1;
  }

  @Override
  public InstancesHeader getHeader() {
    return this.streamHeader;
  }

  @Override
  public boolean hasMoreInstances() {
    return true;
  }

  @Override
  public boolean isRestartable() {
    return true;
  }

  /**
   * Generates the next labelled instance and then applies one step of drift to the weights.
   * <p>
   * Only the labels 0 and 1 are ever assigned, whatever {@code numClasses} is set to.
   * 
   * @return the generated instance wrapped in an {@link InstanceExample}
   */
  @Override
  public Example<Instance> nextInstance() {
    int numAtts = this.numAttsOption.getValue();
    // The extra trailing slot lines up with the class attribute appended by generateHeader().
    double[] attVals = new double[numAtts + 1];
    double sum = 0.0;
    double sumWeights = 0.0;
    for (int i = 0; i < numAtts; i++) {
      attVals[i] = this.instanceRandom.nextDouble();
      sum += this.weights[i] * attVals[i];
      sumWeights += this.weights[i];
    }
    int classLabel = (sum >= sumWeights * 0.5) ? 1 : 0;

    // Add noise: flip the label with a probability of noisePercentage percent.
    if ((1 + this.instanceRandom.nextInt(100)) <= this.noisePercentageOption.getValue()) {
      classLabel = (classLabel == 0 ? 1 : 0);
    }

    Instance inst = new DenseInstance(1.0, attVals);
    inst.setDataset(getHeader());
    inst.setClassValue(classLabel);
    addDrift();
    return new InstanceExample(inst);
  }

  /**
   * Moves the weight of every drifting attribute by {@code sigma * magChange} and, with a probability of
   * {@code sigmaPercentage} percent, reverses that attribute's direction. Weights are not clamped to any range.
   */
  private void addDrift() {
    // numDriftAtts can be configured larger than numAtts; never index past the arrays sized in restart().
    int driftCount = Math.min(this.numDriftAttsOption.getValue(), this.weights.length);
    double magnitude = this.magChangeOption.getValue();
    int reversalPercentage = this.sigmaPercentageOption.getValue();
    for (int i = 0; i < driftCount; i++) {
      this.weights[i] += this.sigma[i] * magnitude;
      if ((1 + this.instanceRandom.nextInt(100)) <= reversalPercentage) {
        this.sigma[i] *= -1;
      }
    }
  }

  /**
   * Re-seeds the random source and draws fresh weights. The first {@code numDriftAtts} attributes start drifting in
   * the positive direction; the remaining ones do not drift.
   */
  @Override
  public void restart() {
    int numAtts = this.numAttsOption.getValue();
    int numDriftAtts = this.numDriftAttsOption.getValue();
    this.instanceRandom = new Random(this.instanceRandomSeedOption.getValue());
    this.weights = new double[numAtts];
    this.sigma = new int[numAtts];
    for (int i = 0; i < numAtts; i++) {
      this.weights[i] = this.instanceRandom.nextDouble();
      this.sigma[i] = (i < numDriftAtts ? 1 : 0);
    }
  }

  @Override
  public void getDescription(StringBuilder sb, int indent) {
    // TODO Auto-generated method stub
  }
}
// Recovered from the deletion diff of
// samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/RandomTreeGenerator.java
// (incubator-samoa blob 9b178f63, index cc8384f..0000000).
package com.yahoo.labs.samoa.moa.streams.generators;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2014 - 2015 Apache Software Foundation
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import com.yahoo.labs.samoa.instances.Attribute;
import com.yahoo.labs.samoa.instances.DenseInstance;
import com.yahoo.labs.samoa.moa.core.FastVector;
import com.yahoo.labs.samoa.instances.Instance;
import com.yahoo.labs.samoa.instances.Instances;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Random;
import com.yahoo.labs.samoa.moa.core.InstanceExample;

import com.yahoo.labs.samoa.instances.InstancesHeader;
import com.yahoo.labs.samoa.moa.core.ObjectRepository;
import com.yahoo.labs.samoa.moa.options.AbstractOptionHandler;
import com.github.javacliparser.FloatOption;
import com.github.javacliparser.IntOption;
import com.yahoo.labs.samoa.moa.streams.InstanceStream;
import com.yahoo.labs.samoa.moa.tasks.TaskMonitor;

/**
 * Stream generator for a stream based on a randomly generated tree.
 * <p>
 * A tree is built once from {@code treeRandomSeed}. Instances are then drawn uniformly at random from
 * {@code instanceRandomSeed} and labelled by routing them through that tree.
 * 
 * @author Richard Kirkby
 * @version $Revision: 7 $
 */
public class RandomTreeGenerator extends AbstractOptionHandler implements InstanceStream {

  @Override
  public String getPurposeString() {
    return "Generates a stream based on a randomly generated tree.";
  }

  private static final long serialVersionUID = 1L;

  public IntOption treeRandomSeedOption = new IntOption("treeRandomSeed",
      'r', "Seed for random generation of tree.", 1);

  public IntOption instanceRandomSeedOption = new IntOption(
      "instanceRandomSeed", 'i',
      "Seed for random generation of instances.", 1);

  public IntOption numClassesOption = new IntOption("numClasses", 'c',
      "The number of classes to generate.", 2, 2, Integer.MAX_VALUE);

  public IntOption numNominalsOption = new IntOption("numNominals", 'o',
      "The number of nominal attributes to generate.", 5, 0,
      Integer.MAX_VALUE);

  public IntOption numNumericsOption = new IntOption("numNumerics", 'u',
      "The number of numeric attributes to generate.", 5, 0,
      Integer.MAX_VALUE);

  public IntOption numValsPerNominalOption = new IntOption(
      "numValsPerNominal", 'v',
      "The number of values to generate per nominal attribute.", 5, 2,
      Integer.MAX_VALUE);

  public IntOption maxTreeDepthOption = new IntOption("maxTreeDepth", 'd',
      "The maximum depth of the tree concept.", 5, 0, Integer.MAX_VALUE);

  public IntOption firstLeafLevelOption = new IntOption(
      "firstLeafLevel",
      'l',
      "The first level of the tree above maxTreeDepth that can have leaves.",
      3, 0, Integer.MAX_VALUE);

  public FloatOption leafFractionOption = new FloatOption("leafFraction",
      'f',
      "The fraction of leaves per level from firstLeafLevel onwards.",
      0.15, 0.0, 1.0);

  /** One node of the concept tree; a node whose {@code children} is {@code null} is a leaf. */
  protected static class Node implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Class assigned by this node; only set on leaves. */
    public int classLabel;

    /** Index of the attribute tested by this node; only set on internal nodes. */
    public int splitAttIndex;

    /** Threshold for a numeric split: values below it go to child 0, the rest to child 1. */
    public double splitAttValue;

    /** One child per nominal value, or two children for a numeric split; {@code null} for a leaf. */
    public Node[] children;
  }

  protected Node treeRoot;

  protected InstancesHeader streamHeader;

  protected Random instanceRandom;

  @Override
  public void prepareForUseImpl(TaskMonitor monitor,
      ObjectRepository repository) {
    monitor.setCurrentActivity("Preparing random tree...", -1.0);
    generateHeader();
    generateRandomTree();
    restart();
  }

  /** @return -1, since this generator never runs out of instances */
  @Override
  public long estimatedRemainingInstances() {
    return -1;
  }

  @Override
  public boolean isRestartable() {
    return true;
  }

  /** Re-seeds the instance random source; the tree itself is left untouched. */
  @Override
  public void restart() {
    this.instanceRandom = new Random(this.instanceRandomSeedOption.getValue());
  }

  @Override
  public InstancesHeader getHeader() {
    return this.streamHeader;
  }

  @Override
  public boolean hasMoreInstances() {
    return true;
  }

  /**
   * Draws a random instance (nominal attributes first, then numeric ones) and labels it with the concept tree.
   * 
   * @return the generated instance wrapped in an {@link InstanceExample}
   */
  @Override
  public InstanceExample nextInstance() {
    int numNominals = this.numNominalsOption.getValue();
    double[] attVals = new double[numNominals + this.numNumericsOption.getValue()];
    InstancesHeader header = getHeader();
    Instance inst = new DenseInstance(header.numAttributes());
    for (int i = 0; i < attVals.length; i++) {
      attVals[i] = i < numNominals
          ? this.instanceRandom.nextInt(this.numValsPerNominalOption.getValue())
          : this.instanceRandom.nextDouble();
      inst.setValue(i, attVals[i]);
    }
    inst.setDataset(header);
    inst.setClassValue(classifyInstance(this.treeRoot, attVals));
    return new InstanceExample(inst);
  }

  /**
   * Routes the attribute values down the tree starting at {@code node}.
   * 
   * @param node
   *          the node to start from
   * @param attVals
   *          the attribute values, nominal attributes first
   * @return the class label of the leaf that is reached
   */
  protected int classifyInstance(Node node, double[] attVals) {
    if (node.children == null) {
      return node.classLabel;
    }
    double value = attVals[node.splitAttIndex];
    if (node.splitAttIndex < this.numNominalsOption.getValue()) {
      // Nominal split: the (integral) value selects the child directly.
      return classifyInstance(node.children[(int) value], attVals);
    }
    return classifyInstance(node.children[value < node.splitAttValue ? 0 : 1], attVals);
  }

  /**
   * Builds the stream header: the nominal attributes, then the numeric attributes, then a nominal {@code class}
   * attribute, which is set as the class index. All nominal attributes share the same value list.
   */
  protected void generateHeader() {
    FastVector<Attribute> attributes = new FastVector<>();
    FastVector<String> nominalAttVals = new FastVector<>();
    for (int i = 0; i < this.numValsPerNominalOption.getValue(); i++) {
      nominalAttVals.addElement("value" + (i + 1));
    }
    for (int i = 0; i < this.numNominalsOption.getValue(); i++) {
      attributes.addElement(new Attribute("nominal" + (i + 1),
          nominalAttVals));
    }
    for (int i = 0; i < this.numNumericsOption.getValue(); i++) {
      attributes.addElement(new Attribute("numeric" + (i + 1)));
    }
    FastVector<String> classLabels = new FastVector<>();
    for (int i = 0; i < this.numClassesOption.getValue(); i++) {
      classLabels.addElement("class" + (i + 1));
    }
    attributes.addElement(new Attribute("class", classLabels));
    this.streamHeader = new InstancesHeader(new Instances(
        getCLICreationString(InstanceStream.class), attributes, 0));
    this.streamHeader.setClassIndex(this.streamHeader.numAttributes() - 1);
  }

  /**
   * Builds the concept tree from {@code treeRandomSeed}. Every nominal attribute starts as a split candidate and every
   * numeric attribute starts with the range [0.0, 1.0].
   */
  protected void generateRandomTree() {
    Random treeRand = new Random(this.treeRandomSeedOption.getValue());
    int numNominals = this.numNominalsOption.getValue();
    int numNumerics = this.numNumericsOption.getValue();
    ArrayList<Integer> nominalAttCandidates = new ArrayList<>(numNominals);
    for (int i = 0; i < numNominals; i++) {
      nominalAttCandidates.add(i);
    }
    double[] minNumericVals = new double[numNumerics];
    double[] maxNumericVals = new double[numNumerics];
    for (int i = 0; i < numNumerics; i++) {
      minNumericVals[i] = 0.0;
      maxNumericVals[i] = 1.0;
    }
    this.treeRoot = generateRandomTreeNode(0, nominalAttCandidates,
        minNumericVals, maxNumericVals, treeRand);
  }

  /**
   * Recursively generates a subtree.
   * <p>
   * A leaf is produced at {@code maxTreeDepth}, at random from {@code firstLeafLevel} onwards, or when no attribute is
   * left to split on. Otherwise a split attribute is chosen uniformly among the remaining nominal candidates and all
   * numeric attributes. A nominal attribute is removed from the candidates of its subtrees; a numeric split narrows
   * that attribute's range for each of its two subtrees.
   * 
   * @param currentDepth
   *          depth of the node being generated (the root is 0)
   * @param nominalAttCandidates
   *          indices of nominal attributes not yet used on the path to this node; not modified
   * @param minNumericVals
   *          lower bound per numeric attribute on this path; not modified
   * @param maxNumericVals
   *          upper bound per numeric attribute on this path; not modified
   * @param treeRand
   *          random source for the tree structure
   * @return the root of the generated subtree
   */
  protected Node generateRandomTreeNode(int currentDepth,
      ArrayList<Integer> nominalAttCandidates, double[] minNumericVals,
      double[] maxNumericVals, Random treeRand) {
    if ((currentDepth >= this.maxTreeDepthOption.getValue())
        || ((currentDepth >= this.firstLeafLevelOption.getValue())
        && (this.leafFractionOption.getValue() >= (1.0 - treeRand.nextDouble())))) {
      return newLeaf(treeRand);
    }
    int numNominalCandidates = nominalAttCandidates.size();
    int numSplitCandidates = numNominalCandidates + this.numNumericsOption.getValue();
    if (numSplitCandidates <= 0) {
      // Every nominal attribute is already used on this path and there are no numeric ones.
      // Random.nextInt(0) would throw, so close the branch with a leaf instead.
      return newLeaf(treeRand);
    }

    Node node = new Node();
    int chosenAtt = treeRand.nextInt(numSplitCandidates);
    if (chosenAtt < numNominalCandidates) {
      node.splitAttIndex = nominalAttCandidates.get(chosenAtt);
      node.children = new Node[this.numValsPerNominalOption.getValue()];
      ArrayList<Integer> newNominalCandidates = new ArrayList<>(nominalAttCandidates);
      // Integer.valueOf selects remove(Object), i.e. removal by value rather than by position.
      newNominalCandidates.remove(Integer.valueOf(node.splitAttIndex));
      for (int i = 0; i < node.children.length; i++) {
        node.children[i] = generateRandomTreeNode(currentDepth + 1,
            newNominalCandidates, minNumericVals, maxNumericVals,
            treeRand);
      }
    } else {
      int numericIndex = chosenAtt - numNominalCandidates;
      node.splitAttIndex = this.numNominalsOption.getValue() + numericIndex;
      double minVal = minNumericVals[numericIndex];
      double maxVal = maxNumericVals[numericIndex];
      node.splitAttValue = ((maxVal - minVal) * treeRand.nextDouble()) + minVal;
      node.children = new Node[2];
      // Left subtree: values below the threshold, so the threshold becomes the new maximum.
      double[] newMaxVals = maxNumericVals.clone();
      newMaxVals[numericIndex] = node.splitAttValue;
      node.children[0] = generateRandomTreeNode(currentDepth + 1,
          nominalAttCandidates, minNumericVals, newMaxVals, treeRand);
      // Right subtree: values at or above the threshold, so the threshold becomes the new minimum.
      double[] newMinVals = minNumericVals.clone();
      newMinVals[numericIndex] = node.splitAttValue;
      node.children[1] = generateRandomTreeNode(currentDepth + 1,
          nominalAttCandidates, newMinVals, maxNumericVals, treeRand);
    }
    return node;
  }

  /** Creates a leaf whose class label is drawn uniformly from {@code [0, numClasses)}. */
  private Node newLeaf(Random treeRand) {
    Node leaf = new Node();
    leaf.classLabel = treeRand.nextInt(this.numClassesOption.getValue());
    return leaf;
  }

  @Override
  public void getDescription(StringBuilder sb, int indent) {
    // TODO Auto-generated method stub
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/NullMonitor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/NullMonitor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/NullMonitor.java deleted file mode 100644 index 4c51219..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/NullMonitor.java +++ /dev/null @@ -1,102 +0,0 @@ -package com.yahoo.labs.samoa.moa.tasks; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -/** - * Class that represents a null monitor. 
- * - * @author Richard Kirkby ([email protected]) - * @version $Revision: 7 $ - */ -public class NullMonitor implements TaskMonitor { - - @Override - public void setCurrentActivity(String activityDescription, - double fracComplete) { - } - - @Override - public void setCurrentActivityDescription(String activity) { - } - - @Override - public void setCurrentActivityFractionComplete(double fracComplete) { - } - - @Override - public boolean taskShouldAbort() { - return false; - } - - @Override - public String getCurrentActivityDescription() { - return null; - } - - @Override - public double getCurrentActivityFractionComplete() { - return -1.0; - } - - @Override - public boolean isPaused() { - return false; - } - - @Override - public boolean isCancelled() { - return false; - } - - @Override - public void requestCancel() { - } - - @Override - public void requestPause() { - } - - @Override - public void requestResume() { - } - - @Override - public Object getLatestResultPreview() { - return null; - } - - @Override - public void requestResultPreview() { - } - - @Override - public boolean resultPreviewRequested() { - return false; - } - - @Override - public void setLatestResultPreview(Object latestPreview) { - } - - @Override - public void requestResultPreview(ResultPreviewListener toInform) { - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/ResultPreviewListener.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/ResultPreviewListener.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/ResultPreviewListener.java deleted file mode 100644 index 124008d..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/ResultPreviewListener.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.yahoo.labs.samoa.moa.tasks; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software 
Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -/** - * Interface implemented by classes that preview results on the Graphical User Interface - * - * @author Richard Kirkby ([email protected]) - * @version $Revision: 7 $ - */ -public interface ResultPreviewListener { - - /** - * This method is used to receive a signal from <code>TaskMonitor</code> that the latest preview has changed. This - * method is implemented in <code>PreviewPanel</code> to change the results that are shown in its panel. - * - */ - public void latestPreviewChanged(); -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/Task.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/Task.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/Task.java deleted file mode 100644 index cd56914..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/Task.java +++ /dev/null @@ -1,60 +0,0 @@ -package com.yahoo.labs.samoa.moa.tasks; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import com.yahoo.labs.samoa.moa.MOAObject; -import com.yahoo.labs.samoa.moa.core.ObjectRepository; - -/** - * Interface representing a task. - * - * @author Richard Kirkby ([email protected]) - * @version $Revision: 7 $ - */ -public interface Task extends MOAObject { - - /** - * Gets the result type of this task. Tasks can return LearningCurve, LearningEvaluation, Classifier, String, - * Instances, etc. - * - * @return a class object of the result of this task - */ - public Class<?> getTaskResultType(); - - /** - * This method performs this task, when TaskMonitor and ObjectRepository are not needed. - * - * @return an object with the result of this task - */ - public Object doTask(); - - /** - * This method performs this task. 
<code>AbstractTask</code> implements this method so all its extensions only need to - * implement <code>doTaskImpl</code> - * - * @param monitor - * the TaskMonitor to use - * @param repository - * the ObjectRepository to use - * @return an object with the result of this task - */ - public Object doTask(TaskMonitor monitor, ObjectRepository repository); -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/TaskMonitor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/TaskMonitor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/TaskMonitor.java deleted file mode 100644 index cce3ebe..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/TaskMonitor.java +++ /dev/null @@ -1,146 +0,0 @@ -package com.yahoo.labs.samoa.moa.tasks; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -/** - * Interface representing a task monitor. - * - * @author Richard Kirkby ([email protected]) - * @version $Revision: 7 $ - */ -public interface TaskMonitor { - - /** - * Sets the description and the percentage done of the current activity. 
- * - * @param activity - * the description of the current activity - * @param fracComplete - * the percentage done of the current activity - */ - public void setCurrentActivity(String activityDescription, - double fracComplete); - - /** - * Sets the description of the current activity. - * - * @param activity - * the description of the current activity - */ - public void setCurrentActivityDescription(String activity); - - /** - * Sets the percentage done of the current activity - * - * @param fracComplete - * the percentage done of the current activity - */ - public void setCurrentActivityFractionComplete(double fracComplete); - - /** - * Gets whether the task should abort. - * - * @return true if the task should abort - */ - public boolean taskShouldAbort(); - - /** - * Gets whether there is a request for preview the task result. - * - * @return true if there is a request for preview the task result - */ - public boolean resultPreviewRequested(); - - /** - * Sets the current result to preview - * - * @param latestPreview - * the result to preview - */ - public void setLatestResultPreview(Object latestPreview); - - /** - * Gets the description of the current activity. - * - * @return the description of the current activity - */ - public String getCurrentActivityDescription(); - - /** - * Gets the percentage done of the current activity - * - * @return the percentage done of the current activity - */ - public double getCurrentActivityFractionComplete(); - - /** - * Requests the task monitored to pause. - * - */ - public void requestPause(); - - /** - * Requests the task monitored to resume. - * - */ - public void requestResume(); - - /** - * Requests the task monitored to cancel. - * - */ - public void requestCancel(); - - /** - * Gets whether the task monitored is paused. - * - * @return true if the task is paused - */ - public boolean isPaused(); - - /** - * Gets whether the task monitored is cancelled. 
- * - * @return true if the task is cancelled - */ - public boolean isCancelled(); - - /** - * Requests to preview the task result. - * - */ - public void requestResultPreview(); - - /** - * Requests to preview the task result. - * - * @param toInform - * the listener of the changes in the preview of the result - */ - public void requestResultPreview(ResultPreviewListener toInform); - - /** - * Gets the current result to preview - * - * @return the result to preview - */ - public Object getLatestResultPreview(); -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ArffFileStream.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ArffFileStream.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ArffFileStream.java deleted file mode 100644 index 2053339..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ArffFileStream.java +++ /dev/null @@ -1,119 +0,0 @@ -package com.yahoo.labs.samoa.streams; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * #L% - */ - -import java.io.IOException; - -import com.github.javacliparser.FileOption; -import com.github.javacliparser.IntOption; -import com.yahoo.labs.samoa.instances.Instances; -import com.yahoo.labs.samoa.moa.core.InstanceExample; -import com.yahoo.labs.samoa.moa.core.ObjectRepository; -import com.yahoo.labs.samoa.moa.tasks.TaskMonitor; - -/** - * InstanceStream for ARFF file - * - * @author Casey - */ -public class ArffFileStream extends FileStream { - - public FileOption arffFileOption = new FileOption("arffFile", 'f', - "ARFF File(s) to load.", null, null, false); - - public IntOption classIndexOption = new IntOption("classIndex", 'c', - "Class index of data. 0 for none or -1 for last attribute in file.", - -1, -1, Integer.MAX_VALUE); - - protected InstanceExample lastInstanceRead; - - @Override - public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository repository) { - super.prepareForUseImpl(monitor, repository); - String filePath = this.arffFileOption.getFile().getAbsolutePath(); - this.fileSource.init(filePath, "arff"); - this.lastInstanceRead = null; - } - - @Override - protected void reset() { - try { - if (this.fileReader != null) - this.fileReader.close(); - - fileSource.reset(); - } catch (IOException ioe) { - throw new RuntimeException("FileStream restart failed.", ioe); - } - - if (!getNextFileReader()) { - hitEndOfStream = true; - throw new RuntimeException("FileStream is empty."); - } - } - - @Override - protected boolean getNextFileReader() { - boolean ret = super.getNextFileReader(); - if (ret) { - this.instances = new Instances(this.fileReader, 1, -1); - if (this.classIndexOption.getValue() < 0) { - this.instances.setClassIndex(this.instances.numAttributes() - 1); - } else if (this.classIndexOption.getValue() > 0) { - this.instances.setClassIndex(this.classIndexOption.getValue() - 1); - } - } - return ret; - } - - @Override - protected boolean readNextInstanceFromFile() { - try { - if 
(this.instances.readInstance(this.fileReader)) { - this.lastInstanceRead = new InstanceExample(this.instances.instance(0)); - this.instances.delete(); // keep instances clean - return true; - } - if (this.fileReader != null) { - this.fileReader.close(); - this.fileReader = null; - } - return false; - } catch (IOException ioe) { - throw new RuntimeException( - "ArffFileStream failed to read instance from stream.", ioe); - } - - } - - @Override - protected InstanceExample getLastInstanceRead() { - return this.lastInstanceRead; - } - - /* - * extend com.yahoo.labs.samoa.moa.MOAObject - */ - @Override - public void getDescription(StringBuilder sb, int indent) { - // TODO Auto-generated method stub - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ClusteringEntranceProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ClusteringEntranceProcessor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ClusteringEntranceProcessor.java deleted file mode 100644 index 70403ca..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ClusteringEntranceProcessor.java +++ /dev/null @@ -1,253 +0,0 @@ -package com.yahoo.labs.samoa.streams; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * #L% - */ - -import java.util.Random; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.yahoo.labs.samoa.core.ContentEvent; -import com.yahoo.labs.samoa.core.EntranceProcessor; -import com.yahoo.labs.samoa.core.Processor; -import com.yahoo.labs.samoa.evaluation.ClusteringEvaluationContentEvent; -import com.yahoo.labs.samoa.instances.Instance; -import com.yahoo.labs.samoa.instances.Instances; -import com.yahoo.labs.samoa.learners.clusterers.ClusteringContentEvent; -import com.yahoo.labs.samoa.moa.cluster.Clustering; -import com.yahoo.labs.samoa.moa.core.DataPoint; -import com.yahoo.labs.samoa.moa.options.AbstractOptionHandler; -import com.yahoo.labs.samoa.moa.streams.InstanceStream; -import com.yahoo.labs.samoa.moa.streams.clustering.ClusteringStream; -import com.yahoo.labs.samoa.moa.streams.clustering.RandomRBFGeneratorEvents; - -/** - * EntranceProcessor for Clustering Evaluation Task. - * - */ -public final class ClusteringEntranceProcessor implements EntranceProcessor { - - private static final long serialVersionUID = 4169053337917578558L; - - private static final Logger logger = LoggerFactory.getLogger(ClusteringEntranceProcessor.class); - - private StreamSource streamSource; - private Instance firstInstance; - private boolean isInited = false; - private Random random = new Random(); - private double samplingThreshold; - private int numberInstances; - private int numInstanceSent = 0; - - private int groundTruthSamplingFrequency; - - @Override - public boolean process(ContentEvent event) { - // TODO: possible refactor of the super-interface implementation - // of source processor does not need this method - return false; - } - - @Override - public void onCreate(int id) { - logger.debug("Creating ClusteringSourceProcessor with id {}", id); - } - - @Override - public Processor newProcessor(Processor p) { - ClusteringEntranceProcessor newProcessor = new ClusteringEntranceProcessor(); - ClusteringEntranceProcessor originProcessor = 
(ClusteringEntranceProcessor) p; - if (originProcessor.getStreamSource() != null) { - newProcessor.setStreamSource(originProcessor.getStreamSource().getStream()); - } - return newProcessor; - } - - @Override - public boolean hasNext() { - return (!isFinished()); - } - - @Override - public boolean isFinished() { - return (!streamSource.hasMoreInstances() || (numberInstances >= 0 && numInstanceSent >= numberInstances)); - } - - // /** - // * Method to send instances via input stream - // * - // * @param inputStream - // * @param numberInstances - // */ - // public void sendInstances(Stream inputStream, Stream evaluationStream, int - // numberInstances, double samplingThreshold) { - // int numInstanceSent = 0; - // this.samplingThreshold = samplingThreshold; - // while (streamSource.hasMoreInstances() && numInstanceSent < - // numberInstances) { - // numInstanceSent++; - // DataPoint nextDataPoint = new DataPoint(nextInstance(), numInstanceSent); - // ClusteringContentEvent contentEvent = new - // ClusteringContentEvent(numInstanceSent, nextDataPoint); - // inputStream.put(contentEvent); - // sendPointsAndGroundTruth(streamSource, evaluationStream, numInstanceSent, - // nextDataPoint); - // } - // - // sendEndEvaluationInstance(inputStream); - // } - - public double getSamplingThreshold() { - return samplingThreshold; - } - - public void setSamplingThreshold(double samplingThreshold) { - this.samplingThreshold = samplingThreshold; - } - - public int getGroundTruthSamplingFrequency() { - return groundTruthSamplingFrequency; - } - - public void setGroundTruthSamplingFrequency(int groundTruthSamplingFrequency) { - this.groundTruthSamplingFrequency = groundTruthSamplingFrequency; - } - - public StreamSource getStreamSource() { - return streamSource; - } - - public void setStreamSource(InstanceStream stream) { - if (stream instanceof AbstractOptionHandler) { - ((AbstractOptionHandler) (stream)).prepareForUse(); - } - - this.streamSource = new StreamSource(stream); - 
firstInstance = streamSource.nextInstance().getData(); - } - - public Instances getDataset() { - return firstInstance.dataset(); - } - - private Instance nextInstance() { - if (this.isInited) { - return streamSource.nextInstance().getData(); - } else { - this.isInited = true; - return firstInstance; - } - } - - // private void sendEndEvaluationInstance(Stream inputStream) { - // ClusteringContentEvent contentEvent = new ClusteringContentEvent(-1, - // firstInstance); - // contentEvent.setLast(true); - // inputStream.put(contentEvent); - // } - - // private void sendPointsAndGroundTruth(StreamSource sourceStream, Stream - // evaluationStream, int numInstanceSent, DataPoint nextDataPoint) { - // boolean sendEvent = false; - // DataPoint instance = null; - // Clustering gtClustering = null; - // int samplingFrequency = ((ClusteringStream) - // sourceStream.getStream()).getDecayHorizon(); - // if (random.nextDouble() < samplingThreshold) { - // // Add instance - // sendEvent = true; - // instance = nextDataPoint; - // } - // if (numInstanceSent > 0 && numInstanceSent % samplingFrequency == 0) { - // // Add GroundTruth - // sendEvent = true; - // gtClustering = ((RandomRBFGeneratorEvents) - // sourceStream.getStream()).getGeneratingClusters(); - // } - // if (sendEvent == true) { - // ClusteringEvaluationContentEvent evalEvent; - // evalEvent = new ClusteringEvaluationContentEvent(gtClustering, instance, - // false); - // evaluationStream.put(evalEvent); - // } - // } - - public void setMaxNumInstances(int value) { - numberInstances = value; - } - - public int getMaxNumInstances() { - return this.numberInstances; - } - - @Override - public ContentEvent nextEvent() { - - // boolean sendEvent = false; - // DataPoint instance = null; - // Clustering gtClustering = null; - // int samplingFrequency = ((ClusteringStream) - // sourceStream.getStream()).getDecayHorizon(); - // if (random.nextDouble() < samplingThreshold) { - // // Add instance - // sendEvent = true; - // 
instance = nextDataPoint; - // } - // if (numInstanceSent > 0 && numInstanceSent % samplingFrequency == 0) { - // // Add GroundTruth - // sendEvent = true; - // gtClustering = ((RandomRBFGeneratorEvents) - // sourceStream.getStream()).getGeneratingClusters(); - // } - // if (sendEvent == true) { - // ClusteringEvaluationContentEvent evalEvent; - // evalEvent = new ClusteringEvaluationContentEvent(gtClustering, instance, - // false); - // evaluationStream.put(evalEvent); - // } - - groundTruthSamplingFrequency = ((ClusteringStream) streamSource.getStream()).getDecayHorizon(); // FIXME should it be takend from the ClusteringEvaluation -f option instead? - if (isFinished()) { - // send ending event - ClusteringContentEvent contentEvent = new ClusteringContentEvent(-1, firstInstance); - contentEvent.setLast(true); - return contentEvent; - } else { - DataPoint nextDataPoint = new DataPoint(nextInstance(), numInstanceSent); - numInstanceSent++; - if (numInstanceSent % groundTruthSamplingFrequency == 0) { - // TODO implement an interface ClusteringGroundTruth with a - // getGeneratingClusters() method, check if the source implements the interface - // send a clustering evaluation event for external measures (distance from the gt clusters) - Clustering gtClustering = ((RandomRBFGeneratorEvents) streamSource.getStream()).getGeneratingClusters(); - return new ClusteringEvaluationContentEvent(gtClustering, nextDataPoint, false); - } else { - ClusteringContentEvent contentEvent = new ClusteringContentEvent(numInstanceSent, nextDataPoint); - if (random.nextDouble() < samplingThreshold) { - // send a clustering content event for internal measures (cohesion, - // separation) - contentEvent.setSample(true); - } - return contentEvent; - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/FileStream.java ---------------------------------------------------------------------- diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/FileStream.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/FileStream.java deleted file mode 100644 index 4784ae0..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/FileStream.java +++ /dev/null @@ -1,173 +0,0 @@ -package com.yahoo.labs.samoa.streams; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.Reader; - -import com.github.javacliparser.ClassOption; -import com.yahoo.labs.samoa.instances.Instances; -import com.yahoo.labs.samoa.instances.InstancesHeader; -import com.yahoo.labs.samoa.moa.core.InstanceExample; -import com.yahoo.labs.samoa.moa.core.ObjectRepository; -import com.yahoo.labs.samoa.moa.options.AbstractOptionHandler; -import com.yahoo.labs.samoa.moa.streams.InstanceStream; -import com.yahoo.labs.samoa.moa.tasks.TaskMonitor; -import com.yahoo.labs.samoa.streams.fs.FileStreamSource; - -/** - * InstanceStream for files (Abstract class: subclass this class for different file formats) - * - * @author Casey - */ -public abstract class FileStream extends AbstractOptionHandler implements InstanceStream { - /** - * - */ - private static final long serialVersionUID = 3028905554604259130L; - - public ClassOption sourceTypeOption = 
new ClassOption("sourceType", - 's', "Source Type (HDFS, local FS)", FileStreamSource.class, - "LocalFileStreamSource"); - - protected transient FileStreamSource fileSource; - protected transient Reader fileReader; - protected Instances instances; - - protected boolean hitEndOfStream; - private boolean hasStarted; - - /* - * Constructors - */ - public FileStream() { - this.hitEndOfStream = false; - } - - /* - * implement InstanceStream - */ - @Override - public InstancesHeader getHeader() { - return new InstancesHeader(this.instances); - } - - @Override - public long estimatedRemainingInstances() { - return -1; - } - - @Override - public boolean hasMoreInstances() { - return !this.hitEndOfStream; - } - - @Override - public InstanceExample nextInstance() { - if (this.getLastInstanceRead() == null) { - readNextInstanceFromStream(); - } - InstanceExample prevInstance = this.getLastInstanceRead(); - readNextInstanceFromStream(); - return prevInstance; - } - - @Override - public boolean isRestartable() { - return true; - } - - @Override - public void restart() { - reset(); - hasStarted = false; - } - - protected void reset() { - try { - if (this.fileReader != null) - this.fileReader.close(); - - fileSource.reset(); - } catch (IOException ioe) { - throw new RuntimeException("FileStream restart failed.", ioe); - } - - if (!getNextFileReader()) { - hitEndOfStream = true; - throw new RuntimeException("FileStream is empty."); - } - - this.instances = new Instances(this.fileReader, 1, -1); - this.instances.setClassIndex(this.instances.numAttributes() - 1); - } - - protected boolean getNextFileReader() { - if (this.fileReader != null) - try { - this.fileReader.close(); - } catch (IOException ioe) { - ioe.printStackTrace(); - } - - InputStream inputStream = this.fileSource.getNextInputStream(); - if (inputStream == null) - return false; - - this.fileReader = new BufferedReader(new InputStreamReader(inputStream)); - return true; - } - - protected boolean 
readNextInstanceFromStream() { - if (!hasStarted) { - this.reset(); - hasStarted = true; - } - - while (true) { - if (readNextInstanceFromFile()) - return true; - - if (!getNextFileReader()) { - this.hitEndOfStream = true; - return false; - } - } - } - - /** - * Read next instance from the current file and assign it to lastInstanceRead. - * - * @return true if it was able to read next instance and false if it was at the end of the file - */ - protected abstract boolean readNextInstanceFromFile(); - - protected abstract InstanceExample getLastInstanceRead(); - - @Override - public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository repository) { - this.fileSource = sourceTypeOption.getValue(); - this.hasStarted = false; - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/PrequentialSourceProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/PrequentialSourceProcessor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/PrequentialSourceProcessor.java deleted file mode 100644 index a16a9d3..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/PrequentialSourceProcessor.java +++ /dev/null @@ -1,232 +0,0 @@ -package com.yahoo.labs.samoa.streams; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.yahoo.labs.samoa.core.ContentEvent; -import com.yahoo.labs.samoa.core.EntranceProcessor; -import com.yahoo.labs.samoa.core.Processor; -import com.yahoo.labs.samoa.instances.Instance; -import com.yahoo.labs.samoa.instances.Instances; -import com.yahoo.labs.samoa.learners.InstanceContentEvent; -import com.yahoo.labs.samoa.moa.options.AbstractOptionHandler; -import com.yahoo.labs.samoa.moa.streams.InstanceStream; - -/** - * Prequential Source Processor is the processor for Prequential Evaluation Task. - * - * @author Arinto Murdopo - * - */ -public final class PrequentialSourceProcessor implements EntranceProcessor { - - private static final long serialVersionUID = 4169053337917578558L; - - private static final Logger logger = LoggerFactory.getLogger(PrequentialSourceProcessor.class); - private boolean isInited = false; - private StreamSource streamSource; - private Instance firstInstance; - private int numberInstances; - private int numInstanceSent = 0; - - protected InstanceStream sourceStream; - - /* - * ScheduledExecutorService to schedule sending events after each delay - * interval. It is expected to have only one event in the queue at a time, so - * we need only one thread in the pool. 
- */ - private transient ScheduledExecutorService timer; - private transient ScheduledFuture<?> schedule = null; - private int readyEventIndex = 1; // No waiting for the first event - private int delay = 0; - private int batchSize = 1; - private boolean finished = false; - - @Override - public boolean process(ContentEvent event) { - // TODO: possible refactor of the super-interface implementation - // of source processor does not need this method - return false; - } - - @Override - public boolean isFinished() { - return finished; - } - - @Override - public boolean hasNext() { - return !isFinished() && (delay <= 0 || numInstanceSent < readyEventIndex); - } - - private boolean hasReachedEndOfStream() { - return (!streamSource.hasMoreInstances() || (numberInstances >= 0 && numInstanceSent >= numberInstances)); - } - - @Override - public ContentEvent nextEvent() { - InstanceContentEvent contentEvent = null; - if (hasReachedEndOfStream()) { - contentEvent = new InstanceContentEvent(-1, firstInstance, false, true); - contentEvent.setLast(true); - // set finished status _after_ tagging last event - finished = true; - } - else if (hasNext()) { - numInstanceSent++; - contentEvent = new InstanceContentEvent(numInstanceSent, nextInstance(), true, true); - - // first call to this method will trigger the timer - if (schedule == null && delay > 0) { - schedule = timer.scheduleWithFixedDelay(new DelayTimeoutHandler(this), delay, delay, - TimeUnit.MICROSECONDS); - } - } - return contentEvent; - } - - private void increaseReadyEventIndex() { - readyEventIndex += batchSize; - // if we exceed the max, cancel the timer - if (schedule != null && isFinished()) { - schedule.cancel(false); - } - } - - @Override - public void onCreate(int id) { - initStreamSource(sourceStream); - timer = Executors.newScheduledThreadPool(1); - logger.debug("Creating PrequentialSourceProcessor with id {}", id); - } - - @Override - public Processor newProcessor(Processor p) { - PrequentialSourceProcessor 
newProcessor = new PrequentialSourceProcessor(); - PrequentialSourceProcessor originProcessor = (PrequentialSourceProcessor) p; - if (originProcessor.getStreamSource() != null) { - newProcessor.setStreamSource(originProcessor.getStreamSource().getStream()); - } - return newProcessor; - } - - // /** - // * Method to send instances via input stream - // * - // * @param inputStream - // * @param numberInstances - // */ - // public void sendInstances(Stream inputStream, int numberInstances) { - // int numInstanceSent = 0; - // initStreamSource(sourceStream); - // - // while (streamSource.hasMoreInstances() && numInstanceSent < - // numberInstances) { - // numInstanceSent++; - // InstanceContentEvent contentEvent = new - // InstanceContentEvent(numInstanceSent, nextInstance(), true, true); - // inputStream.put(contentEvent); - // } - // - // sendEndEvaluationInstance(inputStream); - // } - - public StreamSource getStreamSource() { - return streamSource; - } - - public void setStreamSource(InstanceStream stream) { - this.sourceStream = stream; - } - - public Instances getDataset() { - if (firstInstance == null) { - initStreamSource(sourceStream); - } - return firstInstance.dataset(); - } - - private Instance nextInstance() { - if (this.isInited) { - return streamSource.nextInstance().getData(); - } else { - this.isInited = true; - return firstInstance; - } - } - - // private void sendEndEvaluationInstance(Stream inputStream) { - // InstanceContentEvent contentEvent = new InstanceContentEvent(-1, - // firstInstance, false, true); - // contentEvent.setLast(true); - // inputStream.put(contentEvent); - // } - - private void initStreamSource(InstanceStream stream) { - if (stream instanceof AbstractOptionHandler) { - ((AbstractOptionHandler) (stream)).prepareForUse(); - } - - this.streamSource = new StreamSource(stream); - firstInstance = streamSource.nextInstance().getData(); - } - - public void setMaxNumInstances(int value) { - numberInstances = value; - } - - public int 
getMaxNumInstances() { - return this.numberInstances; - } - - public void setSourceDelay(int delay) { - this.delay = delay; - } - - public int getSourceDelay() { - return this.delay; - } - - public void setDelayBatchSize(int batch) { - this.batchSize = batch; - } - - private class DelayTimeoutHandler implements Runnable { - - private PrequentialSourceProcessor processor; - - public DelayTimeoutHandler(PrequentialSourceProcessor processor) { - this.processor = processor; - } - - public void run() { - processor.increaseReadyEventIndex(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSource.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSource.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSource.java deleted file mode 100644 index 93dfee8..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSource.java +++ /dev/null @@ -1,92 +0,0 @@ -package com.yahoo.labs.samoa.streams; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -/** - * License - */ - -import com.yahoo.labs.samoa.moa.core.Example; -import com.yahoo.labs.samoa.moa.streams.InstanceStream; -import com.yahoo.labs.samoa.instances.Instance; - -/** - * The Class StreamSource. 
- */ -public class StreamSource implements java.io.Serializable { - - /** - * - */ - private static final long serialVersionUID = 3974668694861231236L; - - /** - * Instantiates a new stream source. - * - * @param stream - * the stream - */ - public StreamSource(InstanceStream stream) { - super(); - this.stream = stream; - } - - /** The stream. */ - protected InstanceStream stream; - - /** - * Gets the stream. - * - * @return the stream - */ - public InstanceStream getStream() { - return stream; - } - - /** - * Next instance. - * - * @return the instance - */ - public Example<Instance> nextInstance() { - return stream.nextInstance(); - } - - /** - * Sets the stream. - * - * @param stream - * the new stream - */ - public void setStream(InstanceStream stream) { - this.stream = stream; - } - - /** - * Checks for more instances. - * - * @return true, if successful - */ - public boolean hasMoreInstances() { - return this.stream.hasMoreInstances(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSourceProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSourceProcessor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSourceProcessor.java deleted file mode 100644 index e40c843..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/StreamSourceProcessor.java +++ /dev/null @@ -1,195 +0,0 @@ -package com.yahoo.labs.samoa.streams; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -/** - * License - */ - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.yahoo.labs.samoa.core.ContentEvent; -import com.yahoo.labs.samoa.core.Processor; -import com.yahoo.labs.samoa.instances.Instance; -import com.yahoo.labs.samoa.instances.Instances; -import com.yahoo.labs.samoa.learners.InstanceContentEvent; -import com.yahoo.labs.samoa.moa.streams.InstanceStream; -import com.yahoo.labs.samoa.topology.Stream; - -/** - * The Class StreamSourceProcessor. - */ -public class StreamSourceProcessor implements Processor { - - /** The Constant logger. */ - private static final Logger logger = LoggerFactory - .getLogger(StreamSourceProcessor.class); - - /** - * - */ - private static final long serialVersionUID = -204182279475432739L; - - /** The stream source. */ - private StreamSource streamSource; - - /** - * Gets the stream source. - * - * @return the stream source - */ - public StreamSource getStreamSource() { - return streamSource; - } - - /** - * Sets the stream source. - * - * @param stream - * the new stream source - */ - public void setStreamSource(InstanceStream stream) { - this.streamSource = new StreamSource(stream); - firstInstance = streamSource.nextInstance().getData(); - } - - /** The number instances sent. */ - private long numberInstancesSent = 0; - - /** - * Send instances. 
- * - * @param inputStream - * the input stream - * @param numberInstances - * the number instances - * @param isTraining - * the is training - */ - public void sendInstances(Stream inputStream, - int numberInstances, boolean isTraining, boolean isTesting) { - int numberSamples = 0; - - while (streamSource.hasMoreInstances() - && numberSamples < numberInstances) { - - numberSamples++; - numberInstancesSent++; - InstanceContentEvent instanceContentEvent = new InstanceContentEvent( - numberInstancesSent, nextInstance(), isTraining, isTesting); - - inputStream.put(instanceContentEvent); - } - - InstanceContentEvent instanceContentEvent = new InstanceContentEvent( - numberInstancesSent, null, isTraining, isTesting); - instanceContentEvent.setLast(true); - inputStream.put(instanceContentEvent); - } - - /** - * Send end evaluation instance. - * - * @param inputStream - * the input stream - */ - public void sendEndEvaluationInstance(Stream inputStream) { - InstanceContentEvent instanceContentEvent = new InstanceContentEvent(-1, firstInstance, false, true); - inputStream.put(instanceContentEvent); - } - - /** - * Next instance. - * - * @return the instance - */ - protected Instance nextInstance() { - if (this.isInited) { - return streamSource.nextInstance().getData(); - } else { - this.isInited = true; - return firstInstance; - } - } - - /** The is inited. */ - protected boolean isInited = false; - - /** The first instance. */ - protected Instance firstInstance; - - // @Override - /** - * On remove. 
- */ - protected void onRemove() { - } - - /* - * (non-Javadoc) - * - * @see samoa.core.Processor#onCreate(int) - */ - @Override - public void onCreate(int id) { - // TODO Auto-generated method stub - } - - /* - * (non-Javadoc) - * - * @see samoa.core.Processor#newProcessor(samoa.core.Processor) - */ - @Override - public Processor newProcessor(Processor sourceProcessor) { - // StreamSourceProcessor newProcessor = new StreamSourceProcessor(); - // StreamSourceProcessor originProcessor = (StreamSourceProcessor) - // sourceProcessor; - // if (originProcessor.getStreamSource() != null){ - // newProcessor.setStreamSource(originProcessor.getStreamSource().getStream()); - // } - // return newProcessor; - return null; - } - - /** - * On event. - * - * @param event - * the event - * @return true, if successful - */ - @Override - public boolean process(ContentEvent event) { - return false; - } - - /** - * Gets the dataset. - * - * @return the dataset - */ - public Instances getDataset() { - return firstInstance.dataset(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/FileStreamSource.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/FileStreamSource.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/FileStreamSource.java deleted file mode 100644 index d14ebfc..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/FileStreamSource.java +++ /dev/null @@ -1,64 +0,0 @@ -package com.yahoo.labs.samoa.streams.fs; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.io.InputStream; -import java.io.IOException; -import java.io.Serializable; - -/** - * An interface for FileStream's source (Local FS, HDFS,...) - * - * @author Casey - */ -public interface FileStreamSource extends Serializable { - - /** - * Init the source with file/directory path and file extension - * - * @param path - * File or directory path - * @param ext - * File extension to be used to filter files in a directory. If null, all files in the directory are - * accepted. - */ - public void init(String path, String ext); - - /** - * Reset the source - */ - public void reset() throws IOException; - - /** - * Retrieve InputStream for next file. This method will return null if we are at the last file in the list. - * - * @return InputStream for next file in the list - */ - public InputStream getNextInputStream(); - - /** - * Retrieve InputStream for current file. The "current pointer" is moved forward with getNextInputStream method. So if - * there was no invocation of getNextInputStream, this method will return null. 
- * - * @return InputStream for current file in the list - */ - public InputStream getCurrentInputStream(); -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSource.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSource.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSource.java deleted file mode 100644 index 3fabcc7..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSource.java +++ /dev/null @@ -1,150 +0,0 @@ -package com.yahoo.labs.samoa.streams.fs; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * #L% - */ - -import java.io.IOException; -import java.io.InputStream; -import java.nio.file.FileSystems; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IOUtils; - -/** - * Source for FileStream for HDFS files - * - * @author Casey - */ -public class HDFSFileStreamSource implements FileStreamSource { - - /** - * - */ - private static final long serialVersionUID = -3887354805787167400L; - - private transient InputStream fileStream; - private transient Configuration config; - private List<String> filePaths; - private int currentIndex; - - public HDFSFileStreamSource() { - this.currentIndex = -1; - } - - public void init(String path, String ext) { - this.init(this.getDefaultConfig(), path, ext); - } - - public void init(Configuration config, String path, String ext) { - this.config = config; - this.filePaths = new ArrayList<String>(); - Path hdfsPath = new Path(path); - FileSystem fs; - try { - fs = FileSystem.get(config); - FileStatus fileStat = fs.getFileStatus(hdfsPath); - if (fileStat.isDirectory()) { - Path filterPath = hdfsPath; - if (ext != null) { - filterPath = new Path(path.toString(), "*." 
+ ext); - } - else { - filterPath = new Path(path.toString(), "*"); - } - FileStatus[] filesInDir = fs.globStatus(filterPath); - for (int i = 0; i < filesInDir.length; i++) { - if (filesInDir[i].isFile()) { - filePaths.add(filesInDir[i].getPath().toString()); - } - } - } - else { - this.filePaths.add(path); - } - } catch (IOException ioe) { - throw new RuntimeException("Failed getting list of files at:" + path, ioe); - } - - this.currentIndex = -1; - } - - private Configuration getDefaultConfig() { - String hadoopHome = System.getenv("HADOOP_HOME"); - Configuration conf = new Configuration(); - if (hadoopHome != null) { - java.nio.file.Path coreSitePath = FileSystems.getDefault().getPath(hadoopHome, "etc/hadoop/core-site.xml"); - java.nio.file.Path hdfsSitePath = FileSystems.getDefault().getPath(hadoopHome, "etc/hadoop/hdfs-site.xml"); - conf.addResource(new Path(coreSitePath.toAbsolutePath().toString())); - conf.addResource(new Path(hdfsSitePath.toAbsolutePath().toString())); - } - return conf; - } - - public void reset() throws IOException { - this.currentIndex = -1; - this.closeFileStream(); - } - - private void closeFileStream() { - IOUtils.closeStream(fileStream); - } - - public InputStream getNextInputStream() { - this.closeFileStream(); - if (this.currentIndex >= (this.filePaths.size() - 1)) - return null; - - this.currentIndex++; - String filePath = this.filePaths.get(currentIndex); - - Path hdfsPath = new Path(filePath); - FileSystem fs; - try { - fs = FileSystem.get(config); - fileStream = fs.open(hdfsPath); - } catch (IOException ioe) { - this.closeFileStream(); - throw new RuntimeException("Failed opening file:" + filePath, ioe); - } - - return fileStream; - } - - public InputStream getCurrentInputStream() { - return fileStream; - } - - protected int getFilePathListSize() { - if (filePaths != null) - return filePaths.size(); - return 0; - } - - protected String getFilePathAt(int index) { - if (filePaths != null && filePaths.size() > index) - return 
filePaths.get(index); - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSource.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSource.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSource.java deleted file mode 100644 index b9d9b9e..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSource.java +++ /dev/null @@ -1,133 +0,0 @@ -package com.yahoo.labs.samoa.streams.fs; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * #L% - */ - -import java.io.File; -import java.io.FileInputStream; -import java.io.FilenameFilter; -import java.io.IOException; -import java.io.InputStream; -import java.nio.file.FileSystems; -import java.util.ArrayList; -import java.util.List; - -/** - * Source for FileStream for local files - * - * @author Casey - */ -public class LocalFileStreamSource implements FileStreamSource { - /** - * - */ - private static final long serialVersionUID = 3986511547525870698L; - - private transient InputStream fileStream; - private List<String> filePaths; - private int currentIndex; - - public LocalFileStreamSource() { - this.currentIndex = -1; - } - - public void init(String path, String ext) { - this.filePaths = new ArrayList<String>(); - File fileAtPath = new File(path); - if (fileAtPath.isDirectory()) { - File[] filesInDir = fileAtPath.listFiles(new FileExtensionFilter(ext)); - for (int i = 0; i < filesInDir.length; i++) { - filePaths.add(filesInDir[i].getAbsolutePath()); - } - } - else { - this.filePaths.add(path); - } - this.currentIndex = -1; - } - - public void reset() throws IOException { - this.currentIndex = -1; - this.closeFileStream(); - } - - private void closeFileStream() { - if (fileStream != null) { - try { - fileStream.close(); - } catch (IOException ioe) { - ioe.printStackTrace(); - } - } - } - - public InputStream getNextInputStream() { - this.closeFileStream(); - - if (this.currentIndex >= (this.filePaths.size() - 1)) - return null; - - this.currentIndex++; - String filePath = this.filePaths.get(currentIndex); - - File file = new File(filePath); - try { - fileStream = new FileInputStream(file); - } catch (IOException ioe) { - this.closeFileStream(); - throw new RuntimeException("Failed opening file:" + filePath, ioe); - } - - return fileStream; - } - - public InputStream getCurrentInputStream() { - return fileStream; - } - - protected int getFilePathListSize() { - if (filePaths != null) - return filePaths.size(); - return 0; - } - - protected String 
getFilePathAt(int index) { - if (filePaths != null && filePaths.size() > index) - return filePaths.get(index); - return null; - } - - private class FileExtensionFilter implements FilenameFilter { - private String extension; - - FileExtensionFilter(String ext) { - extension = ext; - } - - @Override - public boolean accept(File dir, String name) { - File f = new File(dir, name); - if (extension == null) - return f.isFile(); - else - return f.isFile() && name.toLowerCase().endsWith("." + extension); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/ClusteringEvaluation.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/ClusteringEvaluation.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/ClusteringEvaluation.java deleted file mode 100644 index d906b9f..0000000 --- a/samoa-api/src/main/java/com/yahoo/labs/samoa/tasks/ClusteringEvaluation.java +++ /dev/null @@ -1,186 +0,0 @@ -package com.yahoo.labs.samoa.tasks; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * #L% - */ -import java.text.SimpleDateFormat; -import java.util.Date; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.github.javacliparser.ClassOption; -import com.github.javacliparser.Configurable; -import com.github.javacliparser.FileOption; -import com.github.javacliparser.FloatOption; -import com.github.javacliparser.IntOption; -import com.github.javacliparser.StringOption; -import com.yahoo.labs.samoa.evaluation.ClusteringEvaluatorProcessor; -import com.yahoo.labs.samoa.learners.Learner; -import com.yahoo.labs.samoa.learners.clusterers.simple.ClusteringDistributorProcessor; -import com.yahoo.labs.samoa.learners.clusterers.simple.DistributedClusterer; -import com.yahoo.labs.samoa.moa.streams.InstanceStream; -import com.yahoo.labs.samoa.moa.streams.clustering.ClusteringStream; -import com.yahoo.labs.samoa.moa.streams.clustering.RandomRBFGeneratorEvents; -import com.yahoo.labs.samoa.streams.ClusteringEntranceProcessor; -import com.yahoo.labs.samoa.topology.ComponentFactory; -import com.yahoo.labs.samoa.topology.Stream; -import com.yahoo.labs.samoa.topology.Topology; -import com.yahoo.labs.samoa.topology.TopologyBuilder; - -/** - * A task that runs and evaluates a distributed clustering algorithm. 
- * - */ -public class ClusteringEvaluation implements Task, Configurable { - - private static final long serialVersionUID = -8246537378371580550L; - - private static final int DISTRIBUTOR_PARALLELISM = 1; - - private static final Logger logger = LoggerFactory.getLogger(ClusteringEvaluation.class); - - public ClassOption learnerOption = new ClassOption("learner", 'l', "Clustering to run.", Learner.class, - DistributedClusterer.class.getName()); - - public ClassOption streamTrainOption = new ClassOption("streamTrain", 's', "Input stream.", InstanceStream.class, - RandomRBFGeneratorEvents.class.getName()); - - public IntOption instanceLimitOption = new IntOption("instanceLimit", 'i', - "Maximum number of instances to test/train on (-1 = no limit).", 100000, -1, - Integer.MAX_VALUE); - - public IntOption measureCollectionTypeOption = new IntOption("measureCollectionType", 'm', - "Type of measure collection", 0, 0, Integer.MAX_VALUE); - - public IntOption timeLimitOption = new IntOption("timeLimit", 't', - "Maximum number of seconds to test/train for (-1 = no limit).", -1, -1, - Integer.MAX_VALUE); - - public IntOption sampleFrequencyOption = new IntOption("sampleFrequency", 'f', - "How many instances between samples of the learning performance.", 1000, 0, - Integer.MAX_VALUE); - - public StringOption evaluationNameOption = new StringOption("evaluationName", 'n', "Identifier of the evaluation", - "Clustering__" - + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())); - - public FileOption dumpFileOption = new FileOption("dumpFile", 'd', "File to append intermediate csv results to", - null, "csv", true); - - public FloatOption samplingThresholdOption = new FloatOption("samplingThreshold", 'a', - "Ratio of instances sampled that will be used for evaluation.", 0.5, - 0.0, 1.0); - - private ClusteringEntranceProcessor source; - private InstanceStream streamTrain; - private ClusteringDistributorProcessor distributor; - private Stream distributorStream; - private 
Stream evaluationStream; - - // Default=0: no delay/waiting - public IntOption sourceDelayOption = new IntOption("sourceDelay", 'w', - "How many miliseconds between injections of two instances.", 0, 0, Integer.MAX_VALUE); - - private Learner learner; - private ClusteringEvaluatorProcessor evaluator; - - private Topology topology; - private TopologyBuilder builder; - - public void getDescription(StringBuilder sb) { - sb.append("Clustering evaluation"); - } - - @Override - public void init() { - // TODO remove the if statement theoretically, dynamic binding will work - // here! for now, the if statement is used by Storm - - if (builder == null) { - logger.warn("Builder was not initialized, initializing it from the Task"); - - builder = new TopologyBuilder(); - logger.debug("Successfully instantiating TopologyBuilder"); - - builder.initTopology(evaluationNameOption.getValue(), sourceDelayOption.getValue()); - logger.debug("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue()); - } - - // instantiate ClusteringEntranceProcessor and its output stream - // (sourceStream) - source = new ClusteringEntranceProcessor(); - streamTrain = this.streamTrainOption.getValue(); - source.setStreamSource(streamTrain); - builder.addEntranceProcessor(source); - source.setSamplingThreshold(samplingThresholdOption.getValue()); - source.setMaxNumInstances(instanceLimitOption.getValue()); - logger.debug("Successfully instantiated ClusteringEntranceProcessor"); - - Stream sourceStream = builder.createStream(source); - // starter.setInputStream(sourcePiOutputStream); // FIXME set stream in the - // EntrancePI - - // distribution of instances and sampling for evaluation - distributor = new ClusteringDistributorProcessor(); - builder.addProcessor(distributor, DISTRIBUTOR_PARALLELISM); - builder.connectInputShuffleStream(sourceStream, distributor); - distributorStream = builder.createStream(distributor); - distributor.setOutputStream(distributorStream); - 
evaluationStream = builder.createStream(distributor); - distributor.setEvaluationStream(evaluationStream); // passes evaluation events along - logger.debug("Successfully instantiated Distributor"); - - // instantiate learner and connect it to distributorStream - learner = this.learnerOption.getValue(); - learner.init(builder, source.getDataset(), 1); - builder.connectInputShuffleStream(distributorStream, learner.getInputProcessor()); - logger.debug("Successfully instantiated Learner"); - - evaluator = new ClusteringEvaluatorProcessor.Builder( - sampleFrequencyOption.getValue()).dumpFile(dumpFileOption.getFile()) - .decayHorizon(((ClusteringStream) streamTrain).getDecayHorizon()).build(); - - builder.addProcessor(evaluator); - for (Stream evaluatorPiInputStream : learner.getResultStreams()) { - builder.connectInputShuffleStream(evaluatorPiInputStream, evaluator); - } - builder.connectInputAllStream(evaluationStream, evaluator); - logger.debug("Successfully instantiated EvaluatorProcessor"); - - topology = builder.build(); - logger.debug("Successfully built the topology"); - } - - @Override - public void setFactory(ComponentFactory factory) { - // TODO unify this code with init() for now, it's used by S4 App - // dynamic binding theoretically will solve this problem - builder = new TopologyBuilder(factory); - logger.debug("Successfully instantiated TopologyBuilder"); - - builder.initTopology(evaluationNameOption.getValue()); - logger.debug("Successfully initialized SAMOA topology with name {}", evaluationNameOption.getValue()); - - } - - public Topology getTopology() { - return topology; - } -}
