Repository: aurora Updated Branches: refs/heads/master f3da5aea2 -> 5fc7baf5a
Adding logging threadpool executor. Reviewed at https://reviews.apache.org/r/33456/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/5fc7baf5 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/5fc7baf5 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/5fc7baf5 Branch: refs/heads/master Commit: 5fc7baf5aa4cf6a94247ed9a287be431a3f229c3 Parents: f3da5ae Author: Maxim Khutornenko <[email protected]> Authored: Fri May 1 15:37:31 2015 -0700 Committer: Maxim Khutornenko <[email protected]> Committed: Fri May 1 15:37:31 2015 -0700 ---------------------------------------------------------------------- .../apache/aurora/scheduler/base/AsyncUtil.java | 84 ++++++++++++++------ .../scheduler/events/PubsubEventModule.java | 14 +--- .../aurora/scheduler/base/AsyncUtilTest.java | 64 +++++++++------ 3 files changed, 105 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/5fc7baf5/src/main/java/org/apache/aurora/scheduler/base/AsyncUtil.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/base/AsyncUtil.java b/src/main/java/org/apache/aurora/scheduler/base/AsyncUtil.java index f657e05..d6d1350 100644 --- a/src/main/java/org/apache/aurora/scheduler/base/AsyncUtil.java +++ b/src/main/java/org/apache/aurora/scheduler/base/AsyncUtil.java @@ -13,9 +13,12 @@ */ package org.apache.aurora.scheduler.base; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -50,30 +53,12 @@ public final class AsyncUtil { return new ScheduledThreadPoolExecutor( poolSize, new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build()) { - - @Override - protected void afterExecute(Runnable runnable, Throwable throwable) { - // See java.util.concurrent.ThreadPoolExecutor#afterExecute(Runnable, Throwable) - // for more details and an implementation example. - super.afterExecute(runnable, throwable); - if (throwable == null) { - if (runnable instanceof Future) { - try { - Future<?> future = (Future<?>) runnable; - if (future.isDone()) { - future.get(); - } - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } catch (ExecutionException ee) { - logger.log(Level.SEVERE, ee.toString(), ee); - } + @Override + protected void afterExecute(Runnable runnable, Throwable throwable) { + super.afterExecute(runnable, throwable); + evaluateResult(runnable, throwable, logger); } - } else { - logger.log(Level.SEVERE, throwable.toString(), throwable); - } - } - }; + }; } /** @@ -89,4 +74,57 @@ public final class AsyncUtil { return loggingScheduledExecutor(1, nameFormat, logger); } + + /** + * Creates a {@link ThreadPoolExecutor} that logs unhandled errors. + * + * @param corePoolSize see {@link ThreadPoolExecutor}. + * @param maxPoolSize see {@link ThreadPoolExecutor}. + * @param workQueue see {@link ThreadPoolExecutor}. + * @param nameFormat Thread naming format. + * @param logger Logger instance. + * @return instance of {@link ThreadPoolExecutor} enabled to log unhandled exceptions. + */ + public static ThreadPoolExecutor loggingExecutor( + int corePoolSize, + int maxPoolSize, + BlockingQueue<Runnable> workQueue, + String nameFormat, + final Logger logger) { + + return new ThreadPoolExecutor( + corePoolSize, + maxPoolSize, + 0L, + TimeUnit.MILLISECONDS, + workQueue, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build()) { + @Override + protected void afterExecute(Runnable runnable, Throwable throwable) { + super.afterExecute(runnable, throwable); + evaluateResult(runnable, throwable, logger); + } + }; + } + + private static void evaluateResult(Runnable runnable, Throwable throwable, Logger logger) { + // See java.util.concurrent.ThreadPoolExecutor#afterExecute(Runnable, Throwable) + // for more details and an implementation example. + if (throwable == null) { + if (runnable instanceof Future) { + try { + Future<?> future = (Future<?>) runnable; + if (future.isDone()) { + future.get(); + } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } catch (ExecutionException ee) { + logger.log(Level.SEVERE, ee.toString(), ee); + } + } + } else { + logger.log(Level.SEVERE, throwable.toString(), throwable); + } + } } http://git-wip-us.apache.org/repos/asf/aurora/blob/5fc7baf5/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java index 3a4d40a..82d479e 100644 --- a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java +++ b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java @@ -18,8 +18,6 @@ import java.lang.annotation.Target; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.logging.Logger; import javax.inject.Inject; @@ -34,7 +32,6 @@ import com.google.common.eventbus.EventBus; import com.google.common.eventbus.Subscribe; import com.google.common.util.concurrent.AbstractIdleService; import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.AbstractModule; import com.google.inject.Binder; import com.google.inject.TypeLiteral; @@ -48,6 +45,7 @@ import com.twitter.common.base.Command; import com.twitter.common.stats.StatsProvider; import org.apache.aurora.scheduler.SchedulerServicesModule; +import org.apache.aurora.scheduler.base.AsyncUtil; import org.apache.aurora.scheduler.events.NotifyingSchedulingFilter.NotifyDelegate; import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; import org.apache.aurora.scheduler.filter.SchedulingFilter; @@ -100,16 +98,12 @@ public final class PubsubEventModule extends AbstractModule { .annotatedWith(PubsubExecutorQueue.class) .toInstance(executorQueue); - executor = new ThreadPoolExecutor( + executor = AsyncUtil.loggingExecutor( MAX_ASYNC_EVENT_BUS_THREADS.get(), MAX_ASYNC_EVENT_BUS_THREADS.get(), - 0L, - TimeUnit.MILLISECONDS, executorQueue, - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("AsyncTaskEvents-%d") - .build()); + "AsyncTaskEvents-%d", + log); LifecycleModule.bindStartupAction(binder(), RegisterGauges.class); } else { http://git-wip-us.apache.org/repos/asf/aurora/blob/5fc7baf5/src/test/java/org/apache/aurora/scheduler/base/AsyncUtilTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/base/AsyncUtilTest.java b/src/test/java/org/apache/aurora/scheduler/base/AsyncUtilTest.java index e990f52..2397186 100644 --- a/src/test/java/org/apache/aurora/scheduler/base/AsyncUtilTest.java +++ b/src/test/java/org/apache/aurora/scheduler/base/AsyncUtilTest.java @@ -15,7 +15,9 @@ package org.apache.aurora.scheduler.base; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -32,46 +34,67 @@ import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expectLastCall; public class AsyncUtilTest extends EasyMockTest { + private static final String NAME_FORMAT = "Test-%d"; private Logger logger; - private ScheduledThreadPoolExecutor executor; private CountDownLatch latch; @Before public void setUp() { logger = createMock(Logger.class); latch = new CountDownLatch(1); - executor = AsyncUtil.singleThreadLoggingScheduledExecutor("Test-%d", logger); } @Test public void testScheduleLogging() throws Exception { - logger.log( - eq(Level.SEVERE), - contains("Expected exception."), - EasyMock.<ExecutionException>anyObject()); + expectLogging(); - expectLastCall().andAnswer(new IAnswer<Object>() { + control.replay(); + + scheduledExecutor().schedule(new Runnable() { @Override - public Object answer() throws Throwable { - latch.countDown(); - return null; + public void run() { + throw new IllegalArgumentException("Expected exception."); } - }).once(); + }, 0, TimeUnit.MILLISECONDS); + + latch.await(); + } + + @Test + public void testSubmitLogging() throws Exception { + expectLogging(); control.replay(); - executor.schedule(new Runnable() { + scheduledExecutor().submit(new Runnable() { @Override public void run() { throw new IllegalArgumentException("Expected exception."); } - }, 0, TimeUnit.MILLISECONDS); + }); latch.await(); } @Test - public void testSubmitLogging() throws Exception { + public void testExecuteLogging() throws Exception { + expectLogging(); + + control.replay(); + + ThreadPoolExecutor executor = + AsyncUtil.loggingExecutor(1, 1, new LinkedBlockingQueue<Runnable>(), NAME_FORMAT, logger); + executor.execute(new Runnable() { + @Override + public void run() { + throw new IllegalArgumentException("Expected exception."); + } + }); + + latch.await(); + } + + private void expectLogging() { logger.log( eq(Level.SEVERE), contains("Expected exception."), @@ -84,16 +107,9 @@ public class AsyncUtilTest extends EasyMockTest { return null; } }).once(); + } - control.replay(); - - executor.submit(new Runnable() { - @Override - public void run() { - throw new IllegalArgumentException("Expected exception."); - } - }); - - latch.await(); + private ScheduledThreadPoolExecutor scheduledExecutor() { + return AsyncUtil.singleThreadLoggingScheduledExecutor(NAME_FORMAT, logger); } }
