http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngine.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngine.java b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngine.java index d442572..c266c09 100644 --- a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngine.java +++ b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngine.java @@ -30,71 +30,72 @@ import java.util.concurrent.TimeUnit; /** * Multithreaded engine. + * * @author Anh Thu Vu - * + * */ public class ThreadsEngine { - - private static final List<ExecutorService> threadPool = new ArrayList<ExecutorService>(); - - /* - * Create and manage threads - */ - public static void setNumberOfThreads(int numThreads) { - if (numThreads < 1) - throw new IllegalStateException("Number of threads must be a positive integer."); - - if (threadPool.size() > numThreads) - throw new IllegalStateException("You cannot set a numThreads smaller than the current size of the threads pool."); - - if (threadPool.size() < numThreads) { - for (int i=threadPool.size(); i<numThreads; i++) { - threadPool.add(Executors.newSingleThreadExecutor()); - } - } - } - - public static int getNumberOfThreads() { - return threadPool.size(); - } - - public static ExecutorService getThreadWithIndex(int index) { - if (threadPool.size() <= 0 ) - throw new IllegalStateException("Try to get ExecutorService from an empty pool."); - index %= threadPool.size(); - return threadPool.get(index); - } - - /* - * Submit topology and start - */ - private static void submitTopology(Topology topology) { - ThreadsTopology tTopology = (ThreadsTopology) topology; - tTopology.run(); - } - - public static void submitTopology(Topology topology, int numThreads) { - ThreadsEngine.setNumberOfThreads(numThreads); - ThreadsEngine.submitTopology(topology); - } - - /* - * Stop - */ - public static void clearThreadPool() { - for (ExecutorService pool:threadPool) { - pool.shutdown(); - } - - for (ExecutorService pool:threadPool) { - try { - pool.awaitTermination(10, TimeUnit.SECONDS); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - threadPool.clear(); - } + + private static final List<ExecutorService> threadPool = new ArrayList<ExecutorService>(); + + /* + * Create and manage threads + */ + public static void setNumberOfThreads(int numThreads) { + if (numThreads < 1) + throw new IllegalStateException("Number of threads must be a positive integer."); + + if (threadPool.size() > numThreads) + throw new IllegalStateException("You cannot set a numThreads smaller than the current size of the threads pool."); + + if (threadPool.size() < numThreads) { + for (int i = threadPool.size(); i < numThreads; i++) { + threadPool.add(Executors.newSingleThreadExecutor()); + } + } + } + + public static int getNumberOfThreads() { + return threadPool.size(); + } + + public static ExecutorService getThreadWithIndex(int index) { + if (threadPool.size() <= 0) + throw new IllegalStateException("Try to get ExecutorService from an empty pool."); + index %= threadPool.size(); + return threadPool.get(index); + } + + /* + * Submit topology and start + */ + private static void submitTopology(Topology topology) { + ThreadsTopology tTopology = (ThreadsTopology) topology; + tTopology.run(); + } + + public static void submitTopology(Topology topology, int numThreads) { + ThreadsEngine.setNumberOfThreads(numThreads); + ThreadsEngine.submitTopology(topology); + } + + /* + * Stop + */ + public static void clearThreadPool() { + for (ExecutorService pool : threadPool) { + pool.shutdown(); + } + + for (ExecutorService pool : threadPool) { + try { + pool.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + threadPool.clear(); + } }
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItem.java b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItem.java index 008efb6..470c164 100644 --- a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItem.java +++ b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItem.java @@ -25,16 +25,17 @@ import com.yahoo.labs.samoa.topology.LocalEntranceProcessingItem; /** * EntranceProcessingItem for multithreaded engine. + * * @author Anh Thu Vu - * + * */ public class ThreadsEntranceProcessingItem extends LocalEntranceProcessingItem { - - public ThreadsEntranceProcessingItem(EntranceProcessor processor) { - super(processor); - } - - // The default waiting time when there is no available events is 100ms - // Override waitForNewEvents() to change it + + public ThreadsEntranceProcessingItem(EntranceProcessor processor) { + super(processor); + } + + // The default waiting time when there is no available events is 100ms + // Override waitForNewEvents() to change it } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnable.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnable.java b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnable.java index 7cb8c18..4dd33db 100644 --- a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnable.java +++ b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnable.java @@ -23,39 +23,41 @@ package com.yahoo.labs.samoa.topology.impl; import com.yahoo.labs.samoa.core.ContentEvent; /** - * Runnable class where each object corresponds to a ContentEvent and an assigned PI. - * When a PI receives a ContentEvent, it will create a ThreadsEventRunnable with the received ContentEvent - * and an assigned workerPI. This runnable is then submitted to a thread queue waiting to be executed. - * The worker PI will process the received event when the runnable object is executed/run. + * Runnable class where each object corresponds to a ContentEvent and an + * assigned PI. When a PI receives a ContentEvent, it will create a + * ThreadsEventRunnable with the received ContentEvent and an assigned workerPI. + * This runnable is then submitted to a thread queue waiting to be executed. The + * worker PI will process the received event when the runnable object is + * executed/run. + * * @author Anh Thu Vu - * + * */ public class ThreadsEventRunnable implements Runnable { - private ThreadsProcessingItemInstance workerPi; - private ContentEvent event; - - public ThreadsEventRunnable(ThreadsProcessingItemInstance workerPi, ContentEvent event) { - this.workerPi = workerPi; - this.event = event; - } - - public ThreadsProcessingItemInstance getWorkerProcessingItem() { - return this.workerPi; - } - - public ContentEvent getContentEvent() { - return this.event; - } - - @Override - public void run() { - try { - workerPi.processEvent(event); - } - catch (Exception e) { - e.printStackTrace(); - } - } + private ThreadsProcessingItemInstance workerPi; + private ContentEvent event; + + public ThreadsEventRunnable(ThreadsProcessingItemInstance workerPi, ContentEvent event) { + this.workerPi = workerPi; + this.event = event; + } + + public ThreadsProcessingItemInstance getWorkerProcessingItem() { + return this.workerPi; + } + + public ContentEvent getContentEvent() { + return this.event; + } + + @Override + public void run() { + try { + workerPi.processEvent(event); + } catch (Exception e) { + e.printStackTrace(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItem.java b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItem.java index 5eb6174..1b83a05 100644 --- a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItem.java +++ b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItem.java @@ -33,69 +33,71 @@ import com.yahoo.labs.samoa.utils.StreamDestination; /** * ProcessingItem for multithreaded engine. + * * @author Anh Thu Vu - * + * */ public class ThreadsProcessingItem extends AbstractProcessingItem { - // Replicas of the ProcessingItem. - // When ProcessingItem receives an event, it assigns one - // of these replicas to process the event. - private List<ThreadsProcessingItemInstance> piInstances; - - // Each replica of ProcessingItem is assigned to one of the - // available threads in a round-robin fashion, i.e.: each - // replica is associated with the index of a thread. - // Each ProcessingItem has a random offset variable so that - // the allocation of PI replicas to threads are spread evenly - // among all threads. - private int offset; - - /* - * Constructor - */ - public ThreadsProcessingItem(Processor processor, int parallelismHint) { - super(processor, parallelismHint); - this.offset = (int) (Math.random()*ThreadsEngine.getNumberOfThreads()); - } - - public List<ThreadsProcessingItemInstance> getProcessingItemInstances() { - return this.piInstances; - } + // Replicas of the ProcessingItem. + // When ProcessingItem receives an event, it assigns one + // of these replicas to process the event. + private List<ThreadsProcessingItemInstance> piInstances; + + // Each replica of ProcessingItem is assigned to one of the + // available threads in a round-robin fashion, i.e.: each + // replica is associated with the index of a thread. + // Each ProcessingItem has a random offset variable so that + // the allocation of PI replicas to threads are spread evenly + // among all threads. + private int offset; + + /* + * Constructor + */ + public ThreadsProcessingItem(Processor processor, int parallelismHint) { + super(processor, parallelismHint); + this.offset = (int) (Math.random() * ThreadsEngine.getNumberOfThreads()); + } + + public List<ThreadsProcessingItemInstance> getProcessingItemInstances() { + return this.piInstances; + } + + /* + * Connects to streams + */ + @Override + protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) { + StreamDestination destination = new StreamDestination(this, this.getParallelism(), scheme); + ((ThreadsStream) inputStream).addDestination(destination); + return this; + } + + /* + * Process the received event. + */ + public void processEvent(ContentEvent event, int counter) { + if (this.piInstances == null || this.piInstances.size() < this.getParallelism()) + throw new IllegalStateException( + "ThreadsWorkerProcessingItem(s) need to be setup before process any event (i.e. in ThreadsTopology.start())."); - /* - * Connects to streams - */ - @Override - protected ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme) { - StreamDestination destination = new StreamDestination(this, this.getParallelism(), scheme); - ((ThreadsStream) inputStream).addDestination(destination); - return this; - } + ThreadsProcessingItemInstance piInstance = this.piInstances.get(counter); + ThreadsEventRunnable runnable = new ThreadsEventRunnable(piInstance, event); + ThreadsEngine.getThreadWithIndex(piInstance.getThreadIndex()).submit(runnable); + } - /* - * Process the received event. - */ - public void processEvent(ContentEvent event, int counter) { - if (this.piInstances == null || this.piInstances.size() < this.getParallelism()) - throw new IllegalStateException("ThreadsWorkerProcessingItem(s) need to be setup before process any event (i.e. in ThreadsTopology.start())."); - - ThreadsProcessingItemInstance piInstance = this.piInstances.get(counter); - ThreadsEventRunnable runnable = new ThreadsEventRunnable(piInstance, event); - ThreadsEngine.getThreadWithIndex(piInstance.getThreadIndex()).submit(runnable); - } - - /* - * Setup the replicas of this PI. - * This should be called after the topology is set up (all Processors and PIs are - * setup and connected to the respective streams) and before events are sent. - */ - public void setupInstances() { - this.piInstances = new ArrayList<ThreadsProcessingItemInstance>(this.getParallelism()); - for (int i=0; i<this.getParallelism(); i++) { - Processor newProcessor = this.getProcessor().newProcessor(this.getProcessor()); - newProcessor.onCreate(i + 1); - this.piInstances.add(new ThreadsProcessingItemInstance(newProcessor, this.offset + i)); - } - } + /* + * Setup the replicas of this PI. This should be called after the topology is + * set up (all Processors and PIs are setup and connected to the respective + * streams) and before events are sent. + */ + public void setupInstances() { + this.piInstances = new ArrayList<ThreadsProcessingItemInstance>(this.getParallelism()); + for (int i = 0; i < this.getParallelism(); i++) { + Processor newProcessor = this.getProcessor().newProcessor(this.getProcessor()); + newProcessor.onCreate(i + 1); + this.piInstances.add(new ThreadsProcessingItemInstance(newProcessor, this.offset + i)); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstance.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstance.java b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstance.java index 9a400d1..73052ea 100644 --- a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstance.java +++ b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstance.java @@ -24,31 +24,32 @@ import com.yahoo.labs.samoa.core.ContentEvent; import com.yahoo.labs.samoa.core.Processor; /** - * Lightweight replicas of ThreadProcessingItem. - * ThreadsProcessingItem manages a list of these objects and - * assigns each incoming message to be processed by one of them. + * Lightweight replicas of ThreadProcessingItem. ThreadsProcessingItem manages a + * list of these objects and assigns each incoming message to be processed by + * one of them. + * * @author Anh Thu Vu - * + * */ public class ThreadsProcessingItemInstance { - private Processor processor; - private int threadIndex; - - public ThreadsProcessingItemInstance(Processor processor, int threadIndex) { - this.processor = processor; - this.threadIndex = threadIndex; - } - - public int getThreadIndex() { - return this.threadIndex; - } - - public Processor getProcessor() { - return this.processor; - } - - public void processEvent(ContentEvent event) { - this.processor.process(event); - } + private Processor processor; + private int threadIndex; + + public ThreadsProcessingItemInstance(Processor processor, int threadIndex) { + this.processor = processor; + this.threadIndex = threadIndex; + } + + public int getThreadIndex() { + return this.threadIndex; + } + + public Processor getProcessor() { + return this.processor; + } + + public void processEvent(ContentEvent event) { + this.processor.process(event); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsStream.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsStream.java b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsStream.java index 5aa86f7..2c02df7 100644 --- a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsStream.java +++ b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsStream.java @@ -32,75 +32,78 @@ import com.yahoo.labs.samoa.utils.StreamDestination; /** * Stream for multithreaded engine. + * * @author Anh Thu Vu - * + * */ public class ThreadsStream extends AbstractStream { - - private List<StreamDestination> destinations; - private int counter = 0; - private int maxCounter = 1; - - public ThreadsStream(IProcessingItem sourcePi) { - destinations = new LinkedList<StreamDestination>(); - } - - public void addDestination(StreamDestination destination) { - destinations.add(destination); - maxCounter *= destination.getParallelism(); - } - - public List<StreamDestination> getDestinations() { - return this.destinations; - } - - private int getNextCounter() { - if (maxCounter > 0 && counter >= maxCounter) counter = 0; - this.counter++; - return this.counter; - } - @Override - public synchronized void put(ContentEvent event) { - this.put(event, this.getNextCounter()); - } - - private void put(ContentEvent event, int counter) { - ThreadsProcessingItem pi; - int parallelism; - for (StreamDestination destination:destinations) { - pi = (ThreadsProcessingItem) destination.getProcessingItem(); - parallelism = destination.getParallelism(); - switch (destination.getPartitioningScheme()) { - case SHUFFLE: - pi.processEvent(event, counter%parallelism); - break; - case GROUP_BY_KEY: - pi.processEvent(event, getPIIndexForKey(event.getKey(), parallelism)); - break; - case BROADCAST: - for (int p = 0; p < parallelism; p++) { - pi.processEvent(event, p); - } - break; - } + private List<StreamDestination> destinations; + private int counter = 0; + private int maxCounter = 1; + + public ThreadsStream(IProcessingItem sourcePi) { + destinations = new LinkedList<StreamDestination>(); + } + + public void addDestination(StreamDestination destination) { + destinations.add(destination); + maxCounter *= destination.getParallelism(); + } + + public List<StreamDestination> getDestinations() { + return this.destinations; + } + + private int getNextCounter() { + if (maxCounter > 0 && counter >= maxCounter) + counter = 0; + this.counter++; + return this.counter; + } + + @Override + public synchronized void put(ContentEvent event) { + this.put(event, this.getNextCounter()); + } + + private void put(ContentEvent event, int counter) { + ThreadsProcessingItem pi; + int parallelism; + for (StreamDestination destination : destinations) { + pi = (ThreadsProcessingItem) destination.getProcessingItem(); + parallelism = destination.getParallelism(); + switch (destination.getPartitioningScheme()) { + case SHUFFLE: + pi.processEvent(event, counter % parallelism); + break; + case GROUP_BY_KEY: + pi.processEvent(event, getPIIndexForKey(event.getKey(), parallelism)); + break; + case BROADCAST: + for (int p = 0; p < parallelism; p++) { + pi.processEvent(event, p); } + break; + } + } + } + + private static int getPIIndexForKey(String key, int parallelism) { + // If key is null, return a default index: 0 + if (key == null) + return 0; + + // HashCodeBuilder object does not have reset() method + // So all objects that get appended will be included in the + // computation of the hashcode. + // To avoid initialize a HashCodeBuilder for each event, + // here I use the static method with reflection on the event's key + int index = HashCodeBuilder.reflectionHashCode(key, true) % parallelism; + if (index < 0) { + index += parallelism; } - - private static int getPIIndexForKey(String key, int parallelism) { - // If key is null, return a default index: 0 - if (key == null) return 0; - - // HashCodeBuilder object does not have reset() method - // So all objects that get appended will be included in the - // computation of the hashcode. - // To avoid initialize a HashCodeBuilder for each event, - // here I use the static method with reflection on the event's key - int index = HashCodeBuilder.reflectionHashCode(key, true) % parallelism; - if (index < 0) { - index += parallelism; - } - return index; - } + return index; + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopology.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopology.java b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopology.java index fc9f885..a6bad2b 100644 --- a/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopology.java +++ b/samoa-threads/src/main/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopology.java @@ -25,38 +25,42 @@ import com.yahoo.labs.samoa.topology.IProcessingItem; /** * Topology for multithreaded engine. + * * @author Anh Thu Vu - * + * */ public class ThreadsTopology extends AbstractTopology { - ThreadsTopology(String name) { - super(name); - } + ThreadsTopology(String name) { + super(name); + } + + public void run() { + if (this.getEntranceProcessingItems() == null) + throw new IllegalStateException("You need to set entrance PI before running the topology."); + if (this.getEntranceProcessingItems().size() != 1) + throw new IllegalStateException("ThreadsTopology supports 1 entrance PI only. Number of entrance PIs is " + + this.getEntranceProcessingItems().size()); + + this.setupProcessingItemInstances(); + ThreadsEntranceProcessingItem entrancePi = (ThreadsEntranceProcessingItem) this.getEntranceProcessingItems() + .toArray()[0]; + if (entrancePi == null) + throw new IllegalStateException("You need to set entrance PI before running the topology."); + entrancePi.getProcessor().onCreate(0); // id=0 as it is not used in simple + // mode + entrancePi.startSendingEvents(); + } - public void run() { - if (this.getEntranceProcessingItems() == null) - throw new IllegalStateException("You need to set entrance PI before running the topology."); - if (this.getEntranceProcessingItems().size() != 1) - throw new IllegalStateException("ThreadsTopology supports 1 entrance PI only. Number of entrance PIs is "+this.getEntranceProcessingItems().size()); - - this.setupProcessingItemInstances(); - ThreadsEntranceProcessingItem entrancePi = (ThreadsEntranceProcessingItem) this.getEntranceProcessingItems().toArray()[0]; - if (entrancePi == null) - throw new IllegalStateException("You need to set entrance PI before running the topology."); - entrancePi.getProcessor().onCreate(0); // id=0 as it is not used in simple mode - entrancePi.startSendingEvents(); + /* + * Tell all the ThreadsProcessingItems to create & init their replicas + * (ThreadsProcessingItemInstance) + */ + private void setupProcessingItemInstances() { + for (IProcessingItem pi : this.getProcessingItems()) { + if (pi instanceof ThreadsProcessingItem) { + ThreadsProcessingItem tpi = (ThreadsProcessingItem) pi; + tpi.setupInstances(); + } } - - /* - * Tell all the ThreadsProcessingItems to create & init their - * replicas (ThreadsProcessingItemInstance) - */ - private void setupProcessingItemInstances() { - for (IProcessingItem pi:this.getProcessingItems()) { - if (pi instanceof ThreadsProcessingItem) { - ThreadsProcessingItem tpi = (ThreadsProcessingItem) pi; - tpi.setupInstances(); - } - } - } + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/test/java/com/yahoo/labs/samoa/AlgosTest.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/test/java/com/yahoo/labs/samoa/AlgosTest.java b/samoa-threads/src/test/java/com/yahoo/labs/samoa/AlgosTest.java index 5979d46..0b9b8a2 100644 --- a/samoa-threads/src/test/java/com/yahoo/labs/samoa/AlgosTest.java +++ b/samoa-threads/src/test/java/com/yahoo/labs/samoa/AlgosTest.java @@ -24,45 +24,45 @@ import org.junit.Test; public class AlgosTest { - @Test(timeout = 60000) - public void testVHTWithThreads() throws Exception { + @Test(timeout = 60000) + public void testVHTWithThreads() throws Exception { - TestParams vhtConfig = new TestParams.Builder() - .inputInstances(200_000) - .samplingSize(20_000) - .evaluationInstances(200_000) - .classifiedInstances(200_000) - .classificationsCorrect(55f) - .kappaStat(-0.1f) - .kappaTempStat(-0.1f) - .cliStringTemplate(TestParams.Templates.PREQEVAL_VHT_RANDOMTREE + " -t 2") - .resultFilePollTimeout(10) - .prePollWait(10) - .taskClassName(LocalThreadsDoTask.class.getName()) - .build(); - TestUtils.test(vhtConfig); + TestParams vhtConfig = new TestParams.Builder() + .inputInstances(200_000) + .samplingSize(20_000) + .evaluationInstances(200_000) + .classifiedInstances(200_000) + .classificationsCorrect(55f) + .kappaStat(-0.1f) + .kappaTempStat(-0.1f) + .cliStringTemplate(TestParams.Templates.PREQEVAL_VHT_RANDOMTREE + " -t 2") + .resultFilePollTimeout(10) + .prePollWait(10) + .taskClassName(LocalThreadsDoTask.class.getName()) + .build(); + TestUtils.test(vhtConfig); - } + } - @Test(timeout = 180000) - public void testBaggingWithThreads() throws Exception { - TestParams baggingConfig = new TestParams.Builder() - .inputInstances(100_000) - .samplingSize(10_000) - .inputDelayMicroSec(100) // prevents saturating the system due to unbounded queues - .evaluationInstances(90_000) - .classifiedInstances(105_000) - .classificationsCorrect(55f) - .kappaStat(0f) - .kappaTempStat(0f) - .cliStringTemplate(TestParams.Templates.PREQEVAL_BAGGING_RANDOMTREE + " -t 2") - .prePollWait(10) - .resultFilePollTimeout(30) - .taskClassName(LocalThreadsDoTask.class.getName()) - .build(); - TestUtils.test(baggingConfig); - - } + @Test(timeout = 180000) + public void testBaggingWithThreads() throws Exception { + TestParams baggingConfig = new TestParams.Builder() + .inputInstances(100_000) + .samplingSize(10_000) + .inputDelayMicroSec(100) // prevents saturating the system due to + // unbounded queues + .evaluationInstances(90_000) + .classifiedInstances(105_000) + .classificationsCorrect(55f) + .kappaStat(0f) + .kappaTempStat(0f) + .cliStringTemplate(TestParams.Templates.PREQEVAL_BAGGING_RANDOMTREE + " -t 2") + .prePollWait(10) + .resultFilePollTimeout(30) + .taskClassName(LocalThreadsDoTask.class.getName()) + .build(); + TestUtils.test(baggingConfig); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactoryTest.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactoryTest.java b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactoryTest.java index eee8639..12b5b90 100644 --- a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactoryTest.java +++ b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactoryTest.java @@ -37,78 +37,81 @@ import com.yahoo.labs.samoa.topology.Topology; /** * @author Anh Thu Vu - * + * */ public class ThreadsComponentFactoryTest { - @Tested private ThreadsComponentFactory factory; - @Mocked private Processor processor, processorReplica; - @Mocked private EntranceProcessor entranceProcessor; - - private final int parallelism = 3; - private final String topoName = "TestTopology"; - + @Tested + private ThreadsComponentFactory factory; + @Mocked + private Processor processor, processorReplica; + @Mocked + private EntranceProcessor entranceProcessor; + + private final int parallelism = 3; + private final String topoName = "TestTopology"; + + @Before + public void setUp() throws Exception { + factory = new ThreadsComponentFactory(); + } + + @Test + public void testCreatePiNoParallelism() { + new NonStrictExpectations() { + { + processor.newProcessor(processor); + result = processorReplica; + } + }; + ProcessingItem pi = factory.createPi(processor); + assertNotNull("ProcessingItem created is null.", pi); + assertEquals("ProcessingItem created is not a ThreadsProcessingItem.", ThreadsProcessingItem.class, pi.getClass()); + assertEquals("Parallelism of PI is not 1", 1, pi.getParallelism(), 0); + } + + @Test + public void testCreatePiWithParallelism() { + new NonStrictExpectations() { + { + processor.newProcessor(processor); + result = processorReplica; + } + }; + ProcessingItem pi = factory.createPi(processor, parallelism); + assertNotNull("ProcessingItem created is null.", pi); + assertEquals("ProcessingItem created is not a ThreadsProcessingItem.", ThreadsProcessingItem.class, pi.getClass()); + assertEquals("Parallelism of PI is not ", parallelism, pi.getParallelism(), 0); + } + + @Test + public void testCreateStream() { + new NonStrictExpectations() { + { + processor.newProcessor(processor); + result = processorReplica; + } + }; + ProcessingItem pi = factory.createPi(processor); + + Stream stream = factory.createStream(pi); + assertNotNull("Stream created is null", stream); + assertEquals("Stream created is not a ThreadsStream.", ThreadsStream.class, stream.getClass()); + } - @Before - public void setUp() throws Exception { - factory = new ThreadsComponentFactory(); - } + @Test + public void testCreateTopology() { + Topology topology = factory.createTopology(topoName); + assertNotNull("Topology created is null.", topology); + assertEquals("Topology created is not a ThreadsTopology.", ThreadsTopology.class, topology.getClass()); + } - @Test - public void testCreatePiNoParallelism() { - new NonStrictExpectations() { - { - processor.newProcessor(processor); - result=processorReplica; - } - }; - ProcessingItem pi = factory.createPi(processor); - assertNotNull("ProcessingItem created is null.",pi); - assertEquals("ProcessingItem created is not a ThreadsProcessingItem.",ThreadsProcessingItem.class,pi.getClass()); - assertEquals("Parallelism of PI is not 1",1,pi.getParallelism(),0); - } - - @Test - public void testCreatePiWithParallelism() { - new NonStrictExpectations() { - { - processor.newProcessor(processor); - result=processorReplica; - } - }; - ProcessingItem pi = factory.createPi(processor,parallelism); - assertNotNull("ProcessingItem created is null.",pi); - assertEquals("ProcessingItem created is not a ThreadsProcessingItem.",ThreadsProcessingItem.class,pi.getClass()); - assertEquals("Parallelism of PI is not ",parallelism,pi.getParallelism(),0); - } - - @Test - public void testCreateStream() { - new NonStrictExpectations() { - { - processor.newProcessor(processor); - result=processorReplica; - } - }; - ProcessingItem pi = factory.createPi(processor); - - Stream stream = factory.createStream(pi); - assertNotNull("Stream created is null",stream); - assertEquals("Stream created is not a ThreadsStream.",ThreadsStream.class,stream.getClass()); - } - - @Test - public void testCreateTopology() { - Topology topology = factory.createTopology(topoName); - assertNotNull("Topology created is null.",topology); - assertEquals("Topology created is not a ThreadsTopology.",ThreadsTopology.class,topology.getClass()); - } - - @Test - public void testCreateEntrancePi() { - EntranceProcessingItem entrancePi = factory.createEntrancePi(entranceProcessor); - assertNotNull("EntranceProcessingItem created is null.",entrancePi); - assertEquals("EntranceProcessingItem created is not a ThreadsEntranceProcessingItem.",ThreadsEntranceProcessingItem.class,entrancePi.getClass()); - assertSame("EntranceProcessor is not set correctly.",entranceProcessor, entrancePi.getProcessor()); - } + @Test + public void testCreateEntrancePi() { + EntranceProcessingItem entrancePi = factory.createEntrancePi(entranceProcessor); + assertNotNull("EntranceProcessingItem created is null.", entrancePi); + assertEquals("EntranceProcessingItem created is not a ThreadsEntranceProcessingItem.", + ThreadsEntranceProcessingItem.class, entrancePi.getClass()); + assertSame("EntranceProcessor is not set correctly.", entranceProcessor, entrancePi.getProcessor()); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngineTest.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngineTest.java b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngineTest.java index cdb8949..c8a3c3d 100644 --- a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngineTest.java +++ b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngineTest.java @@ -29,101 +29,105 @@ import org.junit.Test; /** * @author Anh Thu Vu - * + * */ public class ThreadsEngineTest { - @Mocked ThreadsTopology topology; - - private final int numThreads = 4; - private final int numThreadsSmaller = 3; - private final int numThreadsLarger = 5; - - @After - public void cleanup() { - ThreadsEngine.clearThreadPool(); - } - - @Test - public void testSetNumberOfThreadsSimple() { - ThreadsEngine.setNumberOfThreads(numThreads); - assertEquals("Number of threads is not set correctly.", numThreads, - ThreadsEngine.getNumberOfThreads(),0); - } - - @Test - public void testSetNumberOfThreadsRepeat() { - ThreadsEngine.setNumberOfThreads(numThreads); - ThreadsEngine.setNumberOfThreads(numThreads); - assertEquals("Number of threads is not set correctly.", numThreads, - ThreadsEngine.getNumberOfThreads(),0); - } - - @Test - public void testSetNumberOfThreadsIncrease() { - ThreadsEngine.setNumberOfThreads(numThreads); - ThreadsEngine.setNumberOfThreads(numThreadsLarger); - assertEquals("Number of threads is not set correctly.", numThreadsLarger, - ThreadsEngine.getNumberOfThreads(),0); - } - - @Test(expected=IllegalStateException.class) - public void testSetNumberOfThreadsDecrease() { - ThreadsEngine.setNumberOfThreads(numThreads); - ThreadsEngine.setNumberOfThreads(numThreadsSmaller); - // Exception expected - } - - @Test(expected=IllegalStateException.class) - public void testSetNumberOfThreadsNegative() { - ThreadsEngine.setNumberOfThreads(-1); - // Exception expected - } - - @Test(expected=IllegalStateException.class) - public void testSetNumberOfThreadsZero() { - ThreadsEngine.setNumberOfThreads(0); - // Exception expected - } - - @Test - public void testClearThreadPool() { - ThreadsEngine.setNumberOfThreads(numThreads); - ThreadsEngine.clearThreadPool(); - assertEquals("ThreadsEngine was not shutdown properly.", 0, ThreadsEngine.getNumberOfThreads()); - } - - @Test - public void testGetThreadWithIndexWithinPoolSize() { - ThreadsEngine.setNumberOfThreads(numThreads); - for (int i=0; i<numThreads; i++) { - assertNotNull("ExecutorService is not initialized correctly.", ThreadsEngine.getThreadWithIndex(i)); - } - } - - @Test - public void testGetThreadWithIndexOutOfPoolSize() { - ThreadsEngine.setNumberOfThreads(numThreads); - for (int i=0; i<numThreads+3; i++) { - assertNotNull("ExecutorService is not initialized correctly.", ThreadsEngine.getThreadWithIndex(i)); - } - } - - @Test(expected=IllegalStateException.class) - public void testGetThreadWithIndexFromEmptyPool() { - for (int i=0; i<numThreads; i++) { - ThreadsEngine.getThreadWithIndex(i); - } - } - - @Test - public void testSubmitTopology() { - ThreadsEngine.submitTopology(topology, numThreads); - new Verifications() {{ - topology.run(); times=1; - }}; - assertEquals("Number of threads is not set correctly.", numThreads, - ThreadsEngine.getNumberOfThreads(),0); - } + @Mocked + ThreadsTopology topology; + + private final int numThreads = 4; + private final int numThreadsSmaller = 3; + private final int numThreadsLarger = 5; + + @After + public void cleanup() { + ThreadsEngine.clearThreadPool(); + } + + @Test + public void testSetNumberOfThreadsSimple() { + ThreadsEngine.setNumberOfThreads(numThreads); + assertEquals("Number of threads is not set correctly.", numThreads, + ThreadsEngine.getNumberOfThreads(), 0); + } + + @Test + public void testSetNumberOfThreadsRepeat() { + ThreadsEngine.setNumberOfThreads(numThreads); + ThreadsEngine.setNumberOfThreads(numThreads); + assertEquals("Number of threads is not set correctly.", numThreads, + ThreadsEngine.getNumberOfThreads(), 0); + } + + @Test + public void testSetNumberOfThreadsIncrease() { + ThreadsEngine.setNumberOfThreads(numThreads); + ThreadsEngine.setNumberOfThreads(numThreadsLarger); + assertEquals("Number of threads is not set correctly.", numThreadsLarger, + ThreadsEngine.getNumberOfThreads(), 0); + } + + @Test(expected = IllegalStateException.class) + public void testSetNumberOfThreadsDecrease() { + ThreadsEngine.setNumberOfThreads(numThreads); + ThreadsEngine.setNumberOfThreads(numThreadsSmaller); + // Exception expected + } + + @Test(expected = IllegalStateException.class) + public void testSetNumberOfThreadsNegative() { + ThreadsEngine.setNumberOfThreads(-1); + // Exception expected + } + + @Test(expected = IllegalStateException.class) + public void testSetNumberOfThreadsZero() { + ThreadsEngine.setNumberOfThreads(0); + // Exception expected + } + + @Test + public void testClearThreadPool() { + ThreadsEngine.setNumberOfThreads(numThreads); + ThreadsEngine.clearThreadPool(); + assertEquals("ThreadsEngine was not shutdown properly.", 0, ThreadsEngine.getNumberOfThreads()); + } + + @Test + public void testGetThreadWithIndexWithinPoolSize() { + ThreadsEngine.setNumberOfThreads(numThreads); + for (int i = 0; i < numThreads; i++) { + assertNotNull("ExecutorService is not initialized correctly.", ThreadsEngine.getThreadWithIndex(i)); + } + } + + @Test + public void testGetThreadWithIndexOutOfPoolSize() { + ThreadsEngine.setNumberOfThreads(numThreads); + for (int i = 0; i < numThreads + 3; i++) { + assertNotNull("ExecutorService is not initialized correctly.", ThreadsEngine.getThreadWithIndex(i)); + } + } + + @Test(expected = IllegalStateException.class) + public void testGetThreadWithIndexFromEmptyPool() { + for (int i = 0; i < numThreads; i++) { + ThreadsEngine.getThreadWithIndex(i); + } + } + + @Test + public void testSubmitTopology() { + ThreadsEngine.submitTopology(topology, numThreads); + new Verifications() { + { + topology.run(); + times = 1; + } + }; + assertEquals("Number of threads is not set correctly.", numThreads, + ThreadsEngine.getNumberOfThreads(), 0); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItemTest.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItemTest.java b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItemTest.java index 2dab489..db2a3fb 100644 --- a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItemTest.java +++ b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItemTest.java @@ -34,100 +34,118 @@ import com.yahoo.labs.samoa.topology.Stream; /** * @author Anh Thu Vu - * + * */ public class ThreadsEntranceProcessingItemTest { - @Tested private ThreadsEntranceProcessingItem entrancePi; - - @Mocked private EntranceProcessor entranceProcessor; - @Mocked private Stream outputStream, anotherStream; - @Mocked private ContentEvent event; - - @Mocked private Thread unused; - - /** - * @throws java.lang.Exception - */ - @Before - public void setUp() throws Exception { - entrancePi = new ThreadsEntranceProcessingItem(entranceProcessor); - } - - @Test - public void testContructor() { - assertSame("EntranceProcessor is not set correctly.",entranceProcessor,entrancePi.getProcessor()); - } - - @Test - public void testSetOutputStream() { - entrancePi.setOutputStream(outputStream); - assertSame("OutoutStream is not set correctly.",outputStream,entrancePi.getOutputStream()); - } - - @Test - public void testSetOutputStreamRepeate() { - entrancePi.setOutputStream(outputStream); - entrancePi.setOutputStream(outputStream); - assertSame("OutoutStream is not set correctly.",outputStream,entrancePi.getOutputStream()); - } - - @Test(expected=IllegalStateException.class) - public void testSetOutputStreamError() { - entrancePi.setOutputStream(outputStream); - entrancePi.setOutputStream(anotherStream); - } - - @Test - public void testStartSendingEvents() { - entrancePi.setOutputStream(outputStream); - new StrictExpectations() { - { - for (int i=0; i<1; i++) { - entranceProcessor.isFinished(); result=false; - entranceProcessor.hasNext(); result=false; - } - - for (int i=0; i<5; i++) { - entranceProcessor.isFinished(); result=false; - entranceProcessor.hasNext(); result=true; - entranceProcessor.nextEvent(); result=event; - outputStream.put(event); - } - - for (int i=0; i<2; i++) { - entranceProcessor.isFinished(); result=false; - entranceProcessor.hasNext(); result=false; - } - - for (int i=0; i<5; i++) { - entranceProcessor.isFinished(); result=false; - entranceProcessor.hasNext(); result=true; - entranceProcessor.nextEvent(); result=event; - outputStream.put(event); - } - - entranceProcessor.isFinished(); result=true; times=1; - entranceProcessor.hasNext(); times=0; - - - } - }; - entrancePi.startSendingEvents(); - new Verifications() { - { - try { - Thread.sleep(anyInt); times=3; - } catch (InterruptedException e) { - - } - } - }; - } - - @Test(expected=IllegalStateException.class) - public void testStartSendingEventsError() { - entrancePi.startSendingEvents(); - } + @Tested + private ThreadsEntranceProcessingItem entrancePi; + + @Mocked + private EntranceProcessor entranceProcessor; + @Mocked + private Stream outputStream, anotherStream; + @Mocked + private ContentEvent event; + + @Mocked + private Thread unused; + + /** + * @throws java.lang.Exception + */ + @Before + public void setUp() throws Exception { + entrancePi = new ThreadsEntranceProcessingItem(entranceProcessor); + } + + @Test + public void testContructor() { + assertSame("EntranceProcessor is not set correctly.", entranceProcessor, entrancePi.getProcessor()); + } + + @Test + public void testSetOutputStream() { + entrancePi.setOutputStream(outputStream); + assertSame("OutoutStream is not set correctly.", outputStream, entrancePi.getOutputStream()); + } + + @Test + public void testSetOutputStreamRepeate() { + entrancePi.setOutputStream(outputStream); + entrancePi.setOutputStream(outputStream); + assertSame("OutoutStream is not set correctly.", outputStream, entrancePi.getOutputStream()); + } + + @Test(expected = IllegalStateException.class) + public void testSetOutputStreamError() { + entrancePi.setOutputStream(outputStream); + entrancePi.setOutputStream(anotherStream); + } + + @Test + public void testStartSendingEvents() { + entrancePi.setOutputStream(outputStream); + new StrictExpectations() { + { + for (int i = 0; i < 1; i++) { + entranceProcessor.isFinished(); + result = false; + entranceProcessor.hasNext(); + result = false; + } + + for (int i = 0; i < 5; i++) { + entranceProcessor.isFinished(); + result = false; + entranceProcessor.hasNext(); + result = true; + entranceProcessor.nextEvent(); + result = event; + outputStream.put(event); + } + + for (int i = 0; i < 2; i++) { + entranceProcessor.isFinished(); + result = false; + entranceProcessor.hasNext(); + result = false; + } + + for (int i = 0; i < 5; i++) { + entranceProcessor.isFinished(); + result = false; + entranceProcessor.hasNext(); + result = true; + entranceProcessor.nextEvent(); + result = event; + outputStream.put(event); + } + + entranceProcessor.isFinished(); + result = true; + times = 1; + entranceProcessor.hasNext(); + times = 0; + + } + }; + entrancePi.startSendingEvents(); + new Verifications() { + { + try { + Thread.sleep(anyInt); + times = 3; + } catch (InterruptedException e) { + + } + } + }; + } + + @Test(expected = IllegalStateException.class) + public void testStartSendingEventsError() { + entrancePi.startSendingEvents(); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnableTest.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnableTest.java b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnableTest.java index f744162..1e70d10 100644 --- a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnableTest.java +++ b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnableTest.java @@ -31,37 +31,41 @@ import com.yahoo.labs.samoa.core.ContentEvent; /** * @author Anh Thu Vu - * + * */ public class ThreadsEventRunnableTest { - @Tested private ThreadsEventRunnable task; - - @Mocked private ThreadsProcessingItemInstance piInstance; - @Mocked private ContentEvent event; - - /** - * @throws java.lang.Exception - */ - @Before - public void setUp() throws Exception { - task = new ThreadsEventRunnable(piInstance, event); - } + @Tested + private ThreadsEventRunnable task; + + @Mocked + private ThreadsProcessingItemInstance piInstance; + @Mocked + private ContentEvent event; + + /** + * @throws java.lang.Exception + */ + @Before + public void setUp() throws Exception { + task = new ThreadsEventRunnable(piInstance, event); + } + + @Test + public void testConstructor() { + assertSame("WorkerProcessingItem is not set correctly.", piInstance, task.getWorkerProcessingItem()); + assertSame("ContentEvent is not set correctly.", event, task.getContentEvent()); + } - @Test - public void testConstructor() { - assertSame("WorkerProcessingItem is not set correctly.",piInstance,task.getWorkerProcessingItem()); - assertSame("ContentEvent is not set correctly.",event,task.getContentEvent()); - } - - @Test - public void testRun() { - task.run(); - new Verifications () { - { - piInstance.processEvent(event); times=1; - } - }; - } + @Test + public void testRun() { + task.run(); + new Verifications() { + { + piInstance.processEvent(event); + times = 1; + } + }; + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstanceTest.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstanceTest.java b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstanceTest.java index 33af044..d4f78b0 100644 --- a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstanceTest.java +++ b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstanceTest.java @@ -32,36 +32,40 @@ import com.yahoo.labs.samoa.core.Processor; /** * @author Anh Thu Vu - * + * */ public class ThreadsProcessingItemInstanceTest { - @Tested private ThreadsProcessingItemInstance piInstance; - - @Mocked private Processor processor; - @Mocked private ContentEvent event; - - private final int threadIndex = 2; - - @Before - public void setUp() throws Exception { - piInstance = new ThreadsProcessingItemInstance(processor, threadIndex); - } + @Tested + private ThreadsProcessingItemInstance piInstance; + + @Mocked + private Processor processor; + @Mocked + private ContentEvent event; + + private final int threadIndex = 2; + + @Before + public void setUp() throws Exception { + piInstance = new ThreadsProcessingItemInstance(processor, threadIndex); + } + + @Test + public void testConstructor() { + assertSame("Processor is not set correctly.", processor, piInstance.getProcessor()); + assertEquals("Thread index is not set correctly.", threadIndex, piInstance.getThreadIndex(), 0); + } - @Test - public void testConstructor() { - assertSame("Processor is not set correctly.", processor, piInstance.getProcessor()); - assertEquals("Thread index is not set correctly.", threadIndex, piInstance.getThreadIndex(),0); - } - - @Test - public void testProcessEvent() { - piInstance.processEvent(event); - new Verifications() { - { - processor.process(event); times=1; - } - }; - } + @Test + public void testProcessEvent() { + piInstance.processEvent(event); + new Verifications() { + { + processor.process(event); + times = 1; + } + }; + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemTest.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemTest.java b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemTest.java index ad7cd56..d148e8e 100644 --- a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemTest.java +++ b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemTest.java @@ -39,135 +39,142 @@ import com.yahoo.labs.samoa.utils.StreamDestination; /** * @author Anh Thu Vu - * + * */ public class ThreadsProcessingItemTest { - @Tested private ThreadsProcessingItem pi; - - @Mocked private ThreadsEngine unused; - @Mocked private ExecutorService threadPool; - @Mocked private ThreadsEventRunnable task; - - @Mocked private Processor processor, processorReplica; - @Mocked private ThreadsStream stream; - @Mocked private StreamDestination destination; - @Mocked private ContentEvent event; - - private final int parallelism = 4; - private final int counter = 2; - - private ThreadsProcessingItemInstance instance; - - - @Before - public void setUp() throws Exception { - new NonStrictExpectations() { - { - processor.newProcessor(processor); - result=processorReplica; - } - }; - pi = new ThreadsProcessingItem(processor, parallelism); - } - - @Test - public void testConstructor() { - assertSame("Processor was not set correctly.",processor,pi.getProcessor()); - assertEquals("Parallelism was not set correctly.",parallelism,pi.getParallelism(),0); - } - - @Test - public void testConnectInputShuffleStream() { - new Expectations() { - { - destination = new StreamDestination(pi, parallelism, PartitioningScheme.SHUFFLE); - stream.addDestination(destination); - } - }; - pi.connectInputShuffleStream(stream); - } - - @Test - public void testConnectInputKeyStream() { - new Expectations() { - { - destination = new StreamDestination(pi, parallelism, PartitioningScheme.GROUP_BY_KEY); - stream.addDestination(destination); - } - }; - pi.connectInputKeyStream(stream); - } - - @Test - public void testConnectInputAllStream() { - new Expectations() { - { - destination = new StreamDestination(pi, parallelism, PartitioningScheme.BROADCAST); - stream.addDestination(destination); - } - }; - pi.connectInputAllStream(stream); - } - - @Test - public void testSetupInstances() { - new Expectations() { - { - for (int i=0; i<parallelism; i++) { - processor.newProcessor(processor); - result=processor; - - processor.onCreate(anyInt); - } - } - }; - pi.setupInstances(); - List<ThreadsProcessingItemInstance> instances = pi.getProcessingItemInstances(); - assertNotNull("List of PI instances is null.",instances); - assertEquals("Number of instances does not match parallelism.",parallelism,instances.size(),0); - for(int i=0; i<instances.size();i++) { - assertNotNull("Instance "+i+" is null.",instances.get(i)); - assertEquals("Instance "+i+" is not a ThreadsWorkerProcessingItem.",ThreadsProcessingItemInstance.class,instances.get(i).getClass()); - } - } - - @Test(expected=IllegalStateException.class) - public void testProcessEventError() { - pi.processEvent(event, counter); - } - - @Test - public void testProcessEvent() { - new Expectations() { - { - for (int i=0; i<parallelism; i++) { - processor.newProcessor(processor); - result=processor; - - processor.onCreate(anyInt); - } - } - }; - pi.setupInstances(); - - instance = pi.getProcessingItemInstances().get(counter); - new NonStrictExpectations() { - { - ThreadsEngine.getThreadWithIndex(anyInt); - result=threadPool; - - - } - }; - new Expectations() { - { - task = new ThreadsEventRunnable(instance, event); - threadPool.submit(task); - } - }; - pi.processEvent(event, counter); - - } + @Tested + private ThreadsProcessingItem pi; + + @Mocked + private ThreadsEngine unused; + @Mocked + private ExecutorService threadPool; + @Mocked + private ThreadsEventRunnable task; + + @Mocked + private Processor processor, processorReplica; + @Mocked + private ThreadsStream stream; + @Mocked + private StreamDestination destination; + @Mocked + private ContentEvent event; + + private final int parallelism = 4; + private final int counter = 2; + + private ThreadsProcessingItemInstance instance; + + @Before + public void setUp() throws Exception { + new NonStrictExpectations() { + { + processor.newProcessor(processor); + result = processorReplica; + } + }; + pi = new ThreadsProcessingItem(processor, parallelism); + } + + @Test + public void testConstructor() { + assertSame("Processor was not set correctly.", processor, pi.getProcessor()); + assertEquals("Parallelism was not set correctly.", parallelism, pi.getParallelism(), 0); + } + + @Test + public void testConnectInputShuffleStream() { + new Expectations() { + { + destination = new StreamDestination(pi, parallelism, PartitioningScheme.SHUFFLE); + stream.addDestination(destination); + } + }; + pi.connectInputShuffleStream(stream); + } + + @Test + public void testConnectInputKeyStream() { + new Expectations() { + { + destination = new StreamDestination(pi, parallelism, PartitioningScheme.GROUP_BY_KEY); + stream.addDestination(destination); + } + }; + pi.connectInputKeyStream(stream); + } + + @Test + public void testConnectInputAllStream() { + new Expectations() { + { + destination = new StreamDestination(pi, parallelism, PartitioningScheme.BROADCAST); + stream.addDestination(destination); + } + }; + pi.connectInputAllStream(stream); + } + + @Test + public void testSetupInstances() { + new Expectations() { + { + for (int i = 0; i < parallelism; i++) { + processor.newProcessor(processor); + result = processor; + + processor.onCreate(anyInt); + } + } + }; + pi.setupInstances(); + List<ThreadsProcessingItemInstance> instances = pi.getProcessingItemInstances(); + assertNotNull("List of PI instances is null.", instances); + assertEquals("Number of instances does not match parallelism.", parallelism, instances.size(), 0); + for (int i = 0; i < instances.size(); i++) { + assertNotNull("Instance " + i + " is null.", instances.get(i)); + assertEquals("Instance " + i + " is not a ThreadsWorkerProcessingItem.", ThreadsProcessingItemInstance.class, + instances.get(i).getClass()); + } + } + + @Test(expected = IllegalStateException.class) + public void testProcessEventError() { + pi.processEvent(event, counter); + } + + @Test + public void testProcessEvent() { + new Expectations() { + { + for (int i = 0; i < parallelism; i++) { + processor.newProcessor(processor); + result = processor; + + processor.onCreate(anyInt); + } + } + }; + pi.setupInstances(); + + instance = pi.getProcessingItemInstances().get(counter); + new NonStrictExpectations() { + { + ThreadsEngine.getThreadWithIndex(anyInt); + result = threadPool; + + } + }; + new Expectations() { + { + task = new ThreadsEventRunnable(instance, event); + threadPool.submit(task); + } + }; + pi.processEvent(event, counter); + + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsStreamTest.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsStreamTest.java b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsStreamTest.java index 27d2acd..abe57ce 100644 --- a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsStreamTest.java +++ b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsStreamTest.java @@ -41,87 +41,95 @@ import com.yahoo.labs.samoa.utils.StreamDestination; /** * @author Anh Thu Vu - * + * */ @RunWith(Parameterized.class) public class ThreadsStreamTest { - - @Tested private ThreadsStream stream; - - @Mocked private ThreadsProcessingItem sourcePi, destPi; - @Mocked private ContentEvent event; - @Mocked private StreamDestination destination; - - private final String eventKey = "eventkey"; - private final int parallelism; - private final PartitioningScheme scheme; - - - @Parameters - public static Collection<Object[]> generateParameters() { - return Arrays.asList(new Object[][] { - { 2, PartitioningScheme.SHUFFLE }, - { 3, PartitioningScheme.GROUP_BY_KEY }, - { 4, PartitioningScheme.BROADCAST } - }); - } - - public ThreadsStreamTest(int parallelism, PartitioningScheme scheme) { - this.parallelism = parallelism; - this.scheme = scheme; - } - - @Before - public void setUp() throws Exception { - stream = new ThreadsStream(sourcePi); - stream.addDestination(destination); - } - - @Test - public void testAddDestination() { - boolean found = false; - for (StreamDestination sd:stream.getDestinations()) { - if (sd == destination) { - found = true; - break; - } - } - assertTrue("Destination object was not added in stream's destinations set.",found); - } - - @Test - public void testPut() { - new NonStrictExpectations() { - { - event.getKey(); result=eventKey; - destination.getProcessingItem(); result=destPi; - destination.getPartitioningScheme(); result=scheme; - destination.getParallelism(); result=parallelism; - - } - }; - switch(this.scheme) { - case SHUFFLE: case GROUP_BY_KEY: - new Expectations() { - { - - // TODO: restrict the range of counter value - destPi.processEvent(event, anyInt); times=1; - } - }; - break; - case BROADCAST: - new Expectations() { - { - // TODO: restrict the range of counter value - destPi.processEvent(event, anyInt); times=parallelism; - } - }; - break; - } - stream.put(event); - } - - + + @Tested + private ThreadsStream stream; + + @Mocked + private ThreadsProcessingItem sourcePi, destPi; + @Mocked + private ContentEvent event; + @Mocked + private StreamDestination destination; + + private final String eventKey = "eventkey"; + private final int parallelism; + private final PartitioningScheme scheme; + + @Parameters + public static Collection<Object[]> generateParameters() { + return Arrays.asList(new Object[][] { + { 2, PartitioningScheme.SHUFFLE }, + { 3, PartitioningScheme.GROUP_BY_KEY }, + { 4, PartitioningScheme.BROADCAST } + }); + } + + public ThreadsStreamTest(int parallelism, PartitioningScheme scheme) { + this.parallelism = parallelism; + this.scheme = scheme; + } + + @Before + public void setUp() throws Exception { + stream = new ThreadsStream(sourcePi); + stream.addDestination(destination); + } + + @Test + public void testAddDestination() { + boolean found = false; + for (StreamDestination sd : stream.getDestinations()) { + if (sd == destination) { + found = true; + break; + } + } + assertTrue("Destination object was not added in stream's destinations set.", found); + } + + @Test + public void testPut() { + new NonStrictExpectations() { + { + event.getKey(); + result = eventKey; + destination.getProcessingItem(); + result = destPi; + destination.getPartitioningScheme(); + result = scheme; + destination.getParallelism(); + result = parallelism; + + } + }; + switch (this.scheme) { + case SHUFFLE: + case GROUP_BY_KEY: + new Expectations() { + { + + // TODO: restrict the range of counter value + destPi.processEvent(event, anyInt); + times = 1; + } + }; + break; + case BROADCAST: + new Expectations() { + { + // TODO: restrict the range of counter value + destPi.processEvent(event, anyInt); + times = parallelism; + } + }; + break; + } + stream.put(event); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopologyTest.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopologyTest.java b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopologyTest.java index 46847f5..6891a63 100644 --- a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopologyTest.java +++ b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopologyTest.java @@ -35,50 +35,53 @@ import com.yahoo.labs.samoa.topology.EntranceProcessingItem; /** * @author Anh Thu Vu - * + * */ public class ThreadsTopologyTest { - @Tested private ThreadsTopology topology; - - @Mocked private ThreadsEntranceProcessingItem entrancePi; - @Mocked private EntranceProcessor entranceProcessor; - - @Before - public void setUp() throws Exception { - topology = new ThreadsTopology("TestTopology"); - } + @Tested + private ThreadsTopology topology; + + @Mocked + private ThreadsEntranceProcessingItem entrancePi; + @Mocked + private EntranceProcessor entranceProcessor; + + @Before + public void setUp() throws Exception { + topology = new ThreadsTopology("TestTopology"); + } + + @Test + public void testAddEntrancePi() { + topology.addEntranceProcessingItem(entrancePi); + Set<EntranceProcessingItem> entrancePIs = topology.getEntranceProcessingItems(); + assertNotNull("Set of entrance PIs is null.", entrancePIs); + assertEquals("Number of entrance PI in ThreadsTopology must be 1", 1, entrancePIs.size()); + assertSame("Entrance PI was not set correctly.", entrancePi, entrancePIs.toArray()[0]); + // TODO: verify that entrance PI is in the set of ProcessingItems + // Need to access topology's set of PIs (getProcessingItems() method) + } + + @Test + public void testRun() { + topology.addEntranceProcessingItem(entrancePi); + + new Expectations() { + { + entrancePi.getProcessor(); + result = entranceProcessor; + entranceProcessor.onCreate(anyInt); + + entrancePi.startSendingEvents(); + } + }; + topology.run(); + } - @Test - public void testAddEntrancePi() { - topology.addEntranceProcessingItem(entrancePi); - Set<EntranceProcessingItem> entrancePIs = topology.getEntranceProcessingItems(); - assertNotNull("Set of entrance PIs is null.",entrancePIs); - assertEquals("Number of entrance PI in ThreadsTopology must be 1",1,entrancePIs.size()); - assertSame("Entrance PI was not set correctly.",entrancePi,entrancePIs.toArray()[0]); - // TODO: verify that entrance PI is in the set of ProcessingItems - // Need to access topology's set of PIs (getProcessingItems() method) - } - - @Test - public void testRun() { - topology.addEntranceProcessingItem(entrancePi); - - new Expectations() { - { - entrancePi.getProcessor(); - result=entranceProcessor; - entranceProcessor.onCreate(anyInt); - - entrancePi.startSendingEvents(); - } - }; - topology.run(); - } - - @Test(expected=IllegalStateException.class) - public void testRunWithoutEntrancePI() { - topology.run(); - } + @Test(expected = IllegalStateException.class) + public void testRunWithoutEntrancePI() { + topology.run(); + } } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/23a35dbe/samoa-threads/src/test/java/com/yahoo/labs/samoa/utils/StreamDestinationTest.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/test/java/com/yahoo/labs/samoa/utils/StreamDestinationTest.java b/samoa-threads/src/test/java/com/yahoo/labs/samoa/utils/StreamDestinationTest.java index 19c5421..c165b3e 100644 --- a/samoa-threads/src/test/java/com/yahoo/labs/samoa/utils/StreamDestinationTest.java +++ b/samoa-threads/src/test/java/com/yahoo/labs/samoa/utils/StreamDestinationTest.java @@ -40,41 +40,43 @@ import com.yahoo.labs.samoa.utils.StreamDestination; /** * @author Anh Thu Vu - * + * */ @RunWith(Parameterized.class) public class StreamDestinationTest { - @Tested private StreamDestination destination; - - @Mocked private IProcessingItem pi; - private final int parallelism; - private final PartitioningScheme scheme; - - @Parameters - public static Collection<Object[]> generateParameters() { - return Arrays.asList(new Object[][] { - { 3, PartitioningScheme.SHUFFLE }, - { 2, PartitioningScheme.GROUP_BY_KEY }, - { 5, PartitioningScheme.BROADCAST } - }); - } - - public StreamDestinationTest(int parallelism, PartitioningScheme scheme) { - this.parallelism = parallelism; - this.scheme = scheme; - } - - @Before - public void setUp() throws Exception { - destination = new StreamDestination(pi, parallelism, scheme); - } + @Tested + private StreamDestination destination; + + @Mocked + private IProcessingItem pi; + private final int parallelism; + private final PartitioningScheme scheme; + + @Parameters + public static Collection<Object[]> generateParameters() { + return Arrays.asList(new Object[][] { + { 3, PartitioningScheme.SHUFFLE }, + { 2, PartitioningScheme.GROUP_BY_KEY }, + { 5, PartitioningScheme.BROADCAST } + }); + } + + public StreamDestinationTest(int parallelism, PartitioningScheme scheme) { + this.parallelism = parallelism; + this.scheme = scheme; + } + + @Before + public void setUp() throws Exception { + destination = new StreamDestination(pi, parallelism, scheme); + } - @Test - public void testContructor() { - assertSame("The IProcessingItem is not set correctly.", pi, destination.getProcessingItem()); - assertEquals("Parallelism value is not set correctly.", parallelism, destination.getParallelism(), 0); - assertEquals("EventAllocationType is not set correctly.", scheme, destination.getPartitioningScheme()); - } + @Test + public void testContructor() { + assertSame("The IProcessingItem is not set correctly.", pi, destination.getProcessingItem()); + assertEquals("Parallelism value is not set correctly.", parallelism, destination.getParallelism(), 0); + assertEquals("EventAllocationType is not set correctly.", scheme, destination.getPartitioningScheme()); + } }
