Repository: incubator-streams
Updated Branches:
  refs/heads/master 0a32159d8 -> 7b01eb48b


Implement ThroughputQueues and remove busy waiting in the processor and writer 
tasks


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/fb11add0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/fb11add0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/fb11add0

Branch: refs/heads/master
Commit: fb11add0e432e3ba63322dfef863783c5033f0f8
Parents: 50906ed
Author: Ryan Ebanks <[email protected]>
Authored: Mon Oct 6 17:10:41 2014 -0500
Committer: Ryan Ebanks <[email protected]>
Committed: Mon Oct 6 17:10:41 2014 -0500

----------------------------------------------------------------------
 .../local/builders/LocalStreamBuilder.java      | 31 +++++----
 .../streams/local/builders/StreamComponent.java | 15 +++--
 .../streams/local/tasks/BaseStreamsTask.java    | 45 +++++++++----
 .../local/tasks/StreamsPersistWriterTask.java   | 55 +++++++++++-----
 .../local/tasks/StreamsProcessorTask.java       | 68 ++++++++++++++------
 .../local/tasks/StreamsProviderTask.java        |  3 +-
 .../apache/streams/local/tasks/StreamsTask.java |  9 +--
 7 files changed, 152 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb11add0/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index bec1ff9..0b02f4e 100644
--- 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -21,17 +21,16 @@ package org.apache.streams.local.builders;
 import org.apache.log4j.spi.LoggerFactory;
 import org.apache.streams.core.*;
 import 
org.apache.streams.local.executors.ShutdownStreamOnUnhandleThrowableThreadPoolExecutor;
+import org.apache.streams.local.queues.ThroughputQueue;
 import org.apache.streams.local.tasks.LocalStreamProcessMonitorThread;
 import org.apache.streams.local.tasks.StatusCounterMonitorThread;
 import org.apache.streams.local.tasks.StreamsProviderTask;
 import org.apache.streams.local.tasks.StreamsTask;
-import org.apache.streams.util.SerializationUtil;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 
 import java.math.BigInteger;
 import java.util.*;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -48,7 +47,6 @@ public class LocalStreamBuilder implements StreamBuilder {
     public static final String TIMEOUT_KEY = "TIMEOUT";
     private Map<String, StreamComponent> providers;
     private Map<String, StreamComponent> components;
-    private Queue<StreamsDatum> queue;
     private Map<String, Object> streamConfig;
     private ExecutorService executor;
     private ExecutorService monitor;
@@ -57,12 +55,13 @@ public class LocalStreamBuilder implements StreamBuilder {
     private LocalStreamProcessMonitorThread monitorThread;
     private Map<String, List<StreamsTask>> tasks;
     private Thread shutdownHook;
+    private int maxQueueCapacity;
 
     /**
      *
      */
     public LocalStreamBuilder(){
-        this(new ConcurrentLinkedQueue<StreamsDatum>(), null);
+        this(-1, null);
     }
 
     /**
@@ -70,29 +69,29 @@ public class LocalStreamBuilder implements StreamBuilder {
      * @param streamConfig
      */
     public LocalStreamBuilder(Map<String, Object> streamConfig) {
-        this(new ConcurrentLinkedQueue<StreamsDatum>(), streamConfig);
+        this(-1, streamConfig);
     }
 
     /**
      *
-     * @param queueType
+     * @param maxQueueCapacity
      */
-    public LocalStreamBuilder(Queue<StreamsDatum> queueType) {
-        this(queueType, null);
+    public LocalStreamBuilder(int maxQueueCapacity) {
+        this(maxQueueCapacity, null);
     }
 
     /**
      *
-     * @param queueType
+     * @param maxQueueCapacity
      * @param streamConfig
      */
-    public LocalStreamBuilder(Queue<StreamsDatum> queueType, Map<String, 
Object> streamConfig) {
-        this.queue = queueType;
+    public LocalStreamBuilder(int maxQueueCapacity, Map<String, Object> 
streamConfig) {
         this.providers = new HashMap<String, StreamComponent>();
         this.components = new HashMap<String, StreamComponent>();
         this.streamConfig = streamConfig;
         this.totalTasks = 0;
         this.monitorTasks = 0;
+        this.maxQueueCapacity = maxQueueCapacity;
         final LocalStreamBuilder self = this;
         this.shutdownHook = new Thread() {
             @Override
@@ -146,7 +145,7 @@ public class LocalStreamBuilder implements StreamBuilder {
     @Override
     public StreamBuilder addStreamsProcessor(String id, StreamsProcessor 
processor, int numTasks, String... inBoundIds) {
         validateId(id);
-        StreamComponent comp = new StreamComponent(id, processor, 
cloneQueue(), numTasks);
+        StreamComponent comp = new StreamComponent(id, processor, new 
ThroughputQueue<StreamsDatum>(this.maxQueueCapacity, id), numTasks);
         this.components.put(id, comp);
         connectToOtherComponents(inBoundIds, comp);
         this.totalTasks += numTasks;
@@ -158,7 +157,7 @@ public class LocalStreamBuilder implements StreamBuilder {
     @Override
     public StreamBuilder addStreamsPersistWriter(String id, 
StreamsPersistWriter writer, int numTasks, String... inBoundIds) {
         validateId(id);
-        StreamComponent comp = new StreamComponent(id, writer, cloneQueue(), 
numTasks);
+        StreamComponent comp = new StreamComponent(id, writer, new 
ThroughputQueue<StreamsDatum>(this.maxQueueCapacity, id), numTasks);
         this.components.put(id, comp);
         connectToOtherComponents(inBoundIds, comp);
         this.totalTasks += numTasks;
@@ -373,13 +372,13 @@ public class LocalStreamBuilder implements StreamBuilder {
     private void validateId(String id) {
         if(this.providers.containsKey(id) || this.components.containsKey(id)) {
             throw new InvalidStreamException("Duplicate id. "+id+" is already 
assigned to another component");
+        } else if(id.contains(":")) {
+            throw new InvalidStreamException("Invalid character, ':', in 
component id : "+id);
         }
     }
 
 
-    private Queue<StreamsDatum> cloneQueue() {
-        return (Queue<StreamsDatum>) 
SerializationUtil.cloneBySerialization(this.queue);
-    }
+
 
     protected int getTimeout() {
     //Set the timeout of it is configured, otherwise signal downstream 
components to use their default

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb11add0/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
index 8611bbb..5131227 100644
--- 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java
@@ -28,6 +28,7 @@ import org.joda.time.DateTime;
 
 import java.math.BigInteger;
 import java.util.*;
+import java.util.concurrent.BlockingQueue;
 
 /**
  * Stores the implementations of {@link 
org.apache.streams.core.StreamsOperation}, the StreamsOperations it is connected
@@ -40,8 +41,8 @@ public class StreamComponent {
 
     private String id;
     private Set<StreamComponent> inBound;
-    private Map<StreamComponent, Queue<StreamsDatum>> outBound;
-    private Queue<StreamsDatum> inQueue;
+    private Map<StreamComponent, BlockingQueue<StreamsDatum>> outBound;
+    private BlockingQueue<StreamsDatum> inQueue;
     private StreamsProvider provider;
     private StreamsProcessor processor;
     private StreamsPersistWriter writer;
@@ -98,7 +99,7 @@ public class StreamComponent {
      * @param inQueue
      * @param numTasks
      */
-    public StreamComponent(String id, StreamsProcessor processor, 
Queue<StreamsDatum> inQueue, int numTasks) {
+    public StreamComponent(String id, StreamsProcessor processor, 
BlockingQueue<StreamsDatum> inQueue, int numTasks) {
         this.id = id;
         this.processor = processor;
         this.inQueue = inQueue;
@@ -113,7 +114,7 @@ public class StreamComponent {
      * @param inQueue
      * @param numTasks
      */
-    public StreamComponent(String id, StreamsPersistWriter writer, 
Queue<StreamsDatum> inQueue, int numTasks) {
+    public StreamComponent(String id, StreamsPersistWriter writer, 
BlockingQueue<StreamsDatum> inQueue, int numTasks) {
         this.id = id;
         this.writer = writer;
         this.inQueue = inQueue;
@@ -123,7 +124,7 @@ public class StreamComponent {
 
     private void initializePrivateVariables() {
         this.inBound = new HashSet<StreamComponent>();
-        this.outBound = new HashMap<StreamComponent, Queue<StreamsDatum>>();
+        this.outBound = new HashMap<StreamComponent, 
BlockingQueue<StreamsDatum>>();
     }
 
     /**
@@ -131,7 +132,7 @@ public class StreamComponent {
      * @param component the component that this supplying their inbound queue
      * @param queue the queue to to put post processed/provided datums on
      */
-    public void addOutBoundQueue(StreamComponent component, 
Queue<StreamsDatum> queue) {
+    public void addOutBoundQueue(StreamComponent component, 
BlockingQueue<StreamsDatum> queue) {
         this.outBound.put(component, queue);
     }
 
@@ -163,7 +164,7 @@ public class StreamComponent {
      * The inbound queue for this component
      * @return inbound queue
      */
-    public Queue<StreamsDatum> getInBoundQueue() {
+    public BlockingQueue<StreamsDatum> getInBoundQueue() {
         return this.inQueue;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb11add0/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
index 980d199..758a883 100644
--- 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
@@ -20,6 +20,7 @@ package org.apache.streams.local.tasks;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
@@ -30,6 +31,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 /**
  *
@@ -38,8 +41,8 @@ public abstract class BaseStreamsTask implements StreamsTask {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(BaseStreamsTask.class);
 
-    private List<Queue<StreamsDatum>> inQueues = new 
ArrayList<Queue<StreamsDatum>>();
-    private List<Queue<StreamsDatum>> outQueues = new 
LinkedList<Queue<StreamsDatum>>();
+    private List<BlockingQueue<StreamsDatum>> inQueues = new 
ArrayList<BlockingQueue<StreamsDatum>>();
+    private List<BlockingQueue<StreamsDatum>> outQueues = new 
LinkedList<BlockingQueue<StreamsDatum>>();
     private int inIndex = 0;
     private ObjectMapper mapper;
 
@@ -50,31 +53,31 @@ public abstract class BaseStreamsTask implements 
StreamsTask {
 
 
     @Override
-    public void addInputQueue(Queue<StreamsDatum> inputQueue) {
+    public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue) {
         this.inQueues.add(inputQueue);
     }
 
     @Override
-    public void addOutputQueue(Queue<StreamsDatum> outputQueue) {
+    public void addOutputQueue(BlockingQueue<StreamsDatum> outputQueue) {
         this.outQueues.add(outputQueue);
     }
 
     @Override
-    public List<Queue<StreamsDatum>> getInputQueues() {
+    public List<BlockingQueue<StreamsDatum>> getInputQueues() {
         return this.inQueues;
     }
 
     @Override
-    public List<Queue<StreamsDatum>> getOutputQueues() {
+    public List<BlockingQueue<StreamsDatum>> getOutputQueues() {
         return this.outQueues;
     }
 
     /**
-     * NOTE NECESSARY AT THE MOMENT.  MAY BECOME NECESSARY AS WE LOOK AT 
MAKING JOIN TASKS. CURRENTLY ALL TASK HAVE MAX
-     * OF 1 INPUT QUEUE.
+     * SHOULD NOT BE NECESSARY, WILL REMOVE.
      * Round Robins through input queues to get the next StreamsDatum. If all 
input queues are empty, it will return null.
      * @return the next StreamsDatum or null if all input queues are empty.
      */
+    @Deprecated
     protected StreamsDatum getNextDatum() {
         int startIndex = this.inIndex;
         int index = startIndex;
@@ -91,23 +94,41 @@ public abstract class BaseStreamsTask implements 
StreamsTask {
      * clones of the datum and adds a new clone to each queue.
      * @param datum
      */
-    protected void addToOutgoingQueue(StreamsDatum datum) {
+    protected void addToOutgoingQueue(StreamsDatum datum) throws 
InterruptedException{
         if(this.outQueues.size() == 1) {
-            ComponentUtils.offerUntilSuccess(datum, outQueues.get(0));
+            outQueues.get(0).put(datum);
         }
         else {
             StreamsDatum newDatum = null;
-            for(Queue<StreamsDatum> queue : this.outQueues) {
+            List<BlockingQueue<StreamsDatum>> failedQueues = 
Lists.newLinkedList();
+            // TODO
+            // Needs to be optimized better but workable now
+            // Adds datums to queues that aren't full, then adds to full 
queues with blocking
+            for(BlockingQueue<StreamsDatum> queue : this.outQueues) {
                 try {
                     newDatum = cloneStreamsDatum(datum);
                     if(newDatum != null) {
-                        ComponentUtils.offerUntilSuccess(newDatum, queue);
+                        if(!queue.offer(newDatum, 500, TimeUnit.MILLISECONDS)) 
{
+                            failedQueues.add(queue);
+                        }
                     }
                 } catch (RuntimeException e) {
                     LOGGER.debug("Failed to add StreamsDatum to outgoing queue 
: {}", datum);
                     LOGGER.error("Exception while offering StreamsDatum to 
outgoing queue: {}", e);
                 }
             }
+            for(BlockingQueue<StreamsDatum> queue : failedQueues) {
+                try {
+                    newDatum = cloneStreamsDatum(datum);
+                    if(newDatum != null) {
+                        queue.put(newDatum);
+                    }
+                } catch (RuntimeException e) {
+                    LOGGER.debug("Failed to add StreamsDatum to outgoing queue 
: {}", datum);
+                    LOGGER.error("Exception while offering StreamsDatum to 
outgoing queue: {}", e);
+                }
+            }
+
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb11add0/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
index 59c438e..ce9f5a7 100644
--- 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
@@ -27,6 +27,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -40,7 +41,7 @@ public class StreamsPersistWriterTask extends BaseStreamsTask 
implements DatumSt
     private long sleepTime;
     private AtomicBoolean keepRunning;
     private Map<String, Object> streamConfig;
-    private Queue<StreamsDatum> inQueue;
+    private BlockingQueue<StreamsDatum> inQueue;
     private AtomicBoolean isRunning;
 
     private DatumStatusCounter statusCounter = new DatumStatusCounter();
@@ -77,7 +78,7 @@ public class StreamsPersistWriterTask extends BaseStreamsTask 
implements DatumSt
     }
 
     @Override
-    public void addInputQueue(Queue<StreamsDatum> inputQueue) {
+    public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue) {
         this.inQueue = inputQueue;
     }
 
@@ -90,29 +91,51 @@ public class StreamsPersistWriterTask extends 
BaseStreamsTask implements DatumSt
     public void run() {
         try {
             this.writer.prepare(this.streamConfig);
-            StreamsDatum datum = this.inQueue.poll();
             while(this.keepRunning.get()) {
+                StreamsDatum datum = null;
+                try {
+                    datum = this.inQueue.take();
+                } catch (InterruptedException ie) {
+                    LOGGER.error("Received InterruptedException. Shutting down 
and re-applying interrupt status.");
+                    Thread.currentThread().interrupt();
+                }
                 if(datum != null) {
                     try {
                         this.writer.write(datum);
                         statusCounter.incrementStatus(DatumStatus.SUCCESS);
                     } catch (Exception e) {
                         LOGGER.error("Error writing to persist writer {}", 
this.writer.getClass().getSimpleName(), e);
-                        this.keepRunning.set(false);
+                        this.keepRunning.set(false); // why do we shutdown on 
a failed write ?
                         statusCounter.incrementStatus(DatumStatus.FAIL);
                         DatumUtils.addErrorToMetadata(datum, e, 
this.writer.getClass());
                     }
+                } else { //datums should never be null
+                    LOGGER.warn("Received null StreamsDatum @ writer : {}", 
this.writer.getClass().getName());
                 }
-                else {
-                    try {
-                        Thread.sleep(this.sleepTime);
-                    } catch (InterruptedException e) {
-                        LOGGER.warn("Thread interrupted in Writer task for 
{}",this.writer.getClass().getSimpleName(), e);
-                        this.keepRunning.set(false);
-                    }
-                }
-                datum = this.inQueue.poll();
             }
+//            StreamsDatum datum = this.inQueue.poll();
+//            while(this.keepRunning.get()) {
+//                if(datum != null) {
+//                    try {
+//                        this.writer.write(datum);
+//                        statusCounter.incrementStatus(DatumStatus.SUCCESS);
+//                    } catch (Exception e) {
+//                        LOGGER.error("Error writing to persist writer {}", 
this.writer.getClass().getSimpleName(), e);
+//                        this.keepRunning.set(false);
+//                        statusCounter.incrementStatus(DatumStatus.FAIL);
+//                        DatumUtils.addErrorToMetadata(datum, e, 
this.writer.getClass());
+//                    }
+//                }
+//                else {
+//                    try {
+//                        Thread.sleep(this.sleepTime);
+//                    } catch (InterruptedException e) {
+//                        LOGGER.warn("Thread interrupted in Writer task for 
{}",this.writer.getClass().getSimpleName(), e);
+//                        this.keepRunning.set(false);
+//                    }
+//                }
+//                datum = this.inQueue.poll();
+//            }
 
         } catch(Exception e) {
             LOGGER.error("Failed to execute Persist Writer 
{}",this.writer.getClass().getSimpleName(), e);
@@ -129,13 +152,13 @@ public class StreamsPersistWriterTask extends 
BaseStreamsTask implements DatumSt
 
 
     @Override
-    public void addOutputQueue(Queue<StreamsDatum> outputQueue) {
+    public void addOutputQueue(BlockingQueue<StreamsDatum> outputQueue) {
         throw new UnsupportedOperationException(this.getClass().getName()+" 
does not support method - setOutputQueue()");
     }
 
     @Override
-    public List<Queue<StreamsDatum>> getInputQueues() {
-        List<Queue<StreamsDatum>> queues = new 
LinkedList<Queue<StreamsDatum>>();
+    public List<BlockingQueue<StreamsDatum>> getInputQueues() {
+        List<BlockingQueue<StreamsDatum>> queues = new LinkedList<>();
         queues.add(this.inQueue);
         return queues;
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb11add0/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
index beec0c2..4b65066 100644
--- 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
@@ -28,6 +28,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -42,7 +43,7 @@ public class StreamsProcessorTask extends BaseStreamsTask 
implements DatumStatus
     private long sleepTime;
     private AtomicBoolean keepRunning;
     private Map<String, Object> streamConfig;
-    private Queue<StreamsDatum> inQueue;
+    private BlockingQueue<StreamsDatum> inQueue;
     private AtomicBoolean isRunning;
 
     private DatumStatusCounter statusCounter = new DatumStatusCounter();
@@ -83,7 +84,7 @@ public class StreamsProcessorTask extends BaseStreamsTask 
implements DatumStatus
     }
 
     @Override
-    public void addInputQueue(Queue<StreamsDatum> inputQueue) {
+    public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue) {
         this.inQueue = inputQueue;
     }
 
@@ -96,35 +97,66 @@ public class StreamsProcessorTask extends BaseStreamsTask 
implements DatumStatus
     public void run() {
         try {
             this.processor.prepare(this.streamConfig);
-            StreamsDatum datum = this.inQueue.poll();
             while(this.keepRunning.get()) {
+                StreamsDatum datum = null;
+                try {
+                    datum = this.inQueue.take();
+                } catch (InterruptedException ie) {
+                    LOGGER.warn("Received InteruptedException, shutting down 
and re-applying interrupt status.");
+                    Thread.currentThread().interrupt();
+                }
                 if(datum != null) {
                     try {
                         List<StreamsDatum> output = 
this.processor.process(datum);
-
                         if(output != null) {
                             for(StreamsDatum outDatum : output) {
-                                super.addToOutgoingQueue(outDatum);
+                                super.addToOutgoingQueue(outDatum); // forward the processor's output, not the input datum
                                 
statusCounter.incrementStatus(DatumStatus.SUCCESS);
                             }
                         }
-                    } catch (Throwable e) {
-                        LOGGER.error("Throwable Streams Processor {}", e);
+                    } catch (InterruptedException ie) {
+                        LOGGER.warn("Received InteruptedException, shutting 
down and re-applying interrupt status.");
+                        Thread.currentThread().interrupt();
+                    } catch (Throwable t) {
+                        LOGGER.warn("Caught Throwable in processor, {} : {}", 
this.processor.getClass().getName(), t.getMessage());
                         statusCounter.incrementStatus(DatumStatus.FAIL);
                         //Add the error to the metadata, but keep processing
-                        DatumUtils.addErrorToMetadata(datum, e, 
this.processor.getClass());
-                    }
-                }
-                else {
-                    try {
-                        Thread.sleep(this.sleepTime);
-                    } catch (InterruptedException e) {
-                        this.keepRunning.set(false);
+                        DatumUtils.addErrorToMetadata(datum, t, 
this.processor.getClass());
                     }
+                } else {
+                    LOGGER.warn("Removed NULL datum from queue at processor : 
{}", this.processor.getClass().getName());
                 }
-                datum = this.inQueue.poll();
             }
 
+//            this.processor.prepare(this.streamConfig);
+//            StreamsDatum datum = this.inQueue.poll();
+//            while(this.keepRunning.get()) {
+//                if(datum != null) {
+//                    try {
+//                        List<StreamsDatum> output = 
this.processor.process(datum);
+//                        if(output != null) {
+//                            for(StreamsDatum outDatum : output) {
+//                                super.addToOutgoingQueue(outDatum);
+//                                
statusCounter.incrementStatus(DatumStatus.SUCCESS);
+//                            }
+//                        }
+//                    } catch (Throwable e) {
+//                        LOGGER.error("Throwable Streams Processor {}", e);
+//                        statusCounter.incrementStatus(DatumStatus.FAIL);
+//                        //Add the error to the metadata, but keep processing
+//                        DatumUtils.addErrorToMetadata(datum, e, 
this.processor.getClass());
+//                    }
+//                }
+//                else {
+//                    try {
+//                        Thread.sleep(this.sleepTime);
+//                    } catch (InterruptedException e) {
+//                        this.keepRunning.set(false);
+//                    }
+//                }
+//                datum = this.inQueue.poll();
+//            }
+
         } finally {
             this.isRunning.set(false);
             this.processor.cleanUp();
@@ -132,8 +164,8 @@ public class StreamsProcessorTask extends BaseStreamsTask 
implements DatumStatus
     }
 
     @Override
-    public List<Queue<StreamsDatum>> getInputQueues() {
-        List<Queue<StreamsDatum>> queues = new 
LinkedList<Queue<StreamsDatum>>();
+    public List<BlockingQueue<StreamsDatum>> getInputQueues() {
+        List<BlockingQueue<StreamsDatum>> queues = new 
LinkedList<BlockingQueue<StreamsDatum>>();
         queues.add(this.inQueue);
         return queues;
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb11add0/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
index d040c4b..cea7f63 100644
--- 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
@@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
 import java.math.BigInteger;
 import java.util.Map;
 import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -122,7 +123,7 @@ public class StreamsProviderTask extends BaseStreamsTask 
implements DatumStatusC
     }
 
     @Override
-    public void addInputQueue(Queue<StreamsDatum> inputQueue) {
+    public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue) {
         throw new UnsupportedOperationException(this.getClass().getName()+" 
does not support method - setInputQueue()");
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb11add0/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
----------------------------------------------------------------------
diff --git 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
index 1bb3ab1..45c25f3 100644
--- 
a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
+++ 
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java
@@ -23,6 +23,7 @@ import org.apache.streams.core.StreamsDatum;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
 
 /**
  * Interface for all task that will be used to execute instances of {@link 
org.apache.streams.core.StreamsOperation}
@@ -49,13 +50,13 @@ public interface StreamsTask extends Runnable{
      * Add an input {@link java.util.Queue} for this task.
      * @param inputQueue
      */
-    public void addInputQueue(Queue<StreamsDatum> inputQueue);
+    public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue);
 
     /**
      * Add an output {@link java.util.Queue} for this task.
      * @param outputQueue
      */
-    public void addOutputQueue(Queue<StreamsDatum> outputQueue);
+    public void addOutputQueue(BlockingQueue<StreamsDatum> outputQueue);
 
     /**
      * Set the configuration object that will shared and passed to all 
instances of StreamsTask.
@@ -73,12 +74,12 @@ public interface StreamsTask extends Runnable{
      * Returns the input queues that have been set for this task.
      * @return list of input queues
      */
-    public List<Queue<StreamsDatum>> getInputQueues();
+    public List<BlockingQueue<StreamsDatum>> getInputQueues();
 
     /**
      * Returns the output queues that have been set for this task
      * @return list of output queues
      */
-    public List<Queue<StreamsDatum>> getOutputQueues();
+    public List<BlockingQueue<StreamsDatum>> getOutputQueues();
 
 }

Reply via email to