Repository: aurora
Updated Branches:
  refs/heads/master 62e7d2358 -> f6c40a279


MesosCallbackHandler uses separate eventbus for registered call

We should have `registered` use its own eventbus so it does not get blocked
by other calls.

Bugs closed: AURORA-1953

Reviewed at https://reviews.apache.org/r/63316/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/f6c40a27
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/f6c40a27
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/f6c40a27

Branch: refs/heads/master
Commit: f6c40a279a460d6943dffaf54d1a35d86deed7a1
Parents: 62e7d23
Author: Jordan Ly <[email protected]>
Authored: Fri Oct 27 14:34:01 2017 -0700
Committer: Bill Farner <[email protected]>
Committed: Fri Oct 27 14:34:01 2017 -0700

----------------------------------------------------------------------
 .../aurora/scheduler/SchedulerModule.java       |   2 +-
 .../scheduler/events/PubsubEventModule.java     | 126 +++++++++++++++----
 .../scheduler/mesos/MesosCallbackHandler.java   |  14 ++-
 .../scheduler/mesos/SchedulerDriverModule.java  |   4 +-
 .../scheduler/events/PubsubEventModuleTest.java |  23 +++-
 .../mesos/MesosCallbackHandlerTest.java         |   7 +-
 6 files changed, 135 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/f6c40a27/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java 
b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
index 3821819..31991ea 100644
--- a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java
@@ -97,7 +97,7 @@ public class SchedulerModule extends AbstractModule {
       }
     });
 
-    PubsubEventModule.bindSubscriber(binder(), SchedulerLifecycle.class);
+    PubsubEventModule.bindRegisteredSubscriber(binder(), 
SchedulerLifecycle.class);
     bind(TaskVars.class).in(Singleton.class);
     PubsubEventModule.bindSubscriber(binder(), TaskVars.class);
     addSchedulerActiveServiceBinding(binder()).to(TaskVars.class);

http://git-wip-us.apache.org/repos/asf/aurora/blob/f6c40a27/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java 
b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
index 6758f8c..c260573 100644
--- a/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/events/PubsubEventModule.java
@@ -13,11 +13,14 @@
  */
 package org.apache.aurora.scheduler.events;
 
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
 import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.inject.Inject;
+import javax.inject.Qualifier;
 import javax.inject.Singleton;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -25,6 +28,7 @@ import com.google.common.eventbus.AsyncEventBus;
 import com.google.common.eventbus.DeadEvent;
 import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
+import com.google.common.eventbus.SubscriberExceptionHandler;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.inject.AbstractModule;
 import com.google.inject.Binder;
@@ -34,10 +38,15 @@ import com.google.inject.multibindings.Multibinder;
 import org.apache.aurora.common.stats.StatsProvider;
 import org.apache.aurora.scheduler.SchedulerServicesModule;
 import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
+import org.apache.aurora.scheduler.base.AsyncUtil;
 import org.apache.aurora.scheduler.events.PubsubEvent.EventSubscriber;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
 import static java.util.Objects.requireNonNull;
 
 /**
@@ -45,20 +54,36 @@ import static java.util.Objects.requireNonNull;
  */
 public final class PubsubEventModule extends AbstractModule {
 
+  @Qualifier
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  private @interface DeadEventHandler { }
+
+  @Qualifier
+  @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+  public @interface RegisteredEvents { }
+
   private final Logger log;
+  private final Executor registeredExecutor;
 
   @VisibleForTesting
   static final String EXCEPTIONS_STAT = "event_bus_exceptions";
   @VisibleForTesting
   static final String EVENT_BUS_DEAD_EVENTS = "event_bus_dead_events";
 
-  @VisibleForTesting
-  PubsubEventModule(Logger log) {
+  public PubsubEventModule() {
+    this(LoggerFactory.getLogger(PubsubEventModule.class));
+  }
+
+  private PubsubEventModule(Logger log) {
     this.log = requireNonNull(log);
+    this.registeredExecutor = 
AsyncUtil.singleThreadLoggingScheduledExecutor("RegisteredEventSink",
+        log);
   }
 
-  public PubsubEventModule() {
-    this(LoggerFactory.getLogger(PubsubEventModule.class));
+  @VisibleForTesting
+  PubsubEventModule(Logger log, Executor registeredExecutor) {
+    this.log = requireNonNull(log);
+    this.registeredExecutor = requireNonNull(registeredExecutor);
   }
 
   @VisibleForTesting
@@ -67,33 +92,47 @@ public final class PubsubEventModule extends AbstractModule 
{
   @Override
   protected void configure() {
     // Ensure at least an empty binding is present.
-    getSubscriberBinder(binder());
+    Multibinder.newSetBinder(binder(), EventSubscriber.class);
+    Multibinder.newSetBinder(binder(), EventSubscriber.class, 
RegisteredEvents.class);
+
     // TODO(ksweeney): Would this be better as a scheduler active service?
     
SchedulerServicesModule.addAppStartupServiceBinding(binder()).to(RegisterSubscribers.class);
   }
 
   @Provides
   @Singleton
-  EventBus provideEventBus(@AsyncExecutor Executor executor, StatsProvider 
statsProvider) {
+  SubscriberExceptionHandler provideSubscriberExceptionHandler(StatsProvider 
statsProvider) {
     final AtomicLong subscriberExceptions = 
statsProvider.makeCounter(EXCEPTIONS_STAT);
-    EventBus eventBus = new AsyncEventBus(
-        executor,
-        (exception, context) -> {
-          subscriberExceptions.incrementAndGet();
-          log.error(
-              "Failed to dispatch event to " + context.getSubscriberMethod() + 
": " + exception,
-              exception);
-        }
-    );
+    return (exception, context) -> {
+      subscriberExceptions.incrementAndGet();
+      log.error(
+          "Failed to dispatch event to " + context.getSubscriberMethod() + ": 
" + exception,
+          exception);
+    };
+  }
 
+  @Provides
+  @DeadEventHandler
+  @Singleton
+  Object provideDeadEventHandler(StatsProvider statsProvider) {
     final AtomicLong deadEventCounter = 
statsProvider.makeCounter(EVENT_BUS_DEAD_EVENTS);
-    eventBus.register(new Object() {
+    return new Object() {
       @Subscribe
       public void logDeadEvent(DeadEvent event) {
         deadEventCounter.incrementAndGet();
         log.warn(String.format(DEAD_EVENT_MESSAGE, event.getEvent()));
       }
-    });
+    };
+  }
+
+  @Provides
+  @Singleton
+  EventBus provideEventBus(@AsyncExecutor Executor executor,
+                           SubscriberExceptionHandler 
subscriberExceptionHandler,
+                           @DeadEventHandler Object deadEventHandler) {
+
+    EventBus eventBus = new AsyncEventBus(executor, 
subscriberExceptionHandler);
+    eventBus.register(deadEventHandler);
     return eventBus;
   }
 
@@ -103,21 +142,46 @@ public final class PubsubEventModule extends 
AbstractModule {
     return eventBus::post;
   }
 
+  @Provides
+  @RegisteredEvents
+  @Singleton
+  EventBus provideRegisteredEventBus(SubscriberExceptionHandler 
subscriberExceptionHandler,
+                                     @DeadEventHandler Object 
deadEventHandler) {
+
+    EventBus eventBus = new AsyncEventBus(registeredExecutor, 
subscriberExceptionHandler);
+    eventBus.register(deadEventHandler);
+    return eventBus;
+  }
+
+  @Provides
+  @RegisteredEvents
+  @Singleton
+  EventSink provideRegisteredEventSink(@RegisteredEvents EventBus eventBus) {
+    return eventBus::post;
+  }
+
   static class RegisterSubscribers extends AbstractIdleService {
     private final EventBus eventBus;
+    private final EventBus registeredEventBus;
     private final Set<EventSubscriber> subscribers;
+    private final Set<EventSubscriber> registeredSubscribers;
 
     @Inject
-    RegisterSubscribers(EventBus eventBus, Set<EventSubscriber> subscribers) {
+    RegisterSubscribers(EventBus eventBus,
+                        @RegisteredEvents EventBus registeredEventBus,
+                        Set<EventSubscriber> subscribers,
+                        @RegisteredEvents Set<EventSubscriber> 
registeredSubscribers) {
+
       this.eventBus = requireNonNull(eventBus);
+      this.registeredEventBus = requireNonNull(registeredEventBus);
       this.subscribers = requireNonNull(subscribers);
+      this.registeredSubscribers = requireNonNull(registeredSubscribers);
     }
 
     @Override
     protected void startUp() {
-      for (EventSubscriber subscriber : subscribers) {
-        eventBus.register(subscriber);
-      }
+      subscribers.forEach(eventBus::register);
+      registeredSubscribers.forEach(registeredEventBus::register);
     }
 
     @Override
@@ -135,10 +199,6 @@ public final class PubsubEventModule extends 
AbstractModule {
     binder.install(new PubsubEventModule());
   }
 
-  private static Multibinder<EventSubscriber> getSubscriberBinder(Binder 
binder) {
-    return Multibinder.newSetBinder(binder, EventSubscriber.class);
-  }
-
   /**
    * Binds a subscriber to receive task events.
    *
@@ -146,6 +206,20 @@ public final class PubsubEventModule extends 
AbstractModule {
    * @param subscriber Subscriber implementation class to register for events.
    */
   public static void bindSubscriber(Binder binder, Class<? extends 
EventSubscriber> subscriber) {
-    getSubscriberBinder(binder).addBinding().to(subscriber);
+    Multibinder.newSetBinder(binder, 
EventSubscriber.class).addBinding().to(subscriber);
+  }
+
+  /**
+   * Binds a subscriber to receive Mesos registered events.
+   *
+   * @param binder Binder to bind the subscriber with.
+   * @param subscriber Subscriber implementation class to register for events.
+   */
+  public static void bindRegisteredSubscriber(Binder binder,
+                                              Class<? extends EventSubscriber> 
subscriber) {
+
+    Multibinder.newSetBinder(binder, EventSubscriber.class, 
RegisteredEvents.class)
+        .addBinding()
+        .to(subscriber);
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/f6c40a27/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java 
b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
index 68d19ec..e93c4fa 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandler.java
@@ -38,6 +38,7 @@ import org.apache.aurora.scheduler.base.Conversions;
 import org.apache.aurora.scheduler.base.SchedulerException;
 import org.apache.aurora.scheduler.events.EventSink;
 import org.apache.aurora.scheduler.events.PubsubEvent;
+import org.apache.aurora.scheduler.events.PubsubEventModule;
 import org.apache.aurora.scheduler.offers.OfferManager;
 import org.apache.aurora.scheduler.offers.OffersModule;
 import org.apache.aurora.scheduler.state.MaintenanceController;
@@ -90,6 +91,7 @@ public interface MesosCallbackHandler {
     private final Clock clock;
     private final MaintenanceController maintenanceController;
     private final Amount<Long, Time> unavailabilityThreshold;
+    private final EventSink registeredEventSink;
 
     private final AtomicLong offersRescinded;
     private final AtomicLong slavesLost;
@@ -124,7 +126,8 @@ public interface MesosCallbackHandler {
         Driver driver,
         Clock clock,
         MaintenanceController controller,
-        @OffersModule.UnavailabilityThreshold Amount<Long, Time> 
unavailabilityThreshold) {
+        @OffersModule.UnavailabilityThreshold Amount<Long, Time> 
unavailabilityThreshold,
+        @PubsubEventModule.RegisteredEvents EventSink registeredEventSink) {
 
       this(
           storage,
@@ -138,7 +141,8 @@ public interface MesosCallbackHandler {
           driver,
           clock,
           controller,
-          unavailabilityThreshold);
+          unavailabilityThreshold,
+          registeredEventSink);
     }
 
     @VisibleForTesting
@@ -154,7 +158,8 @@ public interface MesosCallbackHandler {
         Driver driver,
         Clock clock,
         MaintenanceController maintenanceController,
-        Amount<Long, Time> unavailabilityThreshold) {
+        Amount<Long, Time> unavailabilityThreshold,
+        EventSink registeredEventSink) {
 
       this.storage = requireNonNull(storage);
       this.lifecycle = requireNonNull(lifecycle);
@@ -167,6 +172,7 @@ public interface MesosCallbackHandler {
       this.clock = requireNonNull(clock);
       this.maintenanceController = requireNonNull(maintenanceController);
       this.unavailabilityThreshold = requireNonNull(unavailabilityThreshold);
+      this.registeredEventSink = requireNonNull(registeredEventSink);
 
       this.offersRescinded = statsProvider.makeCounter("offers_rescinded");
       this.slavesLost = statsProvider.makeCounter("slaves_lost");
@@ -189,7 +195,7 @@ public interface MesosCallbackHandler {
           (Storage.MutateWork.NoResult.Quiet) storeProvider ->
               
storeProvider.getSchedulerStore().saveFrameworkId(frameworkId.getValue()));
       frameworkRegistered.set(true);
-      eventSink.post(new PubsubEvent.DriverRegistered());
+      registeredEventSink.post(new PubsubEvent.DriverRegistered());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/aurora/blob/f6c40a27/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java 
b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
index b54e1f3..64c7ab5 100644
--- a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverModule.java
@@ -76,12 +76,12 @@ public class SchedulerDriverModule extends AbstractModule {
       case V0_DRIVER:
         bind(Driver.class).to(VersionedSchedulerDriverService.class);
         bind(VersionedSchedulerDriverService.class).in(Singleton.class);
-        PubsubEventModule.bindSubscriber(binder(), 
VersionedSchedulerDriverService.class);
+        PubsubEventModule.bindRegisteredSubscriber(binder(), 
VersionedSchedulerDriverService.class);
         break;
       case V1_DRIVER:
         bind(Driver.class).to(VersionedSchedulerDriverService.class);
         bind(VersionedSchedulerDriverService.class).in(Singleton.class);
-        PubsubEventModule.bindSubscriber(binder(), 
VersionedSchedulerDriverService.class);
+        PubsubEventModule.bindRegisteredSubscriber(binder(), 
VersionedSchedulerDriverService.class);
         break;
       default:
         checkState(false, "Unknown driver kind.");

http://git-wip-us.apache.org/repos/asf/aurora/blob/f6c40a27/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java 
b/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java
index f656b27..5f4cb14 100644
--- 
a/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java
+++ 
b/src/test/java/org/apache/aurora/scheduler/events/PubsubEventModuleTest.java
@@ -39,6 +39,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 
 import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
 
 public class PubsubEventModuleTest extends EasyMockTest {
@@ -57,16 +58,22 @@ public class PubsubEventModuleTest extends EasyMockTest {
   @Test
   public void testHandlesDeadEvent() {
     logger.warn(String.format(PubsubEventModule.DEAD_EVENT_MESSAGE, "hello"));
+    logger.warn(String.format(PubsubEventModule.DEAD_EVENT_MESSAGE, "hello2"));
 
     control.replay();
 
-    getInjector().getInstance(EventBus.class).post("hello");
-    assertEquals(1L, 
statsProvider.getLongValue(PubsubEventModule.EVENT_BUS_DEAD_EVENTS));
+    Injector injector = getInjector();
+    injector.getInstance(EventBus.class).post("hello");
+    assertEquals(1, 
statsProvider.getLongValue(PubsubEventModule.EVENT_BUS_DEAD_EVENTS));
+    injector.getInstance(Key.get(EventBus.class, 
PubsubEventModule.RegisteredEvents.class))
+        .post("hello2");
+    assertEquals(2, 
statsProvider.getLongValue(PubsubEventModule.EVENT_BUS_DEAD_EVENTS));
   }
 
   @Test
   public void testPubsubExceptionTracking() throws Exception {
     logger.error(anyString(), EasyMock.<Throwable>anyObject());
+    expectLastCall().times(2);
 
     control.replay();
 
@@ -75,14 +82,18 @@ public class PubsubEventModuleTest extends EasyMockTest {
           @Override
           protected void configure() {
             PubsubEventModule.bindSubscriber(binder(), 
ThrowingSubscriber.class);
+            PubsubEventModule.bindRegisteredSubscriber(binder(), 
ThrowingSubscriber.class);
           }
         });
     injector.getInstance(Key.get(GuavaUtils.ServiceManagerIface.class, 
AppStartup.class))
         .startAsync().awaitHealthy();
-    assertEquals(0L, 
statsProvider.getLongValue(PubsubEventModule.EXCEPTIONS_STAT));
+    assertEquals(0, 
statsProvider.getLongValue(PubsubEventModule.EXCEPTIONS_STAT));
     injector.getInstance(EventBus.class).post("hello");
-    assertEquals(1L, 
statsProvider.getLongValue(PubsubEventModule.EXCEPTIONS_STAT));
-    assertEquals(0L, 
statsProvider.getLongValue(PubsubEventModule.EVENT_BUS_DEAD_EVENTS));
+    assertEquals(1, 
statsProvider.getLongValue(PubsubEventModule.EXCEPTIONS_STAT));
+    injector.getInstance(Key.get(EventBus.class, 
PubsubEventModule.RegisteredEvents.class))
+        .post("hello2");
+    assertEquals(2, 
statsProvider.getLongValue(PubsubEventModule.EXCEPTIONS_STAT));
+    assertEquals(0, 
statsProvider.getLongValue(PubsubEventModule.EVENT_BUS_DEAD_EVENTS));
   }
 
   static class ThrowingSubscriber implements PubsubEvent.EventSubscriber {
@@ -95,7 +106,7 @@ public class PubsubEventModuleTest extends EasyMockTest {
   public Injector getInjector(Module... additionalModules) {
     return Guice.createInjector(
         new LifecycleModule(),
-        new PubsubEventModule(logger),
+        new PubsubEventModule(logger, MoreExecutors.directExecutor()),
         new SchedulerServicesModule(),
         new AbstractModule() {
           @Override

http://git-wip-us.apache.org/repos/asf/aurora/blob/f6c40a27/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java 
b/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java
index 4d1a676..51d0371 100644
--- 
a/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java
+++ 
b/src/test/java/org/apache/aurora/scheduler/mesos/MesosCallbackHandlerTest.java
@@ -165,6 +165,7 @@ public class MesosCallbackHandlerTest extends EasyMockTest {
   private Logger injectedLog;
   private FakeClock clock;
   private MaintenanceController controller;
+  private EventSink registeredEventSink;
 
   private MesosCallbackHandler handler;
 
@@ -179,6 +180,7 @@ public class MesosCallbackHandlerTest extends EasyMockTest {
     driver = createMock(Driver.class);
     clock = new FakeClock();
     controller = createMock(MaintenanceController.class);
+    registeredEventSink = createMock(EventSink.class);
     createHandler(false);
   }
 
@@ -205,7 +207,8 @@ public class MesosCallbackHandlerTest extends EasyMockTest {
         driver,
         clock,
         controller,
-        DRAIN_THRESHOLD);
+        DRAIN_THRESHOLD,
+        registeredEventSink);
   }
 
   @Test
@@ -216,7 +219,7 @@ public class MesosCallbackHandlerTest extends EasyMockTest {
     storageUtil.schedulerStore.saveFrameworkId(FRAMEWORK_ID);
     expectLastCall();
 
-    eventSink.post(new PubsubEvent.DriverRegistered());
+    registeredEventSink.post(new PubsubEvent.DriverRegistered());
 
     control.replay();
 

Reply via email to