STREAMS-191 | Changed addToOutgoingQueue to reduce code complexity and to clarify code
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/7882f148 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/7882f148 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/7882f148 Branch: refs/heads/master Commit: 7882f1480399289b1580235120fa7ee6bc7cd0ff Parents: ae01621 Author: Ryan Ebanks <[email protected]> Authored: Wed Oct 15 11:03:23 2014 -0500 Committer: Ryan Ebanks <[email protected]> Committed: Wed Oct 15 11:03:23 2014 -0500 ---------------------------------------------------------------------- .../streams/local/tasks/BaseStreamsTask.java | 33 +++++--------------- 1 file changed, 7 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7882f148/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java index 902a2d7..6755d77 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java @@ -99,36 +99,17 @@ public abstract class BaseStreamsTask implements StreamsTask { outQueues.get(0).put(datum); } else { - StreamsDatum newDatum = null; - List<BlockingQueue<StreamsDatum>> failedQueues = Lists.newLinkedList(); - // TODO - // Needs to be optimized better but workable now - // Adds datums to queues that aren't full, then adds to full queues with blocking - for(BlockingQueue<StreamsDatum> queue : this.outQueues) { - try { - newDatum = cloneStreamsDatum(datum); - if(newDatum != null) { - if(!queue.offer(newDatum, 500, TimeUnit.MILLISECONDS)) { - failedQueues.add(queue); + List<BlockingQueue<StreamsDatum>> toOutput = Lists.newLinkedList(this.outQueues); + while(!toOutput.isEmpty()) { + for (BlockingQueue<StreamsDatum> queue : toOutput) { + StreamsDatum newDatum = cloneStreamsDatum(datum); + if (newDatum != null) { + if (queue.offer(newDatum, 500, TimeUnit.MILLISECONDS)) { + toOutput.remove(queue); } } - } catch (RuntimeException e) { - LOGGER.debug("Failed to add StreamsDatum to outgoing queue : {}", datum); - LOGGER.error("Exception while offering StreamsDatum to outgoing queue: {}", e); } } - for(BlockingQueue<StreamsDatum> queue : failedQueues) { - try { - newDatum = cloneStreamsDatum(datum); - if(newDatum != null) { - queue.put(newDatum); - } - } catch (RuntimeException e) { - LOGGER.debug("Failed to add StreamsDatum to outgoing queue : {}", datum); - LOGGER.error("Exception while offering StreamsDatum to outgoing queue: {}", e); - } - } - } }
