STREAMS-191 | Changed addToOutgoingQueue to reduce code complexity and
improve readability


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/7882f148
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/7882f148
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/7882f148

Branch: refs/heads/master
Commit: 7882f1480399289b1580235120fa7ee6bc7cd0ff
Parents: ae01621
Author: Ryan Ebanks <[email protected]>
Authored: Wed Oct 15 11:03:23 2014 -0500
Committer: Ryan Ebanks <[email protected]>
Committed: Wed Oct 15 11:03:23 2014 -0500

----------------------------------------------------------------------
 .../streams/local/tasks/BaseStreamsTask.java    | 33 +++++---------------
 1 file changed, 7 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7882f148/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
index 902a2d7..6755d77 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
@@ -99,36 +99,17 @@ public abstract class BaseStreamsTask implements StreamsTask {
             outQueues.get(0).put(datum);
         }
         else {
-            StreamsDatum newDatum = null;
-            List<BlockingQueue<StreamsDatum>> failedQueues = 
Lists.newLinkedList();
-            // TODO
-            // Needs to be optimized better but workable now
-            // Adds datums to queues that aren't full, then adds to full 
queues with blocking
-            for(BlockingQueue<StreamsDatum> queue : this.outQueues) {
-                try {
-                    newDatum = cloneStreamsDatum(datum);
-                    if(newDatum != null) {
-                        if(!queue.offer(newDatum, 500, TimeUnit.MILLISECONDS)) 
{
-                            failedQueues.add(queue);
+            List<BlockingQueue<StreamsDatum>> toOutput = 
Lists.newLinkedList(this.outQueues);
+            while(!toOutput.isEmpty()) {
+                for (BlockingQueue<StreamsDatum> queue : toOutput) {
+                    StreamsDatum newDatum = cloneStreamsDatum(datum);
+                    if (newDatum != null) {
+                        if (queue.offer(newDatum, 500, TimeUnit.MILLISECONDS)) 
{
+                            toOutput.remove(queue);
                         }
                     }
-                } catch (RuntimeException e) {
-                    LOGGER.debug("Failed to add StreamsDatum to outgoing queue 
: {}", datum);
-                    LOGGER.error("Exception while offering StreamsDatum to 
outgoing queue: {}", e);
                 }
             }
-            for(BlockingQueue<StreamsDatum> queue : failedQueues) {
-                try {
-                    newDatum = cloneStreamsDatum(datum);
-                    if(newDatum != null) {
-                        queue.put(newDatum);
-                    }
-                } catch (RuntimeException e) {
-                    LOGGER.debug("Failed to add StreamsDatum to outgoing queue 
: {}", datum);
-                    LOGGER.error("Exception while offering StreamsDatum to 
outgoing queue: {}", e);
-                }
-            }
-
         }
     }
 

Reply via email to