http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSamoaUtils.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSamoaUtils.java
 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSamoaUtils.java
new file mode 100644
index 0000000..7c4769e
--- /dev/null
+++ 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSamoaUtils.java
@@ -0,0 +1,108 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.github.javacliparser.ClassOption;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.yahoo.labs.samoa.tasks.Task;
+
+/**
+ * Utility class for samoa-storm project. It is used by StormDoTask to process 
its arguments.
+ * @author Arinto Murdopo
+ *
+ */
+public class StormSamoaUtils {
+       
+       private static final Logger logger = 
LoggerFactory.getLogger(StormSamoaUtils.class);
+
+       static final String KEY_FIELD = "key";
+       static final String CONTENT_EVENT_FIELD = "content_event";
+               
+       static Properties getProperties() throws IOException{
+               Properties props = new Properties();
+               InputStream is;
+               
+               File f = new 
File("src/main/resources/samoa-storm-cluster.properties"); // FIXME it does not 
exist anymore
+               is = new FileInputStream(f);
+               
+               try {
+                       props.load(is);
+               } catch (IOException e1) {
+                       System.out.println("Fail to load property file");
+                       return null;
+               } finally{
+                       is.close();
+               }
+               
+               return props;
+       }
+       
+       public static StormTopology argsToTopology(String[] args){
+               StringBuilder cliString = new StringBuilder();
+               for (String arg : args) {
+                       cliString.append(" ").append(arg);
+               }
+               logger.debug("Command line string = {}", cliString.toString());
+
+               Task task = getTask(cliString.toString());
+               
+               //TODO: remove setFactory method with DynamicBinding
+               task.setFactory(new StormComponentFactory());
+               task.init();
+
+               return (StormTopology)task.getTopology();
+       }
+       
+       public static int numWorkers(List<String> tmpArgs){
+               int position = tmpArgs.size() - 1;
+               int numWorkers;
+               
+               try {
+                       numWorkers = Integer.parseInt(tmpArgs.get(position));
+                       tmpArgs.remove(position);
+               } catch (NumberFormatException e) {
+                       numWorkers = 4;
+               }
+               
+               return numWorkers;
+       }
+
+    public static Task getTask(String cliString) {
+        Task task = null;
+        try {
+            logger.debug("Providing task [{}]", cliString);
+            task = ClassOption.cliStringToObject(cliString, Task.class, null);
+        } catch (Exception e) {
+            logger.warn("Fail in initializing the task!");
+            e.printStackTrace();
+        }
+        return task;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSpoutStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSpoutStream.java
 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSpoutStream.java
new file mode 100644
index 0000000..d066e42
--- /dev/null
+++ 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormSpoutStream.java
@@ -0,0 +1,65 @@
+//package com.yahoo.labs.samoa.topology.impl;
+//
+///*
+// * #%L
+// * SAMOA
+// * %%
+// * Copyright (C) 2013 Yahoo! Inc.
+// * %%
+// * Licensed under the Apache License, Version 2.0 (the "License");
+// * you may not use this file except in compliance with the License.
+// * You may obtain a copy of the License at
+// * 
+// *      http://www.apache.org/licenses/LICENSE-2.0
+// * 
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// * #L%
+// */
+//
+//import com.yahoo.labs.samoa.core.ContentEvent;
+//import 
com.yahoo.labs.samoa.topology.impl.StormEntranceProcessingItem.StormEntranceSpout;
+//
+///**
+// * Storm Stream that connects into Spout. It wraps the spout itself
+// * @author Arinto Murdopo
+// *
+// */
+//final class StormSpoutStream extends StormStream{
+//
+//     /**
+//      * 
+//      */
+//     private static final long serialVersionUID = -7444653177614988650L;
+//     
+//     private StormEntranceSpout spout;
+//     
+//     StormSpoutStream(String stormComponentId) {
+//             super(stormComponentId);
+//     }
+//
+//     @Override
+//     public void put(ContentEvent contentEvent) {
+//             spout.put(this, contentEvent);
+//     }
+//     
+//    void setSpout(StormEntranceSpout spout){
+//             this.spout = spout;
+//     }
+//
+////   @Override
+////   public void setStreamId(String stream) {
+////           // TODO Auto-generated method stub
+////           
+////   }
+//
+//     @Override
+//     public String getStreamId() {
+//             // TODO Auto-generated method stub
+//             return null;
+//     }
+//
+//}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormStream.java 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormStream.java
new file mode 100644
index 0000000..f67ab19
--- /dev/null
+++ 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormStream.java
@@ -0,0 +1,85 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.UUID;
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.topology.Stream;
+
+/**
+ * Abstract class to implement Storm Stream
+ * @author Arinto Murdopo
+ *
+ */
+abstract class StormStream implements Stream, java.io.Serializable {
+               
+       /**
+        * 
+        */
+       private static final long serialVersionUID = 281835563756514852L;
+       protected final String outputStreamId;
+       protected final InputStreamId inputStreamId;
+       
+       public StormStream(String stormComponentId){
+               this.outputStreamId = UUID.randomUUID().toString();
+               this.inputStreamId = new InputStreamId(stormComponentId, 
this.outputStreamId);
+       }
+       
+       @Override
+       public abstract void put(ContentEvent contentEvent);
+       
+       String getOutputId(){
+               return this.outputStreamId;
+       }
+       
+       InputStreamId getInputId(){
+               return this.inputStreamId;
+       }
+       
+       final static class InputStreamId implements java.io.Serializable{
+               
+               /**
+                * 
+                */
+               private static final long serialVersionUID = 
-7457995634133691295L;
+               private final String componentId;
+               private final String streamId;
+               
+               InputStreamId(String componentId, String streamId){
+                       this.componentId = componentId;
+                       this.streamId = streamId;
+               }
+               
+               String getComponentId(){
+                       return componentId;
+               }
+               
+               String getStreamId(){
+                       return streamId;
+               }
+       }
+       
+       @Override
+       public void setBatchSize(int batchSize) {
+               // Ignore batch size
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopology.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopology.java
 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopology.java
new file mode 100644
index 0000000..7a49d8b
--- /dev/null
+++ 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopology.java
@@ -0,0 +1,52 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import backtype.storm.topology.TopologyBuilder;
+
+import com.yahoo.labs.samoa.topology.IProcessingItem;
+import com.yahoo.labs.samoa.topology.AbstractTopology;
+
+/**
+ * Adaptation of SAMOA topology in samoa-storm
+ * @author Arinto Murdopo
+ *
+ */
+public class StormTopology extends AbstractTopology {
+       
+       private TopologyBuilder builder;
+       
+       public StormTopology(String topologyName){
+               super(topologyName);
+               this.builder = new TopologyBuilder();
+       }
+       
+       @Override
+       public void addProcessingItem(IProcessingItem procItem, int 
parallelismHint){
+               StormTopologyNode stormNode = (StormTopologyNode) procItem;
+               stormNode.addToTopology(this, parallelismHint);
+               super.addProcessingItem(procItem, parallelismHint);
+       }
+       
+       public TopologyBuilder getStormBuilder(){
+               return builder;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologyNode.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologyNode.java
 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologyNode.java
new file mode 100644
index 0000000..07fccbf
--- /dev/null
+++ 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologyNode.java
@@ -0,0 +1,34 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * Interface to represent a node in samoa-storm topology.
+ * @author Arinto Murdopo
+ *
+ */
+interface StormTopologyNode {
+
+       void addToTopology(StormTopology topology, int parallelismHint);
+       StormStream createStream();
+       String getId();
+       
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologySubmitter.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologySubmitter.java
 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologySubmitter.java
new file mode 100644
index 0000000..1e1b048
--- /dev/null
+++ 
b/samoa-storm/src/main/java/com/yahoo/labs/samoa/topology/impl/StormTopologySubmitter.java
@@ -0,0 +1,133 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.thrift7.TException;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.Config;
+import backtype.storm.generated.AlreadyAliveException;
+import backtype.storm.generated.InvalidTopologyException;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+
+/**
+ * Helper class to submit SAMOA task into Storm without the need of submitting 
the jar file.
+ * The jar file must be submitted first using StormJarSubmitter class.
+ * @author Arinto Murdopo
+ *
+ */
+public class StormTopologySubmitter {
+       
+       public static String YJP_OPTIONS_KEY="YjpOptions";
+       
+       private static Logger logger = 
LoggerFactory.getLogger(StormTopologySubmitter.class);
+               
+       public static void main(String[] args) throws IOException{
+               Properties props = StormSamoaUtils.getProperties();
+               
+               String uploadedJarLocation = 
props.getProperty(StormJarSubmitter.UPLOADED_JAR_LOCATION_KEY);
+               if(uploadedJarLocation == null){
+                       logger.error("Invalid properties file. It must have key 
{}", 
+                                       
StormJarSubmitter.UPLOADED_JAR_LOCATION_KEY);
+                       return;
+               }
+               
+               List<String> tmpArgs = new 
ArrayList<String>(Arrays.asList(args));
+               int numWorkers = StormSamoaUtils.numWorkers(tmpArgs);
+               
+               args = tmpArgs.toArray(new String[0]);
+               StormTopology stormTopo = StormSamoaUtils.argsToTopology(args);
+
+               Config conf = new Config();
+               conf.putAll(Utils.readStormConfig());
+               conf.putAll(Utils.readCommandLineOpts());
+               conf.setDebug(false);
+               conf.setNumWorkers(numWorkers);
+               
+               String profilerOption = 
+                               
props.getProperty(StormTopologySubmitter.YJP_OPTIONS_KEY);
+               if(profilerOption != null){
+                       String topoWorkerChildOpts =  (String) 
conf.get(Config.TOPOLOGY_WORKER_CHILDOPTS);
+                       StringBuilder optionBuilder = new StringBuilder();
+                       if(topoWorkerChildOpts != null){
+                               optionBuilder.append(topoWorkerChildOpts);      
+                               optionBuilder.append(' ');
+                       }
+                       optionBuilder.append(profilerOption);
+                       conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, 
optionBuilder.toString());
+               }
+
+               Map<String, Object> myConfigMap = new HashMap<String, 
Object>(conf);
+               StringWriter out = new StringWriter();
+
+               try {
+                       JSONValue.writeJSONString(myConfigMap, out);
+               } catch (IOException e) {
+                       System.out.println("Error in writing JSONString");
+                       e.printStackTrace();
+                       return;
+               }
+               
+               Config config = new Config();
+               config.putAll(Utils.readStormConfig());
+               
+               String nimbusHost = (String) config.get(Config.NIMBUS_HOST);
+                               
+               NimbusClient nc = new NimbusClient(nimbusHost);
+               String topologyName = stormTopo.getTopologyName();
+               try {
+                       System.out.println("Submitting topology with name: " 
+                                       + topologyName);
+                       nc.getClient().submitTopology(topologyName, 
uploadedJarLocation,
+                                       out.toString(), 
stormTopo.getStormBuilder().createTopology());
+                       System.out.println(topologyName + " is successfully 
submitted");
+
+               } catch (AlreadyAliveException aae) {
+                       System.out.println("Fail to submit " + topologyName
+                                       + "\nError message: " + aae.get_msg());
+               } catch (InvalidTopologyException ite) {
+                       System.out.println("Invalid topology for " + 
topologyName);
+                       ite.printStackTrace();
+               } catch (TException te) {
+                       System.out.println("Texception for " + topologyName);
+                       te.printStackTrace();
+               }               
+       }
+       
+       private static String uploadedJarLocation(List<String> tmpArgs){
+               int position = tmpArgs.size() - 1;
+               String uploadedJarLocation = tmpArgs.get(position);
+               tmpArgs.remove(position);
+               return uploadedJarLocation;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-storm/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
----------------------------------------------------------------------
diff --git a/samoa-storm/src/test/java/com/yahoo/labs/samoa/AlgosTest.java 
b/samoa-storm/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
new file mode 100644
index 0000000..15b80b5
--- /dev/null
+++ b/samoa-storm/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
@@ -0,0 +1,68 @@
+package com.yahoo.labs.samoa;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.junit.Test;
+
+public class AlgosTest {
+
+
+    @Test(timeout = 60000)
+    public void testVHTWithStorm() throws Exception {
+
+        TestParams vhtConfig = new TestParams.Builder()
+                .inputInstances(200_000)
+                .samplingSize(20_000)
+                .evaluationInstances(200_000)
+                .classifiedInstances(200_000)
+                .classificationsCorrect(55f)
+                .kappaStat(0f)
+                .kappaTempStat(0f)
+                
.cliStringTemplate(TestParams.Templates.PREQEVAL_VHT_RANDOMTREE)
+                .resultFilePollTimeout(30)
+                .prePollWait(15)
+                .taskClassName(LocalStormDoTask.class.getName())
+                .build();
+        TestUtils.test(vhtConfig);
+
+    }
+
+    @Test(timeout = 120000)
+    public void testBaggingWithStorm() throws Exception {
+        TestParams baggingConfig = new TestParams.Builder()
+                .inputInstances(200_000)
+                .samplingSize(20_000)
+                .evaluationInstances(180_000)
+                .classifiedInstances(190_000)
+                .classificationsCorrect(60f)
+                .kappaStat(0f)
+                .kappaTempStat(0f)
+                
.cliStringTemplate(TestParams.Templates.PREQEVAL_BAGGING_RANDOMTREE)
+                .resultFilePollTimeout(40)
+                .prePollWait(20)
+                .taskClassName(LocalStormDoTask.class.getName())
+                .build();
+        TestUtils.test(baggingConfig);
+
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-storm/src/test/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItemTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-storm/src/test/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItemTest.java
 
b/samoa-storm/src/test/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItemTest.java
new file mode 100644
index 0000000..ec8929a
--- /dev/null
+++ 
b/samoa-storm/src/test/java/com/yahoo/labs/samoa/topology/impl/StormProcessingItemTest.java
@@ -0,0 +1,78 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import static org.junit.Assert.assertEquals;
+import mockit.Expectations;
+import mockit.MockUp;
+import mockit.Mocked;
+import mockit.Tested;
+import mockit.Verifications;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.TopologyBuilder;
+
+import com.yahoo.labs.samoa.core.Processor;
+
+public class StormProcessingItemTest {
+    private static final int PARRALLELISM_HINT_2 = 2;
+    private static final int PARRALLELISM_HINT_4 = 4;
+    private static final String ID = "id";
+    @Tested private StormProcessingItem pi;
+    @Mocked private Processor processor;
+    @Mocked private StormTopology topology;
+    @Mocked private TopologyBuilder stormBuilder = new TopologyBuilder();
+
+    @Before
+    public void setUp() {
+        pi = new StormProcessingItem(processor, ID, PARRALLELISM_HINT_2);
+    }
+
+    @Test
+    public void testAddToTopology() {
+        new Expectations() {
+            {
+                topology.getStormBuilder();
+                result = stormBuilder;
+
+                stormBuilder.setBolt(ID, (IRichBolt) any, anyInt);
+                result = new MockUp<BoltDeclarer>() {
+                }.getMockInstance();
+            }
+        };
+
+        pi.addToTopology(topology, PARRALLELISM_HINT_4); // this parallelism 
hint is ignored
+
+        new Verifications() {
+            {
+                assertEquals(pi.getProcessor(), processor);
+                // TODO add methods to explore a topology and verify them
+                assertEquals(pi.getParallelism(), PARRALLELISM_HINT_2);
+                assertEquals(pi.getId(), ID);
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-test/README.md
----------------------------------------------------------------------
diff --git a/samoa-test/README.md b/samoa-test/README.md
new file mode 100644
index 0000000..63719ef
--- /dev/null
+++ b/samoa-test/README.md
@@ -0,0 +1,14 @@
+This module contains a test framework that simplifies regression testing of SAMOA algorithms on various platforms.
+
+The test framework is generic and reusable across multiple platforms. Platform modules that use the test framework add a Maven dependency on the test-jar artifact of the samoa-test module. This test-jar artifact includes the test framework classes and their dependencies.
+
+To define tests, each platform module reuses the code from the test framework and customizes the tests according to the platform's capabilities.
+
+For each algorithm to test, we must provide:
+
+* the task class for the platform
+* the algorithm (referring to the provided string templates in this module)
+* the input parameters
+* the expectations (thresholds or values)
+
+See the existing code in samoa-local, samoa-threads and samoa-storm for examples.

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-test/pom.xml
----------------------------------------------------------------------
diff --git a/samoa-test/pom.xml b/samoa-test/pom.xml
new file mode 100644
index 0000000..2ee103b
--- /dev/null
+++ b/samoa-test/pom.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>samoa</artifactId>
+        <groupId>com.yahoo.labs.samoa</groupId>
+        <version>0.3.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>samoa-test</artifactId>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-csv</artifactId>
+            <version>1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>2.4</version>
+        </dependency>
+        <dependency>
+            <groupId>com.yahoo.labs.samoa</groupId>
+            <artifactId>samoa-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>2.4</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptors>
+                        
<descriptor>src/main/assembly/test-jar-with-dependencies.xml</descriptor>
+                    </descriptors>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-test/src/main/assembly/test-jar-with-dependencies.xml
----------------------------------------------------------------------
diff --git a/samoa-test/src/main/assembly/test-jar-with-dependencies.xml 
b/samoa-test/src/main/assembly/test-jar-with-dependencies.xml
new file mode 100644
index 0000000..51465cc
--- /dev/null
+++ b/samoa-test/src/main/assembly/test-jar-with-dependencies.xml
@@ -0,0 +1,19 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+    <id>test-jar-with-dependencies</id>
+    <formats>
+        <format>jar</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <dependencySets>
+        <dependencySet>
+            <outputDirectory>/</outputDirectory>
+            <useProjectArtifact>true</useProjectArtifact>
+            <!-- we're creating the test-jar as an attachment -->
+            <useProjectAttachments>true</useProjectAttachments>
+            <useTransitiveDependencies>false</useTransitiveDependencies>
+            <unpack>true</unpack>
+        </dependencySet>
+    </dependencySets>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-test/src/test/java/com/yahoo/labs/samoa/TestParams.java
----------------------------------------------------------------------
diff --git a/samoa-test/src/test/java/com/yahoo/labs/samoa/TestParams.java 
b/samoa-test/src/test/java/com/yahoo/labs/samoa/TestParams.java
new file mode 100644
index 0000000..08ad94f
--- /dev/null
+++ b/samoa-test/src/test/java/com/yahoo/labs/samoa/TestParams.java
@@ -0,0 +1,235 @@
+package com.yahoo.labs.samoa;
+
/**
 * Parameters for an end-to-end task test.
 * <p>
 * It describes how to launch the task, how long to wait before polling its output, and the
 * minimum values expected in the last line of the evaluation results. Instances are immutable
 * and are created through {@link Builder}.
 */
public class TestParams {

    /**
     * Command-line templates for {@code PrequentialEvaluation}, filled in with
     * {@link String#format(String, Object...)}. Each template takes, in order:
     * <ul>
     *     <li>the output file location (-d),</li>
     *     <li>the maximum number of instances for testing/training (-i),</li>
     *     <li>the sampling size (-f),</li>
     *     <li>the delay between input instances (-w); the builder default is zero.</li>
     * </ul>
     * NOTE(review): an earlier version of this comment gave the -w unit as ms, while the value
     * is carried in {@code inputDelayMicroSec} — confirm the unit expected by the task.
     */
    public static class Templates {

        public final static String PREQEVAL_VHT_RANDOMTREE = "PrequentialEvaluation -d %s -i %d -f %d -w %d "
                + "-l (com.yahoo.labs.samoa.learners.classifiers.trees.VerticalHoeffdingTree -p 4) " +
                "-s (com.yahoo.labs.samoa.moa.streams.generators.RandomTreeGenerator -c 2 -o 10 -u 10)";

        public final static String PREQEVAL_NAIVEBAYES_HYPERPLANE = "PrequentialEvaluation -d %s -i %d -f %d -w %d "
                + "-l (classifiers.SingleClassifier -l com.yahoo.labs.samoa.learners.classifiers.NaiveBayes) " +
                "-s (com.yahoo.labs.samoa.moa.streams.generators.HyperplaneGenerator -c 2)";

        // setting the number of nominal attributes to zero significantly reduces the processing time,
        // so that it's acceptable in a test case
        public final static String PREQEVAL_BAGGING_RANDOMTREE = "PrequentialEvaluation -d %s -i %d -f %d -w %d "
                + "-l (com.yahoo.labs.samoa.learners.classifiers.ensemble.Bagging) " +
                "-s (com.yahoo.labs.samoa.moa.streams.generators.RandomTreeGenerator -c 2 -o 0 -u 10)";

    }


    // Expected header names of the results CSV columns, in column order.
    public static final String EVALUATION_INSTANCES = "evaluation instances";
    public static final String CLASSIFIED_INSTANCES = "classified instances";
    public static final String CLASSIFICATIONS_CORRECT = "classifications correct (percent)";
    public static final String KAPPA_STAT = "Kappa Statistic (percent)";
    public static final String KAPPA_TEMP_STAT = "Kappa Temporal Statistic (percent)";


    // All fields are set once in the constructor; final makes the immutability explicit.
    private final long inputInstances;
    private final long samplingSize;
    private final long evaluationInstances;
    private final long classifiedInstances;
    private final float classificationsCorrect;
    private final float kappaStat;
    private final float kappaTempStat;
    private final String cliStringTemplate;
    private final int pollTimeoutSeconds;
    // Seconds, as exposed by getPrePollWaitSeconds().
    private final int prePollWait;
    private final int inputDelayMicroSec;
    private final String taskClassName;

    private TestParams(String taskClassName,
                       long inputInstances,
                       long samplingSize,
                       long evaluationInstances,
                       long classifiedInstances,
                       float classificationsCorrect,
                       float kappaStat,
                       float kappaTempStat,
                       String cliStringTemplate,
                       int pollTimeoutSeconds,
                       int prePollWait,
                       int inputDelayMicroSec) {
        this.taskClassName = taskClassName;
        this.inputInstances = inputInstances;
        this.samplingSize = samplingSize;
        this.evaluationInstances = evaluationInstances;
        this.classifiedInstances = classifiedInstances;
        this.classificationsCorrect = classificationsCorrect;
        this.kappaStat = kappaStat;
        this.kappaTempStat = kappaTempStat;
        this.cliStringTemplate = cliStringTemplate;
        this.pollTimeoutSeconds = pollTimeoutSeconds;
        this.prePollWait = prePollWait;
        this.inputDelayMicroSec = inputDelayMicroSec;
    }

    /** @return fully qualified name of the class whose {@code main} runs the task */
    public String getTaskClassName() {
        return taskClassName;
    }

    /** @return value substituted for the -i placeholder of the template */
    public long getInputInstances() {
        return inputInstances;
    }

    /** @return value substituted for the -f placeholder of the template */
    public long getSamplingSize() {
        return samplingSize;
    }

    /** @return result-file poll timeout in seconds (builder default 10) */
    public int getPollTimeoutSeconds() {
        return pollTimeoutSeconds;
    }

    /** @return seconds to wait before starting to poll the result file (builder default 10) */
    public int getPrePollWaitSeconds() {
        return prePollWait;
    }

    /** @return command-line template, typically one of {@link Templates} */
    public String getCliStringTemplate() {
        return cliStringTemplate;
    }

    /** @return minimum expected value of the "evaluation instances" column */
    public long getEvaluationInstances() {
        return evaluationInstances;
    }

    /** @return minimum expected value of the "classified instances" column */
    public long getClassifiedInstances() {
        return classifiedInstances;
    }

    /** @return minimum expected value of the "classifications correct (percent)" column */
    public float getClassificationsCorrect() {
        return classificationsCorrect;
    }

    /** @return minimum expected value of the "Kappa Statistic (percent)" column */
    public float getKappaStat() {
        return kappaStat;
    }

    /** @return minimum expected value of the "Kappa Temporal Statistic (percent)" column */
    public float getKappaTempStat() {
        return kappaTempStat;
    }

    /** @return value substituted for the -w placeholder of the template (builder default 0) */
    public int getInputDelayMicroSec() {
        return inputDelayMicroSec;
    }

    @Override
    public String toString() {
        return "TestParams{\n" +
                "inputInstances=" + inputInstances + "\n" +
                "samplingSize=" + samplingSize + "\n" +
                "evaluationInstances=" + evaluationInstances + "\n" +
                "classifiedInstances=" + classifiedInstances + "\n" +
                "classificationsCorrect=" + classificationsCorrect + "\n" +
                "kappaStat=" + kappaStat + "\n" +
                "kappaTempStat=" + kappaTempStat + "\n" +
                "cliStringTemplate='" + cliStringTemplate + '\'' + "\n" +
                "pollTimeoutSeconds=" + pollTimeoutSeconds + "\n" +
                "prePollWait=" + prePollWait + "\n" +
                "taskClassName='" + taskClassName + '\'' + "\n" +
                "inputDelayMicroSec=" + inputDelayMicroSec + "\n" +
                '}';
    }

    /**
     * Fluent builder for {@link TestParams}.
     * <p>
     * Unset numeric thresholds default to zero. Poll timeout and pre-poll wait default to
     * 10 seconds.
     */
    public static class Builder {
        private long inputInstances;
        private long samplingSize;
        private long evaluationInstances;
        private long classifiedInstances;
        private float classificationsCorrect;
        private float kappaStat =0f;
        private float kappaTempStat =0f;
        private String cliStringTemplate;
        private int pollTimeoutSeconds = 10;
        private int prePollWaitSeconds = 10;
        private String taskClassName;
        private int inputDelayMicroSec = 0;

        public Builder taskClassName(String taskClassName) {
            this.taskClassName = taskClassName;
            return this;
        }

        public Builder inputInstances(long inputInstances) {
            this.inputInstances = inputInstances;
            return this;
        }

        public Builder samplingSize(long samplingSize) {
            this.samplingSize = samplingSize;
            return this;
        }

        public Builder evaluationInstances(long evaluationInstances) {
            this.evaluationInstances = evaluationInstances;
            return this;
        }

        public Builder classifiedInstances(long classifiedInstances) {
            this.classifiedInstances = classifiedInstances;
            return this;
        }

        public Builder classificationsCorrect(float classificationsCorrect) {
            this.classificationsCorrect = classificationsCorrect;
            return this;
        }

        public Builder kappaStat(float kappaStat) {
            this.kappaStat = kappaStat;
            return this;
        }

        public Builder kappaTempStat(float kappaTempStat) {
            this.kappaTempStat = kappaTempStat;
            return this;
        }

        public Builder cliStringTemplate(String cliStringTemplate) {
            this.cliStringTemplate = cliStringTemplate;
            return this;
        }

        public Builder resultFilePollTimeout(int pollTimeoutSeconds) {
            this.pollTimeoutSeconds = pollTimeoutSeconds;
            return this;
        }

        public Builder inputDelayMicroSec(int inputDelayMicroSec) {
            this.inputDelayMicroSec = inputDelayMicroSec;
            return this;
        }

        public Builder prePollWait(int prePollWaitSeconds) {
            this.prePollWaitSeconds = prePollWaitSeconds;
            return this;
        }

        /** @return a new immutable {@link TestParams} holding the values set so far */
        public TestParams build() {
            return new TestParams(taskClassName,
                    inputInstances,
                    samplingSize,
                    evaluationInstances,
                    classifiedInstances,
                    classificationsCorrect,
                    kappaStat,
                    kappaTempStat,
                    cliStringTemplate,
                    pollTimeoutSeconds,
                    prePollWaitSeconds,
                    inputDelayMicroSec);
        }
    }
}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-test/src/test/java/com/yahoo/labs/samoa/TestUtils.java
----------------------------------------------------------------------
diff --git a/samoa-test/src/test/java/com/yahoo/labs/samoa/TestUtils.java 
b/samoa-test/src/test/java/com/yahoo/labs/samoa/TestUtils.java
new file mode 100644
index 0000000..d66f5df
--- /dev/null
+++ b/samoa-test/src/test/java/com/yahoo/labs/samoa/TestUtils.java
@@ -0,0 +1,153 @@
+package com.yahoo.labs.samoa;/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.io.input.Tailer;
import org.apache.commons.io.input.TailerListenerAdapter;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.lang.reflect.InvocationTargetException;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertTrue;
+
+public class TestUtils {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(TestUtils.class.getName());
+
+
+    public static void test(final TestParams testParams) throws IOException, 
ClassNotFoundException, NoSuchMethodException, InvocationTargetException, 
IllegalAccessException, InterruptedException {
+
+        final File tempFile = File.createTempFile("test", "test");
+
+        LOG.info("Starting test, output file is {}, test config is \n{}", 
tempFile.getAbsolutePath(), testParams.toString());
+
+        Executors.newSingleThreadExecutor().submit(new Callable<Void>() {
+
+            @Override
+            public Void call() throws Exception {
+                try {
+                    Class.forName(testParams.getTaskClassName())
+                            .getMethod("main", String[].class)
+                            .invoke(null, (Object) String.format(
+                                    testParams.getCliStringTemplate(),
+                                    tempFile.getAbsolutePath(),
+                                    testParams.getInputInstances(),
+                                    testParams.getSamplingSize(),
+                                    testParams.getInputDelayMicroSec()
+                                    ).split("[ ]"));
+                } catch (Exception e) {
+                    LOG.error("Cannot execute test {} {}", e.getMessage(), 
e.getCause().getMessage());
+                }
+                return null;
+            }
+        });
+
+        
Thread.sleep(TimeUnit.SECONDS.toMillis(testParams.getPrePollWaitSeconds()));
+
+        CountDownLatch signalComplete = new CountDownLatch(1);
+
+        final Tailer tailer = Tailer.create(tempFile, new 
TestResultsTailerAdapter(signalComplete), 1000);
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                tailer.run();
+            }
+        }).start();
+
+        signalComplete.await();
+        tailer.stop();
+
+        assertResults(tempFile, testParams);
+    }
+
+    public static void assertResults(File outputFile, 
com.yahoo.labs.samoa.TestParams testParams) throws IOException {
+
+        LOG.info("Checking results file " + outputFile.getAbsolutePath());
+        // 1. parse result file with csv parser
+        Reader in = new FileReader(outputFile);
+        Iterable<CSVRecord> records = 
CSVFormat.EXCEL.withSkipHeaderRecord(false)
+                
.withIgnoreEmptyLines(true).withDelimiter(',').withCommentMarker('#').parse(in);
+        CSVRecord last = null;
+        Iterator<CSVRecord> iterator = records.iterator();
+        CSVRecord header = iterator.next();
+        Assert.assertEquals("Invalid number of columns", 5, header.size());
+
+        Assert.assertEquals("Unexpected column", 
com.yahoo.labs.samoa.TestParams.EVALUATION_INSTANCES, header.get(0).trim());
+        Assert.assertEquals("Unexpected column", 
com.yahoo.labs.samoa.TestParams.CLASSIFIED_INSTANCES, header.get(1).trim());
+        Assert.assertEquals("Unexpected column", 
com.yahoo.labs.samoa.TestParams.CLASSIFICATIONS_CORRECT, header.get(2).trim());
+        Assert.assertEquals("Unexpected column", 
com.yahoo.labs.samoa.TestParams.KAPPA_STAT, header.get(3).trim());
+        Assert.assertEquals("Unexpected column", 
com.yahoo.labs.samoa.TestParams.KAPPA_TEMP_STAT, header.get(4).trim());
+
+        // 2. check last line result
+        while (iterator.hasNext()) {
+            last = iterator.next();
+        }
+
+        assertTrue(String.format("Unmet threshold expected %d got %f",
+                testParams.getEvaluationInstances(), 
Float.parseFloat(last.get(0))),
+                testParams.getEvaluationInstances() <= 
Float.parseFloat(last.get(0)));
+        assertTrue(String.format("Unmet threshold expected %d got %f", 
testParams.getClassifiedInstances(),
+                Float.parseFloat(last.get(1))),
+                testParams.getClassifiedInstances() <= 
Float.parseFloat(last.get(1)));
+        assertTrue(String.format("Unmet threshold expected %f got %f",
+                testParams.getClassificationsCorrect(), 
Float.parseFloat(last.get(2))),
+                testParams.getClassificationsCorrect() <= 
Float.parseFloat(last.get(2)));
+        assertTrue(String.format("Unmet threshold expected %f got %f",
+                testParams.getKappaStat(), Float.parseFloat(last.get(3))),
+                testParams.getKappaStat() <= Float.parseFloat(last.get(3)));
+        assertTrue(String.format("Unmet threshold expected %f got %f",
+                testParams.getKappaTempStat(), Float.parseFloat(last.get(4))),
+                testParams.getKappaTempStat() <= 
Float.parseFloat(last.get(4)));
+
+    }
+
+
+    private static class TestResultsTailerAdapter extends 
TailerListenerAdapter {
+
+        private final CountDownLatch signalComplete;
+
+        public TestResultsTailerAdapter(CountDownLatch signalComplete) {
+            this.signalComplete = signalComplete;
+        }
+
+        @Override
+        public void handle(String line) {
+            if ("# COMPLETED".equals(line.trim())) {
+                signalComplete.countDown();
+            }
+        }
+    }
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-threads/pom.xml
----------------------------------------------------------------------
diff --git a/samoa-threads/pom.xml b/samoa-threads/pom.xml
new file mode 100644
index 0000000..c4a6fb4
--- /dev/null
+++ b/samoa-threads/pom.xml
@@ -0,0 +1,112 @@
+<!--
+  #%L
+  SAMOA
+  %%
+  Copyright (C) 2013 Yahoo! Inc.
+  %%
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  #L%
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <name>samoa-threads</name>
+    <description>Multithreading local engine for SAMOA</description>
+
+    <artifactId>samoa-threads</artifactId>
+    <parent>
+        <groupId>com.yahoo.labs.samoa</groupId>
+        <artifactId>samoa</artifactId>
+        <version>0.3.0-SNAPSHOT</version>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.yahoo.labs.samoa</groupId>
+            <artifactId>samoa-api</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.yahoo.labs.samoa</groupId>
+            <artifactId>samoa-test</artifactId>
+            <type>test-jar</type>
+            <classifier>test-jar-with-dependencies</classifier>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <version>${slf4j-simple.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- SAMOA assembly -->
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>${maven-assembly-plugin.version}</version>
+                <configuration>
+                    <finalName>SAMOA-Threads-${project.version}</finalName>
+                    <appendAssemblyId>false</appendAssemblyId>
+                    <attach>false</attach>
+                    <outputDirectory>../target</outputDirectory>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                    <archive>
+                        <manifestEntries>
+                            
<Bundle-Version>${parsedVersion.osgiVersion}</Bundle-Version>
+                            
<Bundle-Description>${project.description}</Bundle-Description>
+                            
<Implementation-Version>${project.version}</Implementation-Version>
+                            <Implementation-Vendor>Yahoo 
Labs</Implementation-Vendor>
+                            
<Implementation-Vendor-Id>SAMOA</Implementation-Vendor-Id>
+                        </manifestEntries>
+                    </archive>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id> <!-- this is used for 
inheritance merges -->
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>${maven-surefire-plugin.version}</version>
+                <configuration>
+                    <argLine>-Xmx1G</argLine>
+                    <redirectTestOutputToFile>false</redirectTestOutputToFile>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-threads/src/main/java/com/yahoo/labs/samoa/LocalThreadsDoTask.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/LocalThreadsDoTask.java 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/LocalThreadsDoTask.java
new file mode 100644
index 0000000..21ccf9e
--- /dev/null
+++ b/samoa-threads/src/main/java/com/yahoo/labs/samoa/LocalThreadsDoTask.java
@@ -0,0 +1,70 @@
+package com.yahoo.labs.samoa;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.javacliparser.ClassOption;
+import com.yahoo.labs.samoa.tasks.Task;
+import com.yahoo.labs.samoa.topology.impl.ThreadsComponentFactory;
+import com.yahoo.labs.samoa.topology.impl.ThreadsEngine;
+
+/**
+ * @author Anh Thu Vu
+ *
+ */
+public class LocalThreadsDoTask {
+    private static final Logger logger = 
LoggerFactory.getLogger(LocalThreadsDoTask.class);
+
+    /**
+     * The main method.
+     * 
+     * @param args
+     *            the arguments
+     */
+    public static void main(String[] args) {
+
+        ArrayList<String> tmpArgs = new ArrayList<String>(Arrays.asList(args));
+        
+        // Get number of threads for multithreading mode
+        int numThreads = 1;
+        for (int i=0; i<tmpArgs.size()-1; i++) {
+               if (tmpArgs.get(i).equals("-t")) {
+                       try {
+                               numThreads = Integer.parseInt(tmpArgs.get(i+1));
+                               tmpArgs.remove(i+1);
+                               tmpArgs.remove(i);
+                       } catch (NumberFormatException e) {
+                               System.err.println("Invalid number of 
threads.");
+                               System.err.println(e.getStackTrace());
+                       }
+               }
+        }
+        logger.info("Number of threads:{}", numThreads);
+        
+        args = tmpArgs.toArray(new String[0]);
+
+        StringBuilder cliString = new StringBuilder();
+        for (int i = 0; i < args.length; i++) {
+            cliString.append(" ").append(args[i]);
+        }
+        logger.debug("Command line string = {}", cliString.toString());
+        System.out.println("Command line string = " + cliString.toString());
+
+        Task task = null;
+        try {
+            task = (Task) ClassOption.cliStringToObject(cliString.toString(), 
Task.class, null);
+            logger.info("Sucessfully instantiating {}", 
task.getClass().getCanonicalName());
+        } catch (Exception e) {
+            logger.error("Fail to initialize the task", e);
+            System.out.println("Fail to initialize the task" + e);
+            return;
+        }
+        task.setFactory(new ThreadsComponentFactory());
+        task.init();
+        
+        ThreadsEngine.submitTopology(task.getTopology(), numThreads);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactory.java
 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactory.java
new file mode 100644
index 0000000..ac68da2
--- /dev/null
+++ 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactory.java
@@ -0,0 +1,64 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.samoa.core.EntranceProcessor;
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.topology.ComponentFactory;
+import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
+import com.yahoo.labs.samoa.topology.IProcessingItem;
+import com.yahoo.labs.samoa.topology.ProcessingItem;
+import com.yahoo.labs.samoa.topology.Stream;
+import com.yahoo.labs.samoa.topology.Topology;
+
+/**
+ * ComponentFactory for multithreaded engine
+ * @author Anh Thu Vu
+ *
+ */
+public class ThreadsComponentFactory implements ComponentFactory {
+
+       @Override
+       public ProcessingItem createPi(Processor processor) {
+               return this.createPi(processor, 1);
+       }
+
+       @Override
+       public ProcessingItem createPi(Processor processor, int paralellism) {
+               return new ThreadsProcessingItem(processor, paralellism);
+       }
+
+       @Override
+       public EntranceProcessingItem createEntrancePi(EntranceProcessor 
entranceProcessor) {
+               return new ThreadsEntranceProcessingItem(entranceProcessor);
+       }
+
+       @Override
+       public Stream createStream(IProcessingItem sourcePi) {
+               return new ThreadsStream(sourcePi);
+       }
+
+       @Override
+       public Topology createTopology(String topoName) {
+               return new ThreadsTopology(topoName);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngine.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngine.java
 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngine.java
new file mode 100644
index 0000000..d442572
--- /dev/null
+++ 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngine.java
@@ -0,0 +1,100 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.samoa.topology.Topology;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Multithreaded engine.
+ * @author Anh Thu Vu
+ *
+ */
+public class ThreadsEngine {
+       
+       private static final List<ExecutorService> threadPool = new 
ArrayList<ExecutorService>();
+       
+       /*
+        * Create and manage threads
+        */
+       public static void setNumberOfThreads(int numThreads) {
+               if (numThreads < 1)
+                       throw new IllegalStateException("Number of threads must 
be a positive integer.");
+               
+               if (threadPool.size() > numThreads)
+                       throw new IllegalStateException("You cannot set a 
numThreads smaller than the current size of the threads pool.");
+               
+               if (threadPool.size() < numThreads) {
+                       for (int i=threadPool.size(); i<numThreads; i++) {
+                               
threadPool.add(Executors.newSingleThreadExecutor());
+                       }
+               }
+       }
+       
+       public static int getNumberOfThreads() {
+               return threadPool.size();
+       }
+       
+       public static ExecutorService getThreadWithIndex(int index) {
+               if (threadPool.size() <= 0 )
+                       throw new IllegalStateException("Try to get 
ExecutorService from an empty pool.");
+               index %= threadPool.size();
+               return threadPool.get(index);
+       }
+       
+       /*
+        * Submit topology and start
+        */
+       private static void submitTopology(Topology topology) {
+               ThreadsTopology tTopology = (ThreadsTopology) topology;
+               tTopology.run();
+       }
+       
+       public static void submitTopology(Topology topology, int numThreads) {
+               ThreadsEngine.setNumberOfThreads(numThreads);
+               ThreadsEngine.submitTopology(topology);
+       }
+       
+       /* 
+        * Stop
+        */
+       public static void clearThreadPool() {
+               for (ExecutorService pool:threadPool) {
+                       pool.shutdown();
+               }
+               
+               for (ExecutorService pool:threadPool) {
+                       try {
+                               pool.awaitTermination(10, TimeUnit.SECONDS);
+                       } catch (InterruptedException e) {
+                               e.printStackTrace();
+                       }
+               }
+               
+               threadPool.clear();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItem.java
 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItem.java
new file mode 100644
index 0000000..008efb6
--- /dev/null
+++ 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItem.java
@@ -0,0 +1,40 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.samoa.core.EntranceProcessor;
+import com.yahoo.labs.samoa.topology.LocalEntranceProcessingItem;
+
+/**
+ * EntranceProcessingItem for multithreaded engine.
+ * @author Anh Thu Vu
+ *
+ */
+public class ThreadsEntranceProcessingItem extends LocalEntranceProcessingItem 
{
+       
+       public ThreadsEntranceProcessingItem(EntranceProcessor processor) {
+        super(processor);
+    }
+    
+    // The default waiting time when there is no available events is 100ms
+    // Override waitForNewEvents() to change it
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnable.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnable.java
 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnable.java
new file mode 100644
index 0000000..7cb8c18
--- /dev/null
+++ 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnable.java
@@ -0,0 +1,61 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+
+/**
+ * Runnable class where each object corresponds to a ContentEvent and an 
assigned PI. 
+ * When a PI receives a ContentEvent, it will create a ThreadsEventRunnable 
with the received ContentEvent
+ * and an assigned workerPI. This runnable is then submitted to a thread queue 
waiting to be executed.
+ * The worker PI will process the received event when the runnable object is 
executed/run.
+ * @author Anh Thu Vu
+ *
+ */
+public class ThreadsEventRunnable implements Runnable {
+
+       private ThreadsProcessingItemInstance workerPi;
+       private ContentEvent event;
+       
+       public ThreadsEventRunnable(ThreadsProcessingItemInstance workerPi, 
ContentEvent event) {
+               this.workerPi = workerPi;
+               this.event = event;
+       }
+       
+       public ThreadsProcessingItemInstance getWorkerProcessingItem() {
+               return this.workerPi;
+       }
+       
+       public ContentEvent getContentEvent() {
+               return this.event;
+       }
+       
+       @Override
+       public void run() {
+               try {
+                       workerPi.processEvent(event);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItem.java
 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItem.java
new file mode 100644
index 0000000..5eb6174
--- /dev/null
+++ 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItem.java
@@ -0,0 +1,101 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.topology.AbstractProcessingItem;
+import com.yahoo.labs.samoa.topology.ProcessingItem;
+import com.yahoo.labs.samoa.topology.Stream;
+import com.yahoo.labs.samoa.utils.PartitioningScheme;
+import com.yahoo.labs.samoa.utils.StreamDestination;
+
+/**
+ * ProcessingItem for multithreaded engine.
+ * @author Anh Thu Vu
+ *
+ */
+public class ThreadsProcessingItem extends AbstractProcessingItem {
+       // Replicas of the ProcessingItem.
+       // When ProcessingItem receives an event, it assigns one
+       // of these replicas to process the event.
+       private List<ThreadsProcessingItemInstance> piInstances;
+       
+       // Each replica of ProcessingItem is assigned to one of the
+       // available threads in a round-robin fashion, i.e.: each 
+       // replica is associated with the index of a thread. 
+       // Each ProcessingItem has a random offset variable so that
+       // the allocation of PI replicas to threads are spread evenly
+       // among all threads.
+       private int offset;
+       
+       /*
+        * Constructor
+        */
+       public ThreadsProcessingItem(Processor processor, int parallelismHint) {
+               super(processor, parallelismHint);
+               this.offset = (int) 
(Math.random()*ThreadsEngine.getNumberOfThreads());
+       }
+       
+       public List<ThreadsProcessingItemInstance> getProcessingItemInstances() 
{
+               return this.piInstances;
+       }
+
+       /*
+        * Connects to streams
+        */
+       @Override
+    protected ProcessingItem addInputStream(Stream inputStream, 
PartitioningScheme scheme) {
+               StreamDestination destination = new StreamDestination(this, 
this.getParallelism(), scheme);
+               ((ThreadsStream) inputStream).addDestination(destination);
+               return this;
+       }
+
+       /*
+        * Process the received event.
+        */
+       public void processEvent(ContentEvent event, int counter) {
+               if (this.piInstances == null || this.piInstances.size() < 
this.getParallelism())
+                       throw new 
IllegalStateException("ThreadsWorkerProcessingItem(s) need to be setup before 
process any event (i.e. in ThreadsTopology.start()).");
+               
+               ThreadsProcessingItemInstance piInstance = 
this.piInstances.get(counter);
+               ThreadsEventRunnable runnable = new 
ThreadsEventRunnable(piInstance, event);
+               
ThreadsEngine.getThreadWithIndex(piInstance.getThreadIndex()).submit(runnable);
+       }
+       
+       /*
+        * Setup the replicas of this PI. 
+        * This should be called after the topology is set up (all Processors 
and PIs are
+        * setup and connected to the respective streams) and before events are 
sent.
+        */
+       public void setupInstances() {
+               this.piInstances = new 
ArrayList<ThreadsProcessingItemInstance>(this.getParallelism());
+               for (int i=0; i<this.getParallelism(); i++) {
+                       Processor newProcessor = 
this.getProcessor().newProcessor(this.getProcessor());
+                       newProcessor.onCreate(i + 1);
+                       this.piInstances.add(new 
ThreadsProcessingItemInstance(newProcessor, this.offset + i));
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstance.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstance.java
 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstance.java
new file mode 100644
index 0000000..9a400d1
--- /dev/null
+++ 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstance.java
@@ -0,0 +1,54 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.core.Processor;
+
+/**
+ * Lightweight replicas of ThreadProcessingItem.
+ * ThreadsProcessingItem manages a list of these objects and 
+ * assigns each incoming message to be processed by one of them. 
+ * @author Anh Thu Vu
+ *
+ */
+public class ThreadsProcessingItemInstance {
+
+       private Processor processor;
+       private int threadIndex;
+       
+       public ThreadsProcessingItemInstance(Processor processor, int 
threadIndex) {
+               this.processor = processor;
+               this.threadIndex = threadIndex;
+       }
+       
+       public int getThreadIndex() {
+               return this.threadIndex;
+       }
+       
+       public Processor getProcessor() {
+               return this.processor;
+       }
+
+       public void processEvent(ContentEvent event) {
+               this.processor.process(event);
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsStream.java
 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsStream.java
new file mode 100644
index 0000000..5aa86f7
--- /dev/null
+++ 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsStream.java
@@ -0,0 +1,106 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.topology.IProcessingItem;
+import com.yahoo.labs.samoa.topology.AbstractStream;
+import com.yahoo.labs.samoa.utils.StreamDestination;
+
+/**
+ * Stream for multithreaded engine.
+ * @author Anh Thu Vu
+ *
+ */
+public class ThreadsStream extends AbstractStream {
+       
+       private List<StreamDestination> destinations;
+       private int counter = 0;
+       private int maxCounter = 1;
+       
+       public ThreadsStream(IProcessingItem sourcePi) {
+               destinations = new LinkedList<StreamDestination>();
+       }
+       
+       public void addDestination(StreamDestination destination) {
+               destinations.add(destination);
+               maxCounter *= destination.getParallelism();
+       }
+       
+       public List<StreamDestination> getDestinations() {
+               return this.destinations;
+       }
+       
+       private int getNextCounter() {
+       if (maxCounter > 0 && counter >= maxCounter) counter = 0;
+       this.counter++;
+       return this.counter;
+    }
+
+    @Override
+    public synchronized void put(ContentEvent event) {
+       this.put(event, this.getNextCounter());
+    }
+    
+    private void put(ContentEvent event, int counter) {
+       ThreadsProcessingItem pi;
+        int parallelism;
+        for (StreamDestination destination:destinations) {
+            pi = (ThreadsProcessingItem) destination.getProcessingItem();
+            parallelism = destination.getParallelism();
+            switch (destination.getPartitioningScheme()) {
+            case SHUFFLE:
+               pi.processEvent(event, counter%parallelism);
+                break;
+            case GROUP_BY_KEY:
+               pi.processEvent(event, getPIIndexForKey(event.getKey(), 
parallelism));
+                break;
+            case BROADCAST:
+               for (int p = 0; p < parallelism; p++) {
+                    pi.processEvent(event, p);
+                }
+                break;
+            }
+        }
+    }
+       
+       private static int getPIIndexForKey(String key, int parallelism) {
+               // If key is null, return a default index: 0
+               if (key == null) return 0;
+               
+               // HashCodeBuilder object does not have reset() method
+       // So all objects that get appended will be included in the 
+       // computation of the hashcode. 
+       // To avoid initialize a HashCodeBuilder for each event,
+       // here I use the static method with reflection on the event's key
+               int index = HashCodeBuilder.reflectionHashCode(key, true) % 
parallelism;
+               if (index < 0) {
+                       index += parallelism;
+               }
+               return index;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopology.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopology.java
 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopology.java
new file mode 100644
index 0000000..fc9f885
--- /dev/null
+++ 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopology.java
@@ -0,0 +1,62 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.samoa.topology.AbstractTopology;
+import com.yahoo.labs.samoa.topology.IProcessingItem;
+
+/**
+ * Topology for multithreaded engine.
+ * @author Anh Thu Vu
+ *
+ */
+public class ThreadsTopology extends AbstractTopology {
+       ThreadsTopology(String name) {
+               super(name);
+       }
+
+       public void run() {
+       if (this.getEntranceProcessingItems() == null)
+               throw new IllegalStateException("You need to set entrance PI 
before running the topology.");
+       if (this.getEntranceProcessingItems().size() != 1)
+               throw new IllegalStateException("ThreadsTopology supports 1 
entrance PI only. Number of entrance PIs is 
"+this.getEntranceProcessingItems().size());
+       
+       this.setupProcessingItemInstances();
+       ThreadsEntranceProcessingItem entrancePi = 
(ThreadsEntranceProcessingItem) this.getEntranceProcessingItems().toArray()[0];
+       if (entrancePi == null)
+            throw new IllegalStateException("You need to set entrance PI 
before running the topology.");
+       entrancePi.getProcessor().onCreate(0); // id=0 as it is not used in 
simple mode
+        entrancePi.startSendingEvents();
+    }
+       
+       /*
+        * Tell all the ThreadsProcessingItems to create & init their
+        * replicas (ThreadsProcessingItemInstance)
+        */
+       private void setupProcessingItemInstances() {
+               for (IProcessingItem pi:this.getProcessingItems()) {
+                       if (pi instanceof ThreadsProcessingItem) {
+                               ThreadsProcessingItem tpi = 
(ThreadsProcessingItem) pi;
+                               tpi.setupInstances();
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-threads/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
----------------------------------------------------------------------
diff --git a/samoa-threads/src/test/java/com/yahoo/labs/samoa/AlgosTest.java 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
new file mode 100644
index 0000000..5979d46
--- /dev/null
+++ b/samoa-threads/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
@@ -0,0 +1,68 @@
+package com.yahoo.labs.samoa;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.junit.Test;
+
+public class AlgosTest {
+
+    @Test(timeout = 60000)
+    public void testVHTWithThreads() throws Exception {
+
+        TestParams vhtConfig = new TestParams.Builder()
+                .inputInstances(200_000)
+                .samplingSize(20_000)
+                .evaluationInstances(200_000)
+                .classifiedInstances(200_000)
+                .classificationsCorrect(55f)
+                .kappaStat(-0.1f)
+                .kappaTempStat(-0.1f)
+                
.cliStringTemplate(TestParams.Templates.PREQEVAL_VHT_RANDOMTREE + " -t 2")
+                .resultFilePollTimeout(10)
+                .prePollWait(10)
+                .taskClassName(LocalThreadsDoTask.class.getName())
+                .build();
+        TestUtils.test(vhtConfig);
+
+    }
+
+    @Test(timeout = 180000)
+    public void testBaggingWithThreads() throws Exception {
+        TestParams baggingConfig = new TestParams.Builder()
+                .inputInstances(100_000)
+                .samplingSize(10_000)
+                .inputDelayMicroSec(100) // prevents saturating the system due 
to unbounded queues
+                .evaluationInstances(90_000)
+                .classifiedInstances(105_000)
+                .classificationsCorrect(55f)
+                .kappaStat(0f)
+                .kappaTempStat(0f)
+                
.cliStringTemplate(TestParams.Templates.PREQEVAL_BAGGING_RANDOMTREE + " -t 2")
+                .prePollWait(10)
+                .resultFilePollTimeout(30)
+                .taskClassName(LocalThreadsDoTask.class.getName())
+                .build();
+        TestUtils.test(baggingConfig);
+
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactoryTest.java
 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactoryTest.java
new file mode 100644
index 0000000..eee8639
--- /dev/null
+++ 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactoryTest.java
@@ -0,0 +1,114 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import static org.junit.Assert.*;
+import mockit.Mocked;
+import mockit.NonStrictExpectations;
+import mockit.Tested;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import com.yahoo.labs.samoa.core.EntranceProcessor;
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
+import com.yahoo.labs.samoa.topology.ProcessingItem;
+import com.yahoo.labs.samoa.topology.Stream;
+import com.yahoo.labs.samoa.topology.Topology;
+
+/**
+ * @author Anh Thu Vu
+ *
+ */
+public class ThreadsComponentFactoryTest {
+       @Tested private ThreadsComponentFactory factory;
+       @Mocked private Processor processor, processorReplica;
+       @Mocked private EntranceProcessor entranceProcessor;
+       
+       private final int parallelism = 3;
+       private final String topoName = "TestTopology";
+       
+
+       @Before
+       public void setUp() throws Exception {
+               factory = new ThreadsComponentFactory();
+       }
+
+       @Test
+       public void testCreatePiNoParallelism() {
+               new NonStrictExpectations() {
+                       {
+                               processor.newProcessor(processor);
+                               result=processorReplica;
+                       }
+               };
+               ProcessingItem pi = factory.createPi(processor);
+               assertNotNull("ProcessingItem created is null.",pi);
+               assertEquals("ProcessingItem created is not a 
ThreadsProcessingItem.",ThreadsProcessingItem.class,pi.getClass());
+               assertEquals("Parallelism of PI is not 
1",1,pi.getParallelism(),0);
+       }
+       
+       @Test
+       public void testCreatePiWithParallelism() {
+               new NonStrictExpectations() {
+                       {
+                               processor.newProcessor(processor);
+                               result=processorReplica;
+                       }
+               };
+               ProcessingItem pi = factory.createPi(processor,parallelism);
+               assertNotNull("ProcessingItem created is null.",pi);
+               assertEquals("ProcessingItem created is not a 
ThreadsProcessingItem.",ThreadsProcessingItem.class,pi.getClass());
+               assertEquals("Parallelism of PI is not 
",parallelism,pi.getParallelism(),0);
+       }
+       
+       @Test
+       public void testCreateStream() {
+               new NonStrictExpectations() {
+                       {
+                               processor.newProcessor(processor);
+                               result=processorReplica;
+                       }
+               };
+               ProcessingItem pi = factory.createPi(processor);
+               
+               Stream stream = factory.createStream(pi);
+               assertNotNull("Stream created is null",stream);
+               assertEquals("Stream created is not a 
ThreadsStream.",ThreadsStream.class,stream.getClass());
+       }
+       
+       @Test
+       public void testCreateTopology() {
+               Topology topology = factory.createTopology(topoName);
+               assertNotNull("Topology created is null.",topology);
+               assertEquals("Topology created is not a 
ThreadsTopology.",ThreadsTopology.class,topology.getClass());
+       }
+       
+       @Test
+       public void testCreateEntrancePi() {
+               EntranceProcessingItem entrancePi = 
factory.createEntrancePi(entranceProcessor);
+               assertNotNull("EntranceProcessingItem created is 
null.",entrancePi);
+               assertEquals("EntranceProcessingItem created is not a 
ThreadsEntranceProcessingItem.",ThreadsEntranceProcessingItem.class,entrancePi.getClass());
+               assertSame("EntranceProcessor is not set 
correctly.",entranceProcessor, entrancePi.getProcessor());
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngineTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngineTest.java
 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngineTest.java
new file mode 100644
index 0000000..cdb8949
--- /dev/null
+++ 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngineTest.java
@@ -0,0 +1,129 @@
+package com.yahoo.labs.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import static org.junit.Assert.*;
+import mockit.Mocked;
+import mockit.Verifications;
+
+import org.junit.After;
+import org.junit.Test;
+
+/**
+ * @author Anh Thu Vu
+ *
+ */
+public class ThreadsEngineTest {
+
+       @Mocked ThreadsTopology topology;
+       
+       private final int numThreads = 4;
+       private final int numThreadsSmaller = 3;
+       private final int numThreadsLarger = 5;
+
+       @After
+       public void cleanup() {
+               ThreadsEngine.clearThreadPool();
+       }
+       
+       @Test
+       public void testSetNumberOfThreadsSimple() {
+               ThreadsEngine.setNumberOfThreads(numThreads);
+               assertEquals("Number of threads is not set correctly.", 
numThreads,
+                               ThreadsEngine.getNumberOfThreads(),0);
+       }
+       
+       @Test
+       public void testSetNumberOfThreadsRepeat() {
+               ThreadsEngine.setNumberOfThreads(numThreads);
+               ThreadsEngine.setNumberOfThreads(numThreads);
+               assertEquals("Number of threads is not set correctly.", 
numThreads,
+                               ThreadsEngine.getNumberOfThreads(),0);
+       }
+       
+       @Test
+       public void testSetNumberOfThreadsIncrease() {
+               ThreadsEngine.setNumberOfThreads(numThreads);
+               ThreadsEngine.setNumberOfThreads(numThreadsLarger);
+               assertEquals("Number of threads is not set correctly.", 
numThreadsLarger,
+                               ThreadsEngine.getNumberOfThreads(),0);
+       }
+       
+       @Test(expected=IllegalStateException.class)
+       public void testSetNumberOfThreadsDecrease() {
+               ThreadsEngine.setNumberOfThreads(numThreads);
+               ThreadsEngine.setNumberOfThreads(numThreadsSmaller);
+               // Exception expected
+       }
+       
+       @Test(expected=IllegalStateException.class)
+       public void testSetNumberOfThreadsNegative() {
+               ThreadsEngine.setNumberOfThreads(-1);
+               // Exception expected
+       }
+       
+       @Test(expected=IllegalStateException.class)
+       public void testSetNumberOfThreadsZero() {
+               ThreadsEngine.setNumberOfThreads(0);
+               // Exception expected
+       }
+       
+       @Test
+       public void testClearThreadPool() {
+               ThreadsEngine.setNumberOfThreads(numThreads);
+               ThreadsEngine.clearThreadPool();
+               assertEquals("ThreadsEngine was not shutdown properly.", 0, 
ThreadsEngine.getNumberOfThreads());
+       }
+
+       @Test
+       public void testGetThreadWithIndexWithinPoolSize() {
+               ThreadsEngine.setNumberOfThreads(numThreads);
+               for (int i=0; i<numThreads; i++) {
+                       assertNotNull("ExecutorService is not initialized 
correctly.", ThreadsEngine.getThreadWithIndex(i));
+               }
+       }
+       
+       @Test
+       public void testGetThreadWithIndexOutOfPoolSize() {
+               ThreadsEngine.setNumberOfThreads(numThreads);
+               for (int i=0; i<numThreads+3; i++) {
+                       assertNotNull("ExecutorService is not initialized 
correctly.", ThreadsEngine.getThreadWithIndex(i));
+               }
+       }
+       
+       @Test(expected=IllegalStateException.class)
+       public void testGetThreadWithIndexFromEmptyPool() {
+               for (int i=0; i<numThreads; i++) {
+                       ThreadsEngine.getThreadWithIndex(i);
+               }
+       }
+
+       @Test
+       public void testSubmitTopology() {
+               ThreadsEngine.submitTopology(topology, numThreads);
+               new Verifications() {{
+                   topology.run(); times=1;
+               }};
+               assertEquals("Number of threads is not set correctly.", 
numThreads,
+                               ThreadsEngine.getNumberOfThreads(),0);
+       }
+
+}

Reply via email to