http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngine.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngine.java
 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngine.java
index d442572..c266c09 100644
--- 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngine.java
+++ 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngine.java
@@ -30,71 +30,72 @@ import java.util.concurrent.TimeUnit;
 
 /**
  * Multithreaded engine.
+ * 
  * @author Anh Thu Vu
- *
+ * 
  */
 public class ThreadsEngine {
-       
-       private static final List<ExecutorService> threadPool = new 
ArrayList<ExecutorService>();
-       
-       /*
-        * Create and manage threads
-        */
-       public static void setNumberOfThreads(int numThreads) {
-               if (numThreads < 1)
-                       throw new IllegalStateException("Number of threads must 
be a positive integer.");
-               
-               if (threadPool.size() > numThreads)
-                       throw new IllegalStateException("You cannot set a 
numThreads smaller than the current size of the threads pool.");
-               
-               if (threadPool.size() < numThreads) {
-                       for (int i=threadPool.size(); i<numThreads; i++) {
-                               
threadPool.add(Executors.newSingleThreadExecutor());
-                       }
-               }
-       }
-       
-       public static int getNumberOfThreads() {
-               return threadPool.size();
-       }
-       
-       public static ExecutorService getThreadWithIndex(int index) {
-               if (threadPool.size() <= 0 )
-                       throw new IllegalStateException("Try to get 
ExecutorService from an empty pool.");
-               index %= threadPool.size();
-               return threadPool.get(index);
-       }
-       
-       /*
-        * Submit topology and start
-        */
-       private static void submitTopology(Topology topology) {
-               ThreadsTopology tTopology = (ThreadsTopology) topology;
-               tTopology.run();
-       }
-       
-       public static void submitTopology(Topology topology, int numThreads) {
-               ThreadsEngine.setNumberOfThreads(numThreads);
-               ThreadsEngine.submitTopology(topology);
-       }
-       
-       /* 
-        * Stop
-        */
-       public static void clearThreadPool() {
-               for (ExecutorService pool:threadPool) {
-                       pool.shutdown();
-               }
-               
-               for (ExecutorService pool:threadPool) {
-                       try {
-                               pool.awaitTermination(10, TimeUnit.SECONDS);
-                       } catch (InterruptedException e) {
-                               e.printStackTrace();
-                       }
-               }
-               
-               threadPool.clear();
-       }
+
+  private static final List<ExecutorService> threadPool = new 
ArrayList<ExecutorService>();
+
+  /*
+   * Create and manage threads
+   */
+  public static void setNumberOfThreads(int numThreads) {
+    if (numThreads < 1)
+      throw new IllegalStateException("Number of threads must be a positive 
integer.");
+
+    if (threadPool.size() > numThreads)
+      throw new IllegalStateException("You cannot set a numThreads smaller 
than the current size of the threads pool.");
+
+    if (threadPool.size() < numThreads) {
+      for (int i = threadPool.size(); i < numThreads; i++) {
+        threadPool.add(Executors.newSingleThreadExecutor());
+      }
+    }
+  }
+
+  public static int getNumberOfThreads() {
+    return threadPool.size();
+  }
+
+  public static ExecutorService getThreadWithIndex(int index) {
+    if (threadPool.size() <= 0)
+      throw new IllegalStateException("Try to get ExecutorService from an 
empty pool.");
+    index %= threadPool.size();
+    return threadPool.get(index);
+  }
+
+  /*
+   * Submit topology and start
+   */
+  private static void submitTopology(Topology topology) {
+    ThreadsTopology tTopology = (ThreadsTopology) topology;
+    tTopology.run();
+  }
+
+  public static void submitTopology(Topology topology, int numThreads) {
+    ThreadsEngine.setNumberOfThreads(numThreads);
+    ThreadsEngine.submitTopology(topology);
+  }
+
+  /*
+   * Stop
+   */
+  public static void clearThreadPool() {
+    for (ExecutorService pool : threadPool) {
+      pool.shutdown();
+    }
+
+    for (ExecutorService pool : threadPool) {
+      try {
+        pool.awaitTermination(10, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+
+    threadPool.clear();
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItem.java
 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItem.java
index 008efb6..470c164 100644
--- 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItem.java
+++ 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItem.java
@@ -25,16 +25,17 @@ import 
com.yahoo.labs.samoa.topology.LocalEntranceProcessingItem;
 
 /**
  * EntranceProcessingItem for multithreaded engine.
+ * 
  * @author Anh Thu Vu
- *
+ * 
  */
 public class ThreadsEntranceProcessingItem extends LocalEntranceProcessingItem 
{
-       
-       public ThreadsEntranceProcessingItem(EntranceProcessor processor) {
-        super(processor);
-    }
-    
-    // The default waiting time when there is no available events is 100ms
-    // Override waitForNewEvents() to change it
+
+  public ThreadsEntranceProcessingItem(EntranceProcessor processor) {
+    super(processor);
+  }
+
+  // The default waiting time when there is no available events is 100ms
+  // Override waitForNewEvents() to change it
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnable.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnable.java
 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnable.java
index 7cb8c18..4dd33db 100644
--- 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnable.java
+++ 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnable.java
@@ -23,39 +23,41 @@ package com.yahoo.labs.samoa.topology.impl;
 import com.yahoo.labs.samoa.core.ContentEvent;
 
 /**
- * Runnable class where each object corresponds to a ContentEvent and an 
assigned PI. 
- * When a PI receives a ContentEvent, it will create a ThreadsEventRunnable 
with the received ContentEvent
- * and an assigned workerPI. This runnable is then submitted to a thread queue 
waiting to be executed.
- * The worker PI will process the received event when the runnable object is 
executed/run.
+ * Runnable class where each object corresponds to a ContentEvent and an
+ * assigned PI. When a PI receives a ContentEvent, it will create a
+ * ThreadsEventRunnable with the received ContentEvent and an assigned 
workerPI.
+ * This runnable is then submitted to a thread queue waiting to be executed. 
The
+ * worker PI will process the received event when the runnable object is
+ * executed/run.
+ * 
  * @author Anh Thu Vu
- *
+ * 
  */
 public class ThreadsEventRunnable implements Runnable {
 
-       private ThreadsProcessingItemInstance workerPi;
-       private ContentEvent event;
-       
-       public ThreadsEventRunnable(ThreadsProcessingItemInstance workerPi, 
ContentEvent event) {
-               this.workerPi = workerPi;
-               this.event = event;
-       }
-       
-       public ThreadsProcessingItemInstance getWorkerProcessingItem() {
-               return this.workerPi;
-       }
-       
-       public ContentEvent getContentEvent() {
-               return this.event;
-       }
-       
-       @Override
-       public void run() {
-               try {
-                       workerPi.processEvent(event);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-               }
-       }
+  private ThreadsProcessingItemInstance workerPi;
+  private ContentEvent event;
+
+  public ThreadsEventRunnable(ThreadsProcessingItemInstance workerPi, 
ContentEvent event) {
+    this.workerPi = workerPi;
+    this.event = event;
+  }
+
+  public ThreadsProcessingItemInstance getWorkerProcessingItem() {
+    return this.workerPi;
+  }
+
+  public ContentEvent getContentEvent() {
+    return this.event;
+  }
+
+  @Override
+  public void run() {
+    try {
+      workerPi.processEvent(event);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItem.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItem.java
 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItem.java
index 5eb6174..1b83a05 100644
--- 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItem.java
+++ 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItem.java
@@ -33,69 +33,71 @@ import com.yahoo.labs.samoa.utils.StreamDestination;
 
 /**
  * ProcessingItem for multithreaded engine.
+ * 
  * @author Anh Thu Vu
- *
+ * 
  */
 public class ThreadsProcessingItem extends AbstractProcessingItem {
-       // Replicas of the ProcessingItem.
-       // When ProcessingItem receives an event, it assigns one
-       // of these replicas to process the event.
-       private List<ThreadsProcessingItemInstance> piInstances;
-       
-       // Each replica of ProcessingItem is assigned to one of the
-       // available threads in a round-robin fashion, i.e.: each 
-       // replica is associated with the index of a thread. 
-       // Each ProcessingItem has a random offset variable so that
-       // the allocation of PI replicas to threads are spread evenly
-       // among all threads.
-       private int offset;
-       
-       /*
-        * Constructor
-        */
-       public ThreadsProcessingItem(Processor processor, int parallelismHint) {
-               super(processor, parallelismHint);
-               this.offset = (int) 
(Math.random()*ThreadsEngine.getNumberOfThreads());
-       }
-       
-       public List<ThreadsProcessingItemInstance> getProcessingItemInstances() 
{
-               return this.piInstances;
-       }
+  // Replicas of the ProcessingItem.
+  // When ProcessingItem receives an event, it assigns one
+  // of these replicas to process the event.
+  private List<ThreadsProcessingItemInstance> piInstances;
+
+  // Each replica of ProcessingItem is assigned to one of the
+  // available threads in a round-robin fashion, i.e.: each
+  // replica is associated with the index of a thread.
+  // Each ProcessingItem has a random offset variable so that
+  // the allocation of PI replicas to threads are spread evenly
+  // among all threads.
+  private int offset;
+
+  /*
+   * Constructor
+   */
+  public ThreadsProcessingItem(Processor processor, int parallelismHint) {
+    super(processor, parallelismHint);
+    this.offset = (int) (Math.random() * ThreadsEngine.getNumberOfThreads());
+  }
+
+  public List<ThreadsProcessingItemInstance> getProcessingItemInstances() {
+    return this.piInstances;
+  }
+
+  /*
+   * Connects to streams
+   */
+  @Override
+  protected ProcessingItem addInputStream(Stream inputStream, 
PartitioningScheme scheme) {
+    StreamDestination destination = new StreamDestination(this, 
this.getParallelism(), scheme);
+    ((ThreadsStream) inputStream).addDestination(destination);
+    return this;
+  }
+
+  /*
+   * Process the received event.
+   */
+  public void processEvent(ContentEvent event, int counter) {
+    if (this.piInstances == null || this.piInstances.size() < 
this.getParallelism())
+      throw new IllegalStateException(
+          "ThreadsWorkerProcessingItem(s) need to be setup before process any 
event (i.e. in ThreadsTopology.start()).");
 
-       /*
-        * Connects to streams
-        */
-       @Override
-    protected ProcessingItem addInputStream(Stream inputStream, 
PartitioningScheme scheme) {
-               StreamDestination destination = new StreamDestination(this, 
this.getParallelism(), scheme);
-               ((ThreadsStream) inputStream).addDestination(destination);
-               return this;
-       }
+    ThreadsProcessingItemInstance piInstance = this.piInstances.get(counter);
+    ThreadsEventRunnable runnable = new ThreadsEventRunnable(piInstance, 
event);
+    
ThreadsEngine.getThreadWithIndex(piInstance.getThreadIndex()).submit(runnable);
+  }
 
-       /*
-        * Process the received event.
-        */
-       public void processEvent(ContentEvent event, int counter) {
-               if (this.piInstances == null || this.piInstances.size() < 
this.getParallelism())
-                       throw new 
IllegalStateException("ThreadsWorkerProcessingItem(s) need to be setup before 
process any event (i.e. in ThreadsTopology.start()).");
-               
-               ThreadsProcessingItemInstance piInstance = 
this.piInstances.get(counter);
-               ThreadsEventRunnable runnable = new 
ThreadsEventRunnable(piInstance, event);
-               
ThreadsEngine.getThreadWithIndex(piInstance.getThreadIndex()).submit(runnable);
-       }
-       
-       /*
-        * Setup the replicas of this PI. 
-        * This should be called after the topology is set up (all Processors 
and PIs are
-        * setup and connected to the respective streams) and before events are 
sent.
-        */
-       public void setupInstances() {
-               this.piInstances = new 
ArrayList<ThreadsProcessingItemInstance>(this.getParallelism());
-               for (int i=0; i<this.getParallelism(); i++) {
-                       Processor newProcessor = 
this.getProcessor().newProcessor(this.getProcessor());
-                       newProcessor.onCreate(i + 1);
-                       this.piInstances.add(new 
ThreadsProcessingItemInstance(newProcessor, this.offset + i));
-               }
-       }
+  /*
+   * Setup the replicas of this PI. This should be called after the topology is
+   * set up (all Processors and PIs are setup and connected to the respective
+   * streams) and before events are sent.
+   */
+  public void setupInstances() {
+    this.piInstances = new 
ArrayList<ThreadsProcessingItemInstance>(this.getParallelism());
+    for (int i = 0; i < this.getParallelism(); i++) {
+      Processor newProcessor = 
this.getProcessor().newProcessor(this.getProcessor());
+      newProcessor.onCreate(i + 1);
+      this.piInstances.add(new ThreadsProcessingItemInstance(newProcessor, 
this.offset + i));
+    }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstance.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstance.java
 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstance.java
index 9a400d1..73052ea 100644
--- 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstance.java
+++ 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstance.java
@@ -24,31 +24,32 @@ import com.yahoo.labs.samoa.core.ContentEvent;
 import com.yahoo.labs.samoa.core.Processor;
 
 /**
- * Lightweight replicas of ThreadProcessingItem.
- * ThreadsProcessingItem manages a list of these objects and 
- * assigns each incoming message to be processed by one of them. 
+ * Lightweight replicas of ThreadProcessingItem. ThreadsProcessingItem manages 
a
+ * list of these objects and assigns each incoming message to be processed by
+ * one of them.
+ * 
  * @author Anh Thu Vu
- *
+ * 
  */
 public class ThreadsProcessingItemInstance {
 
-       private Processor processor;
-       private int threadIndex;
-       
-       public ThreadsProcessingItemInstance(Processor processor, int 
threadIndex) {
-               this.processor = processor;
-               this.threadIndex = threadIndex;
-       }
-       
-       public int getThreadIndex() {
-               return this.threadIndex;
-       }
-       
-       public Processor getProcessor() {
-               return this.processor;
-       }
-
-       public void processEvent(ContentEvent event) {
-               this.processor.process(event);
-       }
+  private Processor processor;
+  private int threadIndex;
+
+  public ThreadsProcessingItemInstance(Processor processor, int threadIndex) {
+    this.processor = processor;
+    this.threadIndex = threadIndex;
+  }
+
+  public int getThreadIndex() {
+    return this.threadIndex;
+  }
+
+  public Processor getProcessor() {
+    return this.processor;
+  }
+
+  public void processEvent(ContentEvent event) {
+    this.processor.process(event);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsStream.java
 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsStream.java
index 5aa86f7..2c02df7 100644
--- 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsStream.java
+++ 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsStream.java
@@ -32,75 +32,78 @@ import com.yahoo.labs.samoa.utils.StreamDestination;
 
 /**
  * Stream for multithreaded engine.
+ * 
  * @author Anh Thu Vu
- *
+ * 
  */
 public class ThreadsStream extends AbstractStream {
-       
-       private List<StreamDestination> destinations;
-       private int counter = 0;
-       private int maxCounter = 1;
-       
-       public ThreadsStream(IProcessingItem sourcePi) {
-               destinations = new LinkedList<StreamDestination>();
-       }
-       
-       public void addDestination(StreamDestination destination) {
-               destinations.add(destination);
-               maxCounter *= destination.getParallelism();
-       }
-       
-       public List<StreamDestination> getDestinations() {
-               return this.destinations;
-       }
-       
-       private int getNextCounter() {
-       if (maxCounter > 0 && counter >= maxCounter) counter = 0;
-       this.counter++;
-       return this.counter;
-    }
 
-    @Override
-    public synchronized void put(ContentEvent event) {
-       this.put(event, this.getNextCounter());
-    }
-    
-    private void put(ContentEvent event, int counter) {
-       ThreadsProcessingItem pi;
-        int parallelism;
-        for (StreamDestination destination:destinations) {
-            pi = (ThreadsProcessingItem) destination.getProcessingItem();
-            parallelism = destination.getParallelism();
-            switch (destination.getPartitioningScheme()) {
-            case SHUFFLE:
-               pi.processEvent(event, counter%parallelism);
-                break;
-            case GROUP_BY_KEY:
-               pi.processEvent(event, getPIIndexForKey(event.getKey(), 
parallelism));
-                break;
-            case BROADCAST:
-               for (int p = 0; p < parallelism; p++) {
-                    pi.processEvent(event, p);
-                }
-                break;
-            }
+  private List<StreamDestination> destinations;
+  private int counter = 0;
+  private int maxCounter = 1;
+
+  public ThreadsStream(IProcessingItem sourcePi) {
+    destinations = new LinkedList<StreamDestination>();
+  }
+
+  public void addDestination(StreamDestination destination) {
+    destinations.add(destination);
+    maxCounter *= destination.getParallelism();
+  }
+
+  public List<StreamDestination> getDestinations() {
+    return this.destinations;
+  }
+
+  private int getNextCounter() {
+    if (maxCounter > 0 && counter >= maxCounter)
+      counter = 0;
+    this.counter++;
+    return this.counter;
+  }
+
+  @Override
+  public synchronized void put(ContentEvent event) {
+    this.put(event, this.getNextCounter());
+  }
+
+  private void put(ContentEvent event, int counter) {
+    ThreadsProcessingItem pi;
+    int parallelism;
+    for (StreamDestination destination : destinations) {
+      pi = (ThreadsProcessingItem) destination.getProcessingItem();
+      parallelism = destination.getParallelism();
+      switch (destination.getPartitioningScheme()) {
+      case SHUFFLE:
+        pi.processEvent(event, counter % parallelism);
+        break;
+      case GROUP_BY_KEY:
+        pi.processEvent(event, getPIIndexForKey(event.getKey(), parallelism));
+        break;
+      case BROADCAST:
+        for (int p = 0; p < parallelism; p++) {
+          pi.processEvent(event, p);
         }
+        break;
+      }
+    }
+  }
+
+  private static int getPIIndexForKey(String key, int parallelism) {
+    // If key is null, return a default index: 0
+    if (key == null)
+      return 0;
+
+    // HashCodeBuilder object does not have reset() method
+    // So all objects that get appended will be included in the
+    // computation of the hashcode.
+    // To avoid initialize a HashCodeBuilder for each event,
+    // here I use the static method with reflection on the event's key
+    int index = HashCodeBuilder.reflectionHashCode(key, true) % parallelism;
+    if (index < 0) {
+      index += parallelism;
     }
-       
-       private static int getPIIndexForKey(String key, int parallelism) {
-               // If key is null, return a default index: 0
-               if (key == null) return 0;
-               
-               // HashCodeBuilder object does not have reset() method
-       // So all objects that get appended will be included in the 
-       // computation of the hashcode. 
-       // To avoid initialize a HashCodeBuilder for each event,
-       // here I use the static method with reflection on the event's key
-               int index = HashCodeBuilder.reflectionHashCode(key, true) % 
parallelism;
-               if (index < 0) {
-                       index += parallelism;
-               }
-               return index;
-       }
+    return index;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopology.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopology.java
 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopology.java
index fc9f885..a6bad2b 100644
--- 
a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopology.java
+++ 
b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopology.java
@@ -25,38 +25,42 @@ import com.yahoo.labs.samoa.topology.IProcessingItem;
 
 /**
  * Topology for multithreaded engine.
+ * 
  * @author Anh Thu Vu
- *
+ * 
  */
 public class ThreadsTopology extends AbstractTopology {
-       ThreadsTopology(String name) {
-               super(name);
-       }
+  ThreadsTopology(String name) {
+    super(name);
+  }
+
+  public void run() {
+    if (this.getEntranceProcessingItems() == null)
+      throw new IllegalStateException("You need to set entrance PI before 
running the topology.");
+    if (this.getEntranceProcessingItems().size() != 1)
+      throw new IllegalStateException("ThreadsTopology supports 1 entrance PI 
only. Number of entrance PIs is "
+          + this.getEntranceProcessingItems().size());
+
+    this.setupProcessingItemInstances();
+    ThreadsEntranceProcessingItem entrancePi = (ThreadsEntranceProcessingItem) 
this.getEntranceProcessingItems()
+        .toArray()[0];
+    if (entrancePi == null)
+      throw new IllegalStateException("You need to set entrance PI before 
running the topology.");
+    entrancePi.getProcessor().onCreate(0); // id=0 as it is not used in simple
+                                           // mode
+    entrancePi.startSendingEvents();
+  }
 
-       public void run() {
-       if (this.getEntranceProcessingItems() == null)
-               throw new IllegalStateException("You need to set entrance PI 
before running the topology.");
-       if (this.getEntranceProcessingItems().size() != 1)
-               throw new IllegalStateException("ThreadsTopology supports 1 
entrance PI only. Number of entrance PIs is 
"+this.getEntranceProcessingItems().size());
-       
-       this.setupProcessingItemInstances();
-       ThreadsEntranceProcessingItem entrancePi = 
(ThreadsEntranceProcessingItem) this.getEntranceProcessingItems().toArray()[0];
-       if (entrancePi == null)
-            throw new IllegalStateException("You need to set entrance PI 
before running the topology.");
-       entrancePi.getProcessor().onCreate(0); // id=0 as it is not used in 
simple mode
-        entrancePi.startSendingEvents();
+  /*
+   * Tell all the ThreadsProcessingItems to create & init their replicas
+   * (ThreadsProcessingItemInstance)
+   */
+  private void setupProcessingItemInstances() {
+    for (IProcessingItem pi : this.getProcessingItems()) {
+      if (pi instanceof ThreadsProcessingItem) {
+        ThreadsProcessingItem tpi = (ThreadsProcessingItem) pi;
+        tpi.setupInstances();
+      }
     }
-       
-       /*
-        * Tell all the ThreadsProcessingItems to create & init their
-        * replicas (ThreadsProcessingItemInstance)
-        */
-       private void setupProcessingItemInstances() {
-               for (IProcessingItem pi:this.getProcessingItems()) {
-                       if (pi instanceof ThreadsProcessingItem) {
-                               ThreadsProcessingItem tpi = 
(ThreadsProcessingItem) pi;
-                               tpi.setupInstances();
-                       }
-               }
-       }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
----------------------------------------------------------------------
diff --git a/samoa-threads/src/test/java/com/yahoo/labs/samoa/AlgosTest.java 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
index 5979d46..0b9b8a2 100644
--- a/samoa-threads/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
+++ b/samoa-threads/src/test/java/com/yahoo/labs/samoa/AlgosTest.java
@@ -24,45 +24,45 @@ import org.junit.Test;
 
 public class AlgosTest {
 
-    @Test(timeout = 60000)
-    public void testVHTWithThreads() throws Exception {
+  @Test(timeout = 60000)
+  public void testVHTWithThreads() throws Exception {
 
-        TestParams vhtConfig = new TestParams.Builder()
-                .inputInstances(200_000)
-                .samplingSize(20_000)
-                .evaluationInstances(200_000)
-                .classifiedInstances(200_000)
-                .classificationsCorrect(55f)
-                .kappaStat(-0.1f)
-                .kappaTempStat(-0.1f)
-                
.cliStringTemplate(TestParams.Templates.PREQEVAL_VHT_RANDOMTREE + " -t 2")
-                .resultFilePollTimeout(10)
-                .prePollWait(10)
-                .taskClassName(LocalThreadsDoTask.class.getName())
-                .build();
-        TestUtils.test(vhtConfig);
+    TestParams vhtConfig = new TestParams.Builder()
+        .inputInstances(200_000)
+        .samplingSize(20_000)
+        .evaluationInstances(200_000)
+        .classifiedInstances(200_000)
+        .classificationsCorrect(55f)
+        .kappaStat(-0.1f)
+        .kappaTempStat(-0.1f)
+        .cliStringTemplate(TestParams.Templates.PREQEVAL_VHT_RANDOMTREE + " -t 
2")
+        .resultFilePollTimeout(10)
+        .prePollWait(10)
+        .taskClassName(LocalThreadsDoTask.class.getName())
+        .build();
+    TestUtils.test(vhtConfig);
 
-    }
+  }
 
-    @Test(timeout = 180000)
-    public void testBaggingWithThreads() throws Exception {
-        TestParams baggingConfig = new TestParams.Builder()
-                .inputInstances(100_000)
-                .samplingSize(10_000)
-                .inputDelayMicroSec(100) // prevents saturating the system due 
to unbounded queues
-                .evaluationInstances(90_000)
-                .classifiedInstances(105_000)
-                .classificationsCorrect(55f)
-                .kappaStat(0f)
-                .kappaTempStat(0f)
-                
.cliStringTemplate(TestParams.Templates.PREQEVAL_BAGGING_RANDOMTREE + " -t 2")
-                .prePollWait(10)
-                .resultFilePollTimeout(30)
-                .taskClassName(LocalThreadsDoTask.class.getName())
-                .build();
-        TestUtils.test(baggingConfig);
-
-    }
+  @Test(timeout = 180000)
+  public void testBaggingWithThreads() throws Exception {
+    TestParams baggingConfig = new TestParams.Builder()
+        .inputInstances(100_000)
+        .samplingSize(10_000)
+        .inputDelayMicroSec(100) // prevents saturating the system due to
+                                 // unbounded queues
+        .evaluationInstances(90_000)
+        .classifiedInstances(105_000)
+        .classificationsCorrect(55f)
+        .kappaStat(0f)
+        .kappaTempStat(0f)
+        .cliStringTemplate(TestParams.Templates.PREQEVAL_BAGGING_RANDOMTREE + 
" -t 2")
+        .prePollWait(10)
+        .resultFilePollTimeout(30)
+        .taskClassName(LocalThreadsDoTask.class.getName())
+        .build();
+    TestUtils.test(baggingConfig);
 
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactoryTest.java
 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactoryTest.java
index eee8639..12b5b90 100644
--- 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactoryTest.java
+++ 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactoryTest.java
@@ -37,78 +37,81 @@ import com.yahoo.labs.samoa.topology.Topology;
 
 /**
  * @author Anh Thu Vu
- *
+ * 
  */
 public class ThreadsComponentFactoryTest {
-       @Tested private ThreadsComponentFactory factory;
-       @Mocked private Processor processor, processorReplica;
-       @Mocked private EntranceProcessor entranceProcessor;
-       
-       private final int parallelism = 3;
-       private final String topoName = "TestTopology";
-       
+  @Tested
+  private ThreadsComponentFactory factory;
+  @Mocked
+  private Processor processor, processorReplica;
+  @Mocked
+  private EntranceProcessor entranceProcessor;
+
+  private final int parallelism = 3;
+  private final String topoName = "TestTopology";
+
+  @Before
+  public void setUp() throws Exception {
+    factory = new ThreadsComponentFactory();
+  }
+
+  @Test
+  public void testCreatePiNoParallelism() {
+    new NonStrictExpectations() {
+      {
+        processor.newProcessor(processor);
+        result = processorReplica;
+      }
+    };
+    ProcessingItem pi = factory.createPi(processor);
+    assertNotNull("ProcessingItem created is null.", pi);
+    assertEquals("ProcessingItem created is not a ThreadsProcessingItem.", 
ThreadsProcessingItem.class, pi.getClass());
+    assertEquals("Parallelism of PI is not 1", 1, pi.getParallelism(), 0);
+  }
+
+  @Test
+  public void testCreatePiWithParallelism() {
+    new NonStrictExpectations() {
+      {
+        processor.newProcessor(processor);
+        result = processorReplica;
+      }
+    };
+    ProcessingItem pi = factory.createPi(processor, parallelism);
+    assertNotNull("ProcessingItem created is null.", pi);
+    assertEquals("ProcessingItem created is not a ThreadsProcessingItem.", 
ThreadsProcessingItem.class, pi.getClass());
+    assertEquals("Parallelism of PI is not ", parallelism, 
pi.getParallelism(), 0);
+  }
+
+  @Test
+  public void testCreateStream() {
+    new NonStrictExpectations() {
+      {
+        processor.newProcessor(processor);
+        result = processorReplica;
+      }
+    };
+    ProcessingItem pi = factory.createPi(processor);
+
+    Stream stream = factory.createStream(pi);
+    assertNotNull("Stream created is null", stream);
+    assertEquals("Stream created is not a ThreadsStream.", 
ThreadsStream.class, stream.getClass());
+  }
 
-       @Before
-       public void setUp() throws Exception {
-               factory = new ThreadsComponentFactory();
-       }
+  @Test
+  public void testCreateTopology() {
+    Topology topology = factory.createTopology(topoName);
+    assertNotNull("Topology created is null.", topology);
+    assertEquals("Topology created is not a ThreadsTopology.", 
ThreadsTopology.class, topology.getClass());
+  }
 
-       @Test
-       public void testCreatePiNoParallelism() {
-               new NonStrictExpectations() {
-                       {
-                               processor.newProcessor(processor);
-                               result=processorReplica;
-                       }
-               };
-               ProcessingItem pi = factory.createPi(processor);
-               assertNotNull("ProcessingItem created is null.",pi);
-               assertEquals("ProcessingItem created is not a 
ThreadsProcessingItem.",ThreadsProcessingItem.class,pi.getClass());
-               assertEquals("Parallelism of PI is not 
1",1,pi.getParallelism(),0);
-       }
-       
-       @Test
-       public void testCreatePiWithParallelism() {
-               new NonStrictExpectations() {
-                       {
-                               processor.newProcessor(processor);
-                               result=processorReplica;
-                       }
-               };
-               ProcessingItem pi = factory.createPi(processor,parallelism);
-               assertNotNull("ProcessingItem created is null.",pi);
-               assertEquals("ProcessingItem created is not a 
ThreadsProcessingItem.",ThreadsProcessingItem.class,pi.getClass());
-               assertEquals("Parallelism of PI is not 
",parallelism,pi.getParallelism(),0);
-       }
-       
-       @Test
-       public void testCreateStream() {
-               new NonStrictExpectations() {
-                       {
-                               processor.newProcessor(processor);
-                               result=processorReplica;
-                       }
-               };
-               ProcessingItem pi = factory.createPi(processor);
-               
-               Stream stream = factory.createStream(pi);
-               assertNotNull("Stream created is null",stream);
-               assertEquals("Stream created is not a 
ThreadsStream.",ThreadsStream.class,stream.getClass());
-       }
-       
-       @Test
-       public void testCreateTopology() {
-               Topology topology = factory.createTopology(topoName);
-               assertNotNull("Topology created is null.",topology);
-               assertEquals("Topology created is not a 
ThreadsTopology.",ThreadsTopology.class,topology.getClass());
-       }
-       
-       @Test
-       public void testCreateEntrancePi() {
-               EntranceProcessingItem entrancePi = 
factory.createEntrancePi(entranceProcessor);
-               assertNotNull("EntranceProcessingItem created is 
null.",entrancePi);
-               assertEquals("EntranceProcessingItem created is not a 
ThreadsEntranceProcessingItem.",ThreadsEntranceProcessingItem.class,entrancePi.getClass());
-               assertSame("EntranceProcessor is not set 
correctly.",entranceProcessor, entrancePi.getProcessor());
-       }
+  @Test
+  public void testCreateEntrancePi() {
+    EntranceProcessingItem entrancePi = 
factory.createEntrancePi(entranceProcessor);
+    assertNotNull("EntranceProcessingItem created is null.", entrancePi);
+    assertEquals("EntranceProcessingItem created is not a 
ThreadsEntranceProcessingItem.",
+        ThreadsEntranceProcessingItem.class, entrancePi.getClass());
+    assertSame("EntranceProcessor is not set correctly.", entranceProcessor, 
entrancePi.getProcessor());
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngineTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngineTest.java
 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngineTest.java
index cdb8949..c8a3c3d 100644
--- 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngineTest.java
+++ 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngineTest.java
@@ -29,101 +29,105 @@ import org.junit.Test;
 
 /**
  * @author Anh Thu Vu
- *
+ * 
  */
 public class ThreadsEngineTest {
 
-       @Mocked ThreadsTopology topology;
-       
-       private final int numThreads = 4;
-       private final int numThreadsSmaller = 3;
-       private final int numThreadsLarger = 5;
-
-       @After
-       public void cleanup() {
-               ThreadsEngine.clearThreadPool();
-       }
-       
-       @Test
-       public void testSetNumberOfThreadsSimple() {
-               ThreadsEngine.setNumberOfThreads(numThreads);
-               assertEquals("Number of threads is not set correctly.", 
numThreads,
-                               ThreadsEngine.getNumberOfThreads(),0);
-       }
-       
-       @Test
-       public void testSetNumberOfThreadsRepeat() {
-               ThreadsEngine.setNumberOfThreads(numThreads);
-               ThreadsEngine.setNumberOfThreads(numThreads);
-               assertEquals("Number of threads is not set correctly.", 
numThreads,
-                               ThreadsEngine.getNumberOfThreads(),0);
-       }
-       
-       @Test
-       public void testSetNumberOfThreadsIncrease() {
-               ThreadsEngine.setNumberOfThreads(numThreads);
-               ThreadsEngine.setNumberOfThreads(numThreadsLarger);
-               assertEquals("Number of threads is not set correctly.", 
numThreadsLarger,
-                               ThreadsEngine.getNumberOfThreads(),0);
-       }
-       
-       @Test(expected=IllegalStateException.class)
-       public void testSetNumberOfThreadsDecrease() {
-               ThreadsEngine.setNumberOfThreads(numThreads);
-               ThreadsEngine.setNumberOfThreads(numThreadsSmaller);
-               // Exception expected
-       }
-       
-       @Test(expected=IllegalStateException.class)
-       public void testSetNumberOfThreadsNegative() {
-               ThreadsEngine.setNumberOfThreads(-1);
-               // Exception expected
-       }
-       
-       @Test(expected=IllegalStateException.class)
-       public void testSetNumberOfThreadsZero() {
-               ThreadsEngine.setNumberOfThreads(0);
-               // Exception expected
-       }
-       
-       @Test
-       public void testClearThreadPool() {
-               ThreadsEngine.setNumberOfThreads(numThreads);
-               ThreadsEngine.clearThreadPool();
-               assertEquals("ThreadsEngine was not shutdown properly.", 0, 
ThreadsEngine.getNumberOfThreads());
-       }
-
-       @Test
-       public void testGetThreadWithIndexWithinPoolSize() {
-               ThreadsEngine.setNumberOfThreads(numThreads);
-               for (int i=0; i<numThreads; i++) {
-                       assertNotNull("ExecutorService is not initialized 
correctly.", ThreadsEngine.getThreadWithIndex(i));
-               }
-       }
-       
-       @Test
-       public void testGetThreadWithIndexOutOfPoolSize() {
-               ThreadsEngine.setNumberOfThreads(numThreads);
-               for (int i=0; i<numThreads+3; i++) {
-                       assertNotNull("ExecutorService is not initialized 
correctly.", ThreadsEngine.getThreadWithIndex(i));
-               }
-       }
-       
-       @Test(expected=IllegalStateException.class)
-       public void testGetThreadWithIndexFromEmptyPool() {
-               for (int i=0; i<numThreads; i++) {
-                       ThreadsEngine.getThreadWithIndex(i);
-               }
-       }
-
-       @Test
-       public void testSubmitTopology() {
-               ThreadsEngine.submitTopology(topology, numThreads);
-               new Verifications() {{
-                   topology.run(); times=1;
-               }};
-               assertEquals("Number of threads is not set correctly.", 
numThreads,
-                               ThreadsEngine.getNumberOfThreads(),0);
-       }
+  @Mocked
+  ThreadsTopology topology;
+
+  private final int numThreads = 4;
+  private final int numThreadsSmaller = 3;
+  private final int numThreadsLarger = 5;
+
+  @After
+  public void cleanup() {
+    ThreadsEngine.clearThreadPool();
+  }
+
+  @Test
+  public void testSetNumberOfThreadsSimple() {
+    ThreadsEngine.setNumberOfThreads(numThreads);
+    assertEquals("Number of threads is not set correctly.", numThreads,
+        ThreadsEngine.getNumberOfThreads(), 0);
+  }
+
+  @Test
+  public void testSetNumberOfThreadsRepeat() {
+    ThreadsEngine.setNumberOfThreads(numThreads);
+    ThreadsEngine.setNumberOfThreads(numThreads);
+    assertEquals("Number of threads is not set correctly.", numThreads,
+        ThreadsEngine.getNumberOfThreads(), 0);
+  }
+
+  @Test
+  public void testSetNumberOfThreadsIncrease() {
+    ThreadsEngine.setNumberOfThreads(numThreads);
+    ThreadsEngine.setNumberOfThreads(numThreadsLarger);
+    assertEquals("Number of threads is not set correctly.", numThreadsLarger,
+        ThreadsEngine.getNumberOfThreads(), 0);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testSetNumberOfThreadsDecrease() {
+    ThreadsEngine.setNumberOfThreads(numThreads);
+    ThreadsEngine.setNumberOfThreads(numThreadsSmaller);
+    // Exception expected
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testSetNumberOfThreadsNegative() {
+    ThreadsEngine.setNumberOfThreads(-1);
+    // Exception expected
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testSetNumberOfThreadsZero() {
+    ThreadsEngine.setNumberOfThreads(0);
+    // Exception expected
+  }
+
+  @Test
+  public void testClearThreadPool() {
+    ThreadsEngine.setNumberOfThreads(numThreads);
+    ThreadsEngine.clearThreadPool();
+    assertEquals("ThreadsEngine was not shutdown properly.", 0, 
ThreadsEngine.getNumberOfThreads());
+  }
+
+  @Test
+  public void testGetThreadWithIndexWithinPoolSize() {
+    ThreadsEngine.setNumberOfThreads(numThreads);
+    for (int i = 0; i < numThreads; i++) {
+      assertNotNull("ExecutorService is not initialized correctly.", 
ThreadsEngine.getThreadWithIndex(i));
+    }
+  }
+
+  @Test
+  public void testGetThreadWithIndexOutOfPoolSize() {
+    ThreadsEngine.setNumberOfThreads(numThreads);
+    for (int i = 0; i < numThreads + 3; i++) {
+      assertNotNull("ExecutorService is not initialized correctly.", 
ThreadsEngine.getThreadWithIndex(i));
+    }
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testGetThreadWithIndexFromEmptyPool() {
+    for (int i = 0; i < numThreads; i++) {
+      ThreadsEngine.getThreadWithIndex(i);
+    }
+  }
+
+  @Test
+  public void testSubmitTopology() {
+    ThreadsEngine.submitTopology(topology, numThreads);
+    new Verifications() {
+      {
+        topology.run();
+        times = 1;
+      }
+    };
+    assertEquals("Number of threads is not set correctly.", numThreads,
+        ThreadsEngine.getNumberOfThreads(), 0);
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItemTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItemTest.java
 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItemTest.java
index 2dab489..db2a3fb 100644
--- 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItemTest.java
+++ 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItemTest.java
@@ -34,100 +34,118 @@ import com.yahoo.labs.samoa.topology.Stream;
 
 /**
  * @author Anh Thu Vu
- *
+ * 
  */
 public class ThreadsEntranceProcessingItemTest {
 
-       @Tested private ThreadsEntranceProcessingItem entrancePi;
-       
-       @Mocked private EntranceProcessor entranceProcessor;
-       @Mocked private Stream outputStream, anotherStream;
-       @Mocked private ContentEvent event;
-       
-       @Mocked private Thread unused;
-       
-       /**
-        * @throws java.lang.Exception
-        */
-       @Before
-       public void setUp() throws Exception {
-               entrancePi = new 
ThreadsEntranceProcessingItem(entranceProcessor);
-       }
-
-       @Test
-       public void testContructor() {
-               assertSame("EntranceProcessor is not set 
correctly.",entranceProcessor,entrancePi.getProcessor());
-       }
-       
-       @Test
-       public void testSetOutputStream() {
-               entrancePi.setOutputStream(outputStream);
-               assertSame("OutoutStream is not set 
correctly.",outputStream,entrancePi.getOutputStream());
-       }
-       
-       @Test
-       public void testSetOutputStreamRepeate() {
-               entrancePi.setOutputStream(outputStream);
-               entrancePi.setOutputStream(outputStream);
-               assertSame("OutoutStream is not set 
correctly.",outputStream,entrancePi.getOutputStream());
-       }
-       
-       @Test(expected=IllegalStateException.class)
-       public void testSetOutputStreamError() {
-               entrancePi.setOutputStream(outputStream);
-               entrancePi.setOutputStream(anotherStream);
-       }
-       
-       @Test
-       public void testStartSendingEvents() {
-               entrancePi.setOutputStream(outputStream);
-               new StrictExpectations() {
-                       {
-                               for (int i=0; i<1; i++) {
-                                       entranceProcessor.isFinished(); 
result=false;
-                                       entranceProcessor.hasNext(); 
result=false;
-                               }
-                               
-                               for (int i=0; i<5; i++) {
-                                       entranceProcessor.isFinished(); 
result=false;
-                                       entranceProcessor.hasNext(); 
result=true;
-                                       entranceProcessor.nextEvent(); 
result=event;
-                                       outputStream.put(event);
-                               }
-                               
-                               for (int i=0; i<2; i++) {
-                                       entranceProcessor.isFinished(); 
result=false;
-                                       entranceProcessor.hasNext(); 
result=false;
-                               }
-                               
-                               for (int i=0; i<5; i++) {
-                                       entranceProcessor.isFinished(); 
result=false;
-                                       entranceProcessor.hasNext(); 
result=true;
-                                       entranceProcessor.nextEvent(); 
result=event;
-                                       outputStream.put(event);
-                               }
-
-                               entranceProcessor.isFinished(); result=true; 
times=1;
-                               entranceProcessor.hasNext(); times=0;
-
-
-                       }
-               };
-               entrancePi.startSendingEvents();
-               new Verifications() {
-                       {
-                               try {
-                                       Thread.sleep(anyInt); times=3;
-                               } catch (InterruptedException e) {
-                                       
-                               }
-                       }
-               };
-       }
-       
-       @Test(expected=IllegalStateException.class)
-       public void testStartSendingEventsError() {
-               entrancePi.startSendingEvents();
-       }
+  @Tested
+  private ThreadsEntranceProcessingItem entrancePi;
+
+  @Mocked
+  private EntranceProcessor entranceProcessor;
+  @Mocked
+  private Stream outputStream, anotherStream;
+  @Mocked
+  private ContentEvent event;
+
+  @Mocked
+  private Thread unused;
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    entrancePi = new ThreadsEntranceProcessingItem(entranceProcessor);
+  }
+
+  @Test
+  public void testContructor() {
+    assertSame("EntranceProcessor is not set correctly.", entranceProcessor, 
entrancePi.getProcessor());
+  }
+
+  @Test
+  public void testSetOutputStream() {
+    entrancePi.setOutputStream(outputStream);
+    assertSame("OutoutStream is not set correctly.", outputStream, 
entrancePi.getOutputStream());
+  }
+
+  @Test
+  public void testSetOutputStreamRepeate() {
+    entrancePi.setOutputStream(outputStream);
+    entrancePi.setOutputStream(outputStream);
+    assertSame("OutoutStream is not set correctly.", outputStream, 
entrancePi.getOutputStream());
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testSetOutputStreamError() {
+    entrancePi.setOutputStream(outputStream);
+    entrancePi.setOutputStream(anotherStream);
+  }
+
+  @Test
+  public void testStartSendingEvents() {
+    entrancePi.setOutputStream(outputStream);
+    new StrictExpectations() {
+      {
+        for (int i = 0; i < 1; i++) {
+          entranceProcessor.isFinished();
+          result = false;
+          entranceProcessor.hasNext();
+          result = false;
+        }
+
+        for (int i = 0; i < 5; i++) {
+          entranceProcessor.isFinished();
+          result = false;
+          entranceProcessor.hasNext();
+          result = true;
+          entranceProcessor.nextEvent();
+          result = event;
+          outputStream.put(event);
+        }
+
+        for (int i = 0; i < 2; i++) {
+          entranceProcessor.isFinished();
+          result = false;
+          entranceProcessor.hasNext();
+          result = false;
+        }
+
+        for (int i = 0; i < 5; i++) {
+          entranceProcessor.isFinished();
+          result = false;
+          entranceProcessor.hasNext();
+          result = true;
+          entranceProcessor.nextEvent();
+          result = event;
+          outputStream.put(event);
+        }
+
+        entranceProcessor.isFinished();
+        result = true;
+        times = 1;
+        entranceProcessor.hasNext();
+        times = 0;
+
+      }
+    };
+    entrancePi.startSendingEvents();
+    new Verifications() {
+      {
+        try {
+          Thread.sleep(anyInt);
+          times = 3;
+        } catch (InterruptedException e) {
+
+        }
+      }
+    };
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testStartSendingEventsError() {
+    entrancePi.startSendingEvents();
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnableTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnableTest.java
 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnableTest.java
index f744162..1e70d10 100644
--- 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnableTest.java
+++ 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnableTest.java
@@ -31,37 +31,41 @@ import com.yahoo.labs.samoa.core.ContentEvent;
 
 /**
  * @author Anh Thu Vu
- *
+ * 
  */
 public class ThreadsEventRunnableTest {
 
-       @Tested private ThreadsEventRunnable task;
-       
-       @Mocked private ThreadsProcessingItemInstance piInstance;
-       @Mocked private ContentEvent event;
-       
-       /**
-        * @throws java.lang.Exception
-        */
-       @Before
-       public void setUp() throws Exception {
-               task = new ThreadsEventRunnable(piInstance, event);
-       }
+  @Tested
+  private ThreadsEventRunnable task;
+
+  @Mocked
+  private ThreadsProcessingItemInstance piInstance;
+  @Mocked
+  private ContentEvent event;
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    task = new ThreadsEventRunnable(piInstance, event);
+  }
+
+  @Test
+  public void testConstructor() {
+    assertSame("WorkerProcessingItem is not set correctly.", piInstance, 
task.getWorkerProcessingItem());
+    assertSame("ContentEvent is not set correctly.", event, 
task.getContentEvent());
+  }
 
-       @Test
-       public void testConstructor() {
-               assertSame("WorkerProcessingItem is not set 
correctly.",piInstance,task.getWorkerProcessingItem());
-               assertSame("ContentEvent is not set 
correctly.",event,task.getContentEvent());
-       }
-       
-       @Test
-       public void testRun() {
-               task.run();
-               new Verifications () {
-                       {
-                               piInstance.processEvent(event); times=1;
-                       }
-               };
-       }
+  @Test
+  public void testRun() {
+    task.run();
+    new Verifications() {
+      {
+        piInstance.processEvent(event);
+        times = 1;
+      }
+    };
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstanceTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstanceTest.java
 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstanceTest.java
index 33af044..d4f78b0 100644
--- 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstanceTest.java
+++ 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstanceTest.java
@@ -32,36 +32,40 @@ import com.yahoo.labs.samoa.core.Processor;
 
 /**
  * @author Anh Thu Vu
- *
+ * 
  */
 public class ThreadsProcessingItemInstanceTest {
 
-       @Tested private ThreadsProcessingItemInstance piInstance;
-       
-       @Mocked private Processor processor;
-       @Mocked private ContentEvent event;
-       
-       private final int threadIndex = 2;
-       
-       @Before
-       public void setUp() throws Exception {
-               piInstance = new ThreadsProcessingItemInstance(processor, 
threadIndex);
-       }
+  @Tested
+  private ThreadsProcessingItemInstance piInstance;
+
+  @Mocked
+  private Processor processor;
+  @Mocked
+  private ContentEvent event;
+
+  private final int threadIndex = 2;
+
+  @Before
+  public void setUp() throws Exception {
+    piInstance = new ThreadsProcessingItemInstance(processor, threadIndex);
+  }
+
+  @Test
+  public void testConstructor() {
+    assertSame("Processor is not set correctly.", processor, 
piInstance.getProcessor());
+    assertEquals("Thread index is not set correctly.", threadIndex, 
piInstance.getThreadIndex(), 0);
+  }
 
-       @Test
-       public void testConstructor() {
-               assertSame("Processor is not set correctly.", processor, 
piInstance.getProcessor());
-               assertEquals("Thread index is not set correctly.", threadIndex, 
piInstance.getThreadIndex(),0);
-       }
-       
-       @Test
-       public void testProcessEvent() {
-               piInstance.processEvent(event);
-               new Verifications() {
-                       {
-                               processor.process(event); times=1;
-                       }
-               };
-       }
+  @Test
+  public void testProcessEvent() {
+    piInstance.processEvent(event);
+    new Verifications() {
+      {
+        processor.process(event);
+        times = 1;
+      }
+    };
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemTest.java
 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemTest.java
index ad7cd56..d148e8e 100644
--- 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemTest.java
+++ 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemTest.java
@@ -39,135 +39,142 @@ import com.yahoo.labs.samoa.utils.StreamDestination;
 
 /**
  * @author Anh Thu Vu
- *
+ * 
  */
 public class ThreadsProcessingItemTest {
 
-       @Tested private ThreadsProcessingItem pi;
-       
-       @Mocked private ThreadsEngine unused;
-       @Mocked private ExecutorService threadPool;
-       @Mocked private ThreadsEventRunnable task;
-       
-       @Mocked private Processor processor, processorReplica;
-       @Mocked private ThreadsStream stream;
-       @Mocked private StreamDestination destination;
-       @Mocked private ContentEvent event;
-       
-       private final int parallelism = 4;
-       private final int counter = 2;
-       
-       private ThreadsProcessingItemInstance instance;
-       
-       
-       @Before
-       public void setUp() throws Exception {
-               new NonStrictExpectations() {
-                       {
-                               processor.newProcessor(processor);
-                               result=processorReplica;
-                       }
-               };
-               pi = new ThreadsProcessingItem(processor, parallelism);
-       }
-
-       @Test
-       public void testConstructor() {
-               assertSame("Processor was not set 
correctly.",processor,pi.getProcessor());
-               assertEquals("Parallelism was not set 
correctly.",parallelism,pi.getParallelism(),0);
-       }
-       
-       @Test
-       public void testConnectInputShuffleStream() {
-               new Expectations() {
-                       {
-                               destination = new StreamDestination(pi, 
parallelism, PartitioningScheme.SHUFFLE);
-                               stream.addDestination(destination);
-                       }
-               };
-               pi.connectInputShuffleStream(stream);
-       }
-       
-       @Test
-       public void testConnectInputKeyStream() {
-               new Expectations() {
-                       {
-                               destination = new StreamDestination(pi, 
parallelism, PartitioningScheme.GROUP_BY_KEY);
-                               stream.addDestination(destination);
-                       }
-               };
-               pi.connectInputKeyStream(stream);
-       }
-       
-       @Test
-       public void testConnectInputAllStream() {
-               new Expectations() {
-                       {
-                               destination = new StreamDestination(pi, 
parallelism, PartitioningScheme.BROADCAST);
-                               stream.addDestination(destination);
-                       }
-               };
-               pi.connectInputAllStream(stream);
-       }
-       
-       @Test
-       public void testSetupInstances() {
-               new Expectations() {
-                       {
-                               for (int i=0; i<parallelism; i++) {
-                                       processor.newProcessor(processor);
-                                       result=processor;
-                               
-                                       processor.onCreate(anyInt);
-                               }
-                       }
-               };
-               pi.setupInstances();
-               List<ThreadsProcessingItemInstance> instances = 
pi.getProcessingItemInstances();
-               assertNotNull("List of PI instances is null.",instances);
-               assertEquals("Number of instances does not match 
parallelism.",parallelism,instances.size(),0);
-               for(int i=0; i<instances.size();i++) {
-                       assertNotNull("Instance "+i+" is 
null.",instances.get(i));
-                       assertEquals("Instance "+i+" is not a 
ThreadsWorkerProcessingItem.",ThreadsProcessingItemInstance.class,instances.get(i).getClass());
-               }
-       }
-       
-       @Test(expected=IllegalStateException.class)
-       public void testProcessEventError() {
-               pi.processEvent(event, counter);
-       }
-       
-       @Test
-       public void testProcessEvent() {
-               new Expectations() {
-                       {
-                               for (int i=0; i<parallelism; i++) {
-                                       processor.newProcessor(processor);
-                                       result=processor;
-                               
-                                       processor.onCreate(anyInt);
-                               }
-                       }
-               };
-               pi.setupInstances();
-               
-               instance = pi.getProcessingItemInstances().get(counter);
-               new NonStrictExpectations() {
-                       {
-                               ThreadsEngine.getThreadWithIndex(anyInt);
-                               result=threadPool;
-                               
-                               
-                       }
-               };
-               new Expectations() {
-                       {
-                               task = new ThreadsEventRunnable(instance, 
event);
-                               threadPool.submit(task);
-                       }
-               };
-               pi.processEvent(event, counter);
-               
-       }
+  @Tested
+  private ThreadsProcessingItem pi;
+
+  @Mocked
+  private ThreadsEngine unused;
+  @Mocked
+  private ExecutorService threadPool;
+  @Mocked
+  private ThreadsEventRunnable task;
+
+  @Mocked
+  private Processor processor, processorReplica;
+  @Mocked
+  private ThreadsStream stream;
+  @Mocked
+  private StreamDestination destination;
+  @Mocked
+  private ContentEvent event;
+
+  private final int parallelism = 4;
+  private final int counter = 2;
+
+  private ThreadsProcessingItemInstance instance;
+
+  @Before
+  public void setUp() throws Exception {
+    new NonStrictExpectations() {
+      {
+        processor.newProcessor(processor);
+        result = processorReplica;
+      }
+    };
+    pi = new ThreadsProcessingItem(processor, parallelism);
+  }
+
+  @Test
+  public void testConstructor() {
+    assertSame("Processor was not set correctly.", processor, 
pi.getProcessor());
+    assertEquals("Parallelism was not set correctly.", parallelism, 
pi.getParallelism(), 0);
+  }
+
+  @Test
+  public void testConnectInputShuffleStream() {
+    new Expectations() {
+      {
+        destination = new StreamDestination(pi, parallelism, 
PartitioningScheme.SHUFFLE);
+        stream.addDestination(destination);
+      }
+    };
+    pi.connectInputShuffleStream(stream);
+  }
+
+  @Test
+  public void testConnectInputKeyStream() {
+    new Expectations() {
+      {
+        destination = new StreamDestination(pi, parallelism, 
PartitioningScheme.GROUP_BY_KEY);
+        stream.addDestination(destination);
+      }
+    };
+    pi.connectInputKeyStream(stream);
+  }
+
+  @Test
+  public void testConnectInputAllStream() {
+    new Expectations() {
+      {
+        destination = new StreamDestination(pi, parallelism, 
PartitioningScheme.BROADCAST);
+        stream.addDestination(destination);
+      }
+    };
+    pi.connectInputAllStream(stream);
+  }
+
+  @Test
+  public void testSetupInstances() {
+    new Expectations() {
+      {
+        for (int i = 0; i < parallelism; i++) {
+          processor.newProcessor(processor);
+          result = processor;
+
+          processor.onCreate(anyInt);
+        }
+      }
+    };
+    pi.setupInstances();
+    List<ThreadsProcessingItemInstance> instances = 
pi.getProcessingItemInstances();
+    assertNotNull("List of PI instances is null.", instances);
+    assertEquals("Number of instances does not match parallelism.", 
parallelism, instances.size(), 0);
+    for (int i = 0; i < instances.size(); i++) {
+      assertNotNull("Instance " + i + " is null.", instances.get(i));
+      assertEquals("Instance " + i + " is not a ThreadsWorkerProcessingItem.", 
ThreadsProcessingItemInstance.class,
+          instances.get(i).getClass());
+    }
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testProcessEventError() {
+    pi.processEvent(event, counter);
+  }
+
+  @Test
+  public void testProcessEvent() {
+    new Expectations() {
+      {
+        for (int i = 0; i < parallelism; i++) {
+          processor.newProcessor(processor);
+          result = processor;
+
+          processor.onCreate(anyInt);
+        }
+      }
+    };
+    pi.setupInstances();
+
+    instance = pi.getProcessingItemInstances().get(counter);
+    new NonStrictExpectations() {
+      {
+        ThreadsEngine.getThreadWithIndex(anyInt);
+        result = threadPool;
+
+      }
+    };
+    new Expectations() {
+      {
+        task = new ThreadsEventRunnable(instance, event);
+        threadPool.submit(task);
+      }
+    };
+    pi.processEvent(event, counter);
+
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsStreamTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsStreamTest.java
 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsStreamTest.java
index 27d2acd..abe57ce 100644
--- 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsStreamTest.java
+++ 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsStreamTest.java
@@ -41,87 +41,95 @@ import com.yahoo.labs.samoa.utils.StreamDestination;
 
 /**
  * @author Anh Thu Vu
- *
+ * 
  */
 @RunWith(Parameterized.class)
 public class ThreadsStreamTest {
-       
-       @Tested private ThreadsStream stream;
-       
-       @Mocked private ThreadsProcessingItem sourcePi, destPi;
-       @Mocked private ContentEvent event;
-       @Mocked private StreamDestination destination;
-
-       private final String eventKey = "eventkey";
-       private final int parallelism;
-       private final PartitioningScheme scheme;
-       
-       
-       @Parameters
-       public static Collection<Object[]> generateParameters() {
-               return Arrays.asList(new Object[][] {
-                        { 2, PartitioningScheme.SHUFFLE },
-                        { 3, PartitioningScheme.GROUP_BY_KEY },
-                        { 4, PartitioningScheme.BROADCAST }
-               });
-       }
-       
-       public ThreadsStreamTest(int parallelism, PartitioningScheme scheme) {
-               this.parallelism = parallelism;
-               this.scheme = scheme;
-       }
-       
-       @Before
-       public void setUp() throws Exception {
-               stream = new ThreadsStream(sourcePi);
-               stream.addDestination(destination);
-       }
-       
-       @Test
-       public void testAddDestination() {
-               boolean found = false;
-               for (StreamDestination sd:stream.getDestinations()) {
-                       if (sd == destination) {
-                               found = true;
-                               break;
-                       }
-               }
-               assertTrue("Destination object was not added in stream's 
destinations set.",found);
-       }
-
-       @Test
-       public void testPut() {
-               new NonStrictExpectations() {
-                       {
-                               event.getKey(); result=eventKey;
-                               destination.getProcessingItem(); result=destPi;
-                               destination.getPartitioningScheme(); 
result=scheme;
-                               destination.getParallelism(); 
result=parallelism;
-                               
-                       }
-               };
-               switch(this.scheme) {
-               case SHUFFLE: case GROUP_BY_KEY:
-                       new Expectations() {
-                               {
-                                       
-                                       // TODO: restrict the range of counter 
value
-                                       destPi.processEvent(event, anyInt); 
times=1;
-                               }
-                       };
-                       break;
-               case BROADCAST:
-                       new Expectations() {
-                               {
-                                       // TODO: restrict the range of counter 
value
-                                       destPi.processEvent(event, anyInt); 
times=parallelism;
-                               }
-                       };
-                       break;
-               }
-               stream.put(event);
-       }
-       
-       
+
+  @Tested
+  private ThreadsStream stream;
+
+  @Mocked
+  private ThreadsProcessingItem sourcePi, destPi;
+  @Mocked
+  private ContentEvent event;
+  @Mocked
+  private StreamDestination destination;
+
+  private final String eventKey = "eventkey";
+  private final int parallelism;
+  private final PartitioningScheme scheme;
+
+  @Parameters
+  public static Collection<Object[]> generateParameters() {
+    return Arrays.asList(new Object[][] {
+        { 2, PartitioningScheme.SHUFFLE },
+        { 3, PartitioningScheme.GROUP_BY_KEY },
+        { 4, PartitioningScheme.BROADCAST }
+    });
+  }
+
+  public ThreadsStreamTest(int parallelism, PartitioningScheme scheme) {
+    this.parallelism = parallelism;
+    this.scheme = scheme;
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    stream = new ThreadsStream(sourcePi);
+    stream.addDestination(destination);
+  }
+
+  @Test
+  public void testAddDestination() {
+    boolean found = false;
+    for (StreamDestination sd : stream.getDestinations()) {
+      if (sd == destination) {
+        found = true;
+        break;
+      }
+    }
+    assertTrue("Destination object was not added in stream's destinations 
set.", found);
+  }
+
+  @Test
+  public void testPut() {
+    new NonStrictExpectations() {
+      {
+        event.getKey();
+        result = eventKey;
+        destination.getProcessingItem();
+        result = destPi;
+        destination.getPartitioningScheme();
+        result = scheme;
+        destination.getParallelism();
+        result = parallelism;
+
+      }
+    };
+    switch (this.scheme) {
+    case SHUFFLE:
+    case GROUP_BY_KEY:
+      new Expectations() {
+        {
+
+          // TODO: restrict the range of counter value
+          destPi.processEvent(event, anyInt);
+          times = 1;
+        }
+      };
+      break;
+    case BROADCAST:
+      new Expectations() {
+        {
+          // TODO: restrict the range of counter value
+          destPi.processEvent(event, anyInt);
+          times = parallelism;
+        }
+      };
+      break;
+    }
+    stream.put(event);
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopologyTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopologyTest.java
 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopologyTest.java
index 46847f5..6891a63 100644
--- 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopologyTest.java
+++ 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopologyTest.java
@@ -35,50 +35,53 @@ import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
 
 /**
  * @author Anh Thu Vu
- *
+ * 
  */
 public class ThreadsTopologyTest {
 
-       @Tested private ThreadsTopology topology;
-       
-       @Mocked private ThreadsEntranceProcessingItem entrancePi;
-       @Mocked private EntranceProcessor entranceProcessor;
-       
-       @Before
-       public void setUp() throws Exception {
-               topology = new ThreadsTopology("TestTopology");
-       }
+  @Tested
+  private ThreadsTopology topology;
+
+  @Mocked
+  private ThreadsEntranceProcessingItem entrancePi;
+  @Mocked
+  private EntranceProcessor entranceProcessor;
+
+  @Before
+  public void setUp() throws Exception {
+    topology = new ThreadsTopology("TestTopology");
+  }
+
+  @Test
+  public void testAddEntrancePi() {
+    topology.addEntranceProcessingItem(entrancePi);
+    Set<EntranceProcessingItem> entrancePIs = 
topology.getEntranceProcessingItems();
+    assertNotNull("Set of entrance PIs is null.", entrancePIs);
+    assertEquals("Number of entrance PI in ThreadsTopology must be 1", 1, 
entrancePIs.size());
+    assertSame("Entrance PI was not set correctly.", entrancePi, 
entrancePIs.toArray()[0]);
+    // TODO: verify that entrance PI is in the set of ProcessingItems
+    // Need to access topology's set of PIs (getProcessingItems() method)
+  }
+
+  @Test
+  public void testRun() {
+    topology.addEntranceProcessingItem(entrancePi);
+
+    new Expectations() {
+      {
+        entrancePi.getProcessor();
+        result = entranceProcessor;
+        entranceProcessor.onCreate(anyInt);
+
+        entrancePi.startSendingEvents();
+      }
+    };
+    topology.run();
+  }
 
-       @Test
-       public void testAddEntrancePi() {
-               topology.addEntranceProcessingItem(entrancePi);
-               Set<EntranceProcessingItem> entrancePIs = 
topology.getEntranceProcessingItems();
-               assertNotNull("Set of entrance PIs is null.",entrancePIs);
-               assertEquals("Number of entrance PI in ThreadsTopology must be 
1",1,entrancePIs.size());
-               assertSame("Entrance PI was not set 
correctly.",entrancePi,entrancePIs.toArray()[0]);
-               // TODO: verify that entrance PI is in the set of 
ProcessingItems
-               // Need to access topology's set of PIs (getProcessingItems() 
method)
-       }
-       
-       @Test
-       public void testRun() {
-               topology.addEntranceProcessingItem(entrancePi);
-               
-               new Expectations() {
-                       {
-                               entrancePi.getProcessor();
-                               result=entranceProcessor;
-                               entranceProcessor.onCreate(anyInt);
-                               
-                               entrancePi.startSendingEvents();
-                       }
-               };
-               topology.run();
-       }
-       
-       @Test(expected=IllegalStateException.class)
-       public void testRunWithoutEntrancePI() {
-               topology.run();
-       }
+  @Test(expected = IllegalStateException.class)
+  public void testRunWithoutEntrancePI() {
+    topology.run();
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/test/java/com/yahoo/labs/samoa/utils/StreamDestinationTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/utils/StreamDestinationTest.java
 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/utils/StreamDestinationTest.java
index 19c5421..c165b3e 100644
--- 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/utils/StreamDestinationTest.java
+++ 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/utils/StreamDestinationTest.java
@@ -40,41 +40,43 @@ import com.yahoo.labs.samoa.utils.StreamDestination;
 
 /**
  * @author Anh Thu Vu
- *
+ * 
  */
 @RunWith(Parameterized.class)
 public class StreamDestinationTest {
 
-       @Tested private StreamDestination destination;
-       
-       @Mocked private IProcessingItem pi;
-       private final int parallelism;
-       private final PartitioningScheme scheme;
-       
-       @Parameters
-       public static Collection<Object[]> generateParameters() {
-               return Arrays.asList(new Object[][] {
-                        { 3, PartitioningScheme.SHUFFLE },
-                        { 2, PartitioningScheme.GROUP_BY_KEY },
-                        { 5, PartitioningScheme.BROADCAST }
-               });
-       }
-       
-       public StreamDestinationTest(int parallelism, PartitioningScheme 
scheme) {
-               this.parallelism = parallelism;
-               this.scheme = scheme;
-       }
-       
-       @Before
-       public void setUp() throws Exception {
-               destination = new StreamDestination(pi, parallelism, scheme);
-       }
+  @Tested
+  private StreamDestination destination;
+
+  @Mocked
+  private IProcessingItem pi;
+  private final int parallelism;
+  private final PartitioningScheme scheme;
+
+  @Parameters
+  public static Collection<Object[]> generateParameters() {
+    return Arrays.asList(new Object[][] {
+        { 3, PartitioningScheme.SHUFFLE },
+        { 2, PartitioningScheme.GROUP_BY_KEY },
+        { 5, PartitioningScheme.BROADCAST }
+    });
+  }
+
+  public StreamDestinationTest(int parallelism, PartitioningScheme scheme) {
+    this.parallelism = parallelism;
+    this.scheme = scheme;
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    destination = new StreamDestination(pi, parallelism, scheme);
+  }
 
-       @Test
-       public void testContructor() {
-               assertSame("The IProcessingItem is not set correctly.", pi, 
destination.getProcessingItem());
-               assertEquals("Parallelism value is not set correctly.", 
parallelism, destination.getParallelism(), 0);
-               assertEquals("EventAllocationType is not set correctly.", 
scheme, destination.getPartitioningScheme());
-       }
+  @Test
+  public void testContructor() {
+    assertSame("The IProcessingItem is not set correctly.", pi, 
destination.getProcessingItem());
+    assertEquals("Parallelism value is not set correctly.", parallelism, 
destination.getParallelism(), 0);
+    assertEquals("EventAllocationType is not set correctly.", scheme, 
destination.getPartitioningScheme());
+  }
 
 }

Reply via email to