http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java index 3405882..26272ea 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java @@ -18,17 +18,22 @@ package org.apache.streams.local.tasks; -import com.google.common.util.concurrent.Uninterruptibles; import org.apache.streams.config.StreamsConfiguration; -import org.apache.streams.core.*; +import org.apache.streams.core.DatumStatus; +import org.apache.streams.core.DatumStatusCountable; +import org.apache.streams.core.DatumStatusCounter; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsProvider; +import org.apache.streams.core.StreamsResultSet; import org.apache.streams.core.util.DatumUtils; import org.apache.streams.local.counters.StreamsTaskCounter; + +import com.google.common.util.concurrent.Uninterruptibles; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.math.BigInteger; -import java.util.Map; import java.util.Queue; import java.util.UUID; import java.util.concurrent.BlockingQueue; @@ -40,219 +45,219 @@ import java.util.concurrent.atomic.AtomicBoolean; */ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusCountable { - private final static Logger LOGGER = LoggerFactory.getLogger(StreamsProviderTask.class); + private final static Logger LOGGER = LoggerFactory.getLogger(StreamsProviderTask.class); - public DatumStatusCounter getDatumStatusCounter() { - return this.statusCounter; - } + public DatumStatusCounter getDatumStatusCounter() { + return this.statusCounter; + } - private static enum Type { - PERPETUAL, - READ_CURRENT, - READ_NEW, - READ_RANGE - } + private static enum Type { + PERPETUAL, + READ_CURRENT, + READ_NEW, + READ_RANGE + } - private static final int START = 0; - private static final int END = 1; + private static final int START = 0; + private static final int END = 1; - private StreamsProvider provider; - private final AtomicBoolean keepRunning = new AtomicBoolean(true); - private final AtomicBoolean flushing = new AtomicBoolean(false); - private final AtomicBoolean started = new AtomicBoolean(false); - private Type type; - private BigInteger sequence; - private DateTime[] dateRange; - private StreamsConfiguration config; + private StreamsProvider provider; + private final AtomicBoolean keepRunning = new AtomicBoolean(true); + private final AtomicBoolean flushing = new AtomicBoolean(false); + private final AtomicBoolean started = new AtomicBoolean(false); + private Type type; + private BigInteger sequence; + private DateTime[] dateRange; + private StreamsConfiguration config; - private int timeout; - private long sleepTime; - private int zeros = 0; - private DatumStatusCounter statusCounter = new DatumStatusCounter(); - private StreamsTaskCounter counter; + private int timeout; + private long sleepTime; + private int zeros = 0; + private DatumStatusCounter statusCounter = new DatumStatusCounter(); + private StreamsTaskCounter counter; - /** - * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readCurrent()} - * @param provider - */ - public StreamsProviderTask(StreamsProvider provider, boolean perpetual, StreamsConfiguration streamConfig) { - super(streamConfig); - streamConfig = super.streamConfig; - this.provider = provider; - if( perpetual ) - this.type = Type.PERPETUAL; - else - this.type = Type.READ_CURRENT; - this.timeout = super.streamConfig.getProviderTimeoutMs().intValue(); - this.sleepTime = streamConfig.getBatchFrequencyMs(); - } + /** + * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readCurrent()} + * @param provider + */ + public StreamsProviderTask(StreamsProvider provider, boolean perpetual, StreamsConfiguration streamConfig) { + super(streamConfig); + streamConfig = super.streamConfig; + this.provider = provider; + if( perpetual ) + this.type = Type.PERPETUAL; + else + this.type = Type.READ_CURRENT; + this.timeout = super.streamConfig.getProviderTimeoutMs().intValue(); + this.sleepTime = streamConfig.getBatchFrequencyMs(); + } - /** - * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readNew(BigInteger)} - * @param provider - * @param sequence - */ - public StreamsProviderTask(StreamsProvider provider, BigInteger sequence, StreamsConfiguration streamConfig) { - super(streamConfig); - this.provider = provider; - this.type = Type.READ_NEW; - this.sequence = sequence; - this.timeout = streamConfig.getProviderTimeoutMs().intValue(); - this.sleepTime = streamConfig.getBatchFrequencyMs(); - } + /** + * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readNew(BigInteger)} + * @param provider + * @param sequence + */ + public StreamsProviderTask(StreamsProvider provider, BigInteger sequence, StreamsConfiguration streamConfig) { + super(streamConfig); + this.provider = provider; + this.type = Type.READ_NEW; + this.sequence = sequence; + this.timeout = streamConfig.getProviderTimeoutMs().intValue(); + this.sleepTime = streamConfig.getBatchFrequencyMs(); + } - /** - * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readRange(DateTime,DateTime)} - * @param provider - * @param start - * @param end - */ - public StreamsProviderTask(StreamsProvider provider, DateTime start, DateTime end, StreamsConfiguration streamConfig) { - super(streamConfig); - this.provider = provider; - this.type = Type.READ_RANGE; - this.dateRange = new DateTime[2]; - this.dateRange[START] = start; - this.dateRange[END] = end; - this.timeout = streamConfig.getProviderTimeoutMs().intValue(); - this.sleepTime = streamConfig.getBatchFrequencyMs(); - } + /** + * Constructor for a StreamsProvider to execute {@link org.apache.streams.core.StreamsProvider:readRange(DateTime,DateTime)} + * @param provider + * @param start + * @param end + */ + public StreamsProviderTask(StreamsProvider provider, DateTime start, DateTime end, StreamsConfiguration streamConfig) { + super(streamConfig); + this.provider = provider; + this.type = Type.READ_RANGE; + this.dateRange = new DateTime[2]; + this.dateRange[START] = start; + this.dateRange[END] = end; + this.timeout = streamConfig.getProviderTimeoutMs().intValue(); + this.sleepTime = streamConfig.getBatchFrequencyMs(); + } - public void setTimeout(int timeout) { - this.timeout = timeout; - } + public void setTimeout(int timeout) { + this.timeout = timeout; + } - public void setSleepTime(long sleepTime) { - this.sleepTime = sleepTime; - } + public void setSleepTime(long sleepTime) { + this.sleepTime = sleepTime; + } - @Override - public boolean isWaiting() { - return false; //providers don't have inbound queues - } + @Override + public boolean isWaiting() { + return false; //providers don't have inbound queues + } - @Override - public void stopTask() { - LOGGER.debug("Stopping Provider Task for {}", this.provider.getClass().getSimpleName()); - this.keepRunning.set(false); - } + @Override + public void stopTask() { + LOGGER.debug("Stopping Provider Task for {}", this.provider.getClass().getSimpleName()); + this.keepRunning.set(false); + } - @Override - public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue) { - throw new UnsupportedOperationException(this.getClass().getName()+" does not support method - setInputQueue()"); - } + @Override + public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue) { + throw new UnsupportedOperationException(this.getClass().getName()+" does not support method - setInputQueue()"); + } - @Override - public void setStreamConfig(StreamsConfiguration config) { - this.config = config; - } + @Override + public void setStreamConfig(StreamsConfiguration config) { + this.config = config; + } - @Override - public void run() { - try { - this.provider.prepare(this.config); //TODO allow for configuration objects - StreamsResultSet resultSet = null; - //Negative values mean we want to run forever - long maxZeros = timeout < 0 ? Long.MAX_VALUE : (timeout / sleepTime); - if(this.counter == null) { //should never be null - this.counter = new StreamsTaskCounter(this.provider.getClass().getName()+ UUID.randomUUID().toString(), getStreamIdentifier(), getStartedAt()); + @Override + public void run() { + try { + this.provider.prepare(this.config); //TODO allow for configuration objects + StreamsResultSet resultSet = null; + //Negative values mean we want to run forever + long maxZeros = timeout < 0 ? Long.MAX_VALUE : (timeout / sleepTime); + if(this.counter == null) { //should never be null + this.counter = new StreamsTaskCounter(this.provider.getClass().getName()+ UUID.randomUUID().toString(), getStreamIdentifier(), getStartedAt()); + } + switch(this.type) { + case PERPETUAL: { + provider.startStream(); + this.started.set(true); + while(this.isRunning()) { + try { + long startTime = System.currentTimeMillis(); + resultSet = provider.readCurrent(); + this.counter.addTime(System.currentTimeMillis() - startTime); + if( resultSet.size() == 0 ) + zeros++; + else { + zeros = 0; + } + flushResults(resultSet); + // the way this works needs to change... + if(zeros > maxZeros) + this.keepRunning.set(false); + if(zeros > 0) + Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS); + } catch (Exception e) { + this.counter.incrementErrorCount(); + LOGGER.warn("Thread exception"); + this.keepRunning.set(false); } - switch(this.type) { - case PERPETUAL: { - provider.startStream(); - this.started.set(true); - while(this.isRunning()) { - try { - long startTime = System.currentTimeMillis(); - resultSet = provider.readCurrent(); - this.counter.addTime(System.currentTimeMillis() - startTime); - if( resultSet.size() == 0 ) - zeros++; - else { - zeros = 0; - } - flushResults(resultSet); - // the way this works needs to change... - if(zeros > maxZeros) - this.keepRunning.set(false); - if(zeros > 0) - Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS); - } catch (Exception e) { - this.counter.incrementErrorCount(); - LOGGER.warn("Thread exception"); - this.keepRunning.set(false); - } - } - Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS); - } - break; - case READ_CURRENT: - resultSet = this.provider.readCurrent(); - this.started.set(true); - break; - case READ_NEW: - resultSet = this.provider.readNew(this.sequence); - this.started.set(true); - break; - case READ_RANGE: - resultSet = this.provider.readRange(this.dateRange[START], this.dateRange[END]); - this.started.set(true); - break; - default: throw new RuntimeException("Type has not been added to StreamsProviderTask."); - } - if( resultSet != null ) - flushResults(resultSet); - - } catch(Throwable e) { - LOGGER.error("Caught Throwable in Provider {}", this.provider.getClass().getSimpleName(), e); - } finally { - Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS); - LOGGER.debug("Complete Provider Task execution for {}", this.provider.getClass().getSimpleName()); - this.provider.cleanUp(); - //Setting started to 'true' here will allow the isRunning() method to return false in the event of an exception - //before started would normally be set to true n the run method. - this.started.set(true); - this.keepRunning.set(false); + } + Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS); } - } + break; + case READ_CURRENT: + resultSet = this.provider.readCurrent(); + this.started.set(true); + break; + case READ_NEW: + resultSet = this.provider.readNew(this.sequence); + this.started.set(true); + break; + case READ_RANGE: + resultSet = this.provider.readRange(this.dateRange[START], this.dateRange[END]); + this.started.set(true); + break; + default: throw new RuntimeException("Type has not been added to StreamsProviderTask."); + } + if( resultSet != null ) + flushResults(resultSet); - @Override - public boolean isRunning() { - //We want to make sure that we never return false if it is flushing, regardless of the state of the provider - //or whether we have been told to shut down. If someone really wants us to shut down, they will interrupt the - //thread and force us to shutdown. We also want to make sure we have had the opportunity to run before the - //runtime kills us. - return !this.started.get() || this.flushing.get() || (this.provider.isRunning() && this.keepRunning.get()); + } catch(Throwable e) { + LOGGER.error("Caught Throwable in Provider {}", this.provider.getClass().getSimpleName(), e); + } finally { + Uninterruptibles.sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS); + LOGGER.debug("Complete Provider Task execution for {}", this.provider.getClass().getSimpleName()); + this.provider.cleanUp(); + //Setting started to 'true' here will allow the isRunning() method to return false in the event of an exception + //before started would normally be set to true n the run method. + this.started.set(true); + this.keepRunning.set(false); } + } - public void flushResults(StreamsResultSet resultSet) { - Queue<StreamsDatum> queue = resultSet.getQueue(); - this.flushing.set(true); - while(!queue.isEmpty()) { - StreamsDatum datum = queue.poll(); - if(!this.keepRunning.get()) { - break; - } - if(datum != null) { - try { - super.addToOutgoingQueue(datum); - this.counter.incrementEmittedCount(); - statusCounter.incrementStatus(DatumStatus.SUCCESS); - } catch( Exception e ) { - this.counter.incrementErrorCount(); - statusCounter.incrementStatus(DatumStatus.FAIL); - DatumUtils.addErrorToMetadata(datum, e, this.provider.getClass()); - } - } + @Override + public boolean isRunning() { + //We want to make sure that we never return false if it is flushing, regardless of the state of the provider + //or whether we have been told to shut down. If someone really wants us to shut down, they will interrupt the + //thread and force us to shutdown. We also want to make sure we have had the opportunity to run before the + //runtime kills us. + return !this.started.get() || this.flushing.get() || (this.provider.isRunning() && this.keepRunning.get()); + } + + public void flushResults(StreamsResultSet resultSet) { + Queue<StreamsDatum> queue = resultSet.getQueue(); + this.flushing.set(true); + while(!queue.isEmpty()) { + StreamsDatum datum = queue.poll(); + if(!this.keepRunning.get()) { + break; + } + if(datum != null) { + try { + super.addToOutgoingQueue(datum); + this.counter.incrementEmittedCount(); + statusCounter.incrementStatus(DatumStatus.SUCCESS); + } catch( Exception e ) { + this.counter.incrementErrorCount(); + statusCounter.incrementStatus(DatumStatus.FAIL); + DatumUtils.addErrorToMetadata(datum, e, this.provider.getClass()); } - this.flushing.set(false); + } } + this.flushing.set(false); + } - @Override - public void setStreamsTaskCounter(StreamsTaskCounter counter) { - this.counter = counter; - } + @Override + public void setStreamsTaskCounter(StreamsTaskCounter counter) { + this.counter = counter; + } }
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java index 5c14c1f..1b91e5c 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsTask.java @@ -23,8 +23,6 @@ import org.apache.streams.core.StreamsDatum; import org.apache.streams.local.counters.StreamsTaskCounter; import java.util.List; -import java.util.Map; -import java.util.Queue; import java.util.concurrent.BlockingQueue; /** @@ -33,53 +31,53 @@ import java.util.concurrent.BlockingQueue; */ public interface StreamsTask extends Runnable{ - /** - * Informs the task to stop. Tasks may or may not try to empty its inbound queue before halting. - */ - public void stopTask(); + /** + * Informs the task to stop. Tasks may or may not try to empty its inbound queue before halting. + */ + public void stopTask(); - /** - * Returns true if the task is waiting on more data to process - * @return true, if waiting on more data to process - */ - public boolean isWaiting(); - /** - * Add an input {@link java.util.Queue} for this task. - * @param inputQueue - */ - public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue); + /** + * Returns true if the task is waiting on more data to process + * @return true, if waiting on more data to process + */ + public boolean isWaiting(); + /** + * Add an input {@link java.util.Queue} for this task. + * @param inputQueue + */ + public void addInputQueue(BlockingQueue<StreamsDatum> inputQueue); - /** - * Add an output {@link java.util.Queue} for this task. - * @param outputQueue - */ - public void addOutputQueue(BlockingQueue<StreamsDatum> outputQueue); + /** + * Add an output {@link java.util.Queue} for this task. + * @param outputQueue + */ + public void addOutputQueue(BlockingQueue<StreamsDatum> outputQueue); - /** - * Set the configuration object that will shared and passed to all instances of StreamsTask. - * @param config optional configuration information - */ - public void setStreamConfig(StreamsConfiguration config); + /** + * Set the configuration object that will shared and passed to all instances of StreamsTask. + * @param config optional configuration information + */ + public void setStreamConfig(StreamsConfiguration config); - /** - * Returns true when the task has not completed. Returns false otherwise - * @return true when the task has not completed. Returns false otherwise - */ - public boolean isRunning(); + /** + * Returns true when the task has not completed. Returns false otherwise + * @return true when the task has not completed. Returns false otherwise + */ + public boolean isRunning(); - /** - * Returns the input queues that have been set for this task. - * @return list of input queues - */ - public List<BlockingQueue<StreamsDatum>> getInputQueues(); + /** + * Returns the input queues that have been set for this task. + * @return list of input queues + */ + public List<BlockingQueue<StreamsDatum>> getInputQueues(); - /** - * Returns the output queues that have been set for this task - * @return list of output queues - */ - public List<BlockingQueue<StreamsDatum>> getOutputQueues(); + /** + * Returns the output queues that have been set for this task + * @return list of output queues + */ + public List<BlockingQueue<StreamsDatum>> getOutputQueues(); - public void setStreamsTaskCounter(StreamsTaskCounter counter); + public void setStreamsTaskCounter(StreamsTaskCounter counter); } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java index 2bbfdcc..741c0b5 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/LocalStreamBuilderTest.java @@ -18,11 +18,6 @@ package org.apache.streams.local.builders; -import com.carrotsearch.randomizedtesting.RandomizedTest; -import com.carrotsearch.randomizedtesting.annotations.Repeat; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.util.concurrent.Uninterruptibles; import org.apache.streams.core.StreamBuilder; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsPersistWriter; @@ -36,6 +31,12 @@ import org.apache.streams.local.test.providers.NumericMessageProvider; import org.apache.streams.local.test.writer.DatumCounterWriter; import org.apache.streams.local.test.writer.SystemOutWriter; import org.apache.streams.util.ComponentUtils; + +import com.carrotsearch.randomizedtesting.RandomizedTest; +import com.carrotsearch.randomizedtesting.annotations.Repeat; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Uninterruptibles; import org.joda.time.DateTime; import org.junit.After; import org.junit.Before; @@ -44,7 +45,6 @@ import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import javax.management.*; import java.lang.management.ManagementFactory; import java.util.Collections; import java.util.List; @@ -55,10 +55,21 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import javax.management.InstanceNotFoundException; +import javax.management.MBeanRegistrationException; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * Basic Tests for the LocalStreamBuilder. @@ -70,334 +81,334 @@ import static org.mockito.Mockito.*; * */ public class LocalStreamBuilderTest extends RandomizedTest { - private static final String MBEAN_ID = "test_id"; - private static final String STREAM_ID = "test_stream"; - private static long STREAM_START_TIME = (new DateTime()).getMillis(); + private static final String MBEAN_ID = "test_id"; + private static final String STREAM_ID = "test_stream"; + private static long STREAM_START_TIME = (new DateTime()).getMillis(); - @After - public void removeLocalMBeans() { - try { - ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local"); - } catch (Exception e) { - //No op. proceed to next test - } + @After + public void removeLocalMBeans() { + try { + ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local"); + } catch (Exception e) { + //No op. proceed to next test } + } - public void removeRegisteredMBeans(String... ids) { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - for(String id : ids) { - try { - mbs.unregisterMBean(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, id, STREAM_ID, STREAM_START_TIME))); - } catch (MalformedObjectNameException|InstanceNotFoundException|MBeanRegistrationException e) { - //No-op - } - try { - mbs.unregisterMBean(new ObjectName((String.format(StreamsTaskCounter.NAME_TEMPLATE, id, STREAM_ID, STREAM_START_TIME)))); - } catch (MalformedObjectNameException|InstanceNotFoundException|MBeanRegistrationException e) { - //No-op - } - } + public void removeRegisteredMBeans(String... ids) { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + for(String id : ids) { + try { + mbs.unregisterMBean(new ObjectName(String.format(ThroughputQueue.NAME_TEMPLATE, id, STREAM_ID, STREAM_START_TIME))); + } catch (MalformedObjectNameException|InstanceNotFoundException|MBeanRegistrationException e) { + //No-op + } + try { + mbs.unregisterMBean(new ObjectName((String.format(StreamsTaskCounter.NAME_TEMPLATE, id, STREAM_ID, STREAM_START_TIME)))); + } catch (MalformedObjectNameException|InstanceNotFoundException|MBeanRegistrationException e) { + //No-op + } } + } - @Test - public void testStreamIdValidations() { - StreamBuilder builder = new LocalStreamBuilder(); - builder.newReadCurrentStream("id", new NumericMessageProvider(1)); - Exception exp = null; - try { - builder.newReadCurrentStream("id", new NumericMessageProvider(1)); - } catch (RuntimeException e) { - exp = e; - } - assertNotNull(exp); - exp = null; - builder.addStreamsProcessor("1", new PassthroughDatumCounterProcessor("1"), 1, "id"); - try { - builder.addStreamsProcessor("2", new PassthroughDatumCounterProcessor("2"), 1, "id", "id2"); - } catch (RuntimeException e) { - exp = e; - } - assertNotNull(exp); - removeRegisteredMBeans("1", "2", "id"); + @Test + public void testStreamIdValidations() { + StreamBuilder builder = new LocalStreamBuilder(); + builder.newReadCurrentStream("id", new NumericMessageProvider(1)); + Exception exp = null; + try { + builder.newReadCurrentStream("id", new NumericMessageProvider(1)); + } catch (RuntimeException e) { + exp = e; } - - @Test - public void testBasicLinearStream1() { - linearStreamNonParallel(1, 1); + assertNotNull(exp); + exp = null; + builder.addStreamsProcessor("1", new PassthroughDatumCounterProcessor("1"), 1, "id"); + try { + builder.addStreamsProcessor("2", new PassthroughDatumCounterProcessor("2"), 1, "id", "id2"); + } catch (RuntimeException e) { + exp = e; } + assertNotNull(exp); + removeRegisteredMBeans("1", "2", "id"); + } - @Test - public void testBasicLinearStream2() { - linearStreamNonParallel(1004, 1); - } + @Test + public void testBasicLinearStream1() { + linearStreamNonParallel(1, 1); + } - @Test - public void testBasicLinearStream3() { - linearStreamNonParallel(1, 10); - } + @Test + public void testBasicLinearStream2() { + linearStreamNonParallel(1004, 1); + } - @Test - @Repeat(iterations = 3) - public void testBasicLinearStreamRandom() { - int numDatums = randomIntBetween(1, 100000); - int numProcessors = randomIntBetween(1, 10); - linearStreamNonParallel(numDatums, numProcessors); - } + @Test + public void testBasicLinearStream3() { + linearStreamNonParallel(1, 10); + } + + @Test + @Repeat(iterations = 3) + public void testBasicLinearStreamRandom() { + int numDatums = randomIntBetween(1, 100000); + int numProcessors = randomIntBetween(1, 10); + linearStreamNonParallel(numDatums, numProcessors); + } - /** - * Tests that all datums pass through each processor and that all datums reach the writer - * @param numDatums - * @param numProcessors - */ - private void linearStreamNonParallel(int numDatums, int numProcessors) { - String processorId = "proc"; - try { - StreamBuilder builder = new LocalStreamBuilder(10); - builder.newPerpetualStream("numeric_provider", new NumericMessageProvider(numDatums)); - String connectTo = null; - for(int i=0; i < numProcessors; ++i) { - if(i == 0) { - connectTo = "numeric_provider"; - } else { - connectTo = processorId+(i-1); - } - builder.addStreamsProcessor(processorId+i, new PassthroughDatumCounterProcessor(processorId+i), 1, connectTo); - } - Set output = Collections.newSetFromMap(new ConcurrentHashMap()); - builder.addStreamsPersistWriter("writer", new DatumCounterWriter("writer"), 1, processorId+(numProcessors-1)); - builder.start(); - for(int i=0; i < numProcessors; ++i) { - assertEquals("Processor "+i+" did not receive all of the datums", numDatums, PassthroughDatumCounterProcessor.COUNTS.get(processorId+i).get()); - } - for(int i=0; i < numDatums; ++i) { - assertTrue("Expected writer to have received : "+i, DatumCounterWriter.RECEIVED.get("writer").contains(i)); - } - } finally { - for(int i=0; i < numProcessors; ++i) { - removeRegisteredMBeans(processorId+i, processorId+i+"-"+PassthroughDatumCounterProcessor.class.getCanonicalName()); - } - removeRegisteredMBeans("writer", "numeric_provider"); + /** + * Tests that all datums pass through each processor and that all datums reach the writer + * @param numDatums + * @param numProcessors + */ + private void linearStreamNonParallel(int numDatums, int numProcessors) { + String processorId = "proc"; + try { + StreamBuilder builder = new LocalStreamBuilder(10); + builder.newPerpetualStream("numeric_provider", new NumericMessageProvider(numDatums)); + String connectTo = null; + for(int i=0; i < numProcessors; ++i) { + if(i == 0) { + connectTo = "numeric_provider"; + } else { + connectTo = processorId+(i-1); } + builder.addStreamsProcessor(processorId+i, new PassthroughDatumCounterProcessor(processorId+i), 1, connectTo); + } + Set output = Collections.newSetFromMap(new ConcurrentHashMap()); + builder.addStreamsPersistWriter("writer", new DatumCounterWriter("writer"), 1, processorId+(numProcessors-1)); + builder.start(); + for(int i=0; i < numProcessors; ++i) { + assertEquals("Processor "+i+" did not receive all of the datums", numDatums, PassthroughDatumCounterProcessor.COUNTS.get(processorId+i).get()); + } + for(int i=0; i < numDatums; ++i) { + assertTrue("Expected writer to have received : "+i, DatumCounterWriter.RECEIVED.get("writer").contains(i)); + } + } finally { + for(int i=0; i < numProcessors; ++i) { + removeRegisteredMBeans(processorId+i, processorId+i+"-"+PassthroughDatumCounterProcessor.class.getCanonicalName()); + } + removeRegisteredMBeans("writer", "numeric_provider"); } + } - @Test - public void testParallelLinearStream1() { - String processorId = "proc"; - int numProcessors = randomIntBetween(1, 10); - int numDatums = randomIntBetween(1, 300000); - try { - StreamBuilder builder = new LocalStreamBuilder(50); - builder.newPerpetualStream("numeric_provider", new NumericMessageProvider(numDatums)); - String connectTo = null; - for(int i=0; i < numProcessors; ++i) { - if(i == 0) { - connectTo = "numeric_provider"; - } else { - connectTo = processorId+(i-1); - } - int parallelHint = randomIntBetween(1,5); - builder.addStreamsProcessor(processorId+i, new PassthroughDatumCounterProcessor(processorId+i), parallelHint, connectTo); - } - builder.addStreamsPersistWriter("writer", new DatumCounterWriter("writer"), 1, processorId+(numProcessors-1)); - builder.start(); - Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS); - builder.stop(); - Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS); - assertEquals(numDatums, DatumCounterWriter.RECEIVED.get("writer").size()); - for(int i=0; i < numDatums; ++i) { - assertTrue("Expected Writer to receive datum : " + i, DatumCounterWriter.RECEIVED.get("writer").contains(i)); - } - for(int i=0; i < numProcessors; ++i) { - assertEquals(numDatums, PassthroughDatumCounterProcessor.COUNTS.get(processorId+i).get()); - } - - } finally { - for(int i=0; i < numProcessors; ++i) { - removeRegisteredMBeans(processorId+i); - } - removeRegisteredMBeans("writer", "numeric_provider"); + @Test + public void testParallelLinearStream1() { + String processorId = "proc"; + int numProcessors = randomIntBetween(1, 10); + int numDatums = randomIntBetween(1, 300000); + try { + StreamBuilder builder = new LocalStreamBuilder(50); + builder.newPerpetualStream("numeric_provider", new NumericMessageProvider(numDatums)); + String connectTo = null; + for(int i=0; i < numProcessors; ++i) { + if(i == 0) { + connectTo = "numeric_provider"; + } else { + connectTo = processorId+(i-1); } + int parallelHint = randomIntBetween(1,5); + builder.addStreamsProcessor(processorId+i, new PassthroughDatumCounterProcessor(processorId+i), parallelHint, connectTo); + } + builder.addStreamsPersistWriter("writer", new DatumCounterWriter("writer"), 1, processorId+(numProcessors-1)); + builder.start(); + Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS); + builder.stop(); + Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS); + assertEquals(numDatums, DatumCounterWriter.RECEIVED.get("writer").size()); + for(int i=0; i < numDatums; ++i) { + assertTrue("Expected Writer to receive datum : " + i, DatumCounterWriter.RECEIVED.get("writer").contains(i)); + } + for(int i=0; i < numProcessors; ++i) { + assertEquals(numDatums, PassthroughDatumCounterProcessor.COUNTS.get(processorId+i).get()); + } + + } finally { + for(int i=0; i < numProcessors; ++i) { + removeRegisteredMBeans(processorId+i); + } + removeRegisteredMBeans("writer", "numeric_provider"); } + } - @Test - public void testBasicMergeStream() { - try { - int numDatums1 = randomIntBetween(1, 300000); - int numDatums2 = randomIntBetween(1, 300000); - StreamsProcessor processor1 = new PassthroughDatumCounterProcessor("proc1"); - StreamsProcessor processor2 = new PassthroughDatumCounterProcessor("proc2"); - StreamBuilder builder = new LocalStreamBuilder(); - builder.newPerpetualStream("sp1", new NumericMessageProvider(numDatums1)) - .newPerpetualStream("sp2", new NumericMessageProvider(numDatums2)) - .addStreamsProcessor("proc1", processor1, 1, "sp1") - .addStreamsProcessor("proc2", processor2, 1, "sp2") - .addStreamsPersistWriter("writer1", new DatumCounterWriter("writer"), 1, "proc1", "proc2"); - builder.start(); - assertEquals(numDatums1, PassthroughDatumCounterProcessor.COUNTS.get("proc1").get()); - assertEquals(numDatums2, PassthroughDatumCounterProcessor.COUNTS.get("proc2").get()); - assertEquals(numDatums1+numDatums2, DatumCounterWriter.COUNTS.get("writer").get()); - } finally { - String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName(); - String writerClass = "-"+DatumCounterWriter.class.getCanonicalName(); - removeRegisteredMBeans("proc1", "proc2", "writer1", "sp1", "sp2"); - } + @Test + public void testBasicMergeStream() { + try { + int numDatums1 = randomIntBetween(1, 300000); + int numDatums2 = randomIntBetween(1, 300000); + StreamsProcessor processor1 = new PassthroughDatumCounterProcessor("proc1"); + StreamsProcessor processor2 = new PassthroughDatumCounterProcessor("proc2"); + StreamBuilder builder = new LocalStreamBuilder(); + builder.newPerpetualStream("sp1", new NumericMessageProvider(numDatums1)) + .newPerpetualStream("sp2", new NumericMessageProvider(numDatums2)) + .addStreamsProcessor("proc1", processor1, 1, "sp1") + .addStreamsProcessor("proc2", processor2, 1, "sp2") + .addStreamsPersistWriter("writer1", new DatumCounterWriter("writer"), 1, "proc1", "proc2"); + builder.start(); + assertEquals(numDatums1, PassthroughDatumCounterProcessor.COUNTS.get("proc1").get()); + assertEquals(numDatums2, PassthroughDatumCounterProcessor.COUNTS.get("proc2").get()); + assertEquals(numDatums1+numDatums2, DatumCounterWriter.COUNTS.get("writer").get()); + } finally { + String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName(); + String writerClass = "-"+DatumCounterWriter.class.getCanonicalName(); + removeRegisteredMBeans("proc1", "proc2", "writer1", "sp1", "sp2"); } + } - @Test - public void testBasicBranch() { - try { - int numDatums = randomIntBetween(1, 300000); - StreamBuilder builder = new LocalStreamBuilder(50); - builder.newPerpetualStream("prov1", new NumericMessageProvider(numDatums)) - .addStreamsProcessor("proc1", new PassthroughDatumCounterProcessor("proc1"), 1, "prov1") - .addStreamsProcessor("proc2", new PassthroughDatumCounterProcessor("proc2"), 1, "prov1") - .addStreamsPersistWriter("w1", new DatumCounterWriter("writer"), 1, "proc1", "proc2"); - builder.start(); - assertEquals(numDatums, PassthroughDatumCounterProcessor.COUNTS.get("proc1").get()); - assertEquals(numDatums, PassthroughDatumCounterProcessor.COUNTS.get("proc2").get()); - assertEquals(numDatums*2, DatumCounterWriter.COUNTS.get("writer").get()); - } finally { - String provClass = "-"+NumericMessageProvider.class.getCanonicalName(); - String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName(); - String writerClass = "-"+DatumCounterWriter.class.getCanonicalName(); - removeRegisteredMBeans("prov1", "proc1", "proc2", "w1"); - } + @Test + public void testBasicBranch() { + try { + int numDatums = randomIntBetween(1, 300000); + StreamBuilder builder = new LocalStreamBuilder(50); + builder.newPerpetualStream("prov1", new NumericMessageProvider(numDatums)) + .addStreamsProcessor("proc1", new PassthroughDatumCounterProcessor("proc1"), 1, "prov1") + .addStreamsProcessor("proc2", new PassthroughDatumCounterProcessor("proc2"), 1, "prov1") + .addStreamsPersistWriter("w1", new DatumCounterWriter("writer"), 1, "proc1", "proc2"); + builder.start(); + assertEquals(numDatums, PassthroughDatumCounterProcessor.COUNTS.get("proc1").get()); + assertEquals(numDatums, PassthroughDatumCounterProcessor.COUNTS.get("proc2").get()); + assertEquals(numDatums*2, DatumCounterWriter.COUNTS.get("writer").get()); + } finally { + String provClass = "-"+NumericMessageProvider.class.getCanonicalName(); + String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName(); + String writerClass = "-"+DatumCounterWriter.class.getCanonicalName(); + removeRegisteredMBeans("prov1", "proc1", "proc2", "w1"); } + } - @Test - public void testSlowProcessorBranch() { - try { - int numDatums = 30; - int timeout = 2000; - Map<String, Object> config = Maps.newHashMap(); - config.put(LocalStreamBuilder.TIMEOUT_KEY, timeout); - StreamBuilder builder = new LocalStreamBuilder(config); - builder.newPerpetualStream("prov1", new NumericMessageProvider(numDatums)) - .addStreamsProcessor("proc1", new SlowProcessor(), 1, "prov1") - .addStreamsPersistWriter("w1", new DatumCounterWriter("writer"), 1, "proc1"); - builder.start(); - assertEquals(numDatums, DatumCounterWriter.COUNTS.get("writer").get()); - } finally { - String provClass = "-"+NumericMessageProvider.class.getCanonicalName(); - String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName(); - String writerClass = "-"+DatumCounterWriter.class.getCanonicalName(); - removeRegisteredMBeans("prov1", "proc1", "w1"); - } + @Test + public void testSlowProcessorBranch() { + try { + int numDatums = 30; + int timeout = 2000; + Map<String, Object> config = Maps.newHashMap(); + config.put(LocalStreamBuilder.TIMEOUT_KEY, timeout); + StreamBuilder builder = new LocalStreamBuilder(config); + builder.newPerpetualStream("prov1", new NumericMessageProvider(numDatums)) + .addStreamsProcessor("proc1", new SlowProcessor(), 1, "prov1") + .addStreamsPersistWriter("w1", new DatumCounterWriter("writer"), 1, "proc1"); + builder.start(); + assertEquals(numDatums, DatumCounterWriter.COUNTS.get("writer").get()); + } finally { + String provClass = "-"+NumericMessageProvider.class.getCanonicalName(); + String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName(); + String writerClass = "-"+DatumCounterWriter.class.getCanonicalName(); + removeRegisteredMBeans("prov1", "proc1", "w1"); } + } - @Test - public void testConfiguredProviderTimeout() { - try { - Map<String, Object> config = Maps.newHashMap(); - int timeout = 10000; - config.put(LocalStreamBuilder.TIMEOUT_KEY, timeout); - long start = System.currentTimeMillis(); - StreamBuilder builder = new LocalStreamBuilder(-1, config); - builder.newPerpetualStream("prov1", new EmptyResultSetProvider()) - .addStreamsProcessor("proc1", new PassthroughDatumCounterProcessor("proc1"), 1, "prov1") - .addStreamsProcessor("proc2", new PassthroughDatumCounterProcessor("proc2"), 1, "proc1") - .addStreamsPersistWriter("w1", new SystemOutWriter(), 1, "proc1"); - builder.start(); - long end = System.currentTimeMillis(); - //We care mostly that it doesn't terminate too early. With thread shutdowns, etc, the actual time is indeterminate. Just make sure there is an upper bound - assertThat((int) (end - start), is(allOf(greaterThanOrEqualTo(timeout), lessThanOrEqualTo(4 * timeout)))); - } finally { - String provClass = "-"+NumericMessageProvider.class.getCanonicalName(); - String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName(); - String writerClass = "-"+DatumCounterWriter.class.getCanonicalName(); - removeRegisteredMBeans("prov1", "proc1", "proc2", "w1"); - } + @Test + public void testConfiguredProviderTimeout() { + try { + Map<String, Object> config = Maps.newHashMap(); + int timeout = 10000; + config.put(LocalStreamBuilder.TIMEOUT_KEY, timeout); + long start = System.currentTimeMillis(); + StreamBuilder builder = new LocalStreamBuilder(-1, config); + builder.newPerpetualStream("prov1", new EmptyResultSetProvider()) + .addStreamsProcessor("proc1", new PassthroughDatumCounterProcessor("proc1"), 1, "prov1") + .addStreamsProcessor("proc2", new PassthroughDatumCounterProcessor("proc2"), 1, "proc1") + .addStreamsPersistWriter("w1", new SystemOutWriter(), 1, "proc1"); + builder.start(); + long end = System.currentTimeMillis(); + //We care mostly that it doesn't terminate too early. With thread shutdowns, etc, the actual time is indeterminate. Just make sure there is an upper bound + assertThat((int) (end - start), is(allOf(greaterThanOrEqualTo(timeout), lessThanOrEqualTo(4 * timeout)))); + } finally { + String provClass = "-"+NumericMessageProvider.class.getCanonicalName(); + String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName(); + String writerClass = "-"+DatumCounterWriter.class.getCanonicalName(); + removeRegisteredMBeans("prov1", "proc1", "proc2", "w1"); } + } - @Ignore - @Test - public void ensureShutdownWithBlockedQueue() throws InterruptedException { - try { - ExecutorService service = Executors.newSingleThreadExecutor(); - int before = Thread.activeCount(); - final StreamBuilder builder = new LocalStreamBuilder(); - builder.newPerpetualStream("prov1", new NumericMessageProvider(30)) - .addStreamsProcessor("proc1", new SlowProcessor(), 1, "prov1") - .addStreamsPersistWriter("w1", new SystemOutWriter(), 1, "proc1"); - service.submit(new Runnable() { - @Override - public void run() { - builder.start(); - } - }); - //Let streams spin up threads and start to process - Thread.sleep(500); - builder.stop(); - service.shutdownNow(); - service.awaitTermination(30000, TimeUnit.MILLISECONDS); - assertThat(Thread.activeCount(), is(equalTo(before))); - } finally { - String provClass = "-"+NumericMessageProvider.class.getCanonicalName(); - String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName(); - String writerClass = "-"+DatumCounterWriter.class.getCanonicalName(); - removeRegisteredMBeans("prov1", "proc1", "w1"); + @Ignore + @Test + public void ensureShutdownWithBlockedQueue() throws InterruptedException { + try { + ExecutorService service = Executors.newSingleThreadExecutor(); + int before = Thread.activeCount(); + final StreamBuilder builder = new LocalStreamBuilder(); + builder.newPerpetualStream("prov1", new NumericMessageProvider(30)) + .addStreamsProcessor("proc1", new SlowProcessor(), 1, "prov1") + .addStreamsPersistWriter("w1", new SystemOutWriter(), 1, "proc1"); + service.submit(new Runnable() { + @Override + public void run() { + builder.start(); } + }); + //Let streams spin up threads and start to process + Thread.sleep(500); + builder.stop(); + service.shutdownNow(); + service.awaitTermination(30000, TimeUnit.MILLISECONDS); + assertThat(Thread.activeCount(), is(equalTo(before))); + } finally { + String provClass = "-"+NumericMessageProvider.class.getCanonicalName(); + String procClass = "-"+PassthroughDatumCounterProcessor.class.getCanonicalName(); + String writerClass = "-"+DatumCounterWriter.class.getCanonicalName(); + removeRegisteredMBeans("prov1", "proc1", "w1"); } + } - @Before - private void clearCounters() { - PassthroughDatumCounterProcessor.COUNTS.clear(); - PassthroughDatumCounterProcessor.CLAIMED_ID.clear(); - PassthroughDatumCounterProcessor.SEEN_DATA.clear(); - DatumCounterWriter.COUNTS.clear(); - DatumCounterWriter.CLAIMED_ID.clear(); - DatumCounterWriter.SEEN_DATA.clear(); - DatumCounterWriter.RECEIVED.clear(); - } + @Before + private void clearCounters() { + PassthroughDatumCounterProcessor.COUNTS.clear(); + PassthroughDatumCounterProcessor.CLAIMED_ID.clear(); + PassthroughDatumCounterProcessor.SEEN_DATA.clear(); + DatumCounterWriter.COUNTS.clear(); + DatumCounterWriter.CLAIMED_ID.clear(); + DatumCounterWriter.SEEN_DATA.clear(); + DatumCounterWriter.RECEIVED.clear(); + } - /** - * Creates {@link org.apache.streams.core.StreamsProcessor} that passes any StreamsDatum it gets as an - * input and counts the number of items it processes. - * @param counter - * @return - */ - private StreamsProcessor createPassThroughProcessor(final AtomicInteger counter) { - StreamsProcessor processor = mock(StreamsProcessor.class); - when(processor.process(any(StreamsDatum.class))).thenAnswer(new Answer<List<StreamsDatum>>() { - @Override - public List<StreamsDatum> answer(InvocationOnMock invocationOnMock) throws Throwable { - List<StreamsDatum> datum = Lists.newLinkedList(); - if(counter != null) { - counter.incrementAndGet(); - } - datum.add((StreamsDatum) invocationOnMock.getArguments()[0] ); - return datum; - } - }); - return processor; - } + /** + * Creates {@link org.apache.streams.core.StreamsProcessor} that passes any StreamsDatum it gets as an + * input and counts the number of items it processes. + * @param counter + * @return + */ + private StreamsProcessor createPassThroughProcessor(final AtomicInteger counter) { + StreamsProcessor processor = mock(StreamsProcessor.class); + when(processor.process(any(StreamsDatum.class))).thenAnswer(new Answer<List<StreamsDatum>>() { + @Override + public List<StreamsDatum> answer(InvocationOnMock invocationOnMock) throws Throwable { + List<StreamsDatum> datum = Lists.newLinkedList(); + if(counter != null) { + counter.incrementAndGet(); + } + datum.add((StreamsDatum) invocationOnMock.getArguments()[0] ); + return datum; + } + }); + return processor; + } - private StreamsPersistWriter createSetCollectingWriter(final Set collector) { - return createSetCollectingWriter(collector, null); - } + private StreamsPersistWriter createSetCollectingWriter(final Set collector) { + return createSetCollectingWriter(collector, null); + } - /** - * Creates a StreamsPersistWriter that adds every datums document to a set - * @param collector - * @return - */ - private StreamsPersistWriter createSetCollectingWriter(final Set collector, final AtomicInteger counter) { - StreamsPersistWriter writer = mock(StreamsPersistWriter.class); - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - if(counter != null) { - counter.incrementAndGet(); - } - collector.add(((StreamsDatum)invocationOnMock.getArguments()[0]).getDocument()); - return null; - } - }).when(writer).write(any(StreamsDatum.class)); - return writer; - } + /** + * Creates a StreamsPersistWriter that adds every datums document to a set + * @param collector + * @return + */ + private StreamsPersistWriter createSetCollectingWriter(final Set collector, final AtomicInteger counter) { + StreamsPersistWriter writer = mock(StreamsPersistWriter.class); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + if(counter != null) { + counter.incrementAndGet(); + } + collector.add(((StreamsDatum)invocationOnMock.getArguments()[0]).getDocument()); + return null; + } + }).when(writer).write(any(StreamsDatum.class)); + return writer; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/ToyLocalBuilderExample.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/ToyLocalBuilderExample.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/ToyLocalBuilderExample.java index a77dfec..2c61093 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/ToyLocalBuilderExample.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/builders/ToyLocalBuilderExample.java @@ -30,16 +30,16 @@ import org.apache.streams.local.test.writer.DoNothingWriter; */ public class ToyLocalBuilderExample { - /** - * A simple example of how to run a stream in local mode. - * @param args - */ - public static void main(String[] args) { - StreamBuilder builder = new LocalStreamBuilder(); - builder.newReadCurrentStream("prov", new NumericMessageProvider(1000000)) - .addStreamsProcessor("proc", new DoNothingProcessor(), 100, "prov") - .addStreamsPersistWriter("writer", new DoNothingWriter(), 3, "proc"); - builder.start(); - } + /** + * A simple example of how to run a stream in local mode. + * @param args + */ + public static void main(String[] args) { + StreamBuilder builder = new LocalStreamBuilder(); + builder.newReadCurrentStream("prov", new NumericMessageProvider(1000000)) + .addStreamsProcessor("proc", new DoNothingProcessor(), 100, "prov") + .addStreamsPersistWriter("writer", new DoNothingWriter(), 3, "proc"); + builder.start(); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java index 9775c6f..9d92bec 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/DatumStatusCounterTest.java @@ -23,112 +23,112 @@ import org.joda.time.DateTime; import org.junit.After; import org.junit.Test; +import java.lang.management.ManagementFactory; import javax.management.InstanceNotFoundException; import javax.management.ObjectName; -import java.lang.management.ManagementFactory; /** * */ public class DatumStatusCounterTest extends RandomizedTest { - private static final String MBEAN_ID = "test_id"; - private static final String STREAM_ID = "test_stream"; - private static long STREAM_START_TIME = (new DateTime()).getMillis(); + private static final String MBEAN_ID = "test_id"; + private static final String STREAM_ID = "test_stream"; + private static long STREAM_START_TIME = (new DateTime()).getMillis(); - /** - * Remove registered mbeans from previous tests - * @throws Exception - */ - @After - public void unregisterMXBean() throws Exception { - try { - ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(DatumStatusCounter.NAME_TEMPLATE, MBEAN_ID, STREAM_ID, STREAM_START_TIME))); - } catch (InstanceNotFoundException ife) { - //No-op - } + /** + * Remove registered mbeans from previous tests + * @throws Exception + */ + @After + public void unregisterMXBean() throws Exception { + try { + ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(DatumStatusCounter.NAME_TEMPLATE, MBEAN_ID, STREAM_ID, STREAM_START_TIME))); + } catch (InstanceNotFoundException ife) { + //No-op } + } - /** - * Test Constructor can register the counter as an mxbean with throwing an exception. - */ - @Test - public void testConstructor() { - try { - new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); - } catch (Throwable t) { - fail("Constructor Threw Exception : "+t.getMessage()); - } + /** + * Test Constructor can register the counter as an mxbean with throwing an exception. + */ + @Test + public void testConstructor() { + try { + new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); + } catch (Throwable t) { + fail("Constructor Threw Exception : "+t.getMessage()); } + } - /** - * Test that you can increment passes and it returns the correct count - * @throws Exception - */ - @Test - @Repeat(iterations = 3) - public void testPassed() throws Exception { - DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); - int numIncrements = randomIntBetween(1, 100000); - for(int i=0; i < numIncrements; ++i) { - counter.incrementPassedCount(); - } - assertEquals(numIncrements, counter.getNumPassed()); + /** + * Test that you can increment passes and it returns the correct count + * @throws Exception + */ + @Test + @Repeat(iterations = 3) + public void testPassed() throws Exception { + DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); + int numIncrements = randomIntBetween(1, 100000); + for(int i=0; i < numIncrements; ++i) { + counter.incrementPassedCount(); + } + assertEquals(numIncrements, counter.getNumPassed()); - unregisterMXBean(); + unregisterMXBean(); - counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); - numIncrements = randomIntBetween(1, 100000); - long total = 0; - for(int i=0; i < numIncrements; ++i) { - long delta = randomIntBetween(1, 100); - total += delta; - counter.incrementPassedCount(delta); - } - assertEquals(total, counter.getNumPassed()); + counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); + numIncrements = randomIntBetween(1, 100000); + long total = 0; + for(int i=0; i < numIncrements; ++i) { + long delta = randomIntBetween(1, 100); + total += delta; + counter.incrementPassedCount(delta); } + assertEquals(total, counter.getNumPassed()); + } - /** - * Test that you can increment failed and it returns the correct count - * @throws Exception - */ - @Test - @Repeat(iterations = 3) - public void testFailed() throws Exception { - DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); - int numIncrements = randomIntBetween(1, 100000); - for(int i=0; i < numIncrements; ++i) { - counter.incrementFailedCount(); - } - assertEquals(numIncrements, counter.getNumFailed()); + /** + * Test that you can increment failed and it returns the correct count + * @throws Exception + */ + @Test + @Repeat(iterations = 3) + public void testFailed() throws Exception { + DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); + int numIncrements = randomIntBetween(1, 100000); + for(int i=0; i < numIncrements; ++i) { + counter.incrementFailedCount(); + } + assertEquals(numIncrements, counter.getNumFailed()); - unregisterMXBean(); + unregisterMXBean(); - counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); - numIncrements = randomIntBetween(1, 100000); - long total = 0; - for(int i=0; i < numIncrements; ++i) { - long delta = randomIntBetween(1, 100); - total += delta; - counter.incrementFailedCount(delta); - } - assertEquals(total, counter.getNumFailed()); + counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); + numIncrements = randomIntBetween(1, 100000); + long total = 0; + for(int i=0; i < numIncrements; ++i) { + long delta = randomIntBetween(1, 100); + total += delta; + counter.incrementFailedCount(delta); } + assertEquals(total, counter.getNumFailed()); + } - /** - * Test failure rate returns expected values - */ - @Test - @Repeat(iterations = 3) - public void testFailureRate() { - DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); - assertEquals(0.0, counter.getFailRate(), 0); - int failures = randomIntBetween(0, 100000); - int passes = randomIntBetween(0, 100000); - counter.incrementPassedCount(passes); - counter.incrementFailedCount(failures); - assertEquals((double)failures / (double)(passes + failures), counter.getFailRate(), 0); - } + /** + * Test failure rate returns expected values + */ + @Test + @Repeat(iterations = 3) + public void testFailureRate() { + DatumStatusCounter counter = new DatumStatusCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); + assertEquals(0.0, counter.getFailRate(), 0); + int failures = randomIntBetween(0, 100000); + int passes = randomIntBetween(0, 100000); + counter.incrementPassedCount(passes); + counter.incrementFailedCount(failures); + assertEquals((double)failures / (double)(passes + failures), counter.getFailRate(), 0); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java index 95fd610..544b065 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/counters/StreamsTaskCounterTest.java @@ -23,139 +23,139 @@ import org.joda.time.DateTime; import org.junit.After; import org.junit.Test; +import java.lang.management.ManagementFactory; import javax.management.InstanceNotFoundException; import javax.management.ObjectName; -import java.lang.management.ManagementFactory; /** * Unit tests for {@link org.apache.streams.local.counters.StreamsTaskCounter} */ public class StreamsTaskCounterTest extends RandomizedTest { - private static final String MBEAN_ID = "test_id"; - private static final String STREAM_ID = "test_stream"; - private static long STREAM_START_TIME = (new DateTime()).getMillis(); - - /** - * Remove registered mbeans from previous tests - * @throws Exception - */ - @After - public void unregisterMXBean() throws Exception { - try { - ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(StreamsTaskCounter.NAME_TEMPLATE, MBEAN_ID, STREAM_ID, STREAM_START_TIME))); - } catch (InstanceNotFoundException ife) { - //No-op - } + private static final String MBEAN_ID = "test_id"; + private static final String STREAM_ID = "test_stream"; + private static long STREAM_START_TIME = (new DateTime()).getMillis(); + + /** + * Remove registered mbeans from previous tests + * @throws Exception + */ + @After + public void unregisterMXBean() throws Exception { + try { + ManagementFactory.getPlatformMBeanServer().unregisterMBean(new ObjectName(String.format(StreamsTaskCounter.NAME_TEMPLATE, MBEAN_ID, STREAM_ID, STREAM_START_TIME))); + } catch (InstanceNotFoundException ife) { + //No-op } - - /** - * Test constructor does not throw errors - */ - @Test - public void testConstructor() { - try { - new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); - } catch (Throwable t) { - fail("Constructor threw error : "+t.getMessage()); - } + } + + /** + * Test constructor does not throw errors + */ + @Test + public void testConstructor() { + try { + new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); + } catch (Throwable t) { + fail("Constructor threw error : "+t.getMessage()); } - - /** - * Test emitted increments correctly and returns expected value - * @throws Exception - */ - @Test - @Repeat(iterations = 3) - public void testEmitted() throws Exception { - StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); - int numIncrements = randomIntBetween(1, 100000); - for(int i=0; i < numIncrements; ++i) { - counter.incrementEmittedCount(); - } - assertEquals(numIncrements, counter.getNumEmitted()); - - unregisterMXBean(); - - counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); - numIncrements = randomIntBetween(1, 100000); - long total = 0; - for(int i=0; i < numIncrements; ++i) { - long delta = randomIntBetween(1, 100); - total += delta; - counter.incrementEmittedCount(delta); - } - assertEquals(total, counter.getNumEmitted()); + } + + /** + * Test emitted increments correctly and returns expected value + * @throws Exception + */ + @Test + @Repeat(iterations = 3) + public void testEmitted() throws Exception { + StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); + int numIncrements = randomIntBetween(1, 100000); + for(int i=0; i < numIncrements; ++i) { + counter.incrementEmittedCount(); } + assertEquals(numIncrements, counter.getNumEmitted()); + + unregisterMXBean(); - /** - * Test received increments correctly and returns expected value - * @throws Exception - */ - @Test - @Repeat(iterations = 3) - public void testReceived() throws Exception { - StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); - int numIncrements = randomIntBetween(1, 100000); - for(int i=0; i < numIncrements; ++i) { - counter.incrementReceivedCount(); - } - assertEquals(numIncrements, counter.getNumReceived()); - - unregisterMXBean(); - - counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); - numIncrements = randomIntBetween(1, 100000); - long total = 0; - for(int i=0; i < numIncrements; ++i) { - long delta = randomIntBetween(1, 100); - total += delta; - counter.incrementReceivedCount(delta); - } - assertEquals(total, counter.getNumReceived()); + counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); + numIncrements = randomIntBetween(1, 100000); + long total = 0; + for(int i=0; i < numIncrements; ++i) { + long delta = randomIntBetween(1, 100); + total += delta; + counter.incrementEmittedCount(delta); } + assertEquals(total, counter.getNumEmitted()); + } + + /** + * Test received increments correctly and returns expected value + * @throws Exception + */ + @Test + @Repeat(iterations = 3) + public void testReceived() throws Exception { + StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); + int numIncrements = randomIntBetween(1, 100000); + for(int i=0; i < numIncrements; ++i) { + counter.incrementReceivedCount(); + } + assertEquals(numIncrements, counter.getNumReceived()); + + unregisterMXBean(); - /** - * Test errors increments correctly and returns expected value - * @throws Exception - */ - @Test - @Repeat(iterations = 3) - public void testError() throws Exception { - StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); - int numIncrements = randomIntBetween(1, 100000); - for(int i=0; i < numIncrements; ++i) { - counter.incrementErrorCount(); - } - assertEquals(numIncrements, counter.getNumUnhandledErrors()); - - unregisterMXBean(); - - counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); - numIncrements = randomIntBetween(1, 100000); - long total = 0; - for(int i=0; i < numIncrements; ++i) { - long delta = randomIntBetween(1, 100); - total += delta; - counter.incrementErrorCount(delta); - } - assertEquals(total, counter.getNumUnhandledErrors()); + counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); + numIncrements = randomIntBetween(1, 100000); + long total = 0; + for(int i=0; i < numIncrements; ++i) { + long delta = randomIntBetween(1, 100); + total += delta; + counter.incrementReceivedCount(delta); } + assertEquals(total, counter.getNumReceived()); + } + + /** + * Test errors increments correctly and returns expected value + * @throws Exception + */ + @Test + @Repeat(iterations = 3) + public void testError() throws Exception { + StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); + int numIncrements = randomIntBetween(1, 100000); + for(int i=0; i < numIncrements; ++i) { + counter.incrementErrorCount(); + } + assertEquals(numIncrements, counter.getNumUnhandledErrors()); + + unregisterMXBean(); - /** - * Test error rate returns expected value - * @throws Exception - */ - @Test - @Repeat(iterations = 3) - public void testErrorRate() throws Exception { - StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); - assertEquals(0.0, counter.getErrorRate(), 0); - int failures = randomIntBetween(0, 100000); - int received = randomIntBetween(0, 100000); - counter.incrementReceivedCount(received); - counter.incrementErrorCount(failures); - assertEquals((double)failures / (double)(received), counter.getErrorRate(), 0); + counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); + numIncrements = randomIntBetween(1, 100000); + long total = 0; + for(int i=0; i < numIncrements; ++i) { + long delta = randomIntBetween(1, 100); + total += delta; + counter.incrementErrorCount(delta); } + assertEquals(total, counter.getNumUnhandledErrors()); + } + + /** + * Test error rate returns expected value + * @throws Exception + */ + @Test + @Repeat(iterations = 3) + public void testErrorRate() throws Exception { + StreamsTaskCounter counter = new StreamsTaskCounter(MBEAN_ID, STREAM_ID, STREAM_START_TIME); + assertEquals(0.0, counter.getErrorRate(), 0); + int failures = randomIntBetween(0, 100000); + int received = randomIntBetween(0, 100000); + counter.incrementReceivedCount(received); + counter.incrementErrorCount(failures); + assertEquals((double)failures / (double)(received), counter.getErrorRate(), 0); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java index 7e33ab9..e3b608d 100644 --- a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java @@ -20,6 +20,7 @@ package org.apache.streams.local.executors; import org.apache.streams.local.builders.LocalStreamBuilder; import org.apache.streams.util.ComponentUtils; + import org.junit.After; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; @@ -27,7 +28,6 @@ import org.mockito.stubbing.Answer; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -35,7 +35,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; /** * @@ -43,90 +42,90 @@ import static org.mockito.Mockito.when; public class ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest { - @After - public void removeLocalMBeans() { - try { - ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local"); - } catch (Exception e) { - //No op. proceed to next test - } + @After + public void removeLocalMBeans() { + try { + ComponentUtils.removeAllMBeansOfDomain("org.apache.streams.local"); + } catch (Exception e) { + //No op. proceed to next test } - - @Test - public void testShutDownOnException() { - LocalStreamBuilder sb = mock(LocalStreamBuilder.class); - final AtomicBoolean isShutdown = new AtomicBoolean(false); - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - isShutdown.set(true); - return null; - } - }).when(sb).stop(); - - final CountDownLatch latch = new CountDownLatch(1); - - Runnable runnable = new Runnable() { - @Override - public void run() { - latch.countDown(); - throw new RuntimeException("Testing Throwable Handling!"); - } - }; - - ExecutorService executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(1, sb); - executor.execute(runnable); - try { - latch.await(); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } - executor.shutdownNow(); - try { - executor.awaitTermination(1, TimeUnit.SECONDS); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } - assertTrue("Expected StreamBuilder shutdown to be called", isShutdown.get()); + } + + @Test + public void testShutDownOnException() { + LocalStreamBuilder sb = mock(LocalStreamBuilder.class); + final AtomicBoolean isShutdown = new AtomicBoolean(false); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + isShutdown.set(true); + return null; + } + }).when(sb).stop(); + + final CountDownLatch latch = new CountDownLatch(1); + + Runnable runnable = new Runnable() { + @Override + public void run() { + latch.countDown(); + throw new RuntimeException("Testing Throwable Handling!"); + } + }; + + ExecutorService executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(1, sb); + executor.execute(runnable); + try { + latch.await(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); } - - - @Test - public void testNormalExecution() { - LocalStreamBuilder sb = mock(LocalStreamBuilder.class); - final AtomicBoolean isShutdown = new AtomicBoolean(false); - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - isShutdown.set(true); - return null; - } - }).when(sb).stop(); - - final CountDownLatch latch = new CountDownLatch(1); - - Runnable runnable = new Runnable() { - @Override - public void run() { - latch.countDown(); - } - }; - - ExecutorService executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(1, sb); - executor.execute(runnable); - try { - latch.await(); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } - executor.shutdownNow(); - try { - executor.awaitTermination(1, TimeUnit.SECONDS); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } - assertFalse("Expected StreamBuilder shutdown to be called", isShutdown.get()); + executor.shutdownNow(); + try { + executor.awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + assertTrue("Expected StreamBuilder shutdown to be called", isShutdown.get()); + } + + + @Test + public void testNormalExecution() { + LocalStreamBuilder sb = mock(LocalStreamBuilder.class); + final AtomicBoolean isShutdown = new AtomicBoolean(false); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + isShutdown.set(true); + return null; + } + }).when(sb).stop(); + + final CountDownLatch latch = new CountDownLatch(1); + + Runnable runnable = new Runnable() { + @Override + public void run() { + latch.countDown(); + } + }; + + ExecutorService executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(1, sb); + executor.execute(runnable); + try { + latch.await(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + executor.shutdownNow(); + try { + executor.awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); } + assertFalse("Expected StreamBuilder shutdown to be called", isShutdown.get()); + } }
