http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMultiThreadTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMultiThreadTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMultiThreadTest.java index 60df89c..ad4aa28 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMultiThreadTest.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueMultiThreadTest.java @@ -17,299 +17,304 @@ */ package org.apache.streams.local.queues; +import org.apache.streams.util.ComponentUtils; + import com.carrotsearch.randomizedtesting.RandomizedTest; import com.carrotsearch.randomizedtesting.annotations.Repeat; -import org.apache.streams.util.ComponentUtils; import org.joda.time.DateTime; import org.junit.After; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.management.ManagementFactory; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import javax.management.InstanceNotFoundException; import javax.management.ObjectName; -import java.lang.management.ManagementFactory; -import java.util.concurrent.*; /** * MultiThread unit tests for {@link org.apache.streams.local.queues.ThroughputQueue} */ public class ThroughputQueueMultiThreadTest extends RandomizedTest { - private static final Logger LOGGER = LoggerFactory.getLogger(ThroughputQueueMultiThreadTest.class); - private static final String MBEAN_ID = "testQueue"; - private static final String STREAM_ID = "test_stream"; - private static long STREAM_START_TIME = (new DateTime()).getMillis(); - - /** - * Remove registered mbeans from previous tests - * @throws Exception - */ - @After - public void unregisterMXBean() throws Exception { - try { - ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, MBEAN_ID, STREAM_ID, STREAM_START_TIME))); - } catch (InstanceNotFoundException ife) { - //No-op - } + private static final Logger LOGGER = LoggerFactory.getLogger(ThroughputQueueMultiThreadTest.class); + private static final String MBEAN_ID = "testQueue"; + private static final String STREAM_ID = "test_stream"; + private static long STREAM_START_TIME = (new DateTime()).getMillis(); + + /** + * Remove registered mbeans from previous tests + * @throws Exception + */ + @After + public void unregisterMXBean() throws Exception { + try { + ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, MBEAN_ID, STREAM_ID, STREAM_START_TIME))); + } catch (InstanceNotFoundException ife) { + //No-op } - - @After - public void removeLocalMBeans() { - try { - ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local"); - } catch (Exception e) { - //No op. proceed to next test - } + } + + @After + public void removeLocalMBeans() { + try { + ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local"); + } catch (Exception e) { + //No op. proceed to next test } - - - /** - * Test that queue will block on puts when the queue is full - * @throws InterruptedException - */ - @Test - public void testBlockOnFullQueue() throws InterruptedException { - int queueSize = randomIntBetween(1, 3000); - ExecutorService executor = Executors.newSingleThreadExecutor(); - CountDownLatch full = new CountDownLatch(1); - CountDownLatch finished = new CountDownLatch(1); - ThroughputQueue queue = new ThroughputQueue(queueSize); - BlocksOnFullQueue testThread = new BlocksOnFullQueue(full, finished, queue, queueSize); - executor.submit(testThread); - full.await(); - assertEquals(queueSize, queue.size()); - assertEquals(queueSize, queue.getCurrentSize()); - assertFalse(testThread.isComplete()); //test that it is blocked - safeSleep(1000); - assertFalse(testThread.isComplete()); //still blocked - queue.take(); - finished.await(); - assertEquals(queueSize, queue.size()); - assertEquals(queueSize, queue.getCurrentSize()); - assertTrue(testThread.isComplete()); - executor.shutdownNow(); - executor.awaitTermination(500, TimeUnit.MILLISECONDS); + } + + + /** + * Test that queue will block on puts when the queue is full + * @throws InterruptedException + */ + @Test + public void testBlockOnFullQueue() throws InterruptedException { + int queueSize = randomIntBetween(1, 3000); + ExecutorService executor = Executors.newSingleThreadExecutor(); + CountDownLatch full = new CountDownLatch(1); + CountDownLatch finished = new CountDownLatch(1); + ThroughputQueue queue = new ThroughputQueue(queueSize); + BlocksOnFullQueue testThread = new BlocksOnFullQueue(full, finished, queue, queueSize); + executor.submit(testThread); + full.await(); + assertEquals(queueSize, queue.size()); + assertEquals(queueSize, queue.getCurrentSize()); + assertFalse(testThread.isComplete()); //test that it is blocked + safeSleep(1000); + assertFalse(testThread.isComplete()); //still blocked + queue.take(); + finished.await(); + assertEquals(queueSize, queue.size()); + assertEquals(queueSize, queue.getCurrentSize()); + assertTrue(testThread.isComplete()); + executor.shutdownNow(); + executor.awaitTermination(500, TimeUnit.MILLISECONDS); + } + + /** + * Test that queue will block on Take when queue is empty + * @throws InterruptedException + */ + @Test + public void testBlockOnEmptyQueue() throws InterruptedException { + int queueSize = randomIntBetween(1, 3000); + ExecutorService executor = Executors.newSingleThreadExecutor(); + CountDownLatch empty = new CountDownLatch(1); + CountDownLatch finished = new CountDownLatch(1); + ThroughputQueue queue = new ThroughputQueue(); + BlocksOnEmptyQueue testThread = new BlocksOnEmptyQueue(empty, finished, queueSize, queue); + for(int i=0; i < queueSize; ++i) { + queue.put(i); } - - /** - * Test that queue will block on Take when queue is empty - * @throws InterruptedException - */ - @Test - public void testBlockOnEmptyQueue() throws InterruptedException { - int queueSize = randomIntBetween(1, 3000); - ExecutorService executor = Executors.newSingleThreadExecutor(); - CountDownLatch empty = new CountDownLatch(1); - CountDownLatch finished = new CountDownLatch(1); - ThroughputQueue queue = new ThroughputQueue(); - BlocksOnEmptyQueue testThread = new BlocksOnEmptyQueue(empty, finished, queueSize, queue); - for(int i=0; i < queueSize; ++i) { - queue.put(i); - } - executor.submit(testThread); - empty.await(); - assertEquals(0, queue.size()); - assertEquals(0, queue.getCurrentSize()); - assertFalse(testThread.isComplete()); - safeSleep(1000); - assertFalse(testThread.isComplete()); - queue.put(1); - finished.await(); - assertEquals(0, queue.size()); - assertEquals(0, queue.getCurrentSize()); - assertTrue(testThread.isComplete()); - executor.shutdownNow(); - executor.awaitTermination(500, TimeUnit.MILLISECONDS); + executor.submit(testThread); + empty.await(); + assertEquals(0, queue.size()); + assertEquals(0, queue.getCurrentSize()); + assertFalse(testThread.isComplete()); + safeSleep(1000); + assertFalse(testThread.isComplete()); + queue.put(1); + finished.await(); + assertEquals(0, queue.size()); + assertEquals(0, queue.getCurrentSize()); + assertTrue(testThread.isComplete()); + executor.shutdownNow(); + executor.awaitTermination(500, TimeUnit.MILLISECONDS); + } + + + /** + * Test multiple threads putting and taking from the queue while + * this thread repeatedly calls the MXBean measurement methods. + * Should hammer the queue with request from multiple threads + * of all request types. Purpose is to expose current modification exceptions + * and/or dead locks. + */ + @Test + @Repeat(iterations = 3) + public void testMultiThreadAccessAndInteruptResponse() throws Exception { + int putTakeThreadCount = randomIntBetween(1, 10); + int dataCount = randomIntBetween(1, 2000000); + int pollCount = randomIntBetween(1, 2000000); + int maxSize = randomIntBetween(1, 1000); + CountDownLatch finished = new CountDownLatch(putTakeThreadCount); + ThroughputQueue queue = new ThroughputQueue(maxSize, MBEAN_ID); + ExecutorService executor = Executors.newFixedThreadPool(putTakeThreadCount * 2); + for(int i=0; i < putTakeThreadCount; ++i) { + executor.submit(new PutData(finished, queue, dataCount)); + executor.submit(new TakeData(queue)); } - - - /** - * Test multiple threads putting and taking from the queue while - * this thread repeatedly calls the MXBean measurement methods. - * Should hammer the queue with request from multiple threads - * of all request types. Purpose is to expose current modification exceptions - * and/or dead locks. - */ - @Test - @Repeat(iterations = 3) - public void testMultiThreadAccessAndInteruptResponse() throws Exception { - int putTakeThreadCount = randomIntBetween(1, 10); - int dataCount = randomIntBetween(1, 2000000); - int pollCount = randomIntBetween(1, 2000000); - int maxSize = randomIntBetween(1, 1000); - CountDownLatch finished = new CountDownLatch(putTakeThreadCount); - ThroughputQueue queue = new ThroughputQueue(maxSize, MBEAN_ID); - ExecutorService executor = Executors.newFixedThreadPool(putTakeThreadCount * 2); - for(int i=0; i < putTakeThreadCount; ++i) { - executor.submit(new PutData(finished, queue, dataCount)); - executor.submit(new TakeData(queue)); - } - for(int i=0; i < pollCount; ++i) { - queue.getAvgWait(); - queue.getAdded(); - queue.getCurrentSize(); - queue.getMaxWait(); - queue.getRemoved(); - queue.getThroughput(); - } - finished.await(); - while(!queue.isEmpty()) { - LOGGER.info("Waiting for queue to be emptied..."); - safeSleep(500); - } - long totalData = ((long) dataCount) * putTakeThreadCount; - assertEquals(totalData, queue.getAdded()); - assertEquals(totalData, queue.getRemoved()); - executor.shutdown(); - executor.awaitTermination(1000, TimeUnit.MILLISECONDS); //shutdown puts - executor.shutdownNow(); - executor.awaitTermination(1000, TimeUnit.MILLISECONDS); //shutdown takes - //Randomized should not report thread leak + for(int i=0; i < pollCount; ++i) { + queue.getAvgWait(); + queue.getAdded(); + queue.getCurrentSize(); + queue.getMaxWait(); + queue.getRemoved(); + queue.getThroughput(); } - - - - private void safeSleep(long sleep) { - try { - Thread.sleep(sleep); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } + finished.await(); + while(!queue.isEmpty()) { + LOGGER.info("Waiting for queue to be emptied..."); + safeSleep(500); } + long totalData = ((long) dataCount) * putTakeThreadCount; + assertEquals(totalData, queue.getAdded()); + assertEquals(totalData, queue.getRemoved()); + executor.shutdown(); + executor.awaitTermination(1000, TimeUnit.MILLISECONDS); //shutdown puts + executor.shutdownNow(); + executor.awaitTermination(1000, TimeUnit.MILLISECONDS); //shutdown takes + //Randomized should not report thread leak + } + + + + private void safeSleep(long sleep) { + try { + Thread.sleep(sleep); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } - /** - * Helper runnable for test {@link ThroughputQueueMultiThreadTest#testBlockOnFullQueue()} - */ - private class BlocksOnFullQueue implements Runnable { - - private CountDownLatch full; - volatile private boolean complete; - private int queueSize; - private CountDownLatch finished; - private BlockingQueue queue; + /** + * Helper runnable for test {@link ThroughputQueueMultiThreadTest#testBlockOnFullQueue()} + */ + private class BlocksOnFullQueue implements Runnable { - public BlocksOnFullQueue(CountDownLatch latch, CountDownLatch finished, BlockingQueue queue, int queueSize) { - this.full = latch; - this.complete = false; - this.queueSize = queueSize; - this.finished = finished; - this.queue = queue; - } + private CountDownLatch full; + volatile private boolean complete; + private int queueSize; + private CountDownLatch finished; + private BlockingQueue queue; - @Override - public void run() { - try { - for (int i = 0; i < this.queueSize; ++i) { - this.queue.put(i); - } - this.full.countDown(); - this.queue.put(0); - this.complete = true; - this.finished.countDown(); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } - } + public BlocksOnFullQueue(CountDownLatch latch, CountDownLatch finished, BlockingQueue queue, int queueSize) { + this.full = latch; + this.complete = false; + this.queueSize = queueSize; + this.finished = finished; + this.queue = queue; + } - public boolean isComplete() { - return this.complete; + @Override + public void run() { + try { + for (int i = 0; i < this.queueSize; ++i) { + this.queue.put(i); } + this.full.countDown(); + this.queue.put(0); + this.complete = true; + this.finished.countDown(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } } - - /** - * Helper runnable class for test {@link ThroughputQueueMultiThreadTest#testBlockOnEmptyQueue()} - */ - private class BlocksOnEmptyQueue implements Runnable { - - private CountDownLatch full; - volatile private boolean complete; - private int queueSize; - private CountDownLatch finished; - private BlockingQueue queue; - - public BlocksOnEmptyQueue(CountDownLatch full, CountDownLatch finished, int queueSize, BlockingQueue queue) { - this.full = full; - this.finished = finished; - this.queueSize = queueSize; - this.queue = queue; - this.complete = false; - } + public boolean isComplete() { + return this.complete; + } + } + + + /** + * Helper runnable class for test {@link ThroughputQueueMultiThreadTest#testBlockOnEmptyQueue()} + */ + private class BlocksOnEmptyQueue implements Runnable { + + private CountDownLatch full; + volatile private boolean complete; + private int queueSize; + private CountDownLatch finished; + private BlockingQueue queue; + + public BlocksOnEmptyQueue(CountDownLatch full, CountDownLatch finished, int queueSize, BlockingQueue queue) { + this.full = full; + this.finished = finished; + this.queueSize = queueSize; + this.queue = queue; + this.complete = false; + } - @Override - public void run() { - try { - for(int i=0; i < this.queueSize; ++i) { - this.queue.take(); - } - this.full.countDown(); - this.queue.take(); - this.complete = true; - this.finished.countDown(); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } + @Override + public void run() { + try { + for(int i=0; i < this.queueSize; ++i) { + this.queue.take(); } + this.full.countDown(); + this.queue.take(); + this.complete = true; + this.finished.countDown(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } - public boolean isComplete() { - return this.complete; - } + public boolean isComplete() { + return this.complete; } + } - private class PutData implements Runnable { + private class PutData implements Runnable { - private BlockingQueue queue; - private int dataCount; - private CountDownLatch finished; + private BlockingQueue queue; + private int dataCount; + private CountDownLatch finished; - public PutData(CountDownLatch finished, BlockingQueue queue, int dataCount) { - this.queue = queue; - this.dataCount = dataCount; - this.finished = finished; - } + public PutData(CountDownLatch finished, BlockingQueue queue, int dataCount) { + this.queue = queue; + this.dataCount = dataCount; + this.finished = finished; + } - @Override - public void run() { - try { - for(int i=0; i < this.dataCount; ++i) { - this.queue.put(i); - } - } catch (InterruptedException ie) { - LOGGER.error("PUT DATA interupted !"); - Thread.currentThread().interrupt(); - } - this.finished.countDown(); + @Override + public void run() { + try { + for(int i=0; i < this.dataCount; ++i) { + this.queue.put(i); } + } catch (InterruptedException ie) { + LOGGER.error("PUT DATA interupted !"); + Thread.currentThread().interrupt(); + } + this.finished.countDown(); } + } - private class TakeData implements Runnable { + private class TakeData implements Runnable { - private BlockingQueue queue; + private BlockingQueue queue; - public TakeData(BlockingQueue queue) { - this.queue = queue; - } + public TakeData(BlockingQueue queue) { + this.queue = queue; + } - @Override - public void run() { - try { - while(true) { - this.queue.take(); - } - } catch (InterruptedException ie) { - LOGGER.error("PUT DATA interupted !"); - Thread.currentThread().interrupt(); - } + @Override + public void run() { + try { + while(true) { + this.queue.take(); } + } catch (InterruptedException ie) { + LOGGER.error("PUT DATA interupted !"); + Thread.currentThread().interrupt(); + } } + } }
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java index 8c7f5c5..afe1911 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/queues/ThroughputQueueSingleThreadTest.java @@ -17,234 +17,235 @@ */ package org.apache.streams.local.queues; +import org.apache.streams.util.ComponentUtils; + import com.carrotsearch.randomizedtesting.RandomizedTest; import com.carrotsearch.randomizedtesting.annotations.Repeat; -import org.apache.streams.util.ComponentUtils; import org.joda.time.DateTime; import org.junit.After; import org.junit.Test; +import java.lang.management.ManagementFactory; import javax.management.MBeanServer; import javax.management.ObjectInstance; import javax.management.ObjectName; -import java.lang.management.ManagementFactory; /** * Single thread unit tests for {@link org.apache.streams.local.queues.ThroughputQueue} */ public class ThroughputQueueSingleThreadTest extends RandomizedTest { - private static final String MBEAN_ID = "test_id"; - private static final String STREAM_ID = "test_stream"; - private static long STREAM_START_TIME = (new DateTime()).getMillis(); - - @After - public void removeLocalMBeans() { - try { - ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local"); - } catch (Exception e) { - //No op. proceed to next test - } + private static final String MBEAN_ID = "test_id"; + private static final String STREAM_ID = "test_stream"; + private static long STREAM_START_TIME = (new DateTime()).getMillis(); + + @After + public void removeLocalMBeans() { + try { + ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local"); + } catch (Exception e) { + //No op. proceed to next test } - - /** - * Test that take and put queue and dequeue data as expected and all - * measurements form the queue are returning data. - * @throws Exception - */ - @Test - @Repeat(iterations = 3) - public void testTakeAndPut() throws Exception { - ThroughputQueue<Integer> queue = new ThroughputQueue<>(); - int putCount = randomIntBetween(1, 1000); - for(int i=0; i < putCount; ++i) { - queue.put(i); - assertEquals(i+1, queue.size()); - assertEquals(queue.size(), queue.getCurrentSize()); - } - safeSleep(100); //ensure measurable wait time - int takeCount = randomIntBetween(1, putCount); - for(int i=0; i < takeCount; ++i) { - Integer element = queue.take(); - assertNotNull(element); - assertEquals(i, element.intValue()); - assertEquals(putCount - (1+i), queue.size()); - assertEquals(queue.size(), queue.getCurrentSize()); - } - assertEquals(putCount-takeCount, queue.size()); - assertEquals(queue.size(), queue.getCurrentSize()); - assertTrue(0.0 < queue.getMaxWait()); - assertTrue(0.0 < queue.getAvgWait()); - assertTrue(0.0 < queue.getThroughput()); - assertEquals(putCount, queue.getAdded()); - assertEquals(takeCount, queue.getRemoved()); + } + + /** + * Test that take and put queue and dequeue data as expected and all + * measurements form the queue are returning data. + * @throws Exception + */ + @Test + @Repeat(iterations = 3) + public void testTakeAndPut() throws Exception { + ThroughputQueue<Integer> queue = new ThroughputQueue<>(); + int putCount = randomIntBetween(1, 1000); + for(int i=0; i < putCount; ++i) { + queue.put(i); + assertEquals(i+1, queue.size()); + assertEquals(queue.size(), queue.getCurrentSize()); } - - /** - * Test that add and remove queue and dequeue data as expected - * and all measurements from the queue are returning data - */ - @Test - @Repeat(iterations = 3) - public void testAddAndRemove() { - ThroughputQueue<Integer> queue = new ThroughputQueue<>(); - int putCount = randomIntBetween(1, 1000); - for(int i=0; i < putCount; ++i) { - queue.add(i); - assertEquals(i+1, queue.size()); - assertEquals(queue.size(), queue.getCurrentSize()); - } - safeSleep(100); //ensure measurable wait time - int takeCount = randomIntBetween(1, putCount); - for(int i=0; i < takeCount; ++i) { - Integer element = queue.remove(); - assertNotNull(element); - assertEquals(i, element.intValue()); - assertEquals(putCount - (1+i), queue.size()); - assertEquals(queue.size(), queue.getCurrentSize()); - } - assertEquals(putCount-takeCount, queue.size()); - assertEquals(queue.size(), queue.getCurrentSize()); - assertTrue(0.0 < queue.getMaxWait()); - assertTrue(0.0 < queue.getAvgWait()); - assertTrue(0.0 < queue.getThroughput()); - assertEquals(putCount, queue.getAdded()); - assertEquals(takeCount, queue.getRemoved()); + safeSleep(100); //ensure measurable wait time + int takeCount = randomIntBetween(1, putCount); + for(int i=0; i < takeCount; ++i) { + Integer element = queue.take(); + assertNotNull(element); + assertEquals(i, element.intValue()); + assertEquals(putCount - (1+i), queue.size()); + assertEquals(queue.size(), queue.getCurrentSize()); } - - /** - * Test that offer and poll queue and dequeue data as expected - * and all measurements from the queue are returning data - */ - @Test - @Repeat(iterations = 3) - public void testOfferAndPoll() { - ThroughputQueue<Integer> queue = new ThroughputQueue<>(); - int putCount = randomIntBetween(1, 1000); - for(int i=0; i < putCount; ++i) { - queue.offer(i); - assertEquals(i+1, queue.size()); - assertEquals(queue.size(), queue.getCurrentSize()); - } - safeSleep(100); //ensure measurable wait time - int takeCount = randomIntBetween(1, putCount); - for(int i=0; i < takeCount; ++i) { - Integer element = queue.poll(); - assertNotNull(element); - assertEquals(i, element.intValue()); - assertEquals(putCount - (1+i), queue.size()); - assertEquals(queue.size(), queue.getCurrentSize()); - } - assertEquals(putCount-takeCount, queue.size()); - assertEquals(queue.size(), queue.getCurrentSize()); - assertTrue(0.0 < queue.getMaxWait()); - assertTrue(0.0 < queue.getAvgWait()); - assertTrue(0.0 < queue.getThroughput()); - assertEquals(putCount, queue.getAdded()); - assertEquals(takeCount, queue.getRemoved()); + assertEquals(putCount-takeCount, queue.size()); + assertEquals(queue.size(), queue.getCurrentSize()); + assertTrue(0.0 < queue.getMaxWait()); + assertTrue(0.0 < queue.getAvgWait()); + assertTrue(0.0 < queue.getThroughput()); + assertEquals(putCount, queue.getAdded()); + assertEquals(takeCount, queue.getRemoved()); + } + + /** + * Test that add and remove queue and dequeue data as expected + * and all measurements from the queue are returning data + */ + @Test + @Repeat(iterations = 3) + public void testAddAndRemove() { + ThroughputQueue<Integer> queue = new ThroughputQueue<>(); + int putCount = randomIntBetween(1, 1000); + for(int i=0; i < putCount; ++i) { + queue.add(i); + assertEquals(i+1, queue.size()); + assertEquals(queue.size(), queue.getCurrentSize()); } - - - - /** - * Test that max wait and avg wait return expected values - * @throws Exception - */ - @Test - public void testWait() throws Exception { - ThroughputQueue queue = new ThroughputQueue(); - int wait = 1000; - - for(int i=0; i < 3; ++i) { - queue.put(1); - safeSleep(wait); - queue.take(); - assertTrue(queue.getMaxWait() >= wait && queue.getMaxWait() <= (wait * 2));//can't calculate exactly, making sure its close. - assertTrue(queue.getAvgWait() >= wait && queue.getAvgWait() <= (wait * 2)); - } - queue.put(1); - queue.take(); - assertTrue(queue.getMaxWait() >= wait && queue.getMaxWait() <= (wait * 2));//can't calculate exactly, making sure its close. - assertTrue(queue.getAvgWait() <= 5000 ); - assertTrue(queue.getAvgWait() >= 500); + safeSleep(100); //ensure measurable wait time + int takeCount = randomIntBetween(1, putCount); + for(int i=0; i < takeCount; ++i) { + Integer element = queue.remove(); + assertNotNull(element); + assertEquals(i, element.intValue()); + assertEquals(putCount - (1+i), queue.size()); + assertEquals(queue.size(), queue.getCurrentSize()); } - - /** - * Test that throughput returns expected values. - * @throws Exception - */ - @Test - public void testThroughput() throws Exception { - ThroughputQueue queue = new ThroughputQueue(); - int wait = 100; - for(int i=0; i < 10; ++i) { - queue.put(1); - safeSleep(wait); - queue.take(); - } - double throughput = queue.getThroughput(); - assertTrue(throughput <= 15 ); //can't calculate exactly, making sure its close. - assertTrue(throughput >= 5); - - queue = new ThroughputQueue(); - wait = 1000; - for(int i=0; i < 10; ++i) { - queue.put(1); - } - for(int i=0; i < 10; ++i) { - queue.take(); - } - safeSleep(wait); - throughput = queue.getThroughput(); - assertTrue(throughput <= 15 ); //can't calculate exactly, making sure its close. - assertTrue(throughput >= 5); + assertEquals(putCount-takeCount, queue.size()); + assertEquals(queue.size(), queue.getCurrentSize()); + assertTrue(0.0 < queue.getMaxWait()); + assertTrue(0.0 < queue.getAvgWait()); + assertTrue(0.0 < queue.getThroughput()); + assertEquals(putCount, queue.getAdded()); + assertEquals(takeCount, queue.getRemoved()); + } + + /** + * Test that offer and poll queue and dequeue data as expected + * and all measurements from the queue are returning data + */ + @Test + @Repeat(iterations = 3) + public void testOfferAndPoll() { + ThroughputQueue<Integer> queue = new ThroughputQueue<>(); + int putCount = randomIntBetween(1, 1000); + for(int i=0; i < putCount; ++i) { + queue.offer(i); + assertEquals(i+1, queue.size()); + assertEquals(queue.size(), queue.getCurrentSize()); } - - - /** - * Test that the mbean registers - */ - @Test - public void testMBeanRegistration() { - try { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - Integer beanCount = mbs.getMBeanCount(); - ThroughputQueue queue = new ThroughputQueue(MBEAN_ID, STREAM_ID, STREAM_START_TIME); - assertEquals("Expected bean to be registered", new Integer(beanCount+1), mbs.getMBeanCount()); - ObjectInstance mBean = mbs.getObjectInstance(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, MBEAN_ID, STREAM_ID, STREAM_START_TIME))); - assertNotNull(mBean); - } catch (Exception e) { - fail("Failed to register MXBean : "+e.getMessage()); - } + safeSleep(100); //ensure measurable wait time + int takeCount = randomIntBetween(1, putCount); + for(int i=0; i < takeCount; ++i) { + Integer element = queue.poll(); + assertNotNull(element); + assertEquals(i, element.intValue()); + assertEquals(putCount - (1+i), queue.size()); + assertEquals(queue.size(), queue.getCurrentSize()); } - - /** - * Test that mulitple mbeans of the same type with a different name can be registered - */ - @Test - public void testMultipleMBeanRegistrations() { - try { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - Integer beanCount = mbs.getMBeanCount(); - int numReg = randomIntBetween(2, 100); - for(int i=0; i < numReg; ++i) { - ThroughputQueue queue = new ThroughputQueue(MBEAN_ID + "" + i, STREAM_ID, STREAM_START_TIME); - assertEquals("Expected bean to be registered", new Integer(beanCount + (i+1)), mbs.getMBeanCount()); - ObjectInstance mBean = mbs.getObjectInstance(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, MBEAN_ID + "" + i, STREAM_ID, STREAM_START_TIME))); - assertNotNull(mBean); - } - } catch (Exception e) { - fail("Failed to register MXBean : "+e.getMessage()); - } + assertEquals(putCount-takeCount, queue.size()); + assertEquals(queue.size(), queue.getCurrentSize()); + assertTrue(0.0 < queue.getMaxWait()); + assertTrue(0.0 < queue.getAvgWait()); + assertTrue(0.0 < queue.getThroughput()); + assertEquals(putCount, queue.getAdded()); + assertEquals(takeCount, queue.getRemoved()); + } + + + + /** + * Test that max wait and avg wait return expected values + * @throws Exception + */ + @Test + public void testWait() throws Exception { + ThroughputQueue queue = new ThroughputQueue(); + int wait = 1000; + + for(int i=0; i < 3; ++i) { + queue.put(1); + safeSleep(wait); + queue.take(); + assertTrue(queue.getMaxWait() >= wait && queue.getMaxWait() <= (wait * 2));//can't calculate exactly, making sure its close. + assertTrue(queue.getAvgWait() >= wait && queue.getAvgWait() <= (wait * 2)); + } + queue.put(1); + queue.take(); + assertTrue(queue.getMaxWait() >= wait && queue.getMaxWait() <= (wait * 2));//can't calculate exactly, making sure its close. + assertTrue(queue.getAvgWait() <= 5000 ); + assertTrue(queue.getAvgWait() >= 500); + } + + /** + * Test that throughput returns expected values. + * @throws Exception + */ + @Test + public void testThroughput() throws Exception { + ThroughputQueue queue = new ThroughputQueue(); + int wait = 100; + for(int i=0; i < 10; ++i) { + queue.put(1); + safeSleep(wait); + queue.take(); + } + double throughput = queue.getThroughput(); + assertTrue(throughput <= 15 ); //can't calculate exactly, making sure its close. + assertTrue(throughput >= 5); + + queue = new ThroughputQueue(); + wait = 1000; + for(int i=0; i < 10; ++i) { + queue.put(1); + } + for(int i=0; i < 10; ++i) { + queue.take(); + } + safeSleep(wait); + throughput = queue.getThroughput(); + assertTrue(throughput <= 15 ); //can't calculate exactly, making sure its close. + assertTrue(throughput >= 5); + } + + + /** + * Test that the mbean registers + */ + @Test + public void testMBeanRegistration() { + try { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + Integer beanCount = mbs.getMBeanCount(); + ThroughputQueue queue = new ThroughputQueue(MBEAN_ID, STREAM_ID, STREAM_START_TIME); + assertEquals("Expected bean to be registered", new Integer(beanCount+1), mbs.getMBeanCount()); + ObjectInstance mBean = mbs.getObjectInstance(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, MBEAN_ID, STREAM_ID, STREAM_START_TIME))); + assertNotNull(mBean); + } catch (Exception e) { + fail("Failed to register MXBean : "+e.getMessage()); + } + } + + /** + * Test that mulitple mbeans of the same type with a different name can be registered + */ + @Test + public void testMultipleMBeanRegistrations() { + try { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + Integer beanCount = mbs.getMBeanCount(); + int numReg = randomIntBetween(2, 100); + for(int i=0; i < numReg; ++i) { + ThroughputQueue queue = new ThroughputQueue(MBEAN_ID + "" + i, STREAM_ID, STREAM_START_TIME); + assertEquals("Expected bean to be registered", new Integer(beanCount + (i+1)), mbs.getMBeanCount()); + ObjectInstance mBean = mbs.getObjectInstance(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, MBEAN_ID + "" + i, STREAM_ID, STREAM_START_TIME))); + assertNotNull(mBean); + } + } catch (Exception e) { + fail("Failed to register MXBean : "+e.getMessage()); } + } - private void safeSleep(long sleep) { - try { - Thread.sleep(sleep); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } + private void safeSleep(long sleep) { + try { + Thread.sleep(sleep); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); } + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java index 38e948e..2a67550 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java @@ -18,25 +18,28 @@ package org.apache.streams.local.tasks; -import com.google.common.util.concurrent.Uninterruptibles; import org.apache.streams.core.StreamsDatum; -import org.apache.streams.local.counters.DatumStatusCounter; import org.apache.streams.local.counters.StreamsTaskCounter; -import org.apache.streams.local.queues.ThroughputQueue; import org.apache.streams.local.test.processors.PassthroughDatumCounterProcessor; import org.apache.streams.local.test.providers.NumericMessageProvider; import org.apache.streams.local.test.writer.DatumCounterWriter; import org.apache.streams.util.ComponentUtils; + +import com.google.common.util.concurrent.Uninterruptibles; import org.junit.After; import org.junit.Test; -import javax.management.InstanceNotFoundException; -import javax.management.ObjectName; -import java.lang.management.ManagementFactory; -import java.util.Queue; -import java.util.concurrent.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * @@ -44,264 +47,264 @@ import static org.junit.Assert.*; public class BasicTasksTest { - private static final String MBEAN_ID = "test_bean"; - @After - public void removeLocalMBeans() { - try { - ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local"); - } catch (Exception e) { - //No op. proceed to next test - } + private static final String MBEAN_ID = "test_bean"; + @After + public void removeLocalMBeans() { + try { + ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local"); + } catch (Exception e) { + //No op. proceed to next test } + } - @Test - public void testProviderTask() { - int numMessages = 100; - NumericMessageProvider provider = new NumericMessageProvider(numMessages); - StreamsProviderTask task = new StreamsProviderTask(provider, false, null); - BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>(); - task.addOutputQueue(outQueue); - //Test that adding input queues to providers is not valid - BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages); - Exception exp = null; - try { - task.addInputQueue(inQueue); - } catch (UnsupportedOperationException uoe) { - exp = uoe; - } - assertNotNull(exp); + @Test + public void testProviderTask() { + int numMessages = 100; + NumericMessageProvider provider = new NumericMessageProvider(numMessages); + StreamsProviderTask task = new StreamsProviderTask(provider, false, null); + BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>(); + task.addOutputQueue(outQueue); + //Test that adding input queues to providers is not valid + BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages); + Exception exp = null; + try { + task.addInputQueue(inQueue); + } catch (UnsupportedOperationException uoe) { + exp = uoe; + } + assertNotNull(exp); - ExecutorService service = Executors.newFixedThreadPool(1); - service.submit(task); - int attempts = 0; - while(outQueue.size() != numMessages) { - Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); - if(attempts == 10) { - fail("Provider task failed to output "+numMessages+" in a timely fashion."); - } - } - service.shutdown(); - try { - if(!service.awaitTermination(10, TimeUnit.SECONDS)){ - service.shutdownNow(); - fail("Service did not terminate."); - } - assertTrue("Task should have completed running in allotted time.", service.isTerminated()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + ExecutorService service = Executors.newFixedThreadPool(1); + service.submit(task); + int attempts = 0; + while(outQueue.size() != numMessages) { + Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + if(attempts == 10) { + fail("Provider task failed to output "+numMessages+" in a timely fashion."); + } + } + service.shutdown(); + try { + if(!service.awaitTermination(10, TimeUnit.SECONDS)){ + service.shutdownNow(); + fail("Service did not terminate."); + } + assertTrue("Task should have completed running in allotted time.", service.isTerminated()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } + } - @Test - public void testProcessorTask() { - int numMessages = 100; - PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor(""); - StreamsProcessorTask task = new StreamsProcessorTask(processor); - StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, null, -1); - task.setStreamsTaskCounter(counter); - BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>(); - BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages); - task.addOutputQueue(outQueue); - task.addInputQueue(inQueue); - assertEquals(numMessages, task.getInputQueues().get(0).size()); - ExecutorService service = Executors.newFixedThreadPool(1); - service.submit(task); - int attempts = 0; - while(inQueue.size() != 0 && outQueue.size() != numMessages) { - Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); - ++attempts; - if(attempts == 10) { - fail("Processor task failed to output "+numMessages+" in a timely fashion."); - } - } - task.stopTask();; - service.shutdown(); - try { - if(!service.awaitTermination(5, TimeUnit.SECONDS)){ - service.shutdownNow(); - fail("Service did not terminate."); - } - assertTrue("Task should have completed running in allotted time.", service.isTerminated()); - } catch (InterruptedException e) { - fail("Test Interrupted."); - } - assertEquals(numMessages, processor.getMessageCount()); - assertEquals(numMessages, counter.getNumReceived()); - assertEquals(numMessages, counter.getNumEmitted()); - assertEquals(0, counter.getNumUnhandledErrors()); - assertEquals(0.0, counter.getErrorRate(), 0.0); + @Test + public void testProcessorTask() { + int numMessages = 100; + PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor(""); + StreamsProcessorTask task = new StreamsProcessorTask(processor); + StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, null, -1); + task.setStreamsTaskCounter(counter); + BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>(); + BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages); + task.addOutputQueue(outQueue); + task.addInputQueue(inQueue); + assertEquals(numMessages, task.getInputQueues().get(0).size()); + ExecutorService service = Executors.newFixedThreadPool(1); + service.submit(task); + int attempts = 0; + while(inQueue.size() != 0 && outQueue.size() != numMessages) { + Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + ++attempts; + if(attempts == 10) { + fail("Processor task failed to output "+numMessages+" in a timely fashion."); + } + } + task.stopTask();; + service.shutdown(); + try { + if(!service.awaitTermination(5, TimeUnit.SECONDS)){ + service.shutdownNow(); + fail("Service did not terminate."); + } + assertTrue("Task should have completed running in allotted time.", service.isTerminated()); + } catch (InterruptedException e) { + fail("Test Interrupted."); } + assertEquals(numMessages, processor.getMessageCount()); + assertEquals(numMessages, counter.getNumReceived()); + assertEquals(numMessages, counter.getNumEmitted()); + assertEquals(0, counter.getNumUnhandledErrors()); + assertEquals(0.0, counter.getErrorRate(), 0.0); + } - @Test - public void testWriterTask() { - int numMessages = 100; - DatumCounterWriter writer = new DatumCounterWriter(""); - StreamsPersistWriterTask task = new StreamsPersistWriterTask(writer); - StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, null, -1); - task.setStreamsTaskCounter(counter); - BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>(); - BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages); + @Test + public void testWriterTask() { + int numMessages = 100; + DatumCounterWriter writer = new DatumCounterWriter(""); + StreamsPersistWriterTask task = new StreamsPersistWriterTask(writer); + StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, null, -1); + task.setStreamsTaskCounter(counter); + BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>(); + BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages); - Exception exp = null; - try { - task.addOutputQueue(outQueue); - } catch (UnsupportedOperationException uoe) { - exp = uoe; - } - assertNotNull(exp); - task.addInputQueue(inQueue); - assertEquals(numMessages, task.getInputQueues().get(0).size()); - ExecutorService service = Executors.newFixedThreadPool(1); - service.submit(task); - int attempts = 0; - while(inQueue.size() != 0 ) { - Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); - ++attempts; - if(attempts == 10) { - fail("Processor task failed to output "+numMessages+" in a timely fashion."); - } - } - task.stopTask(); - service.shutdown(); - try { - if(!service.awaitTermination(15, TimeUnit.SECONDS)){ - service.shutdownNow(); - fail("Service did not terminate."); - } - assertTrue("Task should have completed running in allotted time.", service.isTerminated()); - } catch (InterruptedException e) { - fail("Test Interrupted."); - } - assertEquals(numMessages, writer.getDatumsCounted()); - assertEquals(numMessages, counter.getNumReceived()); - assertEquals(0, counter.getNumEmitted()); - assertEquals(0, counter.getNumUnhandledErrors()); - assertEquals(0.0, counter.getErrorRate(), 0.0); + Exception exp = null; + try { + task.addOutputQueue(outQueue); + } catch (UnsupportedOperationException uoe) { + exp = uoe; } + assertNotNull(exp); + task.addInputQueue(inQueue); + assertEquals(numMessages, task.getInputQueues().get(0).size()); + ExecutorService service = Executors.newFixedThreadPool(1); + service.submit(task); + int attempts = 0; + while(inQueue.size() != 0 ) { + Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + ++attempts; + if(attempts == 10) { + fail("Processor task failed to output "+numMessages+" in a timely fashion."); + } + } + task.stopTask(); + service.shutdown(); + try { + if(!service.awaitTermination(15, TimeUnit.SECONDS)){ + service.shutdownNow(); + fail("Service did not terminate."); + } + assertTrue("Task should have completed running in allotted time.", service.isTerminated()); + } catch (InterruptedException e) { + fail("Test Interrupted."); + } + assertEquals(numMessages, writer.getDatumsCounted()); + assertEquals(numMessages, counter.getNumReceived()); + assertEquals(0, counter.getNumEmitted()); + assertEquals(0, counter.getNumUnhandledErrors()); + assertEquals(0.0, counter.getErrorRate(), 0.0); + } - @Test - public void testMergeTask() { - int numMessages = 100; - int incoming = 5; - StreamsMergeTask task = new StreamsMergeTask(); - BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>(); - task.addOutputQueue(outQueue); - for(int i=0; i < incoming; ++i) { - task.addInputQueue(createInputQueue(numMessages)); - } - ExecutorService service = Executors.newFixedThreadPool(1); - service.submit(task); - int attempts = 0; - while(outQueue.size() != incoming * numMessages ) { - Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); - ++attempts; - if(attempts == 10) { - assertEquals("Processor task failed to output " + (numMessages * incoming) + " in a timely fashion.", (numMessages * incoming), outQueue.size()); - } - } - task.stopTask(); - service.shutdown(); - try { - if(!service.awaitTermination(5, TimeUnit.SECONDS)){ - service.shutdownNow(); - fail("Service did not terminate."); - } - assertTrue("Task should have completed running in allotted time.", service.isTerminated()); - } catch (InterruptedException e) { - fail("Test Interrupted."); - } + @Test + public void testMergeTask() { + int numMessages = 100; + int incoming = 5; + StreamsMergeTask task = new StreamsMergeTask(); + BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>(); + task.addOutputQueue(outQueue); + for(int i=0; i < incoming; ++i) { + task.addInputQueue(createInputQueue(numMessages)); + } + ExecutorService service = Executors.newFixedThreadPool(1); + service.submit(task); + int attempts = 0; + while(outQueue.size() != incoming * numMessages ) { + Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + ++attempts; + if(attempts == 10) { + assertEquals("Processor task failed to output " + (numMessages * incoming) + " in a timely fashion.", (numMessages * incoming), outQueue.size()); + } } + task.stopTask(); + service.shutdown(); + try { + if(!service.awaitTermination(5, TimeUnit.SECONDS)){ + service.shutdownNow(); + fail("Service did not terminate."); + } + assertTrue("Task should have completed running in allotted time.", service.isTerminated()); + } catch (InterruptedException e) { + fail("Test Interrupted."); + } + } - @Test - public void testBranching() { - int numMessages = 100; - PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor(""); - StreamsProcessorTask task = new StreamsProcessorTask(processor); - BlockingQueue<StreamsDatum> outQueue1 = new LinkedBlockingQueue<>(); - BlockingQueue<StreamsDatum> outQueue2 = new LinkedBlockingQueue<>(); - BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages); - task.addOutputQueue(outQueue1); - task.addOutputQueue(outQueue2); - task.addInputQueue(inQueue); - assertEquals(numMessages, task.getInputQueues().get(0).size()); - ExecutorService service = Executors.newFixedThreadPool(1); - service.submit(task); - int attempts = 0; - while(inQueue.size() != 0 ) { - Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); - ++attempts; - if(attempts == 10) { - assertEquals("Processor task failed to output "+(numMessages)+" in a timely fashion.", 0, inQueue.size()); - } - } - task.stopTask(); + @Test + public void testBranching() { + int numMessages = 100; + PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor(""); + StreamsProcessorTask task = new StreamsProcessorTask(processor); + BlockingQueue<StreamsDatum> outQueue1 = new LinkedBlockingQueue<>(); + BlockingQueue<StreamsDatum> outQueue2 = new LinkedBlockingQueue<>(); + BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages); + task.addOutputQueue(outQueue1); + task.addOutputQueue(outQueue2); + task.addInputQueue(inQueue); + assertEquals(numMessages, task.getInputQueues().get(0).size()); + ExecutorService service = Executors.newFixedThreadPool(1); + service.submit(task); + int attempts = 0; + while(inQueue.size() != 0 ) { + Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + ++attempts; + if(attempts == 10) { + assertEquals("Processor task failed to output "+(numMessages)+" in a timely fashion.", 0, inQueue.size()); + } + } + task.stopTask(); - service.shutdown(); - try { - if(!service.awaitTermination(5, TimeUnit.SECONDS)){ - service.shutdownNow(); - fail("Service did not terminate."); - } - assertTrue("Task should have completed running in allotted time.", service.isTerminated()); - } catch (InterruptedException e) { - fail("Test Interrupted."); - } - assertEquals(numMessages, processor.getMessageCount()); - assertEquals(numMessages, outQueue1.size()); - assertEquals(numMessages, outQueue2.size()); + service.shutdown(); + try { + if(!service.awaitTermination(5, TimeUnit.SECONDS)){ + service.shutdownNow(); + fail("Service did not terminate."); + } + assertTrue("Task should have completed running in allotted time.", service.isTerminated()); + } catch (InterruptedException e) { + fail("Test Interrupted."); } + assertEquals(numMessages, processor.getMessageCount()); + assertEquals(numMessages, outQueue1.size()); + assertEquals(numMessages, outQueue2.size()); + } - @Test - public void testBranchingSerialization() { - int numMessages = 1; - PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor(""); - StreamsProcessorTask task = new StreamsProcessorTask(processor); - BlockingQueue<StreamsDatum> outQueue1 = new LinkedBlockingQueue<>(); - BlockingQueue<StreamsDatum> outQueue2 = new LinkedBlockingQueue<>(); - BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages); - task.addOutputQueue(outQueue1); - task.addOutputQueue(outQueue2); - task.addInputQueue(inQueue); - ExecutorService service = Executors.newFixedThreadPool(1); - service.submit(task); - int attempts = 0; - while(inQueue.size() != 0 ) { - Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); - ++attempts; - if(attempts == 10) { - assertEquals("Processor task failed to output "+(numMessages)+" in a timely fashion.", 0, inQueue.size()); - } - } - task.stopTask(); + @Test + public void testBranchingSerialization() { + int numMessages = 1; + PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor(""); + StreamsProcessorTask task = new StreamsProcessorTask(processor); + BlockingQueue<StreamsDatum> outQueue1 = new LinkedBlockingQueue<>(); + BlockingQueue<StreamsDatum> outQueue2 = new LinkedBlockingQueue<>(); + BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages); + task.addOutputQueue(outQueue1); + task.addOutputQueue(outQueue2); + task.addInputQueue(inQueue); + ExecutorService service = Executors.newFixedThreadPool(1); + service.submit(task); + int attempts = 0; + while(inQueue.size() != 0 ) { + Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + ++attempts; + if(attempts == 10) { + assertEquals("Processor task failed to output "+(numMessages)+" in a timely fashion.", 0, inQueue.size()); + } + } + task.stopTask(); - service.shutdown(); - try { - if(!service.awaitTermination(5, TimeUnit.SECONDS)){ - service.shutdownNow(); - fail("Service did not terminate."); - } - assertTrue("Task should have completed running in allotted time.", service.isTerminated()); - } catch (InterruptedException e) { - fail("Test Interrupted."); - } - assertEquals(numMessages, processor.getMessageCount()); - assertEquals(numMessages, outQueue1.size()); - assertEquals(numMessages, outQueue2.size()); - StreamsDatum datum1 = outQueue1.poll(); - StreamsDatum datum2 = outQueue2.poll(); - assertNotNull(datum1); - assertEquals(datum1, datum2); - datum1.setDocument("a"); - assertNotEquals(datum1, datum2); + service.shutdown(); + try { + if(!service.awaitTermination(5, TimeUnit.SECONDS)){ + service.shutdownNow(); + fail("Service did not terminate."); + } + assertTrue("Task should have completed running in allotted time.", service.isTerminated()); + } catch (InterruptedException e) { + fail("Test Interrupted."); } + assertEquals(numMessages, processor.getMessageCount()); + assertEquals(numMessages, outQueue1.size()); + assertEquals(numMessages, outQueue2.size()); + StreamsDatum datum1 = outQueue1.poll(); + StreamsDatum datum2 = outQueue2.poll(); + assertNotNull(datum1); + assertEquals(datum1, datum2); + datum1.setDocument("a"); + assertNotEquals(datum1, datum2); + } - private BlockingQueue<StreamsDatum> createInputQueue(int numDatums) { - BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>(); - for(int i=0; i < numDatums; ++i) { - queue.add(new StreamsDatum(i)); - } - return queue; + private BlockingQueue<StreamsDatum> createInputQueue(int numDatums) { + BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>(); + for(int i=0; i < numDatums; ++i) { + queue.add(new StreamsDatum(i)); } + return queue; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java index 222566d..782e232 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java @@ -23,132 +23,142 @@ import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; import org.apache.streams.core.StreamsResultSet; import org.apache.streams.util.ComponentUtils; + import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.Queue; -import java.util.concurrent.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atMost; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Tests the StreamsProviderTask. */ public class StreamsProviderTaskTest { - protected StreamsProvider mockProvider; - protected ExecutorService pool; - - @Before - public void setup() { - mockProvider = mock(StreamsProvider.class); - pool = Executors.newFixedThreadPool(1); - } - - @After - public void removeLocalMBeans() { - try { - ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local"); - } catch (Exception e) { - //No op. proceed to next test - } - } - - @Test - public void runPerpetual() { - StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null); - when(mockProvider.isRunning()).thenReturn(true); - when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(new LinkedBlockingQueue<StreamsDatum>())); - task.setTimeout(500); - task.setSleepTime(10); - task.run(); - //Setting this to at least 2 means that it was correctly set to perpetual mode - verify(mockProvider, atLeast(2)).readCurrent(); - verify(mockProvider, atMost(1)).prepare(null); - } - - @Test - public void flushes() { - BlockingQueue<StreamsDatum> out = new LinkedBlockingQueue<>(); - StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null); - when(mockProvider.isRunning()).thenReturn(true); - when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(getQueue(3))); - task.setTimeout(100); - task.setSleepTime(10); - task.getOutputQueues().add(out); - task.run(); - assertThat(out.size(), is(equalTo(3))); + protected StreamsProvider mockProvider; + protected ExecutorService pool; + + @Before + public void setup() { + mockProvider = mock(StreamsProvider.class); + pool = Executors.newFixedThreadPool(1); + } + + @After + public void removeLocalMBeans() { + try { + ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local"); + } catch (Exception e) { + //No op. proceed to next test } - - protected Queue<StreamsDatum> getQueue(int numElems) { - Queue<StreamsDatum> results = new LinkedBlockingQueue<>(); - for(int i=0; i<numElems; i++) { - results.add(new StreamsDatum(Math.random())); - } - return results; - } - - @Test - public void runNonPerpetual() { - StreamsProviderTask task = new StreamsProviderTask(mockProvider, false, null); - when(mockProvider.isRunning()).thenReturn(true); - when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(new LinkedBlockingQueue<StreamsDatum>())); - task.setTimeout(500); - task.setSleepTime(10); - task.run(); - //In read current mode, this should only be called 1 time - verify(mockProvider, atLeast(1)).readCurrent(); - verify(mockProvider, atMost(1)).prepare(null); - } - - @Test - public void stoppable() throws InterruptedException { - StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null); - when(mockProvider.isRunning()).thenReturn(true); - when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(new LinkedBlockingQueue<StreamsDatum>())); - task.setTimeout(-1); - task.setSleepTime(10); - Future<?> taskResult = pool.submit(task); - - //After a few milliseconds, tell the task that it is to stop and wait until it says it isn't or a timeout happens - int count = 0; - do { - Thread.sleep(100); - if(count == 0) { - task.stopTask(); - } - } while(++count < 10 && !taskResult.isDone()); - verifyNotRunning(task, taskResult); - + } + + @Test + public void runPerpetual() { + StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null); + when(mockProvider.isRunning()).thenReturn(true); + when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(new LinkedBlockingQueue<StreamsDatum>())); + task.setTimeout(500); + task.setSleepTime(10); + task.run(); + //Setting this to at least 2 means that it was correctly set to perpetual mode + verify(mockProvider, atLeast(2)).readCurrent(); + verify(mockProvider, atMost(1)).prepare(null); + } + + @Test + public void flushes() { + BlockingQueue<StreamsDatum> out = new LinkedBlockingQueue<>(); + StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null); + when(mockProvider.isRunning()).thenReturn(true); + when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(getQueue(3))); + task.setTimeout(100); + task.setSleepTime(10); + task.getOutputQueues().add(out); + task.run(); + assertThat(out.size(), is(equalTo(3))); + } + + protected Queue<StreamsDatum> getQueue(int numElems) { + Queue<StreamsDatum> results = new LinkedBlockingQueue<>(); + for(int i=0; i<numElems; i++) { + results.add(new StreamsDatum(Math.random())); } - - @Test - public void earlyException() throws InterruptedException { - StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null); - when(mockProvider.isRunning()).thenReturn(true); - doThrow(new RuntimeException()).when(mockProvider).prepare(null); - task.setTimeout(-1); - task.setSleepTime(10); - Future<?> taskResult = pool.submit(task); - int count = 0; - while(++count < 10 && !taskResult.isDone()) { - Thread.sleep(100); - } - verifyNotRunning(task, taskResult); + return results; + } + + @Test + public void runNonPerpetual() { + StreamsProviderTask task = new StreamsProviderTask(mockProvider, false, null); + when(mockProvider.isRunning()).thenReturn(true); + when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(new LinkedBlockingQueue<StreamsDatum>())); + task.setTimeout(500); + task.setSleepTime(10); + task.run(); + //In read current mode, this should only be called 1 time + verify(mockProvider, atLeast(1)).readCurrent(); + verify(mockProvider, atMost(1)).prepare(null); + } + + @Test + public void stoppable() throws InterruptedException { + StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null); + when(mockProvider.isRunning()).thenReturn(true); + when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(new LinkedBlockingQueue<StreamsDatum>())); + task.setTimeout(-1); + task.setSleepTime(10); + Future<?> taskResult = pool.submit(task); + + //After a few milliseconds, tell the task that it is to stop and wait until it says it isn't or a timeout happens + int count = 0; + do { + Thread.sleep(100); + if(count == 0) { + task.stopTask(); + } + } while(++count < 10 && !taskResult.isDone()); + verifyNotRunning(task, taskResult); + + } + + @Test + public void earlyException() throws InterruptedException { + StreamsProviderTask task = new StreamsProviderTask(mockProvider, true, null); + when(mockProvider.isRunning()).thenReturn(true); + doThrow(new RuntimeException()).when(mockProvider).prepare(null); + task.setTimeout(-1); + task.setSleepTime(10); + Future<?> taskResult = pool.submit(task); + int count = 0; + while(++count < 10 && !taskResult.isDone()) { + Thread.sleep(100); } - - protected void verifyNotRunning(StreamsProviderTask task, Future<?> taskResult) { - //Make sure the task is reporting that it is complete and that the run method returned - if(taskResult.isDone()) { - assertThat(task.isRunning(), is(false)); - } else { - ComponentUtils.shutdownExecutor(pool, 0, 10); - fail(); - } + verifyNotRunning(task, taskResult); + } + + protected void verifyNotRunning(StreamsProviderTask task, Future<?> taskResult) { + //Make sure the task is reporting that it is complete and that the run method returned + if(taskResult.isDone()) { + assertThat(task.isRunning(), is(false)); + } else { + ComponentUtils.shutdownExecutor(pool, 0, 10); + fail(); } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/DoNothingProcessor.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/DoNothingProcessor.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/DoNothingProcessor.java index cad7873..31a83ec 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/DoNothingProcessor.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/DoNothingProcessor.java @@ -20,6 +20,7 @@ package org.apache.streams.local.test.processors; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,34 +32,34 @@ import java.util.List; */ public class DoNothingProcessor implements StreamsProcessor { - private final static Logger LOGGER = LoggerFactory.getLogger(DoNothingProcessor.class); + private final static Logger LOGGER = LoggerFactory.getLogger(DoNothingProcessor.class); - public final static String STREAMS_ID = "DoNothingProcessor"; + public final static String STREAMS_ID = "DoNothingProcessor"; - List<StreamsDatum> result; + List<StreamsDatum> result; - public DoNothingProcessor() { - } + public DoNothingProcessor() { + } - @Override - public String getId() { - return STREAMS_ID; - } + @Override + public String getId() { + return STREAMS_ID; + } - @Override - public List<StreamsDatum> process(StreamsDatum entry) { - this.result = new LinkedList<StreamsDatum>(); - result.add(entry); - return result; - } + @Override + public List<StreamsDatum> process(StreamsDatum entry) { + this.result = new LinkedList<StreamsDatum>(); + result.add(entry); + return result; + } - @Override - public void prepare(Object configurationObject) { + @Override + public void prepare(Object configurationObject) { - } + } - @Override - public void cleanUp() { - LOGGER.debug("Processor clean up!"); - } + @Override + public void cleanUp() { + LOGGER.debug("Processor clean up!"); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/PassthroughDatumCounterProcessor.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/PassthroughDatumCounterProcessor.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/PassthroughDatumCounterProcessor.java index 970a8dc..43343e5 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/PassthroughDatumCounterProcessor.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/PassthroughDatumCounterProcessor.java @@ -20,10 +20,15 @@ package org.apache.streams.local.test.processors; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; @@ -32,76 +37,76 @@ import java.util.concurrent.atomic.AtomicLong; */ public class PassthroughDatumCounterProcessor implements StreamsProcessor { - private final static Logger LOGGER = LoggerFactory.getLogger(PassthroughDatumCounterProcessor.class); + private final static Logger LOGGER = LoggerFactory.getLogger(PassthroughDatumCounterProcessor.class); - public final static String STREAMS_ID = "PassthroughDatumCounterProcessor"; + public final static String STREAMS_ID = "PassthroughDatumCounterProcessor"; - /** - * Set of all ids that have been claimed. Ensures all instances are assigned unique ids - */ - public static Set<Integer> CLAIMED_ID = new HashSet<Integer>(); - /** - * Random instance to generate ids - */ - public static final Random RAND = new Random(); - /** - * Set of instance ids that received data. Usefully for testing parrallelization is actually working. - */ - public final static Set<Integer> SEEN_DATA = new HashSet<Integer>(); - /** - * The total count of data seen by a all instances of a processor. - */ - public static final ConcurrentHashMap<String, AtomicLong> COUNTS = new ConcurrentHashMap<>(); + /** + * Set of all ids that have been claimed. Ensures all instances are assigned unique ids + */ + public static Set<Integer> CLAIMED_ID = new HashSet<Integer>(); + /** + * Random instance to generate ids + */ + public static final Random RAND = new Random(); + /** + * Set of instance ids that received data. Usefully for testing parrallelization is actually working. + */ + public final static Set<Integer> SEEN_DATA = new HashSet<Integer>(); + /** + * The total count of data seen by a all instances of a processor. + */ + public static final ConcurrentHashMap<String, AtomicLong> COUNTS = new ConcurrentHashMap<>(); - private int count = 0; - private int id; - private String procId; + private int count = 0; + private int id; + private String procId; - public PassthroughDatumCounterProcessor(String procId) { - this.procId = procId; - } + public PassthroughDatumCounterProcessor(String procId) { + this.procId = procId; + } - @Override - public String getId() { - return STREAMS_ID; - } + @Override + public String getId() { + return STREAMS_ID; + } - @Override - public List<StreamsDatum> process(StreamsDatum entry) { - ++this.count; - List<StreamsDatum> result = new LinkedList<StreamsDatum>(); - result.add(entry); - synchronized (SEEN_DATA) { - SEEN_DATA.add(this.id); - } - return result; + @Override + public List<StreamsDatum> process(StreamsDatum entry) { + ++this.count; + List<StreamsDatum> result = new LinkedList<StreamsDatum>(); + result.add(entry); + synchronized (SEEN_DATA) { + SEEN_DATA.add(this.id); } + return result; + } - @Override - public void prepare(Object configurationObject) { - synchronized (CLAIMED_ID) { - this.id = RAND.nextInt(); - while(!CLAIMED_ID.add(this.id)) { - this.id = RAND.nextInt(); - } - } + @Override + public void prepare(Object configurationObject) { + synchronized (CLAIMED_ID) { + this.id = RAND.nextInt(); + while(!CLAIMED_ID.add(this.id)) { + this.id = RAND.nextInt(); + } } + } - @Override - public void cleanUp() { - LOGGER.debug("Clean up {}", this.procId); - synchronized (COUNTS) { - AtomicLong count = COUNTS.get(this.procId); - if(count == null) { - COUNTS.put(this.procId, new AtomicLong(this.count)); - } else { - count.addAndGet(this.count); - } - } - LOGGER.debug("{}\t{}", this.procId, this.count); + @Override + public void cleanUp() { + LOGGER.debug("Clean up {}", this.procId); + synchronized (COUNTS) { + AtomicLong count = COUNTS.get(this.procId); + if(count == null) { + COUNTS.put(this.procId, new AtomicLong(this.count)); + } else { + count.addAndGet(this.count); + } } + LOGGER.debug("{}\t{}", this.procId, this.count); + } - public int getMessageCount() { - return this.count; - } + public int getMessageCount() { + return this.count; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/SlowProcessor.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/SlowProcessor.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/SlowProcessor.java index 2b172cd..227e0f8 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/SlowProcessor.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/processors/SlowProcessor.java @@ -19,40 +19,41 @@ package org.apache.streams.local.test.processors; -import com.google.common.collect.Lists; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; +import com.google.common.collect.Lists; + import java.util.List; /** */ public class SlowProcessor implements StreamsProcessor { - public final static String STREAMS_ID = "DoNothingProcessor"; + public final static String STREAMS_ID = "DoNothingProcessor"; - @Override - public String getId() { - return STREAMS_ID; - } + @Override + public String getId() { + return STREAMS_ID; + } - @Override - public List<StreamsDatum> process(StreamsDatum entry) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - return Lists.newArrayList(entry); + @Override + public List<StreamsDatum> process(StreamsDatum entry) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } + return Lists.newArrayList(entry); + } - @Override - public void prepare(Object configurationObject) { + @Override + public void prepare(Object configurationObject) { - } + } - @Override - public void cleanUp() { + @Override + public void cleanUp() { - } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java index bdbc9ec..571c0fc 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/EmptyResultSetProvider.java @@ -19,10 +19,11 @@ package org.apache.streams.local.test.providers; -import com.google.common.collect.Queues; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; import org.apache.streams.core.StreamsResultSet; + +import com.google.common.collect.Queues; import org.joda.time.DateTime; import java.math.BigInteger; @@ -32,43 +33,43 @@ import java.math.BigInteger; */ public class EmptyResultSetProvider implements StreamsProvider { - @Override - public String getId() { - return "EmptyResultSetProvider"; - } + @Override + public String getId() { + return "EmptyResultSetProvider"; + } - @Override - public void startStream() { - //NOP - } + @Override + public void startStream() { + //NOP + } - @Override - public StreamsResultSet readCurrent() { - return new StreamsResultSet(Queues.<StreamsDatum>newLinkedBlockingQueue()); - } + @Override + public StreamsResultSet readCurrent() { + return new StreamsResultSet(Queues.<StreamsDatum>newLinkedBlockingQueue()); + } - @Override - public StreamsResultSet readNew(BigInteger sequence) { - return new StreamsResultSet(Queues.<StreamsDatum>newLinkedBlockingQueue()); - } + @Override + public StreamsResultSet readNew(BigInteger sequence) { + return new StreamsResultSet(Queues.<StreamsDatum>newLinkedBlockingQueue()); + } - @Override - public StreamsResultSet readRange(DateTime start, DateTime end) { - return new StreamsResultSet(Queues.<StreamsDatum>newLinkedBlockingQueue()); - } + @Override + public StreamsResultSet readRange(DateTime start, DateTime end) { + return new StreamsResultSet(Queues.<StreamsDatum>newLinkedBlockingQueue()); + } - @Override - public boolean isRunning() { - return true; - } + @Override + public boolean isRunning() { + return true; + } - @Override - public void prepare(Object configurationObject) { - //NOP - } + @Override + public void prepare(Object configurationObject) { + //NOP + } - @Override - public void cleanUp() { - //NOP - } + @Override + public void cleanUp() { + //NOP + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java index d7c1568..88494a8 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/test/providers/NumericMessageProvider.java @@ -18,93 +18,91 @@ package org.apache.streams.local.test.providers; -import com.google.common.collect.Queues; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProvider; import org.apache.streams.core.StreamsResultSet; + +import com.google.common.collect.Queues; import org.joda.time.DateTime; import java.math.BigInteger; -import java.util.Iterator; import java.util.Queue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; /** * Test StreamsProvider that sends out StreamsDatums numbered from 0 to numMessages. */ public class NumericMessageProvider implements StreamsProvider { - @Override - public String getId() { - return "NumericMessageProvider"; - } - - private static final int DEFAULT_BATCH_SIZE = 100; - - private int numMessages; - private BlockingQueue<StreamsDatum> data; - private volatile boolean complete = false; - - public NumericMessageProvider(int numMessages) { - this.numMessages = numMessages; - } - - @Override - public void startStream() { - this.data = constructQueue(); + @Override + public String getId() { + return "NumericMessageProvider"; + } + + private static final int DEFAULT_BATCH_SIZE = 100; + + private int numMessages; + private BlockingQueue<StreamsDatum> data; + private volatile boolean complete = false; + + public NumericMessageProvider(int numMessages) { + this.numMessages = numMessages; + } + + @Override + public void startStream() { + this.data = constructQueue(); + } + + @Override + public StreamsResultSet readCurrent() { + int batchSize = 0; + Queue<StreamsDatum> batch = Queues.newLinkedBlockingQueue(); + try { + while (!this.data.isEmpty() && batchSize < DEFAULT_BATCH_SIZE) { + batch.add(this.data.take()); + ++batchSize; + } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); } - - @Override - public StreamsResultSet readCurrent() { - int batchSize = 0; - Queue<StreamsDatum> batch = Queues.newLinkedBlockingQueue(); - try { - while (!this.data.isEmpty() && batchSize < DEFAULT_BATCH_SIZE) { - batch.add(this.data.take()); - ++batchSize; - } - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } // System.out.println("******************\n**\tBatchSize="+batch.size()+"\n******************"); - this.complete = batch.isEmpty() && this.data.isEmpty(); - return new StreamsResultSet(batch); - } - - @Override - public StreamsResultSet readNew(BigInteger sequence) { - return new StreamsResultSet(constructQueue()); - } - - @Override - public StreamsResultSet readRange(DateTime start, DateTime end) { - return new StreamsResultSet(constructQueue()); - } - - @Override - public boolean isRunning() { - return !this.complete; - } - - @Override - public void prepare(Object configurationObject) { - this.data = constructQueue(); - } - - @Override - public void cleanUp() { - - } - - private BlockingQueue<StreamsDatum> constructQueue() { - BlockingQueue<StreamsDatum> datums = Queues.newArrayBlockingQueue(numMessages); - for(int i=0;i<numMessages;i++) { - datums.add(new StreamsDatum(i)); - } - return datums; + this.complete = batch.isEmpty() && this.data.isEmpty(); + return new StreamsResultSet(batch); + } + + @Override + public StreamsResultSet readNew(BigInteger sequence) { + return new StreamsResultSet(constructQueue()); + } + + @Override + public StreamsResultSet readRange(DateTime start, DateTime end) { + return new StreamsResultSet(constructQueue()); + } + + @Override + public boolean isRunning() { + return !this.complete; + } + + @Override + public void prepare(Object configurationObject) { + this.data = constructQueue(); + } + + @Override + public void cleanUp() { + + } + + private BlockingQueue<StreamsDatum> constructQueue() { + BlockingQueue<StreamsDatum> datums = Queues.newArrayBlockingQueue(numMessages); + for(int i=0;i<numMessages;i++) { + datums.add(new StreamsDatum(i)); } + return datums; + } }
