http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/clustering/RandomRBFGeneratorEvents.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/clustering/RandomRBFGeneratorEvents.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/clustering/RandomRBFGeneratorEvents.java
new file mode 100644
index 0000000..a65d9a1
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/clustering/RandomRBFGeneratorEvents.java
@@ -0,0 +1,975 @@
+package com.yahoo.labs.samoa.moa.streams.clustering;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2010 RWTH Aachen University, Germany
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.LinkedList;
+import java.util.Random;
+import java.util.Vector;
+
+import com.yahoo.labs.samoa.moa.cluster.Clustering;
+import com.yahoo.labs.samoa.moa.cluster.SphereCluster;
+import com.yahoo.labs.samoa.moa.core.AutoExpandVector;
+import com.yahoo.labs.samoa.moa.core.InstanceExample;
+import com.yahoo.labs.samoa.instances.InstancesHeader;
+import com.yahoo.labs.samoa.moa.core.ObjectRepository;
+import com.yahoo.labs.samoa.moa.core.DataPoint;
+import com.github.javacliparser.FlagOption;
+import com.github.javacliparser.FloatOption;
+import com.github.javacliparser.IntOption;
+import com.yahoo.labs.samoa.moa.streams.InstanceStream;
+import com.yahoo.labs.samoa.moa.tasks.TaskMonitor;
+import com.yahoo.labs.samoa.instances.Attribute;
+import com.yahoo.labs.samoa.instances.DenseInstance;
+import com.yahoo.labs.samoa.moa.core.FastVector;
+import com.yahoo.labs.samoa.instances.Instance;
+import com.yahoo.labs.samoa.instances.Instances;
+
+
+public class RandomRBFGeneratorEvents extends ClusteringStream {
+    private transient Vector listeners;
+
+    private static final long serialVersionUID = 1L;
+
+    /** CLI option 'm': seed for random generation of the model. */
+    public IntOption modelRandomSeedOption = new IntOption("modelRandomSeed",
+                    'm', "Seed for random generation of model.", 1);
+
+    /** CLI option 'i': seed from which {@code restart()} creates {@code instanceRandom}. */
+    public IntOption instanceRandomSeedOption = new IntOption(
+                    "instanceRandomSeed", 'i',
+                    "Seed for random generation of instances.", 5);
+
+    /** CLI option 'K': average number of kernels; {@code initKernels()} creates this many. */
+    public IntOption numClusterOption = new IntOption("numCluster", 'K',
+                    "The average number of centroids in the model.", 5, 1,
+                    Integer.MAX_VALUE);
+
+    /** CLI option 'k': how far the number of active kernels may deviate from numCluster. */
+    public IntOption numClusterRangeOption = new IntOption("numClusterRange", 'k',
+                    "Deviation of the number of centroids in the model.", 3,
+                    0, Integer.MAX_VALUE);
+
+    /** CLI option 'R': average kernel radius. */
+    public FloatOption kernelRadiiOption = new FloatOption("kernelRadius", 'R',
+                    "The average radii of the centroids in the model.", 0.07,
+                    0, 1);
+
+    /** CLI option 'r': maximum random deviation added to or subtracted from the average radius. */
+    public FloatOption kernelRadiiRangeOption = new FloatOption("kernelRadiusRange", 'r',
+                    "Deviation of average radii of the centroids in the model.",
+                    0, 0, 1);
+
+    /** CLI option 'd': relative random offset applied to the average kernel weight. */
+    public FloatOption densityRangeOption = new FloatOption("densityRange", 'd',
+                    "Offset of the average weight a cluster has. Value of 0 means all cluster " +
+                    "contain the same amount of points.", 0, 0, 1);
+
+    /** CLI option 'V': base speed value used by {@code setMoveVector} when computing movement steps. */
+    public IntOption speedOption = new IntOption("speed", 'V',
+                    "Kernels move a predefined distance of 0.01 every X points",
+                    500, 1, Integer.MAX_VALUE);
+
+    /** CLI option 'v': random offset applied to the speed value; 0 disables it. */
+    public IntOption speedRangeOption = new IntOption("speedRange", 'v',
+                    "Speed/Velocity point offset", 0, 0, Integer.MAX_VALUE);
+
+    /** CLI option 'N': threshold against which {@code nextInstance()} compares a random draw to emit noise. */
+    public FloatOption noiseLevelOption = new FloatOption("noiseLevel", 'N',
+                    "Noise level", 0.1, 0, 1);
+
+    /** CLI flag 'n': allow noise inside clusters; set by default in the constructor. */
+    public FlagOption noiseInClusterOption = new FlagOption("noiseInCluster", 'n',
+                    "Allow noise to be placed within a cluster");
+
+    /** CLI option 'E': number of instances between events; 0 disables events in {@code eventScheduler()}. */
+    public IntOption eventFrequencyOption = new IntOption("eventFrequency", 'E',
+                    "Event frequency. Enable at least one of the events below and set numClusterRange!",
+                    30000, 0, Integer.MAX_VALUE);
+
+    /** CLI flag 'M': enables merge (event 0) and split (event 1) events. */
+    public FlagOption eventMergeSplitOption = new FlagOption("eventMergeSplitOption", 'M',
+                    "Enable merging and splitting of clusters. Set eventFrequency and numClusterRange!");
+
+    /** CLI flag 'C': enables delete (event 2) and create (event 3) events. */
+    public FlagOption eventDeleteCreateOption = new FlagOption("eventDeleteCreate", 'C',
+                    "Enable emerging and disappearing of clusters. Set eventFrequency and numClusterRange!");
+
+    
+    // tryMerging() combines two kernels once their overlapRadiusDegree exceeds this value
+    private double merge_threshold = 0.7;
+    // eventScheduler() moves the kernels whenever nextEventCounter is a multiple of this value
+    private int kernelMovePointFrequency = 10;
+    // distance factor used by setMoveVector() when turning a vector length into a step count
+    private double maxDistanceMoveThresholdByStep = 0.01;
+    // maximum number of placement attempts in GeneratorCluster(int)
+    private int maxOverlapFitRuns = 50;
+    // relative random deviation applied to eventFrequency when the next event is scheduled (0 = none)
+    private double eventFrequencyRange = 0;
+
+    // only referenced from commented-out code in this part of the file
+    private boolean debug = false;
+
+    // all kernels, including those that are fading out (kill != -1)
+    private AutoExpandVector<GeneratorCluster> kernels;
+    // random source, re-created from instanceRandomSeedOption in restart()
+    protected Random instanceRandom;
+    // built by generateHeader(), returned by getHeader()
+    protected InstancesHeader streamHeader;
+    // incremented once per nextInstance(); also used as the timestamp of generated points
+    private int numGeneratedInstances;
+    // kernels counted as active; adjusted on init, split, create, merge and fade-out
+    private int numActiveKernels;
+    // decremented on every eventScheduler() call; re-armed when an event has finished
+    private int nextEventCounter;
+    // pending event: 0 = merge, 1 = split, 2 = delete, 3 = create, -1 = none
+    private int nextEventChoice = -1;
+    // next label handed to a newly created GeneratorCluster
+    private int clusterIdCounter;
+    // kernels of the merge in progress; tryMerging() resets both to null once merged
+    private GeneratorCluster mergeClusterA;
+    private GeneratorCluster mergeClusterB;
+    // set by tryMerging() the first time the merging kernels overlap
+    private boolean mergeKernelsOverlapping = false;
+
+
+
+    private class GeneratorCluster implements Serializable{
+        //TODO: points is redundant to microClustersPoints; we need to come
+        //up with a good strategy so that micro clusters get updated and
+        //rebuilt if needed. Idea: sort microClustersPoints by timestamp and let
+        //microClustersDecay hold the timestamp for when the last point in a
+        //micro cluster gets kicked, then we rebuild... or maybe not... could be
+        //the same as searching for the point to be kicked. More likely we rebuild
+        //fewer times than we insert.
+        
+        private static final long serialVersionUID = -6301649898961112942L;
+        
+        /** Sphere from which the instances of this kernel are sampled. */
+        SphereCluster generator;
+        /** -1 while alive; a positive value counts down per update; at 0 updateKernel() removes the kernel. */
+        int kill = -1;
+        /** While true, move() does not pick a new random destination when the current one is reached. */
+        boolean merging = false;
+        /** Per-step displacement of the center, set by setMoveVector(). */
+        double[] moveVector;
+        /** Number of steps of the current movement, computed in setMoveVector(). */
+        int totalMovementSteps;
+        /** Steps already taken in the current movement. */
+        int currentMovementSteps;
+        /** Set when this kernel takes part in a split; cleared by move() at the end of a movement. */
+        boolean isSplitting = false;
+
+        /** Points generated by this kernel, oldest first. */
+        LinkedList<DataPoint> points = new LinkedList<DataPoint>();
+        /** Micro clusters of this kernel; the three lists below are index-aligned. */
+        ArrayList<SphereCluster> microClusters = new ArrayList<SphereCluster>();
+        /** Points assigned to each micro cluster. */
+        ArrayList<ArrayList<DataPoint>> microClustersPoints = new ArrayList<ArrayList<DataPoint>>();
+        /** Value of numGeneratedInstances at the last insertion into each micro cluster. */
+        ArrayList<Integer> microClustersDecay = new ArrayList<Integer>();
+
+
+        /**
+         * Creates a kernel with a random radius and a random center.
+         * Up to maxOverlapFitRuns attempts are made to find a center whose
+         * sphere lies completely inside [0,1] in every dimension. If all
+         * attempts fail, generator is left null and kill is set to 0.
+         *
+         * @param label id assigned to the generating sphere
+         */
+        public GeneratorCluster(int label) {
+            boolean outofbounds = true;
+            int tryCounter = 0;
+            while(outofbounds && tryCounter < maxOverlapFitRuns){
+                tryCounter++;
+                outofbounds = false;
+                double[] center = new double [numAttsOption.getValue()];
+                //redraw until the radius is strictly positive
+                double radius;
+                do{
+                    radius = kernelRadiiOption.getValue()
+                            +(instanceRandom.nextBoolean()?-1:1)*kernelRadiiRangeOption.getValue()*instanceRandom.nextDouble();
+                }while(radius <= 0);
+                for (int j = 0; j < numAttsOption.getValue(); j++) {
+                     center[j] = instanceRandom.nextDouble();
+                     if(center[j]- radius < 0 || center[j] + radius > 1){
+                        outofbounds = true;
+                        break;
+                     }
+                }
+                generator = new SphereCluster(center, radius);
+            }
+            //decide on the outcome of the last attempt, not on the attempt count:
+            //a placement that succeeds on the final allowed try is still valid
+            if(!outofbounds){
+                generator.setId(label);
+                double avgWeight = 1.0/numClusterOption.getValue();
+                double weight = avgWeight
+                        +(instanceRandom.nextBoolean()?-1:1)*avgWeight*densityRangeOption.getValue()*instanceRandom.nextDouble();
+                generator.setWeight(weight);
+                setDesitnation(null);
+            }
+            else{
+                generator = null;
+                kill = 0;
+                System.out.println("Tried "+maxOverlapFitRuns+" times to create kernel. Reduce average radii." );
+            }
+        }
+
+        /**
+         * Wraps an existing sphere as a kernel and picks a random destination for it.
+         *
+         * @param label   id assigned to {@code cluster}
+         * @param cluster sphere used as the generator of this kernel
+         */
+        public GeneratorCluster(int label, SphereCluster cluster) {
+            cluster.setId(label);
+            generator = cluster;
+            setDesitnation(null);
+        }
+
+        public int getWorkID(){
+            for(int c = 0; c < kernels.size(); c++){
+                if(kernels.get(c).equals(this))
+                    return c;
+            }
+            return -1;
+        }
+
+        /**
+         * Per-instance housekeeping: removes this kernel from the kernel list
+         * once its kill counter has reached 0, counts a positive kill counter
+         * down, drops micro clusters that received no point within the decay
+         * horizon, and drops the oldest point if it has left the decay horizon.
+         */
+        private void updateKernel(){
+            if(kill == 0){
+                kernels.remove(this);
+            }
+            if(kill > 0){
+                kill--;
+            }
+            //We could be a lot more precise if we kept track of the timestamps of points;
+            //then we could remove all old points and rebuild the cluster on an up-to-date
+            //point base. Worth the effort? So far this is only used to avoid overlap, so it
+            //is more conservative than needed. Only change it when a tighter representation
+            //is required.
+            //Iterate backwards: removing index m shifts all later elements down by one,
+            //so a forward loop would skip the element following each removed one.
+            for (int m = microClusters.size() - 1; m >= 0; m--) {
+                if(numGeneratedInstances-microClustersDecay.get(m) > decayHorizonOption.getValue()){
+                    microClusters.remove(m);
+                    microClustersPoints.remove(m);
+                    //m is an int, so this is remove(int index), not remove(Object)
+                    microClustersDecay.remove(m);
+                }
+            }
+
+            if(!points.isEmpty()
+                    && numGeneratedInstances-points.getFirst().getTimestamp() >= decayHorizonOption.getValue()){
+                points.removeFirst();
+            }
+
+        }
+
+        /**
+         * Records a generated instance for this kernel and assigns it to a micro cluster.
+         * The point is stamped with the current value of numGeneratedInstances.
+         * If it lies inside an existing micro cluster it is added there;
+         * otherwise a new micro cluster is created for it.
+         *
+         * @param instance instance that was sampled from this kernel
+         */
+        private void addInstance(Instance instance){
+            DataPoint stamped = new DataPoint(instance, numGeneratedInstances);
+            points.add(stamped);
+
+            int closestIndex = -1;
+            double closestDist = Double.MAX_VALUE;
+            //newest micro clusters are probed first so that older ones can be removed sooner
+            int m = microClusters.size();
+            while(--m >= 0){
+                SphereCluster micro = microClusters.get(m);
+                double hullDist = micro.getCenterDistance(stamped)-micro.getRadius();
+                if(hullDist <= 0){
+                    //point fits into an existing micro cluster: store it and refresh the decay stamp
+                    microClustersPoints.get(m).add(stamped);
+                    microClustersDecay.set(m, numGeneratedInstances);
+                    return;
+                }
+                //otherwise remember the micro cluster whose hull is closest
+                if(hullDist < closestDist){
+                    closestIndex = m;
+                    closestDist = hullDist;
+                }
+            }
+
+            //alt == 1 (the active choice) discards the closest-cluster candidate, so a
+            //new micro cluster is always built; alt == 0 would grow the closest one
+            int alt = 1;
+            if(alt == 1){
+                closestIndex = -1;
+            }
+
+            if(closestIndex != -1){
+                //add to the closest micro cluster and rebuild its sphere from all its points
+                ArrayList<DataPoint> closestPoints = microClustersPoints.get(closestIndex);
+                closestPoints.add(stamped);
+                SphereCluster grown = new SphereCluster(closestPoints, numAttsOption.getValue());
+                if(grown.getRadius() > generator.getRadius()){
+                    //grown sphere is bigger than the generating cluster: undo the insertion
+                    closestPoints.remove(closestPoints.size()-1);
+                    closestIndex = -1;
+                }
+                else{
+                    microClusters.set(closestIndex, grown);
+                    microClustersDecay.set(closestIndex, numGeneratedInstances);
+                }
+            }
+
+            //closestIndex may have been reset above
+            if(closestIndex != -1){
+                return;
+            }
+
+            //create a new micro cluster holding only this point
+            ArrayList<DataPoint> microPoints = new ArrayList<DataPoint>();
+            microPoints.add(stamped);
+            SphereCluster fresh = (alt == 0)
+                    ? new SphereCluster(microPoints, numAttsOption.getValue())
+                    : new SphereCluster(generator.getCenter(), generator.getRadius(), 1);
+
+            microClusters.add(fresh);
+            microClustersPoints.add(microPoints);
+            microClustersDecay.add(numGeneratedInstances);
+
+            //ground truth is the position of this kernel in the kernel list
+            //(kernels.size() if this kernel is not in the list)
+            int id;
+            for(id = 0; id < kernels.size() && kernels.get(id) != this; id++){
+                //search only
+            }
+            fresh.setGroundTruth(id);
+        }
+
+
+        /**
+         * Advances this kernel by one movement step.
+         * Once all steps of the current movement are done, a kernel that is not
+         * merging picks a new random destination and leaves the splitting state.
+         * If a step would push the sphere outside [0,1] in any dimension, a new
+         * random destination is chosen and the step is retried from the current center.
+         */
+        private void move(){
+            if(currentMovementSteps >= totalMovementSteps){
+                if(!merging){
+                    setDesitnation(null);
+                    isSplitting = false;
+                }
+                return;
+            }
+
+            currentMovementSteps++;
+            if(moveVector == null){
+                return;
+            }
+
+            //NOTE(review): the retry relies on getCenter() returning a fresh copy - confirm
+            double[] shifted = generator.getCenter();
+            boolean inside;
+            do{
+                double radius = generator.getRadius();
+                inside = true;
+                shifted = generator.getCenter();
+                for(int d = 0; d < shifted.length; d++){
+                    shifted[d] += moveVector[d];
+                    if(shifted[d]- radius < 0 || shifted[d] + radius > 1){
+                        //step leaves the unit cube: choose another direction and retry
+                        inside = false;
+                        setDesitnation(null);
+                        break;
+                    }
+                }
+            }while(!inside);
+            generator.setCenter(shifted);
+        }
+
+        /**
+         * Points this kernel towards a destination by installing the
+         * difference between destination and current center as move vector.
+         *
+         * @param destination target coordinates; {@code null} selects a random
+         *                    point with every coordinate drawn from instanceRandom
+         */
+        void setDesitnation(double[] destination){
+            double[] target = destination;
+            if(target == null){
+                target = new double [numAttsOption.getValue()];
+                for(int j = 0; j < numAttsOption.getValue(); j++){
+                    target[j] = instanceRandom.nextDouble();
+                }
+            }
+
+            double[] from = generator.getCenter();
+            double[] offset = new double[from.length];
+            for(int d = 0; d < offset.length; d++){
+                offset[d] = target[d]-from[d];
+            }
+            setMoveVector(offset);
+        }
+
+        /**
+         * Installs a new movement: derives the number of steps from the
+         * vector's Euclidean length and the (optionally randomized) speed, then
+         * scales the vector in place to a per-step displacement and restarts
+         * the step counter.
+         *
+         * @param vector full displacement from the current center; modified in place
+         */
+        void setMoveVector(double[] vector){
+            //we are ignoring the steps, otherwise we have to change
+            //speed of the kernels, do we want that?
+            moveVector = vector;
+
+            int speedInPoints = speedOption.getValue();
+            if(speedRangeOption.getValue() > 0){
+                int sign = instanceRandom.nextBoolean() ? -1 : 1;
+                speedInPoints += sign*instanceRandom.nextInt(speedRangeOption.getValue());
+            }
+            //a non-positive randomized speed falls back to the configured one
+            if(speedInPoints < 1){
+                speedInPoints = speedOption.getValue();
+            }
+
+            double squaredSum = 0;
+            for(int d = 0; d < moveVector.length; d++){
+                squaredSum += Math.pow(vector[d],2);
+            }
+            double length = Math.sqrt(squaredSum);
+
+            totalMovementSteps =
+                    (int)(length/(maxDistanceMoveThresholdByStep*kernelMovePointFrequency)*speedInPoints);
+            for(int d = 0; d < moveVector.length; d++){
+                moveVector[d] /= (double)totalMovementSteps;
+            }
+
+            currentMovementSteps = 0;
+        }
+
+        /**
+         * Absorbs {@code merge} into this kernel once both overlap by more than
+         * merge_threshold. The absorbed kernel gets weight 0 and is marked to be
+         * killed after the decay horizon; the shared merge state is reset.
+         *
+         * @param merge kernel to be absorbed
+         * @return "Clusters merging: ..." when the merge happened, "Merge overlapping
+         *         started" the first time an overlap is seen, otherwise an empty string
+         */
+        private String tryMerging(GeneratorCluster merge){
+            double overlap = generator.overlapRadiusDegree(merge.generator);
+
+            if(!(overlap > merge_threshold)){
+                //not close enough yet: only report the first time the kernels overlap
+                if(overlap > 0 && !mergeKernelsOverlapping){
+                    mergeKernelsOverlapping = true;
+                    return "Merge overlapping started";
+                }
+                return "";
+            }
+
+            SphereCluster absorbed = merge.generator;
+            //keep the larger of both radii; the combined radius gets bigger and
+            //bigger with high dimensional data
+            double keptRadius = Math.max(generator.getRadius(), absorbed.getRadius());
+            generator.combine(absorbed);
+            generator.setRadius(keptRadius);
+
+            String message = "Clusters merging: "+mergeClusterB.generator.getId()
+                    +" into "+mergeClusterA.generator.getId();
+
+            //mark the absorbed kernel so it gets killed when it doesn't contain any more instances
+            merge.kill = decayHorizonOption.getValue();
+            //weight 0 so that no new instances are created in the absorbed cluster
+            absorbed.setWeight(0.0);
+            normalizeWeights();
+            numActiveKernels--;
+
+            //reset the merge state
+            mergeClusterB = mergeClusterA = null;
+            merging = false;
+            mergeKernelsOverlapping = false;
+            return message;
+        }
+
+        /**
+         * Splits a new kernel off this one. The new kernel starts at this
+         * kernel's center with the configured average radius and a randomized
+         * weight, is appended to the kernel list and counted as active.
+         * Both kernels are flagged as splitting.
+         *
+         * @return "Split from Kernel " followed by this kernel's id
+         */
+        private String splitKernel(){
+            isSplitting = true;
+            //todo radius range
+            double radius = kernelRadiiOption.getValue();
+            double avgWeight = 1.0/numClusterOption.getValue();
+            double weight = avgWeight + avgWeight*densityRangeOption.getValue()*instanceRandom.nextDouble();
+
+            //the sphere comes straight from 'new' and can never be null, so the former
+            //null check and its "not enough room" branch were unreachable
+            SphereCluster spcluster = new SphereCluster(generator.getCenter(), radius, weight);
+
+            GeneratorCluster gc = new GeneratorCluster(clusterIdCounter++, spcluster);
+            gc.isSplitting = true;
+            kernels.add(gc);
+            normalizeWeights();
+            numActiveKernels++;
+            return "Split from Kernel "+generator.getId();
+        }
+        
+        /**
+         * Starts fading this kernel out: weight 0, kill counter set to the decay
+         * horizon, one active kernel less, weights re-normalized.
+         *
+         * @return "Fading out C" followed by this kernel's id
+         */
+        private String fadeOut(){
+            this.kill = decayHorizonOption.getValue();
+            this.generator.setWeight(0.0);
+            numActiveKernels -= 1;
+            normalizeWeights();
+            String note = "Fading out C"+this.generator.getId();
+            return note;
+        }
+        
+        
+    }
+
+    /**
+     * Creates the generator with the noiseInCluster flag set.
+     * The two event flags stay unset by default.
+     */
+    public RandomRBFGeneratorEvents() {
+        this.noiseInClusterOption.set();
+        // disabled defaults, kept for reference:
+        //   eventDeleteCreateOption.set();
+        //   eventMergeSplitOption.set();
+    }
+
+    /**
+     * @return the stream header built by {@code generateHeader()}
+     */
+    public InstancesHeader getHeader() {
+        return this.streamHeader;
+    }
+
+    /**
+     * @return always -1
+     */
+    public long estimatedRemainingInstances() {
+        final long remaining = -1;
+        return remaining;
+    }
+
+    /**
+     * @return always {@code true}
+     */
+    public boolean hasMoreInstances() {
+        final boolean more = true;
+        return more;
+    }
+
+    /**
+     * @return always {@code true}; see {@code restart()}
+     */
+    public boolean isRestartable() {
+        final boolean restartable = true;
+        return restartable;
+    }
+
+    /**
+     * Announces the preparation on the monitor, builds the stream header
+     * and resets the generator state.
+     *
+     * @param monitor    receives the current activity text
+     * @param repository not used by this implementation
+     */
+    @Override
+    public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository repository) {
+        monitor.setCurrentActivity("Preparing random RBF...", -1.0);
+        this.generateHeader();
+        this.restart();
+    }
+
+    /**
+     * Resets the generator to its initial state: re-seeds the instance random
+     * source, clears all counters and merge state, schedules the first event
+     * and creates the initial kernels.
+     */
+    public void restart() {
+            instanceRandom = new Random(instanceRandomSeedOption.getValue());
+
+            //reset all state BEFORE choosing the first event: getNextEvent() reads
+            //numActiveKernels, and a stale value from the previous run would make a
+            //restarted stream differ from the first run
+            numActiveKernels = 0;
+            numGeneratedInstances = 0;
+            clusterIdCounter = 0;
+            mergeClusterA = mergeClusterB = null;
+            mergeKernelsOverlapping = false;
+
+            nextEventCounter = eventFrequencyOption.getValue();
+            nextEventChoice = getNextEvent();
+
+            kernels = new AutoExpandVector<GeneratorCluster>();
+            initKernels();
+    }
+       
+    /**
+     * Builds the stream header: one attribute "att1".."attN" per dimension
+     * followed by a nominal "class" attribute with labels "class1".."classK".
+     * Since 2013/06/02 an extra last label "noise" is added when the noise
+     * level is greater than 0. The class attribute is the last attribute.
+     */
+    protected void generateHeader() {
+        ArrayList<Attribute> attributes = new ArrayList<Attribute>();
+        int attNumber = 0;
+        while (attNumber < this.numAttsOption.getValue()) {
+            attNumber++;
+            attributes.add(new Attribute("att" + attNumber));
+        }
+
+        ArrayList<String> classLabels = new ArrayList<String>();
+        int classNumber = 0;
+        while (classNumber < this.numClusterOption.getValue()) {
+            classNumber++;
+            classLabels.add("class" + classNumber);
+        }
+        // the last label = "noise"
+        if (noiseLevelOption.getValue() > 0) {
+            classLabels.add("noise");
+        }
+        attributes.add(new Attribute("class", classLabels));
+
+        Instances structure = new Instances(getCLICreationString(InstanceStream.class), attributes, 0);
+        streamHeader = new InstancesHeader(structure);
+        streamHeader.setClassIndex(streamHeader.numAttributes() - 1);
+    }
+
+        
+    /**
+     * Creates numCluster kernels with consecutive ids, counts each of them
+     * as active and normalizes the weights afterwards.
+     */
+    protected void initKernels() {
+        int created = 0;
+        while (created < numClusterOption.getValue()) {
+            GeneratorCluster kernel = new GeneratorCluster(clusterIdCounter);
+            kernels.add(kernel);
+            numActiveKernels += 1;
+            clusterIdCounter += 1;
+            created++;
+        }
+        normalizeWeights();
+    }
+
+    /**
+     * Generates the next instance of the stream. Runs the event scheduler
+     * first, then either samples a point from a weighted-random kernel or
+     * draws a noise point, depending on a random draw against the noise level.
+     * Kernel instances get the kernel's id as class value and are recorded in
+     * the kernel; noise instances get the value of numCluster as class value.
+     *
+     * @return the generated instance wrapped in an {@code InstanceExample}
+     */
+    public InstanceExample nextInstance() {
+        numGeneratedInstances++;
+        eventScheduler();
+
+        double[] attributeValues = new double [numAttsOption.getValue()];
+        GeneratorCluster source = null;
+        double[] sampled;
+
+        if(instanceRandom.nextDouble() > noiseLevelOption.getValue()){
+            source = kernels.get(chooseWeightedElement());
+            sampled = source.generator.sample(instanceRandom).toDoubleArray();
+        }
+        else{
+            //get random noise point
+            sampled = getNoisePoint();
+        }
+
+        if(Double.isNaN(sampled[0])){
+            System.out.println("Instance corrupted:"+numGeneratedInstances);
+        }
+        System.arraycopy(sampled, 0, attributeValues, 0, sampled.length);
+
+        Instance inst = new DenseInstance(1.0, attributeValues);
+        inst.setDataset(getHeader());
+        if(source == null){
+            // 2013/06/02 (Yunsu Kim)
+            // Noise instance has the last class value instead of "-1",
+            // preventing ArrayIndexOutOfBoundsException in WriteStreamToARFFFile
+            inst.setClassValue(numClusterOption.getValue());
+        }
+        else{
+            inst.setClassValue(source.generator.getId());
+            //Do we need micro cluster representation if we have overlapping clusters?
+            source.addInstance(inst);
+        }
+
+        return new InstanceExample(inst);
+    }
+
+
+    public Clustering getGeneratingClusters(){
+        Clustering clustering = new Clustering();
+        for (int c = 0; c < kernels.size(); c++) {
+            clustering.add(kernels.get(c).generator);
+        }
+        return clustering;
+    }
+
+    /**
+     * Collects the micro clusters of all kernels. Each micro cluster gets a
+     * running id (starting at 0) and the id of its kernel's generator as
+     * ground truth before it is added.
+     *
+     * @return a new clustering with all micro clusters, in kernel order
+     */
+    public Clustering getMicroClustering(){
+        Clustering result = new Clustering();
+        int nextId = 0;
+
+        for (int c = 0; c < kernels.size(); c++) {
+            GeneratorCluster kernel = kernels.get(c);
+            for (int m = 0; m < kernel.microClusters.size(); m++) {
+                SphereCluster micro = kernel.microClusters.get(m);
+                micro.setId(nextId++);
+                micro.setGroundTruth(kernel.generator.getId());
+                result.add(micro);
+            }
+        }
+
+        return result;
+    }
+
+/**************************** EVENTS 
******************************************/
+    /**
+     * Called once per generated instance. Updates all kernels, moves them
+     * every kernelMovePointFrequency calls, and runs the pending event
+     * (0 = merge, 1 = split, 2 = delete, 3 = create). Merging is attempted on
+     * every call; the other events fire once nextEventCounter has run down to 0.
+     * When an event has finished, the counter is re-armed and the next event is
+     * chosen. Non-empty messages are reported through fireClusterChange.
+     */
+    private void eventScheduler(){
+
+        //iterate backwards: updateKernel() may remove the kernel from 'kernels',
+        //and a forward index loop would then skip the kernel sliding into its slot
+        for ( int i = kernels.size() - 1; i >= 0; i-- ) {
+            kernels.get(i).updateKernel();
+        }
+
+        nextEventCounter--;
+        //only move kernels every 10 points, performance reasons????
+        //should this be randomized as well???
+        if(nextEventCounter%kernelMovePointFrequency == 0){
+            //move kernels
+            for ( int i = 0; i < kernels.size(); i++ ) {
+                kernels.get(i).move();
+                //overlapControl();
+            }
+        }
+
+
+        //an event frequency of 0 disables all events
+        if(eventFrequencyOption.getValue() == 0){
+            return;
+        }
+
+        String type ="";
+        String message ="";
+        boolean eventFinished = false;
+        switch(nextEventChoice){
+            case 0:
+                if(numActiveKernels > 1
+                        && numActiveKernels > numClusterOption.getValue() - numClusterRangeOption.getValue()){
+                    message = mergeKernels(nextEventCounter);
+                    type = "Merge";
+                }
+                //the merge is complete once tryMerging has reset both merge partners
+                if(mergeClusterA==null && mergeClusterB==null
+                        && message.startsWith("Clusters merging")){
+                    eventFinished = true;
+                }
+            break;
+            case 1:
+                if(nextEventCounter<=0){
+                    if(numActiveKernels < numClusterOption.getValue() + numClusterRangeOption.getValue()){
+                        type = "Split";
+                        message = splitKernel();
+                    }
+                    eventFinished = true;
+                }
+            break;
+            case 2:
+                if(nextEventCounter<=0){
+                    if(numActiveKernels > 1
+                            && numActiveKernels > numClusterOption.getValue() - numClusterRangeOption.getValue()){
+                        message = fadeOut();
+                        type = "Delete";
+                    }
+                    eventFinished = true;
+                }
+            break;
+            case 3:
+                if(nextEventCounter<=0){
+                    if(numActiveKernels < numClusterOption.getValue() + numClusterRangeOption.getValue()){
+                        message = fadeIn();
+                        type = "Create";
+                    }
+                    eventFinished = true;
+                }
+            break;
+
+        }
+        if (eventFinished){
+            //re-arm the counter: event frequency plus/minus a random share of eventFrequencyRange
+            nextEventCounter = (int)(eventFrequencyOption.getValue()
+                    +(instanceRandom.nextBoolean()?-1:1)*eventFrequencyOption.getValue()*eventFrequencyRange*instanceRandom.nextDouble());
+            nextEventChoice = getNextEvent();
+            //System.out.println("Next event choice: "+nextEventChoice);
+        }
+        if(!message.isEmpty()){
+            message+=" (numKernels = "+numActiveKernels+" at "+numGeneratedInstances+")";
+            //of the merge messages only the final "Clusters merging" one is reported
+            if(!type.equals("Merge") || message.startsWith("Clusters merging"))
+                fireClusterChange(numGeneratedInstances, type, message);
+        }
+    }
+    
+    /**
+     * Chooses the next event from the enabled event families and the current
+     * number of active kernels.
+     * At the lower kernel limit only growing events (split/create) are chosen,
+     * at the upper limit only shrinking events (merge/delete). If both families
+     * are enabled, one is picked at random first.
+     *
+     * @return 0 = merge, 1 = split, 2 = delete, 3 = create, or -1 if no event
+     *         family is enabled or both limits are reached at the same time
+     */
+    private int getNextEvent() {
+        int average = numClusterOption.getValue();
+        int range = numClusterRangeOption.getValue();
+        boolean lowerLimit = numActiveKernels <= average - range;
+        boolean upperLimit = numActiveKernels >= average + range;
+
+        //both limits hit at once: there is no room for any event
+        if (lowerLimit && upperLimit) {
+            return -1;
+        }
+
+        boolean deleteCreate = eventDeleteCreateOption.isSet();
+        boolean mergeSplit = eventMergeSplitOption.isSet();
+
+        //first event code of the chosen family: 0 = merge/split, 2 = delete/create
+        int family;
+        if (deleteCreate && mergeSplit) {
+            family = 2 * instanceRandom.nextInt(2);
+        } else if (mergeSplit) {
+            family = 0;
+        } else if (deleteCreate) {
+            family = 2;
+        } else {
+            return -1;
+        }
+
+        //no limit reached: free choice within the family
+        if (!lowerLimit && !upperLimit) {
+            return family + instanceRandom.nextInt(2);
+        }
+        //lower limit: grow (split/create); upper limit: shrink (merge/delete)
+        return lowerLimit ? family + 1 : family;
+    }
+
+       /**
+        * Picks a random kernel that is not already being killed (kill == -1)
+        * and starts fading it out.
+        *
+        * @return the message produced by the kernel's own fadeOut()
+        */
+       private String fadeOut(){
+           int id;
+           do{
+               id = instanceRandom.nextInt(kernels.size());
+           }while(kernels.get(id).kill != -1);
+
+           return kernels.get(id).fadeOut();
+    }
+    
+    /**
+     * Creates a new randomly placed kernel with the next free id, counts it
+     * as active and re-normalizes the weights.
+     *
+     * @return the fixed message "Creating new cluster"
+     */
+    private String fadeIn(){
+        kernels.add(new GeneratorCluster(clusterIdCounter++));
+        numActiveKernels += 1;
+        normalizeWeights();
+        return "Creating new cluster";
+    }
+    
+    
+    /**
+     * Raises or lowers the weight of one randomly chosen kernel with {@code kill == -1}
+     * and renormalises all kernel weights afterwards.
+     * <p>
+     * The change is {@code numActiveKernels * random * 0.1}, signed by {@code increase}.
+     * NOTE(review): when decreasing, the new weight can apparently become zero or negative
+     * before normalisation; the message also reports the pre-normalisation value — confirm
+     * whether both are intended.
+     *
+     * @param increase {@code true} to add weight, {@code false} to remove weight
+     * @return a description of the change
+     */
+    private String changeWeight(boolean increase){
+        final double changeRate = 0.1;
+
+        int target;
+        do {
+            target = instanceRandom.nextInt(kernels.size());
+        } while (kernels.get(target).kill != -1);
+
+        final int direction = increase ? 1 : -1;
+        final double previous = kernels.get(target).generator.getWeight();
+        final double delta = direction*numActiveKernels*instanceRandom.nextDouble()*changeRate;
+        kernels.get(target).generator.setWeight(previous+delta);
+
+        normalizeWeights();
+
+        final String verb = increase ? "Increase " : "Decrease ";
+        return verb + " weight on Cluster "+target+" from "+previous+" to "+(previous+delta);
+    }
+
+    /**
+     * Grows or shrinks the radius of one randomly chosen kernel with {@code kill == -1}
+     * by a random fraction (at most 10%) of its current radius.
+     * The change is rejected, leaving the kernel untouched, when the new radius would be 0.5 or more.
+     *
+     * @param increase {@code true} to grow the radius, {@code false} to shrink it
+     * @return a description of the change, or the rejection message
+     */
+    private String changeRadius(boolean increase){
+        final double maxChangeRate = 0.1;
+
+        int target;
+        do {
+            target = instanceRandom.nextInt(kernels.size());
+        } while (kernels.get(target).kill != -1);
+
+        final int direction = increase ? 1 : -1;
+        final double previous = kernels.get(target).generator.getRadius();
+        final double proposed = previous+direction*previous*instanceRandom.nextDouble()*maxChangeRate;
+        if (proposed >= 0.5) {
+            return "Radius to big";
+        }
+        kernels.get(target).generator.setRadius(proposed);
+
+        final String verb = increase ? "Increase " : "Decrease ";
+        return verb + " radius on Cluster "+target+" from "+previous+" to "+proposed;
+    }
+
+    /**
+     * Asks one randomly chosen kernel with {@code kill == -1} to split.
+     *
+     * @return the message returned by the chosen kernel's {@code splitKernel()}
+     */
+    private String splitKernel(){
+        int candidate;
+        do {
+            candidate = instanceRandom.nextInt(kernels.size());
+        } while (kernels.get(candidate).kill != -1);
+
+        return kernels.get(candidate).splitKernel();
+    }
+
+    /**
+     * Starts or continues the merge of two kernels.
+     * <p>
+     * When more than one kernel is active and no merge pair is selected yet, every pair of
+     * kernels with {@code kill == -1} is scanned for the one whose centre distance is closest
+     * to twice the target distance derived from {@code steps}. The first accepted pair must be
+     * farther apart than that target or within 0.001 of it; later pairs only need to be closer
+     * to the target. The chosen pair is flagged as merging and sent towards a common point.
+     * When a pair is already selected, the call only delegates to {@code tryMerging}.
+     *
+     * @param steps step count used to derive the target distance
+     * @return {@code "Init merge"} when a pair was just selected, the result of
+     *         {@code tryMerging} for an existing pair, or the empty string otherwise
+     */
+    private String mergeKernels(int steps){
+        final boolean noPairSelected = mergeClusterA == null && mergeClusterB == null;
+        if(numActiveKernels > 1 && noPairSelected){
+
+            // choose clusters to merge
+            // NOTE(review): if speedOption yields an int this division truncates — confirm intent
+            double desiredDist = steps / speedOption.getValue() * maxDistanceMoveThresholdByStep;
+            double bestGap = Double.MAX_VALUE;
+            for(int a = 0; a < kernels.size(); a++){
+                for(int b = 0; b < a; b++){
+                    boolean bothAlive = kernels.get(a).kill == -1 && kernels.get(b).kill == -1;
+                    if(!bothAlive){
+                        continue;
+                    }
+                    double gap = kernels.get(a).generator.getCenterDistance(kernels.get(b).generator) - 2*desiredDist;
+                    double absGap = Math.abs(gap);
+                    boolean acceptable = bestGap != Double.MAX_VALUE || gap > 0 || absGap < 0.001;
+                    if(absGap < bestGap && acceptable){
+                        bestGap = absGap;
+                        mergeClusterA = kernels.get(a);
+                        mergeClusterB = kernels.get(b);
+                    }
+                }
+            }
+
+            if(mergeClusterA != null && mergeClusterB != null){
+                // centre of A shifted by half of the vector returned by getDistanceVector(B)
+                // (presumably the midpoint between both centres — confirm vector direction)
+                double[] mergePoint = mergeClusterA.generator.getCenter();
+                double[] offset = mergeClusterA.generator.getDistanceVector(mergeClusterB.generator);
+                for (int dim = 0; dim < offset.length; dim++) {
+                    mergePoint[dim] += offset[dim]*0.5;
+                }
+
+                mergeClusterA.merging = true;
+                mergeClusterB.merging = true;
+                mergeClusterA.setDesitnation(mergePoint);
+                mergeClusterB.setDesitnation(mergePoint);
+
+                if(debug){
+                    System.out.println("Center1"+Arrays.toString(mergeClusterA.generator.getCenter()));
+                    System.out.println("Center2"+Arrays.toString(mergeClusterB.generator.getCenter()));
+                    System.out.println("Vector"+Arrays.toString(offset));
+
+                    System.out.println("Try to merge cluster "+mergeClusterA.generator.getId()+
+                            " into "+mergeClusterB.generator.getId()+
+                            " at "+Arrays.toString(mergePoint)+
+                            " time "+numGeneratedInstances);
+                }
+                return "Init merge";
+            }
+        }
+
+        if(mergeClusterA == null || mergeClusterB == null){
+            return "";
+        }
+
+        // the kernels are moved towards each other elsewhere;
+        // here we only check whether they are close enough and merge them
+        return mergeClusterA.tryMerging(mergeClusterB);
+    }
+
+
+
+
+/************************* TOOLS **************************************/
+
+    /**
+     * Intentionally appends nothing to the given builder.
+     *
+     * @param sb     builder that would receive the description (left untouched)
+     * @param indent requested indentation (ignored)
+     */
+    public void getDescription(StringBuilder sb, int indent) {
+        // no description is produced for this generator
+    }
+
+    /**
+     * Draws a noise point with every coordinate taken from {@code instanceRandom.nextDouble()}.
+     * <p>
+     * Unless {@code noiseInClusterOption} is set, a candidate is rejected and redrawn when any
+     * micro-cluster of any kernel reports an inclusion probability greater than zero for it.
+     * Only the first 20 candidates are checked; the next one is accepted unchecked so the loop
+     * always terminates.
+     *
+     * @return the coordinates of the noise point, one entry per attribute
+     */
+    private double[] getNoisePoint(){
+        final int numAtts = numAttsOption.getValue();
+        final boolean avoidClusters = !noiseInClusterOption.isSet();
+        double[] sample = new double[numAtts];
+        int checksLeft = 20;
+        boolean incluster = true;
+        while(incluster){
+            for (int j = 0; j < numAtts; j++) {
+                sample[j] = instanceRandom.nextDouble();
+            }
+            incluster = false;
+            if(avoidClusters && checksLeft > 0){
+                checksLeft--;
+                // the candidate is the same for every micro-cluster, so wrap it once
+                // instead of allocating a new instance per inclusion test
+                Instance inst = new DenseInstance(1, sample);
+                for(int c = 0; c < kernels.size() && !incluster; c++){
+                    for(int m = 0; m < kernels.get(c).microClusters.size(); m++){
+                        if(kernels.get(c).microClusters.get(m).getInclusionProbability(inst) > 0){
+                            incluster = true;
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+
+        return sample;
+    }
+
+     /**
+      * Picks a kernel index at random, each index being chosen with a probability
+      * proportional to its generator weight (weights are expected to sum to one).
+      * <p>
+      * The walk over the cumulative weights is clamped to the valid index range: a draw of
+      * exactly 0.0 selects index 0 instead of -1, and a weight sum that falls marginally
+      * short of the draw because of rounding selects the last index instead of running
+      * past the end of the kernel list.
+      *
+      * @return index into {@code kernels} of the chosen element
+      */
+     private int chooseWeightedElement() {
+        double r = instanceRandom.nextDouble();
+
+        // Determine index of chosen element
+        final int last = kernels.size() - 1;
+        int i = 0;
+        r -= kernels.get(i).generator.getWeight();
+        while (r > 0.0 && i < last) {
+            i++;
+            r -= kernels.get(i).generator.getWeight();
+        }
+        return i;
+    }
+
+    /**
+     * Rescales the weight of every kernel generator by the sum of all weights,
+     * so that the weights add up to one afterwards.
+     */
+    private void normalizeWeights(){
+        final int count = kernels.size();
+
+        double total = 0.0;
+        for (int k = 0; k < count; k++) {
+            total = total + kernels.get(k).generator.getWeight();
+        }
+
+        int k = 0;
+        while (k < count) {
+            double scaled = kernels.get(k).generator.getWeight()/total;
+            kernels.get(k).generator.setWeight(scaled);
+            k++;
+        }
+    }
+
+
+
+ /*************** EVENT Listener *********************/
+ // should go into the superclass of the generator, create new one for cluster streams?
+  
+  /**
+   * Registers a listener for cluster change events; the listener list is created on first use.
+   *
+   * @param l the listener to add
+   */
+  public synchronized void addClusterChangeListener(ClusterEventListener l) {
+    if (listeners == null) {
+      listeners = new Vector();
+    }
+    listeners.addElement(l);
+  }
+
+  /**
+   * Unregisters a listener for cluster change events; the listener list is created
+   * first when it does not exist yet.
+   *
+   * @param l the listener to remove
+   */
+  public synchronized void removeClusterChangeListener(ClusterEventListener l) {
+    if (listeners == null) {
+      listeners = new Vector();
+    }
+    listeners.removeElement(l);
+  }
+
+  /**
+   * Fire a ClusterChangeEvent to all registered listeners.
+   * <p>
+   * The listener list is checked and copied while holding the same monitor that
+   * {@code addClusterChangeListener} and {@code removeClusterChangeListener} use, so the
+   * check and the copy see a consistent list. The listeners themselves are called
+   * outside the monitor.
+   *
+   * @param timestamp time of the event
+   * @param type      event type passed on to the {@code ClusterEvent}
+   * @param message   event message passed on to the {@code ClusterEvent}
+   */
+  protected void fireClusterChange(long timestamp, String type, String message) {
+    // make a copy of the listener list in case
+    //   anyone adds/removes listeners while we notify
+    Vector targets;
+    synchronized (this) {
+      // if we have no listeners, do nothing...
+      if (listeners == null || listeners.isEmpty()) {
+        return;
+      }
+      targets = (Vector) listeners.clone();
+    }
+
+    // create the event object to send
+    ClusterEvent event =
+      new ClusterEvent(this, timestamp, type , message);
+
+    // walk through the copied listener list and
+    //   call the changeCluster method in each
+    Enumeration e = targets.elements();
+    while (e.hasMoreElements()) {
+      ClusterEventListener l = (ClusterEventListener) e.nextElement();
+      l.changeCluster(event);
+    }
+  }
+
+    /**
+     * Gives the one-line purpose text of this generator.
+     *
+     * @return the fixed purpose description
+     */
+    @Override
+    public String getPurposeString() {
+        final String purpose = "Generates a random radial basis function stream.";
+        return purpose;
+    }
+
+
+    /**
+     * Gives the parameter text of this generator, which is always empty.
+     *
+     * @return the empty string
+     */
+    public String getParameterString(){
+        final String none = "";
+        return none;
+    }
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/HyperplaneGenerator.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/HyperplaneGenerator.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/HyperplaneGenerator.java
new file mode 100644
index 0000000..d0ab735
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/HyperplaneGenerator.java
@@ -0,0 +1,178 @@
+package com.yahoo.labs.samoa.moa.streams.generators;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ *    Copyright (C) 2007 University of Waikato, Hamilton, New Zealand
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.Random;
+
+import com.github.javacliparser.FloatOption;
+import com.github.javacliparser.IntOption;
+import com.yahoo.labs.samoa.instances.Attribute;
+import com.yahoo.labs.samoa.instances.DenseInstance;
+import com.yahoo.labs.samoa.instances.Instance;
+import com.yahoo.labs.samoa.instances.Instances;
+import com.yahoo.labs.samoa.instances.InstancesHeader;
+import com.yahoo.labs.samoa.moa.core.Example;
+import com.yahoo.labs.samoa.moa.core.FastVector;
+import com.yahoo.labs.samoa.moa.core.InstanceExample;
+import com.yahoo.labs.samoa.moa.core.ObjectRepository;
+import com.yahoo.labs.samoa.moa.options.AbstractOptionHandler;
+import com.yahoo.labs.samoa.moa.streams.InstanceStream;
+import com.yahoo.labs.samoa.moa.tasks.TaskMonitor;
+
+/**
+ * Stream generator for Hyperplane data stream.
+ * 
+ * @author Albert Bifet (abifet at cs dot waikato dot ac dot nz)
+ * @version $Revision: 7 $
+ */
+public class HyperplaneGenerator extends AbstractOptionHandler implements InstanceStream {
+
+    @Override
+    public String getPurposeString() {
+        return "Generates a problem of predicting class of a rotating hyperplane.";
+    }
+
+    private static final long serialVersionUID = 1L;
+
+    public IntOption instanceRandomSeedOption = new IntOption("instanceRandomSeed", 'i', "Seed for random generation of instances.", 1);
+
+    public IntOption numClassesOption = new IntOption("numClasses", 'c', "The number of classes to generate.", 2, 2, Integer.MAX_VALUE);
+
+    public IntOption numAttsOption = new IntOption("numAtts", 'a', "The number of attributes to generate.", 10, 0, Integer.MAX_VALUE);
+
+    public IntOption numDriftAttsOption = new IntOption("numDriftAtts", 'k', "The number of attributes with drift.", 2, 0, Integer.MAX_VALUE);
+
+    public FloatOption magChangeOption = new FloatOption("magChange", 't', "Magnitude of the change for every example", 0.0, 0.0, 1.0);
+
+    public IntOption noisePercentageOption = new IntOption("noisePercentage", 'n', "Percentage of noise to add to the data.", 5, 0, 100);
+
+    public IntOption sigmaPercentageOption = new IntOption("sigmaPercentage", 's', "Percentage of probability that the direction of change is reversed.", 10,
+            0, 100);
+
+    /** Header describing the generated attributes and the class attribute. */
+    protected InstancesHeader streamHeader;
+
+    /** Random source for attribute values, label noise and drift reversal. */
+    protected Random instanceRandom;
+
+    /** Hyperplane weights, one per generated attribute. */
+    protected double[] weights;
+
+    /** Drift direction per attribute: +1/-1 for drifting attributes, 0 for the others. */
+    protected int[] sigma;
+
+    public int numberInstance;
+
+    /**
+     * Builds the stream header and resets the generator state.
+     *
+     * @param monitor    receives the current-activity notification
+     * @param repository not used here
+     */
+    @Override
+    protected void prepareForUseImpl(TaskMonitor monitor, ObjectRepository repository) {
+        monitor.setCurrentActivity("Preparing hyperplane...", -1.0);
+        generateHeader();
+        restart();
+    }
+
+    /**
+     * Creates the header: {@code numAtts} attributes named att1..attN followed by a
+     * nominal attribute named "class" with {@code numClasses} labels, which is set as the class.
+     */
+    protected void generateHeader() {
+        FastVector<Attribute> attributes = new FastVector<>();
+        for (int i = 0; i < this.numAttsOption.getValue(); i++) {
+            attributes.addElement(new Attribute("att" + (i + 1)));
+        }
+
+        FastVector<String> classLabels = new FastVector<>();
+        for (int i = 0; i < this.numClassesOption.getValue(); i++) {
+            classLabels.addElement("class" + (i + 1));
+        }
+        attributes.addElement(new Attribute("class", classLabels));
+        this.streamHeader = new InstancesHeader(new Instances(getCLICreationString(InstanceStream.class), attributes, 0));
+        this.streamHeader.setClassIndex(this.streamHeader.numAttributes() - 1);
+    }
+
+    @Override
+    public long estimatedRemainingInstances() {
+        return -1;
+    }
+
+    @Override
+    public InstancesHeader getHeader() {
+        return this.streamHeader;
+    }
+
+    @Override
+    public boolean hasMoreInstances() {
+        return true;
+    }
+
+    @Override
+    public boolean isRestartable() {
+        return true;
+    }
+
+    /**
+     * Generates the next example and then applies one drift step to the hyperplane.
+     * <p>
+     * Attribute values come from {@code nextDouble()}. The label is 1 when the weighted sum
+     * of the values reaches half of the sum of the weights and 0 otherwise, so only the
+     * labels 0 and 1 are produced. With a chance of {@code noisePercentage} in 100 the
+     * label is flipped.
+     *
+     * @return the generated example
+     */
+    @Override
+    public Example<Instance> nextInstance() {
+
+        int numAtts = this.numAttsOption.getValue();
+        // one extra slot at the end for the class value
+        double[] attVals = new double[numAtts + 1];
+        double sum = 0.0;
+        double sumWeights = 0.0;
+        for (int i = 0; i < numAtts; i++) {
+            attVals[i] = this.instanceRandom.nextDouble();
+            sum += this.weights[i] * attVals[i];
+            sumWeights += this.weights[i];
+        }
+        int classLabel = (sum >= sumWeights * 0.5) ? 1 : 0;
+        // Add Noise
+        if ((1 + (this.instanceRandom.nextInt(100))) <= this.noisePercentageOption.getValue()) {
+            classLabel = (classLabel == 0 ? 1 : 0);
+        }
+
+        Instance inst = new DenseInstance(1.0, attVals);
+        inst.setDataset(getHeader());
+        inst.setClassValue(classLabel);
+        addDrift();
+        return new InstanceExample(inst);
+    }
+
+    /**
+     * Moves the weight of every drifting attribute by {@code sigma * magChange} and, with a
+     * chance of {@code sigmaPercentage} in 100, reverses the direction of that attribute.
+     * Weights are not clamped to any range.
+     */
+    private void addDrift() {
+        // numDriftAtts may be configured larger than numAtts; never index past the arrays
+        int driftCount = Math.min(this.numDriftAttsOption.getValue(), this.weights.length);
+        double magnitude = this.magChangeOption.getValue();
+        for (int i = 0; i < driftCount; i++) {
+            this.weights[i] += this.sigma[i] * magnitude;
+            if ((1 + (this.instanceRandom.nextInt(100))) <= this.sigmaPercentageOption.getValue()) {
+                this.sigma[i] *= -1;
+            }
+        }
+    }
+
+    /**
+     * Re-seeds the random source and draws fresh weights; the first {@code numDriftAtts}
+     * attributes start drifting in the positive direction, the rest do not drift.
+     */
+    @Override
+    public void restart() {
+        this.instanceRandom = new Random(this.instanceRandomSeedOption.getValue());
+        this.weights = new double[this.numAttsOption.getValue()];
+        this.sigma = new int[this.numAttsOption.getValue()];
+        for (int i = 0; i < this.numAttsOption.getValue(); i++) {
+            this.weights[i] = this.instanceRandom.nextDouble();
+            this.sigma[i] = (i < this.numDriftAttsOption.getValue() ? 1 : 0);
+        }
+    }
+
+    @Override
+    public void getDescription(StringBuilder sb, int indent) {
+        // TODO Auto-generated method stub
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/RandomTreeGenerator.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/RandomTreeGenerator.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/RandomTreeGenerator.java
new file mode 100644
index 0000000..a7f982e
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/streams/generators/RandomTreeGenerator.java
@@ -0,0 +1,265 @@
+package com.yahoo.labs.samoa.moa.streams.generators;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ *    Copyright (C) 2007 University of Waikato, Hamilton, New Zealand
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.samoa.instances.Attribute;
+import com.yahoo.labs.samoa.instances.DenseInstance;
+import com.yahoo.labs.samoa.moa.core.FastVector;
+import com.yahoo.labs.samoa.instances.Instance;
+import com.yahoo.labs.samoa.instances.Instances;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Random;
+import com.yahoo.labs.samoa.moa.core.InstanceExample;
+
+import com.yahoo.labs.samoa.instances.InstancesHeader;
+import com.yahoo.labs.samoa.moa.core.ObjectRepository;
+import com.yahoo.labs.samoa.moa.options.AbstractOptionHandler;
+import com.github.javacliparser.FloatOption;
+import com.github.javacliparser.IntOption;
+import com.yahoo.labs.samoa.moa.streams.InstanceStream;
+import com.yahoo.labs.samoa.moa.tasks.TaskMonitor;
+
+/**
+ * Stream generator for a stream based on a randomly generated tree.
+ *
+ * @author Richard Kirkby ([email protected])
+ * @version $Revision: 7 $
+ */
+public class RandomTreeGenerator extends AbstractOptionHandler implements InstanceStream {
+
+    @Override
+    public String getPurposeString() {
+        return "Generates a stream based on a randomly generated tree.";
+    }
+
+    private static final long serialVersionUID = 1L;
+
+    public IntOption treeRandomSeedOption = new IntOption("treeRandomSeed",
+            'r', "Seed for random generation of tree.", 1);
+
+    public IntOption instanceRandomSeedOption = new IntOption(
+            "instanceRandomSeed", 'i',
+            "Seed for random generation of instances.", 1);
+
+    public IntOption numClassesOption = new IntOption("numClasses", 'c',
+            "The number of classes to generate.", 2, 2, Integer.MAX_VALUE);
+
+    public IntOption numNominalsOption = new IntOption("numNominals", 'o',
+            "The number of nominal attributes to generate.", 5, 0,
+            Integer.MAX_VALUE);
+
+    public IntOption numNumericsOption = new IntOption("numNumerics", 'u',
+            "The number of numeric attributes to generate.", 5, 0,
+            Integer.MAX_VALUE);
+
+    public IntOption numValsPerNominalOption = new IntOption(
+            "numValsPerNominal", 'v',
+            "The number of values to generate per nominal attribute.", 5, 2,
+            Integer.MAX_VALUE);
+
+    public IntOption maxTreeDepthOption = new IntOption("maxTreeDepth", 'd',
+            "The maximum depth of the tree concept.", 5, 0, Integer.MAX_VALUE);
+
+    public IntOption firstLeafLevelOption = new IntOption(
+            "firstLeafLevel",
+            'l',
+            "The first level of the tree above maxTreeDepth that can have leaves.",
+            3, 0, Integer.MAX_VALUE);
+
+    public FloatOption leafFractionOption = new FloatOption("leafFraction",
+            'f',
+            "The fraction of leaves per level from firstLeafLevel onwards.",
+            0.15, 0.0, 1.0);
+
+    /**
+     * Node of the concept tree. A node without children is a leaf and carries the class
+     * label; an inner node carries the split attribute and, for numeric splits, the threshold.
+     */
+    protected static class Node implements Serializable {
+
+        private static final long serialVersionUID = 1L;
+
+        public int classLabel;
+
+        public int splitAttIndex;
+
+        public double splitAttValue;
+
+        public Node[] children;
+    }
+
+    /** Root of the randomly generated concept tree. */
+    protected Node treeRoot;
+
+    /** Header describing the nominal, numeric and class attributes. */
+    protected InstancesHeader streamHeader;
+
+    /** Random source for the attribute values of generated instances. */
+    protected Random instanceRandom;
+
+    /**
+     * Builds the header and the concept tree and resets the instance random source.
+     *
+     * @param monitor    receives the current-activity notification
+     * @param repository not used here
+     */
+    @Override
+    public void prepareForUseImpl(TaskMonitor monitor,
+            ObjectRepository repository) {
+        monitor.setCurrentActivity("Preparing random tree...", -1.0);
+        generateHeader();
+        generateRandomTree();
+        restart();
+    }
+
+    @Override
+    public long estimatedRemainingInstances() {
+        return -1;
+    }
+
+    @Override
+    public boolean isRestartable() {
+        return true;
+    }
+
+    /** Re-seeds the instance random source; the concept tree is left unchanged. */
+    @Override
+    public void restart() {
+        this.instanceRandom = new Random(this.instanceRandomSeedOption.getValue());
+    }
+
+    @Override
+    public InstancesHeader getHeader() {
+        return this.streamHeader;
+    }
+
+    @Override
+    public boolean hasMoreInstances() {
+        return true;
+    }
+
+    /**
+     * Generates the next example: nominal attributes get a random value index, numeric
+     * attributes a value from {@code nextDouble()}, and the class is read off the concept tree.
+     *
+     * @return the generated example
+     */
+    @Override
+    public InstanceExample nextInstance() {
+        double[] attVals = new double[this.numNominalsOption.getValue()
+                + this.numNumericsOption.getValue()];
+        InstancesHeader header = getHeader();
+        Instance inst = new DenseInstance(header.numAttributes());
+        for (int i = 0; i < attVals.length; i++) {
+            attVals[i] = i < this.numNominalsOption.getValue() ? this.instanceRandom.nextInt(this.numValsPerNominalOption.getValue())
+                    : this.instanceRandom.nextDouble();
+            inst.setValue(i, attVals[i]);
+        }
+        inst.setDataset(header);
+        inst.setClassValue(classifyInstance(this.treeRoot, attVals));
+        return new InstanceExample(inst);
+    }
+
+    /**
+     * Walks the concept tree from {@code node} down to a leaf for the given attribute values.
+     *
+     * @param node    node to start from
+     * @param attVals attribute values, nominal attributes first
+     * @return the class label stored in the leaf that is reached
+     */
+    protected int classifyInstance(Node node, double[] attVals) {
+        if (node.children == null) {
+            return node.classLabel;
+        }
+        if (node.splitAttIndex < this.numNominalsOption.getValue()) {
+            // nominal split: the value index selects the child
+            return classifyInstance(
+                    node.children[(int) attVals[node.splitAttIndex]], attVals);
+        }
+        // numeric split: child 0 below the threshold, child 1 otherwise
+        return classifyInstance(
+                node.children[attVals[node.splitAttIndex] < node.splitAttValue ? 0
+                : 1], attVals);
+    }
+
+    /**
+     * Creates the header: the nominal attributes (all sharing the same value list), then
+     * the numeric attributes, then a nominal "class" attribute that is set as the class.
+     */
+    protected void generateHeader() {
+        FastVector<Attribute> attributes = new FastVector<>();
+        FastVector<String> nominalAttVals = new FastVector<>();
+        for (int i = 0; i < this.numValsPerNominalOption.getValue(); i++) {
+            nominalAttVals.addElement("value" + (i + 1));
+        }
+        for (int i = 0; i < this.numNominalsOption.getValue(); i++) {
+            attributes.addElement(new Attribute("nominal" + (i + 1),
+                    nominalAttVals));
+        }
+        for (int i = 0; i < this.numNumericsOption.getValue(); i++) {
+            attributes.addElement(new Attribute("numeric" + (i + 1)));
+        }
+        FastVector<String> classLabels = new FastVector<>();
+        for (int i = 0; i < this.numClassesOption.getValue(); i++) {
+            classLabels.addElement("class" + (i + 1));
+        }
+        attributes.addElement(new Attribute("class", classLabels));
+        this.streamHeader = new InstancesHeader(new Instances(
+                getCLICreationString(InstanceStream.class), attributes, 0));
+        this.streamHeader.setClassIndex(this.streamHeader.numAttributes() - 1);
+    }
+
+    /**
+     * Generates the concept tree from {@code treeRandomSeed}; every nominal attribute is a
+     * split candidate and every numeric attribute starts with the range [0, 1].
+     */
+    protected void generateRandomTree() {
+        Random treeRand = new Random(this.treeRandomSeedOption.getValue());
+        ArrayList<Integer> nominalAttCandidates = new ArrayList<>(
+                this.numNominalsOption.getValue());
+        for (int i = 0; i < this.numNominalsOption.getValue(); i++) {
+            nominalAttCandidates.add(i);
+        }
+        double[] minNumericVals = new double[this.numNumericsOption.getValue()];
+        double[] maxNumericVals = new double[this.numNumericsOption.getValue()];
+        for (int i = 0; i < this.numNumericsOption.getValue(); i++) {
+            minNumericVals[i] = 0.0;
+            maxNumericVals[i] = 1.0;
+        }
+        this.treeRoot = generateRandomTreeNode(0, nominalAttCandidates,
+                minNumericVals, maxNumericVals, treeRand);
+    }
+
+    /**
+     * Recursively generates one node of the concept tree.
+     * <p>
+     * A leaf is produced at {@code maxTreeDepth}, at random from {@code firstLeafLevel}
+     * onwards, and also when no attribute is left to split on. Otherwise a nominal
+     * attribute (removed from the candidates of the subtree) or a numeric attribute
+     * (with a threshold inside its current range) is chosen at random.
+     *
+     * @param currentDepth         depth of the node to generate, 0 for the root
+     * @param nominalAttCandidates nominal attributes not yet used on the path to this node
+     * @param minNumericVals       lower bounds of the numeric attributes on this path
+     * @param maxNumericVals       upper bounds of the numeric attributes on this path
+     * @param treeRand             random source for the tree structure
+     * @return the generated node
+     */
+    protected Node generateRandomTreeNode(int currentDepth,
+            ArrayList<Integer> nominalAttCandidates, double[] minNumericVals,
+            double[] maxNumericVals, Random treeRand) {
+        if ((currentDepth >= this.maxTreeDepthOption.getValue())
+                || ((currentDepth >= this.firstLeafLevelOption.getValue()) && (this.leafFractionOption.getValue() >= (1.0 - treeRand.nextDouble())))) {
+            return createLeaf(treeRand);
+        }
+        int numCandidates = nominalAttCandidates.size()
+                + this.numNumericsOption.getValue();
+        if (numCandidates == 0) {
+            // every nominal attribute is used up and there are no numeric ones:
+            // nextInt(0) would throw, so end the branch with a leaf instead
+            return createLeaf(treeRand);
+        }
+        Node node = new Node();
+        int chosenAtt = treeRand.nextInt(numCandidates);
+        if (chosenAtt < nominalAttCandidates.size()) {
+            node.splitAttIndex = nominalAttCandidates.get(chosenAtt);
+            node.children = new Node[this.numValsPerNominalOption.getValue()];
+            ArrayList<Integer> newNominalCandidates = new ArrayList<>(
+                    nominalAttCandidates);
+            // boxed on purpose: remove(Object) drops the attribute, remove(int) would drop a position
+            newNominalCandidates.remove(Integer.valueOf(node.splitAttIndex));
+            newNominalCandidates.trimToSize();
+            for (int i = 0; i < node.children.length; i++) {
+                node.children[i] = generateRandomTreeNode(currentDepth + 1,
+                        newNominalCandidates, minNumericVals, maxNumericVals,
+                        treeRand);
+            }
+        } else {
+            int numericIndex = chosenAtt - nominalAttCandidates.size();
+            node.splitAttIndex = this.numNominalsOption.getValue()
+                    + numericIndex;
+            double minVal = minNumericVals[numericIndex];
+            double maxVal = maxNumericVals[numericIndex];
+            node.splitAttValue = ((maxVal - minVal) * treeRand.nextDouble())
+                    + minVal;
+            node.children = new Node[2];
+            double[] newMaxVals = maxNumericVals.clone();
+            newMaxVals[numericIndex] = node.splitAttValue;
+            node.children[0] = generateRandomTreeNode(currentDepth + 1,
+                    nominalAttCandidates, minNumericVals, newMaxVals, treeRand);
+            double[] newMinVals = minNumericVals.clone();
+            newMinVals[numericIndex] = node.splitAttValue;
+            node.children[1] = generateRandomTreeNode(currentDepth + 1,
+                    nominalAttCandidates, newMinVals, maxNumericVals, treeRand);
+        }
+        return node;
+    }
+
+    /** Creates a leaf whose class label is drawn uniformly from the configured classes. */
+    private Node createLeaf(Random treeRand) {
+        Node leaf = new Node();
+        leaf.classLabel = treeRand.nextInt(this.numClassesOption.getValue());
+        return leaf;
+    }
+
+    @Override
+    public void getDescription(StringBuilder sb, int indent) {
+        // TODO Auto-generated method stub
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/NullMonitor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/NullMonitor.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/NullMonitor.java
new file mode 100644
index 0000000..977897f
--- /dev/null
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/NullMonitor.java
@@ -0,0 +1,102 @@
+package com.yahoo.labs.samoa.moa.tasks;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ *    Copyright (C) 2007 University of Waikato, Hamilton, New Zealand
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * Class that represents a null monitor.
+ *
+ * @author Richard Kirkby ([email protected])
+ * @version $Revision: 7 $
+ */
+public class NullMonitor implements TaskMonitor {
+
+    /** Ignores the activity update. */
+    @Override
+    public void setCurrentActivity(String activityDescription, double fracComplete) {
+        // nothing to record
+    }
+
+    /** Ignores the activity description. */
+    @Override
+    public void setCurrentActivityDescription(String activity) {
+        // nothing to record
+    }
+
+    /** Ignores the completed fraction. */
+    @Override
+    public void setCurrentActivityFractionComplete(double fracComplete) {
+        // nothing to record
+    }
+
+    /** @return always {@code false} */
+    @Override
+    public boolean taskShouldAbort() {
+        return false;
+    }
+
+    /** @return always {@code null} */
+    @Override
+    public String getCurrentActivityDescription() {
+        return null;
+    }
+
+    /** @return always {@code -1.0} */
+    @Override
+    public double getCurrentActivityFractionComplete() {
+        return -1.0;
+    }
+
+    /** @return always {@code false} */
+    @Override
+    public boolean isPaused() {
+        return false;
+    }
+
+    /** @return always {@code false} */
+    @Override
+    public boolean isCancelled() {
+        return false;
+    }
+
+    /** Ignores the cancel request. */
+    @Override
+    public void requestCancel() {
+        // nothing to cancel
+    }
+
+    /** Ignores the pause request. */
+    @Override
+    public void requestPause() {
+        // nothing to pause
+    }
+
+    /** Ignores the resume request. */
+    @Override
+    public void requestResume() {
+        // nothing to resume
+    }
+
+    /** @return always {@code null} */
+    @Override
+    public Object getLatestResultPreview() {
+        return null;
+    }
+
+    /** Ignores the preview request. */
+    @Override
+    public void requestResultPreview() {
+        // no preview is produced
+    }
+
+    /** @return always {@code false} */
+    @Override
+    public boolean resultPreviewRequested() {
+        return false;
+    }
+
+    /** Ignores the supplied preview. */
+    @Override
+    public void setLatestResultPreview(Object latestPreview) {
+        // nothing to store
+    }
+
+    /** Ignores the preview request; the listener is never informed. */
+    @Override
+    public void requestResultPreview(ResultPreviewListener toInform) {
+        // no preview is produced
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/ResultPreviewListener.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/ResultPreviewListener.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/ResultPreviewListener.java
new file mode 100644
index 0000000..3fece0b
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/ResultPreviewListener.java
@@ -0,0 +1,40 @@
+package com.yahoo.labs.samoa.moa.tasks;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ *    Copyright (C) 2007 University of Waikato, Hamilton, New Zealand
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * Interface implemented by classes that preview results 
+ * on the Graphical User Interface 
+ *
+ * @author Richard Kirkby ([email protected])
+ * @version $Revision: 7 $
+ */
+public interface ResultPreviewListener {
+
+    /**
+     * Receives the signal from <code>TaskMonitor</code> that the latest
+     * preview has changed. <code>PreviewPanel</code> implements this method
+     * to update the results shown in its panel.
+     */
+    void latestPreviewChanged();
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/Task.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/Task.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/Task.java
new file mode 100644
index 0000000..d2c96a8
--- /dev/null
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/Task.java
@@ -0,0 +1,61 @@
+package com.yahoo.labs.samoa.moa.tasks;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ *    Copyright (C) 2007 University of Waikato, Hamilton, New Zealand
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.samoa.moa.MOAObject;
+import com.yahoo.labs.samoa.moa.core.ObjectRepository;
+
+/**
+ * Interface representing a task. 
+ *
+ * @author Richard Kirkby ([email protected])
+ * @version $Revision: 7 $ 
+ */
+public interface Task extends MOAObject {
+
+    /**
+     * Gets the result type of this task.
+     * Tasks can return LearningCurve, LearningEvaluation,
+     * Classifier, String, Instances, etc.
+     *
+     * @return a class object of the result of this task
+     */
+    public Class<?> getTaskResultType();
+
+    /**
+     * This method performs this task,
+     * when TaskMonitor and ObjectRepository are not needed.
+     *
+     * @return an object with the result of this task
+     */
+    public Object doTask();
+
+    /**
+     * This method performs this task.
+     * <code>AbstractTask</code> implements this method so all
+     * its extensions only need to implement <code>doTaskImpl</code>
+     *
+     * @param monitor the TaskMonitor to use
+     * @param repository  the ObjectRepository to use
+     * @return an object with the result of this task
+     */
+    public Object doTask(TaskMonitor monitor, ObjectRepository repository);
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/TaskMonitor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/TaskMonitor.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/TaskMonitor.java
new file mode 100644
index 0000000..4918cd8
--- /dev/null
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/moa/tasks/TaskMonitor.java
@@ -0,0 +1,140 @@
+package com.yahoo.labs.samoa.moa.tasks;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ *    Copyright (C) 2007 University of Waikato, Hamilton, New Zealand
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * Interface representing a task monitor. 
+ *
+ * @author Richard Kirkby ([email protected])
+ * @version $Revision: 7 $ 
+ */
+public interface TaskMonitor {
+
+    /**
+     * Sets the description and the percentage done of the current activity.
+     *
+     * @param activityDescription the description of the current activity
+     * @param fracComplete the percentage done of the current activity
+     */
+    public void setCurrentActivity(String activityDescription,
+            double fracComplete);
+
+    /**
+     * Sets the description of the current activity.
+     *
+     * @param activity the description of the current activity
+     */
+    public void setCurrentActivityDescription(String activity);
+
+    /**
+     * Sets the percentage done of the current activity
+     *
+     * @param fracComplete the percentage done of the current activity
+     */
+    public void setCurrentActivityFractionComplete(double fracComplete);
+
+    /**
+     * Gets whether the task should abort.
+     *
+     * @return true if the task should abort
+     */
+    public boolean taskShouldAbort();
+
+    /**
+     * Gets whether there is a request to preview the task result.
+     *
+     * @return true if there is a request to preview the task result
+     */
+    public boolean resultPreviewRequested();
+
+    /**
+     * Sets the current result to preview
+     *
+     * @param latestPreview the result to preview
+     */
+    public void setLatestResultPreview(Object latestPreview);
+
+    /**
+     * Gets the description of the current activity.
+     *
+     * @return the description of the current activity
+     */
+    public String getCurrentActivityDescription();
+
+    /**
+     * Gets the percentage done of the current activity
+     *
+     * @return the percentage done of the current activity
+     */
+    public double getCurrentActivityFractionComplete();
+
+    /**
+     * Requests the task monitored to pause.
+     *
+     */
+    public void requestPause();
+
+    /**
+     * Requests the task monitored to resume.
+     *
+     */
+    public void requestResume();
+
+    /**
+     * Requests the task monitored to cancel.
+     *
+     */
+    public void requestCancel();
+
+    /**
+     * Gets whether the task monitored is paused.
+     *
+     * @return true if the task is paused
+     */
+    public boolean isPaused();
+
+    /**
+     * Gets whether the task monitored is cancelled.
+     *
+     * @return true if the task is cancelled
+     */
+    public boolean isCancelled();
+
+    /**
+     * Requests to preview the task result.
+     *
+     */
+    public void requestResultPreview();
+
+    /**
+     * Requests to preview the task result.
+     *
+     * @param toInform the listener of the changes in the preview of the result
+     */
+    public void requestResultPreview(ResultPreviewListener toInform);
+
+    /**
+     * Gets the current result to preview
+     *
+     * @return the result to preview
+     */
+    public Object getLatestResultPreview();
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ArffFileStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ArffFileStream.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ArffFileStream.java
new file mode 100644
index 0000000..93fc7c4
--- /dev/null
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ArffFileStream.java
@@ -0,0 +1,119 @@
+package com.yahoo.labs.samoa.streams;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.IOException;
+
+import com.github.javacliparser.FileOption;
+import com.github.javacliparser.IntOption;
+import com.yahoo.labs.samoa.instances.Instances;
+import com.yahoo.labs.samoa.moa.core.InstanceExample;
+import com.yahoo.labs.samoa.moa.core.ObjectRepository;
+import com.yahoo.labs.samoa.moa.tasks.TaskMonitor;
+
+/**
+ * InstanceStream for ARFF file
+ * @author Casey
+ */
+public class ArffFileStream extends FileStream {
+
+       public FileOption arffFileOption = new FileOption("arffFile", 'f',
+                      "ARFF File(s) to load.", null, null, false);
+
+       public IntOption classIndexOption = new IntOption("classIndex", 'c',
+                      "Class index of data. 0 for none or -1 for last 
attribute in file.",
+                      -1, -1, Integer.MAX_VALUE);
+                  
+       protected InstanceExample lastInstanceRead;
+
+       @Override
+       public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository 
repository) {
+               super.prepareForUseImpl(monitor, repository);
+               String filePath = 
this.arffFileOption.getFile().getAbsolutePath();
+               this.fileSource.init(filePath, "arff");
+               this.lastInstanceRead = null;
+       }
+       
+       @Override
+       protected void reset() {
+               try {
+                       if (this.fileReader != null)
+                               this.fileReader.close();
+
+                       fileSource.reset();
+               }
+               catch (IOException ioe) {
+                       throw new RuntimeException("FileStream restart 
failed.", ioe);
+               }
+
+               if (!getNextFileReader()) {
+                       hitEndOfStream = true;
+                       throw new RuntimeException("FileStream is empty.");
+               }
+       }
+       
+       @Override
+       protected boolean getNextFileReader() {
+               boolean ret = super.getNextFileReader();
+               if (ret) {
+                       this.instances = new Instances(this.fileReader, 1, -1);
+                       if (this.classIndexOption.getValue() < 0) {
+                               
this.instances.setClassIndex(this.instances.numAttributes() - 1);
+                       } else if (this.classIndexOption.getValue() > 0) {
+                               
this.instances.setClassIndex(this.classIndexOption.getValue() - 1);
+                       }
+               }
+               return ret;
+       }
+       
+       @Override
+       protected boolean readNextInstanceFromFile() {
+               try {
+            if (this.instances.readInstance(this.fileReader)) {
+                this.lastInstanceRead = new 
InstanceExample(this.instances.instance(0));
+                this.instances.delete(); // keep instances clean
+                                                       return true;
+            }
+            if (this.fileReader != null) {
+                this.fileReader.close();
+                this.fileReader = null;
+            }
+            return false;
+        } catch (IOException ioe) {
+            throw new RuntimeException(
+                    "ArffFileStream failed to read instance from stream.", 
ioe);
+        }
+
+       }
+       
+       @Override
+       protected InstanceExample getLastInstanceRead() {
+               return this.lastInstanceRead;
+       }
+       
+       /*
+     * extend com.yahoo.labs.samoa.moa.MOAObject
+     */
+    @Override
+    public void getDescription(StringBuilder sb, int indent) {
+       // TODO Auto-generated method stub
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ClusteringEntranceProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ClusteringEntranceProcessor.java
 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ClusteringEntranceProcessor.java
new file mode 100644
index 0000000..20f3feb
--- /dev/null
+++ 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/ClusteringEntranceProcessor.java
@@ -0,0 +1,241 @@
+package com.yahoo.labs.samoa.streams;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.Random;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.core.EntranceProcessor;
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.evaluation.ClusteringEvaluationContentEvent;
+import com.yahoo.labs.samoa.instances.Instance;
+import com.yahoo.labs.samoa.instances.Instances;
+import com.yahoo.labs.samoa.learners.clusterers.ClusteringContentEvent;
+import com.yahoo.labs.samoa.moa.cluster.Clustering;
+import com.yahoo.labs.samoa.moa.core.DataPoint;
+import com.yahoo.labs.samoa.moa.options.AbstractOptionHandler;
+import com.yahoo.labs.samoa.moa.streams.InstanceStream;
+import com.yahoo.labs.samoa.moa.streams.clustering.ClusteringStream;
+import com.yahoo.labs.samoa.moa.streams.clustering.RandomRBFGeneratorEvents;
+
+/**
+ * EntranceProcessor for Clustering Evaluation Task.
+ * 
+ */
+public final class ClusteringEntranceProcessor implements EntranceProcessor {
+
+    private static final long serialVersionUID = 4169053337917578558L;
+
+    private static final Logger logger = 
LoggerFactory.getLogger(ClusteringEntranceProcessor.class);
+
+    private StreamSource streamSource;
+    private Instance firstInstance;
+    private boolean isInited = false;
+    private Random random = new Random();
+    private double samplingThreshold;
+    private int numberInstances;
+    private int numInstanceSent = 0;
+
+    private int groundTruthSamplingFrequency;
+
+    @Override
+    public boolean process(ContentEvent event) {
+        // TODO: possibly refactor the super-interface, since a source
+        // processor does not need this method
+        return false;
+    }
+
+    @Override
+    public void onCreate(int id) {
+        logger.debug("Creating ClusteringSourceProcessor with id {}", id);
+    }
+
+    @Override
+    public Processor newProcessor(Processor p) {
+        ClusteringEntranceProcessor newProcessor = new 
ClusteringEntranceProcessor();
+        ClusteringEntranceProcessor originProcessor = 
(ClusteringEntranceProcessor) p;
+        if (originProcessor.getStreamSource() != null) {
+            
newProcessor.setStreamSource(originProcessor.getStreamSource().getStream());
+        }
+        return newProcessor;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return (!isFinished());
+    }
+
+    @Override
+    public boolean isFinished() {
+        return (!streamSource.hasMoreInstances() || (numberInstances >= 0 && 
numInstanceSent >= numberInstances));
+    }
+
+    // /**
+    // * Method to send instances via input stream
+    // *
+    // * @param inputStream
+    // * @param numberInstances
+    // */
+    // public void sendInstances(Stream inputStream, Stream evaluationStream, 
int numberInstances, double samplingThreshold) {
+    // int numInstanceSent = 0;
+    // this.samplingThreshold = samplingThreshold;
+    // while (streamSource.hasMoreInstances() && numInstanceSent < 
numberInstances) {
+    // numInstanceSent++;
+    // DataPoint nextDataPoint = new DataPoint(nextInstance(), 
numInstanceSent);
+    // ClusteringContentEvent contentEvent = new 
ClusteringContentEvent(numInstanceSent, nextDataPoint);
+    // inputStream.put(contentEvent);
+    // sendPointsAndGroundTruth(streamSource, evaluationStream, 
numInstanceSent, nextDataPoint);
+    // }
+    //
+    // sendEndEvaluationInstance(inputStream);
+    // }
+
+    public double getSamplingThreshold() {
+        return samplingThreshold;
+    }
+
+    public void setSamplingThreshold(double samplingThreshold) {
+        this.samplingThreshold = samplingThreshold;
+    }
+    
+    
+
+    public int getGroundTruthSamplingFrequency() {
+        return groundTruthSamplingFrequency;
+    }
+
+    public void setGroundTruthSamplingFrequency(int 
groundTruthSamplingFrequency) {
+        this.groundTruthSamplingFrequency = groundTruthSamplingFrequency;
+    }
+
+    public StreamSource getStreamSource() {
+        return streamSource;
+    }
+
+    public void setStreamSource(InstanceStream stream) {
+        if (stream instanceof AbstractOptionHandler) {
+            ((AbstractOptionHandler) (stream)).prepareForUse();
+        }
+
+        this.streamSource = new StreamSource(stream);
+        firstInstance = streamSource.nextInstance().getData();
+    }
+
+    public Instances getDataset() {
+        return firstInstance.dataset();
+    }
+
+    private Instance nextInstance() {
+        if (this.isInited) {
+            return streamSource.nextInstance().getData();
+        } else {
+            this.isInited = true;
+            return firstInstance;
+        }
+    }
+
+    // private void sendEndEvaluationInstance(Stream inputStream) {
+    // ClusteringContentEvent contentEvent = new ClusteringContentEvent(-1, 
firstInstance);
+    // contentEvent.setLast(true);
+    // inputStream.put(contentEvent);
+    // }
+
+    // private void sendPointsAndGroundTruth(StreamSource sourceStream, Stream 
evaluationStream, int numInstanceSent, DataPoint nextDataPoint) {
+    // boolean sendEvent = false;
+    // DataPoint instance = null;
+    // Clustering gtClustering = null;
+    // int samplingFrequency = ((ClusteringStream) 
sourceStream.getStream()).getDecayHorizon();
+    // if (random.nextDouble() < samplingThreshold) {
+    // // Add instance
+    // sendEvent = true;
+    // instance = nextDataPoint;
+    // }
+    // if (numInstanceSent > 0 && numInstanceSent % samplingFrequency == 0) {
+    // // Add GroundTruth
+    // sendEvent = true;
+    // gtClustering = ((RandomRBFGeneratorEvents) 
sourceStream.getStream()).getGeneratingClusters();
+    // }
+    // if (sendEvent == true) {
+    // ClusteringEvaluationContentEvent evalEvent;
+    // evalEvent = new ClusteringEvaluationContentEvent(gtClustering, 
instance, false);
+    // evaluationStream.put(evalEvent);
+    // }
+    // }
+
+    public void setMaxNumInstances(int value) {
+        numberInstances = value;
+    }
+
+    public int getMaxNumInstances() {
+        return this.numberInstances;
+    }
+
+    @Override
+    public ContentEvent nextEvent() {
+
+        // boolean sendEvent = false;
+        // DataPoint instance = null;
+        // Clustering gtClustering = null;
+        // int samplingFrequency = ((ClusteringStream) 
sourceStream.getStream()).getDecayHorizon();
+        // if (random.nextDouble() < samplingThreshold) {
+        // // Add instance
+        // sendEvent = true;
+        // instance = nextDataPoint;
+        // }
+        // if (numInstanceSent > 0 && numInstanceSent % samplingFrequency == 
0) {
+        // // Add GroundTruth
+        // sendEvent = true;
+        // gtClustering = ((RandomRBFGeneratorEvents) 
sourceStream.getStream()).getGeneratingClusters();
+        // }
+        // if (sendEvent == true) {
+        // ClusteringEvaluationContentEvent evalEvent;
+        // evalEvent = new ClusteringEvaluationContentEvent(gtClustering, 
instance, false);
+        // evaluationStream.put(evalEvent);
+        // }
+
+        groundTruthSamplingFrequency = ((ClusteringStream) 
streamSource.getStream()).getDecayHorizon(); // FIXME should it be taken from 
the ClusteringEvaluation -f option instead?
+        if (isFinished()) {
+            // send ending event
+            ClusteringContentEvent contentEvent = new 
ClusteringContentEvent(-1, firstInstance);
+            contentEvent.setLast(true);
+            return contentEvent;
+        } else {
+            DataPoint nextDataPoint = new DataPoint(nextInstance(), 
numInstanceSent);
+            numInstanceSent++;
+            if (numInstanceSent % groundTruthSamplingFrequency == 0) {
+                // TODO implement an interface ClusteringGroundTruth with a 
getGeneratingClusters() method, check if the source implements the interface
+                // send a clustering evaluation event for external measures 
(distance from the gt clusters)
+                Clustering gtClustering = ((RandomRBFGeneratorEvents) 
streamSource.getStream()).getGeneratingClusters();
+                return new ClusteringEvaluationContentEvent(gtClustering, 
nextDataPoint, false);
+            } else {
+                ClusteringContentEvent contentEvent = new 
ClusteringContentEvent(numInstanceSent, nextDataPoint);
+                if (random.nextDouble() < samplingThreshold) {
+                    // send a clustering content event for internal measures 
(cohesion, separation)
+                    contentEvent.setSample(true);
+                }
+                return contentEvent;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/FileStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/FileStream.java 
b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/FileStream.java
new file mode 100644
index 0000000..c8004f5
--- /dev/null
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/streams/FileStream.java
@@ -0,0 +1,174 @@
+package com.yahoo.labs.samoa.streams;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+
+import com.github.javacliparser.ClassOption;
+import com.yahoo.labs.samoa.instances.Instances;
+import com.yahoo.labs.samoa.instances.InstancesHeader;
+import com.yahoo.labs.samoa.moa.core.InstanceExample;
+import com.yahoo.labs.samoa.moa.core.ObjectRepository;
+import com.yahoo.labs.samoa.moa.options.AbstractOptionHandler;
+import com.yahoo.labs.samoa.moa.streams.InstanceStream;
+import com.yahoo.labs.samoa.moa.tasks.TaskMonitor;
+import com.yahoo.labs.samoa.streams.fs.FileStreamSource;
+
+/**
+ * InstanceStream for files 
+ * (Abstract class: subclass this class for different file formats)
+ * @author Casey
+ */
+public abstract class FileStream extends AbstractOptionHandler implements 
InstanceStream {
+       /**
+    *
+    */
+   private static final long serialVersionUID = 3028905554604259130L;
+
+   public ClassOption sourceTypeOption = new ClassOption("sourceType",
+           's', "Source Type (HDFS, local FS)", FileStreamSource.class,
+           "LocalFileStreamSource");
+   
+   protected transient FileStreamSource fileSource;
+   protected transient Reader fileReader;
+   protected Instances instances;
+   
+   protected boolean hitEndOfStream;
+   private boolean hasStarted;
+
+   /*
+    * Constructors
+    */
+   public FileStream() {
+          this.hitEndOfStream = false;
+   }
+   
+   /*
+    * implement InstanceStream
+    */
+   @Override
+   public InstancesHeader getHeader() {
+          return new InstancesHeader(this.instances);
+   }
+
+   @Override
+   public long estimatedRemainingInstances() {
+          return -1;
+   }
+
+   @Override
+   public boolean hasMoreInstances() {
+          return !this.hitEndOfStream;
+   }
+   
+   @Override
+   public InstanceExample nextInstance() {
+          if (this.getLastInstanceRead() == null) {
+                  readNextInstanceFromStream();
+          }
+          InstanceExample prevInstance = this.getLastInstanceRead();
+          readNextInstanceFromStream();
+          return prevInstance;
+   }
+
+   @Override
+   public boolean isRestartable() {
+           return true;
+   }
+
+   @Override
+   public void restart() {
+          reset();
+          hasStarted = false;
+   }
+
+   protected void reset() {
+          try {
+                  if (this.fileReader != null)
+                          this.fileReader.close();
+                  
+                  fileSource.reset();
+          }
+          catch (IOException ioe) {
+                  throw new RuntimeException("FileStream restart failed.", 
ioe);
+          }
+          
+          if (!getNextFileReader()) {
+                  hitEndOfStream = true;
+                  throw new RuntimeException("FileStream is empty.");
+          }
+          
+       this.instances = new Instances(this.fileReader, 1, -1);
+       this.instances.setClassIndex(this.instances.numAttributes() - 1);
+   }
+   
+   protected boolean getNextFileReader() {
+          if (this.fileReader != null) 
+                  try {
+                          this.fileReader.close();
+                  } catch (IOException ioe) {
+                          ioe.printStackTrace();
+                  }
+          
+          InputStream inputStream = this.fileSource.getNextInputStream();
+          if (inputStream == null)
+                  return false;
+
+          this.fileReader = new BufferedReader(new 
InputStreamReader(inputStream));
+          return true;
+   }
+   
+   protected boolean readNextInstanceFromStream() {
+          if (!hasStarted) {
+                  this.reset();  
+                  hasStarted = true;
+          }
+          
+          while (true) {
+                  if (readNextInstanceFromFile()) return true;
+
+                  if (!getNextFileReader()) {
+                          this.hitEndOfStream = true;
+                          return false;
+                  }
+          }
+   }
+   
+   /**
+    * Read next instance from the current file and assign it to
+    * lastInstanceRead.
+    * @return true if it was able to read next instance and
+    *            false if it was at the end of the file
+    */
+   protected abstract boolean readNextInstanceFromFile();
+   
+   protected abstract InstanceExample getLastInstanceRead();
+   
+   @Override
+   public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository 
repository) {
+          this.fileSource = sourceTypeOption.getValue();
+          this.hasStarted = false;
+   }
+}

Reply via email to