Repository: incubator-samoa
Updated Branches:
  refs/heads/master [created] 787864b6a


http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItemTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItemTest.java
 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItemTest.java
new file mode 100644
index 0000000..2dab489
--- /dev/null
+++ 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEntranceProcessingItemTest.java
@@ -0,0 +1,133 @@
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+package com.yahoo.labs.samoa.topology.impl;
+
+import static org.junit.Assert.*;
+import mockit.Mocked;
+import mockit.StrictExpectations;
+import mockit.Tested;
+import mockit.Verifications;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.core.EntranceProcessor;
+import com.yahoo.labs.samoa.topology.Stream;
+
+/**
+ * @author Anh Thu Vu
+ *
+ */
+public class ThreadsEntranceProcessingItemTest {
+
+       @Tested private ThreadsEntranceProcessingItem entrancePi;
+       
+       @Mocked private EntranceProcessor entranceProcessor;
+       @Mocked private Stream outputStream, anotherStream;
+       @Mocked private ContentEvent event;
+       
+       @Mocked private Thread unused;
+       
+       /**
+        * @throws java.lang.Exception
+        */
+       @Before
+       public void setUp() throws Exception {
+               entrancePi = new 
ThreadsEntranceProcessingItem(entranceProcessor);
+       }
+
+       @Test
+       public void testContructor() {
+               assertSame("EntranceProcessor is not set 
correctly.",entranceProcessor,entrancePi.getProcessor());
+       }
+       
+       @Test
+       public void testSetOutputStream() {
+               entrancePi.setOutputStream(outputStream);
+               assertSame("OutoutStream is not set 
correctly.",outputStream,entrancePi.getOutputStream());
+       }
+       
+       @Test
+       public void testSetOutputStreamRepeate() {
+               entrancePi.setOutputStream(outputStream);
+               entrancePi.setOutputStream(outputStream);
+               assertSame("OutoutStream is not set 
correctly.",outputStream,entrancePi.getOutputStream());
+       }
+       
+       @Test(expected=IllegalStateException.class)
+       public void testSetOutputStreamError() {
+               entrancePi.setOutputStream(outputStream);
+               entrancePi.setOutputStream(anotherStream);
+       }
+       
+       @Test
+       public void testStartSendingEvents() {
+               entrancePi.setOutputStream(outputStream);
+               new StrictExpectations() {
+                       {
+                               for (int i=0; i<1; i++) {
+                                       entranceProcessor.isFinished(); 
result=false;
+                                       entranceProcessor.hasNext(); 
result=false;
+                               }
+                               
+                               for (int i=0; i<5; i++) {
+                                       entranceProcessor.isFinished(); 
result=false;
+                                       entranceProcessor.hasNext(); 
result=true;
+                                       entranceProcessor.nextEvent(); 
result=event;
+                                       outputStream.put(event);
+                               }
+                               
+                               for (int i=0; i<2; i++) {
+                                       entranceProcessor.isFinished(); 
result=false;
+                                       entranceProcessor.hasNext(); 
result=false;
+                               }
+                               
+                               for (int i=0; i<5; i++) {
+                                       entranceProcessor.isFinished(); 
result=false;
+                                       entranceProcessor.hasNext(); 
result=true;
+                                       entranceProcessor.nextEvent(); 
result=event;
+                                       outputStream.put(event);
+                               }
+
+                               entranceProcessor.isFinished(); result=true; 
times=1;
+                               entranceProcessor.hasNext(); times=0;
+
+
+                       }
+               };
+               entrancePi.startSendingEvents();
+               new Verifications() {
+                       {
+                               try {
+                                       Thread.sleep(anyInt); times=3;
+                               } catch (InterruptedException e) {
+                                       
+                               }
+                       }
+               };
+       }
+       
+       @Test(expected=IllegalStateException.class)
+       public void testStartSendingEventsError() {
+               entrancePi.startSendingEvents();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnableTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnableTest.java
 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnableTest.java
new file mode 100644
index 0000000..f744162
--- /dev/null
+++ 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsEventRunnableTest.java
@@ -0,0 +1,67 @@
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+package com.yahoo.labs.samoa.topology.impl;
+
+import static org.junit.Assert.*;
+import mockit.Mocked;
+import mockit.Tested;
+import mockit.Verifications;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+
+/**
+ * @author Anh Thu Vu
+ *
+ */
+public class ThreadsEventRunnableTest {
+
+       @Tested private ThreadsEventRunnable task;
+       
+       @Mocked private ThreadsProcessingItemInstance piInstance;
+       @Mocked private ContentEvent event;
+       
+       /**
+        * @throws java.lang.Exception
+        */
+       @Before
+       public void setUp() throws Exception {
+               task = new ThreadsEventRunnable(piInstance, event);
+       }
+
+       @Test
+       public void testConstructor() {
+               assertSame("WorkerProcessingItem is not set 
correctly.",piInstance,task.getWorkerProcessingItem());
+               assertSame("ContentEvent is not set 
correctly.",event,task.getContentEvent());
+       }
+       
+       @Test
+       public void testRun() {
+               task.run();
+               new Verifications () {
+                       {
+                               piInstance.processEvent(event); times=1;
+                       }
+               };
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstanceTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstanceTest.java
 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstanceTest.java
new file mode 100644
index 0000000..33af044
--- /dev/null
+++ 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemInstanceTest.java
@@ -0,0 +1,67 @@
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+package com.yahoo.labs.samoa.topology.impl;
+
+import static org.junit.Assert.*;
+import mockit.Mocked;
+import mockit.Tested;
+import mockit.Verifications;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.core.Processor;
+
+/**
+ * @author Anh Thu Vu
+ *
+ */
+public class ThreadsProcessingItemInstanceTest {
+
+       @Tested private ThreadsProcessingItemInstance piInstance;
+       
+       @Mocked private Processor processor;
+       @Mocked private ContentEvent event;
+       
+       private final int threadIndex = 2;
+       
+       @Before
+       public void setUp() throws Exception {
+               piInstance = new ThreadsProcessingItemInstance(processor, 
threadIndex);
+       }
+
+       @Test
+       public void testConstructor() {
+               assertSame("Processor is not set correctly.", processor, 
piInstance.getProcessor());
+               assertEquals("Thread index is not set correctly.", threadIndex, 
piInstance.getThreadIndex(),0);
+       }
+       
+       @Test
+       public void testProcessEvent() {
+               piInstance.processEvent(event);
+               new Verifications() {
+                       {
+                               processor.process(event); times=1;
+                       }
+               };
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemTest.java
 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemTest.java
new file mode 100644
index 0000000..ad7cd56
--- /dev/null
+++ 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsProcessingItemTest.java
@@ -0,0 +1,173 @@
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+package com.yahoo.labs.samoa.topology.impl;
+
+import static org.junit.Assert.*;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import mockit.Expectations;
+import mockit.Mocked;
+import mockit.NonStrictExpectations;
+import mockit.Tested;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.core.Processor;
+import com.yahoo.labs.samoa.utils.PartitioningScheme;
+import com.yahoo.labs.samoa.utils.StreamDestination;
+
+/**
+ * @author Anh Thu Vu
+ *
+ */
+public class ThreadsProcessingItemTest {
+
+       @Tested private ThreadsProcessingItem pi;
+       
+       @Mocked private ThreadsEngine unused;
+       @Mocked private ExecutorService threadPool;
+       @Mocked private ThreadsEventRunnable task;
+       
+       @Mocked private Processor processor, processorReplica;
+       @Mocked private ThreadsStream stream;
+       @Mocked private StreamDestination destination;
+       @Mocked private ContentEvent event;
+       
+       private final int parallelism = 4;
+       private final int counter = 2;
+       
+       private ThreadsProcessingItemInstance instance;
+       
+       
+       @Before
+       public void setUp() throws Exception {
+               new NonStrictExpectations() {
+                       {
+                               processor.newProcessor(processor);
+                               result=processorReplica;
+                       }
+               };
+               pi = new ThreadsProcessingItem(processor, parallelism);
+       }
+
+       @Test
+       public void testConstructor() {
+               assertSame("Processor was not set 
correctly.",processor,pi.getProcessor());
+               assertEquals("Parallelism was not set 
correctly.",parallelism,pi.getParallelism(),0);
+       }
+       
+       @Test
+       public void testConnectInputShuffleStream() {
+               new Expectations() {
+                       {
+                               destination = new StreamDestination(pi, 
parallelism, PartitioningScheme.SHUFFLE);
+                               stream.addDestination(destination);
+                       }
+               };
+               pi.connectInputShuffleStream(stream);
+       }
+       
+       @Test
+       public void testConnectInputKeyStream() {
+               new Expectations() {
+                       {
+                               destination = new StreamDestination(pi, 
parallelism, PartitioningScheme.GROUP_BY_KEY);
+                               stream.addDestination(destination);
+                       }
+               };
+               pi.connectInputKeyStream(stream);
+       }
+       
+       @Test
+       public void testConnectInputAllStream() {
+               new Expectations() {
+                       {
+                               destination = new StreamDestination(pi, 
parallelism, PartitioningScheme.BROADCAST);
+                               stream.addDestination(destination);
+                       }
+               };
+               pi.connectInputAllStream(stream);
+       }
+       
+       @Test
+       public void testSetupInstances() {
+               new Expectations() {
+                       {
+                               for (int i=0; i<parallelism; i++) {
+                                       processor.newProcessor(processor);
+                                       result=processor;
+                               
+                                       processor.onCreate(anyInt);
+                               }
+                       }
+               };
+               pi.setupInstances();
+               List<ThreadsProcessingItemInstance> instances = 
pi.getProcessingItemInstances();
+               assertNotNull("List of PI instances is null.",instances);
+               assertEquals("Number of instances does not match 
parallelism.",parallelism,instances.size(),0);
+               for(int i=0; i<instances.size();i++) {
+                       assertNotNull("Instance "+i+" is 
null.",instances.get(i));
+                       assertEquals("Instance "+i+" is not a 
ThreadsWorkerProcessingItem.",ThreadsProcessingItemInstance.class,instances.get(i).getClass());
+               }
+       }
+       
+       @Test(expected=IllegalStateException.class)
+       public void testProcessEventError() {
+               pi.processEvent(event, counter);
+       }
+       
+       @Test
+       public void testProcessEvent() {
+               new Expectations() {
+                       {
+                               for (int i=0; i<parallelism; i++) {
+                                       processor.newProcessor(processor);
+                                       result=processor;
+                               
+                                       processor.onCreate(anyInt);
+                               }
+                       }
+               };
+               pi.setupInstances();
+               
+               instance = pi.getProcessingItemInstances().get(counter);
+               new NonStrictExpectations() {
+                       {
+                               ThreadsEngine.getThreadWithIndex(anyInt);
+                               result=threadPool;
+                               
+                               
+                       }
+               };
+               new Expectations() {
+                       {
+                               task = new ThreadsEventRunnable(instance, 
event);
+                               threadPool.submit(task);
+                       }
+               };
+               pi.processEvent(event, counter);
+               
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsStreamTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsStreamTest.java
 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsStreamTest.java
new file mode 100644
index 0000000..27d2acd
--- /dev/null
+++ 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsStreamTest.java
@@ -0,0 +1,127 @@
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+package com.yahoo.labs.samoa.topology.impl;
+
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import mockit.Expectations;
+import mockit.Mocked;
+import mockit.NonStrictExpectations;
+import mockit.Tested;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.yahoo.labs.samoa.core.ContentEvent;
+import com.yahoo.labs.samoa.utils.PartitioningScheme;
+import com.yahoo.labs.samoa.utils.StreamDestination;
+
+/**
+ * @author Anh Thu Vu
+ *
+ */
+@RunWith(Parameterized.class)
+public class ThreadsStreamTest {
+       
+       @Tested private ThreadsStream stream;
+       
+       @Mocked private ThreadsProcessingItem sourcePi, destPi;
+       @Mocked private ContentEvent event;
+       @Mocked private StreamDestination destination;
+
+       private final String eventKey = "eventkey";
+       private final int parallelism;
+       private final PartitioningScheme scheme;
+       
+       
+       @Parameters
+       public static Collection<Object[]> generateParameters() {
+               return Arrays.asList(new Object[][] {
+                        { 2, PartitioningScheme.SHUFFLE },
+                        { 3, PartitioningScheme.GROUP_BY_KEY },
+                        { 4, PartitioningScheme.BROADCAST }
+               });
+       }
+       
+       public ThreadsStreamTest(int parallelism, PartitioningScheme scheme) {
+               this.parallelism = parallelism;
+               this.scheme = scheme;
+       }
+       
+       @Before
+       public void setUp() throws Exception {
+               stream = new ThreadsStream(sourcePi);
+               stream.addDestination(destination);
+       }
+       
+       @Test
+       public void testAddDestination() {
+               boolean found = false;
+               for (StreamDestination sd:stream.getDestinations()) {
+                       if (sd == destination) {
+                               found = true;
+                               break;
+                       }
+               }
+               assertTrue("Destination object was not added in stream's 
destinations set.",found);
+       }
+
+       @Test
+       public void testPut() {
+               new NonStrictExpectations() {
+                       {
+                               event.getKey(); result=eventKey;
+                               destination.getProcessingItem(); result=destPi;
+                               destination.getPartitioningScheme(); 
result=scheme;
+                               destination.getParallelism(); 
result=parallelism;
+                               
+                       }
+               };
+               switch(this.scheme) {
+               case SHUFFLE: case GROUP_BY_KEY:
+                       new Expectations() {
+                               {
+                                       
+                                       // TODO: restrict the range of counter 
value
+                                       destPi.processEvent(event, anyInt); 
times=1;
+                               }
+                       };
+                       break;
+               case BROADCAST:
+                       new Expectations() {
+                               {
+                                       // TODO: restrict the range of counter 
value
+                                       destPi.processEvent(event, anyInt); 
times=parallelism;
+                               }
+                       };
+                       break;
+               }
+               stream.put(event);
+       }
+       
+       
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopologyTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopologyTest.java
 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopologyTest.java
new file mode 100644
index 0000000..46847f5
--- /dev/null
+++ 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/topology/impl/ThreadsTopologyTest.java
@@ -0,0 +1,84 @@
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+package com.yahoo.labs.samoa.topology.impl;
+
+import static org.junit.Assert.*;
+
+import java.util.Set;
+
+import mockit.Expectations;
+import mockit.Mocked;
+import mockit.Tested;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import com.yahoo.labs.samoa.core.EntranceProcessor;
+import com.yahoo.labs.samoa.topology.EntranceProcessingItem;
+
+/**
+ * @author Anh Thu Vu
+ *
+ */
+public class ThreadsTopologyTest {
+
+       @Tested private ThreadsTopology topology;
+       
+       @Mocked private ThreadsEntranceProcessingItem entrancePi;
+       @Mocked private EntranceProcessor entranceProcessor;
+       
+       @Before
+       public void setUp() throws Exception {
+               topology = new ThreadsTopology("TestTopology");
+       }
+
+       @Test
+       public void testAddEntrancePi() {
+               topology.addEntranceProcessingItem(entrancePi);
+               Set<EntranceProcessingItem> entrancePIs = 
topology.getEntranceProcessingItems();
+               assertNotNull("Set of entrance PIs is null.",entrancePIs);
+               assertEquals("Number of entrance PI in ThreadsTopology must be 
1",1,entrancePIs.size());
+               assertSame("Entrance PI was not set 
correctly.",entrancePi,entrancePIs.toArray()[0]);
+               // TODO: verify that entrance PI is in the set of 
ProcessingItems
+               // Need to access topology's set of PIs (getProcessingItems() 
method)
+       }
+       
+       @Test
+       public void testRun() {
+               topology.addEntranceProcessingItem(entrancePi);
+               
+               new Expectations() {
+                       {
+                               entrancePi.getProcessor();
+                               result=entranceProcessor;
+                               entranceProcessor.onCreate(anyInt);
+                               
+                               entrancePi.startSendingEvents();
+                       }
+               };
+               topology.run();
+       }
+       
+       @Test(expected=IllegalStateException.class)
+       public void testRunWithoutEntrancePI() {
+               topology.run();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-threads/src/test/java/com/yahoo/labs/samoa/utils/StreamDestinationTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-threads/src/test/java/com/yahoo/labs/samoa/utils/StreamDestinationTest.java
 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/utils/StreamDestinationTest.java
new file mode 100644
index 0000000..19c5421
--- /dev/null
+++ 
b/samoa-threads/src/test/java/com/yahoo/labs/samoa/utils/StreamDestinationTest.java
@@ -0,0 +1,80 @@
+package com.yahoo.labs.samoa.utils;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import mockit.Mocked;
+import mockit.Tested;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.yahoo.labs.samoa.topology.IProcessingItem;
+import com.yahoo.labs.samoa.utils.PartitioningScheme;
+import com.yahoo.labs.samoa.utils.StreamDestination;
+
+/**
+ * @author Anh Thu Vu
+ *
+ */
+@RunWith(Parameterized.class)
+public class StreamDestinationTest {
+
+       @Tested private StreamDestination destination;
+       
+       @Mocked private IProcessingItem pi;
+       private final int parallelism;
+       private final PartitioningScheme scheme;
+       
+       @Parameters
+       public static Collection<Object[]> generateParameters() {
+               return Arrays.asList(new Object[][] {
+                        { 3, PartitioningScheme.SHUFFLE },
+                        { 2, PartitioningScheme.GROUP_BY_KEY },
+                        { 5, PartitioningScheme.BROADCAST }
+               });
+       }
+       
+       public StreamDestinationTest(int parallelism, PartitioningScheme 
scheme) {
+               this.parallelism = parallelism;
+               this.scheme = scheme;
+       }
+       
+       @Before
+       public void setUp() throws Exception {
+               destination = new StreamDestination(pi, parallelism, scheme);
+       }
+
+       @Test
+       public void testContructor() {
+               assertSame("The IProcessingItem is not set correctly.", pi, 
destination.getProcessingItem());
+               assertEquals("Parallelism value is not set correctly.", 
parallelism, destination.getParallelism(), 0);
+               assertEquals("EventAllocationType is not set correctly.", 
scheme, destination.getPartitioningScheme());
+       }
+
+}

Reply via email to