Repository: incubator-samoa Updated Branches: refs/heads/master 64ef7a921 -> 9b178f631
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactoryTest.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactoryTest.java b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactoryTest.java deleted file mode 100644 index 12b5b90..0000000 --- a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsComponentFactoryTest.java +++ /dev/null @@ -1,117 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2013 Yahoo! Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import static org.junit.Assert.*; -import mockit.Mocked; -import mockit.NonStrictExpectations; -import mockit.Tested; - -import org.junit.Before; -import org.junit.Test; - -import com.yahoo.labs.samoa.core.EntranceProcessor; -import com.yahoo.labs.samoa.core.Processor; -import com.yahoo.labs.samoa.topology.EntranceProcessingItem; -import com.yahoo.labs.samoa.topology.ProcessingItem; -import com.yahoo.labs.samoa.topology.Stream; -import com.yahoo.labs.samoa.topology.Topology; - -/** - * @author Anh Thu Vu - * - */ -public class ThreadsComponentFactoryTest { - @Tested - private ThreadsComponentFactory factory; - @Mocked - private Processor processor, processorReplica; - @Mocked - private EntranceProcessor entranceProcessor; - - private final int parallelism = 3; - private final String topoName = "TestTopology"; - - @Before - public void setUp() throws Exception { - factory = new ThreadsComponentFactory(); - } - - @Test - public void testCreatePiNoParallelism() { - new NonStrictExpectations() { - { - processor.newProcessor(processor); - result = processorReplica; - } - }; - ProcessingItem pi = factory.createPi(processor); - assertNotNull("ProcessingItem created is null.", pi); - assertEquals("ProcessingItem created is not a ThreadsProcessingItem.", ThreadsProcessingItem.class, pi.getClass()); - assertEquals("Parallelism of PI is not 1", 1, pi.getParallelism(), 0); - } - - @Test - public void testCreatePiWithParallelism() { - new NonStrictExpectations() { - { - processor.newProcessor(processor); - result = processorReplica; - } - }; - ProcessingItem pi = factory.createPi(processor, parallelism); - assertNotNull("ProcessingItem created is null.", pi); - assertEquals("ProcessingItem created is not a ThreadsProcessingItem.", ThreadsProcessingItem.class, pi.getClass()); - assertEquals("Parallelism of PI is not ", parallelism, pi.getParallelism(), 0); - } - - @Test - public void testCreateStream() { - new NonStrictExpectations() { - { - processor.newProcessor(processor); - result = processorReplica; - } - }; - ProcessingItem pi = factory.createPi(processor); - - Stream stream = factory.createStream(pi); - assertNotNull("Stream created is null", stream); - assertEquals("Stream created is not a ThreadsStream.", ThreadsStream.class, stream.getClass()); - } - - @Test - public void testCreateTopology() { - Topology topology = factory.createTopology(topoName); - assertNotNull("Topology created is null.", topology); - assertEquals("Topology created is not a ThreadsTopology.", ThreadsTopology.class, topology.getClass()); - } - - @Test - public void testCreateEntrancePi() { - EntranceProcessingItem entrancePi = factory.createEntrancePi(entranceProcessor); - assertNotNull("EntranceProcessingItem created is null.", entrancePi); - assertEquals("EntranceProcessingItem created is not a ThreadsEntranceProcessingItem.", - ThreadsEntranceProcessingItem.class, entrancePi.getClass()); - assertSame("EntranceProcessor is not set correctly.", entranceProcessor, entrancePi.getProcessor()); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngineTest.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngineTest.java b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngineTest.java deleted file mode 100644 index c8a3c3d..0000000 --- a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEngineTest.java +++ /dev/null @@ -1,133 +0,0 @@ -package com.yahoo.labs.samoa.topology.impl; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2013 Yahoo! Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import static org.junit.Assert.*; -import mockit.Mocked; -import mockit.Verifications; - -import org.junit.After; -import org.junit.Test; - -/** - * @author Anh Thu Vu - * - */ -public class ThreadsEngineTest { - - @Mocked - ThreadsTopology topology; - - private final int numThreads = 4; - private final int numThreadsSmaller = 3; - private final int numThreadsLarger = 5; - - @After - public void cleanup() { - ThreadsEngine.clearThreadPool(); - } - - @Test - public void testSetNumberOfThreadsSimple() { - ThreadsEngine.setNumberOfThreads(numThreads); - assertEquals("Number of threads is not set correctly.", numThreads, - ThreadsEngine.getNumberOfThreads(), 0); - } - - @Test - public void testSetNumberOfThreadsRepeat() { - ThreadsEngine.setNumberOfThreads(numThreads); - ThreadsEngine.setNumberOfThreads(numThreads); - assertEquals("Number of threads is not set correctly.", numThreads, - ThreadsEngine.getNumberOfThreads(), 0); - } - - @Test - public void testSetNumberOfThreadsIncrease() { - ThreadsEngine.setNumberOfThreads(numThreads); - ThreadsEngine.setNumberOfThreads(numThreadsLarger); - assertEquals("Number of threads is not set correctly.", numThreadsLarger, - ThreadsEngine.getNumberOfThreads(), 0); - } - - @Test(expected = IllegalStateException.class) - public void testSetNumberOfThreadsDecrease() { - ThreadsEngine.setNumberOfThreads(numThreads); - ThreadsEngine.setNumberOfThreads(numThreadsSmaller); - // Exception expected - } - - @Test(expected = IllegalStateException.class) - public void testSetNumberOfThreadsNegative() { - ThreadsEngine.setNumberOfThreads(-1); - // Exception expected - } - - @Test(expected = IllegalStateException.class) - public void testSetNumberOfThreadsZero() { - ThreadsEngine.setNumberOfThreads(0); - // Exception expected - } - - @Test - public void testClearThreadPool() { - ThreadsEngine.setNumberOfThreads(numThreads); - ThreadsEngine.clearThreadPool(); - assertEquals("ThreadsEngine was not shutdown properly.", 0, ThreadsEngine.getNumberOfThreads()); - } - - @Test - public void testGetThreadWithIndexWithinPoolSize() { - ThreadsEngine.setNumberOfThreads(numThreads); - for (int i = 0; i < numThreads; i++) { - assertNotNull("ExecutorService is not initialized correctly.", ThreadsEngine.getThreadWithIndex(i)); - } - } - - @Test - public void testGetThreadWithIndexOutOfPoolSize() { - ThreadsEngine.setNumberOfThreads(numThreads); - for (int i = 0; i < numThreads + 3; i++) { - assertNotNull("ExecutorService is not initialized correctly.", ThreadsEngine.getThreadWithIndex(i)); - } - } - - @Test(expected = IllegalStateException.class) - public void testGetThreadWithIndexFromEmptyPool() { - for (int i = 0; i < numThreads; i++) { - ThreadsEngine.getThreadWithIndex(i); - } - } - - @Test - public void testSubmitTopology() { - ThreadsEngine.submitTopology(topology, numThreads); - new Verifications() { - { - topology.run(); - times = 1; - } - }; - assertEquals("Number of threads is not set correctly.", numThreads, - ThreadsEngine.getNumberOfThreads(), 0); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItemTest.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItemTest.java b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItemTest.java deleted file mode 100644 index db2a3fb..0000000 --- a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItemTest.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2013 Yahoo! Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ -package com.yahoo.labs.samoa.topology.impl; - -import static org.junit.Assert.*; -import mockit.Mocked; -import mockit.StrictExpectations; -import mockit.Tested; -import mockit.Verifications; - -import org.junit.Before; -import org.junit.Test; - -import com.yahoo.labs.samoa.core.ContentEvent; -import com.yahoo.labs.samoa.core.EntranceProcessor; -import com.yahoo.labs.samoa.topology.Stream; - -/** - * @author Anh Thu Vu - * - */ -public class ThreadsEntranceProcessingItemTest { - - @Tested - private ThreadsEntranceProcessingItem entrancePi; - - @Mocked - private EntranceProcessor entranceProcessor; - @Mocked - private Stream outputStream, anotherStream; - @Mocked - private ContentEvent event; - - @Mocked - private Thread unused; - - /** - * @throws java.lang.Exception - */ - @Before - public void setUp() throws Exception { - entrancePi = new ThreadsEntranceProcessingItem(entranceProcessor); - } - - @Test - public void testContructor() { - assertSame("EntranceProcessor is not set correctly.", entranceProcessor, entrancePi.getProcessor()); - } - - @Test - public void testSetOutputStream() { - entrancePi.setOutputStream(outputStream); - assertSame("OutoutStream is not set correctly.", outputStream, entrancePi.getOutputStream()); - } - - @Test - public void testSetOutputStreamRepeate() { - entrancePi.setOutputStream(outputStream); - entrancePi.setOutputStream(outputStream); - assertSame("OutoutStream is not set correctly.", outputStream, entrancePi.getOutputStream()); - } - - @Test(expected = IllegalStateException.class) - public void testSetOutputStreamError() { - entrancePi.setOutputStream(outputStream); - entrancePi.setOutputStream(anotherStream); - } - - @Test - public void testStartSendingEvents() { - entrancePi.setOutputStream(outputStream); - new StrictExpectations() { - { - for (int i = 0; i < 1; i++) { - entranceProcessor.isFinished(); - result = false; - entranceProcessor.hasNext(); - result = false; - } - - for (int i = 0; i < 5; i++) { - entranceProcessor.isFinished(); - result = false; - entranceProcessor.hasNext(); - result = true; - entranceProcessor.nextEvent(); - result = event; - outputStream.put(event); - } - - for (int i = 0; i < 2; i++) { - entranceProcessor.isFinished(); - result = false; - entranceProcessor.hasNext(); - result = false; - } - - for (int i = 0; i < 5; i++) { - entranceProcessor.isFinished(); - result = false; - entranceProcessor.hasNext(); - result = true; - entranceProcessor.nextEvent(); - result = event; - outputStream.put(event); - } - - entranceProcessor.isFinished(); - result = true; - times = 1; - entranceProcessor.hasNext(); - times = 0; - - } - }; - entrancePi.startSendingEvents(); - new Verifications() { - { - try { - Thread.sleep(anyInt); - times = 3; - } catch (InterruptedException e) { - - } - } - }; - } - - @Test(expected = IllegalStateException.class) - public void testStartSendingEventsError() { - entrancePi.startSendingEvents(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnableTest.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnableTest.java b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnableTest.java deleted file mode 100644 index 1e70d10..0000000 --- a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnableTest.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2013 Yahoo! Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ -package com.yahoo.labs.samoa.topology.impl; - -import static org.junit.Assert.*; -import mockit.Mocked; -import mockit.Tested; -import mockit.Verifications; - -import org.junit.Before; -import org.junit.Test; - -import com.yahoo.labs.samoa.core.ContentEvent; - -/** - * @author Anh Thu Vu - * - */ -public class ThreadsEventRunnableTest { - - @Tested - private ThreadsEventRunnable task; - - @Mocked - private ThreadsProcessingItemInstance piInstance; - @Mocked - private ContentEvent event; - - /** - * @throws java.lang.Exception - */ - @Before - public void setUp() throws Exception { - task = new ThreadsEventRunnable(piInstance, event); - } - - @Test - public void testConstructor() { - assertSame("WorkerProcessingItem is not set correctly.", piInstance, task.getWorkerProcessingItem()); - assertSame("ContentEvent is not set correctly.", event, task.getContentEvent()); - } - - @Test - public void testRun() { - task.run(); - new Verifications() { - { - piInstance.processEvent(event); - times = 1; - } - }; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstanceTest.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstanceTest.java b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstanceTest.java deleted file mode 100644 index d4f78b0..0000000 --- a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstanceTest.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2013 Yahoo! Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ -package com.yahoo.labs.samoa.topology.impl; - -import static org.junit.Assert.*; -import mockit.Mocked; -import mockit.Tested; -import mockit.Verifications; - -import org.junit.Before; -import org.junit.Test; - -import com.yahoo.labs.samoa.core.ContentEvent; -import com.yahoo.labs.samoa.core.Processor; - -/** - * @author Anh Thu Vu - * - */ -public class ThreadsProcessingItemInstanceTest { - - @Tested - private ThreadsProcessingItemInstance piInstance; - - @Mocked - private Processor processor; - @Mocked - private ContentEvent event; - - private final int threadIndex = 2; - - @Before - public void setUp() throws Exception { - piInstance = new ThreadsProcessingItemInstance(processor, threadIndex); - } - - @Test - public void testConstructor() { - assertSame("Processor is not set correctly.", processor, piInstance.getProcessor()); - assertEquals("Thread index is not set correctly.", threadIndex, piInstance.getThreadIndex(), 0); - } - - @Test - public void testProcessEvent() { - piInstance.processEvent(event); - new Verifications() { - { - processor.process(event); - times = 1; - } - }; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemTest.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemTest.java b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemTest.java deleted file mode 100644 index d148e8e..0000000 --- a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemTest.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2013 Yahoo! Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ -package com.yahoo.labs.samoa.topology.impl; - -import static org.junit.Assert.*; - -import java.util.List; -import java.util.concurrent.ExecutorService; - -import mockit.Expectations; -import mockit.Mocked; -import mockit.NonStrictExpectations; -import mockit.Tested; - -import org.junit.Before; -import org.junit.Test; - -import com.yahoo.labs.samoa.core.ContentEvent; -import com.yahoo.labs.samoa.core.Processor; -import com.yahoo.labs.samoa.utils.PartitioningScheme; -import com.yahoo.labs.samoa.utils.StreamDestination; - -/** - * @author Anh Thu Vu - * - */ -public class ThreadsProcessingItemTest { - - @Tested - private ThreadsProcessingItem pi; - - @Mocked - private ThreadsEngine unused; - @Mocked - private ExecutorService threadPool; - @Mocked - private ThreadsEventRunnable task; - - @Mocked - private Processor processor, processorReplica; - @Mocked - private ThreadsStream stream; - @Mocked - private StreamDestination destination; - @Mocked - private ContentEvent event; - - private final int parallelism = 4; - private final int counter = 2; - - private ThreadsProcessingItemInstance instance; - - @Before - public void setUp() throws Exception { - new NonStrictExpectations() { - { - processor.newProcessor(processor); - result = processorReplica; - } - }; - pi = new ThreadsProcessingItem(processor, parallelism); - } - - @Test - public void testConstructor() { - assertSame("Processor was not set correctly.", processor, pi.getProcessor()); - assertEquals("Parallelism was not set correctly.", parallelism, pi.getParallelism(), 0); - } - - @Test - public void testConnectInputShuffleStream() { - new Expectations() { - { - destination = new StreamDestination(pi, parallelism, PartitioningScheme.SHUFFLE); - stream.addDestination(destination); - } - }; - pi.connectInputShuffleStream(stream); - } - - @Test - public void testConnectInputKeyStream() { - new Expectations() { - { - destination = new StreamDestination(pi, parallelism, PartitioningScheme.GROUP_BY_KEY); - stream.addDestination(destination); - } - }; - pi.connectInputKeyStream(stream); - } - - @Test - public void testConnectInputAllStream() { - new Expectations() { - { - destination = new StreamDestination(pi, parallelism, PartitioningScheme.BROADCAST); - stream.addDestination(destination); - } - }; - pi.connectInputAllStream(stream); - } - - @Test - public void testSetupInstances() { - new Expectations() { - { - for (int i = 0; i < parallelism; i++) { - processor.newProcessor(processor); - result = processor; - - processor.onCreate(anyInt); - } - } - }; - pi.setupInstances(); - List<ThreadsProcessingItemInstance> instances = pi.getProcessingItemInstances(); - assertNotNull("List of PI instances is null.", instances); - assertEquals("Number of instances does not match parallelism.", parallelism, instances.size(), 0); - for (int i = 0; i < instances.size(); i++) { - assertNotNull("Instance " + i + " is null.", instances.get(i)); - assertEquals("Instance " + i + " is not a ThreadsWorkerProcessingItem.", ThreadsProcessingItemInstance.class, - instances.get(i).getClass()); - } - } - - @Test(expected = IllegalStateException.class) - public void testProcessEventError() { - pi.processEvent(event, counter); - } - - @Test - public void testProcessEvent() { - new Expectations() { - { - for (int i = 0; i < parallelism; i++) { - processor.newProcessor(processor); - result = processor; - - processor.onCreate(anyInt); - } - } - }; - pi.setupInstances(); - - instance = pi.getProcessingItemInstances().get(counter); - new NonStrictExpectations() { - { - ThreadsEngine.getThreadWithIndex(anyInt); - result = threadPool; - - } - }; - new Expectations() { - { - task = new ThreadsEventRunnable(instance, event); - threadPool.submit(task); - } - }; - pi.processEvent(event, counter); - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsStreamTest.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsStreamTest.java b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsStreamTest.java deleted file mode 100644 index abe57ce..0000000 --- a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsStreamTest.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2013 Yahoo! Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ -package com.yahoo.labs.samoa.topology.impl; - -import static org.junit.Assert.*; - -import java.util.Arrays; -import java.util.Collection; - -import mockit.Expectations; -import mockit.Mocked; -import mockit.NonStrictExpectations; -import mockit.Tested; - -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; - -import com.yahoo.labs.samoa.core.ContentEvent; -import com.yahoo.labs.samoa.utils.PartitioningScheme; -import com.yahoo.labs.samoa.utils.StreamDestination; - -/** - * @author Anh Thu Vu - * - */ -@RunWith(Parameterized.class) -public class ThreadsStreamTest { - - @Tested - private ThreadsStream stream; - - @Mocked - private ThreadsProcessingItem sourcePi, destPi; - @Mocked - private ContentEvent event; - @Mocked - private StreamDestination destination; - - private final String eventKey = "eventkey"; - private final int parallelism; - private final PartitioningScheme scheme; - - @Parameters - public static Collection<Object[]> generateParameters() { - return Arrays.asList(new Object[][] { - { 2, PartitioningScheme.SHUFFLE }, - { 3, PartitioningScheme.GROUP_BY_KEY }, - { 4, PartitioningScheme.BROADCAST } - }); - } - - public ThreadsStreamTest(int parallelism, PartitioningScheme scheme) { - this.parallelism = parallelism; - this.scheme = scheme; - } - - @Before - public void setUp() throws Exception { - stream = new ThreadsStream(sourcePi); - stream.addDestination(destination); - } - - @Test - public void testAddDestination() { - boolean found = false; - for (StreamDestination sd : stream.getDestinations()) { - if (sd == destination) { - found = true; - break; - } - } - assertTrue("Destination object was not added in stream's destinations set.", found); - } - - @Test - public void testPut() { - new NonStrictExpectations() { - { - event.getKey(); - result = eventKey; - destination.getProcessingItem(); - result = destPi; - destination.getPartitioningScheme(); - result = scheme; - destination.getParallelism(); - result = parallelism; - - } - }; - switch (this.scheme) { - case SHUFFLE: - case GROUP_BY_KEY: - new Expectations() { - { - - // TODO: restrict the range of counter value - destPi.processEvent(event, anyInt); - times = 1; - } - }; - break; - case BROADCAST: - new Expectations() { - { - // TODO: restrict the range of counter value - destPi.processEvent(event, anyInt); - times = parallelism; - } - }; - break; - } - stream.put(event); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopologyTest.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopologyTest.java b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopologyTest.java deleted file mode 100644 index 6891a63..0000000 --- a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopologyTest.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2013 Yahoo! Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ -package com.yahoo.labs.samoa.topology.impl; - -import static org.junit.Assert.*; - -import java.util.Set; - -import mockit.Expectations; -import mockit.Mocked; -import mockit.Tested; - -import org.junit.Before; -import org.junit.Test; - -import com.yahoo.labs.samoa.core.EntranceProcessor; -import com.yahoo.labs.samoa.topology.EntranceProcessingItem; - -/** - * @author Anh Thu Vu - * - */ -public class ThreadsTopologyTest { - - @Tested - private ThreadsTopology topology; - - @Mocked - private ThreadsEntranceProcessingItem entrancePi; - @Mocked - private EntranceProcessor entranceProcessor; - - @Before - public void setUp() throws Exception { - topology = new ThreadsTopology("TestTopology"); - } - - @Test - public void testAddEntrancePi() { - topology.addEntranceProcessingItem(entrancePi); - Set<EntranceProcessingItem> entrancePIs = topology.getEntranceProcessingItems(); - assertNotNull("Set of entrance PIs is null.", entrancePIs); - assertEquals("Number of entrance PI in ThreadsTopology must be 1", 1, entrancePIs.size()); - assertSame("Entrance PI was not set correctly.", entrancePi, entrancePIs.toArray()[0]); - // TODO: verify that entrance PI is in the set of ProcessingItems - // Need to access topology's set of PIs (getProcessingItems() method) - } - - @Test - public void testRun() { - topology.addEntranceProcessingItem(entrancePi); - - new Expectations() { - { - entrancePi.getProcessor(); - result = entranceProcessor; - entranceProcessor.onCreate(anyInt); - - entrancePi.startSendingEvents(); - } - }; - topology.run(); - } - - @Test(expected = IllegalStateException.class) - public void testRunWithoutEntrancePI() { - topology.run(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/com/yahoo/labs/samoa/utils/StreamDestinationTest.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/test/java/com/yahoo/labs/samoa/utils/StreamDestinationTest.java b/samoa-threads/src/test/java/com/yahoo/labs/samoa/utils/StreamDestinationTest.java deleted file mode 100644 index c165b3e..0000000 --- a/samoa-threads/src/test/java/com/yahoo/labs/samoa/utils/StreamDestinationTest.java +++ /dev/null @@ -1,82 +0,0 @@ -package com.yahoo.labs.samoa.utils; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2013 Yahoo! Inc. - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import static org.junit.Assert.*; - -import java.util.Arrays; -import java.util.Collection; - -import mockit.Mocked; -import mockit.Tested; - -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; - -import com.yahoo.labs.samoa.topology.IProcessingItem; -import com.yahoo.labs.samoa.utils.PartitioningScheme; -import com.yahoo.labs.samoa.utils.StreamDestination; - -/** - * @author Anh Thu Vu - * - */ -@RunWith(Parameterized.class) -public class StreamDestinationTest { - - @Tested - private StreamDestination destination; - - @Mocked - private IProcessingItem pi; - private final int parallelism; - private final PartitioningScheme scheme; - - @Parameters - public static Collection<Object[]> generateParameters() { - return Arrays.asList(new Object[][] { - { 3, PartitioningScheme.SHUFFLE }, - { 2, PartitioningScheme.GROUP_BY_KEY }, - { 5, PartitioningScheme.BROADCAST } - }); - } - - public StreamDestinationTest(int parallelism, PartitioningScheme scheme) { - this.parallelism = parallelism; - this.scheme = scheme; - } - - @Before - public void setUp() throws Exception { - destination = new StreamDestination(pi, parallelism, scheme); - } - - @Test - public void testContructor() { - assertSame("The IProcessingItem is not set correctly.", pi, destination.getProcessingItem()); - assertEquals("Parallelism value is not set correctly.", parallelism, destination.getParallelism(), 0); - assertEquals("EventAllocationType is not set correctly.", scheme, destination.getPartitioningScheme()); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/org/apache/samoa/AlgosTest.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/test/java/org/apache/samoa/AlgosTest.java b/samoa-threads/src/test/java/org/apache/samoa/AlgosTest.java new file mode 100644 index 0000000..e14b1e4 --- /dev/null +++ b/samoa-threads/src/test/java/org/apache/samoa/AlgosTest.java @@ -0,0 +1,70 @@ +package org.apache.samoa; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 - 2014 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.LocalThreadsDoTask; +import org.apache.samoa.TestParams; +import org.apache.samoa.TestUtils; +import org.junit.Test; + +public class AlgosTest { + + @Test(timeout = 60000) + public void testVHTWithThreads() throws Exception { + + TestParams vhtConfig = new TestParams.Builder() + .inputInstances(200_000) + .samplingSize(20_000) + .evaluationInstances(200_000) + .classifiedInstances(200_000) + .classificationsCorrect(55f) + .kappaStat(-0.1f) + .kappaTempStat(-0.1f) + .cliStringTemplate(TestParams.Templates.PREQEVAL_VHT_RANDOMTREE + " -t 2") + .resultFilePollTimeout(10) + .prePollWait(10) + .taskClassName(LocalThreadsDoTask.class.getName()) + .build(); + TestUtils.test(vhtConfig); + + } + + @Test(timeout = 180000) + public void testBaggingWithThreads() throws Exception { + TestParams baggingConfig = new TestParams.Builder() + .inputInstances(100_000) + .samplingSize(10_000) + .inputDelayMicroSec(100) // prevents saturating the system due to unbounded queues + .evaluationInstances(90_000) + .classifiedInstances(105_000) + .classificationsCorrect(55f) + .kappaStat(0f) + .kappaTempStat(0f) + .cliStringTemplate(TestParams.Templates.PREQEVAL_BAGGING_RANDOMTREE + " -t 2") + .prePollWait(10) + .resultFilePollTimeout(30) + .taskClassName(LocalThreadsDoTask.class.getName()) + .build(); + TestUtils.test(baggingConfig); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsComponentFactoryTest.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsComponentFactoryTest.java b/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsComponentFactoryTest.java new file mode 100644 index 0000000..24762e3 --- /dev/null +++ b/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsComponentFactoryTest.java @@ -0,0 +1,121 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import static org.junit.Assert.*; +import mockit.Mocked; +import mockit.NonStrictExpectations; +import mockit.Tested; + +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.core.Processor; +import org.apache.samoa.topology.EntranceProcessingItem; +import org.apache.samoa.topology.ProcessingItem; +import org.apache.samoa.topology.Stream; +import org.apache.samoa.topology.Topology; +import org.apache.samoa.topology.impl.ThreadsComponentFactory; +import org.apache.samoa.topology.impl.ThreadsEntranceProcessingItem; +import org.apache.samoa.topology.impl.ThreadsProcessingItem; +import org.apache.samoa.topology.impl.ThreadsStream; +import org.apache.samoa.topology.impl.ThreadsTopology; +import org.junit.Before; +import org.junit.Test; + +/** + * @author Anh Thu Vu + * + */ +public class ThreadsComponentFactoryTest { + @Tested + private ThreadsComponentFactory factory; + @Mocked + private Processor processor, processorReplica; + @Mocked + private EntranceProcessor entranceProcessor; + + private final int parallelism = 3; + private final String topoName = "TestTopology"; + + @Before + public void setUp() throws Exception { + factory = new ThreadsComponentFactory(); + } + + @Test + public void testCreatePiNoParallelism() { + new NonStrictExpectations() { + { + processor.newProcessor(processor); + result = processorReplica; + } + }; + ProcessingItem pi = factory.createPi(processor); + assertNotNull("ProcessingItem created is null.", pi); + assertEquals("ProcessingItem created is not a ThreadsProcessingItem.", ThreadsProcessingItem.class, pi.getClass()); + assertEquals("Parallelism of PI is not 1", 1, pi.getParallelism(), 0); + } + + @Test + public void testCreatePiWithParallelism() { + new NonStrictExpectations() { + { + processor.newProcessor(processor); + result = processorReplica; + } + }; + ProcessingItem pi = factory.createPi(processor, parallelism); + assertNotNull("ProcessingItem created is null.", pi); + assertEquals("ProcessingItem created is not a ThreadsProcessingItem.", ThreadsProcessingItem.class, pi.getClass()); + assertEquals("Parallelism of PI is not ", parallelism, pi.getParallelism(), 0); + } + + @Test + public void testCreateStream() { + new NonStrictExpectations() { + { + processor.newProcessor(processor); + result = processorReplica; + } + }; + ProcessingItem pi = factory.createPi(processor); + + Stream stream = factory.createStream(pi); + assertNotNull("Stream created is null", stream); + assertEquals("Stream created is not a ThreadsStream.", ThreadsStream.class, stream.getClass()); + } + + @Test + public void testCreateTopology() { + Topology topology = factory.createTopology(topoName); + assertNotNull("Topology created is null.", topology); + assertEquals("Topology created is not a ThreadsTopology.", ThreadsTopology.class, topology.getClass()); + } + + @Test + public void testCreateEntrancePi() { + EntranceProcessingItem entrancePi = factory.createEntrancePi(entranceProcessor); + assertNotNull("EntranceProcessingItem created is null.", entrancePi); + assertEquals("EntranceProcessingItem created is not a ThreadsEntranceProcessingItem.", + ThreadsEntranceProcessingItem.class, entrancePi.getClass()); + assertSame("EntranceProcessor is not set correctly.", entranceProcessor, entrancePi.getProcessor()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsEngineTest.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsEngineTest.java b/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsEngineTest.java new file mode 100644 index 0000000..cf8ec34 --- /dev/null +++ b/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsEngineTest.java @@ -0,0 +1,135 @@ +package org.apache.samoa.topology.impl; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import static org.junit.Assert.*; +import mockit.Mocked; +import mockit.Verifications; + +import org.apache.samoa.topology.impl.ThreadsEngine; +import org.apache.samoa.topology.impl.ThreadsTopology; +import org.junit.After; +import org.junit.Test; + +/** + * @author Anh Thu Vu + * + */ +public class ThreadsEngineTest { + + @Mocked + ThreadsTopology topology; + + private final int numThreads = 4; + private final int numThreadsSmaller = 3; + private final int numThreadsLarger = 5; + + @After + public void cleanup() { + ThreadsEngine.clearThreadPool(); + } + + @Test + public void testSetNumberOfThreadsSimple() { + ThreadsEngine.setNumberOfThreads(numThreads); + assertEquals("Number of threads is not set correctly.", numThreads, + ThreadsEngine.getNumberOfThreads(), 0); + } + + @Test + public void testSetNumberOfThreadsRepeat() { + ThreadsEngine.setNumberOfThreads(numThreads); + ThreadsEngine.setNumberOfThreads(numThreads); + assertEquals("Number of threads is not set correctly.", numThreads, + ThreadsEngine.getNumberOfThreads(), 0); + } + + @Test + public void testSetNumberOfThreadsIncrease() { + ThreadsEngine.setNumberOfThreads(numThreads); + ThreadsEngine.setNumberOfThreads(numThreadsLarger); + assertEquals("Number of threads is not set correctly.", numThreadsLarger, + ThreadsEngine.getNumberOfThreads(), 0); + } + + @Test(expected = IllegalStateException.class) + public void testSetNumberOfThreadsDecrease() { + ThreadsEngine.setNumberOfThreads(numThreads); + ThreadsEngine.setNumberOfThreads(numThreadsSmaller); + // Exception expected + } + + @Test(expected = IllegalStateException.class) + public void testSetNumberOfThreadsNegative() { + ThreadsEngine.setNumberOfThreads(-1); + // Exception expected + } + + @Test(expected = IllegalStateException.class) + public void testSetNumberOfThreadsZero() { + ThreadsEngine.setNumberOfThreads(0); + // Exception expected + } + + @Test + public void testClearThreadPool() { + ThreadsEngine.setNumberOfThreads(numThreads); + ThreadsEngine.clearThreadPool(); + assertEquals("ThreadsEngine was not shutdown properly.", 0, ThreadsEngine.getNumberOfThreads()); + } + + @Test + public void testGetThreadWithIndexWithinPoolSize() { + ThreadsEngine.setNumberOfThreads(numThreads); + for (int i = 0; i < numThreads; i++) { + assertNotNull("ExecutorService is not initialized correctly.", ThreadsEngine.getThreadWithIndex(i)); + } + } + + @Test + public void testGetThreadWithIndexOutOfPoolSize() { + ThreadsEngine.setNumberOfThreads(numThreads); + for (int i = 0; i < numThreads + 3; i++) { + assertNotNull("ExecutorService is not initialized correctly.", ThreadsEngine.getThreadWithIndex(i)); + } + } + + @Test(expected = IllegalStateException.class) + public void testGetThreadWithIndexFromEmptyPool() { + for (int i = 0; i < numThreads; i++) { + ThreadsEngine.getThreadWithIndex(i); + } + } + + @Test + public void testSubmitTopology() { + ThreadsEngine.submitTopology(topology, numThreads); + new Verifications() { + { + topology.run(); + times = 1; + } + }; + assertEquals("Number of threads is not set correctly.", numThreads, + ThreadsEngine.getNumberOfThreads(), 0); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsEntranceProcessingItemTest.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsEntranceProcessingItemTest.java b/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsEntranceProcessingItemTest.java new file mode 100644 index 0000000..355da61 --- /dev/null +++ b/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsEntranceProcessingItemTest.java @@ -0,0 +1,151 @@ +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package org.apache.samoa.topology.impl; + +import static org.junit.Assert.*; +import mockit.Mocked; +import mockit.StrictExpectations; +import mockit.Tested; +import mockit.Verifications; + +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.topology.Stream; +import org.apache.samoa.topology.impl.ThreadsEntranceProcessingItem; +import org.junit.Before; +import org.junit.Test; + +/** + * @author Anh Thu Vu + * + */ +public class ThreadsEntranceProcessingItemTest { + + @Tested + private ThreadsEntranceProcessingItem entrancePi; + + @Mocked + private EntranceProcessor entranceProcessor; + @Mocked + private Stream outputStream, anotherStream; + @Mocked + private ContentEvent event; + + @Mocked + private Thread unused; + + /** + * @throws java.lang.Exception + */ + @Before + public void setUp() throws Exception { + entrancePi = new ThreadsEntranceProcessingItem(entranceProcessor); + } + + @Test + public void testContructor() { + assertSame("EntranceProcessor is not set correctly.", entranceProcessor, entrancePi.getProcessor()); + } + + @Test + public void testSetOutputStream() { + entrancePi.setOutputStream(outputStream); + assertSame("OutoutStream is not set correctly.", outputStream, entrancePi.getOutputStream()); + } + + @Test + public void testSetOutputStreamRepeate() { + entrancePi.setOutputStream(outputStream); + entrancePi.setOutputStream(outputStream); + assertSame("OutoutStream is not set correctly.", outputStream, entrancePi.getOutputStream()); + } + + @Test(expected = IllegalStateException.class) + public void testSetOutputStreamError() { + entrancePi.setOutputStream(outputStream); + entrancePi.setOutputStream(anotherStream); + } + + @Test + public void testStartSendingEvents() { + entrancePi.setOutputStream(outputStream); + new StrictExpectations() { + { + for (int i = 0; i < 1; i++) { + entranceProcessor.isFinished(); + result = false; + entranceProcessor.hasNext(); + result = false; + } + + for (int i = 0; i < 5; i++) { + entranceProcessor.isFinished(); + result = false; + entranceProcessor.hasNext(); + result = true; + entranceProcessor.nextEvent(); + result = event; + outputStream.put(event); + } + + for (int i = 0; i < 2; i++) { + entranceProcessor.isFinished(); + result = false; + entranceProcessor.hasNext(); + result = false; + } + + for (int i = 0; i < 5; i++) { + entranceProcessor.isFinished(); + result = false; + entranceProcessor.hasNext(); + result = true; + entranceProcessor.nextEvent(); + result = event; + outputStream.put(event); + } + + entranceProcessor.isFinished(); + result = true; + times = 1; + entranceProcessor.hasNext(); + times = 0; + + } + }; + entrancePi.startSendingEvents(); + new Verifications() { + { + try { + Thread.sleep(anyInt); + times = 3; + } catch (InterruptedException e) { + + } + } + }; + } + + @Test(expected = IllegalStateException.class) + public void testStartSendingEventsError() { + entrancePi.startSendingEvents(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsEventRunnableTest.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsEventRunnableTest.java b/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsEventRunnableTest.java new file mode 100644 index 0000000..c5afe80 --- /dev/null +++ b/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsEventRunnableTest.java @@ -0,0 +1,72 @@ +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package org.apache.samoa.topology.impl; + +import static org.junit.Assert.*; +import mockit.Mocked; +import mockit.Tested; +import mockit.Verifications; + +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.topology.impl.ThreadsEventRunnable; +import org.apache.samoa.topology.impl.ThreadsProcessingItemInstance; +import org.junit.Before; +import org.junit.Test; + +/** + * @author Anh Thu Vu + * + */ +public class ThreadsEventRunnableTest { + + @Tested + private ThreadsEventRunnable task; + + @Mocked + private ThreadsProcessingItemInstance piInstance; + @Mocked + private ContentEvent event; + + /** + * @throws java.lang.Exception + */ + @Before + public void setUp() throws Exception { + task = new ThreadsEventRunnable(piInstance, event); + } + + @Test + public void testConstructor() { + assertSame("WorkerProcessingItem is not set correctly.", piInstance, task.getWorkerProcessingItem()); + assertSame("ContentEvent is not set correctly.", event, task.getContentEvent()); + } + + @Test + public void testRun() { + task.run(); + new Verifications() { + { + piInstance.processEvent(event); + times = 1; + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsProcessingItemInstanceTest.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsProcessingItemInstanceTest.java b/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsProcessingItemInstanceTest.java new file mode 100644 index 0000000..7b64e48 --- /dev/null +++ b/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsProcessingItemInstanceTest.java @@ -0,0 +1,71 @@ +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package org.apache.samoa.topology.impl; + +import static org.junit.Assert.*; +import mockit.Mocked; +import mockit.Tested; +import mockit.Verifications; + +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.Processor; +import org.apache.samoa.topology.impl.ThreadsProcessingItemInstance; +import org.junit.Before; +import org.junit.Test; + +/** + * @author Anh Thu Vu + * + */ +public class ThreadsProcessingItemInstanceTest { + + @Tested + private ThreadsProcessingItemInstance piInstance; + + @Mocked + private Processor processor; + @Mocked + private ContentEvent event; + + private final int threadIndex = 2; + + @Before + public void setUp() throws Exception { + piInstance = new ThreadsProcessingItemInstance(processor, threadIndex); + } + + @Test + public void testConstructor() { + assertSame("Processor is not set correctly.", processor, piInstance.getProcessor()); + assertEquals("Thread index is not set correctly.", threadIndex, piInstance.getThreadIndex(), 0); + } + + @Test + public void testProcessEvent() { + piInstance.processEvent(event); + new Verifications() { + { + processor.process(event); + times = 1; + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsProcessingItemTest.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsProcessingItemTest.java b/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsProcessingItemTest.java new file mode 100644 index 0000000..0f59936 --- /dev/null +++ b/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsProcessingItemTest.java @@ -0,0 +1,184 @@ +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package org.apache.samoa.topology.impl; + +import static org.junit.Assert.*; + +import java.util.List; +import java.util.concurrent.ExecutorService; + +import mockit.Expectations; +import mockit.Mocked; +import mockit.NonStrictExpectations; +import mockit.Tested; + +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.Processor; +import org.apache.samoa.topology.impl.ThreadsEngine; +import org.apache.samoa.topology.impl.ThreadsEventRunnable; +import org.apache.samoa.topology.impl.ThreadsProcessingItem; +import org.apache.samoa.topology.impl.ThreadsProcessingItemInstance; +import org.apache.samoa.topology.impl.ThreadsStream; +import org.apache.samoa.utils.PartitioningScheme; +import org.apache.samoa.utils.StreamDestination; +import org.junit.Before; +import org.junit.Test; + +/** + * @author Anh Thu Vu + * + */ +public class ThreadsProcessingItemTest { + + @Tested + private ThreadsProcessingItem pi; + + @Mocked + private ThreadsEngine unused; + @Mocked + private ExecutorService threadPool; + @Mocked + private ThreadsEventRunnable task; + + @Mocked + private Processor processor, processorReplica; + @Mocked + private ThreadsStream stream; + @Mocked + private StreamDestination destination; + @Mocked + private ContentEvent event; + + private final int parallelism = 4; + private final int counter = 2; + + private ThreadsProcessingItemInstance instance; + + @Before + public void setUp() throws Exception { + new NonStrictExpectations() { + { + processor.newProcessor(processor); + result = processorReplica; + } + }; + pi = new ThreadsProcessingItem(processor, parallelism); + } + + @Test + public void testConstructor() { + assertSame("Processor was not set correctly.", processor, pi.getProcessor()); + assertEquals("Parallelism was not set correctly.", parallelism, pi.getParallelism(), 0); + } + + @Test + public void testConnectInputShuffleStream() { + new Expectations() { + { + destination = new StreamDestination(pi, parallelism, PartitioningScheme.SHUFFLE); + stream.addDestination(destination); + } + }; + pi.connectInputShuffleStream(stream); + } + + @Test + public void testConnectInputKeyStream() { + new Expectations() { + { + destination = new StreamDestination(pi, parallelism, PartitioningScheme.GROUP_BY_KEY); + stream.addDestination(destination); + } + }; + pi.connectInputKeyStream(stream); + } + + @Test + public void testConnectInputAllStream() { + new Expectations() { + { + destination = new StreamDestination(pi, parallelism, PartitioningScheme.BROADCAST); + stream.addDestination(destination); + } + }; + pi.connectInputAllStream(stream); + } + + @Test + public void testSetupInstances() { + new Expectations() { + { + for (int i = 0; i < parallelism; i++) { + processor.newProcessor(processor); + result = processor; + + processor.onCreate(anyInt); + } + } + }; + pi.setupInstances(); + List<ThreadsProcessingItemInstance> instances = pi.getProcessingItemInstances(); + assertNotNull("List of PI instances is null.", instances); + assertEquals("Number of instances does not match parallelism.", parallelism, instances.size(), 0); + for (int i = 0; i < instances.size(); i++) { + assertNotNull("Instance " + i + " is null.", instances.get(i)); + assertEquals("Instance " + i + " is not a ThreadsWorkerProcessingItem.", ThreadsProcessingItemInstance.class, + instances.get(i).getClass()); + } + } + + @Test(expected = IllegalStateException.class) + public void testProcessEventError() { + pi.processEvent(event, counter); + } + + @Test + public void testProcessEvent() { + new Expectations() { + { + for (int i = 0; i < parallelism; i++) { + processor.newProcessor(processor); + result = processor; + + processor.onCreate(anyInt); + } + } + }; + pi.setupInstances(); + + instance = pi.getProcessingItemInstances().get(counter); + new NonStrictExpectations() { + { + ThreadsEngine.getThreadWithIndex(anyInt); + result = threadPool; + + } + }; + new Expectations() { + { + task = new ThreadsEventRunnable(instance, event); + threadPool.submit(task); + } + }; + pi.processEvent(event, counter); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsStreamTest.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsStreamTest.java b/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsStreamTest.java new file mode 100644 index 0000000..ef970ab --- /dev/null +++ b/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsStreamTest.java @@ -0,0 +1,136 @@ +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package org.apache.samoa.topology.impl; + +import static org.junit.Assert.*; + +import java.util.Arrays; +import java.util.Collection; + +import mockit.Expectations; +import mockit.Mocked; +import mockit.NonStrictExpectations; +import mockit.Tested; + +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.topology.impl.ThreadsProcessingItem; +import org.apache.samoa.topology.impl.ThreadsStream; +import org.apache.samoa.utils.PartitioningScheme; +import org.apache.samoa.utils.StreamDestination; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +/** + * @author Anh Thu Vu + * + */ +@RunWith(Parameterized.class) +public class ThreadsStreamTest { + + @Tested + private ThreadsStream stream; + + @Mocked + private ThreadsProcessingItem sourcePi, destPi; + @Mocked + private ContentEvent event; + @Mocked + private StreamDestination destination; + + private final String eventKey = "eventkey"; + private final int parallelism; + private final PartitioningScheme scheme; + + @Parameters + public static Collection<Object[]> generateParameters() { + return Arrays.asList(new Object[][] { + { 2, PartitioningScheme.SHUFFLE }, + { 3, PartitioningScheme.GROUP_BY_KEY }, + { 4, PartitioningScheme.BROADCAST } + }); + } + + public ThreadsStreamTest(int parallelism, PartitioningScheme scheme) { + this.parallelism = parallelism; + this.scheme = scheme; + } + + @Before + public void setUp() throws Exception { + stream = new ThreadsStream(sourcePi); + stream.addDestination(destination); + } + + @Test + public void testAddDestination() { + boolean found = false; + for (StreamDestination sd : stream.getDestinations()) { + if (sd == destination) { + found = true; + break; + } + } + assertTrue("Destination object was not added in stream's destinations set.", found); + } + + @Test + public void testPut() { + new NonStrictExpectations() { + { + event.getKey(); + result = eventKey; + destination.getProcessingItem(); + result = destPi; + destination.getPartitioningScheme(); + result = scheme; + destination.getParallelism(); + result = parallelism; + + } + }; + switch (this.scheme) { + case SHUFFLE: + case GROUP_BY_KEY: + new Expectations() { + { + + // TODO: restrict the range of counter value + destPi.processEvent(event, anyInt); + times = 1; + } + }; + break; + case BROADCAST: + new Expectations() { + { + // TODO: restrict the range of counter value + destPi.processEvent(event, anyInt); + times = parallelism; + } + }; + break; + } + stream.put(event); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsTopologyTest.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsTopologyTest.java b/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsTopologyTest.java new file mode 100644 index 0000000..fb593ae --- /dev/null +++ b/samoa-threads/src/test/java/org/apache/samoa/topology/impl/ThreadsTopologyTest.java @@ -0,0 +1,88 @@ +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package org.apache.samoa.topology.impl; + +import static org.junit.Assert.*; + +import java.util.Set; + +import mockit.Expectations; +import mockit.Mocked; +import mockit.Tested; + +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.topology.EntranceProcessingItem; +import org.apache.samoa.topology.impl.ThreadsEntranceProcessingItem; +import org.apache.samoa.topology.impl.ThreadsTopology; +import org.junit.Before; +import org.junit.Test; + +/** + * @author Anh Thu Vu + * + */ +public class ThreadsTopologyTest { + + @Tested + private ThreadsTopology topology; + + @Mocked + private ThreadsEntranceProcessingItem entrancePi; + @Mocked + private EntranceProcessor entranceProcessor; + + @Before + public void setUp() throws Exception { + topology = new ThreadsTopology("TestTopology"); + } + + @Test + public void testAddEntrancePi() { + topology.addEntranceProcessingItem(entrancePi); + Set<EntranceProcessingItem> entrancePIs = topology.getEntranceProcessingItems(); + assertNotNull("Set of entrance PIs is null.", entrancePIs); + assertEquals("Number of entrance PI in ThreadsTopology must be 1", 1, entrancePIs.size()); + assertSame("Entrance PI was not set correctly.", entrancePi, entrancePIs.toArray()[0]); + // TODO: verify that entrance PI is in the set of ProcessingItems + // Need to access topology's set of PIs (getProcessingItems() method) + } + + @Test + public void testRun() { + topology.addEntranceProcessingItem(entrancePi); + + new Expectations() { + { + entrancePi.getProcessor(); + result = entranceProcessor; + entranceProcessor.onCreate(anyInt); + + entrancePi.startSendingEvents(); + } + }; + topology.run(); + } + + @Test(expected = IllegalStateException.class) + public void testRunWithoutEntrancePI() { + topology.run(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-threads/src/test/java/org/apache/samoa/utils/StreamDestinationTest.java ---------------------------------------------------------------------- diff --git a/samoa-threads/src/test/java/org/apache/samoa/utils/StreamDestinationTest.java b/samoa-threads/src/test/java/org/apache/samoa/utils/StreamDestinationTest.java new file mode 100644 index 0000000..aa3ec74 --- /dev/null +++ b/samoa-threads/src/test/java/org/apache/samoa/utils/StreamDestinationTest.java @@ -0,0 +1,81 @@ +package org.apache.samoa.utils; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import static org.junit.Assert.*; + +import java.util.Arrays; +import java.util.Collection; + +import mockit.Mocked; +import mockit.Tested; + +import org.apache.samoa.topology.IProcessingItem; +import org.apache.samoa.utils.PartitioningScheme; +import org.apache.samoa.utils.StreamDestination; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +/** + * @author Anh Thu Vu + * + */ +@RunWith(Parameterized.class) +public class StreamDestinationTest { + + @Tested + private StreamDestination destination; + + @Mocked + private IProcessingItem pi; + private final int parallelism; + private final PartitioningScheme scheme; + + @Parameters + public static Collection<Object[]> generateParameters() { + return Arrays.asList(new Object[][] { + { 3, PartitioningScheme.SHUFFLE }, + { 2, PartitioningScheme.GROUP_BY_KEY }, + { 5, PartitioningScheme.BROADCAST } + }); + } + + public StreamDestinationTest(int parallelism, PartitioningScheme scheme) { + this.parallelism = parallelism; + this.scheme = scheme; + } + + @Before + public void setUp() throws Exception { + destination = new StreamDestination(pi, parallelism, scheme); + } + + @Test + public void testContructor() { + assertSame("The IProcessingItem is not set correctly.", pi, destination.getProcessingItem()); + assertEquals("Parallelism value is not set correctly.", parallelism, destination.getParallelism(), 0); + assertEquals("EventAllocationType is not set correctly.", scheme, destination.getPartitioningScheme()); + } + +}
