Repository: aurora Updated Branches: refs/heads/master 2dc1d59f1 -> b3f8da3ed
Updated scheduler to process status updates asynchronously in batches. Bugs closed: AURORA-1228 Reviewed at https://reviews.apache.org/r/33689/ Project: http://git-wip-us.apache.org/repos/asf/aurora/repo Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/b3f8da3e Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/b3f8da3e Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/b3f8da3e Branch: refs/heads/master Commit: b3f8da3edaf8eb822afe0ff7d5d6a129959e5069 Parents: 2dc1d59 Author: Ben Mahler <[email protected]> Authored: Wed May 13 17:57:18 2015 -0700 Committer: Maxim Khutornenko <[email protected]> Committed: Wed May 13 17:57:18 2015 -0700 ---------------------------------------------------------------------- .../aurora/benchmark/StatusUpdateBenchmark.java | 42 ++++- .../aurora/benchmark/fakes/FakeDriver.java | 10 ++ .../aurora/scheduler/SchedulerModule.java | 22 ++- .../aurora/scheduler/UserTaskLauncher.java | 165 +++++++++++++++---- .../scheduler/async/GcExecutorLauncher.java | 1 + .../apache/aurora/scheduler/mesos/Driver.java | 13 ++ .../scheduler/mesos/MesosSchedulerImpl.java | 9 +- .../scheduler/mesos/SchedulerDriverService.java | 11 ++ .../aurora/scheduler/UserTaskLauncherTest.java | 151 ++++++++++++++--- .../scheduler/async/GcExecutorLauncherTest.java | 18 +- .../scheduler/mesos/MesosSchedulerImplTest.java | 2 - 11 files changed, 373 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aurora/blob/b3f8da3e/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java b/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java index 7bb64dd..4c63cc7 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java +++ 
b/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java @@ -15,8 +15,12 @@ package org.apache.aurora.benchmark; import java.util.List; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; import javax.inject.Singleton; @@ -29,6 +33,7 @@ import com.google.inject.AbstractModule; import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.Provides; +import com.google.inject.TypeLiteral; import com.twitter.common.application.ShutdownStage; import com.twitter.common.base.Command; @@ -38,6 +43,7 @@ import com.twitter.common.stats.StatsProvider; import com.twitter.common.util.Clock; import com.twitter.common.util.testing.FakeClock; +import org.apache.aurora.benchmark.fakes.FakeDriver; import org.apache.aurora.benchmark.fakes.FakeOfferManager; import org.apache.aurora.benchmark.fakes.FakeRescheduleCalculator; import org.apache.aurora.benchmark.fakes.FakeSchedulerDriver; @@ -49,14 +55,16 @@ import org.apache.aurora.scheduler.UserTaskLauncher; import org.apache.aurora.scheduler.async.OfferManager; import org.apache.aurora.scheduler.async.RescheduleCalculator; import org.apache.aurora.scheduler.async.preemptor.ClusterStateImpl; +import org.apache.aurora.scheduler.base.AsyncUtil; import org.apache.aurora.scheduler.events.EventSink; import org.apache.aurora.scheduler.events.PubsubEvent; import org.apache.aurora.scheduler.filter.SchedulingFilter; import org.apache.aurora.scheduler.filter.SchedulingFilterImpl; +import org.apache.aurora.scheduler.mesos.Driver; import org.apache.aurora.scheduler.mesos.DriverFactory; import org.apache.aurora.scheduler.mesos.DriverSettings; import org.apache.aurora.scheduler.mesos.ExecutorSettings; -import org.apache.aurora.scheduler.mesos.SchedulerDriverModule; +import 
org.apache.aurora.scheduler.mesos.MesosSchedulerImpl; import org.apache.aurora.scheduler.state.StateModule; import org.apache.aurora.scheduler.storage.Storage; import org.apache.aurora.scheduler.storage.db.DbUtil; @@ -75,6 +83,7 @@ import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.annotations.Warmup; @@ -154,8 +163,8 @@ public class StatusUpdateBenchmark { @Param({"5", "25", "100"}) private long latencyMilliseconds; - private SchedulerDriver driver; private Scheduler scheduler; + private UserTaskLauncher userTaskLauncher; private SlowStorageWrapper storage; private EventBus eventBus; private Set<IScheduledTask> tasks; @@ -171,10 +180,17 @@ public class StatusUpdateBenchmark { Injector injector = Guice.createInjector( new StateModule(), - new SchedulerDriverModule(), new AbstractModule() { @Override protected void configure() { + bind(Driver.class).toInstance(new FakeDriver()); + bind(Scheduler.class).to(MesosSchedulerImpl.class); + bind(MesosSchedulerImpl.class).in(Singleton.class); + bind(Executor.class) + .annotatedWith(MesosSchedulerImpl.SchedulerExecutor.class) + .toInstance(AsyncUtil.singleThreadLoggingScheduledExecutor( + "SchedulerImpl-%d", + Logger.getLogger(StatusUpdateBenchmark.class.getName()))); bind(DriverFactory.class).toInstance(new DriverFactory() { @Override public SchedulerDriver create( @@ -226,13 +242,19 @@ public class StatusUpdateBenchmark { eventBus.post(event); } }); + bind(new TypeLiteral<BlockingQueue<Protos.TaskStatus>>() { }) + .annotatedWith(UserTaskLauncher.StatusUpdateQueue.class) + .toInstance(new LinkedBlockingQueue<Protos.TaskStatus>()); + bind(new TypeLiteral<Integer>() { }) + .annotatedWith(UserTaskLauncher.MaxBatchSize.class) + .toInstance(1000); + bind(UserTaskLauncher.class).in(Singleton.class); } 
@Provides @Singleton - List<TaskLauncher> provideTaskLaunchers( - UserTaskLauncher userTaskLauncher) { - return ImmutableList.<TaskLauncher>of(userTaskLauncher); + List<TaskLauncher> provideTaskLaunchers(UserTaskLauncher launcher) { + return ImmutableList.<TaskLauncher>of(launcher); } } ); @@ -240,6 +262,14 @@ public class StatusUpdateBenchmark { eventBus.register(injector.getInstance(ClusterStateImpl.class)); scheduler = injector.getInstance(Scheduler.class); eventBus.register(this); + + userTaskLauncher = injector.getInstance(UserTaskLauncher.class); + userTaskLauncher.startAsync(); + } + + @TearDown(Level.Trial) + public void tearDown() { + userTaskLauncher.stopAsync(); } /** http://git-wip-us.apache.org/repos/asf/aurora/blob/b3f8da3e/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java ---------------------------------------------------------------------- diff --git a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java index 45de15a..316ab1c 100644 --- a/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java +++ b/src/jmh/java/org/apache/aurora/benchmark/fakes/FakeDriver.java @@ -40,6 +40,16 @@ public class FakeDriver extends AbstractIdleService implements Driver { } @Override + public void acknowledgeStatusUpdate(Protos.TaskStatus status) { + // no-op + } + + @Override + public void abort() { + // no-op + } + + @Override protected void startUp() throws Exception { // no-op } http://git-wip-us.apache.org/repos/asf/aurora/blob/b3f8da3e/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java index d3ac176..6edec22 100644 --- a/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java +++ b/src/main/java/org/apache/aurora/scheduler/SchedulerModule.java 
@@ -14,6 +14,8 @@ package org.apache.aurora.scheduler; import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.logging.Logger; @@ -23,8 +25,11 @@ import com.google.common.collect.ImmutableList; import com.google.inject.AbstractModule; import com.google.inject.PrivateModule; import com.google.inject.Provides; +import com.google.inject.TypeLiteral; + import com.twitter.common.args.Arg; import com.twitter.common.args.CmdLine; +import com.twitter.common.args.constraints.Positive; import com.twitter.common.quantity.Amount; import com.twitter.common.quantity.Time; @@ -33,6 +38,7 @@ import org.apache.aurora.scheduler.TaskIdGenerator.TaskIdGeneratorImpl; import org.apache.aurora.scheduler.async.GcExecutorLauncher; import org.apache.aurora.scheduler.base.AsyncUtil; import org.apache.aurora.scheduler.events.PubsubEventModule; +import org.apache.mesos.Protos; /** * Binding module for top-level scheduling logic. 
@@ -51,10 +57,14 @@ public class SchedulerModule extends AbstractModule { private static final Arg<Amount<Long, Time>> MAX_LEADING_DURATION = Arg.create(Amount.of(1L, Time.DAYS)); + @Positive + @CmdLine(name = "max_status_update_batch_size", + help = "The maximum number of status updates that can be processed in a batch.") + private static final Arg<Integer> MAX_STATUS_UPDATE_BATCH_SIZE = Arg.create(1000); + @Override protected void configure() { bind(TaskIdGenerator.class).to(TaskIdGeneratorImpl.class); - bind(UserTaskLauncher.class).in(Singleton.class); install(new PrivateModule() { @Override @@ -75,6 +85,16 @@ public class SchedulerModule extends AbstractModule { bind(TaskVars.class).in(Singleton.class); PubsubEventModule.bindSubscriber(binder(), TaskVars.class); SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(TaskVars.class); + + bind(new TypeLiteral<BlockingQueue<Protos.TaskStatus>>() { }) + .annotatedWith(UserTaskLauncher.StatusUpdateQueue.class) + .toInstance(new LinkedBlockingQueue<Protos.TaskStatus>()); + bind(new TypeLiteral<Integer>() { }) + .annotatedWith(UserTaskLauncher.MaxBatchSize.class) + .toInstance(MAX_STATUS_UPDATE_BATCH_SIZE.get()); + + bind(UserTaskLauncher.class).in(Singleton.class); + SchedulerServicesModule.addSchedulerActiveServiceBinding(binder()).to(UserTaskLauncher.class); } @Provides http://git-wip-us.apache.org/repos/asf/aurora/blob/b3f8da3e/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java index 0ce9c9d..f1e5dd2 100644 --- a/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java +++ b/src/main/java/org/apache/aurora/scheduler/UserTaskLauncher.java @@ -13,31 +13,45 @@ */ package org.apache.aurora.scheduler; +import java.lang.annotation.Retention; +import 
java.lang.annotation.Target; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; import java.util.logging.Logger; -import javax.annotation.Nullable; import javax.inject.Inject; +import javax.inject.Qualifier; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; +import com.google.common.util.concurrent.AbstractExecutionThreadService; +import com.google.common.util.concurrent.MoreExecutors; + +import com.twitter.common.stats.Stats; import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.scheduler.async.OfferManager; import org.apache.aurora.scheduler.base.Conversions; -import org.apache.aurora.scheduler.base.SchedulerException; +import org.apache.aurora.scheduler.mesos.Driver; import org.apache.aurora.scheduler.state.StateManager; import org.apache.aurora.scheduler.storage.Storage; import org.apache.mesos.Protos.OfferID; import org.apache.mesos.Protos.TaskStatus; +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; import static java.util.Objects.requireNonNull; /** * A task launcher that matches resource offers against user tasks. 
*/ @VisibleForTesting -public class UserTaskLauncher implements TaskLauncher { +public class UserTaskLauncher extends AbstractExecutionThreadService implements TaskLauncher { private static final Logger LOG = Logger.getLogger(UserTaskLauncher.class.getName()); @@ -50,12 +64,55 @@ public class UserTaskLauncher implements TaskLauncher { private final Storage storage; private final OfferManager offerManager; private final StateManager stateManager; + private final Driver driver; + private final BlockingQueue<TaskStatus> pendingUpdates; + private final int maxBatchSize; + + private final AtomicReference<Thread> threadReference = new AtomicReference<>(); + + /** + * Binding annotation for the status update queue. + */ + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + public @interface StatusUpdateQueue { } + + /** + * Binding annotation for the maximum size of a status update batch. + */ + @VisibleForTesting + @Qualifier + @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) + public @interface MaxBatchSize { } @Inject - UserTaskLauncher(Storage storage, OfferManager offerManager, StateManager stateManager) { + UserTaskLauncher( + Storage storage, + OfferManager offerManager, + StateManager stateManager, + final Driver driver, + @StatusUpdateQueue BlockingQueue<TaskStatus> pendingUpdates, + @MaxBatchSize Integer maxBatchSize) { + this.storage = requireNonNull(storage); this.offerManager = requireNonNull(offerManager); this.stateManager = requireNonNull(stateManager); + this.driver = requireNonNull(driver); + this.pendingUpdates = requireNonNull(pendingUpdates); + this.maxBatchSize = requireNonNull(maxBatchSize); + + Stats.exportSize("status_updates_queue_size", this.pendingUpdates); + + addListener( + new Listener() { + @Override + public void failed(State from, Throwable failure) { + LOG.log(Level.SEVERE, "UserTaskLauncher failed: ", failure); + driver.abort(); + } + }, + MoreExecutors.sameThreadExecutor()); } @Override @@
-67,38 +124,8 @@ public class UserTaskLauncher implements TaskLauncher { } @Override - public synchronized boolean statusUpdate(final TaskStatus status) { - @Nullable String message = null; - if (status.hasMessage()) { - message = status.getMessage(); - } - - try { - final ScheduleStatus translatedState = Conversions.convertProtoState(status.getState()); - // TODO(William Farner): Remove this hack once Mesos API change is done. - // Tracked by: https://issues.apache.org/jira/browse/MESOS-343 - if (translatedState == ScheduleStatus.FAILED - && message != null - && message.contains(MEMORY_LIMIT_EXCEEDED)) { - message = MEMORY_LIMIT_DISPLAY; - } - - final String auditMessage = message; - storage.write(new Storage.MutateWork.NoResult.Quiet() { - @Override - protected void execute(Storage.MutableStoreProvider storeProvider) { - stateManager.changeState( - storeProvider, - status.getTaskId().getValue(), - Optional.<ScheduleStatus>absent(), - translatedState, - Optional.fromNullable(auditMessage)); - } - }); - } catch (SchedulerException e) { - LOG.log(Level.WARNING, "Failed to update status for: " + status, e); - throw e; - } + public boolean statusUpdate(TaskStatus status) { + pendingUpdates.add(status); return true; } @@ -106,4 +133,70 @@ public class UserTaskLauncher implements TaskLauncher { public void cancelOffer(OfferID offer) { offerManager.cancelOffer(offer); } + + @Override + protected void triggerShutdown() { + Thread thread = threadReference.get(); + + if (thread != null) { + thread.interrupt(); + } + } + + @Override + protected void run() { + threadReference.set(Thread.currentThread()); + + while (isRunning()) { + final Queue<TaskStatus> updates = new ArrayDeque<>(); + + try { + updates.add(pendingUpdates.take()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + + // Process all other available updates, up to the limit on batch size. + // TODO(bmahler): Expose histogram metrics of the batch sizes. 
+ pendingUpdates.drainTo(updates, maxBatchSize - updates.size()); + + try { + storage.write(new Storage.MutateWork.NoResult.Quiet() { + @Override + protected void execute(Storage.MutableStoreProvider storeProvider) { + for (TaskStatus status : updates) { + ScheduleStatus translatedState = Conversions.convertProtoState(status.getState()); + + Optional<String> message = Optional.absent(); + if (status.hasMessage()) { + message = Optional.of(status.getMessage()); + } + + // TODO(William Farner): Remove this hack once Mesos API change is done. + // Tracked by: https://issues.apache.org/jira/browse/MESOS-343 + if (translatedState == ScheduleStatus.FAILED + && message.isPresent() + && message.get().contains(MEMORY_LIMIT_EXCEEDED)) { + message = Optional.of(MEMORY_LIMIT_DISPLAY); + } + + stateManager.changeState( + storeProvider, + status.getTaskId().getValue(), + Optional.<ScheduleStatus>absent(), + translatedState, + message); + } + } + }); + + for (TaskStatus status : updates) { + driver.acknowledgeStatusUpdate(status); + } + } catch (RuntimeException e) { + LOG.log(Level.SEVERE, "Failed to process status update batch " + updates, e); + } + } + } } http://git-wip-us.apache.org/repos/asf/aurora/blob/b3f8da3e/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java b/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java index 4d589a3..f2ef70d 100644 --- a/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java +++ b/src/main/java/org/apache/aurora/scheduler/async/GcExecutorLauncher.java @@ -230,6 +230,7 @@ public class GcExecutorLauncher implements TaskLauncher { if (status.getState() == Protos.TaskState.TASK_LOST) { lostTasks.incrementAndGet(); } + driver.acknowledgeStatusUpdate(status); return true; } else { return false; 
http://git-wip-us.apache.org/repos/asf/aurora/blob/b3f8da3e/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java b/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java index c7e45a8..975ea02 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/Driver.java @@ -17,6 +17,7 @@ import com.google.common.util.concurrent.Service; import org.apache.mesos.Protos.OfferID; import org.apache.mesos.Protos.TaskInfo; +import org.apache.mesos.Protos.TaskStatus; /** * Wraps the mesos Scheduler driver to ensure its used in a valid lifecycle; namely: @@ -51,7 +52,19 @@ public interface Driver extends Service { void killTask(String taskId); /** + * Acknowledges the given {@code status} update. + * + * @param status The status to acknowledge. + */ + void acknowledgeStatusUpdate(TaskStatus status); + + /** * Blocks until the driver is no longer active. */ void blockUntilStopped(); + + /** + * Aborts the driver. 
+ */ + void abort(); } http://git-wip-us.apache.org/repos/asf/aurora/blob/b3f8da3e/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java index 9b8ab7c..f233d5a 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImpl.java @@ -23,6 +23,7 @@ import java.util.logging.Logger; import javax.inject.Inject; import javax.inject.Qualifier; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -63,7 +64,8 @@ import static org.apache.mesos.Protos.Offer; /** * Location for communication with mesos. */ -class MesosSchedulerImpl implements Scheduler { +@VisibleForTesting +public class MesosSchedulerImpl implements Scheduler { private final List<TaskLauncher> taskLaunchers; private final Storage storage; @@ -77,9 +79,10 @@ class MesosSchedulerImpl implements Scheduler { /** * Binding annotation for the executor the incoming Mesos message handler uses. */ + @VisibleForTesting @Qualifier @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME) - @interface SchedulerExecutor { } + public @interface SchedulerExecutor { } /** * Creates a new scheduler. @@ -228,7 +231,7 @@ class MesosSchedulerImpl implements Scheduler { try { for (TaskLauncher launcher : taskLaunchers) { if (launcher.statusUpdate(status)) { - driver.acknowledgeStatusUpdate(status); + // The launcher is responsible for acknowledging the update. 
return; } } http://git-wip-us.apache.org/repos/asf/aurora/blob/b3f8da3e/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java index da2d5df..35cada6 100644 --- a/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java +++ b/src/main/java/org/apache/aurora/scheduler/mesos/SchedulerDriverService.java @@ -114,6 +114,11 @@ class SchedulerDriverService extends AbstractIdleService implements Driver { } @Override + public void abort() { + Futures.getUnchecked(driverFuture).abort(); + } + + @Override public void launchTask(Protos.OfferID offerId, Protos.TaskInfo task) { checkState(isRunning(), "Driver is not running."); Futures.getUnchecked(driverFuture) @@ -138,4 +143,10 @@ class SchedulerDriverService extends AbstractIdleService implements Driver { killFailures.incrementAndGet(); } } + + @Override + public void acknowledgeStatusUpdate(Protos.TaskStatus status) { + checkState(isRunning(), "Driver is not running."); + Futures.getUnchecked(driverFuture).acknowledgeStatusUpdate(status); + } } http://git-wip-us.apache.org/repos/asf/aurora/blob/b3f8da3e/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java b/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java index 8da488d..f4631c1 100644 --- a/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java +++ b/src/test/java/org/apache/aurora/scheduler/UserTaskLauncherTest.java @@ -13,6 +13,11 @@ */ package org.apache.aurora.scheduler; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import 
java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + import com.google.common.base.Optional; import com.twitter.common.collections.Pair; import com.twitter.common.testing.easymock.EasyMockTest; @@ -20,6 +25,7 @@ import com.twitter.common.testing.easymock.EasyMockTest; import org.apache.aurora.gen.HostAttributes; import org.apache.aurora.gen.ScheduleStatus; import org.apache.aurora.scheduler.async.OfferManager; +import org.apache.aurora.scheduler.mesos.Driver; import org.apache.aurora.scheduler.mesos.Offers; import org.apache.aurora.scheduler.state.StateChangeResult; import org.apache.aurora.scheduler.state.StateManager; @@ -30,12 +36,15 @@ import org.apache.mesos.Protos.OfferID; import org.apache.mesos.Protos.TaskID; import org.apache.mesos.Protos.TaskState; import org.apache.mesos.Protos.TaskStatus; +import org.easymock.EasyMock; +import org.junit.After; import org.junit.Before; import org.junit.Test; import static org.apache.aurora.gen.ScheduleStatus.FAILED; import static org.apache.aurora.gen.ScheduleStatus.RUNNING; import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; import static org.junit.Assert.assertTrue; public class UserTaskLauncherTest extends EasyMockTest { @@ -50,16 +59,33 @@ public class UserTaskLauncherTest extends EasyMockTest { private OfferManager offerManager; private StateManager stateManager; private StorageTestUtil storageUtil; + private Driver driver; + private BlockingQueue<TaskStatus> queue; - private TaskLauncher launcher; + private UserTaskLauncher launcher; @Before public void setUp() { offerManager = createMock(OfferManager.class); stateManager = createMock(StateManager.class); storageUtil = new StorageTestUtil(this); - storageUtil.expectOperations(); - launcher = new UserTaskLauncher(storageUtil.storage, offerManager, stateManager); + driver = createMock(Driver.class); + queue = new LinkedBlockingQueue<>(); + + launcher = new UserTaskLauncher( + 
storageUtil.storage, + offerManager, + stateManager, + driver, + queue, + 1000); + + launcher.startAsync(); + } + + @After + public void after() { + launcher.stopAsync(); } @Test @@ -73,6 +99,14 @@ public class UserTaskLauncherTest extends EasyMockTest { @Test public void testForwardsStatusUpdates() throws Exception { + TaskStatus status = TaskStatus.newBuilder() + .setState(TaskState.TASK_RUNNING) + .setTaskId(TaskID.newBuilder().setValue(TASK_ID_A)) + .setMessage("fake message") + .build(); + + storageUtil.expectWrite(); + expect(stateManager.changeState( storageUtil.mutableStoreProvider, TASK_ID_A, @@ -81,14 +115,19 @@ public class UserTaskLauncherTest extends EasyMockTest { Optional.of("fake message"))) .andReturn(StateChangeResult.SUCCESS); + final CountDownLatch latch = new CountDownLatch(1); + + driver.acknowledgeStatusUpdate(status); + expectLastCall().andAnswer(() -> { + latch.countDown(); + return null; + }); + control.replay(); - TaskStatus status = TaskStatus.newBuilder() - .setState(TaskState.TASK_RUNNING) - .setTaskId(TaskID.newBuilder().setValue(TASK_ID_A)) - .setMessage("fake message") - .build(); assertTrue(launcher.statusUpdate(status)); + + assertTrue(latch.await(5L, TimeUnit.SECONDS)); } @Test @@ -100,15 +139,22 @@ public class UserTaskLauncherTest extends EasyMockTest { launcher.cancelOffer(OFFER_ID); } - @Test(expected = StorageException.class) + @Test public void testFailedStatusUpdate() throws Exception { + storageUtil.expectWrite(); + + final CountDownLatch latch = new CountDownLatch(1); + expect(stateManager.changeState( storageUtil.mutableStoreProvider, TASK_ID_A, Optional.<ScheduleStatus>absent(), RUNNING, Optional.of("fake message"))) - .andThrow(new StorageException("Injected error")); + .andAnswer(() -> { + latch.countDown(); + throw new StorageException("Injected error"); + }); control.replay(); @@ -117,20 +163,15 @@ public class UserTaskLauncherTest extends EasyMockTest { .setTaskId(TaskID.newBuilder().setValue(TASK_ID_A)) 
.setMessage("fake message") .build(); + launcher.statusUpdate(status); + + assertTrue(latch.await(5L, TimeUnit.SECONDS)); } @Test public void testMemoryLimitTranslationHack() throws Exception { - expect(stateManager.changeState( - storageUtil.mutableStoreProvider, - TASK_ID_A, - Optional.<ScheduleStatus>absent(), - FAILED, - Optional.of(UserTaskLauncher.MEMORY_LIMIT_DISPLAY))) - .andReturn(StateChangeResult.ILLEGAL); - - control.replay(); + storageUtil.expectWrite(); TaskStatus status = TaskStatus.newBuilder() .setState(TaskState.TASK_FAILED) @@ -159,6 +200,78 @@ public class UserTaskLauncherTest extends EasyMockTest { + "total_active_file 700416\n" + "total_unevictable 0 ") .build(); + + expect(stateManager.changeState( + storageUtil.mutableStoreProvider, + TASK_ID_A, + Optional.<ScheduleStatus>absent(), + FAILED, + Optional.of(UserTaskLauncher.MEMORY_LIMIT_DISPLAY))) + .andReturn(StateChangeResult.ILLEGAL); + + final CountDownLatch latch = new CountDownLatch(1); + + driver.acknowledgeStatusUpdate(status); + expectLastCall().andAnswer(() -> { + latch.countDown(); + return null; + }); + + control.replay(); + + launcher.statusUpdate(status); + + assertTrue(latch.await(5L, TimeUnit.SECONDS)); + } + + @Test + public void testThreadFailure() throws Exception { + // Re-create the objects from @Before, since we need to inject a mock queue. 
+ launcher.stopAsync(); + launcher.awaitTerminated(); + + offerManager = createMock(OfferManager.class); + stateManager = createMock(StateManager.class); + storageUtil = new StorageTestUtil(this); + driver = createMock(Driver.class); + queue = createMock(BlockingQueue.class); + + launcher = new UserTaskLauncher( + storageUtil.storage, + offerManager, + stateManager, + driver, + queue, + 1000); + + expect(queue.add(EasyMock.<TaskStatus>anyObject())) + .andReturn(true); + + expect(queue.take()) + .andAnswer(() -> { + throw new RuntimeException(); + }); + + final CountDownLatch latch = new CountDownLatch(1); + + driver.abort(); + expectLastCall().andAnswer(() -> { + latch.countDown(); + return null; + }); + + control.replay(); + + launcher.startAsync(); + + TaskStatus status = TaskStatus.newBuilder() + .setState(TaskState.TASK_RUNNING) + .setTaskId(TaskID.newBuilder().setValue(TASK_ID_A)) + .setMessage("fake message") + .build(); + launcher.statusUpdate(status); + + assertTrue(latch.await(5L, TimeUnit.SECONDS)); } } http://git-wip-us.apache.org/repos/asf/aurora/blob/b3f8da3e/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java b/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java index 422d5a9..d2ec944 100644 --- a/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java +++ b/src/test/java/org/apache/aurora/scheduler/async/GcExecutorLauncherTest.java @@ -191,15 +191,25 @@ public class GcExecutorLauncherTest extends EasyMockTest { @Test public void testStatusUpdate() { + TaskStatus gcStatus1 = makeStatus(SYSTEM_TASK_PREFIX); + TaskStatus gcStatus2 = makeStatus(SYSTEM_TASK_PREFIX + "1"); + TaskStatus lost = makeStatus(SYSTEM_TASK_PREFIX).toBuilder() + .setState(TaskState.TASK_LOST).build(); + + driver.acknowledgeStatusUpdate(gcStatus1); + 
driver.acknowledgeStatusUpdate(gcStatus2); + driver.acknowledgeStatusUpdate(lost); + replayAndConstruct(); - assertTrue(gcExecutorLauncher.statusUpdate(makeStatus(SYSTEM_TASK_PREFIX))); - assertTrue(gcExecutorLauncher.statusUpdate(makeStatus(SYSTEM_TASK_PREFIX + "1"))); + assertTrue(gcExecutorLauncher.statusUpdate(gcStatus1)); + assertTrue(gcExecutorLauncher.statusUpdate(gcStatus2)); + assertFalse(gcExecutorLauncher.statusUpdate(makeStatus("1" + SYSTEM_TASK_PREFIX))); assertFalse(gcExecutorLauncher.statusUpdate(makeStatus("asdf"))); + assertEquals(0, lostTasks.get()); - assertTrue(gcExecutorLauncher.statusUpdate( - makeStatus(SYSTEM_TASK_PREFIX).toBuilder().setState(TaskState.TASK_LOST).build())); + assertTrue(gcExecutorLauncher.statusUpdate(lost)); assertEquals(1, lostTasks.get()); } http://git-wip-us.apache.org/repos/asf/aurora/blob/b3f8da3e/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java b/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java index abdeee4..f0f9ac3 100644 --- a/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java +++ b/src/test/java/org/apache/aurora/scheduler/mesos/MesosSchedulerImplTest.java @@ -268,7 +268,6 @@ public class MesosSchedulerImplTest extends EasyMockTest { Optional.of(1000000L) )); expect(systemLauncher.statusUpdate(status)).andReturn(true); - expect(driver.acknowledgeStatusUpdate(status)).andReturn(Protos.Status.DRIVER_RUNNING); } } @@ -295,7 +294,6 @@ public class MesosSchedulerImplTest extends EasyMockTest { eventSink.post(PUBSUB_EVENT); expect(systemLauncher.statusUpdate(status)).andReturn(false); expect(userLauncher.statusUpdate(status)).andReturn(true); - expect(driver.acknowledgeStatusUpdate(status)).andReturn(Protos.Status.DRIVER_RUNNING); } }.run(); }
