Repository: aurora Updated Branches: refs/heads/master 62e7d2358 -> f6c40a279
MesosCallbackHandler uses separate eventbus for registered call We should have `registered` use its own eventbus so it does not get blocked by other calls. Bugs closed: AURORA-1953 Reviewed at https://reviews.apache.org/r/63316/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/f6c40a27 Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/f6c40a27 Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/f6c40a27 Branch: refs/heads/master Commit: f6c40a279a460d6943dffaf54d1a35d86deed7a1 Parents: 62e7d23 Author: Jordan Ly <[email protected]> Authored: Fri Oct 27 14:34:01 2017 -0700 Committer: Bill Farner <[email protected]> Committed: Fri Oct 27 14:34:01 2017 -0700 ---------------------------------------------------------------------- .../aurora/scheduler/SchedulerModule.java | 2 +- .../scheduler/events/PubsubEventModule.java | 126 +++++++++++++++---- .../scheduler/mesos/MesosCallbackHandler.java | 14 ++- .../scheduler/mesos/SchedulerDriverModule.java | 4 +- .../scheduler/events/PubsubEventModuleTest.java | 23 +++- .../mesos/MesosCallbackHandlerTest.java | 7 +- 6 files changed, 135 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/f6c40a27/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java index 3821819..31991ea 100644 --- a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java +++ b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java @@ -97,7 +97,7 @@ public class SchedulerModule extends AbstractModule { } }); - PubsubEventModule.bindSubscriber(binder(), SchedulerLifecycle.class); + PubsubEventModule.bindRegisteredSubscriber(binder(), SchedulerLifecycle.class); bind(TaskVars.class).in(Singleton.class); PubsubEventModule.bindSubscriber(binder(), TaskVars.class); addSchedulerActiveServiceBinding(binder()).to(TaskVars.class); http://git-wip-us.apache.org/repos/asf/aurora/blob/f6c40a27/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java index 6758f8c..c260573 100644 --- a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java +++ b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java @@ -13,11 +13,14 @@ */ package org.apache.aurora.scheduler.events; +import java.lang.annotation.Retention; +import java.lang.annotation.Target; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; import javax.inject.Inject; +import javax.inject.Qualifier; import javax.inject.Singleton; import com.google.common.annotations.VisibleForTesting; @@ -25,6 +28,7 @@ import com.google.common.eventbus.AsyncEventBus; import com.google.common.eventbus.DeadEvent; import com.google.common.eventbus.EventBus; import com.google.common.eventbus.Subscribe; +import com.google.common.eventbus.SubscriberExceptionHandler; import com.google.common.util.concurrent.AbstractIdleService; import com.google.inject.AbstractModule; import com.google.inject.Binder; @@ -34,10 +38,15 @@ import com.google.inject.multibindings.Multibinder; import org.apache.aurora.common.stats.StatsProvider; import org.apache.aurora.scheduler.SchedulerServicesModule; import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; +import org.apache.aurora.scheduler.base.AsyncUtil; import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; import static java.util.Objects.requireNonNull; /** @@ -45,20 +54,36 @@ import static java.util.Objects.requireNonNull; */ public final class PubsubEventModule extends AbstractModule { + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + private @interface DeadEventHandler { } + + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + public @interface RegisteredEvents { } + private final Logger log; + private final Executor registeredExecutor; @VisibleForTesting static final String EXCEPTIONS_STAT = "event_bus_exceptions"; @VisibleForTesting static final String EVENT_BUS_DEAD_EVENTS = "event_bus_dead_events"; - @VisibleForTesting - PubsubEventModule(Logger log) { + public PubsubEventModule() { + this(LoggerFactory.getLogger(PubsubEventModule.class)); + } + + private PubsubEventModule(Logger log) { this.log = requireNonNull(log); + this.registeredExecutor = AsyncUtil.singleThreadLoggingScheduledExecutor("RegisteredEventSink", + log); } - public PubsubEventModule() { - this(LoggerFactory.getLogger(PubsubEventModule.class)); + @VisibleForTesting + PubsubEventModule(Logger log, Executor registeredExecutor) { + this.log = requireNonNull(log); + this.registeredExecutor = requireNonNull(registeredExecutor); } @VisibleForTesting @@ -67,33 +92,47 @@ public final class PubsubEventModule extends AbstractModule { @Override protected void configure() { // Ensure at least an empty binding is present. - getSubscriberBinder(binder()); + Multibinder.newSetBinder(binder(), EventSubscriber.class); + Multibinder.newSetBinder(binder(), EventSubscriber.class, RegisteredEvents.class); + // TODO(ksweeney): Would this be better as a scheduler active service? SchedulerServicesModule.addAppStartupServiceBinding(binder()).to(RegisterSubscribers.class); } @Provides @Singleton - EventBus provideEventBus(@AsyncExecutor Executor executor, StatsProvider statsProvider) { + SubscriberExceptionHandler provideSubscriberExceptionHandler(StatsProvider statsProvider) { final AtomicLong subscriberExceptions = statsProvider.makeCounter(EXCEPTIONS_STAT); - EventBus eventBus = new AsyncEventBus( - executor, - (exception, context) -> { - subscriberExceptions.incrementAndGet(); - log.error( - "Failed to dispatch event to " + context.getSubscriberMethod() + ": " + exception, - exception); - } - ); + return (exception, context) -> { + subscriberExceptions.incrementAndGet(); + log.error( + "Failed to dispatch event to " + context.getSubscriberMethod() + ": " + exception, + exception); + }; + } + @Provides + @DeadEventHandler + @Singleton + Object provideDeadEventHandler(StatsProvider statsProvider) { final AtomicLong deadEventCounter = statsProvider.makeCounter(EVENT_BUS_DEAD_EVENTS); - eventBus.register(new Object() { + return new Object() { @Subscribe public void logDeadEvent(DeadEvent event) { deadEventCounter.incrementAndGet(); log.warn(String.format(DEAD_EVENT_MESSAGE, event.getEvent())); } - }); + }; + } + + @Provides + @Singleton + EventBus provideEventBus(@AsyncExecutor Executor executor, + SubscriberExceptionHandler subscriberExceptionHandler, + @DeadEventHandler Object deadEventHandler) { + + EventBus eventBus = new AsyncEventBus(executor, subscriberExceptionHandler); + eventBus.register(deadEventHandler); return eventBus; } @@ -103,21 +142,46 @@ public final class PubsubEventModule extends AbstractModule { return eventBus::post; } + @Provides + @RegisteredEvents + @Singleton + EventBus provideRegisteredEventBus(SubscriberExceptionHandler subscriberExceptionHandler, + @DeadEventHandler Object deadEventHandler) { + + EventBus eventBus = new AsyncEventBus(registeredExecutor, subscriberExceptionHandler); + eventBus.register(deadEventHandler); + return eventBus; + } + + @Provides + @RegisteredEvents + @Singleton + EventSink provideRegisteredEventSink(@RegisteredEvents EventBus eventBus) { + return eventBus::post; + } + static class RegisterSubscribers extends AbstractIdleService { private final EventBus eventBus; + private final EventBus registeredEventBus; private final Set<EventSubscriber> subscribers; + private final Set<EventSubscriber> registeredSubscribers; @Inject - RegisterSubscribers(EventBus eventBus, Set<EventSubscriber> subscribers) { + RegisterSubscribers(EventBus eventBus, + @RegisteredEvents EventBus registeredEventBus, + Set<EventSubscriber> subscribers, + @RegisteredEvents Set<EventSubscriber> registeredSubscribers) { + this.eventBus = requireNonNull(eventBus); + this.registeredEventBus = requireNonNull(registeredEventBus); this.subscribers = requireNonNull(subscribers); + this.registeredSubscribers = requireNonNull(registeredSubscribers); } @Override protected void startUp() { - for (EventSubscriber subscriber : subscribers) { - eventBus.register(subscriber); - } + subscribers.forEach(eventBus::register); + registeredSubscribers.forEach(registeredEventBus::register); } @Override @@ -135,10 +199,6 @@ public final class PubsubEventModule extends AbstractModule { binder.install(new PubsubEventModule()); } - private static Multibinder<EventSubscriber> getSubscriberBinder(Binder binder) { - return Multibinder.newSetBinder(binder, EventSubscriber.class); - } - /** * Binds a subscriber to receive task events. * @@ -146,6 +206,20 @@ public final class PubsubEventModule extends AbstractModule { * @param subscriber Subscriber implementation class to register for events. */ public static void bindSubscriber(Binder binder, Class<? extends EventSubscriber> subscriber) { - getSubscriberBinder(binder).addBinding().to(subscriber); + Multibinder.newSetBinder(binder, EventSubscriber.class).addBinding().to(subscriber); + } + + /** + * Binds a subscriber to receive Mesos registered events. + * + * @param binder Binder to bind the subscriber with. + * @param subscriber Subscriber implementation class to register for events. + */ + public static void bindRegisteredSubscriber(Binder binder, + Class<? extends EventSubscriber> subscriber) { + + Multibinder.newSetBinder(binder, EventSubscriber.class, RegisteredEvents.class) + .addBinding() + .to(subscriber); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/f6c40a27/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java index 68d19ec..e93c4fa 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java @@ -38,6 +38,7 @@ import org.apache.aurora.scheduler.base.Conversions; import org.apache.aurora.scheduler.base.SchedulerException; import org.apache.aurora.scheduler.events.EventSink; import org.apache.aurora.scheduler.events.PubsubEvent; +import org.apache.aurora.scheduler.events.PubsubEventModule; import org.apache.aurora.scheduler.offers.OfferManager; import org.apache.aurora.scheduler.offers.OffersModule; import org.apache.aurora.scheduler.state.MaintenanceController; @@ -90,6 +91,7 @@ public interface MesosCallbackHandler { private final Clock clock; private final MaintenanceController maintenanceController; private final Amount<Long, Time> unavailabilityThreshold; + private final EventSink registeredEventSink; private final AtomicLong offersRescinded; private final AtomicLong slavesLost; @@ -124,7 +126,8 @@ public interface MesosCallbackHandler { Driver driver, Clock clock, MaintenanceController controller, - @OffersModule.UnavailabilityThreshold Amount<Long, Time> unavailabilityThreshold) { + @OffersModule.UnavailabilityThreshold Amount<Long, Time> unavailabilityThreshold, + @PubsubEventModule.RegisteredEvents EventSink registeredEventSink) { this( storage, @@ -138,7 +141,8 @@ public interface MesosCallbackHandler { driver, clock, controller, - unavailabilityThreshold); + unavailabilityThreshold, + registeredEventSink); } @VisibleForTesting @@ -154,7 +158,8 @@ public interface MesosCallbackHandler { Driver driver, Clock clock, MaintenanceController maintenanceController, - Amount<Long, Time> unavailabilityThreshold) { + Amount<Long, Time> unavailabilityThreshold, + EventSink registeredEventSink) { this.storage = requireNonNull(storage); this.lifecycle = requireNonNull(lifecycle); @@ -167,6 +172,7 @@ public interface MesosCallbackHandler { this.clock = requireNonNull(clock); this.maintenanceController = requireNonNull(maintenanceController); this.unavailabilityThreshold = requireNonNull(unavailabilityThreshold); + this.registeredEventSink = requireNonNull(registeredEventSink); this.offersRescinded = statsProvider.makeCounter("offers_rescinded"); this.slavesLost = statsProvider.makeCounter("slaves_lost"); @@ -189,7 +195,7 @@ public interface MesosCallbackHandler { (Storage.MutateWork.NoResult.Quiet) storeProvider -> storeProvider.getSchedulerStore().saveFrameworkId(frameworkId.getValue())); frameworkRegistered.set(true); - eventSink.post(new PubsubEvent.DriverRegistered()); + registeredEventSink.post(new PubsubEvent.DriverRegistered()); } @Override http://git-wip-us.apache.org/repos/asf/aurora/blob/f6c40a27/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java index b54e1f3..64c7ab5 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java @@ -76,12 +76,12 @@ public class SchedulerDriverModule extends AbstractModule { case V0_DRIVER: bind(Driver.class).to(VersionedSchedulerDriverService.class); bind(VersionedSchedulerDriverService.class).in(Singleton.class); - PubsubEventModule.bindSubscriber(binder(), VersionedSchedulerDriverService.class); + PubsubEventModule.bindRegisteredSubscriber(binder(), VersionedSchedulerDriverService.class); break; case V1_DRIVER: bind(Driver.class).to(VersionedSchedulerDriverService.class); bind(VersionedSchedulerDriverService.class).in(Singleton.class); - PubsubEventModule.bindSubscriber(binder(), VersionedSchedulerDriverService.class); + PubsubEventModule.bindRegisteredSubscriber(binder(), VersionedSchedulerDriverService.class); break; default: checkState(false, "Unknown driver kind."); http://git-wip-us.apache.org/repos/asf/aurora/blob/f6c40a27/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java b/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java index f656b27..5f4cb14 100644 --- a/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java +++ b/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java @@ -39,6 +39,7 @@ import org.junit.Test; import org.slf4j.Logger; import static org.easymock.EasyMock.anyString; +import static org.easymock.EasyMock.expectLastCall; import static org.junit.Assert.assertEquals; public class PubsubEventModuleTest extends EasyMockTest { @@ -57,16 +58,22 @@ public class PubsubEventModuleTest extends EasyMockTest { @Test public void testHandlesDeadEvent() { logger.warn(String.format(PubsubEventModule.DEAD_EVENT_MESSAGE, "hello")); + logger.warn(String.format(PubsubEventModule.DEAD_EVENT_MESSAGE, "hello2")); control.replay(); - getInjector().getInstance(EventBus.class).post("hello"); - assertEquals(1L, statsProvider.getLongValue(PubsubEventModule.EVENT_BUS_DEAD_EVENTS)); + Injector injector = getInjector(); + injector.getInstance(EventBus.class).post("hello"); + assertEquals(1, statsProvider.getLongValue(PubsubEventModule.EVENT_BUS_DEAD_EVENTS)); + injector.getInstance(Key.get(EventBus.class, PubsubEventModule.RegisteredEvents.class)) + .post("hello2"); + assertEquals(2, statsProvider.getLongValue(PubsubEventModule.EVENT_BUS_DEAD_EVENTS)); } @Test public void testPubsubExceptionTracking() throws Exception { logger.error(anyString(), EasyMock.<Throwable>anyObject()); + expectLastCall().times(2); control.replay(); @@ -75,14 +82,18 @@ public class PubsubEventModuleTest extends EasyMockTest { @Override protected void configure() { PubsubEventModule.bindSubscriber(binder(), ThrowingSubscriber.class); + PubsubEventModule.bindRegisteredSubscriber(binder(), ThrowingSubscriber.class); } }); injector.getInstance(Key.get(GuavaUtils.ServiceManagerIface.class, AppStartup.class)) .startAsync().awaitHealthy(); - assertEquals(0L, statsProvider.getLongValue(PubsubEventModule.EXCEPTIONS_STAT)); + assertEquals(0, statsProvider.getLongValue(PubsubEventModule.EXCEPTIONS_STAT)); injector.getInstance(EventBus.class).post("hello"); - assertEquals(1L, statsProvider.getLongValue(PubsubEventModule.EXCEPTIONS_STAT)); - assertEquals(0L, statsProvider.getLongValue(PubsubEventModule.EVENT_BUS_DEAD_EVENTS)); + assertEquals(1, statsProvider.getLongValue(PubsubEventModule.EXCEPTIONS_STAT)); + injector.getInstance(Key.get(EventBus.class, PubsubEventModule.RegisteredEvents.class)) + .post("hello2"); + assertEquals(2, statsProvider.getLongValue(PubsubEventModule.EXCEPTIONS_STAT)); + assertEquals(0, statsProvider.getLongValue(PubsubEventModule.EVENT_BUS_DEAD_EVENTS)); } static class ThrowingSubscriber implements PubsubEvent.EventSubscriber { @@ -95,7 +106,7 @@ public class PubsubEventModuleTest extends EasyMockTest { public Injector getInjector(Module... additionalModules) { return Guice.createInjector( new LifecycleModule(), - new PubsubEventModule(logger), + new PubsubEventModule(logger, MoreExecutors.directExecutor()), new SchedulerServicesModule(), new AbstractModule() { @Override http://git-wip-us.apache.org/repos/asf/aurora/blob/f6c40a27/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java index 4d1a676..51d0371 100644 --- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java +++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java @@ -165,6 +165,7 @@ public class MesosCallbackHandlerTest extends EasyMockTest { private Logger injectedLog; private FakeClock clock; private MaintenanceController controller; + private EventSink registeredEventSink; private MesosCallbackHandler handler; @@ -179,6 +180,7 @@ public class MesosCallbackHandlerTest extends EasyMockTest { driver = createMock(Driver.class); clock = new FakeClock(); controller = createMock(MaintenanceController.class); + registeredEventSink = createMock(EventSink.class); createHandler(false); } @@ -205,7 +207,8 @@ public class MesosCallbackHandlerTest extends EasyMockTest { driver, clock, controller, - DRAIN_THRESHOLD); + DRAIN_THRESHOLD, + registeredEventSink); } @Test @@ -216,7 +219,7 @@ public class MesosCallbackHandlerTest extends EasyMockTest { storageUtil.schedulerStore.saveFrameworkId(FRAMEWORK_ID); expectLastCall(); - eventSink.post(new PubsubEvent.DriverRegistered()); + registeredEventSink.post(new PubsubEvent.DriverRegistered()); control.replay();
