Added blocking queues to test so that test will compile
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/3fad21a1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/3fad21a1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/3fad21a1 Branch: refs/heads/master Commit: 3fad21a109289d4cd5b1e5444e2e0d8748154cbf Parents: fb11add Author: Ryan Ebanks <[email protected]> Authored: Tue Oct 7 09:50:17 2014 -0500 Committer: Ryan Ebanks <[email protected]> Committed: Tue Oct 7 09:50:17 2014 -0500 ---------------------------------------------------------------------- .../streams/local/builders/StreamComponent.java | 6 +-- .../streams/local/tasks/StreamsMergeTask.java | 8 +++- .../local/builders/LocalStreamBuilderTest.java | 4 +- .../local/builders/ToyLocalBuilderExample.java | 5 +-- .../streams/local/tasks/BasicTasksTest.java | 44 ++++++++++---------- .../local/tasks/StreamsProviderTaskTest.java | 7 +--- 6 files changed, 38 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3fad21a1/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java index 5131227..8bccdf7 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java @@ -189,13 +189,13 @@ public class StreamComponent { if(this.numTasks > 1) { task = new StreamsProcessorTask((StreamsProcessor)SerializationUtil.cloneBySerialization(this.processor)); task.addInputQueue(this.inQueue); - for(Queue<StreamsDatum> q : this.outBound.values()) { + for(BlockingQueue<StreamsDatum> q : this.outBound.values()) { task.addOutputQueue(q); } } else { task = new StreamsProcessorTask(this.processor); task.addInputQueue(this.inQueue); - for(Queue<StreamsDatum> q : this.outBound.values()) { + for(BlockingQueue<StreamsDatum> q : this.outBound.values()) { task.addOutputQueue(q); } } @@ -226,7 +226,7 @@ public class StreamComponent { if(timeout != 0) { ((StreamsProviderTask)task).setTimeout(timeout); } - for(Queue<StreamsDatum> q : this.outBound.values()) { + for(BlockingQueue<StreamsDatum> q : this.outBound.values()) { task.addOutputQueue(q); } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3fad21a1/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java index 4237be8..585e4dd 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java @@ -26,7 +26,9 @@ import java.util.concurrent.atomic.AtomicBoolean; /** * NOT USED. When joins/partions are implemented, a similar pattern could be followed. Done only as basic proof * of concept. + * NEEDS TO BE RE-WRITTEN */ +@Deprecated public class StreamsMergeTask extends BaseStreamsTask { private AtomicBoolean keepRunning; @@ -62,7 +64,11 @@ public class StreamsMergeTask extends BaseStreamsTask { while(this.keepRunning.get()) { StreamsDatum datum = super.getNextDatum(); if(datum != null) { - super.addToOutgoingQueue(datum); + try { + super.addToOutgoingQueue(datum); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } } else { try { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3fad21a1/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java index c02f704..b082168 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java @@ -223,7 +223,7 @@ public class LocalStreamBuilderTest { int timeout = 10000; config.put(LocalStreamBuilder.TIMEOUT_KEY, timeout); long start = System.currentTimeMillis(); - StreamBuilder builder = new LocalStreamBuilder(Queues.<StreamsDatum>newLinkedBlockingQueue(), config); + StreamBuilder builder = new LocalStreamBuilder(-1, config); builder.newPerpetualStream("prov1", new EmptyResultSetProvider()) .addStreamsProcessor("proc1", new PassthroughDatumCounterProcessor(), 1, "prov1") .addStreamsProcessor("proc2", new PassthroughDatumCounterProcessor(), 1, "proc1") @@ -238,7 +238,7 @@ public class LocalStreamBuilderTest { public void ensureShutdownWithBlockedQueue() throws InterruptedException { ExecutorService service = Executors.newSingleThreadExecutor(); int before = Thread.activeCount(); - final StreamBuilder builder = new LocalStreamBuilder(Queues.<StreamsDatum>newLinkedBlockingQueue(1)); + final StreamBuilder builder = new LocalStreamBuilder(1); builder.newPerpetualStream("prov1", new NumericMessageProvider(30)) .addStreamsProcessor("proc1", new SlowProcessor(), 1, "prov1") .addStreamsPersistWriter("w1", new SystemOutWriter(), 1, "proc1"); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3fad21a1/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/ToyLocalBuilderExample.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/ToyLocalBuilderExample.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/ToyLocalBuilderExample.java index f2bccef..a77dfec 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/ToyLocalBuilderExample.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/ToyLocalBuilderExample.java @@ -19,12 +19,11 @@ package org.apache.streams.local.builders; import org.apache.streams.core.StreamBuilder; -import org.apache.streams.core.StreamsDatum; import org.apache.streams.local.test.processors.DoNothingProcessor; import org.apache.streams.local.test.providers.NumericMessageProvider; import org.apache.streams.local.test.writer.DoNothingWriter; -import java.util.concurrent.LinkedBlockingQueue; + /** * Created by rebanks on 2/20/14. @@ -36,7 +35,7 @@ public class ToyLocalBuilderExample { * @param args */ public static void main(String[] args) { - StreamBuilder builder = new LocalStreamBuilder(new LinkedBlockingQueue<StreamsDatum>()); + StreamBuilder builder = new LocalStreamBuilder(); builder.newReadCurrentStream("prov", new NumericMessageProvider(1000000)) .addStreamsProcessor("proc", new DoNothingProcessor(), 100, "prov") .addStreamsPersistWriter("writer", new DoNothingWriter(), 3, "proc"); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3fad21a1/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java index 4ec4acc..c4ea4ee 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java @@ -19,21 +19,19 @@ package org.apache.streams.local.tasks; import org.apache.streams.core.StreamsDatum; +import org.apache.streams.local.queues.ThroughputQueue; import org.apache.streams.local.test.processors.PassthroughDatumCounterProcessor; import org.apache.streams.local.test.providers.NumericMessageProvider; import org.apache.streams.local.test.writer.DatumCounterWriter; import org.junit.Test; import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import static org.junit.Assert.*; /** - * Created by rebanks on 2/18/14. + * */ public class BasicTasksTest { @@ -44,9 +42,10 @@ public class BasicTasksTest { int numMessages = 100; NumericMessageProvider provider = new NumericMessageProvider(numMessages); StreamsProviderTask task = new StreamsProviderTask(provider, false); - Queue<StreamsDatum> outQueue = new ConcurrentLinkedQueue<StreamsDatum>(); + BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>(); task.addOutputQueue(outQueue); - Queue<StreamsDatum> inQueue = createInputQueue(numMessages); + //Test that adding input queues to providers is not valid + BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages); Exception exp = null; try { task.addInputQueue(inQueue); @@ -54,6 +53,7 @@ public class BasicTasksTest { exp = uoe; } assertNotNull(exp); + ExecutorService service = Executors.newFixedThreadPool(1); service.submit(task); int attempts = 0; @@ -61,7 +61,7 @@ public class BasicTasksTest { try { Thread.sleep(500); } catch (InterruptedException e) { - //Ignore + Thread.currentThread().interrupt(); } ++attempts; if(attempts == 10) { @@ -76,7 +76,7 @@ public class BasicTasksTest { } assertTrue("Task should have completed running in aloted time.", service.isTerminated()); } catch (InterruptedException e) { - fail("Test Interupted."); + Thread.currentThread().interrupt(); }; } @@ -85,8 +85,8 @@ public class BasicTasksTest { int numMessages = 100; PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor(); StreamsProcessorTask task = new StreamsProcessorTask(processor); - Queue<StreamsDatum> outQueue = new ConcurrentLinkedQueue<StreamsDatum>(); - Queue<StreamsDatum> inQueue = createInputQueue(numMessages); + BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>(); + BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages); task.addOutputQueue(outQueue); task.addInputQueue(inQueue); assertEquals(numMessages, task.getInputQueues().get(0).size()); @@ -123,8 +123,8 @@ public class BasicTasksTest { int numMessages = 100; DatumCounterWriter writer = new DatumCounterWriter(); StreamsPersistWriterTask task = new StreamsPersistWriterTask(writer); - Queue<StreamsDatum> outQueue = new ConcurrentLinkedQueue<StreamsDatum>(); - Queue<StreamsDatum> inQueue = createInputQueue(numMessages); + BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>(); + BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages); Exception exp = null; try { @@ -168,7 +168,7 @@ public class BasicTasksTest { int numMessages = 100; int incoming = 5; StreamsMergeTask task = new StreamsMergeTask(); - Queue<StreamsDatum> outQueue = new ConcurrentLinkedQueue<StreamsDatum>(); + BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>(); task.addOutputQueue(outQueue); for(int i=0; i < incoming; ++i) { task.addInputQueue(createInputQueue(numMessages)); @@ -205,9 +205,9 @@ public class BasicTasksTest { int numMessages = 100; PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor(); StreamsProcessorTask task = new StreamsProcessorTask(processor); - Queue<StreamsDatum> outQueue1 = new ConcurrentLinkedQueue<StreamsDatum>(); - Queue<StreamsDatum> outQueue2 = new ConcurrentLinkedQueue<StreamsDatum>(); - Queue<StreamsDatum> inQueue = createInputQueue(numMessages); + BlockingQueue<StreamsDatum> outQueue1 = new LinkedBlockingQueue<>(); + BlockingQueue<StreamsDatum> outQueue2 = new LinkedBlockingQueue<>(); + BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages); task.addOutputQueue(outQueue1); task.addOutputQueue(outQueue2); task.addInputQueue(inQueue); @@ -248,9 +248,9 @@ public class BasicTasksTest { int numMessages = 1; PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor(); StreamsProcessorTask task = new StreamsProcessorTask(processor); - Queue<StreamsDatum> outQueue1 = new ConcurrentLinkedQueue<StreamsDatum>(); - Queue<StreamsDatum> outQueue2 = new ConcurrentLinkedQueue<StreamsDatum>(); - Queue<StreamsDatum> inQueue = createInputQueue(numMessages); + BlockingQueue<StreamsDatum> outQueue1 = new LinkedBlockingQueue<>(); + BlockingQueue<StreamsDatum> outQueue2 = new LinkedBlockingQueue<>(); + BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages); task.addOutputQueue(outQueue1); task.addOutputQueue(outQueue2); task.addInputQueue(inQueue); @@ -291,8 +291,8 @@ public class BasicTasksTest { assertNotEquals(datum1, datum2); } - private Queue<StreamsDatum> createInputQueue(int numDatums) { - Queue<StreamsDatum> queue = new ConcurrentLinkedQueue<StreamsDatum>(); + private BlockingQueue<StreamsDatum> createInputQueue(int numDatums) { + BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>(); for(int i=0; i < numDatums; ++i) { queue.add(new StreamsDatum(i)); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3fad21a1/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java index 2cc4fed..856c357 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java @@ -27,10 +27,7 @@ import org.junit.Before; import org.junit.Test; import java.util.Queue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.*; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; @@ -67,7 +64,7 @@ public class StreamsProviderTaskTest { @Test public void flushes() { - Queue<StreamsDatum> out = new LinkedBlockingQueue<>(); + BlockingQueue<StreamsDatum> out = new LinkedBlockingQueue<>(); StreamsProviderTask task = new StreamsProviderTask(mockProvider, true); when(mockProvider.isRunning()).thenReturn(true); when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(getQueue(3)));
