http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/evaluation/measures/EntropyCollection.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/evaluation/measures/EntropyCollection.java
 
b/samoa-api/src/main/java/org/apache/samoa/evaluation/measures/EntropyCollection.java
new file mode 100644
index 0000000..a2f0100
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/evaluation/measures/EntropyCollection.java
@@ -0,0 +1,174 @@
+package org.apache.samoa.evaluation.measures;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.ArrayList;
+
+import org.apache.samoa.moa.cluster.Clustering;
+import org.apache.samoa.moa.core.DataPoint;
+import org.apache.samoa.moa.evaluation.MeasureCollection;
+import org.apache.samoa.moa.evaluation.MembershipMatrix;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Entropy based clustering measures computed from the cluster/class membership matrix:
+ * normalized cross entropies, homogeneity, completeness, V-Measure and a normalized
+ * mutual-information score published under the name "VarInformation".
+ */
+public class EntropyCollection extends MeasureCollection {
+
+  private static final Logger logger = LoggerFactory.getLogger(EntropyCollection.class);
+
+  @Override
+  protected String[] getNames() {
+    return new String[] { "GT cross entropy", "FC cross entropy", "Homogeneity", "Completeness", "V-Measure",
+        "VarInformation" };
+  }
+
+  @Override
+  protected boolean[] getDefaultEnabled() {
+    return new boolean[] { false, false, false, false, false, false };
+  }
+
+  /**
+   * Computes all entropy measures for one evaluation step and records them via {@code addValue}.
+   *
+   * @param fclustering
+   *          the found clustering that is evaluated
+   * @param hClustering
+   *          the ground-truth clustering (not read here; class information comes from the points)
+   * @param points
+   *          the points used to build the membership matrix
+   * @throws Exception
+   *           declared for compatibility with the overridden method
+   */
+  @Override
+  public void evaluateClustering(Clustering fclustering, Clustering hClustering, ArrayList<DataPoint> points)
+      throws Exception {
+
+    MembershipMatrix mm = new MembershipMatrix(fclustering, points);
+    int numClasses = mm.getNumClasses();
+    // One index more than the number of found clusters is iterated below; presumably the extra
+    // index is the matrix' slot for points that belong to no cluster -- TODO confirm.
+    int numCluster = fclustering.size() + 1;
+    int n = mm.getTotalEntries();
+
+    int[] clusterSums = new int[numCluster];
+    for (int fc = 0; fc < numCluster; fc++) {
+      clusterSums[fc] = mm.getClusterSum(fc);
+    }
+    int[] classSums = new int[numClasses];
+    for (int hc = 0; hc < numClasses; hc++) {
+      classSums[hc] = mm.getClassSum(hc);
+    }
+
+    double FCentropy = normalizedEntropy(clusterSums, n);
+    logger.debug("FC entropy: {}", FCentropy);
+
+    double GTentropy = normalizedEntropy(classSums, n);
+    logger.debug("GT entropy: {}", GTentropy);
+
+    // cluster based entropy
+    double FCcrossEntropy = 0;
+    for (int fc = 0; fc < numCluster; fc++) {
+      int clusterWeight = clusterSums[fc];
+      if (clusterWeight > 0) {
+        double e = 0;
+        for (int hc = 0; hc < numClasses; hc++) {
+          double p = mm.getClusterClassWeight(fc, hc) / (double) clusterWeight;
+          if (p != 0) {
+            e += p * Math.log10(p);
+          }
+        }
+        FCcrossEntropy += ((clusterWeight / (double) n) * e);
+      }
+    }
+    if (numCluster > 1) {
+      FCcrossEntropy /= -1 * Math.log10(numCluster);
+    }
+
+    addValue("FC cross entropy", 1 - FCcrossEntropy);
+    logger.debug("FC cross entropy: {}", 1 - FCcrossEntropy);
+
+    // class based entropy
+    double GTcrossEntropy = 0;
+    for (int hc = 0; hc < numClasses; hc++) {
+      int classWeight = classSums[hc];
+      if (classWeight > 0) {
+        double e = 0;
+        for (int fc = 0; fc < numCluster; fc++) {
+          double p = mm.getClusterClassWeight(fc, hc) / (double) classWeight;
+          if (p != 0) {
+            e += p * Math.log10(p);
+          }
+        }
+        // Accumulate inside the guard (as the cluster based loop does): an empty class
+        // contributes nothing, and 0 / 0.0 can no longer inject NaN when n == 0.
+        GTcrossEntropy += ((classWeight / (double) n) * e);
+      }
+    }
+    if (numClasses > 1) {
+      GTcrossEntropy /= -1 * Math.log10(numClasses);
+    }
+    addValue("GT cross entropy", 1 - GTcrossEntropy);
+    logger.debug("GT cross entropy: {}", 1 - GTcrossEntropy);
+
+    double homogeneity;
+    if (FCentropy == 0) {
+      homogeneity = 1;
+    } else {
+      homogeneity = 1 - FCcrossEntropy / FCentropy;
+    }
+
+    // TODO set err values for now, needs to be debugged
+    if (homogeneity > 1 || homogeneity < 0) {
+      addValue("Homogeneity", -1);
+    } else {
+      addValue("Homogeneity", homogeneity);
+    }
+
+    double completeness;
+    if (GTentropy == 0) {
+      completeness = 1;
+    } else {
+      completeness = 1 - GTcrossEntropy / GTentropy;
+    }
+    addValue("Completeness", completeness);
+
+    double beta = 1;
+    double denominator = beta * homogeneity + completeness;
+    // The weighted harmonic mean is 0 / 0 when both parts are 0; use 0 instead of NaN.
+    double vmeasure = denominator == 0 ? 0 : (1 + beta) * homogeneity * completeness / denominator;
+
+    // Publish the error value when the score (or the homogeneity it is built from) leaves [0, 1].
+    if (Double.isNaN(vmeasure) || vmeasure > 1 || vmeasure < 0 || homogeneity < 0) {
+      addValue("V-Measure", -1);
+    } else {
+      addValue("V-Measure", vmeasure);
+    }
+
+    double mutual = 0;
+    for (int i = 0; i < numCluster; i++) {
+      for (int j = 0; j < numClasses; j++) {
+        int joint = mm.getClusterClassWeight(i, j);
+        if (joint == 0) {
+          continue;
+        }
+        double m = Math.log10(joint / (double) clusterSums[i] / (double) classSums[j] * (double) n);
+        m *= joint / (double) n;
+        logger.debug("mutual term [{}, {}]: {}", i, j, m);
+        mutual += m;
+      }
+    }
+    if (numClasses > 1) {
+      mutual /= Math.log10(numClasses);
+    }
+
+    double varInfo = 1;
+    if (FCentropy + GTentropy > 0) {
+      varInfo = 2 * mutual / (FCentropy + GTentropy);
+    }
+
+    logger.debug("mutual: {} / VI: {}", mutual, varInfo);
+    addValue("VarInformation", varInfo);
+
+  }
+
+  /**
+   * Entropy of the distribution {@code sums[i] / total}, divided by {@code log10(sums.length)}.
+   *
+   * @return 0 when there are fewer than two groups, otherwise the normalized entropy
+   */
+  private static double normalizedEntropy(int[] sums, int total) {
+    if (sums.length <= 1) {
+      return 0;
+    }
+    double entropy = 0;
+    for (int sum : sums) {
+      double weight = sum / (double) total;
+      if (weight > 0) {
+        entropy += weight * Math.log10(weight);
+      }
+    }
+    return entropy / (-1 * Math.log10(sums.length));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/evaluation/measures/F1.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/evaluation/measures/F1.java 
b/samoa-api/src/main/java/org/apache/samoa/evaluation/measures/F1.java
new file mode 100644
index 0000000..2da9a59
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/evaluation/measures/F1.java
@@ -0,0 +1,111 @@
+package org.apache.samoa.evaluation.measures;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.ArrayList;
+
+import org.apache.samoa.moa.cluster.Clustering;
+import org.apache.samoa.moa.core.DataPoint;
+import org.apache.samoa.moa.evaluation.MeasureCollection;
+import org.apache.samoa.moa.evaluation.MembershipMatrix;
+
+/**
+ * F1 based clustering measures: "F1-P" (best class per found cluster), "F1-R" (best found
+ * cluster per class) and "Purity" (mean precision of the best class per found cluster).
+ */
+public class F1 extends MeasureCollection {
+
+  @Override
+  protected String[] getNames() {
+    return new String[] { "F1-P", "F1-R", "Purity" };
+  }
+
+  /**
+   * Computes the three measures from the membership matrix and records them via {@code addValue}.
+   *
+   * @param clustering
+   *          the found clustering; each evaluated cluster also gets its own "F1-P" measure value
+   * @param trueClustering
+   *          the ground-truth clustering (not read here; class information comes from the points)
+   * @param points
+   *          the points used to build the membership matrix
+   */
+  @Override
+  public void evaluateClustering(Clustering clustering, Clustering trueClustering, ArrayList<DataPoint> points) {
+
+    // Nothing to score without clusters. (The former guard "size() < 0" could never fire and
+    // its branch left "Purity" without a value.)
+    if (clustering.size() == 0) {
+      addValue("F1-P", 0.0);
+      addValue("F1-R", 0.0);
+      addValue("Purity", 0.0);
+      return;
+    }
+
+    MembershipMatrix mm = new MembershipMatrix(clustering, points);
+
+    int numClasses = mm.getNumClasses();
+    // Drop one class index when noise is present; presumably the noise class is the last
+    // index of the matrix -- TODO confirm against MembershipMatrix.
+    if (mm.hasNoiseClass()) {
+      numClasses--;
+    }
+
+    // F1 as defined in P3C, try using F1 optimization
+    double F1_P = 0.0;
+    double purity = 0;
+    int realClusters = 0;
+    for (int i = 0; i < clustering.size(); i++) {
+      int maxWeight = 0;
+      int maxWeightIndex = -1;
+
+      // find the class that contributes most points to cluster i
+      for (int j = 0; j < numClasses; j++) {
+        int weight = mm.getClusterClassWeight(i, j);
+        if (weight > maxWeight) {
+          maxWeight = weight;
+          maxWeightIndex = j;
+        }
+      }
+      if (maxWeightIndex != -1) {
+        realClusters++;
+        double precision = maxWeight / (double) mm.getClusterSum(i);
+        double recall = maxWeight / (double) mm.getClassSum(maxWeightIndex);
+        double f1 = f1Score(precision, recall);
+        F1_P += f1;
+        purity += precision;
+
+        // TODO should we move setMeasure stuff into the Cluster interface?
+        clustering.get(i).setMeasureValue("F1-P", Double.toString(f1));
+      }
+    }
+    if (realClusters > 0) {
+      F1_P /= realClusters;
+      purity /= realClusters;
+    }
+    addValue("F1-P", F1_P);
+    addValue("Purity", purity);
+
+    // F1 as defined in .... mainly maximizes F1 for each class
+    double F1_R = 0.0;
+    for (int j = 0; j < numClasses; j++) {
+      double maxF1 = 0;
+      for (int i = 0; i < clustering.size(); i++) {
+        double precision = mm.getClusterClassWeight(i, j) / (double) mm.getClusterSum(i);
+        double recall = mm.getClusterClassWeight(i, j) / (double) mm.getClassSum(j);
+        double f1 = f1Score(precision, recall);
+        if (maxF1 < f1) {
+          maxF1 = f1;
+        }
+      }
+      F1_R += maxF1;
+    }
+    // Average over the classes; without any (non-noise) class keep 0 instead of 0 / 0 = NaN.
+    if (numClasses > 0) {
+      F1_R /= numClasses;
+    }
+
+    addValue("F1-R", F1_R);
+  }
+
+  /**
+   * Harmonic mean of precision and recall; 0 unless at least one of them is positive (this
+   * also covers NaN inputs produced by empty clusters or classes).
+   */
+  private static double f1Score(double precision, double recall) {
+    if (precision > 0 || recall > 0) {
+      return 2 * precision * recall / (precision + recall);
+    }
+    return 0;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/evaluation/measures/General.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/evaluation/measures/General.java 
b/samoa-api/src/main/java/org/apache/samoa/evaluation/measures/General.java
new file mode 100644
index 0000000..1a9ca1d
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/evaluation/measures/General.java
@@ -0,0 +1,193 @@
+package org.apache.samoa.evaluation.measures;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.ArrayList;
+
+import org.apache.samoa.instances.Instance;
+import org.apache.samoa.moa.cluster.Clustering;
+import org.apache.samoa.moa.cluster.SphereCluster;
+import org.apache.samoa.moa.core.DataPoint;
+import org.apache.samoa.moa.evaluation.MeasureCollection;
+
+/**
+ * General coverage measures of a clustering: "GPrecision", "GRecall", "Redundancy" and the
+ * number of found clusters and ground-truth clusters.
+ */
+public class General extends MeasureCollection {
+  private int numPoints;
+  private int numFClusters;
+  private int numDims;
+  // a point counts as covered by a cluster when its inclusion probability reaches this value
+  private double pointInclusionProbThreshold = 0.8;
+  private Clustering clustering;
+  private ArrayList<DataPoint> points;
+
+  public General() {
+    super();
+  }
+
+  @Override
+  protected String[] getNames() {
+    // String[] names =
+    // {"GPrecision","GRecall","Redundancy","Overlap","numCluster","numClasses","Compactness"};
+    return new String[] { "GPrecision", "GRecall", "Redundancy", "numCluster", "numClasses" };
+  }
+
+  // @Override
+  // protected boolean[] getDefaultEnabled() {
+  // boolean [] defaults = {false, false, false, false, false ,false};
+  // return defaults;
+  // }
+
+  /**
+   * Counts, per point, how many found clusters cover it and derives the measures from those
+   * counts. Ratios with an empty denominator are reported as 0.
+   *
+   * @param clustering
+   *          the found clustering that is evaluated
+   * @param trueClustering
+   *          the ground-truth clustering; only its size is read
+   * @param points
+   *          the evaluated points; a class value of -1 marks noise
+   * @throws Exception
+   *           declared for compatibility with the overridden method
+   */
+  @Override
+  public void evaluateClustering(Clustering clustering, Clustering trueClustering, ArrayList<DataPoint> points)
+      throws Exception {
+
+    this.points = points;
+    this.clustering = clustering;
+    numPoints = points.size();
+    numFClusters = clustering.size();
+    // An empty point list has no first element to read the dimensionality from.
+    numDims = points.isEmpty() ? 0 : points.get(0).numAttributes() - 1;
+
+    int totalRedundancy = 0;
+    int trueCoverage = 0;
+    int totalCoverage = 0;
+
+    int numNoise = 0;
+    for (int p = 0; p < numPoints; p++) {
+      DataPoint point = points.get(p);
+
+      // number of found clusters that contain the point
+      int coverage = 0;
+      for (int c = 0; c < numFClusters; c++) {
+        if (clustering.get(c).getInclusionProbability(point) >= pointInclusionProbThreshold) {
+          coverage++;
+        }
+      }
+
+      if (point.classValue() == -1) {
+        numNoise++;
+      } else if (coverage > 0) {
+        trueCoverage++;
+      }
+
+      if (coverage > 0) {
+        totalCoverage++; // points covered by clustering (incl. noise)
+      }
+      if (coverage > 1) {
+        totalRedundancy++; // include noise
+      }
+    }
+
+    addValue("numCluster", clustering.size());
+    addValue("numClasses", trueClustering.size());
+    addValue("Redundancy", ratio(totalRedundancy, numPoints));
+    addValue("GPrecision", ratio(trueCoverage, totalCoverage));
+    addValue("GRecall", ratio(trueCoverage, numPoints - numNoise));
+    // if(isEnabled(3)){
+    // addValue("Compactness", computeCompactness());
+    // }
+    // if(isEnabled(3)){
+    // addValue("Overlap", computeOverlap());
+    // }
+  }
+
+  /** Returns {@code numerator / denominator}, or 0 when the denominator is 0. */
+  private static double ratio(int numerator, int denominator) {
+    return denominator == 0 ? 0 : (double) numerator / (double) denominator;
+  }
+
+  /**
+   * Fraction of found clusters that overlap at least one other found cluster. Currently not
+   * called from {@code evaluateClustering} (the call is commented out there).
+   *
+   * @return the fraction, or NaN when a cluster is not a {@link SphereCluster}
+   */
+  private double computeOverlap() {
+    for (int c = 0; c < numFClusters; c++) {
+      if (!(clustering.get(c) instanceof SphereCluster)) {
+        System.out.println("Overlap only supports Sphere Cluster. Found: " + clustering.get(c).getClass());
+        return Double.NaN;
+      }
+    }
+
+    boolean[] overlap = new boolean[numFClusters];
+
+    for (int c0 = 0; c0 < numFClusters; c0++) {
+      if (overlap[c0]) {
+        continue;
+      }
+      SphereCluster s0 = (SphereCluster) clustering.get(c0);
+      // only later clusters need to be compared against c0
+      for (int c1 = c0 + 1; c1 < numFClusters; c1++) {
+        SphereCluster s1 = (SphereCluster) clustering.get(c1);
+        if (s0.overlapRadiusDegree(s1) > 0) {
+          overlap[c0] = overlap[c1] = true;
+        }
+      }
+    }
+
+    double totalOverlap = 0;
+    for (int c0 = 0; c0 < numFClusters; c0++) {
+      if (overlap[c0]) {
+        totalOverlap++;
+      }
+    }
+
+    // if(totalOverlap/(double)numFClusters > .8) RunVisualizer.pause();
+    if (numFClusters > 0) {
+      totalOverlap /= (double) numFClusters;
+    }
+    return totalOverlap;
+  }
+
+  /**
+   * Mean ratio between the radius of a sphere built from the points a cluster contains and the
+   * radius of the cluster itself; also stored per cluster as "Compactness". Currently not
+   * called from {@code evaluateClustering} (the call is commented out there).
+   *
+   * @return the mean compactness, 0 without clusters, or NaN when a cluster is not a
+   *         {@link SphereCluster}
+   */
+  private double computeCompactness() {
+    if (numFClusters == 0) {
+      return 0;
+    }
+    for (int c = 0; c < numFClusters; c++) {
+      if (!(clustering.get(c) instanceof SphereCluster)) {
+        System.out.println("Compactness only supports Sphere Cluster. Found: " + clustering.get(c).getClass());
+        return Double.NaN;
+      }
+    }
+
+    // TODO weight radius by number of dimensions
+    double totalCompactness = 0;
+    for (int c = 0; c < numFClusters; c++) {
+      ArrayList<Instance> containedPoints = new ArrayList<Instance>();
+      for (int p = 0; p < numPoints; p++) {
+        // p in c
+        DataPoint point = points.get(p);
+        if (clustering.get(c).getInclusionProbability(point) >= pointInclusionProbThreshold) {
+          containedPoints.add(point);
+        }
+      }
+      double compactness = 0;
+      double cfRadius = ((SphereCluster) clustering.get(c)).getRadius();
+      if (containedPoints.size() > 1) {
+        // cluster not empty
+        SphereCluster minEnclosingCluster = new SphereCluster(containedPoints, numDims);
+        double minRadius = minEnclosingCluster.getRadius();
+        if (Math.abs(minRadius - cfRadius) < 0.1e-10) {
+          compactness = 1;
+        } else if (minRadius < cfRadius) {
+          compactness = minRadius / cfRadius;
+        } else {
+          System.out.println("Optimal radius bigger then real one (" + (cfRadius - minRadius)
+              + "), this is really wrong");
+          compactness = 1;
+        }
+      } else if (cfRadius == 0) {
+        // at most one contained point: only a zero-radius cluster counts as compact
+        compactness = 1;
+      }
+
+      // weight by weight of cluster???
+      totalCompactness += compactness;
+      clustering.get(c).setMeasureValue("Compactness", Double.toString(compactness));
+    }
+    return (totalCompactness / numFClusters);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/evaluation/measures/SSQ.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/evaluation/measures/SSQ.java 
b/samoa-api/src/main/java/org/apache/samoa/evaluation/measures/SSQ.java
new file mode 100644
index 0000000..8ee6a43
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/evaluation/measures/SSQ.java
@@ -0,0 +1,97 @@
+package org.apache.samoa.evaluation.measures;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+import java.util.ArrayList;
+
+import org.apache.samoa.instances.Instance;
+import org.apache.samoa.moa.cluster.Clustering;
+import org.apache.samoa.moa.core.DataPoint;
+import org.apache.samoa.moa.evaluation.MeasureCollection;
+
+/**
+ * Sum of squared distances ("SSQ") between every non-noise point and the center of its
+ * closest found cluster.
+ */
+public class SSQ extends MeasureCollection {
+
+  public SSQ() {
+    super();
+  }
+
+  @Override
+  public String[] getNames() {
+    return new String[] { "SSQ" };
+  }
+
+  @Override
+  protected boolean[] getDefaultEnabled() {
+    return new boolean[] { false };
+  }
+
+  // TODO Work on this later
+  // @Override
+  /**
+   * Variant of {@link #evaluateClustering} for plain instances.
+   *
+   * @param clustering
+   *          the found clustering whose centers are used
+   * @param trueClustering
+   *          the ground-truth clustering (not read)
+   * @param points
+   *          the evaluated instances; a class value of -1 marks noise
+   */
+  public void evaluateClusteringSamoa(Clustering clustering,
+      Clustering trueClustering, ArrayList<Instance> points) {
+    addValue(0, sumOfSquaredDistances(clustering, points));
+  }
+
+  /**
+   * Records the SSQ of the given points as measure 0.
+   *
+   * @param clustering
+   *          the found clustering whose centers are used
+   * @param trueClustering
+   *          the ground-truth clustering (not read)
+   * @param points
+   *          the evaluated points; a class value of -1 marks noise
+   */
+  @Override
+  public void evaluateClustering(Clustering clustering, Clustering trueClustering, ArrayList<DataPoint> points) {
+    addValue(0, sumOfSquaredDistances(clustering, points));
+  }
+
+  /**
+   * Sums, over all non-noise points, the squared Euclidean distance to the nearest cluster
+   * center. Only the first {@code center.length} attribute values of a point are compared.
+   * With an empty clustering every non-noise point contributes {@code Double.MAX_VALUE}.
+   */
+  private static double sumOfSquaredDistances(Clustering clustering, ArrayList<? extends Instance> points) {
+    // The centers do not depend on the point, so fetch them once instead of once per point.
+    int numClusters = clustering.size();
+    double[][] centers = new double[numClusters][];
+    for (int c = 0; c < numClusters; c++) {
+      centers[c] = clustering.get(c).getCenter();
+    }
+
+    double sum = 0.0;
+    for (Instance point : points) {
+      // don't include noise
+      if (point.classValue() == -1) {
+        continue;
+      }
+
+      double minDistance = Double.MAX_VALUE;
+      for (double[] center : centers) {
+        double distance = 0.0;
+        for (int i = 0; i < center.length; i++) {
+          double d = point.value(i) - center[i];
+          distance += d * d;
+        }
+        minDistance = Math.min(distance, minDistance);
+      }
+
+      sum += minDistance;
+    }
+    return sum;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/evaluation/measures/Separation.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/evaluation/measures/Separation.java 
b/samoa-api/src/main/java/org/apache/samoa/evaluation/measures/Separation.java
new file mode 100644
index 0000000..0d8710a
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/evaluation/measures/Separation.java
@@ -0,0 +1,119 @@
+package org.apache.samoa.evaluation.measures;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.samoa.instances.DenseInstance;
+import org.apache.samoa.instances.Instance;
+import org.apache.samoa.moa.cluster.Cluster;
+import org.apache.samoa.moa.cluster.Clustering;
+import org.apache.samoa.moa.cluster.SphereCluster;
+import org.apache.samoa.moa.core.DataPoint;
+import org.apache.samoa.moa.evaluation.MeasureCollection;
+
+/**
+ * Between-cluster sum of squares measures: "BSS" of the found clustering, "BSS-GT" of the
+ * ground-truth clustering and their ratio "BSS-Ratio".
+ */
+public class Separation extends MeasureCollection {
+
+  public Separation() {
+    super();
+  }
+
+  @Override
+  protected String[] getNames() {
+    return new String[] { "BSS", "BSS-GT", "BSS-Ratio" };
+  }
+
+  // @Override
+  /**
+   * Variant of {@link #evaluateClustering} for plain instances.
+   *
+   * @param clustering
+   *          the found clustering
+   * @param trueClustering
+   *          the ground-truth clustering, may be {@code null}
+   * @param points
+   *          the evaluated instances; must not be empty
+   * @throws Exception
+   *           kept from the original signature
+   */
+  public void evaluateClusteringSamoa(Clustering clustering,
+      Clustering trueClustering, ArrayList<Instance> points)
+      throws Exception {
+    evaluateSeparation(clustering, trueClustering, points);
+  }
+
+  /**
+   * Weighted sum, over all clusters, of the squared distance between the cluster center and
+   * {@code mean}. Only the first {@code mean.length} coordinates of a center are used.
+   */
+  private double getBSS(Clustering clustering, double[] mean) {
+    double bss = 0.0;
+    for (int i = 0; i < clustering.size(); i++) {
+      double weight = clustering.get(i).getWeight();
+      // fetch the center once per cluster rather than once per dimension
+      double[] center = clustering.get(i).getCenter();
+      double sum = 0.0;
+      for (int j = 0; j < mean.length; j++) {
+        sum += Math.pow((mean[j] - center[j]), 2);
+      }
+      bss += weight * sum;
+    }
+
+    return bss;
+  }
+
+  /**
+   * Records "BSS", "BSS-GT" and "BSS-Ratio" for the given clustering.
+   *
+   * @param clustering
+   *          the found clustering
+   * @param trueClustering
+   *          the ground-truth clustering, may be {@code null}
+   * @param points
+   *          the evaluated points; must not be empty
+   * @throws Exception
+   *           declared for compatibility with the overridden method
+   */
+  @Override
+  protected void evaluateClustering(Clustering clustering,
+      Clustering trueClustering, ArrayList<DataPoint> points)
+      throws Exception {
+    evaluateSeparation(clustering, trueClustering, points);
+  }
+
+  /**
+   * Shared implementation of both entry points. The found clustering is measured against the
+   * center of a sphere built from all points. The ground truth is measured against the center
+   * of a sphere built from its own (weighted) cluster centers; without a ground truth
+   * "BSS-GT" stays at 1.0, so the ratio equals "BSS".
+   */
+  private void evaluateSeparation(Clustering clustering, Clustering trueClustering,
+      List<? extends Instance> points) throws Exception {
+    double BSS_GT = 1.0;
+    int dimension = points.get(0).numAttributes() - 1;
+    SphereCluster sc = new SphereCluster(points, dimension);
+
+    // DO INTERNAL EVALUATION
+    double BSS = getBSS(clustering, sc.getCenter());
+
+    if (trueClustering != null) {
+      List<Instance> listInstances = new ArrayList<>();
+      for (Cluster c : trueClustering.getClustering()) {
+        listInstances.add(new DenseInstance(c.getWeight(), c.getCenter()));
+      }
+      SphereCluster gt = new SphereCluster(listInstances, dimension);
+      BSS_GT = getBSS(trueClustering, gt.getCenter());
+    }
+
+    addValue("BSS", BSS);
+    addValue("BSS-GT", BSS_GT);
+    addValue("BSS-Ratio", BSS / BSS_GT);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/evaluation/measures/SilhouetteCoefficient.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/evaluation/measures/SilhouetteCoefficient.java
 
b/samoa-api/src/main/java/org/apache/samoa/evaluation/measures/SilhouetteCoefficient.java
new file mode 100644
index 0000000..d0e4bc9
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/evaluation/measures/SilhouetteCoefficient.java
@@ -0,0 +1,126 @@
+package org.apache.samoa.evaluation.measures;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.ArrayList;
+
+import org.apache.samoa.moa.cluster.Cluster;
+import org.apache.samoa.moa.cluster.Clustering;
+import org.apache.samoa.moa.core.DataPoint;
+import org.apache.samoa.moa.evaluation.MeasureCollection;
+
+public class SilhouetteCoefficient extends MeasureCollection {
+  private static final double pointInclusionProbThreshold = 0.8;
+
+  public SilhouetteCoefficient() {
+    super();
+  }
+
+  @Override
+  protected boolean[] getDefaultEnabled() {
+    return new boolean[] { false };
+  }
+
+  @Override
+  public String[] getNames() {
+    return new String[] { "SilhCoeff" };
+  }
+
+  public void evaluateClustering(Clustering clustering, Clustering 
trueClustering, ArrayList<DataPoint> points) {
+    int numFCluster = clustering.size();
+
+    double[][] pointInclusionProbFC = new double[points.size()][numFCluster];
+    for (int p = 0; p < points.size(); p++) {
+      DataPoint point = points.get(p);
+      for (int fc = 0; fc < numFCluster; fc++) {
+        Cluster cl = clustering.get(fc);
+        pointInclusionProbFC[p][fc] = cl.getInclusionProbability(point);
+      }
+    }
+
+    double silhCoeff = 0.0;
+    int totalCount = 0;
+    for (int p = 0; p < points.size(); p++) {
+      DataPoint point = points.get(p);
+      ArrayList<Integer> ownClusters = new ArrayList<>();
+      for (int fc = 0; fc < numFCluster; fc++) {
+        if (pointInclusionProbFC[p][fc] > pointInclusionProbThreshold) {
+          ownClusters.add(fc);
+        }
+      }
+
+      if (ownClusters.size() > 0) {
+        double[] distanceByClusters = new double[numFCluster];
+        int[] countsByClusters = new int[numFCluster];
+        // calculate averageDistance of p to all cluster
+        for (int p1 = 0; p1 < points.size(); p1++) {
+          DataPoint point1 = points.get(p1);
+          if (p1 != p && point1.classValue() != -1) {
+            for (int fc = 0; fc < numFCluster; fc++) {
+              if (pointInclusionProbFC[p1][fc] > pointInclusionProbThreshold) {
+                double distance = point.getDistance(point1);
+                distanceByClusters[fc] += distance;
+                countsByClusters[fc]++;
+              }
+            }
+          }
+        }
+
+        // find closest OWN cluster as clusters might overlap
+        double minAvgDistanceOwn = Double.MAX_VALUE;
+        int minOwnIndex = -1;
+        for (int fc : ownClusters) {
+          double normDist = distanceByClusters[fc] / (double) 
countsByClusters[fc];
+          if (normDist < minAvgDistanceOwn) {// && pointInclusionProbFC[p][fc] 
> pointInclusionProbThreshold){
+            minAvgDistanceOwn = normDist;
+            minOwnIndex = fc;
+          }
+        }
+
+        // find closest other (or other own) cluster
+        double minAvgDistanceOther = Double.MAX_VALUE;
+        for (int fc = 0; fc < numFCluster; fc++) {
+          if (fc != minOwnIndex) {
+            double normDist = distanceByClusters[fc] / (double) 
countsByClusters[fc];
+            if (normDist < minAvgDistanceOther) {
+              minAvgDistanceOther = normDist;
+            }
+          }
+        }
+
+        double silhP = (minAvgDistanceOther - minAvgDistanceOwn) / 
Math.max(minAvgDistanceOther, minAvgDistanceOwn);
+        point.setMeasureValue("SC - own", minAvgDistanceOwn);
+        point.setMeasureValue("SC - other", minAvgDistanceOther);
+        point.setMeasureValue("SC", silhP);
+
+        silhCoeff += silhP;
+        totalCount++;
+        // System.out.println(point.getTimestamp()+" Silh "+silhP+" / 
"+avgDistanceOwn+" "+minAvgDistanceOther+" (C"+minIndex+")");
+      }
+    }
+    if (totalCount > 0)
+      silhCoeff /= (double) totalCount;
+    // normalize from -1, 1 to 0,1
+    silhCoeff = (silhCoeff + 1) / 2.0;
+    addValue(0, silhCoeff);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/evaluation/measures/StatisticalCollection.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/evaluation/measures/StatisticalCollection.java
 
b/samoa-api/src/main/java/org/apache/samoa/evaluation/measures/StatisticalCollection.java
new file mode 100644
index 0000000..13c98a4
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/evaluation/measures/StatisticalCollection.java
@@ -0,0 +1,187 @@
+package org.apache.samoa.evaluation.measures;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.samoa.moa.cluster.Clustering;
+import org.apache.samoa.moa.core.DataPoint;
+import org.apache.samoa.moa.evaluation.MeasureCollection;
+import org.apache.samoa.moa.evaluation.MembershipMatrix;
+
+/**
+ * Measure collection that derives two statistics, "van Dongen" and "Rand statistic", from the cluster/class
+ * {@link MembershipMatrix} of the evaluated points.
+ * <p>
+ * Both measures are disabled by default. A C index implementation is available through
+ * {@link #cindex(Clustering, ArrayList)} but is currently not registered as a measure.
+ */
+public class StatisticalCollection extends MeasureCollection {
+  private boolean debug = false;
+
+  @Override
+  protected String[] getNames() {
+    // "C Index" is currently not listed, see cindex(...)
+    return new String[] { "van Dongen", "Rand statistic" };
+  }
+
+  @Override
+  protected boolean[] getDefaultEnabled() {
+    return new boolean[] { false, false };
+  }
+
+  /**
+   * Builds the membership matrix of {@code clustering} over {@code points} and records the van Dongen and the
+   * Rand statistic. When the matrix has no entries both values are {@code NaN} (0/0).
+   * 
+   * @param clustering
+   *          the clustering under evaluation
+   * @param trueClustering
+   *          not used by this collection
+   * @param points
+   *          the points the membership matrix is built from
+   */
+  @Override
+  public void evaluateClustering(Clustering clustering, Clustering trueClustering, ArrayList<DataPoint> points)
+      throws Exception {
+
+    MembershipMatrix mm = new MembershipMatrix(clustering, points);
+    int numClasses = mm.getNumClasses();
+    // NOTE(review): one row more than there are clusters - presumably the matrix's noise row; confirm
+    int numCluster = clustering.size() + 1;
+    int n = mm.getTotalEntries();
+
+    double dongenMaxFC = sumOfClusterMaxima(mm, numCluster, numClasses);
+    double dongenMaxHC = sumOfClassMaxima(mm, numCluster, numClasses);
+
+    // 2.0 * n: promote to double before multiplying so that a large n cannot overflow int
+    double dongen = (dongenMaxFC + dongenMaxHC) / (2.0 * n);
+    // A normalized variant would be
+    // 1 - (2n - dongenMaxFC - dongenMaxHC) / (2n - maxClusterSum - maxClassSum)
+    if (debug)
+      System.out.println("Dongen HC:" + dongenMaxHC + " FC:" + dongenMaxFC + " Total:" + dongen + " n " + n);
+
+    addValue("van Dongen", dongen);
+
+    // Rand index
+    // http://www.cais.ntu.edu.sg/~qihe/menu4.html
+    double m1 = 0;
+    for (int j = 0; j < numClasses; j++) {
+      m1 += pairs(mm.getClassSum(j));
+    }
+    double m2 = 0;
+    for (int i = 0; i < numCluster; i++) {
+      m2 += pairs(mm.getClusterSum(i));
+    }
+    double m = 0;
+    for (int i = 0; i < numCluster; i++) {
+      for (int j = 0; j < numClasses; j++) {
+        m += pairs(mm.getClusterClassWeight(i, j));
+      }
+    }
+    // pairs(n) works on doubles; the former n * (n - 1) was an int product and overflowed for n > 46341
+    double M = pairs(n);
+    double rand = (M - m1 - m2 + 2 * m) / M;
+    // normalized rand
+    // double rand = (m - m1*m2/M)/(m1/2.0 + m2/2.0 - m1*m2/M);
+
+    addValue("Rand statistic", rand);
+
+    // addValue("C Index",cindex(clustering, points));
+  }
+
+  /** Number of unordered pairs that can be drawn from {@code v} items: v * (v - 1) / 2. */
+  private static double pairs(double v) {
+    return v * (v - 1) / 2.0;
+  }
+
+  /** Sums, over all cluster rows, the largest cluster/class weight found in that row. */
+  private static double sumOfClusterMaxima(MembershipMatrix mm, int numCluster, int numClasses) {
+    double sum = 0;
+    for (int i = 0; i < numCluster; i++) {
+      double max = 0;
+      for (int j = 0; j < numClasses; j++) {
+        double weight = mm.getClusterClassWeight(i, j);
+        if (weight > max)
+          max = weight;
+      }
+      sum += max;
+    }
+    return sum;
+  }
+
+  /** Sums, over all class columns, the largest cluster/class weight found in that column. */
+  private static double sumOfClassMaxima(MembershipMatrix mm, int numCluster, int numClasses) {
+    double sum = 0;
+    for (int j = 0; j < numClasses; j++) {
+      double max = 0;
+      for (int i = 0; i < numCluster; i++) {
+        double weight = mm.getClusterClassWeight(i, j);
+        if (weight > max)
+          max = weight;
+      }
+      sum += max;
+    }
+    return sum;
+  }
+
+  /**
+   * Computes a C-index style value from the pairwise distances of points that share a cluster: the mean
+   * within-cluster distance, rescaled by the smallest and largest within-cluster distance observed.
+   * <p>
+   * A point counts as a member of every cluster whose inclusion probability for it exceeds 0.8, so it may
+   * contribute to several clusters.
+   * 
+   * @param clustering
+   *          the clustering whose clusters are inspected
+   * @param points
+   *          the points to assign to clusters
+   * @return the rescaled mean within-cluster distance, or 0 when no cluster holds at least two points or when
+   *         all within-cluster distances are equal
+   */
+  public double cindex(Clustering clustering, ArrayList<DataPoint> points) {
+    int numClusters = clustering.size();
+    double withinClustersDistance = 0;
+    int numDistancesWithin = 0;
+
+    double[] minWithinClusters = new double[numClusters];
+    double[] maxWithinClusters = new double[numClusters];
+    ArrayList<ArrayList<Integer>> pointsInClusters = new ArrayList<>(numClusters);
+    for (int c = 0; c < numClusters; c++) {
+      pointsInClusters.add(new ArrayList<Integer>());
+      minWithinClusters[c] = Double.MAX_VALUE;
+      // Double.MIN_VALUE is the smallest POSITIVE double and therefore not a valid "lowest" sentinel
+      maxWithinClusters[c] = -Double.MAX_VALUE;
+    }
+
+    for (int p = 0; p < points.size(); p++) {
+      for (int c = 0; c < numClusters; c++) {
+        if (clustering.get(c).getInclusionProbability(points.get(p)) > 0.8) {
+          pointsInClusters.get(c).add(p);
+        }
+      }
+    }
+
+    // calc within cluster distances + min and max values
+    for (int c = 0; c < numClusters; c++) {
+      ArrayList<Integer> pointsInC = pointsInClusters.get(c);
+      for (int p = 0; p < pointsInC.size(); p++) {
+        DataPoint point = points.get(pointsInC.get(p));
+        for (int p1 = p + 1; p1 < pointsInC.size(); p1++) {
+          numDistancesWithin++;
+          DataPoint point1 = points.get(pointsInC.get(p1));
+          double dist = point.getDistance(point1);
+          withinClustersDistance += dist;
+          if (minWithinClusters[c] > dist)
+            minWithinClusters[c] = dist;
+          if (maxWithinClusters[c] < dist)
+            maxWithinClusters[c] = dist;
+        }
+      }
+    }
+
+    double minWithin = Double.MAX_VALUE;
+    double maxWithin = -Double.MAX_VALUE;
+    for (int c = 0; c < numClusters; c++) {
+      if (minWithinClusters[c] < minWithin)
+        minWithin = minWithinClusters[c];
+      if (maxWithinClusters[c] > maxWithin)
+        maxWithin = maxWithinClusters[c];
+    }
+
+    double cindex = 0;
+    double range = maxWithin - minWithin;
+    // range > 0 also guards the 0/0 case in which every within-cluster distance is identical
+    if (numDistancesWithin != 0 && range > 0) {
+      double meanWithinClustersDistance = withinClustersDistance / numDistancesWithin;
+      cindex = (meanWithinClustersDistance - minWithin) / range;
+    }
+
+    if (debug) {
+      System.out.println("Min:" + Arrays.toString(minWithinClusters));
+      System.out.println("Max:" + Arrays.toString(maxWithinClusters));
+      System.out.println("totalWithin:" + numDistancesWithin);
+    }
+    return cindex;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/examples/HelloWorldContentEvent.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/examples/HelloWorldContentEvent.java 
b/samoa-api/src/main/java/org/apache/samoa/examples/HelloWorldContentEvent.java
new file mode 100644
index 0000000..8105745
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/examples/HelloWorldContentEvent.java
@@ -0,0 +1,69 @@
+package org.apache.samoa.examples;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.ContentEvent;
+
+/**
+ * Example {@link ContentEvent} whose payload is one {@code int} plus a flag telling whether it is the last
+ * event. The event has no key.
+ */
+public class HelloWorldContentEvent implements ContentEvent {
+
+  private static final long serialVersionUID = -2406968925730298156L;
+  private final boolean isLastEvent;
+  private final int helloWorldData;
+
+  /*
+   * No-argument constructor for Kryo
+   */
+  public HelloWorldContentEvent() {
+    this(0, false);
+  }
+
+  /**
+   * Creates an event.
+   * 
+   * @param helloWorldData
+   *          the integer carried by the event
+   * @param isLastEvent
+   *          value to report from {@link #isLastEvent()}
+   */
+  public HelloWorldContentEvent(int helloWorldData, boolean isLastEvent) {
+    this.helloWorldData = helloWorldData;
+    this.isLastEvent = isLastEvent;
+  }
+
+  /**
+   * @return the integer carried by the event
+   */
+  public int getHelloWorldData() {
+    return helloWorldData;
+  }
+
+  @Override
+  public boolean isLastEvent() {
+    return isLastEvent;
+  }
+
+  /**
+   * @return always {@code null}; the event is key-less
+   */
+  @Override
+  public String getKey() {
+    return null;
+  }
+
+  @Override
+  public void setKey(String str) {
+    // intentionally empty: a key-less event ignores any key it is given
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder("HelloWorldContentEvent [helloWorldData=");
+    text.append(helloWorldData).append("]");
+    return text.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/examples/HelloWorldDestinationProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/examples/HelloWorldDestinationProcessor.java
 
b/samoa-api/src/main/java/org/apache/samoa/examples/HelloWorldDestinationProcessor.java
new file mode 100644
index 0000000..22cf604
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/examples/HelloWorldDestinationProcessor.java
@@ -0,0 +1,49 @@
+package org.apache.samoa.examples;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.Processor;
+
+/**
+ * Example {@link Processor} that acts as a sink: each received event is written to standard output, prefixed
+ * with the id received in {@link #onCreate(int)}.
+ */
+public class HelloWorldDestinationProcessor implements Processor {
+
+  private static final long serialVersionUID = -6042613438148776446L;
+
+  /** Id received in {@link #onCreate(int)}; 0 until that call. */
+  private int processorId;
+
+  @Override
+  public void onCreate(int id) {
+    processorId = id;
+  }
+
+  /**
+   * Prints {@code "<processorId>: <event>"} on its own line.
+   * 
+   * @param event
+   *          the event to print
+   * @return always {@code true}
+   */
+  @Override
+  public boolean process(ContentEvent event) {
+    StringBuilder line = new StringBuilder();
+    line.append(processorId).append(": ").append(event);
+    System.out.println(line.toString());
+    return true;
+  }
+
+  /**
+   * @param p
+   *          ignored; this processor has no configuration to copy
+   * @return a fresh processor
+   */
+  @Override
+  public Processor newProcessor(Processor p) {
+    return new HelloWorldDestinationProcessor();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/examples/HelloWorldSourceProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/examples/HelloWorldSourceProcessor.java
 
b/samoa-api/src/main/java/org/apache/samoa/examples/HelloWorldSourceProcessor.java
new file mode 100644
index 0000000..6d936af
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/examples/HelloWorldSourceProcessor.java
@@ -0,0 +1,75 @@
+package org.apache.samoa.examples;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.Random;
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.core.Processor;
+
+/**
+ * Example {@link EntranceProcessor} that generates a stream of random integers, each wrapped in a
+ * {@link HelloWorldContentEvent}.
+ * <p>
+ * The stream ends after {@code maxInst} events. A negative {@code maxInst} means "no limit", matching the
+ * {@code -1 = no limit} convention of the task option that supplies the value.
+ */
+public class HelloWorldSourceProcessor implements EntranceProcessor {
+
+  private static final long serialVersionUID = 6212296305865604747L;
+  /** Created in {@link #onCreate(int)}; {@code null} before that call. */
+  private Random rnd;
+  private final long maxInst;
+  /** Number of events handed out so far. */
+  private long count;
+
+  /**
+   * @param maxInst
+   *          maximum number of events to generate; a negative value disables the limit
+   */
+  public HelloWorldSourceProcessor(long maxInst) {
+    this.maxInst = maxInst;
+  }
+
+  @Override
+  public boolean process(ContentEvent event) {
+    // do nothing, API will be refined further
+    return false;
+  }
+
+  /**
+   * Seeds the random generator with the processor id.
+   */
+  @Override
+  public void onCreate(int id) {
+    rnd = new Random(id);
+  }
+
+  /**
+   * @param p
+   *          the processor to copy the limit from; must be a {@code HelloWorldSourceProcessor}
+   * @return a new source with the same limit and a count of zero
+   */
+  @Override
+  public Processor newProcessor(Processor p) {
+    HelloWorldSourceProcessor hwsp = (HelloWorldSourceProcessor) p;
+    return new HelloWorldSourceProcessor(hwsp.maxInst);
+  }
+
+  @Override
+  public boolean isFinished() {
+    return !hasNext();
+  }
+
+  @Override
+  public boolean hasNext() {
+    // a negative limit means unbounded; previously -1 made the source finished before its first event
+    return maxInst < 0 || count < maxInst;
+  }
+
+  /**
+   * @return a new event holding the next random integer; it is never flagged as the last event
+   */
+  @Override
+  public ContentEvent nextEvent() {
+    count++;
+    return new HelloWorldContentEvent(rnd.nextInt(), false);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/examples/HelloWorldTask.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/examples/HelloWorldTask.java 
b/samoa-api/src/main/java/org/apache/samoa/examples/HelloWorldTask.java
new file mode 100644
index 0000000..34ccb65
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/examples/HelloWorldTask.java
@@ -0,0 +1,100 @@
+package org.apache.samoa.examples;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+import org.apache.samoa.tasks.Task;
+import org.apache.samoa.topology.ComponentFactory;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.topology.Topology;
+import org.apache.samoa.topology.TopologyBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.javacliparser.Configurable;
+import com.github.javacliparser.IntOption;
+import com.github.javacliparser.StringOption;
+
+/**
+ * Example {@link Task} in SAMOA. It wires a {@link HelloWorldSourceProcessor} to a
+ * {@link HelloWorldDestinationProcessor} through a single shuffled stream. The source emits random integers
+ * wrapped in {@link HelloWorldContentEvent}s; the destination prints each event to standard output, prefixed
+ * with its processor id.
+ * 
+ * Options: the number of events to generate (-i), the parallelism of the destination (-p) and the evaluation
+ * name (-n).
+ */
+public class HelloWorldTask implements Task, Configurable {
+
+  private static final long serialVersionUID = -5134935141154021352L;
+  private static Logger logger = LoggerFactory.getLogger(HelloWorldTask.class);
+
+  /** The topology builder for the task; created in {@link #setFactory(ComponentFactory)}. */
+  private TopologyBuilder builder;
+  /** The topology that will be created for the task; built in {@link #init()}. */
+  private Topology helloWorldTopology;
+
+  public IntOption instanceLimitOption = new IntOption("instanceLimit", 'i',
+      "Maximum number of instances to generate (-1 = no limit).", 1000000, -1, Integer.MAX_VALUE);
+
+  public IntOption helloWorldParallelismOption = new IntOption("parallelismOption", 'p',
+      "Number of destination Processors", 1, 1, Integer.MAX_VALUE);
+
+  public StringOption evaluationNameOption = new StringOption("evaluationName", 'n',
+      "Identifier of the evaluation", "HelloWorldTask" + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date()));
+
+  /**
+   * Assembles the topology: entrance processor, stream, destination processor and the shuffle connection
+   * between them. Uses the builder created by {@link #setFactory(ComponentFactory)}.
+   */
+  @Override
+  public void init() {
+    // source: the entrance processor that feeds the topology
+    int instanceLimit = instanceLimitOption.getValue();
+    HelloWorldSourceProcessor source = new HelloWorldSourceProcessor(instanceLimit);
+    builder.addEntranceProcessor(source);
+
+    // the stream leaving the source
+    Stream sourceStream = builder.createStream(source);
+
+    // sink: the destination processor, replicated according to the parallelism option
+    int parallelism = helloWorldParallelismOption.getValue();
+    HelloWorldDestinationProcessor sink = new HelloWorldDestinationProcessor();
+    builder.addProcessor(sink, parallelism);
+    builder.connectInputShuffleStream(sourceStream, sink);
+
+    // finally build the topology
+    helloWorldTopology = builder.build();
+    logger.debug("Successfully built the topology");
+  }
+
+  /**
+   * @return the topology built by {@link #init()}, or {@code null} before that call
+   */
+  @Override
+  public Topology getTopology() {
+    return helloWorldTopology;
+  }
+
+  /**
+   * Creates the topology builder for {@code factory} and initialises a topology named after the evaluation
+   * name option.
+   */
+  @Override
+  public void setFactory(ComponentFactory factory) {
+    // will be removed when dynamic binding is implemented
+    builder = new TopologyBuilder(factory);
+    logger.debug("Successfully instantiating TopologyBuilder");
+    String topologyName = evaluationNameOption.getValue();
+    builder.initTopology(topologyName);
+    logger.debug("Successfully initializing SAMOA topology with name {}", topologyName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/learners/AdaptiveLearner.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/learners/AdaptiveLearner.java 
b/samoa-api/src/main/java/org/apache/samoa/learners/AdaptiveLearner.java
new file mode 100644
index 0000000..28d0059
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/AdaptiveLearner.java
@@ -0,0 +1,52 @@
+package org.apache.samoa.learners;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * License
+ */
+
+import org.apache.samoa.moa.classifiers.core.driftdetection.ChangeDetector;
+import org.apache.samoa.topology.Stream;
+
+/**
+ * Contract for learners that expose a replaceable {@link ChangeDetector}.
+ */
+public interface AdaptiveLearner {
+
+  /**
+   * Returns the change detector currently attached to this learner.
+   * 
+   * @return the change detector
+   */
+  ChangeDetector getChangeDetector();
+
+  /**
+   * Attaches a change detector to this learner.
+   * 
+   * @param cd
+   *          the change detector to use
+   */
+  void setChangeDetector(ChangeDetector cd);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/learners/ClassificationLearner.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/learners/ClassificationLearner.java 
b/samoa-api/src/main/java/org/apache/samoa/learners/ClassificationLearner.java
new file mode 100644
index 0000000..50bfe2e
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/learners/ClassificationLearner.java
@@ -0,0 +1,27 @@
+package org.apache.samoa.learners;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.learners.Learner;
+
+/**
+ * A {@link Learner} sub-interface that declares no members of its own.
+ * <p>
+ * NOTE(review): presumably used to tag learners that perform classification (inferred from the name) - confirm
+ * against the implementing classes.
+ */
+public interface ClassificationLearner extends Learner {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/learners/InstanceContentEvent.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/learners/InstanceContentEvent.java 
b/samoa-api/src/main/java/org/apache/samoa/learners/InstanceContentEvent.java
new file mode 100644
index 0000000..69a4428
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/learners/InstanceContentEvent.java
@@ -0,0 +1,215 @@
+package org.apache.samoa.learners;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * License
+ */
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.SerializableInstance;
+import org.apache.samoa.instances.Instance;
+
+import net.jcip.annotations.Immutable;
+
+//import weka.core.Instance;
+
+/**
+ * {@link ContentEvent} that carries one {@link Instance} together with its index, training/testing flags and
+ * the classifier and evaluation indices used to build routing keys.
+ * <p>
+ * NOTE(review): the class is annotated {@code @Immutable} although it exposes setters and non-final fields;
+ * confirm whether the annotation should stay.
+ */
+@Immutable
+public final class InstanceContentEvent implements ContentEvent {
+
+  private static final long serialVersionUID = -8620668863064613845L;
+  private long instanceIndex;
+  private int classifierIndex;
+  private int evaluationIndex;
+  private SerializableInstance instance;
+  private boolean isTraining;
+  private boolean isTesting;
+  private boolean isLast = false;
+
+  /**
+   * Creates an empty event: no instance, all indices 0, all flags {@code false}.
+   */
+  public InstanceContentEvent() {
+
+  }
+
+  /**
+   * Instantiates a new instance event.
+   * 
+   * @param index
+   *          the index of the instance
+   * @param instance
+   *          the instance to carry; it is wrapped in a {@link SerializableInstance}. May be {@code null}, in
+   *          which case the event carries no instance
+   * @param isTraining
+   *          whether the instance is training data
+   * @param isTesting
+   *          whether the instance is testing data
+   */
+  public InstanceContentEvent(long index, Instance instance,
+      boolean isTraining, boolean isTesting) {
+    if (instance != null) {
+      this.instance = new SerializableInstance(instance);
+    }
+    this.instanceIndex = index;
+    this.isTraining = isTraining;
+    this.isTesting = isTesting;
+  }
+
+  /**
+   * Gets the instance carried by this event.
+   * 
+   * @return the instance, or {@code null} if the event was created without one
+   */
+  public Instance getInstance() {
+    return instance;
+  }
+
+  /**
+   * Gets the instance index.
+   * 
+   * @return the index of the data vector.
+   */
+  public long getInstanceIndex() {
+    return instanceIndex;
+  }
+
+  /**
+   * Gets the class id, i.e. the class value of the carried instance truncated to {@code int}.
+   * 
+   * @return the true class of the vector.
+   * @throws NullPointerException
+   *           if the event carries no instance
+   */
+  public int getClassId() {
+    return (int) instance.classValue();
+  }
+
+  /**
+   * Checks if is training.
+   * 
+   * @return true if this is training data.
+   */
+  public boolean isTraining() {
+    return isTraining;
+  }
+
+  /**
+   * Set training flag.
+   * 
+   * @param training
+   *          flag.
+   */
+  public void setTraining(boolean training) {
+    this.isTraining = training;
+  }
+
+  /**
+   * Checks if is testing.
+   * 
+   * @return true if this is testing data.
+   */
+  public boolean isTesting() {
+    return isTesting;
+  }
+
+  /**
+   * Set testing flag.
+   * 
+   * @param testing
+   *          flag.
+   */
+  public void setTesting(boolean testing) {
+    this.isTesting = testing;
+  }
+
+  /**
+   * Gets the classifier index.
+   * 
+   * @return the classifier index
+   */
+  public int getClassifierIndex() {
+    return classifierIndex;
+  }
+
+  /**
+   * Sets the classifier index.
+   * 
+   * @param classifierIndex
+   *          the new classifier index
+   */
+  public void setClassifierIndex(int classifierIndex) {
+    this.classifierIndex = classifierIndex;
+  }
+
+  /**
+   * Gets the evaluation index.
+   * 
+   * @return the evaluation index
+   */
+  public int getEvaluationIndex() {
+    return evaluationIndex;
+  }
+
+  /**
+   * Sets the evaluation index.
+   * 
+   * @param evaluationIndex
+   *          the new evaluation index
+   */
+  public void setEvaluationIndex(int evaluationIndex) {
+    this.evaluationIndex = evaluationIndex;
+  }
+
+  /**
+   * Builds a key from the evaluation and classifier indices.
+   * 
+   * @param key
+   *          0 selects the evaluation index alone; any other value selects the composite
+   *          {@code 10000 * evaluationIndex + classifierIndex}
+   * @return the selected key as a decimal string
+   */
+  public String getKey(int key) {
+    if (key == 0) {
+      return Long.toString(evaluationIndex);
+    }
+    // 10000L forces long arithmetic; the former int product overflowed for evaluation indices above 214748
+    return Long.toString(10000L * evaluationIndex + classifierIndex);
+  }
+
+  /**
+   * @return the classifier index as a decimal string
+   */
+  @Override
+  public String getKey() {
+    return Long.toString(classifierIndex);
+  }
+
+  /**
+   * Parses {@code str} as a {@code long} and stores it as the instance index. Note that this is not the value
+   * reported by {@link #getKey()}, which is derived from the classifier index.
+   * 
+   * @param str
+   *          decimal representation of the new instance index
+   * @throws NumberFormatException
+   *           if {@code str} is not a parsable {@code long}
+   */
+  @Override
+  public void setKey(String str) {
+    this.instanceIndex = Long.parseLong(str);
+  }
+
+  @Override
+  public boolean isLastEvent() {
+    return isLast;
+  }
+
+  /**
+   * Sets the value reported by {@link #isLastEvent()}.
+   * 
+   * @param isLast
+   *          the new flag value
+   */
+  public void setLast(boolean isLast) {
+    this.isLast = isLast;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/learners/InstancesContentEvent.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/learners/InstancesContentEvent.java 
b/samoa-api/src/main/java/org/apache/samoa/learners/InstancesContentEvent.java
new file mode 100644
index 0000000..2bab8a6
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/learners/InstancesContentEvent.java
@@ -0,0 +1,201 @@
+package org.apache.samoa.learners;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * License
+ */
+
+import net.jcip.annotations.Immutable;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.SerializableInstance;
+import org.apache.samoa.instances.Instance;
+
+//import weka.core.Instance;
+
+/**
+ * {@link ContentEvent} that carries a list of {@link Instance}s together with an index, training/testing flags
+ * and the classifier and evaluation indices used to build routing keys.
+ * <p>
+ * NOTE(review): the class is annotated {@code @Immutable} although it exposes setters, {@link #add(Instance)}
+ * and non-final fields; confirm whether the annotation should stay.
+ */
+@Immutable
+public final class InstancesContentEvent implements ContentEvent {
+
+  private static final long serialVersionUID = -8620668863064613845L;
+  private long instanceIndex;
+  private int classifierIndex;
+  private int evaluationIndex;
+  private boolean isTraining;
+  private boolean isTesting;
+  private boolean isLast = false;
+
+  /** Instances carried by this event, in insertion order. */
+  protected List<Instance> instanceList = new LinkedList<Instance>();
+
+  /**
+   * Creates an empty event: no instances, all indices 0, all flags {@code false}.
+   */
+  public InstancesContentEvent() {
+
+  }
+
+  /**
+   * Instantiates a new event without instances; add them with {@link #add(Instance)}.
+   * 
+   * @param index
+   *          the instance index
+   * @param isTraining
+   *          whether the instances are training data
+   * @param isTesting
+   *          whether the instances are testing data
+   */
+  public InstancesContentEvent(long index,
+      boolean isTraining, boolean isTesting) {
+    this.instanceIndex = index;
+    this.isTraining = isTraining;
+    this.isTesting = isTesting;
+  }
+
+  /**
+   * Creates an event that takes over the instance index and the training/testing flags of {@code event}. The
+   * instance carried by {@code event}, its classifier and evaluation indices and its last-event flag are not
+   * copied.
+   * 
+   * @param event
+   *          the single-instance event to take the index and flags from
+   */
+  public InstancesContentEvent(InstanceContentEvent event) {
+    this.instanceIndex = event.getInstanceIndex();
+    this.isTraining = event.isTraining();
+    this.isTesting = event.isTesting();
+  }
+
+  /**
+   * Appends an instance, wrapped in a {@link SerializableInstance}.
+   * 
+   * @param instance
+   *          the instance to append
+   */
+  public void add(Instance instance) {
+    instanceList.add(new SerializableInstance(instance));
+  }
+
+  /**
+   * Gets the instances carried by this event.
+   * 
+   * @return a new array holding the instances in insertion order; empty if none were added
+   */
+  public Instance[] getInstances() {
+    return instanceList.toArray(new Instance[instanceList.size()]);
+  }
+
+  /**
+   * Gets the instance index.
+   * 
+   * @return the index of the data vector.
+   */
+  public long getInstanceIndex() {
+    return instanceIndex;
+  }
+
+  /**
+   * Checks if is training.
+   * 
+   * @return true if this is training data.
+   */
+  public boolean isTraining() {
+    return isTraining;
+  }
+
+  /**
+   * Checks if is testing.
+   * 
+   * @return true if this is testing data.
+   */
+  public boolean isTesting() {
+    return isTesting;
+  }
+
+  /**
+   * Gets the classifier index.
+   * 
+   * @return the classifier index
+   */
+  public int getClassifierIndex() {
+    return classifierIndex;
+  }
+
+  /**
+   * Sets the classifier index.
+   * 
+   * @param classifierIndex
+   *          the new classifier index
+   */
+  public void setClassifierIndex(int classifierIndex) {
+    this.classifierIndex = classifierIndex;
+  }
+
+  /**
+   * Gets the evaluation index.
+   * 
+   * @return the evaluation index
+   */
+  public int getEvaluationIndex() {
+    return evaluationIndex;
+  }
+
+  /**
+   * Sets the evaluation index.
+   * 
+   * @param evaluationIndex
+   *          the new evaluation index
+   */
+  public void setEvaluationIndex(int evaluationIndex) {
+    this.evaluationIndex = evaluationIndex;
+  }
+
+  /**
+   * Builds a key from the evaluation and classifier indices.
+   * 
+   * @param key
+   *          0 selects the evaluation index alone; any other value selects the composite
+   *          {@code 10000 * evaluationIndex + classifierIndex}
+   * @return the selected key as a decimal string
+   */
+  public String getKey(int key) {
+    if (key == 0) {
+      return Long.toString(evaluationIndex);
+    }
+    // 10000L forces long arithmetic; the former int product overflowed for evaluation indices above 214748
+    return Long.toString(10000L * evaluationIndex + classifierIndex);
+  }
+
+  /**
+   * @return the classifier index as a decimal string
+   */
+  @Override
+  public String getKey() {
+    return Long.toString(classifierIndex);
+  }
+
+  /**
+   * Parses {@code str} as a {@code long} and stores it as the instance index. Note that this is not the value
+   * reported by {@link #getKey()}, which is derived from the classifier index.
+   * 
+   * @param str
+   *          decimal representation of the new instance index
+   * @throws NumberFormatException
+   *           if {@code str} is not a parsable {@code long}
+   */
+  @Override
+  public void setKey(String str) {
+    this.instanceIndex = Long.parseLong(str);
+  }
+
+  @Override
+  public boolean isLastEvent() {
+    return isLast;
+  }
+
+  /**
+   * Sets the value reported by {@link #isLastEvent()}.
+   * 
+   * @param isLast
+   *          the new flag value
+   */
+  public void setLast(boolean isLast) {
+    this.isLast = isLast;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/learners/Learner.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/learners/Learner.java 
b/samoa-api/src/main/java/org/apache/samoa/learners/Learner.java
new file mode 100644
index 0000000..730f4d9
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/Learner.java
@@ -0,0 +1,63 @@
+package org.apache.samoa.learners;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.Serializable;
+import java.util.Set;
+
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.instances.Instances;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.topology.TopologyBuilder;
+
+/**
+ * Contract for a learner that can be wired into a topology. Initializing a learner should
+ * initialize the processing item (PI) that connects the learner to the input stream, and
+ * initialize the result streams so that other PIs can connect to the learner's output.
+ */
+public interface Learner extends Serializable {
+
+  /**
+   * Initializes the learner.
+   * 
+   * @param topologyBuilder
+   *          the topology builder
+   * @param dataset
+   *          the dataset
+   * @param parallelism
+   *          the parallelism
+   */
+  void init(TopologyBuilder topologyBuilder, Instances dataset, int parallelism);
+
+  /**
+   * Gets the input processor.
+   * 
+   * @return the input processor
+   */
+  Processor getInputProcessor();
+
+  /**
+   * Gets the result streams.
+   * 
+   * @return the set of result streams
+   */
+  Set<Stream> getResultStreams();
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/learners/RegressionLearner.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/learners/RegressionLearner.java 
b/samoa-api/src/main/java/org/apache/samoa/learners/RegressionLearner.java
new file mode 100644
index 0000000..764c87b
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/RegressionLearner.java
@@ -0,0 +1,27 @@
+package org.apache.samoa.learners;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.learners.Learner;
+
+/**
+ * Marker sub-interface of {@link Learner}. It declares no members of its own; everything
+ * it offers is inherited from {@code Learner}.
+ */
+public interface RegressionLearner extends Learner {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/learners/ResultContentEvent.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/learners/ResultContentEvent.java 
b/samoa-api/src/main/java/org/apache/samoa/learners/ResultContentEvent.java
new file mode 100644
index 0000000..3ede55c
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/learners/ResultContentEvent.java
@@ -0,0 +1,213 @@
+package org.apache.samoa.learners;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.SerializableInstance;
+import org.apache.samoa.instances.Instance;
+
+/**
+ * License
+ */
+
+/**
+ * The Class ResultEvent.
+ */
+final public class ResultContentEvent implements ContentEvent {
+
+  /**
+        * 
+        */
+  private static final long serialVersionUID = -2650420235386873306L;
+  private long instanceIndex;
+  private int classifierIndex;
+  private int evaluationIndex;
+  private SerializableInstance instance;
+
+  private int classId;
+  private double[] classVotes;
+
+  private final boolean isLast;
+
+  public ResultContentEvent() {
+    this.isLast = false;
+  }
+
+  public ResultContentEvent(boolean isLast) {
+    this.isLast = isLast;
+  }
+
+  /**
+   * Instantiates a new result event.
+   * 
+   * @param instanceIndex
+   *          the instance index
+   * @param instance
+   *          the instance
+   * @param classId
+   *          the class id
+   * @param classVotes
+   *          the class votes
+   */
+  public ResultContentEvent(long instanceIndex, Instance instance, int classId,
+      double[] classVotes, boolean isLast) {
+    if (instance != null) {
+      this.instance = new SerializableInstance(instance);
+    }
+    this.instanceIndex = instanceIndex;
+    this.classId = classId;
+    this.classVotes = classVotes;
+    this.isLast = isLast;
+  }
+
+  /**
+   * Gets the single instance of ResultEvent.
+   * 
+   * @return single instance of ResultEvent
+   */
+  public SerializableInstance getInstance() {
+    return instance;
+  }
+
+  /**
+   * Sets the instance.
+   * 
+   * @param instance
+   *          the new instance
+   */
+  public void setInstance(SerializableInstance instance) {
+    this.instance = instance;
+  }
+
+  /**
+   * Gets the num classes.
+   * 
+   * @return the num classes
+   */
+  public int getNumClasses() { // To remove
+    return instance.numClasses();
+  }
+
+  /**
+   * Gets the instance index.
+   * 
+   * @return the index of the data vector.
+   */
+  public long getInstanceIndex() {
+    return instanceIndex;
+  }
+
+  /**
+   * Gets the class id.
+   * 
+   * @return the true class of the vector.
+   */
+  public int getClassId() { // To remove
+    return classId;// (int) instance.classValue();//classId;
+  }
+
+  /**
+   * Gets the class votes.
+   * 
+   * @return the class votes
+   */
+  public double[] getClassVotes() {
+    return classVotes;
+  }
+
+  /**
+   * Sets the class votes.
+   * 
+   * @param classVotes
+   *          the new class votes
+   */
+  public void setClassVotes(double[] classVotes) {
+    this.classVotes = classVotes;
+  }
+
+  /**
+   * Gets the classifier index.
+   * 
+   * @return the classifier index
+   */
+  public int getClassifierIndex() {
+    return classifierIndex;
+  }
+
+  /**
+   * Sets the classifier index.
+   * 
+   * @param classifierIndex
+   *          the new classifier index
+   */
+  public void setClassifierIndex(int classifierIndex) {
+    this.classifierIndex = classifierIndex;
+  }
+
+  /**
+   * Gets the evaluation index.
+   * 
+   * @return the evaluation index
+   */
+  public int getEvaluationIndex() {
+    return evaluationIndex;
+  }
+
+  /**
+   * Sets the evaluation index.
+   * 
+   * @param evaluationIndex
+   *          the new evaluation index
+   */
+  public void setEvaluationIndex(int evaluationIndex) {
+    this.evaluationIndex = evaluationIndex;
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see samoa.core.ContentEvent#getKey(int)
+   */
+  // @Override
+  public String getKey(int key) {
+    if (key == 0)
+      return Long.toString(this.getEvaluationIndex());
+    else
+      return Long.toString(this.getEvaluationIndex()
+          + 1000 * this.getInstanceIndex());
+  }
+
+  @Override
+  public String getKey() {
+    return Long.toString(this.getEvaluationIndex() % 100);
+  }
+
+  @Override
+  public void setKey(String str) {
+    this.evaluationIndex = Integer.parseInt(str);
+  }
+
+  @Override
+  public boolean isLastEvent() {
+    return isLast;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/LocalLearner.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/LocalLearner.java
 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/LocalLearner.java
new file mode 100644
index 0000000..6d6a664
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/LocalLearner.java
@@ -0,0 +1,76 @@
+package org.apache.samoa.learners.classifiers;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.samoa.instances.Instance;
+import org.apache.samoa.instances.Instances;
+
+/**
+ * Learner interface for non-distributed learners.
+ * 
+ * @author abifet
+ */
+public interface LocalLearner extends Serializable {
+
+  /**
+   * Creates a new learner object.
+   * 
+   * @return the learner
+   */
+  LocalLearner create();
+
+  /**
+   * Predicts the class memberships for a given instance. If an instance is unclassified,
+   * the returned array elements must be all zero.
+   * 
+   * @param inst
+   *          the instance to be classified
+   * @return an array containing the estimated membership probabilities of the test instance
+   *         in each class
+   */
+  double[] getVotesForInstance(Instance inst);
+
+  /**
+   * Resets this classifier. It must be similar to starting a new classifier from scratch.
+   */
+  void resetLearning();
+
+  /**
+   * Trains this classifier incrementally using the given instance.
+   * 
+   * @param inst
+   *          the instance to be used for training
+   */
+  void trainOnInstance(Instance inst);
+
+  /**
+   * Sets where to obtain the information of attributes of Instances.
+   * 
+   * @param dataset
+   *          the dataset that contains the information
+   */
+  @Deprecated
+  void setDataset(Instances dataset);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/LocalLearnerProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/LocalLearnerProcessor.java
 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/LocalLearnerProcessor.java
new file mode 100644
index 0000000..5e2c927
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/learners/classifiers/LocalLearnerProcessor.java
@@ -0,0 +1,223 @@
+package org.apache.samoa.learners.classifiers;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * License
+ */
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.instances.Instance;
+import org.apache.samoa.learners.InstanceContentEvent;
+import org.apache.samoa.learners.ResultContentEvent;
+import org.apache.samoa.moa.classifiers.core.driftdetection.ChangeDetector;
+import org.apache.samoa.topology.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.samoa.moa.core.Utils.maxIndex;
+
+/**
+ * Processor that drives a single {@link LocalLearner}. For each incoming
+ * {@link InstanceContentEvent} it optionally emits a {@link ResultContentEvent} with the
+ * learner's votes (testing) and optionally trains the learner on the instance (training).
+ * An optional {@link ChangeDetector} can trigger a reset of the learner.
+ */
+public final class LocalLearnerProcessor implements Processor {
+
+  private static final long serialVersionUID = -1577910988699148691L;
+
+  private static final Logger logger = LoggerFactory.getLogger(LocalLearnerProcessor.class);
+
+  private LocalLearner model;
+  private Stream outputStream;
+  private int modelId;
+  private long instancesCount = 0;
+
+  /** Optional change detector; when null no drift handling is performed. */
+  protected ChangeDetector changeDetector;
+
+  /** The test. Not read or written anywhere in this class. */
+  protected int test; // to delete
+
+  /**
+   * Sets the learner.
+   * 
+   * @param model
+   *          the model to set
+   */
+  public void setLearner(LocalLearner model) {
+    this.model = model;
+  }
+
+  /**
+   * Gets the learner.
+   * 
+   * @return the model
+   */
+  public LocalLearner getLearner() {
+    return model;
+  }
+
+  /**
+   * Sets the output stream.
+   * 
+   * @param outputStream
+   *          the new output stream
+   */
+  public void setOutputStream(Stream outputStream) {
+    this.outputStream = outputStream;
+  }
+
+  /**
+   * Gets the output stream.
+   * 
+   * @return the output stream
+   */
+  public Stream getOutputStream() {
+    return outputStream;
+  }
+
+  /**
+   * Gets the instances count.
+   * 
+   * @return number of instances this processor has trained on so far
+   */
+  public long getInstancesCount() {
+    return instancesCount;
+  }
+
+  /**
+   * Gets the change detector.
+   * 
+   * @return the change detector, or {@code null} if none is set
+   */
+  public ChangeDetector getChangeDetector() {
+    return this.changeDetector;
+  }
+
+  /**
+   * Sets the change detector.
+   * 
+   * @param cd
+   *          the change detector to use; {@code null} disables drift handling
+   */
+  public void setChangeDetector(ChangeDetector cd) {
+    this.changeDetector = cd;
+  }
+
+  /**
+   * Trains the learner on the event's instance, increments the instance counter and, when a
+   * change detector is set, feeds it the 0/1 error of the learner on that instance. If the
+   * detector signals a change and its estimation has increased, both the learner and the
+   * detector are reset.
+   * 
+   * @param event
+   *          the event
+   */
+  private void updateStats(InstanceContentEvent event) {
+    Instance inst = event.getInstance();
+    this.model.trainOnInstance(inst);
+    this.instancesCount++;
+    if (this.changeDetector != null) {
+      // NOTE(review): the instance is classified after the learner has already been trained
+      // on it, so the error fed to the detector is not a test-then-train error - confirm
+      // this ordering is intended.
+      boolean correctlyClassifies = this.correctlyClassifies(inst);
+      double oldEstimation = this.changeDetector.getEstimation();
+      this.changeDetector.input(correctlyClassifies ? 0 : 1);
+      if (this.changeDetector.getChange() && this.changeDetector.getEstimation() > oldEstimation) {
+        // Start a new classifier
+        this.model.resetLearning();
+        this.changeDetector.resetLearning();
+      }
+    }
+  }
+
+  /**
+   * Gets whether this classifier correctly classifies an instance. Uses getVotesForInstance
+   * to obtain the prediction and the instance to obtain its true class.
+   * 
+   * @param inst
+   *          the instance to be classified
+   * @return true if the instance is correctly classified
+   */
+  private boolean correctlyClassifies(Instance inst) {
+    return maxIndex(model.getVotesForInstance(inst)) == (int) inst.classValue();
+  }
+
+  /**
+   * Handles one incoming event, which must be an {@link InstanceContentEvent}.
+   * <ul>
+   * <li>A negative instance index marks the end of learning: a result event with index -1
+   * and empty votes is emitted and nothing else happens.</li>
+   * <li>If the event is flagged for testing, the learner's votes are emitted as a result
+   * event.</li>
+   * <li>If the event is flagged for training, the learner is updated.</li>
+   * </ul>
+   * 
+   * @param event
+   *          the event
+   * @return always {@code false}
+   */
+  @Override
+  public boolean process(ContentEvent event) {
+
+    InstanceContentEvent inEvent = (InstanceContentEvent) event;
+    Instance instance = inEvent.getInstance();
+
+    if (inEvent.getInstanceIndex() < 0) {
+      // end learning
+      ResultContentEvent outContentEvent = new ResultContentEvent(-1, instance, 0,
+          new double[0], inEvent.isLastEvent());
+      outContentEvent.setClassifierIndex(this.modelId);
+      outContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex());
+      outputStream.put(outContentEvent);
+      return false;
+    }
+
+    if (inEvent.isTesting()) {
+      double[] dist = model.getVotesForInstance(instance);
+      ResultContentEvent outContentEvent = new ResultContentEvent(inEvent.getInstanceIndex(),
+          instance, inEvent.getClassId(), dist, inEvent.isLastEvent());
+      outContentEvent.setClassifierIndex(this.modelId);
+      outContentEvent.setEvaluationIndex(inEvent.getEvaluationIndex());
+      // Fully parameterized: the format string is a constant, so nothing is concatenated
+      // per instance when trace logging is disabled.
+      logger.trace("{} {} {}", inEvent.getInstanceIndex(), modelId, dist);
+      outputStream.put(outContentEvent);
+    }
+
+    if (inEvent.isTraining()) {
+      updateStats(inEvent);
+    }
+    return false;
+  }
+
+  /**
+   * Records the processor id as the model id and replaces the learner with a fresh one
+   * obtained from {@link LocalLearner#create()}. A learner must already be set.
+   * 
+   * @param id
+   *          the id assigned to this processor
+   */
+  @Override
+  public void onCreate(int id) {
+    this.modelId = id;
+    model = model.create();
+  }
+
+  /**
+   * Creates a new processor from {@code sourceProcessor}, which must be a
+   * {@code LocalLearnerProcessor}. The new processor gets a fresh learner created from the
+   * source's learner (if any) and the same output stream.
+   * 
+   * NOTE(review): the change detector is passed by reference, so the source and the new
+   * processor share one detector instance - confirm whether a copy is needed.
+   * 
+   * @param sourceProcessor
+   *          the processor to derive from
+   * @return the new processor
+   */
+  @Override
+  public Processor newProcessor(Processor sourceProcessor) {
+    LocalLearnerProcessor newProcessor = new LocalLearnerProcessor();
+    LocalLearnerProcessor originProcessor = (LocalLearnerProcessor) sourceProcessor;
+
+    if (originProcessor.getLearner() != null) {
+      newProcessor.setLearner(originProcessor.getLearner().create());
+    }
+
+    if (originProcessor.getChangeDetector() != null) {
+      newProcessor.setChangeDetector(originProcessor.getChangeDetector());
+    }
+
+    newProcessor.setOutputStream(originProcessor.getOutputStream());
+    return newProcessor;
+  }
+
+}

Reply via email to