Repository: incubator-streams Updated Branches: refs/heads/master 0a32159d8 -> 7b01eb48b
Implement ThroughputQueues and removed busy waiting on processors and writer task Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/fb11add0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/fb11add0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/fb11add0 Branch: refs/heads/master Commit: fb11add0e432e3ba63322dfef863783c5033f0f8 Parents: 50906ed Author: Ryan Ebanks <[email protected]> Authored: Mon Oct 6 17:10:41 2014 -0500 Committer: Ryan Ebanks <[email protected]> Committed: Mon Oct 6 17:10:41 2014 -0500 ---------------------------------------------------------------------- .../local/builders/LocalStreamBuilder.java | 31 +++++---- .../streams/local/builders/StreamComponent.java | 15 +++-- .../streams/local/tasks/BaseStreamsTask.java | 45 +++++++++---- .../local/tasks/StreamsPersistWriterTask.java | 55 +++++++++++----- .../local/tasks/StreamsProcessorTask.java | 68 ++++++++++++++------ .../local/tasks/StreamsProviderTask.java | 3 +- .../apache/streams/local/tasks/StreamsTask.java | 9 +-- 7 files changed, 152 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb11add0/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java index bec1ff9..0b02f4e 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java @@ -21,17 +21,16 @@ package org.apache.streams.local.builders; import org.apache.log4j.spi.LoggerFactory; import org.apache.streams.core.*; import org.apache.streams.local.executors.ShutdownStreamOnUnhandleThrowableThreadPoolExecutor; +import org.apache.streams.local.queues.ThroughputQueue; import org.apache.streams.local.tasks.LocalStreamProcessMonitorThread; import org.apache.streams.local.tasks.StatusCounterMonitorThread; import org.apache.streams.local.tasks.StreamsProviderTask; import org.apache.streams.local.tasks.StreamsTask; -import org.apache.streams.util.SerializationUtil; import org.joda.time.DateTime; import org.slf4j.Logger; import java.math.BigInteger; import java.util.*; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -48,7 +47,6 @@ public class LocalStreamBuilder implements StreamBuilder { public static final String TIMEOUT_KEY = "TIMEOUT"; private Map<String, StreamComponent> providers; private Map<String, StreamComponent> components; - private Queue<StreamsDatum> queue; private Map<String, Object> streamConfig; private ExecutorService executor; private ExecutorService monitor; @@ -57,12 +55,13 @@ public class LocalStreamBuilder implements StreamBuilder { private LocalStreamProcessMonitorThread monitorThread; private Map<String, List<StreamsTask>> tasks; private Thread shutdownHook; + private int maxQueueCapacity; /** * */ public LocalStreamBuilder(){ - this(new ConcurrentLinkedQueue<StreamsDatum>(), null); + this(-1, null); } /** @@ -70,29 +69,29 @@ public class LocalStreamBuilder implements StreamBuilder { * @param streamConfig */ public LocalStreamBuilder(Map<String, Object> streamConfig) { - this(new ConcurrentLinkedQueue<StreamsDatum>(), streamConfig); + this(-1, streamConfig); } /** * - * @param queueType + * @param maxQueueCapacity */ - public LocalStreamBuilder(Queue<StreamsDatum> queueType) { - this(queueType, null); + public LocalStreamBuilder(int maxQueueCapacity) { + this(maxQueueCapacity, null); } /** * - * @param queueType + * @param maxQueueCapacity * @param streamConfig */ - public LocalStreamBuilder(Queue<StreamsDatum> queueType, Map<String, Object> streamConfig) { - this.queue = queueType; + public LocalStreamBuilder(int maxQueueCapacity, Map<String, Object> streamConfig) { this.providers = new HashMap<String, StreamComponent>(); this.components = new HashMap<String, StreamComponent>(); this.streamConfig = streamConfig; this.totalTasks = 0; this.monitorTasks = 0; + this.maxQueueCapacity = maxQueueCapacity; final LocalStreamBuilder self = this; this.shutdownHook = new Thread() { @Override @@ -146,7 +145,7 @@ public class LocalStreamBuilder implements StreamBuilder { @Override public StreamBuilder addStreamsProcessor(String id, StreamsProcessor processor, int numTasks, String... inBoundIds) { validateId(id); - StreamComponent comp = new StreamComponent(id, processor, cloneQueue(), numTasks); + StreamComponent comp = new StreamComponent(id, processor, new ThroughputQueue<StreamsDatum>(this.maxQueueCapacity, id), numTasks); this.components.put(id, comp); connectToOtherComponents(inBoundIds, comp); this.totalTasks += numTasks; @@ -158,7 +157,7 @@ public class LocalStreamBuilder implements StreamBuilder { @Override public StreamBuilder addStreamsPersistWriter(String id, StreamsPersistWriter writer, int numTasks, String... inBoundIds) { validateId(id); - StreamComponent comp = new StreamComponent(id, writer, cloneQueue(), numTasks); + StreamComponent comp = new StreamComponent(id, writer, new ThroughputQueue<StreamsDatum>(this.maxQueueCapacity, id), numTasks); this.components.put(id, comp); connectToOtherComponents(inBoundIds, comp); this.totalTasks += numTasks; @@ -373,13 +372,13 @@ public class LocalStreamBuilder implements StreamBuilder { private void validateId(String id) { if(this.providers.containsKey(id) || this.components.containsKey(id)) { throw new InvalidStreamException("Duplicate id. "+id+" is already assigned to another component"); + } else if(id.contains(":")) { + throw new InvalidStreamException("Invalid character, ':', in component id : "+id); } } - private Queue<StreamsDatum> cloneQueue() { - return (Queue<StreamsDatum>) SerializationUtil.cloneBySerialization(this.queue); - } + protected int getTimeout() { //Set the timeout of it is configured, otherwise signal downstream components to use their default http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb11add0/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java index 8611bbb..5131227 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/StreamComponent.java @@ -28,6 +28,7 @@ import org.joda.time.DateTime; import java.math.BigInteger; import java.util.*; +import java.util.concurrent.BlockingQueue; /** * Stores the implementations of {@link org.apache.streams.core.StreamsOperation}, the StreamsOperations it is connected @@ -40,8 +41,8 @@ public class StreamComponent { private String id; private Set<StreamComponent> inBound; - private Map<StreamComponent, Queue<StreamsDatum>> outBound; - private Queue<StreamsDatum> inQueue; + private Map<StreamComponent, BlockingQueue<StreamsDatum>> outBound; + private BlockingQueue<StreamsDatum> inQueue; private StreamsProvider provider; private StreamsProcessor processor; private StreamsPersistWriter writer; @@ -98,7 +99,7 @@ public class StreamComponent { * @param inQueue * @param numTasks */ - public StreamComponent(String id, StreamsProcessor processor, Queue<StreamsDatum> inQueue, int numTasks) { + public StreamComponent(String id, StreamsProcessor processor, BlockingQueue<StreamsDatum> inQueue, int numTasks) { this.id = id; this.processor = processor; this.inQueue = inQueue; @@ -113,7 +114,7 @@ public class StreamComponent { * @param inQueue * @param numTasks */ - public StreamComponent(String id, StreamsPersistWriter writer, Queue<StreamsDatum> inQueue, int numTasks) { + public StreamComponent(String id, StreamsPersistWriter writer, BlockingQueue<StreamsDatum> inQueue, int numTasks) { this.id = id; this.writer = writer; this.inQueue = inQueue; @@ -123,7 +124,7 @@ public class StreamComponent { private void initializePrivateVariables() { this.inBound = new HashSet<StreamComponent>(); - this.outBound = new HashMap<StreamComponent, Queue<StreamsDatum>>(); + this.outBound = new HashMap<StreamComponent, BlockingQueue<StreamsDatum>>(); } /** @@ -131,7 +132,7 @@ public class StreamComponent { * @param component the component that this supplying their inbound queue * @param queue the queue to to put post processed/provided datums on */ - public void addOutBoundQueue(StreamComponent component, Queue<StreamsDatum> queue) { + public void addOutBoundQueue(StreamComponent component, BlockingQueue<StreamsDatum> queue) { this.outBound.put(component, queue); } @@ -163,7 +164,7 @@ public class StreamComponent { * The inbound queue for this component * @return inbound queue */ - public Queue<StreamsDatum> getInBoundQueue() { + public BlockingQueue<StreamsDatum> getInBoundQueue() { return this.inQueue; } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb11add0/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java index 980d199..758a883 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java @@ -20,6 +20,7 @@ package org.apache.streams.local.tasks; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Lists; import org.apache.streams.core.StreamsDatum; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.json.Activity; @@ -30,6 +31,8 @@ import org.slf4j.LoggerFactory; import java.io.Serializable; import java.util.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; /** * @@ -38,8 +41,8 @@ public abstract class BaseStreamsTask implements StreamsTask { private static final Logger LOGGER = LoggerFactory.getLogger(BaseStreamsTask.class); - private List<Queue<StreamsDatum>> inQueues = new ArrayList<Queue<StreamsDatum>>(); - private List<Queue<StreamsDatum>> outQueues = new LinkedList<Queue<StreamsDatum>>(); + private List<BlockingQueue<StreamsDatum>> inQueues = new ArrayList<BlockingQueue<StreamsDatum>>(); + private List<BlockingQueue<StreamsDatum>> outQueues = new LinkedList<BlockingQueue<StreamsDatum>>(); private int inIndex = 0; private ObjectMapper mapper; @@ -50,31 +53,31 @@ public abstract class BaseStreamsTask implements StreamsTask { @Override - public void addInputQueue(Queue<StreamsDatum> inputQueue) { + public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue) { this.inQueues.add(inputQueue); } @Override - public void addOutputQueue(Queue<StreamsDatum> outputQueue) { + public void addOutputQueue(BlockingQueue<StreamsDatum> outputQueue) { this.outQueues.add(outputQueue); } @Override - public List<Queue<StreamsDatum>> getInputQueues() { + public List<BlockingQueue<StreamsDatum>> getInputQueues() { return this.inQueues; } @Override - public List<Queue<StreamsDatum>> getOutputQueues() { + public List<BlockingQueue<StreamsDatum>> getOutputQueues() { return this.outQueues; } /** - * NOTE NECESSARY AT THE MOMENT. MAY BECOME NECESSARY AS WE LOOK AT MAKING JOIN TASKS. CURRENTLY ALL TASK HAVE MAX - * OF 1 INPUT QUEUE. + * SHOULD NOT BE NECCESARY, WILL REMOVE. * Round Robins through input queues to get the next StreamsDatum. If all input queues are empty, it will return null. * @return the next StreamsDatum or null if all input queues are empty. */ + @Deprecated protected StreamsDatum getNextDatum() { int startIndex = this.inIndex; int index = startIndex; @@ -91,23 +94,41 @@ public abstract class BaseStreamsTask implements StreamsTask { * clones of the datum and adds a new clone to each queue. * @param datum */ - protected void addToOutgoingQueue(StreamsDatum datum) { + protected void addToOutgoingQueue(StreamsDatum datum) throws InterruptedException{ if(this.outQueues.size() == 1) { - ComponentUtils.offerUntilSuccess(datum, outQueues.get(0)); + outQueues.get(0).put(datum); } else { StreamsDatum newDatum = null; - for(Queue<StreamsDatum> queue : this.outQueues) { + List<BlockingQueue<StreamsDatum>> failedQueues = Lists.newLinkedList(); + // TODO + // Needs to be optimized better but workable now + // Adds datums to queues that aren't full, then adds to full queues with blocking + for(BlockingQueue<StreamsDatum> queue : this.outQueues) { try { newDatum = cloneStreamsDatum(datum); if(newDatum != null) { - ComponentUtils.offerUntilSuccess(newDatum, queue); + if(!queue.offer(newDatum, 500, TimeUnit.MILLISECONDS)) { + failedQueues.add(queue); + } } } catch (RuntimeException e) { LOGGER.debug("Failed to add StreamsDatum to outgoing queue : {}", datum); LOGGER.error("Exception while offering StreamsDatum to outgoing queue: {}", e); } } + for(BlockingQueue<StreamsDatum> queue : failedQueues) { + try { + newDatum = cloneStreamsDatum(datum); + if(newDatum != null) { + queue.put(newDatum); + } + } catch (RuntimeException e) { + LOGGER.debug("Failed to add StreamsDatum to outgoing queue : {}", datum); + LOGGER.error("Exception while offering StreamsDatum to outgoing queue: {}", e); + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb11add0/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java index 59c438e..ce9f5a7 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java @@ -27,6 +27,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -40,7 +41,7 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt private long sleepTime; private AtomicBoolean keepRunning; private Map<String, Object> streamConfig; - private Queue<StreamsDatum> inQueue; + private BlockingQueue<StreamsDatum> inQueue; private AtomicBoolean isRunning; private DatumStatusCounter statusCounter = new DatumStatusCounter(); @@ -77,7 +78,7 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt } @Override - public void addInputQueue(Queue<StreamsDatum> inputQueue) { + public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue) { this.inQueue = inputQueue; } @@ -90,29 +91,51 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt public void run() { try { this.writer.prepare(this.streamConfig); - StreamsDatum datum = this.inQueue.poll(); while(this.keepRunning.get()) { + StreamsDatum datum = null; + try { + datum = this.inQueue.take(); + } catch (InterruptedException ie) { + LOGGER.error("Received InterruptedException. Shutting down and re-applying interrupt status."); + Thread.currentThread().interrupt(); + } if(datum != null) { try { this.writer.write(datum); statusCounter.incrementStatus(DatumStatus.SUCCESS); } catch (Exception e) { LOGGER.error("Error writing to persist writer {}", this.writer.getClass().getSimpleName(), e); - this.keepRunning.set(false); + this.keepRunning.set(false); // why do we shutdown on a failed write ? statusCounter.incrementStatus(DatumStatus.FAIL); DatumUtils.addErrorToMetadata(datum, e, this.writer.getClass()); } + } else { //datums should never be null + LOGGER.warn("Received null StreamsDatum @ writer : {}", this.writer.getClass().getName()); } - else { - try { - Thread.sleep(this.sleepTime); - } catch (InterruptedException e) { - LOGGER.warn("Thread interrupted in Writer task for {}",this.writer.getClass().getSimpleName(), e); - this.keepRunning.set(false); - } - } - datum = this.inQueue.poll(); } +// StreamsDatum datum = this.inQueue.poll(); +// while(this.keepRunning.get()) { +// if(datum != null) { +// try { +// this.writer.write(datum); +// statusCounter.incrementStatus(DatumStatus.SUCCESS); +// } catch (Exception e) { +// LOGGER.error("Error writing to persist writer {}", this.writer.getClass().getSimpleName(), e); +// this.keepRunning.set(false); +// statusCounter.incrementStatus(DatumStatus.FAIL); +// DatumUtils.addErrorToMetadata(datum, e, this.writer.getClass()); +// } +// } +// else { +// try { +// Thread.sleep(this.sleepTime); +// } catch (InterruptedException e) { +// LOGGER.warn("Thread interrupted in Writer task for {}",this.writer.getClass().getSimpleName(), e); +// this.keepRunning.set(false); +// } +// } +// datum = this.inQueue.poll(); +// } } catch(Exception e) { LOGGER.error("Failed to execute Persist Writer {}",this.writer.getClass().getSimpleName(), e); @@ -129,13 +152,13 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt @Override - public void addOutputQueue(Queue<StreamsDatum> outputQueue) { + public void addOutputQueue(BlockingQueue<StreamsDatum> outputQueue) { throw new UnsupportedOperationException(this.getClass().getName()+" does not support method - setOutputQueue()"); } @Override - public List<Queue<StreamsDatum>> getInputQueues() { - List<Queue<StreamsDatum>> queues = new LinkedList<Queue<StreamsDatum>>(); + public List<BlockingQueue<StreamsDatum>> getInputQueues() { + List<BlockingQueue<StreamsDatum>> queues = new LinkedList<>(); queues.add(this.inQueue); return queues; } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb11add0/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java index beec0c2..4b65066 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java @@ -28,6 +28,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -42,7 +43,7 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus private long sleepTime; private AtomicBoolean keepRunning; private Map<String, Object> streamConfig; - private Queue<StreamsDatum> inQueue; + private BlockingQueue<StreamsDatum> inQueue; private AtomicBoolean isRunning; private DatumStatusCounter statusCounter = new DatumStatusCounter(); @@ -83,7 +84,7 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus } @Override - public void addInputQueue(Queue<StreamsDatum> inputQueue) { + public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue) { this.inQueue = inputQueue; } @@ -96,35 +97,66 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus public void run() { try { this.processor.prepare(this.streamConfig); - StreamsDatum datum = this.inQueue.poll(); while(this.keepRunning.get()) { + StreamsDatum datum = null; + try { + datum = this.inQueue.take(); + } catch (InterruptedException ie) { + LOGGER.warn("Received InteruptedException, shutting down and re-applying interrupt status."); + Thread.currentThread().interrupt(); + } if(datum != null) { try { List<StreamsDatum> output = this.processor.process(datum); - if(output != null) { for(StreamsDatum outDatum : output) { - super.addToOutgoingQueue(outDatum); + super.addToOutgoingQueue(datum); statusCounter.incrementStatus(DatumStatus.SUCCESS); } } - } catch (Throwable e) { - LOGGER.error("Throwable Streams Processor {}", e); + } catch (InterruptedException ie) { + LOGGER.warn("Received InteruptedException, shutting down and re-applying interrupt status."); + Thread.currentThread().interrupt(); + } catch (Throwable t) { + LOGGER.warn("Caught Throwable in processor, {} : {}", this.processor.getClass().getName(), t.getMessage()); statusCounter.incrementStatus(DatumStatus.FAIL); //Add the error to the metadata, but keep processing - DatumUtils.addErrorToMetadata(datum, e, this.processor.getClass()); - } - } - else { - try { - Thread.sleep(this.sleepTime); - } catch (InterruptedException e) { - this.keepRunning.set(false); + DatumUtils.addErrorToMetadata(datum, t, this.processor.getClass()); } + } else { + LOGGER.warn("Removed NULL datum from queue at processor : {}", this.processor.getClass().getName()); } - datum = this.inQueue.poll(); } +// this.processor.prepare(this.streamConfig); +// StreamsDatum datum = this.inQueue.poll(); +// while(this.keepRunning.get()) { +// if(datum != null) { +// try { +// List<StreamsDatum> output = this.processor.process(datum); +// if(output != null) { +// for(StreamsDatum outDatum : output) { +// super.addToOutgoingQueue(outDatum); +// statusCounter.incrementStatus(DatumStatus.SUCCESS); +// } +// } +// } catch (Throwable e) { +// LOGGER.error("Throwable Streams Processor {}", e); +// statusCounter.incrementStatus(DatumStatus.FAIL); +// //Add the error to the metadata, but keep processing +// DatumUtils.addErrorToMetadata(datum, e, this.processor.getClass()); +// } +// } +// else { +// try { +// Thread.sleep(this.sleepTime); +// } catch (InterruptedException e) { +// this.keepRunning.set(false); +// } +// } +// datum = this.inQueue.poll(); +// } + } finally { this.isRunning.set(false); this.processor.cleanUp(); @@ -132,8 +164,8 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus } @Override - public List<Queue<StreamsDatum>> getInputQueues() { - List<Queue<StreamsDatum>> queues = new LinkedList<Queue<StreamsDatum>>(); + public List<BlockingQueue<StreamsDatum>> getInputQueues() { + List<BlockingQueue<StreamsDatum>> queues = new LinkedList<BlockingQueue<StreamsDatum>>(); queues.add(this.inQueue); return queues; } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb11add0/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java index d040c4b..cea7f63 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java @@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory; import java.math.BigInteger; import java.util.Map; import java.util.Queue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -122,7 +123,7 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC } @Override - public void addInputQueue(Queue<StreamsDatum> inputQueue) { + public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue) { throw new UnsupportedOperationException(this.getClass().getName()+" does not support method - setInputQueue()"); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/fb11add0/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java index 1bb3ab1..45c25f3 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java @@ -23,6 +23,7 @@ import org.apache.streams.core.StreamsDatum; import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.concurrent.BlockingQueue; /** * Interface for all task that will be used to execute instances of {@link org.apache.streams.core.StreamsOperation} @@ -49,13 +50,13 @@ public interface StreamsTask extends Runnable{ * Add an input {@link java.util.Queue} for this task. * @param inputQueue */ - public void addInputQueue(Queue<StreamsDatum> inputQueue); + public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue); /** * Add an output {@link java.util.Queue} for this task. * @param outputQueue */ - public void addOutputQueue(Queue<StreamsDatum> outputQueue); + public void addOutputQueue(BlockingQueue<StreamsDatum> outputQueue); /** * Set the configuration object that will shared and passed to all instances of StreamsTask. @@ -73,12 +74,12 @@ public interface StreamsTask extends Runnable{ * Returns the input queues that have been set for this task. * @return list of input queues */ - public List<Queue<StreamsDatum>> getInputQueues(); + public List<BlockingQueue<StreamsDatum>> getInputQueues(); /** * Returns the output queues that have been set for this task * @return list of output queues */ - public List<Queue<StreamsDatum>> getOutputQueues(); + public List<BlockingQueue<StreamsDatum>> getOutputQueues(); }
