http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/InstancesHeader.java
----------------------------------------------------------------------
diff --git 
a/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/InstancesHeader.java
 
b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/InstancesHeader.java
new file mode 100644
index 0000000..1ffa6e7
--- /dev/null
+++ 
b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/InstancesHeader.java
@@ -0,0 +1,131 @@
+package com.yahoo.labs.samoa.instances;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2007 University of Waikato, Hamilton, New Zealand
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+/**
+ * Class for storing the header or context of a data stream. It allows to know 
the number of attributes and classes.
+ *
+ * @author Richard Kirkby ([email protected])
+ * @version $Revision: 7 $
+ */
+public class InstancesHeader extends Instances {
+
+    private static final long serialVersionUID = 1L;
+
+    public InstancesHeader(Instances i) {
+        super(i, 0);
+    }
+
+    public InstancesHeader() {
+        super();
+    }
+    
+   /* @Override
+    public boolean add(Instance i) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean readInstance(Reader r) throws IOException {
+        throw new UnsupportedOperationException();
+    }*/
+
+    public static String getClassNameString(InstancesHeader context) {
+        if (context == null) {
+            return "[class]";
+        }
+        return "[class:" + context.classAttribute().name() + "]";
+    }
+
+    public static String getClassLabelString(InstancesHeader context,
+            int classLabelIndex) {
+        if ((context == null) || (classLabelIndex >= context.numClasses())) {
+            return "<class " + (classLabelIndex + 1) + ">";
+        }
+        return "<class " + (classLabelIndex + 1) + ":"
+                + context.classAttribute().value(classLabelIndex) + ">";
+    }
+
+    // is impervious to class index changes - attIndex is true attribute index
+    // regardless of class position
+    public static String getAttributeNameString(InstancesHeader context,
+            int attIndex) {
+        if ((context == null) || (attIndex >= context.numAttributes())) {
+            return "[att " + (attIndex + 1) + "]";
+        }
+        int instAttIndex = attIndex < context.classIndex() ? attIndex
+                : attIndex + 1;
+        return "[att " + (attIndex + 1) + ":"
+                + context.attribute(instAttIndex).name() + "]";
+    }
+
+    // is impervious to class index changes - attIndex is true attribute index
+    // regardless of class position
+    public static String getNominalValueString(InstancesHeader context,
+            int attIndex, int valIndex) {
+        if (context != null) {
+            int instAttIndex = attIndex < context.classIndex() ? attIndex
+                    : attIndex + 1;
+            if ((instAttIndex < context.numAttributes())
+                    && (valIndex < 
context.attribute(instAttIndex).numValues())) {
+                return "{val " + (valIndex + 1) + ":"
+                        + context.attribute(instAttIndex).value(valIndex) + 
"}";
+            }
+        }
+        return "{val " + (valIndex + 1) + "}";
+    }
+
+    // is impervious to class index changes - attIndex is true attribute index
+    // regardless of class position
+    public static String getNumericValueString(InstancesHeader context,
+            int attIndex, double value) {
+        if (context != null) {
+            int instAttIndex = attIndex < context.classIndex() ? attIndex
+                    : attIndex + 1;
+            if (instAttIndex < context.numAttributes()) {
+                if (context.attribute(instAttIndex).isDate()) {
+                    return context.attribute(instAttIndex).formatDate(value);
+                }
+            }
+        }
+        return Double.toString(value);
+    }
+
+    
+    //add autom.
+   /* public int classIndex() {
+        throw new UnsupportedOperationException("Not yet implemented");
+    }
+
+    public int numAttributes() {
+        throw new UnsupportedOperationException("Not yet implemented");
+    }
+
+    @Override
+    public Attribute attribute(int nPos) {
+        throw new UnsupportedOperationException("Not yet implemented");
+    }
+    
+    public int numClasses() {
+        return 0;
+    }*/
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/SingleClassInstanceData.java
----------------------------------------------------------------------
diff --git 
a/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/SingleClassInstanceData.java
 
b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/SingleClassInstanceData.java
new file mode 100644
index 0000000..878c338
--- /dev/null
+++ 
b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/SingleClassInstanceData.java
@@ -0,0 +1,86 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package com.yahoo.labs.samoa.instances;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ *
+ * @author abifet
+ */
+public class SingleClassInstanceData implements InstanceData {
+
+    protected double classValue;
+    
+    @Override
+    public int numAttributes() {
+        return 1;
+    }
+
+    @Override
+    public double value(int instAttIndex) {
+        return classValue;
+    }
+
+    @Override
+    public boolean isMissing(int indexAttribute) {
+        return Double.isNaN(this.value(indexAttribute));
+    }
+
+    @Override
+    public int numValues() {
+        return 1;
+    }
+
+    @Override
+    public int index(int i) {
+        return 0;
+    }
+
+    @Override
+    public double valueSparse(int i) {
+        return value(i);
+    }
+
+    @Override
+    public boolean isMissingSparse(int indexAttribute) {
+        return Double.isNaN(this.value(indexAttribute));
+    }
+
+    /*@Override
+    public double value(Attribute attribute) {
+        return this.classValue;
+    }*/
+
+    @Override
+    public double[] toDoubleArray() {
+        double[] array = {this.classValue};
+        return array;
+    }
+
+    @Override
+    public void setValue(int m_numAttributes, double d) {
+        this.classValue = d;
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/SingleLabelInstance.java
----------------------------------------------------------------------
diff --git 
a/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/SingleLabelInstance.java
 
b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/SingleLabelInstance.java
new file mode 100644
index 0000000..81b2818
--- /dev/null
+++ 
b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/SingleLabelInstance.java
@@ -0,0 +1,261 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package com.yahoo.labs.samoa.instances;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * 
+ * @author abifet
+ */
+// public int[] m_AttValues; // for DataPoint
+
+public class SingleLabelInstance implements Instance {
+
+       protected double weight;
+
+       protected InstanceData instanceData;
+
+       protected InstanceData classData;
+
+       // Fast implementation without using Objects
+       // protected double[] attributeValues;
+       // protected double classValue;
+
+       protected InstancesHeader instanceInformation;
+
+       public SingleLabelInstance() {
+               // necessary for kryo serializer
+       }
+
+       public SingleLabelInstance(SingleLabelInstance inst) {
+               this.weight = inst.weight;
+               this.instanceData = inst.instanceData; // copy
+               this.classData = inst.classData; // copy
+               // this.classValue = inst.classValue;
+               // this.attributeValues = inst.attributeValues;
+               this.instanceInformation = inst.instanceInformation;
+       }
+
+       // Dense
+       public SingleLabelInstance(double weight, double[] res) {
+               this.weight = weight;
+               this.instanceData = new DenseInstanceData(res);
+               //this.attributeValues = res;
+               this.classData = new SingleClassInstanceData();
+               // this.classValue = Double.NaN;
+               
+               
+       }
+
+       // Sparse
+       public SingleLabelInstance(double weight, double[] attributeValues,
+                       int[] indexValues, int numberAttributes) {
+               this.weight = weight;
+               this.instanceData = new SparseInstanceData(attributeValues,
+                               indexValues, numberAttributes); // ???
+               this.classData = new SingleClassInstanceData();
+               // this.classValue = Double.NaN;
+               //this.instanceInformation = new InstancesHeader();
+               
+       }
+
+       public SingleLabelInstance(double weight, InstanceData instanceData) {
+               this.weight = weight;
+               this.instanceData = instanceData; // ???
+               // this.classValue = Double.NaN;
+               this.classData = new SingleClassInstanceData();
+               //this.instanceInformation = new InstancesHeader();
+       }
+
+       public SingleLabelInstance(int numAttributes) {
+               this.instanceData = new DenseInstanceData(new 
double[numAttributes]);
+               // m_AttValues = new double[numAttributes];
+               /*
+                * for (int i = 0; i < m_AttValues.length; i++) { 
m_AttValues[i] =
+                * Utils.missingValue(); }
+                */
+               this.weight = 1;
+               this.classData = new SingleClassInstanceData();
+               this.instanceInformation = new InstancesHeader();
+       }
+
+       @Override
+       public double weight() {
+               return weight;
+       }
+
+       @Override
+       public void setWeight(double weight) {
+               this.weight = weight;
+       }
+
+       @Override
+       public Attribute attribute(int instAttIndex) {
+               return this.instanceInformation.attribute(instAttIndex);
+       }
+
+       @Override
+       public void deleteAttributeAt(int i) {
+               // throw new UnsupportedOperationException("Not yet 
implemented");
+       }
+
+       @Override
+       public void insertAttributeAt(int i) {
+               throw new UnsupportedOperationException("Not yet implemented");
+       }
+
+       @Override
+       public int numAttributes() {
+               return this.instanceInformation.numAttributes();
+       }
+
+       @Override
+       public double value(int instAttIndex) {
+               return // attributeValues[instAttIndex]; //
+               this.instanceData.value(instAttIndex);
+       }
+
+       @Override
+       public boolean isMissing(int instAttIndex) {
+               return // Double.isNaN(value(instAttIndex)); //
+               this.instanceData.isMissing(instAttIndex);
+       }
+
+       @Override
+       public int numValues() {
+               return // this.attributeValues.length; //
+               this.instanceData.numValues();
+       }
+
+       @Override
+       public int index(int i) {
+               return // i; //
+               this.instanceData.index(i);
+       }
+
+       @Override
+       public double valueSparse(int i) {
+               return this.instanceData.valueSparse(i);
+       }
+
+       @Override
+       public boolean isMissingSparse(int p) {
+               return this.instanceData.isMissingSparse(p);
+       }
+
+       @Override
+       public double value(Attribute attribute) {
+               // throw new UnsupportedOperationException("Not yet 
implemented");
+               // //Predicates.java
+               return value(attribute.index());
+
+       }
+
+       @Override
+       public String stringValue(int i) {
+               throw new UnsupportedOperationException("Not yet implemented");
+       }
+
+       @Override
+       public double[] toDoubleArray() {
+               return // this.attributeValues; //
+               this.instanceData.toDoubleArray();
+       }
+
+       @Override
+       public void setValue(int numAttribute, double d) {
+               this.instanceData.setValue(numAttribute, d);
+               // this.attributeValues[numAttribute] = d;
+       }
+
+       @Override
+       public double classValue() {
+               return this.classData.value(0);
+               // return classValue;
+       }
+
+       @Override
+       public int classIndex() {
+               return instanceInformation.classIndex();
+       }
+
+       @Override
+       public int numClasses() {
+               return this.instanceInformation.numClasses();
+       }
+
+       @Override
+       public boolean classIsMissing() {
+               return // Double.isNaN(this.classValue);//
+               this.classData.isMissing(0);
+       }
+
+       @Override
+       public Attribute classAttribute() {
+               return this.instanceInformation.attribute(0);
+       }
+
+       @Override
+       public void setClassValue(double d) {
+               this.classData.setValue(0, d);
+               // this.classValue = d;
+       }
+
+       @Override
+       public Instance copy() {
+               SingleLabelInstance inst = new SingleLabelInstance(this);
+               return inst;
+       }
+
+       @Override
+       public Instances dataset() {
+               return this.instanceInformation;
+       }
+
+       @Override
+       public void setDataset(Instances dataset) {
+               this.instanceInformation = new InstancesHeader(dataset);
+       }
+
+       public void addSparseValues(int[] indexValues, double[] attributeValues,
+                       int numberAttributes) {
+               this.instanceData = new SparseInstanceData(attributeValues,
+                               indexValues, numberAttributes); // ???
+       }
+
+       @Override
+       public String toString() {
+               StringBuffer text = new StringBuffer();
+
+               for (int i = 0; i < this.numValues() ; i++) {
+                       if (i > 0)
+                               text.append(",");
+                       text.append(this.value(i));
+               }
+               text.append(",").append(this.weight());
+
+               return text.toString();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/SparseInstance.java
----------------------------------------------------------------------
diff --git 
a/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/SparseInstance.java
 
b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/SparseInstance.java
new file mode 100644
index 0000000..66d0715
--- /dev/null
+++ 
b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/SparseInstance.java
@@ -0,0 +1,49 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package com.yahoo.labs.samoa.instances;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ *
+ * @author abifet
+ */
+public class SparseInstance extends SingleLabelInstance{
+    
+    public SparseInstance(double d, double[] res) {
+         super(d,res);
+    }
+    public SparseInstance(SingleLabelInstance inst) {
+        super(inst);
+    }
+
+    public SparseInstance(double numberAttributes) {
+      //super(1, new double[(int) numberAttributes-1]); 
+      super(1,null,null,(int) numberAttributes);  
+    }
+    
+    public SparseInstance(double weight, double[] attributeValues, int[] 
indexValues, int numberAttributes) {
+        super(weight,attributeValues,indexValues,numberAttributes);
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/SparseInstanceData.java
----------------------------------------------------------------------
diff --git 
a/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/SparseInstanceData.java
 
b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/SparseInstanceData.java
new file mode 100644
index 0000000..e917844
--- /dev/null
+++ 
b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/SparseInstanceData.java
@@ -0,0 +1,172 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package com.yahoo.labs.samoa.instances;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ *
+ * @author abifet
+ */
+public class SparseInstanceData implements InstanceData{
+    
+    public SparseInstanceData(double[] attributeValues, int[] indexValues, int 
numberAttributes) {
+       this.attributeValues = attributeValues;
+       this.indexValues = indexValues;
+       this.numberAttributes = numberAttributes;
+    }
+    
+    public SparseInstanceData(int length) {
+       this.attributeValues = new double[length];
+       this.indexValues =  new int[length];
+    }
+    
+    
+    protected double[] attributeValues;
+
+    public double[] getAttributeValues() {
+        return attributeValues;
+    }
+
+    public void setAttributeValues(double[] attributeValues) {
+        this.attributeValues = attributeValues;
+    }
+
+    public int[] getIndexValues() {
+        return indexValues;
+    }
+
+    public void setIndexValues(int[] indexValues) {
+        this.indexValues = indexValues;
+    }
+
+    public int getNumberAttributes() {
+        return numberAttributes;
+    }
+
+    public void setNumberAttributes(int numberAttributes) {
+        this.numberAttributes = numberAttributes;
+    }
+    protected int[] indexValues;
+    protected int numberAttributes;
+
+    @Override
+    public int numAttributes() {
+        return this.numberAttributes;
+    }
+
+    @Override
+    public double value(int indexAttribute) {
+        int location = locateIndex(indexAttribute);
+        //return location == -1 ? 0 : this.attributeValues[location];
+      //      int index = locateIndex(attIndex);
+    if ((location >= 0) && (indexValues[location] == indexAttribute)) {
+      return attributeValues[location];
+    } else {
+      return 0.0;
+    }
+    }
+
+    @Override
+    public boolean isMissing(int indexAttribute) {
+        return Double.isNaN(this.value(indexAttribute));
+    }
+
+    @Override
+    public int numValues() {
+        return this.attributeValues.length;
+    }
+
+    @Override
+    public int index(int indexAttribute) {
+        return this.indexValues[indexAttribute];
+    }
+
+    @Override
+    public double valueSparse(int indexAttribute) {
+        return this.attributeValues[indexAttribute];
+    }
+
+    @Override
+    public boolean isMissingSparse(int indexAttribute) {
+        return Double.isNaN(this.valueSparse(indexAttribute));
+    }
+
+    /*@Override
+    public double value(Attribute attribute) {
+        return value(attribute.index());
+    }*/
+
+    @Override
+    public double[] toDoubleArray() {
+        double[] array = new double[numAttributes()];
+        for (int i=0; i<numValues() ; i++) {
+            array[index(i)] = valueSparse(i);
+        }
+        return array;
+    }
+
+    @Override
+    public void setValue(int attributeIndex, double d) {
+        int index = locateIndex(attributeIndex);
+        if (index(index) == attributeIndex) {
+            this.attributeValues[index] = d;
+        } else {
+            // We need to add the value
+        }
+    }
+    
+    /**
+   * Locates the greatest index that is not greater than the given index.
+   * 
+   * @return the internal index of the attribute index. Returns -1 if no index
+   *         with this property could be found
+   */
+  public int locateIndex(int index) {
+
+    int min = 0;
+    int max = this.indexValues.length - 1;
+
+    if (max == -1) {
+      return -1;
+    }
+
+    // Binary search
+    while ((this.indexValues[min] <= index) && (this.indexValues[max] >= 
index)) {
+      int current = (max + min) / 2;
+      if (this.indexValues[current] > index) {
+        max = current - 1;
+      } else if (this.indexValues[current] < index) {
+        min = current + 1;
+      } else {
+        return current;
+      }
+    }
+    if (this.indexValues[max] < index) {
+      return max;
+    } else {
+      return min - 1;
+    }
+  }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/Utils.java
----------------------------------------------------------------------
diff --git 
a/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/Utils.java 
b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/Utils.java
new file mode 100644
index 0000000..f3dc1b9
--- /dev/null
+++ b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/Utils.java
@@ -0,0 +1,88 @@
+package com.yahoo.labs.samoa.instances;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+public class Utils {
+    public static int maxIndex(double[] doubles) {
+
+        double maximum = 0;
+        int maxIndex = 0;
+
+        for (int i = 0; i < doubles.length; i++) {
+            if ((i == 0) || (doubles[i] > maximum)) {
+                maxIndex = i;
+                maximum = doubles[i];
+            }
+        }
+
+        return maxIndex;
+    }
+
+    public static String quote(String string) {
+        boolean quote = false;
+
+        // backquote the following characters
+        if ((string.indexOf('\n') != -1) || (string.indexOf('\r') != -1) || 
(string.indexOf('\'') != -1) || (string.indexOf('"') != -1)
+                || (string.indexOf('\\') != -1) || (string.indexOf('\t') != 
-1) || (string.indexOf('%') != -1) || (string.indexOf('\u001E') != -1)) {
+            string = backQuoteChars(string);
+            quote = true;
+        }
+
+        // Enclose the string in 's if the string contains a recently added
+        // backquote or contains one of the following characters.
+        if ((quote == true) || (string.indexOf('{') != -1) || 
(string.indexOf('}') != -1) || (string.indexOf(',') != -1) || 
(string.equals("?"))
+                || (string.indexOf(' ') != -1) || (string.equals(""))) {
+            string = ("'".concat(string)).concat("'");
+        }
+
+        return string;
+    }
+
+    public static String backQuoteChars(String string) {
+
+        int index;
+        StringBuffer newStringBuffer;
+
+        // replace each of the following characters with the backquoted version
+        char charsFind[] = { '\\', '\'', '\t', '\n', '\r', '"', '%', '\u001E' 
};
+        String charsReplace[] = { "\\\\", "\\'", "\\t", "\\n", "\\r", "\\\"", 
"\\%", "\\u001E" };
+        for (int i = 0; i < charsFind.length; i++) {
+            if (string.indexOf(charsFind[i]) != -1) {
+                newStringBuffer = new StringBuffer();
+                while ((index = string.indexOf(charsFind[i])) != -1) {
+                    if (index > 0) {
+                        newStringBuffer.append(string.substring(0, index));
+                    }
+                    newStringBuffer.append(charsReplace[i]);
+                    if ((index + 1) < string.length()) {
+                        string = string.substring(index + 1);
+                    } else {
+                        string = "";
+                    }
+                }
+                newStringBuffer.append(string);
+                string = newStringBuffer.toString();
+            }
+        }
+
+        return string;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-local/pom.xml
----------------------------------------------------------------------
diff --git a/samoa-local/pom.xml b/samoa-local/pom.xml
new file mode 100644
index 0000000..b36a2c8
--- /dev/null
+++ b/samoa-local/pom.xml
@@ -0,0 +1,101 @@
+<!--
+  #%L
+  SAMOA
+  %%
+  Copyright (C) 2013 Yahoo! Inc.
+  %%
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  #L%
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <name>samoa-local</name>
+    <description>Simple local engine for SAMOA</description>
+    <artifactId>samoa-local</artifactId>
+    <parent>
+        <groupId>com.yahoo.labs.samoa</groupId>
+        <artifactId>samoa</artifactId>
+        <version>0.3.0-SNAPSHOT</version>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.yahoo.labs.samoa</groupId>
+            <artifactId>samoa-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.yahoo.labs.samoa</groupId>
+            <artifactId>samoa-test</artifactId>
+            <type>test-jar</type>
+            <classifier>test-jar-with-dependencies</classifier>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <version>${slf4j-simple.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- SAMOA assembly -->
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>${maven-assembly-plugin.version}</version>
+                <configuration>
+                    <finalName>SAMOA-Local-${project.version}</finalName>
+                    <appendAssemblyId>false</appendAssemblyId>
+                    <attach>false</attach>
+                    <outputDirectory>../target</outputDirectory>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                    <archive>
+                        <manifestEntries>
+                            
<Bundle-Version>${parsedVersion.osgiVersion}</Bundle-Version>
+                            
<Bundle-Description>${project.description}</Bundle-Description>
+                            
<Implementation-Version>${project.version}</Implementation-Version>
+                            <Implementation-Vendor>Yahoo 
Labs</Implementation-Vendor>
+                            
<Implementation-Vendor-Id>SAMOA</Implementation-Vendor-Id>
+                        </manifestEntries>
+                    </archive>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id> <!-- this is used for 
inheritance merges -->
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>${maven-surefire-plugin.version}</version>
+                <configuration>
+                    <argLine>-Xmx1G</argLine>
+                    <redirectTestOutputToFile>false</redirectTestOutputToFile>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-local/src/main/java/com/yahoo/labs/samoa/LocalDoTask.java
----------------------------------------------------------------------
diff --git a/samoa-local/src/main/java/com/yahoo/labs/samoa/LocalDoTask.java 
b/samoa-local/src/main/java/com/yahoo/labs/samoa/LocalDoTask.java
new file mode 100644
index 0000000..05ee1e1
--- /dev/null
+++ b/samoa-local/src/main/java/com/yahoo/labs/samoa/LocalDoTask.java
@@ -0,0 +1,89 @@
+package com.yahoo.labs.samoa;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.javacliparser.ClassOption;
+import com.github.javacliparser.FlagOption;
+import com.github.javacliparser.IntOption;
+import com.github.javacliparser.Option;
+import com.yahoo.labs.samoa.tasks.Task;
+import com.yahoo.labs.samoa.topology.impl.SimpleComponentFactory;
+import com.yahoo.labs.samoa.topology.impl.SimpleEngine;
+
+/**
+ * The Class DoTask.
+ */
+public class LocalDoTask {
+
+    // TODO: clean up this class for helping ML Developer in SAMOA
+    // TODO: clean up code from storm-impl
+       
+       // It seems that the 3 extra options are not used.
+       // Probably should remove them
+    private static final String SUPPRESS_STATUS_OUT_MSG = "Suppress the task 
status output. Normally it is sent to stderr.";
+    private static final String SUPPRESS_RESULT_OUT_MSG = "Suppress the task 
result output. Normally it is sent to stdout.";
+    private static final String STATUS_UPDATE_FREQ_MSG = "Wait time in 
milliseconds between status updates.";
+    private static final Logger logger = 
LoggerFactory.getLogger(LocalDoTask.class);
+
+    /**
+     * The main method.
+     * 
+     * @param args
+     *            the arguments
+     */
+    public static void main(String[] args) {
+
+        // ArrayList<String> tmpArgs = new 
ArrayList<String>(Arrays.asList(args));
+
+        // args = tmpArgs.toArray(new String[0]);
+
+        FlagOption suppressStatusOutOpt = new FlagOption("suppressStatusOut", 
'S', SUPPRESS_STATUS_OUT_MSG);
+
+        FlagOption suppressResultOutOpt = new FlagOption("suppressResultOut", 
'R', SUPPRESS_RESULT_OUT_MSG);
+
+        IntOption statusUpdateFreqOpt = new IntOption("statusUpdateFrequency", 
'F', STATUS_UPDATE_FREQ_MSG, 1000, 0, Integer.MAX_VALUE);
+
+        Option[] extraOptions = new Option[] { suppressStatusOutOpt, 
suppressResultOutOpt, statusUpdateFreqOpt };
+
+        StringBuilder cliString = new StringBuilder();
+        for (String arg : args) {
+            cliString.append(" ").append(arg);
+        }
+        logger.debug("Command line string = {}", cliString.toString());
+        System.out.println("Command line string = " + cliString.toString());
+
+        Task task;
+        try {
+            task = ClassOption.cliStringToObject(cliString.toString(), 
Task.class, extraOptions);
+            logger.info("Successfully instantiating {}", 
task.getClass().getCanonicalName());
+        } catch (Exception e) {
+            logger.error("Fail to initialize the task", e);
+            System.out.println("Fail to initialize the task" + e);
+            return;
+        }
+        task.setFactory(new SimpleComponentFactory());
+        task.init();
+        SimpleEngine.submitTopology(task.getTopology());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactory.java
 
b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactory.java
new file mode 100644
index 0000000..b289dbe
--- /dev/null
+++ 
b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactory.java
@@ -0,0 +1,53 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.samoa.core.EntranceProcessor;
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.topology.ComponentFactory;
+import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
+import com.yahoo.labs.samoa.topology.IProcessingItem;
+import com.yahoo.labs.samoa.topology.ProcessingItem;
+import com.yahoo.labs.samoa.topology.Stream;
+import com.yahoo.labs.samoa.topology.Topology;
+
+public class SimpleComponentFactory implements ComponentFactory {
+
+    public ProcessingItem createPi(Processor processor, int paralellism) {
+        return new SimpleProcessingItem(processor, paralellism);
+    }
+
+    public ProcessingItem createPi(Processor processor) {
+        return this.createPi(processor, 1);
+    }
+
+    public EntranceProcessingItem createEntrancePi(EntranceProcessor 
processor) {
+        return new SimpleEntranceProcessingItem(processor);
+    }
+
+    public Stream createStream(IProcessingItem sourcePi) {
+        return new SimpleStream(sourcePi);
+    }
+
+    public Topology createTopology(String topoName) {
+        return new SimpleTopology(topoName);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEngine.java
----------------------------------------------------------------------
diff --git 
a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEngine.java
 
b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEngine.java
new file mode 100644
index 0000000..9d131e1
--- /dev/null
+++ 
b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEngine.java
@@ -0,0 +1,37 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.samoa.topology.Topology;
+
+public class SimpleEngine {
+
+    public static void submitTopology(Topology topology) {
+        SimpleTopology simpleTopology = (SimpleTopology) topology;
+        simpleTopology.run();
+        // runs until completion
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItem.java
 
b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItem.java
new file mode 100644
index 0000000..4652ebb
--- /dev/null
+++ 
b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItem.java
@@ -0,0 +1,33 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.samoa.core.EntranceProcessor;
+import com.yahoo.labs.samoa.topology.LocalEntranceProcessingItem;
+
+class SimpleEntranceProcessingItem extends LocalEntranceProcessingItem {
+    public SimpleEntranceProcessingItem(EntranceProcessor processor) {
+        super(processor);
+    }
+    
+    // The default waiting time when there is no available events is 100ms
+    // Override waitForNewEvents() to change it
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItem.java
 
b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItem.java
new file mode 100644
index 0000000..e3cc765
--- /dev/null
+++ 
b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItem.java
@@ -0,0 +1,87 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.topology.AbstractProcessingItem;
+import com.yahoo.labs.samoa.topology.IProcessingItem;
+import com.yahoo.labs.samoa.topology.ProcessingItem;
+import com.yahoo.labs.samoa.topology.Stream;
+import com.yahoo.labs.samoa.utils.PartitioningScheme;
+import com.yahoo.labs.samoa.utils.StreamDestination;
+
+/**
+ *
+ * @author abifet
+ */
+class SimpleProcessingItem extends AbstractProcessingItem {
+    private IProcessingItem[] arrayProcessingItem;
+
+    SimpleProcessingItem(Processor processor) {
+        super(processor);
+    }
+    
+    SimpleProcessingItem(Processor processor, int parallelism) {
+       super(processor);
+        this.setParallelism(parallelism);
+    }
+    
+    public IProcessingItem getProcessingItem(int i) {
+        return arrayProcessingItem[i];
+    }
+    
+    @Override
+    protected ProcessingItem addInputStream(Stream inputStream, 
PartitioningScheme scheme) {
+               StreamDestination destination = new StreamDestination(this, 
this.getParallelism(), scheme);
+               ((SimpleStream)inputStream).addDestination(destination);
+               return this;
+       }
+
+    public SimpleProcessingItem copy() {
+       Processor processor = this.getProcessor();
+        return new SimpleProcessingItem(processor.newProcessor(processor));
+    }
+
+    public void processEvent(ContentEvent event, int counter) {
+       
+        int parallelism = this.getParallelism();
+        //System.out.println("Process event "+event+" 
(isLast="+event.isLastEvent()+") with counter="+counter+" while 
parallelism="+parallelism);
+        if (this.arrayProcessingItem == null && parallelism > 0) {
+            //Init processing elements, the first time they are needed
+            this.arrayProcessingItem = new IProcessingItem[parallelism];
+            for (int j = 0; j < parallelism; j++) {
+                arrayProcessingItem[j] = this.copy();
+                arrayProcessingItem[j].getProcessor().onCreate(j);
+            }
+        }
+        if (this.arrayProcessingItem != null) {
+               IProcessingItem pi = this.getProcessingItem(counter);
+               Processor p = pi.getProcessor();
+               //System.out.println("PI="+pi+", p="+p);
+            this.getProcessingItem(counter).getProcessor().process(event);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleStream.java
 
b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleStream.java
new file mode 100644
index 0000000..74684a7
--- /dev/null
+++ 
b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleStream.java
@@ -0,0 +1,93 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.topology.AbstractStream;
+import com.yahoo.labs.samoa.topology.IProcessingItem;
+import com.yahoo.labs.samoa.utils.StreamDestination;
+
+/**
+ * 
+ * @author abifet
+ */
+class SimpleStream extends AbstractStream {
+    private List<StreamDestination> destinations;
+    private int maxCounter;
+    private int eventCounter;
+
+    SimpleStream(IProcessingItem sourcePi) {
+       super(sourcePi);
+       this.destinations = new LinkedList<>();
+       this.eventCounter = 0;
+       this.maxCounter = 1;
+    }
+
+    private int getNextCounter() {
+       if (maxCounter > 0 && eventCounter >= maxCounter) eventCounter = 0;
+       this.eventCounter++;
+       return this.eventCounter;
+    }
+
+    @Override
+    public void put(ContentEvent event) {
+       this.put(event, this.getNextCounter());
+    }
+    
+    private void put(ContentEvent event, int counter) {
+       SimpleProcessingItem pi;
+        int parallelism;
+        for (StreamDestination destination:destinations) {
+            pi = (SimpleProcessingItem) destination.getProcessingItem();
+            parallelism = destination.getParallelism();
+            switch (destination.getPartitioningScheme()) {
+            case SHUFFLE:
+                pi.processEvent(event, counter % parallelism);
+                break;
+            case GROUP_BY_KEY:
+                HashCodeBuilder hb = new HashCodeBuilder();
+                hb.append(event.getKey());
+                int key = hb.build() % parallelism;
+                pi.processEvent(event, key);
+                break;
+            case BROADCAST:
+                for (int p = 0; p < parallelism; p++) {
+                    pi.processEvent(event, p);
+                }
+                break;
+            }
+        }
+    }
+
+    public void addDestination(StreamDestination destination) {
+        this.destinations.add(destination);
+        if (maxCounter <= 0) maxCounter = 1;
+        maxCounter *= destination.getParallelism();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleTopology.java
----------------------------------------------------------------------
diff --git 
a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleTopology.java
 
b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleTopology.java
new file mode 100644
index 0000000..675b4ac
--- /dev/null
+++ 
b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleTopology.java
@@ -0,0 +1,44 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.samoa.topology.AbstractTopology;
+
+public class SimpleTopology extends AbstractTopology {
+       SimpleTopology(String name) {
+               super(name);
+       }
+
+       public void run() {
+       if (this.getEntranceProcessingItems() == null)
+               throw new IllegalStateException("You need to set entrance PI 
before running the topology.");
+       if (this.getEntranceProcessingItems().size() != 1)
+               throw new IllegalStateException("SimpleTopology supports 1 
entrance PI only. Number of entrance PIs is 
"+this.getEntranceProcessingItems().size());
+       
+       SimpleEntranceProcessingItem entrancePi = 
(SimpleEntranceProcessingItem) this.getEntranceProcessingItems().toArray()[0];
+       entrancePi.getProcessor().onCreate(0); // id=0 as it is not used in 
simple mode
+        entrancePi.startSendingEvents();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-local/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/samoa-local/src/main/resources/log4j.xml 
b/samoa-local/src/main/resources/log4j.xml
new file mode 100644
index 0000000..cb40dec
--- /dev/null
+++ b/samoa-local/src/main/resources/log4j.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  #%L
+  SAMOA
+  %%
+  Copyright (C) 2013 - 2014 Yahoo! Inc.
+  %%
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+  
+       http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  #L%
+  -->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration debug="false">
+  
+  <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+     <layout class="org.apache.log4j.PatternLayout">
+      <param name="ConversionPattern" value="%d [%t] %-5p %c (%F:%L) - %m%n"/>
+     </layout>
+  </appender>
+  
+  <root>
+    <priority value="info" />
+    <appender-ref ref="CONSOLE"/>
+  </root>
+</log4j:configuration>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-local/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
----------------------------------------------------------------------
diff --git a/samoa-local/src/test/java/com/yahoo/labs/samoa/AlgosTest.java 
b/samoa-local/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
new file mode 100644
index 0000000..9bf1c2d
--- /dev/null
+++ b/samoa-local/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
@@ -0,0 +1,87 @@
+package com.yahoo.labs.samoa;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.junit.Test;
+
+public class AlgosTest {
+
+
+    @Test
+    public void testVHTLocal() throws Exception {
+
+        TestParams vhtConfig = new TestParams.Builder()
+                .inputInstances(200_000)
+                .samplingSize(20_000)
+                .evaluationInstances(200_000)
+                .classifiedInstances(200_000)
+                .classificationsCorrect(75f)
+                .kappaStat(0f)
+                .kappaTempStat(0f)
+                
.cliStringTemplate(TestParams.Templates.PREQEVAL_VHT_RANDOMTREE)
+                .resultFilePollTimeout(10)
+                .prePollWait(10)
+                .taskClassName(LocalDoTask.class.getName())
+                .build();
+        TestUtils.test(vhtConfig);
+
+    }
+
+    @Test
+    public void testBaggingLocal() throws Exception {
+        TestParams baggingConfig = new TestParams.Builder()
+                .inputInstances(200_000)
+                .samplingSize(20_000)
+                .evaluationInstances(180_000)
+                .classifiedInstances(210_000)
+                .classificationsCorrect(60f)
+                .kappaStat(0f)
+                .kappaTempStat(0f)
+                
.cliStringTemplate(TestParams.Templates.PREQEVAL_BAGGING_RANDOMTREE)
+                .prePollWait(10)
+                .resultFilePollTimeout(10)
+                .taskClassName(LocalDoTask.class.getName())
+                .build();
+        TestUtils.test(baggingConfig);
+
+    }
+
+    @Test
+    public void testNaiveBayesLocal() throws Exception {
+
+        TestParams vhtConfig = new TestParams.Builder()
+                .inputInstances(200_000)
+                .samplingSize(20_000)
+                .evaluationInstances(200_000)
+                .classifiedInstances(200_000)
+                .classificationsCorrect(65f)
+                .kappaStat(0f)
+                .kappaTempStat(0f)
+                
.cliStringTemplate(TestParams.Templates.PREQEVAL_NAIVEBAYES_HYPERPLANE)
+                .resultFilePollTimeout(10)
+                .prePollWait(10)
+                .taskClassName(LocalDoTask.class.getName())
+                .build();
+        TestUtils.test(vhtConfig);
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactoryTest.java
 
b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactoryTest.java
new file mode 100644
index 0000000..02a9295
--- /dev/null
+++ 
b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactoryTest.java
@@ -0,0 +1,96 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import static org.junit.Assert.*;
+import mockit.Mocked;
+import mockit.Tested;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import com.yahoo.labs.samoa.core.EntranceProcessor;
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
+import com.yahoo.labs.samoa.topology.ProcessingItem;
+import com.yahoo.labs.samoa.topology.Stream;
+import com.yahoo.labs.samoa.topology.Topology;
+
+/**
+ * @author Anh Thu Vu
+ *
+ */
+public class SimpleComponentFactoryTest {
+
+       @Tested private SimpleComponentFactory factory;
+       @Mocked private Processor processor, processorReplica;
+       @Mocked private EntranceProcessor entranceProcessor;
+       
+       private final int parallelism = 3;
+       private final String topoName = "TestTopology";
+       
+
+       @Before
+       public void setUp() throws Exception {
+               factory = new SimpleComponentFactory();
+       }
+
+       @Test
+       public void testCreatePiNoParallelism() {
+               ProcessingItem pi = factory.createPi(processor);
+               assertNotNull("ProcessingItem created is null.",pi);
+               assertEquals("ProcessingItem created is not a 
SimpleProcessingItem.",SimpleProcessingItem.class,pi.getClass());
+               assertEquals("Parallelism of PI is not 
1",1,pi.getParallelism(),0);
+       }
+       
+       @Test
+       public void testCreatePiWithParallelism() {
+               ProcessingItem pi = factory.createPi(processor,parallelism);
+               assertNotNull("ProcessingItem created is null.",pi);
+               assertEquals("ProcessingItem created is not a 
SimpleProcessingItem.",SimpleProcessingItem.class,pi.getClass());
+               assertEquals("Parallelism of PI is not 
",parallelism,pi.getParallelism(),0);
+       }
+       
+       @Test
+       public void testCreateStream() {
+               ProcessingItem pi = factory.createPi(processor);
+               
+               Stream stream = factory.createStream(pi);
+               assertNotNull("Stream created is null",stream);
+               assertEquals("Stream created is not a 
SimpleStream.",SimpleStream.class,stream.getClass());
+       }
+       
+       @Test
+       public void testCreateTopology() {
+               Topology topology = factory.createTopology(topoName);
+               assertNotNull("Topology created is null.",topology);
+               assertEquals("Topology created is not a 
SimpleTopology.",SimpleTopology.class,topology.getClass());
+       }
+       
+       @Test
+       public void testCreateEntrancePi() {
+               EntranceProcessingItem entrancePi = 
factory.createEntrancePi(entranceProcessor);
+               assertNotNull("EntranceProcessingItem created is 
null.",entrancePi);
+               assertEquals("EntranceProcessingItem created is not a 
SimpleEntranceProcessingItem.",SimpleEntranceProcessingItem.class,entrancePi.getClass());
+               assertSame("EntranceProcessor is not set 
correctly.",entranceProcessor, entrancePi.getProcessor());
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEngineTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEngineTest.java
 
b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEngineTest.java
new file mode 100644
index 0000000..c4649ed
--- /dev/null
+++ 
b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEngineTest.java
@@ -0,0 +1,57 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import mockit.Mocked;
+import mockit.NonStrictExpectations;
+import mockit.Tested;
+import mockit.Verifications;
+
+import org.junit.Test;
+
+/**
+ * @author Anh Thu Vu
+ *
+ */
+public class SimpleEngineTest {
+
+       @Tested private SimpleEngine unused;
+       @Mocked private SimpleTopology topology;
+       @Mocked private Runtime mockedRuntime;
+       
+       @Test
+       public void testSubmitTopology() {
+               new NonStrictExpectations() {
+                       {
+                               Runtime.getRuntime();
+                               result=mockedRuntime;
+                               mockedRuntime.exit(0);
+                       }
+               };
+               SimpleEngine.submitTopology(topology);
+               new Verifications() {
+                       {
+                               topology.run();
+                       }
+               };
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItemTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItemTest.java
 
b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItemTest.java
new file mode 100644
index 0000000..41ae22b
--- /dev/null
+++ 
b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItemTest.java
@@ -0,0 +1,153 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import static org.junit.Assert.*;
+import mockit.Mocked;
+import mockit.StrictExpectations;
+import mockit.Tested;
+import mockit.Verifications;
+
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.core.EntranceProcessor;
+import com.yahoo.labs.samoa.topology.Stream;
+
+/**
+ * @author Anh Thu Vu
+ *
+ */
+public class SimpleEntranceProcessingItemTest {
+
+       @Tested private SimpleEntranceProcessingItem entrancePi;
+       
+       @Mocked private EntranceProcessor entranceProcessor;
+       @Mocked private Stream outputStream, anotherStream;
+       @Mocked private ContentEvent event;
+       
+       @Mocked private Thread unused;
+       
+       /**
+        * @throws java.lang.Exception
+        */
+       @Before
+       public void setUp() throws Exception {
+               entrancePi = new 
SimpleEntranceProcessingItem(entranceProcessor);
+       }
+
+       @Test
+       public void testContructor() {
+               assertSame("EntranceProcessor is not set 
correctly.",entranceProcessor,entrancePi.getProcessor());
+       }
+       
+       @Test
+       public void testSetOutputStream() {
+               entrancePi.setOutputStream(outputStream);
+               assertSame("OutputStream is not set 
correctly.",outputStream,entrancePi.getOutputStream());
+       }
+       
+       @Test
+       public void testSetOutputStreamRepeate() {
+               entrancePi.setOutputStream(outputStream);
+               entrancePi.setOutputStream(outputStream);
+               assertSame("OutputStream is not set 
correctly.",outputStream,entrancePi.getOutputStream());
+       }
+       
+       @Test(expected=IllegalStateException.class)
+       public void testSetOutputStreamError() {
+               entrancePi.setOutputStream(outputStream);
+               entrancePi.setOutputStream(anotherStream);
+       }
+       
+       @Test
+       public void testInjectNextEventSuccess() {
+               entrancePi.setOutputStream(outputStream);
+               new StrictExpectations() {
+                       {
+                               entranceProcessor.hasNext();
+                               result=true;
+                               
+                               entranceProcessor.nextEvent();
+                               result=event;
+                       }
+               };
+               entrancePi.injectNextEvent();
+               new Verifications() {
+                       {
+                               outputStream.put(event);
+                       }
+               };
+       }
+       
+       @Test
+       public void testStartSendingEvents() {
+               entrancePi.setOutputStream(outputStream);
+               new StrictExpectations() {
+                       {
+                               for (int i=0; i<1; i++) {
+                                       entranceProcessor.isFinished(); 
result=false;
+                                       entranceProcessor.hasNext(); 
result=false;
+                               }
+                               
+                               for (int i=0; i<5; i++) {
+                                       entranceProcessor.isFinished(); 
result=false;
+                                       entranceProcessor.hasNext(); 
result=true;
+                                       entranceProcessor.nextEvent(); 
result=event;
+                                       outputStream.put(event);
+                               }
+                               
+                               for (int i=0; i<2; i++) {
+                                       entranceProcessor.isFinished(); 
result=false;
+                                       entranceProcessor.hasNext(); 
result=false;
+                               }
+                               
+                               for (int i=0; i<5; i++) {
+                                       entranceProcessor.isFinished(); 
result=false;
+                                       entranceProcessor.hasNext(); 
result=true;
+                                       entranceProcessor.nextEvent(); 
result=event;
+                                       outputStream.put(event);
+                               }
+
+                               entranceProcessor.isFinished(); result=true; 
times=1;
+                               entranceProcessor.hasNext(); times=0;
+                       }
+               };
+               entrancePi.startSendingEvents();
+               new Verifications() {
+                       {
+                               try {
+                                       Thread.sleep(anyInt); times=3;
+                               } catch (InterruptedException e) {
+
+                               }
+                       }
+               };
+       }
+       
+       @Test(expected=IllegalStateException.class)
+       public void testStartSendingEventsError() {
+               entrancePi.startSendingEvents();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItemTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItemTest.java
 
b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItemTest.java
new file mode 100644
index 0000000..a4a288a
--- /dev/null
+++ 
b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItemTest.java
@@ -0,0 +1,120 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import static org.junit.Assert.*;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import mockit.Expectations;
+import mockit.Mocked;
+import mockit.NonStrictExpectations;
+import mockit.Tested;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.utils.PartitioningScheme;
+import com.yahoo.labs.samoa.utils.StreamDestination;
+
+/**
+ * @author Anh Thu Vu
+ *
+ */
+public class SimpleProcessingItemTest {
+
+       @Tested private SimpleProcessingItem pi;
+       
+       @Mocked private Processor processor;
+       @Mocked private SimpleStream stream;
+       @Mocked private StreamDestination destination;
+       @Mocked private ContentEvent event;
+       
+       private final int parallelism = 4;
+       private final int counter = 2;
+       
+       
+       @Before
+       public void setUp() throws Exception {
+               pi = new SimpleProcessingItem(processor, parallelism);
+       }
+
+       @Test
+       public void testConstructor() {
+               assertSame("Processor was not set 
correctly.",processor,pi.getProcessor());
+               assertEquals("Parallelism was not set 
correctly.",parallelism,pi.getParallelism(),0);
+       }
+       
+       @Test
+       public void testConnectInputShuffleStream() {
+               new Expectations() {
+                       {
+                               destination = new StreamDestination(pi, 
parallelism, PartitioningScheme.SHUFFLE);
+                               stream.addDestination(destination);
+                       }
+               };
+               pi.connectInputShuffleStream(stream);
+       }
+       
+       @Test
+       public void testConnectInputKeyStream() {
+               new Expectations() {
+                       {
+                               destination = new StreamDestination(pi, 
parallelism, PartitioningScheme.GROUP_BY_KEY);
+                               stream.addDestination(destination);
+                       }
+               };
+               pi.connectInputKeyStream(stream);
+       }
+       
+       @Test
+       public void testConnectInputAllStream() {
+               new Expectations() {
+                       {
+                               destination = new StreamDestination(pi, 
parallelism, PartitioningScheme.BROADCAST);
+                               stream.addDestination(destination);
+                       }
+               };
+               pi.connectInputAllStream(stream);
+       }
+       
+       @Test
+       public void testProcessEvent() {
+               new Expectations() {
+                       {
+                               for (int i=0; i<parallelism; i++) {
+                                       processor.newProcessor(processor);
+                                       result=processor;
+                               
+                                       processor.onCreate(anyInt);
+                               }
+                               
+                               processor.process(event);
+                       }
+               };
+               pi.processEvent(event, counter);
+               
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleStreamTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleStreamTest.java
 
b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleStreamTest.java
new file mode 100644
index 0000000..2a625b5
--- /dev/null
+++ 
b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleStreamTest.java
@@ -0,0 +1,111 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import mockit.Expectations;
+import mockit.Mocked;
+import mockit.NonStrictExpectations;
+import mockit.Tested;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.utils.PartitioningScheme;
+import com.yahoo.labs.samoa.utils.StreamDestination;
+
+/**
+ * @author Anh Thu Vu
+ *
+ */
+@RunWith(Parameterized.class)
+public class SimpleStreamTest {
+
+       @Tested private SimpleStream stream;
+       
+       @Mocked private SimpleProcessingItem sourcePi, destPi;
+       @Mocked private ContentEvent event;
+       @Mocked private StreamDestination destination;
+
+       private final String eventKey = "eventkey";
+       private final int parallelism;
+       private final PartitioningScheme scheme;
+       
+       
+       @Parameters
+       public static Collection<Object[]> generateParameters() {
+               return Arrays.asList(new Object[][] {
+                        { 2, PartitioningScheme.SHUFFLE },
+                        { 3, PartitioningScheme.GROUP_BY_KEY },
+                        { 4, PartitioningScheme.BROADCAST }
+               });
+       }
+       
+       public SimpleStreamTest(int parallelism, PartitioningScheme scheme) {
+               this.parallelism = parallelism;
+               this.scheme = scheme;
+       }
+       
+       @Before
+       public void setUp() throws Exception {
+               stream = new SimpleStream(sourcePi);
+               stream.addDestination(destination);
+       }
+
+       @Test
+       public void testPut() {
+               new NonStrictExpectations() {
+                       {
+                               event.getKey(); result=eventKey;
+                               destination.getProcessingItem(); result=destPi;
+                               destination.getPartitioningScheme(); 
result=scheme;
+                               destination.getParallelism(); 
result=parallelism;
+                               
+                       }
+               };
+               switch(this.scheme) {
+               case SHUFFLE: case GROUP_BY_KEY:
+                       new Expectations() {
+                               {
+                                       // TODO: restrict the range of counter 
value
+                                       destPi.processEvent(event, anyInt); 
times=1;
+                               }
+                       };
+                       break;
+               case BROADCAST:
+                       new Expectations() {
+                               {
+                                       // TODO: restrict the range of counter 
value
+                                       destPi.processEvent(event, anyInt); 
times=parallelism;
+                               }
+                       };
+                       break;
+               }
+               stream.put(event);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleTopologyTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleTopologyTest.java
 
b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleTopologyTest.java
new file mode 100644
index 0000000..2423778
--- /dev/null
+++ 
b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleTopologyTest.java
@@ -0,0 +1,93 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+import static org.junit.Assert.*;
+
+import java.util.Set;
+
+import mockit.NonStrictExpectations;
+import mockit.Expectations;
+import mockit.Mocked;
+import mockit.Tested;
+
+import org.junit.Before;
+import org.junit.Test;
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+
+import com.yahoo.labs.samoa.core.EntranceProcessor;
+import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
+
+/**
+ * @author Anh Thu Vu
+ *
+ */
+public class SimpleTopologyTest {
+
+       @Tested private SimpleTopology topology;
+       
+       @Mocked private SimpleEntranceProcessingItem entrancePi;
+       @Mocked private EntranceProcessor entranceProcessor;
+       
+       @Before
+       public void setUp() throws Exception {
+               topology = new SimpleTopology("TestTopology");
+       }
+
+       @Test
+       public void testAddEntrancePi() {
+               topology.addEntranceProcessingItem(entrancePi);
+               
+               Set<EntranceProcessingItem> entrancePIs = 
topology.getEntranceProcessingItems();
+               assertNotNull("Set of entrance PIs is null.",entrancePIs);
+               assertEquals("Number of entrance PI in SimpleTopology must be 
1",1,entrancePIs.size());
+               assertSame("Entrance PI was not set 
correctly.",entrancePi,entrancePIs.toArray()[0]);
+               // TODO: verify that entrance PI is in the set of 
ProcessingItems
+               // Need to access topology's set of PIs (getProcessingItems() 
method)
+       }
+       
+       @Test
+       public void testRun() {
+               topology.addEntranceProcessingItem(entrancePi);
+               
+               new NonStrictExpectations() {
+                       {
+                               entrancePi.getProcessor();
+                               result=entranceProcessor;
+                               
+                       }
+               };
+               
+               new Expectations() {
+                       {
+                               entranceProcessor.onCreate(anyInt);     
+                               entrancePi.startSendingEvents();
+                       }
+               };
+               topology.run();
+       }
+       
+       @Test(expected=IllegalStateException.class)
+       public void testRunWithoutEntrancePI() {
+               topology.run();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-s4/pom.xml
----------------------------------------------------------------------
diff --git a/samoa-s4/pom.xml b/samoa-s4/pom.xml
new file mode 100644
index 0000000..b530d44
--- /dev/null
+++ b/samoa-s4/pom.xml
@@ -0,0 +1,135 @@
+<!--
+  #%L
+  SAMOA
+  %%
+  Copyright (C) 2013 Yahoo! Inc.
+  %%
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+  
+       http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  #L%
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <name>samoa-s4</name>
+    <description>S4 bindings for SAMOA</description>
+
+    <artifactId>samoa-s4</artifactId>
+    <parent>
+        <groupId>com.yahoo.labs.samoa</groupId>
+        <artifactId>samoa</artifactId>
+        <version>0.3.0-SNAPSHOT</version>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.yahoo.labs.samoa</groupId>
+            <artifactId>samoa-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.github.javacliparser</groupId>
+            <artifactId>javacliparser</artifactId>
+            <version>${javacliparser.version}</version>
+        </dependency>
+
+        <!-- S4 dependencies need to be installed separately as they are 
+            not available via Maven yet -->
+        <dependency>
+            <groupId>org.apache.s4</groupId>
+            <artifactId>s4-base</artifactId>
+            <version>${s4.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.s4</groupId>
+            <artifactId>s4-comm</artifactId>
+            <version>${s4.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.s4</groupId>
+            <artifactId>s4-core</artifactId>
+            <version>${s4.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <version>${maven-dependency-plugin.version}</version>
+                <configuration>
+                    
<outputDirectory>${project.build.directory}/lib</outputDirectory>
+                    <overWriteReleases>false</overWriteReleases>
+                    <overWriteSnapshots>false</overWriteSnapshots>
+                    <overWriteIfNewer>true</overWriteIfNewer>
+
+                    <excludeGroupIds>org.apache.s4</excludeGroupIds>
+                    <excludeTransitive>true</excludeTransitive>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>copy-dependencies</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>copy-dependencies</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <!-- SAMOA assembly -->
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>${maven-assembly-plugin.version}</version>
+                <configuration>
+                    <descriptors>
+                        <descriptor>src/main/assembly/samoa-s4.xml</descriptor>
+                    </descriptors>
+                    <finalName>SAMOA-S4-${project.version}</finalName>
+                    <attach>false</attach>
+                    <outputDirectory>../target</outputDirectory>
+                    <appendAssemblyId>false</appendAssemblyId>
+                    <archive>
+                        <manifestEntries>
+                            
<Bundle-Version>${parsedVersion.osgiVersion}</Bundle-Version>
+                            
<Bundle-Description>${project.description}</Bundle-Description>
+                            
<Implementation-Version>${project.version}</Implementation-Version>
+                            <Implementation-Vendor>Yahoo 
Labs</Implementation-Vendor>
+                            
<Implementation-Vendor-Id>SAMOA</Implementation-Vendor-Id>
+                            
<S4-App-Class>com.yahoo.labs.samoa.topology.impl.S4DoTask</S4-App-Class>
+                            <S4-Version>${s4.version}</S4-Version>
+                        </manifestEntries>
+                    </archive>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id> <!-- this is used for 
inheritance merges -->
+                        <phase>package</phase> <!-- bind to the packaging 
phase -->
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-s4/samoa-s4-adapter/pom.xml
----------------------------------------------------------------------
diff --git a/samoa-s4/samoa-s4-adapter/pom.xml 
b/samoa-s4/samoa-s4-adapter/pom.xml
new file mode 100644
index 0000000..031fb8a
--- /dev/null
+++ b/samoa-s4/samoa-s4-adapter/pom.xml
@@ -0,0 +1,52 @@
+<!--
+  #%L
+  SAMOA
+  %%
+  Copyright (C) 2013 Yahoo! Inc.
+  %%
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+  
+       http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  #L%
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+       <modelVersion>4.0.0</modelVersion>
+
+       <!-- PARENT MODULE SAMOA-S4 
+       <parent>
+               <groupId>com.yahoo.labs.bcn.samoa</groupId>
+               <artifactId>samoa-s4</artifactId>
+               <version>0.1</version>
+       </parent>
+       -->
+
+       <!-- SAMOA-S4-ADAPTER MODUEL -->
+       <artifactId>samoa-s4-adapter</artifactId>
+       <groupId>com.yahoo.labs.bcn.samoa</groupId>
+       <version>0.1</version>
+       <name>samoa-s4-adapter</name>
+       <description>Adapter module to connect to external stream and also to 
provide entrance processing items for SAMOA</description>
+
+       <dependencies>
+               <!-- dependency>
+                       <artifactId>samoa-api</artifactId>
+                       <groupId>com.yahoo.labs.bcn.samoa</groupId>
+                       <version>0.1</version>
+               </dependency> -->
+               <dependency>
+                       <artifactId>samoa-s4</artifactId>
+                       <groupId>com.yahoo.labs.bcn.samoa</groupId>
+                       <version>0.1</version>
+               </dependency>
+       </dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/S4AdapterApp.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/S4AdapterApp.java
 
b/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/S4AdapterApp.java
new file mode 100644
index 0000000..df30172
--- /dev/null
+++ 
b/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/S4AdapterApp.java
@@ -0,0 +1,45 @@
+package samoa.topology.adapter;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.s4.core.adapter.AdapterApp;
+
+import samoa.sandbox.SourceProcessor;
+import samoa.streams.StreamSourceProcessor;
+
+public class S4AdapterApp extends AdapterApp {
+
+       S4EntranceProcessingItem entrancePI;
+       StreamSourceProcessor sourceProcessor;
+       
+       @Override
+       protected void onInit() {
+               entrancePI = new S4EntranceProcessingItem(this);
+               sourceProcessor = new StreamSourceProcessor();
+               entrancePI.setProcessor(sourceProcessor);
+       }
+       
+       @Override
+       protected void onStart() {
+               
+       }
+       
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/S4EntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/S4EntranceProcessingItem.java
 
b/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/S4EntranceProcessingItem.java
new file mode 100644
index 0000000..656c304
--- /dev/null
+++ 
b/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/S4EntranceProcessingItem.java
@@ -0,0 +1,74 @@
+package samoa.topology.adapter;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.s4.core.App;
+import org.apache.s4.core.ProcessingElement;
+
+import samoa.core.Processor;
+import samoa.topology.EntranceProcessingItem;
+import samoa.topology.impl.DoTaskApp;
+import weka.core.Instance;
+
+public class S4EntranceProcessingItem extends ProcessingElement implements 
EntranceProcessingItem {
+
+       private Processor processor;
+       //DoTaskApp app;
+       
+       
+       public S4EntranceProcessingItem(App app){
+               super(app);
+               //this.app = (DoTaskApp) app;
+               this.setSingleton(true);
+               
+       }
+
+       @Override
+       public Processor getProcessor() {
+               return this.processor;
+       }
+
+       @Override
+       public void put(Instance inst) {
+               // do nothing
+               //may not needed
+
+       }
+
+       @Override
+       protected void onCreate() {
+               
+               //              if (this.processor != null){
+//                     this.processor = 
this.processor.newProcessor(this.processor);
+//                     this.processor.onCreate(Integer.parseInt(getId()));
+//             }
+       }
+
+       @Override
+       protected void onRemove() {
+               //do nothing
+               
+       }
+       
+       public void setProcessor(Processor processor){
+               this.processor = processor;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/package-info.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/package-info.java
 
b/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/package-info.java
new file mode 100644
index 0000000..81fd612
--- /dev/null
+++ 
b/samoa-s4/samoa-s4-adapter/src/main/java/samoa/topology/adapter/package-info.java
@@ -0,0 +1,28 @@
+/**
+ * 
+ */
+/**
+ * @author severien
+ *
+ */
+package samoa.topology.adapter;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */

Reply via email to