http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleStream.java
 
b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleStream.java
deleted file mode 100644
index 109e927..0000000
--- 
a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleStream.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * To change this template, choose Tools | Templates
- * and open the template in the editor.
- */
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import com.yahoo.labs.samoa.core.ContentEvent;
-import com.yahoo.labs.samoa.topology.AbstractStream;
-import com.yahoo.labs.samoa.topology.IProcessingItem;
-import com.yahoo.labs.samoa.utils.StreamDestination;
-
-/**
- * 
- * @author abifet
- */
-class SimpleStream extends AbstractStream {
-  private List<StreamDestination> destinations;
-  private int maxCounter;
-  private int eventCounter;
-
-  SimpleStream(IProcessingItem sourcePi) {
-    super(sourcePi);
-    this.destinations = new LinkedList<>();
-    this.eventCounter = 0;
-    this.maxCounter = 1;
-  }
-
-  private int getNextCounter() {
-    if (maxCounter > 0 && eventCounter >= maxCounter)
-      eventCounter = 0;
-    this.eventCounter++;
-    return this.eventCounter;
-  }
-
-  @Override
-  public void put(ContentEvent event) {
-    this.put(event, this.getNextCounter());
-  }
-
-  private void put(ContentEvent event, int counter) {
-    SimpleProcessingItem pi;
-    int parallelism;
-    for (StreamDestination destination : destinations) {
-      pi = (SimpleProcessingItem) destination.getProcessingItem();
-      parallelism = destination.getParallelism();
-      switch (destination.getPartitioningScheme()) {
-      case SHUFFLE:
-        pi.processEvent(event, counter % parallelism);
-        break;
-      case GROUP_BY_KEY:
-        HashCodeBuilder hb = new HashCodeBuilder();
-        hb.append(event.getKey());
-        int key = hb.build() % parallelism;
-        pi.processEvent(event, key);
-        break;
-      case BROADCAST:
-        for (int p = 0; p < parallelism; p++) {
-          pi.processEvent(event, p);
-        }
-        break;
-      }
-    }
-  }
-
-  public void addDestination(StreamDestination destination) {
-    this.destinations.add(destination);
-    if (maxCounter <= 0)
-      maxCounter = 1;
-    maxCounter *= destination.getParallelism();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleTopology.java
----------------------------------------------------------------------
diff --git 
a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleTopology.java
 
b/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleTopology.java
deleted file mode 100644
index 5ffa09e..0000000
--- 
a/samoa-local/src/main/java/com/yahoo/labs/samoa/topology/impl/SimpleTopology.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * To change this template, choose Tools | Templates
- * and open the template in the editor.
- */
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.yahoo.labs.samoa.topology.AbstractTopology;
-
-public class SimpleTopology extends AbstractTopology {
-  SimpleTopology(String name) {
-    super(name);
-  }
-
-  public void run() {
-    if (this.getEntranceProcessingItems() == null)
-      throw new IllegalStateException("You need to set entrance PI before 
running the topology.");
-    if (this.getEntranceProcessingItems().size() != 1)
-      throw new IllegalStateException("SimpleTopology supports 1 entrance PI 
only. Number of entrance PIs is "
-          + this.getEntranceProcessingItems().size());
-
-    SimpleEntranceProcessingItem entrancePi = (SimpleEntranceProcessingItem) 
this.getEntranceProcessingItems()
-        .toArray()[0];
-    entrancePi.getProcessor().onCreate(0); // id=0 as it is not used in simple 
mode
-    entrancePi.startSendingEvents();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/main/java/org/apache/samoa/LocalDoTask.java
----------------------------------------------------------------------
diff --git a/samoa-local/src/main/java/org/apache/samoa/LocalDoTask.java 
b/samoa-local/src/main/java/org/apache/samoa/LocalDoTask.java
new file mode 100644
index 0000000..0a8c3d0
--- /dev/null
+++ b/samoa-local/src/main/java/org/apache/samoa/LocalDoTask.java
@@ -0,0 +1,90 @@
+package org.apache.samoa;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.tasks.Task;
+import org.apache.samoa.topology.impl.SimpleComponentFactory;
+import org.apache.samoa.topology.impl.SimpleEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.javacliparser.ClassOption;
+import com.github.javacliparser.FlagOption;
+import com.github.javacliparser.IntOption;
+import com.github.javacliparser.Option;
+
+/**
+ * The Class DoTask.
+ */
+public class LocalDoTask {
+
+  // TODO: clean up this class for helping ML Developer in SAMOA
+  // TODO: clean up code from storm-impl
+
+  // It seems that the 3 extra options are not used.
+  // Probably should remove them
+  private static final String SUPPRESS_STATUS_OUT_MSG = "Suppress the task 
status output. Normally it is sent to stderr.";
+  private static final String SUPPRESS_RESULT_OUT_MSG = "Suppress the task 
result output. Normally it is sent to stdout.";
+  private static final String STATUS_UPDATE_FREQ_MSG = "Wait time in 
milliseconds between status updates.";
+  private static final Logger logger = 
LoggerFactory.getLogger(LocalDoTask.class);
+
+  /**
+   * The main method.
+   * 
+   * @param args
+   *          the arguments
+   */
+  public static void main(String[] args) {
+
+    // ArrayList<String> tmpArgs = new ArrayList<String>(Arrays.asList(args));
+
+    // args = tmpArgs.toArray(new String[0]);
+
+    FlagOption suppressStatusOutOpt = new FlagOption("suppressStatusOut", 'S', 
SUPPRESS_STATUS_OUT_MSG);
+
+    FlagOption suppressResultOutOpt = new FlagOption("suppressResultOut", 'R', 
SUPPRESS_RESULT_OUT_MSG);
+
+    IntOption statusUpdateFreqOpt = new IntOption("statusUpdateFrequency", 
'F', STATUS_UPDATE_FREQ_MSG, 1000, 0,
+        Integer.MAX_VALUE);
+
+    Option[] extraOptions = new Option[] { suppressStatusOutOpt, 
suppressResultOutOpt, statusUpdateFreqOpt };
+
+    StringBuilder cliString = new StringBuilder();
+    for (String arg : args) {
+      cliString.append(" ").append(arg);
+    }
+    logger.debug("Command line string = {}", cliString.toString());
+    System.out.println("Command line string = " + cliString.toString());
+
+    Task task;
+    try {
+      task = ClassOption.cliStringToObject(cliString.toString(), Task.class, 
extraOptions);
+      logger.info("Successfully instantiating {}", 
task.getClass().getCanonicalName());
+    } catch (Exception e) {
+      logger.error("Fail to initialize the task", e);
+      System.out.println("Fail to initialize the task" + e);
+      return;
+    }
+    task.setFactory(new SimpleComponentFactory());
+    task.init();
+    SimpleEngine.submitTopology(task.getTopology());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleComponentFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleComponentFactory.java
 
b/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleComponentFactory.java
new file mode 100644
index 0000000..c18b72d
--- /dev/null
+++ 
b/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleComponentFactory.java
@@ -0,0 +1,53 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.topology.ComponentFactory;
+import org.apache.samoa.topology.EntranceProcessingItem;
+import org.apache.samoa.topology.IProcessingItem;
+import org.apache.samoa.topology.ProcessingItem;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.topology.Topology;
+
+public class SimpleComponentFactory implements ComponentFactory {
+
+  public ProcessingItem createPi(Processor processor, int paralellism) {
+    return new SimpleProcessingItem(processor, paralellism);
+  }
+
+  public ProcessingItem createPi(Processor processor) {
+    return this.createPi(processor, 1);
+  }
+
+  public EntranceProcessingItem createEntrancePi(EntranceProcessor processor) {
+    return new SimpleEntranceProcessingItem(processor);
+  }
+
+  public Stream createStream(IProcessingItem sourcePi) {
+    return new SimpleStream(sourcePi);
+  }
+
+  public Topology createTopology(String topoName) {
+    return new SimpleTopology(topoName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleEngine.java
----------------------------------------------------------------------
diff --git 
a/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleEngine.java 
b/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleEngine.java
new file mode 100644
index 0000000..06d03d3
--- /dev/null
+++ b/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleEngine.java
@@ -0,0 +1,37 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.topology.Topology;
+
+public class SimpleEngine {
+
+  public static void submitTopology(Topology topology) {
+    SimpleTopology simpleTopology = (SimpleTopology) topology;
+    simpleTopology.run();
+    // runs until completion
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleEntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleEntranceProcessingItem.java
 
b/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleEntranceProcessingItem.java
new file mode 100644
index 0000000..729ad31
--- /dev/null
+++ 
b/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleEntranceProcessingItem.java
@@ -0,0 +1,33 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.topology.LocalEntranceProcessingItem;
+
+class SimpleEntranceProcessingItem extends LocalEntranceProcessingItem {
+  public SimpleEntranceProcessingItem(EntranceProcessor processor) {
+    super(processor);
+  }
+
+  // The default waiting time when there is no available events is 100ms
+  // Override waitForNewEvents() to change it
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleProcessingItem.java
 
b/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleProcessingItem.java
new file mode 100644
index 0000000..1dd1562
--- /dev/null
+++ 
b/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleProcessingItem.java
@@ -0,0 +1,87 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.topology.AbstractProcessingItem;
+import org.apache.samoa.topology.IProcessingItem;
+import org.apache.samoa.topology.ProcessingItem;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.utils.PartitioningScheme;
+import org.apache.samoa.utils.StreamDestination;
+
+/**
+ * 
+ * @author abifet
+ */
+class SimpleProcessingItem extends AbstractProcessingItem {
+  private IProcessingItem[] arrayProcessingItem;
+
+  SimpleProcessingItem(Processor processor) {
+    super(processor);
+  }
+
+  SimpleProcessingItem(Processor processor, int parallelism) {
+    super(processor);
+    this.setParallelism(parallelism);
+  }
+
+  public IProcessingItem getProcessingItem(int i) {
+    return arrayProcessingItem[i];
+  }
+
+  @Override
+  protected ProcessingItem addInputStream(Stream inputStream, 
PartitioningScheme scheme) {
+    StreamDestination destination = new StreamDestination(this, 
this.getParallelism(), scheme);
+    ((SimpleStream) inputStream).addDestination(destination);
+    return this;
+  }
+
+  public SimpleProcessingItem copy() {
+    Processor processor = this.getProcessor();
+    return new SimpleProcessingItem(processor.newProcessor(processor));
+  }
+
+  public void processEvent(ContentEvent event, int counter) {
+
+    int parallelism = this.getParallelism();
+    // System.out.println("Process event "+event+" 
(isLast="+event.isLastEvent()+") with counter="+counter+" while 
parallelism="+parallelism);
+    if (this.arrayProcessingItem == null && parallelism > 0) {
+      // Init processing elements, the first time they are needed
+      this.arrayProcessingItem = new IProcessingItem[parallelism];
+      for (int j = 0; j < parallelism; j++) {
+        arrayProcessingItem[j] = this.copy();
+        arrayProcessingItem[j].getProcessor().onCreate(j);
+      }
+    }
+    if (this.arrayProcessingItem != null) {
+      IProcessingItem pi = this.getProcessingItem(counter);
+      Processor p = pi.getProcessor();
+      // System.out.println("PI="+pi+", p="+p);
+      this.getProcessingItem(counter).getProcessor().process(event);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleStream.java 
b/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleStream.java
new file mode 100644
index 0000000..3137c60
--- /dev/null
+++ b/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleStream.java
@@ -0,0 +1,95 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.topology.AbstractStream;
+import org.apache.samoa.topology.IProcessingItem;
+import org.apache.samoa.utils.StreamDestination;
+
+/**
+ * 
+ * @author abifet
+ */
+class SimpleStream extends AbstractStream {
+  private List<StreamDestination> destinations;
+  private int maxCounter;
+  private int eventCounter;
+
+  SimpleStream(IProcessingItem sourcePi) {
+    super(sourcePi);
+    this.destinations = new LinkedList<>();
+    this.eventCounter = 0;
+    this.maxCounter = 1;
+  }
+
+  private int getNextCounter() {
+    if (maxCounter > 0 && eventCounter >= maxCounter)
+      eventCounter = 0;
+    this.eventCounter++;
+    return this.eventCounter;
+  }
+
+  @Override
+  public void put(ContentEvent event) {
+    this.put(event, this.getNextCounter());
+  }
+
+  private void put(ContentEvent event, int counter) {
+    SimpleProcessingItem pi;
+    int parallelism;
+    for (StreamDestination destination : destinations) {
+      pi = (SimpleProcessingItem) destination.getProcessingItem();
+      parallelism = destination.getParallelism();
+      switch (destination.getPartitioningScheme()) {
+      case SHUFFLE:
+        pi.processEvent(event, counter % parallelism);
+        break;
+      case GROUP_BY_KEY:
+        HashCodeBuilder hb = new HashCodeBuilder();
+        hb.append(event.getKey());
+        int key = hb.build() % parallelism;
+        pi.processEvent(event, key);
+        break;
+      case BROADCAST:
+        for (int p = 0; p < parallelism; p++) {
+          pi.processEvent(event, p);
+        }
+        break;
+      }
+    }
+  }
+
+  public void addDestination(StreamDestination destination) {
+    this.destinations.add(destination);
+    if (maxCounter <= 0)
+      maxCounter = 1;
+    maxCounter *= destination.getParallelism();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleTopology.java
----------------------------------------------------------------------
diff --git 
a/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleTopology.java 
b/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleTopology.java
new file mode 100644
index 0000000..660ebc0
--- /dev/null
+++ 
b/samoa-local/src/main/java/org/apache/samoa/topology/impl/SimpleTopology.java
@@ -0,0 +1,46 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.topology.AbstractTopology;
+
+public class SimpleTopology extends AbstractTopology {
+  SimpleTopology(String name) {
+    super(name);
+  }
+
+  public void run() {
+    if (this.getEntranceProcessingItems() == null)
+      throw new IllegalStateException("You need to set entrance PI before 
running the topology.");
+    if (this.getEntranceProcessingItems().size() != 1)
+      throw new IllegalStateException("SimpleTopology supports 1 entrance PI 
only. Number of entrance PIs is "
+          + this.getEntranceProcessingItems().size());
+
+    SimpleEntranceProcessingItem entrancePi = (SimpleEntranceProcessingItem) 
this.getEntranceProcessingItems()
+        .toArray()[0];
+    entrancePi.getProcessor().onCreate(0); // id=0 as it is not used in simple 
mode
+    entrancePi.startSendingEvents();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
----------------------------------------------------------------------
diff --git a/samoa-local/src/test/java/com/yahoo/labs/samoa/AlgosTest.java 
b/samoa-local/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
deleted file mode 100644
index 2f7c7e1..0000000
--- a/samoa-local/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-package com.yahoo.labs.samoa;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import org.junit.Test;
-
-public class AlgosTest {
-
-  @Test
-  public void testVHTLocal() throws Exception {
-
-    TestParams vhtConfig = new TestParams.Builder()
-        .inputInstances(200_000)
-        .samplingSize(20_000)
-        .evaluationInstances(200_000)
-        .classifiedInstances(200_000)
-        .classificationsCorrect(75f)
-        .kappaStat(0f)
-        .kappaTempStat(0f)
-        .cliStringTemplate(TestParams.Templates.PREQEVAL_VHT_RANDOMTREE)
-        .resultFilePollTimeout(10)
-        .prePollWait(10)
-        .taskClassName(LocalDoTask.class.getName())
-        .build();
-    TestUtils.test(vhtConfig);
-
-  }
-
-  @Test
-  public void testBaggingLocal() throws Exception {
-    TestParams baggingConfig = new TestParams.Builder()
-        .inputInstances(200_000)
-        .samplingSize(20_000)
-        .evaluationInstances(180_000)
-        .classifiedInstances(210_000)
-        .classificationsCorrect(60f)
-        .kappaStat(0f)
-        .kappaTempStat(0f)
-        .cliStringTemplate(TestParams.Templates.PREQEVAL_BAGGING_RANDOMTREE)
-        .prePollWait(10)
-        .resultFilePollTimeout(10)
-        .taskClassName(LocalDoTask.class.getName())
-        .build();
-    TestUtils.test(baggingConfig);
-
-  }
-
-  @Test
-  public void testNaiveBayesLocal() throws Exception {
-
-    TestParams vhtConfig = new TestParams.Builder()
-        .inputInstances(200_000)
-        .samplingSize(20_000)
-        .evaluationInstances(200_000)
-        .classifiedInstances(200_000)
-        .classificationsCorrect(65f)
-        .kappaStat(0f)
-        .kappaTempStat(0f)
-        .cliStringTemplate(TestParams.Templates.PREQEVAL_NAIVEBAYES_HYPERPLANE)
-        .resultFilePollTimeout(10)
-        .prePollWait(10)
-        .taskClassName(LocalDoTask.class.getName())
-        .build();
-    TestUtils.test(vhtConfig);
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactoryTest.java
 
b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactoryTest.java
deleted file mode 100644
index c43ef72..0000000
--- 
a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleComponentFactoryTest.java
+++ /dev/null
@@ -1,99 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import static org.junit.Assert.*;
-import mockit.Mocked;
-import mockit.Tested;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import com.yahoo.labs.samoa.core.EntranceProcessor;
-import com.yahoo.labs.samoa.core.Processor;
-import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
-import com.yahoo.labs.samoa.topology.ProcessingItem;
-import com.yahoo.labs.samoa.topology.Stream;
-import com.yahoo.labs.samoa.topology.Topology;
-
-/**
- * @author Anh Thu Vu
- * 
- */
-public class SimpleComponentFactoryTest {
-
-  @Tested
-  private SimpleComponentFactory factory;
-  @Mocked
-  private Processor processor, processorReplica;
-  @Mocked
-  private EntranceProcessor entranceProcessor;
-
-  private final int parallelism = 3;
-  private final String topoName = "TestTopology";
-
-  @Before
-  public void setUp() throws Exception {
-    factory = new SimpleComponentFactory();
-  }
-
-  @Test
-  public void testCreatePiNoParallelism() {
-    ProcessingItem pi = factory.createPi(processor);
-    assertNotNull("ProcessingItem created is null.", pi);
-    assertEquals("ProcessingItem created is not a SimpleProcessingItem.", 
SimpleProcessingItem.class, pi.getClass());
-    assertEquals("Parallelism of PI is not 1", 1, pi.getParallelism(), 0);
-  }
-
-  @Test
-  public void testCreatePiWithParallelism() {
-    ProcessingItem pi = factory.createPi(processor, parallelism);
-    assertNotNull("ProcessingItem created is null.", pi);
-    assertEquals("ProcessingItem created is not a SimpleProcessingItem.", 
SimpleProcessingItem.class, pi.getClass());
-    assertEquals("Parallelism of PI is not ", parallelism, 
pi.getParallelism(), 0);
-  }
-
-  @Test
-  public void testCreateStream() {
-    ProcessingItem pi = factory.createPi(processor);
-
-    Stream stream = factory.createStream(pi);
-    assertNotNull("Stream created is null", stream);
-    assertEquals("Stream created is not a SimpleStream.", SimpleStream.class, 
stream.getClass());
-  }
-
-  @Test
-  public void testCreateTopology() {
-    Topology topology = factory.createTopology(topoName);
-    assertNotNull("Topology created is null.", topology);
-    assertEquals("Topology created is not a SimpleTopology.", 
SimpleTopology.class, topology.getClass());
-  }
-
-  @Test
-  public void testCreateEntrancePi() {
-    EntranceProcessingItem entrancePi = 
factory.createEntrancePi(entranceProcessor);
-    assertNotNull("EntranceProcessingItem created is null.", entrancePi);
-    assertEquals("EntranceProcessingItem created is not a 
SimpleEntranceProcessingItem.",
-        SimpleEntranceProcessingItem.class, entrancePi.getClass());
-    assertSame("EntranceProcessor is not set correctly.", entranceProcessor, 
entrancePi.getProcessor());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEngineTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEngineTest.java
 
b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEngineTest.java
deleted file mode 100644
index da4be1e..0000000
--- 
a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEngineTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import mockit.Mocked;
-import mockit.NonStrictExpectations;
-import mockit.Tested;
-import mockit.Verifications;
-
-import org.junit.Test;
-
-/**
- * @author Anh Thu Vu
- * 
- */
-public class SimpleEngineTest {
-
-  @Tested
-  private SimpleEngine unused;
-  @Mocked
-  private SimpleTopology topology;
-  @Mocked
-  private Runtime mockedRuntime;
-
-  @Test
-  public void testSubmitTopology() {
-    new NonStrictExpectations() {
-      {
-        Runtime.getRuntime();
-        result = mockedRuntime;
-        mockedRuntime.exit(0);
-      }
-    };
-    SimpleEngine.submitTopology(topology);
-    new Verifications() {
-      {
-        topology.run();
-      }
-    };
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItemTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItemTest.java
 
b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItemTest.java
deleted file mode 100644
index 8c1ccaf..0000000
--- 
a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleEntranceProcessingItemTest.java
+++ /dev/null
@@ -1,172 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import static org.junit.Assert.*;
-import mockit.Mocked;
-import mockit.StrictExpectations;
-import mockit.Tested;
-import mockit.Verifications;
-
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import com.yahoo.labs.samoa.core.ContentEvent;
-import com.yahoo.labs.samoa.core.EntranceProcessor;
-import com.yahoo.labs.samoa.topology.Stream;
-
-/**
- * @author Anh Thu Vu
- * 
- */
-public class SimpleEntranceProcessingItemTest {
-
-  @Tested
-  private SimpleEntranceProcessingItem entrancePi;
-
-  @Mocked
-  private EntranceProcessor entranceProcessor;
-  @Mocked
-  private Stream outputStream, anotherStream;
-  @Mocked
-  private ContentEvent event;
-
-  @Mocked
-  private Thread unused;
-
-  /**
-   * @throws java.lang.Exception
-   */
-  @Before
-  public void setUp() throws Exception {
-    entrancePi = new SimpleEntranceProcessingItem(entranceProcessor);
-  }
-
-  @Test
-  public void testContructor() {
-    assertSame("EntranceProcessor is not set correctly.", entranceProcessor, 
entrancePi.getProcessor());
-  }
-
-  @Test
-  public void testSetOutputStream() {
-    entrancePi.setOutputStream(outputStream);
-    assertSame("OutputStream is not set correctly.", outputStream, 
entrancePi.getOutputStream());
-  }
-
-  @Test
-  public void testSetOutputStreamRepeate() {
-    entrancePi.setOutputStream(outputStream);
-    entrancePi.setOutputStream(outputStream);
-    assertSame("OutputStream is not set correctly.", outputStream, 
entrancePi.getOutputStream());
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testSetOutputStreamError() {
-    entrancePi.setOutputStream(outputStream);
-    entrancePi.setOutputStream(anotherStream);
-  }
-
-  @Test
-  public void testInjectNextEventSuccess() {
-    entrancePi.setOutputStream(outputStream);
-    new StrictExpectations() {
-      {
-        entranceProcessor.hasNext();
-        result = true;
-
-        entranceProcessor.nextEvent();
-        result = event;
-      }
-    };
-    entrancePi.injectNextEvent();
-    new Verifications() {
-      {
-        outputStream.put(event);
-      }
-    };
-  }
-
-  @Test
-  public void testStartSendingEvents() {
-    entrancePi.setOutputStream(outputStream);
-    new StrictExpectations() {
-      {
-        for (int i = 0; i < 1; i++) {
-          entranceProcessor.isFinished();
-          result = false;
-          entranceProcessor.hasNext();
-          result = false;
-        }
-
-        for (int i = 0; i < 5; i++) {
-          entranceProcessor.isFinished();
-          result = false;
-          entranceProcessor.hasNext();
-          result = true;
-          entranceProcessor.nextEvent();
-          result = event;
-          outputStream.put(event);
-        }
-
-        for (int i = 0; i < 2; i++) {
-          entranceProcessor.isFinished();
-          result = false;
-          entranceProcessor.hasNext();
-          result = false;
-        }
-
-        for (int i = 0; i < 5; i++) {
-          entranceProcessor.isFinished();
-          result = false;
-          entranceProcessor.hasNext();
-          result = true;
-          entranceProcessor.nextEvent();
-          result = event;
-          outputStream.put(event);
-        }
-
-        entranceProcessor.isFinished();
-        result = true;
-        times = 1;
-        entranceProcessor.hasNext();
-        times = 0;
-      }
-    };
-    entrancePi.startSendingEvents();
-    new Verifications() {
-      {
-        try {
-          Thread.sleep(anyInt);
-          times = 3;
-        } catch (InterruptedException e) {
-
-        }
-      }
-    };
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testStartSendingEventsError() {
-    entrancePi.startSendingEvents();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItemTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItemTest.java
 
b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItemTest.java
deleted file mode 100644
index cd076fe..0000000
--- 
a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleProcessingItemTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import static org.junit.Assert.*;
-
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-
-import mockit.Expectations;
-import mockit.Mocked;
-import mockit.NonStrictExpectations;
-import mockit.Tested;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import com.yahoo.labs.samoa.core.ContentEvent;
-import com.yahoo.labs.samoa.core.Processor;
-import com.yahoo.labs.samoa.utils.PartitioningScheme;
-import com.yahoo.labs.samoa.utils.StreamDestination;
-
-/**
- * @author Anh Thu Vu
- * 
- */
-public class SimpleProcessingItemTest {
-
-  @Tested
-  private SimpleProcessingItem pi;
-
-  @Mocked
-  private Processor processor;
-  @Mocked
-  private SimpleStream stream;
-  @Mocked
-  private StreamDestination destination;
-  @Mocked
-  private ContentEvent event;
-
-  private final int parallelism = 4;
-  private final int counter = 2;
-
-  @Before
-  public void setUp() throws Exception {
-    pi = new SimpleProcessingItem(processor, parallelism);
-  }
-
-  @Test
-  public void testConstructor() {
-    assertSame("Processor was not set correctly.", processor, 
pi.getProcessor());
-    assertEquals("Parallelism was not set correctly.", parallelism, 
pi.getParallelism(), 0);
-  }
-
-  @Test
-  public void testConnectInputShuffleStream() {
-    new Expectations() {
-      {
-        destination = new StreamDestination(pi, parallelism, 
PartitioningScheme.SHUFFLE);
-        stream.addDestination(destination);
-      }
-    };
-    pi.connectInputShuffleStream(stream);
-  }
-
-  @Test
-  public void testConnectInputKeyStream() {
-    new Expectations() {
-      {
-        destination = new StreamDestination(pi, parallelism, 
PartitioningScheme.GROUP_BY_KEY);
-        stream.addDestination(destination);
-      }
-    };
-    pi.connectInputKeyStream(stream);
-  }
-
-  @Test
-  public void testConnectInputAllStream() {
-    new Expectations() {
-      {
-        destination = new StreamDestination(pi, parallelism, 
PartitioningScheme.BROADCAST);
-        stream.addDestination(destination);
-      }
-    };
-    pi.connectInputAllStream(stream);
-  }
-
-  @Test
-  public void testProcessEvent() {
-    new Expectations() {
-      {
-        for (int i = 0; i < parallelism; i++) {
-          processor.newProcessor(processor);
-          result = processor;
-
-          processor.onCreate(anyInt);
-        }
-
-        processor.process(event);
-      }
-    };
-    pi.processEvent(event, counter);
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleStreamTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleStreamTest.java
 
b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleStreamTest.java
deleted file mode 100644
index 25cb5eb..0000000
--- 
a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleStreamTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.util.Arrays;
-import java.util.Collection;
-
-import mockit.Expectations;
-import mockit.Mocked;
-import mockit.NonStrictExpectations;
-import mockit.Tested;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import com.yahoo.labs.samoa.core.ContentEvent;
-import com.yahoo.labs.samoa.utils.PartitioningScheme;
-import com.yahoo.labs.samoa.utils.StreamDestination;
-
-/**
- * @author Anh Thu Vu
- * 
- */
-@RunWith(Parameterized.class)
-public class SimpleStreamTest {
-
-  @Tested
-  private SimpleStream stream;
-
-  @Mocked
-  private SimpleProcessingItem sourcePi, destPi;
-  @Mocked
-  private ContentEvent event;
-  @Mocked
-  private StreamDestination destination;
-
-  private final String eventKey = "eventkey";
-  private final int parallelism;
-  private final PartitioningScheme scheme;
-
-  @Parameters
-  public static Collection<Object[]> generateParameters() {
-    return Arrays.asList(new Object[][] {
-        { 2, PartitioningScheme.SHUFFLE },
-        { 3, PartitioningScheme.GROUP_BY_KEY },
-        { 4, PartitioningScheme.BROADCAST }
-    });
-  }
-
-  public SimpleStreamTest(int parallelism, PartitioningScheme scheme) {
-    this.parallelism = parallelism;
-    this.scheme = scheme;
-  }
-
-  @Before
-  public void setUp() throws Exception {
-    stream = new SimpleStream(sourcePi);
-    stream.addDestination(destination);
-  }
-
-  @Test
-  public void testPut() {
-    new NonStrictExpectations() {
-      {
-        event.getKey();
-        result = eventKey;
-        destination.getProcessingItem();
-        result = destPi;
-        destination.getPartitioningScheme();
-        result = scheme;
-        destination.getParallelism();
-        result = parallelism;
-
-      }
-    };
-    switch (this.scheme) {
-    case SHUFFLE:
-    case GROUP_BY_KEY:
-      new Expectations() {
-        {
-          // TODO: restrict the range of counter value
-          destPi.processEvent(event, anyInt);
-          times = 1;
-        }
-      };
-      break;
-    case BROADCAST:
-      new Expectations() {
-        {
-          // TODO: restrict the range of counter value
-          destPi.processEvent(event, anyInt);
-          times = parallelism;
-        }
-      };
-      break;
-    }
-    stream.put(event);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleTopologyTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleTopologyTest.java
 
b/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleTopologyTest.java
deleted file mode 100644
index 9aaaebd..0000000
--- 
a/samoa-local/src/test/java/com/yahoo/labs/samoa/topology/impl/SimpleTopologyTest.java
+++ /dev/null
@@ -1,94 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-import static org.junit.Assert.*;
-
-import java.util.Set;
-
-import mockit.NonStrictExpectations;
-import mockit.Expectations;
-import mockit.Mocked;
-import mockit.Tested;
-
-import org.junit.Before;
-import org.junit.Test;
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import com.yahoo.labs.samoa.core.EntranceProcessor;
-import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
-
-/**
- * @author Anh Thu Vu
- * 
- */
-public class SimpleTopologyTest {
-
-  @Tested
-  private SimpleTopology topology;
-
-  @Mocked
-  private SimpleEntranceProcessingItem entrancePi;
-  @Mocked
-  private EntranceProcessor entranceProcessor;
-
-  @Before
-  public void setUp() throws Exception {
-    topology = new SimpleTopology("TestTopology");
-  }
-
-  @Test
-  public void testAddEntrancePi() {
-    topology.addEntranceProcessingItem(entrancePi);
-
-    Set<EntranceProcessingItem> entrancePIs = 
topology.getEntranceProcessingItems();
-    assertNotNull("Set of entrance PIs is null.", entrancePIs);
-    assertEquals("Number of entrance PI in SimpleTopology must be 1", 1, 
entrancePIs.size());
-    assertSame("Entrance PI was not set correctly.", entrancePi, 
entrancePIs.toArray()[0]);
-    // TODO: verify that entrance PI is in the set of ProcessingItems
-    // Need to access topology's set of PIs (getProcessingItems() method)
-  }
-
-  @Test
-  public void testRun() {
-    topology.addEntranceProcessingItem(entrancePi);
-
-    new NonStrictExpectations() {
-      {
-        entrancePi.getProcessor();
-        result = entranceProcessor;
-
-      }
-    };
-
-    new Expectations() {
-      {
-        entranceProcessor.onCreate(anyInt);
-        entrancePi.startSendingEvents();
-      }
-    };
-    topology.run();
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testRunWithoutEntrancePI() {
-    topology.run();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/test/java/org/apache/samoa/AlgosTest.java
----------------------------------------------------------------------
diff --git a/samoa-local/src/test/java/org/apache/samoa/AlgosTest.java 
b/samoa-local/src/test/java/org/apache/samoa/AlgosTest.java
new file mode 100644
index 0000000..f35b92a
--- /dev/null
+++ b/samoa-local/src/test/java/org/apache/samoa/AlgosTest.java
@@ -0,0 +1,87 @@
+package org.apache.samoa;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.LocalDoTask;
+import org.junit.Test;
+
+public class AlgosTest {
+
+  @Test
+  public void testVHTLocal() throws Exception {
+
+    TestParams vhtConfig = new TestParams.Builder()
+        .inputInstances(200_000)
+        .samplingSize(20_000)
+        .evaluationInstances(200_000)
+        .classifiedInstances(200_000)
+        .classificationsCorrect(75f)
+        .kappaStat(0f)
+        .kappaTempStat(0f)
+        .cliStringTemplate(TestParams.Templates.PREQEVAL_VHT_RANDOMTREE)
+        .resultFilePollTimeout(10)
+        .prePollWait(10)
+        .taskClassName(LocalDoTask.class.getName())
+        .build();
+    TestUtils.test(vhtConfig);
+
+  }
+
+  @Test
+  public void testBaggingLocal() throws Exception {
+    TestParams baggingConfig = new TestParams.Builder()
+        .inputInstances(200_000)
+        .samplingSize(20_000)
+        .evaluationInstances(180_000)
+        .classifiedInstances(210_000)
+        .classificationsCorrect(60f)
+        .kappaStat(0f)
+        .kappaTempStat(0f)
+        .cliStringTemplate(TestParams.Templates.PREQEVAL_BAGGING_RANDOMTREE)
+        .prePollWait(10)
+        .resultFilePollTimeout(10)
+        .taskClassName(LocalDoTask.class.getName())
+        .build();
+    TestUtils.test(baggingConfig);
+
+  }
+
+  @Test
+  public void testNaiveBayesLocal() throws Exception {
+
+    TestParams vhtConfig = new TestParams.Builder()
+        .inputInstances(200_000)
+        .samplingSize(20_000)
+        .evaluationInstances(200_000)
+        .classifiedInstances(200_000)
+        .classificationsCorrect(65f)
+        .kappaStat(0f)
+        .kappaTempStat(0f)
+        .cliStringTemplate(TestParams.Templates.PREQEVAL_NAIVEBAYES_HYPERPLANE)
+        .resultFilePollTimeout(10)
+        .prePollWait(10)
+        .taskClassName(LocalDoTask.class.getName())
+        .build();
+    TestUtils.test(vhtConfig);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleComponentFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleComponentFactoryTest.java
 
b/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleComponentFactoryTest.java
new file mode 100644
index 0000000..3085d4c
--- /dev/null
+++ 
b/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleComponentFactoryTest.java
@@ -0,0 +1,103 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import static org.junit.Assert.*;
+import mockit.Mocked;
+import mockit.Tested;
+
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.topology.EntranceProcessingItem;
+import org.apache.samoa.topology.ProcessingItem;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.topology.Topology;
+import org.apache.samoa.topology.impl.SimpleComponentFactory;
+import org.apache.samoa.topology.impl.SimpleEntranceProcessingItem;
+import org.apache.samoa.topology.impl.SimpleProcessingItem;
+import org.apache.samoa.topology.impl.SimpleStream;
+import org.apache.samoa.topology.impl.SimpleTopology;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author Anh Thu Vu
+ * 
+ */
+public class SimpleComponentFactoryTest {
+
+  @Tested
+  private SimpleComponentFactory factory;
+  @Mocked
+  private Processor processor, processorReplica;
+  @Mocked
+  private EntranceProcessor entranceProcessor;
+
+  private final int parallelism = 3;
+  private final String topoName = "TestTopology";
+
+  @Before
+  public void setUp() throws Exception {
+    factory = new SimpleComponentFactory();
+  }
+
+  @Test
+  public void testCreatePiNoParallelism() {
+    ProcessingItem pi = factory.createPi(processor);
+    assertNotNull("ProcessingItem created is null.", pi);
+    assertEquals("ProcessingItem created is not a SimpleProcessingItem.", 
SimpleProcessingItem.class, pi.getClass());
+    assertEquals("Parallelism of PI is not 1", 1, pi.getParallelism(), 0);
+  }
+
+  @Test
+  public void testCreatePiWithParallelism() {
+    ProcessingItem pi = factory.createPi(processor, parallelism);
+    assertNotNull("ProcessingItem created is null.", pi);
+    assertEquals("ProcessingItem created is not a SimpleProcessingItem.", 
SimpleProcessingItem.class, pi.getClass());
+    assertEquals("Parallelism of PI is not ", parallelism, 
pi.getParallelism(), 0);
+  }
+
+  @Test
+  public void testCreateStream() {
+    ProcessingItem pi = factory.createPi(processor);
+
+    Stream stream = factory.createStream(pi);
+    assertNotNull("Stream created is null", stream);
+    assertEquals("Stream created is not a SimpleStream.", SimpleStream.class, 
stream.getClass());
+  }
+
+  @Test
+  public void testCreateTopology() {
+    Topology topology = factory.createTopology(topoName);
+    assertNotNull("Topology created is null.", topology);
+    assertEquals("Topology created is not a SimpleTopology.", 
SimpleTopology.class, topology.getClass());
+  }
+
+  @Test
+  public void testCreateEntrancePi() {
+    EntranceProcessingItem entrancePi = 
factory.createEntrancePi(entranceProcessor);
+    assertNotNull("EntranceProcessingItem created is null.", entrancePi);
+    assertEquals("EntranceProcessingItem created is not a 
SimpleEntranceProcessingItem.",
+        SimpleEntranceProcessingItem.class, entrancePi.getClass());
+    assertSame("EntranceProcessor is not set correctly.", entranceProcessor, 
entrancePi.getProcessor());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleEngineTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleEngineTest.java
 
b/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleEngineTest.java
new file mode 100644
index 0000000..59d5afe
--- /dev/null
+++ 
b/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleEngineTest.java
@@ -0,0 +1,62 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import mockit.Mocked;
+import mockit.NonStrictExpectations;
+import mockit.Tested;
+import mockit.Verifications;
+
+import org.apache.samoa.topology.impl.SimpleEngine;
+import org.apache.samoa.topology.impl.SimpleTopology;
+import org.junit.Test;
+
+/**
+ * @author Anh Thu Vu
+ * 
+ */
+public class SimpleEngineTest {
+
+  @Tested
+  private SimpleEngine unused;
+  @Mocked
+  private SimpleTopology topology;
+  @Mocked
+  private Runtime mockedRuntime;
+
+  @Test
+  public void testSubmitTopology() {
+    new NonStrictExpectations() {
+      {
+        Runtime.getRuntime();
+        result = mockedRuntime;
+        mockedRuntime.exit(0);
+      }
+    };
+    SimpleEngine.submitTopology(topology);
+    new Verifications() {
+      {
+        topology.run();
+      }
+    };
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleEntranceProcessingItemTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleEntranceProcessingItemTest.java
 
b/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleEntranceProcessingItemTest.java
new file mode 100644
index 0000000..41d1f46
--- /dev/null
+++ 
b/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleEntranceProcessingItemTest.java
@@ -0,0 +1,172 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import static org.junit.Assert.*;
+import mockit.Mocked;
+import mockit.StrictExpectations;
+import mockit.Tested;
+import mockit.Verifications;
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.topology.impl.SimpleEntranceProcessingItem;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * @author Anh Thu Vu
+ * 
+ */
+public class SimpleEntranceProcessingItemTest {
+
+  @Tested
+  private SimpleEntranceProcessingItem entrancePi;
+
+  @Mocked
+  private EntranceProcessor entranceProcessor;
+  @Mocked
+  private Stream outputStream, anotherStream;
+  @Mocked
+  private ContentEvent event;
+
+  @Mocked
+  private Thread unused;
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    entrancePi = new SimpleEntranceProcessingItem(entranceProcessor);
+  }
+
+  @Test
+  public void testContructor() {
+    assertSame("EntranceProcessor is not set correctly.", entranceProcessor, 
entrancePi.getProcessor());
+  }
+
+  @Test
+  public void testSetOutputStream() {
+    entrancePi.setOutputStream(outputStream);
+    assertSame("OutputStream is not set correctly.", outputStream, 
entrancePi.getOutputStream());
+  }
+
+  @Test
+  public void testSetOutputStreamRepeate() {
+    entrancePi.setOutputStream(outputStream);
+    entrancePi.setOutputStream(outputStream);
+    assertSame("OutputStream is not set correctly.", outputStream, 
entrancePi.getOutputStream());
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testSetOutputStreamError() {
+    entrancePi.setOutputStream(outputStream);
+    entrancePi.setOutputStream(anotherStream);
+  }
+
+  @Test
+  public void testInjectNextEventSuccess() {
+    entrancePi.setOutputStream(outputStream);
+    new StrictExpectations() {
+      {
+        entranceProcessor.hasNext();
+        result = true;
+
+        entranceProcessor.nextEvent();
+        result = event;
+      }
+    };
+    entrancePi.injectNextEvent();
+    new Verifications() {
+      {
+        outputStream.put(event);
+      }
+    };
+  }
+
+  @Test
+  public void testStartSendingEvents() {
+    entrancePi.setOutputStream(outputStream);
+    new StrictExpectations() {
+      {
+        for (int i = 0; i < 1; i++) {
+          entranceProcessor.isFinished();
+          result = false;
+          entranceProcessor.hasNext();
+          result = false;
+        }
+
+        for (int i = 0; i < 5; i++) {
+          entranceProcessor.isFinished();
+          result = false;
+          entranceProcessor.hasNext();
+          result = true;
+          entranceProcessor.nextEvent();
+          result = event;
+          outputStream.put(event);
+        }
+
+        for (int i = 0; i < 2; i++) {
+          entranceProcessor.isFinished();
+          result = false;
+          entranceProcessor.hasNext();
+          result = false;
+        }
+
+        for (int i = 0; i < 5; i++) {
+          entranceProcessor.isFinished();
+          result = false;
+          entranceProcessor.hasNext();
+          result = true;
+          entranceProcessor.nextEvent();
+          result = event;
+          outputStream.put(event);
+        }
+
+        entranceProcessor.isFinished();
+        result = true;
+        times = 1;
+        entranceProcessor.hasNext();
+        times = 0;
+      }
+    };
+    entrancePi.startSendingEvents();
+    new Verifications() {
+      {
+        try {
+          Thread.sleep(anyInt);
+          times = 3;
+        } catch (InterruptedException e) {
+
+        }
+      }
+    };
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testStartSendingEventsError() {
+    entrancePi.startSendingEvents();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleProcessingItemTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleProcessingItemTest.java
 
b/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleProcessingItemTest.java
new file mode 100644
index 0000000..42602ec
--- /dev/null
+++ 
b/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleProcessingItemTest.java
@@ -0,0 +1,125 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import static org.junit.Assert.*;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import mockit.Expectations;
+import mockit.Mocked;
+import mockit.NonStrictExpectations;
+import mockit.Tested;
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.topology.impl.SimpleProcessingItem;
+import org.apache.samoa.topology.impl.SimpleStream;
+import org.apache.samoa.utils.PartitioningScheme;
+import org.apache.samoa.utils.StreamDestination;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author Anh Thu Vu
+ * 
+ */
+public class SimpleProcessingItemTest {
+
+  @Tested
+  private SimpleProcessingItem pi;
+
+  @Mocked
+  private Processor processor;
+  @Mocked
+  private SimpleStream stream;
+  @Mocked
+  private StreamDestination destination;
+  @Mocked
+  private ContentEvent event;
+
+  private final int parallelism = 4;
+  private final int counter = 2;
+
+  @Before
+  public void setUp() throws Exception {
+    pi = new SimpleProcessingItem(processor, parallelism);
+  }
+
+  @Test
+  public void testConstructor() {
+    assertSame("Processor was not set correctly.", processor, 
pi.getProcessor());
+    assertEquals("Parallelism was not set correctly.", parallelism, 
pi.getParallelism(), 0);
+  }
+
+  @Test
+  public void testConnectInputShuffleStream() {
+    new Expectations() {
+      {
+        destination = new StreamDestination(pi, parallelism, 
PartitioningScheme.SHUFFLE);
+        stream.addDestination(destination);
+      }
+    };
+    pi.connectInputShuffleStream(stream);
+  }
+
+  @Test
+  public void testConnectInputKeyStream() {
+    new Expectations() {
+      {
+        destination = new StreamDestination(pi, parallelism, 
PartitioningScheme.GROUP_BY_KEY);
+        stream.addDestination(destination);
+      }
+    };
+    pi.connectInputKeyStream(stream);
+  }
+
+  @Test
+  public void testConnectInputAllStream() {
+    new Expectations() {
+      {
+        destination = new StreamDestination(pi, parallelism, 
PartitioningScheme.BROADCAST);
+        stream.addDestination(destination);
+      }
+    };
+    pi.connectInputAllStream(stream);
+  }
+
+  @Test
+  public void testProcessEvent() {
+    new Expectations() {
+      {
+        for (int i = 0; i < parallelism; i++) {
+          processor.newProcessor(processor);
+          result = processor;
+
+          processor.onCreate(anyInt);
+        }
+
+        processor.process(event);
+      }
+    };
+    pi.processEvent(event, counter);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleStreamTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleStreamTest.java
 
b/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleStreamTest.java
new file mode 100644
index 0000000..51e25bb
--- /dev/null
+++ 
b/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleStreamTest.java
@@ -0,0 +1,122 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import mockit.Expectations;
+import mockit.Mocked;
+import mockit.NonStrictExpectations;
+import mockit.Tested;
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.topology.impl.SimpleProcessingItem;
+import org.apache.samoa.topology.impl.SimpleStream;
+import org.apache.samoa.utils.PartitioningScheme;
+import org.apache.samoa.utils.StreamDestination;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * @author Anh Thu Vu
+ * 
+ */
+@RunWith(Parameterized.class)
+public class SimpleStreamTest {
+
+  @Tested
+  private SimpleStream stream;
+
+  @Mocked
+  private SimpleProcessingItem sourcePi, destPi;
+  @Mocked
+  private ContentEvent event;
+  @Mocked
+  private StreamDestination destination;
+
+  private final String eventKey = "eventkey";
+  private final int parallelism;
+  private final PartitioningScheme scheme;
+
+  @Parameters
+  public static Collection<Object[]> generateParameters() {
+    return Arrays.asList(new Object[][] {
+        { 2, PartitioningScheme.SHUFFLE },
+        { 3, PartitioningScheme.GROUP_BY_KEY },
+        { 4, PartitioningScheme.BROADCAST }
+    });
+  }
+
+  public SimpleStreamTest(int parallelism, PartitioningScheme scheme) {
+    this.parallelism = parallelism;
+    this.scheme = scheme;
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    stream = new SimpleStream(sourcePi);
+    stream.addDestination(destination);
+  }
+
+  @Test
+  public void testPut() {
+    new NonStrictExpectations() {
+      {
+        event.getKey();
+        result = eventKey;
+        destination.getProcessingItem();
+        result = destPi;
+        destination.getPartitioningScheme();
+        result = scheme;
+        destination.getParallelism();
+        result = parallelism;
+
+      }
+    };
+    switch (this.scheme) {
+    case SHUFFLE:
+    case GROUP_BY_KEY:
+      new Expectations() {
+        {
+          // TODO: restrict the range of counter value
+          destPi.processEvent(event, anyInt);
+          times = 1;
+        }
+      };
+      break;
+    case BROADCAST:
+      new Expectations() {
+        {
+          // TODO: restrict the range of counter value
+          destPi.processEvent(event, anyInt);
+          times = parallelism;
+        }
+      };
+      break;
+    }
+    stream.put(event);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleTopologyTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleTopologyTest.java
 
b/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleTopologyTest.java
new file mode 100644
index 0000000..6d8d728
--- /dev/null
+++ 
b/samoa-local/src/test/java/org/apache/samoa/topology/impl/SimpleTopologyTest.java
@@ -0,0 +1,97 @@
+package org.apache.samoa.topology.impl;
+
+import static org.junit.Assert.*;
+
+import java.util.Set;
+
+import mockit.NonStrictExpectations;
+import mockit.Expectations;
+import mockit.Mocked;
+import mockit.Tested;
+
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.topology.EntranceProcessingItem;
+import org.apache.samoa.topology.impl.SimpleEntranceProcessingItem;
+import org.apache.samoa.topology.impl.SimpleTopology;
+import org.junit.Before;
+import org.junit.Test;
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
+
+/**
+ * @author Anh Thu Vu
+ * 
+ */
+public class SimpleTopologyTest {
+
+  @Tested
+  private SimpleTopology topology;
+
+  @Mocked
+  private SimpleEntranceProcessingItem entrancePi;
+  @Mocked
+  private EntranceProcessor entranceProcessor;
+
+  @Before
+  public void setUp() throws Exception {
+    topology = new SimpleTopology("TestTopology");
+  }
+
+  @Test
+  public void testAddEntrancePi() {
+    topology.addEntranceProcessingItem(entrancePi);
+
+    Set<EntranceProcessingItem> entrancePIs = 
topology.getEntranceProcessingItems();
+    assertNotNull("Set of entrance PIs is null.", entrancePIs);
+    assertEquals("Number of entrance PI in SimpleTopology must be 1", 1, 
entrancePIs.size());
+    assertSame("Entrance PI was not set correctly.", entrancePi, 
entrancePIs.toArray()[0]);
+    // TODO: verify that entrance PI is in the set of ProcessingItems
+    // Need to access topology's set of PIs (getProcessingItems() method)
+  }
+
+  @Test
+  public void testRun() {
+    topology.addEntranceProcessingItem(entrancePi);
+
+    new NonStrictExpectations() {
+      {
+        entrancePi.getProcessor();
+        result = entranceProcessor;
+
+      }
+    };
+
+    new Expectations() {
+      {
+        entranceProcessor.onCreate(anyInt);
+        entrancePi.startSendingEvents();
+      }
+    };
+    topology.run();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testRunWithoutEntrancePI() {
+    topology.run();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/pom.xml
----------------------------------------------------------------------
diff --git a/samoa-s4/pom.xml b/samoa-s4/pom.xml
index 8135be9..0f9b804 100644
--- a/samoa-s4/pom.xml
+++ b/samoa-s4/pom.xml
@@ -31,14 +31,14 @@
 
   <artifactId>samoa-s4</artifactId>
   <parent>
-    <groupId>com.yahoo.labs.samoa</groupId>
+    <groupId>org.apache.samoa</groupId>
     <artifactId>samoa</artifactId>
     <version>0.3.0-SNAPSHOT</version>
   </parent>
 
   <dependencies>
     <dependency>
-      <groupId>com.yahoo.labs.samoa</groupId>
+      <groupId>org.apache.samoa</groupId>
       <artifactId>samoa-api</artifactId>
       <version>${project.version}</version>
     </dependency>
@@ -117,7 +117,7 @@
               
<Implementation-Version>${project.version}</Implementation-Version>
               <Implementation-Vendor>Yahoo Labs</Implementation-Vendor>
               <Implementation-Vendor-Id>SAMOA</Implementation-Vendor-Id>
-              
<S4-App-Class>com.yahoo.labs.samoa.topology.impl.S4DoTask</S4-App-Class>
+              
<S4-App-Class>org.apache.samoa.topology.impl.S4DoTask</S4-App-Class>
               <S4-Version>${s4.version}</S4-Version>
             </manifestEntries>
           </archive>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/samoa-s4-adapter/pom.xml
----------------------------------------------------------------------
diff --git a/samoa-s4/samoa-s4-adapter/pom.xml 
b/samoa-s4/samoa-s4-adapter/pom.xml
index 27c8d51..5a66a1e 100644
--- a/samoa-s4/samoa-s4-adapter/pom.xml
+++ b/samoa-s4/samoa-s4-adapter/pom.xml
@@ -37,22 +37,16 @@
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
   <modelVersion>4.0.0</modelVersion>
 
-  <!-- PARENT MODULE SAMOA-S4 <parent> 
<groupId>com.yahoo.labs.bcn.samoa</groupId> <artifactId>samoa-s4</artifactId> 
<version>0.1</version> 
-    </parent> -->
-
-  <!-- SAMOA-S4-ADAPTER MODUEL -->
   <artifactId>samoa-s4-adapter</artifactId>
-  <groupId>com.yahoo.labs.bcn.samoa</groupId>
+  <groupId>org.apache.samoa</groupId>
   <version>0.1</version>
   <name>samoa-s4-adapter</name>
   <description>Adapter module to connect to external stream and also to 
provide entrance processing items for SAMOA</description>
 
   <dependencies>
-    <!-- dependency> <artifactId>samoa-api</artifactId> 
<groupId>com.yahoo.labs.bcn.samoa</groupId> <version>0.1</version> 
-      </dependency> -->
     <dependency>
       <artifactId>samoa-s4</artifactId>
-      <groupId>com.yahoo.labs.bcn.samoa</groupId>
+      <groupId>org.apache.samoa</groupId>
       <version>0.1</version>
     </dependency>
   </dependencies>

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ComponentFactory.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ComponentFactory.java
 
b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ComponentFactory.java
deleted file mode 100644
index 24602a4..0000000
--- 
a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4ComponentFactory.java
+++ /dev/null
@@ -1,98 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.yahoo.labs.samoa.core.EntranceProcessor;
-import com.yahoo.labs.samoa.core.Processor;
-import com.yahoo.labs.samoa.topology.ComponentFactory;
-import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
-import com.yahoo.labs.samoa.topology.IProcessingItem;
-import com.yahoo.labs.samoa.topology.ProcessingItem;
-import com.yahoo.labs.samoa.topology.Stream;
-import com.yahoo.labs.samoa.topology.Topology;
-
-/**
- * S4 Platform Component Factory
- * 
- * @author severien
- * 
- */
-public class S4ComponentFactory implements ComponentFactory {
-
-  public static final Logger logger = 
LoggerFactory.getLogger(S4ComponentFactory.class);
-  protected S4DoTask app;
-
-  @Override
-  public ProcessingItem createPi(Processor processor, int paralellism) {
-    S4ProcessingItem processingItem = new S4ProcessingItem(app);
-    // TODO refactor how to set the paralellism level
-    processingItem.setParalellismLevel(paralellism);
-    processingItem.setProcessor(processor);
-
-    return processingItem;
-  }
-
-  @Override
-  public ProcessingItem createPi(Processor processor) {
-    return this.createPi(processor, 1);
-  }
-
-  @Override
-  public EntranceProcessingItem createEntrancePi(EntranceProcessor 
entranceProcessor) {
-    // TODO Create source Entry processing item that connects to an external
-    // stream
-    S4EntranceProcessingItem entrancePi = new 
S4EntranceProcessingItem(entranceProcessor, app);
-    entrancePi.setParallelism(1); // FIXME should not be set to 1 statically
-    return entrancePi;
-  }
-
-  @Override
-  public Stream createStream(IProcessingItem sourcePi) {
-    S4Stream aStream = new S4Stream(app);
-    return aStream;
-  }
-
-  @Override
-  public Topology createTopology(String topoName) {
-    return new S4Topology(topoName);
-  }
-
-  /**
-   * Initialization method.
-   * 
-   * @param evalTask
-   */
-  public void init(String evalTask) {
-    // Task is initiated in the DoTaskApp
-  }
-
-  /**
-   * Sets S4 application.
-   * 
-   * @param app
-   */
-  public void setApp(S4DoTask app) {
-    this.app = app;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4DoTask.java
----------------------------------------------------------------------
diff --git 
a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4DoTask.java 
b/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4DoTask.java
deleted file mode 100644
index cc4b18d..0000000
--- a/samoa-s4/src/main/java/com/yahoo/labs/samoa/topology/impl/S4DoTask.java
+++ /dev/null
@@ -1,268 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-/**
- * License
- */
-
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.s4.base.Event;
-import org.apache.s4.base.KeyFinder;
-import org.apache.s4.core.App;
-import org.apache.s4.core.ProcessingElement;
-import org.apache.s4.core.Stream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import com.github.javacliparser.Option;
-import com.github.javacliparser.ClassOption;
-import com.yahoo.labs.samoa.core.Globals;
-import com.yahoo.labs.samoa.tasks.Task;
-import com.yahoo.labs.samoa.topology.ComponentFactory;
-
-import com.google.inject.Inject;
-import com.google.inject.name.Named;
-
-/*
- * S4 App that runs samoa Tasks
- *
- * */
-
-/**
- * The Class DoTaskApp.
- */
-final public class S4DoTask extends App {
-
-  private final Logger logger = LoggerFactory.getLogger(S4DoTask.class);
-  Task task;
-
-  @Inject
-  @Named("evalTask")
-  public String evalTask;
-
-  public S4DoTask() {
-    super();
-  }
-
-  /** The engine. */
-  protected ComponentFactory componentFactory;
-
-  /**
-   * Gets the factory.
-   * 
-   * @return the factory
-   */
-  public ComponentFactory getFactory() {
-    return componentFactory;
-  }
-
-  /**
-   * Sets the factory.
-   * 
-   * @param factory
-   *          the new factory
-   */
-  public void setFactory(ComponentFactory factory) {
-    this.componentFactory = factory;
-  }
-
-  /*
-   * Build the application
-   * 
-   * @see org.apache.s4.core.App#onInit()
-   */
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.s4.core.App#onInit()
-   */
-  @Override
-  protected void onInit() {
-    logger.info("DoTaskApp onInit");
-    // ConsoleReporters prints S4 metrics
-    // MetricsRegistry mr = new MetricsRegistry();
-    //
-    // CsvReporter.enable(new File(System.getProperty("user.home")
-    // + "/monitor/"), 10, TimeUnit.SECONDS);
-    // ConsoleReporter.enable(10, TimeUnit.SECONDS);
-    try {
-      System.err.println();
-      System.err.println(Globals.getWorkbenchInfoString());
-      System.err.println();
-
-    } catch (Exception ex) {
-      ex.printStackTrace();
-    }
-    S4ComponentFactory factory = new S4ComponentFactory();
-    factory.setApp(this);
-
-    // logger.debug("LC {}", lc);
-
-    // task = TaskProvider.getTask(evalTask);
-
-    // EXAMPLE OPTIONS
-    // -l Clustream -g Clustream -i 100000 -s (RandomRBFGeneratorEvents -K
-    // 5 -N 0.0)
-    // String[] args = new String[] {evalTask,"-l", "Clustream","-g",
-    // "Clustream", "-i", "100000", "-s", "(RamdomRBFGeneratorsEvents",
-    // "-K", "5", "-N", "0.0)"};
-    // String[] args = new String[] { evalTask, "-l", "clustream.Clustream",
-    // "-g", "clustream.Clustream", "-i", "100000", "-s",
-    // "(RandomRBFGeneratorEvents", "-K", "5", "-N", "0.0)" };
-    logger.debug("PARAMETERS {}", evalTask);
-    // params = params.replace(":", " ");
-    List<String> parameters = new ArrayList<String>();
-    // parameters.add(evalTask);
-    try {
-      parameters.addAll(Arrays.asList(URLDecoder.decode(evalTask, 
"UTF-8").split(" ")));
-    } catch (UnsupportedEncodingException ex) {
-      ex.printStackTrace();
-    }
-    String[] args = parameters.toArray(new String[0]);
-    Option[] extraOptions = new Option[] {};
-    // build a single string by concatenating cli options
-    StringBuilder cliString = new StringBuilder();
-    for (int i = 0; i < args.length; i++) {
-      cliString.append(" ").append(args[i]);
-    }
-
-    // parse options
-    try {
-      task = (Task) ClassOption.cliStringToObject(cliString.toString(), 
Task.class, extraOptions);
-      task.setFactory(factory);
-      task.init();
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
-
-  }
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.s4.core.App#onStart()
-   */
-  @Override
-  protected void onStart() {
-    logger.info("Starting DoTaskApp... App Partition [{}]", 
this.getPartitionId());
-    // <<<<<<< HEAD Task doesn't have start in latest storm-impl
-    // TODO change the way the app starts
-    // if (this.getPartitionId() == 0)
-    S4Topology s4topology = (S4Topology) getTask().getTopology();
-    S4EntranceProcessingItem epi = (S4EntranceProcessingItem) 
s4topology.getEntranceProcessingItem();
-    while (epi.injectNextEvent())
-      // inject events from the EntrancePI
-      ;
-  }
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.s4.core.App#onClose()
-   */
-  @Override
-  protected void onClose() {
-    System.out.println("Closing DoTaskApp...");
-
-  }
-
-  /**
-   * Gets the task.
-   * 
-   * @return the task
-   */
-  public Task getTask() {
-    return task;
-  }
-
-  // These methods are protected in App and can not be accessed from outside.
-  // They are
-  // called from parallel classifiers and evaluations. Is there a better way
-  // to do that?
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.s4.core.App#createPE(java.lang.Class)
-   */
-  @Override
-  public <T extends ProcessingElement> T createPE(Class<T> type) {
-    return super.createPE(type);
-  }
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.s4.core.App#createStream(java.lang.String,
-   * org.apache.s4.base.KeyFinder, org.apache.s4.core.ProcessingElement[])
-   */
-  @Override
-  public <T extends Event> Stream<T> createStream(String name, KeyFinder<T> 
finder,
-      ProcessingElement... processingElements) {
-    return super.createStream(name, finder, processingElements);
-  }
-
-  /*
-   * (non-Javadoc)
-   * 
-   * @see org.apache.s4.core.App#createStream(java.lang.String,
-   * org.apache.s4.core.ProcessingElement[])
-   */
-  @Override
-  public <T extends Event> Stream<T> createStream(String name, 
ProcessingElement... processingElements) {
-    return super.createStream(name, processingElements);
-  }
-
-  // @com.beust.jcommander.Parameters(separators = "=")
-  // class Parameters {
-  //
-  // @Parameter(names={"-lc","-local"}, description="Local clustering method")
-  // private String localClustering;
-  //
-  // @Parameter(names={"-gc","-global"},
-  // description="Global clustering method")
-  // private String globalClustering;
-  //
-  // }
-  //
-  // class ParametersConverter {// implements IStringConverter<String[]> {
-  //
-  //
-  // public String[] convertToArgs(String value) {
-  //
-  // String[] params = value.split(",");
-  // String[] args = new String[params.length*2];
-  // for(int i=0; i<params.length ; i++) {
-  // args[i] = params[i].split("=")[0];
-  // args[i+1] = params[i].split("=")[1];
-  // i++;
-  // }
-  // return args;
-  // }
-  //
-  // }
-
-}


Reply via email to