Added blocking queues to test so that test will compile

Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/3fad21a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/3fad21a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/3fad21a1

Branch: refs/heads/master
Commit: 3fad21a109289d4cd5b1e5444e2e0d8748154cbf
Parents: fb11add
Author: Ryan Ebanks <[email protected]>
Authored: Tue Oct 7 09:50:17 2014 -0500
Committer: Ryan Ebanks <[email protected]>
Committed: Tue Oct 7 09:50:17 2014 -0500

----------------------------------------------------------------------
 .../streams/local/builders/StreamComponent.java |  6 +--
 .../streams/local/tasks/StreamsMergeTask.java   |  8 +++-
 .../local/builders/LocalStreamBuilderTest.java  |  4 +-
 .../local/builders/ToyLocalBuilderExample.java  |  5 +--
 .../streams/local/tasks/BasicTasksTest.java     | 44 ++++++++++----------
 .../local/tasks/StreamsProviderTaskTest.java    |  7 +---
 6 files changed, 38 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3fad21a1/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
index 5131227..8bccdf7 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
@@ -189,13 +189,13 @@ public class StreamComponent {
             if(this.numTasks > 1) {
                 task =  new StreamsProcessorTask((StreamsProcessor)SerializationUtil.cloneBySerialization(this.processor));
                 task.addInputQueue(this.inQueue);
-                for(Queue<StreamsDatum> q : this.outBound.values()) {
+                for(BlockingQueue<StreamsDatum> q : this.outBound.values()) {
                     task.addOutputQueue(q);
                 }
             } else {
                 task = new StreamsProcessorTask(this.processor);
                 task.addInputQueue(this.inQueue);
-                for(Queue<StreamsDatum> q : this.outBound.values()) {
+                for(BlockingQueue<StreamsDatum> q : this.outBound.values()) {
                     task.addOutputQueue(q);
                 }
             }
@@ -226,7 +226,7 @@ public class StreamComponent {
             if(timeout != 0) {
                 ((StreamsProviderTask)task).setTimeout(timeout);
             }
-            for(Queue<StreamsDatum> q : this.outBound.values()) {
+            for(BlockingQueue<StreamsDatum> q : this.outBound.values()) {
                 task.addOutputQueue(q);
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3fad21a1/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
index 4237be8..585e4dd 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsMergeTask.java
@@ -26,7 +26,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 /**
  * NOT USED.  When joins/partions are implemented, a similar pattern could be followed. Done only as basic proof
  * of concept.
+ * NEEDS TO BE RE-WRITTEN
  */
+@Deprecated
 public class StreamsMergeTask extends BaseStreamsTask {
 
     private AtomicBoolean keepRunning;
@@ -62,7 +64,11 @@ public class StreamsMergeTask extends BaseStreamsTask {
         while(this.keepRunning.get()) {
             StreamsDatum datum = super.getNextDatum();
             if(datum != null) {
-                super.addToOutgoingQueue(datum);
+                try {
+                    super.addToOutgoingQueue(datum);
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                }
             }
             else {
                 try {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3fad21a1/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
index c02f704..b082168 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java
@@ -223,7 +223,7 @@ public class LocalStreamBuilderTest {
         int timeout = 10000;
         config.put(LocalStreamBuilder.TIMEOUT_KEY, timeout);
         long start = System.currentTimeMillis();
-        StreamBuilder builder = new LocalStreamBuilder(Queues.<StreamsDatum>newLinkedBlockingQueue(), config);
+        StreamBuilder builder = new LocalStreamBuilder(-1, config);
         builder.newPerpetualStream("prov1", new EmptyResultSetProvider())
                 .addStreamsProcessor("proc1", new PassthroughDatumCounterProcessor(), 1, "prov1")
                 .addStreamsProcessor("proc2", new PassthroughDatumCounterProcessor(), 1, "proc1")
@@ -238,7 +238,7 @@ public class LocalStreamBuilderTest {
     public void ensureShutdownWithBlockedQueue() throws InterruptedException {
         ExecutorService service = Executors.newSingleThreadExecutor();
         int before = Thread.activeCount();
-        final StreamBuilder builder = new LocalStreamBuilder(Queues.<StreamsDatum>newLinkedBlockingQueue(1));
+        final StreamBuilder builder = new LocalStreamBuilder(1);
         builder.newPerpetualStream("prov1", new NumericMessageProvider(30))
                 .addStreamsProcessor("proc1", new SlowProcessor(), 1, "prov1")
                 .addStreamsPersistWriter("w1", new SystemOutWriter(), 1, "proc1");

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3fad21a1/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/ToyLocalBuilderExample.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/ToyLocalBuilderExample.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/ToyLocalBuilderExample.java
index f2bccef..a77dfec 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/ToyLocalBuilderExample.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/ToyLocalBuilderExample.java
@@ -19,12 +19,11 @@
 package org.apache.streams.local.builders;
 
 import org.apache.streams.core.StreamBuilder;
-import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.local.test.processors.DoNothingProcessor;
 import org.apache.streams.local.test.providers.NumericMessageProvider;
 import org.apache.streams.local.test.writer.DoNothingWriter;
 
-import java.util.concurrent.LinkedBlockingQueue;
+
 
 /**
  * Created by rebanks on 2/20/14.
@@ -36,7 +35,7 @@ public class ToyLocalBuilderExample {
      * @param args
      */
     public static void main(String[] args) {
-        StreamBuilder builder = new LocalStreamBuilder(new LinkedBlockingQueue<StreamsDatum>());
+        StreamBuilder builder = new LocalStreamBuilder();
         builder.newReadCurrentStream("prov", new NumericMessageProvider(1000000))
                 .addStreamsProcessor("proc", new DoNothingProcessor(), 100, "prov")
                 .addStreamsPersistWriter("writer", new DoNothingWriter(), 3, "proc");

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3fad21a1/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
index 4ec4acc..c4ea4ee 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/BasicTasksTest.java
@@ -19,21 +19,19 @@
 package org.apache.streams.local.tasks;
 
 import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.local.queues.ThroughputQueue;
 import org.apache.streams.local.test.processors.PassthroughDatumCounterProcessor;
 import org.apache.streams.local.test.providers.NumericMessageProvider;
 import org.apache.streams.local.test.writer.DatumCounterWriter;
 import org.junit.Test;
 
 import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 import static org.junit.Assert.*;
 
 /**
- * Created by rebanks on 2/18/14.
+ *
  */
 public class BasicTasksTest {
 
@@ -44,9 +42,10 @@ public class BasicTasksTest {
         int numMessages = 100;
         NumericMessageProvider provider = new NumericMessageProvider(numMessages);
         StreamsProviderTask task = new StreamsProviderTask(provider, false);
-        Queue<StreamsDatum> outQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+        BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
         task.addOutputQueue(outQueue);
-        Queue<StreamsDatum> inQueue = createInputQueue(numMessages);
+        //Test that adding input queues to providers is not valid
+        BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
         Exception exp = null;
         try {
             task.addInputQueue(inQueue);
@@ -54,6 +53,7 @@ public class BasicTasksTest {
             exp = uoe;
         }
         assertNotNull(exp);
+
         ExecutorService service = Executors.newFixedThreadPool(1);
         service.submit(task);
         int attempts = 0;
@@ -61,7 +61,7 @@ public class BasicTasksTest {
             try {
                 Thread.sleep(500);
             } catch (InterruptedException e) {
-                //Ignore
+                Thread.currentThread().interrupt();
             }
             ++attempts;
             if(attempts == 10) {
@@ -76,7 +76,7 @@ public class BasicTasksTest {
             }
             assertTrue("Task should have completed running in aloted time.", service.isTerminated());
         } catch (InterruptedException e) {
-            fail("Test Interupted.");
+            Thread.currentThread().interrupt();
         };
     }
 
@@ -85,8 +85,8 @@ public class BasicTasksTest {
         int numMessages = 100;
         PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor();
         StreamsProcessorTask task = new StreamsProcessorTask(processor);
-        Queue<StreamsDatum> outQueue = new ConcurrentLinkedQueue<StreamsDatum>();
-        Queue<StreamsDatum> inQueue = createInputQueue(numMessages);
+        BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
+        BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
         task.addOutputQueue(outQueue);
         task.addInputQueue(inQueue);
         assertEquals(numMessages, task.getInputQueues().get(0).size());
@@ -123,8 +123,8 @@ public class BasicTasksTest {
         int numMessages = 100;
         DatumCounterWriter writer = new DatumCounterWriter();
         StreamsPersistWriterTask task = new StreamsPersistWriterTask(writer);
-        Queue<StreamsDatum> outQueue = new ConcurrentLinkedQueue<StreamsDatum>();
-        Queue<StreamsDatum> inQueue = createInputQueue(numMessages);
+        BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
+        BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
 
         Exception exp = null;
         try {
@@ -168,7 +168,7 @@ public class BasicTasksTest {
         int numMessages = 100;
         int incoming = 5;
         StreamsMergeTask task = new StreamsMergeTask();
-        Queue<StreamsDatum> outQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+        BlockingQueue<StreamsDatum> outQueue = new LinkedBlockingQueue<>();
         task.addOutputQueue(outQueue);
         for(int i=0; i < incoming; ++i) {
             task.addInputQueue(createInputQueue(numMessages));
@@ -205,9 +205,9 @@ public class BasicTasksTest {
         int numMessages = 100;
         PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor();
         StreamsProcessorTask task = new StreamsProcessorTask(processor);
-        Queue<StreamsDatum> outQueue1 = new ConcurrentLinkedQueue<StreamsDatum>();
-        Queue<StreamsDatum> outQueue2 = new ConcurrentLinkedQueue<StreamsDatum>();
-        Queue<StreamsDatum> inQueue = createInputQueue(numMessages);
+        BlockingQueue<StreamsDatum> outQueue1 = new LinkedBlockingQueue<>();
+        BlockingQueue<StreamsDatum> outQueue2 = new LinkedBlockingQueue<>();
+        BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
         task.addOutputQueue(outQueue1);
         task.addOutputQueue(outQueue2);
         task.addInputQueue(inQueue);
@@ -248,9 +248,9 @@ public class BasicTasksTest {
         int numMessages = 1;
         PassthroughDatumCounterProcessor processor = new PassthroughDatumCounterProcessor();
         StreamsProcessorTask task = new StreamsProcessorTask(processor);
-        Queue<StreamsDatum> outQueue1 = new ConcurrentLinkedQueue<StreamsDatum>();
-        Queue<StreamsDatum> outQueue2 = new ConcurrentLinkedQueue<StreamsDatum>();
-        Queue<StreamsDatum> inQueue = createInputQueue(numMessages);
+        BlockingQueue<StreamsDatum> outQueue1 = new LinkedBlockingQueue<>();
+        BlockingQueue<StreamsDatum> outQueue2 = new LinkedBlockingQueue<>();
+        BlockingQueue<StreamsDatum> inQueue = createInputQueue(numMessages);
         task.addOutputQueue(outQueue1);
         task.addOutputQueue(outQueue2);
         task.addInputQueue(inQueue);
@@ -291,8 +291,8 @@ public class BasicTasksTest {
         assertNotEquals(datum1, datum2);
     }
 
-    private Queue<StreamsDatum> createInputQueue(int numDatums) {
-        Queue<StreamsDatum> queue = new ConcurrentLinkedQueue<StreamsDatum>();
+    private BlockingQueue<StreamsDatum> createInputQueue(int numDatums) {
+        BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>();
         for(int i=0; i < numDatums; ++i) {
             queue.add(new StreamsDatum(i));
         }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/3fad21a1/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java
index 2cc4fed..856c357 100644
--- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java
+++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/tasks/StreamsProviderTaskTest.java
@@ -27,10 +27,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Queue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.*;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
@@ -67,7 +64,7 @@ public class StreamsProviderTaskTest {
 
     @Test
     public void flushes() {
-        Queue<StreamsDatum> out = new LinkedBlockingQueue<>();
+        BlockingQueue<StreamsDatum> out = new LinkedBlockingQueue<>();
         StreamsProviderTask task = new StreamsProviderTask(mockProvider, true);
         when(mockProvider.isRunning()).thenReturn(true);
         when(mockProvider.readCurrent()).thenReturn(new StreamsResultSet(getQueue(3)));

Reply via email to