http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-storm/src/test/java/org/apache/samoa/topology/impl/StormProcessingItemTest.java
----------------------------------------------------------------------
diff --git a/samoa-storm/src/test/java/org/apache/samoa/topology/impl/StormProcessingItemTest.java b/samoa-storm/src/test/java/org/apache/samoa/topology/impl/StormProcessingItemTest.java
new file mode 100644
index 0000000..4673903
--- /dev/null
+++ b/samoa-storm/src/test/java/org/apache/samoa/topology/impl/StormProcessingItemTest.java
@@ -0,0 +1,83 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import static org.junit.Assert.assertEquals;
+import mockit.Expectations;
+import mockit.MockUp;
+import mockit.Mocked;
+import mockit.Tested;
+import mockit.Verifications;
+
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.topology.impl.StormProcessingItem;
+import org.apache.samoa.topology.impl.StormTopology;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.TopologyBuilder;
+
+public class StormProcessingItemTest {
+  private static final int PARRALLELISM_HINT_2 = 2;
+  private static final int PARRALLELISM_HINT_4 = 4;
+  private static final String ID = "id";
+  @Tested
+  private StormProcessingItem pi;
+  @Mocked
+  private Processor processor;
+  @Mocked
+  private StormTopology topology;
+  @Mocked
+  private TopologyBuilder stormBuilder = new TopologyBuilder();
+
+  @Before
+  public void setUp() {
+    pi = new StormProcessingItem(processor, ID, PARRALLELISM_HINT_2);
+  }
+
+  @Test
+  public void testAddToTopology() {
+    new Expectations() {
+      {
+        topology.getStormBuilder();
+        result = stormBuilder;
+
+        stormBuilder.setBolt(ID, (IRichBolt) any, anyInt);
+        result = new MockUp<BoltDeclarer>() {
+        }.getMockInstance();
+      }
+    };
+
+    pi.addToTopology(topology, PARRALLELISM_HINT_4); // this parallelism hint is ignored
+
+    new Verifications() {
+      {
+        assertEquals(pi.getProcessor(), processor);
+        // TODO add methods to explore a topology and verify them
+        assertEquals(pi.getParallelism(), PARRALLELISM_HINT_2);
+        assertEquals(pi.getId(), ID);
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-test/pom.xml
----------------------------------------------------------------------
diff --git a/samoa-test/pom.xml b/samoa-test/pom.xml
index 901c10e..8d33917 100644
--- a/samoa-test/pom.xml
+++ b/samoa-test/pom.xml
@@ -23,7 +23,7 @@
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <parent>
     <artifactId>samoa</artifactId>
-    <groupId>com.yahoo.labs.samoa</groupId>
+    <groupId>org.apache.samoa</groupId>
     <version>0.3.0-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
@@ -41,7 +41,7 @@
       <version>2.4</version>
     </dependency>
     <dependency>
-      <groupId>com.yahoo.labs.samoa</groupId>
+      <groupId>org.apache.samoa</groupId>
       <artifactId>samoa-api</artifactId>
       <version>${project.version}</version>
     </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-test/src/test/java/com/yahoo/labs/samoa/TestParams.java
----------------------------------------------------------------------
diff --git a/samoa-test/src/test/java/com/yahoo/labs/samoa/TestParams.java b/samoa-test/src/test/java/com/yahoo/labs/samoa/TestParams.java
deleted file mode 100644
index 1cc0ba1..0000000
--- a/samoa-test/src/test/java/com/yahoo/labs/samoa/TestParams.java
+++ /dev/null
@@ -1,234 +0,0 @@
-package com.yahoo.labs.samoa;
-
-public class TestParams {
-
-  /**
-   * templates that take the following parameters:
-   * <ul>
-   * <li>the output file location as an argument (-d),
-   * <li>the maximum number of instances for testing/training (-i)
-   * <li>the sampling size (-f)
-   * <li>the delay in ms between input instances (-w) , default is zero
-   * </ul>
-   * as well as the maximum number of instances for testing/training (-i) and the sampling size (-f)
-   */
-  public static class Templates {
-
-    public final static String PREQEVAL_VHT_RANDOMTREE = "PrequentialEvaluation -d %s -i %d -f %d -w %d "
-        + "-l (com.yahoo.labs.samoa.learners.classifiers.trees.VerticalHoeffdingTree -p 4) " +
-        "-s (com.yahoo.labs.samoa.moa.streams.generators.RandomTreeGenerator -c 2 -o 10 -u 10)";
-
-    public final static String PREQEVAL_NAIVEBAYES_HYPERPLANE = "PrequentialEvaluation -d %s -i %d -f %d -w %d "
-        + "-l (classifiers.SingleClassifier -l com.yahoo.labs.samoa.learners.classifiers.NaiveBayes) " +
-        "-s (com.yahoo.labs.samoa.moa.streams.generators.HyperplaneGenerator -c 2)";
-
-    // setting the number of nominal attributes to zero significantly reduces
-    // the processing time,
-    // so that it's acceptable in a test case
-    public final static String PREQEVAL_BAGGING_RANDOMTREE = "PrequentialEvaluation -d %s -i %d -f %d -w %d "
-        + "-l (com.yahoo.labs.samoa.learners.classifiers.ensemble.Bagging) " +
-        "-s (com.yahoo.labs.samoa.moa.streams.generators.RandomTreeGenerator -c 2 -o 0 -u 10)";
-
-  }
-
-  public static final String EVALUATION_INSTANCES = "evaluation instances";
-  public static final String CLASSIFIED_INSTANCES = "classified instances";
-  public static final String CLASSIFICATIONS_CORRECT = "classifications correct (percent)";
-  public static final String KAPPA_STAT = "Kappa Statistic (percent)";
-  public static final String KAPPA_TEMP_STAT = "Kappa Temporal Statistic (percent)";
-
-  private long inputInstances;
-  private long samplingSize;
-  private long evaluationInstances;
-  private long classifiedInstances;
-  private float classificationsCorrect;
-  private float kappaStat;
-  private float kappaTempStat;
-  private String cliStringTemplate;
-  private int pollTimeoutSeconds;
-  private final int prePollWait;
-  private int inputDelayMicroSec;
-  private String taskClassName;
-
-  private TestParams(String taskClassName,
-      long inputInstances,
-      long samplingSize,
-      long evaluationInstances,
-      long classifiedInstances,
-      float classificationsCorrect,
-      float kappaStat,
-      float kappaTempStat,
-      String cliStringTemplate,
-      int pollTimeoutSeconds,
-      int prePollWait,
-      int inputDelayMicroSec) {
-    this.taskClassName = taskClassName;
-    this.inputInstances = inputInstances;
-    this.samplingSize = samplingSize;
-    this.evaluationInstances = evaluationInstances;
-    this.classifiedInstances = classifiedInstances;
-    this.classificationsCorrect = classificationsCorrect;
-    this.kappaStat = kappaStat;
-    this.kappaTempStat = kappaTempStat;
-    this.cliStringTemplate = cliStringTemplate;
-    this.pollTimeoutSeconds = pollTimeoutSeconds;
-    this.prePollWait = prePollWait;
-    this.inputDelayMicroSec = inputDelayMicroSec;
-  }
-
-  public String getTaskClassName() {
-    return taskClassName;
-  }
-
-  public long getInputInstances() {
-    return inputInstances;
-  }
-
-  public long getSamplingSize() {
-    return samplingSize;
-  }
-
-  public int getPollTimeoutSeconds() {
-    return pollTimeoutSeconds;
-  }
-
-  public int getPrePollWaitSeconds() {
-    return prePollWait;
-  }
-
-  public String getCliStringTemplate() {
-    return cliStringTemplate;
-  }
-
-  public long getEvaluationInstances() {
-    return evaluationInstances;
-  }
-
-  public long getClassifiedInstances() {
-    return classifiedInstances;
-  }
-
-  public float getClassificationsCorrect() {
-    return classificationsCorrect;
-  }
-
-  public float getKappaStat() {
-    return kappaStat;
-  }
-
-  public float getKappaTempStat() {
-    return kappaTempStat;
-  }
-
-  public int getInputDelayMicroSec() {
-    return inputDelayMicroSec;
-  }
-
-  @Override
-  public String toString() {
-    return "TestParams{\n" +
-        "inputInstances=" + inputInstances + "\n" +
-        "samplingSize=" + samplingSize + "\n" +
-        "evaluationInstances=" + evaluationInstances + "\n" +
-        "classifiedInstances=" + classifiedInstances + "\n" +
-        "classificationsCorrect=" + classificationsCorrect + "\n" +
-        "kappaStat=" + kappaStat + "\n" +
-        "kappaTempStat=" + kappaTempStat + "\n" +
-        "cliStringTemplate='" + cliStringTemplate + '\'' + "\n" +
-        "pollTimeoutSeconds=" + pollTimeoutSeconds + "\n" +
-        "prePollWait=" + prePollWait + "\n" +
-        "taskClassName='" + taskClassName + '\'' + "\n" +
-        "inputDelayMicroSec=" + inputDelayMicroSec + "\n" +
-        '}';
-  }
-
-  public static class Builder {
-    private long inputInstances;
-    private long samplingSize;
-    private long evaluationInstances;
-    private long classifiedInstances;
-    private float classificationsCorrect;
-    private float kappaStat = 0f;
-    private float kappaTempStat = 0f;
-    private String cliStringTemplate;
-    private int pollTimeoutSeconds = 10;
-    private int prePollWaitSeconds = 10;
-    private String taskClassName;
-    private int inputDelayMicroSec = 0;
-
-    public Builder taskClassName(String taskClassName) {
-      this.taskClassName = taskClassName;
-      return this;
-    }
-
-    public Builder inputInstances(long inputInstances) {
-      this.inputInstances = inputInstances;
-      return this;
-    }
-
-    public Builder samplingSize(long samplingSize) {
-      this.samplingSize = samplingSize;
-      return this;
-    }
-
-    public Builder evaluationInstances(long evaluationInstances) {
-      this.evaluationInstances = evaluationInstances;
-      return this;
-    }
-
-    public Builder classifiedInstances(long classifiedInstances) {
-      this.classifiedInstances = classifiedInstances;
-      return this;
-    }
-
-    public Builder classificationsCorrect(float classificationsCorrect) {
-      this.classificationsCorrect = classificationsCorrect;
-      return this;
-    }
-
-    public Builder kappaStat(float kappaStat) {
-      this.kappaStat = kappaStat;
-      return this;
-    }
-
-    public Builder kappaTempStat(float kappaTempStat) {
-      this.kappaTempStat = kappaTempStat;
-      return this;
-    }
-
-    public Builder cliStringTemplate(String cliStringTemplate) {
-      this.cliStringTemplate = cliStringTemplate;
-      return this;
-    }
-
-    public Builder resultFilePollTimeout(int pollTimeoutSeconds) {
-      this.pollTimeoutSeconds = pollTimeoutSeconds;
-      return this;
-    }
-
-    public Builder inputDelayMicroSec(int inputDelayMicroSec) {
-      this.inputDelayMicroSec = inputDelayMicroSec;
-      return this;
-    }
-
-    public Builder prePollWait(int prePollWaitSeconds) {
-      this.prePollWaitSeconds = prePollWaitSeconds;
-      return this;
-    }
-
-    public TestParams build() {
-      return new TestParams(taskClassName,
-          inputInstances,
-          samplingSize,
-          evaluationInstances,
-          classifiedInstances,
-          classificationsCorrect,
-          kappaStat,
-          kappaTempStat,
-          cliStringTemplate,
-          pollTimeoutSeconds,
-          prePollWaitSeconds,
-          inputDelayMicroSec);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-test/src/test/java/com/yahoo/labs/samoa/TestUtils.java
----------------------------------------------------------------------
diff --git a/samoa-test/src/test/java/com/yahoo/labs/samoa/TestUtils.java b/samoa-test/src/test/java/com/yahoo/labs/samoa/TestUtils.java
deleted file mode 100644
index ae226c7..0000000
--- a/samoa-test/src/test/java/com/yahoo/labs/samoa/TestUtils.java
+++ /dev/null
@@ -1,152 +0,0 @@
-package com.yahoo.labs.samoa;/*
-* #%L
-* SAMOA
-* %%
-* Copyright (C) 2013 - 2014 Yahoo! Inc.
-* %%
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*      http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-* #L%
-*/
-
-import org.apache.commons.csv.CSVFormat;
-import org.apache.commons.csv.CSVRecord;
-import org.apache.commons.io.input.Tailer;
-import org.apache.commons.io.input.TailerListenerAdapter;
-import org.junit.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.Reader;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Iterator;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertTrue;
-
-public class TestUtils {
-
-  private static final Logger LOG = LoggerFactory.getLogger(TestUtils.class.getName());
-
-  public static void test(final TestParams testParams) throws IOException, ClassNotFoundException,
-      NoSuchMethodException, InvocationTargetException, IllegalAccessException, InterruptedException {
-
-    final File tempFile = File.createTempFile("test", "test");
-
-    LOG.info("Starting test, output file is {}, test config is \n{}", tempFile.getAbsolutePath(), testParams.toString());
-
-    Executors.newSingleThreadExecutor().submit(new Callable<Void>() {
-
-      @Override
-      public Void call() throws Exception {
-        try {
-          Class.forName(testParams.getTaskClassName())
-              .getMethod("main", String[].class)
-              .invoke(null, (Object) String.format(
-                  testParams.getCliStringTemplate(),
-                  tempFile.getAbsolutePath(),
-                  testParams.getInputInstances(),
-                  testParams.getSamplingSize(),
-                  testParams.getInputDelayMicroSec()
-                  ).split("[ ]"));
-        } catch (Exception e) {
-          LOG.error("Cannot execute test {} {}", e.getMessage(), e.getCause().getMessage());
-        }
-        return null;
-      }
-    });
-
-    Thread.sleep(TimeUnit.SECONDS.toMillis(testParams.getPrePollWaitSeconds()));
-
-    CountDownLatch signalComplete = new CountDownLatch(1);
-
-    final Tailer tailer = Tailer.create(tempFile, new TestResultsTailerAdapter(signalComplete), 1000);
-    new Thread(new Runnable() {
-      @Override
-      public void run() {
-        tailer.run();
-      }
-    }).start();
-
-    signalComplete.await();
-    tailer.stop();
-
-    assertResults(tempFile, testParams);
-  }
-
-  public static void assertResults(File outputFile, com.yahoo.labs.samoa.TestParams testParams) throws IOException {
-
-    LOG.info("Checking results file " + outputFile.getAbsolutePath());
-    // 1. parse result file with csv parser
-    Reader in = new FileReader(outputFile);
-    Iterable<CSVRecord> records = CSVFormat.EXCEL.withSkipHeaderRecord(false)
-        .withIgnoreEmptyLines(true).withDelimiter(',').withCommentMarker('#').parse(in);
-    CSVRecord last = null;
-    Iterator<CSVRecord> iterator = records.iterator();
-    CSVRecord header = iterator.next();
-    Assert.assertEquals("Invalid number of columns", 5, header.size());
-
-    Assert
-        .assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.EVALUATION_INSTANCES, header.get(0).trim());
-    Assert
-        .assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.CLASSIFIED_INSTANCES, header.get(1).trim());
-    Assert.assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.CLASSIFICATIONS_CORRECT, header.get(2)
-        .trim());
-    Assert.assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.KAPPA_STAT, header.get(3).trim());
-    Assert.assertEquals("Unexpected column", com.yahoo.labs.samoa.TestParams.KAPPA_TEMP_STAT, header.get(4).trim());
-
-    // 2. check last line result
-    while (iterator.hasNext()) {
-      last = iterator.next();
-    }
-
-    assertTrue(String.format("Unmet threshold expected %d got %f",
-        testParams.getEvaluationInstances(), Float.parseFloat(last.get(0))),
-        testParams.getEvaluationInstances() <= Float.parseFloat(last.get(0)));
-    assertTrue(String.format("Unmet threshold expected %d got %f", testParams.getClassifiedInstances(),
-        Float.parseFloat(last.get(1))),
-        testParams.getClassifiedInstances() <= Float.parseFloat(last.get(1)));
-    assertTrue(String.format("Unmet threshold expected %f got %f",
-        testParams.getClassificationsCorrect(), Float.parseFloat(last.get(2))),
-        testParams.getClassificationsCorrect() <= Float.parseFloat(last.get(2)));
-    assertTrue(String.format("Unmet threshold expected %f got %f",
-        testParams.getKappaStat(), Float.parseFloat(last.get(3))),
-        testParams.getKappaStat() <= Float.parseFloat(last.get(3)));
-    assertTrue(String.format("Unmet threshold expected %f got %f",
-        testParams.getKappaTempStat(), Float.parseFloat(last.get(4))),
-        testParams.getKappaTempStat() <= Float.parseFloat(last.get(4)));
-
-  }
-
-  private static class TestResultsTailerAdapter extends TailerListenerAdapter {
-
-    private final CountDownLatch signalComplete;
-
-    public TestResultsTailerAdapter(CountDownLatch signalComplete) {
-      this.signalComplete = signalComplete;
-    }
-
-    @Override
-    public void handle(String line) {
-      if ("# COMPLETED".equals(line.trim())) {
-        signalComplete.countDown();
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-test/src/test/java/org/apache/samoa/TestParams.java
----------------------------------------------------------------------
diff --git a/samoa-test/src/test/java/org/apache/samoa/TestParams.java b/samoa-test/src/test/java/org/apache/samoa/TestParams.java
new file mode 100644
index 0000000..e217922
--- /dev/null
+++ b/samoa-test/src/test/java/org/apache/samoa/TestParams.java
@@ -0,0 +1,234 @@
+package org.apache.samoa;
+
+public class TestParams {
+
+  /**
+   * templates that take the following parameters:
+   * <ul>
+   * <li>the output file location as an argument (-d),
+   * <li>the maximum number of instances for testing/training (-i)
+   * <li>the sampling size (-f)
+   * <li>the delay in ms between input instances (-w) , default is zero
+   * </ul>
+   * as well as the maximum number of instances for testing/training (-i) and the sampling size (-f)
+   */
+  public static class Templates {
+
+    public final static String PREQEVAL_VHT_RANDOMTREE = "PrequentialEvaluation -d %s -i %d -f %d -w %d "
+        + "-l (org.apache.samoa.learners.classifiers.trees.VerticalHoeffdingTree -p 4) " +
+        "-s (org.apache.samoa.moa.streams.generators.RandomTreeGenerator -c 2 -o 10 -u 10)";
+
+    public final static String PREQEVAL_NAIVEBAYES_HYPERPLANE = "PrequentialEvaluation -d %s -i %d -f %d -w %d "
+        + "-l (classifiers.SingleClassifier -l org.apache.samoa.learners.classifiers.NaiveBayes) " +
+        "-s (org.apache.samoa.moa.streams.generators.HyperplaneGenerator -c 2)";
+
+    // setting the number of nominal attributes to zero significantly reduces
+    // the processing time,
+    // so that it's acceptable in a test case
+    public final static String PREQEVAL_BAGGING_RANDOMTREE = "PrequentialEvaluation -d %s -i %d -f %d -w %d "
+        + "-l (org.apache.samoa.learners.classifiers.ensemble.Bagging) " +
+        "-s (org.apache.samoa.moa.streams.generators.RandomTreeGenerator -c 2 -o 0 -u 10)";
+
+  }
+
+  public static final String EVALUATION_INSTANCES = "evaluation instances";
+  public static final String CLASSIFIED_INSTANCES = "classified instances";
+  public static final String CLASSIFICATIONS_CORRECT = "classifications correct (percent)";
+  public static final String KAPPA_STAT = "Kappa Statistic (percent)";
+  public static final String KAPPA_TEMP_STAT = "Kappa Temporal Statistic (percent)";
+
+  private long inputInstances;
+  private long samplingSize;
+  private long evaluationInstances;
+  private long classifiedInstances;
+  private float classificationsCorrect;
+  private float kappaStat;
+  private float kappaTempStat;
+  private String cliStringTemplate;
+  private int pollTimeoutSeconds;
+  private final int prePollWait;
+  private int inputDelayMicroSec;
+  private String taskClassName;
+
+  private TestParams(String taskClassName,
+      long inputInstances,
+      long samplingSize,
+      long evaluationInstances,
+      long classifiedInstances,
+      float classificationsCorrect,
+      float kappaStat,
+      float kappaTempStat,
+      String cliStringTemplate,
+      int pollTimeoutSeconds,
+      int prePollWait,
+      int inputDelayMicroSec) {
+    this.taskClassName = taskClassName;
+    this.inputInstances = inputInstances;
+    this.samplingSize = samplingSize;
+    this.evaluationInstances = evaluationInstances;
+    this.classifiedInstances = classifiedInstances;
+    this.classificationsCorrect = classificationsCorrect;
+    this.kappaStat = kappaStat;
+    this.kappaTempStat = kappaTempStat;
+    this.cliStringTemplate = cliStringTemplate;
+    this.pollTimeoutSeconds = pollTimeoutSeconds;
+    this.prePollWait = prePollWait;
+    this.inputDelayMicroSec = inputDelayMicroSec;
+  }
+
+  public String getTaskClassName() {
+    return taskClassName;
+  }
+
+  public long getInputInstances() {
+    return inputInstances;
+  }
+
+  public long getSamplingSize() {
+    return samplingSize;
+  }
+
+  public int getPollTimeoutSeconds() {
+    return pollTimeoutSeconds;
+  }
+
+  public int getPrePollWaitSeconds() {
+    return prePollWait;
+  }
+
+  public String getCliStringTemplate() {
+    return cliStringTemplate;
+  }
+
+  public long getEvaluationInstances() {
+    return evaluationInstances;
+  }
+
+  public long getClassifiedInstances() {
+    return classifiedInstances;
+  }
+
+  public float getClassificationsCorrect() {
+    return classificationsCorrect;
+  }
+
+  public float getKappaStat() {
+    return kappaStat;
+  }
+
+  public float getKappaTempStat() {
+    return kappaTempStat;
+  }
+
+  public int getInputDelayMicroSec() {
+    return inputDelayMicroSec;
+  }
+
+  @Override
+  public String toString() {
+    return "TestParams{\n" +
+        "inputInstances=" + inputInstances + "\n" +
+        "samplingSize=" + samplingSize + "\n" +
+        "evaluationInstances=" + evaluationInstances + "\n" +
+        "classifiedInstances=" + classifiedInstances + "\n" +
+        "classificationsCorrect=" + classificationsCorrect + "\n" +
+        "kappaStat=" + kappaStat + "\n" +
+        "kappaTempStat=" + kappaTempStat + "\n" +
+        "cliStringTemplate='" + cliStringTemplate + '\'' + "\n" +
+        "pollTimeoutSeconds=" + pollTimeoutSeconds + "\n" +
+        "prePollWait=" + prePollWait + "\n" +
+        "taskClassName='" + taskClassName + '\'' + "\n" +
+        "inputDelayMicroSec=" + inputDelayMicroSec + "\n" +
+        '}';
+  }
+
+  public static class Builder {
+    private long inputInstances;
+    private long samplingSize;
+    private long evaluationInstances;
+    private long classifiedInstances;
+    private float classificationsCorrect;
+    private float kappaStat = 0f;
+    private float kappaTempStat = 0f;
+    private String cliStringTemplate;
+    private int pollTimeoutSeconds = 10;
+    private int prePollWaitSeconds = 10;
+    private String taskClassName;
+    private int inputDelayMicroSec = 0;
+
+    public Builder taskClassName(String taskClassName) {
+      this.taskClassName = taskClassName;
+      return this;
+    }
+
+    public Builder inputInstances(long inputInstances) {
+      this.inputInstances = inputInstances;
+      return this;
+    }
+
+    public Builder samplingSize(long samplingSize) {
+      this.samplingSize = samplingSize;
+      return this;
+    }
+
+    public Builder evaluationInstances(long evaluationInstances) {
+      this.evaluationInstances = evaluationInstances;
+      return this;
+    }
+
+    public Builder classifiedInstances(long classifiedInstances) {
+      this.classifiedInstances = classifiedInstances;
+      return this;
+    }
+
+    public Builder classificationsCorrect(float classificationsCorrect) {
+      this.classificationsCorrect = classificationsCorrect;
+      return this;
+    }
+
+    public Builder kappaStat(float kappaStat) {
+      this.kappaStat = kappaStat;
+      return this;
+    }
+
+    public Builder kappaTempStat(float kappaTempStat) {
+      this.kappaTempStat = kappaTempStat;
+      return this;
+    }
+
+    public Builder cliStringTemplate(String cliStringTemplate) {
+      this.cliStringTemplate = cliStringTemplate;
+      return this;
+    }
+
+    public Builder resultFilePollTimeout(int pollTimeoutSeconds) {
+      this.pollTimeoutSeconds = pollTimeoutSeconds;
+      return this;
+    }
+
+    public Builder inputDelayMicroSec(int inputDelayMicroSec) {
+      this.inputDelayMicroSec = inputDelayMicroSec;
+      return this;
+    }
+
+    public Builder prePollWait(int prePollWaitSeconds) {
+      this.prePollWaitSeconds = prePollWaitSeconds;
+      return this;
+    }
+
+    public TestParams build() {
+      return new TestParams(taskClassName,
+          inputInstances,
+          samplingSize,
+          evaluationInstances,
+          classifiedInstances,
+          classificationsCorrect,
+          kappaStat,
+          kappaTempStat,
+          cliStringTemplate,
+          pollTimeoutSeconds,
+          prePollWaitSeconds,
+          inputDelayMicroSec);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-test/src/test/java/org/apache/samoa/TestUtils.java
----------------------------------------------------------------------
diff --git a/samoa-test/src/test/java/org/apache/samoa/TestUtils.java b/samoa-test/src/test/java/org/apache/samoa/TestUtils.java
new file mode 100644
index 0000000..320b8a8
--- /dev/null
+++ b/samoa-test/src/test/java/org/apache/samoa/TestUtils.java
@@ -0,0 +1,152 @@
+package org.apache.samoa;/*
+* #%L
+* SAMOA
+* %%
+* Copyright (C) 2013 - 2014 Yahoo! Inc.
+* %%
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* #L%
+*/
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVRecord;
+import org.apache.commons.io.input.Tailer;
+import org.apache.commons.io.input.TailerListenerAdapter;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.Reader;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestUtils.class.getName());
+
+  public static void test(final TestParams testParams) throws IOException, ClassNotFoundException,
+      NoSuchMethodException, InvocationTargetException, IllegalAccessException, InterruptedException {
+
+    final File tempFile = File.createTempFile("test", "test");
+
+    LOG.info("Starting test, output file is {}, test config is \n{}", tempFile.getAbsolutePath(), testParams.toString());
+
+    Executors.newSingleThreadExecutor().submit(new Callable<Void>() {
+
+      @Override
+      public Void call() throws Exception {
+        try {
+          Class.forName(testParams.getTaskClassName())
+              .getMethod("main", String[].class)
+              .invoke(null, (Object) String.format(
+                  testParams.getCliStringTemplate(),
+                  tempFile.getAbsolutePath(),
+                  testParams.getInputInstances(),
+                  testParams.getSamplingSize(),
+                  testParams.getInputDelayMicroSec()
+                  ).split("[ ]"));
+        } catch (Exception e) {
+          LOG.error("Cannot execute test {} {}", e.getMessage(), e.getCause().getMessage());
+        }
+        return null;
+      }
+    });
+
+    Thread.sleep(TimeUnit.SECONDS.toMillis(testParams.getPrePollWaitSeconds()));
+
+    CountDownLatch signalComplete = new CountDownLatch(1);
+
+    final Tailer tailer = Tailer.create(tempFile, new TestResultsTailerAdapter(signalComplete), 1000);
+    new Thread(new Runnable() {
+      @Override
+      public void run() {
+        tailer.run();
+      }
+    }).start();
+
+    signalComplete.await();
+    tailer.stop();
+
+    assertResults(tempFile, testParams);
+  }
+
+  public static void assertResults(File outputFile, org.apache.samoa.TestParams testParams) throws IOException {
+
+    LOG.info("Checking results file " + outputFile.getAbsolutePath());
+    // 1. parse result file with csv parser
+    Reader in = new FileReader(outputFile);
+    Iterable<CSVRecord> records = CSVFormat.EXCEL.withSkipHeaderRecord(false)
+        .withIgnoreEmptyLines(true).withDelimiter(',').withCommentMarker('#').parse(in);
+    CSVRecord last = null;
+    Iterator<CSVRecord> iterator = records.iterator();
+    CSVRecord header = iterator.next();
+    Assert.assertEquals("Invalid number of columns", 5, header.size());
+
+    Assert
+        .assertEquals("Unexpected column", org.apache.samoa.TestParams.EVALUATION_INSTANCES, header.get(0).trim());
+    Assert
+        .assertEquals("Unexpected column", org.apache.samoa.TestParams.CLASSIFIED_INSTANCES, header.get(1).trim());
+    Assert.assertEquals("Unexpected column", org.apache.samoa.TestParams.CLASSIFICATIONS_CORRECT, header.get(2)
+        .trim());
+    Assert.assertEquals("Unexpected column", org.apache.samoa.TestParams.KAPPA_STAT, header.get(3).trim());
+    Assert.assertEquals("Unexpected column", org.apache.samoa.TestParams.KAPPA_TEMP_STAT, header.get(4).trim());
+
+    // 2. check last line result
+    while (iterator.hasNext()) {
+      last = iterator.next();
+    }
+
+    assertTrue(String.format("Unmet threshold expected %d got %f",
+        testParams.getEvaluationInstances(), Float.parseFloat(last.get(0))),
+        testParams.getEvaluationInstances() <= Float.parseFloat(last.get(0)));
+    assertTrue(String.format("Unmet threshold expected %d got %f", testParams.getClassifiedInstances(),
+        Float.parseFloat(last.get(1))),
+        testParams.getClassifiedInstances() <= Float.parseFloat(last.get(1)));
+    assertTrue(String.format("Unmet threshold expected %f got %f",
+        testParams.getClassificationsCorrect(), Float.parseFloat(last.get(2))),
+        testParams.getClassificationsCorrect() <= Float.parseFloat(last.get(2)));
+    assertTrue(String.format("Unmet threshold expected %f got %f",
+        testParams.getKappaStat(), Float.parseFloat(last.get(3))),
+        testParams.getKappaStat() <= Float.parseFloat(last.get(3)));
+    assertTrue(String.format("Unmet threshold expected %f got %f",
+        testParams.getKappaTempStat(), Float.parseFloat(last.get(4))),
+        testParams.getKappaTempStat() <= Float.parseFloat(last.get(4)));
+
+  }
+
+  private static class TestResultsTailerAdapter extends TailerListenerAdapter {
+
+    private final CountDownLatch signalComplete;
+
+    public TestResultsTailerAdapter(CountDownLatch signalComplete) {
+      this.signalComplete = signalComplete;
+    }
+
+    @Override
+    public void handle(String line) {
+      if ("# COMPLETED".equals(line.trim())) {
+        signalComplete.countDown();
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/pom.xml
----------------------------------------------------------------------
diff --git a/samoa-threads/pom.xml b/samoa-threads/pom.xml
index a7d9ab5..2634ef0 100644
--- a/samoa-threads/pom.xml
+++ b/samoa-threads/pom.xml
@@ -31,14 +31,14 @@
 
   <artifactId>samoa-threads</artifactId>
   <parent>
-    <groupId>com.yahoo.labs.samoa</groupId>
+    <groupId>org.apache.samoa</groupId>
     <artifactId>samoa</artifactId>
     <version>0.3.0-SNAPSHOT</version>
   </parent>
 
   <dependencies>
     <dependency>
-      <groupId>com.yahoo.labs.samoa</groupId>
+      <groupId>org.apache.samoa</groupId>
       <artifactId>samoa-api</artifactId>
       <version>${project.version}</version>
       <exclusions>
@@ -53,7 +53,7 @@
       </exclusions>
     </dependency>
     <dependency>
-      <groupId>com.yahoo.labs.samoa</groupId>
+      <groupId>org.apache.samoa</groupId>
       <artifactId>samoa-test</artifactId>
       <type>test-jar</type>
       <classifier>test-jar-with-dependencies</classifier>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/com/yahoo/labs/samoa/LocalThreadsDoTask.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/LocalThreadsDoTask.java 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/LocalThreadsDoTask.java
deleted file mode 100644
index 2ac9ec1..0000000
--- a/samoa-threads/src/main/java/com/yahoo/labs/samoa/LocalThreadsDoTask.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package com.yahoo.labs.samoa;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.github.javacliparser.ClassOption;
-import com.yahoo.labs.samoa.tasks.Task;
-import com.yahoo.labs.samoa.topology.impl.ThreadsComponentFactory;
-import com.yahoo.labs.samoa.topology.impl.ThreadsEngine;
-
-/**
- * @author Anh Thu Vu
- * 
- */
-public class LocalThreadsDoTask {
-  private static final Logger logger = 
LoggerFactory.getLogger(LocalThreadsDoTask.class);
-
-  /**
-   * The main method.
-   * 
-   * @param args
-   *          the arguments
-   */
-  public static void main(String[] args) {
-
-    ArrayList<String> tmpArgs = new ArrayList<String>(Arrays.asList(args));
-
-    // Get number of threads for multithreading mode
-    int numThreads = 1;
-    for (int i = 0; i < tmpArgs.size() - 1; i++) {
-      if (tmpArgs.get(i).equals("-t")) {
-        try {
-          numThreads = Integer.parseInt(tmpArgs.get(i + 1));
-          tmpArgs.remove(i + 1);
-          tmpArgs.remove(i);
-        } catch (NumberFormatException e) {
-          System.err.println("Invalid number of threads.");
-          System.err.println(e.getStackTrace());
-        }
-      }
-    }
-    logger.info("Number of threads:{}", numThreads);
-
-    args = tmpArgs.toArray(new String[0]);
-
-    StringBuilder cliString = new StringBuilder();
-    for (int i = 0; i < args.length; i++) {
-      cliString.append(" ").append(args[i]);
-    }
-    logger.debug("Command line string = {}", cliString.toString());
-    System.out.println("Command line string = " + cliString.toString());
-
-    Task task = null;
-    try {
-      task = (Task) ClassOption.cliStringToObject(cliString.toString(), 
Task.class, null);
-      logger.info("Sucessfully instantiating {}", 
task.getClass().getCanonicalName());
-    } catch (Exception e) {
-      logger.error("Fail to initialize the task", e);
-      System.out.println("Fail to initialize the task" + e);
-      return;
-    }
-    task.setFactory(new ThreadsComponentFactory());
-    task.init();
-
-    ThreadsEngine.submitTopology(task.getTopology(), numThreads);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactory.java
 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactory.java
deleted file mode 100644
index 91f213b..0000000
--- 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactory.java
+++ /dev/null
@@ -1,65 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2013 Yahoo! Inc.
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.yahoo.labs.samoa.core.EntranceProcessor;
-import com.yahoo.labs.samoa.core.Processor;
-import com.yahoo.labs.samoa.topology.ComponentFactory;
-import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
-import com.yahoo.labs.samoa.topology.IProcessingItem;
-import com.yahoo.labs.samoa.topology.ProcessingItem;
-import com.yahoo.labs.samoa.topology.Stream;
-import com.yahoo.labs.samoa.topology.Topology;
-
-/**
- * ComponentFactory for multithreaded engine
- * 
- * @author Anh Thu Vu
- * 
- */
-public class ThreadsComponentFactory implements ComponentFactory {
-
-  @Override
-  public ProcessingItem createPi(Processor processor) {
-    return this.createPi(processor, 1);
-  }
-
-  @Override
-  public ProcessingItem createPi(Processor processor, int paralellism) {
-    return new ThreadsProcessingItem(processor, paralellism);
-  }
-
-  @Override
-  public EntranceProcessingItem createEntrancePi(EntranceProcessor 
entranceProcessor) {
-    return new ThreadsEntranceProcessingItem(entranceProcessor);
-  }
-
-  @Override
-  public Stream createStream(IProcessingItem sourcePi) {
-    return new ThreadsStream(sourcePi);
-  }
-
-  @Override
-  public Topology createTopology(String topoName) {
-    return new ThreadsTopology(topoName);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngine.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngine.java
 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngine.java
deleted file mode 100644
index c266c09..0000000
--- 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngine.java
+++ /dev/null
@@ -1,101 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2013 Yahoo! Inc.
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.yahoo.labs.samoa.topology.Topology;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Multithreaded engine.
- * 
- * @author Anh Thu Vu
- * 
- */
-public class ThreadsEngine {
-
-  private static final List<ExecutorService> threadPool = new 
ArrayList<ExecutorService>();
-
-  /*
-   * Create and manage threads
-   */
-  public static void setNumberOfThreads(int numThreads) {
-    if (numThreads < 1)
-      throw new IllegalStateException("Number of threads must be a positive 
integer.");
-
-    if (threadPool.size() > numThreads)
-      throw new IllegalStateException("You cannot set a numThreads smaller 
than the current size of the threads pool.");
-
-    if (threadPool.size() < numThreads) {
-      for (int i = threadPool.size(); i < numThreads; i++) {
-        threadPool.add(Executors.newSingleThreadExecutor());
-      }
-    }
-  }
-
-  public static int getNumberOfThreads() {
-    return threadPool.size();
-  }
-
-  public static ExecutorService getThreadWithIndex(int index) {
-    if (threadPool.size() <= 0)
-      throw new IllegalStateException("Try to get ExecutorService from an 
empty pool.");
-    index %= threadPool.size();
-    return threadPool.get(index);
-  }
-
-  /*
-   * Submit topology and start
-   */
-  private static void submitTopology(Topology topology) {
-    ThreadsTopology tTopology = (ThreadsTopology) topology;
-    tTopology.run();
-  }
-
-  public static void submitTopology(Topology topology, int numThreads) {
-    ThreadsEngine.setNumberOfThreads(numThreads);
-    ThreadsEngine.submitTopology(topology);
-  }
-
-  /*
-   * Stop
-   */
-  public static void clearThreadPool() {
-    for (ExecutorService pool : threadPool) {
-      pool.shutdown();
-    }
-
-    for (ExecutorService pool : threadPool) {
-      try {
-        pool.awaitTermination(10, TimeUnit.SECONDS);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    }
-
-    threadPool.clear();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItem.java
 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItem.java
deleted file mode 100644
index 470c164..0000000
--- 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItem.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2013 Yahoo! Inc.
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.yahoo.labs.samoa.core.EntranceProcessor;
-import com.yahoo.labs.samoa.topology.LocalEntranceProcessingItem;
-
-/**
- * EntranceProcessingItem for multithreaded engine.
- * 
- * @author Anh Thu Vu
- * 
- */
-public class ThreadsEntranceProcessingItem extends LocalEntranceProcessingItem 
{
-
-  public ThreadsEntranceProcessingItem(EntranceProcessor processor) {
-    super(processor);
-  }
-
-  // The default waiting time when there is no available events is 100ms
-  // Override waitForNewEvents() to change it
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnable.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnable.java
 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnable.java
deleted file mode 100644
index 5c4c9e9..0000000
--- 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnable.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2013 Yahoo! Inc.
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.yahoo.labs.samoa.core.ContentEvent;
-
-/**
- * Runnable class where each object corresponds to a ContentEvent and an 
assigned PI. When a PI receives a ContentEvent,
- * it will create a ThreadsEventRunnable with the received ContentEvent and an 
assigned workerPI. This runnable is then
- * submitted to a thread queue waiting to be executed. The worker PI will 
process the received event when the runnable
- * object is executed/run.
- * 
- * @author Anh Thu Vu
- * 
- */
-public class ThreadsEventRunnable implements Runnable {
-
-  private ThreadsProcessingItemInstance workerPi;
-  private ContentEvent event;
-
-  public ThreadsEventRunnable(ThreadsProcessingItemInstance workerPi, 
ContentEvent event) {
-    this.workerPi = workerPi;
-    this.event = event;
-  }
-
-  public ThreadsProcessingItemInstance getWorkerProcessingItem() {
-    return this.workerPi;
-  }
-
-  public ContentEvent getContentEvent() {
-    return this.event;
-  }
-
-  @Override
-  public void run() {
-    try {
-      workerPi.processEvent(event);
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItem.java
 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItem.java
deleted file mode 100644
index 1b83a05..0000000
--- 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItem.java
+++ /dev/null
@@ -1,103 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2013 Yahoo! Inc.
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.util.ArrayList;
-import java.util.List;
-
-import com.yahoo.labs.samoa.core.ContentEvent;
-import com.yahoo.labs.samoa.core.Processor;
-import com.yahoo.labs.samoa.topology.AbstractProcessingItem;
-import com.yahoo.labs.samoa.topology.ProcessingItem;
-import com.yahoo.labs.samoa.topology.Stream;
-import com.yahoo.labs.samoa.utils.PartitioningScheme;
-import com.yahoo.labs.samoa.utils.StreamDestination;
-
-/**
- * ProcessingItem for multithreaded engine.
- * 
- * @author Anh Thu Vu
- * 
- */
-public class ThreadsProcessingItem extends AbstractProcessingItem {
-  // Replicas of the ProcessingItem.
-  // When ProcessingItem receives an event, it assigns one
-  // of these replicas to process the event.
-  private List<ThreadsProcessingItemInstance> piInstances;
-
-  // Each replica of ProcessingItem is assigned to one of the
-  // available threads in a round-robin fashion, i.e.: each
-  // replica is associated with the index of a thread.
-  // Each ProcessingItem has a random offset variable so that
-  // the allocation of PI replicas to threads are spread evenly
-  // among all threads.
-  private int offset;
-
-  /*
-   * Constructor
-   */
-  public ThreadsProcessingItem(Processor processor, int parallelismHint) {
-    super(processor, parallelismHint);
-    this.offset = (int) (Math.random() * ThreadsEngine.getNumberOfThreads());
-  }
-
-  public List<ThreadsProcessingItemInstance> getProcessingItemInstances() {
-    return this.piInstances;
-  }
-
-  /*
-   * Connects to streams
-   */
-  @Override
-  protected ProcessingItem addInputStream(Stream inputStream, 
PartitioningScheme scheme) {
-    StreamDestination destination = new StreamDestination(this, 
this.getParallelism(), scheme);
-    ((ThreadsStream) inputStream).addDestination(destination);
-    return this;
-  }
-
-  /*
-   * Process the received event.
-   */
-  public void processEvent(ContentEvent event, int counter) {
-    if (this.piInstances == null || this.piInstances.size() < 
this.getParallelism())
-      throw new IllegalStateException(
-          "ThreadsWorkerProcessingItem(s) need to be setup before process any 
event (i.e. in ThreadsTopology.start()).");
-
-    ThreadsProcessingItemInstance piInstance = this.piInstances.get(counter);
-    ThreadsEventRunnable runnable = new ThreadsEventRunnable(piInstance, 
event);
-    
ThreadsEngine.getThreadWithIndex(piInstance.getThreadIndex()).submit(runnable);
-  }
-
-  /*
-   * Setup the replicas of this PI. This should be called after the topology is
-   * set up (all Processors and PIs are setup and connected to the respective
-   * streams) and before events are sent.
-   */
-  public void setupInstances() {
-    this.piInstances = new 
ArrayList<ThreadsProcessingItemInstance>(this.getParallelism());
-    for (int i = 0; i < this.getParallelism(); i++) {
-      Processor newProcessor = 
this.getProcessor().newProcessor(this.getProcessor());
-      newProcessor.onCreate(i + 1);
-      this.piInstances.add(new ThreadsProcessingItemInstance(newProcessor, 
this.offset + i));
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstance.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstance.java
 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstance.java
deleted file mode 100644
index 930973b..0000000
--- 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstance.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2013 Yahoo! Inc.
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.yahoo.labs.samoa.core.ContentEvent;
-import com.yahoo.labs.samoa.core.Processor;
-
-/**
- * Lightweight replicas of ThreadProcessingItem. ThreadsProcessingItem manages 
a list of these objects and assigns each
- * incoming message to be processed by one of them.
- * 
- * @author Anh Thu Vu
- * 
- */
-public class ThreadsProcessingItemInstance {
-
-  private Processor processor;
-  private int threadIndex;
-
-  public ThreadsProcessingItemInstance(Processor processor, int threadIndex) {
-    this.processor = processor;
-    this.threadIndex = threadIndex;
-  }
-
-  public int getThreadIndex() {
-    return this.threadIndex;
-  }
-
-  public Processor getProcessor() {
-    return this.processor;
-  }
-
-  public void processEvent(ContentEvent event) {
-    this.processor.process(event);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsStream.java
 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsStream.java
deleted file mode 100644
index 2c02df7..0000000
--- 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsStream.java
+++ /dev/null
@@ -1,109 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2013 Yahoo! Inc.
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-import com.yahoo.labs.samoa.core.ContentEvent;
-import com.yahoo.labs.samoa.topology.IProcessingItem;
-import com.yahoo.labs.samoa.topology.AbstractStream;
-import com.yahoo.labs.samoa.utils.StreamDestination;
-
-/**
- * Stream for multithreaded engine.
- * 
- * @author Anh Thu Vu
- * 
- */
-public class ThreadsStream extends AbstractStream {
-
-  private List<StreamDestination> destinations;
-  private int counter = 0;
-  private int maxCounter = 1;
-
-  public ThreadsStream(IProcessingItem sourcePi) {
-    destinations = new LinkedList<StreamDestination>();
-  }
-
-  public void addDestination(StreamDestination destination) {
-    destinations.add(destination);
-    maxCounter *= destination.getParallelism();
-  }
-
-  public List<StreamDestination> getDestinations() {
-    return this.destinations;
-  }
-
-  private int getNextCounter() {
-    if (maxCounter > 0 && counter >= maxCounter)
-      counter = 0;
-    this.counter++;
-    return this.counter;
-  }
-
-  @Override
-  public synchronized void put(ContentEvent event) {
-    this.put(event, this.getNextCounter());
-  }
-
-  private void put(ContentEvent event, int counter) {
-    ThreadsProcessingItem pi;
-    int parallelism;
-    for (StreamDestination destination : destinations) {
-      pi = (ThreadsProcessingItem) destination.getProcessingItem();
-      parallelism = destination.getParallelism();
-      switch (destination.getPartitioningScheme()) {
-      case SHUFFLE:
-        pi.processEvent(event, counter % parallelism);
-        break;
-      case GROUP_BY_KEY:
-        pi.processEvent(event, getPIIndexForKey(event.getKey(), parallelism));
-        break;
-      case BROADCAST:
-        for (int p = 0; p < parallelism; p++) {
-          pi.processEvent(event, p);
-        }
-        break;
-      }
-    }
-  }
-
-  private static int getPIIndexForKey(String key, int parallelism) {
-    // If key is null, return a default index: 0
-    if (key == null)
-      return 0;
-
-    // HashCodeBuilder object does not have reset() method
-    // So all objects that get appended will be included in the
-    // computation of the hashcode.
-    // To avoid initialize a HashCodeBuilder for each event,
-    // here I use the static method with reflection on the event's key
-    int index = HashCodeBuilder.reflectionHashCode(key, true) % parallelism;
-    if (index < 0) {
-      index += parallelism;
-    }
-    return index;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopology.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopology.java
 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopology.java
deleted file mode 100644
index 4ce5e2b..0000000
--- 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopology.java
+++ /dev/null
@@ -1,65 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2013 Yahoo! Inc.
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.yahoo.labs.samoa.topology.AbstractTopology;
-import com.yahoo.labs.samoa.topology.IProcessingItem;
-
-/**
- * Topology for multithreaded engine.
- * 
- * @author Anh Thu Vu
- * 
- */
-public class ThreadsTopology extends AbstractTopology {
-  ThreadsTopology(String name) {
-    super(name);
-  }
-
-  public void run() {
-    if (this.getEntranceProcessingItems() == null)
-      throw new IllegalStateException("You need to set entrance PI before 
running the topology.");
-    if (this.getEntranceProcessingItems().size() != 1)
-      throw new IllegalStateException("ThreadsTopology supports 1 entrance PI 
only. Number of entrance PIs is "
-          + this.getEntranceProcessingItems().size());
-
-    this.setupProcessingItemInstances();
-    ThreadsEntranceProcessingItem entrancePi = (ThreadsEntranceProcessingItem) 
this.getEntranceProcessingItems()
-        .toArray()[0];
-    if (entrancePi == null)
-      throw new IllegalStateException("You need to set entrance PI before 
running the topology.");
-    entrancePi.getProcessor().onCreate(0); // id=0 as it is not used in simple 
mode
-    entrancePi.startSendingEvents();
-  }
-
-  /*
-   * Tell all the ThreadsProcessingItems to create & init their replicas
-   * (ThreadsProcessingItemInstance)
-   */
-  private void setupProcessingItemInstances() {
-    for (IProcessingItem pi : this.getProcessingItems()) {
-      if (pi instanceof ThreadsProcessingItem) {
-        ThreadsProcessingItem tpi = (ThreadsProcessingItem) pi;
-        tpi.setupInstances();
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/org/apache/samoa/LocalThreadsDoTask.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/org/apache/samoa/LocalThreadsDoTask.java 
b/samoa-threads/src/main/java/org/apache/samoa/LocalThreadsDoTask.java
new file mode 100644
index 0000000..3b5ca06
--- /dev/null
+++ b/samoa-threads/src/main/java/org/apache/samoa/LocalThreadsDoTask.java
@@ -0,0 +1,70 @@
+package org.apache.samoa;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.samoa.tasks.Task;
+import org.apache.samoa.topology.impl.ThreadsComponentFactory;
+import org.apache.samoa.topology.impl.ThreadsEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.javacliparser.ClassOption;
+
+/**
+ * Command-line entry point that instantiates a {@link Task} from its arguments and submits the
+ * task's topology to {@link ThreadsEngine}.
+ * 
+ * @author Anh Thu Vu
+ * 
+ */
+public class LocalThreadsDoTask {
+  private static final Logger logger = LoggerFactory.getLogger(LocalThreadsDoTask.class);
+
+  /**
+   * The main method.
+   * <p>
+   * Every {@code -t <n>} pair is stripped from the arguments and used as the number of threads
+   * (default 1; when the option is repeated the last valid value wins). The remaining arguments
+   * are joined with spaces and parsed into a {@link Task}. If the task cannot be instantiated the
+   * failure is reported and the method returns without submitting anything.
+   * 
+   * @param args
+   *          the arguments
+   */
+  public static void main(String[] args) {
+
+    ArrayList<String> tmpArgs = new ArrayList<String>(Arrays.asList(args));
+
+    // Get number of threads for multithreading mode
+    int numThreads = 1;
+    int i = 0;
+    while (i < tmpArgs.size() - 1) {
+      if (!tmpArgs.get(i).equals("-t")) {
+        i++;
+        continue;
+      }
+      String value = tmpArgs.get(i + 1);
+      try {
+        numThreads = Integer.parseInt(value);
+        tmpArgs.remove(i + 1);
+        tmpArgs.remove(i);
+        // Do not advance: the element that followed the removed pair now sits at index i
+        // and must still be examined.
+      } catch (NumberFormatException e) {
+        System.err.println("Invalid number of threads.");
+        // Log the exception itself; printing e.getStackTrace() would only print the
+        // array's identity string, not the trace.
+        logger.error("Invalid number of threads: " + value, e);
+        // Leave the malformed pair in place and move on.
+        i++;
+      }
+    }
+    logger.info("Number of threads:{}", numThreads);
+
+    args = tmpArgs.toArray(new String[0]);
+
+    // Each argument is prefixed with a single space, so the string starts with one.
+    StringBuilder cliString = new StringBuilder();
+    for (String arg : args) {
+      cliString.append(" ").append(arg);
+    }
+    logger.debug("Command line string = {}", cliString.toString());
+    System.out.println("Command line string = " + cliString.toString());
+
+    Task task = null;
+    try {
+      task = (Task) ClassOption.cliStringToObject(cliString.toString(), Task.class, null);
+      logger.info("Sucessfully instantiating {}", task.getClass().getCanonicalName());
+    } catch (Exception e) {
+      logger.error("Fail to initialize the task", e);
+      System.out.println("Fail to initialize the task" + e);
+      return;
+    }
+    task.setFactory(new ThreadsComponentFactory());
+    task.init();
+
+    ThreadsEngine.submitTopology(task.getTopology(), numThreads);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsComponentFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsComponentFactory.java
 
b/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsComponentFactory.java
new file mode 100644
index 0000000..e27487b
--- /dev/null
+++ 
b/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsComponentFactory.java
@@ -0,0 +1,65 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.topology.ComponentFactory;
+import org.apache.samoa.topology.EntranceProcessingItem;
+import org.apache.samoa.topology.IProcessingItem;
+import org.apache.samoa.topology.ProcessingItem;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.topology.Topology;
+
+/**
+ * ComponentFactory for multithreaded engine
+ * 
+ * @author Anh Thu Vu
+ * 
+ */
+public class ThreadsComponentFactory implements ComponentFactory {
+
+  @Override
+  public ProcessingItem createPi(Processor processor) {
+    return this.createPi(processor, 1);
+  }
+
+  @Override
+  public ProcessingItem createPi(Processor processor, int paralellism) {
+    return new ThreadsProcessingItem(processor, paralellism);
+  }
+
+  @Override
+  public EntranceProcessingItem createEntrancePi(EntranceProcessor 
entranceProcessor) {
+    return new ThreadsEntranceProcessingItem(entranceProcessor);
+  }
+
+  @Override
+  public Stream createStream(IProcessingItem sourcePi) {
+    return new ThreadsStream(sourcePi);
+  }
+
+  @Override
+  public Topology createTopology(String topoName) {
+    return new ThreadsTopology(topoName);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsEngine.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsEngine.java 
b/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsEngine.java
new file mode 100644
index 0000000..5deb962
--- /dev/null
+++ 
b/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsEngine.java
@@ -0,0 +1,101 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.samoa.topology.Topology;
+
+/**
+ * Multithreaded engine.
+ * 
+ * @author Anh Thu Vu
+ * 
+ */
+public class ThreadsEngine {
+
+  private static final List<ExecutorService> threadPool = new 
ArrayList<ExecutorService>();
+
+  /*
+   * Create and manage threads
+   */
+  public static void setNumberOfThreads(int numThreads) {
+    if (numThreads < 1)
+      throw new IllegalStateException("Number of threads must be a positive 
integer.");
+
+    if (threadPool.size() > numThreads)
+      throw new IllegalStateException("You cannot set a numThreads smaller 
than the current size of the threads pool.");
+
+    if (threadPool.size() < numThreads) {
+      for (int i = threadPool.size(); i < numThreads; i++) {
+        threadPool.add(Executors.newSingleThreadExecutor());
+      }
+    }
+  }
+
+  public static int getNumberOfThreads() {
+    return threadPool.size();
+  }
+
+  public static ExecutorService getThreadWithIndex(int index) {
+    if (threadPool.size() <= 0)
+      throw new IllegalStateException("Try to get ExecutorService from an 
empty pool.");
+    index %= threadPool.size();
+    return threadPool.get(index);
+  }
+
+  /*
+   * Submit topology and start
+   */
+  private static void submitTopology(Topology topology) {
+    ThreadsTopology tTopology = (ThreadsTopology) topology;
+    tTopology.run();
+  }
+
+  public static void submitTopology(Topology topology, int numThreads) {
+    ThreadsEngine.setNumberOfThreads(numThreads);
+    ThreadsEngine.submitTopology(topology);
+  }
+
+  /*
+   * Stop
+   */
+  public static void clearThreadPool() {
+    for (ExecutorService pool : threadPool) {
+      pool.shutdown();
+    }
+
+    for (ExecutorService pool : threadPool) {
+      try {
+        pool.awaitTermination(10, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+
+    threadPool.clear();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsEntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsEntranceProcessingItem.java
 
b/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsEntranceProcessingItem.java
new file mode 100644
index 0000000..6c09a60
--- /dev/null
+++ 
b/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsEntranceProcessingItem.java
@@ -0,0 +1,41 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.topology.LocalEntranceProcessingItem;
+
+/**
+ * Entrance processing item of the multithreaded engine. It adds nothing to {@link LocalEntranceProcessingItem}
+ * beyond forwarding the processor to the superclass constructor.
+ *
+ * @author Anh Thu Vu
+ */
+public class ThreadsEntranceProcessingItem extends LocalEntranceProcessingItem {
+
+  /**
+   * @param processor
+   *          entrance processor handed to the superclass
+   */
+  public ThreadsEntranceProcessingItem(EntranceProcessor processor) {
+    super(processor);
+  }
+
+  // When no events are available, the default waiting time is 100ms.
+  // Override waitForNewEvents() to use a different value.
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsEventRunnable.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsEventRunnable.java
 
b/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsEventRunnable.java
new file mode 100644
index 0000000..d812db8
--- /dev/null
+++ 
b/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsEventRunnable.java
@@ -0,0 +1,61 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.ContentEvent;
+
+/**
+ * A unit of work pairing one {@link ContentEvent} with the processing item instance that has to handle it. When a
+ * PI receives an event it wraps the event and the chosen worker instance in a ThreadsEventRunnable and submits it
+ * to an executor; the worker processes the event when the runnable is executed.
+ *
+ * @author Anh Thu Vu
+ */
+public class ThreadsEventRunnable implements Runnable {
+
+  private final ThreadsProcessingItemInstance workerPi;
+  private final ContentEvent event;
+
+  /**
+   * @param workerPi
+   *          instance that will process the event
+   * @param event
+   *          event to process
+   */
+  public ThreadsEventRunnable(ThreadsProcessingItemInstance workerPi, ContentEvent event) {
+    this.workerPi = workerPi;
+    this.event = event;
+  }
+
+  /**
+   * @return the instance assigned to process the event
+   */
+  public ThreadsProcessingItemInstance getWorkerProcessingItem() {
+    return workerPi;
+  }
+
+  /**
+   * @return the event carried by this runnable
+   */
+  public ContentEvent getContentEvent() {
+    return event;
+  }
+
+  /**
+   * Hands the event to the worker instance. Any exception raised while processing is printed and not rethrown.
+   */
+  @Override
+  public void run() {
+    try {
+      this.workerPi.processEvent(this.event);
+    } catch (Exception ex) {
+      ex.printStackTrace();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsProcessingItem.java
 
b/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsProcessingItem.java
new file mode 100644
index 0000000..de24a8d
--- /dev/null
+++ 
b/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsProcessingItem.java
@@ -0,0 +1,103 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.topology.AbstractProcessingItem;
+import org.apache.samoa.topology.ProcessingItem;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.utils.PartitioningScheme;
+import org.apache.samoa.utils.StreamDestination;
+
+/**
+ * ProcessingItem for multithreaded engine. It owns one {@link ThreadsProcessingItemInstance} per unit of
+ * parallelism and dispatches each incoming event to one of them through the executors of {@link ThreadsEngine}.
+ *
+ * @author Anh Thu Vu
+ */
+public class ThreadsProcessingItem extends AbstractProcessingItem {
+  // Replicas of the ProcessingItem.
+  // When ProcessingItem receives an event, it assigns one
+  // of these replicas to process the event.
+  // Stays null until setupInstances() has been called.
+  private List<ThreadsProcessingItemInstance> piInstances;
+
+  // Each replica of ProcessingItem is assigned to one of the
+  // available threads in a round-robin fashion, i.e.: each
+  // replica is associated with the index of a thread.
+  // Each ProcessingItem has a random offset variable so that
+  // the allocation of PI replicas to threads are spread evenly
+  // among all threads.
+  private int offset;
+
+  /**
+   * Creates the processing item and draws its random thread offset.
+   *
+   * @param processor
+   *          processor template; one copy is created per replica in {@link #setupInstances()}
+   * @param parallelismHint
+   *          number of replicas
+   */
+  public ThreadsProcessingItem(Processor processor, int parallelismHint) {
+    super(processor, parallelismHint);
+    // NOTE(review): the offset is drawn from the pool size at construction time. If the pool
+    // has not been sized yet (getNumberOfThreads() == 0) the offset is always 0 - confirm that
+    // ThreadsEngine.setNumberOfThreads() is expected to run before PIs are created.
+    this.offset = (int) (Math.random() * ThreadsEngine.getNumberOfThreads());
+  }
+
+  /**
+   * @return the replicas created by {@link #setupInstances()}, or {@code null} if it has not been called yet
+   */
+  public List<ThreadsProcessingItemInstance> getProcessingItemInstances() {
+    return this.piInstances;
+  }
+
+  /*
+   * Connects to streams: registers this PI as a destination of the given ThreadsStream.
+   */
+  @Override
+  protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) {
+    StreamDestination destination = new StreamDestination(this, this.getParallelism(), scheme);
+    ((ThreadsStream) inputStream).addDestination(destination);
+    return this;
+  }
+
+  /**
+   * Submits the event to the executor of the replica selected by {@code counter}.
+   *
+   * @param event
+   *          event to process
+   * @param counter
+   *          index of the replica in the instance list
+   * @throws IllegalStateException
+   *           if the replicas have not been set up yet
+   */
+  public void processEvent(ContentEvent event, int counter) {
+    if (this.piInstances == null || this.piInstances.size() < this.getParallelism()) {
+      throw new IllegalStateException(
+          "ThreadsProcessingItemInstance(s) need to be set up before processing any event "
+              + "(i.e. in ThreadsTopology.run()).");
+    }
+
+    ThreadsProcessingItemInstance piInstance = this.piInstances.get(counter);
+    ThreadsEventRunnable runnable = new ThreadsEventRunnable(piInstance, event);
+    ThreadsEngine.getThreadWithIndex(piInstance.getThreadIndex()).submit(runnable);
+  }
+
+  /*
+   * Setup the replicas of this PI. This should be called after the topology is
+   * set up (all Processors and PIs are setup and connected to the respective
+   * streams) and before events are sent.
+   */
+  public void setupInstances() {
+    this.piInstances = new ArrayList<ThreadsProcessingItemInstance>(this.getParallelism());
+    for (int i = 0; i < this.getParallelism(); i++) {
+      Processor newProcessor = this.getProcessor().newProcessor(this.getProcessor());
+      // Replica ids passed to onCreate start at 1.
+      newProcessor.onCreate(i + 1);
+      // The thread index may exceed the pool size; ThreadsEngine wraps it when the executor is looked up.
+      this.piInstances.add(new ThreadsProcessingItemInstance(newProcessor, this.offset + i));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsProcessingItemInstance.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsProcessingItemInstance.java
 
b/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsProcessingItemInstance.java
new file mode 100644
index 0000000..a736cd1
--- /dev/null
+++ 
b/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsProcessingItemInstance.java
@@ -0,0 +1,54 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.Processor;
+
+/**
+ * Lightweight replica of a ThreadsProcessingItem. A ThreadsProcessingItem keeps a list of these objects and hands
+ * each incoming message to one of them for processing.
+ *
+ * @author Anh Thu Vu
+ */
+public class ThreadsProcessingItemInstance {
+
+  private final Processor processor;
+  private final int threadIndex;
+
+  /**
+   * @param processor
+   *          processor that handles the events of this replica
+   * @param threadIndex
+   *          index of the thread this replica is associated with
+   */
+  public ThreadsProcessingItemInstance(Processor processor, int threadIndex) {
+    this.threadIndex = threadIndex;
+    this.processor = processor;
+  }
+
+  /**
+   * @return the thread index given at construction
+   */
+  public int getThreadIndex() {
+    return threadIndex;
+  }
+
+  /**
+   * @return the processor given at construction
+   */
+  public Processor getProcessor() {
+    return processor;
+  }
+
+  /**
+   * Passes the event to this replica's processor.
+   *
+   * @param event
+   *          event to process
+   */
+  public void processEvent(ContentEvent event) {
+    processor.process(event);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsStream.java 
b/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsStream.java
new file mode 100644
index 0000000..d56ae2d
--- /dev/null
+++ 
b/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsStream.java
@@ -0,0 +1,108 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.topology.AbstractStream;
+import org.apache.samoa.topology.IProcessingItem;
+import org.apache.samoa.utils.StreamDestination;
+
+/**
+ * Stream for multithreaded engine. Keeps the list of destinations registered on it and forwards every event to
+ * each destination according to that destination's partitioning scheme.
+ *
+ * @author Anh Thu Vu
+ */
+public class ThreadsStream extends AbstractStream {
+
+  private List<StreamDestination> destinations;
+  // Round-robin counter used for SHUFFLE; cycles through 1..maxCounter.
+  private int counter = 0;
+  // Product of the parallelism of all registered destinations.
+  private int maxCounter = 1;
+
+  /**
+   * @param sourcePi
+   *          source processing item; not used by this constructor
+   */
+  public ThreadsStream(IProcessingItem sourcePi) {
+    destinations = new LinkedList<StreamDestination>();
+  }
+
+  /**
+   * Registers a destination and multiplies the counter period by its parallelism.
+   *
+   * @param destination
+   *          destination to add
+   */
+  public void addDestination(StreamDestination destination) {
+    destinations.add(destination);
+    maxCounter *= destination.getParallelism();
+  }
+
+  /**
+   * @return the list of registered destinations (the internal list, not a copy)
+   */
+  public List<StreamDestination> getDestinations() {
+    return this.destinations;
+  }
+
+  /*
+   * Advances the round-robin counter, wrapping back to 1 after maxCounter.
+   */
+  private int getNextCounter() {
+    if (maxCounter > 0 && counter >= maxCounter)
+      counter = 0;
+    this.counter++;
+    return this.counter;
+  }
+
+  /**
+   * Sends the event to every destination. Synchronized so the round-robin counter is advanced atomically.
+   */
+  @Override
+  public synchronized void put(ContentEvent event) {
+    this.put(event, this.getNextCounter());
+  }
+
+  /*
+   * Dispatches the event to each destination: SHUFFLE picks the replica from the
+   * counter, GROUP_BY_KEY from the event key, BROADCAST sends to every replica.
+   */
+  private void put(ContentEvent event, int counter) {
+    ThreadsProcessingItem pi;
+    int parallelism;
+    for (StreamDestination destination : destinations) {
+      pi = (ThreadsProcessingItem) destination.getProcessingItem();
+      parallelism = destination.getParallelism();
+      switch (destination.getPartitioningScheme()) {
+      case SHUFFLE:
+        pi.processEvent(event, counter % parallelism);
+        break;
+      case GROUP_BY_KEY:
+        pi.processEvent(event, getPIIndexForKey(event.getKey(), parallelism));
+        break;
+      case BROADCAST:
+        for (int p = 0; p < parallelism; p++) {
+          pi.processEvent(event, p);
+        }
+        break;
+      }
+    }
+  }
+
+  /*
+   * Maps a key to a replica index in [0, parallelism). Equal keys always map to the same index.
+   */
+  private static int getPIIndexForKey(String key, int parallelism) {
+    // If key is null, return a default index: 0
+    if (key == null)
+      return 0;
+
+    // Use String.hashCode(), which depends only on the characters of the key.
+    // Reflection-based hashing of a String also covers its private fields,
+    // including the lazily cached hash code, so two equal keys were not
+    // guaranteed to produce the same index.
+    int index = key.hashCode() % parallelism;
+    if (index < 0) {
+      index += parallelism;
+    }
+    return index;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsTopology.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsTopology.java
 
b/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsTopology.java
new file mode 100644
index 0000000..4a7876a
--- /dev/null
+++ 
b/samoa-threads/src/main/java/org/apache/samoa/topology/impl/ThreadsTopology.java
@@ -0,0 +1,65 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.topology.AbstractTopology;
+import org.apache.samoa.topology.IProcessingItem;
+
+/**
+ * Topology implementation of the multithreaded engine. Running it sets up the replicas of every
+ * {@link ThreadsProcessingItem} and then starts the single entrance processing item.
+ *
+ * @author Anh Thu Vu
+ */
+public class ThreadsTopology extends AbstractTopology {
+  ThreadsTopology(String name) {
+    super(name);
+  }
+
+  /**
+   * Validates that exactly one entrance PI is registered, prepares all processing item replicas and starts sending
+   * events from the entrance PI.
+   *
+   * @throws IllegalStateException
+   *           if the entrance PIs are missing, or if there is not exactly one of them
+   */
+  public void run() {
+    if (this.getEntranceProcessingItems() == null) {
+      throw new IllegalStateException("You need to set entrance PI before running the topology.");
+    }
+    if (this.getEntranceProcessingItems().size() != 1) {
+      throw new IllegalStateException("ThreadsTopology supports 1 entrance PI only. Number of entrance PIs is "
+          + this.getEntranceProcessingItems().size());
+    }
+
+    this.setupProcessingItemInstances();
+
+    Object onlyEntrance = this.getEntranceProcessingItems().toArray()[0];
+    ThreadsEntranceProcessingItem entrancePi = (ThreadsEntranceProcessingItem) onlyEntrance;
+    if (entrancePi == null) {
+      throw new IllegalStateException("You need to set entrance PI before running the topology.");
+    }
+
+    // id=0 as it is not used in simple mode
+    entrancePi.getProcessor().onCreate(0);
+    entrancePi.startSendingEvents();
+  }
+
+  /*
+   * Ask every ThreadsProcessingItem in the topology to create and initialise
+   * its replicas (ThreadsProcessingItemInstance). Other item types are skipped.
+   */
+  private void setupProcessingItemInstances() {
+    for (IProcessingItem item : this.getProcessingItems()) {
+      if (!(item instanceof ThreadsProcessingItem)) {
+        continue;
+      }
+      ((ThreadsProcessingItem) item).setupInstances();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
----------------------------------------------------------------------
diff --git a/samoa-threads/src/test/java/com/yahoo/labs/samoa/AlgosTest.java 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
deleted file mode 100644
index c2789b9..0000000
--- a/samoa-threads/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-package com.yahoo.labs.samoa;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2013 - 2014 Yahoo! Inc.
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import org.junit.Test;
-
-public class AlgosTest {
-
-  @Test(timeout = 60000)
-  public void testVHTWithThreads() throws Exception {
-
-    TestParams vhtConfig = new TestParams.Builder()
-        .inputInstances(200_000)
-        .samplingSize(20_000)
-        .evaluationInstances(200_000)
-        .classifiedInstances(200_000)
-        .classificationsCorrect(55f)
-        .kappaStat(-0.1f)
-        .kappaTempStat(-0.1f)
-        .cliStringTemplate(TestParams.Templates.PREQEVAL_VHT_RANDOMTREE + " -t 
2")
-        .resultFilePollTimeout(10)
-        .prePollWait(10)
-        .taskClassName(LocalThreadsDoTask.class.getName())
-        .build();
-    TestUtils.test(vhtConfig);
-
-  }
-
-  @Test(timeout = 180000)
-  public void testBaggingWithThreads() throws Exception {
-    TestParams baggingConfig = new TestParams.Builder()
-        .inputInstances(100_000)
-        .samplingSize(10_000)
-        .inputDelayMicroSec(100) // prevents saturating the system due to 
unbounded queues
-        .evaluationInstances(90_000)
-        .classifiedInstances(105_000)
-        .classificationsCorrect(55f)
-        .kappaStat(0f)
-        .kappaTempStat(0f)
-        .cliStringTemplate(TestParams.Templates.PREQEVAL_BAGGING_RANDOMTREE + 
" -t 2")
-        .prePollWait(10)
-        .resultFilePollTimeout(30)
-        .taskClassName(LocalThreadsDoTask.class.getName())
-        .build();
-    TestUtils.test(baggingConfig);
-
-  }
-
-}


Reply via email to