// Origin: http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/clustering/RandomRBFGeneratorEvents.java
package com.yahoo.labs.samoa.moa.streams.clustering;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2010 RWTH Aachen University, Germany
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.LinkedList;
import java.util.Random;
import java.util.Vector;

import com.yahoo.labs.samoa.moa.cluster.Clustering;
import com.yahoo.labs.samoa.moa.cluster.SphereCluster;
import com.yahoo.labs.samoa.moa.core.AutoExpandVector;
import com.yahoo.labs.samoa.moa.core.InstanceExample;
import com.yahoo.labs.samoa.instances.InstancesHeader;
import com.yahoo.labs.samoa.moa.core.ObjectRepository;
import com.yahoo.labs.samoa.moa.core.DataPoint;
import com.github.javacliparser.FlagOption;
import com.github.javacliparser.FloatOption;
import com.github.javacliparser.IntOption;
import com.yahoo.labs.samoa.moa.streams.InstanceStream;
import com.yahoo.labs.samoa.moa.tasks.TaskMonitor;
import com.yahoo.labs.samoa.instances.Attribute;
import com.yahoo.labs.samoa.instances.DenseInstance;
import com.yahoo.labs.samoa.moa.core.FastVector;
import com.yahoo.labs.samoa.instances.Instance;
import com.yahoo.labs.samoa.instances.Instances;


/**
 * Clustering stream that samples instances from a set of spherical kernels
 * ("generator clusters") which move through the unit hypercube and, if enabled,
 * undergo merge / split / delete / create events.
 *
 * <p>Every generated instance is labelled with the id of the kernel it was
 * sampled from; noise instances get the class value {@code numClusterOption}.
 * Registered {@link ClusterEventListener}s are notified when an event happens.
 *
 * <p>All randomness comes from {@link #instanceRandom}, which is seeded in
 * {@link #restart()}, so a restarted stream replays the same sequence.
 */
public class RandomRBFGeneratorEvents extends ClusteringStream {
    /** Lazily created listener list; transient, so it is {@code null} after deserialization. */
    private transient Vector<ClusterEventListener> listeners;

    private static final long serialVersionUID = 1L;

    public IntOption modelRandomSeedOption = new IntOption("modelRandomSeed",
            'm', "Seed for random generation of model.", 1);

    public IntOption instanceRandomSeedOption = new IntOption(
            "instanceRandomSeed", 'i',
            "Seed for random generation of instances.", 5);

    public IntOption numClusterOption = new IntOption("numCluster", 'K',
            "The average number of centroids in the model.", 5, 1, Integer.MAX_VALUE);

    public IntOption numClusterRangeOption = new IntOption("numClusterRange", 'k',
            "Deviation of the number of centroids in the model.", 3, 0, Integer.MAX_VALUE);

    public FloatOption kernelRadiiOption = new FloatOption("kernelRadius", 'R',
            "The average radii of the centroids in the model.", 0.07, 0, 1);

    public FloatOption kernelRadiiRangeOption = new FloatOption("kernelRadiusRange", 'r',
            "Deviation of average radii of the centroids in the model.", 0, 0, 1);

    public FloatOption densityRangeOption = new FloatOption("densityRange", 'd',
            "Offset of the average weight a cluster has. Value of 0 means all cluster " +
            "contain the same amount of points.", 0, 0, 1);

    public IntOption speedOption = new IntOption("speed", 'V',
            "Kernels move a predefined distance of 0.01 every X points", 500, 1, Integer.MAX_VALUE);

    public IntOption speedRangeOption = new IntOption("speedRange", 'v',
            "Speed/Velocity point offset", 0, 0, Integer.MAX_VALUE);

    public FloatOption noiseLevelOption = new FloatOption("noiseLevel", 'N',
            "Noise level", 0.1, 0, 1);

    public FlagOption noiseInClusterOption = new FlagOption("noiseInCluster", 'n',
            "Allow noise to be placed within a cluster");

    public IntOption eventFrequencyOption = new IntOption("eventFrequency", 'E',
            "Event frequency. Enable at least one of the events below and set numClusterRange!", 30000, 0, Integer.MAX_VALUE);

    public FlagOption eventMergeSplitOption = new FlagOption("eventMergeSplitOption", 'M',
            "Enable merging and splitting of clusters. Set eventFrequency and numClusterRange!");

    public FlagOption eventDeleteCreateOption = new FlagOption("eventDeleteCreate", 'C',
            "Enable emering and disapperaing of clusters. Set eventFrequency and numClusterRange!");


    /** Overlap degree above which two merging kernels are fused into one. */
    private double merge_threshold = 0.7;
    /** Kernels are moved once every this many generated instances. */
    private int kernelMovePointFrequency = 10;
    /** Distance unit used to convert a travel distance into movement steps. */
    private double maxDistanceMoveThresholdByStep = 0.01;
    /** Maximum number of attempts to place a new kernel inside the unit cube. */
    private int maxOverlapFitRuns = 50;
    /** Relative jitter applied to the event frequency (0 = none). */
    private double eventFrequencyRange = 0;

    private boolean debug = false;

    private AutoExpandVector<GeneratorCluster> kernels;
    protected Random instanceRandom;
    protected InstancesHeader streamHeader;
    private int numGeneratedInstances;
    private int numActiveKernels;
    private int nextEventCounter;
    /** 0 = merge, 1 = split, 2 = delete, 3 = create, -1 = no event. */
    private int nextEventChoice = -1;
    private int clusterIdCounter;
    private GeneratorCluster mergeClusterA;
    private GeneratorCluster mergeClusterB;
    private boolean mergeKernelsOverlapping = false;


    /**
     * One generating kernel together with its movement state and the micro
     * clusters built from the points it recently produced.
     *
     * <p>Deliberately a (non-static) inner class: it reads the options, the
     * random source and the kernel list of the enclosing generator.
     */
    private class GeneratorCluster implements Serializable {
        // TODO: points is redundant to microClustersPoints; a strategy is needed
        // to keep micro clusters updated and rebuild them only when necessary
        // (e.g. order points by timestamp and rebuild once the oldest point of a
        // micro cluster falls out of the horizon).

        private static final long serialVersionUID = -6301649898961112942L;

        SphereCluster generator;
        /** -1 = alive, &gt; 0 = remaining ticks until removal, 0 = remove on next update. */
        int kill = -1;
        boolean merging = false;
        double[] moveVector;
        int totalMovementSteps;
        int currentMovementSteps;
        boolean isSplitting = false;

        LinkedList<DataPoint> points = new LinkedList<DataPoint>();
        // The three lists below are parallel: index m refers to the same micro cluster.
        ArrayList<SphereCluster> microClusters = new ArrayList<SphereCluster>();
        ArrayList<ArrayList<DataPoint>> microClustersPoints = new ArrayList<ArrayList<DataPoint>>();
        ArrayList<Integer> microClustersDecay = new ArrayList<Integer>();


        /**
         * Creates a kernel with a random center and radius that lies completely
         * inside the unit cube. After {@code maxOverlapFitRuns} failed attempts
         * {@link #generator} is left {@code null} and {@code kill} is set to 0;
         * callers must not register such a kernel.
         *
         * @param label id assigned to the generating cluster
         */
        public GeneratorCluster(int label) {
            boolean outofbounds = true;
            int tryCounter = 0;
            while (outofbounds && tryCounter < maxOverlapFitRuns) {
                tryCounter++;
                outofbounds = false;
                double[] center = new double[numAttsOption.getValue()];
                double radius = drawRadius();
                while (radius <= 0) {
                    radius = drawRadius();
                }
                for (int j = 0; j < numAttsOption.getValue(); j++) {
                    center[j] = instanceRandom.nextDouble();
                    if (center[j] - radius < 0 || center[j] + radius > 1) {
                        outofbounds = true;
                        break;
                    }
                }
                generator = new SphereCluster(center, radius);
            }
            // Decide on the outcome of the last attempt, not on the attempt count:
            // a placement that succeeds on the final permitted try is still valid.
            if (!outofbounds) {
                generator.setId(label);
                double avgWeight = 1.0 / numClusterOption.getValue();
                double weight = avgWeight + (instanceRandom.nextBoolean() ? -1 : 1) * avgWeight
                        * densityRangeOption.getValue() * instanceRandom.nextDouble();
                generator.setWeight(weight);
                setDesitnation(null);
            } else {
                generator = null;
                kill = 0;
                System.out.println("Tried " + maxOverlapFitRuns + " times to create kernel. Reduce average radii.");
            }
        }

        /**
         * Wraps an existing sphere as a kernel and gives it a random destination.
         *
         * @param label   id assigned to the cluster
         * @param cluster sphere to use as generator
         */
        public GeneratorCluster(int label, SphereCluster cluster) {
            this.generator = cluster;
            cluster.setId(label);
            setDesitnation(null);
        }

        /** Draws a radius around the configured average; may be non-positive. */
        private double drawRadius() {
            return kernelRadiiOption.getValue() + (instanceRandom.nextBoolean() ? -1 : 1)
                    * kernelRadiiRangeOption.getValue() * instanceRandom.nextDouble();
        }

        /** @return index of this kernel in {@code kernels}, or -1 if it is not registered */
        public int getWorkID() {
            for (int c = 0; c < kernels.size(); c++) {
                if (kernels.get(c).equals(this)) {
                    return c;
                }
            }
            return -1;
        }

        /**
         * Per-instance housekeeping: advances the kill countdown (removing the
         * kernel when it reaches 0) and drops micro clusters and points that
         * are older than the decay horizon.
         */
        private void updateKernel() {
            if (kill == 0) {
                kernels.remove(this);
            }
            if (kill > 0) {
                kill--;
            }
            // Timestamps per point would allow a tighter representation; for now
            // this is only used to avoid overlap, so being conservative is fine.
            // Walk backwards so that removing index m does not skip its successor.
            for (int m = microClusters.size() - 1; m >= 0; m--) {
                if (numGeneratedInstances - microClustersDecay.get(m) > decayHorizonOption.getValue()) {
                    microClusters.remove(m);
                    microClustersPoints.remove(m);
                    microClustersDecay.remove(m);
                }
            }

            if (!points.isEmpty()
                    && numGeneratedInstances - points.getFirst().getTimestamp() >= decayHorizonOption.getValue()) {
                points.removeFirst();
            }
        }

        /**
         * Records an instance sampled from this kernel and assigns it to a
         * micro cluster, creating a new micro cluster when none contains it.
         *
         * @param instance the freshly generated instance
         */
        private void addInstance(Instance instance) {
            DataPoint point = new DataPoint(instance, numGeneratedInstances);
            points.add(point);

            int minMicroIndex = -1;
            double minHullDist = Double.MAX_VALUE;
            boolean inserted = false;
            // Favour recently built clusters so that older ones can expire sooner.
            for (int m = microClusters.size() - 1; m >= 0; m--) {
                SphereCluster micro = microClusters.get(m);
                double hulldist = micro.getCenterDistance(point) - micro.getRadius();
                if (hulldist <= 0) {
                    // Point fits into an existing micro cluster.
                    microClustersPoints.get(m).add(point);
                    microClustersDecay.set(m, numGeneratedInstances);
                    inserted = true;
                    break;
                } else if (hulldist < minHullDist) {
                    // Otherwise remember the closest micro cluster.
                    minMicroIndex = m;
                    minHullDist = hulldist;
                }
            }
            // alt == 1 selects the alternative strategy: never grow the closest
            // micro cluster, always open a new one shaped like the generator.
            int alt = 1;
            if (alt == 1) {
                minMicroIndex = -1;
            }
            if (!inserted) {
                if (minMicroIndex != -1) {
                    // Add to the closest micro cluster and rebuild its hull.
                    microClustersPoints.get(minMicroIndex).add(point);
                    SphereCluster s = new SphereCluster(microClustersPoints.get(minMicroIndex),
                            numAttsOption.getValue());
                    if (s.getRadius() > generator.getRadius()) {
                        // Grown hull exceeds the generator: undo and open a new one.
                        microClustersPoints.get(minMicroIndex)
                                .remove(microClustersPoints.get(minMicroIndex).size() - 1);
                        minMicroIndex = -1;
                    } else {
                        microClusters.set(minMicroIndex, s);
                        microClustersDecay.set(minMicroIndex, numGeneratedInstances);
                    }
                }
                // minMicroIndex may have been reset above.
                if (minMicroIndex == -1) {
                    ArrayList<DataPoint> microPoints = new ArrayList<DataPoint>();
                    microPoints.add(point);
                    SphereCluster s;
                    if (alt == 0) {
                        s = new SphereCluster(microPoints, numAttsOption.getValue());
                    } else {
                        s = new SphereCluster(generator.getCenter(), generator.getRadius(), 1);
                    }

                    microClusters.add(s);
                    microClustersPoints.add(microPoints);
                    microClustersDecay.add(numGeneratedInstances);
                    // Ground truth is the kernel's position in the list
                    // (kernels.size() when the kernel is not registered).
                    int id = 0;
                    while (id < kernels.size()) {
                        if (kernels.get(id) == this) {
                            break;
                        }
                        id++;
                    }
                    s.setGroundTruth(id);
                }
            }
        }


        /**
         * Advances the kernel one step along its move vector. When the step
         * would leave the unit cube a new random destination is chosen and the
         * step is retried; when the destination has been reached a new random
         * destination is chosen unless the kernel is currently merging.
         */
        private void move() {
            if (currentMovementSteps < totalMovementSteps) {
                currentMovementSteps++;
                if (moveVector == null) {
                    return;
                }
                double[] center = generator.getCenter();
                boolean outofbounds = true;
                while (outofbounds) {
                    double radius = generator.getRadius();
                    outofbounds = false;
                    // NOTE(review): the retry relies on getCenter() returning a
                    // fresh array on every call - confirm against SphereCluster.
                    center = generator.getCenter();
                    for (int d = 0; d < center.length; d++) {
                        center[d] += moveVector[d];
                        if (center[d] - radius < 0 || center[d] + radius > 1) {
                            outofbounds = true;
                            setDesitnation(null);
                            break;
                        }
                    }
                }
                generator.setCenter(center);
            } else {
                if (!merging) {
                    setDesitnation(null);
                    isSplitting = false;
                }
            }
        }

        /**
         * Points the kernel at a destination.
         *
         * @param destination target center, or {@code null} for a uniformly
         *                    random point in the unit cube
         */
        void setDesitnation(double[] destination) {
            if (destination == null) {
                destination = new double[numAttsOption.getValue()];
                for (int j = 0; j < numAttsOption.getValue(); j++) {
                    destination[j] = instanceRandom.nextDouble();
                }
            }
            double[] center = generator.getCenter();
            int dim = center.length;

            double[] v = new double[dim];
            for (int d = 0; d < dim; d++) {
                v[d] = destination[d] - center[d];
            }
            setMoveVector(v);
        }

        /**
         * Installs a displacement and converts it into a per-step vector.
         * The passed array is stored and scaled in place.
         *
         * @param vector total displacement from the current center
         */
        void setMoveVector(double[] vector) {
            moveVector = vector;
            int speedInPoints = speedOption.getValue();
            if (speedRangeOption.getValue() > 0) {
                speedInPoints += (instanceRandom.nextBoolean() ? -1 : 1)
                        * instanceRandom.nextInt(speedRangeOption.getValue());
            }
            if (speedInPoints < 1) {
                speedInPoints = speedOption.getValue();
            }

            double length = 0;
            for (int d = 0; d < moveVector.length; d++) {
                length += Math.pow(vector[d], 2);
            }
            length = Math.sqrt(length);

            totalMovementSteps = (int) (length / (maxDistanceMoveThresholdByStep * kernelMovePointFrequency)
                    * speedInPoints);
            // With zero steps move() never applies the vector; skip the division
            // so the stored vector does not degrade to Infinity/NaN.
            if (totalMovementSteps > 0) {
                for (int d = 0; d < moveVector.length; d++) {
                    moveVector[d] /= (double) totalMovementSteps;
                }
            }

            currentMovementSteps = 0;
        }

        /**
         * Fuses {@code merge} into this kernel once both overlap by more than
         * {@code merge_threshold}; the absorbed kernel gets weight 0 and is
         * scheduled for removal after the decay horizon.
         *
         * @param merge kernel to absorb
         * @return event message, or an empty string when nothing happened
         */
        private String tryMerging(GeneratorCluster merge) {
            String message = "";
            double overlapDegree = generator.overlapRadiusDegree(merge.generator);
            if (overlapDegree > merge_threshold) {
                SphereCluster mcluster = merge.generator;
                double radius = Math.max(generator.getRadius(), mcluster.getRadius());
                generator.combine(mcluster);

                // Keep the larger of both radii: the combined radius otherwise
                // keeps growing with high dimensional data.
                generator.setRadius(radius);
                message = "Clusters merging: " + mergeClusterB.generator.getId() + " into "
                        + mergeClusterA.generator.getId();

                // Mark the absorbed kernel so it is removed once its instances expired.
                merge.kill = decayHorizonOption.getValue();
                // Weight 0: no new instances are generated from it.
                mcluster.setWeight(0.0);
                normalizeWeights();
                numActiveKernels--;
                mergeClusterB = mergeClusterA = null;
                merging = false;
                mergeKernelsOverlapping = false;
            } else {
                if (overlapDegree > 0 && !mergeKernelsOverlapping) {
                    mergeKernelsOverlapping = true;
                    message = "Merge overlapping started";
                }
            }
            return message;
        }

        /**
         * Spawns a new kernel at this kernel's center.
         *
         * @return event message
         */
        private String splitKernel() {
            isSplitting = true;
            // TODO: apply the radius range here as well.
            double radius = kernelRadiiOption.getValue();
            double avgWeight = 1.0 / numClusterOption.getValue();
            double weight = avgWeight + avgWeight * densityRangeOption.getValue() * instanceRandom.nextDouble();

            double[] center = generator.getCenter();
            SphereCluster spcluster = new SphereCluster(center, radius, weight);

            GeneratorCluster gc = new GeneratorCluster(clusterIdCounter++, spcluster);
            gc.isSplitting = true;
            kernels.add(gc);
            normalizeWeights();
            numActiveKernels++;
            return "Split from Kernel " + generator.getId();
        }

        /**
         * Stops generating from this kernel and schedules its removal.
         *
         * @return event message
         */
        private String fadeOut() {
            kill = decayHorizonOption.getValue();
            generator.setWeight(0.0);
            numActiveKernels--;
            normalizeWeights();
            return "Fading out C" + generator.getId();
        }
    }

    public RandomRBFGeneratorEvents() {
        noiseInClusterOption.set();
    }

    public InstancesHeader getHeader() {
        return streamHeader;
    }

    public long estimatedRemainingInstances() {
        return -1;
    }

    public boolean hasMoreInstances() {
        return true;
    }

    public boolean isRestartable() {
        return true;
    }

    @Override
    public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository repository) {
        monitor.setCurrentActivity("Preparing random RBF...", -1.0);
        generateHeader();
        restart();
    }

    /**
     * Resets the generator to its initial, seeded state.
     *
     * <p>All counters are cleared <em>before</em> the first event is chosen so
     * that a restarted stream makes the same choice as a freshly prepared one.
     */
    public void restart() {
        instanceRandom = new Random(instanceRandomSeedOption.getValue());
        nextEventCounter = eventFrequencyOption.getValue();
        numActiveKernels = 0;
        numGeneratedInstances = 0;
        clusterIdCounter = 0;
        mergeClusterA = mergeClusterB = null;
        mergeKernelsOverlapping = false;
        nextEventChoice = getNextEvent();
        kernels = new AutoExpandVector<GeneratorCluster>();

        initKernels();
    }

    /**
     * Builds the stream header: one numeric attribute per dimension plus a
     * nominal class attribute. When noise is enabled the last label is "noise".
     */
    protected void generateHeader() {
        ArrayList<Attribute> attributes = new ArrayList<Attribute>();
        for (int i = 0; i < this.numAttsOption.getValue(); i++) {
            attributes.add(new Attribute("att" + (i + 1)));
        }

        ArrayList<String> classLabels = new ArrayList<String>();
        for (int i = 0; i < this.numClusterOption.getValue(); i++) {
            classLabels.add("class" + (i + 1));
        }
        if (noiseLevelOption.getValue() > 0) {
            classLabels.add("noise");
        }

        attributes.add(new Attribute("class", classLabels));
        streamHeader = new InstancesHeader(
                new Instances(getCLICreationString(InstanceStream.class), attributes, 0));
        streamHeader.setClassIndex(streamHeader.numAttributes() - 1);
    }


    /** Creates the initial kernels; kernels that could not be placed are skipped. */
    protected void initKernels() {
        for (int i = 0; i < numClusterOption.getValue(); i++) {
            GeneratorCluster gc = new GeneratorCluster(clusterIdCounter);
            clusterIdCounter++;
            if (gc.generator != null) {
                kernels.add(gc);
                numActiveKernels++;
            }
        }
        normalizeWeights();
    }

    /**
     * Generates the next instance, either sampled from a weighted-random
     * kernel or, with probability {@code noiseLevelOption}, a noise point.
     */
    public InstanceExample nextInstance() {
        numGeneratedInstances++;
        eventScheduler();

        double[] values_new = new double[numAttsOption.getValue()];
        double[] values;
        int clusterChoice = -1;

        if (instanceRandom.nextDouble() > noiseLevelOption.getValue()) {
            clusterChoice = chooseWeightedElement();
            values = kernels.get(clusterChoice).generator.sample(instanceRandom).toDoubleArray();
        } else {
            values = getNoisePoint();
        }

        if (Double.isNaN(values[0])) {
            System.out.println("Instance corrupted:" + numGeneratedInstances);
        }
        System.arraycopy(values, 0, values_new, 0, values.length);

        Instance inst = new DenseInstance(1.0, values_new);
        inst.setDataset(getHeader());
        if (clusterChoice == -1) {
            // Noise gets the last class value instead of -1, which would break
            // consumers indexing the label array.
            inst.setClassValue(numClusterOption.getValue());
        } else {
            inst.setClassValue(kernels.get(clusterChoice).generator.getId());
            kernels.get(clusterChoice).addInstance(inst);
        }

        return new InstanceExample(inst);
    }


    /** @return clustering made of the generating sphere of every registered kernel */
    public Clustering getGeneratingClusters() {
        Clustering clustering = new Clustering();
        for (int c = 0; c < kernels.size(); c++) {
            clustering.add(kernels.get(c).generator);
        }
        return clustering;
    }

    /**
     * @return clustering made of all micro clusters; each gets a running id and
     *         the id of its generating kernel as ground truth
     */
    public Clustering getMicroClustering() {
        Clustering clustering = new Clustering();
        int id = 0;

        for (int c = 0; c < kernels.size(); c++) {
            GeneratorCluster kernel = kernels.get(c);
            for (int m = 0; m < kernel.microClusters.size(); m++) {
                SphereCluster micro = kernel.microClusters.get(m);
                micro.setId(id);
                micro.setGroundTruth(kernel.generator.getId());
                clustering.add(micro);
                id++;
            }
        }
        return clustering;
    }

    /* ---------------------------- EVENTS ---------------------------------- */

    /**
     * Runs once per generated instance: updates and moves the kernels and
     * triggers the pending event when its countdown has elapsed.
     */
    private void eventScheduler() {
        // Backwards, because updateKernel() may remove the kernel from the list.
        for (int i = kernels.size() - 1; i >= 0; i--) {
            kernels.get(i).updateKernel();
        }

        nextEventCounter--;
        // Kernels are only moved every kernelMovePointFrequency points.
        if (nextEventCounter % kernelMovePointFrequency == 0) {
            for (int i = 0; i < kernels.size(); i++) {
                kernels.get(i).move();
            }
        }

        if (eventFrequencyOption.getValue() == 0) {
            return;
        }

        String type = "";
        String message = "";
        boolean eventFinished = false;
        switch (nextEventChoice) {
            case 0:
                if (numActiveKernels > 1
                        && numActiveKernels > numClusterOption.getValue() - numClusterRangeOption.getValue()) {
                    message = mergeKernels(nextEventCounter);
                    type = "Merge";
                }
                if (mergeClusterA == null && mergeClusterB == null && message.startsWith("Clusters merging")) {
                    eventFinished = true;
                }
                break;
            case 1:
                if (nextEventCounter <= 0) {
                    if (numActiveKernels < numClusterOption.getValue() + numClusterRangeOption.getValue()) {
                        type = "Split";
                        message = splitKernel();
                    }
                    eventFinished = true;
                }
                break;
            case 2:
                if (nextEventCounter <= 0) {
                    if (numActiveKernels > 1
                            && numActiveKernels > numClusterOption.getValue() - numClusterRangeOption.getValue()) {
                        message = fadeOut();
                        type = "Delete";
                    }
                    eventFinished = true;
                }
                break;
            case 3:
                if (nextEventCounter <= 0) {
                    if (numActiveKernels < numClusterOption.getValue() + numClusterRangeOption.getValue()) {
                        message = fadeIn();
                        type = "Create";
                    }
                    eventFinished = true;
                }
                break;
            default:
                break;
        }
        if (eventFinished) {
            nextEventCounter = (int) (eventFrequencyOption.getValue()
                    + (instanceRandom.nextBoolean() ? -1 : 1) * eventFrequencyOption.getValue()
                    * eventFrequencyRange * instanceRandom.nextDouble());
            nextEventChoice = getNextEvent();
        }
        if (!message.isEmpty()) {
            message += " (numKernels = " + numActiveKernels + " at " + numGeneratedInstances + ")";
            if (!type.equals("Merge") || message.startsWith("Clusters merging")) {
                fireClusterChange(numGeneratedInstances, type, message);
            }
        }
    }

    /**
     * Picks the next event according to the enabled event families and the
     * allowed range for the number of active kernels.
     *
     * @return 0 merge, 1 split, 2 delete, 3 create, or -1 when no event applies
     */
    private int getNextEvent() {
        int choice = -1;
        boolean lowerLimit = numActiveKernels <= numClusterOption.getValue() - numClusterRangeOption.getValue();
        boolean upperLimit = numActiveKernels >= numClusterOption.getValue() + numClusterRangeOption.getValue();

        if (!lowerLimit || !upperLimit) {
            int mode = -1;
            if (eventDeleteCreateOption.isSet() && eventMergeSplitOption.isSet()) {
                mode = instanceRandom.nextInt(2);
            }

            if (mode == 0 || (mode == -1 && eventMergeSplitOption.isSet())) {
                if (!lowerLimit && !upperLimit) {
                    // No limit reached: free choice between merge and split.
                    choice = instanceRandom.nextInt(2);
                } else if (lowerLimit) {
                    choice = 1; // too few kernels: split
                } else {
                    choice = 0; // too many kernels: merge
                }
            }

            if (mode == 1 || (mode == -1 && eventDeleteCreateOption.isSet())) {
                if (!lowerLimit && !upperLimit) {
                    // No limit reached: free choice between delete and create.
                    choice = instanceRandom.nextInt(2) + 2;
                } else if (lowerLimit) {
                    choice = 3; // too few kernels: create
                } else {
                    choice = 2; // too many kernels: delete
                }
            }
        }

        return choice;
    }

    /** Draws random indices until one refers to a kernel that is not fading out. */
    private int chooseLivingKernelIndex() {
        int id = instanceRandom.nextInt(kernels.size());
        while (kernels.get(id).kill != -1) {
            id = instanceRandom.nextInt(kernels.size());
        }
        return id;
    }

    private String fadeOut() {
        return kernels.get(chooseLivingKernelIndex()).fadeOut();
    }

    /**
     * Adds a new randomly placed kernel.
     *
     * @return event message, or an empty string when no room was found
     */
    private String fadeIn() {
        GeneratorCluster gc = new GeneratorCluster(clusterIdCounter++);
        if (gc.generator == null) {
            // Placement failed (already reported by the constructor).
            return "";
        }
        kernels.add(gc);
        numActiveKernels++;
        normalizeWeights();
        return "Creating new cluster";
    }


    private String changeWeight(boolean increase) {
        double changeRate = 0.1;
        int id = chooseLivingKernelIndex();

        int sign = increase ? 1 : -1;
        double weight_old = kernels.get(id).generator.getWeight();
        double delta = sign * numActiveKernels * instanceRandom.nextDouble() * changeRate;
        kernels.get(id).generator.setWeight(weight_old + delta);

        normalizeWeights();

        String message = increase ? "Increase " : "Decrease ";
        message += " weight on Cluster " + id + " from " + weight_old + " to " + (weight_old + delta);
        return message;
    }

    private String changeRadius(boolean increase) {
        double maxChangeRate = 0.1;
        int id = chooseLivingKernelIndex();

        int sign = increase ? 1 : -1;

        double r_old = kernels.get(id).generator.getRadius();
        double r_new = r_old + sign * r_old * instanceRandom.nextDouble() * maxChangeRate;
        if (r_new >= 0.5) {
            return "Radius to big";
        }
        kernels.get(id).generator.setRadius(r_new);

        String message = increase ? "Increase " : "Decrease ";
        message += " radius on Cluster " + id + " from " + r_old + " to " + r_new;
        return message;
    }

    private String splitKernel() {
        return kernels.get(chooseLivingKernelIndex()).splitKernel();
    }

    /**
     * Starts or continues a merge. When no merge is in progress the pair of
     * living kernels whose distance best matches what can be travelled in
     * {@code steps} points is selected and sent towards its midpoint;
     * afterwards every call checks whether the pair overlaps enough to fuse.
     *
     * @param steps points left until the event is due
     * @return event message, or an empty string when nothing happened
     */
    private String mergeKernels(int steps) {
        if (numActiveKernels > 1 && mergeClusterA == null && mergeClusterB == null) {

            // Floating point division: the int quotient would silently truncate.
            double diseredDist = (double) steps / speedOption.getValue() * maxDistanceMoveThresholdByStep;
            double minDist = Double.MAX_VALUE;
            for (int i = 0; i < kernels.size(); i++) {
                for (int j = 0; j < i; j++) {
                    if (kernels.get(i).kill != -1 || kernels.get(j).kill != -1) {
                        continue;
                    }
                    double kernelDist = kernels.get(i).generator.getCenterDistance(kernels.get(j).generator);
                    double d = kernelDist - 2 * diseredDist;
                    if (Math.abs(d) < minDist
                            && (minDist != Double.MAX_VALUE || d > 0 || Math.abs(d) < 0.001)) {
                        minDist = Math.abs(d);
                        mergeClusterA = kernels.get(i);
                        mergeClusterB = kernels.get(j);
                    }
                }
            }

            if (mergeClusterA != null && mergeClusterB != null) {
                double[] merge_point = mergeClusterA.generator.getCenter();
                double[] v = mergeClusterA.generator.getDistanceVector(mergeClusterB.generator);
                for (int i = 0; i < v.length; i++) {
                    merge_point[i] = merge_point[i] + v[i] * 0.5;
                }

                mergeClusterA.merging = true;
                mergeClusterB.merging = true;
                mergeClusterA.setDesitnation(merge_point);
                mergeClusterB.setDesitnation(merge_point);

                if (debug) {
                    System.out.println("Center1" + Arrays.toString(mergeClusterA.generator.getCenter()));
                    System.out.println("Center2" + Arrays.toString(mergeClusterB.generator.getCenter()));
                    System.out.println("Vector" + Arrays.toString(v));

                    System.out.println("Try to merge cluster " + mergeClusterA.generator.getId()
                            + " into " + mergeClusterB.generator.getId()
                            + " at " + Arrays.toString(merge_point)
                            + " time " + numGeneratedInstances);
                }
                return "Init merge";
            }
        }

        if (mergeClusterA != null && mergeClusterB != null) {
            // move() brings the kernels together; only the overlap is checked here.
            return mergeClusterA.tryMerging(mergeClusterB);
        }

        return "";
    }


    /* ----------------------------- TOOLS ---------------------------------- */

    public void getDescription(StringBuilder sb, int indent) {

    }

    /**
     * Draws a uniformly random point. Unless noise inside clusters is allowed,
     * the draw is repeated (at most 20 times) while the point falls into a
     * micro cluster.
     */
    private double[] getNoisePoint() {
        double[] sample = new double[numAttsOption.getValue()];
        boolean incluster = true;
        int counter = 20;
        while (incluster) {
            for (int j = 0; j < numAttsOption.getValue(); j++) {
                sample[j] = instanceRandom.nextDouble();
            }
            incluster = false;
            if (!noiseInClusterOption.isSet() && counter > 0) {
                counter--;
                Instance inst = new DenseInstance(1, sample);
                for (int c = 0; c < kernels.size() && !incluster; c++) {
                    ArrayList<SphereCluster> micros = kernels.get(c).microClusters;
                    for (int m = 0; m < micros.size(); m++) {
                        if (micros.get(m).getInclusionProbability(inst) > 0) {
                            incluster = true;
                            break;
                        }
                    }
                }
            }
        }
        return sample;
    }

    /**
     * Chooses a kernel index with probability proportional to its weight.
     *
     * <p>Kernels with weight 0 (fading out) are never chosen. A draw of exactly
     * 0.0 selects the first weighted kernel, and rounding error in the weight
     * sum falls back to the last weighted kernel instead of running off the
     * end of the list.
     *
     * @return index into {@code kernels}
     * @throws IllegalStateException if no kernel has a positive weight
     */
    private int chooseWeightedElement() {
        double r = instanceRandom.nextDouble();

        int lastWeighted = -1;
        for (int i = 0; i < kernels.size(); i++) {
            double weight = kernels.get(i).generator.getWeight();
            if (weight > 0.0) {
                lastWeighted = i;
                r -= weight;
                if (r <= 0.0) {
                    return i;
                }
            }
        }
        if (lastWeighted == -1) {
            throw new IllegalStateException("No kernel with positive weight among " + kernels.size() + " kernels");
        }
        return lastWeighted;
    }

    /** Rescales all kernel weights so they sum to 1; a non-positive sum is left untouched. */
    private void normalizeWeights() {
        double sumWeights = 0.0;
        for (int i = 0; i < kernels.size(); i++) {
            sumWeights += kernels.get(i).generator.getWeight();
        }
        if (!(sumWeights > 0.0)) {
            // Dividing would turn every weight into NaN.
            return;
        }
        for (int i = 0; i < kernels.size(); i++) {
            kernels.get(i).generator.setWeight(kernels.get(i).generator.getWeight() / sumWeights);
        }
    }


    /* ------------------------- EVENT listeners ---------------------------- */
    // Should move into a common superclass for cluster streams.

    /** Add a listener */
    synchronized public void addClusterChangeListener(ClusterEventListener l) {
        if (listeners == null) {
            listeners = new Vector<ClusterEventListener>();
        }
        listeners.addElement(l);
    }

    /** Remove a listener */
    synchronized public void removeClusterChangeListener(ClusterEventListener l) {
        if (listeners == null) {
            listeners = new Vector<ClusterEventListener>();
        }
        listeners.removeElement(l);
    }

    /**
     * Fire a ClusterChangeEvent to all registered listeners.
     *
     * <p>The listener list is snapshotted under the same lock that guards
     * add/remove; listeners are invoked outside the lock.
     */
    protected void fireClusterChange(long timestamp, String type, String message) {
        ArrayList<ClusterEventListener> targets;
        synchronized (this) {
            if (listeners == null || listeners.isEmpty()) {
                return;
            }
            targets = new ArrayList<ClusterEventListener>(listeners);
        }

        ClusterEvent event = new ClusterEvent(this, timestamp, type, message);
        for (ClusterEventListener l : targets) {
            l.changeCluster(event);
        }
    }

    @Override
    public String getPurposeString() {
        return "Generates a random radial basis function stream.";
    }


    public String getParameterString() {
        return "";
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/HyperplaneGenerator.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/HyperplaneGenerator.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/HyperplaneGenerator.java new file mode 100644 index 0000000..d0ab735 --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/HyperplaneGenerator.java @@ -0,0 +1,178 @@ +package com.yahoo.labs.samoa.moa.streams.generators; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2007 University of Waikato, Hamilton, New Zealand + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import java.util.Random; + +import com.github.javacliparser.FloatOption; +import com.github.javacliparser.IntOption; +import com.yahoo.labs.samoa.instances.Attribute; +import com.yahoo.labs.samoa.instances.DenseInstance; +import com.yahoo.labs.samoa.instances.Instance; +import com.yahoo.labs.samoa.instances.Instances; +import com.yahoo.labs.samoa.instances.InstancesHeader; +import com.yahoo.labs.samoa.moa.core.Example; +import com.yahoo.labs.samoa.moa.core.FastVector; +import com.yahoo.labs.samoa.moa.core.InstanceExample; +import com.yahoo.labs.samoa.moa.core.ObjectRepository; +import com.yahoo.labs.samoa.moa.options.AbstractOptionHandler; +import com.yahoo.labs.samoa.moa.streams.InstanceStream; +import com.yahoo.labs.samoa.moa.tasks.TaskMonitor; + +/** + * Stream generator for Hyperplane data stream. + * + * @author Albert Bifet (abifet at cs dot waikato dot ac dot nz) + * @version $Revision: 7 $ + */ +public class HyperplaneGenerator extends AbstractOptionHandler implements InstanceStream { + + @Override + public String getPurposeString() { + return "Generates a problem of predicting class of a rotating hyperplane."; + } + + private static final long serialVersionUID = 1L; + + public IntOption instanceRandomSeedOption = new IntOption("instanceRandomSeed", 'i', "Seed for random generation of instances.", 1); + + public IntOption numClassesOption = new IntOption("numClasses", 'c', "The number of classes to generate.", 2, 2, Integer.MAX_VALUE); + + public IntOption numAttsOption = new IntOption("numAtts", 'a', "The number of attributes to generate.", 10, 0, Integer.MAX_VALUE); + + public IntOption numDriftAttsOption = new IntOption("numDriftAtts", 'k', "The number of attributes with drift.", 2, 0, Integer.MAX_VALUE); + + public FloatOption magChangeOption = new FloatOption("magChange", 't', "Magnitude of the change for every example", 0.0, 0.0, 1.0); + + public IntOption noisePercentageOption = new IntOption("noisePercentage", 'n', "Percentage of 
noise to add to the data.", 5, 0, 100); + + public IntOption sigmaPercentageOption = new IntOption("sigmaPercentage", 's', "Percentage of probability that the direction of change is reversed.", 10, + 0, 100); + + protected InstancesHeader streamHeader; + + protected Random instanceRandom; + + protected double[] weights; + + protected int[] sigma; + + public int numberInstance; + + @Override + protected void prepareForUseImpl(TaskMonitor monitor, ObjectRepository repository) { + monitor.setCurrentActivity("Preparing hyperplane...", -1.0); + generateHeader(); + restart(); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + protected void generateHeader() { + FastVector attributes = new FastVector(); + for (int i = 0; i < this.numAttsOption.getValue(); i++) { + attributes.addElement(new Attribute("att" + (i + 1))); + } + + FastVector classLabels = new FastVector(); + for (int i = 0; i < this.numClassesOption.getValue(); i++) { + classLabels.addElement("class" + (i + 1)); + } + attributes.addElement(new Attribute("class", classLabels)); + this.streamHeader = new InstancesHeader(new Instances(getCLICreationString(InstanceStream.class), attributes, 0)); + this.streamHeader.setClassIndex(this.streamHeader.numAttributes() - 1); + } + + @Override + public long estimatedRemainingInstances() { + return -1; + } + + @Override + public InstancesHeader getHeader() { + return this.streamHeader; + } + + @Override + public boolean hasMoreInstances() { + return true; + } + + @Override + public boolean isRestartable() { + return true; + } + + @Override + public Example<Instance> nextInstance() { + + int numAtts = this.numAttsOption.getValue(); + double[] attVals = new double[numAtts + 1]; + double sum = 0.0; + double sumWeights = 0.0; + for (int i = 0; i < numAtts; i++) { + attVals[i] = this.instanceRandom.nextDouble(); + sum += this.weights[i] * attVals[i]; + sumWeights += this.weights[i]; + } + int classLabel; + if (sum >= sumWeights * 0.5) { + classLabel = 1; + } else { + 
classLabel = 0; + } + // Add Noise + if ((1 + (this.instanceRandom.nextInt(100))) <= this.noisePercentageOption.getValue()) { + classLabel = (classLabel == 0 ? 1 : 0); + } + + Instance inst = new DenseInstance(1.0, attVals); + inst.setDataset(getHeader()); + inst.setClassValue(classLabel); + addDrift(); + return new InstanceExample(inst); + } + + private void addDrift() { + for (int i = 0; i < this.numDriftAttsOption.getValue(); i++) { + this.weights[i] += (double) ((double) sigma[i]) * ((double) this.magChangeOption.getValue()); + if (// this.weights[i] >= 1.0 || this.weights[i] <= 0.0 || + (1 + (this.instanceRandom.nextInt(100))) <= this.sigmaPercentageOption.getValue()) { + this.sigma[i] *= -1; + } + } + } + + @Override + public void restart() { + this.instanceRandom = new Random(this.instanceRandomSeedOption.getValue()); + this.weights = new double[this.numAttsOption.getValue()]; + this.sigma = new int[this.numAttsOption.getValue()]; + for (int i = 0; i < this.numAttsOption.getValue(); i++) { + this.weights[i] = this.instanceRandom.nextDouble(); + this.sigma[i] = (i < this.numDriftAttsOption.getValue() ? 
1 : 0); + } + } + + @Override + public void getDescription(StringBuilder sb, int indent) { + // TODO Auto-generated method stub + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/RandomTreeGenerator.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/RandomTreeGenerator.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/RandomTreeGenerator.java new file mode 100644 index 0000000..a7f982e --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/RandomTreeGenerator.java @@ -0,0 +1,265 @@ +package com.yahoo.labs.samoa.moa.streams.generators; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2007 University of Waikato, Hamilton, New Zealand + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import com.yahoo.labs.samoa.instances.Attribute; +import com.yahoo.labs.samoa.instances.DenseInstance; +import com.yahoo.labs.samoa.moa.core.FastVector; +import com.yahoo.labs.samoa.instances.Instance; +import com.yahoo.labs.samoa.instances.Instances; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Random; +import com.yahoo.labs.samoa.moa.core.InstanceExample; + +import com.yahoo.labs.samoa.instances.InstancesHeader; +import com.yahoo.labs.samoa.moa.core.ObjectRepository; +import com.yahoo.labs.samoa.moa.options.AbstractOptionHandler; +import com.github.javacliparser.FloatOption; +import com.github.javacliparser.IntOption; +import com.yahoo.labs.samoa.moa.streams.InstanceStream; +import com.yahoo.labs.samoa.moa.tasks.TaskMonitor; + +/** + * Stream generator for a stream based on a randomly generated tree.. + * + * @author Richard Kirkby ([email protected]) + * @version $Revision: 7 $ + */ +public class RandomTreeGenerator extends AbstractOptionHandler implements InstanceStream { + + @Override + public String getPurposeString() { + return "Generates a stream based on a randomly generated tree."; + } + + private static final long serialVersionUID = 1L; + + public IntOption treeRandomSeedOption = new IntOption("treeRandomSeed", + 'r', "Seed for random generation of tree.", 1); + + public IntOption instanceRandomSeedOption = new IntOption( + "instanceRandomSeed", 'i', + "Seed for random generation of instances.", 1); + + public IntOption numClassesOption = new IntOption("numClasses", 'c', + "The number of classes to generate.", 2, 2, Integer.MAX_VALUE); + + public IntOption numNominalsOption = new IntOption("numNominals", 'o', + "The number of nominal attributes to generate.", 5, 0, + Integer.MAX_VALUE); + + public IntOption numNumericsOption = new IntOption("numNumerics", 'u', + "The number of numeric attributes to generate.", 5, 0, + Integer.MAX_VALUE); + + public IntOption numValsPerNominalOption = new IntOption( + 
"numValsPerNominal", 'v', + "The number of values to generate per nominal attribute.", 5, 2, + Integer.MAX_VALUE); + + public IntOption maxTreeDepthOption = new IntOption("maxTreeDepth", 'd', + "The maximum depth of the tree concept.", 5, 0, Integer.MAX_VALUE); + + public IntOption firstLeafLevelOption = new IntOption( + "firstLeafLevel", + 'l', + "The first level of the tree above maxTreeDepth that can have leaves.", + 3, 0, Integer.MAX_VALUE); + + public FloatOption leafFractionOption = new FloatOption("leafFraction", + 'f', + "The fraction of leaves per level from firstLeafLevel onwards.", + 0.15, 0.0, 1.0); + + protected static class Node implements Serializable { + + private static final long serialVersionUID = 1L; + + public int classLabel; + + public int splitAttIndex; + + public double splitAttValue; + + public Node[] children; + } + + protected Node treeRoot; + + protected InstancesHeader streamHeader; + + protected Random instanceRandom; + + @Override + public void prepareForUseImpl(TaskMonitor monitor, + ObjectRepository repository) { + monitor.setCurrentActivity("Preparing random tree...", -1.0); + generateHeader(); + generateRandomTree(); + restart(); + } + + @Override + public long estimatedRemainingInstances() { + return -1; + } + + @Override + public boolean isRestartable() { + return true; + } + + @Override + public void restart() { + this.instanceRandom = new Random(this.instanceRandomSeedOption.getValue()); + } + + @Override + public InstancesHeader getHeader() { + return this.streamHeader; + } + + @Override + public boolean hasMoreInstances() { + return true; + } + + @Override + public InstanceExample nextInstance() { + double[] attVals = new double[this.numNominalsOption.getValue() + + this.numNumericsOption.getValue()]; + InstancesHeader header = getHeader(); + Instance inst = new DenseInstance(header.numAttributes()); + for (int i = 0; i < attVals.length; i++) { + attVals[i] = i < this.numNominalsOption.getValue() ? 
this.instanceRandom.nextInt(this.numValsPerNominalOption.getValue()) + : this.instanceRandom.nextDouble(); + inst.setValue(i, attVals[i]); + } + inst.setDataset(header); + inst.setClassValue(classifyInstance(this.treeRoot, attVals)); + return new InstanceExample(inst); + } + + protected int classifyInstance(Node node, double[] attVals) { + if (node.children == null) { + return node.classLabel; + } + if (node.splitAttIndex < this.numNominalsOption.getValue()) { + return classifyInstance( + node.children[(int) attVals[node.splitAttIndex]], attVals); + } + return classifyInstance( + node.children[attVals[node.splitAttIndex] < node.splitAttValue ? 0 + : 1], attVals); + } + + protected void generateHeader() { + FastVector<Attribute> attributes = new FastVector<>(); + FastVector<String> nominalAttVals = new FastVector<>(); + for (int i = 0; i < this.numValsPerNominalOption.getValue(); i++) { + nominalAttVals.addElement("value" + (i + 1)); + } + for (int i = 0; i < this.numNominalsOption.getValue(); i++) { + attributes.addElement(new Attribute("nominal" + (i + 1), + nominalAttVals)); + } + for (int i = 0; i < this.numNumericsOption.getValue(); i++) { + attributes.addElement(new Attribute("numeric" + (i + 1))); + } + FastVector<String> classLabels = new FastVector<>(); + for (int i = 0; i < this.numClassesOption.getValue(); i++) { + classLabels.addElement("class" + (i + 1)); + } + attributes.addElement(new Attribute("class", classLabels)); + this.streamHeader = new InstancesHeader(new Instances( + getCLICreationString(InstanceStream.class), attributes, 0)); + this.streamHeader.setClassIndex(this.streamHeader.numAttributes() - 1); + } + + protected void generateRandomTree() { + Random treeRand = new Random(this.treeRandomSeedOption.getValue()); + ArrayList<Integer> nominalAttCandidates = new ArrayList<>( + this.numNominalsOption.getValue()); + for (int i = 0; i < this.numNominalsOption.getValue(); i++) { + nominalAttCandidates.add(i); + } + double[] minNumericVals = new 
double[this.numNumericsOption.getValue()]; + double[] maxNumericVals = new double[this.numNumericsOption.getValue()]; + for (int i = 0; i < this.numNumericsOption.getValue(); i++) { + minNumericVals[i] = 0.0; + maxNumericVals[i] = 1.0; + } + this.treeRoot = generateRandomTreeNode(0, nominalAttCandidates, + minNumericVals, maxNumericVals, treeRand); + } + + protected Node generateRandomTreeNode(int currentDepth, + ArrayList<Integer> nominalAttCandidates, double[] minNumericVals, + double[] maxNumericVals, Random treeRand) { + if ((currentDepth >= this.maxTreeDepthOption.getValue()) + || ((currentDepth >= this.firstLeafLevelOption.getValue()) && (this.leafFractionOption.getValue() >= (1.0 - treeRand.nextDouble())))) { + Node leaf = new Node(); + leaf.classLabel = treeRand.nextInt(this.numClassesOption.getValue()); + return leaf; + } + Node node = new Node(); + int chosenAtt = treeRand.nextInt(nominalAttCandidates.size() + + this.numNumericsOption.getValue()); + if (chosenAtt < nominalAttCandidates.size()) { + node.splitAttIndex = nominalAttCandidates.get(chosenAtt); + node.children = new Node[this.numValsPerNominalOption.getValue()]; + ArrayList<Integer> newNominalCandidates = new ArrayList<>( + nominalAttCandidates); + newNominalCandidates.remove(new Integer(node.splitAttIndex)); + newNominalCandidates.trimToSize(); + for (int i = 0; i < node.children.length; i++) { + node.children[i] = generateRandomTreeNode(currentDepth + 1, + newNominalCandidates, minNumericVals, maxNumericVals, + treeRand); + } + } else { + int numericIndex = chosenAtt - nominalAttCandidates.size(); + node.splitAttIndex = this.numNominalsOption.getValue() + + numericIndex; + double minVal = minNumericVals[numericIndex]; + double maxVal = maxNumericVals[numericIndex]; + node.splitAttValue = ((maxVal - minVal) * treeRand.nextDouble()) + + minVal; + node.children = new Node[2]; + double[] newMaxVals = maxNumericVals.clone(); + newMaxVals[numericIndex] = node.splitAttValue; + node.children[0] = 
generateRandomTreeNode(currentDepth + 1, + nominalAttCandidates, minNumericVals, newMaxVals, treeRand); + double[] newMinVals = minNumericVals.clone(); + newMinVals[numericIndex] = node.splitAttValue; + node.children[1] = generateRandomTreeNode(currentDepth + 1, + nominalAttCandidates, newMinVals, maxNumericVals, treeRand); + } + return node; + } + + @Override + public void getDescription(StringBuilder sb, int indent) { + // TODO Auto-generated method stub + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/NullMonitor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/NullMonitor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/NullMonitor.java new file mode 100644 index 0000000..977897f --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/NullMonitor.java @@ -0,0 +1,102 @@ +package com.yahoo.labs.samoa.moa.tasks; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2007 University of Waikato, Hamilton, New Zealand + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +/** + * Class that represents a null monitor. 
+ * + * @author Richard Kirkby ([email protected]) + * @version $Revision: 7 $ + */ +public class NullMonitor implements TaskMonitor { + + @Override + public void setCurrentActivity(String activityDescription, + double fracComplete) { + } + + @Override + public void setCurrentActivityDescription(String activity) { + } + + @Override + public void setCurrentActivityFractionComplete(double fracComplete) { + } + + @Override + public boolean taskShouldAbort() { + return false; + } + + @Override + public String getCurrentActivityDescription() { + return null; + } + + @Override + public double getCurrentActivityFractionComplete() { + return -1.0; + } + + @Override + public boolean isPaused() { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public void requestCancel() { + } + + @Override + public void requestPause() { + } + + @Override + public void requestResume() { + } + + @Override + public Object getLatestResultPreview() { + return null; + } + + @Override + public void requestResultPreview() { + } + + @Override + public boolean resultPreviewRequested() { + return false; + } + + @Override + public void setLatestResultPreview(Object latestPreview) { + } + + @Override + public void requestResultPreview(ResultPreviewListener toInform) { + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/ResultPreviewListener.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/ResultPreviewListener.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/ResultPreviewListener.java new file mode 100644 index 0000000..3fece0b --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/ResultPreviewListener.java @@ -0,0 +1,40 @@ +package com.yahoo.labs.samoa.moa.tasks; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2007 University of Waikato, 
Hamilton, New Zealand + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +/** + * Interface implemented by classes that preview results + * on the Graphical User Interface + * + * @author Richard Kirkby ([email protected]) + * @version $Revision: 7 $ + */ +public interface ResultPreviewListener { + + /** + * This method is used to receive a signal from + * <code>TaskMonitor</code> that the latest preview has + * changed. This method is implemented in <code>PreviewPanel</code> + * to change the results that are shown in its panel. + * + */ + public void latestPreviewChanged(); +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/Task.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/Task.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/Task.java new file mode 100644 index 0000000..d2c96a8 --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/Task.java @@ -0,0 +1,61 @@ +package com.yahoo.labs.samoa.moa.tasks; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2007 University of Waikato, Hamilton, New Zealand + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.yahoo.labs.samoa.moa.MOAObject; +import com.yahoo.labs.samoa.moa.core.ObjectRepository; + +/** + * Interface representing a task. + * + * @author Richard Kirkby ([email protected]) + * @version $Revision: 7 $ + */ +public interface Task extends MOAObject { + + /** + * Gets the result type of this task. + * Tasks can return LearningCurve, LearningEvaluation, + * Classifier, String, Instances, etc. + * + * @return a class object of the result of this task + */ + public Class<?> getTaskResultType(); + + /** + * This method performs this task, + * when TaskMonitor and ObjectRepository are not needed. + * + * @return an object with the result of this task + */ + public Object doTask(); + + /** + * This method performs this task. 
+ * <code>AbstractTask</code> implements this method so all + * its extensions only need to implement <code>doTaskImpl</code> + * + * @param monitor the TaskMonitor to use + * @param repository the ObjectRepository to use + * @return an object with the result of this task + */ + public Object doTask(TaskMonitor monitor, ObjectRepository repository); +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/TaskMonitor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/TaskMonitor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/TaskMonitor.java new file mode 100644 index 0000000..4918cd8 --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/TaskMonitor.java @@ -0,0 +1,140 @@ +package com.yahoo.labs.samoa.moa.tasks; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2007 University of Waikato, Hamilton, New Zealand + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +/** + * Interface representing a task monitor. + * + * @author Richard Kirkby ([email protected]) + * @version $Revision: 7 $ + */ +public interface TaskMonitor { + + /** + * Sets the description and the percentage done of the current activity. 
+ * + * @param activityDescription the description of the current activity + * @param fracComplete the percentage done of the current activity + */ + public void setCurrentActivity(String activityDescription, + double fracComplete); + + /** + * Sets the description of the current activity. + * + * @param activity the description of the current activity + */ + public void setCurrentActivityDescription(String activity); + + /** + * Sets the percentage done of the current activity + * + * @param fracComplete the percentage done of the current activity + */ + public void setCurrentActivityFractionComplete(double fracComplete); + + /** + * Gets whether the task should abort. + * + * @return true if the task should abort + */ + public boolean taskShouldAbort(); + + /** + * Gets whether there is a request to preview the task result. + * + * @return true if there is a request to preview the task result + */ + public boolean resultPreviewRequested(); + + /** + * Sets the current result to preview + * + * @param latestPreview the result to preview + */ + public void setLatestResultPreview(Object latestPreview); + + /** + * Gets the description of the current activity. + * + * @return the description of the current activity + */ + public String getCurrentActivityDescription(); + + /** + * Gets the percentage done of the current activity + * + * @return the percentage done of the current activity + */ + public double getCurrentActivityFractionComplete(); + + /** + * Requests the task monitored to pause. + * + */ + public void requestPause(); + + /** + * Requests the task monitored to resume. + * + */ + public void requestResume(); + + /** + * Requests the task monitored to cancel. + * + */ + public void requestCancel(); + + /** + * Gets whether the task monitored is paused. + * + * @return true if the task is paused + */ + public boolean isPaused(); + + /** + * Gets whether the task monitored is cancelled. 
+ * + * @return true if the task is cancelled + */ + public boolean isCancelled(); + + /** + * Requests to preview the task result. + * + */ + public void requestResultPreview(); + + /** + * Requests to preview the task result. + * + * @param toInform the listener of the changes in the preview of the result + */ + public void requestResultPreview(ResultPreviewListener toInform); + + /** + * Gets the current result to preview + * + * @return the result to preview + */ + public Object getLatestResultPreview(); +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ArffFileStream.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ArffFileStream.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ArffFileStream.java new file mode 100644 index 0000000..93fc7c4 --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ArffFileStream.java @@ -0,0 +1,119 @@ +package com.yahoo.labs.samoa.streams; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 - 2014 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import java.io.IOException; + +import com.github.javacliparser.FileOption; +import com.github.javacliparser.IntOption; +import com.yahoo.labs.samoa.instances.Instances; +import com.yahoo.labs.samoa.moa.core.InstanceExample; +import com.yahoo.labs.samoa.moa.core.ObjectRepository; +import com.yahoo.labs.samoa.moa.tasks.TaskMonitor; + +/** + * InstanceStream for ARFF file + * @author Casey + */ +public class ArffFileStream extends FileStream { + + public FileOption arffFileOption = new FileOption("arffFile", 'f', + "ARFF File(s) to load.", null, null, false); + + public IntOption classIndexOption = new IntOption("classIndex", 'c', + "Class index of data. 0 for none or -1 for last attribute in file.", + -1, -1, Integer.MAX_VALUE); + + protected InstanceExample lastInstanceRead; + + @Override + public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository repository) { + super.prepareForUseImpl(monitor, repository); + String filePath = this.arffFileOption.getFile().getAbsolutePath(); + this.fileSource.init(filePath, "arff"); + this.lastInstanceRead = null; + } + + @Override + protected void reset() { + try { + if (this.fileReader != null) + this.fileReader.close(); + + fileSource.reset(); + } + catch (IOException ioe) { + throw new RuntimeException("FileStream restart failed.", ioe); + } + + if (!getNextFileReader()) { + hitEndOfStream = true; + throw new RuntimeException("FileStream is empty."); + } + } + + @Override + protected boolean getNextFileReader() { + boolean ret = super.getNextFileReader(); + if (ret) { + this.instances = new Instances(this.fileReader, 1, -1); + if (this.classIndexOption.getValue() < 0) { + this.instances.setClassIndex(this.instances.numAttributes() - 1); + } else if (this.classIndexOption.getValue() > 0) { + this.instances.setClassIndex(this.classIndexOption.getValue() - 1); + } + } + return ret; + } + + @Override + protected boolean readNextInstanceFromFile() { + try { + if 
(this.instances.readInstance(this.fileReader)) { + this.lastInstanceRead = new InstanceExample(this.instances.instance(0)); + this.instances.delete(); // keep instances clean + return true; + } + if (this.fileReader != null) { + this.fileReader.close(); + this.fileReader = null; + } + return false; + } catch (IOException ioe) { + throw new RuntimeException( + "ArffFileStream failed to read instance from stream.", ioe); + } + + } + + @Override + protected InstanceExample getLastInstanceRead() { + return this.lastInstanceRead; + } + + /* + * extend com.yahoo.labs.samoa.moa.MOAObject + */ + @Override + public void getDescription(StringBuilder sb, int indent) { + // TODO Auto-generated method stub + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ClusteringEntranceProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ClusteringEntranceProcessor.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ClusteringEntranceProcessor.java new file mode 100644 index 0000000..20f3feb --- /dev/null +++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ClusteringEntranceProcessor.java @@ -0,0 +1,241 @@ +package com.yahoo.labs.samoa.streams; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
* #L%
 */

import java.util.Random;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.yahoo.labs.samoa.core.ContentEvent;
import com.yahoo.labs.samoa.core.EntranceProcessor;
import com.yahoo.labs.samoa.core.Processor;
import com.yahoo.labs.samoa.evaluation.ClusteringEvaluationContentEvent;
import com.yahoo.labs.samoa.instances.Instance;
import com.yahoo.labs.samoa.instances.Instances;
import com.yahoo.labs.samoa.learners.clusterers.ClusteringContentEvent;
import com.yahoo.labs.samoa.moa.cluster.Clustering;
import com.yahoo.labs.samoa.moa.core.DataPoint;
import com.yahoo.labs.samoa.moa.options.AbstractOptionHandler;
import com.yahoo.labs.samoa.moa.streams.InstanceStream;
import com.yahoo.labs.samoa.moa.streams.clustering.ClusteringStream;
import com.yahoo.labs.samoa.moa.streams.clustering.RandomRBFGeneratorEvents;

/**
 * EntranceProcessor for Clustering Evaluation Task.
 *
 * <p>Wraps an {@link InstanceStream} in a {@code StreamSource} and converts the instances it
 * yields into content events, one per call to {@link #nextEvent()}:
 * <ul>
 * <li>a {@link ClusteringEvaluationContentEvent} carrying the generating (ground-truth) clusters
 * every {@code groundTruthSamplingFrequency} instances, when the source is a
 * {@link RandomRBFGeneratorEvents};</li>
 * <li>otherwise a {@link ClusteringContentEvent}, flagged as a sample with probability
 * {@code samplingThreshold};</li>
 * <li>a final {@link ClusteringContentEvent} marked as last once the source is finished.</li>
 * </ul>
 */
public final class ClusteringEntranceProcessor implements EntranceProcessor {

  private static final long serialVersionUID = 4169053337917578558L;

  private static final Logger logger = LoggerFactory.getLogger(ClusteringEntranceProcessor.class);

  private StreamSource streamSource;
  /** Instance prefetched by {@link #setStreamSource(InstanceStream)}; also used to expose the dataset header. */
  private Instance firstInstance;
  /** {@code false} until the prefetched {@link #firstInstance} has been handed out by {@link #nextInstance()}. */
  private boolean isInited = false;
  private Random random = new Random();
  private double samplingThreshold;
  /** Maximum number of instances to send; a negative value disables the limit (see {@link #isFinished()}). */
  private int numberInstances;
  private int numInstanceSent = 0;

  private int groundTruthSamplingFrequency;

  /**
   * Not used by an entrance processor: incoming events are ignored.
   *
   * @param event the incoming event (ignored)
   * @return always {@code false}
   */
  @Override
  public boolean process(ContentEvent event) {
    // TODO: possible refactor of the super-interface implementation
    // of source processor does not need this method
    return false;
  }

  /**
   * Logs the creation of this processor.
   *
   * @param id identifier passed by the caller; only used in the log message
   */
  @Override
  public void onCreate(int id) {
    logger.debug("Creating ClusteringSourceProcessor with id {}", id);
  }

  /**
   * Creates a new processor configured like {@code p}.
   *
   * <p>Besides the stream, the sampling threshold, the instance limit and the ground-truth
   * sampling frequency are copied. Without copying the limit, the new processor would keep the
   * default limit of 0 and report {@link #isFinished()} immediately.
   *
   * @param p the processor to copy; must be a {@link ClusteringEntranceProcessor}
   * @return the newly created processor
   */
  @Override
  public Processor newProcessor(Processor p) {
    ClusteringEntranceProcessor newProcessor = new ClusteringEntranceProcessor();
    ClusteringEntranceProcessor originProcessor = (ClusteringEntranceProcessor) p;
    if (originProcessor.getStreamSource() != null) {
      newProcessor.setStreamSource(originProcessor.getStreamSource().getStream());
    }
    newProcessor.setSamplingThreshold(originProcessor.getSamplingThreshold());
    newProcessor.setMaxNumInstances(originProcessor.getMaxNumInstances());
    newProcessor.setGroundTruthSamplingFrequency(originProcessor.getGroundTruthSamplingFrequency());
    return newProcessor;
  }

  /**
   * @return {@code true} while {@link #isFinished()} is {@code false}
   */
  @Override
  public boolean hasNext() {
    return !isFinished();
  }

  /**
   * Tells whether there is nothing left to send.
   *
   * <p>The processor is finished when the instance limit (if non-negative) has been reached, or
   * when the underlying stream has no more instances and the prefetched first instance has already
   * been handed out. The latter condition matters for a stream holding a single instance:
   * {@link #setStreamSource(InstanceStream)} has already consumed it from the stream, but it still
   * has to be sent.
   *
   * @return {@code true} if no further instance will be sent
   */
  @Override
  public boolean isFinished() {
    boolean limitReached = numberInstances >= 0 && numInstanceSent >= numberInstances;
    if (limitReached) {
      return true;
    }
    boolean firstInstancePending = !isInited && firstInstance != null;
    return !firstInstancePending && !streamSource.hasMoreInstances();
  }

  public double getSamplingThreshold() {
    return samplingThreshold;
  }

  /**
   * @param samplingThreshold probability with which a regular content event is flagged as a sample
   */
  public void setSamplingThreshold(double samplingThreshold) {
    this.samplingThreshold = samplingThreshold;
  }

  public int getGroundTruthSamplingFrequency() {
    return groundTruthSamplingFrequency;
  }

  /**
   * Sets the number of instances between two ground-truth events. Note that for sources that are
   * a {@link ClusteringStream} this value is replaced by the stream's decay horizon on every call
   * to {@link #nextEvent()}.
   *
   * @param groundTruthSamplingFrequency the sampling frequency; 0 disables ground-truth events
   */
  public void setGroundTruthSamplingFrequency(int groundTruthSamplingFrequency) {
    this.groundTruthSamplingFrequency = groundTruthSamplingFrequency;
  }

  public StreamSource getStreamSource() {
    return streamSource;
  }

  /**
   * Installs the stream to read from. The stream is prepared for use when it is an
   * {@link AbstractOptionHandler}, and its first instance is read immediately so that
   * {@link #getDataset()} can be answered before any event is sent.
   *
   * @param stream the stream providing the instances
   */
  public void setStreamSource(InstanceStream stream) {
    if (stream instanceof AbstractOptionHandler) {
      ((AbstractOptionHandler) (stream)).prepareForUse();
    }

    this.streamSource = new StreamSource(stream);
    firstInstance = streamSource.nextInstance().getData();
  }

  /**
   * @return the dataset of the prefetched first instance
   */
  public Instances getDataset() {
    return firstInstance.dataset();
  }

  /**
   * Returns the next instance to send: the prefetched first instance on the first call, a fresh
   * instance from the stream afterwards.
   */
  private Instance nextInstance() {
    if (this.isInited) {
      return streamSource.nextInstance().getData();
    } else {
      this.isInited = true;
      return firstInstance;
    }
  }

  /**
   * @param value maximum number of instances to send; a negative value disables the limit
   */
  public void setMaxNumInstances(int value) {
    numberInstances = value;
  }

  public int getMaxNumInstances() {
    return this.numberInstances;
  }

  /**
   * Produces the next event to inject into the topology.
   *
   * @return the terminating event (marked as last) when {@link #isFinished()} holds; otherwise a
   *         ground-truth evaluation event or a regular clustering content event
   */
  @Override
  public ContentEvent nextEvent() {
    InstanceStream stream = streamSource.getStream();
    if (stream instanceof ClusteringStream) {
      // FIXME should it be taken from the ClusteringEvaluation -f option instead?
      groundTruthSamplingFrequency = ((ClusteringStream) stream).getDecayHorizon();
    }

    if (isFinished()) {
      // send ending event
      ClusteringContentEvent endEvent = new ClusteringContentEvent(-1, firstInstance);
      endEvent.setLast(true);
      return endEvent;
    }

    DataPoint nextDataPoint = new DataPoint(nextInstance(), numInstanceSent);
    numInstanceSent++;

    // A frequency of 0 would make the remainder below throw ArithmeticException.
    boolean groundTruthDue = groundTruthSamplingFrequency != 0
        && numInstanceSent % groundTruthSamplingFrequency == 0;
    // TODO implement an interface ClusteringGroundTruth with a getGeneratingClusters() method,
    // check if the source implements the interface
    if (groundTruthDue && stream instanceof RandomRBFGeneratorEvents) {
      // send a clustering evaluation event for external measures (distance from the gt clusters)
      Clustering gtClustering = ((RandomRBFGeneratorEvents) stream).getGeneratingClusters();
      return new ClusteringEvaluationContentEvent(gtClustering, nextDataPoint, false);
    }

    ClusteringContentEvent contentEvent = new ClusteringContentEvent(numInstanceSent, nextDataPoint);
    if (random.nextDouble() < samplingThreshold) {
      // send a clustering content event for internal measures (cohesion, separation)
      contentEvent.setSample(true);
    }
    return contentEvent;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/FileStream.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/FileStream.java
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/FileStream.java
new file mode 100644
index 0000000..c8004f5
--- /dev/null
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/FileStream.java
@@ -0,0 +1,174 @@
package com.yahoo.labs.samoa.streams;

/*
 * #%L
 * SAMOA
 * %%
 * Copyright (C) 2013 - 2014 Yahoo! Inc.
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * #L%
 */

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;

import com.github.javacliparser.ClassOption;
import com.yahoo.labs.samoa.instances.Instances;
import com.yahoo.labs.samoa.instances.InstancesHeader;
import com.yahoo.labs.samoa.moa.core.InstanceExample;
import com.yahoo.labs.samoa.moa.core.ObjectRepository;
import com.yahoo.labs.samoa.moa.options.AbstractOptionHandler;
import com.yahoo.labs.samoa.moa.streams.InstanceStream;
import com.yahoo.labs.samoa.moa.tasks.TaskMonitor;
import com.yahoo.labs.samoa.streams.fs.FileStreamSource;

/**
 * InstanceStream for files
 * (Abstract class: subclass this class for different file formats).
 *
 * <p>Instances are read from the sequence of input streams supplied by a
 * {@link FileStreamSource}; when one file is exhausted the next one is opened transparently.
 * Subclasses provide the format-specific parsing through {@link #readNextInstanceFromFile()} and
 * expose the most recently parsed instance through {@link #getLastInstanceRead()}.
 *
 * @author Casey
 */
public abstract class FileStream extends AbstractOptionHandler implements InstanceStream {

  private static final long serialVersionUID = 3028905554604259130L;

  public ClassOption sourceTypeOption = new ClassOption("sourceType",
      's', "Source Type (HDFS, local FS)", FileStreamSource.class,
      "LocalFileStreamSource");

  protected transient FileStreamSource fileSource;
  protected transient Reader fileReader;
  protected Instances instances;

  /** Set once every file of the source has been read to its end. */
  protected boolean hitEndOfStream;
  /** {@code false} until the first read, which triggers {@link #reset()}. */
  private boolean hasStarted;

  /*
   * Constructors
   */
  public FileStream() {
    this.hitEndOfStream = false;
  }

  /*
   * implement InstanceStream
   */

  /**
   * @return a header built from the instances loaded by {@link #reset()}
   */
  @Override
  public InstancesHeader getHeader() {
    return new InstancesHeader(this.instances);
  }

  /**
   * @return always {@code -1}
   */
  @Override
  public long estimatedRemainingInstances() {
    return -1;
  }

  /**
   * @return {@code true} until the end of the last file has been reached
   */
  @Override
  public boolean hasMoreInstances() {
    return !this.hitEndOfStream;
  }

  /**
   * Returns the current instance and reads one instance ahead, so that
   * {@link #hasMoreInstances()} already reflects whether another instance follows.
   *
   * @return the instance that was current before this call advanced the stream
   */
  @Override
  public InstanceExample nextInstance() {
    if (this.getLastInstanceRead() == null) {
      readNextInstanceFromStream();
    }
    InstanceExample prevInstance = this.getLastInstanceRead();
    readNextInstanceFromStream();
    return prevInstance;
  }

  /**
   * @return always {@code true}
   */
  @Override
  public boolean isRestartable() {
    return true;
  }

  /**
   * Rewinds the stream to the first file of the source.
   * NOTE(review): the instance cached by the subclass ({@link #getLastInstanceRead()}) is not
   * cleared here because this class has no hook for it; confirm subclasses handle a restart in the
   * middle of a file.
   */
  @Override
  public void restart() {
    reset();
    hasStarted = false;
  }

  /**
   * Closes the current reader, resets the file source, opens the first file and loads the
   * instances header from it. Also clears the end-of-stream flag so that
   * {@link #hasMoreInstances()} is accurate again after a restart.
   *
   * @throws RuntimeException if closing the reader or resetting the source fails, or if the source
   *         provides no input stream at all
   */
  protected void reset() {
    try {
      if (this.fileReader != null) {
        this.fileReader.close();
        // Already closed: prevent getNextFileReader() from closing the same reader again.
        this.fileReader = null;
      }

      fileSource.reset();
    } catch (IOException ioe) {
      throw new RuntimeException("FileStream restart failed.", ioe);
    }

    if (!getNextFileReader()) {
      hitEndOfStream = true;
      throw new RuntimeException("FileStream is empty.");
    }
    // A previous pass may have exhausted the stream; a fresh reader means there is data again.
    hitEndOfStream = false;

    this.instances = new Instances(this.fileReader, 1, -1);
    this.instances.setClassIndex(this.instances.numAttributes() - 1);
  }

  /**
   * Closes the current reader (if any) and opens a reader on the next input stream of the source.
   *
   * @return {@code true} if a new reader was opened, {@code false} if the source has no further
   *         input stream
   */
  protected boolean getNextFileReader() {
    if (this.fileReader != null) {
      try {
        this.fileReader.close();
      } catch (IOException ioe) {
        // Best effort: a failure to close the previous file must not stop reading the next one.
        ioe.printStackTrace();
      }
    }

    InputStream inputStream = this.fileSource.getNextInputStream();
    if (inputStream == null) {
      return false;
    }

    this.fileReader = new BufferedReader(new InputStreamReader(inputStream));
    return true;
  }

  /**
   * Reads the next instance, moving on to the next file whenever the current one is exhausted.
   * The first call initialises the stream through {@link #reset()}.
   *
   * @return {@code true} if an instance was read, {@code false} if the end of the last file was
   *         reached (in which case the end-of-stream flag is set)
   */
  protected boolean readNextInstanceFromStream() {
    if (!hasStarted) {
      this.reset();
      hasStarted = true;
    }

    while (true) {
      if (readNextInstanceFromFile()) {
        return true;
      }

      if (!getNextFileReader()) {
        this.hitEndOfStream = true;
        return false;
      }
    }
  }

  /**
   * Read next instance from the current file and assign it to
   * lastInstanceRead.
   *
   * @return true if it was able to read next instance and
   *         false if it was at the end of the file
   */
  protected abstract boolean readNextInstanceFromFile();

  /**
   * @return the instance most recently read by {@link #readNextInstanceFromFile()}; {@code null}
   *         is treated by {@link #nextInstance()} as "nothing read yet"
   */
  protected abstract InstanceExample getLastInstanceRead();

  /**
   * Instantiates the configured file source and marks the stream as not yet started, so that the
   * first read performs a {@link #reset()}.
   */
  @Override
  public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository repository) {
    this.fileSource = sourceTypeOption.getValue();
    this.hasStarted = false;
  }
}
