Repository: aurora Updated Branches: refs/heads/master da48ad20b -> 887ffd2c4
Use the system-wide AsyncExecutor for pubsub events. Bugs closed: AURORA-1395 Reviewed at https://reviews.apache.org/r/37215/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/887ffd2c Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/887ffd2c Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/887ffd2c Branch: refs/heads/master Commit: 887ffd2c485f1f5bc9639f8b2aec7f8a0626f2c0 Parents: da48ad2 Author: Bill Farner <[email protected]> Authored: Thu Aug 13 12:00:02 2015 -0400 Committer: Bill Farner <[email protected]> Committed: Thu Aug 13 12:00:02 2015 -0400 ---------------------------------------------------------------------- .../apache/aurora/scheduler/app/AppModule.java | 2 +- .../aurora/scheduler/async/AsyncModule.java | 2 + .../scheduler/events/PubsubEventModule.java | 43 +++----------------- .../aurora/scheduler/app/SchedulerIT.java | 4 ++ .../scheduler/events/PubsubEventModuleTest.java | 43 ++++++++------------ .../scheduler/reconciliation/KillRetryTest.java | 6 ++- .../scheduling/TaskSchedulerImplTest.java | 8 +++- .../state/MaintenanceControllerImplTest.java | 7 +++- 8 files changed, 49 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/887ffd2c/src/main/java/org/apache/aurora/scheduler/app/AppModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java index 4cc1127..4eee8e3 100644 --- a/src/main/java/org/apache/aurora/scheduler/app/AppModule.java +++ b/src/main/java/org/apache/aurora/scheduler/app/AppModule.java @@ -111,8 +111,8 @@ public class AppModule extends AbstractModule { .setThriftAPIVersion(THRIFT_API_VERSION) .setStatsUrlPrefix(statsUrlPrefix))); + install(new PubsubEventModule()); // Filter layering: notifier filter -> base impl - install(new PubsubEventModule(true)); PubsubEventModule.bindSchedulingFilterDelegate(binder()).to(SchedulingFilterImpl.class); bind(SchedulingFilterImpl.class).in(Singleton.class); http://git-wip-us.apache.org/repos/asf/aurora/blob/887ffd2c/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java index 8416ea0..a8c6445 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java +++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java @@ -15,6 +15,7 @@ package org.apache.aurora.scheduler.async; import java.lang.annotation.Retention; import java.lang.annotation.Target; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.logging.Logger; @@ -83,6 +84,7 @@ public class AsyncModule extends AbstractModule { }); SchedulerServicesModule.addAppStartupServiceBinding(binder()).to(RegisterGauges.class); + bind(Executor.class).annotatedWith(AsyncExecutor.class).to(GatedDelayExecutor.class); bind(DelayExecutor.class).annotatedWith(AsyncExecutor.class).to(GatedDelayExecutor.class); bind(FlushableWorkQueue.class).annotatedWith(AsyncExecutor.class).to(GatedDelayExecutor.class); } http://git-wip-us.apache.org/repos/asf/aurora/blob/887ffd2c/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java index ccecfdb..84c58e1 100644 --- a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java +++ b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java @@ -15,7 +15,6 @@ package org.apache.aurora.scheduler.events; import java.util.Set; import java.util.concurrent.Executor; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; @@ -31,19 +30,15 @@ import com.google.common.eventbus.Subscribe; import com.google.common.eventbus.SubscriberExceptionContext; import com.google.common.eventbus.SubscriberExceptionHandler; import com.google.common.util.concurrent.AbstractIdleService; -import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.AbstractModule; import com.google.inject.Binder; import com.google.inject.Provides; import com.google.inject.binder.LinkedBindingBuilder; import com.google.inject.multibindings.Multibinder; -import com.twitter.common.args.Arg; -import com.twitter.common.args.CmdLine; -import com.twitter.common.args.constraints.Positive; import com.twitter.common.stats.StatsProvider; import org.apache.aurora.scheduler.SchedulerServicesModule; -import org.apache.aurora.scheduler.base.AsyncUtil; +import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; import org.apache.aurora.scheduler.events.NotifyingSchedulingFilter.NotifyDelegate; import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; import org.apache.aurora.scheduler.filter.SchedulingFilter; @@ -55,29 +50,18 @@ import static java.util.Objects.requireNonNull; */ public final class PubsubEventModule extends AbstractModule { - private final boolean async; private final Logger log; @VisibleForTesting - static final String PUBSUB_EXECUTOR_QUEUE_GAUGE = "pubsub_executor_queue_size"; - - @VisibleForTesting static final String EXCEPTIONS_STAT = "event_bus_exceptions"; - @Positive - @CmdLine(name = "max_async_event_bus_threads", - help = "Maximum number of concurrent threads to allow for the async event processing bus.") - private static final Arg<Integer> MAX_ASYNC_EVENT_BUS_THREADS = Arg.create(4); - @VisibleForTesting - PubsubEventModule(boolean async, Logger log) { + PubsubEventModule(Logger log) { this.log = requireNonNull(log); - this.async = requireNonNull(async); } - // TODO(wfarner): Remove the async argument and accept an Executor instead. - public PubsubEventModule(boolean async) { - this(async, Logger.getLogger(PubsubEventModule.class.getName())); + public PubsubEventModule() { + this(Logger.getLogger(PubsubEventModule.class.getName())); } @VisibleForTesting @@ -93,22 +77,7 @@ public final class PubsubEventModule extends AbstractModule { @Provides @Singleton - EventBus provideEventBus(StatsProvider statsProvider) { - Executor executor; - if (async) { - LinkedBlockingQueue<Runnable> executorQueue = new LinkedBlockingQueue<>(); - statsProvider.makeGauge(PUBSUB_EXECUTOR_QUEUE_GAUGE, executorQueue::size); - - executor = AsyncUtil.loggingExecutor( - MAX_ASYNC_EVENT_BUS_THREADS.get(), - MAX_ASYNC_EVENT_BUS_THREADS.get(), - executorQueue, - "AsyncTaskEvents-%d", - log); - } else { - executor = MoreExecutors.sameThreadExecutor(); - } - + EventBus provideEventBus(@AsyncExecutor Executor executor, StatsProvider statsProvider) { final AtomicLong subscriberExceptions = statsProvider.makeCounter(EXCEPTIONS_STAT); EventBus eventBus = new AsyncEventBus( executor, @@ -188,7 +157,7 @@ public final class PubsubEventModule extends AbstractModule { * @param binder Binder to bind against. */ public static void bind(Binder binder) { - binder.install(new PubsubEventModule(true)); + binder.install(new PubsubEventModule()); } private static Multibinder<EventSubscriber> getSubscriberBinder(Binder binder) { http://git-wip-us.apache.org/repos/asf/aurora/blob/887ffd2c/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java index 8d8f8a2..0151dd1 100644 --- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java +++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java @@ -77,6 +77,8 @@ import org.apache.aurora.gen.storage.Snapshot; import org.apache.aurora.gen.storage.Transaction; import org.apache.aurora.gen.storage.storageConstants; import org.apache.aurora.scheduler.Resources; +import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; +import org.apache.aurora.scheduler.async.FlushableWorkQueue; import org.apache.aurora.scheduler.configuration.ConfigurationManager; import org.apache.aurora.scheduler.log.Log; import org.apache.aurora.scheduler.log.Log.Entry; @@ -384,6 +386,8 @@ public class SchedulerIT extends BaseZooKeeperTest { scheduler.getValue().registered(driver, FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build(), MasterInfo.getDefaultInstance()); + // Registration is published on the event bus, which will be gated until a flush. + injector.getInstance(Key.get(FlushableWorkQueue.class, AsyncExecutor.class)).flush(); awaitSchedulerReady(); http://git-wip-us.apache.org/repos/asf/aurora/blob/887ffd2c/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java b/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java index 4c189c1..983a831 100644 --- a/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java +++ b/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java @@ -13,11 +13,14 @@ */ package org.apache.aurora.scheduler.events; +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.concurrent.Executor; import java.util.logging.Level; import java.util.logging.Logger; import com.google.common.eventbus.EventBus; import com.google.common.eventbus.Subscribe; +import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.AbstractModule; import com.google.inject.Guice; import com.google.inject.Injector; @@ -30,6 +33,7 @@ import com.twitter.common.stats.StatsProvider; import com.twitter.common.testing.easymock.EasyMockTest; import org.apache.aurora.scheduler.SchedulerServicesModule; +import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; import org.apache.aurora.scheduler.filter.SchedulingFilter; import org.apache.aurora.scheduler.testing.FakeStatsProvider; import org.easymock.EasyMock; @@ -44,51 +48,39 @@ public class PubsubEventModuleTest extends EasyMockTest { private FakeStatsProvider statsProvider; private Logger logger; + private UncaughtExceptionHandler exceptionHandler; + private SchedulingFilter schedulingFilter; @Before public void setUp() { statsProvider = new FakeStatsProvider(); logger = createMock(Logger.class); + exceptionHandler = createMock(UncaughtExceptionHandler.class); + schedulingFilter = createMock(SchedulingFilter.class); } @Test public void testHandlesDeadEvent() { logger.warning(String.format(PubsubEventModule.DEAD_EVENT_MESSAGE, "hello")); - Injector injector = getInjector(false); control.replay(); - injector.getInstance(EventBus.class).post("hello"); + getInjector().getInstance(EventBus.class).post("hello"); } @Test - public void testPubsubQueueGauge() throws Exception { - Injector injector = getInjector(true); + public void testPubsubExceptionTracking() throws Exception { + logger.log(eq(Level.SEVERE), anyString(), EasyMock.<Throwable>anyObject()); control.replay(); - injector.getInstance(Key.get(ExceptionalCommand.class, StartupStage.class)).execute(); - assertEquals( - 0L, - statsProvider.getLongValue(PubsubEventModule.PUBSUB_EXECUTOR_QUEUE_GAUGE) - ); - } - - @Test - public void testPubsubExceptionTracking() throws Exception { Injector injector = getInjector( - false, new AbstractModule() { @Override protected void configure() { PubsubEventModule.bindSubscriber(binder(), ThrowingSubscriber.class); } }); - - logger.log(eq(Level.SEVERE), anyString(), EasyMock.<Throwable>anyObject()); - - control.replay(); - injector.getInstance(Key.get(ExceptionalCommand.class, StartupStage.class)).execute(); assertEquals(0L, statsProvider.getLongValue(PubsubEventModule.EXCEPTIONS_STAT)); injector.getInstance(EventBus.class).post("hello"); @@ -102,20 +94,21 @@ public class PubsubEventModuleTest extends EasyMockTest { } } - public Injector getInjector(boolean isAsync, Module... additionalModules) { + public Injector getInjector(Module... additionalModules) { return Guice.createInjector( new LifecycleModule(), - new PubsubEventModule(isAsync, logger), + new PubsubEventModule(logger), new SchedulerServicesModule(), new AbstractModule() { @Override protected void configure() { - bind(Thread.UncaughtExceptionHandler.class) - .toInstance(createMock(Thread.UncaughtExceptionHandler.class)); + bind(Executor.class).annotatedWith(AsyncExecutor.class) + .toInstance(MoreExecutors.sameThreadExecutor()); + + bind(UncaughtExceptionHandler.class).toInstance(exceptionHandler); bind(StatsProvider.class).toInstance(statsProvider); - PubsubEventModule.bindSchedulingFilterDelegate(binder()) - .toInstance(createMock(SchedulingFilter.class)); + PubsubEventModule.bindSchedulingFilterDelegate(binder()).toInstance(schedulingFilter); for (Module module : additionalModules) { install(module); } http://git-wip-us.apache.org/repos/asf/aurora/blob/887ffd2c/src/test/java/org/apache/aurora/scheduler/reconciliation/KillRetryTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/reconciliation/KillRetryTest.java b/src/test/java/org/apache/aurora/scheduler/reconciliation/KillRetryTest.java index 957cbd0..0962ed9 100644 --- a/src/test/java/org/apache/aurora/scheduler/reconciliation/KillRetryTest.java +++ b/src/test/java/org/apache/aurora/scheduler/reconciliation/KillRetryTest.java @@ -14,10 +14,12 @@ package org.apache.aurora.scheduler.reconciliation; import java.lang.Thread.UncaughtExceptionHandler; +import java.util.concurrent.Executor; import javax.inject.Singleton; import com.google.common.eventbus.EventBus; +import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.AbstractModule; import com.google.inject.Guice; import com.google.inject.Injector; @@ -73,7 +75,7 @@ public class KillRetryTest extends EasyMockTest { Injector injector = Guice.createInjector( new LifecycleModule(), - new PubsubEventModule(false), + new PubsubEventModule(), new AbstractModule() { @Override protected void configure() { @@ -86,6 +88,8 @@ public class KillRetryTest extends EasyMockTest { bind(StatsProvider.class).toInstance(statsProvider); bind(UncaughtExceptionHandler.class) .toInstance(createMock(UncaughtExceptionHandler.class)); + bind(Executor.class).annotatedWith(AsyncExecutor.class) + .toInstance(MoreExecutors.sameThreadExecutor()); } } ); http://git-wip-us.apache.org/repos/asf/aurora/blob/887ffd2c/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java index 492334b..102e8d3 100644 --- a/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/scheduling/TaskSchedulerImplTest.java @@ -14,11 +14,13 @@ package org.apache.aurora.scheduler.scheduling; import java.util.Map; +import java.util.concurrent.Executor; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.AbstractModule; import com.google.inject.Guice; import com.google.inject.Injector; @@ -28,6 +30,7 @@ import com.twitter.common.testing.easymock.EasyMockTest; import com.twitter.common.util.Clock; import org.apache.aurora.gen.ScheduledTask; +import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; import org.apache.aurora.scheduler.base.JobKeys; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.TaskGroupKey; @@ -94,10 +97,13 @@ public class TaskSchedulerImplTest extends EasyMockTest { private Injector getInjector(final Storage storageImpl) { return Guice.createInjector( - new PubsubEventModule(false), + new PubsubEventModule(), new AbstractModule() { @Override protected void configure() { + + bind(Executor.class).annotatedWith(AsyncExecutor.class) + .toInstance(MoreExecutors.sameThreadExecutor()); bind(new TypeLiteral<BiCache<String, TaskGroupKey>>() { }).toInstance(reservations); bind(TaskScheduler.class).to(TaskSchedulerImpl.class); bind(Preemptor.class).toInstance(preemptor); http://git-wip-us.apache.org/repos/asf/aurora/blob/887ffd2c/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java index ba2edd8..abeaa49 100644 --- a/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/state/MaintenanceControllerImplTest.java @@ -14,9 +14,11 @@ package org.apache.aurora.scheduler.state; import java.util.Set; +import java.util.concurrent.Executor; import com.google.common.base.Optional; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.AbstractModule; import com.google.inject.Guice; import com.google.inject.Injector; @@ -31,6 +33,7 @@ import org.apache.aurora.gen.MaintenanceMode; import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.gen.TaskConfig; +import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; import org.apache.aurora.scheduler.base.Query; import org.apache.aurora.scheduler.base.Tasks; import org.apache.aurora.scheduler.events.EventSink; @@ -71,7 +74,7 @@ public class MaintenanceControllerImplTest extends EasyMockTest { stateManager = createMock(StateManager.class); Injector injector = Guice.createInjector( - new PubsubEventModule(false), + new PubsubEventModule(), new AbstractModule() { @Override protected void configure() { @@ -79,6 +82,8 @@ public class MaintenanceControllerImplTest extends EasyMockTest { bind(Storage.class).toInstance(storageUtil.storage); bind(StateManager.class).toInstance(stateManager); bind(StatsProvider.class).toInstance(new FakeStatsProvider()); + bind(Executor.class).annotatedWith(AsyncExecutor.class) + .toInstance(MoreExecutors.sameThreadExecutor()); } }); maintenance = injector.getInstance(MaintenanceController.class);
