Repository: incubator-samoa
Updated Branches:
  refs/heads/master 64ef7a921 -> 9b178f631


http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactoryTest.java
 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactoryTest.java
deleted file mode 100644
index 12b5b90..0000000
--- 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactoryTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2013 Yahoo! Inc.
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import static org.junit.Assert.*;
-import mockit.Mocked;
-import mockit.NonStrictExpectations;
-import mockit.Tested;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import com.yahoo.labs.samoa.core.EntranceProcessor;
-import com.yahoo.labs.samoa.core.Processor;
-import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
-import com.yahoo.labs.samoa.topology.ProcessingItem;
-import com.yahoo.labs.samoa.topology.Stream;
-import com.yahoo.labs.samoa.topology.Topology;
-
-/**
- * @author Anh Thu Vu
- * 
- */
-public class ThreadsComponentFactoryTest {
-  @Tested
-  private ThreadsComponentFactory factory;
-  @Mocked
-  private Processor processor, processorReplica;
-  @Mocked
-  private EntranceProcessor entranceProcessor;
-
-  private final int parallelism = 3;
-  private final String topoName = "TestTopology";
-
-  @Before
-  public void setUp() throws Exception {
-    factory = new ThreadsComponentFactory();
-  }
-
-  @Test
-  public void testCreatePiNoParallelism() {
-    new NonStrictExpectations() {
-      {
-        processor.newProcessor(processor);
-        result = processorReplica;
-      }
-    };
-    ProcessingItem pi = factory.createPi(processor);
-    assertNotNull("ProcessingItem created is null.", pi);
-    assertEquals("ProcessingItem created is not a ThreadsProcessingItem.", 
ThreadsProcessingItem.class, pi.getClass());
-    assertEquals("Parallelism of PI is not 1", 1, pi.getParallelism(), 0);
-  }
-
-  @Test
-  public void testCreatePiWithParallelism() {
-    new NonStrictExpectations() {
-      {
-        processor.newProcessor(processor);
-        result = processorReplica;
-      }
-    };
-    ProcessingItem pi = factory.createPi(processor, parallelism);
-    assertNotNull("ProcessingItem created is null.", pi);
-    assertEquals("ProcessingItem created is not a ThreadsProcessingItem.", 
ThreadsProcessingItem.class, pi.getClass());
-    assertEquals("Parallelism of PI is not ", parallelism, 
pi.getParallelism(), 0);
-  }
-
-  @Test
-  public void testCreateStream() {
-    new NonStrictExpectations() {
-      {
-        processor.newProcessor(processor);
-        result = processorReplica;
-      }
-    };
-    ProcessingItem pi = factory.createPi(processor);
-
-    Stream stream = factory.createStream(pi);
-    assertNotNull("Stream created is null", stream);
-    assertEquals("Stream created is not a ThreadsStream.", 
ThreadsStream.class, stream.getClass());
-  }
-
-  @Test
-  public void testCreateTopology() {
-    Topology topology = factory.createTopology(topoName);
-    assertNotNull("Topology created is null.", topology);
-    assertEquals("Topology created is not a ThreadsTopology.", 
ThreadsTopology.class, topology.getClass());
-  }
-
-  @Test
-  public void testCreateEntrancePi() {
-    EntranceProcessingItem entrancePi = 
factory.createEntrancePi(entranceProcessor);
-    assertNotNull("EntranceProcessingItem created is null.", entrancePi);
-    assertEquals("EntranceProcessingItem created is not a 
ThreadsEntranceProcessingItem.",
-        ThreadsEntranceProcessingItem.class, entrancePi.getClass());
-    assertSame("EntranceProcessor is not set correctly.", entranceProcessor, 
entrancePi.getProcessor());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngineTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngineTest.java
 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngineTest.java
deleted file mode 100644
index c8a3c3d..0000000
--- 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngineTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-package com.yahoo.labs.samoa.topology.impl;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2013 Yahoo! Inc.
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import static org.junit.Assert.*;
-import mockit.Mocked;
-import mockit.Verifications;
-
-import org.junit.After;
-import org.junit.Test;
-
-/**
- * @author Anh Thu Vu
- * 
- */
-public class ThreadsEngineTest {
-
-  @Mocked
-  ThreadsTopology topology;
-
-  private final int numThreads = 4;
-  private final int numThreadsSmaller = 3;
-  private final int numThreadsLarger = 5;
-
-  @After
-  public void cleanup() {
-    ThreadsEngine.clearThreadPool();
-  }
-
-  @Test
-  public void testSetNumberOfThreadsSimple() {
-    ThreadsEngine.setNumberOfThreads(numThreads);
-    assertEquals("Number of threads is not set correctly.", numThreads,
-        ThreadsEngine.getNumberOfThreads(), 0);
-  }
-
-  @Test
-  public void testSetNumberOfThreadsRepeat() {
-    ThreadsEngine.setNumberOfThreads(numThreads);
-    ThreadsEngine.setNumberOfThreads(numThreads);
-    assertEquals("Number of threads is not set correctly.", numThreads,
-        ThreadsEngine.getNumberOfThreads(), 0);
-  }
-
-  @Test
-  public void testSetNumberOfThreadsIncrease() {
-    ThreadsEngine.setNumberOfThreads(numThreads);
-    ThreadsEngine.setNumberOfThreads(numThreadsLarger);
-    assertEquals("Number of threads is not set correctly.", numThreadsLarger,
-        ThreadsEngine.getNumberOfThreads(), 0);
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testSetNumberOfThreadsDecrease() {
-    ThreadsEngine.setNumberOfThreads(numThreads);
-    ThreadsEngine.setNumberOfThreads(numThreadsSmaller);
-    // Exception expected
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testSetNumberOfThreadsNegative() {
-    ThreadsEngine.setNumberOfThreads(-1);
-    // Exception expected
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testSetNumberOfThreadsZero() {
-    ThreadsEngine.setNumberOfThreads(0);
-    // Exception expected
-  }
-
-  @Test
-  public void testClearThreadPool() {
-    ThreadsEngine.setNumberOfThreads(numThreads);
-    ThreadsEngine.clearThreadPool();
-    assertEquals("ThreadsEngine was not shutdown properly.", 0, 
ThreadsEngine.getNumberOfThreads());
-  }
-
-  @Test
-  public void testGetThreadWithIndexWithinPoolSize() {
-    ThreadsEngine.setNumberOfThreads(numThreads);
-    for (int i = 0; i < numThreads; i++) {
-      assertNotNull("ExecutorService is not initialized correctly.", 
ThreadsEngine.getThreadWithIndex(i));
-    }
-  }
-
-  @Test
-  public void testGetThreadWithIndexOutOfPoolSize() {
-    ThreadsEngine.setNumberOfThreads(numThreads);
-    for (int i = 0; i < numThreads + 3; i++) {
-      assertNotNull("ExecutorService is not initialized correctly.", 
ThreadsEngine.getThreadWithIndex(i));
-    }
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testGetThreadWithIndexFromEmptyPool() {
-    for (int i = 0; i < numThreads; i++) {
-      ThreadsEngine.getThreadWithIndex(i);
-    }
-  }
-
-  @Test
-  public void testSubmitTopology() {
-    ThreadsEngine.submitTopology(topology, numThreads);
-    new Verifications() {
-      {
-        topology.run();
-        times = 1;
-      }
-    };
-    assertEquals("Number of threads is not set correctly.", numThreads,
-        ThreadsEngine.getNumberOfThreads(), 0);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItemTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItemTest.java
 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItemTest.java
deleted file mode 100644
index db2a3fb..0000000
--- 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItemTest.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2013 Yahoo! Inc.
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-package com.yahoo.labs.samoa.topology.impl;
-
-import static org.junit.Assert.*;
-import mockit.Mocked;
-import mockit.StrictExpectations;
-import mockit.Tested;
-import mockit.Verifications;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import com.yahoo.labs.samoa.core.ContentEvent;
-import com.yahoo.labs.samoa.core.EntranceProcessor;
-import com.yahoo.labs.samoa.topology.Stream;
-
-/**
- * @author Anh Thu Vu
- * 
- */
-public class ThreadsEntranceProcessingItemTest {
-
-  @Tested
-  private ThreadsEntranceProcessingItem entrancePi;
-
-  @Mocked
-  private EntranceProcessor entranceProcessor;
-  @Mocked
-  private Stream outputStream, anotherStream;
-  @Mocked
-  private ContentEvent event;
-
-  @Mocked
-  private Thread unused;
-
-  /**
-   * @throws java.lang.Exception
-   */
-  @Before
-  public void setUp() throws Exception {
-    entrancePi = new ThreadsEntranceProcessingItem(entranceProcessor);
-  }
-
-  @Test
-  public void testContructor() {
-    assertSame("EntranceProcessor is not set correctly.", entranceProcessor, 
entrancePi.getProcessor());
-  }
-
-  @Test
-  public void testSetOutputStream() {
-    entrancePi.setOutputStream(outputStream);
-    assertSame("OutoutStream is not set correctly.", outputStream, 
entrancePi.getOutputStream());
-  }
-
-  @Test
-  public void testSetOutputStreamRepeate() {
-    entrancePi.setOutputStream(outputStream);
-    entrancePi.setOutputStream(outputStream);
-    assertSame("OutoutStream is not set correctly.", outputStream, 
entrancePi.getOutputStream());
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testSetOutputStreamError() {
-    entrancePi.setOutputStream(outputStream);
-    entrancePi.setOutputStream(anotherStream);
-  }
-
-  @Test
-  public void testStartSendingEvents() {
-    entrancePi.setOutputStream(outputStream);
-    new StrictExpectations() {
-      {
-        for (int i = 0; i < 1; i++) {
-          entranceProcessor.isFinished();
-          result = false;
-          entranceProcessor.hasNext();
-          result = false;
-        }
-
-        for (int i = 0; i < 5; i++) {
-          entranceProcessor.isFinished();
-          result = false;
-          entranceProcessor.hasNext();
-          result = true;
-          entranceProcessor.nextEvent();
-          result = event;
-          outputStream.put(event);
-        }
-
-        for (int i = 0; i < 2; i++) {
-          entranceProcessor.isFinished();
-          result = false;
-          entranceProcessor.hasNext();
-          result = false;
-        }
-
-        for (int i = 0; i < 5; i++) {
-          entranceProcessor.isFinished();
-          result = false;
-          entranceProcessor.hasNext();
-          result = true;
-          entranceProcessor.nextEvent();
-          result = event;
-          outputStream.put(event);
-        }
-
-        entranceProcessor.isFinished();
-        result = true;
-        times = 1;
-        entranceProcessor.hasNext();
-        times = 0;
-
-      }
-    };
-    entrancePi.startSendingEvents();
-    new Verifications() {
-      {
-        try {
-          Thread.sleep(anyInt);
-          times = 3;
-        } catch (InterruptedException e) {
-
-        }
-      }
-    };
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testStartSendingEventsError() {
-    entrancePi.startSendingEvents();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnableTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnableTest.java
 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnableTest.java
deleted file mode 100644
index 1e70d10..0000000
--- 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnableTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2013 Yahoo! Inc.
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-package com.yahoo.labs.samoa.topology.impl;
-
-import static org.junit.Assert.*;
-import mockit.Mocked;
-import mockit.Tested;
-import mockit.Verifications;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import com.yahoo.labs.samoa.core.ContentEvent;
-
-/**
- * @author Anh Thu Vu
- * 
- */
-public class ThreadsEventRunnableTest {
-
-  @Tested
-  private ThreadsEventRunnable task;
-
-  @Mocked
-  private ThreadsProcessingItemInstance piInstance;
-  @Mocked
-  private ContentEvent event;
-
-  /**
-   * @throws java.lang.Exception
-   */
-  @Before
-  public void setUp() throws Exception {
-    task = new ThreadsEventRunnable(piInstance, event);
-  }
-
-  @Test
-  public void testConstructor() {
-    assertSame("WorkerProcessingItem is not set correctly.", piInstance, 
task.getWorkerProcessingItem());
-    assertSame("ContentEvent is not set correctly.", event, 
task.getContentEvent());
-  }
-
-  @Test
-  public void testRun() {
-    task.run();
-    new Verifications() {
-      {
-        piInstance.processEvent(event);
-        times = 1;
-      }
-    };
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstanceTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstanceTest.java
 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstanceTest.java
deleted file mode 100644
index d4f78b0..0000000
--- 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstanceTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2013 Yahoo! Inc.
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-package com.yahoo.labs.samoa.topology.impl;
-
-import static org.junit.Assert.*;
-import mockit.Mocked;
-import mockit.Tested;
-import mockit.Verifications;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import com.yahoo.labs.samoa.core.ContentEvent;
-import com.yahoo.labs.samoa.core.Processor;
-
-/**
- * @author Anh Thu Vu
- * 
- */
-public class ThreadsProcessingItemInstanceTest {
-
-  @Tested
-  private ThreadsProcessingItemInstance piInstance;
-
-  @Mocked
-  private Processor processor;
-  @Mocked
-  private ContentEvent event;
-
-  private final int threadIndex = 2;
-
-  @Before
-  public void setUp() throws Exception {
-    piInstance = new ThreadsProcessingItemInstance(processor, threadIndex);
-  }
-
-  @Test
-  public void testConstructor() {
-    assertSame("Processor is not set correctly.", processor, 
piInstance.getProcessor());
-    assertEquals("Thread index is not set correctly.", threadIndex, 
piInstance.getThreadIndex(), 0);
-  }
-
-  @Test
-  public void testProcessEvent() {
-    piInstance.processEvent(event);
-    new Verifications() {
-      {
-        processor.process(event);
-        times = 1;
-      }
-    };
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemTest.java
 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemTest.java
deleted file mode 100644
index d148e8e..0000000
--- 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemTest.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2013 Yahoo! Inc.
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-package com.yahoo.labs.samoa.topology.impl;
-
-import static org.junit.Assert.*;
-
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-
-import mockit.Expectations;
-import mockit.Mocked;
-import mockit.NonStrictExpectations;
-import mockit.Tested;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import com.yahoo.labs.samoa.core.ContentEvent;
-import com.yahoo.labs.samoa.core.Processor;
-import com.yahoo.labs.samoa.utils.PartitioningScheme;
-import com.yahoo.labs.samoa.utils.StreamDestination;
-
-/**
- * @author Anh Thu Vu
- * 
- */
-public class ThreadsProcessingItemTest {
-
-  @Tested
-  private ThreadsProcessingItem pi;
-
-  @Mocked
-  private ThreadsEngine unused;
-  @Mocked
-  private ExecutorService threadPool;
-  @Mocked
-  private ThreadsEventRunnable task;
-
-  @Mocked
-  private Processor processor, processorReplica;
-  @Mocked
-  private ThreadsStream stream;
-  @Mocked
-  private StreamDestination destination;
-  @Mocked
-  private ContentEvent event;
-
-  private final int parallelism = 4;
-  private final int counter = 2;
-
-  private ThreadsProcessingItemInstance instance;
-
-  @Before
-  public void setUp() throws Exception {
-    new NonStrictExpectations() {
-      {
-        processor.newProcessor(processor);
-        result = processorReplica;
-      }
-    };
-    pi = new ThreadsProcessingItem(processor, parallelism);
-  }
-
-  @Test
-  public void testConstructor() {
-    assertSame("Processor was not set correctly.", processor, 
pi.getProcessor());
-    assertEquals("Parallelism was not set correctly.", parallelism, 
pi.getParallelism(), 0);
-  }
-
-  @Test
-  public void testConnectInputShuffleStream() {
-    new Expectations() {
-      {
-        destination = new StreamDestination(pi, parallelism, 
PartitioningScheme.SHUFFLE);
-        stream.addDestination(destination);
-      }
-    };
-    pi.connectInputShuffleStream(stream);
-  }
-
-  @Test
-  public void testConnectInputKeyStream() {
-    new Expectations() {
-      {
-        destination = new StreamDestination(pi, parallelism, 
PartitioningScheme.GROUP_BY_KEY);
-        stream.addDestination(destination);
-      }
-    };
-    pi.connectInputKeyStream(stream);
-  }
-
-  @Test
-  public void testConnectInputAllStream() {
-    new Expectations() {
-      {
-        destination = new StreamDestination(pi, parallelism, 
PartitioningScheme.BROADCAST);
-        stream.addDestination(destination);
-      }
-    };
-    pi.connectInputAllStream(stream);
-  }
-
-  @Test
-  public void testSetupInstances() {
-    new Expectations() {
-      {
-        for (int i = 0; i < parallelism; i++) {
-          processor.newProcessor(processor);
-          result = processor;
-
-          processor.onCreate(anyInt);
-        }
-      }
-    };
-    pi.setupInstances();
-    List<ThreadsProcessingItemInstance> instances = 
pi.getProcessingItemInstances();
-    assertNotNull("List of PI instances is null.", instances);
-    assertEquals("Number of instances does not match parallelism.", 
parallelism, instances.size(), 0);
-    for (int i = 0; i < instances.size(); i++) {
-      assertNotNull("Instance " + i + " is null.", instances.get(i));
-      assertEquals("Instance " + i + " is not a ThreadsWorkerProcessingItem.", 
ThreadsProcessingItemInstance.class,
-          instances.get(i).getClass());
-    }
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testProcessEventError() {
-    pi.processEvent(event, counter);
-  }
-
-  @Test
-  public void testProcessEvent() {
-    new Expectations() {
-      {
-        for (int i = 0; i < parallelism; i++) {
-          processor.newProcessor(processor);
-          result = processor;
-
-          processor.onCreate(anyInt);
-        }
-      }
-    };
-    pi.setupInstances();
-
-    instance = pi.getProcessingItemInstances().get(counter);
-    new NonStrictExpectations() {
-      {
-        ThreadsEngine.getThreadWithIndex(anyInt);
-        result = threadPool;
-
-      }
-    };
-    new Expectations() {
-      {
-        task = new ThreadsEventRunnable(instance, event);
-        threadPool.submit(task);
-      }
-    };
-    pi.processEvent(event, counter);
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsStreamTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsStreamTest.java
 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsStreamTest.java
deleted file mode 100644
index abe57ce..0000000
--- 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsStreamTest.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2013 Yahoo! Inc.
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-package com.yahoo.labs.samoa.topology.impl;
-
-import static org.junit.Assert.*;
-
-import java.util.Arrays;
-import java.util.Collection;
-
-import mockit.Expectations;
-import mockit.Mocked;
-import mockit.NonStrictExpectations;
-import mockit.Tested;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import com.yahoo.labs.samoa.core.ContentEvent;
-import com.yahoo.labs.samoa.utils.PartitioningScheme;
-import com.yahoo.labs.samoa.utils.StreamDestination;
-
-/**
- * @author Anh Thu Vu
- * 
- */
-@RunWith(Parameterized.class)
-public class ThreadsStreamTest {
-
-  @Tested
-  private ThreadsStream stream;
-
-  @Mocked
-  private ThreadsProcessingItem sourcePi, destPi;
-  @Mocked
-  private ContentEvent event;
-  @Mocked
-  private StreamDestination destination;
-
-  private final String eventKey = "eventkey";
-  private final int parallelism;
-  private final PartitioningScheme scheme;
-
-  @Parameters
-  public static Collection<Object[]> generateParameters() {
-    return Arrays.asList(new Object[][] {
-        { 2, PartitioningScheme.SHUFFLE },
-        { 3, PartitioningScheme.GROUP_BY_KEY },
-        { 4, PartitioningScheme.BROADCAST }
-    });
-  }
-
-  public ThreadsStreamTest(int parallelism, PartitioningScheme scheme) {
-    this.parallelism = parallelism;
-    this.scheme = scheme;
-  }
-
-  @Before
-  public void setUp() throws Exception {
-    stream = new ThreadsStream(sourcePi);
-    stream.addDestination(destination);
-  }
-
-  @Test
-  public void testAddDestination() {
-    boolean found = false;
-    for (StreamDestination sd : stream.getDestinations()) {
-      if (sd == destination) {
-        found = true;
-        break;
-      }
-    }
-    assertTrue("Destination object was not added in stream's destinations 
set.", found);
-  }
-
-  @Test
-  public void testPut() {
-    new NonStrictExpectations() {
-      {
-        event.getKey();
-        result = eventKey;
-        destination.getProcessingItem();
-        result = destPi;
-        destination.getPartitioningScheme();
-        result = scheme;
-        destination.getParallelism();
-        result = parallelism;
-
-      }
-    };
-    switch (this.scheme) {
-    case SHUFFLE:
-    case GROUP_BY_KEY:
-      new Expectations() {
-        {
-
-          // TODO: restrict the range of counter value
-          destPi.processEvent(event, anyInt);
-          times = 1;
-        }
-      };
-      break;
-    case BROADCAST:
-      new Expectations() {
-        {
-          // TODO: restrict the range of counter value
-          destPi.processEvent(event, anyInt);
-          times = parallelism;
-        }
-      };
-      break;
-    }
-    stream.put(event);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopologyTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopologyTest.java
 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopologyTest.java
deleted file mode 100644
index 6891a63..0000000
--- 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopologyTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2013 Yahoo! Inc.
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-package com.yahoo.labs.samoa.topology.impl;
-
-import static org.junit.Assert.*;
-
-import java.util.Set;
-
-import mockit.Expectations;
-import mockit.Mocked;
-import mockit.Tested;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import com.yahoo.labs.samoa.core.EntranceProcessor;
-import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
-
-/**
- * @author Anh Thu Vu
- * 
- */
-public class ThreadsTopologyTest {
-
-  @Tested
-  private ThreadsTopology topology;
-
-  @Mocked
-  private ThreadsEntranceProcessingItem entrancePi;
-  @Mocked
-  private EntranceProcessor entranceProcessor;
-
-  @Before
-  public void setUp() throws Exception {
-    topology = new ThreadsTopology("TestTopology");
-  }
-
-  @Test
-  public void testAddEntrancePi() {
-    topology.addEntranceProcessingItem(entrancePi);
-    Set<EntranceProcessingItem> entrancePIs = 
topology.getEntranceProcessingItems();
-    assertNotNull("Set of entrance PIs is null.", entrancePIs);
-    assertEquals("Number of entrance PI in ThreadsTopology must be 1", 1, 
entrancePIs.size());
-    assertSame("Entrance PI was not set correctly.", entrancePi, 
entrancePIs.toArray()[0]);
-    // TODO: verify that entrance PI is in the set of ProcessingItems
-    // Need to access topology's set of PIs (getProcessingItems() method)
-  }
-
-  @Test
-  public void testRun() {
-    topology.addEntranceProcessingItem(entrancePi);
-
-    new Expectations() {
-      {
-        entrancePi.getProcessor();
-        result = entranceProcessor;
-        entranceProcessor.onCreate(anyInt);
-
-        entrancePi.startSendingEvents();
-      }
-    };
-    topology.run();
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testRunWithoutEntrancePI() {
-    topology.run();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/com/yahoo/labs/samoa/utils/StreamDestinationTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/utils/StreamDestinationTest.java
 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/utils/StreamDestinationTest.java
deleted file mode 100644
index c165b3e..0000000
--- 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/utils/StreamDestinationTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-package com.yahoo.labs.samoa.utils;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2013 Yahoo! Inc.
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import static org.junit.Assert.*;
-
-import java.util.Arrays;
-import java.util.Collection;
-
-import mockit.Mocked;
-import mockit.Tested;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import com.yahoo.labs.samoa.topology.IProcessingItem;
-import com.yahoo.labs.samoa.utils.PartitioningScheme;
-import com.yahoo.labs.samoa.utils.StreamDestination;
-
-/**
- * @author Anh Thu Vu
- * 
- */
-@RunWith(Parameterized.class)
-public class StreamDestinationTest {
-
-  @Tested
-  private StreamDestination destination;
-
-  @Mocked
-  private IProcessingItem pi;
-  private final int parallelism;
-  private final PartitioningScheme scheme;
-
-  @Parameters
-  public static Collection<Object[]> generateParameters() {
-    return Arrays.asList(new Object[][] {
-        { 3, PartitioningScheme.SHUFFLE },
-        { 2, PartitioningScheme.GROUP_BY_KEY },
-        { 5, PartitioningScheme.BROADCAST }
-    });
-  }
-
-  public StreamDestinationTest(int parallelism, PartitioningScheme scheme) {
-    this.parallelism = parallelism;
-    this.scheme = scheme;
-  }
-
-  @Before
-  public void setUp() throws Exception {
-    destination = new StreamDestination(pi, parallelism, scheme);
-  }
-
-  @Test
-  public void testContructor() {
-    assertSame("The IProcessingItem is not set correctly.", pi, 
destination.getProcessingItem());
-    assertEquals("Parallelism value is not set correctly.", parallelism, 
destination.getParallelism(), 0);
-    assertEquals("EventAllocationType is not set correctly.", scheme, 
destination.getPartitioningScheme());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/org/apache/samoa/AlgosTest.java
----------------------------------------------------------------------
diff --git a/samoa-threads/src/test/java/org/apache/samoa/AlgosTest.java 
b/samoa-threads/src/test/java/org/apache/samoa/AlgosTest.java
new file mode 100644
index 0000000..e14b1e4
--- /dev/null
+++ b/samoa-threads/src/test/java/org/apache/samoa/AlgosTest.java
@@ -0,0 +1,70 @@
+package org.apache.samoa;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.LocalThreadsDoTask;
+import org.apache.samoa.TestParams;
+import org.apache.samoa.TestUtils;
+import org.junit.Test;
+
+public class AlgosTest {
+
+  @Test(timeout = 60000)
+  public void testVHTWithThreads() throws Exception {
+
+    TestParams vhtConfig = new TestParams.Builder()
+        .inputInstances(200_000)
+        .samplingSize(20_000)
+        .evaluationInstances(200_000)
+        .classifiedInstances(200_000)
+        .classificationsCorrect(55f)
+        .kappaStat(-0.1f)
+        .kappaTempStat(-0.1f)
+        .cliStringTemplate(TestParams.Templates.PREQEVAL_VHT_RANDOMTREE + " -t 
2")
+        .resultFilePollTimeout(10)
+        .prePollWait(10)
+        .taskClassName(LocalThreadsDoTask.class.getName())
+        .build();
+    TestUtils.test(vhtConfig);
+
+  }
+
+  @Test(timeout = 180000)
+  public void testBaggingWithThreads() throws Exception {
+    TestParams baggingConfig = new TestParams.Builder()
+        .inputInstances(100_000)
+        .samplingSize(10_000)
+        .inputDelayMicroSec(100) // prevents saturating the system due to 
unbounded queues
+        .evaluationInstances(90_000)
+        .classifiedInstances(105_000)
+        .classificationsCorrect(55f)
+        .kappaStat(0f)
+        .kappaTempStat(0f)
+        .cliStringTemplate(TestParams.Templates.PREQEVAL_BAGGING_RANDOMTREE + 
" -t 2")
+        .prePollWait(10)
+        .resultFilePollTimeout(30)
+        .taskClassName(LocalThreadsDoTask.class.getName())
+        .build();
+    TestUtils.test(baggingConfig);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsComponentFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsComponentFactoryTest.java
 
b/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsComponentFactoryTest.java
new file mode 100644
index 0000000..24762e3
--- /dev/null
+++ 
b/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsComponentFactoryTest.java
@@ -0,0 +1,121 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import static org.junit.Assert.*;
+import mockit.Mocked;
+import mockit.NonStrictExpectations;
+import mockit.Tested;
+
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.topology.EntranceProcessingItem;
+import org.apache.samoa.topology.ProcessingItem;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.topology.Topology;
+import org.apache.samoa.topology.impl.ThreadsComponentFactory;
+import org.apache.samoa.topology.impl.ThreadsEntranceProcessingItem;
+import org.apache.samoa.topology.impl.ThreadsProcessingItem;
+import org.apache.samoa.topology.impl.ThreadsStream;
+import org.apache.samoa.topology.impl.ThreadsTopology;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author Anh Thu Vu
+ * 
+ */
+public class ThreadsComponentFactoryTest {
+  @Tested
+  private ThreadsComponentFactory factory;
+  @Mocked
+  private Processor processor, processorReplica;
+  @Mocked
+  private EntranceProcessor entranceProcessor;
+
+  private final int parallelism = 3;
+  private final String topoName = "TestTopology";
+
+  @Before
+  public void setUp() throws Exception {
+    factory = new ThreadsComponentFactory();
+  }
+
+  @Test
+  public void testCreatePiNoParallelism() {
+    new NonStrictExpectations() {
+      {
+        processor.newProcessor(processor);
+        result = processorReplica;
+      }
+    };
+    ProcessingItem pi = factory.createPi(processor);
+    assertNotNull("ProcessingItem created is null.", pi);
+    assertEquals("ProcessingItem created is not a ThreadsProcessingItem.", 
ThreadsProcessingItem.class, pi.getClass());
+    assertEquals("Parallelism of PI is not 1", 1, pi.getParallelism(), 0);
+  }
+
+  @Test
+  public void testCreatePiWithParallelism() {
+    new NonStrictExpectations() {
+      {
+        processor.newProcessor(processor);
+        result = processorReplica;
+      }
+    };
+    ProcessingItem pi = factory.createPi(processor, parallelism);
+    assertNotNull("ProcessingItem created is null.", pi);
+    assertEquals("ProcessingItem created is not a ThreadsProcessingItem.", 
ThreadsProcessingItem.class, pi.getClass());
+    assertEquals("Parallelism of PI is not ", parallelism, 
pi.getParallelism(), 0);
+  }
+
+  @Test
+  public void testCreateStream() {
+    new NonStrictExpectations() {
+      {
+        processor.newProcessor(processor);
+        result = processorReplica;
+      }
+    };
+    ProcessingItem pi = factory.createPi(processor);
+
+    Stream stream = factory.createStream(pi);
+    assertNotNull("Stream created is null", stream);
+    assertEquals("Stream created is not a ThreadsStream.", 
ThreadsStream.class, stream.getClass());
+  }
+
+  @Test
+  public void testCreateTopology() {
+    Topology topology = factory.createTopology(topoName);
+    assertNotNull("Topology created is null.", topology);
+    assertEquals("Topology created is not a ThreadsTopology.", 
ThreadsTopology.class, topology.getClass());
+  }
+
+  @Test
+  public void testCreateEntrancePi() {
+    EntranceProcessingItem entrancePi = 
factory.createEntrancePi(entranceProcessor);
+    assertNotNull("EntranceProcessingItem created is null.", entrancePi);
+    assertEquals("EntranceProcessingItem created is not a 
ThreadsEntranceProcessingItem.",
+        ThreadsEntranceProcessingItem.class, entrancePi.getClass());
+    assertSame("EntranceProcessor is not set correctly.", entranceProcessor, 
entrancePi.getProcessor());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsEngineTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsEngineTest.java
 
b/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsEngineTest.java
new file mode 100644
index 0000000..cf8ec34
--- /dev/null
+++ 
b/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsEngineTest.java
@@ -0,0 +1,135 @@
+package org.apache.samoa.topology.impl;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import static org.junit.Assert.*;
+import mockit.Mocked;
+import mockit.Verifications;
+
+import org.apache.samoa.topology.impl.ThreadsEngine;
+import org.apache.samoa.topology.impl.ThreadsTopology;
+import org.junit.After;
+import org.junit.Test;
+
+/**
+ * @author Anh Thu Vu
+ * 
+ */
+public class ThreadsEngineTest {
+
+  @Mocked
+  ThreadsTopology topology;
+
+  private final int numThreads = 4;
+  private final int numThreadsSmaller = 3;
+  private final int numThreadsLarger = 5;
+
+  @After
+  public void cleanup() {
+    ThreadsEngine.clearThreadPool();
+  }
+
+  @Test
+  public void testSetNumberOfThreadsSimple() {
+    ThreadsEngine.setNumberOfThreads(numThreads);
+    assertEquals("Number of threads is not set correctly.", numThreads,
+        ThreadsEngine.getNumberOfThreads(), 0);
+  }
+
+  @Test
+  public void testSetNumberOfThreadsRepeat() {
+    ThreadsEngine.setNumberOfThreads(numThreads);
+    ThreadsEngine.setNumberOfThreads(numThreads);
+    assertEquals("Number of threads is not set correctly.", numThreads,
+        ThreadsEngine.getNumberOfThreads(), 0);
+  }
+
+  @Test
+  public void testSetNumberOfThreadsIncrease() {
+    ThreadsEngine.setNumberOfThreads(numThreads);
+    ThreadsEngine.setNumberOfThreads(numThreadsLarger);
+    assertEquals("Number of threads is not set correctly.", numThreadsLarger,
+        ThreadsEngine.getNumberOfThreads(), 0);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testSetNumberOfThreadsDecrease() {
+    ThreadsEngine.setNumberOfThreads(numThreads);
+    ThreadsEngine.setNumberOfThreads(numThreadsSmaller);
+    // Exception expected
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testSetNumberOfThreadsNegative() {
+    ThreadsEngine.setNumberOfThreads(-1);
+    // Exception expected
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testSetNumberOfThreadsZero() {
+    ThreadsEngine.setNumberOfThreads(0);
+    // Exception expected
+  }
+
+  @Test
+  public void testClearThreadPool() {
+    ThreadsEngine.setNumberOfThreads(numThreads);
+    ThreadsEngine.clearThreadPool();
+    assertEquals("ThreadsEngine was not shutdown properly.", 0, 
ThreadsEngine.getNumberOfThreads());
+  }
+
+  @Test
+  public void testGetThreadWithIndexWithinPoolSize() {
+    ThreadsEngine.setNumberOfThreads(numThreads);
+    for (int i = 0; i < numThreads; i++) {
+      assertNotNull("ExecutorService is not initialized correctly.", 
ThreadsEngine.getThreadWithIndex(i));
+    }
+  }
+
+  @Test
+  public void testGetThreadWithIndexOutOfPoolSize() {
+    ThreadsEngine.setNumberOfThreads(numThreads);
+    for (int i = 0; i < numThreads + 3; i++) {
+      assertNotNull("ExecutorService is not initialized correctly.", 
ThreadsEngine.getThreadWithIndex(i));
+    }
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testGetThreadWithIndexFromEmptyPool() {
+    for (int i = 0; i < numThreads; i++) {
+      ThreadsEngine.getThreadWithIndex(i);
+    }
+  }
+
+  @Test
+  public void testSubmitTopology() {
+    ThreadsEngine.submitTopology(topology, numThreads);
+    new Verifications() {
+      {
+        topology.run();
+        times = 1;
+      }
+    };
+    assertEquals("Number of threads is not set correctly.", numThreads,
+        ThreadsEngine.getNumberOfThreads(), 0);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsEntranceProcessingItemTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsEntranceProcessingItemTest.java
 
b/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsEntranceProcessingItemTest.java
new file mode 100644
index 0000000..355da61
--- /dev/null
+++ 
b/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsEntranceProcessingItemTest.java
@@ -0,0 +1,151 @@
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+package org.apache.samoa.topology.impl;
+
+import static org.junit.Assert.*;
+import mockit.Mocked;
+import mockit.StrictExpectations;
+import mockit.Tested;
+import mockit.Verifications;
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.topology.impl.ThreadsEntranceProcessingItem;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author Anh Thu Vu
+ * 
+ */
+public class ThreadsEntranceProcessingItemTest {
+
+  @Tested
+  private ThreadsEntranceProcessingItem entrancePi;
+
+  @Mocked
+  private EntranceProcessor entranceProcessor;
+  @Mocked
+  private Stream outputStream, anotherStream;
+  @Mocked
+  private ContentEvent event;
+
+  @Mocked
+  private Thread unused;
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    entrancePi = new ThreadsEntranceProcessingItem(entranceProcessor);
+  }
+
+  @Test
+  public void testContructor() {
+    assertSame("EntranceProcessor is not set correctly.", entranceProcessor, 
entrancePi.getProcessor());
+  }
+
+  @Test
+  public void testSetOutputStream() {
+    entrancePi.setOutputStream(outputStream);
+    assertSame("OutoutStream is not set correctly.", outputStream, 
entrancePi.getOutputStream());
+  }
+
+  @Test
+  public void testSetOutputStreamRepeate() {
+    entrancePi.setOutputStream(outputStream);
+    entrancePi.setOutputStream(outputStream);
+    assertSame("OutoutStream is not set correctly.", outputStream, 
entrancePi.getOutputStream());
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testSetOutputStreamError() {
+    entrancePi.setOutputStream(outputStream);
+    entrancePi.setOutputStream(anotherStream);
+  }
+
+  @Test
+  public void testStartSendingEvents() {
+    entrancePi.setOutputStream(outputStream);
+    new StrictExpectations() {
+      {
+        for (int i = 0; i < 1; i++) {
+          entranceProcessor.isFinished();
+          result = false;
+          entranceProcessor.hasNext();
+          result = false;
+        }
+
+        for (int i = 0; i < 5; i++) {
+          entranceProcessor.isFinished();
+          result = false;
+          entranceProcessor.hasNext();
+          result = true;
+          entranceProcessor.nextEvent();
+          result = event;
+          outputStream.put(event);
+        }
+
+        for (int i = 0; i < 2; i++) {
+          entranceProcessor.isFinished();
+          result = false;
+          entranceProcessor.hasNext();
+          result = false;
+        }
+
+        for (int i = 0; i < 5; i++) {
+          entranceProcessor.isFinished();
+          result = false;
+          entranceProcessor.hasNext();
+          result = true;
+          entranceProcessor.nextEvent();
+          result = event;
+          outputStream.put(event);
+        }
+
+        entranceProcessor.isFinished();
+        result = true;
+        times = 1;
+        entranceProcessor.hasNext();
+        times = 0;
+
+      }
+    };
+    entrancePi.startSendingEvents();
+    new Verifications() {
+      {
+        try {
+          Thread.sleep(anyInt);
+          times = 3;
+        } catch (InterruptedException e) {
+
+        }
+      }
+    };
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testStartSendingEventsError() {
+    entrancePi.startSendingEvents();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsEventRunnableTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsEventRunnableTest.java
 
b/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsEventRunnableTest.java
new file mode 100644
index 0000000..c5afe80
--- /dev/null
+++ 
b/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsEventRunnableTest.java
@@ -0,0 +1,72 @@
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+package org.apache.samoa.topology.impl;
+
+import static org.junit.Assert.*;
+import mockit.Mocked;
+import mockit.Tested;
+import mockit.Verifications;
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.topology.impl.ThreadsEventRunnable;
+import org.apache.samoa.topology.impl.ThreadsProcessingItemInstance;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author Anh Thu Vu
+ * 
+ */
+public class ThreadsEventRunnableTest {
+
+  @Tested
+  private ThreadsEventRunnable task;
+
+  @Mocked
+  private ThreadsProcessingItemInstance piInstance;
+  @Mocked
+  private ContentEvent event;
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    task = new ThreadsEventRunnable(piInstance, event);
+  }
+
+  @Test
+  public void testConstructor() {
+    assertSame("WorkerProcessingItem is not set correctly.", piInstance, 
task.getWorkerProcessingItem());
+    assertSame("ContentEvent is not set correctly.", event, 
task.getContentEvent());
+  }
+
+  @Test
+  public void testRun() {
+    task.run();
+    new Verifications() {
+      {
+        piInstance.processEvent(event);
+        times = 1;
+      }
+    };
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsProcessingItemInstanceTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsProcessingItemInstanceTest.java
 
b/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsProcessingItemInstanceTest.java
new file mode 100644
index 0000000..7b64e48
--- /dev/null
+++ 
b/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsProcessingItemInstanceTest.java
@@ -0,0 +1,71 @@
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+package org.apache.samoa.topology.impl;
+
+import static org.junit.Assert.*;
+import mockit.Mocked;
+import mockit.Tested;
+import mockit.Verifications;
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.topology.impl.ThreadsProcessingItemInstance;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author Anh Thu Vu
+ * 
+ */
+public class ThreadsProcessingItemInstanceTest {
+
+  @Tested
+  private ThreadsProcessingItemInstance piInstance;
+
+  @Mocked
+  private Processor processor;
+  @Mocked
+  private ContentEvent event;
+
+  private final int threadIndex = 2;
+
+  @Before
+  public void setUp() throws Exception {
+    piInstance = new ThreadsProcessingItemInstance(processor, threadIndex);
+  }
+
+  @Test
+  public void testConstructor() {
+    assertSame("Processor is not set correctly.", processor, 
piInstance.getProcessor());
+    assertEquals("Thread index is not set correctly.", threadIndex, 
piInstance.getThreadIndex(), 0);
+  }
+
+  @Test
+  public void testProcessEvent() {
+    piInstance.processEvent(event);
+    new Verifications() {
+      {
+        processor.process(event);
+        times = 1;
+      }
+    };
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsProcessingItemTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsProcessingItemTest.java
 
b/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsProcessingItemTest.java
new file mode 100644
index 0000000..0f59936
--- /dev/null
+++ 
b/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsProcessingItemTest.java
@@ -0,0 +1,184 @@
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+package org.apache.samoa.topology.impl;
+
+import static org.junit.Assert.*;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import mockit.Expectations;
+import mockit.Mocked;
+import mockit.NonStrictExpectations;
+import mockit.Tested;
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.Processor;
+import org.apache.samoa.topology.impl.ThreadsEngine;
+import org.apache.samoa.topology.impl.ThreadsEventRunnable;
+import org.apache.samoa.topology.impl.ThreadsProcessingItem;
+import org.apache.samoa.topology.impl.ThreadsProcessingItemInstance;
+import org.apache.samoa.topology.impl.ThreadsStream;
+import org.apache.samoa.utils.PartitioningScheme;
+import org.apache.samoa.utils.StreamDestination;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author Anh Thu Vu
+ * 
+ */
+public class ThreadsProcessingItemTest {
+
+  @Tested
+  private ThreadsProcessingItem pi;
+
+  @Mocked
+  private ThreadsEngine unused;
+  @Mocked
+  private ExecutorService threadPool;
+  @Mocked
+  private ThreadsEventRunnable task;
+
+  @Mocked
+  private Processor processor, processorReplica;
+  @Mocked
+  private ThreadsStream stream;
+  @Mocked
+  private StreamDestination destination;
+  @Mocked
+  private ContentEvent event;
+
+  private final int parallelism = 4;
+  private final int counter = 2;
+
+  private ThreadsProcessingItemInstance instance;
+
+  @Before
+  public void setUp() throws Exception {
+    new NonStrictExpectations() {
+      {
+        processor.newProcessor(processor);
+        result = processorReplica;
+      }
+    };
+    pi = new ThreadsProcessingItem(processor, parallelism);
+  }
+
+  @Test
+  public void testConstructor() {
+    assertSame("Processor was not set correctly.", processor, 
pi.getProcessor());
+    assertEquals("Parallelism was not set correctly.", parallelism, 
pi.getParallelism(), 0);
+  }
+
+  @Test
+  public void testConnectInputShuffleStream() {
+    new Expectations() {
+      {
+        destination = new StreamDestination(pi, parallelism, 
PartitioningScheme.SHUFFLE);
+        stream.addDestination(destination);
+      }
+    };
+    pi.connectInputShuffleStream(stream);
+  }
+
+  @Test
+  public void testConnectInputKeyStream() {
+    new Expectations() {
+      {
+        destination = new StreamDestination(pi, parallelism, 
PartitioningScheme.GROUP_BY_KEY);
+        stream.addDestination(destination);
+      }
+    };
+    pi.connectInputKeyStream(stream);
+  }
+
+  @Test
+  public void testConnectInputAllStream() {
+    new Expectations() {
+      {
+        destination = new StreamDestination(pi, parallelism, 
PartitioningScheme.BROADCAST);
+        stream.addDestination(destination);
+      }
+    };
+    pi.connectInputAllStream(stream);
+  }
+
+  @Test
+  public void testSetupInstances() {
+    new Expectations() {
+      {
+        for (int i = 0; i < parallelism; i++) {
+          processor.newProcessor(processor);
+          result = processor;
+
+          processor.onCreate(anyInt);
+        }
+      }
+    };
+    pi.setupInstances();
+    List<ThreadsProcessingItemInstance> instances = 
pi.getProcessingItemInstances();
+    assertNotNull("List of PI instances is null.", instances);
+    assertEquals("Number of instances does not match parallelism.", 
parallelism, instances.size(), 0);
+    for (int i = 0; i < instances.size(); i++) {
+      assertNotNull("Instance " + i + " is null.", instances.get(i));
+      assertEquals("Instance " + i + " is not a ThreadsWorkerProcessingItem.", 
ThreadsProcessingItemInstance.class,
+          instances.get(i).getClass());
+    }
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testProcessEventError() {
+    pi.processEvent(event, counter);
+  }
+
+  @Test
+  public void testProcessEvent() {
+    new Expectations() {
+      {
+        for (int i = 0; i < parallelism; i++) {
+          processor.newProcessor(processor);
+          result = processor;
+
+          processor.onCreate(anyInt);
+        }
+      }
+    };
+    pi.setupInstances();
+
+    instance = pi.getProcessingItemInstances().get(counter);
+    new NonStrictExpectations() {
+      {
+        ThreadsEngine.getThreadWithIndex(anyInt);
+        result = threadPool;
+
+      }
+    };
+    new Expectations() {
+      {
+        task = new ThreadsEventRunnable(instance, event);
+        threadPool.submit(task);
+      }
+    };
+    pi.processEvent(event, counter);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsStreamTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsStreamTest.java
 
b/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsStreamTest.java
new file mode 100644
index 0000000..ef970ab
--- /dev/null
+++ 
b/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsStreamTest.java
@@ -0,0 +1,136 @@
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+package org.apache.samoa.topology.impl;
+
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import mockit.Expectations;
+import mockit.Mocked;
+import mockit.NonStrictExpectations;
+import mockit.Tested;
+
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.topology.impl.ThreadsProcessingItem;
+import org.apache.samoa.topology.impl.ThreadsStream;
+import org.apache.samoa.utils.PartitioningScheme;
+import org.apache.samoa.utils.StreamDestination;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * @author Anh Thu Vu
+ * 
+ */
+@RunWith(Parameterized.class)
+public class ThreadsStreamTest {
+
+  @Tested
+  private ThreadsStream stream;
+
+  @Mocked
+  private ThreadsProcessingItem sourcePi, destPi;
+  @Mocked
+  private ContentEvent event;
+  @Mocked
+  private StreamDestination destination;
+
+  private final String eventKey = "eventkey";
+  private final int parallelism;
+  private final PartitioningScheme scheme;
+
+  @Parameters
+  public static Collection<Object[]> generateParameters() {
+    return Arrays.asList(new Object[][] {
+        { 2, PartitioningScheme.SHUFFLE },
+        { 3, PartitioningScheme.GROUP_BY_KEY },
+        { 4, PartitioningScheme.BROADCAST }
+    });
+  }
+
+  public ThreadsStreamTest(int parallelism, PartitioningScheme scheme) {
+    this.parallelism = parallelism;
+    this.scheme = scheme;
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    stream = new ThreadsStream(sourcePi);
+    stream.addDestination(destination);
+  }
+
+  @Test
+  public void testAddDestination() {
+    boolean found = false;
+    for (StreamDestination sd : stream.getDestinations()) {
+      if (sd == destination) {
+        found = true;
+        break;
+      }
+    }
+    assertTrue("Destination object was not added in stream's destinations 
set.", found);
+  }
+
+  @Test
+  public void testPut() {
+    new NonStrictExpectations() {
+      {
+        event.getKey();
+        result = eventKey;
+        destination.getProcessingItem();
+        result = destPi;
+        destination.getPartitioningScheme();
+        result = scheme;
+        destination.getParallelism();
+        result = parallelism;
+
+      }
+    };
+    switch (this.scheme) {
+    case SHUFFLE:
+    case GROUP_BY_KEY:
+      new Expectations() {
+        {
+
+          // TODO: restrict the range of counter value
+          destPi.processEvent(event, anyInt);
+          times = 1;
+        }
+      };
+      break;
+    case BROADCAST:
+      new Expectations() {
+        {
+          // TODO: restrict the range of counter value
+          destPi.processEvent(event, anyInt);
+          times = parallelism;
+        }
+      };
+      break;
+    }
+    stream.put(event);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsTopologyTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsTopologyTest.java
 
b/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsTopologyTest.java
new file mode 100644
index 0000000..fb593ae
--- /dev/null
+++ 
b/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsTopologyTest.java
@@ -0,0 +1,88 @@
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+package org.apache.samoa.topology.impl;
+
+import static org.junit.Assert.*;
+
+import java.util.Set;
+
+import mockit.Expectations;
+import mockit.Mocked;
+import mockit.Tested;
+
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.topology.EntranceProcessingItem;
+import org.apache.samoa.topology.impl.ThreadsEntranceProcessingItem;
+import org.apache.samoa.topology.impl.ThreadsTopology;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author Anh Thu Vu
+ * 
+ */
+public class ThreadsTopologyTest {
+
+  @Tested
+  private ThreadsTopology topology;
+
+  @Mocked
+  private ThreadsEntranceProcessingItem entrancePi;
+  @Mocked
+  private EntranceProcessor entranceProcessor;
+
+  @Before
+  public void setUp() throws Exception {
+    topology = new ThreadsTopology("TestTopology");
+  }
+
+  @Test
+  public void testAddEntrancePi() {
+    topology.addEntranceProcessingItem(entrancePi);
+    Set<EntranceProcessingItem> entrancePIs = 
topology.getEntranceProcessingItems();
+    assertNotNull("Set of entrance PIs is null.", entrancePIs);
+    assertEquals("Number of entrance PI in ThreadsTopology must be 1", 1, 
entrancePIs.size());
+    assertSame("Entrance PI was not set correctly.", entrancePi, 
entrancePIs.toArray()[0]);
+    // TODO: verify that entrance PI is in the set of ProcessingItems
+    // Need to access topology's set of PIs (getProcessingItems() method)
+  }
+
+  @Test
+  public void testRun() {
+    topology.addEntranceProcessingItem(entrancePi);
+
+    new Expectations() {
+      {
+        entrancePi.getProcessor();
+        result = entranceProcessor;
+        entranceProcessor.onCreate(anyInt);
+
+        entrancePi.startSendingEvents();
+      }
+    };
+    topology.run();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testRunWithoutEntrancePI() {
+    topology.run();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/org/apache/samoa/utils/StreamDestinationTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/org/apache/samoa/utils/StreamDestinationTest.java 
b/samoa-threads/src/test/java/org/apache/samoa/utils/StreamDestinationTest.java
new file mode 100644
index 0000000..aa3ec74
--- /dev/null
+++ 
b/samoa-threads/src/test/java/org/apache/samoa/utils/StreamDestinationTest.java
@@ -0,0 +1,81 @@
+package org.apache.samoa.utils;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import mockit.Mocked;
+import mockit.Tested;
+
+import org.apache.samoa.topology.IProcessingItem;
+import org.apache.samoa.utils.PartitioningScheme;
+import org.apache.samoa.utils.StreamDestination;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * @author Anh Thu Vu
+ * 
+ */
+@RunWith(Parameterized.class)
+public class StreamDestinationTest {
+
+  @Tested
+  private StreamDestination destination;
+
+  @Mocked
+  private IProcessingItem pi;
+  private final int parallelism;
+  private final PartitioningScheme scheme;
+
+  @Parameters
+  public static Collection<Object[]> generateParameters() {
+    return Arrays.asList(new Object[][] {
+        { 3, PartitioningScheme.SHUFFLE },
+        { 2, PartitioningScheme.GROUP_BY_KEY },
+        { 5, PartitioningScheme.BROADCAST }
+    });
+  }
+
+  public StreamDestinationTest(int parallelism, PartitioningScheme scheme) {
+    this.parallelism = parallelism;
+    this.scheme = scheme;
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    destination = new StreamDestination(pi, parallelism, scheme);
+  }
+
+  @Test
+  public void testContructor() {
+    assertSame("The IProcessingItem is not set correctly.", pi, 
destination.getProcessingItem());
+    assertEquals("Parallelism value is not set correctly.", parallelism, 
destination.getParallelism(), 0);
+    assertEquals("EventAllocationType is not set correctly.", scheme, 
destination.getPartitioningScheme());
+  }
+
+}

Reply via email to