STREAMS-179 | Added new thread pool executor to shutdown streams
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e091c6cc Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e091c6cc Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e091c6cc Branch: refs/heads/STREAMS-170 Commit: e091c6ccc4f6ca849a638402ce8de5a7d73a8df0 Parents: 507e679 Author: Ryan Ebanks <rebanks@Informations-MacBook-Pro-3.local> Authored: Fri Sep 19 15:22:32 2014 -0500 Committer: Ryan Ebanks <rebanks@Informations-MacBook-Pro-3.local> Committed: Fri Sep 19 15:22:32 2014 -0500 ---------------------------------------------------------------------- .../local/builders/LocalStreamBuilder.java | 3 +- ...amOnUnhandleThrowableThreadPoolExecutor.java | 45 ++++++++ ...nhandledThrowableThreadPoolExecutorTest.java | 103 +++++++++++++++++++ 3 files changed, 150 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e091c6cc/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java index 25f9fe7..bec1ff9 100644 --- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java @@ -20,6 +20,7 @@ package org.apache.streams.local.builders; import org.apache.log4j.spi.LoggerFactory; import org.apache.streams.core.*; +import org.apache.streams.local.executors.ShutdownStreamOnUnhandleThrowableThreadPoolExecutor; import org.apache.streams.local.tasks.LocalStreamProcessMonitorThread; import org.apache.streams.local.tasks.StatusCounterMonitorThread; import org.apache.streams.local.tasks.StreamsProviderTask; @@ -173,7 +174,7 @@ public class LocalStreamBuilder implements StreamBuilder { public void start() { attachShutdownHandler(); boolean isRunning = true; - this.executor = Executors.newFixedThreadPool(this.totalTasks); + this.executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(this.totalTasks, this); this.monitor = Executors.newFixedThreadPool(this.monitorTasks+1); Map<String, StreamsProviderTask> provTasks = new HashMap<String, StreamsProviderTask>(); tasks = new HashMap<String, List<StreamsTask>>(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e091c6cc/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java new file mode 100644 index 0000000..f8d6343 --- /dev/null +++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.java @@ -0,0 +1,45 @@ +package org.apache.streams.local.executors; + +import org.apache.streams.local.builders.LocalStreamBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.*; + +/** + * @see {@link java.util.concurrent.ThreadPoolExecutor} + */ +public class ShutdownStreamOnUnhandleThrowableThreadPoolExecutor extends ThreadPoolExecutor { + + private static final Logger LOGGER = LoggerFactory.getLogger(ShutdownStreamOnUnhandleThrowableThreadPoolExecutor.class); + + private LocalStreamBuilder streamBuilder; + private volatile boolean isStoped; + + /** + * Creates a fixed size thread pool where corePoolSize & maximumPoolSize equal numThreads with an unbounded queue. + * @param numThreads number of threads in pool + * @param streamBuilder streambuilder to call {@link org.apache.streams.core.StreamBuilder#stop()} on upon receiving an unhandled throwable + */ + public ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(int numThreads, LocalStreamBuilder streamBuilder) { + super(numThreads, numThreads, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); + this.streamBuilder = streamBuilder; + this.isStoped = false; + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + if(t != null) { + LOGGER.error("Runnable, {}, exited with an unhandled throwable! : {}", r.getClass(), t); + LOGGER.error("Attempting to shut down stream."); + synchronized (this) { + if (!this.isStoped) { + this.isStoped = true; + this.streamBuilder.stop(); + } + } + } else { + LOGGER.trace("Runnable, {}, finished executing.", r.getClass()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e091c6cc/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java ---------------------------------------------------------------------- diff --git a/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java new file mode 100644 index 0000000..17e8dd9 --- /dev/null +++ b/streams-runtimes/streams-runtime-local/src/test/java/org/apache/streams/local/executors/ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest.java @@ -0,0 +1,103 @@ +package org.apache.streams.local.executors; + +import org.apache.streams.local.builders.LocalStreamBuilder; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * + */ +public class ShutdownStreamOnUnhandledThrowableThreadPoolExecutorTest { + + + @Test + public void testShutDownOnException() { + LocalStreamBuilder sb = mock(LocalStreamBuilder.class); + final AtomicBoolean isShutdown = new AtomicBoolean(false); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + isShutdown.set(true); + return null; + } + }).when(sb).stop(); + + final CountDownLatch latch = new CountDownLatch(1); + + Runnable runnable = new Runnable() { + @Override + public void run() { + latch.countDown(); + throw new RuntimeException("Testing Throwable Handling!"); + } + }; + + ExecutorService executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(1, sb); + executor.execute(runnable); + try { + latch.await(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + executor.shutdownNow(); + try { + executor.awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + assertTrue("Expected StreamBuilder shutdown to be called", isShutdown.get()); + } + + + @Test + public void testNormalExecution() { + LocalStreamBuilder sb = mock(LocalStreamBuilder.class); + final AtomicBoolean isShutdown = new AtomicBoolean(false); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + isShutdown.set(true); + return null; + } + }).when(sb).stop(); + + final CountDownLatch latch = new CountDownLatch(1); + + Runnable runnable = new Runnable() { + @Override + public void run() { + latch.countDown(); + } + }; + + ExecutorService executor = new ShutdownStreamOnUnhandleThrowableThreadPoolExecutor(1, sb); + executor.execute(runnable); + try { + latch.await(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + executor.shutdownNow(); + try { + executor.awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + assertFalse("Expected StreamBuilder shutdown to be called", isShutdown.get()); + } + + +}